
Kotlin Coroutines Distributed System: Building Microservices with Async Kotlin

2026-05-07 · อ. บอม, SiamCafe.net · 1,268 words

What Are Kotlin Coroutines?

Kotlin Coroutines is a concurrency framework for Kotlin that lets you write asynchronous code that reads like synchronous code. It uses suspend functions instead of callbacks or reactive streams, which makes code easier to read and maintain and avoids callback hell.

For distributed systems that must communicate with many services, coroutines are a good fit for four reasons. They are lightweight: a coroutine uses far less memory than a thread, so running 100,000 coroutines at once is practical. Structured concurrency manages the lifecycle of concurrent operations automatically. Cancellation propagation means that when a parent coroutine is cancelled, its children are cancelled automatically. Channel and Flow provide inter-coroutine communication.
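The two claims above that are easiest to check by running code are the low per-coroutine cost and cancellation propagation. The following is a minimal sketch, not a benchmark; the function names `runMany` and `cancelParentWithChildren` are made up for this illustration, and the 60-second `delay` stands in for a slow downstream call.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Launches `n` coroutines that each suspend briefly; returns how many finished.
suspend fun runMany(n: Int): Int {
    val finished = AtomicInteger(0)
    coroutineScope {
        repeat(n) {
            launch {
                delay(100)
                finished.incrementAndGet()
            }
        }
    } // coroutineScope returns only after every child has completed
    return finished.get()
}

// Cancels a parent job and returns how many of its children observed the cancellation.
suspend fun cancelParentWithChildren(children: Int): Int = coroutineScope {
    val cancelled = AtomicInteger(0)
    val parent = launch {
        repeat(children) {
            launch {
                try {
                    delay(60_000) // stand-in for a long downstream call
                } finally {
                    cancelled.incrementAndGet()
                }
            }
        }
    }
    delay(50)              // let the children start before cancelling
    parent.cancelAndJoin() // returns once the parent and all children are done
    cancelled.get()
}

fun main() = runBlocking {
    println("finished = ${runMany(100_000)}")
    println("cancelled children = ${cancelParentWithChildren(3)}")
}
```

Run under `runBlocking` as shown, the first line reports all 100,000 coroutines finishing and the second reports 3 cancelled children: nothing in the code cancels the children explicitly, the cancellation reaches them through the parent job.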

Use cases in distributed systems include service-to-service communication (HTTP, gRPC), message queue consumers (Kafka, RabbitMQ), parallel data aggregation from multiple sources, rate limiting and backpressure, and circuit breaker patterns.

Setting Up a Kotlin Coroutines Project

Set up a project for a distributed system

# === Kotlin Coroutines Project Setup ===

# 1. build.gradle.kts
cat > build.gradle.kts << 'EOF'
plugins {
    kotlin("jvm") version "1.9.23"
    kotlin("plugin.serialization") version "1.9.23"
    application
}

repositories {
    mavenCentral()
}

dependencies {
    // Kotlin Coroutines
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:1.8.0")

    // Ktor (HTTP client & server)
    implementation("io.ktor:ktor-server-core:2.3.9")
    implementation("io.ktor:ktor-server-netty:2.3.9")
    implementation("io.ktor:ktor-server-content-negotiation:2.3.9")
    implementation("io.ktor:ktor-client-core:2.3.9")
    implementation("io.ktor:ktor-client-cio:2.3.9")
    implementation("io.ktor:ktor-client-content-negotiation:2.3.9")
    implementation("io.ktor:ktor-serialization-kotlinx-json:2.3.9")

    // Kafka
    implementation("org.apache.kafka:kafka-clients:3.7.0")

    // Redis
    implementation("io.lettuce:lettuce-core:6.3.2.RELEASE")

    // Logging
    implementation("ch.qos.logback:logback-classic:1.4.14")

    // Testing
    testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.0")
    testImplementation("io.ktor:ktor-server-test-host:2.3.9")
    testImplementation(kotlin("test"))
}

application {
    mainClass.set("com.example.MainKt")
}
EOF

# 2. Project Structure
cat > project-structure.txt << 'EOF'
src/main/kotlin/com/example/
  Main.kt                    # Application entry
  config/
    AppConfig.kt              # Configuration
  service/
    OrderService.kt           # Business logic
    PaymentService.kt         # Payment integration
    InventoryService.kt       # Inventory check
    ShippingService.kt        # Shipping cost calculation
  client/
    HttpServiceClient.kt      # HTTP client wrapper
    KafkaProducerClient.kt    # Kafka producer
  middleware/
    CircuitBreaker.kt         # Circuit breaker
    RateLimiter.kt            # Rate limiter
    RetryPolicy.kt            # Retry with backoff
  routes/
    OrderRoutes.kt            # API routes
EOF

echo "Project setup complete"

Async Communication Patterns

Patterns for distributed communication

// === Kotlin Coroutines Communication Patterns ===

// File: service/OrderService.kt
package com.example.service

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration.Companion.milliseconds

// Pattern 1: Parallel Service Calls (Fan-out)
class OrderService(
    private val paymentService: PaymentService,
    private val inventoryService: InventoryService,
    private val shippingService: ShippingService,
) {
    suspend fun processOrder(orderId: String): OrderResult = coroutineScope {
        // Fan-out: Call 3 services in parallel
        val paymentDeferred = async { paymentService.validatePayment(orderId) }
        val inventoryDeferred = async { inventoryService.checkStock(orderId) }
        val shippingDeferred = async { shippingService.calculateShipping(orderId) }

        // Fan-in: Aggregate results
        val payment = paymentDeferred.await()
        val inventory = inventoryDeferred.await()
        val shipping = shippingDeferred.await()

        OrderResult(
            orderId = orderId,
            paymentValid = payment.valid,
            inStock = inventory.available,
            shippingCost = shipping.cost,
            status = if (payment.valid && inventory.available) "APPROVED" else "REJECTED"
        )
    }

    // Pattern 2: Channel-based Pipeline
    fun orderPipeline(orders: ReceiveChannel<Order>): Flow<OrderResult> = channelFlow {
        for (order in orders) {
            launch {
                val result = processOrder(order.id)
                send(result)
            }
        }
    }

    // Pattern 3: Flow with Backpressure
    fun processOrderStream(orderFlow: Flow<Order>): Flow<OrderResult> {
        return orderFlow
            .buffer(capacity = 100)  // Buffer up to 100 orders
            .map { order ->
                withTimeout(5000) {  // 5s timeout per order
                    processOrder(order.id)
                }
            }
            .catch { e ->
                // catch ends the stream: after the first failure no further orders are processed
                emit(OrderResult(orderId = "unknown", status = "ERROR: ${e.message}"))
            }
    }

    // Pattern 4: Competing Consumers
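    suspend fun startConsumers(channel: Channel<Order>, concurrency: Int = 10) = coroutineScope {
        repeat(concurrency) { consumerId ->
            launch {
                for (order in channel) {
                    try {
                        val result = processOrder(order.id)
                        println("Consumer $consumerId processed: ${result.orderId} -> ${result.status}")
                    } catch (e: CancellationException) {
                        throw e // let cancellation propagate instead of logging it as an error
                    } catch (e: Exception) {
                        println("Consumer $consumerId error: ${e.message}")
                    }
                }
            }
        }
    }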
}

data class Order(val id: String, val items: List<String>, val total: Double)
data class OrderResult(
    val orderId: String,
    val paymentValid: Boolean = false,
    val inStock: Boolean = false,
    val shippingCost: Double = 0.0,
    val status: String = "PENDING"
)

// Pattern 5: Saga Pattern for Distributed Transactions
class OrderSaga {
    suspend fun executeSaga(order: Order): SagaResult = coroutineScope {
        val steps = mutableListOf<SagaStep>()

        try {
            // Step 1: Reserve inventory
            val reservation = reserveInventory(order)
            steps.add(SagaStep("inventory", "reserved"))

            // Step 2: Process payment
            val payment = processPayment(order)
            steps.add(SagaStep("payment", "charged"))

            // Step 3: Create shipment
            val shipment = createShipment(order)
            steps.add(SagaStep("shipment", "created"))

            SagaResult(success = true, steps = steps)
        } catch (e: Exception) {
            // Compensate: Rollback completed steps in reverse
            compensate(steps.reversed())
            SagaResult(success = false, steps = steps, error = e.message)
        }
    }

    private suspend fun compensate(steps: List<SagaStep>) {
        for (step in steps) {
            when (step.name) {
                "shipment" -> cancelShipment()
                "payment" -> refundPayment()
                "inventory" -> releaseInventory()
            }
        }
    }

    private suspend fun reserveInventory(order: Order): Boolean { delay(100); return true }
    private suspend fun processPayment(order: Order): Boolean { delay(200); return true }
    private suspend fun createShipment(order: Order): Boolean { delay(150); return true }
    private suspend fun cancelShipment() { delay(50) }
    private suspend fun refundPayment() { delay(100) }
    private suspend fun releaseInventory() { delay(50) }
}

data class SagaStep(val name: String, val status: String)
data class SagaResult(val success: Boolean, val steps: List<SagaStep>, val error: String? = null)
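The stubbed steps in OrderSaga always succeed, so the compensation path never runs. The sketch below is a self-contained variation, not the article's class: `Step` and `runSaga` are hypothetical names, each step carries its own compensation, and the third step is made to fail so the reverse-order rollback becomes visible in a log.

```kotlin
import kotlinx.coroutines.*

// Hypothetical step type for this sketch: an action plus the compensation that undoes it.
class Step(
    val name: String,
    val action: suspend () -> Unit,
    val compensation: suspend () -> Unit,
)

// Runs steps in order; on failure, undoes the completed ones, most recent first.
suspend fun runSaga(steps: List<Step>, log: MutableList<String>): Boolean {
    val completed = ArrayDeque<Step>()
    try {
        for (step in steps) {
            step.action()
            log += "done:${step.name}"
            completed.addFirst(step) // newest at the front gives reverse order on iteration
        }
        return true
    } catch (e: Exception) {
        log += "failed:${e.message}"
        // NonCancellable lets the compensations finish even if the caller was cancelled.
        withContext(NonCancellable) {
            for (step in completed) {
                step.compensation()
                log += "undo:${step.name}"
            }
        }
        if (e is CancellationException) throw e // keep cancellation visible to the caller
        return false
    }
}

// Builds a three-step saga whose last step fails and returns the resulting log.
suspend fun failingSagaLog(): List<String> {
    val log = mutableListOf<String>()
    runSaga(
        listOf(
            Step("inventory", { delay(10) }, { delay(5) }),
            Step("payment", { delay(10) }, { delay(5) }),
            Step("shipment", { throw IllegalStateException("carrier unavailable") }, { }),
        ),
        log,
    )
    return log
}

fun main() = runBlocking {
    println(failingSagaLog())
    // prints [done:inventory, done:payment, failed:carrier unavailable, undo:payment, undo:inventory]
}
```

Pairing each action with its compensation avoids the string-keyed `when` lookup, at the cost of a slightly heavier step definition. Which shape fits better depends on how many steps the real saga has.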

Distributed System with Ktor

Build a microservice with Ktor and Coroutines

// === Ktor Microservice with Coroutines ===

// File: Main.kt
package com.example

import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.request.*
import io.ktor.client.statement.*
import kotlinx.coroutines.*
import kotlinx.serialization.*
import kotlinx.serialization.json.*

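// HTTP Client with connection pooling
val httpClient = HttpClient(CIO) {
    engine {
        maxConnectionsCount = 100
        requestTimeout = 10000       // engine-level setting: whole-request timeout in ms
        endpoint {
            connectTimeout = 5000    // ms allowed to establish a connection
            keepAliveTime = 5000     // ms an idle connection is kept open
        }
    }
}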

// Service Discovery (simplified)
object ServiceRegistry {
    private val services = mapOf(
        "payment" to "http://payment-service:8081",
        "inventory" to "http://inventory-service:8082",
        "shipping" to "http://shipping-service:8083",
    )
    fun getUrl(service: String): String =
        services[service] ?: throw IllegalArgumentException("Unknown service: $service")
}

// Circuit Breaker
class CircuitBreaker(
    private val failureThreshold: Int = 5,
    private val resetTimeMs: Long = 30000,
) {
    private var failures = 0
    private var state = State.CLOSED
    private var lastFailureTime = 0L

    enum class State { CLOSED, OPEN, HALF_OPEN }

    suspend fun <T> execute(block: suspend () -> T): T {
        return when (state) {
            State.OPEN -> {
                if (System.currentTimeMillis() - lastFailureTime > resetTimeMs) {
                    state = State.HALF_OPEN
                    tryExecute(block)
                } else {
                    throw CircuitBreakerOpenException("Circuit breaker is OPEN")
                }
            }
            State.CLOSED, State.HALF_OPEN -> tryExecute(block)
        }
    }

    private suspend fun <T> tryExecute(block: suspend () -> T): T {
        return try {
            val result = block()
            onSuccess()
            result
        } catch (e: Exception) {
            onFailure()
            throw e
        }
    }

    private fun onSuccess() { failures = 0; state = State.CLOSED }
    private fun onFailure() {
        failures++
        lastFailureTime = System.currentTimeMillis()
        if (failures >= failureThreshold) state = State.OPEN
    }
}

class CircuitBreakerOpenException(message: String) : Exception(message)

// Retry with Exponential Backoff
suspend fun <T> retryWithBackoff(
    maxRetries: Int = 3,
    initialDelayMs: Long = 100,
    maxDelayMs: Long = 5000,
    factor: Double = 2.0,
    block: suspend () -> T
): T {
    var currentDelay = initialDelayMs
    repeat(maxRetries - 1) { attempt ->
        try {
            return block()
        } catch (e: CancellationException) {
            throw e // never retry a cancelled coroutine
        } catch (e: Exception) {
            println("Retry ${attempt + 1}/$maxRetries after ${currentDelay}ms: ${e.message}")
            delay(currentDelay)
            currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelayMs)
        }
    }
    return block() // Last attempt
}

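// Main Application
fun main() {
    embeddedServer(Netty, port = 8080) {
        routing {
            get("/health") { call.respondText("OK") }

            get("/orders/{id}") {
                val orderId = call.parameters["id"] ?: return@get
                // Resolve downstream base URLs through the registry defined above
                val paymentUrl = ServiceRegistry.getUrl("payment")
                val inventoryUrl = ServiceRegistry.getUrl("inventory")
                val result = coroutineScope {
                    val payment = async { httpClient.get("$paymentUrl/validate/$orderId") }
                    val inventory = async { httpClient.get("$inventoryUrl/check/$orderId") }
                    val paymentStatus = payment.await().status.value
                    val inventoryStatus = inventory.await().status.value
                    // Build the JSON by hand: no ContentNegotiation plugin is installed here
                    buildJsonObject {
                        put("payment", paymentStatus)
                        put("inventory", inventoryStatus)
                    }
                }
                call.respondText(result.toString(), io.ktor.http.ContentType.Application.Json)
            }
        }
    }.start(wait = true)
}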

Error Handling and Resilience

Handling errors in a distributed system

#!/usr/bin/env python3
# resilience_patterns.py: Distributed System Resilience
import json
import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("resilience")

class ResiliencePatterns:
    def __init__(self):
        pass
    
    def patterns(self):
        return {
            "circuit_breaker": {
                "description": "Stop calling a failing service to prevent cascading failure",
                "states": ["CLOSED (normal)", "OPEN (calls blocked)", "HALF_OPEN (probing)"],
                "kotlin": "CircuitBreaker class with coroutine support",
                "library": "resilience4j-kotlin",
            },
            "retry_with_backoff": {
                "description": "Retry with exponential backoff",
                "kotlin": "retryWithBackoff suspend function",
                "config": {
                    "max_retries": 3,
                    "initial_delay": "100ms",
                    "max_delay": "5s",
                    "backoff_factor": 2.0,
                },
            },
            "timeout": {
                "description": "Set a maximum duration for each operation",
                "kotlin": "withTimeout(5000) { ... }",
                "fallback": "Return cached data or default value",
            },
            "bulkhead": {
                "description": "Limit concurrent requests per service",
                "kotlin": "Semaphore(permits = 10)",
                "benefit": "Prevents a service from being overwhelmed",
            },
            "saga": {
                "description": "Distributed transactions with compensating actions",
                "kotlin": "OrderSaga with coroutine steps",
                "rollback": "Compensate completed steps in reverse order",
            },
            "fallback": {
                "description": "Return a default when a service is unavailable",
                "strategies": [
                    "Return cached response",
                    "Return default value",
                    "Call backup service",
                    "Degrade gracefully (show partial data)",
                ],
            },
        }

patterns = ResiliencePatterns()
for name, info in patterns.patterns().items():
    print(f"{name}: {info['description']}")
    if 'kotlin' in info:
        print(f"  Kotlin: {info['kotlin']}")
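The timeout, fallback, and bulkhead entries above only name their Kotlin building blocks. A minimal sketch of how they might look with kotlinx.coroutines follows; `Bulkhead`, `withFallback`, and `peakConcurrency` are illustrative names, not part of any library, and the 20 ms `delay` stands in for a downstream call.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import java.util.concurrent.atomic.AtomicInteger

// Bulkhead: at most `maxConcurrent` callers are inside `call` at the same time.
class Bulkhead(maxConcurrent: Int) {
    private val semaphore = Semaphore(maxConcurrent)
    suspend fun <T> call(block: suspend () -> T): T = semaphore.withPermit { block() }
}

// Timeout with fallback: returns `fallback` if `block` does not finish in time.
suspend fun <T : Any> withFallback(timeoutMs: Long, fallback: T, block: suspend () -> T): T =
    withTimeoutOrNull(timeoutMs) { block() } ?: fallback

// Starts `tasks` calls through a bulkhead and reports the highest concurrency observed.
suspend fun peakConcurrency(tasks: Int, limit: Int): Int {
    val bulkhead = Bulkhead(limit)
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)
    coroutineScope {
        repeat(tasks) {
            launch {
                bulkhead.call {
                    val now = running.incrementAndGet()
                    peak.updateAndGet { maxOf(it, now) }
                    delay(20) // simulated downstream call
                    running.decrementAndGet()
                }
            }
        }
    } // all tasks have finished here, so `peak` is final
    return peak.get()
}

fun main() = runBlocking {
    println("peak concurrency = ${peakConcurrency(tasks = 50, limit = 10)}")
    println(withFallback(50, "cached") { delay(1_000); "fresh" })
    println(withFallback(1_000, "cached") { "fresh" })
}
```

The peak never exceeds the permit count, the second line prints the fallback value because the block is cancelled after 50 ms, and the third prints the fresh value. Unlike a thread-pool bulkhead, waiting for a permit here suspends the coroutine rather than blocking a thread.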

Testing and Monitoring

Testing a coroutine-based distributed system

// === Testing Coroutines ===

// File: test/OrderServiceTest.kt
import kotlinx.coroutines.test.*
import kotlin.test.*

class OrderServiceTest {
    // Test with virtual time (no real delays)
    @Test
    fun `processOrder should aggregate results from 3 services`() = runTest {
        val mockPayment = MockPaymentService(valid = true)
        val mockInventory = MockInventoryService(available = true)
        val mockShipping = MockShippingService(cost = 50.0)

        val service = OrderService(mockPayment, mockInventory, mockShipping)
        val result = service.processOrder("ORD-001")

        assertEquals("APPROVED", result.status)
        assertTrue(result.paymentValid)
        assertTrue(result.inStock)
        assertEquals(50.0, result.shippingCost)
    }

    @Test
    fun `processOrder should reject when payment fails`() = runTest {
        val mockPayment = MockPaymentService(valid = false)
        val mockInventory = MockInventoryService(available = true)
        val mockShipping = MockShippingService(cost = 50.0)

        val service = OrderService(mockPayment, mockInventory, mockShipping)
        val result = service.processOrder("ORD-002")

        assertEquals("REJECTED", result.status)
        assertFalse(result.paymentValid)
    }

    @Test
    fun `circuit breaker should open after failures`() = runTest {
        val cb = CircuitBreaker(failureThreshold = 3)

        // Fail 3 times
        repeat(3) {
            assertFails { cb.execute { throw RuntimeException("fail") } }
        }

        // 4th call should throw CircuitBreakerOpenException
        assertFailsWith<CircuitBreakerOpenException> {
            cb.execute { "should not reach" }
        }
    }

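    @Test
    fun `saga should succeed when every step completes`() = runTest {
        // The stubbed steps in OrderSaga never fail, so only the success path is exercised here
        val saga = OrderSaga()
        val order = Order("ORD-003", listOf("item1"), 100.0)
        val result = saga.executeSaga(order)
        assertTrue(result.success)
        assertEquals(3, result.steps.size)
    }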
}

// Monitoring metrics
data class CoroutineMetrics(
    val activeCoroutines: Int,
    val completedCoroutines: Long,
    val failedCoroutines: Long,
    val avgDurationMs: Double,
    val p99DurationMs: Double,
)

// Prometheus metrics example
// counter: orders_processed_total{status="success|failure"}
// histogram: order_processing_duration_seconds
// gauge: active_coroutines

FAQ: Frequently Asked Questions

Q: How do Kotlin Coroutines differ from Java Virtual Threads?

A: Kotlin Coroutines use cooperative scheduling (suspend/resume), so functions that can suspend must be marked suspend. They have been stable since Kotlin 1.3 (2018) and come with structured concurrency, channels, and the Flow ecosystem. Java Virtual Threads (Project Loom, JDK 21) are scheduled by the JVM and behave like OS threads while staying lightweight. They need no change in code style because existing blocking APIs work as they are, which makes them easier for Java developers to adopt. Use Kotlin Coroutines for Kotlin projects, where structured concurrency is the stronger model, and Virtual Threads for Java projects that want to keep their blocking code style. The two can also be combined in Kotlin: kotlinx.coroutines does not ship a Dispatchers.Loom, but a virtual-thread executor can be turned into a dispatcher with asCoroutineDispatcher().
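One way to combine the two, sketched under the assumption of JDK 21 or newer, is to wrap the JDK's virtual-thread executor as a coroutine dispatcher. The function name `runOnVirtualThreads` is invented for this example, and `Thread.sleep` stands in for a blocking client call.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// Runs `tasks` blocking jobs on virtual threads; returns true if every job saw a virtual thread.
fun runOnVirtualThreads(tasks: Int): Boolean =
    // Requires JDK 21+: each task handed to this executor gets its own virtual thread.
    Executors.newVirtualThreadPerTaskExecutor().asCoroutineDispatcher().use { loom ->
        runBlocking {
            (1..tasks).map {
                async(loom) {
                    Thread.sleep(50) // blocking call: parks the virtual thread, not a carrier thread
                    Thread.currentThread().isVirtual
                }
            }.awaitAll().all { it }
        }
    }

fun main() {
    println("all on virtual threads: ${runOnVirtualThreads(1_000)}")
}
```

This keeps structured concurrency from coroutines while letting legacy blocking calls run without tying up a platform thread. Whether it beats `Dispatchers.IO` for a given workload is something to measure rather than assume.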

Q: What is Structured Concurrency, and why does it matter?

A: Structured Concurrency organizes concurrent operations inside a well-defined scope (a parent-child relationship). When the parent scope is cancelled, all of its children are cancelled automatically. When a child throws an exception, the parent is notified and cancels the siblings. This matters for distributed systems because it prevents orphaned coroutines (coroutines that keep running with no owner), prevents resource leaks (connections, memory), guarantees cancellation propagation (when a request is cancelled, its downstream calls are cancelled too), and makes debugging easier because the hierarchy is explicit. For example, with coroutineScope in processOrder, if one of the three async calls fails, the other two are cancelled automatically and no work is wasted.
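The sibling-cancellation behavior can be observed directly. In this minimal sketch the function name `failFastDemo` and the exception message are made up: one child fails after 50 ms and the slower sibling records what happened to it.

```kotlin
import kotlinx.coroutines.*

// Returns a description of what happened to the slow sibling when the other child failed.
suspend fun failFastDemo(): String {
    var siblingOutcome = "still running"
    try {
        coroutineScope {
            launch {
                try {
                    delay(10_000) // slow downstream call
                    siblingOutcome = "completed"
                } catch (e: CancellationException) {
                    siblingOutcome = "cancelled"
                    throw e // always rethrow cancellation
                }
            }
            launch {
                delay(50)
                throw IllegalStateException("payment service down")
            }
        }
    } catch (e: IllegalStateException) {
        // coroutineScope rethrows the child's failure only after all children have finished
        return "caught '${e.message}', sibling $siblingOutcome"
    }
    return "no failure"
}

fun main() = runBlocking {
    println(failFastDemo())
    // prints: caught 'payment service down', sibling cancelled
}
```

The call returns after roughly 50 ms rather than 10 seconds, which is the practical payoff: a failed dependency does not leave the other requests running in the background.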

Q: Ktor or Spring Boot: which should you choose for Kotlin?

A: Ktor is a Kotlin-native framework developed by JetBrains with a coroutine-first design. It is lightweight (~3MB), modular (you install only the features you need), and starts quickly, which suits microservices, serverless, and API services. Spring Boot is a full-featured framework with a very large ecosystem: Spring Cloud for distributed systems, Spring Data, Spring Security, and many more libraries. It is heavyweight, though, and starts more slowly. Choose Ktor when you want lightweight microservices, coroutine-native code, or fast startup (serverless). Choose Spring Boot when you need the large ecosystem, are migrating from Java Spring, or need Spring Cloud features.

Q: How do Channel and Flow differ in Kotlin Coroutines?

A: Channel is a hot stream for passing data between coroutines (producer-consumer). It behaves like a queue: with a buffer, values can be sent whether or not a consumer is ready. It suits inter-coroutine communication and competing consumers. Flow is a cold stream that starts producing values only when there is a collector (lazy), like Kotlin sequences but asynchronous. It suits data streams, reactive pipelines, and API responses. In distributed systems, use Channel for message passing between components (for example, Kafka consumer → processor) and Flow for data transformation pipelines (for example, stream processing or API aggregation).
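The hot/cold distinction shows up in a few lines. In this sketch (all function names are illustrative), a Flow's builder runs once per collector, while each value sent to a Channel is received by exactly one of the competing workers.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Cold: the builder block runs again for every collector.
fun coldNumbers(onStart: () -> Unit): Flow<Int> = flow {
    onStart()
    emit(1)
    emit(2)
    emit(3)
}

// Collects the same Flow twice and returns how many times the builder ran.
suspend fun countFlowStarts(): Int {
    var starts = 0
    val numbers = coldNumbers { starts++ } // nothing runs yet: no collector
    numbers.toList()                       // first collector
    numbers.map { it * 10 }.toList()       // second collector, with a transformation
    return starts
}

// Hot: each element sent to a channel is delivered to exactly one receiver.
suspend fun drainWithWorkers(items: Int, workers: Int): List<Int> = coroutineScope {
    val channel = Channel<Int>(capacity = items)
    for (i in 1..items) channel.send(i)
    channel.close() // lets the receiving for-loops terminate
    val received = List(workers) {
        async {
            val got = mutableListOf<Int>()
            for (x in channel) got += x
            got
        }
    }.awaitAll()
    received.flatten().sorted()
}

fun main() = runBlocking {
    println("flow builder ran ${countFlowStarts()} times")      // 2: once per collector
    println("workers received ${drainWithWorkers(6, 2)}")       // [1, 2, 3, 4, 5, 6], no duplicates
}
```

How the six values are split between the two workers depends on scheduling, so the sketch checks only that every value arrives exactly once.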
