Semgrep SAST Monitoring และ Alerting คืออะไร
Semgrep เป็น open-source Static Application Security Testing (SAST) tool ที่ใช้วิเคราะห์ source code หาช่องโหว่ด้านความปลอดภัย bugs และ code quality issues โดยไม่ต้องรันโปรแกรม ใช้ pattern-based approach ที่เขียน rules ง่ายและเร็วกว่า traditional SAST tools การ setup monitoring และ alerting สำหรับ Semgrep ช่วยให้ทีม security ติดตามผลการสแกนอย่างต่อเนื่อง แจ้งเตือนเมื่อพบ vulnerabilities ใหม่ และ track trends ของ security posture ขององค์กร
Semgrep Fundamentals
# semgrep_basics.py — Semgrep fundamentals
import json
class SemgrepBasics:
    """Static reference data about Semgrep plus helpers that print it."""

    # Headline capabilities keyed by a short slug. Each entry holds a display
    # name and a one-line description (descriptions are written in Thai).
    FEATURES = {
        "pattern_matching": {
            "name": "Pattern Matching",
            "description": "เขียน rules ด้วย code patterns — ง่ายกว่า regex, เข้าใจ AST",
        },
        "multi_language": {
            "name": "Multi-Language",
            "description": "รองรับ 30+ ภาษา: Python, JavaScript, Java, Go, C#, Ruby, PHP",
        },
        "fast": {
            "name": "Fast Scanning",
            "description": "สแกนเร็วมาก — 10,000+ files ใน < 1 นาที",
        },
        "cicd": {
            "name": "CI/CD Integration",
            "description": "GitHub Actions, GitLab CI, Jenkins — scan ทุก PR",
        },
        "custom_rules": {
            "name": "Custom Rules",
            "description": "เขียน rules เอง ด้วย YAML — ตรวจจับ patterns เฉพาะองค์กร",
        },
    }

    # Shell cheat sheet. The text is kept flush-left inside the literal so it
    # prints without leading indentation. ``--lang`` is only meaningful
    # together with ``-e/--pattern``, so the per-language example restricts
    # the scan by path with ``--include`` instead.
    COMMANDS = """
# Semgrep basic commands
# Install
pip install semgrep
# Scan with default rules
semgrep --config auto .
# Scan with specific ruleset
semgrep --config p/owasp-top-ten .
# Scan with custom rule
semgrep --config my-rules.yaml .
# JSON output
semgrep --config auto --json -o results.json .
# Scan specific language
semgrep --config auto --include "*.py" .
# CI mode (fail on findings)
semgrep ci --config auto
"""

    def show_features(self):
        """Print a header, then one ``[name] description`` line per feature."""
        print("=== Semgrep Features ===\n")
        for feat in self.FEATURES.values():
            print(f"[{feat['name']}] {feat['description']}")

    def show_commands(self):
        """Print a header, then the first 400 characters of ``COMMANDS``."""
        print("\n=== Basic Commands ===")
        # Deliberate preview: only the first 400 characters are shown.
        print(self.COMMANDS[:400])


basics = SemgrepBasics()
basics.show_features()
basics.show_commands()
Custom Rules
# custom_rules.py — Semgrep custom rules
import json
class SemgrepRules:
    """Example Semgrep rule files (as YAML text) plus a helper that previews them.

    The YAML is kept flush-left inside each literal so that the nesting the
    rule schema depends on is exactly what gets printed.
    """

    SQL_INJECTION = """
# sql-injection.yaml — Detect SQL injection
rules:
  - id: sql-injection-format-string
    patterns:
      - pattern: |
          cursor.execute(f"... {$VAR} ...")
      - pattern-not: |
          cursor.execute(f"... {$CONST} ...", ...)
    message: "SQL injection via f-string: use parameterized queries"
    languages: [python]
    severity: ERROR
    metadata:
      category: security
      cwe: ["CWE-89"]
      owasp: ["A03:2021"]
"""

    # The pattern binds the string content to $VALUE so that the second
    # metavariable-regex refers to a metavariable the pattern actually defines.
    HARDCODED_SECRET = """
# hardcoded-secret.yaml — Detect hardcoded secrets
rules:
  - id: hardcoded-api-key
    patterns:
      - pattern: |
          $KEY = "$VALUE"
      - metavariable-regex:
          metavariable: $KEY
          regex: "(api_key|apikey|secret|password|token|auth)"
      - metavariable-regex:
          metavariable: $VALUE
          regex: "[a-zA-Z0-9]{20,}"
    message: "Hardcoded secret detected: use environment variables"
    languages: [python, javascript, java]
    severity: WARNING
    metadata:
      category: security
      cwe: ["CWE-798"]
"""

    # The remediation hint lives under ``metadata``: a rule-level ``fix:`` key
    # is Semgrep's autofix replacement text, which a prose sentence must not be.
    INSECURE_DESERIALIZE = """
# insecure-deserialize.yaml
rules:
  - id: insecure-pickle-load
    pattern: pickle.load(...)
    message: "pickle.load() is vulnerable to arbitrary code execution"
    languages: [python]
    severity: ERROR
    metadata:
      cwe: ["CWE-502"]
      fix: "Use json.load() or yaml.safe_load() instead"
"""

    def show_rules(self):
        """Print a labelled 300-character preview of each example rule."""
        print("=== Custom Rules ===\n")
        print("[SQL Injection]")
        print(self.SQL_INJECTION[:300])
        print("\n[Hardcoded Secret]")
        print(self.HARDCODED_SECRET[:300])
        print("\n[Insecure Deserialization]")
        print(self.INSECURE_DESERIALIZE[:300])


rules = SemgrepRules()
rules.show_rules()
Monitoring Pipeline
# monitoring.py — Semgrep monitoring pipeline
import json
class SemgrepMonitoring:
    """Holds the source of an example ``SemgrepMonitor`` script and previews it.

    ``CODE`` is Python source text, kept flush-left inside the literal so the
    embedded script keeps valid indentation and can be copied out or compiled
    as-is.
    """

    CODE = """
# semgrep_monitor.py — Monitor Semgrep scan results
import subprocess
import json
from datetime import datetime, timezone
from pathlib import Path


class SemgrepMonitor:
    def __init__(self, project_dir, config="auto"):
        self.project_dir = project_dir
        self.config = config
        self.history_file = Path("semgrep_history.json")

    def scan(self):
        '''Run Semgrep scan'''
        cmd = [
            "semgrep", "--config", self.config,
            "--json", "--quiet",
            self.project_dir,
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode in [0, 1]:  # 0=no findings, 1=findings
            return json.loads(result.stdout)
        return {"error": result.stderr}

    def parse_results(self, scan_output):
        '''Parse scan results into summary'''
        results = scan_output.get('results', [])
        by_severity = {}
        by_category = {}
        by_file = {}
        for r in results:
            sev = r.get('extra', {}).get('severity', 'unknown')
            by_severity[sev] = by_severity.get(sev, 0) + 1
            cat = r.get('extra', {}).get('metadata', {}).get('category', 'unknown')
            by_category[cat] = by_category.get(cat, 0) + 1
            file = r.get('path', 'unknown')
            by_file[file] = by_file.get(file, 0) + 1
        return {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'total_findings': len(results),
            'by_severity': by_severity,
            'by_category': by_category,
            'top_files': dict(sorted(by_file.items(), key=lambda x: -x[1])[:10]),
            'critical_findings': [
                {
                    'rule': r.get('check_id', ''),
                    'file': r.get('path', ''),
                    'line': r.get('start', {}).get('line', 0),
                    'message': r.get('extra', {}).get('message', ''),
                }
                for r in results
                if r.get('extra', {}).get('severity') == 'ERROR'
            ],
        }

    def compare_with_previous(self, current):
        '''Compare with previous scan'''
        if self.history_file.exists():
            history = json.loads(self.history_file.read_text())
            previous = history[-1] if history else None
        else:
            previous = None
        if previous:
            delta = current['total_findings'] - previous['total_findings']
            if delta < 0:
                trend = 'improving'
            elif delta > 0:
                trend = 'worsening'
            else:
                trend = 'unchanged'
            return {
                'previous_total': previous['total_findings'],
                'current_total': current['total_findings'],
                'delta': delta,
                'trend': trend,
            }
        return {'note': 'No previous scan to compare'}

    def save_history(self, summary):
        '''Save scan to history'''
        history = []
        if self.history_file.exists():
            history = json.loads(self.history_file.read_text())
        history.append(summary)
        # Keep last 100 scans
        if len(history) > 100:
            history = history[-100:]
        self.history_file.write_text(json.dumps(history, indent=2))

    def should_alert(self, summary, thresholds=None):
        '''Determine if alert should fire'''
        if thresholds is None:
            thresholds = {
                'critical_max': 0,
                'high_max': 5,
                'total_max': 50,
            }
        alerts = []
        critical = summary['by_severity'].get('ERROR', 0)
        if critical > thresholds['critical_max']:
            alerts.append({
                'level': 'critical',
                'message': f"Critical findings: {critical} (threshold: {thresholds['critical_max']})",
            })
        high = summary['by_severity'].get('WARNING', 0)
        if high > thresholds['high_max']:
            alerts.append({
                'level': 'high',
                'message': f"High findings: {high} (threshold: {thresholds['high_max']})",
            })
        if summary['total_findings'] > thresholds['total_max']:
            alerts.append({
                'level': 'medium',
                'message': f"Total findings: {summary['total_findings']} (threshold: {thresholds['total_max']})",
            })
        return alerts

# monitor = SemgrepMonitor("./src")
# output = monitor.scan()
# summary = monitor.parse_results(output)
# alerts = monitor.should_alert(summary)
"""

    def show_code(self):
        """Print a header, then the first 600 characters of ``CODE``."""
        print("=== Semgrep Monitor ===")
        # Deliberate preview: only the first 600 characters are shown.
        print(self.CODE[:600])


monitor = SemgrepMonitoring()
monitor.show_code()
CI/CD Integration
# cicd.py — Semgrep CI/CD with alerting
import json
class SemgrepCICD:
    """Example GitHub Actions workflow for Semgrep plus a list of alert channels.

    ``GITHUB_ACTIONS`` is YAML text, kept flush-left inside the literal so the
    nesting that the workflow syntax depends on is exactly what gets printed.
    """

    # Notes on the workflow text:
    # * Secrets and the repository name use GitHub's ``${{ ... }}`` expressions.
    # * The Python passed to ``python3 -c`` sits at the block scalar's base
    #   indentation, so it starts at column 0 once YAML strips that indent.
    # * The Slack step uses ``always() && ...``: a bare ``if:`` carries an
    #   implicit ``success()``, so it would be skipped whenever the scan step
    #   fails on findings, which is exactly when the alert is wanted.
    GITHUB_ACTIONS = """
# .github/workflows/semgrep.yml
name: Semgrep Security Scan
on:
  push:
    branches: [main]
  pull_request:
  schedule:
    - cron: '0 8 * * 1' # Weekly Monday 8am
jobs:
  semgrep:
    runs-on: ubuntu-latest
    container:
      image: semgrep/semgrep
    steps:
      - uses: actions/checkout@v4
      - name: Semgrep Scan
        run: semgrep ci --config auto --json -o results.json
        env:
          SEMGREP_APP_TOKEN: ${{ secrets.SEMGREP_APP_TOKEN }}
      - name: Parse Results
        if: always()
        run: |
          CRITICAL=$(cat results.json | python3 -c "
          import json, sys
          data = json.load(sys.stdin)
          critical = sum(1 for r in data.get('results', [])
                         if r.get('extra', {}).get('severity') == 'ERROR')
          print(critical)
          ")
          echo "Critical findings: $CRITICAL"
          if [ "$CRITICAL" -gt 0 ]; then
            echo "CRITICAL_FOUND=true" >> $GITHUB_ENV
          fi
      - name: Upload Results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: semgrep-results
          path: results.json
      - name: Slack Alert
        if: always() && env.CRITICAL_FOUND == 'true'
        uses: slackapi/slack-github-action@v1
        with:
          payload: |
            {
              "text": "CRITICAL: Semgrep found critical vulnerabilities in ${{ github.repository }}"
            }
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
      - name: Fail on Critical
        if: env.CRITICAL_FOUND == 'true'
        run: exit 1
"""

    # Alert channel slug -> short description (descriptions are in Thai).
    ALERTING = {
        "slack": "Webhook → ส่ง message เมื่อพบ critical/high findings",
        "jira": "สร้าง Jira ticket อัตโนมัติสำหรับ findings ใหม่",
        "email": "ส่ง daily/weekly summary report ให้ security team",
        "pagerduty": "Alert เฉพาะ critical findings ที่ต้อง fix ด่วน",
        "dashboard": "Grafana dashboard แสดง trends, top findings, fix rate",
    }

    def show_pipeline(self):
        """Print a header, then the first 500 characters of the workflow."""
        print("=== GitHub Actions ===")
        # Deliberate preview: only the first 500 characters are shown.
        print(self.GITHUB_ACTIONS[:500])

    def show_alerting(self):
        """Print a header, then one ``[channel] description`` line per channel."""
        print("\n=== Alerting Channels ===")
        for channel, desc in self.ALERTING.items():
            print(f" [{channel}] {desc}")


cicd = SemgrepCICD()
cicd.show_pipeline()
cicd.show_alerting()
Dashboard & Reporting
# dashboard.py — Semgrep dashboard and reporting
import json
import random
class SemgrepDashboard:
    """Sample dashboard metric definitions and a hard-coded weekly report."""

    # Metric slug -> short description with its target (descriptions in Thai).
    METRICS = {
        "total_findings": "จำนวน findings ทั้งหมด — target: ลดลงทุกสัปดาห์",
        "mttr": "Mean Time to Remediate — เวลาเฉลี่ยที่ใช้ fix finding",
        "fix_rate": "% ของ findings ที่ fix แล้วต่อสัปดาห์",
        "new_vs_fixed": "จำนวน findings ใหม่ vs fixed — ต้อง fixed > new",
        "coverage": "% ของ repos ที่มี Semgrep scan — target: 100%",
        "false_positive": "% ของ findings ที่เป็น false positive — ต้อง < 10%",
    }

    # Illustrative sample data. The four severity counts add up to
    # ``total_findings`` and new minus fixed matches the stated trend.
    WEEKLY_REPORT = {
        "period": "Week 24, 2026",
        "repos_scanned": 45,
        "total_findings": 234,
        "critical": 3,
        "high": 28,
        "medium": 103,
        "low": 100,
        "new_findings": 12,
        "fixed_findings": 18,
        "trend": "improving (-6 net findings)",
        "top_categories": [
            ("SQL Injection", 15),
            ("XSS", 12),
            ("Hardcoded Secrets", 8),
            ("Insecure Deserialization", 5),
        ],
    }

    def show_metrics(self):
        """Print a header, then one ``[metric] description`` line per metric."""
        print("=== Dashboard Metrics ===\n")
        for metric, desc in self.METRICS.items():
            print(f" [{metric}] {desc}")

    def show_weekly_report(self):
        """Print the weekly report: totals, new vs fixed, trend, top categories.

        Only the critical and high counts are shown next to the total; the
        medium and low counts are present in the data but not printed.
        """
        r = self.WEEKLY_REPORT
        print(f"\n=== Weekly Report ({r['period']}) ===")
        print(f" Repos Scanned: {r['repos_scanned']}")
        print(f" Total: {r['total_findings']} (Critical: {r['critical']}, High: {r['high']})")
        print(f" New: {r['new_findings']}, Fixed: {r['fixed_findings']}")
        print(f" Trend: {r['trend']}")
        print("\n Top Categories:")
        for cat, count in r['top_categories']:
            print(f" {cat}: {count}")


dashboard = SemgrepDashboard()
dashboard.show_metrics()
dashboard.show_weekly_report()
FAQ - คำถามที่พบบ่อย
Q: Semgrep กับ SonarQube อันไหนดีกว่า?
A:
- Semgrep: เร็วกว่า, rules เขียนง่ายกว่า (YAML), CI/CD friendly, open-source
- SonarQube: comprehensive กว่า (code quality + security), IDE integration, dashboard ดี
- เลือก Semgrep: ถ้าเน้น security, ต้องการ custom rules, CI/CD integration
- เลือก SonarQube: ถ้าต้องการ code quality + security รวมกัน, มี server dedicated
- ใช้ร่วมกัน: Semgrep ใน CI (fast feedback) + SonarQube สำหรับ deep analysis
Q: False positive เยอะไหม?
A: Semgrep มี false positive rate ต่ำกว่า traditional SAST — เพราะใช้ semantic analysis
- วิธีลด: ใช้ curated rulesets (p/owasp-top-ten), tune rules ตาม codebase, ใช้ nosemgrep comment เพื่อ suppress
- False positive management: mark as false positive ใน Semgrep App → ไม่แจ้งซ้ำ
- ทั่วไป: false positive ~5-15% — ดีกว่า SAST tools อื่นที่อาจสูง 30-50%
Q: Semgrep ฟรีไหม?
A:
- Semgrep OSS: ฟรี 100% — CLI scanner + community rules + custom rules
- Semgrep App (Cloud): Free tier สำหรับ small teams, Team/Enterprise plans มีค่าใช้จ่าย
- สิ่งที่ต้องจ่าย: dashboard, RBAC, advanced features, Semgrep Supply Chain (SCA)
- สำหรับส่วนใหญ่: Semgrep OSS + GitHub Actions ฟรีเพียงพอ
Q: ใช้เวลาสแกนนานไหม?
A: เร็วมาก — เป็นจุดเด่นของ Semgrep
- Benchmark: 10,000 files ใน < 60 วินาที, 100,000 files ใน < 5 นาที
- เปรียบเทียบ: SonarQube อาจใช้ 10-30 นาที สำหรับ codebase เดียวกัน
- เหตุผล: Semgrep ทำงาน locally ไม่ต้อง compile, ใช้ parallel processing
- เหมาะสำหรับ PR checks — ไม่ทำให้ CI pipeline ช้า
