SQLite Litestream Machine Learning Pipeline คืออะไร
SQLite เป็น embedded database ที่เบา เร็ว และไม่ต้อง server เหมาะสำหรับ applications ขนาดเล็ก-กลาง Litestream เป็น open-source tool ที่ทำ streaming replication ของ SQLite database ไปยัง cloud storage (S3, GCS, Azure Blob) แบบ real-time ทำให้ SQLite มี durability และ disaster recovery เทียบเท่า managed databases Machine Learning Pipeline คือกระบวนการตั้งแต่ data collection, feature engineering, model training ไปจนถึง deployment การรวม SQLite + Litestream กับ ML Pipeline ช่วยให้สร้าง lightweight ML infrastructure ที่ cost-effective ไม่ต้องพึ่ง managed database services แพงๆ
SQLite + Litestream Architecture
# architecture.py — SQLite + Litestream architecture
import json
class SQLiteLitestreamArch:
    """Summarise the SQLite + Litestream architecture for the article demo.

    Holds a component catalogue plus a sample litestream.yml and exposes
    two print-only renderers used by the surrounding script.
    """

    # NOTE: the entries do NOT share a uniform key set — "sqlite" has
    # "limitation", "litestream" has "how", and "s3_storage" has "cost"
    # instead of "benefit".  Rendering code must treat every key beyond
    # "name"/"description" as optional.
    COMPONENTS = {
        "sqlite": {
            "name": "SQLite Database",
            "description": "Embedded database — single file, zero configuration, ACID compliant",
            "benefit": "ไม่ต้อง server, ไม่ต้อง network, เร็วมาก (in-process)",
            "limitation": "Single writer, ไม่ scale horizontally",
        },
        "litestream": {
            "name": "Litestream",
            "description": "Streaming replication — replicate SQLite WAL to cloud storage",
            "benefit": "Point-in-time recovery, disaster recovery, near-zero RPO",
            "how": "Monitor WAL changes → compress → upload to S3/GCS continuously",
        },
        "s3_storage": {
            "name": "Cloud Storage (S3/GCS)",
            "description": "เก็บ WAL snapshots และ incremental changes",
            "cost": "~$0.023/GB/month (S3) — ถูกมากเทียบกับ managed DB",
        },
    }

    # Sample config shown to the reader.  NOTE(review): the YAML nesting
    # appears to have been flattened in transit — confirm indentation
    # against a real litestream.yml before copy-pasting.
    LITESTREAM_CONFIG = """
# litestream.yml — Litestream configuration
dbs:
- path: /data/ml_pipeline.db
replicas:
- type: s3
bucket: my-ml-backup
path: ml_pipeline
region: ap-southeast-1
retention: 720h # 30 days retention
sync-interval: 1s # sync every 1 second
snapshot-interval: 1h # full snapshot every hour
"""

    def show_components(self):
        """Print each component's name, description and optional benefit.

        Bug fix: the original indexed comp['benefit'] unconditionally,
        which raised KeyError for the "s3_storage" entry (no "benefit"
        key).  The benefit line is now printed only when present.
        """
        print("=== Architecture ===\n")
        for comp in self.COMPONENTS.values():
            print(f"[{comp['name']}]")
            print(f" {comp['description']}")
            benefit = comp.get("benefit")
            if benefit is not None:
                print(f" Benefit: {benefit}")
            print()

    def show_config(self):
        """Print the first 400 characters of the sample litestream.yml."""
        print("=== Litestream Config ===")
        print(self.LITESTREAM_CONFIG[:400])
# Demo: instantiate the architecture helper once and run both renderers.
arch = SQLiteLitestreamArch()
for render in (arch.show_components, arch.show_config):
    render()
ML Pipeline Design
# pipeline.py — ML pipeline with SQLite
import json
class MLPipelineDesign:
    """Catalogue of the five ML-pipeline stages and their SQLite schema."""

    # One entry per pipeline stage; every entry carries the same three keys.
    STAGES = {
        "data_collection": {
            "name": "1. Data Collection",
            "storage": "SQLite: raw_data table — append-only, timestamped",
            "litestream": "Auto-replicate ทุก insert → S3 backup",
        },
        "feature_store": {
            "name": "2. Feature Store",
            "storage": "SQLite: features table — pre-computed features, versioned",
            "litestream": "Feature snapshots replicated → reproducible training",
        },
        "experiment_tracking": {
            "name": "3. Experiment Tracking",
            "storage": "SQLite: experiments table — hyperparams, metrics, model paths",
            "litestream": "Experiment history preserved — never lose tracking data",
        },
        "model_registry": {
            "name": "4. Model Registry",
            "storage": "SQLite: models table — model metadata, version, status",
            "litestream": "Model registry replicated — disaster recovery",
        },
        "predictions": {
            "name": "5. Prediction Serving",
            "storage": "SQLite: predictions table — input, output, timestamp, latency",
            "litestream": "Prediction logs replicated → monitoring, debugging",
        },
    }

    # DDL for the five pipeline tables (display-only in this script).
    SCHEMA = """
-- ML Pipeline SQLite Schema
CREATE TABLE raw_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source TEXT NOT NULL,
data JSON NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE features (
id INTEGER PRIMARY KEY AUTOINCREMENT,
entity_id TEXT NOT NULL,
feature_name TEXT NOT NULL,
feature_value REAL,
version INTEGER DEFAULT 1,
computed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE experiments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
hyperparams JSON,
metrics JSON,
model_path TEXT,
status TEXT DEFAULT 'running',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE models (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
version TEXT NOT NULL,
experiment_id INTEGER REFERENCES experiments(id),
status TEXT DEFAULT 'staging',
metrics JSON,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE predictions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
model_id INTEGER REFERENCES models(id),
input JSON NOT NULL,
output JSON NOT NULL,
latency_ms REAL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""

    def show_stages(self):
        """Print the name and storage description of every stage."""
        print("=== ML Pipeline Stages ===\n")
        for stage in self.STAGES.values():
            header = f"[{stage['name']}]"
            print(header)
            print(f" Storage: {stage['storage']}")
            print()

    def show_schema(self):
        """Print the first 500 characters of the schema DDL."""
        print("=== SQLite Schema ===")
        snippet = self.SCHEMA[:500]
        print(snippet)
# Demo: print the stage catalogue (the schema dump is left opt-in).
(pipeline := MLPipelineDesign()).show_stages()
Python Implementation
# implementation.py — Python ML pipeline with SQLite
import json
class MLPipelineImpl:
    """Carries the full ML-pipeline reference implementation as a string
    and prints an excerpt of it for the article."""

    # Verbatim source of the reference implementation (display-only here).
    CODE = """
# ml_pipeline.py — Lightweight ML pipeline with SQLite + Litestream
import sqlite3
import json
import hashlib
import pickle
from datetime import datetime
from pathlib import Path
from contextlib import contextmanager
class MLPipeline:
def __init__(self, db_path="ml_pipeline.db"):
self.db_path = db_path
self._init_db()
@contextmanager
def _conn(self):
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL") # Required for Litestream
conn.execute("PRAGMA busy_timeout=5000")
try:
yield conn
conn.commit()
except:
conn.rollback()
raise
finally:
conn.close()
def _init_db(self):
with self._conn() as conn:
conn.executescript('''
CREATE TABLE IF NOT EXISTS raw_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source TEXT, data JSON, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS features (
id INTEGER PRIMARY KEY AUTOINCREMENT,
entity_id TEXT, feature_name TEXT, feature_value REAL,
version INTEGER DEFAULT 1, computed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS experiments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT, hyperparams JSON, metrics JSON,
model_path TEXT, status TEXT DEFAULT 'running',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS models (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT, version TEXT, experiment_id INTEGER,
status TEXT DEFAULT 'staging', metrics JSON,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS predictions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
model_id INTEGER, input JSON, output JSON,
latency_ms REAL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
''')
# === Data Collection ===
def ingest_data(self, source, data):
with self._conn() as conn:
conn.execute(
"INSERT INTO raw_data (source, data) VALUES (?, ?)",
(source, json.dumps(data)),
)
# === Feature Store ===
def store_features(self, entity_id, features, version=1):
with self._conn() as conn:
for name, value in features.items():
conn.execute(
"INSERT INTO features (entity_id, feature_name, feature_value, version) VALUES (?, ?, ?, ?)",
(entity_id, name, value, version),
)
def get_features(self, entity_id, version=None):
with self._conn() as conn:
if version:
rows = conn.execute(
"SELECT feature_name, feature_value FROM features WHERE entity_id=? AND version=?",
(entity_id, version),
).fetchall()
else:
rows = conn.execute(
"SELECT feature_name, feature_value FROM features WHERE entity_id=? ORDER BY version DESC",
(entity_id,),
).fetchall()
return {r['feature_name']: r['feature_value'] for r in rows}
# === Experiment Tracking ===
def start_experiment(self, name, hyperparams):
with self._conn() as conn:
cursor = conn.execute(
"INSERT INTO experiments (name, hyperparams) VALUES (?, ?)",
(name, json.dumps(hyperparams)),
)
return cursor.lastrowid
def log_metrics(self, experiment_id, metrics):
with self._conn() as conn:
conn.execute(
"UPDATE experiments SET metrics=?, status='completed' WHERE id=?",
(json.dumps(metrics), experiment_id),
)
# === Model Registry ===
def register_model(self, name, version, experiment_id, metrics):
with self._conn() as conn:
cursor = conn.execute(
"INSERT INTO models (name, version, experiment_id, metrics) VALUES (?, ?, ?, ?)",
(name, version, experiment_id, json.dumps(metrics)),
)
return cursor.lastrowid
def promote_model(self, model_id, status='production'):
with self._conn() as conn:
# Demote current production
conn.execute(
"UPDATE models SET status='archived' WHERE status='production' AND name=(SELECT name FROM models WHERE id=?)",
(model_id,),
)
conn.execute("UPDATE models SET status=? WHERE id=?", (status, model_id))
# === Prediction Logging ===
def log_prediction(self, model_id, input_data, output_data, latency_ms):
with self._conn() as conn:
conn.execute(
"INSERT INTO predictions (model_id, input, output, latency_ms) VALUES (?, ?, ?, ?)",
(model_id, json.dumps(input_data), json.dumps(output_data), latency_ms),
)
# === Dashboard ===
def dashboard(self):
with self._conn() as conn:
experiments = conn.execute("SELECT COUNT(*) as c FROM experiments").fetchone()['c']
models = conn.execute("SELECT COUNT(*) as c FROM models").fetchone()['c']
predictions = conn.execute("SELECT COUNT(*) as c FROM predictions").fetchone()['c']
return {
'total_experiments': experiments,
'total_models': models,
'total_predictions': predictions,
'db_size_mb': round(Path(self.db_path).stat().st_size / 1e6, 2),
}
# pipeline = MLPipeline("ml_pipeline.db")
# pipeline.ingest_data("sensor", {"temp": 25.5, "humidity": 60})
# exp_id = pipeline.start_experiment("xgboost_v1", {"n_estimators": 100, "lr": 0.1})
# pipeline.log_metrics(exp_id, {"accuracy": 0.95, "f1": 0.93})
# model_id = pipeline.register_model("churn_model", "v1.0", exp_id, {"accuracy": 0.95})
"""

    def show_code(self):
        """Print the first 600 characters of the embedded pipeline source."""
        print("=== ML Pipeline ===")
        preview = self.CODE[:600]
        print(preview)
# Demo: print the opening excerpt of the reference implementation.
(impl := MLPipelineImpl()).show_code()
Docker Deployment
# docker.py — Docker deployment with Litestream
import json
class DockerDeployment:
    """Holds the Dockerfile, compose file and restore commands for the
    Litestream deployment example, plus print-only excerpt renderers."""

    # Sample Dockerfile shipping Litestream alongside the Python app.
    DOCKERFILE = """
# Dockerfile — ML Pipeline with Litestream
FROM python:3.12-slim
# Install Litestream
ADD https://github.com/benbjohnson/litestream/releases/download/v0.3.13/litestream-v0.3.13-linux-amd64.tar.gz /tmp/
RUN tar -C /usr/local/bin -xzf /tmp/litestream-v0.3.13-linux-amd64.tar.gz
# Install Python dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . /app
WORKDIR /app
# Litestream config
COPY litestream.yml /etc/litestream.yml
# Start: restore DB from S3 → run app with Litestream replication
CMD ["litestream", "replicate", "-exec", "python app.py"]
"""

    # Compose file wiring the data volume and S3 credentials.
    COMPOSE = """
# docker-compose.yaml
version: '3.8'
services:
ml-pipeline:
build: .
volumes:
- ml-data:/data
environment:
- AWS_ACCESS_KEY_ID=
- AWS_SECRET_ACCESS_KEY=
- LITESTREAM_REPLICA_URL=s3://my-bucket/ml-pipeline
ports:
- "8000:8000"
volumes:
ml-data:
"""

    # Disaster-recovery commands (latest state and point-in-time).
    RESTORE = """
# Restore from backup (disaster recovery)
litestream restore -o /data/ml_pipeline.db s3://my-bucket/ml-pipeline
# Restore to specific point in time
litestream restore -o /data/ml_pipeline.db -timestamp 2026-01-15T10:30:00Z s3://my-bucket/ml-pipeline
"""

    def show_dockerfile(self):
        """Print the first 400 characters of the sample Dockerfile."""
        print("=== Dockerfile ===")
        excerpt = self.DOCKERFILE[:400]
        print(excerpt)

    def show_restore(self):
        """Print the first 300 characters of the restore commands."""
        print("\n=== Restore Commands ===")
        excerpt = self.RESTORE[:300]
        print(excerpt)
# Demo: show the Dockerfile excerpt, then the restore commands.
docker = DockerDeployment()
for section in (docker.show_dockerfile, docker.show_restore):
    section()
Cost Comparison
# cost.py — Cost comparison: SQLite+Litestream vs managed DB
import json
class CostComparison:
    """Three-way monthly-cost comparison: SQLite+Litestream vs managed DBs."""

    # Every entry carries the same six keys, so the renderer can index freely.
    COMPARISON = {
        "sqlite_litestream": {
            "name": "SQLite + Litestream + S3",
            "compute": "$5-20/month (small VPS)",
            "storage": "$0.50-5/month (S3 backups)",
            "total": "$5-25/month",
            "pros": "ถูกมาก, simple, no vendor lock-in",
            "cons": "Single writer, ไม่ scale horizontally, ต้อง manage เอง",
        },
        "rds_postgres": {
            "name": "AWS RDS PostgreSQL",
            "compute": "$30-200/month (db.t3.small-medium)",
            "storage": "$10-50/month (gp3)",
            "total": "$40-250/month",
            "pros": "Managed, HA, auto-backup, multi-AZ",
            "cons": "แพงกว่า 5-10x, vendor lock-in",
        },
        "supabase": {
            "name": "Supabase (Managed PostgreSQL)",
            "compute": "$0-25/month (free-pro)",
            "storage": "Included",
            "total": "$0-25/month",
            "pros": "Free tier, managed, built-in auth/API",
            "cons": "Shared resources on free tier, limited on pro",
        },
    }

    def show_comparison(self):
        """Print the name, total cost and pros of each option."""
        print("=== Cost Comparison ===\n")
        for option in self.COMPARISON.values():
            print(f"[{option['name']}]")
            print(f" Total: {option['total']}")
            print(f" Pros: {option['pros']}")
            print()
# Demo: print the three-way cost comparison.
(cost := CostComparison()).show_comparison()
FAQ - คำถามที่พบบ่อย
Q: SQLite เหมาะกับ ML Pipeline จริงไหม?
A: เหมาะสำหรับ small-medium scale: data < 10GB, single server, single writer, predictions < 1,000 req/sec (SQLite handles well) ไม่เหมาะเมื่อ: data > 50GB, multiple writers, high concurrency, distributed system ใช้ SQLite: prototyping, small teams, edge ML, personal projects ย้ายไป PostgreSQL: เมื่อ scale เกิน SQLite limits
Q: Litestream มี data loss ไหม?
A: Near-zero RPO: Litestream sync ทุก 1 วินาที (configurable) — data loss สูงสุด ~1 วินาที WAL-based: replicate WAL changes ต่อเนื่อง — ไม่ใช่ periodic backup Recovery: restore จาก S3 ได้ทั้ง latest state และ point-in-time สำคัญ: ต้อง enable WAL mode (PRAGMA journal_mode=WAL) — Litestream ต้องการ WAL
Q: SQLite กับ DuckDB อันไหนดีกว่าสำหรับ ML?
A: SQLite: general-purpose, OLTP-oriented, small footprint, Litestream support DuckDB: analytical (OLAP), columnar storage, fast aggregations, Parquet/CSV native เลือก SQLite: application database, feature store, experiment tracking, prediction logging เลือก DuckDB: data analysis, feature engineering, large dataset processing ใช้ร่วมกัน: SQLite สำหรับ operational data + DuckDB สำหรับ analytics/training
Q: Litestream กับ regular backup ต่างกันอย่างไร?
A: Regular backup (sqlite3 .backup): point-in-time snapshot — RPO = backup interval (hours) Litestream: continuous WAL replication — RPO ≈ 1 second Litestream ดีกว่า: granular recovery, lower RPO, automatic, no downtime during backup Regular backup ดีสำหรับ: simple needs, manual restore OK, no S3 needed แนะนำ: Litestream สำหรับ production + occasional full backup สำหรับ long-term archive
