Passkeys WebAuthn High Availability HA Setup คืออะไร
Passkeys เป็นเทคโนโลยี passwordless authentication ที่ใช้ WebAuthn standard แทนการใช้ password ด้วย biometrics (ลายนิ้วมือ, Face ID) หรือ device PIN ปลอดภัยกว่า password เพราะใช้ public-key cryptography ไม่มี shared secret ที่ถูก phish ได้ High Availability (HA) คือการออกแบบระบบให้มี uptime สูงสุด (99.99%+) สำหรับ authentication system HA สำคัญมากเพราะถ้าระบบ login ล่ม ผู้ใช้ทุกคนจะเข้าระบบไม่ได้ บทความนี้อธิบายการ setup Passkeys/WebAuthn แบบ HA ครบทุกขั้นตอน
WebAuthn & Passkeys Fundamentals
# webauthn_basics.py — WebAuthn fundamentals
import json
class WebAuthnBasics:
    """Glossary of core WebAuthn / passkey terms with a console printer.

    The class only holds static reference data; it keeps no instance state.
    """

    # Concept id -> display metadata. Every entry carries "name" and
    # "description"; "flow" and "benefit" appear only on some entries and are
    # not read by show_concepts().
    CONCEPTS = {
        "webauthn": {
            "name": "WebAuthn (Web Authentication API)",
            "description": "W3C standard สำหรับ passwordless auth — browser API ที่ใช้ public-key cryptography",
            "flow": "Browser → Authenticator (biometric/PIN) → Server verifies signature",
        },
        "passkeys": {
            "name": "Passkeys",
            "description": "Discoverable credentials ที่ sync ข้าม devices ผ่าน cloud (iCloud Keychain, Google Password Manager)",
            "benefit": "ไม่ต้องจำ password, phishing-resistant, sync ข้าม devices",
        },
        "rp": {
            "name": "Relying Party (RP)",
            "description": "Server ที่ verify authentication — เก็บ public key + credential ID",
        },
        "authenticator": {
            "name": "Authenticator",
            "description": "อุปกรณ์ที่สร้าง keypair — platform (Touch ID, Windows Hello) หรือ roaming (YubiKey)",
        },
        "ceremony": {
            "name": "Ceremonies",
            "description": "Registration: สร้าง keypair + register public key, Authentication: sign challenge ด้วย private key",
        },
    }

    # Step-by-step outline of the registration ceremony. Kept as reference
    # text; no method in this class prints it.
    REGISTRATION_FLOW = """
Registration Flow:
1. Server สร้าง challenge (random bytes)
2. Browser เรียก navigator.credentials.create()
3. Authenticator สร้าง keypair (private + public)
4. User verify ด้วย biometric/PIN
5. Authenticator ส่ง public key + attestation กลับ
6. Server เก็บ public key + credential ID ใน database
"""

    def show_concepts(self):
        """Print every concept's name and description to stdout.

        Output is a header line followed, per concept, by the bracketed
        name, the description, and a blank separator line.
        """
        print("=== WebAuthn Concepts ===\n")
        # Only the metadata dicts are needed; the concept ids are not shown.
        for concept in self.CONCEPTS.values():
            print(f"[{concept['name']}]")
            print(f" {concept['description']}")
            print()
# Demo driver (runs at module top level): build the glossary and print it.
basics = WebAuthnBasics()
basics.show_concepts()
HA Architecture
# ha_architecture.py — HA architecture for WebAuthn
import json
class HAArchitecture:
    """Static description of the components in an HA WebAuthn deployment.

    Holds reference data only and prints it to stdout; no instance state.
    """

    # Component id -> metadata. show_components() reads "name",
    # "description" and "ha"; "tools" is informational and is absent on
    # "session_store".
    COMPONENTS = {
        "load_balancer": {
            "name": "Load Balancer (Layer 7)",
            "description": "กระจาย traffic ไปหลาย auth servers — health check + failover",
            "ha": "Active-Active, multiple AZs, auto-failover",
            "tools": "AWS ALB, Cloudflare LB, HAProxy, Nginx",
        },
        "auth_servers": {
            "name": "Authentication Servers (Stateless)",
            "description": "WebAuthn RP servers — verify signatures, issue tokens",
            "ha": "Horizontal scaling, minimum 3 instances, auto-scaling",
            "tools": "Node.js + @simplewebauthn/server, Python + py_webauthn",
        },
        "credential_store": {
            "name": "Credential Store (Database)",
            "description": "เก็บ public keys, credential IDs, user mappings",
            "ha": "Primary-Replica, auto-failover, cross-region replication",
            "tools": "PostgreSQL (Patroni), CockroachDB, DynamoDB Global Tables",
        },
        "challenge_cache": {
            "name": "Challenge Cache",
            "description": "เก็บ challenges ชั่วคราว (TTL 5 นาที) สำหรับ verify registration/authentication",
            "ha": "Redis Cluster/Sentinel, Memcached, DynamoDB",
            "tools": "Redis Cluster (6+ nodes), ElastiCache",
        },
        "session_store": {
            "name": "Session/Token Store",
            "description": "เก็บ session หรือ issue JWT tokens หลัง authentication สำเร็จ",
            "ha": "Redis Cluster หรือ stateless JWT (ไม่ต้อง store)",
        },
    }

    def show_components(self):
        """Print name, description and HA strategy of each component."""
        print("=== HA Components ===\n")
        # The component ids are not displayed, so iterate the values only.
        for comp in self.COMPONENTS.values():
            print(f"[{comp['name']}]")
            print(f" {comp['description']}")
            print(f" HA: {comp['ha']}")
            print()

    def sla_targets(self):
        """Print the SLA target table, one left-aligned metric per line."""
        print("=== SLA Targets ===")
        targets = [
            ("Availability", "99.99% (< 52 min downtime/year)"),
            ("Authentication latency (P99)", "< 500ms"),
            ("Registration latency (P99)", "< 1000ms"),
            ("Recovery Time Objective (RTO)", "< 30 seconds"),
            ("Recovery Point Objective (RPO)", "0 (no credential loss)"),
        ]
        for name, target in targets:
            # Pad the metric name to 35 columns so the targets line up.
            print(f" {name:<35} {target}")
# Demo driver (runs at module top level): print the component list, then the
# SLA target table.
ha = HAArchitecture()
ha.show_components()
ha.sla_targets()
Server Implementation
# server_impl.py — WebAuthn server with HA
import json
class WebAuthnServer:
    """Holds a sample FastAPI WebAuthn relying-party server as text.

    The sample is never executed by this class; show_code() only prints a
    preview of it.
    """

    # Sample server source. The import path is `webauthn` (the module that
    # the py_webauthn project installs), and the code is indented so that it
    # is syntactically valid Python.
    # NOTE(review): the sample calls `db.store_credential` / `db.get_credentials`
    # but never defines `db` — it is presumably an application-level data
    # layer; confirm before copying the sample.
    # NOTE(review): when `user_id` is omitted, login_options stores the
    # challenge under the fixed key suffix 'discoverable', so concurrent
    # username-less logins overwrite each other's challenge. A per-session
    # key would be needed in a real deployment.
    CODE = """
# webauthn_server.py — HA WebAuthn server (Python)
from webauthn import (
    generate_registration_options,
    verify_registration_response,
    generate_authentication_options,
    verify_authentication_response,
)
from webauthn.helpers.structs import (
    AuthenticatorSelectionCriteria,
    ResidentKeyRequirement,
    UserVerificationRequirement,
)
import redis
import json
import uuid
from fastapi import FastAPI, HTTPException
app = FastAPI()
# HA: Redis Cluster for challenges
redis_client = redis.RedisCluster(
    startup_nodes=[
        {"host": "redis-1", "port": 6379},
        {"host": "redis-2", "port": 6379},
        {"host": "redis-3", "port": 6379},
    ]
)
RP_ID = "example.com"
RP_NAME = "My App"
ORIGIN = "https://example.com"
@app.post("/api/auth/register/options")
async def register_options(user_id: str, username: str):
    options = generate_registration_options(
        rp_id=RP_ID,
        rp_name=RP_NAME,
        user_id=user_id.encode(),
        user_name=username,
        authenticator_selection=AuthenticatorSelectionCriteria(
            resident_key=ResidentKeyRequirement.REQUIRED,
            user_verification=UserVerificationRequirement.REQUIRED,
        ),
    )
    # Store challenge in Redis (TTL 5 min) — HA via cluster
    redis_client.setex(
        f"webauthn:challenge:{user_id}",
        300,
        options.challenge
    )
    return options
@app.post("/api/auth/register/verify")
async def register_verify(user_id: str, credential: dict):
    # Retrieve challenge from Redis
    challenge = redis_client.get(f"webauthn:challenge:{user_id}")
    if not challenge:
        raise HTTPException(400, "Challenge expired or not found")
    verification = verify_registration_response(
        credential=credential,
        expected_challenge=challenge,
        expected_rp_id=RP_ID,
        expected_origin=ORIGIN,
    )
    # Store credential in HA database
    await db.store_credential(
        user_id=user_id,
        credential_id=verification.credential_id,
        public_key=verification.credential_public_key,
        sign_count=verification.sign_count,
    )
    # Cleanup challenge
    redis_client.delete(f"webauthn:challenge:{user_id}")
    return {"status": "registered", "credential_id": verification.credential_id.hex()}
@app.post("/api/auth/login/options")
async def login_options(user_id: str = None):
    # Get user's credentials from HA database
    credentials = await db.get_credentials(user_id) if user_id else []
    options = generate_authentication_options(
        rp_id=RP_ID,
        allow_credentials=credentials,
        user_verification=UserVerificationRequirement.REQUIRED,
    )
    challenge_key = f"webauthn:auth_challenge:{user_id or 'discoverable'}"
    redis_client.setex(challenge_key, 300, options.challenge)
    return options
"""

    def show_code(self):
        """Print a header and the first 600 characters of the sample."""
        print("=== WebAuthn Server ===")
        # Deliberately a preview only; the cut may fall mid-line.
        print(self.CODE[:600])
# Demo driver (runs at module top level): print the sample server preview.
server = WebAuthnServer()
server.show_code()
Database HA & Replication
# db_ha.py — Database HA for credential store
import json
import random
class DatabaseHA:
    """Sample HA configuration and schema for the credential database.

    Holds two text samples (a docker-compose file and a SQL schema) and
    prints previews of them, plus a simulated replication status table.
    """

    # docker-compose sample. Nesting is significant in YAML, so the service
    # and environment keys are indented under their parents.
    # NOTE(review): the replication password is a hard-coded placeholder in
    # this sample — replace it before any real use.
    POSTGRES_HA = """
# PostgreSQL HA with Patroni — docker-compose.yml
version: '3.8'
services:
  postgres-1:
    image: postgres:16
    environment:
      PATRONI_SCOPE: webauthn-cluster
      PATRONI_NAME: pg1
      PATRONI_REPLICATION_USERNAME: replicator
      PATRONI_REPLICATION_PASSWORD: secret
    volumes:
      - pg1_data:/var/lib/postgresql/data
  postgres-2:
    image: postgres:16
    environment:
      PATRONI_SCOPE: webauthn-cluster
      PATRONI_NAME: pg2
  postgres-3:
    image: postgres:16
    environment:
      PATRONI_SCOPE: webauthn-cluster
      PATRONI_NAME: pg3
  etcd:
    image: bitnami/etcd:latest
    environment:
      ALLOW_NONE_AUTHENTICATION: "yes"
"""

    # PostgreSQL schema sample.
    # - PostgreSQL has no inline `INDEX name (col)` clause in CREATE TABLE,
    #   so the user_id index is a separate CREATE INDEX statement.
    # - credential_id is UNIQUE, which already creates an index, so no extra
    #   index is declared for it.
    # - sign_count is BIGINT because the WebAuthn signature counter is an
    #   unsigned 32-bit value, which can exceed a signed INTEGER.
    SCHEMA = """
-- webauthn_credentials.sql
CREATE TABLE webauthn_credentials (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id VARCHAR(255) NOT NULL,
    credential_id BYTEA NOT NULL UNIQUE,
    public_key BYTEA NOT NULL,
    sign_count BIGINT DEFAULT 0,
    transports TEXT[],
    created_at TIMESTAMPTZ DEFAULT NOW(),
    last_used_at TIMESTAMPTZ
);
CREATE INDEX idx_user_id ON webauthn_credentials (user_id);
CREATE TABLE webauthn_users (
    id VARCHAR(255) PRIMARY KEY,
    username VARCHAR(255) NOT NULL UNIQUE,
    display_name VARCHAR(255),
    created_at TIMESTAMPTZ DEFAULT NOW()
);
"""

    def show_postgres(self):
        """Print a header and the first 400 characters of the compose sample."""
        print("=== PostgreSQL HA ===")
        print(self.POSTGRES_HA[:400])

    def show_schema(self):
        """Print a header and the first 400 characters of the SQL schema."""
        print("\n=== Schema ===")
        print(self.SCHEMA[:400])

    def replication_status(self):
        """Print a simulated replication status line for each of three nodes.

        The replica lag values are random numbers in [0, 5] ms generated on
        each call; nothing is queried from a real database.
        """
        print("\n=== Replication Status ===")
        nodes = [
            {"name": "pg1 (primary)", "lag": "0ms", "status": "running"},
            {"name": "pg2 (replica)", "lag": f"{random.uniform(0, 5):.1f}ms", "status": "streaming"},
            {"name": "pg3 (replica)", "lag": f"{random.uniform(0, 5):.1f}ms", "status": "streaming"},
        ]
        for node in nodes:
            print(f" [{node['status']:>10}] {node['name']:<20} lag: {node['lag']}")
# Demo driver (runs at module top level): print the compose preview, the
# schema preview, then the simulated replication status.
db = DatabaseHA()
db.show_postgres()
db.show_schema()
db.replication_status()
Monitoring & Failover
# monitoring.py — Monitoring and failover
import json
import random
class MonitoringFailover:
    """Reference data for health checks and failover, with console printers.

    The dashboard figures are randomly generated on each call; nothing here
    talks to a real monitoring system.
    """

    # Component id -> description of the health check performed on it.
    HEALTH_CHECKS = {
        "auth_server": "HTTP GET /health → check WebAuthn library loaded + DB connected",
        "redis_cluster": "Redis PING + cluster info — check all nodes healthy",
        "postgres": "SELECT 1 + replication lag check",
        "load_balancer": "Health check ทุก 10 วินาที — remove unhealthy instances",
        "certificate": "TLS cert expiry check — alert 30 days before",
    }

    # Failure scenarios with their recovery-time target and automated action.
    FAILOVER_SCENARIOS = [
        {"scenario": "Auth server crash", "rto": "< 10s", "action": "LB removes instance + auto-scale replaces"},
        {"scenario": "Redis node failure", "rto": "< 5s", "action": "Redis Cluster auto-failover to replica"},
        {"scenario": "PostgreSQL primary down", "rto": "< 30s", "action": "Patroni promotes replica to primary"},
        {"scenario": "AZ outage", "rto": "< 60s", "action": "DNS failover to other AZ + traffic reroute"},
        {"scenario": "Region outage", "rto": "< 5min", "action": "Global LB routes to DR region"},
    ]

    def show_checks(self):
        """Print each health check as ``[component] description``."""
        print("=== Health Checks ===\n")
        for check, desc in self.HEALTH_CHECKS.items():
            print(f" [{check}] {desc}")

    def show_failover(self):
        """Print each failover scenario with its RTO and recovery action."""
        print("\n=== Failover Scenarios ===")
        for entry in self.FAILOVER_SCENARIOS:
            # RTO right-aligned to 6 columns, scenario padded to 30.
            print(f" [{entry['rto']:>6}] {entry['scenario']:<30} → {entry['action']}")

    def dashboard(self):
        """Print a simulated dashboard; every metric is a fresh random value."""
        print("\n=== Auth System Dashboard ===")
        print(f" Uptime (30d): {random.uniform(99.98, 100):.4f}%")
        print(f" Auth requests/sec: {random.randint(500, 5000):,}")
        print(f" Auth latency P50: {random.uniform(50, 150):.0f}ms")
        print(f" Auth latency P99: {random.uniform(200, 500):.0f}ms")
        print(f" Active credentials: {random.randint(10000, 500000):,}")
        print(f" Failed auth (24h): {random.randint(10, 200)}")
        print(f" Redis cluster: {random.randint(5, 6)}/6 nodes healthy")
# Demo driver (runs at module top level): print health checks, failover
# scenarios, then the simulated dashboard.
mon = MonitoringFailover()
mon.show_checks()
mon.show_failover()
mon.dashboard()
FAQ - คำถามที่พบบ่อย
Q: Passkeys ปลอดภัยกว่า Password จริงไหม?
A: ปลอดภัยกว่ามาก: 1) Phishing-resistant — private key ไม่เคยออกจาก device, ผูกกับ domain 2) No shared secret — server เก็บแค่ public key (ถูก breach ก็ไม่เสียหาย) 3) No password reuse — ทุก site มี unique keypair 4) Biometric verification — ต้องใช้ fingerprint/face หรือ device PIN เพื่อ authenticate ข้อเสีย: ต้องมี recovery plan ถ้า device หาย (passkey sync ช่วยได้)
Q: HA สำหรับ auth system สำคัญแค่ไหน?
A: สำคัญมาก — auth system ล่ม = ทุก user เข้าระบบไม่ได้ = ธุรกิจหยุด Target: 99.99% uptime = < 52 นาที downtime ต่อปี ต้องมี: Multi-AZ deployment, auto-failover, zero-downtime deployments Challenge store (Redis) ต้อง HA — ถ้า challenge หาย = registration/login fail
Q: Passkeys sync ข้าม devices ได้ยังไง?
A: Apple: iCloud Keychain — sync ทุก Apple devices Google: Google Password Manager — sync ทุก Android/Chrome Microsoft: Windows Hello — sync ผ่าน Microsoft account Cross-platform: ใช้ QR code — scan จาก device ที่มี passkey เพื่อ login บน device อื่น Backup: passkeys ถูก backup ใน cloud ของ platform provider
Q: ต้องรองรับ password ด้วยไหม?
A: แนะนำ: Passkeys first + password fallback ในช่วงเปลี่ยนผ่าน เหตุผล: ไม่ใช่ทุก device รองรับ passkeys, users บางคนยังไม่คุ้นเคย Strategy: 1) เพิ่ม passkey option 2) ส่งเสริมให้ users ลงทะเบียน passkey 3) ค่อยๆ ลด password dependency 4) บังคับ passkey สำหรับ high-security accounts
