Technology

Crossplane Composition Pub/Sub Architecture สร้าง Event-Driven Infrastructure บน Kubernetes

crossplane composition pub sub architecture
Crossplane Composition Pub Sub Architecture | SiamCafe Blog
2026-04-28· อ. บอม — SiamCafe.net· 1,159 คำ

Crossplane Composition ?????????????????????

Crossplane ???????????? open source CNCF project ?????????????????????????????? Kubernetes cluster ????????????????????? universal control plane ???????????????????????????????????? cloud infrastructure ????????? provider (AWS, GCP, Azure) ???????????? Kubernetes API ????????? YAML manifests ???????????????????????????????????? Kubernetes resources ????????????

Composition ???????????? feature ????????????????????? Crossplane ??????????????????????????? composite resources ?????????????????????????????? cloud resources ????????????????????????????????? ???????????? abstraction layer ????????? platform team ???????????????????????? application team ????????? ???????????? ??????????????? "Database" composite resource ?????????????????? RDS instance + Security Group + Subnet + Parameter Group ????????????????????????????????? resource ???????????????

Pub/Sub Architecture ???????????? messaging pattern ????????? publisher ????????? message ?????? topic ???????????? subscriber ????????? message ????????? topic ????????? publisher ??????????????????????????????????????? subscriber ????????????????????????????????? event-driven systems, microservices communication, async processing ?????????????????? Crossplane Composition ?????????????????? Pub/Sub infrastructure ??????????????? provision ???????????? messaging ???????????????????????????????????? consistent ????????? environment

????????????????????? Crossplane ????????? Providers

Setup Crossplane ?????????????????? cloud infrastructure management

# === Crossplane Installation ===

# 1. Install Crossplane via Helm
helm repo add crossplane-stable https://charts.crossplane.io/stable
helm repo update

helm install crossplane crossplane-stable/crossplane \
  --namespace crossplane-system \
  --create-namespace \
  --set args='{"--enable-composition-revisions"}'

kubectl get pods -n crossplane-system

# 2. Install Providers
cat > providers.yaml << 'EOF'
# AWS Provider
apiVersion: pkg.crossplane.io/v1
kind: Provider
metadata:
  name: provider-aws
spec:
  package: xpkg.upbound.io/upbound/provider-aws-sns:v1.1.0
---
apiVersion: pkg.crossplane.io/v1
kind: Provider
metadata:
  name: provider-aws-sqs
spec:
  package: xpkg.upbound.io/upbound/provider-aws-sqs:v1.1.0
---
# GCP Provider
apiVersion: pkg.crossplane.io/v1
kind: Provider
metadata:
  name: provider-gcp-pubsub
spec:
  package: xpkg.upbound.io/upbound/provider-gcp-pubsub:v1.1.0
EOF

kubectl apply -f providers.yaml

# 3. Configure Provider Credentials
cat > aws-credentials.yaml << 'EOF'
apiVersion: v1
kind: Secret
metadata:
  name: aws-credentials
  namespace: crossplane-system
type: Opaque
stringData:
  credentials: |
    [default]
    aws_access_key_id = YOUR_ACCESS_KEY
    aws_secret_access_key = YOUR_SECRET_KEY
---
apiVersion: aws.upbound.io/v1beta1
kind: ProviderConfig
metadata:
  name: default
spec:
  credentials:
    source: Secret
    secretRef:
      namespace: crossplane-system
      name: aws-credentials
      key: credentials
EOF

kubectl apply -f aws-credentials.yaml

# 4. Verify
kubectl get providers
kubectl get providerconfigs

echo "Crossplane installed with providers"

??????????????? Pub/Sub Infrastructure ???????????? Composition

Crossplane Composition ?????????????????? Pub/Sub

# === Crossplane Pub/Sub Composition ===

# 1. CompositeResourceDefinition (XRD)
cat > xrd-pubsub.yaml << 'EOF'
apiVersion: apiextensions.crossplane.io/v1
kind: CompositeResourceDefinition
metadata:
  name: xpubsubs.messaging.platform.io
spec:
  group: messaging.platform.io
  names:
    kind: XPubSub
    plural: xpubsubs
  claimNames:
    kind: PubSub
    plural: pubsubs
  versions:
    - name: v1alpha1
      served: true
      referenceable: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                provider:
                  type: string
                  enum: ["aws", "gcp"]
                  description: "Cloud provider"
                topicName:
                  type: string
                  description: "Name of the pub/sub topic"
                subscriptions:
                  type: array
                  items:
                    type: object
                    properties:
                      name:
                        type: string
                      filterPolicy:
                        type: string
                      deadLetterEnabled:
                        type: boolean
                        default: true
                      maxRetries:
                        type: integer
                        default: 5
                region:
                  type: string
                  default: "ap-southeast-1"
                environment:
                  type: string
                  enum: ["dev", "staging", "production"]
              required:
                - provider
                - topicName
                - subscriptions
EOF

kubectl apply -f xrd-pubsub.yaml

# 2. Composition (AWS SNS + SQS)
cat > composition-aws.yaml << 'EOF'
apiVersion: apiextensions.crossplane.io/v1
kind: Composition
metadata:
  name: pubsub-aws
  labels:
    provider: aws
spec:
  compositeTypeRef:
    apiVersion: messaging.platform.io/v1alpha1
    kind: XPubSub
  resources:
    # SNS Topic
    - name: sns-topic
      base:
        apiVersion: sns.aws.upbound.io/v1beta1
        kind: Topic
        spec:
          forProvider:
            region: ap-southeast-1
            tags:
              managed-by: crossplane
      patches:
        - fromFieldPath: spec.topicName
          toFieldPath: metadata.name
        - fromFieldPath: spec.region
          toFieldPath: spec.forProvider.region
        - fromFieldPath: spec.environment
          toFieldPath: spec.forProvider.tags.environment
    
    # SQS Queue (per subscription)
    - name: sqs-queue
      base:
        apiVersion: sqs.aws.upbound.io/v1beta1
        kind: Queue
        spec:
          forProvider:
            region: ap-southeast-1
            visibilityTimeoutSeconds: 30
            messageRetentionSeconds: 1209600
            tags:
              managed-by: crossplane
      patches:
        - fromFieldPath: spec.subscriptions[0].name
          toFieldPath: metadata.name
        - fromFieldPath: spec.region
          toFieldPath: spec.forProvider.region
    
    # Dead Letter Queue
    - name: dlq
      base:
        apiVersion: sqs.aws.upbound.io/v1beta1
        kind: Queue
        spec:
          forProvider:
            region: ap-southeast-1
            messageRetentionSeconds: 1209600
            tags:
              managed-by: crossplane
              type: dead-letter-queue
      patches:
        - fromFieldPath: spec.subscriptions[0].name
          toFieldPath: metadata.name
          transforms:
            - type: string
              string:
                fmt: "%s-dlq"
EOF

kubectl apply -f composition-aws.yaml

# 3. Claim (Application team uses this)
cat > claim-orders.yaml << 'EOF'
apiVersion: messaging.platform.io/v1alpha1
kind: PubSub
metadata:
  name: order-events
  namespace: ecommerce
spec:
  provider: aws
  topicName: order-events
  region: ap-southeast-1
  environment: production
  subscriptions:
    - name: order-processor
      deadLetterEnabled: true
      maxRetries: 5
    - name: notification-service
      deadLetterEnabled: true
      maxRetries: 3
    - name: analytics-pipeline
      deadLetterEnabled: false
EOF

kubectl apply -f claim-orders.yaml

echo "Pub/Sub Composition deployed"

Event-Driven Architecture Patterns

Patterns ?????????????????? Pub/Sub architecture

#!/usr/bin/env python3
# event_patterns.py ??? Event-Driven Architecture Patterns
import json
import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("patterns")

class EventDrivenPatterns:
    def __init__(self):
        pass
    
    def patterns(self):
        return {
            "fan_out": {
                "description": "1 event ??? ???????????? subscribers ???????????????????????????????????????",
                "use_case": "Order placed ??? ????????? notification + update inventory + send email",
                "crossplane": "1 SNS Topic + N SQS Queues",
                "pros": "Loose coupling, parallel processing",
                "cons": "Eventually consistent, ordering ????????? guarantee",
            },
            "event_sourcing": {
                "description": "????????????????????? event ???????????? source of truth",
                "use_case": "Financial transactions, audit trail",
                "crossplane": "SNS Topic + SQS Queue + S3 (event store)",
                "pros": "Complete audit trail, replay events ?????????",
            },
            "saga_pattern": {
                "description": "Distributed transaction ???????????? events",
                "use_case": "Order ??? Payment ??? Shipping (compensating transactions)",
                "crossplane": "Multiple topics + queues per service",
                "pros": "No distributed locks, each service autonomous",
            },
            "dead_letter_queue": {
                "description": "Queue ?????????????????? messages ????????? process ???????????????????????????",
                "use_case": "Retry failed messages, investigate errors",
                "crossplane": "Main queue + DLQ (automatic via composition)",
                "config": {"maxReceiveCount": 5, "dlqArn": "auto-provisioned"},
            },
            "fifo_ordering": {
                "description": "Guarantee message ordering",
                "use_case": "Financial events, state machine transitions",
                "crossplane": "SQS FIFO Queue (.fifo suffix)",
                "limit": "300 msg/sec (3000 with batching)",
            },
        }
    
    def publisher_example(self):
        return """
import boto3
import json

sns = boto3.client('sns', region_name='ap-southeast-1')

def publish_order_event(order_id, event_type, data):
    message = {
        'order_id': order_id,
        'event_type': event_type,
        'data': data,
        'timestamp': '2024-06-15T10:30:00Z',
    }
    
    response = sns.publish(
        TopicArn='arn:aws:sns:ap-southeast-1:123456:order-events',
        Message=json.dumps(message),
        MessageAttributes={
            'event_type': {
                'DataType': 'String',
                'StringValue': event_type,
            }
        }
    )
    return response['MessageId']

# Publish events
publish_order_event('ORD-001', 'order.created', {'amount': 1500})
publish_order_event('ORD-001', 'order.paid', {'payment_method': 'credit_card'})
        """

patterns = EventDrivenPatterns()
for name, info in patterns.patterns().items():
    print(f"{name}: {info['description']}")
    print(f"  Use case: {info['use_case']}")
    print(f"  Crossplane: {info['crossplane']}\n")

CI/CD ????????? GitOps ?????????????????? Crossplane

GitOps workflow ?????????????????? infrastructure management

# === GitOps with Crossplane ===

# 1. ArgoCD Application for Crossplane Resources
cat > argocd-crossplane.yaml << 'EOF'
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: pubsub-infrastructure
  namespace: argocd
spec:
  project: infrastructure
  source:
    repoURL: https://github.com/org/infra-configs.git
    targetRevision: main
    path: crossplane/pubsub
  destination:
    server: https://kubernetes.default.svc
  syncPolicy:
    automated:
      prune: true
      selfHeal: true
    syncOptions:
      - CreateNamespace=true
    retry:
      limit: 5
      backoff:
        duration: 5s
        maxDuration: 3m0s
        factor: 2
EOF

# 2. GitHub Actions for Crossplane Validation
cat > .github/workflows/crossplane-ci.yml << 'EOF'
name: Crossplane CI

on:
  pull_request:
    paths: ['crossplane/**']

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Setup Kubernetes (Kind)
        uses: helm/kind-action@v1
      
      - name: Install Crossplane
        run: |
          helm repo add crossplane-stable https://charts.crossplane.io/stable
          helm install crossplane crossplane-stable/crossplane \
            --namespace crossplane-system --create-namespace
          kubectl wait --for=condition=ready pod -l app=crossplane \
            -n crossplane-system --timeout=120s
      
      - name: Apply XRDs
        run: kubectl apply -f crossplane/xrds/
      
      - name: Apply Compositions
        run: kubectl apply -f crossplane/compositions/
      
      - name: Validate Claims (Dry Run)
        run: |
          for f in crossplane/claims/*.yaml; do
            echo "Validating $f"
            kubectl apply --dry-run=server -f "$f"
          done
      
      - name: Run Tests
        run: |
          pip install pytest pyyaml
          pytest tests/crossplane/ -v
EOF

# 3. Directory Structure
cat > repo-structure.txt << 'EOF'
infra-configs/
  crossplane/
    providers/
      aws-provider.yaml
      gcp-provider.yaml
    xrds/
      xrd-pubsub.yaml
      xrd-database.yaml
    compositions/
      composition-aws-pubsub.yaml
      composition-gcp-pubsub.yaml
    claims/
      dev/
        order-events.yaml
      staging/
        order-events.yaml
      production/
        order-events.yaml
  tests/
    crossplane/
      test_compositions.py
EOF

echo "GitOps pipeline configured"

Monitoring ????????? Troubleshooting

?????????????????? Crossplane resources

#!/usr/bin/env python3
# crossplane_monitor.py ??? Crossplane Monitoring
import json
import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("monitor")

class CrossplaneMonitor:
    def __init__(self):
        pass
    
    def health_dashboard(self):
        return {
            "crossplane_status": {
                "version": "1.15.0",
                "providers": {"installed": 3, "healthy": 3},
                "compositions": 5,
                "claims": 12,
                "composite_resources": 12,
                "managed_resources": 45,
            },
            "pubsub_resources": {
                "sns_topics": {"total": 4, "ready": 4},
                "sqs_queues": {"total": 12, "ready": 11, "syncing": 1},
                "dlq_queues": {"total": 8, "ready": 8},
                "subscriptions": {"total": 12, "active": 12},
            },
            "resource_status": [
                {"name": "order-events", "type": "PubSub", "status": "Ready", "age": "5d"},
                {"name": "payment-events", "type": "PubSub", "status": "Ready", "age": "5d"},
                {"name": "notification-events", "type": "PubSub", "status": "Ready", "age": "3d"},
                {"name": "analytics-stream", "type": "PubSub", "status": "Syncing", "age": "1h"},
            ],
        }
    
    def troubleshooting_commands(self):
        return {
            "check_claims": "kubectl get pubsubs -A",
            "check_composite": "kubectl get xpubsubs",
            "check_managed": "kubectl get managed",
            "describe_claim": "kubectl describe pubsub order-events -n ecommerce",
            "check_events": "kubectl get events --field-selector reason=ReconcileError",
            "provider_logs": "kubectl logs -n crossplane-system -l pkg.crossplane.io/revision",
            "crossplane_logs": "kubectl logs -n crossplane-system -l app=crossplane",
        }
    
    def common_issues(self):
        return {
            "resource_not_ready": {
                "symptoms": "Claim shows 'Waiting' status",
                "checks": ["kubectl describe ", "Check provider credentials", "Check quotas"],
                "fixes": ["Verify ProviderConfig", "Check IAM permissions", "Check resource limits"],
            },
            "composition_not_matching": {
                "symptoms": "No composition selected for claim",
                "checks": ["kubectl get compositions", "Check labels match"],
                "fixes": ["Verify compositeTypeRef matches XRD", "Check composition selector labels"],
            },
            "provider_unhealthy": {
                "symptoms": "Provider pod CrashLoopBackOff",
                "checks": ["kubectl get providers", "kubectl logs provider-pod"],
                "fixes": ["Check credentials secret", "Update provider version", "Check RBAC"],
            },
        }

monitor = CrossplaneMonitor()
dash = monitor.health_dashboard()
print("Crossplane Health:")
print(f"  Providers: {dash['crossplane_status']['providers']['healthy']}/{dash['crossplane_status']['providers']['installed']}")
print(f"  Claims: {dash['crossplane_status']['claims']}")
print(f"  Managed Resources: {dash['crossplane_status']['managed_resources']}")

print(f"\nPub/Sub Resources:")
for rtype, status in dash["pubsub_resources"].items():
    print(f"  {rtype}: {status['ready']}/{status['total']} ready")

FAQ ??????????????????????????????????????????

Q: Crossplane ????????? Terraform ???????????????????????????????????????????

A: Terraform ????????? HCL language, state file based, plan-apply workflow ????????????????????????????????? infrastructure provisioning ????????? imperative (??????????????????) ???????????? run terraform apply ???????????????????????? state drift ???????????? detect ????????? Crossplane ????????? Kubernetes YAML, controller-based, reconciliation loop (declarative) ????????????????????????????????? Kubernetes controller ????????????????????? drift ?????????????????????????????????????????????????????? ????????? kubectl ???????????????????????????????????? Kubernetes resources ??????????????? Terraform ??????????????????????????????????????? HCL, ??????????????? Kubernetes, ????????????????????? plan ???????????? apply ??????????????? Crossplane ?????????????????? Kubernetes ????????????????????????, ????????????????????? continuous reconciliation, ??????????????? platform abstractions ??????????????????????????????????????? Terraform ?????????????????? bootstrap, Crossplane ?????????????????? day-2 operations

Q: Composition ????????? Helm Chart ???????????????????????????????????????????

A: Helm Chart ???????????? package manager ?????????????????? Kubernetes resources (Deployments, Services) template ???????????? Go templates install ?????????????????? ??????????????? continuous reconciliation Crossplane Composition ???????????? abstraction ?????????????????? cloud resources (RDS, S3, SNS) ????????? patches ????????? templates ?????? continuous reconciliation (???????????? drift ???????????????????????????) ??????????????? custom API (XRD) ????????? application teams ????????? Helm ?????????????????? deploy applications ?????? Kubernetes, Crossplane ?????????????????? provision cloud infrastructure ?????????????????????????????? Helm deploy app + Crossplane provision infrastructure

Q: Pub/Sub ????????????????????????????????????????????? SNS+SQS ????????????????????????????????????????????? Kafka?

A: SNS+SQS (AWS) ????????????????????????????????? Simple fan-out patterns, Low-medium throughput (< 10K msg/sec), Serverless architectures (Lambda), ?????????????????????????????? message replay, ????????????????????? managed service ?????????????????????????????? ??????????????????????????????????????? usage Kafka ????????????????????????????????? High throughput (millions msg/sec), ????????????????????? message replay (event sourcing), Stream processing (Kafka Streams, ksqlDB), ????????????????????? ordering guarantee, Log aggregation ??????????????????????????????????????? cluster size ?????????????????? Crossplane Composition ?????????????????????????????? SNS+SQS ????????? MSK (Managed Kafka) ??????????????? XRD ????????? abstract ????????????????????? ????????? application team ???????????????

Q: Crossplane Composition ???????????????????????????????

A: Crossplane ?????? learning curve ???????????????????????? setup ???????????? ???????????????????????????????????? Terraform ??????????????? Kubernetes-native ????????? tools ???????????????????????? (kubectl, ArgoCD, monitoring), Self-healing ????????????????????? drift ???????????????????????????, Version control Compositions ???????????? YAML commit ?????? Git, RBAC ????????? Kubernetes RBAC ?????????????????? access ????????????????????????????????? Provider upgrades ??????????????????????????? compatibility, Composition changes ???????????????????????? existing resources, Debugging ?????????????????????????????? Crossplane reconciliation loop ??????????????? ???????????????????????? Composition ??????????????? (1-2 resources) ???????????????????????????????????????????????????????????????????????? ????????? ArgoCD ?????????????????? GitOps workflow

📖 บทความที่เกี่ยวข้อง

Redis Pub Sub Troubleshooting แก้ปัญหาอ่านบทความ → Crossplane Composition GitOps Workflowอ่านบทความ → Crossplane Composition GreenOps Sustainabilityอ่านบทความ → Crossplane Composition Stream Processingอ่านบทความ →

📚 ดูบทความทั้งหมด →