ในโลกของ Distributed Systems การจัดการ Business Process ที่มีหลายขั้นตอน เกี่ยวข้องกับหลาย Services และต้องรับมือกับ Failure ที่อาจเกิดขึ้นได้ทุกจุด เป็นหนึ่งในปัญหาที่ยากที่สุดของ Software Engineering ระบบ Payment ที่ต้องตัดเงิน อัปเดต Inventory แจ้ง Shipping และส่ง Email ยืนยัน ถ้าขั้นตอนใดขั้นตอนหนึ่งล้มเหลว คุณจะจัดการอย่างไร?
คำตอบของปัญหานี้คือ Workflow Orchestration และในปี 2026 เครื่องมือที่โดดเด่นที่สุดในสาขานี้คือ Temporal.io บทความนี้จะสอนคุณทุกอย่างเกี่ยวกับ Temporal ตั้งแต่แนวคิดพื้นฐาน สถาปัตยกรรม การเขียน Workflow ด้วย Go, TypeScript, Python ไปจนถึง Saga Pattern, Signals, Queries และการ Deploy ใน Production
Workflow Orchestration คืออะไร?
Workflow Orchestration คือการจัดการลำดับขั้นตอนการทำงาน (Steps) ของ Business Process ที่ซับซ้อน โดยมีระบบกลางคอยควบคุมว่าขั้นตอนไหนต้องทำก่อน-หลัง ขั้นตอนไหนทำพร้อมกันได้ และถ้าขั้นตอนใดล้มเหลวจะจัดการอย่างไร Orchestrator เป็นเหมือนวาทยกรของวงออร์เคสตรา ที่สั่งให้เครื่องดนตรีแต่ละชิ้นเล่นในจังหวะที่ถูกต้อง
Workflow Orchestration vs วิธีแบบเดิม
ก่อนที่จะมี Workflow Orchestration นักพัฒนามักใช้วิธีเหล่านี้จัดการ Business Process ที่ซับซ้อน แต่ละวิธีมีข้อจำกัดที่ชัดเจน
| วิธี | ข้อดี | ข้อจำกัด |
|---|---|---|
| Message Queue (RabbitMQ, Kafka) | Decouple Services, Scale ได้ดี | ไม่มี State Machine, ไม่เห็นภาพรวมของ Process, จัดการ Failure ยาก |
| Cron Jobs | ง่าย ตั้ง Schedule ได้ | ไม่เหมาะกับ Long-running Process, ไม่มี Retry Logic, ไม่รู้สถานะของ Step ก่อนหน้า |
| Database + Polling | ควบคุมได้ทุกอย่าง | ต้องเขียน State Machine เอง, Race Conditions, Scaling ยาก |
| Event Choreography | Loose Coupling สูง | เมื่อระบบใหญ่ขึ้น ยากที่จะเข้าใจ Flow ทั้งหมด, Debug ยากมาก |
Workflow Orchestration แก้ปัญหาเหล่านี้ทั้งหมดโดยให้คุณเขียน Business Logic เป็น Code ธรรมดา (ไม่ใช่ YAML หรือ JSON) ที่อ่านง่าย ทดสอบได้ และมี Built-in Retry, Timeout, Error Handling, State Persistence โดยไม่ต้องจัดการเอง
Temporal.io คืออะไร?
Temporal คือ Open Source Workflow Orchestration Platform ที่สร้างโดยทีมที่เคยพัฒนา Cadence (ระบบ Workflow ภายในของ Uber) Temporal ออกแบบมาเพื่อให้นักพัฒนาเขียน Distributed Application ที่ทนทานต่อ Failure ได้โดยไม่ต้องจัดการ Infrastructure ที่ซับซ้อนเอง
แนวคิดหลักของ Temporal คือ "Durable Execution" หมายความว่า Code ของคุณจะทำงานจนเสร็จเสมอ ไม่ว่าจะเกิด Server Crash, Network Failure หรือ Process Restart ก็ตาม Temporal จะจำสถานะของ Workflow ทุกขั้นตอนและทำงานต่อจากจุดที่หยุดไป
ใครใช้ Temporal ในปี 2026?
- Netflix — จัดการ Media Encoding Pipelines
- Stripe — Payment Processing Workflows
- Snap (Snapchat) — Content Moderation Pipelines
- Datadog — Infrastructure Provisioning
- HashiCorp — Internal Automation
- DoorDash — Order Fulfillment Workflows
- Box — File Processing Pipelines
สถาปัตยกรรมของ Temporal
Temporal ประกอบด้วยส่วนสำคัญ 3 ส่วน ซึ่งทำงานร่วมกันเพื่อให้ Workflow Execution มีความทนทาน
1. Temporal Server
เป็นหัวใจของระบบ ทำหน้าที่เก็บ State ของ Workflow ทั้งหมด จัดคิวงาน (Task Queues) จัดการ Timer และ Schedule และบันทึก Event History ของทุก Workflow Execution Temporal Server ประกอบด้วย Components ย่อยดังนี้
- Frontend Service — รับ API Calls จาก Client และ Workers
- History Service — จัดการ Workflow State และ Event History
- Matching Service — จับคู่ Tasks กับ Workers ผ่าน Task Queues
- Worker Service — Internal Worker สำหรับ System Workflows
Temporal Server ต้องการ Database สำหรับเก็บ State โดยรองรับ PostgreSQL, MySQL และ Apache Cassandra สำหรับ Scale ขนาดใหญ่ นอกจากนี้ยังใช้ Elasticsearch สำหรับ Advanced Visibility (ค้นหา Workflow ตาม Custom Attributes)
2. Workers
Workers คือ Application ที่คุณเขียนขึ้นเอง ทำหน้าที่ Execute Workflow Logic และ Activity Logic จริงๆ Workers เชื่อมต่อกับ Temporal Server ผ่าน gRPC แล้ว Poll งานจาก Task Queue เมื่อได้งาน ก็จะ Execute แล้วส่งผลลัพธ์กลับ
จุดสำคัญคือ Workers เป็น Stateless ถ้า Worker ตาย งานจะถูกส่งไปให้ Worker ตัวอื่นทำต่อ คุณสามารถ Scale Workers ได้โดยอิสระ เพิ่มเครื่องใหม่ก็แค่รัน Worker Process เพิ่ม
3. SDKs (Client Libraries)
Temporal มี Official SDKs สำหรับหลายภาษา ซึ่งแต่ละตัวเป็น First-class Citizen
- Go SDK — ตัวหลัก ฟีเจอร์ครบที่สุด
- TypeScript SDK — นิยมมากสำหรับ Node.js Developers
- Python SDK — ใช้ async/await เหมาะกับ Data Pipeline
- Java SDK — สำหรับ Enterprise
- .NET SDK — สำหรับ C# Developers
# ติดตั้ง Temporal CLI (Development Server)
# macOS
brew install temporal
# Linux / Windows
curl -sSf https://temporal.download/cli.sh | sh
# รัน Development Server
temporal server start-dev
# เปิด Temporal UI
# http://localhost:8233
Workflow vs Activity — แนวคิดหลักของ Temporal
ใน Temporal มี 2 แนวคิดสำคัญที่คุณต้องเข้าใจอย่างลึกซึ้งก่อนเริ่มเขียน Code
Workflow
Workflow คือ Function ที่กำหนด "ลำดับขั้นตอน" ของ Business Process ทั้งหมด เปรียบเสมือนแผนผังการทำงาน Workflow จะเรียก Activities ตามลำดับ ตัดสินใจว่าจะทำอะไรต่อตาม Condition รอ Signals จากภายนอก และจัดการ Error
Workflow ต้องเป็น Deterministic หมายความว่าถ้ารัน Workflow ด้วย Input เดียวกัน ต้องได้ผลลัพธ์เดียวกันทุกครั้ง เพราะ Temporal ใช้ Event Sourcing ภายใน — เมื่อ Worker Restart ระบบจะ Replay Events ทั้งหมดเพื่อกู้คืน State ถ้า Code ไม่ Deterministic การ Replay จะให้ผลลัพธ์ผิดพลาด
Activity
Activity คือ Function ที่ทำ "งานจริง" เช่น เรียก API ภายนอก Query Database ส่ง Email อ่านเขียนไฟล์ หรือทำ Computation ที่หนัก Activity ไม่ต้อง Deterministic เพราะ Temporal จะไม่ Replay Activity แต่จะใช้ผลลัพธ์ที่บันทึกไว้แทน
เขียน Workflow ด้วย Go SDK
Go เป็นภาษาที่ Temporal SDK รองรับดีที่สุดและเป็น First-class SDK มาดูตัวอย่างการเขียน Workflow สำหรับ Order Processing
// workflows.go
package app
import (
"time"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)
// OrderWorkflow จัดการ Process ทั้งหมดของการสั่งซื้อสินค้า
func OrderWorkflow(ctx workflow.Context, order Order) (OrderResult, error) {
logger := workflow.GetLogger(ctx)
logger.Info("Starting order workflow", "orderID", order.ID)
// กำหนด Activity Options (retry, timeout)
actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 5,
},
})
// Step 1: ตรวจสอบ Inventory
var inventoryResult InventoryResult
err := workflow.ExecuteActivity(actCtx, CheckInventory, order).Get(ctx, &inventoryResult)
if err != nil {
return OrderResult{}, err
}
if !inventoryResult.Available {
return OrderResult{Status: "out_of_stock"}, nil
}
// Step 2: ตัดเงิน
var paymentResult PaymentResult
err = workflow.ExecuteActivity(actCtx, ProcessPayment, order).Get(ctx, &paymentResult)
if err != nil {
// ถ้าตัดเงินไม่สำเร็จ ปล่อย Inventory กลับ
_ = workflow.ExecuteActivity(actCtx, ReleaseInventory, order).Get(ctx, nil)
return OrderResult{}, err
}
// Step 3: จัดส่ง
var shippingResult ShippingResult
err = workflow.ExecuteActivity(actCtx, ArrangeShipping, order).Get(ctx, &shippingResult)
if err != nil {
// ถ้าจัดส่งไม่ได้ คืนเงิน + ปล่อย Inventory
_ = workflow.ExecuteActivity(actCtx, RefundPayment, paymentResult).Get(ctx, nil)
_ = workflow.ExecuteActivity(actCtx, ReleaseInventory, order).Get(ctx, nil)
return OrderResult{}, err
}
// Step 4: ส่ง Notification
_ = workflow.ExecuteActivity(actCtx, SendConfirmationEmail, order).Get(ctx, nil)
return OrderResult{
Status: "completed",
TrackingID: shippingResult.TrackingID,
}, nil
}
เขียน Activities
// activities.go
package app
import (
"context"
"fmt"
)
func CheckInventory(ctx context.Context, order Order) (InventoryResult, error) {
// เรียก Inventory Service API
available, err := inventoryClient.Check(order.ProductID, order.Quantity)
if err != nil {
return InventoryResult{}, fmt.Errorf("inventory check failed: %w", err)
}
return InventoryResult{Available: available}, nil
}
func ProcessPayment(ctx context.Context, order Order) (PaymentResult, error) {
// เรียก Payment Gateway (Stripe, etc.)
chargeID, err := paymentClient.Charge(order.CustomerID, order.Amount)
if err != nil {
return PaymentResult{}, fmt.Errorf("payment failed: %w", err)
}
return PaymentResult{ChargeID: chargeID}, nil
}
func ArrangeShipping(ctx context.Context, order Order) (ShippingResult, error) {
// เรียก Shipping Provider API
trackingID, err := shippingClient.CreateShipment(order)
if err != nil {
return ShippingResult{}, fmt.Errorf("shipping failed: %w", err)
}
return ShippingResult{TrackingID: trackingID}, nil
}
func RefundPayment(ctx context.Context, payment PaymentResult) error {
return paymentClient.Refund(payment.ChargeID)
}
func ReleaseInventory(ctx context.Context, order Order) error {
return inventoryClient.Release(order.ProductID, order.Quantity)
}
func SendConfirmationEmail(ctx context.Context, order Order) error {
return emailClient.Send(order.CustomerEmail, "Order Confirmed", order.ID)
}
ตั้งค่า Worker
// worker/main.go
package main
import (
"log"
app "myapp/workflows"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)
func main() {
// เชื่อมต่อ Temporal Server
c, err := client.Dial(client.Options{
HostPort: "localhost:7233",
})
if err != nil {
log.Fatal("Unable to create client", err)
}
defer c.Close()
// สร้าง Worker ที่ฟัง Task Queue "order-processing"
w := worker.New(c, "order-processing", worker.Options{})
// Register Workflows และ Activities
w.RegisterWorkflow(app.OrderWorkflow)
w.RegisterActivity(app.CheckInventory)
w.RegisterActivity(app.ProcessPayment)
w.RegisterActivity(app.ArrangeShipping)
w.RegisterActivity(app.RefundPayment)
w.RegisterActivity(app.ReleaseInventory)
w.RegisterActivity(app.SendConfirmationEmail)
// เริ่ม Worker
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatal("Unable to start worker", err)
}
}
เขียน Workflow ด้วย TypeScript SDK
TypeScript SDK ใช้ V8 Isolates เพื่อรับประกัน Determinism โดยอัตโนมัติ ทำให้เขียนได้สะดวกมาก
// src/workflows.ts
import { proxyActivities, sleep, defineSignal, setHandler,
defineQuery, condition } from '@temporalio/workflow';
import type * as activities from './activities';
const { checkInventory, processPayment, arrangeShipping,
refundPayment, releaseInventory, sendConfirmationEmail
} = proxyActivities<typeof activities>({
startToCloseTimeout: '30s',
retry: {
initialInterval: '1s',
backoffCoefficient: 2,
maximumInterval: '1m',
maximumAttempts: 5,
},
});
// Define Signals and Queries
export const cancelSignal = defineSignal('cancel');
export const statusQuery = defineQuery<string>('status');
export async function orderWorkflow(order: Order): Promise<OrderResult> {
let status = 'started';
let cancelled = false;
// จัดการ Signal
setHandler(cancelSignal, () => { cancelled = true; });
// จัดการ Query
setHandler(statusQuery, () => status);
// Step 1: ตรวจสอบ Inventory
status = 'checking_inventory';
const inventory = await checkInventory(order);
if (!inventory.available) {
return { status: 'out_of_stock' };
}
// ตรวจสอบว่าถูก Cancel หรือไม่
if (cancelled) return { status: 'cancelled' };
// Step 2: ตัดเงิน
status = 'processing_payment';
const payment = await processPayment(order);
// Step 3: จัดส่ง
status = 'arranging_shipping';
const shipping = await arrangeShipping(order);
// Step 4: แจ้งเตือน
status = 'sending_notification';
await sendConfirmationEmail(order);
status = 'completed';
return { status: 'completed', trackingId: shipping.trackingId };
}
// src/activities.ts
export async function checkInventory(order: Order): Promise<InventoryResult> {
const response = await fetch(
`https://inventory-api.internal/check/${order.productId}`
);
const data = await response.json();
return { available: data.quantity >= order.quantity };
}
export async function processPayment(order: Order): Promise<PaymentResult> {
const stripe = new Stripe(process.env.STRIPE_KEY!);
const charge = await stripe.charges.create({
amount: order.amount,
currency: 'thb',
customer: order.customerId,
});
return { chargeId: charge.id };
}
export async function arrangeShipping(order: Order): Promise<ShippingResult> {
// เรียก Shipping API
const result = await shippingClient.createShipment(order);
return { trackingId: result.trackingId };
}
export async function sendConfirmationEmail(order: Order): Promise<void> {
await emailService.send({
to: order.customerEmail,
subject: `Order ${order.id} Confirmed`,
template: 'order-confirmation',
data: order,
});
}
เขียน Workflow ด้วย Python SDK
Python SDK ใช้ async/await และ Decorators ทำให้เขียนได้เป็นธรรมชาติมาก เหมาะกับ Data Pipeline และ ML Workflows
# workflows.py
from datetime import timedelta
from temporalio import workflow
from temporalio.common import RetryPolicy
with workflow.unsafe.imports_passed_through():
from activities import (
check_inventory, process_payment,
arrange_shipping, send_confirmation_email,
refund_payment, release_inventory
)
@workflow.defn
class OrderWorkflow:
def __init__(self):
self.status = "initialized"
@workflow.run
async def run(self, order: dict) -> dict:
self.status = "checking_inventory"
retry_policy = RetryPolicy(
initial_interval=timedelta(seconds=1),
backoff_coefficient=2.0,
maximum_interval=timedelta(minutes=1),
maximum_attempts=5,
)
# Step 1: ตรวจสอบ Inventory
inventory = await workflow.execute_activity(
check_inventory, order,
start_to_close_timeout=timedelta(seconds=30),
retry_policy=retry_policy,
)
if not inventory["available"]:
return {"status": "out_of_stock"}
# Step 2: ตัดเงิน
self.status = "processing_payment"
payment = await workflow.execute_activity(
process_payment, order,
start_to_close_timeout=timedelta(seconds=30),
retry_policy=retry_policy,
)
# Step 3: จัดส่ง
self.status = "arranging_shipping"
try:
shipping = await workflow.execute_activity(
arrange_shipping, order,
start_to_close_timeout=timedelta(minutes=2),
retry_policy=retry_policy,
)
except Exception:
# Compensate: คืนเงิน + ปล่อย Inventory
await workflow.execute_activity(
refund_payment, payment,
start_to_close_timeout=timedelta(seconds=30),
)
await workflow.execute_activity(
release_inventory, order,
start_to_close_timeout=timedelta(seconds=30),
)
raise
# Step 4: แจ้งเตือน
self.status = "sending_notification"
await workflow.execute_activity(
send_confirmation_email, order,
start_to_close_timeout=timedelta(seconds=30),
)
self.status = "completed"
return {"status": "completed", "tracking_id": shipping["tracking_id"]}
@workflow.query
def get_status(self) -> str:
return self.status
Deterministic Constraints — ข้อห้ามใน Workflow
เนื่องจาก Temporal ใช้ Event Sourcing และ Replay เพื่อกู้คืน Workflow State สิ่งที่คุณทำใน Workflow Function ต้องเป็น Deterministic เสมอ นี่คือข้อห้ามที่ต้องจำให้ขึ้นใจ
| ห้ามทำใน Workflow | ใช้แทน |
|---|---|
time.Now() / Date.now() | workflow.Now() / Temporal's time API |
rand.Int() / Math.random() | ใช้ Side Effect หรือ Activity |
| HTTP Calls / Database Queries | ย้ายไปใส่ใน Activity |
time.Sleep() | workflow.Sleep() (Durable Timer) |
| Global Mutable State | ใช้ Workflow Local State |
| Goroutines / Threads | workflow.Go() (Temporal Coroutines) |
| Non-deterministic Libraries | ห่อด้วย Activity หรือ Side Effect |
Activity Retries และ Timeouts
หนึ่งในจุดแข็งที่สุดของ Temporal คือระบบ Retry ที่ทรงพลังและยืดหยุ่น คุณสามารถกำหนด Retry Policy ในระดับ Activity ได้อย่างละเอียด
ประเภทของ Timeout
- Schedule-to-Start Timeout — เวลาสูงสุดที่ Task รอใน Queue ก่อน Worker หยิบไปทำ ใช้ตรวจจับว่า Workers ล่มหมด
- Start-to-Close Timeout — เวลาสูงสุดที่ Activity ทำงานได้ต่อครั้ง ถ้าเกินจะถือว่า Timeout
- Schedule-to-Close Timeout — เวลาสูงสุดทั้งหมด (รวม Retries ทุกครั้ง)
- Heartbeat Timeout — สำหรับ Long-running Activities ต้อง Heartbeat เป็นระยะเพื่อบอกว่ายังทำงานอยู่
// Go: ตัวอย่าง Retry Policy แบบละเอียด
actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
ScheduleToCloseTimeout: 30 * time.Minute,
HeartbeatTimeout: 30 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: 5 * time.Minute,
MaximumAttempts: 10,
NonRetryableErrorTypes: []string{"InvalidInputError", "AuthenticationError"},
},
})
สังเกตว่า NonRetryableErrorTypes ช่วยให้คุณกำหนดได้ว่า Error ประเภทไหนไม่ต้อง Retry เช่น Input ไม่ถูกต้อง (Retry ไปก็ไม่สำเร็จ) หรือ Authentication Failed (Retry ไปก็ไม่ช่วย)
Signals และ Queries
Signals ช่วยให้ระบบภายนอกส่งข้อมูลหรือคำสั่งเข้าไปยัง Workflow ที่กำลังทำงานอยู่ได้ เช่น ผู้ใช้กดยกเลิกคำสั่งซื้อ Admin อนุมัติเอกสาร หรือ Webhook จาก Third-party Service
Queries ช่วยให้อ่านสถานะของ Workflow ได้โดยไม่กระทบการทำงาน เช่น ดูว่า Order อยู่ในขั้นตอนไหน หรือดูว่า Process เสร็จกี่เปอร์เซ็นต์
// Go: Workflow ที่รับ Signal สำหรับ Approval
func ApprovalWorkflow(ctx workflow.Context, request ApprovalRequest) (string, error) {
var approved bool
var approverNote string
// สร้าง Signal Channel
approvalCh := workflow.GetSignalChannel(ctx, "approval-signal")
// ส่ง Notification ให้ Approver
_ = workflow.ExecuteActivity(ctx, NotifyApprover, request).Get(ctx, nil)
// รอ Signal ภายใน 24 ชั่วโมง
timerCtx, cancel := workflow.WithCancel(ctx)
timer := workflow.NewTimer(timerCtx, 24*time.Hour)
selector := workflow.NewSelector(ctx)
// รับ Signal
selector.AddReceive(approvalCh, func(c workflow.ReceiveChannel, more bool) {
var signal ApprovalSignal
c.Receive(ctx, &signal)
approved = signal.Approved
approverNote = signal.Note
cancel() // ยกเลิก Timer
})
// หรือ Timeout
selector.AddFuture(timer, func(f workflow.Future) {
approved = false
approverNote = "Timed out waiting for approval"
})
selector.Select(ctx)
if approved {
_ = workflow.ExecuteActivity(ctx, ExecuteApprovedAction, request).Get(ctx, nil)
return "approved: " + approverNote, nil
}
return "rejected: " + approverNote, nil
}
# ส่ง Signal จากภายนอก (CLI)
temporal workflow signal --workflow-id "approval-12345" --name "approval-signal" --input '{"approved": true, "note": "Looks good"}'
# Query สถานะ
temporal workflow query --workflow-id "order-67890" --name "status"
Child Workflows
สำหรับ Business Process ที่ซับซ้อนมาก คุณสามารถแยก Workflow ย่อยออกเป็น Child Workflows ได้ Child Workflow ทำงานอิสระจาก Parent มี Execution History แยก และสามารถถูก Cancel หรือ Terminate ได้อิสระ
// Go: Parent Workflow ที่เรียก Child Workflows
func BatchProcessingWorkflow(ctx workflow.Context, items []Item) (BatchResult, error) {
var results []ItemResult
// ประมวลผลแต่ละ Item ด้วย Child Workflow
var futures []workflow.ChildWorkflowFuture
for _, item := range items {
childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
WorkflowID: "process-item-" + item.ID,
})
future := workflow.ExecuteChildWorkflow(childCtx, ProcessItemWorkflow, item)
futures = append(futures, future)
}
// รอผลลัพธ์ทั้งหมด
for _, future := range futures {
var result ItemResult
if err := future.Get(ctx, &result); err != nil {
results = append(results, ItemResult{Error: err.Error()})
} else {
results = append(results, result)
}
}
return BatchResult{Items: results}, nil
}
Saga Pattern ด้วย Temporal
Saga Pattern เป็นวิธีจัดการ Distributed Transaction ที่ข้ามหลาย Services แทนที่จะใช้ Two-phase Commit (2PC) ที่ช้าและ Scale ยาก Saga จะทำงานแต่ละ Step ตามลำดับ ถ้า Step ใด Fail จะรัน Compensation (การชดเชย) ของทุก Step ที่สำเร็จไปแล้ว ย้อนกลับจากล่างขึ้นบน
Temporal ทำให้ Saga Pattern เรียบง่ายมากเพราะ Workflow State ถูกจัดการให้อัตโนมัติ ไม่ต้องเขียน State Machine หรือจัดการ Message Queue เอง
// Go: Saga Pattern สำหรับ Travel Booking
func TravelBookingWorkflow(ctx workflow.Context, trip TripRequest) (TripResult, error) {
// เก็บรายการ Compensations ที่ต้องทำถ้าล้มเหลว
var compensations []func(workflow.Context) error
actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 3},
})
// Step 1: จองเที่ยวบิน
flight, err := executeStep(actCtx, BookFlight, trip.Flight)
if err != nil {
return TripResult{}, err
}
compensations = append(compensations, func(ctx workflow.Context) error {
return workflow.ExecuteActivity(ctx, CancelFlight, flight).Get(ctx, nil)
})
// Step 2: จองโรงแรม
hotel, err := executeStep(actCtx, BookHotel, trip.Hotel)
if err != nil {
runCompensations(ctx, compensations)
return TripResult{}, err
}
compensations = append(compensations, func(ctx workflow.Context) error {
return workflow.ExecuteActivity(ctx, CancelHotel, hotel).Get(ctx, nil)
})
// Step 3: จองรถเช่า
car, err := executeStep(actCtx, BookCar, trip.Car)
if err != nil {
runCompensations(ctx, compensations)
return TripResult{}, err
}
return TripResult{
FlightConfirmation: flight.Confirmation,
HotelConfirmation: hotel.Confirmation,
CarConfirmation: car.Confirmation,
}, nil
}
// รัน Compensations ย้อนกลับ (LIFO)
func runCompensations(ctx workflow.Context, compensations []func(workflow.Context) error) {
for i := len(compensations) - 1; i >= 0; i-- {
_ = compensations[i](ctx)
}
}
Temporal เปรียบเทียบกับเครื่องมืออื่น
| Feature | Temporal | Airflow | AWS Step Functions | Prefect |
|---|---|---|---|---|
| Workflow Definition | Code (Go/TS/Python/Java) | Python DAG | JSON/YAML (ASL) | Python |
| Execution Model | Durable Execution | Task Scheduling | State Machine | Task Scheduling |
| Long-running Workflows | วันถึงปี | ไม่เหมาะ | สูงสุด 1 ปี | ไม่เหมาะ |
| Human-in-the-loop | Signals + Queries | ไม่รองรับ | Task Tokens | จำกัด |
| Retry / Error Handling | ละเอียดมาก | Task-level Retry | Step-level Retry | Task-level Retry |
| Use Case หลัก | Microservices, Transactions | Data Pipeline, ETL | AWS Serverless | Data Pipeline |
| Self-hosted | ได้ (Open Source) | ได้ | ไม่ได้ (AWS only) | ได้ |
| Managed Service | Temporal Cloud | Astronomer, MWAA | Built-in | Prefect Cloud |
| Scalability | สูงมาก (millions workflows/s) | ปานกลาง | สูง (AWS) | ปานกลาง |
| Pricing | ฟรี (Self-hosted) / per-action (Cloud) | ฟรี (Self-hosted) | Per transition | ฟรี (Self-hosted) |
Temporal Cloud vs Self-hosted
Temporal มี 2 แบบให้เลือก ขึ้นอยู่กับความต้องการและทรัพยากรของทีม
Self-hosted (Open Source)
- ฟรี ไม่มีค่าใช้จ่าย License
- ต้อง Maintain Temporal Server, Database (PostgreSQL/Cassandra), Elasticsearch เอง
- ต้องจัดการ High Availability, Backup, Monitoring, Upgrades เอง
- เหมาะกับทีมที่มี DevOps/SRE ดูแล Infrastructure
- ใช้ Docker Compose หรือ Helm Chart ติดตั้ง
# Self-hosted ด้วย Docker Compose
git clone https://github.com/temporalio/docker-compose.git
cd docker-compose
docker compose up -d
# หรือ Kubernetes ด้วย Helm
helm repo add temporal https://charts.temporal.io
helm install temporal temporal/temporal --set server.replicaCount=3 --set cassandra.enabled=true
Temporal Cloud (Managed)
- Temporal ดูแล Infrastructure ทั้งหมด
- SLA 99.99% Uptime
- มี Multi-region Support
- Built-in Monitoring, Metrics, Alerting
- จ่ายตาม Actions (จำนวน Activity Executions, Signals, Queries)
- เหมาะกับทีมที่ต้องการ Focus ที่ Business Logic ไม่อยากดูแล Infrastructure
Use Cases ที่เหมาะกับ Temporal
1. Payment Processing
การชำระเงินที่ต้องตัดเงิน อัปเดต Ledger แจ้ง Merchant และส่ง Receipt ถ้าขั้นตอนใดล้มเหลวต้อง Rollback ทุกอย่าง Temporal จัดการ Saga Pattern นี้ได้อย่างสมบูรณ์แบบ
2. Order Fulfillment
ระบบสั่งซื้อสินค้าที่ต้อง Validate Order ตรวจ Stock ตัดเงิน จัดส่ง อัปเดตสถานะ และรอ Delivery Confirmation อาจใช้เวลาหลายวันและต้องรับ Signals จากหลายระบบ
3. ML Pipeline
การ Train Model ที่ต้อง Fetch Data, Preprocess, Train, Evaluate, Deploy แต่ละขั้นตอนอาจใช้เวลาหลายชั่วโมง ถ้า Worker ตายกลางทาง Temporal จะทำงานต่อจากจุดที่หยุด
4. Infrastructure Provisioning
การสร้าง Cloud Resources (VM, Database, Network, DNS) ที่ต้องทำตามลำดับ รอ Resources พร้อม และ Rollback ถ้าล้มเหลว
Monitoring Workflows
Temporal UI
Temporal มาพร้อมกับ Web UI ที่ให้คุณดูสถานะของ Workflow ทั้งหมด เปิดดู Event History ของแต่ละ Execution ส่ง Signals จาก UI และค้นหา Workflows ด้วย Advanced Query
# Temporal UI (Development)
# เปิดอัตโนมัติเมื่อรัน temporal server start-dev
# http://localhost:8233
# Temporal UI (Production - ติดตั้งแยก)
docker run -p 8080:8080 -e TEMPORAL_ADDRESS=temporal-server:7233 temporalio/ui:latest
Metrics
Temporal Server และ SDKs ส่ง Prometheus Metrics ออกมาให้ คุณสามารถตั้ง Grafana Dashboard เพื่อ Monitor ได้ Metrics สำคัญที่ควรดูมีดังนี้
temporal_workflow_completed— จำนวน Workflows ที่เสร็จสมบูรณ์temporal_workflow_failed— จำนวน Workflows ที่ล้มเหลวtemporal_activity_execution_latency— เวลาที่ Activity ใช้ทำงานtemporal_workflow_task_schedule_to_start_latency— เวลาที่ Task รอใน Queuetemporal_worker_task_slots_available— จำนวน Slots ที่ว่างใน Worker
Testing Workflows
Temporal SDKs มี Testing Framework ที่ช่วยให้คุณทดสอบ Workflows ได้โดยไม่ต้องรัน Temporal Server จริง
// Go: Unit Test สำหรับ Workflow
func TestOrderWorkflow(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
// Mock Activities
env.OnActivity(CheckInventory, mock.Anything, mock.Anything).Return(
InventoryResult{Available: true}, nil,
)
env.OnActivity(ProcessPayment, mock.Anything, mock.Anything).Return(
PaymentResult{ChargeID: "ch_test123"}, nil,
)
env.OnActivity(ArrangeShipping, mock.Anything, mock.Anything).Return(
ShippingResult{TrackingID: "TRACK123"}, nil,
)
env.OnActivity(SendConfirmationEmail, mock.Anything, mock.Anything).Return(nil)
// Execute Workflow
env.ExecuteWorkflow(OrderWorkflow, Order{
ID: "order-1",
ProductID: "prod-1",
Quantity: 2,
Amount: 1000,
})
require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
var result OrderResult
require.NoError(t, env.GetWorkflowResult(&result))
assert.Equal(t, "completed", result.Status)
assert.Equal(t, "TRACK123", result.TrackingID)
}
# TypeScript: Unit Test
import { TestWorkflowEnvironment } from '@temporalio/testing';
import { orderWorkflow } from './workflows';
describe('OrderWorkflow', () => {
let env: TestWorkflowEnvironment;
beforeAll(async () => {
env = await TestWorkflowEnvironment.createLocal();
});
afterAll(async () => {
await env?.teardown();
});
it('should complete order successfully', async () => {
const { client, nativeConnection } = env;
const worker = await Worker.create({
connection: nativeConnection,
taskQueue: 'test',
workflowsPath: require.resolve('./workflows'),
activities: {
checkInventory: async () => ({ available: true }),
processPayment: async () => ({ chargeId: 'ch_test' }),
arrangeShipping: async () => ({ trackingId: 'TRACK123' }),
sendConfirmationEmail: async () => {},
},
});
const result = await worker.runUntil(
client.workflow.execute(orderWorkflow, {
workflowId: 'test-order-1',
taskQueue: 'test',
args: [{ id: 'order-1', productId: 'prod-1', quantity: 2, amount: 1000 }],
})
);
expect(result.status).toBe('completed');
expect(result.trackingId).toBe('TRACK123');
});
});
Versioning และ Migration
เมื่อ Workflow ทำงานอยู่แล้วในระบบ (อาจทำงานอยู่เป็นวันหรือเดือน) คุณจะแก้ไข Workflow Code ได้อย่างไรโดยไม่กระทบ Workflow ที่กำลังทำงานอยู่? Temporal มีกลไก Versioning สำหรับเรื่องนี้
// Go: ใช้ workflow.GetVersion() สำหรับ Backward Compatibility
func OrderWorkflow(ctx workflow.Context, order Order) (OrderResult, error) {
// Version 1: ไม่มี fraud check
// Version 2: เพิ่ม fraud check ก่อน payment
v := workflow.GetVersion(ctx, "add-fraud-check", workflow.DefaultVersion, 1)
if v >= 1 {
// Workflow ใหม่: ทำ Fraud Check ก่อน
err := workflow.ExecuteActivity(ctx, FraudCheck, order).Get(ctx, nil)
if err != nil {
return OrderResult{Status: "fraud_detected"}, nil
}
}
// ... ขั้นตอนที่เหลือเหมือนเดิม ...
return OrderResult{Status: "completed"}, nil
}
GetVersion() เมื่อต้องแก้ไข Workflow ที่มี Execution ทำงานอยู่ Workflow ที่รันอยู่จะใช้ Version เดิม ส่วน Workflow ใหม่จะใช้ Version ใหม่ เมื่อ Workflow เก่าทำงานเสร็จหมดแล้ว ค่อยลบ Code Version เก่าออก
Best Practices สำหรับ Temporal
1. ออกแบบ Activities ให้เป็น Idempotent
เนื่องจาก Activity อาจถูก Retry หลายครั้ง ต้องแน่ใจว่าการรัน Activity ซ้ำจะไม่สร้างปัญหา เช่น ใช้ Idempotency Key กับ Payment Gateway เพื่อป้องกันการตัดเงินซ้ำ
2. ใช้ Task Queue แยกตาม Workload
อย่าใช้ Task Queue เดียวสำหรับทุกอย่าง แยก Queue ตามประเภทงาน เช่น "payment-processing", "email-sending", "data-pipeline" เพื่อให้ Scale Workers แยกตาม Workload ได้
3. จำกัดขนาดของ Workflow History
Temporal เก็บ Event ทุก Event ของ Workflow ถ้า Workflow มีขั้นตอนมากเกินไป History จะใหญ่มาก แก้โดยใช้ ContinueAsNew เมื่อ History ยาวเกิน หรือแยกเป็น Child Workflows
// Go: ใช้ ContinueAsNew สำหรับ Long-running Workflows
func PollingWorkflow(ctx workflow.Context, state PollingState) error {
if state.Iteration > 1000 {
// Reset History โดย ContinueAsNew
return workflow.NewContinueAsNewError(ctx, PollingWorkflow, state)
}
// ... ทำงาน ...
state.Iteration++
_ = workflow.Sleep(ctx, time.Minute)
return workflow.NewContinueAsNewError(ctx, PollingWorkflow, state)
}
4. ใช้ Heartbeats สำหรับ Long-running Activities
ถ้า Activity ใช้เวลานาน (เช่น Process ไฟล์ใหญ่) ให้ส่ง Heartbeat เป็นระยะเพื่อบอก Temporal ว่า Activity ยังทำงานอยู่ ถ้าไม่ Heartbeat และเกิน Timeout Temporal จะถือว่า Activity ตายและ Schedule ใหม่
5. จัดการ Error อย่างเหมาะสม
แยก Error ที่ Retry ได้ (Transient Error เช่น Network Timeout) กับ Error ที่ Retry ไม่ได้ (Business Error เช่น Insufficient Balance) ออกจากกัน ใช้ NonRetryableErrorTypes ใน Retry Policy
6. ใช้ Namespace แยก Environment
ใช้ Temporal Namespace แยกสำหรับ Development, Staging และ Production เพื่อป้องกันการปนกันของ Workflow Executions
สรุป
Temporal.io เป็นเครื่องมือที่เปลี่ยนวิธีคิดเกี่ยวกับ Distributed Systems อย่างสิ้นเชิง แทนที่จะเขียน Code จัดการ State Machine, Message Queue, Retry Logic, Dead Letter Queue และ Error Handling เอง คุณแค่เขียน Business Logic เป็น Code ธรรมดาแล้ว Temporal จะจัดการ Durability, Reliability และ Scalability ให้ทั้งหมด
ถ้าคุณกำลังสร้าง Microservices ที่มี Business Process ข้ามหลาย Services, Long-running Workflows ที่ใช้เวลาเป็นวันหรือเดือน หรือ Distributed Transactions ที่ต้องการ Saga Pattern Temporal คือคำตอบที่ดีที่สุดในปี 2026 เริ่มต้นด้วย temporal server start-dev แล้วลองเขียน Workflow แรกของคุณวันนี้ อ่านบทความเกี่ยวกับ Distributed Systems และ Backend Architecture เพิ่มเติมได้ที่ SiamCafe Blog ครับ
