ai

Great Expectations CQRS Event Sourcing — Data

Great Expectations CQRS Event Sourcing — Data

Great Expectations CQRS Event Sourcing

Great Expectations CQRS Event Sourcing — Data

Great Expectations Data Quality CQRS Event Sourcing Validation Expectation Checkpoint Pipeline Kafka Dashboard Production

PatternPurposeGX ComponentBenefit
Command (Write)Validate + Write DataCheckpoint + Expectation SuitePrevent Bad Data
Query (Read)Quality DashboardData Docs + Custom APIVisibility Reporting
Event SourcingAudit Trail HistoryValidation Results → KafkaTime Travel Compliance

GX Validation Pipeline

# === Great Expectations Validation ===

# import great_expectations as gx
#
# context = gx.get_context()
#
# # Data Source
# datasource = context.sources.add_pandas("my_source")
# data_asset = datasource.add_dataframe_asset("orders")
#
# # Expectation Suite
# suite = context.add_expectation_suite("orders_quality")
# suite.add_expectation(
#     gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
# )
# suite.add_expectation(
#     gx.expectations.ExpectColumnValuesToBeBetween(
#         column="amount", min_value=0, max_value=1000000
#     )
# )
# suite.add_expectation(
#     gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
# )
# suite.add_expectation(
#     gx.expectations.ExpectTableRowCountToBeGreaterThan(value=0)
# )
#
# # Checkpoint
# checkpoint = context.add_checkpoint(
#     name="orders_checkpoint",
#     validations=[{
#         "batch_request": batch_request,
#         "expectation_suite_name": "orders_quality",
#     }],
#     action_list=[
#         {"name": "store_result", "action": {"class_name": "StoreValidationResultAction"}},
#         {"name": "update_docs", "action": {"class_name": "UpdateDataDocsAction"}},
#     ]
# )
# result = checkpoint.run()

from dataclasses import dataclass

@dataclass
class Expectation:
    expectation: str
    column: str
    parameters: str
    severity: str
    example: str

expectations = [
    Expectation("expect_column_values_to_not_be_null",
        "order_id, customer_id",
        "mostly=0.99 (99% not null OK)",
        "Critical",
        "order_id ต้องไม่ Null ทุก Row"),
    Expectation("expect_column_values_to_be_between",
        "amount, quantity",
        "min_value=0 max_value=1000000",
        "High",
        "amount ต้องอยู่ระหว่าง 0-1,000,000"),
    Expectation("expect_column_values_to_be_unique",
        "order_id",
        "mostly=1.0 (100% unique)",
        "Critical",
        "order_id ต้อง Unique ไม่ซ้ำ"),
    Expectation("expect_table_row_count_to_be_between",
        "(table level)",
        "min_value=1000 max_value=10000000",
        "High",
        "Table ต้องมี 1K-10M rows"),
    Expectation("expect_column_values_to_match_regex",
        "email",
        "regex='^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$'",
        "Medium",
        "email ต้องตรง Format"),
    Expectation("expect_column_distinct_values_to_be_in_set",
        "status",
        "value_set=['pending','completed','cancelled']",
        "High",
        "status ต้องเป็นค่าที่กำหนด"),
]

print("=== GX Expectations ===")
for e in expectations:
    print(f"  [{e.expectation}]")
    print(f"    Column: {e.column} | Severity: {e.severity}")
    print(f"    Params: {e.parameters}")
    print(f"    Example: {e.example}")

CQRS + Event Sourcing

# === CQRS Event Sourcing for Data Quality ===

# Event Publisher (after GX Validation)
# import json
# from kafka import KafkaProducer
#
# producer = KafkaProducer(
#     bootstrap_servers='kafka:9092',
#     value_serializer=lambda v: json.dumps(v).encode()
# )
#
# def publish_validation_event(result):
#     event = {
#         "event_type": "ValidationPassed" if result.success else "ValidationFailed",
#         "timestamp": datetime.utcnow().isoformat(),
#         "suite_name": result.suite_name,
#         "table": result.asset_name,
#         "success": result.success,
#         "statistics": {
#             "evaluated": result.statistics["evaluated_expectations"],
#             "successful": result.statistics["successful_expectations"],
#             "failed": result.statistics["unsuccessful_expectations"],
#             "success_percent": result.statistics["success_percent"],
#         },
#         "failed_expectations": [
#             {"expectation": r.expectation_config.expectation_type,
#              "column": r.expectation_config.kwargs.get("column")}
#             for r in result.results if not r.success
#         ]
#     }
#     producer.send("data-quality-events", value=event)

@dataclass
class QualityEvent:
    event_type: str
    trigger: str
    payload: str
    read_model_update: str

events = [
    QualityEvent("ValidationPassed",
        "GX Checkpoint ผ่านทุก Expectation",
        "suite table stats timestamp",
        "เพิ่ม Quality Score อัพเดท Dashboard"),
    QualityEvent("ValidationFailed",
        "GX Checkpoint ไม่ผ่านบาง Expectation",
        "suite table failed_expectations details",
        "ลด Quality Score Alert Team"),
    QualityEvent("ExpectationAdded",
        "เพิ่ม Expectation ใหม่ใน Suite",
        "suite expectation_type column params",
        "อัพเดท Expectation Count History"),
    QualityEvent("ThresholdBreached",
        "Quality Score < Threshold (เช่น 95%)",
        "suite table score threshold",
        "Alert P1 Block Pipeline"),
    QualityEvent("DataDriftDetected",
        "Schema หรือ Distribution เปลี่ยน",
        "table column old_distribution new_distribution",
        "Alert Data Team Investigate"),
]

print("=== Data Quality Events ===")
for e in events:
    print(f"  [{e.event_type}] Trigger: {e.trigger}")
    print(f"    Payload: {e.payload}")
    print(f"    Read Model: {e.read_model_update}")

Dashboard & Monitoring

# === Quality Dashboard ===

@dataclass
class DashboardMetric:
    metric: str
    source: str
    visualization: str
    alert: str

dashboard = [
    DashboardMetric("Quality Score per Table",
        "Event Store (success_percent per run)",
        "Line Chart trend over time",
        "Score < 95% → P2 Alert"),
    DashboardMetric("Failed Expectations",
        "Event Store (failed_expectations list)",
        "Bar Chart top failed expectations",
        "Same failure 3x → P1 Alert"),
    DashboardMetric("Validation Run History",
        "Event Store (all validation events)",
        "Table latest 50 runs with status",
        "No run for 2x schedule → Alert"),
    DashboardMetric("Data Drift Score",
        "Event Store (distribution changes)",
        "Heatmap drift per column per day",
        "Drift > 20% → P2 Alert"),
    DashboardMetric("Coverage",
        "Expectation count per table",
        "Progress bar expectations vs columns",
        "Coverage < 50% → Reminder"),
]

print("=== Dashboard Metrics ===")
for d in dashboard:
    print(f"  [{d.metric}]")
    print(f"    Source: {d.source}")
    print(f"    Viz: {d.visualization}")
    print(f"    Alert: {d.alert}")

เคล็ดลับ

  • Checkpoint: รัน GX Checkpoint ทุก Ingestion ก่อน Write
  • Kafka: ส่ง Validation Event ไป Kafka สำหรับ Audit Trail
  • mostly: ใช้ mostly=0.99 สำหรับ Expectation ที่ยอม Error เล็กน้อย
  • CI/CD: รัน GX ใน CI Pipeline ทุก PR ที่แก้ Transform
  • Data Docs: เปิด Data Docs HTML Report สำหรับ Team Review

การบริหารจัดการฐานข้อมูลอย่างมืออาชีพ

Great Expectations CQRS Event Sourcing — Data

Database Management ที่ดีเริ่มจากการออกแบบ Schema ที่เหมาะสม ใช้ Normalization ลด Data Redundancy สร้าง Index บน Column ที่ Query บ่อย วิเคราะห์ Query Plan เพื่อ Optimize Performance และทำ Regular Maintenance เช่น VACUUM สำหรับ PostgreSQL หรือ OPTIMIZE TABLE สำหรับ MySQL

เรื่อง High Availability ควรติดตั้ง Replication อย่างน้อย 1 Replica สำหรับ Read Scaling และ Disaster Recovery ใช้ Connection Pooling เช่น PgBouncer หรือ ProxySQL ลดภาระ Connection ที่เปิดพร้อมกัน และตั้ง Automated Failover ให้ระบบสลับไป Replica อัตโนมัติเมื่อ Primary ล่ม

เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: phishing mail คือ — ข้อมูลครบถ้วน 2026

Backup ต้องทำทั้ง Full Backup รายวัน และ Incremental Backup ทุก 1-4 ชั่วโมง เก็บ Binary Log หรือ WAL สำหรับ Point-in-Time Recovery ทดสอบ Restore เป็นประจำ และเก็บ Backup ไว้ Off-site ด้วยเสมอ

แนะนำเพิ่มเติม — ติดตาม XM Signal

Great Expectations คืออะไร

Open Source Data Quality Python Expectations Validation Checkpoint Data Docs Pandas Spark SQL CI/CD Airflow Prefect Dagster

เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: pcie gen3 ssd คือ — ข้อมูลครบถ้วน 2026

CQRS สำหรับ Data Quality ทำอย่างไร

Command Write Validate GX Checkpoint Query Read Dashboard Data Docs แยก Write Read Scale Optimize Audit Trail Event

Event Sourcing ทำอย่างไร

ValidationPassed Failed ExpectationAdded ThresholdBreached DriftDetected Kafka PostgreSQL Replay Time Travel Audit Compliance

แนะนำเพิ่มเติม — คู่มือเทรดจาก SiamCafeBook

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน Ollama Local LLM Consensus Algorithm

Production Pipeline ออกแบบอย่างไร

Ingestion GX Checkpoint Event Store Read Model Dashboard Grafana Alert Slack PagerDuty CI/CD Quality Score Trend Monitoring

สรุป

Great Expectations Data Quality CQRS Event Sourcing Validation Checkpoint Kafka Dashboard Alert CI/CD Quality Score Production

เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ Medusa Commerce CQRS Event Sourcing

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง