SiamCafe.net Blog
Technology

Soda Data Quality

soda data quality คมอฉบบสมบรณ 2026
Soda Data Quality | SiamCafe Blog
2026-05-05· อ. บอม — SiamCafe.net· 11,329 คำ

Soda Data Quality

Soda Data Quality SodaCL Check Data Pipeline CI/CD Monitoring Alert Freshness Schema Validation Production

Check TypeSodaCL Syntaxตรวจอะไรใช้เมื่อไหร่
Row Countrow_count > 0มีข้อมูลหรือไม่ทุก Table ทุก Run
Missingmissing_count(col) = 0ค่าว่าง NULLColumn สำคัญ
Duplicateduplicate_count(col) = 0ข้อมูลซ้ำPrimary Key, ID
Freshnessfreshness(col) < 1hข้อมูลเก่าเกินไปReal-time/Batch Pipeline
SchemaschemaColumn Type ถูกต้องหลัง Migration, Update
Anomalyanomaly score < defaultPattern ผิดปกติProduction Monitoring

Installation and Setup

# === Soda Installation ===

# pip install soda-core-postgres
# pip install soda-core-bigquery
# pip install soda-core-snowflake
# pip install soda-core-mysql
# pip install soda-core-spark-df

# configuration.yml
# data_source my_postgres:
#   type: postgres
#   host: localhost
#   port: 5432
#   username: 
#   password: 
#   database: analytics
#   schema: public
#
# data_source my_bigquery:
#   type: bigquery
#   project_id: my-project
#   dataset: analytics
#   account_info_json_path: /path/to/service-account.json

# checks/orders.yml
# checks for orders:
#   - row_count > 0
#   - missing_count(order_id) = 0
#   - missing_count(customer_email) = 0
#   - duplicate_count(order_id) = 0
#   - avg(total_amount) between 10 and 1000
#   - max(total_amount) < 50000
#   - freshness(created_at) < 2h
#   - schema:
#       fail:
#         when wrong column type:
#           order_id: integer
#           total_amount: numeric
#           created_at: timestamp
#   - failed rows:
#       fail query: |
#         SELECT * FROM orders
#         WHERE total_amount < 0
#         OR customer_email NOT LIKE '%@%'

# Run scan
# soda scan -d my_postgres -c configuration.yml checks/orders.yml

from dataclasses import dataclass

@dataclass
class SodaCheck:
    check: str
    syntax: str
    description: str
    severity: str
    example: str

checks = [
    SodaCheck("Row Count",
        "row_count > 0 / row_count between 1000 and 50000",
        "ตรวจว่า Table มีข้อมูล ไม่ว่าง ไม่มากผิดปกติ",
        "Critical", "orders table ต้องมีข้อมูลทุกวัน"),
    SodaCheck("Missing Values",
        "missing_count(col) = 0 / missing_percent(col) < 5",
        "ตรวจค่า NULL ว่าง ใน Column สำคัญ",
        "Critical", "email ต้องไม่มีค่าว่าง"),
    SodaCheck("Duplicate",
        "duplicate_count(col) = 0",
        "ตรวจข้อมูลซ้ำ ใน Primary Key หรือ Unique Column",
        "Critical", "order_id ต้องไม่ซ้ำ"),
    SodaCheck("Freshness",
        "freshness(timestamp_col) < 1h",
        "ตรวจว่าข้อมูลใหม่เข้ามาภายในเวลาที่กำหนด",
        "High", "ข้อมูลไม่เก่าเกิน 1 ชั่วโมง"),
    SodaCheck("Value Range",
        "avg(col) between X and Y / min(col) >= 0",
        "ตรวจค่าอยู่ในช่วงที่เหมาะสม",
        "Medium", "total_amount ต้อง > 0 และ < 50000"),
    SodaCheck("Schema",
        "schema: fail: when wrong column type",
        "ตรวจ Column Name Type ตรงกับที่คาดหวัง",
        "Critical", "ตรวจหลัง Migration ว่า Schema ถูกต้อง"),
]

print("=== Soda Checks ===")
for c in checks:
    print(f"  [{c.check}] Severity: {c.severity}")
    print(f"    Syntax: {c.syntax}")
    print(f"    Description: {c.description}")
    print(f"    Example: {c.example}")

CI/CD Integration

# === CI/CD Pipeline ===

# GitHub Actions workflow
# name: Data Quality Check
# on:
#   schedule:
#     - cron: '0 */2 * * *'  # Every 2 hours
#   workflow_dispatch:
# jobs:
#   soda-scan:
#     runs-on: ubuntu-latest
#     steps:
#       - uses: actions/checkout@v4
#       - uses: actions/setup-python@v5
#         with: { python-version: '3.11' }
#       - run: pip install soda-core-postgres
#       - run: |
#           soda scan -d production \
#             -c configuration.yml \
#             checks/*.yml
#         env:
#           POSTGRES_USER: }
#           POSTGRES_PASSWORD: }

# Airflow DAG integration
# from airflow.operators.bash import BashOperator
# soda_check = BashOperator(
#     task_id='soda_data_quality',
#     bash_command='soda scan -d production -c /config/configuration.yml /checks/orders.yml',
#     dag=dag,
# )
# load_task >> soda_check >> downstream_task

# dbt + Soda integration
# dbt run --select staging.*
# soda scan -d warehouse -c config.yml checks/staging_checks.yml
# if [ $? -ne 0 ]; then echo "Quality check failed!"; exit 1; fi
# dbt run --select marts.*

@dataclass
class Integration:
    tool: str
    how: str
    trigger: str
    on_fail: str

integrations = [
    Integration("GitHub Actions",
        "soda scan ใน Workflow Step",
        "Schedule (cron) หรือ Push event",
        "Fail workflow, notify Slack"),
    Integration("Airflow",
        "BashOperator หรือ PythonOperator รัน soda scan",
        "After load task ใน DAG",
        "Fail task, skip downstream, alert"),
    Integration("dbt",
        "soda scan หลัง dbt run ก่อนทำ downstream",
        "After dbt run ทุกครั้ง",
        "Stop pipeline, rollback if needed"),
    Integration("Soda Cloud",
        "API integration, dashboard, alerts",
        "After every soda scan",
        "Slack PagerDuty Email webhook"),
    Integration("Prefect / Dagster",
        "Task/Op ใน Flow/Job",
        "After data load step",
        "Fail flow, alert, retry policy"),
]

print("=== CI/CD Integrations ===")
for i in integrations:
    print(f"  [{i.tool}] {i.how}")
    print(f"    Trigger: {i.trigger}")
    print(f"    On Fail: {i.on_fail}")

Production Monitoring

# === Production Setup ===

@dataclass
class MonitoringConfig:
    dataset: str
    checks: str
    frequency: str
    alert_channel: str
    owner: str

monitoring = [
    MonitoringConfig("orders",
        "row_count, missing(order_id), duplicate(order_id), freshness, amount range",
        "ทุก 1 ชั่วโมง", "Slack #data-alerts + PagerDuty",
        "Data Engineering Team"),
    MonitoringConfig("users",
        "row_count, missing(email), duplicate(email), schema validation",
        "ทุก 6 ชั่วโมง", "Slack #data-alerts",
        "Backend Team"),
    MonitoringConfig("events",
        "row_count anomaly, freshness < 30min, missing(event_type)",
        "ทุก 30 นาที", "Slack #data-alerts + PagerDuty",
        "Data Engineering Team"),
    MonitoringConfig("products",
        "row_count, missing(name price), duplicate(sku), price range",
        "ทุก 12 ชั่วโมง", "Slack #data-quality",
        "Product Team"),
    MonitoringConfig("financial_reports",
        "row_count, schema, freshness < 4h, failed_rows(negative amounts)",
        "ทุก 4 ชั่วโมง", "Slack + Email + PagerDuty",
        "Finance Data Team"),
]

print("=== Production Monitoring ===")
for m in monitoring:
    print(f"  [{m.dataset}] Owner: {m.owner}")
    print(f"    Checks: {m.checks}")
    print(f"    Frequency: {m.frequency}")
    print(f"    Alert: {m.alert_channel}")

เคล็ดลับ

การบริหารจัดการฐานข้อมูลอย่างมืออาชีพ

Database Management ที่ดีเริ่มจากการออกแบบ Schema ที่เหมาะสม ใช้ Normalization ลด Data Redundancy สร้าง Index บน Column ที่ Query บ่อย วิเคราะห์ Query Plan เพื่อ Optimize Performance และทำ Regular Maintenance เช่น VACUUM สำหรับ PostgreSQL หรือ OPTIMIZE TABLE สำหรับ MySQL

เรื่อง High Availability ควรติดตั้ง Replication อย่างน้อย 1 Replica สำหรับ Read Scaling และ Disaster Recovery ใช้ Connection Pooling เช่น PgBouncer หรือ ProxySQL ลดภาระ Connection ที่เปิดพร้อมกัน และตั้ง Automated Failover ให้ระบบสลับไป Replica อัตโนมัติเมื่อ Primary ล่ม

Backup ต้องทำทั้ง Full Backup รายวัน และ Incremental Backup ทุก 1-4 ชั่วโมง เก็บ Binary Log หรือ WAL สำหรับ Point-in-Time Recovery ทดสอบ Restore เป็นประจำ และเก็บ Backup ไว้ Off-site ด้วยเสมอ

เปรียบเทียบข้อดีและข้อเสีย

ข้อดีข้อเสีย
ประสิทธิภาพสูง ทำงานได้เร็วและแม่นยำ ลดเวลาทำงานซ้ำซ้อนต้องใช้เวลาเรียนรู้เบื้องต้นพอสมควร มี Learning Curve สูง
มี Community ขนาดใหญ่ มีคนช่วยเหลือและแหล่งเรียนรู้มากมายบางฟีเจอร์อาจยังไม่เสถียร หรือมีการเปลี่ยนแปลงบ่อยในเวอร์ชันใหม่
รองรับ Integration กับเครื่องมือและบริการอื่นได้หลากหลายต้นทุนอาจสูงสำหรับ Enterprise License หรือ Cloud Service
เป็น Open Source หรือมีเวอร์ชันฟรีให้เริ่มต้นใช้งานต้องการ Hardware หรือ Infrastructure ที่เพียงพอ

จากตารางเปรียบเทียบจะเห็นว่าข้อดีมีมากกว่าข้อเสียอย่างชัดเจน โดยเฉพาะในแง่ของประสิทธิภาพและความสามารถในการ Scale สำหรับข้อเสียส่วนใหญ่สามารถแก้ไขได้ด้วยการเรียนรู้อย่างเป็นระบบและวางแผนทรัพยากรให้เหมาะสม

Soda คืออะไร

Open Source Data Quality Platform SodaCL Check PostgreSQL BigQuery Snowflake Schema Freshness Volume Validity Uniqueness Pipeline CI/CD

SodaCL เขียนอย่างไร

YAML row_count missing_count duplicate_count avg between freshness schema failed_rows query Dataset Check Syntax อ่านง่าย

ใช้กับ CI/CD อย่างไร

GitHub Actions Workflow Airflow DAG Task dbt soda scan Pipeline Fail Alert Slack PagerDuty Soda Cloud Dashboard Schedule

Production Best Practices มีอะไร

Critical Table row_count freshness schema Alert Slack PagerDuty CI/CD Pipeline Anomaly Detection History Trend failed_rows Owner Data Contract

สรุป

Soda Data Quality SodaCL Check Pipeline CI/CD GitHub Actions Airflow dbt Freshness Schema Validation Monitoring Alert Production

📖 บทความที่เกี่ยวข้อง

Soda Data Quality Scaling Strategy วิธี Scaleอ่านบทความ → Soda Data Quality Identity Access Managementอ่านบทความ → Soda Data Quality Home Lab Setupอ่านบทความ → Soda Data Quality Message Queue Designอ่านบทความ → Soda Data Quality Domain Driven Design DDDอ่านบทความ →

📚 ดูบทความทั้งหมด →