
SQLite Litestream Machine Learning Pipeline

2026-04-03 · อ. บอม — SiamCafe.net · 1,873 words

What Is a SQLite Litestream Machine Learning Pipeline?

SQLite is a lightweight, fast, serverless embedded database, well suited to small and medium applications. Litestream is an open-source tool that streams replication of a SQLite database to cloud storage (S3, GCS, Azure Blob) in near real time, giving SQLite durability and disaster recovery comparable to managed databases. A machine learning pipeline covers everything from data collection and feature engineering through model training to deployment. Combining SQLite + Litestream with an ML pipeline lets you build lightweight, cost-effective ML infrastructure without depending on expensive managed database services.
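As a concrete starting point, the sketch below creates a SQLite file and switches it into WAL journal mode, which Litestream relies on for streaming replication. Paths and table names here are illustrative, not part of any real deployment:

```python
# wal_check.py — enable the WAL journal mode that Litestream depends on
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "ml_pipeline.db")
conn = sqlite3.connect(db_path)

# Litestream replicates the -wal file, so the DB must be in WAL mode;
# the PRAGMA returns the journal mode actually in effect.
mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
print(mode)  # wal

conn.execute("CREATE TABLE raw_data (id INTEGER PRIMARY KEY, data TEXT)")
conn.execute("INSERT INTO raw_data (data) VALUES (?)", ('{"temp": 25.5}',))
conn.commit()
conn.close()
```

Once the file is in WAL mode, pointing `litestream replicate` at it is all that is needed to start continuous backup.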

SQLite + Litestream Architecture

# architecture.py — SQLite + Litestream architecture
import json

class SQLiteLitestreamArch:
    COMPONENTS = {
        "sqlite": {
            "name": "SQLite Database",
            "description": "Embedded database — single file, zero configuration, ACID compliant",
            "benefit": "No server, no network hop, very fast (in-process)",
            "limitation": "Single writer, does not scale horizontally",
        },
        "litestream": {
            "name": "Litestream",
            "description": "Streaming replication — replicate SQLite WAL to cloud storage",
            "benefit": "Point-in-time recovery, disaster recovery, near-zero RPO",
            "how": "Monitor WAL changes → compress → upload to S3/GCS continuously",
        },
        "s3_storage": {
            "name": "Cloud Storage (S3/GCS)",
            "description": "Stores WAL snapshots and incremental changes",
            "cost": "~$0.023/GB/month (S3) — far cheaper than a managed DB",
        },
    }

    LITESTREAM_CONFIG = """
# litestream.yml — Litestream configuration
dbs:
  - path: /data/ml_pipeline.db
    replicas:
      - type: s3
        bucket: my-ml-backup
        path: ml_pipeline
        region: ap-southeast-1
        retention: 720h        # 30 days retention
        sync-interval: 1s      # sync every 1 second
        snapshot-interval: 1h  # full snapshot every hour
"""

    def show_components(self):
        print("=== Architecture ===\n")
        for key, comp in self.COMPONENTS.items():
            print(f"[{comp['name']}]")
            print(f"  {comp['description']}")
            # s3_storage has no 'benefit' key, so fall back to 'cost'
            print(f"  Benefit: {comp.get('benefit', comp.get('cost', '-'))}")
            print()

    def show_config(self):
        print("=== Litestream Config ===")
        print(self.LITESTREAM_CONFIG[:400])

arch = SQLiteLitestreamArch()
arch.show_components()
arch.show_config()

ML Pipeline Design

# pipeline.py — ML pipeline with SQLite
import json

class MLPipelineDesign:
    STAGES = {
        "data_collection": {
            "name": "1. Data Collection",
            "storage": "SQLite: raw_data table — append-only, timestamped",
            "litestream": "Auto-replicates every insert → S3 backup",
        },
        "feature_store": {
            "name": "2. Feature Store",
            "storage": "SQLite: features table — pre-computed features, versioned",
            "litestream": "Feature snapshots replicated → reproducible training",
        },
        "experiment_tracking": {
            "name": "3. Experiment Tracking",
            "storage": "SQLite: experiments table — hyperparams, metrics, model paths",
            "litestream": "Experiment history preserved — never lose tracking data",
        },
        "model_registry": {
            "name": "4. Model Registry",
            "storage": "SQLite: models table — model metadata, version, status",
            "litestream": "Model registry replicated — disaster recovery",
        },
        "predictions": {
            "name": "5. Prediction Serving",
            "storage": "SQLite: predictions table — input, output, timestamp, latency",
            "litestream": "Prediction logs replicated → monitoring, debugging",
        },
    }

    SCHEMA = """
-- ML Pipeline SQLite Schema
CREATE TABLE raw_data (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    source TEXT NOT NULL,
    data JSON NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE features (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    entity_id TEXT NOT NULL,
    feature_name TEXT NOT NULL,
    feature_value REAL,
    version INTEGER DEFAULT 1,
    computed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE experiments (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    hyperparams JSON,
    metrics JSON,
    model_path TEXT,
    status TEXT DEFAULT 'running',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE models (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    version TEXT NOT NULL,
    experiment_id INTEGER REFERENCES experiments(id),
    status TEXT DEFAULT 'staging',
    metrics JSON,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE predictions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    model_id INTEGER REFERENCES models(id),
    input JSON NOT NULL,
    output JSON NOT NULL,
    latency_ms REAL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""

    def show_stages(self):
        print("=== ML Pipeline Stages ===\n")
        for key, stage in self.STAGES.items():
            print(f"[{stage['name']}]")
            print(f"  Storage: {stage['storage']}")
            print()

    def show_schema(self):
        print("=== SQLite Schema ===")
        print(self.SCHEMA[:500])

pipeline = MLPipelineDesign()
pipeline.show_stages()
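The long-format `features` table in the schema above has to be pivoted into per-entity vectors before training. A minimal stdlib sketch of that step, with made-up entity and feature names:

```python
# feature_pivot.py — pivot the long-format features table into per-entity vectors
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute("""
    CREATE TABLE features (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        entity_id TEXT NOT NULL,
        feature_name TEXT NOT NULL,
        feature_value REAL,
        version INTEGER DEFAULT 1
    )
""")
conn.executemany(
    "INSERT INTO features (entity_id, feature_name, feature_value, version) VALUES (?, ?, ?, ?)",
    [
        ("user_1", "avg_session_min", 12.5, 1),
        ("user_1", "purchases_30d", 3.0, 1),
        ("user_2", "avg_session_min", 4.2, 1),
        ("user_2", "purchases_30d", 0.0, 1),
    ],
)

# Pivot: one {feature_name: value} dict per entity; ascending version order
# means the latest version of each feature overwrites older ones.
vectors = {}
for r in conn.execute(
    "SELECT entity_id, feature_name, feature_value FROM features ORDER BY version ASC"
):
    vectors.setdefault(r["entity_id"], {})[r["feature_name"]] = r["feature_value"]

print(vectors["user_1"])  # {'avg_session_min': 12.5, 'purchases_30d': 3.0}
```

The resulting dicts can be fed straight into a vectorizer or turned into rows of a training matrix.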

Python Implementation

# implementation.py — Python ML pipeline with SQLite
import json

class MLPipelineImpl:
    CODE = """
# ml_pipeline.py — Lightweight ML pipeline with SQLite + Litestream
import sqlite3
import json
import hashlib
import pickle
from datetime import datetime
from pathlib import Path
from contextlib import contextmanager

class MLPipeline:
    def __init__(self, db_path="ml_pipeline.db"):
        self.db_path = db_path
        self._init_db()
    
    @contextmanager
    def _conn(self):
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")  # Required for Litestream
        conn.execute("PRAGMA busy_timeout=5000")
        try:
            yield conn
            conn.commit()
        except:
            conn.rollback()
            raise
        finally:
            conn.close()
    
    def _init_db(self):
        with self._conn() as conn:
            conn.executescript('''
                CREATE TABLE IF NOT EXISTS raw_data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    source TEXT, data JSON, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
                CREATE TABLE IF NOT EXISTS features (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    entity_id TEXT, feature_name TEXT, feature_value REAL,
                    version INTEGER DEFAULT 1, computed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
                CREATE TABLE IF NOT EXISTS experiments (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT, hyperparams JSON, metrics JSON,
                    model_path TEXT, status TEXT DEFAULT 'running',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
                CREATE TABLE IF NOT EXISTS models (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT, version TEXT, experiment_id INTEGER,
                    status TEXT DEFAULT 'staging', metrics JSON,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
                CREATE TABLE IF NOT EXISTS predictions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    model_id INTEGER, input JSON, output JSON,
                    latency_ms REAL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            ''')
    
    # === Data Collection ===
    def ingest_data(self, source, data):
        with self._conn() as conn:
            conn.execute(
                "INSERT INTO raw_data (source, data) VALUES (?, ?)",
                (source, json.dumps(data)),
            )
    
    # === Feature Store ===
    def store_features(self, entity_id, features, version=1):
        with self._conn() as conn:
            for name, value in features.items():
                conn.execute(
                    "INSERT INTO features (entity_id, feature_name, feature_value, version) VALUES (?, ?, ?, ?)",
                    (entity_id, name, value, version),
                )
    
    def get_features(self, entity_id, version=None):
        with self._conn() as conn:
            if version:
                rows = conn.execute(
                    "SELECT feature_name, feature_value FROM features WHERE entity_id=? AND version=?",
                    (entity_id, version),
                ).fetchall()
            else:
                rows = conn.execute(
                    # ascending so the latest version overwrites older ones below
                    "SELECT feature_name, feature_value FROM features WHERE entity_id=? ORDER BY version ASC",
                    (entity_id,),
                ).fetchall()
            return {r['feature_name']: r['feature_value'] for r in rows}
    
    # === Experiment Tracking ===
    def start_experiment(self, name, hyperparams):
        with self._conn() as conn:
            cursor = conn.execute(
                "INSERT INTO experiments (name, hyperparams) VALUES (?, ?)",
                (name, json.dumps(hyperparams)),
            )
            return cursor.lastrowid
    
    def log_metrics(self, experiment_id, metrics):
        with self._conn() as conn:
            conn.execute(
                "UPDATE experiments SET metrics=?, status='completed' WHERE id=?",
                (json.dumps(metrics), experiment_id),
            )
    
    # === Model Registry ===
    def register_model(self, name, version, experiment_id, metrics):
        with self._conn() as conn:
            cursor = conn.execute(
                "INSERT INTO models (name, version, experiment_id, metrics) VALUES (?, ?, ?, ?)",
                (name, version, experiment_id, json.dumps(metrics)),
            )
            return cursor.lastrowid
    
    def promote_model(self, model_id, status='production'):
        with self._conn() as conn:
            # Demote current production
            conn.execute(
                "UPDATE models SET status='archived' WHERE status='production' AND name=(SELECT name FROM models WHERE id=?)",
                (model_id,),
            )
            conn.execute("UPDATE models SET status=? WHERE id=?", (status, model_id))
    
    # === Prediction Logging ===
    def log_prediction(self, model_id, input_data, output_data, latency_ms):
        with self._conn() as conn:
            conn.execute(
                "INSERT INTO predictions (model_id, input, output, latency_ms) VALUES (?, ?, ?, ?)",
                (model_id, json.dumps(input_data), json.dumps(output_data), latency_ms),
            )
    
    # === Dashboard ===
    def dashboard(self):
        with self._conn() as conn:
            experiments = conn.execute("SELECT COUNT(*) as c FROM experiments").fetchone()['c']
            models = conn.execute("SELECT COUNT(*) as c FROM models").fetchone()['c']
            predictions = conn.execute("SELECT COUNT(*) as c FROM predictions").fetchone()['c']
            
            return {
                'total_experiments': experiments,
                'total_models': models,
                'total_predictions': predictions,
                'db_size_mb': round(Path(self.db_path).stat().st_size / 1e6, 2),
            }

# pipeline = MLPipeline("ml_pipeline.db")
# pipeline.ingest_data("sensor", {"temp": 25.5, "humidity": 60})
# exp_id = pipeline.start_experiment("xgboost_v1", {"n_estimators": 100, "lr": 0.1})
# pipeline.log_metrics(exp_id, {"accuracy": 0.95, "f1": 0.93})
# model_id = pipeline.register_model("churn_model", "v1.0", exp_id, {"accuracy": 0.95})
"""

    def show_code(self):
        print("=== ML Pipeline ===")
        print(self.CODE[:600])

impl = MLPipelineImpl()
impl.show_code()
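Since the MLPipeline class above is shown as an embedded string, here is a condensed, directly runnable round-trip of just the experiment-tracking stage, using the same table shape and SQL. The temp-file path is illustrative:

```python
# experiment_demo.py — runnable round-trip of the experiment-tracking stage
import json
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "ml_pipeline.db")
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")  # required for Litestream

conn.execute("""
    CREATE TABLE experiments (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT, hyperparams JSON, metrics JSON,
        status TEXT DEFAULT 'running'
    )
""")

# start_experiment: insert hyperparams, capture the new row id
cur = conn.execute(
    "INSERT INTO experiments (name, hyperparams) VALUES (?, ?)",
    ("xgboost_v1", json.dumps({"n_estimators": 100, "lr": 0.1})),
)
exp_id = cur.lastrowid

# log_metrics: attach metrics and mark the run completed
conn.execute(
    "UPDATE experiments SET metrics=?, status='completed' WHERE id=?",
    (json.dumps({"accuracy": 0.95, "f1": 0.93}), exp_id),
)
conn.commit()

row = conn.execute("SELECT * FROM experiments WHERE id=?", (exp_id,)).fetchone()
print(row["status"], json.loads(row["metrics"])["accuracy"])  # completed 0.95
```

With Litestream watching this file, every experiment record lands in S3 within about a second of the commit.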

Docker Deployment

# docker.py — Docker deployment with Litestream
import json

class DockerDeployment:
    DOCKERFILE = """
# Dockerfile — ML Pipeline with Litestream
FROM python:3.12-slim

# Install Litestream
ADD https://github.com/benbjohnson/litestream/releases/download/v0.3.13/litestream-v0.3.13-linux-amd64.tar.gz /tmp/
RUN tar -C /usr/local/bin -xzf /tmp/litestream-v0.3.13-linux-amd64.tar.gz

# Install Python dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . /app
WORKDIR /app

# Litestream config
COPY litestream.yml /etc/litestream.yml

# Run the app under Litestream replication; if the DB may be missing on boot,
# restore it first (e.g. `litestream restore -if-replica-exists`) in an entrypoint script
CMD ["litestream", "replicate", "-exec", "python app.py"]
"""

    COMPOSE = """
# docker-compose.yaml
version: '3.8'
services:
  ml-pipeline:
    build: .
    volumes:
      - ml-data:/data
    environment:
      - AWS_ACCESS_KEY_ID=
      - AWS_SECRET_ACCESS_KEY=
      - LITESTREAM_REPLICA_URL=s3://my-bucket/ml-pipeline
    ports:
      - "8000:8000"

volumes:
  ml-data:
"""

    RESTORE = """
# Restore from backup (disaster recovery)
litestream restore -o /data/ml_pipeline.db s3://my-bucket/ml-pipeline

# Restore to specific point in time
litestream restore -o /data/ml_pipeline.db -timestamp 2026-01-15T10:30:00Z s3://my-bucket/ml-pipeline
"""

    def show_dockerfile(self):
        print("=== Dockerfile ===")
        print(self.DOCKERFILE[:400])

    def show_restore(self):
        print("\n=== Restore Commands ===")
        print(self.RESTORE[:300])

docker = DockerDeployment()
docker.show_dockerfile()
docker.show_restore()
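After `litestream restore` completes, it is worth sanity-checking the file before the app starts serving from it. A small sketch using SQLite's built-in `PRAGMA integrity_check`; the demo database here is created locally purely for illustration:

```python
# verify_restore.py — sanity-check a database file after `litestream restore`
import os
import sqlite3
import tempfile

def verify_db(db_path):
    """Return (healthy, table_names) for a SQLite database file."""
    conn = sqlite3.connect(db_path)
    try:
        # integrity_check returns a single row containing "ok" on a healthy file
        status = conn.execute("PRAGMA integrity_check").fetchone()[0]
        tables = [r[0] for r in conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table'"
        )]
        return status == "ok", tables
    finally:
        conn.close()

# Demo: build a stand-in "restored" file locally and verify it
db_path = os.path.join(tempfile.mkdtemp(), "restored.db")
conn = sqlite3.connect(db_path)
conn.execute("CREATE TABLE models (id INTEGER PRIMARY KEY)")
conn.commit()
conn.close()

ok, tables = verify_db(db_path)
print(ok, tables)  # True ['models']
```

Wiring a check like this into the container entrypoint catches a corrupt or empty restore before it reaches production traffic.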

Cost Comparison

# cost.py — Cost comparison: SQLite+Litestream vs managed DB
import json

class CostComparison:
    COMPARISON = {
        "sqlite_litestream": {
            "name": "SQLite + Litestream + S3",
            "compute": "$5-20/month (small VPS)",
            "storage": "$0.50-5/month (S3 backups)",
            "total": "$5-25/month",
            "pros": "Very cheap, simple, no vendor lock-in",
            "cons": "Single writer, no horizontal scaling, self-managed",
        },
        "rds_postgres": {
            "name": "AWS RDS PostgreSQL",
            "compute": "$30-200/month (db.t3.small-medium)",
            "storage": "$10-50/month (gp3)",
            "total": "$40-250/month",
            "pros": "Managed, HA, auto-backup, multi-AZ",
            "cons": "5-10x more expensive, vendor lock-in",
        },
        "supabase": {
            "name": "Supabase (Managed PostgreSQL)",
            "compute": "$0-25/month (free-pro)",
            "storage": "Included",
            "total": "$0-25/month",
            "pros": "Free tier, managed, built-in auth/API",
            "cons": "Shared resources on free tier, limited on pro",
        },
    }

    def show_comparison(self):
        print("=== Cost Comparison ===\n")
        for key, opt in self.COMPARISON.items():
            print(f"[{opt['name']}]")
            print(f"  Total: {opt['total']}")
            print(f"  Pros: {opt['pros']}")
            print()

cost = CostComparison()
cost.show_comparison()
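The S3 line in the comparison above can be estimated with simple arithmetic. A back-of-envelope helper, assuming standard S3 pricing of ~$0.023/GB/month and that retained WAL segments dominate storage; both are assumptions, not measurements:

```python
# s3_cost.py — back-of-envelope S3 backup cost (illustrative assumptions)
def monthly_s3_cost(db_gb, daily_wal_gb, retention_days=30, price_per_gb=0.023):
    # snapshot size plus WAL segments kept for the retention window
    stored_gb = db_gb + daily_wal_gb * retention_days
    return round(stored_gb * price_per_gb, 2)

# e.g. a 2 GB database writing ~100 MB of WAL per day, 30-day retention
print(monthly_s3_cost(db_gb=2.0, daily_wal_gb=0.1))
```

Even generous WAL churn keeps the bill well under a dollar a month, which is why the backup line in the table is so small.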

FAQ - Frequently Asked Questions

Q: Is SQLite really suitable for an ML pipeline?

A: It is suitable at small-to-medium scale: data under ~10 GB, a single server, a single writer, and prediction traffic under ~1,000 req/sec, which SQLite handles well. It is not a fit when data exceeds ~50 GB, there are multiple writers, concurrency is high, or the system is distributed. Use SQLite for prototyping, small teams, edge ML, and personal projects; move to PostgreSQL once you outgrow SQLite's limits.
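The ~1,000 req/sec figure is easy to spot-check locally. A rough stdlib timing sketch; this is not a benchmark, and results depend heavily on disk and batching:

```python
# write_rate.py — rough local spot-check of SQLite insert throughput
import json
import os
import sqlite3
import tempfile
import time

db_path = os.path.join(tempfile.mkdtemp(), "bench.db")
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute(
    "CREATE TABLE predictions (id INTEGER PRIMARY KEY, input JSON, output JSON)"
)

n = 1000
start = time.perf_counter()
with conn:  # one transaction: batching commits is what makes SQLite writes fast
    conn.executemany(
        "INSERT INTO predictions (input, output) VALUES (?, ?)",
        [(json.dumps({"x": i}), json.dumps({"y": i})) for i in range(n)],
    )
elapsed = time.perf_counter() - start
print(f"{n} inserts in {elapsed:.3f}s")
```

Note the single transaction: committing each insert individually forces an fsync per row and is usually the bottleneck, not SQLite itself.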

Q: Can Litestream lose data?

A: Near-zero RPO: Litestream syncs every 1 second (configurable), so worst-case data loss is about 1 second. It is WAL-based, replicating WAL changes continuously rather than taking periodic backups. For recovery, you can restore from S3 to either the latest state or a specific point in time. Important: WAL mode must be enabled (PRAGMA journal_mode=WAL), since Litestream requires it.

Q: Which is better for ML, SQLite or DuckDB?

A: SQLite is general-purpose and OLTP-oriented, with a small footprint and Litestream support. DuckDB is analytical (OLAP), with columnar storage, fast aggregations, and native Parquet/CSV support. Choose SQLite for the application database, feature store, experiment tracking, and prediction logging; choose DuckDB for data analysis, feature engineering, and large-dataset processing. They also combine well: SQLite for operational data plus DuckDB for analytics and training.

Q: How does Litestream differ from a regular backup?

A: A regular backup (sqlite3 .backup) is a point-in-time snapshot, so RPO equals the backup interval, often hours. Litestream performs continuous WAL replication with an RPO of roughly 1 second, and is better for granular recovery, automation, and zero downtime during backup. A regular backup is fine for simple needs where a manual restore is acceptable and no S3 is involved. Recommended: Litestream for production, plus an occasional full backup for long-term archive.
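The "regular backup" side of this comparison is available directly from Python via the stdlib `sqlite3.Connection.backup()` online-backup API, so a periodic full backup needs no external tooling. Paths here are illustrative:

```python
# backup_demo.py — periodic full backup via the stdlib online-backup API
import os
import sqlite3
import tempfile

src_path = os.path.join(tempfile.mkdtemp(), "ml_pipeline.db")
dst_path = src_path + ".bak"

src = sqlite3.connect(src_path)
src.execute("CREATE TABLE models (id INTEGER PRIMARY KEY, name TEXT)")
src.execute("INSERT INTO models (name) VALUES ('churn_model')")
src.commit()

# Connection.backup() copies the live database page by page without downtime
dst = sqlite3.connect(dst_path)
src.backup(dst)
src.close()

count = dst.execute("SELECT COUNT(*) FROM models").fetchone()[0]
print(count)  # 1
dst.close()
```

Running this on a cron schedule and shipping the `.bak` file to cold storage covers the long-term-archive role alongside Litestream.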
