SiamCafe.net Blog

Skaffold Dev Batch Processing Pipeline

2025-07-07 · อ. บอม, SiamCafe.net · 11,863 words


Running batch processing pipelines on Kubernetes with Skaffold: Jobs and CronJobs for simple tasks, Argo Workflows for multi-step pipelines, and the same dev loop carried from local development through CI/CD to production.

Tool            Use Case             Complexity  Scheduling           Best For
K8s Job         Single batch task    Low         Manual / CI trigger  Simple ETL, migration
K8s CronJob     Scheduled batch      Low         Cron expression      Daily reports, cleanup
Argo Workflows  Multi-step pipeline  Medium      Cron + Event         Complex DAG pipelines
Spark on K8s    Big data processing  High        Airflow / Argo       TB-scale data
Tekton          CI/CD + batch        Medium      Trigger-based        Build + process pipeline
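As a rough rule of thumb, the choice in the table can be expressed as a small selection helper. This is a sketch only; the thresholds and the `pick_batch_tool` function are illustrative, not from any official guidance:

```python
def pick_batch_tool(steps: int, scheduled: bool, data_tb: float = 0.0) -> str:
    """Illustrative tool picker based on the comparison table above."""
    if data_tb >= 1.0:
        return "Spark on K8s"      # TB-scale data
    if steps > 1:
        return "Argo Workflows"    # multi-step DAG pipelines
    if scheduled:
        return "K8s CronJob"       # recurring schedule
    return "K8s Job"               # single one-off task

print(pick_batch_tool(steps=1, scheduled=False))  # K8s Job
print(pick_batch_tool(steps=4, scheduled=True))   # Argo Workflows
```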

Skaffold Configuration

# === Skaffold + Batch Job Setup ===

# skaffold.yaml
# apiVersion: skaffold/v4beta6
# kind: Config
# metadata:
#   name: batch-pipeline
# build:
#   artifacts:
#     - image: batch-etl
#       docker:
#         dockerfile: Dockerfile.etl
#     - image: batch-report
#       docker:
#         dockerfile: Dockerfile.report
# deploy:
#   kubectl:
#     manifests:
#       - k8s/etl-job.yaml
#       - k8s/report-cronjob.yaml
# profiles:
#   - name: production
#     build:
#       tagPolicy:
#         sha256: {}
#     deploy:
#       kubectl:
#         manifests:
#           - k8s/production/*.yaml

# k8s/etl-job.yaml
# apiVersion: batch/v1
# kind: Job
# metadata:
#   name: etl-daily
# spec:
#   backoffLimit: 3
#   activeDeadlineSeconds: 3600
#   template:
#     spec:
#       restartPolicy: Never
#       containers:
#         - name: etl
#           image: batch-etl
#           command: ["python", "etl.py"]
#           resources:
#             requests: { cpu: "500m", memory: "1Gi" }
#             limits: { cpu: "2", memory: "4Gi" }
#           env:
#             - name: DB_URL
#               valueFrom:
#                 secretKeyRef: { name: db-secret, key: url }

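The skaffold.yaml above also references k8s/report-cronjob.yaml, which is not shown. A minimal sketch of what that manifest might look like, in the same commented-YAML style; the schedule, concurrency, and history settings are assumptions:

```
# k8s/report-cronjob.yaml (illustrative)
# apiVersion: batch/v1
# kind: CronJob
# metadata:
#   name: report-daily
# spec:
#   schedule: "0 6 * * *"            # 06:00 every day (assumed)
#   concurrencyPolicy: Forbid        # skip a run if the previous one is still active
#   successfulJobsHistoryLimit: 3
#   failedJobsHistoryLimit: 3
#   jobTemplate:
#     spec:
#       backoffLimit: 2
#       ttlSecondsAfterFinished: 86400   # auto-clean finished Jobs after 1 day
#       template:
#         spec:
#           restartPolicy: Never
#           containers:
#             - name: report
#               image: batch-report
#               command: ["python", "generate_report.py"]
```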
# Commands
# skaffold dev          # Watch mode, auto rebuild+deploy on change
# skaffold run          # One-time build+deploy (CI/CD)
# skaffold debug        # Deploy with debug ports
# skaffold delete       # Clean up deployed resources
# skaffold render       # Output rendered manifests

from dataclasses import dataclass

@dataclass
class SkaffoldCommand:
    command: str
    use_case: str
    when: str
    flags: str

commands = [
    SkaffoldCommand("skaffold dev", "Watch mode, rebuild on change",
        "Local development", "--port-forward --tail"),
    SkaffoldCommand("skaffold run", "One-time build and deploy",
        "CI/CD pipeline", "--tag=$GIT_SHA --profile=production"),
    SkaffoldCommand("skaffold debug", "Deploy with remote debug",
        "Debugging batch job issues", "--port-forward"),
    SkaffoldCommand("skaffold render", "Output K8s manifests",
        "Review what will be deployed", "--output=rendered.yaml"),
    SkaffoldCommand("skaffold delete", "Clean up resources",
        "After testing, cleanup", ""),
    SkaffoldCommand("skaffold build", "Build images only",
        "CI build step", "--tag=$GIT_SHA --push"),
]

print("=== Skaffold Commands ===")
for c in commands:
    print(f"  [{c.command}] {c.use_case}")
    print(f"    When: {c.when} | Flags: {c.flags}")

Pipeline Orchestration

# === Argo Workflows for Complex Pipelines ===

# argo-workflow.yaml
# apiVersion: argoproj.io/v1alpha1
# kind: Workflow
# metadata:
#   name: batch-pipeline
# spec:
#   entrypoint: main
#   templates:
#     - name: main
#       dag:
#         tasks:
#           - name: extract
#             template: etl-step
#             arguments:
#               parameters: [{name: step, value: extract}]
#           - name: transform
#             template: etl-step
#             dependencies: [extract]
#             arguments:
#               parameters: [{name: step, value: transform}]
#           - name: load
#             template: etl-step
#             dependencies: [transform]
#             arguments:
#               parameters: [{name: step, value: load}]
#           - name: report
#             template: report-step
#             dependencies: [load]
#     - name: etl-step
#       container:
#         image: batch-etl
#         command: [python, etl.py, "{{inputs.parameters.step}}"]
#     - name: report-step
#       container:
#         image: batch-report
#         command: [python, generate_report.py]

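The DAG above resolves to a strict run order: each task waits on its dependencies. A quick sketch of how such dependencies topologically sort (illustrative only, not how Argo itself is implemented):

```python
from graphlib import TopologicalSorter

# Dependencies from the Workflow above: task -> set of tasks it waits on
dag = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"load"},
}

order = list(TopologicalSorter(dag).static_order())
print(order)  # ['extract', 'transform', 'load', 'report']
```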
from dataclasses import dataclass

@dataclass
class PipelineStage:
    stage: str
    image: str
    resources: str
    timeout: str
    retry: int
    depends_on: str

pipeline = [
    PipelineStage("Extract", "batch-etl",
        "CPU: 1, Memory: 2Gi", "30 min", 3, "None"),
    PipelineStage("Transform", "batch-etl",
        "CPU: 2, Memory: 4Gi", "60 min", 2, "Extract"),
    PipelineStage("Load", "batch-etl",
        "CPU: 1, Memory: 2Gi", "30 min", 3, "Transform"),
    PipelineStage("Report", "batch-report",
        "CPU: 500m, Memory: 1Gi", "15 min", 2, "Load"),
    PipelineStage("Notify", "batch-notify",
        "CPU: 100m, Memory: 128Mi", "5 min", 1, "Report"),
]

print("=== Pipeline Stages ===")
for p in pipeline:
    print(f"  [{p.stage}] Image: {p.image}")
    print(f"    Resources: {p.resources} | Timeout: {p.timeout}")
    print(f"    Retry: {p.retry} | Depends: {p.depends_on}")

Monitoring and Alerts

# === Job Monitoring ===

from dataclasses import dataclass

@dataclass
class JobMetric:
    metric: str
    source: str
    alert: str
    action: str

metrics = [
    JobMetric("kube_job_status_failed", "kube-state-metrics",
        "Job failed after all retries",
        "Check logs, fix code, re-trigger"),
    JobMetric("kube_job_duration_seconds", "kube-state-metrics",
        "Duration > 2x average",
        "Check for data skew, resource limits"),
    JobMetric("kube_job_status_active", "kube-state-metrics",
        "Job active > deadline",
        "Kill job, investigate hang"),
    JobMetric("container_memory_usage_bytes", "cAdvisor",
        "Memory > 80% of limit",
        "Increase limit or optimize code"),
    JobMetric("kube_cronjob_next_schedule_time", "kube-state-metrics",
        "Missed scheduled run",
        "Check CronJob suspend, cluster health"),
]

print("=== Job Metrics ===")
for m in metrics:
    print(f"  [{m.metric}] Source: {m.source}")
    print(f"    Alert: {m.alert}")
    print(f"    Action: {m.action}")
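The "Duration > 2x average" alert above can be sketched as a simple threshold check. This is an illustration; in practice this condition would live in a PromQL alerting rule rather than application code:

```python
def duration_alert(current_s: float, history_s: list[float], factor: float = 2.0) -> bool:
    """Fire when the current run exceeds `factor` times the historical average."""
    avg = sum(history_s) / len(history_s)
    return current_s > factor * avg

history = [600, 620, 580, 610]        # past run durations in seconds (avg 602.5)
print(duration_alert(1300, history))  # True: 1300 > 2 * 602.5
print(duration_alert(700, history))   # False
```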

# Best practices
practices = {
    "Idempotent Jobs": "Design Jobs so re-runs are safe and side-effect free",
    "Resource Limits": "Set requests and limits on every Job to prevent noisy neighbors",
    "Deadline": "Set activeDeadlineSeconds to prevent hung Jobs",
    "Spot Nodes": "Use Spot/Preemptible nodes for batch to cut cost 60-90%",
    "Log Aggregation": "Ship logs to Loki/ELK instead of relying on kubectl logs",
    "Cleanup": "Set ttlSecondsAfterFinished to auto-delete old Jobs",
}

print("\n\nBest Practices:")
for k, v in practices.items():
    print(f"  [{k}]: {v}")
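Of the practices above, "Idempotent Jobs" is the one that most often needs code changes. An illustrative upsert-style load that is safe to re-run after a retry; the store and keys here are hypothetical:

```python
def load_rows(store: dict, rows: list[tuple[str, int]]) -> None:
    """Idempotent load: keyed upsert instead of blind append,
    so re-running the Job after a retry does not duplicate data."""
    for key, value in rows:
        store[key] = value  # same key overwrites, never duplicates

db: dict[str, int] = {}
batch = [("order-1", 100), ("order-2", 250)]
load_rows(db, batch)
load_rows(db, batch)  # Job retried: result is unchanged
print(db)  # {'order-1': 100, 'order-2': 250}
```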

Tips

What is Skaffold?

Skaffold is a tool from Google that automates the Kubernetes dev loop: it builds and deploys on every code change, supports Docker, Helm, and Kustomize, offers file sync, watch mode, and debugging, and slots into CI/CD pipelines.

How do you run batch processing on K8s?

Use a Job for one-off runs and a CronJob for scheduled runs; reach for Argo Workflows for multi-step pipelines and Spark for big data. Always set resource limits, retries, and deadlines, and consider Spot nodes for cost.

How do Skaffold and batch Jobs work together?

Point skaffold.yaml at your Job Dockerfiles and YAML manifests. skaffold dev watches for code changes, rebuilds, redeploys, and streams logs to the terminal, while skaffold run handles one-shot builds and deploys for CI/CD and production.

How do you monitor batch Jobs?

Start with kubectl get jobs and kubectl logs. In production, scrape Job metrics via Prometheus and kube-state-metrics, alert on failures and long durations, build Grafana dashboards with Slack notifications, and aggregate logs in Loki/ELK for debugging.

Summary

Skaffold tightens the dev loop for Kubernetes batch workloads: Jobs and CronJobs cover simple tasks, Argo Workflows handles multi-step pipelines, and with CI/CD, monitoring, and Spot nodes the same setup carries through to production.
