Semgrep SAST
Semgrep Static Application Security Testing วิเคราะห์โค้ดหาช่องโหว่ Security Pattern Matching 30+ ภาษา เร็ว CI/CD Community Rules 3000+ กฎ
Stream Processing ประมวลผล Real-time ทันที Kafka Streams Flink Spark Streaming Event Processing Analytics IoT Log Processing
Semgrep Setup และ Rules
# === Semgrep Setup ===
# 1. ติดตั้ง
# pip install semgrep
# brew install semgrep
# 2. Quick Scan
# semgrep --config auto .
# semgrep --config p/python .
# semgrep --config p/security-audit .
# semgrep --config p/owasp-top-ten .
# 3. Custom Rule — ตรวจหา Hardcoded Secrets
# rules:
#   - id: hardcoded-kafka-password
#     patterns:
#       - pattern: |
#           $CONFIG = {..., "sasl.password": "...", ...}
#     message: "Hardcoded Kafka password detected. Use environment variables."
#     severity: ERROR
#     languages: [python]
#     metadata:
#       cwe: "CWE-798: Use of Hard-coded Credentials"
#       owasp: "A07:2021 - Identification and Authentication Failures"
# 4. Custom Rule — SQL Injection in Stream
# rules:
#   - id: sql-injection-stream
#     patterns:
#       - pattern: |
#           cursor.execute(f"... {$VAR} ...")
#       - pattern-not: |
#           cursor.execute("...", (...,))
#     message: "Possible SQL injection. Use parameterized queries."
#     severity: ERROR
#     languages: [python]
# 5. Custom Rule — Missing TLS in Kafka
# rules:
#   - id: kafka-no-tls
#     patterns:
#       - pattern: |
#           KafkaProducer(..., security_protocol="PLAINTEXT", ...)
#     message: "Kafka connection without TLS. Use SASL_SSL."
#     severity: WARNING
#     languages: [python]
from dataclasses import dataclass, field
from typing import List, Dict
@dataclass
class SemgrepRule:
    """Metadata describing one Semgrep rule used by the demo scanner.

    This is a plain data holder; no validation is performed on any field.
    """

    rule_id: str  # rule identifier, e.g. "kafka-no-tls"
    severity: str  # severity label, e.g. "ERROR" or "WARNING"
    description: str  # message reported alongside a finding
    languages: List[str]  # languages the rule applies to, e.g. ["python"]
    cwe: str = ""  # optional CWE identifier; empty string when not given
class SemgrepScanner:
    """Toy stand-in for a Semgrep scan; it never invokes Semgrep itself.

    Rules are registered with ``add_rule``. ``scan`` applies a single
    hard-wired heuristic, and every finding it produces is accumulated on
    the instance so ``show_report`` can summarise all scans so far.
    """

    def __init__(self) -> None:
        """Start with no registered rules and no recorded findings."""
        self.rules: List["SemgrepRule"] = []
        self.findings: List[Dict[str, str]] = []

    def add_rule(self, rule: "SemgrepRule") -> None:
        """Register ``rule`` so later scans consider it."""
        self.rules.append(rule)

    def scan(self, code: str, filename: str) -> List[Dict[str, str]]:
        """Simulate scanning ``code`` and return the findings for this call.

        A finding is produced for every registered rule whose ``rule_id``
        contains ``"hardcoded"``, but only when ``code`` contains the word
        "password" (case-insensitive). No other rule can match here.

        Args:
            code: Source text to inspect.
            filename: Name recorded in each finding's ``"file"`` entry.

        Returns:
            The findings of this call only, each a dict with the keys
            ``"rule"``, ``"severity"``, ``"message"`` and ``"file"``. The
            same dicts are also appended to ``self.findings``.
        """
        # The substring test does not depend on the rule, so do it once
        # instead of lower-casing the whole source for every rule.
        mentions_password = "password" in code.lower()
        findings = [
            {
                "rule": rule.rule_id,
                "severity": rule.severity,
                "message": rule.description,
                "file": filename,
            }
            for rule in self.rules
            if mentions_password and "hardcoded" in rule.rule_id
        ]
        self.findings.extend(findings)
        return findings

    def show_report(self) -> None:
        """Print rule/finding counts, then all findings grouped by severity."""
        print(f"\n{'='*55}")
        print("Semgrep Scan Report")
        print(f"{'='*55}")
        print(f" Rules: {len(self.rules)}")
        print(f" Findings: {len(self.findings)}")

        # Group in first-seen order so the report follows scan order.
        by_severity: Dict[str, List[Dict[str, str]]] = {}
        for finding in self.findings:
            by_severity.setdefault(finding["severity"], []).append(finding)

        for severity, items in by_severity.items():
            print(f"\n [{severity}] ({len(items)})")
            for item in items:
                print(f" {item['rule']}: {item['message']}")
                print(f" File: {item['file']}")
# Demo: register the example stream-processing rules and list them.
scanner = SemgrepScanner()
rules = [
    SemgrepRule("hardcoded-kafka-password", "ERROR", "Hardcoded Kafka password", ["python"], "CWE-798"),
    SemgrepRule("sql-injection-stream", "ERROR", "SQL injection in stream query", ["python"], "CWE-89"),
    SemgrepRule("kafka-no-tls", "WARNING", "Kafka without TLS encryption", ["python"], "CWE-319"),
    SemgrepRule("missing-auth-grpc", "ERROR", "Missing authentication in gRPC", ["python"], "CWE-306"),
    SemgrepRule("insecure-deserialization", "ERROR", "Insecure deserialization in stream", ["python"], "CWE-502"),
]
for rule in rules:
    scanner.add_rule(rule)

# Only the rule catalogue is printed here; no scan is run in this snippet.
print("Semgrep Rules for Stream Processing:")
for rule in rules:
    print(f" [{rule.severity}] {rule.rule_id}: {rule.description} ({rule.cwe})")
Stream Processing Security
# stream_security.py — Secure Stream Processing
from dataclasses import dataclass
from typing import Dict, List
# Kafka Secure Configuration
# Example of a hardened client configuration: SASL over TLS with client certs.
kafka_secure_config = {
    "bootstrap.servers": "kafka-1:9093, kafka-2:9093, kafka-3:9093",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-512",
    # Literal here for the demo; a real deployment should read it from an
    # environment variable.
    "sasl.username": "stream-app",
    # The password is intentionally absent: never hardcode it. Supply it at
    # runtime, e.g. "sasl.password": os.environ["KAFKA_PASSWORD"].
    "ssl.ca.location": "/etc/ssl/certs/ca.pem",
    "ssl.certificate.location": "/etc/ssl/certs/client.pem",
    "ssl.key.location": "/etc/ssl/private/client-key.pem",
    "enable.auto.commit": False,
    "auto.offset.reset": "earliest",
    "group.id": "stream-security-group",
}

# Deliberately bad counter-example used to show what Semgrep reports.
kafka_insecure_config = {
    "bootstrap.servers": "kafka:9092",
    "security.protocol": "PLAINTEXT",  # Semgrep will flag this
    "sasl.password": "hardcoded-password-123",  # Semgrep will flag this
}

print("Kafka Configuration:")
print("\n Secure Config:")
for key, value in kafka_secure_config.items():
    print(f" {key}: {value}")

print("\n Insecure Config (Semgrep will flag):")
for key, value in kafka_insecure_config.items():
    print(f" {key}: {value}")
# Stream Processing Security Checklist
# Checklist items grouped by security concern; printed verbatim below.
security_checklist = {
    "Authentication": [
        "ใช้ SASL_SSL สำหรับ Kafka",
        "mTLS สำหรับ gRPC Streams",
        "API Key / JWT สำหรับ HTTP Streams",
    ],
    "Encryption": [
        "TLS 1.3 สำหรับ Transport",
        "Encrypt Sensitive Fields ใน Messages",
        "KMS สำหรับ Key Management",
    ],
    "Authorization": [
        "Kafka ACLs จำกัด Topic Access",
        "RBAC สำหรับ Schema Registry",
        "Service Accounts Least Privilege",
    ],
    "Data Validation": [
        "Schema Validation (Avro/Protobuf)",
        "Input Sanitization ก่อน Process",
        "ไม่ใช้ Pickle/Java Serialization",
    ],
    "Monitoring": [
        "Log ทุก Authentication Failure",
        "Alert เมื่อ Consumer Lag สูง",
        "Audit Trail สำหรับ Data Access",
    ],
}

print("\n\nStream Security Checklist:")
for category, items in security_checklist.items():
    print(f"\n [{category}]")
    for item in items:
        print(f" [x] {item}")
CI/CD Integration
# cicd_semgrep.py — Semgrep in CI/CD
# GitHub Actions
# name: Security Scan
# on: [push, pull_request]
# jobs:
#   semgrep:
#     runs-on: ubuntu-latest
#     steps:
#       - uses: actions/checkout@v4
#       - name: Semgrep Scan
#         uses: returntocorp/semgrep-action@v1
#         with:
#           config: >-
#             p/security-audit
#             p/owasp-top-ten
#             p/python
#             ./custom-rules/
#         env:
#           SEMGREP_APP_TOKEN: ${{ secrets.SEMGREP_APP_TOKEN }}
# GitLab CI
# semgrep:
#   image: returntocorp/semgrep
#   script:
#     - semgrep ci --config auto --config ./custom-rules/
#   rules:
#     - if: $CI_MERGE_REQUEST_ID
# Summary table of how Semgrep is wired into each CI/CD platform.
# Every entry carries the same three keys: "action", "config", "trigger".
cicd_integration = {
    "GitHub Actions": {
        "action": "returntocorp/semgrep-action@v1",
        "config": "p/security-audit, p/owasp-top-ten, custom rules",
        "trigger": "Push, Pull Request",
    },
    "GitLab CI": {
        "action": "returntocorp/semgrep Docker image",
        "config": "semgrep ci --config auto",
        "trigger": "Merge Request",
    },
    "Jenkins": {
        "action": "semgrep ci in Pipeline step",
        "config": "semgrep --config auto --json > report.json",
        "trigger": "PR Build",
    },
    "Pre-commit Hook": {
        "action": "semgrep in .pre-commit-config.yaml",
        "config": "semgrep --config ./custom-rules/",
        "trigger": "Before Commit",
    },
}

print("Semgrep CI/CD Integration:")
for platform, info in cicd_integration.items():
    print(f"\n [{platform}]")
    for key, value in info.items():
        print(f" {key}: {value}")
# Semgrep vs Other SAST Tools
# Side-by-side attributes of the SAST tools discussed in the article.
comparison = {
    "Semgrep": {"speed": "เร็วมาก", "rules": "Pattern Matching (ง่าย)", "focus": "Security", "cost": "Free CLI / Paid Cloud"},
    "SonarQube": {"speed": "ปานกลาง", "rules": "Complex (Java-based)", "focus": "Quality + Security", "cost": "Free Community / Paid"},
    "CodeQL": {"speed": "ช้า (Deep Analysis)", "rules": "Query Language (ซับซ้อน)", "focus": "Security", "cost": "Free for OSS"},
    "Snyk Code": {"speed": "เร็ว", "rules": "AI-powered", "focus": "Security", "cost": "Free Tier / Paid"},
}

print("\n\nSAST Tools Comparison:")
# NOTE(review): each entry also stores "focus", but it is not printed here.
for tool, info in comparison.items():
    print(f" {tool}: Speed={info['speed']} | Rules={info['rules']} | {info['cost']}")
Best Practices
- Custom Rules: เขียน Rules เฉพาะสำหรับ Stream Processing ของทีม
- Pre-commit: ใช้ Semgrep เป็น Pre-commit Hook ตรวจก่อน Commit
- SASL_SSL: ใช้ SASL_SSL สำหรับ Kafka ทุกครั้ง
- Schema Validation: ใช้ Avro/Protobuf Validate ข้อมูลใน Stream
- Secrets: ห้าม Hardcode Credentials ใช้ Environment Variables
- Audit: Log ทุก Security Event ใน Stream Pipeline
Semgrep คืออะไร
SAST วิเคราะห์โค้ดหาช่องโหว่ Security Pattern Matching 30+ ภาษา เร็ว CI/CD Community Rules 3000+ กฎ ฟรี Open Source
Stream Processing คืออะไร
ประมวลผล Real-time ทันที Kafka Streams Flink Spark Streaming Event Processing Analytics IoT Log Processing ไม่ต้องรอ Batch
Semgrep ใช้กับ Stream Processing อย่างไร
Scan โค้ด Stream หาช่องโหว่ SQL Injection Deserialization Hardcoded Credentials Missing Auth CI/CD ก่อน Deploy Stream Applications
Semgrep ต่างจาก SonarQube อย่างไร
Semgrep Pattern Matching ง่าย เร็ว เน้น Security ฟรี CLI SonarQube ครอบคลุม Quality Security Dashboard ช้ากว่า ใช้คู่กันได้
สรุป
Semgrep SAST Pattern Matching Security วิเคราะห์โค้ด Stream Processing Kafka SASL_SSL Custom Rules CI/CD Pre-commit Schema Validation Secrets Management Audit Logging
