SiamCafe · Blog
SQLite Litestream Machine Learning Pipeline
บทความ

SQLite Litestream Machine Learning Pipeline

เผยแพร่ 28 พฤษภาคม 2569

SQLite Litestream Machine Learning Pipeline คืออะไร

SQLite เป็น embedded database ที่เบาและเร็ว ไม่ต้องมี database server แยก จึงเหมาะสำหรับ applications ขนาดเล็กถึงกลาง Litestream เป็น open-source tool ที่ทำ streaming replication ของ SQLite database โดยส่งการเปลี่ยนแปลงใน WAL อย่างต่อเนื่องไปยัง cloud storage (S3, GCS, Azure Blob Storage) แบบเกือบ real-time ทำให้ SQLite มี durability และ disaster recovery ใกล้เคียงกับ managed databases ส่วน Machine Learning Pipeline คือกระบวนการตั้งแต่ data collection, feature engineering และ model training ไปจนถึง deployment การรวม SQLite + Litestream เข้ากับ ML Pipeline ช่วยให้สร้าง lightweight ML infrastructure ที่ cost-effective โดยไม่ต้องพึ่ง managed database services ราคาแพง

SQLite + Litestream Architecture

# architecture.py — SQLite + Litestream architecture
import json

class SQLiteLitestreamArch:
    """Explain the SQLite + Litestream + cloud-storage architecture.

    ``COMPONENTS`` maps a component key to display fields. ``name`` and
    ``description`` are present on every entry; the remaining fields
    (``benefit``, ``limitation``, ``how``, ``cost``) vary per component.
    ``LITESTREAM_CONFIG`` is a sample ``litestream.yml``.
    """

    COMPONENTS = {
        "sqlite": {
            "name": "SQLite Database",
            "description": "Embedded database — single file, zero configuration, ACID compliant",
            "benefit": "ไม่ต้อง server, ไม่ต้อง network, เร็วมาก (in-process)",
            "limitation": "Single writer, ไม่ scale horizontally",
        },
        "litestream": {
            "name": "Litestream",
            "description": "Streaming replication — replicate SQLite WAL to cloud storage",
            "benefit": "Point-in-time recovery, disaster recovery, near-zero RPO",
            "how": "Monitor WAL changes → compress → upload to S3/GCS continuously",
        },
        "s3_storage": {
            "name": "Cloud Storage (S3/GCS)",
            "description": "เก็บ WAL snapshots และ incremental changes",
            "cost": "~$0.023/GB/month (S3) — ถูกมากเทียบกับ managed DB",
        },
    }

    LITESTREAM_CONFIG = """
# litestream.yml — Litestream configuration
dbs:
  - path: /data/ml_pipeline.db
    replicas:
      - type: s3
        bucket: my-ml-backup
        path: ml_pipeline
        region: ap-southeast-1
        retention: 720h        # 30 days retention
        sync-interval: 1s      # sync every 1 second
        snapshot-interval: 1h  # full snapshot every hour
"""

    def show_components(self):
        """Print every component's name, description, benefit and cost.

        ``benefit`` and ``cost`` are optional per component (``s3_storage``
        defines ``cost`` but no ``benefit``), so each is printed only when
        present. Indexing ``comp['benefit']`` unconditionally raised
        ``KeyError`` on the storage entry.
        """
        print("=== Architecture ===\n")
        for comp in self.COMPONENTS.values():
            print(f"[{comp['name']}]")
            print(f"  {comp['description']}")
            if "benefit" in comp:
                print(f"  Benefit: {comp['benefit']}")
            if "cost" in comp:
                print(f"  Cost: {comp['cost']}")
            print()

    def show_config(self):
        """Print the sample Litestream config (first 400 characters)."""
        print("=== Litestream Config ===")
        print(self.LITESTREAM_CONFIG[:400])

# Demo: build the explainer, then print the component list and sample config.
arch = SQLiteLitestreamArch()
SQLiteLitestreamArch.show_components(arch)
SQLiteLitestreamArch.show_config(arch)

ML Pipeline Design

# pipeline.py — ML pipeline with SQLite
import json

class MLPipelineDesign:
    """Describe the five ML pipeline stages and their SQLite schema.

    ``STAGES`` maps a stage key to display strings (``name``, ``storage``,
    ``litestream``). ``SCHEMA`` is the reference DDL for the tables those
    stages use.
    """

    STAGES = {
        "data_collection": {
            "name": "1. Data Collection",
            "storage": "SQLite: raw_data table — append-only, timestamped",
            "litestream": "Auto-replicate ทุก insert → S3 backup",
        },
        "feature_store": {
            "name": "2. Feature Store",
            "storage": "SQLite: features table — pre-computed features, versioned",
            "litestream": "Feature snapshots replicated → reproducible training",
        },
        "experiment_tracking": {
            "name": "3. Experiment Tracking",
            "storage": "SQLite: experiments table — hyperparams, metrics, model paths",
            "litestream": "Experiment history preserved — never lose tracking data",
        },
        "model_registry": {
            "name": "4. Model Registry",
            "storage": "SQLite: models table — model metadata, version, status",
            "litestream": "Model registry replicated — disaster recovery",
        },
        "predictions": {
            "name": "5. Prediction Serving",
            "storage": "SQLite: predictions table — input, output, timestamp, latency",
            "litestream": "Prediction logs replicated → monitoring, debugging",
        },
    }

    SCHEMA = """
-- ML Pipeline SQLite Schema
CREATE TABLE raw_data (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    source TEXT NOT NULL,
    data JSON NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE features (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    entity_id TEXT NOT NULL,
    feature_name TEXT NOT NULL,
    feature_value REAL,
    version INTEGER DEFAULT 1,
    computed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE experiments (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    hyperparams JSON,
    metrics JSON,
    model_path TEXT,
    status TEXT DEFAULT 'running',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE models (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    version TEXT NOT NULL,
    experiment_id INTEGER REFERENCES experiments(id),
    status TEXT DEFAULT 'staging',
    metrics JSON,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE predictions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    model_id INTEGER REFERENCES models(id),
    input JSON NOT NULL,
    output JSON NOT NULL,
    latency_ms REAL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""

    def show_stages(self):
        """Print each stage's name, its storage table and its Litestream role."""
        print("=== ML Pipeline Stages ===\n")
        for stage in self.STAGES.values():
            print(f"[{stage['name']}]")
            print(f"  Storage: {stage['storage']}")
            # The per-stage Litestream benefit was defined but never shown.
            print(f"  Litestream: {stage['litestream']}")
            print()

    def show_schema(self, max_chars=None):
        """Print the SQLite schema.

        Args:
            max_chars: optional preview length. ``None`` (the default) prints
                the whole schema. The former fixed 500-char cut stopped in
                the middle of ``CREATE TABLE experiments``.
        """
        print("=== SQLite Schema ===")
        print(self.SCHEMA[:max_chars])

# Demo: print the pipeline stages and the schema they are stored in.
pipeline = MLPipelineDesign()
pipeline.show_stages()
# The schema was defined for this section but never shown in the demo.
pipeline.show_schema()

Python Implementation

# implementation.py — Python ML pipeline with SQLite
import json

class MLPipelineImpl:
    """Hold the reference ``ml_pipeline.py`` source shown in this article.

    ``CODE`` is a self-contained, stdlib-only module defining ``MLPipeline``.
    That class is a SQLite-backed store for raw data, features, experiments,
    models and prediction logs. Each connection switches the database to WAL
    journal mode, which Litestream needs in order to replicate it.
    """

    CODE = """
# ml_pipeline.py — Lightweight ML pipeline with SQLite + Litestream
import sqlite3
import json
from pathlib import Path
from contextlib import contextmanager

class MLPipeline:
    def __init__(self, db_path="ml_pipeline.db"):
        self.db_path = db_path
        self._init_db()

    @contextmanager
    def _conn(self):
        # One short-lived connection per operation:
        # commit on success, roll back on any exception, always close.
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")  # Required for Litestream
        conn.execute("PRAGMA busy_timeout=5000")
        try:
            yield conn
            conn.commit()
        except BaseException:
            conn.rollback()
            raise
        finally:
            conn.close()

    def _init_db(self):
        with self._conn() as conn:
            conn.executescript('''
                CREATE TABLE IF NOT EXISTS raw_data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    source TEXT, data JSON, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
                CREATE TABLE IF NOT EXISTS features (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    entity_id TEXT, feature_name TEXT, feature_value REAL,
                    version INTEGER DEFAULT 1, computed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
                CREATE TABLE IF NOT EXISTS experiments (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT, hyperparams JSON, metrics JSON,
                    model_path TEXT, status TEXT DEFAULT 'running',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
                CREATE TABLE IF NOT EXISTS models (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT, version TEXT, experiment_id INTEGER,
                    status TEXT DEFAULT 'staging', metrics JSON,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
                CREATE TABLE IF NOT EXISTS predictions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    model_id INTEGER, input JSON, output JSON,
                    latency_ms REAL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            ''')

    # === Data Collection ===
    def ingest_data(self, source, data):
        with self._conn() as conn:
            conn.execute(
                "INSERT INTO raw_data (source, data) VALUES (?, ?)",
                (source, json.dumps(data)),
            )

    # === Feature Store ===
    def store_features(self, entity_id, features, version=1):
        with self._conn() as conn:
            for name, value in features.items():
                conn.execute(
                    "INSERT INTO features (entity_id, feature_name, feature_value, version) VALUES (?, ?, ?, ?)",
                    (entity_id, name, value, version),
                )

    def get_features(self, entity_id, version=None):
        # Rows are read oldest-first. When the dict is built, later rows
        # overwrite earlier ones. With no version, each feature therefore
        # keeps its highest-version (then most recently stored) value.
        with self._conn() as conn:
            if version is not None:
                rows = conn.execute(
                    "SELECT feature_name, feature_value FROM features WHERE entity_id=? AND version=? ORDER BY id",
                    (entity_id, version),
                ).fetchall()
            else:
                rows = conn.execute(
                    "SELECT feature_name, feature_value FROM features WHERE entity_id=? ORDER BY version, id",
                    (entity_id,),
                ).fetchall()
            return {r['feature_name']: r['feature_value'] for r in rows}

    # === Experiment Tracking ===
    def start_experiment(self, name, hyperparams):
        with self._conn() as conn:
            cursor = conn.execute(
                "INSERT INTO experiments (name, hyperparams) VALUES (?, ?)",
                (name, json.dumps(hyperparams)),
            )
            return cursor.lastrowid

    def log_metrics(self, experiment_id, metrics):
        with self._conn() as conn:
            conn.execute(
                "UPDATE experiments SET metrics=?, status='completed' WHERE id=?",
                (json.dumps(metrics), experiment_id),
            )

    # === Model Registry ===
    def register_model(self, name, version, experiment_id, metrics):
        with self._conn() as conn:
            cursor = conn.execute(
                "INSERT INTO models (name, version, experiment_id, metrics) VALUES (?, ?, ?, ?)",
                (name, version, experiment_id, json.dumps(metrics)),
            )
            return cursor.lastrowid

    def promote_model(self, model_id, status='production'):
        with self._conn() as conn:
            if status == 'production':
                # Keep one production model per name: archive the current one.
                # Skip this when promoting to another status (e.g. 'canary').
                conn.execute(
                    "UPDATE models SET status='archived' WHERE status='production' AND id<>? AND name=(SELECT name FROM models WHERE id=?)",
                    (model_id, model_id),
                )
            conn.execute("UPDATE models SET status=? WHERE id=?", (status, model_id))

    # === Prediction Logging ===
    def log_prediction(self, model_id, input_data, output_data, latency_ms):
        with self._conn() as conn:
            conn.execute(
                "INSERT INTO predictions (model_id, input, output, latency_ms) VALUES (?, ?, ?, ?)",
                (model_id, json.dumps(input_data), json.dumps(output_data), latency_ms),
            )

    # === Dashboard ===
    def dashboard(self):
        with self._conn() as conn:
            experiments = conn.execute("SELECT COUNT(*) as c FROM experiments").fetchone()['c']
            models = conn.execute("SELECT COUNT(*) as c FROM models").fetchone()['c']
            predictions = conn.execute("SELECT COUNT(*) as c FROM predictions").fetchone()['c']

            return {
                'total_experiments': experiments,
                'total_models': models,
                'total_predictions': predictions,
                'db_size_mb': round(Path(self.db_path).stat().st_size / 1e6, 2),
            }

# pipeline = MLPipeline("ml_pipeline.db")
# pipeline.ingest_data("sensor", {"temp": 25.5, "humidity": 60})
# exp_id = pipeline.start_experiment("xgboost_v1", {"n_estimators": 100, "lr": 0.1})
# pipeline.log_metrics(exp_id, {"accuracy": 0.95, "f1": 0.93})
# model_id = pipeline.register_model("churn_model", "v1.0", exp_id, {"accuracy": 0.95})
# pipeline.promote_model(model_id)
"""

    def show_code(self):
        """Print a preview: the first 600 characters of ``CODE``."""
        print("=== ML Pipeline ===")
        print(self.CODE[:600])

# Demo: print a preview of the embedded pipeline module.
impl = MLPipelineImpl()
MLPipelineImpl.show_code(impl)

Docker Deployment

# docker.py — Docker deployment with Litestream
import json

class DockerDeployment:
    """Deployment snippets: a Dockerfile, a docker-compose file and restore commands.

    The replica location used here (``s3://my-ml-backup/ml_pipeline``)
    matches the sample ``litestream.yml`` in this article (``bucket:
    my-ml-backup``, ``path: ml_pipeline``). Restores therefore read the same
    location that replication writes to.
    """

    DOCKERFILE = """
# Dockerfile — ML Pipeline with Litestream
FROM python:3.12-slim

# Install Litestream
ADD https://github.com/benbjohnson/litestream/releases/download/v0.3.13/litestream-v0.3.13-linux-amd64.tar.gz /tmp/
RUN tar -C /usr/local/bin -xzf /tmp/litestream-v0.3.13-linux-amd64.tar.gz

WORKDIR /app

# Install Python dependencies first so code-only changes reuse this layer
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . /app

# Litestream config
COPY litestream.yml /etc/litestream.yml

# Start: restore DB from S3 when there is no local copy (and a replica exists),
# then run the app under Litestream replication
CMD ["sh", "-c", "litestream restore -if-db-not-exists -if-replica-exists /data/ml_pipeline.db && exec litestream replicate -exec 'python app.py'"]
"""

    COMPOSE = """
# docker-compose.yaml
version: '3.8'
services:
  ml-pipeline:
    build: .
    volumes:
      - ml-data:/data
    environment:
      # Pass credentials through from the host; never bake them into the file
      - AWS_ACCESS_KEY_ID
      - AWS_SECRET_ACCESS_KEY
      - LITESTREAM_REPLICA_URL=s3://my-ml-backup/ml_pipeline
    ports:
      - "8000:8000"

volumes:
  ml-data:
"""

    RESTORE = """
# Restore from backup (disaster recovery)
litestream restore -o /data/ml_pipeline.db s3://my-ml-backup/ml_pipeline

# Restore to specific point in time
litestream restore -o /data/ml_pipeline.db -timestamp 2026-01-15T10:30:00Z s3://my-ml-backup/ml_pipeline
"""

    def show_dockerfile(self, max_chars=None):
        """Print the Dockerfile.

        Args:
            max_chars: optional preview length. ``None`` (the default) prints
                the whole file. The former fixed 400-char cut dropped the
                config ``COPY`` and the ``CMD`` line.
        """
        print("=== Dockerfile ===")
        print(self.DOCKERFILE[:max_chars])

    def show_restore(self, max_chars=None):
        """Print the restore commands (``max_chars=None`` prints all of them)."""
        print("\n=== Restore Commands ===")
        print(self.RESTORE[:max_chars])

# Demo: print the Dockerfile, then the restore commands.
docker = DockerDeployment()
DockerDeployment.show_dockerfile(docker)
DockerDeployment.show_restore(docker)

Cost Comparison

# cost.py — Cost comparison: SQLite+Litestream vs managed DB
import json

class CostComparison:
    """Compare the monthly cost of SQLite + Litestream with managed databases.

    Each ``COMPARISON`` entry carries display strings: ``name``, ``compute``,
    ``storage``, ``total``, ``pros`` and ``cons``.
    """

    COMPARISON = {
        "sqlite_litestream": {
            "name": "SQLite + Litestream + S3",
            "compute": "$5-20/month (small VPS)",
            "storage": "$0.50-5/month (S3 backups)",
            "total": "$5-25/month",
            "pros": "ถูกมาก, simple, no vendor lock-in",
            "cons": "Single writer, ไม่ scale horizontally, ต้อง manage เอง",
        },
        "rds_postgres": {
            "name": "AWS RDS PostgreSQL",
            "compute": "$30-200/month (db.t3.small-medium)",
            "storage": "$10-50/month (gp3)",
            "total": "$40-250/month",
            "pros": "Managed, HA, auto-backup, multi-AZ",
            "cons": "แพงกว่า 5-10x, vendor lock-in",
        },
        "supabase": {
            "name": "Supabase (Managed PostgreSQL)",
            "compute": "$0-25/month (free-pro)",
            "storage": "Included",
            "total": "$0-25/month",
            "pros": "Free tier, managed, built-in auth/API",
            "cons": "Shared resources on free tier, limited on pro",
        },
    }

    def show_comparison(self):
        """Print a header, then the name, total and pros of every option.

        Each option is followed by one blank line.
        """
        print("=== Cost Comparison ===\n")
        for option in self.COMPARISON.values():
            entry = "\n".join((
                f"[{option['name']}]",
                f"  Total: {option['total']}",
                f"  Pros: {option['pros']}",
            ))
            print(entry, end="\n\n")

# Demo: print the cost comparison table.
cost = CostComparison()
CostComparison.show_comparison(cost)

FAQ - คำถามที่พบบ่อย

Q: SQLite เหมาะกับ ML Pipeline จริงไหม?

A: เหมาะสำหรับงาน small-medium scale ได้แก่ data < 10GB, ใช้ server เดียว, มี writer เดียว และ predictions < 1000 req/sec (SQLite รองรับได้ดี) ไม่เหมาะเมื่อ data > 50GB, มีหลาย writers, มี write concurrency สูง หรือเป็น distributed system ควรใช้ SQLite สำหรับ prototyping, ทีมขนาดเล็ก, edge ML และ personal projects และควรย้ายไป PostgreSQL เมื่อ scale เกินขีดจำกัดของ SQLite

Q: Litestream มี data loss ไหม?

A: RPO ใกล้ศูนย์: Litestream sync ทุก 1 วินาทีตามค่า sync-interval (ปรับได้) และ replication เป็นแบบ asynchronous จึงอาจสูญเสีย data ได้สูงสุดประมาณ 1 วินาทีล่าสุด; WAL-based: replicate การเปลี่ยนแปลงใน WAL อย่างต่อเนื่อง ไม่ใช่ periodic backup; Recovery: restore จาก S3 ได้ทั้ง latest state และ point-in-time (ภายในช่วง retention); สำคัญ: ต้อง enable WAL mode (PRAGMA journal_mode=WAL) เพราะ Litestream ทำงานกับ WAL

Q: SQLite กับ DuckDB อันไหนดีกว่าสำหรับ ML?

A: SQLite เป็น general-purpose database แบบ OLTP-oriented ขนาดเล็ก และ Litestream รองรับ; DuckDB เป็น analytical (OLAP) database แบบ columnar storage ที่ทำ aggregations ได้เร็ว และอ่าน Parquet/CSV ได้โดยตรง เลือก SQLite สำหรับ application database, feature store, experiment tracking และ prediction logging; เลือก DuckDB สำหรับ data analysis, feature engineering และการประมวลผล dataset ขนาดใหญ่; หรือใช้ร่วมกัน: SQLite สำหรับ operational data และ DuckDB สำหรับ analytics/training

Q: Litestream กับ regular backup ต่างกันอย่างไร?

A: Regular backup (sqlite3 .backup) เป็น point-in-time snapshot ซึ่งมี RPO เท่ากับ backup interval (มักเป็นหลายชั่วโมง) ส่วน Litestream เป็น continuous WAL replication ที่มี RPO ≈ 1 วินาที Litestream ดีกว่าในด้าน granular recovery, RPO ที่ต่ำกว่า, การทำงานอัตโนมัติ และไม่มี downtime ระหว่าง backup; regular backup เหมาะกับความต้องการที่ไม่ซับซ้อน ยอมรับการ restore ด้วยมือได้ และไม่ต้องใช้ S3 แนะนำ: ใช้ Litestream สำหรับ production ร่วมกับ full backup เป็นครั้งคราวเพื่อเก็บเป็น long-term archive