Semgrep SAST กับ Stream Processing — วิธีใช้
Semgrep SAST
Semgrep เป็นเครื่องมือ Static Application Security Testing (SAST) ที่วิเคราะห์โค้ดเพื่อหาช่องโหว่ด้าน Security ด้วย Pattern Matching รองรับมากกว่า 30 ภาษา ทำงานเร็ว ใช้ร่วมกับ CI/CD ได้ และมี Community Rules มากกว่า 3,000 กฎ
Stream Processing คือการประมวลผลข้อมูลแบบ Real-time ทันทีที่ข้อมูลเข้ามา โดยใช้เครื่องมืออย่าง Kafka Streams, Flink และ Spark Streaming เหมาะกับงาน Event Processing, Analytics, IoT และ Log Processing
Semgrep Setup และ Rules
# === Semgrep Setup ===
# 1. ติดตั้ง
# pip install semgrep
# brew install semgrep
# 2. Quick Scan
# semgrep --config auto .
# semgrep --config p/python .
# semgrep --config p/security-audit .
# semgrep --config p/owasp-top-ten .
# 3. Custom Rule — ตรวจหา Hardcoded Secrets
# rules:
#   - id: hardcoded-kafka-password
#     patterns:
#       - pattern: |
#           $CONFIG = {..., "sasl.password": "...", ...}
#     message: "Hardcoded Kafka password detected. Use environment variables."
#     severity: ERROR
#     languages: [python]
#     metadata:
#       cwe: "CWE-798: Use of Hard-coded Credentials"
#       owasp: "A07:2021 - Identification and Authentication Failures"
# 4. Custom Rule — SQL Injection in Stream
# rules:
#   - id: sql-injection-stream
#     patterns:
#       - pattern: |
#           cursor.execute(f"... {$VAR} ...")
#       - pattern-not: |
#           cursor.execute("...", (...,))
#     message: "Possible SQL injection. Use parameterized queries."
#     severity: ERROR
#     languages: [python]
# 5. Custom Rule — Missing TLS in Kafka
# rules:
#   - id: kafka-no-tls
#     patterns:
#       - pattern: |
#           KafkaProducer(..., security_protocol="PLAINTEXT", ...)
#     message: "Kafka connection without TLS. Use SASL_SSL."
#     severity: WARNING
#     languages: [python]
from dataclasses import dataclass, field
from typing import List, Dict
@dataclass
class SemgrepRule:
    """Descriptive metadata for one Semgrep rule.

    Instances only carry data; matching logic lives in the scanner that
    consumes them.
    """

    rule_id: str  # unique identifier, e.g. "kafka-no-tls"
    severity: str  # severity label, e.g. "ERROR" or "WARNING"
    description: str  # human-readable message reported for a finding
    languages: List[str]  # languages the rule targets, e.g. ["python"]
    cwe: str = ""  # optional CWE identifier; empty when the rule is unmapped
class SemgrepScanner:
    """Toy stand-in for a Semgrep scanner.

    It keeps a list of registered rules and accumulates findings across
    scans. It does not invoke Semgrep itself; ``scan`` applies a simple
    keyword heuristic to illustrate the workflow.
    """

    def __init__(self):
        self.rules: List["SemgrepRule"] = []
        # Findings accumulated over every scan() call on this instance.
        self.findings: List[dict] = []

    def add_rule(self, rule: "SemgrepRule"):
        """Register *rule* so that later scans take it into account."""
        self.rules.append(rule)

    def scan(self, code: str, filename: str) -> List[dict]:
        """Simulate a scan of *code* and return the findings of this call.

        A rule fires only when its ``rule_id`` contains ``"hardcoded"`` and
        *code* contains the word "password" (case-insensitive). Each finding
        is a dict with the keys ``rule``, ``severity``, ``message`` and
        ``file``. The findings are also appended to ``self.findings``.
        """
        # The text check does not depend on the rule, so evaluate it once
        # instead of lower-casing the whole source for every rule.
        mentions_password = "password" in code.lower()
        findings = [
            {
                "rule": rule.rule_id,
                "severity": rule.severity,
                "message": rule.description,
                "file": filename,
            }
            for rule in self.rules
            if mentions_password and "hardcoded" in rule.rule_id
        ]
        self.findings.extend(findings)
        return findings

    def show_report(self):
        """Print rule/finding counts, then all findings grouped by severity."""
        print(f"\n{'='*55}")
        print("Semgrep Scan Report")
        print(f"{'='*55}")
        print(f" Rules: {len(self.rules)}")
        print(f" Findings: {len(self.findings)}")
        # Group in first-seen order so the report follows scan order.
        by_severity: Dict[str, List[dict]] = {}
        for f in self.findings:
            by_severity.setdefault(f["severity"], []).append(f)
        for severity, items in by_severity.items():
            print(f"\n [{severity}] ({len(items)})")
            for item in items:
                print(f" {item['rule']}: {item['message']}")
                print(f" File: {item['file']}")
# Demo: build a scanner and register the example rule set for stream processing.
scanner = SemgrepScanner()
rules = [
    SemgrepRule("hardcoded-kafka-password", "ERROR", "Hardcoded Kafka password", ["python"], "CWE-798"),
    SemgrepRule("sql-injection-stream", "ERROR", "SQL injection in stream query", ["python"], "CWE-89"),
    SemgrepRule("kafka-no-tls", "WARNING", "Kafka without TLS encryption", ["python"], "CWE-319"),
    SemgrepRule("missing-auth-grpc", "ERROR", "Missing authentication in gRPC", ["python"], "CWE-306"),
    SemgrepRule("insecure-deserialization", "ERROR", "Insecure deserialization in stream", ["python"], "CWE-502"),
]
for rule in rules:
    scanner.add_rule(rule)

# List every registered rule with its severity and CWE mapping.
print("Semgrep Rules for Stream Processing:")
for rule in rules:
    print(f" [{rule.severity}] {rule.rule_id}: {rule.description} ({rule.cwe})")
Stream Processing Security
# stream_security.py — Secure Stream Processing
from dataclasses import dataclass
from typing import Dict, List
# Kafka secure configuration: SASL_SSL transport with SCRAM authentication.
kafka_secure_config = {
    "bootstrap.servers": "kafka-1:9093, kafka-2:9093, kafka-3:9093",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-512",
    "sasl.username": "stream-app",  # literal here for the demo; load it from an environment variable in real use
    # "sasl.password": os.environ["KAFKA_PASSWORD"],  # never hardcode the password
    "ssl.ca.location": "/etc/ssl/certs/ca.pem",
    "ssl.certificate.location": "/etc/ssl/certs/client.pem",
    "ssl.key.location": "/etc/ssl/private/client-key.pem",
    "enable.auto.commit": False,
    "auto.offset.reset": "earliest",
    "group.id": "stream-security-group",
}

# Deliberately insecure counter-example used to show what the custom rules catch.
kafka_insecure_config = {
    "bootstrap.servers": "kafka:9092",
    "security.protocol": "PLAINTEXT",  # Semgrep will flag this
    "sasl.password": "hardcoded-password-123",  # Semgrep will flag this
}

print("Kafka Configuration:")
print("\n Secure Config:")
for key, value in kafka_secure_config.items():
    print(f" {key}: {value}")
print("\n Insecure Config (Semgrep will flag):")
for key, value in kafka_insecure_config.items():
    print(f" {key}: {value}")
# Stream processing security checklist: category name -> list of checklist items.
security_checklist = {
    "Authentication": [
        "ใช้ SASL_SSL สำหรับ Kafka",
        "mTLS สำหรับ gRPC Streams",
        "API Key / JWT สำหรับ HTTP Streams",
    ],
    "Encryption": [
        "TLS 1.3 สำหรับ Transport",
        "Encrypt Sensitive Fields ใน Messages",
        "KMS สำหรับ Key Management",
    ],
    "Authorization": [
        "Kafka ACLs จำกัด Topic Access",
        "RBAC สำหรับ Schema Registry",
        "Service Accounts Least Privilege",
    ],
    "Data Validation": [
        "Schema Validation (Avro/Protobuf)",
        "Input Sanitization ก่อน Process",
        "ไม่ใช้ Pickle/Java Serialization",
    ],
    "Monitoring": [
        "Log ทุก Authentication Failure",
        "Alert เมื่อ Consumer Lag สูง",
        "Audit Trail สำหรับ Data Access",
    ],
}

# Print each category followed by its items, every item shown as checked.
print("\n\nStream Security Checklist:")
for category, items in security_checklist.items():
    print(f"\n [{category}]")
    for item in items:
        print(f" [x] {item}")
CI/CD Integration
# cicd_semgrep.py — Semgrep in CI/CD
# GitHub Actions
# name: Security Scan
# on: [push, pull_request]
# jobs:
#   semgrep:
#     runs-on: ubuntu-latest
#     steps:
#       - uses: actions/checkout@v4
#       - name: Semgrep Scan
#         uses: returntocorp/semgrep-action@v1
#         with:
#           config: >-
#             p/security-audit
#             p/owasp-top-ten
#             p/python
#             ./custom-rules/
#         env:
#           SEMGREP_APP_TOKEN: ${{ secrets.SEMGREP_APP_TOKEN }}
# GitLab CI
# semgrep:
#   image: returntocorp/semgrep
#   script:
#     - semgrep ci --config auto --config ./custom-rules/
#   rules:
#     - if: $CI_MERGE_REQUEST_ID
# How Semgrep plugs into each platform: platform name -> action / config / trigger.
cicd_integration = {
    "GitHub Actions": {
        "action": "returntocorp/semgrep-action@v1",
        "config": "p/security-audit, p/owasp-top-ten, custom rules",
        "trigger": "Push, Pull Request",
    },
    "GitLab CI": {
        "action": "returntocorp/semgrep Docker image",
        "config": "semgrep ci --config auto",
        "trigger": "Merge Request",
    },
    "Jenkins": {
        "action": "semgrep ci in Pipeline step",
        "config": "semgrep --config auto --json > report.json",
        "trigger": "PR Build",
    },
    "Pre-commit Hook": {
        "action": "semgrep in .pre-commit-config.yaml",
        "config": "semgrep --config ./custom-rules/",
        "trigger": "Before Commit",
    },
}

# Print one section per platform, listing its settings in insertion order.
print("Semgrep CI/CD Integration:")
for platform, info in cicd_integration.items():
    print(f"\n [{platform}]")
    for key, value in info.items():
        print(f" {key}: {value}")
# Semgrep vs other SAST tools: tool name -> speed / rule style / focus / cost.
comparison = {
    "Semgrep": {"speed": "เร็วมาก", "rules": "Pattern Matching (ง่าย)", "focus": "Security", "cost": "Free CLI / Paid Cloud"},
    "SonarQube": {"speed": "ปานกลาง", "rules": "Complex (Java-based)", "focus": "Quality + Security", "cost": "Free Community / Paid"},
    "CodeQL": {"speed": "ช้า (Deep Analysis)", "rules": "Query Language (ซับซ้อน)", "focus": "Security", "cost": "Free for OSS"},
    "Snyk Code": {"speed": "เร็ว", "rules": "AI-powered", "focus": "Security", "cost": "Free Tier / Paid"},
}

# One summary line per tool; "focus" is kept in the data but not printed here.
print("\n\nSAST Tools Comparison:")
for tool, info in comparison.items():
    print(f" {tool}: Speed={info['speed']} | Rules={info['rules']} | {info['cost']}")
Best Practices
- Custom Rules: เขียน Rules เฉพาะสำหรับ Stream Processing ของทีม
- Pre-commit: ใช้ Semgrep เป็น Pre-commit Hook ตรวจก่อน Commit
- SASL_SSL: ใช้ SASL_SSL สำหรับ Kafka ทุกครั้ง
- Schema Validation: ใช้ Avro/Protobuf Validate ข้อมูลใน Stream
- Secrets: ห้าม Hardcode Credentials ใช้ Environment Variables
- Audit: Log ทุก Security Event ใน Stream Pipeline
Semgrep คืออะไร
Semgrep คือเครื่องมือ SAST ที่วิเคราะห์โค้ดเพื่อหาช่องโหว่ด้าน Security ด้วย Pattern Matching รองรับมากกว่า 30 ภาษา ทำงานเร็ว ใช้กับ CI/CD ได้ มี Community Rules มากกว่า 3,000 กฎ และเป็น Open Source ที่ใช้งานได้ฟรี