SiamCafe.net Blog
Cybersecurity

SOPS Encryption Message Queue Design — เข้ารหัส Secrets และ Messages ด้วย SOPS

sops encryption message queue design
SOPS Encryption Message Queue Design | SiamCafe Blog
2025-08-08· อ. บอม — SiamCafe.net· 1,661 คำ

SOPS คืออะไรและทำไมต้องใช้เข้ารหัส Secrets

SOPS (Secrets OPerationS) เป็นเครื่องมือ open source จาก Mozilla สำหรับเข้ารหัสไฟล์ configuration ที่มี secrets โดยเข้ารหัสเฉพาะ values ไม่เข้ารหัส keys ทำให้สามารถ review changes ใน Git ได้ง่าย SOPS รองรับ encryption backends หลายตัวเช่น AWS KMS, GCP KMS, Azure Key Vault, HashiCorp Vault และ PGP

ปัญหาที่ SOPS แก้คือ secrets เช่น API keys, database passwords, encryption keys มักถูกเก็บใน plaintext ในไฟล์ config แล้ว commit เข้า Git ซึ่งเป็นความเสี่ยงด้าน security SOPS ทำให้สามารถเก็บ encrypted secrets ใน Git ได้อย่างปลอดภัย decrypt ได้เฉพาะผู้ที่มี access ถูกต้อง

ในบริบทของ Message Queue Design การเข้ารหัส messages ที่ผ่าน queue เป็นสิ่งสำคัญสำหรับ sensitive data เช่น PII, financial transactions, health records SOPS สามารถใช้เข้ารหัส message configurations, connection strings และ message payloads ที่ต้องการ encryption at rest

ข้อดีของ SOPS เมื่อเทียบกับ alternatives คือ เข้ารหัสเฉพาะ values ทำให้ diff/review ง่าย รองรับหลาย file formats (YAML, JSON, ENV, INI) รองรับหลาย KMS providers ทำ key rotation ได้ง่าย และ integrate กับ GitOps workflow ได้ดี

ติดตั้งและตั้งค่า SOPS กับ AWS KMS

ขั้นตอนการติดตั้งและ configuration

# ติดตั้ง SOPS
# macOS
brew install sops

# Linux
wget https://github.com/getsops/sops/releases/download/v3.8.1/sops-v3.8.1.linux.amd64
chmod +x sops-v3.8.1.linux.amd64
sudo mv sops-v3.8.1.linux.amd64 /usr/local/bin/sops

# ตรวจสอบ
sops --version

# สร้าง AWS KMS Key
aws kms create-key \
  --description "SOPS encryption key for message queue configs" \
  --key-usage ENCRYPT_DECRYPT \
  --origin AWS_KMS

# สร้าง alias
aws kms create-alias \
  --alias-name alias/sops-mq-key \
  --target-key-id arn:aws:kms:ap-southeast-1:123456789:key/abc-123

# .sops.yaml — SOPS Configuration
# creation_rules:
#   # Message queue configs
#   - path_regex: mq-config/.*\.ya?ml$
#     kms: arn:aws:kms:ap-southeast-1:123456789:key/abc-123
#     encrypted_regex: ^(password|secret|key|token|connection_string)$
#
#   # Service credentials
#   - path_regex: credentials/.*\.json$
#     kms: arn:aws:kms:ap-southeast-1:123456789:key/abc-123
#
#   # Environment files
#   - path_regex: \.env\.encrypted$
#     kms: arn:aws:kms:ap-southeast-1:123456789:key/abc-123

# สร้างไฟล์ encrypted config
cat > mq-config/rabbitmq.yml << 'EOF'
rabbitmq:
  host: rabbitmq.internal
  port: 5672
  vhost: /production
  username: mq_service
  password: SuperSecretPass123!
  ssl: true
  connection_string: amqps://mq_service:SuperSecretPass123!@rabbitmq.internal:5671/production

exchange:
  name: events
  type: topic
  durable: true

queues:
  orders:
    name: order.processing
    durable: true
    encryption_key: AES256-KEY-FOR-ORDER-MESSAGES
  payments:
    name: payment.processing
    durable: true
    encryption_key: AES256-KEY-FOR-PAYMENT-MESSAGES
EOF

# เข้ารหัสไฟล์
sops --encrypt --in-place mq-config/rabbitmq.yml

# ดูไฟล์ที่เข้ารหัสแล้ว (values ถูกเข้ารหัส, keys ยังอ่านได้)
cat mq-config/rabbitmq.yml

# แก้ไขไฟล์ (decrypt อัตโนมัติ, encrypt กลับเมื่อ save)
sops mq-config/rabbitmq.yml

# Decrypt เพื่อใช้งาน
sops --decrypt mq-config/rabbitmq.yml > /tmp/rabbitmq-decrypted.yml

# Decrypt เฉพาะบาง key
sops --decrypt --extract '["rabbitmq"]["password"]' mq-config/rabbitmq.yml

# Key rotation
sops --rotate --in-place mq-config/rabbitmq.yml

ออกแบบ Message Queue ที่เข้ารหัสด้วย SOPS

สถาปัตยกรรม message queue ที่มี encryption layer

# Message Queue Architecture with SOPS Encryption
#
# === Architecture Overview ===
#
# Producer -> [SOPS Decrypt Config] -> [Encrypt Payload] -> MQ Broker
#                                                              |
# Consumer <- [SOPS Decrypt Config] <- [Decrypt Payload] <----+
#
# === Encryption Layers ===
#
# Layer 1: Transport Encryption (TLS/SSL)
#   - MQ broker ใช้ TLS สำหรับ connections
#   - Certificates จัดการผ่าน SOPS
#
# Layer 2: Configuration Encryption (SOPS)
#   - Connection strings, credentials
#   - Encryption keys สำหรับ payload
#   - API tokens
#
# Layer 3: Message Payload Encryption (AES-256)
#   - Sensitive message payloads
#   - PII data ใน messages
#   - Financial transaction data
#
# === Queue Design ===
#
# Exchange: events (topic)
# ├── order.created     -> Queue: order.processing (encrypted payload)
# ├── order.updated     -> Queue: order.processing (encrypted payload)
# ├── payment.initiated -> Queue: payment.processing (encrypted payload)
# ├── payment.completed -> Queue: payment.notification
# ├── user.registered   -> Queue: user.onboarding (encrypted PII)
# └── audit.log         -> Queue: audit.storage (encrypted)
#
# === Message Format ===
# {
#   "metadata": {
#     "message_id": "uuid",
#     "timestamp": "ISO8601",
#     "type": "order.created",
#     "version": "1.0",
#     "encrypted": true,
#     "encryption_key_id": "key-rotation-2024-01"
#   },
#   "payload": "BASE64_ENCRYPTED_DATA",
#   "signature": "HMAC_SHA256_SIGNATURE"
# }
#
# === Key Management ===
#
# SOPS Config Keys:
#   - AWS KMS: สำหรับ encrypt/decrypt SOPS files
#   - ใช้ IAM roles สำหรับ access control
#
# Message Encryption Keys:
#   - เก็บใน SOPS encrypted config
#   - AES-256-GCM สำหรับ payload encryption
#   - Rotate ทุก 90 วัน
#
# Key Rotation Process:
#   1. Generate new AES key
#   2. Update SOPS config (sops --rotate)
#   3. Deploy new config to services
#   4. Old key kept for decrypting old messages (grace period 30 days)
#   5. Remove old key after grace period

สร้าง Encrypted Message Pipeline ด้วย Python

โค้ดสำหรับ producer และ consumer ที่เข้ารหัส messages

#!/usr/bin/env python3
# encrypted_mq.py — Encrypted Message Queue with SOPS
import json
import base64
import hashlib
import hmac
import uuid
import subprocess
from datetime import datetime
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import pika
import yaml
import os

class SOPSConfigLoader:
    @staticmethod
    def load(config_path):
        result = subprocess.run(
            ["sops", "--decrypt", config_path],
            capture_output=True, text=True
        )
        if result.returncode != 0:
            raise RuntimeError(f"SOPS decrypt failed: {result.stderr}")
        return yaml.safe_load(result.stdout)
    
    @staticmethod
    def get_value(config_path, key_path):
        extract = '["' + '"]["'.join(key_path.split(".")) + '"]'
        result = subprocess.run(
            ["sops", "--decrypt", "--extract", extract, config_path],
            capture_output=True, text=True
        )
        return result.stdout.strip().strip('"')

class MessageEncryptor:
    def __init__(self, encryption_key):
        if isinstance(encryption_key, str):
            encryption_key = encryption_key.encode()
        self.key = encryption_key[:32].ljust(32, b'\0')
        self.aesgcm = AESGCM(self.key)
    
    def encrypt(self, plaintext):
        if isinstance(plaintext, str):
            plaintext = plaintext.encode()
        nonce = os.urandom(12)
        ciphertext = self.aesgcm.encrypt(nonce, plaintext, None)
        return base64.b64encode(nonce + ciphertext).decode()
    
    def decrypt(self, encrypted_data):
        raw = base64.b64decode(encrypted_data)
        nonce = raw[:12]
        ciphertext = raw[12:]
        plaintext = self.aesgcm.decrypt(nonce, ciphertext, None)
        return plaintext.decode()
    
    def sign(self, data):
        return hmac.new(self.key, data.encode(), hashlib.sha256).hexdigest()
    
    def verify(self, data, signature):
        expected = self.sign(data)
        return hmac.compare_digest(expected, signature)

class EncryptedProducer:
    def __init__(self, config_path):
        config = SOPSConfigLoader.load(config_path)
        mq = config["rabbitmq"]
        
        credentials = pika.PlainCredentials(mq["username"], mq["password"])
        params = pika.ConnectionParameters(
            host=mq["host"], port=mq["port"],
            virtual_host=mq["vhost"],
            credentials=credentials,
            ssl_options=pika.SSLOptions() if mq.get("ssl") else None,
        )
        
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        
        self.encryptors = {}
        for queue_name, queue_config in config.get("queues", {}).items():
            key = queue_config.get("encryption_key", "")
            self.encryptors[queue_name] = MessageEncryptor(key)
            self.channel.queue_declare(queue=queue_config["name"], durable=queue_config.get("durable", True))
    
    def publish(self, queue_name, payload, routing_key=None):
        encryptor = self.encryptors.get(queue_name)
        if not encryptor:
            raise ValueError(f"No encryptor for queue: {queue_name}")
        
        payload_json = json.dumps(payload)
        encrypted_payload = encryptor.encrypt(payload_json)
        signature = encryptor.sign(encrypted_payload)
        
        message = {
            "metadata": {
                "message_id": str(uuid.uuid4()),
                "timestamp": datetime.utcnow().isoformat(),
                "type": routing_key or queue_name,
                "encrypted": True,
            },
            "payload": encrypted_payload,
            "signature": signature,
        }
        
        self.channel.basic_publish(
            exchange="events",
            routing_key=routing_key or queue_name,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2, content_type="application/json",
            ),
        )
        return message["metadata"]["message_id"]
    
    def close(self):
        self.connection.close()

class EncryptedConsumer:
    def __init__(self, config_path, queue_name):
        config = SOPSConfigLoader.load(config_path)
        mq = config["rabbitmq"]
        queue_config = config["queues"][queue_name]
        
        credentials = pika.PlainCredentials(mq["username"], mq["password"])
        params = pika.ConnectionParameters(
            host=mq["host"], port=mq["port"],
            virtual_host=mq["vhost"], credentials=credentials,
        )
        
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        self.queue = queue_config["name"]
        self.encryptor = MessageEncryptor(queue_config["encryption_key"])
    
    def consume(self, callback):
        def wrapper(ch, method, properties, body):
            message = json.loads(body)
            
            if not self.encryptor.verify(message["payload"], message["signature"]):
                print(f"Signature verification FAILED: {message['metadata']['message_id']}")
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
                return
            
            decrypted = self.encryptor.decrypt(message["payload"])
            payload = json.loads(decrypted)
            
            try:
                callback(payload, message["metadata"])
                ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                print(f"Processing error: {e}")
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        
        self.channel.basic_consume(queue=self.queue, on_message_callback=wrapper)
        self.channel.start_consuming()

# ใช้งาน
# producer = EncryptedProducer("mq-config/rabbitmq.yml")
# msg_id = producer.publish("orders", {"order_id": "ORD-001", "amount": 1500.00})
# print(f"Published: {msg_id}")

# consumer = EncryptedConsumer("mq-config/rabbitmq.yml", "orders")
# consumer.consume(lambda payload, meta: print(f"Received: {payload}"))

Key Rotation และ Secret Management

จัดการ key rotation สำหรับ SOPS และ message encryption

#!/bin/bash
# key_rotation.sh — SOPS Key Rotation Script
set -euo pipefail

CONFIG_DIR=""
BACKUP_DIR="/tmp/sops_backup_$(date +%Y%m%d)"

echo "=== SOPS Key Rotation ==="
echo "Config directory: $CONFIG_DIR"

# Step 1: Backup current configs
mkdir -p "$BACKUP_DIR"
cp -r "$CONFIG_DIR" "$BACKUP_DIR/"
echo "Backup saved to: $BACKUP_DIR"

# Step 2: Rotate SOPS master key
echo "Rotating SOPS keys..."
find "$CONFIG_DIR" -name "*.yml" -o -name "*.yaml" -o -name "*.json" | while read f; do
    echo "  Rotating: $f"
    sops --rotate --in-place "$f"
done

# Step 3: Generate new message encryption keys
echo "Generating new message encryption keys..."
NEW_ORDER_KEY=$(python3 -c "import os, base64; print(base64.b64encode(os.urandom(32)).decode())")
NEW_PAYMENT_KEY=$(python3 -c "import os, base64; print(base64.b64encode(os.urandom(32)).decode())")

# Step 4: Update encryption keys in SOPS config
sops --set '["queues"]["orders"]["encryption_key"] "'"$NEW_ORDER_KEY"'"' \
    "$CONFIG_DIR/rabbitmq.yml"

sops --set '["queues"]["payments"]["encryption_key"] "'"$NEW_PAYMENT_KEY"'"' \
    "$CONFIG_DIR/rabbitmq.yml"

# Step 5: Verify
echo "Verifying decryption..."
sops --decrypt "$CONFIG_DIR/rabbitmq.yml" > /dev/null 2>&1
echo "Verification OK"

# Step 6: Commit changes
echo "Committing rotated configs..."
git add "$CONFIG_DIR"
git commit -m "chore: rotate SOPS keys $(date +%Y-%m-%d)"

echo ""
echo "=== Key Rotation Complete ==="
echo "IMPORTANT: Deploy updated configs to all services"
echo "Old keys will work for existing messages during grace period"

# Cron: Rotate keys monthly
# 0 3 1 * * /opt/scripts/key_rotation.sh mq-config >> /var/log/key_rotation.log 2>&1

Monitoring และ Audit Trail สำหรับ Encrypted Messages

ระบบ monitoring สำหรับ encrypted message pipeline

#!/usr/bin/env python3
# mq_monitor.py — Message Queue Monitoring with Audit Trail
import json
import logging
from datetime import datetime
from collections import defaultdict

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("mq_monitor")

class MQAuditTrail:
    def __init__(self, audit_file="mq_audit.jsonl"):
        self.audit_file = audit_file
        self.stats = defaultdict(int)
    
    def log_event(self, event_type, message_id, queue, details=None):
        entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "event": event_type,
            "message_id": message_id,
            "queue": queue,
            "details": details or {},
        }
        
        with open(self.audit_file, "a") as f:
            f.write(json.dumps(entry) + "\n")
        
        self.stats[event_type] += 1
        self.stats[f"{queue}_{event_type}"] += 1
    
    def log_encryption(self, message_id, queue, key_id):
        self.log_event("encrypted", message_id, queue, {"key_id": key_id})
    
    def log_decryption(self, message_id, queue, key_id):
        self.log_event("decrypted", message_id, queue, {"key_id": key_id})
    
    def log_signature_failure(self, message_id, queue):
        self.log_event("signature_failed", message_id, queue)
        logger.warning(f"Signature verification failed: {message_id} on {queue}")
    
    def log_decryption_failure(self, message_id, queue, error):
        self.log_event("decryption_failed", message_id, queue, {"error": str(error)})
        logger.error(f"Decryption failed: {message_id} on {queue}: {error}")
    
    def log_key_rotation(self, queue, old_key_id, new_key_id):
        self.log_event("key_rotated", "N/A", queue, {
            "old_key_id": old_key_id, "new_key_id": new_key_id
        })
        logger.info(f"Key rotated for {queue}: {old_key_id} -> {new_key_id}")
    
    def get_stats(self):
        return dict(self.stats)
    
    def report(self, hours=24):
        cutoff = datetime.utcnow().timestamp() - (hours * 3600)
        events = defaultdict(int)
        
        try:
            with open(self.audit_file) as f:
                for line in f:
                    entry = json.loads(line)
                    ts = datetime.fromisoformat(entry["timestamp"]).timestamp()
                    if ts >= cutoff:
                        events[entry["event"]] += 1
        except FileNotFoundError:
            pass
        
        print(f"\n=== MQ Audit Report (last {hours}h) ===")
        print(f"  Encrypted: {events.get('encrypted', 0)}")
        print(f"  Decrypted: {events.get('decrypted', 0)}")
        print(f"  Signature failures: {events.get('signature_failed', 0)}")
        print(f"  Decryption failures: {events.get('decryption_failed', 0)}")
        print(f"  Key rotations: {events.get('key_rotated', 0)}")
        
        if events.get("signature_failed", 0) > 0:
            print("\n  WARNING: Signature failures detected!")
        if events.get("decryption_failed", 0) > 0:
            print("  WARNING: Decryption failures detected!")
        
        return events

class MQHealthCheck:
    def __init__(self, config_path):
        self.config_path = config_path
    
    def check_sops_access(self):
        import subprocess
        result = subprocess.run(
            ["sops", "--decrypt", self.config_path],
            capture_output=True, text=True
        )
        return result.returncode == 0
    
    def check_mq_connection(self):
        try:
            import subprocess
            config = subprocess.run(
                ["sops", "--decrypt", self.config_path],
                capture_output=True, text=True
            )
            import yaml
            mq = yaml.safe_load(config.stdout)["rabbitmq"]
            import pika
            conn = pika.BlockingConnection(pika.ConnectionParameters(
                host=mq["host"], port=mq["port"],
                credentials=pika.PlainCredentials(mq["username"], mq["password"]),
            ))
            conn.close()
            return True
        except Exception:
            return False
    
    def run_all_checks(self):
        results = {
            "sops_access": self.check_sops_access(),
            "mq_connection": self.check_mq_connection(),
            "timestamp": datetime.utcnow().isoformat(),
        }
        
        results["healthy"] = all(results[k] for k in ["sops_access", "mq_connection"])
        
        status = "HEALTHY" if results["healthy"] else "UNHEALTHY"
        print(f"Health check: {status}")
        for k, v in results.items():
            if k not in ("timestamp", "healthy"):
                print(f"  {k}: {'OK' if v else 'FAIL'}")
        
        return results

# ใช้งาน
audit = MQAuditTrail()
audit.log_encryption("msg-001", "order.processing", "key-2024-01")
audit.log_decryption("msg-001", "order.processing", "key-2024-01")
audit.report(24)

FAQ คำถามที่พบบ่อย

Q: SOPS กับ HashiCorp Vault ต่างกันอย่างไร?

A: SOPS เป็น file-based encryption ที่เข้ารหัสไฟล์ config แล้วเก็บใน Git เหมาะกับ GitOps workflow ส่วน Vault เป็น secrets management platform ที่เก็บ secrets centrally มี dynamic secrets, lease management, audit logging SOPS เหมาะสำหรับ static configs ที่เปลี่ยนไม่บ่อย Vault เหมาะสำหรับ dynamic secrets ที่ต้องการ fine-grained access control หลายทีมใช้ทั้งสองร่วมกัน

Q: ควรเข้ารหัส message payload ทุก message ไหม?

A: ไม่จำเป็น ขึ้นอยู่กับ data sensitivity ข้อมูล PII, financial data, health records ควรเข้ารหัสเสมอ ข้อมูลทั่วไปเช่น logs, metrics, notifications อาจไม่จำเป็น การเข้ารหัสทุก message เพิ่ม latency ประมาณ 1-5ms ต่อ message และใช้ CPU มากขึ้น ควร classify data แล้วเข้ารหัสเฉพาะที่จำเป็น

Q: Key rotation ทำบ่อยแค่ไหน?

A: SOPS master keys (KMS) ควร rotate ทุก 90-365 วันตาม compliance requirements Message encryption keys ควร rotate ทุก 30-90 วัน ในช่วง rotation ต้องรองรับทั้ง old key และ new key (dual-key period) เพื่อให้ messages ที่ encrypt ด้วย old key ยัง decrypt ได้ grace period แนะนำ 7-30 วัน

Q: SOPS ทำงานกับ Kubernetes Secrets ได้ไหม?

A: ได้ ใช้ร่วมกับ tools เช่น helm-secrets plugin ที่ decrypt SOPS files ตอน helm install หรือใช้ KSOPS (Kustomize plugin) สำหรับ Kustomize workflows หรือใช้ External Secrets Operator ที่อ่าน secrets จาก SOPS encrypted files แล้วสร้าง Kubernetes Secrets อัตโนมัติ

📖 บทความที่เกี่ยวข้อง

SOPS Encryption API Gateway Patternอ่านบทความ → Linux io_uring Message Queue Designอ่านบทความ → SOPS Encryption Multi-cloud Strategyอ่านบทความ → SOPS Encryption Edge Computingอ่านบทความ → SOPS Encryption Micro-segmentationอ่านบทความ →

📚 ดูบทความทั้งหมด →