Semgrep SAST Batch Processing Pipeline: Automated Security Scanning for Every Repo

2025-08-28 · อ. บอม, SiamCafe.net · 1,311 words

What Is Semgrep SAST?

Semgrep is an open source Static Application Security Testing (SAST) tool that analyzes source code to find vulnerabilities, bugs, and code smells without having to run the application. It supports 30+ languages (Python, JavaScript, TypeScript, Go, Java, Ruby, PHP, C#) and uses pattern-based matching, which makes rules easier to write than in other tools.

A batch processing pipeline scans many repositories together as one batch instead of scanning one repo at a time. It suits organizations with 50-500+ repositories. Typical uses are scanning every repo on a schedule, a security audit of the whole codebase before a release, compliance scanning for policy violations across repos, and migration work such as finding deprecated APIs before a framework upgrade.

Advantages of Semgrep: it is fast (scans 100K lines in about 10 seconds), rules are easy to write because the pattern syntax looks like real code, the CLI and rules are free and open source, its false positive rate is low compared with other SAST tools, and CI/CD integration is simple.

Installing Semgrep for Batch Scanning

Set up Semgrep to scan multiple repos

# === Semgrep Batch Scanning Setup ===

# 1. Install Semgrep
pip install semgrep

# Verify installation
semgrep --version

# 2. Basic scan
semgrep scan --config auto .

# 3. Scan with specific rulesets
semgrep scan \
  --config p/owasp-top-ten \
  --config p/security-audit \
  --config p/secrets \
  --json --output results.json \
  /path/to/code

# 4. Batch scanning configuration
cat > batch_config.yaml << 'EOF'
batch_scanning:
  repositories:
    - name: "api-service"
      url: "git@github.com:company/api-service.git"
      branch: "main"
      language: "python"
      
    - name: "web-frontend"
      url: "git@github.com:company/web-frontend.git"
      branch: "main"
      language: "typescript"
      
    - name: "mobile-backend"
      url: "git@github.com:company/mobile-backend.git"
      branch: "develop"
      language: "go"
      
    - name: "payment-service"
      url: "git@github.com:company/payment-service.git"
      branch: "main"
      language: "java"
      priority: "critical"

  scan_config:
    rulesets:
      - "p/owasp-top-ten"
      - "p/security-audit"
      - "p/secrets"
      - "p/ci"
    severity_filter: ["ERROR", "WARNING"]
    max_target_bytes: 5000000
    timeout: 300
    jobs: 4
    
  output:
    format: "json"
    directory: "/tmp/semgrep-results"
    aggregate_report: true
    
  schedule:
    cron: "0 2 * * *"  # Daily at 2 AM
    on_failure: "notify_slack"
EOF

# 5. Clone and scan script
cat > clone_and_scan.sh << 'BASH'
#!/bin/bash
WORKSPACE="/tmp/semgrep-workspace"
RESULTS="/tmp/semgrep-results"
mkdir -p "$WORKSPACE" "$RESULTS"

REPOS=(
  "git@github.com:company/api-service.git"
  "git@github.com:company/web-frontend.git"
  "git@github.com:company/mobile-backend.git"
)

for repo in "${REPOS[@]}"; do
  name=$(basename "$repo" .git)
  echo "Scanning $name..."
  
  git clone --depth 1 "$repo" "$WORKSPACE/$name" 2>/dev/null || \
    git -C "$WORKSPACE/$name" pull
  
  semgrep scan \
    --config p/owasp-top-ten \
    --config p/security-audit \
    --json \
    --output "$RESULTS/${name}.json" \
    "$WORKSPACE/$name"
  
  echo "Done: $name"
done

echo "All scans complete"
BASH

chmod +x clone_and_scan.sh
echo "Batch scanning configured"
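
The scan_config section of batch_config.yaml can also be turned into Semgrep command lines programmatically. The sketch below is one possible way to do that in Python. `ScanConfig` and `build_scan_command` are hypothetical helpers (not part of Semgrep), the default values mirror the config above, and nothing is executed: the function only builds the argument list. The config's `timeout` is assumed here to be a wall-clock limit for the process rather than a Semgrep flag, so it is kept out of the command.

```python
from dataclasses import dataclass, field
from pathlib import PurePosixPath
from typing import List


@dataclass
class ScanConfig:
    """Hypothetical container mirroring the scan_config section of batch_config.yaml."""
    rulesets: List[str] = field(default_factory=lambda: ["p/owasp-top-ten", "p/security-audit"])
    severities: List[str] = field(default_factory=lambda: ["ERROR", "WARNING"])
    max_target_bytes: int = 5_000_000
    timeout: int = 300  # assumed wall-clock limit, e.g. for subprocess.run(timeout=...)
    jobs: int = 4


def build_scan_command(repo_path: str, output_dir: str, cfg: ScanConfig) -> List[str]:
    """Build the argv list for one `semgrep scan` run (nothing is executed here)."""
    name = PurePosixPath(repo_path).name
    cmd = ["semgrep", "scan"]
    for ruleset in cfg.rulesets:
        cmd += ["--config", ruleset]
    for severity in cfg.severities:
        cmd += ["--severity", severity]
    cmd += [
        "--max-target-bytes", str(cfg.max_target_bytes),
        "--jobs", str(cfg.jobs),
        "--json",
        "--output", str(PurePosixPath(output_dir) / f"{name}.json"),
        repo_path,
    ]
    return cmd


command = build_scan_command(
    "/tmp/semgrep-workspace/api-service", "/tmp/semgrep-results", ScanConfig()
)
print(" ".join(command))
```

Returning a list rather than a shell string means the command can be passed straight to `subprocess.run` without quoting issues in repo names or paths.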

Building the Batch Processing Pipeline

A Python pipeline for batch security scanning

#!/usr/bin/env python3
# batch_scanner.py: Semgrep Batch Processing Pipeline
import json
import logging
import os
from typing import Dict, List
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("scanner")

class SemgrepBatchScanner:
    """Batch processing pipeline for Semgrep SAST scanning"""
    
    def __init__(self, workspace="/tmp/semgrep"):
        self.workspace = workspace
        self.results = {}
        self.stats = {"total_repos": 0, "scanned": 0, "failed": 0, "findings": 0}
    
    def scan_repo(self, repo_name, repo_path, rulesets=None):
        """Scan single repository"""
        if rulesets is None:
            rulesets = ["p/owasp-top-ten", "p/security-audit"]
        
        # Simulate semgrep scan results
        findings = [
            {
                "check_id": "python.lang.security.audit.dangerous-exec",
                "path": f"{repo_name}/app/utils.py",
                "start": {"line": 42, "col": 5},
                "severity": "ERROR",
                "message": "Detected use of exec() which can lead to code injection",
                "metadata": {"cwe": "CWE-94", "owasp": "A03:2021"},
            },
            {
                "check_id": "python.lang.security.audit.hardcoded-password",
                "path": f"{repo_name}/config/settings.py",
                "start": {"line": 15, "col": 1},
                "severity": "WARNING",
                "message": "Detected hardcoded password in source code",
                "metadata": {"cwe": "CWE-798", "owasp": "A07:2021"},
            },
        ]
        
        self.results[repo_name] = {
            "findings": findings,
            "scan_time": "12.3s",
            "files_scanned": 156,
            "lines_scanned": 28500,
        }
        self.stats["scanned"] += 1
        self.stats["findings"] += len(findings)
        
        return findings
    
    def batch_scan(self, repos):
        """Scan multiple repositories"""
        self.stats["total_repos"] = len(repos)
        all_findings = []
        
        for repo in repos:
            try:
                findings = self.scan_repo(
                    repo["name"],
                    repo.get("path", f"{self.workspace}/{repo['name']}"),
                    repo.get("rulesets"),
                )
                all_findings.extend(findings)
                logger.info(f"Scanned {repo['name']}: {len(findings)} findings")
            except Exception as e:
                self.stats["failed"] += 1
                logger.error(f"Failed to scan {repo['name']}: {e}")
        
        return all_findings
    
    def aggregate_results(self):
        """Aggregate results across all repos"""
        by_severity = {"ERROR": 0, "WARNING": 0, "INFO": 0}
        by_cwe = {}
        by_repo = {}
        
        for repo_name, result in self.results.items():
            by_repo[repo_name] = len(result["findings"])
            for finding in result["findings"]:
                severity = finding.get("severity", "INFO")
                by_severity[severity] = by_severity.get(severity, 0) + 1
                
                cwe = finding.get("metadata", {}).get("cwe", "Unknown")
                by_cwe[cwe] = by_cwe.get(cwe, 0) + 1
        
        return {
            "summary": self.stats,
            "by_severity": by_severity,
            "by_cwe": dict(sorted(by_cwe.items(), key=lambda x: x[1], reverse=True)),
            "by_repo": dict(sorted(by_repo.items(), key=lambda x: x[1], reverse=True)),
            "scan_date": datetime.now().isoformat(),
        }

# Demo
scanner = SemgrepBatchScanner()
repos = [
    {"name": "api-service", "rulesets": ["p/owasp-top-ten"]},
    {"name": "web-frontend", "rulesets": ["p/security-audit"]},
    {"name": "payment-service", "rulesets": ["p/owasp-top-ten", "p/secrets"]},
]

findings = scanner.batch_scan(repos)
report = scanner.aggregate_results()

print(f"Batch Scan Report:")
print(f"  Repos: {report['summary']['scanned']}/{report['summary']['total_repos']}")
print(f"  Total findings: {report['summary']['findings']}")
print(f"\nBy Severity:")
for sev, count in report["by_severity"].items():
    print(f"  {sev}: {count}")
print(f"\nBy CWE:")
for cwe, count in report["by_cwe"].items():
    print(f"  {cwe}: {count}")
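
The scan_repo method above returns simulated findings. To feed real data into the same pipeline, the findings have to be extracted from Semgrep's JSON output, where severity, message, and metadata sit under each result's `extra` key (the CI check later in this article relies on the same layout). A minimal sketch, with `normalize_findings` as a hypothetical helper name and the sample JSON written by hand for illustration:

```python
import json
from typing import Any, Dict, List


def normalize_findings(semgrep_json: str) -> List[Dict[str, Any]]:
    """Flatten Semgrep JSON results into the finding shape used by SemgrepBatchScanner."""
    data = json.loads(semgrep_json)
    findings = []
    for result in data.get("results", []):
        extra = result.get("extra", {})
        findings.append({
            "check_id": result.get("check_id", "unknown"),
            "path": result.get("path", ""),
            "start": result.get("start", {}),
            "severity": extra.get("severity", "INFO"),
            "message": extra.get("message", ""),
            "metadata": extra.get("metadata", {}),
        })
    return findings


# Hand-written sample in the shape of `semgrep scan --json` output (illustrative only)
sample = json.dumps({
    "results": [
        {
            "check_id": "custom.security.sql-injection-format",
            "path": "app/db.py",
            "start": {"line": 42, "col": 5},
            "extra": {
                "severity": "ERROR",
                "message": "Potential SQL injection via string formatting.",
                "metadata": {"cwe": "CWE-89"},
            },
        }
    ],
    "errors": [],
})

for finding in normalize_findings(sample):
    location = f"{finding['path']}:{finding['start']['line']}"
    print(f"{finding['severity']} {finding['check_id']} at {location}")
```

One caveat when wiring this into aggregate_results: rules from the public registry may store `metadata.cwe` as a list rather than a single string, in which case it cannot be used directly as a dictionary key and would need to be flattened first.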

Custom Rules for the Organization

Create organization-specific Semgrep rules

# === Custom Semgrep Rules ===

# 1. Custom rules file (create the directory first so the redirect does not fail)
mkdir -p .semgrep
cat > .semgrep/custom-rules.yaml << 'EOF'
rules:
  # Rule 1: Detect hardcoded API keys
  - id: custom.secrets.hardcoded-api-key
    patterns:
      - pattern: |
          $KEY = "..."
      - metavariable-regex:
          metavariable: $KEY
          regex: (api_key|apikey|api_secret|secret_key|access_token)
    message: "Hardcoded API key detected. Use environment variables instead."
    severity: ERROR
    languages: [python, javascript, typescript]
    metadata:
      cwe: "CWE-798"
      owasp: "A07:2021"
      category: "security"
      fix: "Use os.environ['API_KEY'] or process.env.API_KEY"

  # Rule 2: SQL Injection via string formatting
  - id: custom.security.sql-injection-format
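    # Any one of these forms should flag the call, so use pattern-either;
    # a plain "patterns" list would require all three to match at once.
    pattern-either: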
      - pattern: |
          cursor.execute(f"... {$VAR} ...")
      - pattern: |
          cursor.execute("..." + $VAR + "...")
      - pattern: |
          cursor.execute("..." % $VAR)
    message: "Potential SQL injection via string formatting. Use parameterized queries."
    severity: ERROR
    languages: [python]
    metadata:
      cwe: "CWE-89"
      owasp: "A03:2021"
      fix: "cursor.execute('SELECT * FROM users WHERE id = %s', (user_id,))"

  # Rule 3: Missing input validation (Flask)
  - id: custom.security.flask-missing-validation
    patterns:
      - pattern: |
          @app.route(...)
          def $FUNC(...):
              $DATA = request.args.get(...)
              ...
              return ...
      - pattern-not: |
          @app.route(...)
          def $FUNC(...):
              $DATA = request.args.get(...)
              ...
              validate(...)
              ...
    message: "Flask endpoint uses request input without validation."
    severity: WARNING
    languages: [python]
    metadata:
      cwe: "CWE-20"

  # Rule 4: Deprecated function usage
  - id: custom.quality.deprecated-function
    pattern-either:
      - pattern: datetime.utcnow()
      - pattern: datetime.utcfromtimestamp(...)
    message: "datetime.utcnow() and datetime.utcfromtimestamp() are deprecated since Python 3.12. Use timezone-aware calls such as datetime.now(timezone.utc)."
    severity: WARNING
    languages: [python]
    metadata:
      category: "quality"
      fix: "datetime.now(timezone.utc)"

  # Rule 5: Company policy - no print statements in production
  - id: custom.policy.no-print-in-prod
    pattern: print(...)
    paths:
      include:
        - "src/**"
      exclude:
        - "tests/**"
        - "scripts/**"
    message: "Use logging module instead of print() in production code."
    severity: INFO
    languages: [python]
    metadata:
      category: "policy"
EOF

# 2. Test custom rules
semgrep scan --config .semgrep/custom-rules.yaml --test

# 3. Run with custom + community rules
semgrep scan \
  --config .semgrep/custom-rules.yaml \
  --config p/owasp-top-ten \
  --json \
  src/

echo "Custom rules configured"
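
To make Rule 2 concrete, the sketch below shows the kind of code it is meant to flag next to the parameterized form suggested in the rule's fix. It uses Python's built-in sqlite3 with an in-memory database. The table, the data, and the function names are made up for illustration. Note that sqlite3 uses `?` placeholders, whereas the `%s` style in the rule's fix applies to drivers such as psycopg2.

```python
import sqlite3


def find_user_unsafe(conn: sqlite3.Connection, name: str) -> list:
    cursor = conn.cursor()
    # The shape Rule 2 targets: user input is spliced into the SQL text
    cursor.execute(f"SELECT id, name FROM users WHERE name = '{name}'")
    return cursor.fetchall()


def find_user_safe(conn: sqlite3.Connection, name: str) -> list:
    cursor = conn.cursor()
    # Parameterized query: the value is passed separately from the SQL text
    cursor.execute("SELECT id, name FROM users WHERE name = ?", (name,))
    return cursor.fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO users (name) VALUES (?)", [("alice",), ("bob",)])

payload = "x' OR '1'='1"
print("unsafe:", find_user_unsafe(conn, payload))  # the injected OR clause returns every row
print("safe:  ", find_user_safe(conn, payload))    # no user has that literal name
```

A file like this can double as a test fixture for the rule: the unsafe function should produce a finding and the safe one should not.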

CI/CD Integration

Integrate Semgrep batch scanning with CI/CD

# === CI/CD Integration ===

# 1. GitHub Actions: Semgrep in CI
mkdir -p .github/workflows
cat > .github/workflows/semgrep.yml << 'EOF'
name: Semgrep Security Scan

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
  schedule:
    - cron: '0 2 * * 1'  # Weekly Monday

jobs:
  semgrep:
    runs-on: ubuntu-latest
    container:
      image: semgrep/semgrep:latest
    
    steps:
      - uses: actions/checkout@v4

      - name: Run Semgrep
        run: |
          semgrep scan \
            --config p/owasp-top-ten \
            --config p/security-audit \
            --config p/secrets \
            --config .semgrep/custom-rules.yaml \
            --sarif-output=semgrep.sarif \
            --json-output=semgrep.json \
            --error \
            --severity ERROR

      - name: Upload SARIF
        uses: github/codeql-action/upload-sarif@v3
        with:
          sarif_file: semgrep.sarif
        if: always()

      - name: Check for Critical Findings
        run: |
          python3 -c "
          import json, sys
          with open('semgrep.json') as f:
              data = json.load(f)
          errors = [r for r in data.get('results', []) if r['extra']['severity'] == 'ERROR']
          if errors:
              print(f'FAIL: {len(errors)} critical findings')
              for e in errors[:5]:
                  print(f'  {e[\"check_id\"]}: {e[\"path\"]}:{e[\"start\"][\"line\"]}')
              sys.exit(1)
          print('PASS: No critical findings')
          "

      - name: Post PR Comment
        if: github.event_name == 'pull_request' && failure()
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            const data = JSON.parse(fs.readFileSync('semgrep.json'));
            const errors = data.results.filter(r => r.extra.severity === 'ERROR');
            let body = `## Semgrep Security Scan\n\n`;
            body += `Found **${errors.length}** critical issues:\n\n`;
            errors.slice(0, 10).forEach(e => {
              body += `- **${e.check_id}** in \`${e.path}:${e.start.line}\`\n`;
              body += `  ${e.extra.message}\n\n`;
            });
            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: body
            });
EOF

# 2. Batch scan all repos (nightly)
cat > .github/workflows/batch-scan.yml << 'EOF'
name: Nightly Batch Security Scan

on:
  schedule:
    - cron: '0 2 * * *'

jobs:
  batch-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Install Semgrep
        run: pip install semgrep

      - name: Run Batch Scanner
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        run: python3 scripts/batch_scanner.py

      - name: Upload Results
        uses: actions/upload-artifact@v4
        with:
          name: batch-scan-results
          path: /tmp/semgrep-results/
EOF

echo "CI/CD integration configured"
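
The gating and comment logic in the workflow is split between an inline Python check and a github-script step. One way to keep the two in agreement is a single helper that derives both the exit code and the comment text from the same results. The sketch below assumes the `results` list from Semgrep's JSON output. `build_pr_comment` is a hypothetical name and the sample findings are invented.

```python
from typing import Any, Dict, List, Tuple


def build_pr_comment(results: List[Dict[str, Any]], limit: int = 10) -> Tuple[int, str]:
    """Return (exit_code, markdown_body) for a list of Semgrep JSON results."""
    errors = [r for r in results if r.get("extra", {}).get("severity") == "ERROR"]
    if not errors:
        return 0, "## Semgrep Security Scan\n\nNo critical issues found.\n"

    lines = ["## Semgrep Security Scan", "", f"Found **{len(errors)}** critical issues:", ""]
    for r in errors[:limit]:
        location = f"{r['path']}:{r['start']['line']}"
        lines.append(f"- **{r['check_id']}** in `{location}`")
        lines.append(f"  {r['extra'].get('message', '')}")
    if len(errors) > limit:
        lines.append(f"- ...and {len(errors) - limit} more")
    return 1, "\n".join(lines) + "\n"


# Invented sample findings in the shape of Semgrep's "results" entries
sample_results = [
    {
        "check_id": "custom.secrets.hardcoded-api-key",
        "path": "src/config.py",
        "start": {"line": 12},
        "extra": {"severity": "ERROR", "message": "Hardcoded API key detected."},
    },
    {
        "check_id": "custom.policy.no-print-in-prod",
        "path": "src/app.py",
        "start": {"line": 3},
        "extra": {"severity": "INFO", "message": "Use logging instead of print()."},
    },
]

exit_code, body = build_pr_comment(sample_results)
print(f"exit code: {exit_code}")
print(body)
```

In a workflow, a script built on this could write the body to a file for the comment step and call `sys.exit(exit_code)` to fail the job.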

Reporting and Analytics

Generate reports that summarize the scan results

#!/usr/bin/env python3
# scan_report.py: Semgrep Scan Report Generator
import json
import logging
from typing import Dict, List
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("report")

class ScanReportGenerator:
    def __init__(self):
        pass
    
    def generate_report(self, scan_results):
        """Generate comprehensive scan report"""
        total_findings = sum(len(r["findings"]) for r in scan_results.values())
        
        by_severity = {}
        by_owasp = {}
        by_language = {}
        top_issues = {}
        
        for repo, result in scan_results.items():
            for finding in result["findings"]:
                sev = finding.get("severity", "INFO")
                by_severity[sev] = by_severity.get(sev, 0) + 1
                
                owasp = finding.get("metadata", {}).get("owasp", "Other")
                by_owasp[owasp] = by_owasp.get(owasp, 0) + 1
                
                check_id = finding.get("check_id", "unknown")
                top_issues[check_id] = top_issues.get(check_id, 0) + 1
        
        return {
            "report_date": datetime.now().strftime("%Y-%m-%d %H:%M"),
            "summary": {
                "repos_scanned": len(scan_results),
                "total_findings": total_findings,
                "critical": by_severity.get("ERROR", 0),
                "warnings": by_severity.get("WARNING", 0),
                "info": by_severity.get("INFO", 0),
            },
            "by_severity": by_severity,
            "by_owasp": dict(sorted(by_owasp.items(), key=lambda x: x[1], reverse=True)),
            "top_issues": dict(sorted(top_issues.items(), key=lambda x: x[1], reverse=True)[:10]),
            "trend": {
                "last_week": 45,
                "this_week": total_findings,
                "change": f"{((total_findings - 45) / 45 * 100):+.1f}%" if total_findings != 45 else "0%",
            },
            "recommendations": [
                "Fix all ERROR severity findings within 7 days",
                "Enable Semgrep in all CI pipelines (3 repos missing)",
                "Add custom rules for company-specific patterns",
                "Schedule security training for top 3 most common issues",
            ],
        }

# Demo
results = {
    "api-service": {
        "findings": [
            {"severity": "ERROR", "check_id": "sql-injection", "metadata": {"owasp": "A03:2021"}},
            {"severity": "WARNING", "check_id": "hardcoded-secret", "metadata": {"owasp": "A07:2021"}},
            {"severity": "ERROR", "check_id": "sql-injection", "metadata": {"owasp": "A03:2021"}},
        ],
    },
    "web-frontend": {
        "findings": [
            {"severity": "WARNING", "check_id": "xss-reflected", "metadata": {"owasp": "A03:2021"}},
            {"severity": "INFO", "check_id": "no-csrf-token", "metadata": {"owasp": "A01:2021"}},
        ],
    },
    "payment-service": {
        "findings": [
            {"severity": "ERROR", "check_id": "hardcoded-secret", "metadata": {"owasp": "A07:2021"}},
        ],
    },
}

generator = ScanReportGenerator()
report = generator.generate_report(results)
print(f"Security Scan Report: {report['report_date']}")
print(f"  Repos: {report['summary']['repos_scanned']}")
print(f"  Findings: {report['summary']['total_findings']} (Critical: {report['summary']['critical']})")
print(f"  Trend: {report['trend']['change']} vs last week")
print(f"\nTop Issues:")
for issue, count in report["top_issues"].items():
    print(f"  {issue}: {count}")
print(f"\nRecommendations:")
for rec in report["recommendations"]:
    print(f"  - {rec}")
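
The demo hard-codes last week's total as 45. With real data the baseline could be missing or zero, and a zero baseline would raise ZeroDivisionError in the expression above. A small sketch of a safer calculation, where `percent_change` is a hypothetical helper and the previous total is assumed to come from an earlier stored report:

```python
from typing import Optional


def percent_change(previous: Optional[int], current: int) -> str:
    """Format the week-over-week change in findings, guarding a missing or zero baseline."""
    if previous is None:
        return "n/a (no baseline)"
    if previous == 0:
        return "0%" if current == 0 else "n/a (baseline was 0)"
    return f"{(current - previous) / previous * 100:+.1f}%"


print(percent_change(45, 6))    # -86.7%
print(percent_change(40, 50))   # +25.0%
print(percent_change(0, 6))     # n/a (baseline was 0)
print(percent_change(None, 6))  # n/a (no baseline)
```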

FAQ: Frequently Asked Questions

Q: How do Semgrep and SonarQube differ?

A: Semgrep is a fast, security-focused SAST tool. Custom rules are easy to write (pattern-based), it is CLI-first and fits well into CI/CD, and it is free and open source (CLI + 2,000+ rules). SonarQube covers code quality plus security and is broader in scope (bugs, code smells, coverage, duplications). It has a UI dashboard and a free community edition with limited features, and Enterprise starts at $150/year. The recommendation is Semgrep for security scanning in CI (fast, can block PRs) and SonarQube for overall code quality tracking (dashboard, trends). The two can be used together: Semgrep focuses on security, SonarQube on code quality.

Q: How long does a Semgrep scan take per repo?

A: Semgrep is fast compared with other SAST tools. As a rough guide, 10K lines take 1-3 seconds, 100K lines take 5-15 seconds, and 1M lines take 30-90 seconds. That is 5-10x faster than SonarQube and 3-5x faster than CodeQL, because Semgrep does not build the code (it uses syntax-based matching). Batch scanning 100 repos (averaging 50K lines each) takes about 15-30 minutes sequentially, or 5-10 minutes in parallel with 4 workers. Tips for faster scans: use the --jobs flag for parallel scanning, exclude directories that do not need scanning (node_modules, vendor), and use --max-target-bytes to skip large files.
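
The gap between the sequential and parallel figures comes from scanning several repos at once. A minimal sketch of that idea with a thread pool follows. `scan_in_parallel` is a hypothetical helper, and `fake_scan` is a stand-in that sleeps instead of calling Semgrep. Threads are a reasonable fit here on the assumption that each real scan would be a subprocess call, so the Python side mostly waits.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, List


def scan_in_parallel(
    repos: List[str], scan_one: Callable[[str], int], workers: int = 4
) -> Dict[str, int]:
    """Run scan_one for each repo on a thread pool and collect finding counts by repo."""
    counts: Dict[str, int] = {}
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {pool.submit(scan_one, repo): repo for repo in repos}
        for future in as_completed(futures):
            repo = futures[future]
            try:
                counts[repo] = future.result()
            except Exception as exc:  # one failing repo should not stop the batch
                print(f"Failed to scan {repo}: {exc}")
                counts[repo] = -1  # sentinel for "scan failed"
    return counts


def fake_scan(repo: str) -> int:
    """Stand-in for a real scan: sleeps briefly and returns a made-up finding count."""
    time.sleep(0.05)
    return len(repo) % 3


repos = [f"repo-{i}" for i in range(8)]
started = time.perf_counter()
result = scan_in_parallel(repos, fake_scan, workers=4)
print(f"Scanned {len(result)} repos in {time.perf_counter() - started:.2f}s")
```

Since Semgrep's own --jobs flag already uses multiple cores within a single scan, the number of workers times the per-scan jobs is probably best kept near the machine's core count.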

Q: Are custom rules hard to write?

A: Semgrep rules are easier to write than rules for other SAST tools because the pattern syntax looks like real code. For example, to find eval() in Python, use pattern: eval(...). To find SQL injection, use pattern: cursor.execute(f"...{$VAR}..."). Metavariables ($VAR, $FUNC) stand in for arbitrary code, pattern-either combines multiple patterns, and pattern-not expresses exceptions. The basics can be learned in 1-2 hours at semgrep.dev/learn, and the Semgrep Playground (semgrep.dev/playground) lets you test rules online. More complex rules use taint tracking (data flow analysis), which needs Semgrep Pro.
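
As an illustration of the code that pattern: eval(...) is aimed at, the short sketch below puts an eval() call next to `ast.literal_eval` from the standard library, which is one common replacement when the input should only ever be a Python literal. The input strings are invented for the example.

```python
import ast

user_input = "[1, 2, 3]"

# A call like this is what `pattern: eval(...)` is written to find
values_unsafe = eval(user_input)

# ast.literal_eval only accepts Python literals, so arbitrary expressions are rejected
values_safe = ast.literal_eval(user_input)
print(values_safe)

try:
    ast.literal_eval("__import__('os').getcwd()")
except ValueError as exc:
    print("rejected:", type(exc).__name__)
```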

Q: Is batch scanning every repo necessary?

A: It depends on the size of the organization and its requirements. Batch scanning helps with security posture visibility (seeing which repos have the most vulnerabilities), compliance (proving that all repos are scanned according to policy, for example SOC 2 or ISO 27001), dependency tracking (finding repos that use a library with a CVE), and policy enforcement (checking that all repos follow coding standards). With few repos (fewer than 10), scanning in CI for each repo is enough, and unless there are compliance requirements a per-repo CI scan can stay the only mechanism. With many repos (20+), batch scanning is recommended for visibility and compliance.
