SiamCafe.net Blog
Technology

Ollama Local LLM Data Pipeline ETL — สร้าง ETL Pipeline ด้วย LLM บนเครื่อง Local

ollama local llm data pipeline etl
Ollama Local LLM Data Pipeline ETL | SiamCafe Blog
2025-08-14· อ. บอม — SiamCafe.net· 1,626 คำ

Ollama คืออะไรและใช้รัน LLM แบบ Local อย่างไร

Ollama เป็นเครื่องมือ open source สำหรับรัน Large Language Models (LLMs) บนเครื่อง local โดยไม่ต้องส่งข้อมูลไปยัง cloud service รองรับ models หลายตัวเช่น Llama 3, Mistral, Gemma, Phi-3, CodeLlama, Qwen และอีกมากมาย Ollama ทำให้การ setup LLM ง่ายมากเพียงคำสั่งเดียว

ข้อดีของการใช้ Ollama สำหรับ Data Pipeline คือ Privacy ที่ข้อมูลไม่ออกจากเครื่อง เหมาะสำหรับ sensitive data เช่น PII, financial data, medical records ไม่มีค่า API calls ลด latency เพราะไม่ต้อง network round-trip และ customizable ที่สร้าง custom models ด้วย Modelfile ได้

ใน ETL pipeline Ollama ถูกใช้สำหรับ Transform step เช่น text classification, entity extraction, summarization, sentiment analysis, data cleaning และ schema mapping โดยใช้ LLM เป็น intelligent transformer ที่เข้าใจ context ของข้อมูล

Hardware requirements ขึ้นอยู่กับขนาด model โดย 7B models ต้องการ RAM 8GB ขึ้นไป 13B models ต้องการ 16GB 70B models ต้องการ 64GB สำหรับ GPU acceleration รองรับ NVIDIA CUDA และ Apple Metal

ติดตั้ง Ollama และดาวน์โหลด Models

ขั้นตอนการติดตั้งและเตรียม models

# ติดตั้ง Ollama
# macOS
brew install ollama

# Linux
curl -fsSL https://ollama.com/install.sh | sh

# Windows
# ดาวน์โหลดจาก https://ollama.com/download

# เริ่ม Ollama server
ollama serve

# ดาวน์โหลดและรัน models
ollama pull llama3:8b
ollama pull mistral:7b
ollama pull gemma2:9b
ollama pull nomic-embed-text  # สำหรับ embeddings

# ตรวจสอบ models ที่มี
ollama list

# ทดสอบ model
ollama run llama3:8b "สรุปข้อความนี้: Ollama เป็นเครื่องมือรัน LLM บน local"

# === Ollama API ===
# Default: http://localhost:11434

# Generate
curl http://localhost:11434/api/generate -d '{
  "model": "llama3:8b",
  "prompt": "Extract entities from: John Smith bought 5 items at Amazon",
  "stream": false
}'

# Chat
curl http://localhost:11434/api/chat -d '{
  "model": "llama3:8b",
  "messages": [
    {"role": "system", "content": "You are a data extraction assistant. Output JSON only."},
    {"role": "user", "content": "Extract: Order #12345 from Bangkok, total 1500 THB"}
  ],
  "stream": false
}'

# Embeddings
curl http://localhost:11434/api/embeddings -d '{
  "model": "nomic-embed-text",
  "prompt": "Machine learning pipeline"
}'

# === Custom Modelfile สำหรับ ETL ===
# Modelfile.etl
# FROM llama3:8b
# SYSTEM """You are a data extraction and transformation assistant.
# Always output valid JSON. Never add explanations outside JSON.
# Extract structured data from unstructured text accurately."""
# PARAMETER temperature 0.1
# PARAMETER top_p 0.9
# PARAMETER num_ctx 4096

# สร้าง custom model
# ollama create etl-assistant -f Modelfile.etl
# ollama run etl-assistant "Extract: ..."

สร้าง Data Pipeline ETL ด้วย Ollama

โค้ด Python สำหรับ ETL pipeline ที่ใช้ Ollama

#!/usr/bin/env python3
# ollama_etl.py — Data Pipeline ETL with Ollama Local LLM
import requests
import json
import pandas as pd
import logging
from typing import List, Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("ollama_etl")

class OllamaClient:
    def __init__(self, base_url="http://localhost:11434", model="llama3:8b"):
        self.base_url = base_url
        self.model = model
        self.session = requests.Session()
    
    def generate(self, prompt, system=None, temperature=0.1):
        payload = {
            "model": self.model,
            "prompt": prompt,
            "stream": False,
            "options": {"temperature": temperature, "num_ctx": 4096},
        }
        if system:
            payload["system"] = system
        
        resp = self.session.post(f"{self.base_url}/api/generate", json=payload, timeout=120)
        resp.raise_for_status()
        return resp.json()["response"]
    
    def chat(self, messages, temperature=0.1):
        payload = {
            "model": self.model,
            "messages": messages,
            "stream": False,
            "options": {"temperature": temperature},
        }
        resp = self.session.post(f"{self.base_url}/api/chat", json=payload, timeout=120)
        resp.raise_for_status()
        return resp.json()["message"]["content"]
    
    def embed(self, text, model="nomic-embed-text"):
        payload = {"model": model, "prompt": text}
        resp = self.session.post(f"{self.base_url}/api/embeddings", json=payload, timeout=30)
        resp.raise_for_status()
        return resp.json()["embedding"]

class ETLPipeline:
    def __init__(self, ollama_model="llama3:8b"):
        self.client = OllamaClient(model=ollama_model)
        self.stats = {"processed": 0, "success": 0, "failed": 0}
    
    def extract_entities(self, text):
        system = """You are an entity extraction system. Extract all entities and return valid JSON.
Format: {"persons": [], "organizations": [], "locations": [], "dates": [], "amounts": []}"""
        
        try:
            response = self.client.chat([
                {"role": "system", "content": system},
                {"role": "user", "content": f"Extract entities from:\n{text}"},
            ])
            return json.loads(response)
        except (json.JSONDecodeError, Exception) as e:
            logger.warning(f"Entity extraction failed: {e}")
            return {"persons": [], "organizations": [], "locations": [], "dates": [], "amounts": []}
    
    def classify_text(self, text, categories):
        system = f"""Classify the text into one of these categories: {', '.join(categories)}.
Return JSON: {{"category": "chosen_category", "confidence": 0.0-1.0}}"""
        
        response = self.client.chat([
            {"role": "system", "content": system},
            {"role": "user", "content": text},
        ])
        
        try:
            return json.loads(response)
        except json.JSONDecodeError:
            return {"category": "unknown", "confidence": 0.0}
    
    def clean_and_normalize(self, text):
        system = """Clean and normalize the text. Fix typos, standardize formats.
Return JSON: {"cleaned_text": "...", "changes": ["list of changes made"]}"""
        
        response = self.client.chat([
            {"role": "system", "content": system},
            {"role": "user", "content": f"Clean this text:\n{text}"},
        ])
        
        try:
            return json.loads(response)
        except json.JSONDecodeError:
            return {"cleaned_text": text, "changes": []}
    
    def summarize(self, text, max_words=50):
        prompt = f"Summarize in {max_words} words or less. Output only the summary:\n\n{text}"
        return self.client.generate(prompt).strip()
    
    def transform_to_schema(self, raw_text, target_schema):
        system = f"""Transform the input text into this JSON schema:
{json.dumps(target_schema, indent=2)}
Output ONLY valid JSON matching the schema."""
        
        response = self.client.chat([
            {"role": "system", "content": system},
            {"role": "user", "content": raw_text},
        ])
        
        try:
            return json.loads(response)
        except json.JSONDecodeError:
            return None
    
    def process_batch(self, texts, operation="classify", **kwargs):
        results = []
        total = len(texts)
        
        for i, text in enumerate(texts):
            try:
                if operation == "classify":
                    result = self.classify_text(text, kwargs.get("categories", []))
                elif operation == "entities":
                    result = self.extract_entities(text)
                elif operation == "summarize":
                    result = self.summarize(text)
                elif operation == "schema":
                    result = self.transform_to_schema(text, kwargs.get("schema", {}))
                else:
                    result = None
                
                results.append({"input": text, "output": result, "status": "success"})
                self.stats["success"] += 1
            except Exception as e:
                results.append({"input": text, "output": None, "status": "error", "error": str(e)})
                self.stats["failed"] += 1
            
            self.stats["processed"] += 1
            if (i + 1) % 10 == 0:
                logger.info(f"Progress: {i+1}/{total}")
        
        return results

# ใช้งาน
pipeline = ETLPipeline(ollama_model="llama3:8b")

# Entity extraction
entities = pipeline.extract_entities(
    "บริษัท ABC จำกัด ที่กรุงเทพ สั่งซื้อสินค้า 50,000 บาท วันที่ 15 มกราคม 2024"
)
print(json.dumps(entities, indent=2, ensure_ascii=False))

Text Processing Pipeline สำหรับ Structured Data Extraction

Pipeline สำหรับแปลง unstructured text เป็น structured data

#!/usr/bin/env python3
# structured_extraction.py — Extract Structured Data from Documents
import json
import pandas as pd
from ollama_etl import OllamaClient
from pathlib import Path

class DocumentProcessor:
    def __init__(self):
        self.client = OllamaClient(model="llama3:8b")
    
    def process_invoice(self, invoice_text):
        schema = {
            "invoice_number": "string",
            "date": "YYYY-MM-DD",
            "vendor": {"name": "string", "address": "string", "tax_id": "string"},
            "customer": {"name": "string", "address": "string"},
            "items": [{"description": "string", "quantity": "number", "unit_price": "number", "total": "number"}],
            "subtotal": "number",
            "tax": "number",
            "total": "number",
            "currency": "string",
        }
        
        system = f"""Extract invoice data into this exact JSON structure:
{json.dumps(schema, indent=2)}
Use null for missing fields. Numbers should be numeric, not strings."""
        
        response = self.client.chat([
            {"role": "system", "content": system},
            {"role": "user", "content": f"Extract from this invoice:\n{invoice_text}"},
        ])
        
        try:
            return json.loads(response)
        except json.JSONDecodeError:
            return None
    
    def process_resume(self, resume_text):
        system = """Extract resume data into JSON:
{"name": "", "email": "", "phone": "", "summary": "",
 "experience": [{"company": "", "title": "", "start": "", "end": "", "description": ""}],
 "education": [{"institution": "", "degree": "", "year": ""}],
 "skills": []}"""
        
        response = self.client.chat([
            {"role": "system", "content": system},
            {"role": "user", "content": resume_text},
        ])
        
        try:
            return json.loads(response)
        except json.JSONDecodeError:
            return None
    
    def process_log_entries(self, log_text):
        system = """Parse log entries into structured JSON array.
Each entry: {"timestamp": "", "level": "", "service": "", "message": "", "error_code": null}"""
        
        response = self.client.chat([
            {"role": "system", "content": system},
            {"role": "user", "content": f"Parse these logs:\n{log_text}"},
        ])
        
        try:
            return json.loads(response)
        except json.JSONDecodeError:
            return []
    
    def sentiment_analysis(self, reviews):
        system = """Analyze sentiment of each review. Return JSON array:
[{"text": "original text", "sentiment": "positive|negative|neutral", "score": 0.0-1.0, "aspects": []}]"""
        
        combined = "\n---\n".join(reviews)
        response = self.client.chat([
            {"role": "system", "content": system},
            {"role": "user", "content": combined},
        ])
        
        try:
            return json.loads(response)
        except json.JSONDecodeError:
            return []
    
    def batch_process_files(self, input_dir, output_file, doc_type="invoice"):
        results = []
        files = list(Path(input_dir).glob("*.txt"))
        
        for i, file_path in enumerate(files):
            text = file_path.read_text(encoding="utf-8")
            
            if doc_type == "invoice":
                data = self.process_invoice(text)
            elif doc_type == "resume":
                data = self.process_resume(text)
            elif doc_type == "log":
                data = self.process_log_entries(text)
            else:
                data = None
            
            if data:
                results.append({"file": file_path.name, "data": data})
            
            print(f"Processed {i+1}/{len(files)}: {file_path.name}")
        
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2, ensure_ascii=False)
        
        print(f"Results saved to {output_file}")
        return results

# ใช้งาน
processor = DocumentProcessor()

# Process invoice
invoice = """
บริษัท ไทยซอฟต์แวร์ จำกัด
ใบแจ้งหนี้เลขที่: INV-2024-0042
วันที่: 15 มกราคม 2567
ลูกค้า: บริษัท ABC จำกัด
รายการ:
1. Software License x 5 @ 10,000 = 50,000
2. Support 1 year x 1 @ 20,000 = 20,000
รวม: 70,000 บาท
VAT 7%: 4,900 บาท
รวมทั้งสิ้น: 74,900 บาท
"""
result = processor.process_invoice(invoice)
print(json.dumps(result, indent=2, ensure_ascii=False))

Batch Processing และ Performance Optimization

เทคนิคสำหรับ processing ข้อมูลจำนวนมาก

#!/usr/bin/env python3
# batch_optimizer.py — Optimized Batch Processing with Ollama
import asyncio
import aiohttp
import json
import time
from typing import List
from dataclasses import dataclass, field

@dataclass
class BatchConfig:
    max_concurrent: int = 4
    timeout: int = 120
    retry_count: int = 3
    retry_delay: float = 2.0
    chunk_size: int = 5

class AsyncOllamaProcessor:
    def __init__(self, base_url="http://localhost:11434", model="llama3:8b", config=None):
        self.base_url = base_url
        self.model = model
        self.config = config or BatchConfig()
        self.stats = {"total": 0, "success": 0, "failed": 0, "retries": 0}
    
    async def _request(self, session, prompt, system=None):
        payload = {
            "model": self.model,
            "prompt": prompt,
            "stream": False,
            "options": {"temperature": 0.1, "num_ctx": 4096},
        }
        if system:
            payload["system"] = system
        
        for attempt in range(self.config.retry_count):
            try:
                async with session.post(
                    f"{self.base_url}/api/generate",
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=self.config.timeout),
                ) as resp:
                    if resp.status == 200:
                        data = await resp.json()
                        self.stats["success"] += 1
                        return data["response"]
                    else:
                        self.stats["retries"] += 1
            except (aiohttp.ClientError, asyncio.TimeoutError):
                self.stats["retries"] += 1
                if attempt < self.config.retry_count - 1:
                    await asyncio.sleep(self.config.retry_delay * (attempt + 1))
        
        self.stats["failed"] += 1
        return None
    
    async def process_batch(self, items, system_prompt):
        semaphore = asyncio.Semaphore(self.config.max_concurrent)
        results = []
        
        async with aiohttp.ClientSession() as session:
            async def process_one(idx, item):
                async with semaphore:
                    self.stats["total"] += 1
                    result = await self._request(session, item, system_prompt)
                    return idx, result
            
            tasks = [process_one(i, item) for i, item in enumerate(items)]
            
            for coro in asyncio.as_completed(tasks):
                idx, result = await coro
                results.append((idx, result))
                
                done = len(results)
                if done % 10 == 0:
                    print(f"Progress: {done}/{len(items)} "
                          f"(success={self.stats['success']}, failed={self.stats['failed']})")
        
        results.sort(key=lambda x: x[0])
        return [r[1] for r in results]
    
    def run_batch(self, items, system_prompt):
        start = time.time()
        results = asyncio.run(self.process_batch(items, system_prompt))
        elapsed = time.time() - start
        
        print(f"\nBatch complete in {elapsed:.1f}s")
        print(f"  Total: {self.stats['total']}")
        print(f"  Success: {self.stats['success']}")
        print(f"  Failed: {self.stats['failed']}")
        print(f"  Retries: {self.stats['retries']}")
        print(f"  Avg time: {elapsed/max(len(items),1):.1f}s per item")
        
        return results

# Performance Tips:
# 1. ใช้ model ขนาดเล็ก (7B-8B) สำหรับ simple tasks
# 2. ลด num_ctx ถ้า input ไม่ยาว
# 3. ใช้ temperature 0 สำหรับ deterministic output
# 4. Batch similar tasks together
# 5. ใช้ GPU acceleration (NVIDIA CUDA)
# 6. Pre-process text ก่อนส่งให้ LLM (ตัดส่วนที่ไม่จำเป็น)

# ตัวอย่างการใช้งาน
if __name__ == "__main__":
    processor = AsyncOllamaProcessor(
        model="llama3:8b",
        config=BatchConfig(max_concurrent=2, timeout=120)
    )
    
    texts = [
        "สั่งซื้อสินค้า 5 ชิ้น ราคา 1,500 บาท จัดส่งถึงกรุงเทพ",
        "ยกเลิกคำสั่งซื้อ ORD-001 เนื่องจากสินค้าไม่ตรงตามที่สั่ง",
        "สอบถามสถานะการจัดส่ง tracking number TH12345",
    ]
    
    system = """Classify the text into: order, cancellation, inquiry, complaint.
Return JSON: {"category": "...", "intent": "...", "priority": "high|medium|low"}"""
    
    results = processor.run_batch(texts, system)

รวม Ollama กับ Airflow Data Pipeline

Airflow DAG สำหรับ ETL pipeline ที่ใช้ Ollama

#!/usr/bin/env python3
# ollama_airflow_dag.py — Airflow DAG with Ollama LLM Processing
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import json
import pandas as pd
import requests

default_args = {
    "owner": "data-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

OLLAMA_URL = "http://ollama:11434"
MODEL = "llama3:8b"

def check_ollama_health(**kwargs):
    resp = requests.get(f"{OLLAMA_URL}/api/tags", timeout=10)
    models = [m["name"] for m in resp.json().get("models", [])]
    if MODEL not in models and f"{MODEL}:latest" not in models:
        raise ValueError(f"Model {MODEL} not found. Available: {models}")
    print(f"Ollama healthy. Models: {models}")

def extract_raw_data(**kwargs):
    df = pd.read_csv("/data/raw/customer_feedback.csv")
    df.to_parquet("/tmp/feedback_raw.parquet", index=False)
    kwargs["ti"].xcom_push(key="row_count", value=len(df))
    print(f"Extracted {len(df)} rows")

def transform_with_llm(**kwargs):
    df = pd.read_parquet("/tmp/feedback_raw.parquet")
    
    system = """Analyze customer feedback. Return JSON:
{"sentiment": "positive|negative|neutral", "category": "product|service|delivery|price",
 "key_issues": [], "urgency": "high|medium|low"}"""
    
    results = []
    for idx, row in df.iterrows():
        try:
            resp = requests.post(f"{OLLAMA_URL}/api/generate", json={
                "model": MODEL,
                "prompt": f"Analyze:\n{row['feedback_text']}",
                "system": system,
                "stream": False,
                "options": {"temperature": 0.1},
            }, timeout=120)
            
            analysis = json.loads(resp.json()["response"])
            analysis["feedback_id"] = row["id"]
            analysis["original_text"] = row["feedback_text"]
            results.append(analysis)
        except Exception as e:
            results.append({
                "feedback_id": row["id"],
                "sentiment": "unknown",
                "error": str(e),
            })
        
        if (idx + 1) % 50 == 0:
            print(f"Processed {idx+1}/{len(df)}")
    
    result_df = pd.DataFrame(results)
    result_df.to_parquet("/tmp/feedback_analyzed.parquet", index=False)
    
    stats = result_df["sentiment"].value_counts().to_dict()
    kwargs["ti"].xcom_push(key="sentiment_stats", value=stats)
    print(f"Transform complete: {stats}")

def load_to_warehouse(**kwargs):
    df = pd.read_parquet("/tmp/feedback_analyzed.parquet")
    
    from sqlalchemy import create_engine
    engine = create_engine("postgresql://user:pass@warehouse:5432/analytics")
    df.to_sql("customer_feedback_analyzed", engine, if_exists="append", index=False)
    
    print(f"Loaded {len(df)} rows to warehouse")

def generate_report(**kwargs):
    stats = kwargs["ti"].xcom_pull(key="sentiment_stats", task_ids="transform")
    row_count = kwargs["ti"].xcom_pull(key="row_count", task_ids="extract")
    
    report = f"""
    ETL Report — {datetime.now().strftime('%Y-%m-%d')}
    Total records: {row_count}
    Sentiment distribution: {json.dumps(stats, indent=2)}
    """
    print(report)

with DAG(
    "ollama_feedback_etl",
    default_args=default_args,
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["etl", "llm", "ollama"],
) as dag:

    health = PythonOperator(task_id="check_ollama", python_callable=check_ollama_health)
    extract = PythonOperator(task_id="extract", python_callable=extract_raw_data)
    transform = PythonOperator(task_id="transform", python_callable=transform_with_llm,
                               execution_timeout=timedelta(hours=4))
    load = PythonOperator(task_id="load", python_callable=load_to_warehouse)
    report = PythonOperator(task_id="report", python_callable=generate_report)

    health >> extract >> transform >> load >> report

FAQ คำถามที่พบบ่อย

Q: Ollama กับ OpenAI API ใช้อันไหนดีกว่าสำหรับ ETL?

A: Ollama เหมาะเมื่อข้อมูลเป็น sensitive data ที่ห้ามส่งออก cloud, ต้องการลดค่าใช้จ่าย API, ต้องการ latency ต่ำ และทำงาน offline ได้ OpenAI เหมาะเมื่อต้องการ model quality สูงสุด (GPT-4), ไม่อยากจัดการ hardware และ data volume น้อย สำหรับ production ETL ที่ process ข้อมูลมาก Ollama คุ้มค่ากว่ามาก

Q: Model ไหนเหมาะกับ ETL tasks?

A: Llama 3 8B เหมาะสำหรับงานทั่วไปเช่น classification, entity extraction Mistral 7B เร็วกว่าเล็กน้อย เหมาะสำหรับ simple tasks Gemma 2 9B ดีสำหรับ multilingual tasks รวมถึงภาษาไทย สำหรับงาน coding ใช้ CodeLlama สำหรับ embeddings ใช้ nomic-embed-text เลือก model เล็กที่สุดที่ให้ผลลัพธ์ถูกต้องเพื่อ throughput สูงสุด

Q: Ollama รองรับ concurrent requests กี่ตัว?

A: Ollama รองรับ concurrent requests ได้ แต่ performance ขึ้นอยู่กับ GPU memory ถ้า GPU memory เพียงพอ Ollama จะ process parallel ได้ สำหรับ CPU-only ควรจำกัดที่ 1-2 concurrent requests สำหรับ GPU ที่มี VRAM 16GB ขึ้นไปสามารถ process 2-4 requests พร้อมกัน ใช้ OLLAMA_NUM_PARALLEL environment variable กำหนดได้

Q: ผลลัพธ์จาก LLM ไม่เป็น JSON ที่ถูกต้องทำอย่างไร?

A: ใช้ temperature 0 หรือต่ำมาก (0.1) เพื่อลด randomness ใส่ system prompt ที่ชัดเจนว่าต้อง output JSON only ให้ตัวอย่าง JSON ใน prompt (few-shot) ใช้ retry logic ถ้า parse ไม่ได้ สร้าง custom model ด้วย Modelfile ที่ tune สำหรับ JSON output และใช้ json_repair library สำหรับ fix JSON ที่มี errors เล็กน้อย

📖 บทความที่เกี่ยวข้อง

Ollama Local LLM Microservices Architectureอ่านบทความ → Ollama Local LLM Message Queue Designอ่านบทความ → Ollama Local LLM Chaos Engineeringอ่านบทความ → Ollama Local LLM Container Orchestrationอ่านบทความ → Ollama Local LLM MLOps Workflowอ่านบทความ →

📚 ดูบทความทั้งหมด →