
Semgrep SAST Machine Learning Pipeline Security Scanning for ML Code

2025-09-05 · Ajarn Bom — SiamCafe.net · 1,371 words

Semgrep SAST for ML Pipeline Security

Semgrep is a Static Application Security Testing (SAST) tool that analyzes source code for vulnerabilities without running the code. Machine Learning pipelines carry security risks beyond those of conventional software, such as model poisoning, data leakage, insecure model serving, and dependency vulnerabilities.

An ML pipeline typically consists of these components: Data ingestion (pulling data from sources), Data preprocessing (cleaning and transforming), Feature engineering (creating features), Model training, Model evaluation, Model serving (deploying the prediction API), and Monitoring (tracking performance). Each component carries its own security risks.

Semgrep can scan ML code for: hardcoded credentials in notebooks/scripts, insecure deserialization (pickle, joblib), SQL injection in data queries, unsafe eval/exec in feature engineering, missing input validation in model serving, and dependency vulnerabilities.
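As a concrete illustration of the first two issue classes, here is a small before/after sketch. The function name and environment-variable name are illustrative assumptions, not taken from any specific codebase:

```python
import os

# BAD (Semgrep would flag both of these patterns):
#   api_key = "sk-live-abc123"              # CWE-798: hardcoded credential
#   model = pickle.load(open(path, "rb"))   # CWE-502: insecure deserialization

# BETTER: read the secret from the environment instead of source code
def get_api_key() -> str:
    """Fetch the API key from the environment; fail loudly if it is missing."""
    key = os.environ.get("OPENAI_API_KEY")
    if not key:
        raise RuntimeError("OPENAI_API_KEY is not set")
    return key
```

Keeping secrets in environment variables (or a secrets manager) means the `p/secrets` ruleset and the custom rule below have nothing to flag.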

Using Semgrep with ML Code

Set up Semgrep and scan an ML project:

# === Semgrep Setup for ML Pipeline ===

# 1. Install Semgrep
pip install semgrep

# 2. Scan ML project
semgrep scan --config auto ./ml_pipeline/

# 3. Scan with ML-specific rulesets
semgrep scan \
  --config p/python \
  --config p/security-audit \
  --config p/secrets \
  --config p/owasp-top-ten \
  --json --output ml_scan_results.json \
  ./ml_pipeline/

# 4. ML-specific .semgrep.yml configuration
cat > .semgrep.yml << 'EOF'
rules:
  # Detect insecure pickle deserialization
  - id: ml.insecure-pickle-load
    patterns:
      - pattern-either:
          - pattern: pickle.load(...)
          - pattern: pickle.loads(...)
          - pattern: joblib.load(...)
          - pattern: torch.load(...)
    message: |
      Insecure deserialization detected. pickle/joblib/torch.load can execute
      arbitrary code. Use safetensors or verify model source before loading.
    severity: ERROR
    languages: [python]
    metadata:
      cwe: "CWE-502"
      category: "ml-security"
      fix: "Use safetensors format or validate model checksum before loading"

  # Detect hardcoded API keys in notebooks
  - id: ml.hardcoded-api-key
    patterns:
      - pattern: |
          $KEY = "..."
      - metavariable-regex:
          metavariable: $KEY
          regex: (?i).*(api_?key|secret|token|password|aws_access|openai).*
    message: "Hardcoded credential in ML code. Use environment variables."
    severity: ERROR
    languages: [python]
    metadata:
      cwe: "CWE-798"

  # Detect unsafe eval in feature engineering
  - id: ml.unsafe-eval-feature
    pattern-either:
      - pattern: eval(...)
      - pattern: exec(...)
    message: "Unsafe eval/exec in ML pipeline. Use safe alternatives."
    severity: ERROR
    languages: [python]
    metadata:
      cwe: "CWE-94"

  # Detect missing input validation in model serving
  - id: ml.missing-input-validation
    patterns:
      - pattern: |
          @app.route("/predict", ...)
          def $FUNC(...):
              ...
              $MODEL.predict(...)
              ...
      - pattern-not: |
          @app.route("/predict", ...)
          def $FUNC(...):
              ...
              validate(...)
              ...
              $MODEL.predict(...)
              ...
    message: "Model serving endpoint missing input validation."
    severity: WARNING
    languages: [python]
    metadata:
      cwe: "CWE-20"
EOF

semgrep scan --config .semgrep.yml ./ml_pipeline/
echo "Semgrep ML scanning configured"
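The `ml.missing-input-validation` rule above looks for a `validate(...)` call before `$MODEL.predict(...)`. A minimal, framework-agnostic sketch of such a validator follows; the `features` field name, feature count, and magnitude bounds are illustrative assumptions:

```python
def validate(payload: dict, n_features: int = 4) -> list:
    """Validate a /predict request body before it reaches the model."""
    if not isinstance(payload, dict) or "features" not in payload:
        raise ValueError("payload must be a JSON object with a 'features' key")
    features = payload["features"]
    if not isinstance(features, list) or len(features) != n_features:
        raise ValueError(f"'features' must be a list of {n_features} numbers")
    out = []
    for x in features:
        # bool is a subclass of int, so exclude it explicitly
        if not isinstance(x, (int, float)) or isinstance(x, bool):
            raise ValueError("feature values must be numeric")
        if not (-1e6 <= x <= 1e6):  # reject absurd magnitudes / fuzzing probes
            raise ValueError("feature value out of accepted range")
        out.append(float(x))
    return out
```

Calling `validate(request.get_json())` at the top of the `/predict` handler, before `model.predict(...)`, satisfies the rule's `pattern-not` clause.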

Custom Rules for ML Security

Writing Semgrep rules tailored to ML pipelines:

#!/usr/bin/env python3
# ml_security_rules.py - ML Security Rules Generator
import json
import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("rules")

class MLSecurityRules:
    """Generate and manage Semgrep rules for ML pipelines"""
    
    def __init__(self):
        self.rules = []
    
    def add_rule(self, rule_id, pattern, message, severity, cwe):
        self.rules.append({
            "id": rule_id,
            "pattern": pattern,
            "message": message,
            "severity": severity,
            "metadata": {"cwe": cwe},
        })
    
    def ml_vulnerability_categories(self):
        return {
            "data_poisoning": {
                "description": "Attacker tampers with training data so the model misbehaves",
                "detection": "Check data source authentication and integrity checks",
                "semgrep_rules": [
                    "Detect data loading from untrusted sources",
                    "Detect missing checksum verification",
                    "Detect SQL injection in data queries",
                ],
                "prevention": [
                    "Validate data source integrity (checksums)",
                    "Use parameterized queries",
                    "Implement data validation pipeline",
                    "Monitor training data distribution changes",
                ],
            },
            "model_theft": {
                "description": "Model stolen via API extraction or unauthorized access",
                "detection": "Check model file permissions, API rate limiting",
                "semgrep_rules": [
                    "Detect model files stored in public directories",
                    "Detect missing authentication on model serving",
                    "Check model serialization format (prefer ONNX/safetensors)",
                ],
                "prevention": [
                    "Rate limit prediction API",
                    "Use authentication for model endpoints",
                    "Encrypt model files at rest",
                    "Monitor unusual query patterns",
                ],
            },
            "prompt_injection": {
                "description": "Attacker injects malicious prompts into LLM applications",
                "detection": "Check for missing input sanitization before calling the LLM",
                "semgrep_rules": [
                    "Detect user input concatenated directly into a prompt",
                    "Detect missing prompt template sanitization",
                    "Detect system prompt exposure",
                ],
                "prevention": [
                    "Sanitize user input before prompt construction",
                    "Use prompt templates with strict escaping",
                    "Implement output filtering",
                    "Monitor for prompt injection attempts",
                ],
            },
            "dependency_vulnerabilities": {
                "description": "ML libraries with known vulnerabilities (numpy, pandas, torch)",
                "detection": "Check for outdated dependencies and known CVEs",
                "tools": ["pip-audit", "safety", "Snyk", "Dependabot"],
                "prevention": [
                    "Pin dependency versions",
                    "Regular dependency updates",
                    "Use virtual environments",
                    "Automated CVE scanning in CI",
                ],
            },
        }

rules = MLSecurityRules()
categories = rules.ml_vulnerability_categories()
print("ML Security Vulnerability Categories:")
for name, info in categories.items():
    print(f"\n  {name}:")
    print(f"    Description: {info['description']}")
    print(f"    Prevention:")
    for p in info["prevention"][:3]:
        print(f"      - {p}")
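The prompt-injection prevention steps listed in the categories above can be sketched as a small input sanitizer. The length budget and the specific transformations are illustrative assumptions, a first layer rather than a complete defense:

```python
import re

MAX_INPUT_LEN = 2000  # assumed budget; tune per application

def sanitize_prompt_input(user_text: str) -> str:
    """Reduce prompt-injection surface before inserting text into a template."""
    # 1. Bound the input length
    text = user_text[:MAX_INPUT_LEN]
    # 2. Drop non-printable control characters
    text = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f]", "", text)
    # 3. Flatten newlines so input cannot fake a new "System:" / "User:" turn
    text = " ".join(text.splitlines())
    return text.strip()

def build_prompt(user_text: str) -> str:
    """Insert sanitized input into a fixed template (strict-escaping step)."""
    template = "Answer the user's question.\nUser: {q}\nAssistant:"
    return template.format(q=sanitize_prompt_input(user_text))
```

Output filtering and injection monitoring, the other two prevention items, would sit downstream of the LLM response and are not shown here.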

Scan ML Pipeline Components

Scan each component of the ML pipeline:

# === Scan ML Pipeline Components ===

# 1. Scan Jupyter Notebooks
cat > scan_notebooks.sh << 'BASH'
#!/bin/bash
# Convert notebooks to Python for scanning
pip install nbconvert

# Convert all notebooks
find . -name "*.ipynb" -exec jupyter nbconvert --to python {} \;

# Scan converted files
semgrep scan \
  --config .semgrep.yml \
  --config p/security-audit \
  --include "*.py" \
  --json --output notebook_scan.json \
  ./notebooks/

# Clean up
find . -name "*.py" -path "*/notebooks/*" -delete

echo "Notebook scan complete"
BASH

# 2. Scan Model Training Code
cat > scan_training.sh << 'BASH'
#!/bin/bash
semgrep scan \
  --config .semgrep.yml \
  --config p/python \
  --include "*.py" \
  --exclude "tests/*" \
  --json --output training_scan.json \
  ./training/

# Check for specific ML issues
echo "=== Insecure Deserialization ==="
semgrep scan --lang python \
  --include "*.py" \
  --pattern "pickle.load(...)" \
  ./training/

echo "=== Hardcoded Secrets ==="
semgrep scan --config p/secrets \
  --include "*.py" \
  ./training/

echo "Training code scan complete"
BASH

# 3. Scan Model Serving Code
cat > scan_serving.sh << 'BASH'
#!/bin/bash
semgrep scan \
  --config .semgrep.yml \
  --config p/owasp-top-ten \
  --config p/security-audit \
  --include "*.py" \
  --json --output serving_scan.json \
  ./serving/

# Check dependency vulnerabilities
pip install pip-audit
pip-audit --format json --output dep_audit.json

echo "Serving code scan complete"
BASH

# 4. Scan Infrastructure Code (Terraform/Kubernetes)
cat > scan_infra.sh << 'BASH'
#!/bin/bash
# Scan Terraform for ML infrastructure
semgrep scan \
  --config p/terraform \
  --include "*.tf" \
  --json --output infra_scan.json \
  ./infrastructure/

# Scan Kubernetes manifests
semgrep scan \
  --config p/kubernetes \
  --include "*.yaml" --include "*.yml" \
  --json --output k8s_scan.json \
  ./k8s/

echo "Infrastructure scan complete"
BASH

chmod +x scan_*.sh
echo "ML pipeline scanning scripts ready"

CI/CD Integration for MLOps

Integrating Semgrep into an MLOps CI/CD pipeline:

# === CI/CD Integration for MLOps ===

# 1. GitHub Actions - ML Pipeline Security Scan
cat > .github/workflows/ml-security.yml << 'EOF'
name: ML Pipeline Security Scan

on:
  push:
    branches: [main]
    paths:
      - 'ml_pipeline/**'
      - 'training/**'
      - 'serving/**'
      - 'notebooks/**'
  pull_request:
    branches: [main]

jobs:
  semgrep-scan:
    runs-on: ubuntu-latest
    container:
      image: semgrep/semgrep:latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Run Semgrep (ML Rules)
        run: |
          semgrep scan \
            --config .semgrep.yml \
            --config p/python \
            --config p/security-audit \
            --config p/secrets \
            --sarif --output semgrep.sarif \
            --error --severity ERROR

      - name: Upload SARIF
        uses: github/codeql-action/upload-sarif@v3
        with:
          sarif_file: semgrep.sarif
        if: always()

  dependency-audit:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          
      - name: Install dependencies
        run: pip install -r requirements.txt
        
      - name: Audit dependencies
        run: |
          pip install pip-audit safety
          pip-audit --format json --output audit.json || true
          safety check --json > safety.json || true
          
      - name: Check results
        run: |
          python3 -c "
          import json
          with open('audit.json') as f:
              data = json.load(f)
          vulns = data.get('dependencies', [])
          critical = [v for v in vulns if v.get('vulns')]
          if critical:
              print(f'FAIL: {len(critical)} vulnerable packages')
              for v in critical[:5]:
                  print(f'  {v[\"name\"]}: {v[\"vulns\"]}')
          else:
              print('PASS: No vulnerable packages')
          "

  notebook-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          
      - name: Convert and scan notebooks
        run: |
          pip install nbconvert semgrep
          find . -name "*.ipynb" -exec jupyter nbconvert --to python {} \;
          semgrep scan --config .semgrep.yml --include "*.py" --error --severity ERROR .
EOF

echo "CI/CD integration configured"
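The report's last recommendation, running Semgrep in pre-commit hooks, can catch issues before code even reaches CI. A sketch using a pre-commit local hook (assumes semgrep and pre-commit are installed in the developer environment; the hook id and name are arbitrary):

```shell
# Pre-commit hook: run the ML ruleset on staged Python files
cat > .pre-commit-config.yaml << 'EOF'
repos:
  - repo: local
    hooks:
      - id: semgrep-ml
        name: Semgrep ML security scan
        language: system
        entry: semgrep scan --config .semgrep.yml --error --severity ERROR
        types: [python]
EOF

pip install pre-commit
pre-commit install
```

A `local` hook with `language: system` reuses whatever semgrep version the developer has, so keep it pinned in requirements to match CI.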

Monitoring and Reporting

Summarizing the security scan results:

#!/usr/bin/env python3
# ml_security_report.py - ML Pipeline Security Report
import json
import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("report")

class MLSecurityReport:
    def __init__(self):
        pass
    
    def scan_summary(self):
        return {
            "scan_date": "2024-06-15",
            "components_scanned": {
                "training_code": {"files": 45, "findings": 3},
                "serving_code": {"files": 12, "findings": 1},
                "notebooks": {"files": 28, "findings": 8},
                "infrastructure": {"files": 15, "findings": 2},
            },
            "total_findings": 14,
            "by_severity": {"ERROR": 4, "WARNING": 7, "INFO": 3},
            "by_category": {
                "insecure_deserialization": 3,
                "hardcoded_credentials": 4,
                "missing_input_validation": 2,
                "sql_injection_risk": 1,
                "unsafe_eval": 2,
                "dependency_vulnerability": 2,
            },
            "top_issues": [
                {
                    "rule": "ml.insecure-pickle-load",
                    "count": 3,
                    "severity": "ERROR",
                    "files": ["train.py:42", "evaluate.py:15", "serve.py:28"],
                    "fix": "Replace pickle.load with safetensors or verify checksum",
                },
                {
                    "rule": "ml.hardcoded-api-key",
                    "count": 4,
                    "severity": "ERROR",
                    "files": ["notebook_01.py:5", "notebook_03.py:12", "config.py:8", "data_loader.py:3"],
                    "fix": "Use os.environ or secrets manager",
                },
            ],
            "dependency_audit": {
                "total_packages": 85,
                "vulnerable": 3,
                "critical": [
                    {"package": "pillow", "version": "9.0.0", "cve": "CVE-2023-XXXXX", "fix": ">=10.2.0"},
                    {"package": "numpy", "version": "1.23.0", "cve": "CVE-2023-YYYYY", "fix": ">=1.26.0"},
                ],
            },
            "recommendations": [
                "Replace all pickle.load with safetensors format (3 occurrences)",
                "Move all API keys to environment variables (4 occurrences)",
                "Add input validation to model serving endpoint",
                "Update pillow and numpy to patched versions",
                "Add Semgrep to pre-commit hooks for early detection",
            ],
        }

report = MLSecurityReport()
summary = report.scan_summary()
print(f"ML Pipeline Security Report - {summary['scan_date']}")
print(f"  Total findings: {summary['total_findings']}")
print(f"  By severity: {summary['by_severity']}")

print(f"\nComponents:")
for comp, info in summary["components_scanned"].items():
    print(f"  {comp}: {info['files']} files, {info['findings']} findings")

print(f"\nTop Issues:")
for issue in summary["top_issues"]:
    print(f"  [{issue['severity']}] {issue['rule']}: {issue['count']} occurrences")
    print(f"    Fix: {issue['fix']}")

print(f"\nDependency Vulnerabilities: {summary['dependency_audit']['vulnerable']} packages")
for dep in summary["dependency_audit"]["critical"]:
    print(f"  {dep['package']} {dep['version']}: {dep['cve']} (fix: {dep['fix']})")

print(f"\nRecommendations:")
for rec in summary["recommendations"]:
    print(f"  - {rec}")

FAQ: Frequently Asked Questions

Q: Why does an ML pipeline need security scanning beyond what regular software gets?

A: ML pipelines have extra attack surfaces: Data poisoning (tampered training data makes the model misbehave), Model extraction (stealing a model through API queries), Insecure deserialization (pickle/joblib can execute arbitrary code), Notebook secrets (Jupyter notebooks often hold hardcoded API keys), Adversarial inputs (crafted inputs that fool the model), and Supply chain (ML libraries pull in many dependencies). Generic Semgrep rules for regular software cover only part of this, so ML-specific rules are needed: detect pickle.load, detect missing model validation, detect unencrypted model storage.

Q: Why is pickle.load dangerous?

A: pickle.load can execute arbitrary Python code while deserializing data. If you load a model file from an untrusted source, an attacker can embed malicious code in the pickle file disguised as a model; merely loading it executes that code (reverse shell, data exfiltration). Mitigations: use the safetensors format (stores only tensors, cannot execute code), use ONNX, verify the model file's checksum/hash before loading, use torch.load(weights_only=True) for PyTorch, and never load models from untrusted sources. The Semgrep rule above detects pickle.load/joblib.load/torch.load and flags them as ERROR.

Q: How do you scan Jupyter Notebooks?

A: Jupyter Notebooks (.ipynb) are JSON files, which Semgrep does not scan directly. The workaround: convert each notebook to .py with nbconvert (jupyter nbconvert --to python notebook.ipynb), then scan the resulting .py files with Semgrep; automate both steps in the CI pipeline. Issues commonly found in notebooks: hardcoded API keys (OpenAI, HuggingFace, AWS), hardcoded database credentials, pickle.load of downloaded models, eval() used for dynamic feature engineering, and SQL queries that concatenate user input. Also use nbstripout to strip cell outputs before committing, and add pre-commit hooks that scan notebooks automatically.

Q: Semgrep or Bandit, which is better for ML?

A: Bandit is a Python-specific security linter that detects common Python security issues (eval, exec, pickle, subprocess). It is simple to use, but it only covers Python and its custom rules are less flexible than Semgrep's. Semgrep supports 30+ languages (an ML pipeline mixes Python, YAML, Terraform, Dockerfile, SQL), custom rules with a readable pattern syntax, 2000+ community rules, taint analysis (Semgrep Pro), and strong CI/CD integration. For an ML pipeline, Semgrep is the better fit because it can scan the Python code, infrastructure (Terraform), Kubernetes manifests, and Dockerfiles in one tool. Running Bandit alongside it is also fine, since Bandit includes some Python-specific checks that Semgrep's rules lack.
