Soda Data Quality
Soda Data Quality SodaCL Check Data Pipeline CI/CD Monitoring Alert Freshness Schema Validation Production
| Check Type | SodaCL Syntax | ตรวจอะไร | ใช้เมื่อไหร่ |
|---|---|---|---|
| Row Count | row_count > 0 | มีข้อมูลหรือไม่ | ทุก Table ทุก Run |
| Missing | missing_count(col) = 0 | ค่าว่าง NULL | Column สำคัญ |
| Duplicate | duplicate_count(col) = 0 | ข้อมูลซ้ำ | Primary Key, ID |
| Freshness | freshness(col) < 1h | ข้อมูลเก่าเกินไป | Real-time/Batch Pipeline |
| Schema | schema | Column Type ถูกต้อง | หลัง Migration, Update |
| Anomaly | anomaly score < default | Pattern ผิดปกติ | Production Monitoring |
Installation and Setup
# === Soda Installation ===
# pip install soda-core-postgres
# pip install soda-core-bigquery
# pip install soda-core-snowflake
# pip install soda-core-mysql
# pip install soda-core-spark-df
# configuration.yml
# data_source my_postgres:
# type: postgres
# host: localhost
# port: 5432
# username:
# password:
# database: analytics
# schema: public
#
# data_source my_bigquery:
# type: bigquery
# project_id: my-project
# dataset: analytics
# account_info_json_path: /path/to/service-account.json
# checks/orders.yml
# checks for orders:
# - row_count > 0
# - missing_count(order_id) = 0
# - missing_count(customer_email) = 0
# - duplicate_count(order_id) = 0
# - avg(total_amount) between 10 and 1000
# - max(total_amount) < 50000
# - freshness(created_at) < 2h
# - schema:
# fail:
# when wrong column type:
# order_id: integer
# total_amount: numeric
# created_at: timestamp
# - failed rows:
# fail query: |
# SELECT * FROM orders
# WHERE total_amount < 0
# OR customer_email NOT LIKE '%@%'
# Run scan
# soda scan -d my_postgres -c configuration.yml checks/orders.yml
from dataclasses import dataclass
@dataclass
class SodaCheck:
check: str
syntax: str
description: str
severity: str
example: str
checks = [
SodaCheck("Row Count",
"row_count > 0 / row_count between 1000 and 50000",
"ตรวจว่า Table มีข้อมูล ไม่ว่าง ไม่มากผิดปกติ",
"Critical", "orders table ต้องมีข้อมูลทุกวัน"),
SodaCheck("Missing Values",
"missing_count(col) = 0 / missing_percent(col) < 5",
"ตรวจค่า NULL ว่าง ใน Column สำคัญ",
"Critical", "email ต้องไม่มีค่าว่าง"),
SodaCheck("Duplicate",
"duplicate_count(col) = 0",
"ตรวจข้อมูลซ้ำ ใน Primary Key หรือ Unique Column",
"Critical", "order_id ต้องไม่ซ้ำ"),
SodaCheck("Freshness",
"freshness(timestamp_col) < 1h",
"ตรวจว่าข้อมูลใหม่เข้ามาภายในเวลาที่กำหนด",
"High", "ข้อมูลไม่เก่าเกิน 1 ชั่วโมง"),
SodaCheck("Value Range",
"avg(col) between X and Y / min(col) >= 0",
"ตรวจค่าอยู่ในช่วงที่เหมาะสม",
"Medium", "total_amount ต้อง > 0 และ < 50000"),
SodaCheck("Schema",
"schema: fail: when wrong column type",
"ตรวจ Column Name Type ตรงกับที่คาดหวัง",
"Critical", "ตรวจหลัง Migration ว่า Schema ถูกต้อง"),
]
print("=== Soda Checks ===")
for c in checks:
print(f" [{c.check}] Severity: {c.severity}")
print(f" Syntax: {c.syntax}")
print(f" Description: {c.description}")
print(f" Example: {c.example}")
CI/CD Integration
# === CI/CD Pipeline ===
# GitHub Actions workflow
# name: Data Quality Check
# on:
# schedule:
# - cron: '0 */2 * * *' # Every 2 hours
# workflow_dispatch:
# jobs:
# soda-scan:
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v4
# - uses: actions/setup-python@v5
# with: { python-version: '3.11' }
# - run: pip install soda-core-postgres
# - run: |
# soda scan -d production \
# -c configuration.yml \
# checks/*.yml
# env:
# POSTGRES_USER: }
# POSTGRES_PASSWORD: }
# Airflow DAG integration
# from airflow.operators.bash import BashOperator
# soda_check = BashOperator(
# task_id='soda_data_quality',
# bash_command='soda scan -d production -c /config/configuration.yml /checks/orders.yml',
# dag=dag,
# )
# load_task >> soda_check >> downstream_task
# dbt + Soda integration
# dbt run --select staging.*
# soda scan -d warehouse -c config.yml checks/staging_checks.yml
# if [ $? -ne 0 ]; then echo "Quality check failed!"; exit 1; fi
# dbt run --select marts.*
@dataclass
class Integration:
tool: str
how: str
trigger: str
on_fail: str
integrations = [
Integration("GitHub Actions",
"soda scan ใน Workflow Step",
"Schedule (cron) หรือ Push event",
"Fail workflow, notify Slack"),
Integration("Airflow",
"BashOperator หรือ PythonOperator รัน soda scan",
"After load task ใน DAG",
"Fail task, skip downstream, alert"),
Integration("dbt",
"soda scan หลัง dbt run ก่อนทำ downstream",
"After dbt run ทุกครั้ง",
"Stop pipeline, rollback if needed"),
Integration("Soda Cloud",
"API integration, dashboard, alerts",
"After every soda scan",
"Slack PagerDuty Email webhook"),
Integration("Prefect / Dagster",
"Task/Op ใน Flow/Job",
"After data load step",
"Fail flow, alert, retry policy"),
]
print("=== CI/CD Integrations ===")
for i in integrations:
print(f" [{i.tool}] {i.how}")
print(f" Trigger: {i.trigger}")
print(f" On Fail: {i.on_fail}")
Production Monitoring
# === Production Setup ===
@dataclass
class MonitoringConfig:
dataset: str
checks: str
frequency: str
alert_channel: str
owner: str
monitoring = [
MonitoringConfig("orders",
"row_count, missing(order_id), duplicate(order_id), freshness, amount range",
"ทุก 1 ชั่วโมง", "Slack #data-alerts + PagerDuty",
"Data Engineering Team"),
MonitoringConfig("users",
"row_count, missing(email), duplicate(email), schema validation",
"ทุก 6 ชั่วโมง", "Slack #data-alerts",
"Backend Team"),
MonitoringConfig("events",
"row_count anomaly, freshness < 30min, missing(event_type)",
"ทุก 30 นาที", "Slack #data-alerts + PagerDuty",
"Data Engineering Team"),
MonitoringConfig("products",
"row_count, missing(name price), duplicate(sku), price range",
"ทุก 12 ชั่วโมง", "Slack #data-quality",
"Product Team"),
MonitoringConfig("financial_reports",
"row_count, schema, freshness < 4h, failed_rows(negative amounts)",
"ทุก 4 ชั่วโมง", "Slack + Email + PagerDuty",
"Finance Data Team"),
]
print("=== Production Monitoring ===")
for m in monitoring:
print(f" [{m.dataset}] Owner: {m.owner}")
print(f" Checks: {m.checks}")
print(f" Frequency: {m.frequency}")
print(f" Alert: {m.alert_channel}")
เคล็ดลับ
- Start Simple: เริ่มจาก row_count freshness schema ก่อน แล้วค่อยเพิ่ม
- CI/CD: ใส่ soda scan ใน Pipeline ทุก Run ไม่ใช่แค่รันบางครั้ง
- Alert: ตั้ง Alert สำหรับ Critical Check ส่ง Slack PagerDuty ทันที
- Owner: กำหนด Owner สำหรับแต่ละ Dataset ใครรับผิดชอบแก้
- History: ใช้ Soda Cloud เก็บ History ดู Trend คุณภาพเปลี่ยนแปลง
การบริหารจัดการฐานข้อมูลอย่างมืออาชีพ
Database Management ที่ดีเริ่มจากการออกแบบ Schema ที่เหมาะสม ใช้ Normalization ลด Data Redundancy สร้าง Index บน Column ที่ Query บ่อย วิเคราะห์ Query Plan เพื่อ Optimize Performance และทำ Regular Maintenance เช่น VACUUM สำหรับ PostgreSQL หรือ OPTIMIZE TABLE สำหรับ MySQL
เรื่อง High Availability ควรติดตั้ง Replication อย่างน้อย 1 Replica สำหรับ Read Scaling และ Disaster Recovery ใช้ Connection Pooling เช่น PgBouncer หรือ ProxySQL ลดภาระ Connection ที่เปิดพร้อมกัน และตั้ง Automated Failover ให้ระบบสลับไป Replica อัตโนมัติเมื่อ Primary ล่ม
Backup ต้องทำทั้ง Full Backup รายวัน และ Incremental Backup ทุก 1-4 ชั่วโมง เก็บ Binary Log หรือ WAL สำหรับ Point-in-Time Recovery ทดสอบ Restore เป็นประจำ และเก็บ Backup ไว้ Off-site ด้วยเสมอ
เปรียบเทียบข้อดีและข้อเสีย
จากตารางเปรียบเทียบจะเห็นว่าข้อดีมีมากกว่าข้อเสียอย่างชัดเจน โดยเฉพาะในแง่ของประสิทธิภาพและความสามารถในการ Scale สำหรับข้อเสียส่วนใหญ่สามารถแก้ไขได้ด้วยการเรียนรู้อย่างเป็นระบบและวางแผนทรัพยากรให้เหมาะสม
Soda คืออะไร
Open Source Data Quality Platform SodaCL Check PostgreSQL BigQuery Snowflake Schema Freshness Volume Validity Uniqueness Pipeline CI/CD
SodaCL เขียนอย่างไร
YAML row_count missing_count duplicate_count avg between freshness schema failed_rows query Dataset Check Syntax อ่านง่าย
ใช้กับ CI/CD อย่างไร
GitHub Actions Workflow Airflow DAG Task dbt soda scan Pipeline Fail Alert Slack PagerDuty Soda Cloud Dashboard Schedule
Production Best Practices มีอะไร
Critical Table row_count freshness schema Alert Slack PagerDuty CI/CD Pipeline Anomaly Detection History Trend failed_rows Owner Data Contract
สรุป
Soda Data Quality SodaCL Check Pipeline CI/CD GitHub Actions Airflow dbt Freshness Schema Validation Monitoring Alert Production
