SiamCafe.net Blog
Cybersecurity

Semgrep SAST Monitoring และ Alerting

semgrep sast monitoring และ alerting
Semgrep SAST Monitoring และ Alerting | SiamCafe Blog
2025-08-24· อ. บอม — SiamCafe.net· 1,324 คำ

Semgrep SAST Monitoring และ Alerting คืออะไร

Semgrep เป็น open-source Static Application Security Testing (SAST) tool ที่ใช้วิเคราะห์ source code หาช่องโหว่ด้านความปลอดภัย bugs และ code quality issues โดยไม่ต้องรันโปรแกรม ใช้ pattern-based approach ที่เขียน rules ง่ายและเร็วกว่า traditional SAST tools การ setup monitoring และ alerting สำหรับ Semgrep ช่วยให้ทีม security ติดตามผลการสแกนอย่างต่อเนื่อง แจ้งเตือนเมื่อพบ vulnerabilities ใหม่ และ track trends ของ security posture ขององค์กร

Semgrep Fundamentals

# semgrep_basics.py — Semgrep fundamentals
import json

class SemgrepBasics:
    FEATURES = {
        "pattern_matching": {
            "name": "Pattern Matching",
            "description": "เขียน rules ด้วย code patterns — ง่ายกว่า regex, เข้าใจ AST",
        },
        "multi_language": {
            "name": "Multi-Language",
            "description": "รองรับ 30+ ภาษา: Python, JavaScript, Java, Go, C#, Ruby, PHP",
        },
        "fast": {
            "name": "Fast Scanning",
            "description": "สแกนเร็วมาก — 10,000+ files ใน < 1 นาที",
        },
        "cicd": {
            "name": "CI/CD Integration",
            "description": "GitHub Actions, GitLab CI, Jenkins — scan ทุก PR",
        },
        "custom_rules": {
            "name": "Custom Rules",
            "description": "เขียน rules เอง ด้วย YAML — ตรวจจับ patterns เฉพาะองค์กร",
        },
    }

    COMMANDS = """
# Semgrep basic commands

# Install
pip install semgrep

# Scan with default rules
semgrep --config auto .

# Scan with specific ruleset
semgrep --config p/owasp-top-ten .

# Scan with custom rule
semgrep --config my-rules.yaml .

# JSON output
semgrep --config auto --json -o results.json .

# Scan specific language
semgrep --config auto --lang python .

# CI mode (fail on findings)
semgrep ci --config auto
"""

    def show_features(self):
        print("=== Semgrep Features ===\n")
        for key, feat in self.FEATURES.items():
            print(f"[{feat['name']}] {feat['description']}")

    def show_commands(self):
        print(f"\n=== Basic Commands ===")
        print(self.COMMANDS[:400])

basics = SemgrepBasics()
basics.show_features()
basics.show_commands()

Custom Rules

# custom_rules.py — Semgrep custom rules
import json

class SemgrepRules:
    SQL_INJECTION = """
# sql-injection.yaml — Detect SQL injection
rules:
  - id: sql-injection-format-string
    patterns:
      - pattern: |
          cursor.execute(f"... {$VAR} ...")
      - pattern-not: |
          cursor.execute(f"... {$CONST} ...", ...)
    message: "SQL injection via f-string: use parameterized queries"
    languages: [python]
    severity: ERROR
    metadata:
      category: security
      cwe: ["CWE-89"]
      owasp: ["A03:2021"]
"""

    HARDCODED_SECRET = """
# hardcoded-secret.yaml — Detect hardcoded secrets
rules:
  - id: hardcoded-api-key
    patterns:
      - pattern: |
          $KEY = "..."
      - metavariable-regex:
          metavariable: $KEY
          regex: "(api_key|apikey|secret|password|token|auth)"
      - metavariable-regex:
          metavariable: $...EXPR
          regex: "[a-zA-Z0-9]{20,}"
    message: "Hardcoded secret detected: use environment variables"
    languages: [python, javascript, java]
    severity: WARNING
    metadata:
      category: security
      cwe: ["CWE-798"]
"""

    INSECURE_DESERIALIZE = """
# insecure-deserialize.yaml
rules:
  - id: insecure-pickle-load
    pattern: pickle.load(...)
    message: "pickle.load() is vulnerable to arbitrary code execution"
    languages: [python]
    severity: ERROR
    metadata:
      cwe: ["CWE-502"]
      fix: "Use json.load() or yaml.safe_load() instead"
"""

    def show_rules(self):
        print("=== Custom Rules ===\n")
        print("[SQL Injection]")
        print(self.SQL_INJECTION[:300])
        print("\n[Hardcoded Secret]")
        print(self.HARDCODED_SECRET[:300])

rules = SemgrepRules()
rules.show_rules()

Monitoring Pipeline

# monitoring.py — Semgrep monitoring pipeline
import json

class SemgrepMonitoring:
    CODE = """
# semgrep_monitor.py — Monitor Semgrep scan results
import subprocess
import json
from datetime import datetime
from pathlib import Path

class SemgrepMonitor:
    def __init__(self, project_dir, config="auto"):
        self.project_dir = project_dir
        self.config = config
        self.history_file = Path("semgrep_history.json")
    
    def scan(self):
        '''Run Semgrep scan'''
        cmd = [
            "semgrep", "--config", self.config,
            "--json", "--quiet",
            self.project_dir,
        ]
        
        result = subprocess.run(cmd, capture_output=True, text=True)
        
        if result.returncode in [0, 1]:  # 0=no findings, 1=findings
            return json.loads(result.stdout)
        return {"error": result.stderr}
    
    def parse_results(self, scan_output):
        '''Parse scan results into summary'''
        results = scan_output.get('results', [])
        
        by_severity = {}
        by_category = {}
        by_file = {}
        
        for r in results:
            sev = r.get('extra', {}).get('severity', 'unknown')
            by_severity[sev] = by_severity.get(sev, 0) + 1
            
            cat = r.get('extra', {}).get('metadata', {}).get('category', 'unknown')
            by_category[cat] = by_category.get(cat, 0) + 1
            
            file = r.get('path', 'unknown')
            by_file[file] = by_file.get(file, 0) + 1
        
        return {
            'timestamp': datetime.utcnow().isoformat(),
            'total_findings': len(results),
            'by_severity': by_severity,
            'by_category': by_category,
            'top_files': dict(sorted(by_file.items(), key=lambda x: -x[1])[:10]),
            'critical_findings': [
                {
                    'rule': r.get('check_id', ''),
                    'file': r.get('path', ''),
                    'line': r.get('start', {}).get('line', 0),
                    'message': r.get('extra', {}).get('message', ''),
                }
                for r in results
                if r.get('extra', {}).get('severity') == 'ERROR'
            ],
        }
    
    def compare_with_previous(self, current):
        '''Compare with previous scan'''
        if self.history_file.exists():
            history = json.loads(self.history_file.read_text())
            previous = history[-1] if history else None
        else:
            previous = None
        
        if previous:
            return {
                'previous_total': previous['total_findings'],
                'current_total': current['total_findings'],
                'delta': current['total_findings'] - previous['total_findings'],
                'trend': 'improving' if current['total_findings'] < previous['total_findings'] else 'worsening',
            }
        return {'note': 'No previous scan to compare'}
    
    def save_history(self, summary):
        '''Save scan to history'''
        history = []
        if self.history_file.exists():
            history = json.loads(self.history_file.read_text())
        
        history.append(summary)
        
        # Keep last 100 scans
        if len(history) > 100:
            history = history[-100:]
        
        self.history_file.write_text(json.dumps(history, indent=2))
    
    def should_alert(self, summary, thresholds=None):
        '''Determine if alert should fire'''
        if thresholds is None:
            thresholds = {
                'critical_max': 0,
                'high_max': 5,
                'total_max': 50,
            }
        
        alerts = []
        
        critical = summary['by_severity'].get('ERROR', 0)
        if critical > thresholds['critical_max']:
            alerts.append({
                'level': 'critical',
                'message': f"Critical findings: {critical} (threshold: {thresholds['critical_max']})",
            })
        
        high = summary['by_severity'].get('WARNING', 0)
        if high > thresholds['high_max']:
            alerts.append({
                'level': 'high',
                'message': f"High findings: {high} (threshold: {thresholds['high_max']})",
            })
        
        if summary['total_findings'] > thresholds['total_max']:
            alerts.append({
                'level': 'medium',
                'message': f"Total findings: {summary['total_findings']} (threshold: {thresholds['total_max']})",
            })
        
        return alerts

# monitor = SemgrepMonitor("./src")
# output = monitor.scan()
# summary = monitor.parse_results(output)
# alerts = monitor.should_alert(summary)
"""

    def show_code(self):
        print("=== Semgrep Monitor ===")
        print(self.CODE[:600])

monitor = SemgrepMonitoring()
monitor.show_code()

CI/CD Integration

# cicd.py — Semgrep CI/CD with alerting
import json

class SemgrepCICD:
    GITHUB_ACTIONS = """
# .github/workflows/semgrep.yml
name: Semgrep Security Scan
on:
  push:
    branches: [main]
  pull_request:
  schedule:
    - cron: '0 8 * * 1'  # Weekly Monday 8am

jobs:
  semgrep:
    runs-on: ubuntu-latest
    container:
      image: semgrep/semgrep
    steps:
      - uses: actions/checkout@v4
      
      - name: Semgrep Scan
        run: semgrep ci --config auto --json -o results.json
        env:
          SEMGREP_APP_TOKEN: }
      
      - name: Parse Results
        if: always()
        run: |
          CRITICAL=$(cat results.json | python3 -c "
          import json, sys
          data = json.load(sys.stdin)
          critical = sum(1 for r in data.get('results', [])
                        if r.get('extra', {}).get('severity') == 'ERROR')
          print(critical)
          ")
          echo "Critical findings: $CRITICAL"
          if [ "$CRITICAL" -gt 0 ]; then
            echo "CRITICAL_FOUND=true" >> $GITHUB_ENV
          fi
      
      - name: Upload Results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: semgrep-results
          path: results.json
      
      - name: Slack Alert
        if: env.CRITICAL_FOUND == 'true'
        uses: slackapi/slack-github-action@v1
        with:
          payload: |
            {
              "text": "CRITICAL: Semgrep found critical vulnerabilities in }"
            }
        env:
          SLACK_WEBHOOK_URL: }
      
      - name: Fail on Critical
        if: env.CRITICAL_FOUND == 'true'
        run: exit 1
"""

    ALERTING = {
        "slack": "Webhook → ส่ง message เมื่อพบ critical/high findings",
        "jira": "สร้าง Jira ticket อัตโนมัติสำหรับ findings ใหม่",
        "email": "ส่ง daily/weekly summary report ให้ security team",
        "pagerduty": "Alert เฉพาะ critical findings ที่ต้อง fix ด่วน",
        "dashboard": "Grafana dashboard แสดง trends, top findings, fix rate",
    }

    def show_pipeline(self):
        print("=== GitHub Actions ===")
        print(self.GITHUB_ACTIONS[:500])

    def show_alerting(self):
        print(f"\n=== Alerting Channels ===")
        for channel, desc in self.ALERTING.items():
            print(f"  [{channel}] {desc}")

cicd = SemgrepCICD()
cicd.show_pipeline()
cicd.show_alerting()

Dashboard & Reporting

# dashboard.py — Semgrep dashboard and reporting
import json
import random

class SemgrepDashboard:
    METRICS = {
        "total_findings": "จำนวน findings ทั้งหมด — target: ลดลงทุกสัปดาห์",
        "mttr": "Mean Time to Remediate — เวลาเฉลี่ยที่ใช้ fix finding",
        "fix_rate": "% ของ findings ที่ fix แล้วต่อสัปดาห์",
        "new_vs_fixed": "จำนวน findings ใหม่ vs fixed — ต้อง fixed > new",
        "coverage": "% ของ repos ที่มี Semgrep scan — target: 100%",
        "false_positive": "% ของ findings ที่เป็น false positive — ต้อง < 10%",
    }

    WEEKLY_REPORT = {
        "period": "Week 24, 2026",
        "repos_scanned": 45,
        "total_findings": 234,
        "critical": 3,
        "high": 28,
        "medium": 103,
        "low": 100,
        "new_findings": 12,
        "fixed_findings": 18,
        "trend": "improving (-6 net findings)",
        "top_categories": [
            ("SQL Injection", 15),
            ("XSS", 12),
            ("Hardcoded Secrets", 8),
            ("Insecure Deserialization", 5),
        ],
    }

    def show_metrics(self):
        print("=== Dashboard Metrics ===\n")
        for metric, desc in self.METRICS.items():
            print(f"  [{metric}] {desc}")

    def show_weekly_report(self):
        r = self.WEEKLY_REPORT
        print(f"\n=== Weekly Report ({r['period']}) ===")
        print(f"  Repos Scanned: {r['repos_scanned']}")
        print(f"  Total: {r['total_findings']} (Critical: {r['critical']}, High: {r['high']})")
        print(f"  New: {r['new_findings']}, Fixed: {r['fixed_findings']}")
        print(f"  Trend: {r['trend']}")
        print(f"\n  Top Categories:")
        for cat, count in r['top_categories']:
            print(f"    {cat}: {count}")

dashboard = SemgrepDashboard()
dashboard.show_metrics()
dashboard.show_weekly_report()

FAQ - คำถามที่พบบ่อย

Q: Semgrep กับ SonarQube อันไหนดีกว่า?

A: Semgrep: เร็วกว่า, rules เขียนง่ายกว่า (YAML), CI/CD friendly, open-source SonarQube: comprehensive กว่า (code quality + security), IDE integration, dashboard ดี เลือก Semgrep: ถ้าเน้น security, ต้องการ custom rules, CI/CD integration เลือก SonarQube: ถ้าต้องการ code quality + security รวมกัน, มี server dedicated ใช้ร่วมกัน: Semgrep ใน CI (fast feedback) + SonarQube สำหรับ deep analysis

Q: False positive เยอะไหม?

A: Semgrep มี false positive rate ต่ำกว่า traditional SAST — เพราะใช้ semantic analysis วิธีลด: ใช้ curated rulesets (p/owasp-top-ten), tune rules ตาม codebase, ใช้ nosemgrep comment suppress False positive management: mark as false positive ใน Semgrep App → ไม่แจ้งซ้ำ ทั่วไป: false positive ~5-15% — ดีกว่า SAST tools อื่นที่อาจสูง 30-50%

Q: Semgrep ฟรีไหม?

A: Semgrep OSS: ฟรี 100% — CLI scanner + community rules + custom rules Semgrep App (Cloud): Free tier สำหรับ small teams, Team/Enterprise plans มีค่าใช้จ่าย สิ่งที่ต้องจ่าย: dashboard, RBAC, advanced features, Semgrep Supply Chain (SCA) สำหรับส่วนใหญ่: Semgrep OSS + GitHub Actions ฟรีเพียงพอ

Q: ใช้เวลาสแกนนานไหม?

A: เร็วมาก — เป็นจุดเด่นของ Semgrep Benchmark: 10,000 files ใน < 60 วินาที, 100,000 files ใน < 5 นาที เปรียบเทียบ: SonarQube อาจใช้ 10-30 นาที สำหรับ codebase เดียวกัน เหตุผล: Semgrep ทำงาน locally ไม่ต้อง compile, ใช้ parallel processing เหมาะสำหรับ PR checks — ไม่ทำให้ CI pipeline ช้า

📖 บทความที่เกี่ยวข้อง

Libvirt KVM Monitoring และ Alertingอ่านบทความ → Semgrep SAST Infrastructure as Codeอ่านบทความ → Semgrep SAST Security Hardening ป้องกันแฮกอ่านบทความ → Semgrep SAST Production Setup Guideอ่านบทความ → Semgrep SAST Citizen Developerอ่านบทความ →

📚 ดูบทความทั้งหมด →