Dagster Pipeline Security Hardening — ป้องกันการถูกแฮก
Dagster Pipeline Security
Dagster เป็น Data Orchestrator สำหรับ Python ที่ใช้แนวคิด Software-defined Assets บทความนี้อธิบายการทำ Security Hardening ครอบคลุม Authentication, Secrets Management, Network Policies, Audit Logging, Least Privilege และ Encryption
| Security Layer | Measure | Tools |
|---|---|---|
| Authentication | Identity Verification | OAuth2, OIDC, LDAP |
| Authorization | Role-based Access | RBAC, Dagster Permissions |
| Secrets | Credential Management | Vault, AWS SM, GCP SM |
| Network | Traffic Control | VPC, Security Groups, mTLS |
| Data | Encryption | AES-256, TLS 1.3 |
| Audit | Activity Logging | CloudTrail, Dagster Events |
Dagster Security Configuration
=== Dagster Security Setup ===
dagster.yaml — Security Configuration
# Instance storage backends. Credentials are read from the environment at
# startup and are never written into this file.
run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_url:
      env: DAGSTER_PG_URL # Never hardcode credentials
event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    postgres_url:
      env: DAGSTER_PG_URL
# The webserver is NOT configured in dagster.yaml (there is no "webserver"
# section). Set host/port on the command line, bind to localhost, and expose
# the UI only through a reverse proxy that terminates TLS and enforces
# authentication (open-source Dagster does not authenticate UI users itself):
#   dagster-webserver -h 127.0.0.1 -p 3000
# Binding to 0.0.0.0 would expose the unauthenticated UI on every interface.
workspace.yaml — Secure Workspace
# Code locations the webserver/daemon load. Dagster loads each code location
# in a separate process from the webserver itself.
load_from:
  - python_module:
      module_name: my_pipeline
      attribute: defs
      # Working directory used when importing my_pipeline.
      working_directory: /opt/dagster/app
Secure Resource Configuration
from dagster import resource, EnvVar
import boto3
@resource
def secure_db_resource(context):
    """Database connection settings built from environment variables.

    Reads DB_HOST, DB_PORT, DB_USER, DB_PASSWORD and DB_NAME through
    ``EnvVar`` so no credential appears in code or run config. Forces
    certificate-verified TLS (``sslmode="verify-full"``) against a fixed CA
    bundle path.

    Returns:
        dict with keys host, port (int), user, password, database, sslmode
        and sslrootcert.

    Raises:
        ValueError: if any variable is unset, or DB_PORT is not an integer.
    """
    env_names = {
        "host": "DB_HOST",
        "port": "DB_PORT",
        "user": "DB_USER",
        "password": "DB_PASSWORD",
        "database": "DB_NAME",
    }
    settings = {key: EnvVar(name).get_value() for key, name in env_names.items()}
    # EnvVar.get_value() yields None for an unset variable. Fail fast and name
    # the variables, instead of connecting with a None host/password or dying
    # in int(None) with an unhelpful TypeError.
    missing = [env_names[key] for key, val in settings.items() if val is None]
    if missing:
        raise ValueError(
            "Missing required environment variable(s): " + ", ".join(missing)
        )
    try:
        settings["port"] = int(settings["port"])
    except ValueError:
        raise ValueError(
            f"DB_PORT must be an integer, got {settings['port']!r}"
        ) from None
    return {
        **settings,
        "sslmode": "verify-full",
        "sslrootcert": "/etc/ssl/certs/rds-ca.pem",
    }
@resource
def vault_resource(context):
    """HashiCorp Vault client (hvac) that is verified to be authenticated.

    The server address and token are read from VAULT_ADDR and VAULT_TOKEN.

    Raises:
        ValueError: if VAULT_ADDR is unset or empty.
        RuntimeError: if Vault does not accept the supplied token.
    """
    import hvac

    url = EnvVar("VAULT_ADDR").get_value()
    if not url:
        # Do not let hvac fall back to its built-in default address
        # (presumably plain-HTTP http://localhost:8200 — confirm for the
        # installed hvac version) when talking to a secrets backend.
        raise ValueError("VAULT_ADDR must be set to the Vault server URL")
    client = hvac.Client(
        url=url,
        token=EnvVar("VAULT_TOKEN").get_value(),
    )
    # Surface a missing/expired/invalid token at resource start-up rather
    # than on the first secret read inside an op.
    if not client.is_authenticated():
        raise RuntimeError("Vault authentication failed; check VAULT_TOKEN")
    return client
from dataclasses import dataclass, field
from typing import List, Dict
@dataclass
class SecurityChecklist:
    """One category of hardening tasks together with its priority label.

    Positional field order is ``category``, ``items``, ``priority``.
    """

    # Area the tasks belong to, e.g. "Network".
    category: str
    # Task records; each dict carries an "item" description and a "status"
    # string such as "Done" or "Pending".
    items: List[Dict[str, str]]
    # Priority label for the whole category, e.g. "Critical" or "High".
    priority: str
# Hardening tasks grouped by area, each with its current status.
checklists = [
    SecurityChecklist(
        category="Authentication",
        priority="Critical",
        items=[
            {"item": "เปิด OAuth2/OIDC สำหรับ Dagit UI", "status": "Done"},
            {"item": "ใช้ Service Account สำหรับ Pipeline", "status": "Done"},
            {"item": "MFA สำหรับ Admin Access", "status": "Done"},
            {"item": "API Key Rotation ทุก 90 วัน", "status": "Pending"},
        ],
    ),
    SecurityChecklist(
        category="Secrets Management",
        priority="Critical",
        items=[
            {"item": "ย้าย Secrets จาก Config ไป Vault", "status": "Done"},
            {"item": "Rotate Database Credentials อัตโนมัติ", "status": "Done"},
            {"item": "Encrypt Secrets at-rest ด้วย KMS", "status": "Done"},
            {"item": "Scan Code สำหรับ Hardcoded Secrets", "status": "Done"},
        ],
    ),
    SecurityChecklist(
        category="Network",
        priority="High",
        items=[
            {"item": "Dagit อยู่หลัง VPN/Private Network", "status": "Done"},
            {"item": "Database ไม่เปิด Public Access", "status": "Done"},
            {"item": "mTLS ระหว่าง Services", "status": "Pending"},
            {"item": "Egress Firewall Rules จำกัด Outbound", "status": "Done"},
        ],
    ),
]

# Report: one progress line per category, then a DONE/TODO line per task.
print("=== Security Hardening Checklist ===")
for checklist in checklists:
    total = len(checklist.items)
    completed = len([entry for entry in checklist.items if entry["status"] == "Done"])
    print(f"\n [{checklist.priority}] {checklist.category} ({completed}/{total})")
    for entry in checklist.items:
        if entry["status"] == "Done":
            marker = "DONE"
        else:
            marker = "TODO"
        print(f" [{marker}] {entry['item']}")
Input Validation และ Data Security
=== Input Validation & Data Security ===
from dagster import asset, AssetIn, DagsterType, check
import pandas as pd
# Custom Type Check
def validate_user_data(_, value):
    """Dagster type-check function for user DataFrames.

    Returns ``False`` when *value* is not a DataFrame or lacks the required
    columns. The required columns are ``user_id``, ``amount`` and an e-mail
    column, either the raw ``email`` or the pseudonymised ``email_hash``.
    ``email_hash`` is accepted because this type is applied to the output of
    ``clean_user_data``, which drops ``email``.

    Raises:
        ValueError: if any string cell of an object column contains one of
            DROP/DELETE/INSERT/UPDATE as a whole word (case-insensitive),
            or ``;`` or ``--``.

    This is a coarse deny-list, not an injection defence: queries built
    from this data must still use bound parameters.
    """
    import re

    if not isinstance(value, pd.DataFrame):
        return False
    columns = set(value.columns)
    if not {'user_id', 'amount'} <= columns:
        return False
    if not columns & {'email', 'email_hash'}:
        return False
    # Word boundaries stop ordinary values such as "john@dropbox.com" or
    # "Updated" from matching a keyword; ';' and '--' still match anywhere.
    suspicious = re.compile(r"\b(?:DROP|DELETE|INSERT|UPDATE)\b|;|--", re.IGNORECASE)
    for col in value.select_dtypes(include='object').columns:
        # Inspect string cells only. None/NaN/numbers in an object column
        # would otherwise break the .str accessor or produce NaN.
        hits = value[col].map(
            lambda cell: isinstance(cell, str) and bool(suspicious.search(cell))
        )
        if hits.any():
            raise ValueError(f"Suspicious pattern in column {col}")
    return True
# Dagster type whose check delegates to validate_user_data; it is used as the
# output type of the clean_user_data asset.
UserDataType = DagsterType(
    description="Validated user data without injection patterns",
    type_check_fn=validate_user_data,
    name="UserData",
)
@asset(dagster_type=UserDataType)
def clean_user_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    """Pseudonymise e-mails and drop implausible amounts.

    The ``email`` column is replaced by ``email_hash`` (hex SHA-256 of the
    UTF-8 address), so the raw address is not part of this asset's output.
    Only rows with ``0 < amount < 1000000`` are kept. Missing or non-string
    e-mails get a ``None`` hash instead of raising.
    """
    import hashlib

    def _hash_email(address):
        # NaN (missing e-mail) and other non-strings have no .encode().
        if not isinstance(address, str):
            return None
        return hashlib.sha256(address.encode()).hexdigest()

    # Copy so the upstream asset's DataFrame is never mutated.
    df = raw_data.copy()
    # NOTE(review): an unsalted SHA-256 of an e-mail can be reversed by
    # hashing candidate addresses. If this must be real anonymisation,
    # consider a keyed hash (hmac) with a secret from the secrets manager.
    df['email_hash'] = df['email'].map(_hash_email)
    df = df.drop(columns=['email'])
    # Keep strictly positive amounts below the sanity ceiling.
    df = df[(df['amount'] > 0) & (df['amount'] < 1000000)]
    return df
Dependency Scanning
# Install the dependency scanners (unpinned here; pin versions in CI).
pip install safety pip-audit
# Check installed packages against Safety's vulnerability database and print
# full advisory details.
# NOTE(review): newer Safety releases favour `safety scan` over `safety check`;
# confirm against the installed version.
safety check --full-report
# Audit installed packages with pip-audit; --strict makes the audit fail if
# any dependency cannot be audited instead of skipping it.
pip-audit --strict
# CI/CD security scans: which tools run, how often, and what a finding blocks.
vulnerability_scan = {
    "Dependency Scan": dict(
        tool="safety, pip-audit, Snyk",
        frequency="ทุก CI/CD Build",
        action="Block deploy ถ้ามี Critical",
    ),
    "Container Scan": dict(
        tool="Trivy, Grype, Snyk Container",
        frequency="ทุก Image Build",
        action="Block deploy ถ้ามี High/Critical",
    ),
    "SAST": dict(
        tool="Bandit, Semgrep, SonarQube",
        frequency="ทุก PR",
        action="Block merge ถ้ามี Issues",
    ),
    "Secret Scan": dict(
        tool="GitLeaks, TruffleHog, detect-secrets",
        frequency="ทุก Commit (pre-commit hook)",
        action="Block commit ถ้าพบ Secrets",
    ),
}

print("\nSecurity Scanning:")
for scan_name in vulnerability_scan:
    print(f"\n [{scan_name}]")
    details = vulnerability_scan[scan_name]
    for label in details:
        print(f" {label}: {details[label]}")
Monitoring และ Incident Response
=== Security Monitoring & Audit ===
Dagster Event Logging
from dagster import op, get_dagster_logger
@op
def secure_data_load(context):
    """Load production data, emitting structured log records for auditing.

    Logs a start record (with the ``user`` value from run config, if any),
    the record count on success, and the exception type on failure. The
    exception is always re-raised so the run fails.
    """
    logger = get_dagster_logger()
    # NOTE(review): "user" comes from run config, which is supplied by whoever
    # launches the run; it is not an authenticated identity.
    logger.info("Starting data load",
                extra={"user": context.run_config.get("user"),
                       "source": "production_db"})
    try:
        data = load_from_db()
    except Exception as e:
        # Only the load itself is guarded, so errors in the success-path
        # logging below are not misreported as load failures.
        logger.error("Data load failed: %s", e,
                     extra={"error_type": type(e).__name__})
        raise
    record_count = len(data)
    logger.info("Loaded %d records", record_count,
                extra={"record_count": record_count})
    return data
Kubernetes Security
apiVersion: v1
kind: Pod
metadata:
  name: dagster-worker
spec:
  securityContext:
    runAsNonRoot: true
    runAsUser: 1000
    fsGroup: 1000
    # Default syscall filter; also required by the "restricted" Pod Security
    # Standard.
    seccompProfile:
      type: RuntimeDefault
  containers:
    - name: dagster
      # Pin an explicit version tag or (better) a digest: ":latest" is
      # mutable, so the code that runs could change with no manifest change.
      # Replace <digest> with the digest of your built image.
      image: dagster@sha256:<digest>
      securityContext:
        readOnlyRootFilesystem: true
        allowPrivilegeEscalation: false
        capabilities:
          drop: ["ALL"]
      resources:
        limits:
          memory: "2Gi"
          cpu: "1000m"
      env:
        - name: DB_PASSWORD
          valueFrom:
            secretKeyRef:
              name: dagster-secrets
              key: db-password
      # The root filesystem is read-only, so provide writable scratch space.
      # Add mounts for any other paths the worker writes to (e.g. DAGSTER_HOME).
      volumeMounts:
        - name: tmp
          mountPath: /tmp
  volumes:
    - name: tmp
      emptyDir: {}
# Alerting rules: the signal to watch, when it fires, and the response taken.
security_metrics = {
    "Failed Login Attempts": dict(threshold="> 5/hour", action="Lock account + Alert"),
    "Unauthorized API Calls": dict(threshold="> 3/min", action="Block IP + Alert"),
    "Secret Access Anomaly": dict(threshold="Unusual pattern", action="Revoke + Investigate"),
    "Data Exfiltration": dict(threshold="> 100MB export", action="Block + Alert"),
    "Pipeline Modification": dict(threshold="Any unauthorized", action="Revert + Alert"),
    "Dependency Vulnerability": dict(threshold="Critical/High", action="Block deploy"),
}

print("Security Monitoring Metrics:")
for metric_name, rule in security_metrics.items():
    threshold, response = rule["threshold"], rule["action"]
    print(f"\n [{metric_name}]")
    print(f" Threshold: {threshold}")
    print(f" Action: {response}")
Best Practices Summary
# Guiding security principles: English term, then a Thai explanation.
best_practices = [
    "Least Privilege — ให้สิทธิ์น้อยที่สุดที่จำเป็น",
    "Defense in Depth — ป้องกันหลายชั้น",
    "Zero Trust — ไม่เชื่อใจอะไรโดยปริยาย ตรวจสอบทุกอย่าง",
    "Shift Left — ใส่ Security ตั้งแต่เริ่ม Development",
    "Automate — Automate Security Checks ใน CI/CD",
    "Monitor — ติดตาม Anomaly อย่างต่อเนื่อง",
    "Patch — อัปเดต Dependencies สม่ำเสมอ",
]
# Plain string literal: the f-prefix here had no placeholders (ruff F541).
print("\n\nSecurity Best Practices:")
for i, bp in enumerate(best_practices, 1):
    print(f" {i}. {bp}")
เคล็ดลับ
- Vault: ใช้ HashiCorp Vault หรือ Cloud Secrets Manager เก็บ Credentials
- Scan: Scan Dependencies และ Container ทุก Build
- RBAC: ใช้ Role-based Access Control จำกัดสิทธิ์ตามบทบาท
- Encrypt: Encrypt ข้อมูลทั้ง At-rest และ In-transit
- Audit: บันทึก Audit Log ทุกกิจกรรมสำคัญ เก็บอย่างน้อย 1 ปี
Dagster คืออะไร
Dagster เป็น Open Source Data Orchestrator สำหรับ Python ที่ใช้แนวคิด Software-defined Assets มี Type System, Web UI (Dagster UI ซึ่งเดิมชื่อ Dagit), Sensors และ Schedules และ Deploy ได้ทั้งบน Docker, Kubernetes และ Cloud