
Parquet Format Progressive Delivery — Managing Data Pipelines with Apache Parquet

2026-04-14 · Ajarn Bom — SiamCafe.net · 1,371 words

What Is Apache Parquet and Why It Matters

Apache Parquet is a columnar storage format designed for big data processing. It stores data column-oriented rather than row-oriented, so queries that select only a few columns are much faster: only the required columns are read, not entire rows.

Parquet's advantages over CSV include: 5-10x better compression (Snappy, GZIP, ZSTD); 10-100x faster analytical queries thanks to the columnar layout; schema enforcement with type safety; predicate pushdown that skips unneeded data; and encoding schemes (dictionary, RLE, delta) that shrink file size.

Progressive Delivery is the practice of rolling out data or features to users gradually rather than releasing everything at once, using feature flags, canary releases, and A/B testing. For data pipelines, it means deploying data schema changes, new data sources, or processing logic as a gradual rollout.
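The gradual-rollout idea can be sketched with deterministic hash bucketing: each consumer hashes to a stable bucket in 0-99, and a stage's percentage decides who gets the new version. (A minimal stdlib sketch; the function names and the salt are illustrative, not from any specific library.)

```python
# Sketch: deterministic percentage rollout via hash bucketing.
# A consumer's bucket is stable across runs, so raising the
# percentage only ever adds consumers, never flips them back.
import hashlib

def rollout_bucket(consumer_id: str, salt: str = "sales_data_v2") -> int:
    """Map a consumer id to a stable bucket in [0, 100)."""
    digest = hashlib.sha256(f"{salt}:{consumer_id}".encode()).hexdigest()
    return int(digest, 16) % 100

def sees_new_version(consumer_id: str, percentage: int) -> bool:
    """True if this consumer falls inside the rollout percentage."""
    return rollout_bucket(consumer_id) < percentage

consumers = [f"consumer_{i}" for i in range(1000)]
for pct in [5, 25, 75, 100]:  # canary -> early adopters -> majority -> full
    enabled = sum(sees_new_version(c, pct) for c in consumers)
    print(f"{pct:3d}% stage -> {enabled} of {len(consumers)} consumers")
```

Because the bucket is derived from the consumer id, the same consumers stay in the canary group as the rollout widens.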

Combining the Parquet format with Progressive Delivery lets you version data schemas, roll back data changes, test new data formats with a subset of consumers, and monitor data quality before a full release.

Parquet File Format Structure

Understanding the internal structure of Parquet

# === Parquet File Structure ===
#
# ┌─────────────────────────────────┐
# │         Magic Number            │  "PAR1" (4 bytes)
# ├─────────────────────────────────┤
# │         Row Group 1             │
# │  ┌──────────────────────────┐  │
# │  │  Column Chunk: col_A     │  │
# │  │  ┌─ Page 1 (data page) ┐│  │
# │  │  │  Header + Data       ││  │
# │  │  └─────────────────────┘│  │
# │  │  ┌─ Page 2 (data page) ┐│  │
# │  │  │  Header + Data       ││  │
# │  │  └─────────────────────┘│  │
# │  └──────────────────────────┘  │
# │  ┌──────────────────────────┐  │
# │  │  Column Chunk: col_B     │  │
# │  │  ┌─ Page 1             ┐│  │
# │  │  │  Header + Data       ││  │
# │  │  └─────────────────────┘│  │
# │  └──────────────────────────┘  │
# ├─────────────────────────────────┤
# │         Row Group 2             │
# │  (same structure)               │
# ├─────────────────────────────────┤
# │         Footer                  │
# │  ┌──────────────────────────┐  │
# │  │  File Metadata           │  │
# │  │  - Schema                │  │
# │  │  - Row Group metadata    │  │
# │  │  - Column statistics     │  │
# │  │  - Key-Value metadata    │  │
# │  └──────────────────────────┘  │
# │  Footer Length (4 bytes)        │
# │  Magic Number "PAR1"           │
# └─────────────────────────────────┘
#
# === Encoding Types ===
# PLAIN: raw values
# DICTIONARY: dictionary + indices (good for low cardinality)
# RLE: Run Length Encoding (good for repeated values)
# DELTA_BINARY_PACKED: delta encoding for integers
# DELTA_LENGTH_BYTE_ARRAY: delta encoding for strings
# DELTA_BYTE_ARRAY: prefix + suffix encoding
#
# === Compression Codecs ===
# UNCOMPRESSED: no compression
# SNAPPY: fast compression/decompression (default)
# GZIP: better ratio, slower
# LZ4: very fast, moderate ratio
# ZSTD: best ratio with good speed (recommended)
# BROTLI: high ratio, slow compression
#
# === Data Types ===
# BOOLEAN, INT32, INT64, INT96 (deprecated)
# FLOAT, DOUBLE
# BYTE_ARRAY (strings, binary)
# FIXED_LEN_BYTE_ARRAY
# Logical types: STRING, DATE, TIMESTAMP, DECIMAL, UUID, LIST, MAP
#
# === Key Concepts ===
# Row Group: horizontal partition (default 128MB)
# Column Chunk: one column in a row group
# Page: smallest unit of I/O (default 1MB)
# Statistics: min/max/null_count per column chunk
# Predicate Pushdown: skip row groups based on statistics

Using Parquet with Python and Spark

Reading and writing Parquet files with various tools

#!/usr/bin/env python3
# parquet_operations.py — Parquet File Operations
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import numpy as np
from pathlib import Path
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("parquet_ops")

# === PyArrow (recommended) ===

# Create a Parquet file
def create_parquet_pyarrow():
    # Define schema
    schema = pa.schema([
        pa.field("id", pa.int64()),
        pa.field("name", pa.string()),
        pa.field("amount", pa.float64()),
        pa.field("category", pa.dictionary(pa.int32(), pa.string())),
        pa.field("created_at", pa.timestamp("ms")),
        pa.field("tags", pa.list_(pa.string())),
        pa.field("metadata", pa.map_(pa.string(), pa.string())),
    ])
    
    # Create data
    table = pa.table({
        "id": range(1000000),
        "name": [f"item_{i}" for i in range(1000000)],
        "amount": np.random.uniform(10.0, 1000.0, 1000000),
        "category": np.random.choice(["A", "B", "C", "D"], 1000000),
        "created_at": pd.date_range("2024-01-01", periods=1000000, freq="s"),
        "tags": [["tag1", "tag2"] for _ in range(1000000)],
        "metadata": [{"key": "value"} for _ in range(1000000)],
    }, schema=schema)
    
    # Write with options (ensure the output directory exists first)
    Path("data").mkdir(parents=True, exist_ok=True)
    pq.write_table(
        table,
        "data/output.parquet",
        compression="zstd",
        compression_level=3,
        row_group_size=1000000,
        use_dictionary=True,
        write_statistics=True,
        version="2.6",
    )
    
    logger.info(f"Written {len(table)} rows")

# Read a Parquet file
def read_parquet_pyarrow():
    # Read full file
    table = pq.read_table("data/output.parquet")
    print(f"Rows: {table.num_rows}, Columns: {table.num_columns}")
    print(f"Schema: {table.schema}")
    
    # Read specific columns (columnar advantage)
    table = pq.read_table("data/output.parquet", columns=["id", "amount"])
    
    # Read with filter (predicate pushdown)
    table = pq.read_table(
        "data/output.parquet",
        filters=[
            ("amount", ">", 500.0),
            ("category", "==", "A"),
        ],
    )
    print(f"Filtered rows: {table.num_rows}")
    
    # Read metadata only
    parquet_file = pq.ParquetFile("data/output.parquet")
    metadata = parquet_file.metadata
    print(f"Row groups: {metadata.num_row_groups}")
    print(f"Total rows: {metadata.num_rows}")
    
    for i in range(metadata.num_row_groups):
        rg = metadata.row_group(i)
        print(f"  Row group {i}: {rg.num_rows} rows")
        for j in range(rg.num_columns):
            col = rg.column(j)
            print(f"    {col.path_in_schema}: min={col.statistics.min}, max={col.statistics.max}")

# === Pandas Integration ===
def pandas_parquet():
    df = pd.DataFrame({
        "id": range(100000),
        "value": np.random.randn(100000),
        "category": np.random.choice(["X", "Y", "Z"], 100000),
    })
    
    # Write
    df.to_parquet("data/pandas_output.parquet", engine="pyarrow", compression="zstd")
    
    # Read
    df = pd.read_parquet("data/pandas_output.parquet")
    
    # Read specific columns
    df = pd.read_parquet("data/pandas_output.parquet", columns=["id", "value"])
    
    # Read with filter
    df = pd.read_parquet(
        "data/pandas_output.parquet",
        filters=[("category", "==", "X")],
    )

# === Partitioned Dataset ===
def partitioned_dataset():
    table = pa.table({
        "date": pd.date_range("2024-01-01", periods=1000, freq="D").repeat(100),
        "region": np.random.choice(["us", "eu", "asia"], 100000),
        "value": np.random.randn(100000),
    })
    
    # Write partitioned
    pq.write_to_dataset(
        table,
        root_path="data/partitioned/",
        partition_cols=["region"],
        compression="zstd",
    )
    
    # Read partition
    ds = pq.read_table("data/partitioned/", filters=[("region", "==", "us")])
    print(f"US data: {ds.num_rows} rows")

create_parquet_pyarrow()

Progressive Delivery for Data Pipelines

A progressive delivery system for data schema changes

#!/usr/bin/env python3
# progressive_data_delivery.py — Progressive Delivery for Data
import pyarrow as pa
import pyarrow.parquet as pq
import json
import hashlib
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("progressive_delivery")

class SchemaRegistry:
    def __init__(self, registry_path="schema_registry"):
        self.path = Path(registry_path)
        self.path.mkdir(parents=True, exist_ok=True)
    
    def register_schema(self, name, schema: pa.Schema, version=None):
        if version is None:
            existing = list(self.path.glob(f"{name}_v*.json"))
            version = len(existing) + 1
        
        schema_dict = {
            "name": name,
            "version": version,
            "fields": [
                {"name": f.name, "type": str(f.type), "nullable": f.nullable}
                for f in schema
            ],
            "registered_at": datetime.utcnow().isoformat(),
            "checksum": hashlib.md5(schema.to_string().encode()).hexdigest(),
        }
        
        filepath = self.path / f"{name}_v{version}.json"
        filepath.write_text(json.dumps(schema_dict, indent=2))
        
        logger.info(f"Registered schema: {name} v{version}")
        return version
    
    def get_schema(self, name, version=None):
        if version:
            filepath = self.path / f"{name}_v{version}.json"
        else:
            files = sorted(self.path.glob(f"{name}_v*.json"))
            if not files:
                return None
            filepath = files[-1]
        
        return json.loads(filepath.read_text())
    
    def check_compatibility(self, name, new_schema: pa.Schema):
        current = self.get_schema(name)
        if not current:
            return {"compatible": True, "changes": ["new_schema"]}
        
        current_fields = {f["name"]: f for f in current["fields"]}
        new_fields = {f.name: f for f in new_schema}
        
        changes = []
        breaking = False
        
        # Check removed fields
        for field_name in current_fields:
            if field_name not in new_fields:
                changes.append(f"REMOVED: {field_name}")
                breaking = True
        
        # Check added fields
        for field_name, field in new_fields.items():
            if field_name not in current_fields:
                if not field.nullable:
                    changes.append(f"ADDED (non-nullable): {field_name} — BREAKING")
                    breaking = True
                else:
                    changes.append(f"ADDED (nullable): {field_name}")
        
        # Check type changes
        for field_name in current_fields:
            if field_name in new_fields:
                old_type = current_fields[field_name]["type"]
                new_type = str(new_fields[field_name].type)
                if old_type != new_type:
                    changes.append(f"TYPE CHANGED: {field_name} ({old_type} -> {new_type})")
                    breaking = True
        
        return {
            "compatible": not breaking,
            "breaking": breaking,
            "changes": changes,
        }

class ProgressiveDataDelivery:
    def __init__(self, base_path="data/delivery"):
        self.base_path = Path(base_path)
        self.registry = SchemaRegistry()
        self.rollout_config = {}
    
    def create_delivery(self, name, schema, data_table, rollout_stages=None):
        if rollout_stages is None:
            rollout_stages = [
                {"name": "canary", "percentage": 5, "duration_hours": 2},
                {"name": "early_adopters", "percentage": 25, "duration_hours": 4},
                {"name": "majority", "percentage": 75, "duration_hours": 8},
                {"name": "full", "percentage": 100, "duration_hours": 0},
            ]
        
        # Check compatibility
        compat = self.registry.check_compatibility(name, schema)
        if compat["breaking"]:
            logger.warning(f"Breaking changes detected: {compat['changes']}")
            logger.warning("Progressive delivery recommended")
        
        # Register new schema version
        version = self.registry.register_schema(name, schema)
        
        # Write data
        output_dir = self.base_path / name / f"v{version}"
        output_dir.mkdir(parents=True, exist_ok=True)
        
        pq.write_table(
            data_table,
            output_dir / "data.parquet",
            compression="zstd",
        )
        
        # Save delivery config
        delivery_config = {
            "name": name,
            "version": version,
            "schema_compatible": compat["compatible"],
            "changes": compat["changes"],
            "rollout_stages": rollout_stages,
            "current_stage": 0,
            "status": "created",
            "created_at": datetime.utcnow().isoformat(),
        }
        
        (output_dir / "delivery.json").write_text(json.dumps(delivery_config, indent=2))
        
        logger.info(f"Delivery created: {name} v{version}")
        return delivery_config
    
    def advance_stage(self, name, version):
        config_path = self.base_path / name / f"v{version}" / "delivery.json"
        config = json.loads(config_path.read_text())
        
        current = config["current_stage"]
        stages = config["rollout_stages"]
        
        if current >= len(stages) - 1:
            config["status"] = "complete"
            logger.info(f"{name} v{version}: fully rolled out")
        else:
            config["current_stage"] = current + 1
            stage = stages[config["current_stage"]]
            config["status"] = f"rolling_out_{stage['name']}"
            logger.info(f"{name} v{version}: advancing to {stage['name']} ({stage['percentage']}%)")
        
        config_path.write_text(json.dumps(config, indent=2))
        return config
    
    def rollback(self, name, version):
        config_path = self.base_path / name / f"v{version}" / "delivery.json"
        config = json.loads(config_path.read_text())
        
        config["status"] = "rolled_back"
        config["rolled_back_at"] = datetime.utcnow().isoformat()
        config_path.write_text(json.dumps(config, indent=2))
        
        logger.info(f"Rolled back: {name} v{version}")
        return config

# delivery = ProgressiveDataDelivery()
# schema = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.float64())])
# table = pa.table({"id": [1,2,3], "value": [1.0, 2.0, 3.0]})
# delivery.create_delivery("sales_data", schema, table)

Building a Data Delivery Pipeline with Parquet

A pipeline for delivering data in Parquet format

#!/usr/bin/env python3
# data_pipeline.py — Parquet Data Delivery Pipeline
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import datetime, timedelta
import json
import logging
import hashlib

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("data_pipeline")

class ParquetDataPipeline:
    def __init__(self, data_lake_path="data_lake"):
        self.lake_path = Path(data_lake_path)
        self.lake_path.mkdir(parents=True, exist_ok=True)
    
    def ingest(self, source_name, data, partition_by=None):
        timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
        
        if isinstance(data, pd.DataFrame):
            table = pa.Table.from_pandas(data)
        elif isinstance(data, pa.Table):
            table = data
        else:
            raise ValueError("Data must be DataFrame or Arrow Table")
        
        output_path = self.lake_path / "raw" / source_name
        
        if partition_by:
            pq.write_to_dataset(
                table,
                root_path=str(output_path),
                partition_cols=partition_by,
                compression="zstd",
                existing_data_behavior="overwrite_or_ignore",
            )
        else:
            output_path.mkdir(parents=True, exist_ok=True)
            pq.write_table(
                table,
                output_path / f"{timestamp}.parquet",
                compression="zstd",
            )
        
        # Write metadata
        meta = {
            "source": source_name,
            "rows": table.num_rows,
            "columns": table.num_columns,
            "schema": str(table.schema),
            "ingested_at": datetime.utcnow().isoformat(),
            "checksum": hashlib.md5(
                table.to_pandas().to_json().encode()
            ).hexdigest()[:16],
        }
        
        meta_path = output_path / f"_metadata_{timestamp}.json"
        meta_path.write_text(json.dumps(meta, indent=2))
        
        logger.info(f"Ingested {table.num_rows} rows from {source_name}")
        return meta
    
    def transform(self, source_name, output_name, transform_fn):
        source_path = self.lake_path / "raw" / source_name
        dataset = ds.dataset(str(source_path), format="parquet")
        table = dataset.to_table()
        
        df = table.to_pandas()
        df_transformed = transform_fn(df)
        
        output_path = self.lake_path / "processed" / output_name
        output_path.mkdir(parents=True, exist_ok=True)
        
        result_table = pa.Table.from_pandas(df_transformed)
        pq.write_table(
            result_table,
            output_path / f"{datetime.utcnow().strftime('%Y%m%d')}.parquet",
            compression="zstd",
        )
        
        logger.info(f"Transformed {source_name} -> {output_name}: {len(df_transformed)} rows")
        return len(df_transformed)
    
    def validate(self, dataset_name, layer="processed"):
        data_path = self.lake_path / layer / dataset_name
        dataset = ds.dataset(str(data_path), format="parquet")
        table = dataset.to_table()
        df = table.to_pandas()
        
        checks = {
            "row_count": len(df),
            "null_counts": df.isnull().sum().to_dict(),
            "duplicate_count": df.duplicated().sum(),
            "schema_valid": True,
        }
        
        # Check for nulls in required columns
        issues = []
        total_nulls = df.isnull().sum().sum()
        if total_nulls > 0:
            issues.append(f"Found {total_nulls} null values")
        
        if checks["duplicate_count"] > 0:
            issues.append(f"Found {checks['duplicate_count']} duplicates")
        
        checks["passed"] = len(issues) == 0
        checks["issues"] = issues
        
        logger.info(f"Validation {dataset_name}: {'PASS' if checks['passed'] else 'FAIL'}")
        return checks
    
    def deliver(self, dataset_name, destination, format="parquet"):
        data_path = self.lake_path / "processed" / dataset_name
        dataset = ds.dataset(str(data_path), format="parquet")
        table = dataset.to_table()
        
        dest_path = Path(destination)
        dest_path.mkdir(parents=True, exist_ok=True)
        
        if format == "parquet":
            pq.write_table(table, dest_path / f"{dataset_name}.parquet", compression="zstd")
        elif format == "csv":
            table.to_pandas().to_csv(dest_path / f"{dataset_name}.csv", index=False)
        
        logger.info(f"Delivered {dataset_name} to {destination} ({format})")
        return table.num_rows
    
    def catalog(self):
        catalog = {"datasets": [], "generated_at": datetime.utcnow().isoformat()}
        
        for layer in ["raw", "processed"]:
            layer_path = self.lake_path / layer
            if not layer_path.exists():
                continue
            
            for dataset_dir in layer_path.iterdir():
                if dataset_dir.is_dir():
                    parquet_files = list(dataset_dir.glob("**/*.parquet"))
                    total_size = sum(f.stat().st_size for f in parquet_files)
                    
                    catalog["datasets"].append({
                        "name": dataset_dir.name,
                        "layer": layer,
                        "files": len(parquet_files),
                        "total_size_mb": round(total_size / 1024 / 1024, 2),
                    })
        
        return catalog

# pipeline = ParquetDataPipeline()
# df = pd.DataFrame({"id": range(1000), "value": np.random.randn(1000)})
# pipeline.ingest("sales", df)
# pipeline.transform("sales", "sales_clean", lambda df: df.dropna())
# pipeline.validate("sales_clean")
# pipeline.deliver("sales_clean", "output/")

Performance Optimization and Best Practices

Techniques for optimizing Parquet performance

#!/usr/bin/env python3
# parquet_optimization.py — Parquet Performance Best Practices
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import numpy as np
import time
import os

# === 1. Row Group Size ===
# Default is 128MB; tune it per use case
# Large row groups: better compression, slower random access
# Small row groups: faster filtering, more metadata overhead

def optimal_row_group_size():
    data = pa.table({"id": range(10_000_000), "val": np.random.randn(10_000_000)})
    
    for rg_size in [100_000, 500_000, 1_000_000]:
        start = time.time()
        pq.write_table(data, f"/tmp/rg_{rg_size}.parquet", row_group_size=rg_size)
        write_time = time.time() - start
        
        file_size = os.path.getsize(f"/tmp/rg_{rg_size}.parquet") / 1024 / 1024
        
        start = time.time()
        pq.read_table(f"/tmp/rg_{rg_size}.parquet", filters=[("id", "<", 1000)])
        read_time = time.time() - start
        
        print(f"RG={rg_size}: size={file_size:.1f}MB, write={write_time:.2f}s, read={read_time:.3f}s")

# === 2. Compression Comparison ===
def compression_benchmark():
    data = pa.table({
        "id": range(1_000_000),
        "name": [f"user_{i % 10000}" for i in range(1_000_000)],
        "value": np.random.randn(1_000_000),
        "category": np.random.choice(["A", "B", "C"], 1_000_000),
    })
    
    for codec in ["none", "snappy", "gzip", "zstd", "lz4"]:
        start = time.time()
        pq.write_table(data, f"/tmp/comp_{codec}.parquet",
                       compression=codec if codec != "none" else "NONE")
        write_time = time.time() - start
        
        file_size = os.path.getsize(f"/tmp/comp_{codec}.parquet") / 1024 / 1024
        
        start = time.time()
        pq.read_table(f"/tmp/comp_{codec}.parquet")
        read_time = time.time() - start
        
        print(f"{codec:8s}: {file_size:6.1f}MB, write={write_time:.3f}s, read={read_time:.3f}s")

# === 3. Dictionary Encoding ===
# Use for low-cardinality columns (e.g. country, status)
def dictionary_encoding():
    # Low cardinality: dictionary encoding works very well
    categories = np.random.choice(["cat_" + str(i) for i in range(100)], 1_000_000)
    
    # Without dictionary
    table = pa.table({"category": categories})
    pq.write_table(table, "/tmp/no_dict.parquet", use_dictionary=False)
    
    # With dictionary
    pq.write_table(table, "/tmp/with_dict.parquet", use_dictionary=True)
    
    no_dict = os.path.getsize("/tmp/no_dict.parquet") / 1024
    with_dict = os.path.getsize("/tmp/with_dict.parquet") / 1024
    
    print(f"Without dict: {no_dict:.0f}KB")
    print(f"With dict: {with_dict:.0f}KB")
    print(f"Savings: {(1 - with_dict/no_dict)*100:.0f}%")

# === 4. Predicate Pushdown ===
def predicate_pushdown_demo():
    # Write large dataset
    n = 10_000_000
    table = pa.table({
        "id": range(n),
        "amount": np.random.uniform(0, 10000, n),
        "region": np.random.choice(["US", "EU", "ASIA", "LATAM"], n),
    })
    pq.write_table(table, "/tmp/large.parquet", row_group_size=1_000_000)
    
    # Without filter (reads everything)
    start = time.time()
    full = pq.read_table("/tmp/large.parquet")
    full_time = time.time() - start
    
    # With predicate pushdown (skips row groups)
    start = time.time()
    filtered = pq.read_table("/tmp/large.parquet",
                              filters=[("region", "==", "US"), ("amount", ">", 9000)])
    filter_time = time.time() - start
    
    print(f"Full read: {full.num_rows} rows in {full_time:.3f}s")
    print(f"Filtered: {filtered.num_rows} rows in {filter_time:.3f}s")
    print(f"Speedup: {full_time/filter_time:.1f}x")

# === Best Practices Summary ===
# 1. Use ZSTD compression (best ratio/speed tradeoff)
# 2. Enable dictionary encoding for low-cardinality columns
# 3. Row group size 128MB-256MB for analytics, smaller for random access
# 4. Partition by commonly filtered columns (date, region)
# 5. Sort data before writing (improves compression and filtering)
# 6. Use predicate pushdown for selective queries
# 7. Write statistics (min/max) for predicate pushdown
# 8. Use schema evolution carefully (add nullable columns)
# 9. Compact small files periodically
# 10. Monitor file sizes (avoid too many small files)

# optimal_row_group_size()
# compression_benchmark()
# predicate_pushdown_demo()

FAQ: Frequently Asked Questions

Q: How do Parquet and ORC differ?

A: Both are columnar formats. Parquet originated at Cloudera/Twitter, has broader adoption, and works with every major framework (Spark, Pandas, DuckDB, Polars). ORC originated at Hortonworks, is focused on the Hive ecosystem, and has better ACID support; it achieves a slightly better compression ratio than Parquet in some cases, but Parquet's ecosystem support is wider. For new projects, Parquet is the recommended choice.

Q: How big should a Parquet file be?

A: The recommended size is 128MB-1GB per file. Files that are too small (< 10MB) cause high metadata overhead and slow file listing; files that are too large (> 2GB) limit parallelism. For a data lake partitioned by date, 128-512MB per partition is a good target. If you have many small files, compact (merge) them into larger ones.

Q: How does Progressive Delivery for data differ from software?

A: Software progressive delivery toggles features on and off through code paths. Data progressive delivery must also handle schema evolution (adding/removing columns, changing types), which is harder because downstream consumers may depend on a specific schema. Rollback is harder too, because the data may already have been consumed; this is why versioned datasets and a schema registry are needed.

Q: When should you not use Parquet?

A: It is a poor fit for transactional (OLTP) workloads with frequent row updates/deletes (use a relational database instead), real-time streaming that appends one row at a time (use Kafka/Avro instead), small datasets (< 1MB) where CSV/JSON is simpler, and use cases that need a human-readable format (use JSON/CSV). Parquet shines in analytical workloads that read far more than they write.
