Soda Data Quality และ Pod Scheduling
Soda เป็น Data Quality Platform ที่ตรวจสอบข้อมูลด้วย SodaCL Checks รองรับหลาย Data Sources ใช้ใน CI/CD Pipeline ตรวจจับปัญหาก่อนที่จะกระทบ Downstream
Kubernetes Pod Scheduling กำหนดว่า Pod จะรันบน Node ไหน ใช้ Affinity, Taints, Resource Requests จัดสรร Workload อย่างเหมาะสม รวมกับ Soda ให้ Data Quality Jobs รันบน Node ที่เหมาะสม
Soda Configuration และ Checks
# === Soda Installation และ Configuration ===
# pip install soda-core-postgres soda-core-bigquery soda-core-spark
# 1. Configuration File
# configuration.yml
# data_source my_postgres:
# type: postgres
# host: localhost
# port: 5432
# username:
# password:
# database: analytics
# schema: public
#
# data_source my_bigquery:
# type: bigquery
# project_id: my-project
# dataset: analytics
# credentials_path: /secrets/gcp-key.json
# 2. SodaCL Checks
# checks/orders.yml
# checks for orders:
# - row_count > 0
# - missing_count(order_id) = 0
# - missing_count(customer_id) = 0
# - missing_count(amount) = 0
# - duplicate_count(order_id) = 0
# - invalid_count(email) = 0:
# valid format: email
# - min(amount) >= 0
# - max(amount) < 1000000
# - avg(amount) between 50 and 500
# - freshness(created_at) < 1d
# - schema:
# fail:
# when required column missing:
# [order_id, customer_id, amount, status, created_at]
# when wrong type:
# order_id: integer
# amount: numeric
#
# checks for customers:
# - row_count > 100
# - missing_count(email) = 0
# - duplicate_count(email) = 0
# - invalid_count(email) = 0:
# valid format: email
# - values in (status) must be in ('active', 'inactive', 'suspended')
#
# checks for daily_revenue:
# - row_count > 0
# - anomaly detection for row_count
# - anomaly detection for total_revenue
# - change for row_count < 50%
# 3. รัน Soda Checks
# soda scan -d my_postgres -c configuration.yml checks/orders.yml
# 4. Soda ใน Python
from soda.scan import Scan
def run_data_quality_checks(config_path, checks_path, data_source):
"""รัน Soda Data Quality Checks"""
scan = Scan()
scan.set_data_source_name(data_source)
scan.add_configuration_yaml_file(config_path)
scan.add_sodacl_yaml_file(checks_path)
scan.execute()
results = {
"passed": scan.get_checks_pass_count(),
"warned": scan.get_checks_warn_count(),
"failed": scan.get_checks_fail_count(),
"errors": scan.get_checks_error_count(),
}
print(f"\nSoda Scan Results:")
print(f" Passed: {results['passed']}")
print(f" Warned: {results['warned']}")
print(f" Failed: {results['failed']}")
print(f" Errors: {results['errors']}")
if results["failed"] > 0:
print(f"\n Failed Checks:")
for check in scan.get_checks_fail():
print(f" FAIL: {check}")
return results["failed"] == 0
# run_data_quality_checks(
# "configuration.yml", "checks/orders.yml", "my_postgres"
# )
Kubernetes Pod Scheduling
# === Kubernetes Pod Scheduling Configuration ===
# 1. Resource Requests and Limits
# apiVersion: v1
# kind: Pod
# metadata:
# name: soda-scan-job
# spec:
# containers:
# - name: soda
# image: sodadata/soda-core:latest
# resources:
# requests:
# cpu: "500m"
# memory: "512Mi"
# limits:
# cpu: "2"
# memory: "2Gi"
# command: ["soda", "scan", "-d", "postgres", "-c", "/config/configuration.yml"]
# volumeMounts:
# - name: config
# mountPath: /config
# - name: checks
# mountPath: /checks
# volumes:
# - name: config
# configMap:
# name: soda-config
# - name: checks
# configMap:
# name: soda-checks
# 2. Node Affinity — รันบน Data Node
# apiVersion: batch/v1
# kind: CronJob
# metadata:
# name: soda-quality-check
# spec:
# schedule: "0 */6 * * *"
# jobTemplate:
# spec:
# template:
# spec:
# affinity:
# nodeAffinity:
# requiredDuringSchedulingIgnoredDuringExecution:
# nodeSelectorTerms:
# - matchExpressions:
# - key: node-type
# operator: In
# values: ["data-processing"]
# preferredDuringSchedulingIgnoredDuringExecution:
# - weight: 100
# preference:
# matchExpressions:
# - key: disk-type
# operator: In
# values: ["ssd"]
# containers:
# - name: soda
# image: sodadata/soda-core:latest
# resources:
# requests:
# cpu: "1"
# memory: "1Gi"
# restartPolicy: OnFailure
# 3. Pod Anti-Affinity — กระจาย Soda Jobs
# spec:
# affinity:
# podAntiAffinity:
# preferredDuringSchedulingIgnoredDuringExecution:
# - weight: 100
# podAffinityTerm:
# labelSelector:
# matchExpressions:
# - key: app
# operator: In
# values: ["soda-scan"]
# topologyKey: kubernetes.io/hostname
# 4. Taints and Tolerations
# kubectl taint nodes data-node-1 workload=data:NoSchedule
#
# spec:
# tolerations:
# - key: "workload"
# operator: "Equal"
# value: "data"
# effect: "NoSchedule"
# 5. Topology Spread Constraints
# spec:
# topologySpreadConstraints:
# - maxSkew: 1
# topologyKey: topology.kubernetes.io/zone
# whenUnsatisfiable: ScheduleAnyway
# labelSelector:
# matchLabels:
# app: soda-scan
# 6. Priority Class
# apiVersion: scheduling.k8s.io/v1
# kind: PriorityClass
# metadata:
# name: data-quality-high
# value: 1000
# globalDefault: false
# description: "High priority for data quality jobs"
echo "Pod Scheduling configured:"
echo " Node Affinity: data-processing nodes"
echo " Anti-Affinity: spread across hosts"
echo " Tolerations: data workload taint"
echo " Priority: data-quality-high"
CI/CD Pipeline สำหรับ Data Quality
# === GitHub Actions — Data Quality Pipeline ===
# .github/workflows/data-quality.yml
name: Data Quality Pipeline
on:
schedule:
- cron: '0 */6 * * *'
push:
paths: ['dbt/**', 'checks/**']
workflow_dispatch:
env:
SODA_CLOUD_API_KEY: }
jobs:
data-quality:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install Soda
run: pip install soda-core-postgres soda-core-bigquery
- name: Run Pre-transform Checks
run: |
soda scan -d source_db \
-c configuration.yml \
checks/source_checks.yml
env:
POSTGRES_USER: }
POSTGRES_PASSWORD: }
- name: Run dbt
run: |
pip install dbt-postgres
cd dbt && dbt run --target prod
- name: Run Post-transform Checks
run: |
soda scan -d analytics_db \
-c configuration.yml \
checks/analytics_checks.yml
- name: Notify on Failure
if: failure()
run: |
curl -X POST "}" \
-H 'Content-Type: application/json' \
-d '{"text":"Data Quality check failed! Check pipeline logs."}'
deploy-k8s:
needs: data-quality
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@v4
- name: Deploy Soda CronJob
run: |
kubectl apply -f k8s/soda-cronjob.yml
kubectl apply -f k8s/soda-config.yml
echo "Soda CronJob deployed"
Best Practices
- Checks as Code: เก็บ SodaCL Checks ใน Git Version Control ร่วมกับ dbt Models
- Anomaly Detection: ใช้ Soda Anomaly Detection ตรวจจับ Unexpected Changes
- Freshness Checks: ตรวจสอบ Data Freshness ทุก Pipeline Run
- Resource Requests: กำหนด CPU/Memory Requests ให้ Soda Jobs เพื่อ Scheduling ที่ถูกต้อง
- Node Affinity: รัน Data Quality Jobs บน Data-processing Nodes ที่มี Network Access ถึง Database
- CronJob: ใช้ Kubernetes CronJob รัน Checks อัตโนมัติทุก 6 ชั่วโมง
Soda คืออะไร
Open-source Data Quality Platform ตรวจสอบคุณภาพข้อมูลด้วย SodaCL Checks รองรับ PostgreSQL BigQuery Snowflake Spark ตรวจจับ Missing Duplicates Schema Changes Anomalies ใช้ใน CI/CD ได้
Pod Scheduling คืออะไร
กระบวนการ Kubernetes Scheduler เลือก Node สำหรับ Pod พิจารณา Resource Requests Node Affinity Taints/Tolerations Pod Affinity/Anti-Affinity Topology Spread กระจาย Workload เหมาะสม
SodaCL คืออะไร
ภาษาเขียน Data Quality Checks ใน Soda ใช้ YAML Syntax อ่านง่าย ตรวจ row_count missing_count duplicate_count freshness schema Custom Metrics รัน CLI หรือ API
Node Affinity ต่างจาก Taints/Tolerations อย่างไร
Affinity Pod-centric Pod เลือก Node ที่มี Label ตรง Taints/Tolerations Node-centric Node ไล่ Pod ไม่มี Toleration ใช้ร่วมกัน Taint ป้องกัน Affinity ดึง Pod ที่ต้องการ
สรุป
Soda Data Quality ร่วมกับ Kubernetes Pod Scheduling ให้ระบบตรวจสอบคุณภาพข้อมูลที่อัตโนมัติและเชื่อถือได้ ใช้ SodaCL เขียน Checks เก็บใน Git CronJob รันอัตโนมัติ Node Affinity รันบน Data Nodes Resource Requests จัดสรร CPU/Memory Anomaly Detection ตรวจจับปัญหา
