SiamCafe.net Blog
Technology

Great Expectations CQRS Event Sourcing

2025-10-14 · อ. บอม — SiamCafe.net · 10,876 words


Tags: Great Expectations, Data Quality, CQRS, Event Sourcing, Validation, Expectation, Checkpoint, Pipeline, Kafka, Dashboard, Production

| Pattern | Purpose | GX Component | Benefit |
| --- | --- | --- | --- |
| Command (Write) | Validate + write data | Checkpoint + Expectation Suite | Prevents bad data |
| Query (Read) | Quality dashboard | Data Docs + custom API | Visibility and reporting |
| Event Sourcing | Audit trail and history | Validation results → Kafka | Time travel and compliance |

GX Validation Pipeline

# === Great Expectations Validation ===
# Sketch against the GX 1.x fluent API; class and method names differ
# between GX versions, so treat this as a template rather than gospel.
import pandas as pd
import great_expectations as gx
from great_expectations.checkpoint import UpdateDataDocsAction

context = gx.get_context()

# Data Source: register a pandas DataFrame asset and a whole-frame batch
df = pd.DataFrame({"order_id": [1, 2, 3], "amount": [100.0, 250.0, 75.0]})
datasource = context.data_sources.add_pandas("my_source")
data_asset = datasource.add_dataframe_asset("orders")
batch_definition = data_asset.add_batch_definition_whole_dataframe("orders_batch")

# Expectation Suite
suite = context.suites.add(gx.ExpectationSuite(name="orders_quality"))
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount", min_value=0, max_value=1000000
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)
# "row count > 0" expressed via the between-expectation that GX ships
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(min_value=1)
)

# Checkpoint: bind batch + suite, then run; validation results are stored
# automatically in GX 1.x, so only the Data Docs action is listed here
validation_definition = context.validation_definitions.add(
    gx.ValidationDefinition(
        name="orders_validation", data=batch_definition, suite=suite
    )
)
checkpoint = context.checkpoints.add(
    gx.Checkpoint(
        name="orders_checkpoint",
        validation_definitions=[validation_definition],
        actions=[UpdateDataDocsAction(name="update_docs")],
    )
)
result = checkpoint.run(batch_parameters={"dataframe": df})

from dataclasses import dataclass

@dataclass
class Expectation:
    expectation: str
    column: str
    parameters: str
    severity: str
    example: str

expectations = [
    Expectation("expect_column_values_to_not_be_null",
        "order_id, customer_id",
        "mostly=0.99 (99% not null OK)",
        "Critical",
        "order_id must not be null in any row"),
    Expectation("expect_column_values_to_be_between",
        "amount, quantity",
        "min_value=0 max_value=1000000",
        "High",
        "amount must be between 0 and 1,000,000"),
    Expectation("expect_column_values_to_be_unique",
        "order_id",
        "mostly=1.0 (100% unique)",
        "Critical",
        "order_id must be unique, no duplicates"),
    Expectation("expect_table_row_count_to_be_between",
        "(table level)",
        "min_value=1000 max_value=10000000",
        "High",
        "table must have 1K-10M rows"),
    Expectation("expect_column_values_to_match_regex",
        "email",
        "regex='^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$'",
        "Medium",
        "email must match the expected format"),
    Expectation("expect_column_distinct_values_to_be_in_set",
        "status",
        "value_set=['pending','completed','cancelled']",
        "High",
        "status must be one of the defined values"),
]

print("=== GX Expectations ===")
for e in expectations:
    print(f"  [{e.expectation}]")
    print(f"    Column: {e.column} | Severity: {e.severity}")
    print(f"    Params: {e.parameters}")
    print(f"    Example: {e.example}")
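The `mostly` parameter above is a tolerance: an expectation passes when at least that fraction of values satisfies the condition. A minimal stand-alone sketch of the semantics (plain Python, hypothetical helper names, not GX's own implementation):

```python
# Hypothetical stand-in for expect_column_values_to_not_be_null with `mostly`:
# the check passes when the fraction of non-null values >= mostly.
def not_null_fraction(values):
    """Fraction of values that are not None."""
    return sum(v is not None for v in values) / len(values)

def expect_not_null(values, mostly=1.0):
    """Return True when at least `mostly` of the values are non-null."""
    return not_null_fraction(values) >= mostly

# 99 of 100 order IDs present: passes at mostly=0.99, fails at mostly=1.0
orders = ["A-1", "A-2", None, "A-4"] + ["A-%d" % i for i in range(5, 101)]
print(expect_not_null(orders, mostly=0.99))  # True
print(expect_not_null(orders, mostly=1.0))   # False
```

This is why `mostly=0.99` is marked "99% not null OK" in the table: one bad row in a hundred still passes.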

CQRS + Event Sourcing

# === CQRS Event Sourcing for Data Quality ===

# Event publisher: runs after a GX checkpoint and appends the outcome to
# Kafka as an immutable event (requires kafka-python and a reachable broker)
import json
from datetime import datetime, timezone

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    value_serializer=lambda v: json.dumps(v).encode(),
)

def publish_validation_event(result):
    # `result` is assumed to expose the checkpoint outcome; the exact
    # attribute names vary by GX version, so adapt the accessors as needed
    event = {
        "event_type": "ValidationPassed" if result.success else "ValidationFailed",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "suite_name": result.suite_name,
        "table": result.asset_name,
        "success": result.success,
        "statistics": {
            "evaluated": result.statistics["evaluated_expectations"],
            "successful": result.statistics["successful_expectations"],
            "failed": result.statistics["unsuccessful_expectations"],
            "success_percent": result.statistics["success_percent"],
        },
        "failed_expectations": [
            {"expectation": r.expectation_config.expectation_type,
             "column": r.expectation_config.kwargs.get("column")}
            for r in result.results if not r.success
        ],
    }
    producer.send("data-quality-events", value=event)

@dataclass
class QualityEvent:
    event_type: str
    trigger: str
    payload: str
    read_model_update: str

events = [
    QualityEvent("ValidationPassed",
        "GX checkpoint passes every expectation",
        "suite, table, stats, timestamp",
        "raise quality score, update dashboard"),
    QualityEvent("ValidationFailed",
        "GX checkpoint fails some expectations",
        "suite, table, failed_expectations, details",
        "lower quality score, alert the team"),
    QualityEvent("ExpectationAdded",
        "new expectation added to a suite",
        "suite, expectation_type, column, params",
        "update expectation count history"),
    QualityEvent("ThresholdBreached",
        "quality score < threshold (e.g. 95%)",
        "suite, table, score, threshold",
        "P1 alert, block pipeline"),
    QualityEvent("DataDriftDetected",
        "schema or distribution changed",
        "table, column, old_distribution, new_distribution",
        "alert data team to investigate"),
]

print("=== Data Quality Events ===")
for e in events:
    print(f"  [{e.event_type}] Trigger: {e.trigger}")
    print(f"    Payload: {e.payload}")
    print(f"    Read Model: {e.read_model_update}")
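The "Read Model update" column above can be made concrete by folding the event stream into per-table state. A minimal in-memory sketch; the event shape mirrors the publisher payload, and `project` is an illustrative name, not a GX or Kafka API:

```python
from collections import defaultdict

# Hypothetical read-model projection: latest quality score, run count,
# and failure count per table, folded from validation events in order.
def project(events):
    read_model = defaultdict(lambda: {"score": None, "runs": 0, "failures": 0})
    for event in events:  # consumed in order, e.g. from the Kafka topic
        row = read_model[event["table"]]
        row["runs"] += 1
        row["score"] = event["statistics"]["success_percent"]
        if event["event_type"] == "ValidationFailed":
            row["failures"] += 1
    return dict(read_model)

history = [
    {"event_type": "ValidationPassed", "table": "orders",
     "statistics": {"success_percent": 100.0}},
    {"event_type": "ValidationFailed", "table": "orders",
     "statistics": {"success_percent": 83.3}},
]
print(project(history)["orders"])  # {'score': 83.3, 'runs': 2, 'failures': 1}
```

Because the read model is derived, it can be rebuilt from scratch at any time by replaying the event log, which is the key CQRS payoff here.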

Dashboard & Monitoring

# === Quality Dashboard ===

@dataclass
class DashboardMetric:
    metric: str
    source: str
    visualization: str
    alert: str

dashboard = [
    DashboardMetric("Quality Score per Table",
        "Event Store (success_percent per run)",
        "Line Chart trend over time",
        "Score < 95% → P2 Alert"),
    DashboardMetric("Failed Expectations",
        "Event Store (failed_expectations list)",
        "Bar Chart top failed expectations",
        "Same failure 3x → P1 Alert"),
    DashboardMetric("Validation Run History",
        "Event Store (all validation events)",
        "Table latest 50 runs with status",
        "No run for 2x schedule → Alert"),
    DashboardMetric("Data Drift Score",
        "Event Store (distribution changes)",
        "Heatmap drift per column per day",
        "Drift > 20% → P2 Alert"),
    DashboardMetric("Coverage",
        "Expectation count per table",
        "Progress bar expectations vs columns",
        "Coverage < 50% → Reminder"),
]

print("=== Dashboard Metrics ===")
for d in dashboard:
    print(f"  [{d.metric}]")
    print(f"    Source: {d.source}")
    print(f"    Viz: {d.visualization}")
    print(f"    Alert: {d.alert}")
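The alert rules in the table (score < 95% → P2, the same expectation failing three runs in a row → P1) can be expressed as a pure function over run history. A sketch with illustrative names and the thresholds assumed from the table:

```python
# Hypothetical alert evaluator over validation-run history, following the
# thresholds in the dashboard table: last score < 95% -> P2 alert, and the
# same expectation failing in each of the last three runs -> P1 alert.
def evaluate_alerts(runs, score_threshold=95.0, repeat_limit=3):
    alerts = []
    if runs and runs[-1]["score"] < score_threshold:
        alerts.append(("P2", f"score {runs[-1]['score']} < {score_threshold}"))
    recent = runs[-repeat_limit:]
    if len(recent) == repeat_limit:
        # Expectations that failed in every one of the last `repeat_limit` runs
        common = set(recent[0]["failed"])
        for run in recent[1:]:
            common &= set(run["failed"])
        for name in sorted(common):
            alerts.append(("P1", f"{name} failed {repeat_limit}x in a row"))
    return alerts

runs = [
    {"score": 97.0, "failed": ["expect_column_values_to_be_unique"]},
    {"score": 94.0, "failed": ["expect_column_values_to_be_unique"]},
    {"score": 91.0, "failed": ["expect_column_values_to_be_unique"]},
]
print(evaluate_alerts(runs))
# [('P2', 'score 91.0 < 95.0'),
#  ('P1', 'expect_column_values_to_be_unique failed 3x in a row')]
```

In production this function would run inside the event consumer and forward its output to Slack or PagerDuty.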

Tips

Professional Database Management

Good database management starts with appropriate schema design: use normalization to reduce data redundancy, create indexes on frequently queried columns, analyze query plans to optimize performance, and run regular maintenance such as VACUUM for PostgreSQL or OPTIMIZE TABLE for MySQL.

For high availability, set up replication with at least one replica for read scaling and disaster recovery, use connection pooling such as PgBouncer or ProxySQL to reduce the load of concurrently open connections, and configure automated failover so the system switches to a replica automatically when the primary goes down.

Backups should combine daily full backups with incremental backups every 1-4 hours; keep binary logs or WAL for point-in-time recovery, test restores regularly, and always keep an off-site copy.

What is Great Expectations?

An open-source data quality framework for Python: you declare Expectations about your data, validate batches with Checkpoints, and publish results as Data Docs. It works with Pandas, Spark, and SQL backends, and plugs into CI/CD and orchestrators such as Airflow, Prefect, and Dagster.

How does CQRS apply to data quality?

The Command (write) side validates incoming data with GX Checkpoints; the Query (read) side serves dashboards, Data Docs, and reporting APIs. Separating writes from reads lets each side scale and be optimized independently, and the events flowing between them form an audit trail.

How does Event Sourcing work here?

Every outcome (ValidationPassed, ValidationFailed, ExpectationAdded, ThresholdBreached, DataDriftDetected) is appended to Kafka or PostgreSQL as an immutable event. Replaying the log rebuilds any past state, giving time travel for audits and compliance.
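"Replay" and "time travel" simply mean folding the append-only log up to a chosen point. A minimal in-memory sketch (the `EventStore` class and event shape are illustrative; production would use Kafka or a PostgreSQL events table):

```python
# Minimal append-only event store with time-travel replay: the state at
# time T is rebuilt by folding all events with timestamp <= T.
class EventStore:
    def __init__(self):
        self._log = []  # append-only; events are never updated or deleted

    def append(self, event):
        self._log.append(event)

    def replay(self, until=None):
        """Fold events (up to `until`, inclusive) into per-table pass state."""
        state = {}
        for event in self._log:
            if until is not None and event["timestamp"] > until:
                continue
            state[event["table"]] = event["success"]
        return state

store = EventStore()
store.append({"timestamp": "2025-10-01T00:00:00", "table": "orders", "success": True})
store.append({"timestamp": "2025-10-02T00:00:00", "table": "orders", "success": False})

print(store.replay(until="2025-10-01T23:59:59"))  # {'orders': True}
print(store.replay())                             # {'orders': False}
```

ISO-8601 timestamps sort lexicographically, so plain string comparison is enough for the cutoff here.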

How do you design the production pipeline?

Ingestion runs through a GX Checkpoint, validation results land in the event store, read models feed a Grafana dashboard, alerts go to Slack or PagerDuty, and CI/CD gates on the quality score trend so regressions are caught before deploy.

Summary

Great Expectations puts data quality under control with Expectations and Checkpoints; CQRS separates validation writes from dashboard reads; Event Sourcing streams every validation result to Kafka for audit, replay, and compliance. Wire alerts and CI/CD around the quality score and the pipeline is production-ready.
