Great Expectations CQRS Event Sourcing
Notes on applying the CQRS and Event Sourcing patterns to data quality with Great Expectations: expectation suites and checkpoints on the write path, Data Docs and dashboards on the read path, and validation events streamed to Kafka for an audit trail in production.
| Pattern | Purpose | GX Component | Benefit |
|---|---|---|---|
| Command (Write) | Validate + Write Data | Checkpoint + Expectation Suite | Prevent Bad Data |
| Query (Read) | Quality Dashboard | Data Docs + Custom API | Visibility Reporting |
| Event Sourcing | Audit Trail History | Validation Results → Kafka | Time Travel Compliance |
GX Validation Pipeline
# === Great Expectations Validation ===
# import great_expectations as gx
#
# context = gx.get_context()
#
# # Data Source
# datasource = context.sources.add_pandas("my_source")
# data_asset = datasource.add_dataframe_asset("orders")
#
# # Expectation Suite
# suite = context.add_expectation_suite("orders_quality")
# suite.add_expectation(
# gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
# )
# suite.add_expectation(
# gx.expectations.ExpectColumnValuesToBeBetween(
# column="amount", min_value=0, max_value=1000000
# )
# )
# suite.add_expectation(
# gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
# )
# suite.add_expectation(
# gx.expectations.ExpectTableRowCountToBeGreaterThan(value=0)
# )
#
# # Checkpoint
# checkpoint = context.add_checkpoint(
# name="orders_checkpoint",
# validations=[{
# "batch_request": batch_request,  # NOTE: not defined above — build it first, e.g. batch_request = data_asset.build_batch_request()
# "expectation_suite_name": "orders_quality",
# }],
# action_list=[
# {"name": "store_result", "action": {"class_name": "StoreValidationResultAction"}},
# {"name": "update_docs", "action": {"class_name": "UpdateDataDocsAction"}},
# ]
# )
# result = checkpoint.run()
from dataclasses import dataclass


@dataclass
class Expectation:
    """A single Great Expectations rule: what it checks, where, and how strictly.

    Attributes:
        expectation: GX expectation name (e.g. ``expect_column_values_to_be_unique``).
        column: Target column(s), or ``"(table level)"`` for table-wide rules.
        parameters: Key parameters, written as free text.
        severity: Rule importance — Critical / High / Medium.
        example: Short human-readable description of the rule.
    """

    expectation: str
    column: str
    parameters: str
    severity: str
    example: str


# Catalog of the core expectations used in the orders pipeline.
expectations = [
    Expectation(
        expectation="expect_column_values_to_not_be_null",
        column="order_id, customer_id",
        parameters="mostly=0.99 (99% not null OK)",
        severity="Critical",
        example="order_id ต้องไม่ Null ทุก Row",
    ),
    Expectation(
        expectation="expect_column_values_to_be_between",
        column="amount, quantity",
        parameters="min_value=0 max_value=1000000",
        severity="High",
        example="amount ต้องอยู่ระหว่าง 0-1,000,000",
    ),
    Expectation(
        expectation="expect_column_values_to_be_unique",
        column="order_id",
        parameters="mostly=1.0 (100% unique)",
        severity="Critical",
        example="order_id ต้อง Unique ไม่ซ้ำ",
    ),
    Expectation(
        expectation="expect_table_row_count_to_be_between",
        column="(table level)",
        parameters="min_value=1000 max_value=10000000",
        severity="High",
        example="Table ต้องมี 1K-10M rows",
    ),
    Expectation(
        expectation="expect_column_values_to_match_regex",
        column="email",
        parameters="regex='^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$'",
        severity="Medium",
        example="email ต้องตรง Format",
    ),
    Expectation(
        expectation="expect_column_distinct_values_to_be_in_set",
        column="status",
        parameters="value_set=['pending','completed','cancelled']",
        severity="High",
        example="status ต้องเป็นค่าที่กำหนด",
    ),
]

# Print a plain-text summary of every rule in the catalog.
print("=== GX Expectations ===")
for rule in expectations:
    print(f" [{rule.expectation}]")
    print(f" Column: {rule.column} | Severity: {rule.severity}")
    print(f" Params: {rule.parameters}")
    print(f" Example: {rule.example}")
CQRS + Event Sourcing
# === CQRS Event Sourcing for Data Quality ===
# Event Publisher (after GX Validation)
# import json
# from kafka import KafkaProducer
#
# producer = KafkaProducer(
# bootstrap_servers='kafka:9092',
# value_serializer=lambda v: json.dumps(v).encode()
# )
#
# def publish_validation_event(result):
# event = {
# "event_type": "ValidationPassed" if result.success else "ValidationFailed",
# "timestamp": datetime.now(timezone.utc).isoformat(),  # timezone-aware; datetime.utcnow() is deprecated
# "suite_name": result.suite_name,
# "table": result.asset_name,
# "success": result.success,
# "statistics": {
# "evaluated": result.statistics["evaluated_expectations"],
# "successful": result.statistics["successful_expectations"],
# "failed": result.statistics["unsuccessful_expectations"],
# "success_percent": result.statistics["success_percent"],
# },
# "failed_expectations": [
# {"expectation": r.expectation_config.expectation_type,
# "column": r.expectation_config.kwargs.get("column")}
# for r in result.results if not r.success
# ]
# }
# producer.send("data-quality-events", value=event)
@dataclass
class QualityEvent:
    """A data-quality event type in the event-sourced audit trail.

    Attributes:
        event_type: Event name (e.g. ``ValidationPassed``).
        trigger: Condition that causes the event to be emitted.
        payload: Fields carried by the event, as free text.
        read_model_update: How the read model / dashboard reacts.
    """

    event_type: str
    trigger: str
    payload: str
    read_model_update: str


# Catalog of event types published after each validation run.
events = [
    QualityEvent(
        event_type="ValidationPassed",
        trigger="GX Checkpoint ผ่านทุก Expectation",
        payload="suite table stats timestamp",
        read_model_update="เพิ่ม Quality Score อัพเดท Dashboard",
    ),
    QualityEvent(
        event_type="ValidationFailed",
        trigger="GX Checkpoint ไม่ผ่านบาง Expectation",
        payload="suite table failed_expectations details",
        read_model_update="ลด Quality Score Alert Team",
    ),
    QualityEvent(
        event_type="ExpectationAdded",
        trigger="เพิ่ม Expectation ใหม่ใน Suite",
        payload="suite expectation_type column params",
        read_model_update="อัพเดท Expectation Count History",
    ),
    QualityEvent(
        event_type="ThresholdBreached",
        trigger="Quality Score < Threshold (เช่น 95%)",
        payload="suite table score threshold",
        read_model_update="Alert P1 Block Pipeline",
    ),
    QualityEvent(
        event_type="DataDriftDetected",
        trigger="Schema หรือ Distribution เปลี่ยน",
        payload="table column old_distribution new_distribution",
        read_model_update="Alert Data Team Investigate",
    ),
]

# Print a plain-text summary of every event type.
print("=== Data Quality Events ===")
for evt in events:
    print(f" [{evt.event_type}] Trigger: {evt.trigger}")
    print(f" Payload: {evt.payload}")
    print(f" Read Model: {evt.read_model_update}")
Dashboard & Monitoring
# === Quality Dashboard ===
@dataclass
class DashboardMetric:
    """One metric on the data-quality dashboard.

    Attributes:
        metric: Metric name shown on the dashboard.
        source: Where the metric's data comes from.
        visualization: How the metric is rendered.
        alert: Alerting rule tied to the metric.
    """

    metric: str
    source: str
    visualization: str
    alert: str


# Catalog of dashboard panels built from the quality event store.
dashboard = [
    DashboardMetric(
        metric="Quality Score per Table",
        source="Event Store (success_percent per run)",
        visualization="Line Chart trend over time",
        alert="Score < 95% → P2 Alert",
    ),
    DashboardMetric(
        metric="Failed Expectations",
        source="Event Store (failed_expectations list)",
        visualization="Bar Chart top failed expectations",
        alert="Same failure 3x → P1 Alert",
    ),
    DashboardMetric(
        metric="Validation Run History",
        source="Event Store (all validation events)",
        visualization="Table latest 50 runs with status",
        alert="No run for 2x schedule → Alert",
    ),
    DashboardMetric(
        metric="Data Drift Score",
        source="Event Store (distribution changes)",
        visualization="Heatmap drift per column per day",
        alert="Drift > 20% → P2 Alert",
    ),
    DashboardMetric(
        metric="Coverage",
        source="Expectation count per table",
        visualization="Progress bar expectations vs columns",
        alert="Coverage < 50% → Reminder",
    ),
]

# Print a plain-text summary of every dashboard metric.
print("=== Dashboard Metrics ===")
for panel in dashboard:
    print(f" [{panel.metric}]")
    print(f" Source: {panel.source}")
    print(f" Viz: {panel.visualization}")
    print(f" Alert: {panel.alert}")
เคล็ดลับ
- Checkpoint: รัน GX Checkpoint ทุก Ingestion ก่อน Write
- Kafka: ส่ง Validation Event ไป Kafka สำหรับ Audit Trail
- mostly: ใช้ mostly=0.99 สำหรับ Expectation ที่ยอม Error เล็กน้อย
- CI/CD: รัน GX ใน CI Pipeline ทุก PR ที่แก้ Transform
- Data Docs: เปิด Data Docs HTML Report สำหรับ Team Review
การบริหารจัดการฐานข้อมูลอย่างมืออาชีพ
Database Management ที่ดีเริ่มจากการออกแบบ Schema ที่เหมาะสม ใช้ Normalization ลด Data Redundancy สร้าง Index บน Column ที่ Query บ่อย วิเคราะห์ Query Plan เพื่อ Optimize Performance และทำ Regular Maintenance เช่น VACUUM สำหรับ PostgreSQL หรือ OPTIMIZE TABLE สำหรับ MySQL
เรื่อง High Availability ควรติดตั้ง Replication อย่างน้อย 1 Replica สำหรับ Read Scaling และ Disaster Recovery ใช้ Connection Pooling เช่น PgBouncer หรือ ProxySQL ลดภาระ Connection ที่เปิดพร้อมกัน และตั้ง Automated Failover ให้ระบบสลับไป Replica อัตโนมัติเมื่อ Primary ล่ม
Backup ต้องทำทั้ง Full Backup รายวัน และ Incremental Backup ทุก 1-4 ชั่วโมง เก็บ Binary Log หรือ WAL สำหรับ Point-in-Time Recovery ทดสอบ Restore เป็นประจำ และเก็บ Backup ไว้ Off-site ด้วยเสมอ
Great Expectations คืออะไร
Great Expectations is an open-source Python library for data quality: you declare Expectations, validate batches with Checkpoints, and publish results as Data Docs. It works with Pandas, Spark, and SQL data sources, and integrates into CI/CD pipelines and orchestrators such as Airflow, Prefect, and Dagster.
CQRS สำหรับ Data Quality ทำอย่างไร
Command Write Validate GX Checkpoint Query Read Dashboard Data Docs แยก Write Read Scale Optimize Audit Trail Event
Event Sourcing ทำอย่างไร
ValidationPassed Failed ExpectationAdded ThresholdBreached DriftDetected Kafka PostgreSQL Replay Time Travel Audit Compliance
Production Pipeline ออกแบบอย่างไร
Ingestion GX Checkpoint Event Store Read Model Dashboard Grafana Alert Slack PagerDuty CI/CD Quality Score Trend Monitoring
สรุป
Great Expectations Data Quality CQRS Event Sourcing Validation Checkpoint Kafka Dashboard Alert CI/CD Quality Score Production
