SiamCafe · Blog
Semgrep SAST กับ Stream Processing — วิธีใช้
บทความ

Semgrep SAST กับ Stream Processing — วิธีใช้

เผยแพร่ 28 พฤษภาคม 2569

Semgrep SAST

Semgrep เป็นเครื่องมือ Static Application Security Testing (SAST) ที่วิเคราะห์โค้ดเพื่อหาช่องโหว่ด้าน Security ด้วย Pattern Matching รองรับมากกว่า 30 ภาษา ทำงานได้เร็ว ใช้ร่วมกับ CI/CD ได้ และมี Community Rules มากกว่า 3,000 กฎ

Stream Processing คือการประมวลผลข้อมูลแบบ Real-time ทันทีที่ข้อมูลเข้ามา ด้วยเครื่องมืออย่าง Kafka Streams, Flink และ Spark Streaming เหมาะกับงาน Event Processing, Analytics, IoT และ Log Processing

Semgrep Setup และ Rules

# === Semgrep Setup ===

# 1. ติดตั้ง
# pip install semgrep
# brew install semgrep

# 2. Quick Scan
# semgrep --config auto .
# semgrep --config p/python .
# semgrep --config p/security-audit .
# semgrep --config p/owasp-top-ten .

# 3. Custom Rule — ตรวจหา Hardcoded Secrets
# rules:
#   - id: hardcoded-kafka-password
#     patterns:
#       - pattern: |
#           $CONFIG = {..., "sasl.password": "...", ...}
#     message: "Hardcoded Kafka password detected. Use environment variables."
#     severity: ERROR
#     languages: [python]
#     metadata:
#       cwe: "CWE-798: Use of Hard-coded Credentials"
#       owasp: "A07:2021 - Identification and Authentication Failures"

# 4. Custom Rule — SQL Injection in Stream
# rules:
#   - id: sql-injection-stream
#     patterns:
#       - pattern: |
#           cursor.execute(f"... {$VAR} ...")
#       - pattern-not: |
#           cursor.execute("...", (...,))
#     message: "Possible SQL injection. Use parameterized queries."
#     severity: ERROR
#     languages: [python]

# 5. Custom Rule — Missing TLS in Kafka
# rules:
#   - id: kafka-no-tls
#     patterns:
#       - pattern: |
#           KafkaProducer(..., security_protocol="PLAINTEXT", ...)
#     message: "Kafka connection without TLS. Use SASL_SSL."
#     severity: WARNING
#     languages: [python]

from dataclasses import dataclass, field
from typing import List, Dict

@dataclass
class SemgrepRule:
    """Metadata describing a single Semgrep rule.

    Fields are accepted positionally in declaration order; only ``cwe``
    is optional.
    """

    rule_id: str  # rule identifier, e.g. "kafka-no-tls"
    severity: str  # severity label, e.g. "ERROR" or "WARNING"
    description: str  # short human-readable summary of the rule
    languages: List[str]  # languages the rule applies to, e.g. ["python"]
    cwe: str = field(default="")  # related CWE identifier; empty when not given

class SemgrepScanner:
    """Simulated Semgrep scanner holding registered rules and collected findings."""

    def __init__(self):
        # Rules registered through add_rule(), in insertion order.
        self.rules: List[SemgrepRule] = []
        # Findings accumulated across every scan() call.
        self.findings: List[dict] = []

    def add_rule(self, rule: SemgrepRule):
        """Register *rule* so that later scans evaluate it."""
        self.rules.append(rule)

    def scan(self, code: str, filename: str) -> List[dict]:
        """Simulate a scan of *code* and return the findings it produced.

        This is not a real Semgrep run: a rule fires only when *code*
        contains the word "password" (case-insensitive) and the rule's id
        contains "hardcoded". Each finding is a dict with the keys
        ``rule``, ``severity``, ``message`` and ``file``. The findings are
        also appended to ``self.findings``.
        """
        matches = [
            {
                "rule": registered.rule_id,
                "severity": registered.severity,
                "message": registered.description,
                "file": filename,
            }
            for registered in self.rules
            if "password" in code.lower() and "hardcoded" in registered.rule_id
        ]
        self.findings.extend(matches)
        return matches

    def show_report(self):
        """Print rule and finding counts, then the findings grouped by severity."""
        banner = "=" * 55
        print(f"\n{banner}")
        print("Semgrep Scan Report")
        print(banner)
        print(f"  Rules: {len(self.rules)}")
        print(f"  Findings: {len(self.findings)}")

        # Group findings by severity, keeping first-seen severity order.
        grouped = {}
        for finding in self.findings:
            level = finding["severity"]
            if level not in grouped:
                grouped[level] = []
            grouped[level].append(finding)

        for level, bucket in grouped.items():
            print(f"\n  [{level}] ({len(bucket)})")
            for entry in bucket:
                print(f"    {entry['rule']}: {entry['message']}")
                print(f"      File: {entry['file']}")

scanner = SemgrepScanner()

# Rule catalogue; each row is (rule_id, severity, description, languages, cwe).
rules = [
    SemgrepRule(*spec)
    for spec in (
        ("hardcoded-kafka-password", "ERROR", "Hardcoded Kafka password", ["python"], "CWE-798"),
        ("sql-injection-stream", "ERROR", "SQL injection in stream query", ["python"], "CWE-89"),
        ("kafka-no-tls", "WARNING", "Kafka without TLS encryption", ["python"], "CWE-319"),
        ("missing-auth-grpc", "ERROR", "Missing authentication in gRPC", ["python"], "CWE-306"),
        ("insecure-deserialization", "ERROR", "Insecure deserialization in stream", ["python"], "CWE-502"),
    )
]

# Register every rule with the scanner.
for rule in rules:
    scanner.add_rule(rule)

# List the registered rules, one per line.
print("Semgrep Rules for Stream Processing:")
print(
    "\n".join(
        f"  [{r.severity}] {r.rule_id}: {r.description} ({r.cwe})" for r in rules
    )
)

Stream Processing Security

# stream_security.py — Secure Stream Processing
from dataclasses import dataclass
from typing import Dict, List

# Hardened Kafka client settings: SASL authentication over TLS.
kafka_secure_config = {
    # --- Connection / transport ---
    "bootstrap.servers": "kafka-1:9093, kafka-2:9093, kafka-3:9093",
    "security.protocol": "SASL_SSL",
    # --- SASL credentials ---
    "sasl.mechanism": "SCRAM-SHA-512",
    # NOTE(review): the original comment said this comes from an environment
    # variable, but the value is a literal here.
    "sasl.username": "stream-app",
    # "sasl.password" is deliberately absent: supply it at runtime (for
    # example from os.environ["KAFKA_PASSWORD"]) and never hardcode it.
    # --- TLS material ---
    "ssl.ca.location": "/etc/ssl/certs/ca.pem",
    "ssl.certificate.location": "/etc/ssl/certs/client.pem",
    "ssl.key.location": "/etc/ssl/private/client-key.pem",
    # --- Consumer behaviour ---
    "enable.auto.commit": False,
    "auto.offset.reset": "earliest",
    "group.id": "stream-security-group",
}

# Counter-example used by the article: both non-broker settings are the kind
# of thing the article's Semgrep rules are meant to flag.
kafka_insecure_config = {
    "bootstrap.servers": "kafka:9092",
    "security.protocol": "PLAINTEXT",  # unencrypted transport
    "sasl.password": "hardcoded-password-123",  # hardcoded credential
}

# Print both configurations, one "key: value" pair per line.
print("Kafka Configuration:")
for heading, settings in (
    ("Secure Config", kafka_secure_config),
    ("Insecure Config (Semgrep will flag)", kafka_insecure_config),
):
    print(f"\n  {heading}:")
    for key, value in settings.items():
        print(f"    {key}: {value}")

# Security checklist for stream processing: category name -> checklist items.
security_checklist = {
    "Authentication": [
        "ใช้ SASL_SSL สำหรับ Kafka",
        "mTLS สำหรับ gRPC Streams",
        "API Key / JWT สำหรับ HTTP Streams",
    ],
    "Encryption": [
        "TLS 1.3 สำหรับ Transport",
        "Encrypt Sensitive Fields ใน Messages",
        "KMS สำหรับ Key Management",
    ],
    "Authorization": [
        "Kafka ACLs จำกัด Topic Access",
        "RBAC สำหรับ Schema Registry",
        "Service Accounts Least Privilege",
    ],
    "Data Validation": [
        "Schema Validation (Avro/Protobuf)",
        "Input Sanitization ก่อน Process",
        "ไม่ใช้ Pickle/Java Serialization",
    ],
    "Monitoring": [
        "Log ทุก Authentication Failure",
        "Alert เมื่อ Consumer Lag สูง",
        "Audit Trail สำหรับ Data Access",
    ],
}

# Print each category heading followed by its items as ticked boxes.
print("\n\nStream Security Checklist:")
for category, items in security_checklist.items():
    print(f"\n  [{category}]")
    print("\n".join(f"    [x] {item}" for item in items))

CI/CD Integration

# cicd_semgrep.py — Semgrep in CI/CD

# GitHub Actions
# name: Security Scan
# on: [push, pull_request]
# jobs:
#   semgrep:
#     runs-on: ubuntu-latest
#     steps:
#       - uses: actions/checkout@v4
#       - name: Semgrep Scan
#         uses: returntocorp/semgrep-action@v1
#         with:
#           config: >-
#             p/security-audit
#             p/owasp-top-ten
#             p/python
#             ./custom-rules/
#         env:
#           SEMGREP_APP_TOKEN: ${{ secrets.SEMGREP_APP_TOKEN }}

# GitLab CI
# semgrep:
#   image: returntocorp/semgrep
#   script:
#     - semgrep ci --config auto --config ./custom-rules/
#   rules:
#     - if: $CI_MERGE_REQUEST_ID

# CI/CD platforms and how Semgrep is wired into each one.
# Each row is (platform, (action, config, trigger)).
cicd_integration = {
    platform: dict(zip(("action", "config", "trigger"), row))
    for platform, row in (
        (
            "GitHub Actions",
            (
                "returntocorp/semgrep-action@v1",
                "p/security-audit, p/owasp-top-ten, custom rules",
                "Push, Pull Request",
            ),
        ),
        (
            "GitLab CI",
            (
                "returntocorp/semgrep Docker image",
                "semgrep ci --config auto",
                "Merge Request",
            ),
        ),
        (
            "Jenkins",
            (
                "semgrep ci in Pipeline step",
                "semgrep --config auto --json > report.json",
                "PR Build",
            ),
        ),
        (
            "Pre-commit Hook",
            (
                "semgrep in .pre-commit-config.yaml",
                "semgrep --config ./custom-rules/",
                "Before Commit",
            ),
        ),
    )
}

# Print each platform heading followed by its "key: value" details.
print("Semgrep CI/CD Integration:")
for platform, info in cicd_integration.items():
    print(f"\n  [{platform}]")
    print("\n".join(f"    {key}: {value}" for key, value in info.items()))

# Semgrep compared with other SAST tools.
# Each row is (tool, (speed, rules, focus, cost)).
comparison = {
    tool: dict(zip(("speed", "rules", "focus", "cost"), row))
    for tool, row in (
        ("Semgrep", ("เร็วมาก", "Pattern Matching (ง่าย)", "Security", "Free CLI / Paid Cloud")),
        ("SonarQube", ("ปานกลาง", "Complex (Java-based)", "Quality + Security", "Free Community / Paid")),
        ("CodeQL", ("ช้า (Deep Analysis)", "Query Language (ซับซ้อน)", "Security", "Free for OSS")),
        ("Snyk Code", ("เร็ว", "AI-powered", "Security", "Free Tier / Paid")),
    )
}

# One summary line per tool; the "focus" field is stored but not printed.
print("\n\nSAST Tools Comparison:")
for tool, info in comparison.items():
    speed, rule_style, cost = info["speed"], info["rules"], info["cost"]
    print(f"  {tool}: Speed={speed} | Rules={rule_style} | {cost}")

Best Practices

  • Custom Rules: เขียน Rules เฉพาะสำหรับ Stream Processing ของทีม
  • Pre-commit: ใช้ Semgrep เป็น Pre-commit Hook ตรวจก่อน Commit
  • SASL_SSL: ใช้ SASL_SSL สำหรับ Kafka ทุกครั้ง
  • Schema Validation: ใช้ Avro/Protobuf Validate ข้อมูลใน Stream
  • Secrets: ห้าม Hardcode Credentials ใช้ Environment Variables
  • Audit: Log ทุก Security Event ใน Stream Pipeline

Semgrep คืออะไร

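Semgrep เป็นเครื่องมือ SAST ที่วิเคราะห์โค้ดเพื่อหาช่องโหว่ด้าน Security ด้วย Pattern Matching รองรับมากกว่า 30 ภาษา ทำงานได้เร็ว ใช้ร่วมกับ CI/CD ได้ มี Community Rules มากกว่า 3,000 กฎ และเป็น Open Source ที่ใช้งานได้ฟรี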