Home > Blog > tech

Python Async/Await คืออะไร? สอน Asynchronous Programming สำหรับ Python Developer 2026

python async await guide
Python Async/Await คืออะไร? สอน Asynchronous Programming สำหรับ Python Developer 2026
2026-04-16 | tech | 1112 words


Python Async/Await คืออะไร? คู่มือ Asynchronous Programming สำหรับ Python Developer 2026

ในโลกของการพัฒนาแอปพลิเคชันยุคใหม่ที่ความรวดเร็วและการตอบสนองถือเป็นหัวใจสำคัญ การเขียนโปรแกรมแบบ Asynchronous (อะซิงโครนัส) ได้กลายเป็นทักษะที่ขาดไม่ได้สำหรับ Python Developer การจัดการงานหลายๆ อย่างพร้อมกันโดยไม่ทำให้โปรแกรมค้างหรือตอบสนองช้าเป็นความท้าทายหลัก และนี่คือจุดที่ Python Async/Await เข้ามาเป็นพระเอก ในบทความคู่มือฉบับสมบูรณ์นี้ เราจะเจาะลึกทุกแง่มุมของการเขียนโปรแกรมแบบอะซิงโครนัสใน Python พร้อมตัวอย่างโค้ดที่ใช้งานได้จริงและเทคนิคล่าสุดสำหรับปี 2026

ทำไมต้อง Asynchronous Programming? ปัญหาของ Synchronous แบบดั้งเดิม

ก่อนจะเข้าใจ Async/Await เราต้องเข้าใจปัญหาของโมเดล Synchronous แบบดั้งเดิมก่อน เมื่อคุณเรียกฟังก์ชันที่ต้องรอการทำงานจากภายนอก เช่น การดาวน์โหลดไฟล์จากอินเทอร์เน็ต, การ query ฐานข้อมูล, หรือการเรียก API โปรแกรมของคุณจะต้อง "รอ" (block) จนกระทั่งงานนั้นเสร็จสิ้นก่อนจึงจะทำอย่างอื่นต่อได้ ลองนึกภาพร้านกาแฟที่มีบาริสต้าเพียงคนเดียวที่ต้องชงกาแฟให้ลูกค้าทีละแก้วจนเสร็จก่อนจึงจะรับออเดอร์ต่อไปได้ นี่คือ Synchronous

ปัญหานี้ชัดเจนมากในแอปพลิเคชันเว็บเซิร์ฟเวอร์ ถ้าเซิร์ฟเวอร์ต้องรอการตอบกลับจากฐานข้อมูลสำหรับผู้ใช้คนที่ 1 ผู้ใช้คนที่ 2, 3, และ 4 ก็จะต้องรอคิวไปด้วย ทำให้ประสิทธิภาพโดยรวมต่ำลงอย่างมาก แม้คุณจะใช้เทคนิคเช่น Multi-threading หรือ Multi-processing ก็ตาม แต่การจัดการ thread หรือ process จำนวนมากก็มี overhead สูงและจัดการซับซ้อน

Asynchronous Programming: ทางออกสำหรับ I/O-bound Tasks

Asynchronous Programming เป็นกระบวนทัศน์ที่อนุญาตให้โปรแกรมของคุณ "เริ่ม" งานที่ต้องรอ (I/O-bound tasks) แล้วไปทำอย่างอื่นที่พร้อมทำงานได้ในระหว่างนั้น พอกงานแรกนั้นเสร็จ ค่อยกลับมาทำต่อ เหมือนบาริสต้าที่เริ่มชงกาแฟ แล้วระหว่างที่เครื่องชงกาแฟกำลังทำงาน ก็ไปรับออเดอร์จากลูกค้าคนต่อไปได้ทันที

หัวใจของ Async Programming ใน Python อยู่ที่ Coroutines, Event Loop, และคีย์เวิร์ด async และ await ซึ่งถูกนำเข้ามาอย่างเป็นทางการใน Python 3.5 และพัฒนาอย่างต่อเนื่องจนถึง Python 3.11+ ที่เราจะใช้ในปี 2026 ซึ่งมีประสิทธิภาพและ features ใหม่ๆ เพิ่มขึ้นมาก

แกนหลักสามประการ: Async, Await, และ Event Loop

เริ่มต้นกับ Async/Await: โค้ดตัวอย่างแรกของคุณ

มาดูตัวอย่างพื้นฐานที่สุดเพื่อให้เห็นภาพการทำงานของ async และ await กัน:

import asyncio
import time

# ประกาศ coroutine ด้วย async def
async def say_hello(name, delay):
    """Coroutine ที่รอ delay วินาทีแล้วพิมพ์ข้อความทักทาย"""
    await asyncio.sleep(delay)  # ใช้ await เพื่อรอแบบไม่ block
    print(f"Hello, {name}! (หลังจากรอ {delay} วินาที)")
    return f"Hello {name}"

async def main():
    print(f"เริ่มต้นเวลา: {time.strftime('%X')}")

    # สร้าง task สองงานให้ทำงานพร้อมกัน
    task1 = asyncio.create_task(say_hello("Alice", 2))
    task2 = asyncio.create_task(say_hello("Bob", 1))

    # รอให้ทั้งสองงานเสร็จ
    result1 = await task1
    result2 = await task2

    print(f"ผลลัพธ์จาก task1: {result1}")
    print(f"ผลลัพธ์จาก task2: {result2}")
    print(f"สิ้นสุดเวลา: {time.strftime('%X')}")

# เรียกใช้ event loop และรัน coroutine หลัก
if __name__ == "__main__":
    asyncio.run(main())

เมื่อรันโค้ดนี้ คุณจะเห็นว่าแม้เราจะให้ Alice รอนาน 2 วินาที และ Bob รอ 1 วินาที แต่โปรแกรมไม่ต้องรอให้ Alice เสร็จก่อนค่อยเริ่ม Bob ทั้งสองทำงานพร้อมกัน (concurrently) และ Bob จะพิมพ์ข้อความออกมาก่อนเพราะรอน้อยกว่า เวลาทั้งหมดที่ใช้จะใกล้เคียง 2 วินาที (ไม่ใช่ 3 วินาที) นี่คือพลังของ asynchronous programming!

การจัดการหลายงานพร้อมกัน: asyncio.gather และ asyncio.wait

ในโลกจริง เรามักต้องจัดการงานอะซิงโครนัสหลายสิบหลายร้อยงาน Python มี utility functions ที่ช่วยจัดการได้ง่าย:

import asyncio
import aiohttp  # library สำหรับทำ asynchronous HTTP requests

async def fetch_url(session, url):
    """ดึงข้อมูลจาก URL แบบ asynchronous"""
    async with session.get(url) as response:
        html = await response.text()
        return f"{url}: {len(html)} characters"

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
        "https://api.github.com",
    ]

    async with aiohttp.ClientSession() as session:
        # วิธีที่ 1: ใช้ asyncio.gather เพื่อรันหลายงานและรอผลทั้งหมด
        print("--- ใช้ asyncio.gather ---")
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)  # รันทั้งหมดพร้อมกัน
        for result in results:
            print(result)

        # วิธีที่ 2: ใช้ asyncio.wait สำหรับการควบคุมละเอียดยิ่งขึ้น
        print("
--- ใช้ asyncio.wait (return_when=ALL_COMPLETED) ---")
        tasks = [asyncio.create_task(fetch_url(session, url)) for url in urls[:3]]
        done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
        for task in done:
            print(task.result())

        # วิธีที่ 3: ใช้ asyncio.as_completed เพื่อประมวลผลผลลัพธ์ทันทีที่เสร็จ
        print("
--- ใช้ asyncio.as_completed (process as they complete) ---")
        tasks = [fetch_url(session, url) for url in urls]
        for coro in asyncio.as_completed(tasks):
            result = await coro
            print(f"ได้รับผล: {result}")

if __name__ == "__main__":
    asyncio.run(main())

การใช้ asyncio.gather เหมาะสำหรับเมื่อคุณต้องการรันงานกลุ่มหนึ่งและรอผลทั้งหมดพร้อมกัน ในขณะที่ asyncio.wait ให้ความยืดหยุ่นมากขึ้น เช่น สามารถกำหนดให้คืนค่ากลับมาเมื่อมีงานเสร็จบางส่วน หรือเมื่อมี timeout เกิดขึ้น ส่วน asyncio.as_completed ดีมากเมื่อคุณต้องการเริ่มประมวลผลผลลัพธ์ทันทีที่งานใดงานหนึ่งเสร็จ ไม่ต้องรอให้ทั้งหมดเสร็จสิ้น

Async Context Managers และ Async Iterators

Python 3.5+ ยังสนับสนุน Asynchronous Context Managers (ใช้กับ async with) และ Asynchronous Iterators (ใช้กับ async for) ซึ่งสำคัญมากเมื่อทำงานกับทรัพยากรแบบอะซิงโครนัส เช่น การเชื่อมต่อฐานข้อมูลหรือเปิดไฟล์แบบไม่ blocking

import asyncio

# ตัวอย่าง Async Context Manager
class AsyncDatabaseConnection:
    async def __aenter__(self):
        print("กำลังเชื่อมต่อฐานข้อมูลแบบ asynchronous...")
        await asyncio.sleep(1)  # simulate async connection
        print("เชื่อมต่อสำเร็จ!")
        return self  # คืนค่าที่จะถูก bind 到ตัวแปรหลัง 'as'

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("กำลังปิดการเชื่อมต่อฐานข้อมูล...")
        await asyncio.sleep(0.5)
        print("ปิดการเชื่อมต่อเรียบร้อย")

    async def fetch_data(self, query):
        await asyncio.sleep(0.7)
        return f"ผลลัพธ์จาก query: '{query}'"

# ตัวอย่าง Async Iterator
class AsyncCounter:
    def __init__(self, stop):
        self.current = 0
        self.stop = stop

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.stop:
            raise StopAsyncIteration
        await asyncio.sleep(0.3)  # simulate async delay
        self.current += 1
        return self.current - 1

async def main():
    # การใช้ async with สำหรับ context manager
    print("--- การใช้ Async Context Manager (async with) ---")
    async with AsyncDatabaseConnection() as db:
        data = await db.fetch_data("SELECT * FROM users")
        print(data)

    # การใช้ async for สำหรับ asynchronous iteration
    print("
--- การใช้ Async Iterator (async for) ---")
    async for number in AsyncCounter(5):
        print(f"ได้เลข: {number}")

if __name__ == "__main__":
    asyncio.run(main())

การจัดการ Error ใน Async/Await

การจัดการข้อผิดพลาดใน asynchronous code มีความสำคัญและมีรายละเอียดบางส่วนที่แตกต่างจาก synchronous code เนื่องจาก coroutine อาจ fail แยกกันในเวลาแตกต่างกัน

import asyncio

async def risky_task(name, delay, will_fail=False):
    await asyncio.sleep(delay)
    if will_fail:
        raise ValueError(f"Task {name} ล้มเหลวอย่างตั้งใจ!")
    return f"Task {name} สำเร็จ"

async def main():
    # ตัวอย่างการจัดการ error ใน task เดียว
    try:
        result = await risky_task("A", 1, will_fail=True)
    except ValueError as e:
        print(f"จับ error ได้: {e}")

    # การจัดการ error เมื่อใช้ asyncio.gather
    print("
--- การจัดการ error กับ asyncio.gather (return_exceptions=True) ---")
    tasks = [
        risky_task("T1", 1, will_fail=False),
        risky_task("T2", 2, will_fail=True),  # จะ fail
        risky_task("T3", 1, will_fail=False),
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} มีข้อผิดพลาด: {result}")
        else:
            print(f"Task {i} สำเร็จ: {result}")

    # การใช้ shield เพื่อป้องกัน task จากการถูกยกเลิก
    print("
--- การใช้ asyncio.shield ---")
    long_task = asyncio.create_task(risky_task("Shielded", 3))
    shielded = asyncio.shield(long_task)
    # สมมติว่าเราต้องการยกเลิก shielded task หลังจาก 1 วินาที
    await asyncio.sleep(1)
    shielded.cancel()
    try:
        await shielded
    except asyncio.CancelledError:
        print("Shielded task ถูกยกเลิก แต่ long_task ยังทำงานอยู่...")
        # long_task ยังคงทำงานอยู่เพราะถูก shield
        result = await long_task
        print(f"Long task จริงๆ แล้วเสร็จ: {result}")

if __name__ == "__main__":
    asyncio.run(main())

Async/Await กับ Database และ Web Frameworks

ในปี 2026 ไลบรารีสำคัญๆ สำหรับ Python ได้รับการออกแบบมาสนับสนุน asynchronous programming อย่างเต็มที่แล้ว ตัวอย่างเช่น:

ตัวอย่างการเชื่อมต่อกับฐานข้อมูล PostgreSQL แบบ asynchronous โดยใช้ asyncpg และการสร้าง API ง่ายๆ ด้วย FastAPI:

# ตัวอย่าง: การใช้ asyncpg กับ FastAPI
# ต้องติดตั้ง: pip install fastapi uvicorn asyncpg

import asyncpg
from fastapi import FastAPI
from pydantic import BaseModel
import asyncio

app = FastAPI(title="Async API Example")

# สร้าง connection pool กับ PostgreSQL
async def get_db_pool():
    return await asyncpg.create_pool(
        user="your_user",
        password="your_password",
        database="your_db",
        host="localhost",
        min_size=5,    # ขนาด minimum ของ pool
        max_size=20    # ขนาด maximum ของ pool
    )

# Event handler เมื่อแอปเริ่มต้น
@app.on_event("startup")
async def startup():
    app.state.db_pool = await get_db_pool()
    print("Database pool created.")

# Event handler เมื่อแอปปิด
@app.on_event("shutdown")
async def shutdown():
    await app.state.db_pool.close()
    print("Database pool closed.")

# Pydantic model สำหรับ request/response
class UserCreate(BaseModel):
    name: str
    email: str

# Async endpoint
@app.post("/users/", response_model=dict)
async def create_user(user: UserCreate):
    async with app.state.db_pool.acquire() as connection:
        # ใช้ execute แบบ async
        await connection.execute(
            "INSERT INTO users(name, email) VALUES($1, $2)",
            user.name, user.email
        )
        return {"message": f"User {user.name} created successfully"}

@app.get("/users/")
async def get_users():
    async with app.state.db_pool.acquire() as connection:
        # ใช้ fetch แบบ async
        records = await connection.fetch("SELECT id, name, email FROM users ORDER BY id")
        # แปลง records เป็น list of dicts
        users = [dict(record) for record in records]
        return {"users": users}

# ตัวอย่างการเรียกใช้โค้ดนี้ (ในไฟล์แยก):
# uvicorn main:app --reload --host 0.0.0.0 --port 8000

การทดสอบ (Testing) Async Code

การทดสอบ asynchronous code มีความพิเศษเฉพาะตัว Python มีไลบรารีช่วยเช่น pytest-asyncio ที่ทำให้การเขียน test สำหรับ async code เป็นเรื่องง่าย

# test_async_code.py
import pytest
import asyncio
from my_async_module import fetch_data, process_items

# ใช้ pytest.mark.asyncio เพื่อบอก pytest ว่าเป็น async test
@pytest.mark.asyncio
async def test_fetch_data_success():
    """ทดสอบว่า fetch_data ทำงานได้ปกติ"""
    result = await fetch_data("https://api.example.com/data")
    assert result is not None
    assert "data" in result

@pytest.mark.asyncio
async def test_fetch_data_timeout():
    """ทดสอบการ timeout"""


Back to Blog | iCafe Forex | SiamLanCard | Siam2R