Go (Golang) ถูกออกแบบมาสำหรับ Concurrency ตั้งแต่แรก — Goroutines เบากว่า Thread (ใช้ Stack เริ่มต้นแค่ 2KB) และ Channels ทำให้ Goroutines สื่อสารกันได้อย่างปลอดภัย Rob Pike ผู้ร่วมสร้าง Go กล่าวไว้ว่า:
บทความนี้จะสอน Go Concurrency ตั้งแต่พื้นฐาน (Goroutine, Channel, Select) จนถึง Patterns ที่ใช้ใน Production จริง (Fan-out/Fan-in, Worker Pool, Pipeline, Rate Limiter)
Goroutine — Lightweight Thread
Goroutine ไม่ใช่ OS Thread แต่เป็น "Green thread" ที่ Go runtime จัดการเอง:
| OS Thread | Goroutine | |
|---|---|---|
| Stack size | 1-8 MB (fixed) | 2 KB (growable up to 1 GB) |
| Creation cost | ~1ms | ~1μs (เร็วกว่า 1000 เท่า) |
| Context switch | ~1-10μs (kernel) | ~200ns (user-space) |
| จำนวนที่สร้างได้ | หลักพัน | หลักแสน-ล้าน |
| Scheduling | OS scheduler | Go runtime scheduler (M:N) |
package main
import (
"fmt"
"time"
)
func sayHello(name string) {
for i := 0; i < 3; i++ {
fmt.Printf("Hello from %s (iteration %d)\n", name, i)
time.Sleep(100 * time.Millisecond)
}
}
func main() {
// เริ่ม Goroutine ด้วย keyword "go"
go sayHello("Goroutine-1")
go sayHello("Goroutine-2")
go sayHello("Goroutine-3")
// main() ต้องรออย่าจบก่อน Goroutines
time.Sleep(500 * time.Millisecond)
fmt.Println("Main done")
// ⚠️ ใช้ time.Sleep ไม่ดี ใช้ WaitGroup แทน!
}
sync.WaitGroup — รอ Goroutines จบ
package main
import (
"fmt"
"sync"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // บอก WaitGroup ว่างานเสร็จแล้ว
fmt.Printf("Worker %d starting\n", id)
// ทำงาน...
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1) // เพิ่ม Counter ก่อนเริ่ม Goroutine
go worker(i, &wg)
}
wg.Wait() // รอจนทุก Goroutine เรียก Done()
fmt.Println("All workers completed")
}
Channel — ท่อสื่อสารระหว่าง Goroutines
// Unbuffered Channel — Send blocks จนมีคน Receive
ch := make(chan int)
// Buffered Channel — Send blocks เมื่อ Buffer เต็ม
ch := make(chan int, 10) // buffer size 10
// Directional Channels — จำกัดทิศทาง
func producer(ch chan<- int) { // Send-only channel
ch <- 42
}
func consumer(ch <-chan int) { // Receive-only channel
value := <-ch
fmt.Println(value)
}
// ปิด Channel
close(ch)
// Range over Channel — อ่านจน Channel ปิด
for value := range ch {
fmt.Println(value)
}
ตัวอย่าง: Producer-Consumer
func producer(ch chan<- int, count int) {
for i := 0; i < count; i++ {
ch <- i
fmt.Printf("Produced: %d\n", i)
}
close(ch) // ปิด Channel เมื่อส่งครบ
}
func consumer(ch <-chan int, done chan<- bool) {
for value := range ch {
fmt.Printf("Consumed: %d\n", value)
}
done <- true
}
func main() {
ch := make(chan int, 5)
done := make(chan bool)
go producer(ch, 10)
go consumer(ch, done)
<-done // รอ Consumer ทำเสร็จ
}
Select — Multiplexing Channels
select ทำให้ Goroutine รอหลาย Channels พร้อมกัน ทำ Channel ไหนพร้อมก่อนก็ทำอันนั้น:
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "result from ch1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "result from ch2"
}()
// รอทั้งสอง Channels
for i := 0; i < 2; i++ {
select {
case msg := <-ch1:
fmt.Println(msg)
case msg := <-ch2:
fmt.Println(msg)
}
}
}
// Select with Timeout
select {
case result := <-ch:
fmt.Println("Got result:", result)
case <-time.After(3 * time.Second):
fmt.Println("Timeout!")
}
// Select with Default (non-blocking)
select {
case msg := <-ch:
fmt.Println("Received:", msg)
default:
fmt.Println("No message available, doing other work...")
}
Pattern: Fan-Out / Fan-In
Fan-Out: กระจายงานไปหลาย Goroutines
Fan-In: รวมผลลัพธ์จากหลาย Channels เข้า Channel เดียว
// Fan-Out: กระจายงาน
func fanOut(input <-chan int, workers int) []<-chan int {
channels := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
channels[i] = process(input) // แต่ละ Worker อ่านจาก input เดียวกัน
}
return channels
}
func process(input <-chan int) <-chan int {
output := make(chan int)
go func() {
defer close(output)
for n := range input {
output <- n * n // ยกกำลัง 2
}
}()
return output
}
// Fan-In: รวมผลลัพธ์
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)
output := func(ch <-chan int) {
defer wg.Done()
for n := range ch {
merged <- n
}
}
wg.Add(len(channels))
for _, ch := range channels {
go output(ch)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}
Pattern: Pipeline
Pipeline คือ Chain ของ Stages ที่แต่ละ Stage รับ Input จาก Channel แล้วส่ง Output ไป Channel ถัดไป:
// Stage 1: Generate numbers
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// Stage 2: Square numbers
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
// Stage 3: Filter even numbers
func filterEven(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if n%2 == 0 {
out <- n
}
}
}()
return out
}
func main() {
// Pipeline: generate -> square -> filterEven
pipeline := filterEven(square(generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
for result := range pipeline {
fmt.Println(result) // 4, 16, 36, 64, 100
}
}
Pattern: Worker Pool
Worker Pool จำกัดจำนวน Goroutines ที่ทำงานพร้อมกัน ป้องกันการใช้ Resource มากเกินไป:
func workerPool(jobs <-chan int, results chan<- int, workerID int) {
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", workerID, job)
time.Sleep(time.Duration(job) * 100 * time.Millisecond) // Simulate work
results <- job * 2
}
}
func main() {
const numJobs = 20
const numWorkers = 5
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
// Start workers
for w := 1; w <= numWorkers; w++ {
go workerPool(jobs, results, w)
}
// Send jobs
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
// Collect results
for r := 1; r <= numJobs; r++ {
result := <-results
fmt.Printf("Result: %d\n", result)
}
}
Pattern: Semaphore — จำกัด Concurrency
// Semaphore ด้วย Buffered Channel
func main() {
sem := make(chan struct{}, 3) // จำกัด 3 concurrent
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
sem <- struct{}{} // Acquire semaphore
defer func() { <-sem }() // Release semaphore
fmt.Printf("Task %d running\n", id)
time.Sleep(1 * time.Second)
}(i)
}
wg.Wait()
}
Pattern: Rate Limiter
func main() {
// Rate limit: 5 requests per second
limiter := time.NewTicker(200 * time.Millisecond) // 1 ทุก 200ms = 5/sec
defer limiter.Stop()
requests := make(chan int, 10)
for i := 1; i <= 10; i++ {
requests <- i
}
close(requests)
for req := range requests {
<-limiter.C // รอ Ticker ก่อนทำงาน
fmt.Printf("Request %d at %s\n", req, time.Now().Format("15:04:05.000"))
}
}
// Burst rate limiter
func burstRateLimiter() {
burstyLimiter := make(chan time.Time, 3) // Burst 3
// เติม Burst tokens
for i := 0; i < 3; i++ {
burstyLimiter <- time.Now()
}
// เติมทุก 200ms
go func() {
for t := range time.Tick(200 * time.Millisecond) {
burstyLimiter <- t
}
}()
for i := 0; i < 10; i++ {
<-burstyLimiter // 3 อันแรกจะทำทันที (burst) ที่เหลือ 200ms/อัน
fmt.Printf("Request %d at %s\n", i, time.Now().Format("15:04:05.000"))
}
}
Context — Cancellation & Timeout
func longRunningTask(ctx context.Context, id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("Task %d cancelled: %v\n", id, ctx.Err())
return
default:
fmt.Printf("Task %d working...\n", id)
time.Sleep(500 * time.Millisecond)
}
}
}
func main() {
// Context with Timeout
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
go longRunningTask(ctx, 1)
go longRunningTask(ctx, 2)
<-ctx.Done() // รอจน Timeout
time.Sleep(100 * time.Millisecond) // ให้ Goroutines cleanup
fmt.Println("All tasks cancelled")
}
// Context with Cancel (manual)
ctx, cancel := context.WithCancel(context.Background())
// ... เมื่อต้องการยกเลิก:
cancel()
errgroup — Error Handling ใน Concurrent Code
import "golang.org/x/sync/errgroup"
func main() {
g, ctx := errgroup.WithContext(context.Background())
urls := []string{
"https://api.example.com/users",
"https://api.example.com/orders",
"https://api.example.com/products",
}
results := make([]string, len(urls))
for i, url := range urls {
i, url := i, url // Capture loop variables
g.Go(func() error {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return fmt.Errorf("create request for %s: %w", url, err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("fetch %s: %w", url, err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("read %s: %w", url, err)
}
results[i] = string(body)
return nil
})
}
// ถ้า Goroutine ไหน Return error → ยกเลิกทั้งหมดผ่าน Context
if err := g.Wait(); err != nil {
log.Fatalf("Failed: %v", err)
}
for i, r := range results {
fmt.Printf("Result %d: %s...\n", i, r[:100])
}
}
sync.Mutex & sync.Once & Atomic
// Mutex — ป้องกัน Race condition
type SafeCounter struct {
mu sync.Mutex
count map[string]int
}
func (c *SafeCounter) Inc(key string) {
c.mu.Lock()
defer c.mu.Unlock()
c.count[key]++
}
func (c *SafeCounter) Value(key string) int {
c.mu.Lock()
defer c.mu.Unlock()
return c.count[key]
}
// sync.Once — ทำครั้งเดียว (Singleton pattern)
var instance *Database
var once sync.Once
func GetDatabase() *Database {
once.Do(func() {
instance = &Database{}
instance.Connect()
})
return instance
}
// Atomic — สำหรับ Simple counters (เร็วกว่า Mutex)
import "sync/atomic"
var counter int64
func increment() {
atomic.AddInt64(&counter, 1)
}
func getCount() int64 {
return atomic.LoadInt64(&counter)
}
Race Condition Detection
# Go มี Built-in Race detector!
go run -race main.go
go test -race ./...
go build -race -o myapp
# Output เมื่อพบ Race condition:
# WARNING: DATA RACE
# Goroutine 7 (running) at:
# main.main.func1()
# /path/main.go:15 +0x48
# Previous write at:
# main.main.func2()
# /path/main.go:20 +0x58
# ⚠️ ใช้ -race ตอน Dev/Test เสมอ!
# อย่าใช้ใน Production (ช้าลง 2-10x)
Common Mistakes
1. Goroutine Leak
// ❌ Goroutine leak — Channel ไม่มีคน Receive
func leak() {
ch := make(chan int)
go func() {
result := heavyComputation()
ch <- result // ❌ Block ตลอดกาลถ้าไม่มีคน Receive
}()
// ถ้า Return ก่อน Receive → Goroutine leak!
}
// ✅ แก้ด้วย Context cancellation
func noLeak(ctx context.Context) {
ch := make(chan int, 1) // Buffered channel
go func() {
result := heavyComputation()
select {
case ch <- result:
case <-ctx.Done(): // ถ้า Context ถูก Cancel → Goroutine จบ
}
}()
}
2. Channel Deadlock
// ❌ Deadlock — Unbuffered channel, send & receive ใน Goroutine เดียว
func deadlock() {
ch := make(chan int)
ch <- 42 // ❌ Block ตลอดกาล เพราะไม่มีคน Receive
value := <-ch
}
// ✅ แก้: ส่งและรับคนละ Goroutine
func noDeadlock() {
ch := make(chan int)
go func() { ch <- 42 }()
value := <-ch
fmt.Println(value)
}
3. Shared Memory Without Synchronization
// ❌ Race condition
var counter int
for i := 0; i < 1000; i++ {
go func() { counter++ }() // ❌ Concurrent write without lock
}
// ✅ ใช้ Channel แทน Shared memory
func safeCounter() int {
ch := make(chan int, 1000)
for i := 0; i < 1000; i++ {
go func() { ch <- 1 }()
}
total := 0
for i := 0; i < 1000; i++ {
total += <-ch
}
return total // 1000 เสมอ
}
Real-world Example: Concurrent Web Scraper
func scrape(ctx context.Context, urls []string, maxConcurrent int) map[string]string {
results := make(map[string]string)
var mu sync.Mutex
sem := make(chan struct{}, maxConcurrent)
g, ctx := errgroup.WithContext(ctx)
for _, url := range urls {
url := url
g.Go(func() error {
sem <- struct{}{}
defer func() { <-sem }()
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
mu.Lock()
results[url] = string(body[:200])
mu.Unlock()
return nil
})
}
if err := g.Wait(); err != nil {
log.Printf("Error: %v", err)
}
return results
}
สรุป — Go Concurrency Best Practices
- Prefer Channels over shared memory + Mutex — "Share memory by communicating"
- ใช้ Context สำหรับ Cancellation — ทุก Long-running operation ต้องรับ Context
- ใช้ errgroup สำหรับ Error handling — ดีกว่า WaitGroup เมื่อต้องจัดการ Error
- ปิด Channel ฝั่ง Sender เท่านั้น — Receiver ไม่ควรปิด Channel
- ใช้ -race flag เสมอตอน Test — จับ Race condition ก่อน Production
- ระวัง Goroutine leak — ทุก Goroutine ต้องมีทางจบ (Context, close channel)
- ใช้ Buffered channel เมื่อรู้ขนาด — ลด Blocking ที่ไม่จำเป็น
- Goroutines are cheap ≠ Goroutines are free — จำกัดจำนวนด้วย Worker pool / Semaphore
Go Concurrency ไม่ยากอย่างที่คิด แต่ต้องเข้าใจ Pattern ที่ถูกต้อง เริ่มจาก Goroutine + Channel + Select แล้วค่อยไปถึง Pattern ที่ซับซ้อนขึ้น สิ่งสำคัญคือ "คิดเป็น Pipeline" — แต่ละ Stage ทำงานเดียว รับ Input จาก Channel ส่ง Output ไป Channel ถัดไป แค่นี้ก็ครอบคลุม 90% ของ Concurrency use cases แล้ว
