SiamCafe.net Blog
Technology

Soda Data Quality Pod Scheduling

soda data quality pod scheduling
Soda Data Quality Pod Scheduling | SiamCafe Blog
2025-06-03· อ. บอม — SiamCafe.net· 9,771 คำ

Soda Data Quality และ Pod Scheduling

Soda เป็น Data Quality Platform ที่ตรวจสอบข้อมูลด้วย SodaCL Checks รองรับหลาย Data Sources ใช้ใน CI/CD Pipeline ตรวจจับปัญหาก่อนที่จะกระทบ Downstream

Kubernetes Pod Scheduling กำหนดว่า Pod จะรันบน Node ไหน ใช้ Affinity, Taints, Resource Requests จัดสรร Workload อย่างเหมาะสม รวมกับ Soda ให้ Data Quality Jobs รันบน Node ที่เหมาะสม

Soda Configuration และ Checks

# === Soda Installation และ Configuration ===
# pip install soda-core-postgres soda-core-bigquery soda-core-spark

# 1. Configuration File
# configuration.yml
# data_source my_postgres:
#   type: postgres
#   host: localhost
#   port: 5432
#   username: 
#   password: 
#   database: analytics
#   schema: public
#
# data_source my_bigquery:
#   type: bigquery
#   project_id: my-project
#   dataset: analytics
#   credentials_path: /secrets/gcp-key.json

# 2. SodaCL Checks
# checks/orders.yml
# checks for orders:
#   - row_count > 0
#   - missing_count(order_id) = 0
#   - missing_count(customer_id) = 0
#   - missing_count(amount) = 0
#   - duplicate_count(order_id) = 0
#   - invalid_count(email) = 0:
#       valid format: email
#   - min(amount) >= 0
#   - max(amount) < 1000000
#   - avg(amount) between 50 and 500
#   - freshness(created_at) < 1d
#   - schema:
#       fail:
#         when required column missing:
#           [order_id, customer_id, amount, status, created_at]
#         when wrong type:
#           order_id: integer
#           amount: numeric
#
# checks for customers:
#   - row_count > 100
#   - missing_count(email) = 0
#   - duplicate_count(email) = 0
#   - invalid_count(email) = 0:
#       valid format: email
#   - values in (status) must be in ('active', 'inactive', 'suspended')
#
# checks for daily_revenue:
#   - row_count > 0
#   - anomaly detection for row_count
#   - anomaly detection for total_revenue
#   - change for row_count < 50%

# 3. รัน Soda Checks
# soda scan -d my_postgres -c configuration.yml checks/orders.yml

# 4. Soda ใน Python
from soda.scan import Scan

def run_data_quality_checks(config_path, checks_path, data_source):
    """รัน Soda Data Quality Checks"""
    scan = Scan()
    scan.set_data_source_name(data_source)
    scan.add_configuration_yaml_file(config_path)
    scan.add_sodacl_yaml_file(checks_path)
    scan.execute()

    results = {
        "passed": scan.get_checks_pass_count(),
        "warned": scan.get_checks_warn_count(),
        "failed": scan.get_checks_fail_count(),
        "errors": scan.get_checks_error_count(),
    }

    print(f"\nSoda Scan Results:")
    print(f"  Passed: {results['passed']}")
    print(f"  Warned: {results['warned']}")
    print(f"  Failed: {results['failed']}")
    print(f"  Errors: {results['errors']}")

    if results["failed"] > 0:
        print(f"\n  Failed Checks:")
        for check in scan.get_checks_fail():
            print(f"    FAIL: {check}")

    return results["failed"] == 0

# run_data_quality_checks(
#     "configuration.yml", "checks/orders.yml", "my_postgres"
# )

Kubernetes Pod Scheduling

# === Kubernetes Pod Scheduling Configuration ===

# 1. Resource Requests and Limits
# apiVersion: v1
# kind: Pod
# metadata:
#   name: soda-scan-job
# spec:
#   containers:
#     - name: soda
#       image: sodadata/soda-core:latest
#       resources:
#         requests:
#           cpu: "500m"
#           memory: "512Mi"
#         limits:
#           cpu: "2"
#           memory: "2Gi"
#       command: ["soda", "scan", "-d", "postgres", "-c", "/config/configuration.yml"]
#       volumeMounts:
#         - name: config
#           mountPath: /config
#         - name: checks
#           mountPath: /checks
#   volumes:
#     - name: config
#       configMap:
#         name: soda-config
#     - name: checks
#       configMap:
#         name: soda-checks

# 2. Node Affinity — รันบน Data Node
# apiVersion: batch/v1
# kind: CronJob
# metadata:
#   name: soda-quality-check
# spec:
#   schedule: "0 */6 * * *"
#   jobTemplate:
#     spec:
#       template:
#         spec:
#           affinity:
#             nodeAffinity:
#               requiredDuringSchedulingIgnoredDuringExecution:
#                 nodeSelectorTerms:
#                   - matchExpressions:
#                       - key: node-type
#                         operator: In
#                         values: ["data-processing"]
#               preferredDuringSchedulingIgnoredDuringExecution:
#                 - weight: 100
#                   preference:
#                     matchExpressions:
#                       - key: disk-type
#                         operator: In
#                         values: ["ssd"]
#           containers:
#             - name: soda
#               image: sodadata/soda-core:latest
#               resources:
#                 requests:
#                   cpu: "1"
#                   memory: "1Gi"
#           restartPolicy: OnFailure

# 3. Pod Anti-Affinity — กระจาย Soda Jobs
# spec:
#   affinity:
#     podAntiAffinity:
#       preferredDuringSchedulingIgnoredDuringExecution:
#         - weight: 100
#           podAffinityTerm:
#             labelSelector:
#               matchExpressions:
#                 - key: app
#                   operator: In
#                   values: ["soda-scan"]
#             topologyKey: kubernetes.io/hostname

# 4. Taints and Tolerations
# kubectl taint nodes data-node-1 workload=data:NoSchedule
#
# spec:
#   tolerations:
#     - key: "workload"
#       operator: "Equal"
#       value: "data"
#       effect: "NoSchedule"

# 5. Topology Spread Constraints
# spec:
#   topologySpreadConstraints:
#     - maxSkew: 1
#       topologyKey: topology.kubernetes.io/zone
#       whenUnsatisfiable: ScheduleAnyway
#       labelSelector:
#         matchLabels:
#           app: soda-scan

# 6. Priority Class
# apiVersion: scheduling.k8s.io/v1
# kind: PriorityClass
# metadata:
#   name: data-quality-high
# value: 1000
# globalDefault: false
# description: "High priority for data quality jobs"

echo "Pod Scheduling configured:"
echo "  Node Affinity: data-processing nodes"
echo "  Anti-Affinity: spread across hosts"
echo "  Tolerations: data workload taint"
echo "  Priority: data-quality-high"

CI/CD Pipeline สำหรับ Data Quality

# === GitHub Actions — Data Quality Pipeline ===
# .github/workflows/data-quality.yml

name: Data Quality Pipeline
on:
  schedule:
    - cron: '0 */6 * * *'
  push:
    paths: ['dbt/**', 'checks/**']
  workflow_dispatch:

env:
  SODA_CLOUD_API_KEY: }

jobs:
  data-quality:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install Soda
        run: pip install soda-core-postgres soda-core-bigquery

      - name: Run Pre-transform Checks
        run: |
          soda scan -d source_db \
            -c configuration.yml \
            checks/source_checks.yml
        env:
          POSTGRES_USER: }
          POSTGRES_PASSWORD: }

      - name: Run dbt
        run: |
          pip install dbt-postgres
          cd dbt && dbt run --target prod

      - name: Run Post-transform Checks
        run: |
          soda scan -d analytics_db \
            -c configuration.yml \
            checks/analytics_checks.yml

      - name: Notify on Failure
        if: failure()
        run: |
          curl -X POST "}" \
            -H 'Content-Type: application/json' \
            -d '{"text":"Data Quality check failed! Check pipeline logs."}'

  deploy-k8s:
    needs: data-quality
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4

      - name: Deploy Soda CronJob
        run: |
          kubectl apply -f k8s/soda-cronjob.yml
          kubectl apply -f k8s/soda-config.yml
          echo "Soda CronJob deployed"

Best Practices

Soda คืออะไร

Open-source Data Quality Platform ตรวจสอบคุณภาพข้อมูลด้วย SodaCL Checks รองรับ PostgreSQL BigQuery Snowflake Spark ตรวจจับ Missing Duplicates Schema Changes Anomalies ใช้ใน CI/CD ได้

Pod Scheduling คืออะไร

กระบวนการ Kubernetes Scheduler เลือก Node สำหรับ Pod พิจารณา Resource Requests Node Affinity Taints/Tolerations Pod Affinity/Anti-Affinity Topology Spread กระจาย Workload เหมาะสม

SodaCL คืออะไร

ภาษาเขียน Data Quality Checks ใน Soda ใช้ YAML Syntax อ่านง่าย ตรวจ row_count missing_count duplicate_count freshness schema Custom Metrics รัน CLI หรือ API

Node Affinity ต่างจาก Taints/Tolerations อย่างไร

Affinity Pod-centric Pod เลือก Node ที่มี Label ตรง Taints/Tolerations Node-centric Node ไล่ Pod ไม่มี Toleration ใช้ร่วมกัน Taint ป้องกัน Affinity ดึง Pod ที่ต้องการ

สรุป

Soda Data Quality ร่วมกับ Kubernetes Pod Scheduling ให้ระบบตรวจสอบคุณภาพข้อมูลที่อัตโนมัติและเชื่อถือได้ ใช้ SodaCL เขียน Checks เก็บใน Git CronJob รันอัตโนมัติ Node Affinity รันบน Data Nodes Resource Requests จัดสรร CPU/Memory Anomaly Detection ตรวจจับปัญหา

📖 บทความที่เกี่ยวข้อง

Soda Data Quality Scaling Strategy วิธี Scaleอ่านบทความ → Soda Data Quality Identity Access Managementอ่านบทความ → Soda Data Quality Home Lab Setupอ่านบทความ → Soda Data Quality Container Orchestrationอ่านบทความ → Fivetran Connector Pod Schedulingอ่านบทความ →

📚 ดูบทความทั้งหมด →