Technology

CrewAI Multi-Agent Identity Access Management — ระบบ IAM อัตโนมัติด้วย AI Agents

crewai multi agent identity access management
CrewAI Multi-Agent Identity Access Management | SiamCafe Blog
2026-01-05· อ. บอม — SiamCafe.net· 1,919 คำ

CrewAI คืออะไร

CrewAI เป็น open source framework สำหรับสร้าง multi-agent AI systems ที่ agents หลายตัวทำงานร่วมกันเป็น crew เพื่อทำ complex tasks ได้อัตโนมัติ แต่ละ agent มี role, goal และ backstory เฉพาะ ทำงานผ่าน tools ที่กำหนดและ communicate กันเพื่อบรรลุเป้าหมายรวม

Identity and Access Management (IAM) เป็น security framework ที่จัดการ digital identities และ control access ไปยัง resources ขององค์กร ครอบคลุม user provisioning, authentication, authorization, access review และ compliance

การรวม CrewAI กับ IAM ช่วยให้ automate งาน IAM ที่ซับซ้อนได้ เช่น automated access review ตรวจสอบ permissions ที่ไม่จำเป็นอัตโนมัติ, threat detection ตรวจจับ suspicious access patterns, compliance auditing ตรวจสอบว่า access policies ถูกต้องตาม regulations, user lifecycle management จัดการ onboarding/offboarding อัตโนมัติ

ติดตั้งและเริ่มต้นใช้งาน CrewAI

วิธีติดตั้ง CrewAI

# === ติดตั้ง CrewAI ===

# 1. Install CrewAI
pip install crewai crewai-tools langchain-openai

# 2. Set API Key
export OPENAI_API_KEY="sk-your-api-key-here"

# 3. Basic CrewAI Example
python3 << 'PYEOF'
from crewai import Agent, Task, Crew, Process

# Define Agents
security_analyst = Agent(
    role="Security Analyst",
    goal="Analyze access patterns and identify security risks",
    backstory="""You are an experienced security analyst specializing 
    in identity and access management. You review user permissions 
    and detect anomalies in access patterns.""",
    verbose=True,
    allow_delegation=True,
)

compliance_officer = Agent(
    role="Compliance Officer",
    goal="Ensure IAM policies comply with regulations",
    backstory="""You are a compliance expert who ensures that 
    access management follows PDPA, ISO 27001, and SOC 2 
    requirements. You review policies and report gaps.""",
    verbose=True,
)

# Define Tasks
review_task = Task(
    description="""Review the current access permissions for 
    the finance department. Check for:
    1. Excessive permissions (least privilege violations)
    2. Orphaned accounts (ex-employees)
    3. Shared accounts
    4. Accounts without MFA""",
    expected_output="Detailed access review report with findings",
    agent=security_analyst,
)

compliance_task = Task(
    description="""Based on the security analyst's findings, 
    evaluate compliance with PDPA and ISO 27001. 
    Identify gaps and recommend remediation actions.""",
    expected_output="Compliance gap analysis with remediation plan",
    agent=compliance_officer,
)

# Create Crew
iam_crew = Crew(
    agents=[security_analyst, compliance_officer],
    tasks=[review_task, compliance_task],
    process=Process.sequential,
    verbose=True,
)

# result = iam_crew.kickoff()
# print(result)
print("CrewAI IAM crew defined successfully")
PYEOF

# 4. CrewAI with Tools
pip install crewai-tools

python3 << 'PYEOF'
from crewai_tools import FileReadTool, SerperDevTool

# Custom tools for IAM
# file_tool = FileReadTool(file_path="access_logs.csv")
# search_tool = SerperDevTool()

print("CrewAI tools configured")
PYEOF

echo "CrewAI installed"

สร้าง Multi-Agent System สำหรับ IAM

ระบบ multi-agent สำหรับ identity management

#!/usr/bin/env python3
# iam_crew.py — Multi-Agent IAM System
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List
from dataclasses import dataclass, field

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("iam_crew")

@dataclass
class User:
    id: str
    name: str
    department: str
    role: str
    permissions: List[str]
    mfa_enabled: bool
    last_login: str
    created_at: str
    status: str = "active"

@dataclass
class AccessReviewFinding:
    user_id: str
    finding_type: str
    severity: str
    description: str
    recommendation: str

class IAMDataSource:
    def __init__(self):
        self.users = [
            User("u001", "สมชาย", "finance", "accountant",
                 ["read_ledger", "write_ledger", "admin_panel", "delete_records"],
                 True, "2025-01-14", "2023-01-15"),
            User("u002", "สมหญิง", "finance", "manager",
                 ["read_ledger", "write_ledger", "approve_payments"],
                 True, "2025-01-15", "2022-06-01"),
            User("u003", "วิชัย", "finance", "intern",
                 ["read_ledger", "write_ledger", "admin_panel"],
                 False, "2024-12-01", "2024-11-01"),
            User("u004", "อรุณ", "it", "developer",
                 ["read_code", "write_code", "deploy_prod", "admin_panel"],
                 True, "2025-01-15", "2023-03-01"),
            User("u005", "พิมพ์", "hr", "recruiter",
                 ["read_hr_data", "write_hr_data", "finance_reports"],
                 False, "2024-10-15", "2023-09-01", "inactive"),
        ]
    
    def get_users_by_department(self, dept):
        return [u for u in self.users if u.department == dept]
    
    def get_all_users(self):
        return self.users

class AccessReviewAgent:
    """Agent that reviews access permissions"""
    
    def __init__(self, data_source: IAMDataSource):
        self.ds = data_source
        self.findings: List[AccessReviewFinding] = []
    
    def review_least_privilege(self):
        role_permissions = {
            "accountant": ["read_ledger", "write_ledger"],
            "manager": ["read_ledger", "write_ledger", "approve_payments"],
            "intern": ["read_ledger"],
            "developer": ["read_code", "write_code"],
            "recruiter": ["read_hr_data", "write_hr_data"],
        }
        
        for user in self.ds.get_all_users():
            expected = set(role_permissions.get(user.role, []))
            actual = set(user.permissions)
            excessive = actual - expected
            
            if excessive:
                self.findings.append(AccessReviewFinding(
                    user.id, "excessive_permissions", "high",
                    f"{user.name} ({user.role}) has excessive permissions: {excessive}",
                    f"Remove permissions: {', '.join(excessive)}"
                ))
    
    def review_inactive_accounts(self, inactive_days=60):
        cutoff = datetime.utcnow() - timedelta(days=inactive_days)
        
        for user in self.ds.get_all_users():
            last_login = datetime.fromisoformat(user.last_login)
            if last_login < cutoff and user.status == "active":
                self.findings.append(AccessReviewFinding(
                    user.id, "inactive_account", "medium",
                    f"{user.name} last login {user.last_login} ({(datetime.utcnow()-last_login).days} days ago)",
                    "Disable account or verify with manager"
                ))
    
    def review_mfa_compliance(self):
        for user in self.ds.get_all_users():
            if not user.mfa_enabled and user.status == "active":
                self.findings.append(AccessReviewFinding(
                    user.id, "mfa_not_enabled", "high",
                    f"{user.name} does not have MFA enabled",
                    "Enable MFA immediately"
                ))
    
    def run_full_review(self):
        self.review_least_privilege()
        self.review_inactive_accounts()
        self.review_mfa_compliance()
        
        return {
            "review_date": datetime.utcnow().isoformat(),
            "total_users": len(self.ds.get_all_users()),
            "total_findings": len(self.findings),
            "by_severity": {
                "high": sum(1 for f in self.findings if f.severity == "high"),
                "medium": sum(1 for f in self.findings if f.severity == "medium"),
                "low": sum(1 for f in self.findings if f.severity == "low"),
            },
            "findings": [
                {"user": f.user_id, "type": f.finding_type,
                 "severity": f.severity, "description": f.description,
                 "recommendation": f.recommendation}
                for f in self.findings
            ],
        }

class ComplianceAgent:
    """Agent that checks compliance"""
    
    def check_compliance(self, review_results):
        checks = {
            "pdpa": {
                "data_access_logging": True,
                "consent_management": True,
                "data_minimization": review_results["by_severity"]["high"] == 0,
                "access_review_done": True,
            },
            "iso27001": {
                "mfa_enforced": not any(
                    f["type"] == "mfa_not_enabled" for f in review_results["findings"]
                ),
                "least_privilege": not any(
                    f["type"] == "excessive_permissions" for f in review_results["findings"]
                ),
                "inactive_accounts_reviewed": True,
                "access_logs_retained": True,
            },
        }
        
        results = {}
        for framework, controls in checks.items():
            passed = sum(1 for v in controls.values() if v)
            total = len(controls)
            results[framework] = {
                "passed": passed, "total": total,
                "compliance_pct": round(passed / total * 100, 1),
                "controls": controls,
            }
        
        return results

ds = IAMDataSource()
reviewer = AccessReviewAgent(ds)
review = reviewer.run_full_review()
print("Access Review:", json.dumps(review, indent=2, ensure_ascii=False))

compliance = ComplianceAgent()
comp_result = compliance.check_compliance(review)
print("\nCompliance:", json.dumps(comp_result, indent=2))

Automated Access Review ด้วย Agents

Agents สำหรับ automated access review

#!/usr/bin/env python3
# auto_access_review.py — Automated Access Review Pipeline
import json
import logging
from datetime import datetime
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("access_review")

class ProvisioningAgent:
    """Agent ที่จัดการ user provisioning/deprovisioning"""
    
    def __init__(self):
        self.actions_taken = []
    
    def onboard_user(self, user_info):
        role = user_info["role"]
        department = user_info["department"]
        
        # Role-based access assignment
        role_map = {
            "developer": {
                "groups": ["engineering", "github-devs"],
                "apps": ["jira", "github", "slack", "aws-dev"],
                "permissions": ["read_code", "write_code", "ci_cd"],
            },
            "analyst": {
                "groups": ["analytics", "data-readers"],
                "apps": ["jira", "slack", "bigquery", "looker"],
                "permissions": ["read_data", "create_reports"],
            },
            "manager": {
                "groups": [f"{department}-managers", department],
                "apps": ["jira", "slack", "hr-system", "budget-tool"],
                "permissions": ["approve_requests", "view_reports"],
            },
        }
        
        access = role_map.get(role, {"groups": [department], "apps": ["slack"], "permissions": []})
        
        action = {
            "type": "onboard",
            "user": user_info["name"],
            "role": role,
            "assigned_groups": access["groups"],
            "assigned_apps": access["apps"],
            "assigned_permissions": access["permissions"],
            "mfa_enforced": True,
            "review_date": "90 days from now",
            "timestamp": datetime.utcnow().isoformat(),
        }
        
        self.actions_taken.append(action)
        logger.info(f"Onboarded: {user_info['name']} as {role}")
        return action
    
    def offboard_user(self, user_id, reason="termination"):
        action = {
            "type": "offboard",
            "user_id": user_id,
            "reason": reason,
            "steps": [
                "Disable SSO account",
                "Revoke all OAuth tokens",
                "Remove from all groups",
                "Disable email forwarding",
                "Transfer file ownership to manager",
                "Archive user data (retain 90 days)",
                "Log offboarding for audit",
            ],
            "timestamp": datetime.utcnow().isoformat(),
        }
        
        self.actions_taken.append(action)
        logger.info(f"Offboarded: {user_id}")
        return action
    
    def periodic_review(self, users):
        """Quarterly access review"""
        review_results = []
        
        for user in users:
            review = {
                "user": user.get("name"),
                "current_role": user.get("role"),
                "permissions_count": len(user.get("permissions", [])),
                "last_reviewed": user.get("last_reviewed", "never"),
                "action_needed": False,
                "recommendations": [],
            }
            
            # Check if permissions match role
            if user.get("excessive_permissions"):
                review["action_needed"] = True
                review["recommendations"].append("Remove excessive permissions")
            
            # Check last login
            if user.get("days_since_login", 0) > 90:
                review["action_needed"] = True
                review["recommendations"].append("Account appears inactive")
            
            review_results.append(review)
        
        return {
            "review_type": "quarterly",
            "review_date": datetime.utcnow().isoformat(),
            "users_reviewed": len(review_results),
            "actions_needed": sum(1 for r in review_results if r["action_needed"]),
            "results": review_results,
        }

class RemediationAgent:
    """Agent ที่ทำ remediation อัตโนมัติ"""
    
    def __init__(self):
        self.remediations = []
    
    def auto_remediate(self, findings):
        for finding in findings:
            severity = finding.get("severity", "low")
            finding_type = finding.get("type", "")
            
            remediation = {
                "finding": finding,
                "auto_remediated": False,
                "action": "none",
            }
            
            if finding_type == "mfa_not_enabled":
                remediation["action"] = "enforce_mfa"
                remediation["auto_remediated"] = True
                remediation["details"] = "MFA enforcement email sent, 48h deadline"
            
            elif finding_type == "inactive_account" and severity != "high":
                remediation["action"] = "send_reactivation_notice"
                remediation["auto_remediated"] = True
                remediation["details"] = "Notification sent to user and manager"
            
            elif finding_type == "excessive_permissions":
                remediation["action"] = "create_ticket"
                remediation["auto_remediated"] = False
                remediation["details"] = "Ticket created for manual review"
            
            self.remediations.append(remediation)
        
        return {
            "total_findings": len(findings),
            "auto_remediated": sum(1 for r in self.remediations if r["auto_remediated"]),
            "manual_review": sum(1 for r in self.remediations if not r["auto_remediated"]),
            "actions": [{"action": r["action"], "details": r.get("details", "")} 
                       for r in self.remediations],
        }

# Run agents
provisioner = ProvisioningAgent()
provisioner.onboard_user({"name": "นพดล", "role": "developer", "department": "engineering"})
provisioner.offboard_user("u005", "resignation")

remediator = RemediationAgent()
findings = [
    {"type": "mfa_not_enabled", "severity": "high", "user": "u003"},
    {"type": "inactive_account", "severity": "medium", "user": "u005"},
    {"type": "excessive_permissions", "severity": "high", "user": "u001"},
]
result = remediator.auto_remediate(findings)
print(json.dumps(result, indent=2))

Threat Detection และ Response Agents

Agents สำหรับตรวจจับภัยคุกคาม

#!/usr/bin/env python3
# threat_detection.py — IAM Threat Detection Agents
import json
import logging
import random
from datetime import datetime, timedelta
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("threat")

class ThreatDetectionAgent:
    def __init__(self):
        self.alerts = []
    
    def detect_brute_force(self, login_events):
        ip_failures = {}
        
        for event in login_events:
            if event["status"] == "failed":
                ip = event["source_ip"]
                ip_failures[ip] = ip_failures.get(ip, 0) + 1
        
        for ip, count in ip_failures.items():
            if count >= 5:
                self.alerts.append({
                    "type": "brute_force",
                    "severity": "high",
                    "source_ip": ip,
                    "failed_attempts": count,
                    "action": "block_ip",
                    "timestamp": datetime.utcnow().isoformat(),
                })
    
    def detect_impossible_travel(self, login_events):
        user_logins = {}
        
        for event in login_events:
            if event["status"] == "success":
                user = event["user_id"]
                if user not in user_logins:
                    user_logins[user] = []
                user_logins[user].append(event)
        
        for user, logins in user_logins.items():
            for i in range(1, len(logins)):
                prev = logins[i-1]
                curr = logins[i]
                
                if prev["country"] != curr["country"]:
                    time_diff_hours = 1  # simplified
                    if time_diff_hours < 4:
                        self.alerts.append({
                            "type": "impossible_travel",
                            "severity": "critical",
                            "user_id": user,
                            "from_country": prev["country"],
                            "to_country": curr["country"],
                            "time_diff_hours": time_diff_hours,
                            "action": "lock_account_and_notify",
                        })
    
    def detect_privilege_escalation(self, audit_events):
        for event in audit_events:
            if event.get("action") == "grant_permission":
                if event.get("target_permission") in ["admin", "root", "superuser"]:
                    if event.get("approved_by") is None:
                        self.alerts.append({
                            "type": "privilege_escalation",
                            "severity": "critical",
                            "user_id": event["user_id"],
                            "permission": event["target_permission"],
                            "action": "revoke_and_investigate",
                        })
    
    def detect_unusual_access(self, access_events):
        user_patterns = {}
        
        for event in access_events:
            user = event["user_id"]
            if user not in user_patterns:
                user_patterns[user] = {"resources": set(), "hours": set()}
            user_patterns[user]["resources"].add(event.get("resource", ""))
            user_patterns[user]["hours"].add(event.get("hour", 0))
        
        for user, pattern in user_patterns.items():
            if len(pattern["resources"]) > 20:
                self.alerts.append({
                    "type": "unusual_resource_access",
                    "severity": "medium",
                    "user_id": user,
                    "resources_accessed": len(pattern["resources"]),
                    "action": "flag_for_review",
                })
    
    def get_alert_summary(self):
        return {
            "total_alerts": len(self.alerts),
            "by_severity": {
                "critical": sum(1 for a in self.alerts if a["severity"] == "critical"),
                "high": sum(1 for a in self.alerts if a["severity"] == "high"),
                "medium": sum(1 for a in self.alerts if a["severity"] == "medium"),
            },
            "by_type": {},
            "alerts": self.alerts,
        }

# Simulate events
login_events = [
    {"user_id": "u001", "status": "success", "source_ip": "10.0.0.1", "country": "TH"},
    {"user_id": "u001", "status": "success", "source_ip": "203.0.113.5", "country": "US"},
    {"user_id": "u999", "status": "failed", "source_ip": "192.168.1.100", "country": "CN"},
    {"user_id": "u999", "status": "failed", "source_ip": "192.168.1.100", "country": "CN"},
    {"user_id": "u999", "status": "failed", "source_ip": "192.168.1.100", "country": "CN"},
    {"user_id": "u999", "status": "failed", "source_ip": "192.168.1.100", "country": "CN"},
    {"user_id": "u999", "status": "failed", "source_ip": "192.168.1.100", "country": "CN"},
]

audit_events = [
    {"user_id": "u003", "action": "grant_permission", "target_permission": "admin", "approved_by": None},
]

detector = ThreatDetectionAgent()
detector.detect_brute_force(login_events)
detector.detect_impossible_travel(login_events)
detector.detect_privilege_escalation(audit_events)

print(json.dumps(detector.get_alert_summary(), indent=2))

Production Deployment และ Monitoring

Deploy และ monitor IAM agents

# === IAM Agent Production Deployment ===

# 1. Docker Deployment
# ===================================
cat > Dockerfile << 'EOF'
FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src/ ./src/
COPY configs/ ./configs/

ENV PYTHONPATH=/app
CMD ["python", "src/main.py"]
EOF

cat > requirements.txt << 'EOF'
crewai>=0.30.0
crewai-tools>=0.4.0
langchain-openai>=0.1.0
fastapi>=0.110.0
uvicorn>=0.27.0
prometheus-client>=0.20.0
EOF

cat > docker-compose.yml << 'EOF'
version: '3'
services:
  iam-agents:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=
      - LOG_LEVEL=INFO
      - REVIEW_SCHEDULE=0 0 * * 1
    volumes:
      - ./configs:/app/configs
  
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
  
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
EOF

# 2. Kubernetes Deployment
# ===================================
cat > k8s-deployment.yaml << 'EOF'
apiVersion: apps/v1
kind: Deployment
metadata:
  name: iam-agents
spec:
  replicas: 1
  selector:
    matchLabels:
      app: iam-agents
  template:
    metadata:
      labels:
        app: iam-agents
    spec:
      containers:
        - name: iam-agents
          image: iam-agents:latest
          ports:
            - containerPort: 8000
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: openai-secret
                  key: api-key
          resources:
            limits:
              memory: 512Mi
              cpu: 500m
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: weekly-access-review
spec:
  schedule: "0 9 * * 1"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: access-review
              image: iam-agents:latest
              command: ["python", "src/run_review.py"]
          restartPolicy: OnFailure
EOF

# 3. Monitoring Metrics
# ===================================
# iam_access_review_total{status="completed"}
# iam_findings_total{severity="high",type="excessive_permissions"}
# iam_remediation_total{action="auto",status="success"}
# iam_threat_alerts_total{type="brute_force",severity="critical"}
# iam_agent_execution_seconds{agent="review",task="least_privilege"}
# iam_users_reviewed_total
# iam_compliance_score{framework="pdpa"}

# 4. Alert Rules
# ===================================
# - alert: CriticalThreatDetected
#   expr: iam_threat_alerts_total{severity="critical"} > 0
#   for: 0m
#   labels: {severity: critical}
#
# - alert: ComplianceScoreLow  
#   expr: iam_compliance_score < 80
#   for: 1h
#   labels: {severity: warning}

echo "Production deployment configured"

FAQ คำถามที่พบบ่อย

Q: CrewAI กับ LangChain Agents ต่างกันอย่างไร?

A: LangChain Agents เป็น single agent ที่ใช้ tools ทำ tasks CrewAI เป็น multi-agent framework ที่ agents หลายตัวทำงานร่วมกัน มี role-based design แต่ละ agent มี specialization ของตัวเอง ข้อดีของ CrewAI agents สามารถ delegate งานให้กันได้, process แบบ sequential หรือ hierarchical, แต่ละ agent มี memory และ context ของตัวเอง เหมาะกับ complex workflows ที่ต้องการ multiple perspectives เช่น IAM ที่ต้องการทั้ง security analysis, compliance check และ remediation

Q: AI Agents สำหรับ IAM ปลอดภัยไหม?

A: ต้อง implement safeguards ที่เหมาะสม AI agents ไม่ควรมี direct access ในการเปลี่ยน permissions โดยไม่ผ่าน approval workflow ใช้ human-in-the-loop สำหรับ actions ที่ critical (เช่น revoke admin access), auto-remediation เฉพาะ low-risk actions (เช่น enforce MFA), audit log ทุก agent action, rate limiting ป้องกัน agent ทำ actions มากเกินไป, rollback mechanism สำหรับ undo actions ที่ผิดพลาด

Q: ต้องใช้ LLM อะไรกับ CrewAI?

A: CrewAI รองรับ LLMs หลายตัว GPT-4 เหมาะที่สุดสำหรับ complex reasoning tasks (IAM analysis, compliance), GPT-3.5 Turbo เพียงพอสำหรับ simple tasks (log parsing, report generation), Claude 3 ดีสำหรับ long context analysis, local LLMs (Ollama + Llama 3) สำหรับ data privacy concerns สำหรับ IAM ที่ต้อง analyze sensitive data แนะนำใช้ local LLMs หรือ enterprise API ที่มี data processing agreement

Q: ROI ของ AI-powered IAM เป็นอย่างไร?

A: ลด manual access review time 70-80% (จาก 2-3 วันต่อ quarterly review เหลือ 2-3 ชั่วโมง), ตรวจจับ threats ได้เร็วขึ้น (จากวันเหลือนาที), ลด human error ใน permission management, compliance audit preparation เร็วขึ้น 60% ค่าใช้จ่าย LLM API ประมาณ $50-200/เดือนสำหรับ weekly reviews (1000 users) เทียบกับ manual review ที่ใช้ FTE 0.5-1 คน ROI positive ภายใน 3-6 เดือน

📖 บทความที่เกี่ยวข้อง

CrewAI Multi-Agent Data Pipeline ETLอ่านบทความ → CrewAI Multi-Agent IoT Gatewayอ่านบทความ → CrewAI Multi-Agent 12 Factor Appอ่านบทความ → CrewAI Multi-Agent Message Queue Designอ่านบทความ → IS-IS Protocol Identity Access Managementอ่านบทความ →

📚 ดูบทความทั้งหมด →