Soda Core คืออะไรและใช้ตรวจสอบคุณภาพข้อมูลอย่างไร
Soda Core เป็น open source data quality framework ที่ใช้ภาษา SodaCL (Soda Checks Language) สำหรับเขียน data quality checks ในรูปแบบที่อ่านง่ายเหมือนภาษาธรรมชาติ Soda Core รองรับ data sources หลายตัวเช่น PostgreSQL, MySQL, BigQuery, Snowflake, Spark, DuckDB และ Pandas
จุดเด่นของ Soda Core คือ SodaCL ที่เข้าใจง่าย ไม่ต้องเขียน SQL ซับซ้อน สามารถ integrate กับ CI/CD และ orchestration tools ได้ มี built-in checks มากมาย รองรับ custom checks ด้วย Python และมี Soda Cloud สำหรับ dashboard และ alerting
Soda ใช้แนวคิด checks ที่เป็น assertions เกี่ยวกับข้อมูล เช่น row count ต้องมากกว่า 0, column ต้องไม่มี null, values ต้องอยู่ในช่วงที่กำหนด เมื่อรัน scan Soda จะ execute checks ทั้งหมดและ report ผลว่า pass หรือ fail
Soda เป็น open source project ที่ยินดีรับ contributions จาก community ไม่ว่าจะเป็นการเพิ่ม data source connectors ใหม่ สร้าง custom checks เขียน documentation หรือ fix bugs การ contribute ช่วยพัฒนาทักษะ open source development และสร้าง portfolio ที่ดี
ติดตั้งและตั้งค่า Soda Core
ขั้นตอนการติดตั้งและ configuration เบื้องต้น
# ติดตั้ง Soda Core
pip install soda-core
# ติดตั้งพร้อม data source connector
pip install soda-core-postgres
pip install soda-core-bigquery
pip install soda-core-snowflake
pip install soda-core-mysql
pip install soda-core-duckdb
pip install soda-core-spark-df
# ตรวจสอบการติดตั้ง
soda --version
# สร้าง configuration file
# configuration.yml
data_source my_postgres:
type: postgres
host: localhost
port: 5432
username:
password:
database: analytics
schema: public
data_source my_bigquery:
type: bigquery
project_id: my-gcp-project
dataset: analytics
account_info_json_path: /path/to/service-account.json
data_source my_duckdb:
type: duckdb
path: /data/analytics.duckdb
# Environment variables
# export POSTGRES_USER=analytics_user
# export POSTGRES_PASSWORD=secure_password
# ทดสอบ connection
soda test-connection -d my_postgres -c configuration.yml
# โครงสร้างโปรเจกต์แนะนำ
# soda-project/
# ├── configuration.yml # data source configs
# ├── checks/
# │ ├── customers.yml # checks for customers table
# │ ├── orders.yml # checks for orders table
# │ └── products.yml # checks for products table
# ├── custom_checks/
# │ └── my_custom_check.py # Python custom checks
# ├── .env # environment variables
# └── .github/
# └── workflows/
# └── data-quality.yml # CI/CD workflow
เขียน SodaCL Checks สำหรับ Data Quality
ตัวอย่าง SodaCL checks สำหรับตรวจสอบข้อมูลหลายรูปแบบ
# checks/customers.yml — SodaCL Checks for Customers Table
# === Row Count Checks ===
checks for customers:
- row_count > 0
- row_count between 1000 and 10000000
# === Freshness Check ===
- freshness(updated_at) < 24h
# === Schema Validation ===
- schema:
name: customers_schema
fail:
when required column missing:
[customer_id, email, name, status, created_at]
when wrong column type:
customer_id: integer
email: varchar
created_at: timestamp
# === Null Checks ===
- missing_count(customer_id) = 0
- missing_count(email) = 0
- missing_percent(phone) < 10
# === Uniqueness ===
- duplicate_count(customer_id) = 0
- duplicate_count(email) = 0
# === Value Validation ===
- invalid_count(email) = 0:
valid format: email
- invalid_count(status) = 0:
valid values: [active, inactive, suspended, deleted]
# === Statistical Checks ===
- avg(lifetime_value) between 100 and 50000
- max(lifetime_value) < 1000000
- min(created_at) > 2020-01-01
# === Anomaly Detection ===
- anomaly detection for row_count:
warn: when relative change > 20%
fail: when relative change > 50%
# === Cross-table Reference ===
- values in (country_code) must exist in countries (code)
# === Custom SQL Check ===
- failed rows:
name: orphan_orders_check
fail query: |
SELECT o.order_id, o.customer_id
FROM orders o
LEFT JOIN customers c ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL
# checks/orders.yml — SodaCL Checks for Orders Table
checks for orders:
- row_count > 0
- freshness(order_date) < 2h
- missing_count(order_id) = 0
- missing_count(customer_id) = 0
- missing_count(total_amount) = 0
- duplicate_count(order_id) = 0
- min(total_amount) >= 0
- max(total_amount) < 1000000
- invalid_count(status) = 0:
valid values: [pending, processing, shipped, delivered, cancelled, refunded]
- avg(total_amount) between 50 and 5000
# รัน scan
# soda scan -d my_postgres -c configuration.yml checks/customers.yml
# soda scan -d my_postgres -c configuration.yml checks/
รวม Soda เข้ากับ Data Pipeline
ใช้ Soda เป็น data quality gate ใน Airflow pipeline
#!/usr/bin/env python3
# airflow_soda_dag.py — Airflow DAG with Soda Quality Checks
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime, timedelta
from soda.scan import Scan
default_args = {
"owner": "data-team",
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
def run_soda_scan(check_file, data_source="my_postgres", **kwargs):
scan = Scan()
scan.set_data_source_name(data_source)
scan.add_configuration_yaml_file("configuration.yml")
scan.add_sodacl_yaml_file(check_file)
scan.execute()
results = {
"has_failures": scan.has_check_fails(),
"has_warnings": scan.has_check_warns(),
"checks_total": len(scan.get_checks_fail()) + len(scan.get_checks_warn()) + len(scan.get_checks_pass()),
"checks_failed": len(scan.get_checks_fail()),
"checks_warned": len(scan.get_checks_warn()),
"checks_passed": len(scan.get_checks_pass()),
}
failed_details = []
for check in scan.get_checks_fail():
failed_details.append({
"name": check.name,
"table": check.check_cfg.source_header,
})
results["failed_checks"] = failed_details
kwargs["ti"].xcom_push(key="soda_results", value=results)
if results["has_failures"]:
raise ValueError(f"Soda scan failed: {results['checks_failed']} checks failed")
return results
def decide_on_quality(**kwargs):
results = kwargs["ti"].xcom_pull(key="soda_results", task_ids="validate_source_data")
if results and not results["has_failures"]:
return "transform_data"
return "quarantine_data"
def transform_data(**kwargs):
print("Running ETL transformations...")
def quarantine_data(**kwargs):
results = kwargs["ti"].xcom_pull(key="soda_results", task_ids="validate_source_data")
print(f"Quarantining data — {results['checks_failed']} checks failed")
for check in results.get("failed_checks", []):
print(f" FAILED: {check['name']} on {check['table']}")
with DAG(
"etl_with_soda_quality",
default_args=default_args,
schedule_interval="@hourly",
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
validate_source = PythonOperator(
task_id="validate_source_data",
python_callable=run_soda_scan,
op_kwargs={"check_file": "checks/customers.yml"},
)
quality_gate = BranchPythonOperator(
task_id="quality_gate",
python_callable=decide_on_quality,
)
transform = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
)
validate_output = PythonOperator(
task_id="validate_output",
python_callable=run_soda_scan,
op_kwargs={"check_file": "checks/orders.yml"},
)
quarantine = PythonOperator(
task_id="quarantine_data",
python_callable=quarantine_data,
)
done = EmptyOperator(task_id="done", trigger_rule="none_failed_min_one_success")
validate_source >> quality_gate
quality_gate >> transform >> validate_output >> done
quality_gate >> quarantine >> done
Contribute to Soda Open Source Project
ขั้นตอนการ contribute ให้กับ Soda Core project
# === Getting Started with Soda Core Contribution ===
# 1. Fork และ Clone repository
git clone https://github.com/YOUR_USERNAME/soda-core.git
cd soda-core
git remote add upstream https://github.com/sodadata/soda-core.git
# 2. ตั้งค่า Development Environment
python3 -m venv .venv
source .venv/bin/activate
# Install in development mode
pip install -e ".[dev, postgres, bigquery]"
pip install pytest pytest-cov black isort mypy
# 3. สร้าง branch
git checkout -b feature/add-custom-check
git fetch upstream
git rebase upstream/main
# 4. โครงสร้าง Repository
# soda-core/
# ├── soda/ # main package
# │ ├── core/ # core library
# │ │ ├── scan.py # Scan class
# │ │ ├── check.py # Check base class
# │ │ └── data_source.py # Data source base
# │ ├── postgres/ # PostgreSQL connector
# │ ├── bigquery/ # BigQuery connector
# │ └── scientific/ # anomaly detection
# ├── tests/
# │ ├── core/
# │ ├── postgres/
# │ └── helpers/
# ├── docs/
# ├── pyproject.toml
# └── CONTRIBUTING.md
# 5. Run Tests
pytest tests/core/ -v
pytest tests/postgres/ -v --tb=short
# 6. Code Quality
black soda/
isort soda/
mypy soda/core/
# 7. สร้าง Pull Request
git add .
git commit -m "feat: add custom anomaly check for time series data"
git push origin feature/add-custom-check
# -> สร้าง PR บน GitHub
# === Contribution Areas ===
# - New data source connectors (e.g., ClickHouse, QuestDB)
# - New SodaCL check types
# - Bug fixes
# - Documentation improvements
# - Test coverage improvements
# - Performance optimizations
# - CI/CD improvements
# === PR Checklist ===
# [ ] Tests added/updated
# [ ] Documentation updated
# [ ] Code formatted with black
# [ ] Type hints added
# [ ] CHANGELOG updated
# [ ] No breaking changes (or documented)
สร้าง Custom Check ด้วย Python
สร้าง custom SodaCL check สำหรับใช้ในโปรเจกต์
#!/usr/bin/env python3
# custom_checks/statistical_check.py — Custom Soda Check
from soda.core.check import Check
import numpy as np
from typing import Optional
class StatisticalOutlierCheck(Check):
"""Custom check ตรวจจับ statistical outliers ในข้อมูล"""
def __init__(self, check_cfg, data_source, partition):
super().__init__(check_cfg, data_source, partition)
self.column = check_cfg.get("column")
self.method = check_cfg.get("method", "zscore")
self.threshold = check_cfg.get("threshold", 3.0)
self.max_outlier_percent = check_cfg.get("max_outlier_percent", 1.0)
def execute(self):
query = f"""
SELECT {self.column}
FROM {self.partition.table_name}
WHERE {self.column} IS NOT NULL
"""
rows = self.data_source.execute_query(query)
values = np.array([row[0] for row in rows], dtype=float)
if len(values) == 0:
self.outcome = "pass"
return
if self.method == "zscore":
outliers = self._zscore_outliers(values)
elif self.method == "iqr":
outliers = self._iqr_outliers(values)
else:
raise ValueError(f"Unknown method: {self.method}")
outlier_percent = (len(outliers) / len(values)) * 100
self.metrics = {
"total_values": len(values),
"outlier_count": len(outliers),
"outlier_percent": round(outlier_percent, 2),
"mean": round(np.mean(values), 2),
"std": round(np.std(values), 2),
"min": round(np.min(values), 2),
"max": round(np.max(values), 2),
}
if outlier_percent > self.max_outlier_percent:
self.outcome = "fail"
self.message = (f"Outlier percentage {outlier_percent:.2f}% exceeds "
f"threshold {self.max_outlier_percent}%")
else:
self.outcome = "pass"
def _zscore_outliers(self, values):
mean = np.mean(values)
std = np.std(values)
if std == 0:
return np.array([])
zscores = np.abs((values - mean) / std)
return values[zscores > self.threshold]
def _iqr_outliers(self, values):
q1 = np.percentile(values, 25)
q3 = np.percentile(values, 75)
iqr = q3 - q1
lower = q1 - self.threshold * iqr
upper = q3 + self.threshold * iqr
return values[(values < lower) | (values > upper)]
# custom_checks/freshness_check.py
class DataFreshnessCheck:
"""ตรวจสอบว่าข้อมูลอัปเดตล่าสุดไม่เกินเวลาที่กำหนด"""
def __init__(self, data_source, table, column, max_hours=24):
self.data_source = data_source
self.table = table
self.column = column
self.max_hours = max_hours
def run(self):
from soda.scan import Scan
scan = Scan()
scan.set_data_source_name(self.data_source)
scan.add_configuration_yaml_file("configuration.yml")
check_yaml = f"""
checks for {self.table}:
- freshness({self.column}) < {self.max_hours}h
"""
scan.add_sodacl_yaml_str(check_yaml)
scan.execute()
return {
"passed": not scan.has_check_fails(),
"checks_failed": len(scan.get_checks_fail()),
}
# ใช้งาน Custom Check ใน Python
def run_custom_quality_checks():
from soda.scan import Scan
scan = Scan()
scan.set_data_source_name("my_postgres")
scan.add_configuration_yaml_file("configuration.yml")
scan.add_sodacl_yaml_file("checks/customers.yml")
scan.execute()
print(f"Scan results:")
print(f" Passed: {len(scan.get_checks_pass())}")
print(f" Warned: {len(scan.get_checks_warn())}")
print(f" Failed: {len(scan.get_checks_fail())}")
for check in scan.get_checks_fail():
print(f" FAIL: {check.name}")
if __name__ == "__main__":
run_custom_quality_checks()
FAQ คำถามที่พบบ่อย
Q: Soda Core กับ Great Expectations ต่างกันอย่างไร?
A: Soda Core ใช้ SodaCL ที่เขียนง่ายกว่า เหมาะสำหรับทีมที่ต้องการเริ่มต้นเร็ว checks เขียนเป็น YAML ที่อ่านง่าย ส่วน Great Expectations มี expectations library ที่ใหญ่กว่า มี profiling และ data docs ที่ดีกว่า แต่ configuration ซับซ้อนกว่า Soda เหมาะสำหรับ monitoring และ alerting ส่วน GX เหมาะสำหรับ comprehensive data quality testing
Q: SodaCL เรียนรู้ยากไหม?
A: SodaCL ออกแบบมาให้เขียนง่ายคล้ายภาษาธรรมชาติ ตัวอย่างเช่น row_count > 0 หรือ missing_count(email) = 0 ไม่ต้องเขียน SQL ซับซ้อน แต่ยังรองรับ custom SQL สำหรับ checks ที่ซับซ้อน ผู้ที่ไม่มีพื้นฐาน programming ก็เขียนได้ documentation มีตัวอย่างครบถ้วน
Q: การ contribute ให้ open source project ยากไหม?
A: เริ่มจากงานเล็กๆก่อนเช่น fix typos ใน documentation, เพิ่ม tests, fix small bugs แล้วค่อยขยายไป features ใหญ่ขึ้น อ่าน CONTRIBUTING.md ของโปรเจกต์ให้เข้าใจ workflow ดู issues ที่ label ว่า good first issue หรือ help wanted สิ่งสำคัญคือสื่อสารกับ maintainers ก่อนเริ่มงานใหญ่
Q: Soda Core ใช้กับ real-time data ได้ไหม?
A: Soda Core ออกแบบสำหรับ batch scanning เป็นหลัก รัน scan เป็นระยะเช่น ทุกชั่วโมงหรือทุกวัน ไม่เหมาะกับ real-time streaming validation สำหรับ real-time ควรใช้ custom validation ใน stream processing framework เช่น Kafka Streams หรือ Flink แล้วใช้ Soda สำหรับ periodic batch checks เสริม
