SiamCafe · Blog
Dagster Pipeline Security Hardening ป้องกันแฮก
บทความ

Dagster Pipeline Security Hardening ป้องกันแฮก

เผยแพร่ 28 พฤษภาคม 2569

Dagster Pipeline Security

Dagster เป็น Data Orchestrator ที่เขียนด้วย Python และใช้แนวคิด Software-defined Assets การทำ Security Hardening ครอบคลุม Authentication, Secrets Management, Network Policies, Audit Logging, Least Privilege และ Encryption

| Security Layer | Measure | Tools |
|---|---|---|
| Authentication | Identity Verification | OAuth2, OIDC, LDAP |
| Authorization | Role-based Access | RBAC, Dagster Permissions |
| Secrets | Credential Management | Vault, AWS SM, GCP SM |
| Network | Traffic Control | VPC, Security Groups, mTLS |
| Data | Encryption | AES-256, TLS 1.3 |
| Audit | Activity Logging | CloudTrail, Dagster Events |

Dagster Security Configuration

=== Dagster Security Setup ===

dagster.yaml — Security Configuration

# Storage sections are top-level keys of dagster.yaml. Each one names the
# Python module and the class inside that module separately.
event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    postgres_url:
      env: DAGSTER_PG_URL  # Never hardcode credentials

run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_url:
      env: DAGSTER_PG_URL

# Enable authentication
# The webserver is not configured in dagster.yaml; its bind address and port
# are command-line options:
#   dagster-webserver --host 0.0.0.0 --port 3000
# 0.0.0.0 listens on every interface, so make sure only the reverse proxy can
# reach this port (or bind to 127.0.0.1 when the proxy runs on the same host).
# Use reverse proxy for TLS termination

workspace.yaml — Secure Workspace

# Code locations to load.
load_from:
  - python_module:
      module_name: my_pipeline
      attribute: defs
      # Run in isolated process
      working_directory: /opt/dagster/app

Secure Resource Configuration

from dagster import resource, EnvVar

import boto3

@resource
def secure_db_resource(context):
    """Build database connection settings from environment variables.

    Returns a dict whose host, port (converted to int), user, password and
    database come from DB_HOST, DB_PORT, DB_USER, DB_PASSWORD and DB_NAME,
    together with fixed TLS settings (sslmode and the CA certificate path).
    """

    def read_env(name):
        # Resolve one environment variable through Dagster's EnvVar helper.
        return EnvVar(name).get_value()

    settings = {
        "host": read_env("DB_HOST"),
        "port": int(read_env("DB_PORT")),
        "user": read_env("DB_USER"),
        "password": read_env("DB_PASSWORD"),
        "database": read_env("DB_NAME"),
    }
    # TLS options are constants, not environment-driven.
    settings["sslmode"] = "verify-full"
    settings["sslrootcert"] = "/etc/ssl/certs/rds-ca.pem"
    return settings

@resource
def vault_resource(context):
    """Create an hvac client for HashiCorp Vault.

    The server address and the token are read from the VAULT_ADDR and
    VAULT_TOKEN environment variables. Returns the hvac.Client instance.
    """
    import hvac

    vault_addr = EnvVar("VAULT_ADDR").get_value()
    vault_token = EnvVar("VAULT_TOKEN").get_value()
    return hvac.Client(url=vault_addr, token=vault_token)

from dataclasses import dataclass, field

from typing import List, Dict

@dataclass
class SecurityChecklist:
    """One category of hardening tasks together with its priority."""

    # Category label, e.g. "Authentication".
    category: str
    # Tasks in this category; each dict carries an "item" and a "status" entry.
    items: List[Dict[str, str]]
    # Priority label, e.g. "Critical" or "High".
    priority: str

# Hardening tasks grouped by category; each task's "status" is "Done" or "Pending".
checklists = [
    SecurityChecklist(
        category="Authentication",
        priority="Critical",
        items=[
            {"item": "เปิด OAuth2/OIDC สำหรับ Dagit UI", "status": "Done"},
            {"item": "ใช้ Service Account สำหรับ Pipeline", "status": "Done"},
            {"item": "MFA สำหรับ Admin Access", "status": "Done"},
            {"item": "API Key Rotation ทุก 90 วัน", "status": "Pending"},
        ],
    ),
    SecurityChecklist(
        category="Secrets Management",
        priority="Critical",
        items=[
            {"item": "ย้าย Secrets จาก Config ไป Vault", "status": "Done"},
            {"item": "Rotate Database Credentials อัตโนมัติ", "status": "Done"},
            {"item": "Encrypt Secrets at-rest ด้วย KMS", "status": "Done"},
            {"item": "Scan Code สำหรับ Hardcoded Secrets", "status": "Done"},
        ],
    ),
    SecurityChecklist(
        category="Network",
        priority="High",
        items=[
            {"item": "Dagit อยู่หลัง VPN/Private Network", "status": "Done"},
            {"item": "Database ไม่เปิด Public Access", "status": "Done"},
            {"item": "mTLS ระหว่าง Services", "status": "Pending"},
            {"item": "Egress Firewall Rules จำกัด Outbound", "status": "Done"},
        ],
    ),
]

# Print each category with its completed/total count, then one line per task.
print("=== Security Hardening Checklist ===")
for checklist in checklists:
    completed = [entry for entry in checklist.items if entry["status"] == "Done"]
    print(f"\n [{checklist.priority}] {checklist.category} ({len(completed)}/{len(checklist.items)})")
    for entry in checklist.items:
        marker = "TODO" if entry["status"] != "Done" else "DONE"
        print(f" [{marker}] {entry['item']}")

Input Validation และ Data Security

=== Input Validation & Data Security ===

from dagster import asset, AssetIn, DagsterType, check

import pandas as pd

# Custom Type Check

def validate_user_data(_, value):
    """Type-check function for user data frames.

    Args:
        _: Type-check context supplied by the caller; unused.
        value: Object to validate.

    Returns:
        False when ``value`` is not a DataFrame or lacks any of the
        ``user_id``, ``email`` and ``amount`` columns; True otherwise.

    Raises:
        ValueError: If a text column contains one of the blocked SQL keywords
            or tokens (DROP, DELETE, INSERT, UPDATE, ``;``, ``--``), matched
            case-insensitively anywhere in the value.
    """
    if not isinstance(value, pd.DataFrame):
        return False

    required_cols = ('user_id', 'email', 'amount')
    if not all(col in value.columns for col in required_cols):
        return False

    # Non-capturing group: a capturing group makes str.contains warn that the
    # pattern has match groups.
    suspicious = r"(?:DROP|DELETE|INSERT|UPDATE|;|--)"

    # Check for SQL injection patterns
    for col in value.columns:
        dtype = value[col].dtype
        # Scan pandas string-dtype columns as well as object columns; selecting
        # only object columns lets string-dtype text skip the check.
        if not (pd.api.types.is_object_dtype(dtype) or isinstance(dtype, pd.StringDtype)):
            continue
        # Drop missing values and stringify the rest so the .str accessor
        # works even when an object column holds no strings at all.
        text = value[col].dropna().astype(str)
        if text.str.contains(suspicious, case=False, regex=True).any():
            raise ValueError(f"Suspicious pattern in column {col}")

    return True

# Dagster type named "UserData" whose check function is validate_user_data.
UserDataType = DagsterType(
    type_check_fn=validate_user_data,
    name="UserData",
    description="Validated user data without injection patterns",
)

# UserDataType is applied to the *input*: its check requires an 'email'
# column, which this asset drops, so applying it to the output could never pass.
@asset(ins={"raw_data": AssetIn(dagster_type=UserDataType)})
def clean_user_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    """Sanitize and validate user data.

    Args:
        raw_data: Frame with at least ``email`` and ``amount`` columns.

    Returns:
        A copy of ``raw_data`` in which ``email`` is replaced by
        ``email_hash`` (SHA-256 hex digest, or None where the email is
        missing) and only rows with 0 < amount < 1000000 are kept.
    """
    import hashlib

    def _hash_email(email):
        # Missing emails have no .encode(); keep them as missing instead of crashing.
        if pd.isna(email):
            return None
        return hashlib.sha256(str(email).encode()).hexdigest()

    df = raw_data.copy()

    # Replace the raw email with its hash so the address itself is not passed on.
    # NOTE: an unsalted hash can be reversed by hashing candidate addresses;
    # use a keyed hash (HMAC with a secret key) if stronger protection is needed.
    df['email_hash'] = df['email'].apply(_hash_email)
    df = df.drop(columns=['email'])

    # Validate amounts: strictly positive and below a reasonable upper limit.
    df = df[(df['amount'] > 0) & (df['amount'] < 1000000)]

    return df

Dependency Scanning

# Install the two dependency vulnerability scanners.
pip install safety pip-audit

# Run safety against the current environment and print the full report.
safety check --full-report

# Run pip-audit with its --strict flag.
pip-audit --strict

# Scan types mapped to the tools used, how often they run, and the gate they enforce.
vulnerability_scan = {
    scan_type: dict(zip(("tool", "frequency", "action"), values))
    for scan_type, values in (
        (
            "Dependency Scan",
            ("safety, pip-audit, Snyk", "ทุก CI/CD Build", "Block deploy ถ้ามี Critical"),
        ),
        (
            "Container Scan",
            ("Trivy, Grype, Snyk Container", "ทุก Image Build", "Block deploy ถ้ามี High/Critical"),
        ),
        (
            "SAST",
            ("Bandit, Semgrep, SonarQube", "ทุก PR", "Block merge ถ้ามี Issues"),
        ),
        (
            "Secret Scan",
            (
                "GitLeaks, TruffleHog, detect-secrets",
                "ทุก Commit (pre-commit hook)",
                "Block commit ถ้าพบ Secrets",
            ),
        ),
    )
}

# Print every scan type followed by its fields, one per line.
print("\nSecurity Scanning:")
for scan_name, details in vulnerability_scan.items():
    print(f"\n [{scan_name}]")
    for field_name, field_value in details.items():
        print(f" {field_name}: {field_value}")

Monitoring และ Incident Response

=== Security Monitoring & Audit ===

Dagster Event Logging

from dagster import op, get_dagster_logger

@op
def secure_data_load(context):
    """Load data via load_from_db() and log the start, result or failure.

    Logs the "user" entry of the run config and the source name before
    loading, the record count after a successful load, and the exception
    type on failure. Any exception is re-raised after being logged.
    """
    log = get_dagster_logger()

    start_fields = {
        "user": context.run_config.get("user"),
        "source": "production_db",
    }
    log.info("Starting data load", extra=start_fields)

    try:
        records = load_from_db()
        record_count = len(records)
        log.info(f"Loaded {record_count} records", extra={"record_count": record_count})
        return records
    except Exception as exc:
        # Record the failure, then let the exception propagate to the caller.
        log.error(f"Data load failed: {exc}", extra={"error_type": type(exc).__name__})
        raise

Kubernetes Security

# Pod running the Dagster worker as a non-root user with a locked-down container.
apiVersion: v1
kind: Pod
metadata:
  name: dagster-worker
spec:
  # Pod-level identity: non-root UID 1000, volumes owned by group 1000.
  securityContext:
    runAsNonRoot: true
    runAsUser: 1000
    fsGroup: 1000
  containers:
    - name: dagster
      image: dagster:latest
      # Container-level restrictions: read-only root filesystem, no privilege
      # escalation, all Linux capabilities dropped.
      securityContext:
        readOnlyRootFilesystem: true
        allowPrivilegeEscalation: false
        capabilities:
          drop:
            - "ALL"
      resources:
        limits:
          memory: "2Gi"
          cpu: "1000m"
      env:
        # The password comes from a Kubernetes Secret, not a literal value.
        - name: DB_PASSWORD
          valueFrom:
            secretKeyRef:
              name: dagster-secrets
              key: db-password

# Monitored security signals, each with its alert threshold and the response action.
security_metrics = {
    "Failed Login Attempts": {
        "threshold": "> 5/hour",
        "action": "Lock account + Alert",
    },
    "Unauthorized API Calls": {
        "threshold": "> 3/min",
        "action": "Block IP + Alert",
    },
    "Secret Access Anomaly": {
        "threshold": "Unusual pattern",
        "action": "Revoke + Investigate",
    },
    "Data Exfiltration": {
        "threshold": "> 100MB export",
        "action": "Block + Alert",
    },
    "Pipeline Modification": {
        "threshold": "Any unauthorized",
        "action": "Revert + Alert",
    },
    "Dependency Vulnerability": {
        "threshold": "Critical/High",
        "action": "Block deploy",
    },
}

# Print each metric name followed by its threshold and action lines.
print("Security Monitoring Metrics:")
for metric_name, rule in security_metrics.items():
    print(
        f"\n [{metric_name}]",
        f" Threshold: {rule['threshold']}",
        f" Action: {rule['action']}",
        sep="\n",
    )

Best Practices Summary

# Summary list of security principles, printed as a numbered list starting at 1.
best_practices = [
    "Least Privilege — ให้สิทธิ์น้อยที่สุดที่จำเป็น",
    "Defense in Depth — ป้องกันหลายชั้น",
    "Zero Trust — ไม่เชื่อใจอะไรโดยปริยาย ตรวจสอบทุกอย่าง",
    "Shift Left — ใส่ Security ตั้งแต่เริ่ม Development",
    "Automate — Automate Security Checks ใน CI/CD",
    "Monitor — ติดตาม Anomaly อย่างต่อเนื่อง",
    "Patch — อัปเดต Dependencies สม่ำเสมอ",
]

print("\n\nSecurity Best Practices:")
for number, practice in enumerate(best_practices, start=1):
    print(f" {number}. {practice}")

เคล็ดลับ

  • Vault: ใช้ HashiCorp Vault หรือ Cloud Secrets Manager เก็บ Credentials
  • Scan: Scan Dependencies และ Container ทุก Build
  • RBAC: ใช้ Role-based Access Control จำกัดสิทธิ์ตามบทบาท
  • Encrypt: Encrypt ข้อมูลทั้ง At-rest และ In-transit
  • Audit: บันทึก Audit Log ทุกกิจกรรมสำคัญ เก็บอย่างน้อย 1 ปี

Dagster คืออะไร

Dagster เป็น Open Source Data Orchestrator ที่เขียนด้วย Python รองรับ Software-defined Assets, Type System, Dagit UI, Sensors และ Schedules และ Deploy ได้บน Docker, Kubernetes และ Cloud