What is Apache Parquet and why it matters
Apache Parquet is a columnar storage format designed for big data processing. It stores data column-oriented instead of row-oriented, so queries that select only a few columns are much faster: only the required columns are read, not every row in full.
Compared with CSV, Parquet's advantages include 5-10x better compression (Snappy, GZIP, ZSTD), a columnar layout that makes analytical queries 10-100x faster, schema enforcement with type safety, predicate pushdown that skips data you do not need, and encoding schemes (dictionary, RLE, delta) that shrink file size.
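To make that concrete, here is a small illustrative snippet (assuming pandas with pyarrow installed; exact sizes and speedups depend on your data and codec) that writes the same DataFrame to CSV and Parquet and then reads only the columns a query needs:
import pandas as pd
import numpy as np

df = pd.DataFrame({
    "id": range(1_000_000),
    "amount": np.random.uniform(10, 1000, 1_000_000),
    "category": np.random.choice(["A", "B", "C", "D"], 1_000_000),
})
df.to_csv("demo.csv", index=False)                 # row-oriented text
df.to_parquet("demo.parquet", compression="zstd")  # columnar, compressed
# Columnar advantage: read only the columns the query actually needs
subset = pd.read_parquet("demo.parquet", columns=["id", "amount"])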
Progressive Delivery is the practice of releasing data or features to users gradually rather than all at once, using feature flags, canary releases, and A/B testing. For data pipelines, it means rolling out data schema changes, new data sources, or processing logic in stages.
Combining the Parquet format with Progressive Delivery lets you version data schemas, roll back data changes, test new data formats with a subset of consumers, and monitor data quality before a full release.
Parquet File Format structure
Understanding Parquet's internal structure
# === Parquet File Structure ===
#
# ┌─────────────────────────────────┐
# │ Magic Number │ "PAR1" (4 bytes)
# ├─────────────────────────────────┤
# │ Row Group 1 │
# │ ┌──────────────────────────┐ │
# │ │ Column Chunk: col_A │ │
# │ │ ┌─ Page 1 (data page) ┐│ │
# │ │ │ Header + Data ││ │
# │ │ └─────────────────────┘│ │
# │ │ ┌─ Page 2 (data page) ┐│ │
# │ │ │ Header + Data ││ │
# │ │ └─────────────────────┘│ │
# │ └──────────────────────────┘ │
# │ ┌──────────────────────────┐ │
# │ │ Column Chunk: col_B │ │
# │ │ ┌─ Page 1 ┐│ │
# │ │ │ Header + Data ││ │
# │ │ └─────────────────────┘│ │
# │ └──────────────────────────┘ │
# ├─────────────────────────────────┤
# │ Row Group 2 │
# │ (same structure) │
# ├─────────────────────────────────┤
# │ Footer │
# │ ┌──────────────────────────┐ │
# │ │ File Metadata │ │
# │ │ - Schema │ │
# │ │ - Row Group metadata │ │
# │ │ - Column statistics │ │
# │ │ - Key-Value metadata │ │
# │ └──────────────────────────┘ │
# │ Footer Length (4 bytes) │
# │ Magic Number "PAR1" │
# └─────────────────────────────────┘
#
# === Encoding Types ===
# PLAIN: raw values
# DICTIONARY: dictionary + indices (good for low cardinality)
# RLE: Run Length Encoding (good for repeated values)
# DELTA_BINARY_PACKED: delta encoding for integers
# DELTA_LENGTH_BYTE_ARRAY: delta encoding for strings
# DELTA_BYTE_ARRAY: prefix + suffix encoding
#
# === Compression Codecs ===
# UNCOMPRESSED: no compression
# SNAPPY: fast compression/decompression (default)
# GZIP: better ratio, slower
# LZ4: very fast, moderate ratio
# ZSTD: best ratio with good speed (recommended)
# BROTLI: high ratio, slow compression
#
# === Data Types ===
# BOOLEAN, INT32, INT64, INT96 (deprecated)
# FLOAT, DOUBLE
# BYTE_ARRAY (strings, binary)
# FIXED_LEN_BYTE_ARRAY
# Logical types: STRING, DATE, TIMESTAMP, DECIMAL, UUID, LIST, MAP
#
# === Key Concepts ===
# Row Group: horizontal partition (default 128MB)
# Column Chunk: one column in a row group
# Page: smallest unit of I/O (default 1MB)
# Statistics: min/max/null_count per column chunk
# Predicate Pushdown: skip row groups based on statistics
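To make predicate pushdown concrete, here is a minimal sketch (the path data/output.parquet is assumed to exist, e.g. from the example in the next section) that reads the footer statistics and skips any row group whose min/max range proves it cannot contain matching rows:
import pyarrow.parquet as pq

pf = pq.ParquetFile("data/output.parquet")  # assumed path
kept = 0
for i in range(pf.metadata.num_row_groups):
    stats = pf.metadata.row_group(i).column(0).statistics  # column 0 is "id" here
    # Skip this row group if its minimum already exceeds the filter bound
    if stats is not None and stats.min is not None and stats.min >= 1000:
        continue
    batch = pf.read_row_group(i, columns=["id"])
    kept += 1
print(f"Read {kept} of {pf.metadata.num_row_groups} row groups for id < 1000")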
Using Parquet with Python and Spark
Reading and writing Parquet files with different tools
#!/usr/bin/env python3
# parquet_operations.py — Parquet File Operations
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import numpy as np
from pathlib import Path
import json
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("parquet_ops")
# Ensure the output directory used by the examples below exists
Path("data").mkdir(parents=True, exist_ok=True)
# === PyArrow (recommended) ===
# Create a Parquet file
def create_parquet_pyarrow():
# Define schema
schema = pa.schema([
pa.field("id", pa.int64()),
pa.field("name", pa.string()),
pa.field("amount", pa.float64()),
pa.field("category", pa.dictionary(pa.int32(), pa.string())),
pa.field("created_at", pa.timestamp("ms")),
pa.field("tags", pa.list_(pa.string())),
pa.field("metadata", pa.map_(pa.string(), pa.string())),
])
# Create data
table = pa.table({
"id": range(1000000),
"name": [f"item_{i}" for i in range(1000000)],
"amount": np.random.uniform(10.0, 1000.0, 1000000),
"category": np.random.choice(["A", "B", "C", "D"], 1000000),
"created_at": pd.date_range("2024-01-01", periods=1000000, freq="s"),
"tags": [["tag1", "tag2"] for _ in range(1000000)],
"metadata": [{"key": "value"} for _ in range(1000000)],
}, schema=schema)
# Write with options
pq.write_table(
table,
"data/output.parquet",
compression="zstd",
compression_level=3,
row_group_size=1000000,
use_dictionary=True,
write_statistics=True,
version="2.6",
)
logger.info(f"Written {len(table)} rows")
# Read a Parquet file
def read_parquet_pyarrow():
# Read full file
table = pq.read_table("data/output.parquet")
print(f"Rows: {table.num_rows}, Columns: {table.num_columns}")
print(f"Schema: {table.schema}")
# Read specific columns (columnar advantage)
table = pq.read_table("data/output.parquet", columns=["id", "amount"])
# Read with filter (predicate pushdown)
table = pq.read_table(
"data/output.parquet",
filters=[
("amount", ">", 500.0),
("category", "==", "A"),
],
)
print(f"Filtered rows: {table.num_rows}")
# Read metadata only
parquet_file = pq.ParquetFile("data/output.parquet")
metadata = parquet_file.metadata
print(f"Row groups: {metadata.num_row_groups}")
print(f"Total rows: {metadata.num_rows}")
for i in range(metadata.num_row_groups):
rg = metadata.row_group(i)
print(f" Row group {i}: {rg.num_rows} rows")
for j in range(rg.num_columns):
col = rg.column(j)
print(f" {col.path_in_schema}: min={col.statistics.min}, max={col.statistics.max}")
# === Pandas Integration ===
def pandas_parquet():
df = pd.DataFrame({
"id": range(100000),
"value": np.random.randn(100000),
"category": np.random.choice(["X", "Y", "Z"], 100000),
})
# Write
df.to_parquet("data/pandas_output.parquet", engine="pyarrow", compression="zstd")
# Read
df = pd.read_parquet("data/pandas_output.parquet")
# Read specific columns
df = pd.read_parquet("data/pandas_output.parquet", columns=["id", "value"])
# Read with filter
df = pd.read_parquet(
"data/pandas_output.parquet",
filters=[("category", "==", "X")],
)
# === Partitioned Dataset ===
def partitioned_dataset():
table = pa.table({
"date": pd.date_range("2024-01-01", periods=1000, freq="D").repeat(100),
"region": np.random.choice(["us", "eu", "asia"], 100000),
"value": np.random.randn(100000),
})
# Write partitioned
pq.write_to_dataset(
table,
root_path="data/partitioned/",
partition_cols=["region"],
compression="zstd",
)
# Read partition
ds = pq.read_table("data/partitioned/", filters=[("region", "==", "us")])
print(f"US data: {ds.num_rows} rows")
create_parquet_pyarrow()
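Since the heading above also mentions Spark, here is a minimal PySpark sketch (assuming pyspark is installed and a local session is sufficient) that reads the file written by create_parquet_pyarrow() and writes a partitioned copy; Spark applies column pruning and predicate pushdown to Parquet sources automatically:
#!/usr/bin/env python3
# spark_parquet_example.py — minimal PySpark sketch (illustrative)
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("parquet_demo").getOrCreate()

# Column pruning and predicate pushdown happen automatically for Parquet sources
df = spark.read.parquet("data/output.parquet")
df.select("id", "amount").where(F.col("amount") > 500.0).show(5)

# Write a partitioned copy (the zstd codec requires Spark 3.x)
(df.write
   .mode("overwrite")
   .option("compression", "zstd")
   .partitionBy("category")
   .parquet("data/spark_output/"))

spark.stop()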
Progressive Delivery for Data Pipelines
A progressive delivery system for data schema changes
#!/usr/bin/env python3
# progressive_data_delivery.py — Progressive Delivery for Data
import pyarrow as pa
import pyarrow.parquet as pq
import json
import hashlib
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("progressive_delivery")
class SchemaRegistry:
def __init__(self, registry_path="schema_registry"):
self.path = Path(registry_path)
self.path.mkdir(parents=True, exist_ok=True)
def register_schema(self, name, schema: pa.Schema, version=None):
if version is None:
existing = list(self.path.glob(f"{name}_v*.json"))
version = len(existing) + 1
schema_dict = {
"name": name,
"version": version,
"fields": [
{"name": f.name, "type": str(f.type), "nullable": f.nullable}
for f in schema
],
"registered_at": datetime.utcnow().isoformat(),
"checksum": hashlib.md5(schema.to_string().encode()).hexdigest(),
}
filepath = self.path / f"{name}_v{version}.json"
filepath.write_text(json.dumps(schema_dict, indent=2))
logger.info(f"Registered schema: {name} v{version}")
return version
def get_schema(self, name, version=None):
if version:
filepath = self.path / f"{name}_v{version}.json"
else:
            files = sorted(
                self.path.glob(f"{name}_v*.json"),
                key=lambda p: int(p.stem.rsplit("_v", 1)[1]),  # numeric sort so v10 > v2
            )
if not files:
return None
filepath = files[-1]
return json.loads(filepath.read_text())
def check_compatibility(self, name, new_schema: pa.Schema):
current = self.get_schema(name)
if not current:
return {"compatible": True, "changes": ["new_schema"]}
current_fields = {f["name"]: f for f in current["fields"]}
new_fields = {f.name: f for f in new_schema}
changes = []
breaking = False
# Check removed fields
for field_name in current_fields:
if field_name not in new_fields:
changes.append(f"REMOVED: {field_name}")
breaking = True
# Check added fields
for field_name, field in new_fields.items():
if field_name not in current_fields:
if not field.nullable:
changes.append(f"ADDED (non-nullable): {field_name} — BREAKING")
breaking = True
else:
changes.append(f"ADDED (nullable): {field_name}")
# Check type changes
for field_name in current_fields:
if field_name in new_fields:
old_type = current_fields[field_name]["type"]
new_type = str(new_fields[field_name].type)
if old_type != new_type:
changes.append(f"TYPE CHANGED: {field_name} ({old_type} -> {new_type})")
breaking = True
return {
"compatible": not breaking,
"breaking": breaking,
"changes": changes,
}
class ProgressiveDataDelivery:
def __init__(self, base_path="data/delivery"):
self.base_path = Path(base_path)
self.registry = SchemaRegistry()
self.rollout_config = {}
def create_delivery(self, name, schema, data_table, rollout_stages=None):
if rollout_stages is None:
rollout_stages = [
{"name": "canary", "percentage": 5, "duration_hours": 2},
{"name": "early_adopters", "percentage": 25, "duration_hours": 4},
{"name": "majority", "percentage": 75, "duration_hours": 8},
{"name": "full", "percentage": 100, "duration_hours": 0},
]
# Check compatibility
compat = self.registry.check_compatibility(name, schema)
if compat["breaking"]:
logger.warning(f"Breaking changes detected: {compat['changes']}")
logger.warning("Progressive delivery recommended")
# Register new schema version
version = self.registry.register_schema(name, schema)
# Write data
output_dir = self.base_path / name / f"v{version}"
output_dir.mkdir(parents=True, exist_ok=True)
pq.write_table(
data_table,
output_dir / "data.parquet",
compression="zstd",
)
# Save delivery config
delivery_config = {
"name": name,
"version": version,
"schema_compatible": compat["compatible"],
"changes": compat["changes"],
"rollout_stages": rollout_stages,
"current_stage": 0,
"status": "created",
"created_at": datetime.utcnow().isoformat(),
}
(output_dir / "delivery.json").write_text(json.dumps(delivery_config, indent=2))
logger.info(f"Delivery created: {name} v{version}")
return delivery_config
def advance_stage(self, name, version):
config_path = self.base_path / name / f"v{version}" / "delivery.json"
config = json.loads(config_path.read_text())
current = config["current_stage"]
stages = config["rollout_stages"]
if current >= len(stages) - 1:
config["status"] = "complete"
logger.info(f"{name} v{version}: fully rolled out")
else:
config["current_stage"] = current + 1
stage = stages[config["current_stage"]]
config["status"] = f"rolling_out_{stage['name']}"
logger.info(f"{name} v{version}: advancing to {stage['name']} ({stage['percentage']}%)")
config_path.write_text(json.dumps(config, indent=2))
return config
def rollback(self, name, version):
config_path = self.base_path / name / f"v{version}" / "delivery.json"
config = json.loads(config_path.read_text())
config["status"] = "rolled_back"
config["rolled_back_at"] = datetime.utcnow().isoformat()
config_path.write_text(json.dumps(config, indent=2))
logger.info(f"Rolled back: {name} v{version}")
return config
# delivery = ProgressiveDataDelivery()
# schema = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.float64())])
# table = pa.table({"id": [1,2,3], "value": [1.0, 2.0, 3.0]})
# delivery.create_delivery("sales_data", schema, table)
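The delivery config above records rollout percentages but leaves open how a consumer decides which version to read. One common approach, sketched below (the helper and its bucketing scheme are illustrative assumptions, not part of the classes above), is to hash each consumer ID into a stable 0-99 bucket and serve the new version only when the bucket falls under the current stage's percentage:
import hashlib
import json
from pathlib import Path

def resolve_version_for_consumer(name, version, consumer_id, base_path="data/delivery"):
    """Illustrative helper: decide whether a given consumer reads the new version."""
    config_path = Path(base_path) / name / f"v{version}" / "delivery.json"
    config = json.loads(config_path.read_text())
    stage = config["rollout_stages"][config["current_stage"]]
    # Stable bucket in [0, 100) derived from the consumer id
    bucket = int(hashlib.md5(consumer_id.encode()).hexdigest(), 16) % 100
    if config["status"] != "rolled_back" and bucket < stage["percentage"]:
        return version       # consumer is inside the rollout cohort
    return version - 1       # otherwise fall back to the previous schema version

# resolve_version_for_consumer("sales_data", 2, "analytics-team")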
Building a Data Delivery Pipeline with Parquet
A pipeline for delivering data in Parquet format
#!/usr/bin/env python3
# data_pipeline.py — Parquet Data Delivery Pipeline
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import datetime, timedelta
import json
import logging
import hashlib
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("data_pipeline")
class ParquetDataPipeline:
def __init__(self, data_lake_path="data_lake"):
self.lake_path = Path(data_lake_path)
self.lake_path.mkdir(parents=True, exist_ok=True)
def ingest(self, source_name, data, partition_by=None):
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
if isinstance(data, pd.DataFrame):
table = pa.Table.from_pandas(data)
elif isinstance(data, pa.Table):
table = data
else:
raise ValueError("Data must be DataFrame or Arrow Table")
output_path = self.lake_path / "raw" / source_name
if partition_by:
pq.write_to_dataset(
table,
root_path=str(output_path),
partition_cols=partition_by,
compression="zstd",
existing_data_behavior="overwrite_or_ignore",
)
else:
output_path.mkdir(parents=True, exist_ok=True)
pq.write_table(
table,
output_path / f"{timestamp}.parquet",
compression="zstd",
)
# Write metadata
meta = {
"source": source_name,
"rows": table.num_rows,
"columns": table.num_columns,
"schema": str(table.schema),
"ingested_at": datetime.utcnow().isoformat(),
"checksum": hashlib.md5(
table.to_pandas().to_json().encode()
).hexdigest()[:16],
}
meta_path = output_path / f"_metadata_{timestamp}.json"
meta_path.write_text(json.dumps(meta, indent=2))
logger.info(f"Ingested {table.num_rows} rows from {source_name}")
return meta
def transform(self, source_name, output_name, transform_fn):
source_path = self.lake_path / "raw" / source_name
dataset = ds.dataset(str(source_path), format="parquet")
table = dataset.to_table()
df = table.to_pandas()
df_transformed = transform_fn(df)
output_path = self.lake_path / "processed" / output_name
output_path.mkdir(parents=True, exist_ok=True)
result_table = pa.Table.from_pandas(df_transformed)
pq.write_table(
result_table,
output_path / f"{datetime.utcnow().strftime('%Y%m%d')}.parquet",
compression="zstd",
)
logger.info(f"Transformed {source_name} -> {output_name}: {len(df_transformed)} rows")
return len(df_transformed)
def validate(self, dataset_name, layer="processed"):
data_path = self.lake_path / layer / dataset_name
dataset = ds.dataset(str(data_path), format="parquet")
table = dataset.to_table()
df = table.to_pandas()
checks = {
"row_count": len(df),
"null_counts": df.isnull().sum().to_dict(),
"duplicate_count": df.duplicated().sum(),
"schema_valid": True,
}
        # Check for null values across all columns
issues = []
total_nulls = df.isnull().sum().sum()
if total_nulls > 0:
issues.append(f"Found {total_nulls} null values")
if checks["duplicate_count"] > 0:
issues.append(f"Found {checks['duplicate_count']} duplicates")
checks["passed"] = len(issues) == 0
checks["issues"] = issues
logger.info(f"Validation {dataset_name}: {'PASS' if checks['passed'] else 'FAIL'}")
return checks
def deliver(self, dataset_name, destination, format="parquet"):
data_path = self.lake_path / "processed" / dataset_name
dataset = ds.dataset(str(data_path), format="parquet")
table = dataset.to_table()
dest_path = Path(destination)
dest_path.mkdir(parents=True, exist_ok=True)
if format == "parquet":
pq.write_table(table, dest_path / f"{dataset_name}.parquet", compression="zstd")
elif format == "csv":
table.to_pandas().to_csv(dest_path / f"{dataset_name}.csv", index=False)
logger.info(f"Delivered {dataset_name} to {destination} ({format})")
return table.num_rows
def catalog(self):
catalog = {"datasets": [], "generated_at": datetime.utcnow().isoformat()}
for layer in ["raw", "processed"]:
layer_path = self.lake_path / layer
if not layer_path.exists():
continue
for dataset_dir in layer_path.iterdir():
if dataset_dir.is_dir():
parquet_files = list(dataset_dir.glob("**/*.parquet"))
total_size = sum(f.stat().st_size for f in parquet_files)
catalog["datasets"].append({
"name": dataset_dir.name,
"layer": layer,
"files": len(parquet_files),
"total_size_mb": round(total_size / 1024 / 1024, 2),
})
return catalog
# pipeline = ParquetDataPipeline()
# df = pd.DataFrame({"id": range(1000), "value": np.random.randn(1000)})
# pipeline.ingest("sales", df)
# pipeline.transform("sales", "sales_clean", lambda df: df.dropna())
# pipeline.validate("sales_clean")
# pipeline.deliver("sales_clean", "output/")
Performance Optimization and Best Practices
Techniques for optimizing Parquet performance
#!/usr/bin/env python3
# parquet_optimization.py — Parquet Performance Best Practices
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import numpy as np
import time
import os
# === 1. Row Group Size ===
# The Parquet convention is ~128MB per row group; tune per use case
# (note: pyarrow's row_group_size parameter counts rows, not bytes)
# Large row groups: better compression, slower random access
# Small row groups: faster filtering, more metadata overhead
def optimal_row_group_size():
data = pa.table({"id": range(10_000_000), "val": np.random.randn(10_000_000)})
for rg_size in [100_000, 500_000, 1_000_000]:
start = time.time()
pq.write_table(data, f"/tmp/rg_{rg_size}.parquet", row_group_size=rg_size)
write_time = time.time() - start
file_size = os.path.getsize(f"/tmp/rg_{rg_size}.parquet") / 1024 / 1024
start = time.time()
pq.read_table(f"/tmp/rg_{rg_size}.parquet", filters=[("id", "<", 1000)])
read_time = time.time() - start
print(f"RG={rg_size}: size={file_size:.1f}MB, write={write_time:.2f}s, read={read_time:.3f}s")
# === 2. Compression Comparison ===
def compression_benchmark():
data = pa.table({
"id": range(1_000_000),
"name": [f"user_{i % 10000}" for i in range(1_000_000)],
"value": np.random.randn(1_000_000),
"category": np.random.choice(["A", "B", "C"], 1_000_000),
})
for codec in ["none", "snappy", "gzip", "zstd", "lz4"]:
start = time.time()
pq.write_table(data, f"/tmp/comp_{codec}.parquet",
compression=codec if codec != "none" else "NONE")
write_time = time.time() - start
file_size = os.path.getsize(f"/tmp/comp_{codec}.parquet") / 1024 / 1024
start = time.time()
pq.read_table(f"/tmp/comp_{codec}.parquet")
read_time = time.time() - start
print(f"{codec:8s}: {file_size:6.1f}MB, write={write_time:.3f}s, read={read_time:.3f}s")
# === 3. Dictionary Encoding ===
# Use for low-cardinality columns (e.g., country, status)
def dictionary_encoding():
    # Low cardinality: dictionary encoding works very well
categories = np.random.choice(["cat_" + str(i) for i in range(100)], 1_000_000)
# Without dictionary
table = pa.table({"category": categories})
pq.write_table(table, "/tmp/no_dict.parquet", use_dictionary=False)
# With dictionary
pq.write_table(table, "/tmp/with_dict.parquet", use_dictionary=True)
no_dict = os.path.getsize("/tmp/no_dict.parquet") / 1024
with_dict = os.path.getsize("/tmp/with_dict.parquet") / 1024
print(f"Without dict: {no_dict:.0f}KB")
print(f"With dict: {with_dict:.0f}KB")
print(f"Savings: {(1 - with_dict/no_dict)*100:.0f}%")
# === 4. Predicate Pushdown ===
def predicate_pushdown_demo():
# Write large dataset
n = 10_000_000
table = pa.table({
"id": range(n),
"amount": np.random.uniform(0, 10000, n),
"region": np.random.choice(["US", "EU", "ASIA", "LATAM"], n),
})
pq.write_table(table, "/tmp/large.parquet", row_group_size=1_000_000)
# Without filter (reads everything)
start = time.time()
full = pq.read_table("/tmp/large.parquet")
full_time = time.time() - start
# With predicate pushdown (skips row groups)
start = time.time()
filtered = pq.read_table("/tmp/large.parquet",
filters=[("region", "==", "US"), ("amount", ">", 9000)])
filter_time = time.time() - start
print(f"Full read: {full.num_rows} rows in {full_time:.3f}s")
print(f"Filtered: {filtered.num_rows} rows in {filter_time:.3f}s")
print(f"Speedup: {full_time/filter_time:.1f}x")
# === Best Practices Summary ===
# 1. Use ZSTD compression (best ratio/speed tradeoff)
# 2. Enable dictionary encoding for low-cardinality columns
# 3. Row group size 128MB-256MB for analytics, smaller for random access
# 4. Partition by commonly filtered columns (date, region)
# 5. Sort data before writing (improves compression and filtering)
# 6. Use predicate pushdown for selective queries
# 7. Write statistics (min/max) for predicate pushdown
# 8. Use schema evolution carefully (add nullable columns)
# 9. Compact small files periodically
# 10. Monitor file sizes (avoid too many small files)
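# Sketch for practices #5 and #9 (illustrative helper, not a library API):
# sort before writing to improve compression/filtering, then compact many
# small files in a directory into one larger, well-sized file.
def compact_and_sort(input_dir, output_file, sort_column, compression="zstd"):
    import pyarrow.dataset as ds
    table = ds.dataset(input_dir, format="parquet").to_table()
    table = table.sort_by(sort_column)   # pa.Table.sort_by requires pyarrow >= 7
    pq.write_table(table, output_file, compression=compression,
                   row_group_size=1_000_000)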
# optimal_row_group_size()
# compression_benchmark()
# predicate_pushdown_demo()
FAQ: Frequently Asked Questions
Q: How do Parquet and ORC differ?
A: Both are columnar formats. Parquet originated at Twitter and Cloudera and has the widest adoption, working with practically every framework (Spark, Pandas, DuckDB, Polars). ORC originated at Hortonworks and is focused on the Hive ecosystem, with stronger ACID support; it can achieve slightly better compression ratios than Parquet in some cases, but Parquet's ecosystem support is broader. For new projects, Parquet is the usual recommendation.
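A quick illustration of that ecosystem breadth (assuming duckdb and polars are installed; the file comes from the pandas example earlier):
import duckdb
import polars as pl

# DuckDB queries Parquet files in place with SQL
print(duckdb.sql("SELECT category, count(*) FROM 'data/pandas_output.parquet' GROUP BY category"))

# Polars scans the same file lazily, with projection and predicate pushdown
lf = pl.scan_parquet("data/pandas_output.parquet")
print(lf.filter(pl.col("value") > 0).select(["id", "value"]).collect().head())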
Q: How large should a Parquet file be?
A: The recommended size is 128MB-1GB per file. Files that are too small (< 10MB) cause high metadata overhead and slow file listing; files that are too large (> 2GB) limit parallelism. For a data lake partitioned by date, 128-512MB per partition is a good target. If you have many small files, compact (merge) them into larger ones.
Q: How does Progressive Delivery for data differ from progressive delivery for software?
A: Software progressive delivery toggles features through code paths. Data progressive delivery also has to handle schema evolution (adding/removing columns, changing types), which is harder because downstream consumers may depend on a specific schema. Rolling data back is harder too, because the data may already have been consumed, so you need versioned datasets and a schema registry.
Q: When should you not use Parquet?
A: It is a poor fit for transactional (OLTP) workloads that update or delete rows frequently (use a relational database instead), for real-time streaming that appends one row at a time (use Kafka/Avro), for small datasets (< 1MB) where CSV/JSON is simpler, and for use cases that require a human-readable format (use JSON/CSV). Parquet is best for analytical workloads that read far more than they write.
