Kotlin Coroutines ?????????????????????
Kotlin Coroutines ???????????? concurrency framework ????????? Kotlin ??????????????????????????????????????? asynchronous code ?????????????????????????????? synchronous code ????????? suspend functions ????????? callbacks ???????????? reactive streams ??????????????? code ???????????????????????? maintain ???????????? ????????????????????? callback hell
?????????????????? Distributed Systems ????????????????????? communicate ????????????????????? services ???????????????????????? Coroutines ???????????????????????? ??????????????? Lightweight ????????? memory ???????????????????????? threads (??????????????? 100,000 coroutines ?????????????????????), Structured concurrency ?????????????????? lifecycle ????????? concurrent operations ??????????????????, Cancellation propagation ?????????????????? parent coroutine ?????????????????? children ????????????????????????????????????????????????, Channel ????????? Flow ?????????????????? inter-coroutine communication
Use cases ?????? distributed systems Service-to-service communication (HTTP, gRPC), Message queue consumers (Kafka, RabbitMQ), Parallel data aggregation ????????? multiple sources, Rate limiting ????????? backpressure, Circuit breaker patterns
????????????????????? Kotlin Coroutines Project
Setup project ?????????????????? distributed system
# === Kotlin Coroutines Project Setup ===
# 1. build.gradle.kts
cat > build.gradle.kts << 'EOF'
plugins {
kotlin("jvm") version "1.9.23"
kotlin("plugin.serialization") version "1.9.23"
application
}
repositories {
mavenCentral()
}
dependencies {
// Kotlin Coroutines
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:1.8.0")
// Ktor (HTTP client & server)
implementation("io.ktor:ktor-server-core:2.3.9")
implementation("io.ktor:ktor-server-netty:2.3.9")
implementation("io.ktor:ktor-server-content-negotiation:2.3.9")
implementation("io.ktor:ktor-client-core:2.3.9")
implementation("io.ktor:ktor-client-cio:2.3.9")
implementation("io.ktor:ktor-client-content-negotiation:2.3.9")
implementation("io.ktor:ktor-serialization-kotlinx-json:2.3.9")
// Kafka
implementation("org.apache.kafka:kafka-clients:3.7.0")
// Redis
implementation("io.lettuce:lettuce-core:6.3.2.RELEASE")
// Logging
implementation("ch.qos.logback:logback-classic:1.4.14")
// Testing
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.0")
testImplementation("io.ktor:ktor-server-test-host:2.3.9")
testImplementation(kotlin("test"))
}
application {
mainClass.set("com.example.MainKt")
}
EOF
# 2. Project Structure
cat > project-structure.txt << 'EOF'
src/main/kotlin/com/example/
Main.kt # Application entry
config/
AppConfig.kt # Configuration
service/
OrderService.kt # Business logic
PaymentService.kt # Payment integration
InventoryService.kt # Inventory check
client/
HttpServiceClient.kt # HTTP client wrapper
KafkaProducerClient.kt # Kafka producer
middleware/
CircuitBreaker.kt # Circuit breaker
RateLimiter.kt # Rate limiter
RetryPolicy.kt # Retry with backoff
routes/
OrderRoutes.kt # API routes
EOF
echo "Project setup complete"
Async Communication Patterns
Patterns ?????????????????? distributed communication
// === Kotlin Coroutines Communication Patterns ===
// File: service/OrderService.kt
package com.example.service
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration.Companion.milliseconds
// Pattern 1: Parallel Service Calls (Fan-out)
class OrderService(
private val paymentService: PaymentService,
private val inventoryService: InventoryService,
private val shippingService: ShippingService,
) {
suspend fun processOrder(orderId: String): OrderResult = coroutineScope {
// Fan-out: Call 3 services in parallel
val paymentDeferred = async { paymentService.validatePayment(orderId) }
val inventoryDeferred = async { inventoryService.checkStock(orderId) }
val shippingDeferred = async { shippingService.calculateShipping(orderId) }
// Fan-in: Aggregate results
val payment = paymentDeferred.await()
val inventory = inventoryDeferred.await()
val shipping = shippingDeferred.await()
OrderResult(
orderId = orderId,
paymentValid = payment.valid,
inStock = inventory.available,
shippingCost = shipping.cost,
status = if (payment.valid && inventory.available) "APPROVED" else "REJECTED"
)
}
// Pattern 2: Channel-based Pipeline
fun orderPipeline(orders: ReceiveChannel): Flow = channelFlow {
for (order in orders) {
launch {
val result = processOrder(order.id)
send(result)
}
}
}
// Pattern 3: Flow with Backpressure
fun processOrderStream(orderFlow: Flow): Flow {
return orderFlow
.buffer(capacity = 100) // Buffer up to 100 orders
.map { order ->
withTimeout(5000) { // 5s timeout per order
processOrder(order.id)
}
}
.catch { e ->
emit(OrderResult(orderId = "unknown", status = "ERROR: "))
}
}
// Pattern 4: Competing Consumers
suspend fun startConsumers(channel: Channel, concurrency: Int = 10) = coroutineScope {
repeat(concurrency) { consumerId ->
launch {
for (order in channel) {
try {
val result = processOrder(order.id)
println("Consumer $consumerId processed: ")
} catch (e: Exception) {
println("Consumer $consumerId error: ")
}
}
}
}
}
}
data class Order(val id: String, val items: List, val total: Double)
data class OrderResult(
val orderId: String,
val paymentValid: Boolean = false,
val inStock: Boolean = false,
val shippingCost: Double = 0.0,
val status: String = "PENDING"
)
// Pattern 5: Saga Pattern for Distributed Transactions
class OrderSaga {
suspend fun executeSaga(order: Order): SagaResult = coroutineScope {
val steps = mutableListOf()
try {
// Step 1: Reserve inventory
val reservation = reserveInventory(order)
steps.add(SagaStep("inventory", "reserved"))
// Step 2: Process payment
val payment = processPayment(order)
steps.add(SagaStep("payment", "charged"))
// Step 3: Create shipment
val shipment = createShipment(order)
steps.add(SagaStep("shipment", "created"))
SagaResult(success = true, steps = steps)
} catch (e: Exception) {
// Compensate: Rollback completed steps in reverse
compensate(steps.reversed())
SagaResult(success = false, steps = steps, error = e.message)
}
}
private suspend fun compensate(steps: List) {
for (step in steps) {
when (step.name) {
"shipment" -> cancelShipment()
"payment" -> refundPayment()
"inventory" -> releaseInventory()
}
}
}
private suspend fun reserveInventory(order: Order): Boolean { delay(100); return true }
private suspend fun processPayment(order: Order): Boolean { delay(200); return true }
private suspend fun createShipment(order: Order): Boolean { delay(150); return true }
private suspend fun cancelShipment() { delay(50) }
private suspend fun refundPayment() { delay(100) }
private suspend fun releaseInventory() { delay(50) }
}
data class SagaStep(val name: String, val status: String)
data class SagaResult(val success: Boolean, val steps: List, val error: String? = null)
Distributed System ???????????? Ktor
??????????????? microservice ???????????? Ktor ????????? Coroutines
// === Ktor Microservice with Coroutines ===
// File: Main.kt
package com.example
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.request.*
import io.ktor.client.statement.*
import kotlinx.coroutines.*
import kotlinx.serialization.*
import kotlinx.serialization.json.*
// HTTP Client with connection pooling
val httpClient = HttpClient(CIO) {
engine {
maxConnectionsCount = 100
endpoint {
connectTimeout = 5000
requestTimeout = 10000
keepAliveTime = 5000
}
}
}
// Service Discovery (simplified)
object ServiceRegistry {
private val services = mapOf(
"payment" to "http://payment-service:8081",
"inventory" to "http://inventory-service:8082",
"shipping" to "http://shipping-service:8083",
)
fun getUrl(service: String): String =
services[service] ?: throw IllegalArgumentException("Unknown service: $service")
}
// Circuit Breaker
class CircuitBreaker(
private val failureThreshold: Int = 5,
private val resetTimeMs: Long = 30000,
) {
private var failures = 0
private var state = State.CLOSED
private var lastFailureTime = 0L
enum class State { CLOSED, OPEN, HALF_OPEN }
suspend fun execute(block: suspend () -> T): T {
return when (state) {
State.OPEN -> {
if (System.currentTimeMillis() - lastFailureTime > resetTimeMs) {
state = State.HALF_OPEN
tryExecute(block)
} else {
throw CircuitBreakerOpenException("Circuit breaker is OPEN")
}
}
State.CLOSED, State.HALF_OPEN -> tryExecute(block)
}
}
private suspend fun tryExecute(block: suspend () -> T): T {
return try {
val result = block()
onSuccess()
result
} catch (e: Exception) {
onFailure()
throw e
}
}
private fun onSuccess() { failures = 0; state = State.CLOSED }
private fun onFailure() {
failures++
lastFailureTime = System.currentTimeMillis()
if (failures >= failureThreshold) state = State.OPEN
}
}
class CircuitBreakerOpenException(message: String) : Exception(message)
// Retry with Exponential Backoff
suspend fun retryWithBackoff(
maxRetries: Int = 3,
initialDelayMs: Long = 100,
maxDelayMs: Long = 5000,
factor: Double = 2.0,
block: suspend () -> T
): T {
var currentDelay = initialDelayMs
repeat(maxRetries - 1) { attempt ->
try {
return block()
} catch (e: Exception) {
println("Retry /$maxRetries after ms: ")
delay(currentDelay)
currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelayMs)
}
}
return block() // Last attempt
}
// Main Application
fun main() {
embeddedServer(Netty, port = 8080) {
routing {
get("/health") { call.respondText("OK") }
get("/orders/{id}") {
val orderId = call.parameters["id"] ?: return@get
val result = coroutineScope {
val payment = async { httpClient.get("/validate/$orderId") }
val inventory = async { httpClient.get("/check/$orderId") }
mapOf("payment" to payment.await().status, "inventory" to inventory.await().status)
}
call.respond(result)
}
}
}.start(wait = true)
}
Error Handling ????????? Resilience
?????????????????? errors ?????? distributed system
#!/usr/bin/env python3
# resilience_patterns.py ??? Distributed System Resilience
import json
import logging
from typing import Dict, List
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("resilience")
class ResiliencePatterns:
def __init__(self):
pass
def patterns(self):
return {
"circuit_breaker": {
"description": "??????????????????????????? service ?????????????????? ????????????????????? cascading failure",
"states": ["CLOSED (????????????)", "OPEN (???????????????????????????)", "HALF_OPEN (???????????????)"],
"kotlin": "CircuitBreaker class with coroutine support",
"library": "resilience4j-kotlin",
},
"retry_with_backoff": {
"description": "Retry ???????????? exponential backoff",
"kotlin": "retryWithBackoff suspend function",
"config": {
"max_retries": 3,
"initial_delay": "100ms",
"max_delay": "5s",
"backoff_factor": 2.0,
},
},
"timeout": {
"description": "??????????????????????????????????????????????????????????????? operation",
"kotlin": "withTimeout(5000) { ... }",
"fallback": "Return cached data or default value",
},
"bulkhead": {
"description": "??????????????? concurrent requests ????????? service",
"kotlin": "Semaphore(permits = 10)",
"benefit": "????????????????????? service ????????? overwhelm",
},
"saga": {
"description": "Distributed transactions with compensating actions",
"kotlin": "OrderSaga with coroutine steps",
"rollback": "Compensate completed steps in reverse order",
},
"fallback": {
"description": "?????????????????? default ??????????????? service ????????????????????????",
"strategies": [
"Return cached response",
"Return default value",
"Call backup service",
"Degrade gracefully (show partial data)",
],
},
}
patterns = ResiliencePatterns()
for name, info in patterns.patterns().items():
print(f"{name}: {info['description']}")
if 'kotlin' in info:
print(f" Kotlin: {info['kotlin']}")
Testing ????????? Monitoring
??????????????? coroutine-based distributed system
// === Testing Coroutines ===
// File: test/OrderServiceTest.kt
import kotlinx.coroutines.test.*
import kotlin.test.*
class OrderServiceTest {
// Test with virtual time (no real delays)
@Test
fun `processOrder should aggregate results from 3 services`() = runTest {
val mockPayment = MockPaymentService(valid = true)
val mockInventory = MockInventoryService(available = true)
val mockShipping = MockShippingService(cost = 50.0)
val service = OrderService(mockPayment, mockInventory, mockShipping)
val result = service.processOrder("ORD-001")
assertEquals("APPROVED", result.status)
assertTrue(result.paymentValid)
assertTrue(result.inStock)
assertEquals(50.0, result.shippingCost)
}
@Test
fun `processOrder should reject when payment fails`() = runTest {
val mockPayment = MockPaymentService(valid = false)
val mockInventory = MockInventoryService(available = true)
val mockShipping = MockShippingService(cost = 50.0)
val service = OrderService(mockPayment, mockInventory, mockShipping)
val result = service.processOrder("ORD-002")
assertEquals("REJECTED", result.status)
assertFalse(result.paymentValid)
}
@Test
fun `circuit breaker should open after failures`() = runTest {
val cb = CircuitBreaker(failureThreshold = 3)
// Fail 3 times
repeat(3) {
assertFails { cb.execute { throw RuntimeException("fail") } }
}
// 4th call should throw CircuitBreakerOpenException
assertFailsWith {
cb.execute { "should not reach" }
}
}
@Test
fun `saga should compensate on failure`() = runTest {
val saga = OrderSaga()
val order = Order("ORD-003", listOf("item1"), 100.0)
val result = saga.executeSaga(order)
assertTrue(result.success)
}
}
// Monitoring metrics
data class CoroutineMetrics(
val activeCoroutines: Int,
val completedCoroutines: Long,
val failedCoroutines: Long,
val avgDurationMs: Double,
val p99DurationMs: Double,
)
// Prometheus metrics example
// counter: orders_processed_total{status="success|failure"}
// histogram: order_processing_duration_seconds
// gauge: active_coroutines
FAQ ??????????????????????????????????????????
Q: Kotlin Coroutines ????????? Java Virtual Threads ???????????????????????????????????????????
A: Kotlin Coroutines ????????? cooperative scheduling (suspend/resume) ???????????? mark functions ???????????? suspend ???????????????????????????????????????????????? Kotlin 1.3 (2018) ?????? structured concurrency, channels, flows ecosystem ???????????? Java Virtual Threads (Project Loom, JDK 21) ????????? preemptive scheduling ?????????????????? OS threads ????????? lightweight ?????????????????????????????????????????? code style ????????? blocking API ????????????????????? ?????????????????????????????????????????? Java developers ??????????????? ????????? Kotlin Coroutines ???????????????????????? Kotlin ???????????????????????? (structured concurrency ???????????????), ????????? Virtual Threads ???????????????????????? Java ??????????????????????????????????????????????????? code style ?????????????????????????????????????????????????????????????????? Kotlin (Dispatchers.Loom)
Q: Structured Concurrency ????????????????????? ?????????????????????????????????????
A: Structured Concurrency ????????????????????? concurrent operations ?????? scope ?????????????????? (parent-child relationship) ????????? parent scope ????????? cancel children ?????????????????????????????? cancel ???????????????????????????, ????????? child ???????????? exception parent ??????????????????????????? cancel siblings ????????????????????????????????? distributed systems ??????????????? ????????????????????? orphaned coroutines (coroutine ?????????????????????????????????????????????), ????????????????????? resource leaks (connections, memory), ????????????????????? cancellation propagation (????????? request ????????? cancel ????????? downstream calls ????????? cancel ????????????), Debugging ???????????????????????? (???????????? hierarchy ??????????????????) ???????????????????????? coroutineScope ?????? processOrder ????????? async ??????????????????????????????????????? fail ??????????????????????????? cancel ??????????????????????????? ???????????????????????????????????????
Q: Ktor ????????? Spring Boot ??????????????????????????????????????????????????? Kotlin?
A: Ktor ???????????? Kotlin-native framework ???????????????????????? JetBrains coroutine-first design, lightweight (~3MB), modular (??????????????? features ??????????????????????????????), startup ???????????? ??????????????? microservices, serverless, API services Spring Boot ???????????? full-featured framework ecosystem ?????????????????????????????? ?????? Spring Cloud ?????????????????? distributed systems, Spring Data, Spring Security libraries ?????????????????? ????????? heavyweight ???????????? startup ????????????????????? ??????????????? Ktor ??????????????? ??????????????? lightweight microservices, ????????????????????? coroutine-native, startup ???????????? (serverless) ??????????????? Spring Boot ??????????????? ????????????????????? ecosystem ????????????, migration ????????? Java Spring, ????????????????????? Spring Cloud features
Q: Channel ????????? Flow ?????? Kotlin Coroutines ???????????????????????????????????????????
A: Channel ???????????? hot stream ???????????????????????????????????????????????? coroutines (producer-consumer) ?????????????????? queue ????????????????????????????????????????????????????????????????????????????????? consumer ????????????????????? (buffered) ??????????????? inter-coroutine communication, competing consumers Flow ???????????? cold stream ??????????????????????????????????????????????????????????????? collector (lazy) ?????????????????? Kotlin sequences ????????? asynchronous ??????????????? data streams, reactive pipelines, API responses ?????????????????? distributed systems ????????? Channel ?????????????????? message passing ????????????????????? components (???????????? Kafka consumer ??? processor), ????????? Flow ?????????????????? data transformation pipelines (???????????? stream processing, API aggregation)
