SiamCafe.net Blog
Technology

Parquet Format Developer Experience DX

parquet format developer experience dx
Parquet Format Developer Experience DX | SiamCafe Blog
2025-09-20· อ. บอม — SiamCafe.net· 9,467 คำ

Parquet กับ Developer Experience

Developer Experience (DX) ในการทำงานกับข้อมูลขนาดใหญ่ขึ้นอยู่กับ Format ที่เลือกใช้เป็นอย่างมาก Parquet เป็น Columnar Format ที่ออกแบบมาให้ทำงานได้ดีกับ Analytical Workload มี Self-describing Schema ที่ทำให้ไม่ต้องเดา Data Type ของแต่ละ Column รองรับ Schema Evolution สำหรับเพิ่ม Column ใหม่ได้โดยไม่กระทบไฟล์เดิม และมี Column Statistics ที่ช่วยให้ Query Engine ทำ Predicate Pushdown ได้

เมื่อเปรียบเทียบกับ CSV ที่ไม่มี Schema ไม่มี Type Information และต้องอ่านทั้งไฟล์ทุกครั้ง Parquet ช่วยให้ Developer ทำงานได้เร็วขึ้นมากเพราะลด Guesswork และลด I/O ที่ไม่จำเป็น

การตรวจสอบ Parquet Metadata

# ตรวจสอบ Metadata ของ Parquet File โดยไม่ต้องโหลดข้อมูลทั้งหมด
import pyarrow.parquet as pq
import json

def inspect_parquet(file_path):
    """ตรวจสอบ Parquet File Metadata"""
    # อ่าน Metadata (ไม่โหลดข้อมูล)
    parquet_file = pq.ParquetFile(file_path)
    metadata = parquet_file.metadata

    print(f"=== Parquet File: {file_path} ===")
    print(f"Created by: {metadata.created_by}")
    print(f"Format version: {metadata.format_version}")
    print(f"Num rows: {metadata.num_rows:,}")
    print(f"Num columns: {metadata.num_columns}")
    print(f"Num row groups: {metadata.num_row_groups}")
    print(f"Serialized size: {metadata.serialized_size:,} bytes")

    # Schema
    schema = parquet_file.schema_arrow
    print(f"\n--- Schema ({len(schema)} columns) ---")
    for i, field in enumerate(schema):
        print(f"  {i:3d}. {field.name:30s} {str(field.type):20s} nullable={field.nullable}")

    # Row Group Details
    print(f"\n--- Row Groups ---")
    for rg_idx in range(metadata.num_row_groups):
        rg = metadata.row_group(rg_idx)
        print(f"  Row Group {rg_idx}: {rg.num_rows:,} rows, "
              f"{rg.total_byte_size / 1024 / 1024:.1f} MB")

        # Column Statistics (แสดงแค่ 3 column แรก)
        for col_idx in range(min(3, rg.num_columns)):
            col = rg.column(col_idx)
            stats = col.statistics
            if stats and stats.has_min_max:
                print(f"    {col.path_in_schema}: "
                      f"min={stats.min}, max={stats.max}, "
                      f"nulls={stats.num_nulls}, "
                      f"compression={col.compression}")

    # Key-Value Metadata
    kv = metadata.metadata
    if kv:
        print(f"\n--- Key-Value Metadata ---")
        for key in kv:
            val = kv[key].decode("utf-8")[:100]
            print(f"  {key.decode('utf-8')}: {val}")

# inspect_parquet("sales_data.parquet")

# เปรียบเทียบ Schema ของ 2 ไฟล์
def compare_schemas(file1, file2):
    """เปรียบเทียบ Schema ของ Parquet Files"""
    schema1 = pq.read_schema(file1)
    schema2 = pq.read_schema(file2)

    fields1 = {f.name: f.type for f in schema1}
    fields2 = {f.name: f.type for f in schema2}

    added = set(fields2.keys()) - set(fields1.keys())
    removed = set(fields1.keys()) - set(fields2.keys())
    changed = {k for k in fields1.keys() & fields2.keys()
               if fields1[k] != fields2[k]}

    print(f"Schema Diff: {file1} → {file2}")
    for col in added:
        print(f"  + {col}: {fields2[col]}")
    for col in removed:
        print(f"  - {col}: {fields1[col]}")
    for col in changed:
        print(f"  ~ {col}: {fields1[col]} → {fields2[col]}")
    if not added and not removed and not changed:
        print("  (no changes)")

DuckDB สำหรับ Ad-hoc Analysis

# DuckDB — Query Parquet Files โดยตรงไม่ต้อง Import
import duckdb

# เชื่อมต่อ (in-memory)
con = duckdb.connect()

# Query Parquet File โดยตรง
result = con.execute("""
    SELECT
        category,
        COUNT(*) as order_count,
        ROUND(SUM(amount), 2) as total_revenue,
        ROUND(AVG(amount), 2) as avg_order_value,
        ROUND(PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY amount), 2) as median_order
    FROM 'sales_data.parquet'
    WHERE date >= '2026-01-01'
    GROUP BY category
    ORDER BY total_revenue DESC
""").fetchdf()
print(result)

# Query หลายไฟล์พร้อมกัน (Glob Pattern)
result = con.execute("""
    SELECT
        DATE_TRUNC('month', date) as month,
        COUNT(*) as orders,
        SUM(amount) as revenue
    FROM 'data/sales_*.parquet'
    GROUP BY 1
    ORDER BY 1
""").fetchdf()
print(result)

# Query Parquet บน S3 โดยตรง
con.execute("INSTALL httpfs; LOAD httpfs;")
con.execute("""
    SET s3_region = 'ap-southeast-1';
    SET s3_access_key_id = 'YOUR_KEY';
    SET s3_secret_access_key = 'YOUR_SECRET';
""")

result = con.execute("""
    SELECT COUNT(*), SUM(amount)
    FROM 's3://my-bucket/data/sales_2026.parquet'
    WHERE category = 'Electronics'
""").fetchdf()
print(result)

# Export ผลลัพธ์เป็น Parquet
con.execute("""
    COPY (
        SELECT * FROM 'raw_data.parquet'
        WHERE status = 'completed'
    ) TO 'filtered_data.parquet' (FORMAT PARQUET, COMPRESSION ZSTD)
""")

# สร้าง View จาก Parquet สำหรับ Query ซ้ำ
con.execute("""
    CREATE VIEW sales AS SELECT * FROM 'data/sales_*.parquet';
    CREATE VIEW customers AS SELECT * FROM 'data/customers.parquet';
""")

# Join ข้าม Parquet Files
result = con.execute("""
    SELECT
        c.name,
        c.segment,
        COUNT(s.id) as total_orders,
        SUM(s.amount) as total_spent
    FROM sales s
    JOIN customers c ON s.customer_id = c.id
    GROUP BY c.name, c.segment
    HAVING total_spent > 10000
    ORDER BY total_spent DESC
    LIMIT 20
""").fetchdf()
print(result)

Schema Evolution — การจัดการ Schema ที่เปลี่ยนแปลง

# Schema Evolution ด้วย PyArrow
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd

# ข้อมูล Version 1 (Schema เดิม)
df_v1 = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "amount": [100.0, 200.0, 300.0],
})
pq.write_table(pa.Table.from_pandas(df_v1), "data_v1.parquet")

# ข้อมูล Version 2 (เพิ่ม Column ใหม่)
df_v2 = pd.DataFrame({
    "id": [4, 5],
    "name": ["Dave", "Eve"],
    "amount": [400.0, 500.0],
    "category": ["A", "B"],       # Column ใหม่
    "created_at": ["2026-03-01", "2026-03-02"],  # Column ใหม่
})
pq.write_table(pa.Table.from_pandas(df_v2), "data_v2.parquet")

# อ่านทั้ง 2 ไฟล์พร้อมกัน (Schema Merging)
# PyArrow จะ Merge Schema อัตโนมัติ
dataset = pq.ParquetDataset(
    ["data_v1.parquet", "data_v2.parquet"],
    schema=None,  # Auto-merge schemas
)
table = dataset.read()
print(table.to_pandas())
# id  name     amount  category  created_at
# 1   Alice    100.0   None      None        ← ไฟล์เก่าได้ null
# 2   Bob      200.0   None      None
# 3   Charlie  300.0   None      None
# 4   Dave     400.0   A         2026-03-01
# 5   Eve      500.0   B         2026-03-02

# DuckDB ก็รองรับ Schema Evolution
import duckdb
con = duckdb.connect()
result = con.execute("""
    SELECT * FROM read_parquet(['data_v1.parquet', 'data_v2.parquet'],
                                union_by_name=true)
""").fetchdf()
print(result)

# ตรวจสอบ Compatibility ก่อน Merge
def check_schema_compatibility(old_schema, new_schema):
    """ตรวจสอบว่า Schema ใหม่เข้ากันได้กับเก่าหรือไม่"""
    old_fields = {f.name: f.type for f in old_schema}
    new_fields = {f.name: f.type for f in new_schema}

    issues = []
    for name, old_type in old_fields.items():
        if name in new_fields and new_fields[name] != old_type:
            issues.append(f"Type change: {name} {old_type} → {new_fields[name]}")

    removed = set(old_fields) - set(new_fields)
    if removed:
        issues.append(f"Removed columns: {removed}")

    return {"compatible": len(issues) == 0, "issues": issues}

schema_v1 = pq.read_schema("data_v1.parquet")
schema_v2 = pq.read_schema("data_v2.parquet")
result = check_schema_compatibility(schema_v1, schema_v2)
print(f"Compatible: {result['compatible']}")
for issue in result["issues"]:
    print(f"  {issue}")

Integration กับ Data Tools

Best Practices สำหรับ DX ที่ดี

Parquet ช่วยปรับปรุง Developer Experience อย่างไร

Parquet มี Self-describing Schema ไม่ต้องเดา Data Type, มี Metadata อ่านได้โดยไม่ต้องโหลดทั้งไฟล์, รองรับ Schema Evolution เพิ่ม Column ได้, Column Pruning อ่านเฉพาะ Column ที่ต้องการ และ Predicate Pushdown Filter ก่อนอ่านข้อมูล ทำให้ทำงานเร็วขึ้นมาก

Schema Evolution ใน Parquet ทำงานอย่างไร

Parquet รองรับเพิ่ม Column ใหม่ได้โดยไม่ต้องเขียนไฟล์เก่าใหม่ เมื่ออ่านไฟล์เก่าที่ไม่มี Column ใหม่จะได้ null อัตโนมัติ รองรับลบ Column โดย Query Engine ข้ามไป แต่ไม่รองรับเปลี่ยน Data Type ของ Column เดิม ใช้ union_by_name ใน DuckDB หรือ schema merging ใน PyArrow

DuckDB คืออะไรและทำไมถึงเหมาะกับ Parquet

DuckDB เป็น In-process SQL Database สำหรับ Analytical Query อ่าน Parquet โดยตรงไม่ต้อง Import, Query เร็วมากสำหรับไฟล์หลาย GB, ติดตั้งง่าย pip install duckdb, รองรับ S3 Remote Files เหมาะสำหรับ Data Exploration, Ad-hoc Analysis และ ETL Scripts

วิธีตรวจสอบ Metadata ของ Parquet File ทำอย่างไร

ใช้ PyArrow: pq.ParquetFile(path).metadata อ่าน Schema, Row Groups, Column Statistics (min/max/null count), Compression, Row Count โดยไม่โหลดข้อมูลจริง หรือใช้ DuckDB: SELECT * FROM parquet_metadata('file.parquet') และ parquet_schema('file.parquet')

สรุปและแนวทางปฏิบัติ

Parquet เป็น Format ที่ช่วยปรับปรุง Developer Experience ในการทำงานกับ Big Data ได้อย่างมาก ด้วย Self-describing Schema, Schema Evolution, Column Pruning และ Integration กับ Tools ทุกตัวในระบบ Data Stack การใช้ DuckDB สำหรับ Ad-hoc Analysis, PyArrow สำหรับ Metadata Inspection และ Schema Management ที่ดีจะทำให้ทีม Data ทำงานได้เร็วขึ้นและมีข้อผิดพลาดน้อยลง

📖 บทความที่เกี่ยวข้อง

Parquet Format Automation Scriptอ่านบทความ → Parquet Format Hexagonal Architectureอ่านบทความ → Parquet Format Progressive Deliveryอ่านบทความ → Parquet Format Database Migrationอ่านบทความ → Parquet Format Cost Optimization ลดค่าใช้จ่ายอ่านบทความ →

📚 ดูบทความทั้งหมด →