Parquet กับ Developer Experience
Developer Experience (DX) ในการทำงานกับข้อมูลขนาดใหญ่ขึ้นอยู่กับ Format ที่เลือกใช้เป็นอย่างมาก Parquet เป็น Columnar Format ที่ออกแบบมาให้ทำงานได้ดีกับ Analytical Workload มี Self-describing Schema ที่ทำให้ไม่ต้องเดา Data Type ของแต่ละ Column รองรับ Schema Evolution สำหรับเพิ่ม Column ใหม่ได้โดยไม่กระทบไฟล์เดิม และมี Column Statistics ที่ช่วยให้ Query Engine ทำ Predicate Pushdown ได้
เมื่อเปรียบเทียบกับ CSV ที่ไม่มี Schema ไม่มี Type Information และต้องอ่านทั้งไฟล์ทุกครั้ง Parquet ช่วยให้ Developer ทำงานได้เร็วขึ้นมากเพราะลด Guesswork และลด I/O ที่ไม่จำเป็น
การตรวจสอบ Parquet Metadata
# ตรวจสอบ Metadata ของ Parquet File โดยไม่ต้องโหลดข้อมูลทั้งหมด
import pyarrow.parquet as pq
import json
def inspect_parquet(file_path):
"""ตรวจสอบ Parquet File Metadata"""
# อ่าน Metadata (ไม่โหลดข้อมูล)
parquet_file = pq.ParquetFile(file_path)
metadata = parquet_file.metadata
print(f"=== Parquet File: {file_path} ===")
print(f"Created by: {metadata.created_by}")
print(f"Format version: {metadata.format_version}")
print(f"Num rows: {metadata.num_rows:,}")
print(f"Num columns: {metadata.num_columns}")
print(f"Num row groups: {metadata.num_row_groups}")
print(f"Serialized size: {metadata.serialized_size:,} bytes")
# Schema
schema = parquet_file.schema_arrow
print(f"\n--- Schema ({len(schema)} columns) ---")
for i, field in enumerate(schema):
print(f" {i:3d}. {field.name:30s} {str(field.type):20s} nullable={field.nullable}")
# Row Group Details
print(f"\n--- Row Groups ---")
for rg_idx in range(metadata.num_row_groups):
rg = metadata.row_group(rg_idx)
print(f" Row Group {rg_idx}: {rg.num_rows:,} rows, "
f"{rg.total_byte_size / 1024 / 1024:.1f} MB")
# Column Statistics (แสดงแค่ 3 column แรก)
for col_idx in range(min(3, rg.num_columns)):
col = rg.column(col_idx)
stats = col.statistics
if stats and stats.has_min_max:
print(f" {col.path_in_schema}: "
f"min={stats.min}, max={stats.max}, "
f"nulls={stats.num_nulls}, "
f"compression={col.compression}")
# Key-Value Metadata
kv = metadata.metadata
if kv:
print(f"\n--- Key-Value Metadata ---")
for key in kv:
val = kv[key].decode("utf-8")[:100]
print(f" {key.decode('utf-8')}: {val}")
# inspect_parquet("sales_data.parquet")
# เปรียบเทียบ Schema ของ 2 ไฟล์
def compare_schemas(file1, file2):
"""เปรียบเทียบ Schema ของ Parquet Files"""
schema1 = pq.read_schema(file1)
schema2 = pq.read_schema(file2)
fields1 = {f.name: f.type for f in schema1}
fields2 = {f.name: f.type for f in schema2}
added = set(fields2.keys()) - set(fields1.keys())
removed = set(fields1.keys()) - set(fields2.keys())
changed = {k for k in fields1.keys() & fields2.keys()
if fields1[k] != fields2[k]}
print(f"Schema Diff: {file1} → {file2}")
for col in added:
print(f" + {col}: {fields2[col]}")
for col in removed:
print(f" - {col}: {fields1[col]}")
for col in changed:
print(f" ~ {col}: {fields1[col]} → {fields2[col]}")
if not added and not removed and not changed:
print(" (no changes)")
DuckDB สำหรับ Ad-hoc Analysis
# DuckDB — Query Parquet Files โดยตรงไม่ต้อง Import
import duckdb
# เชื่อมต่อ (in-memory)
con = duckdb.connect()
# Query Parquet File โดยตรง
result = con.execute("""
SELECT
category,
COUNT(*) as order_count,
ROUND(SUM(amount), 2) as total_revenue,
ROUND(AVG(amount), 2) as avg_order_value,
ROUND(PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY amount), 2) as median_order
FROM 'sales_data.parquet'
WHERE date >= '2026-01-01'
GROUP BY category
ORDER BY total_revenue DESC
""").fetchdf()
print(result)
# Query หลายไฟล์พร้อมกัน (Glob Pattern)
result = con.execute("""
SELECT
DATE_TRUNC('month', date) as month,
COUNT(*) as orders,
SUM(amount) as revenue
FROM 'data/sales_*.parquet'
GROUP BY 1
ORDER BY 1
""").fetchdf()
print(result)
# Query Parquet บน S3 โดยตรง
con.execute("INSTALL httpfs; LOAD httpfs;")
con.execute("""
SET s3_region = 'ap-southeast-1';
SET s3_access_key_id = 'YOUR_KEY';
SET s3_secret_access_key = 'YOUR_SECRET';
""")
result = con.execute("""
SELECT COUNT(*), SUM(amount)
FROM 's3://my-bucket/data/sales_2026.parquet'
WHERE category = 'Electronics'
""").fetchdf()
print(result)
# Export ผลลัพธ์เป็น Parquet
con.execute("""
COPY (
SELECT * FROM 'raw_data.parquet'
WHERE status = 'completed'
) TO 'filtered_data.parquet' (FORMAT PARQUET, COMPRESSION ZSTD)
""")
# สร้าง View จาก Parquet สำหรับ Query ซ้ำ
con.execute("""
CREATE VIEW sales AS SELECT * FROM 'data/sales_*.parquet';
CREATE VIEW customers AS SELECT * FROM 'data/customers.parquet';
""")
# Join ข้าม Parquet Files
result = con.execute("""
SELECT
c.name,
c.segment,
COUNT(s.id) as total_orders,
SUM(s.amount) as total_spent
FROM sales s
JOIN customers c ON s.customer_id = c.id
GROUP BY c.name, c.segment
HAVING total_spent > 10000
ORDER BY total_spent DESC
LIMIT 20
""").fetchdf()
print(result)
Schema Evolution — การจัดการ Schema ที่เปลี่ยนแปลง
# Schema Evolution ด้วย PyArrow
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
# ข้อมูล Version 1 (Schema เดิม)
df_v1 = pd.DataFrame({
"id": [1, 2, 3],
"name": ["Alice", "Bob", "Charlie"],
"amount": [100.0, 200.0, 300.0],
})
pq.write_table(pa.Table.from_pandas(df_v1), "data_v1.parquet")
# ข้อมูล Version 2 (เพิ่ม Column ใหม่)
df_v2 = pd.DataFrame({
"id": [4, 5],
"name": ["Dave", "Eve"],
"amount": [400.0, 500.0],
"category": ["A", "B"], # Column ใหม่
"created_at": ["2026-03-01", "2026-03-02"], # Column ใหม่
})
pq.write_table(pa.Table.from_pandas(df_v2), "data_v2.parquet")
# อ่านทั้ง 2 ไฟล์พร้อมกัน (Schema Merging)
# PyArrow จะ Merge Schema อัตโนมัติ
dataset = pq.ParquetDataset(
["data_v1.parquet", "data_v2.parquet"],
schema=None, # Auto-merge schemas
)
table = dataset.read()
print(table.to_pandas())
# id name amount category created_at
# 1 Alice 100.0 None None ← ไฟล์เก่าได้ null
# 2 Bob 200.0 None None
# 3 Charlie 300.0 None None
# 4 Dave 400.0 A 2026-03-01
# 5 Eve 500.0 B 2026-03-02
# DuckDB ก็รองรับ Schema Evolution
import duckdb
con = duckdb.connect()
result = con.execute("""
SELECT * FROM read_parquet(['data_v1.parquet', 'data_v2.parquet'],
union_by_name=true)
""").fetchdf()
print(result)
# ตรวจสอบ Compatibility ก่อน Merge
def check_schema_compatibility(old_schema, new_schema):
"""ตรวจสอบว่า Schema ใหม่เข้ากันได้กับเก่าหรือไม่"""
old_fields = {f.name: f.type for f in old_schema}
new_fields = {f.name: f.type for f in new_schema}
issues = []
for name, old_type in old_fields.items():
if name in new_fields and new_fields[name] != old_type:
issues.append(f"Type change: {name} {old_type} → {new_fields[name]}")
removed = set(old_fields) - set(new_fields)
if removed:
issues.append(f"Removed columns: {removed}")
return {"compatible": len(issues) == 0, "issues": issues}
schema_v1 = pq.read_schema("data_v1.parquet")
schema_v2 = pq.read_schema("data_v2.parquet")
result = check_schema_compatibility(schema_v1, schema_v2)
print(f"Compatible: {result['compatible']}")
for issue in result["issues"]:
print(f" {issue}")
Integration กับ Data Tools
- Pandas:
pd.read_parquet()/df.to_parquet()ใช้งานง่ายที่สุด เหมาะกับข้อมูลที่ใส่ใน Memory ได้ - Polars:
pl.read_parquet()เร็วกว่า Pandas 5-10x สำหรับ Parquet Files ขนาดใหญ่ ใช้ Rust Engine - DuckDB: Query Parquet โดยตรงด้วย SQL ไม่ต้อง Import เหมาะกับ Ad-hoc Analysis
- Apache Spark:
spark.read.parquet()สำหรับข้อมูลขนาด TB ที่ต้อง Distributed Processing - AWS Athena: Query Parquet บน S3 ด้วย SQL จ่ายตามข้อมูลที่ Scan
- Google BigQuery: Load Parquet เข้า BigQuery หรือ Query External Table บน GCS
- dbt: ใช้ Parquet เป็น Source สำหรับ Data Transformation
- Great Expectations: Validate Parquet Data Quality ก่อนใช้งาน
Best Practices สำหรับ DX ที่ดี
- ใช้ Schema Registry: เก็บ Schema Versions ไว้ใน Central Registry เพื่อ Track Changes
- ตั้ง Naming Convention: ตั้งชื่อไฟล์ให้สื่อความหมาย เช่น sales_2026_q1.parquet, users_snapshot_20260301.parquet
- Document Schema: เขียน Description สำหรับแต่ละ Column ใน Parquet Metadata
- ใช้ Data Contracts: กำหนด Schema ที่ Producer และ Consumer ตกลงกัน ตรวจสอบก่อน Write
- Automate Validation: ใช้ Great Expectations หรือ Custom Scripts ตรวจ Data Quality หลัง Write
Parquet ช่วยปรับปรุง Developer Experience อย่างไร
Parquet มี Self-describing Schema ไม่ต้องเดา Data Type, มี Metadata อ่านได้โดยไม่ต้องโหลดทั้งไฟล์, รองรับ Schema Evolution เพิ่ม Column ได้, Column Pruning อ่านเฉพาะ Column ที่ต้องการ และ Predicate Pushdown Filter ก่อนอ่านข้อมูล ทำให้ทำงานเร็วขึ้นมาก
Schema Evolution ใน Parquet ทำงานอย่างไร
Parquet รองรับเพิ่ม Column ใหม่ได้โดยไม่ต้องเขียนไฟล์เก่าใหม่ เมื่ออ่านไฟล์เก่าที่ไม่มี Column ใหม่จะได้ null อัตโนมัติ รองรับลบ Column โดย Query Engine ข้ามไป แต่ไม่รองรับเปลี่ยน Data Type ของ Column เดิม ใช้ union_by_name ใน DuckDB หรือ schema merging ใน PyArrow
DuckDB คืออะไรและทำไมถึงเหมาะกับ Parquet
DuckDB เป็น In-process SQL Database สำหรับ Analytical Query อ่าน Parquet โดยตรงไม่ต้อง Import, Query เร็วมากสำหรับไฟล์หลาย GB, ติดตั้งง่าย pip install duckdb, รองรับ S3 Remote Files เหมาะสำหรับ Data Exploration, Ad-hoc Analysis และ ETL Scripts
วิธีตรวจสอบ Metadata ของ Parquet File ทำอย่างไร
ใช้ PyArrow: pq.ParquetFile(path).metadata อ่าน Schema, Row Groups, Column Statistics (min/max/null count), Compression, Row Count โดยไม่โหลดข้อมูลจริง หรือใช้ DuckDB: SELECT * FROM parquet_metadata('file.parquet') และ parquet_schema('file.parquet')
สรุปและแนวทางปฏิบัติ
Parquet เป็น Format ที่ช่วยปรับปรุง Developer Experience ในการทำงานกับ Big Data ได้อย่างมาก ด้วย Self-describing Schema, Schema Evolution, Column Pruning และ Integration กับ Tools ทุกตัวในระบบ Data Stack การใช้ DuckDB สำหรับ Ad-hoc Analysis, PyArrow สำหรับ Metadata Inspection และ Schema Management ที่ดีจะทำให้ทีม Data ทำงานได้เร็วขึ้นและมีข้อผิดพลาดน้อยลง
