CrewAI คืออะไร
CrewAI เป็น open source framework สำหรับสร้าง multi-agent AI systems ที่ agents หลายตัวทำงานร่วมกันเป็น crew เพื่อทำ complex tasks ได้อัตโนมัติ แต่ละ agent มี role, goal และ backstory เฉพาะ ทำงานผ่าน tools ที่กำหนดและ communicate กันเพื่อบรรลุเป้าหมายรวม
Identity and Access Management (IAM) เป็น security framework ที่จัดการ digital identities และ control access ไปยัง resources ขององค์กร ครอบคลุม user provisioning, authentication, authorization, access review และ compliance
การรวม CrewAI กับ IAM ช่วยให้ automate งาน IAM ที่ซับซ้อนได้ เช่น automated access review ตรวจสอบ permissions ที่ไม่จำเป็นอัตโนมัติ, threat detection ตรวจจับ suspicious access patterns, compliance auditing ตรวจสอบว่า access policies ถูกต้องตาม regulations, user lifecycle management จัดการ onboarding/offboarding อัตโนมัติ
ติดตั้งและเริ่มต้นใช้งาน CrewAI
วิธีติดตั้ง CrewAI
# === Install CrewAI ===

# 1. Install CrewAI and companion packages
pip install crewai crewai-tools langchain-openai

# 2. Set API Key (the default CrewAI LLM backend is OpenAI)
export OPENAI_API_KEY="sk-your-api-key-here"

# 3. Basic CrewAI Example: two role-based agents working as one crew
python3 << 'PYEOF'
from crewai import Agent, Task, Crew, Process

# Define Agents
# Each agent has a role, goal and backstory; allow_delegation lets an
# agent hand work off to other members of the crew.
security_analyst = Agent(
    role="Security Analyst",
    goal="Analyze access patterns and identify security risks",
    backstory="""You are an experienced security analyst specializing
in identity and access management. You review user permissions
and detect anomalies in access patterns.""",
    verbose=True,
    allow_delegation=True,
)
compliance_officer = Agent(
    role="Compliance Officer",
    goal="Ensure IAM policies comply with regulations",
    backstory="""You are a compliance expert who ensures that
access management follows PDPA, ISO 27001, and SOC 2
requirements. You review policies and report gaps.""",
    verbose=True,
)

# Define Tasks
review_task = Task(
    description="""Review the current access permissions for
the finance department. Check for:
1. Excessive permissions (least privilege violations)
2. Orphaned accounts (ex-employees)
3. Shared accounts
4. Accounts without MFA""",
    expected_output="Detailed access review report with findings",
    agent=security_analyst,
)
compliance_task = Task(
    description="""Based on the security analyst's findings,
evaluate compliance with PDPA and ISO 27001.
Identify gaps and recommend remediation actions.""",
    expected_output="Compliance gap analysis with remediation plan",
    agent=compliance_officer,
)

# Create Crew (sequential process: review runs first, then compliance)
iam_crew = Crew(
    agents=[security_analyst, compliance_officer],
    tasks=[review_task, compliance_task],
    process=Process.sequential,
    verbose=True,
)
# Uncomment to actually execute the crew (makes LLM API calls):
# result = iam_crew.kickoff()
# print(result)
print("CrewAI IAM crew defined successfully")
PYEOF

# 4. CrewAI with Tools
pip install crewai-tools
python3 << 'PYEOF'
from crewai_tools import FileReadTool, SerperDevTool
# Custom tools for IAM
# file_tool = FileReadTool(file_path="access_logs.csv")
# search_tool = SerperDevTool()
print("CrewAI tools configured")
PYEOF
echo "CrewAI installed"
สร้าง Multi-Agent System สำหรับ IAM
ระบบ multi-agent สำหรับ identity management
#!/usr/bin/env python3
# iam_crew.py — Multi-Agent IAM System
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List
from dataclasses import dataclass, field
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("iam_crew")
@dataclass
class User:
    """One directory entry in the demo IAM data set."""
    id: str                 # unique user id, e.g. "u001"
    name: str               # display name
    department: str         # e.g. "finance", "it", "hr"
    role: str               # job role; drives the least-privilege baseline
    permissions: List[str]  # permission strings currently granted
    mfa_enabled: bool       # whether multi-factor auth is enabled
    last_login: str         # ISO date (YYYY-MM-DD) of last successful login
    created_at: str         # ISO date the account was created
    status: str = "active"  # "active" or "inactive"
@dataclass
class AccessReviewFinding:
    """A single issue produced by the automated access review."""
    user_id: str         # id of the affected User
    finding_type: str    # e.g. "excessive_permissions", "mfa_not_enabled"
    severity: str        # "high" | "medium" | "low"
    description: str     # human-readable explanation of the issue
    recommendation: str  # suggested remediation step
class IAMDataSource:
    """In-memory directory of sample users for the IAM review demo."""

    # Seed rows in User field order:
    # (id, name, department, role, permissions, mfa, last_login, created_at, status)
    _SEED = (
        ("u001", "สมชาย", "finance", "accountant",
         ["read_ledger", "write_ledger", "admin_panel", "delete_records"],
         True, "2025-01-14", "2023-01-15", "active"),
        ("u002", "สมหญิง", "finance", "manager",
         ["read_ledger", "write_ledger", "approve_payments"],
         True, "2025-01-15", "2022-06-01", "active"),
        ("u003", "วิชัย", "finance", "intern",
         ["read_ledger", "write_ledger", "admin_panel"],
         False, "2024-12-01", "2024-11-01", "active"),
        ("u004", "อรุณ", "it", "developer",
         ["read_code", "write_code", "deploy_prod", "admin_panel"],
         True, "2025-01-15", "2023-03-01", "active"),
        ("u005", "พิมพ์", "hr", "recruiter",
         ["read_hr_data", "write_hr_data", "finance_reports"],
         False, "2024-10-15", "2023-09-01", "inactive"),
    )

    def __init__(self):
        # Materialise User records from the seed table.
        self.users = [User(*row) for row in self._SEED]

    def get_users_by_department(self, dept):
        """Return all users whose department equals *dept*."""
        return list(filter(lambda u: u.department == dept, self.users))

    def get_all_users(self):
        """Return every user record."""
        return self.users
class AccessReviewAgent:
    """Agent that reviews access permissions.

    Each review_* method appends AccessReviewFinding records to
    self.findings; run_full_review() executes all checks and returns a
    JSON-serialisable summary.
    """

    def __init__(self, data_source: IAMDataSource):
        self.ds = data_source
        self.findings: List[AccessReviewFinding] = []

    def review_least_privilege(self):
        """Flag permissions beyond the expected baseline for each role."""
        # Baseline permission set per role (least-privilege policy).
        role_permissions = {
            "accountant": ["read_ledger", "write_ledger"],
            "manager": ["read_ledger", "write_ledger", "approve_payments"],
            "intern": ["read_ledger"],
            "developer": ["read_code", "write_code"],
            "recruiter": ["read_hr_data", "write_hr_data"],
        }
        for user in self.ds.get_all_users():
            expected = set(role_permissions.get(user.role, []))
            actual = set(user.permissions)
            # Sort so the report text is deterministic (set iteration
            # order varies between runs, which made reports non-reproducible).
            excessive = sorted(actual - expected)
            if excessive:
                self.findings.append(AccessReviewFinding(
                    user.id, "excessive_permissions", "high",
                    f"{user.name} ({user.role}) has excessive permissions: {excessive}",
                    f"Remove permissions: {', '.join(excessive)}"
                ))

    def review_inactive_accounts(self, inactive_days=60):
        """Flag active accounts with no login in the last *inactive_days* days."""
        cutoff = datetime.utcnow() - timedelta(days=inactive_days)
        for user in self.ds.get_all_users():
            last_login = datetime.fromisoformat(user.last_login)
            if last_login < cutoff and user.status == "active":
                self.findings.append(AccessReviewFinding(
                    user.id, "inactive_account", "medium",
                    f"{user.name} last login {user.last_login} ({(datetime.utcnow()-last_login).days} days ago)",
                    "Disable account or verify with manager"
                ))

    def review_mfa_compliance(self):
        """Flag active accounts that do not have MFA enabled."""
        for user in self.ds.get_all_users():
            if not user.mfa_enabled and user.status == "active":
                self.findings.append(AccessReviewFinding(
                    user.id, "mfa_not_enabled", "high",
                    f"{user.name} does not have MFA enabled",
                    "Enable MFA immediately"
                ))

    def run_full_review(self):
        """Run all checks and return a summary dict of the findings."""
        self.review_least_privilege()
        self.review_inactive_accounts()
        self.review_mfa_compliance()
        return {
            "review_date": datetime.utcnow().isoformat(),
            "total_users": len(self.ds.get_all_users()),
            "total_findings": len(self.findings),
            "by_severity": {
                "high": sum(1 for f in self.findings if f.severity == "high"),
                "medium": sum(1 for f in self.findings if f.severity == "medium"),
                "low": sum(1 for f in self.findings if f.severity == "low"),
            },
            "findings": [
                {"user": f.user_id, "type": f.finding_type,
                 "severity": f.severity, "description": f.description,
                 "recommendation": f.recommendation}
                for f in self.findings
            ],
        }
class ComplianceAgent:
    """Maps access-review output onto PDPA / ISO 27001 control checks."""

    def check_compliance(self, review_results):
        """Score each framework against the review summary dict."""
        # Collect the finding types once; membership tests replace
        # repeated scans of the findings list.
        finding_types = {f["type"] for f in review_results["findings"]}
        no_high_findings = review_results["by_severity"]["high"] == 0
        checks = {
            "pdpa": {
                "data_access_logging": True,
                "consent_management": True,
                "data_minimization": no_high_findings,
                "access_review_done": True,
            },
            "iso27001": {
                "mfa_enforced": "mfa_not_enabled" not in finding_types,
                "least_privilege": "excessive_permissions" not in finding_types,
                "inactive_accounts_reviewed": True,
                "access_logs_retained": True,
            },
        }
        summary = {}
        for framework, controls in checks.items():
            passed = sum(map(bool, controls.values()))
            total = len(controls)
            summary[framework] = {
                "passed": passed, "total": total,
                "compliance_pct": round(passed / total * 100, 1),
                "controls": controls,
            }
        return summary
# --- Demo run: review the sample directory, then score compliance ---
ds = IAMDataSource()
reviewer = AccessReviewAgent(ds)
review = reviewer.run_full_review()
# ensure_ascii=False keeps Thai user names readable in the output
print("Access Review:", json.dumps(review, indent=2, ensure_ascii=False))

# Feed the review summary straight into the compliance check.
compliance = ComplianceAgent()
comp_result = compliance.check_compliance(review)
print("\nCompliance:", json.dumps(comp_result, indent=2))
Automated Access Review ด้วย Agents
Agents สำหรับ automated access review
#!/usr/bin/env python3
# auto_access_review.py — Automated Access Review Pipeline
import json
import logging
from datetime import datetime
from typing import Dict, List
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("access_review")
class ProvisioningAgent:
    """Agent that handles user provisioning and deprovisioning."""

    def __init__(self):
        # Audit trail of every onboarding/offboarding action performed.
        self.actions_taken = []

    def onboard_user(self, user_info):
        """Grant role-based access for a new user and record the action."""
        role = user_info["role"]
        department = user_info["department"]
        # Role-based access assignment
        role_map = {
            "developer": {
                "groups": ["engineering", "github-devs"],
                "apps": ["jira", "github", "slack", "aws-dev"],
                "permissions": ["read_code", "write_code", "ci_cd"],
            },
            "analyst": {
                "groups": ["analytics", "data-readers"],
                "apps": ["jira", "slack", "bigquery", "looker"],
                "permissions": ["read_data", "create_reports"],
            },
            "manager": {
                "groups": [f"{department}-managers", department],
                "apps": ["jira", "slack", "hr-system", "budget-tool"],
                "permissions": ["approve_requests", "view_reports"],
            },
        }
        # Unknown roles fall back to minimal department-only access.
        fallback = {"groups": [department], "apps": ["slack"], "permissions": []}
        access = role_map.get(role, fallback)
        action = {
            "type": "onboard",
            "user": user_info["name"],
            "role": role,
            "assigned_groups": access["groups"],
            "assigned_apps": access["apps"],
            "assigned_permissions": access["permissions"],
            "mfa_enforced": True,
            "review_date": "90 days from now",
            "timestamp": datetime.utcnow().isoformat(),
        }
        self.actions_taken.append(action)
        logger.info(f"Onboarded: {user_info['name']} as {role}")
        return action

    def offboard_user(self, user_id, reason="termination"):
        """Record the standard offboarding checklist for *user_id*."""
        checklist = [
            "Disable SSO account",
            "Revoke all OAuth tokens",
            "Remove from all groups",
            "Disable email forwarding",
            "Transfer file ownership to manager",
            "Archive user data (retain 90 days)",
            "Log offboarding for audit",
        ]
        action = {
            "type": "offboard",
            "user_id": user_id,
            "reason": reason,
            "steps": checklist,
            "timestamp": datetime.utcnow().isoformat(),
        }
        self.actions_taken.append(action)
        logger.info(f"Offboarded: {user_id}")
        return action

    def periodic_review(self, users):
        """Quarterly access review"""
        review_results = []
        for record in users:
            recommendations = []
            # Check if permissions match role
            if record.get("excessive_permissions"):
                recommendations.append("Remove excessive permissions")
            # Check last login
            if record.get("days_since_login", 0) > 90:
                recommendations.append("Account appears inactive")
            review_results.append({
                "user": record.get("name"),
                "current_role": record.get("role"),
                "permissions_count": len(record.get("permissions", [])),
                "last_reviewed": record.get("last_reviewed", "never"),
                "action_needed": bool(recommendations),
                "recommendations": recommendations,
            })
        return {
            "review_type": "quarterly",
            "review_date": datetime.utcnow().isoformat(),
            "users_reviewed": len(review_results),
            "actions_needed": sum(1 for r in review_results if r["action_needed"]),
            "results": review_results,
        }
class RemediationAgent:
    """Agent that applies automatic remediation to access-review findings.

    Low-risk findings (MFA enforcement, inactivity notices) are remediated
    automatically; high-impact changes (permission removal) only get a
    ticket for manual review.
    """

    def __init__(self):
        # Cumulative history of every remediation decision, across calls.
        self.remediations = []

    def auto_remediate(self, findings):
        """Decide a remediation action for each finding and summarise.

        The summary covers only this batch. (Previously it was computed
        over the cumulative self.remediations list, so a second call
        double-counted earlier findings and replayed their actions.)
        The full history is still appended to self.remediations for audit.
        """
        batch = []
        for finding in findings:
            severity = finding.get("severity", "low")
            finding_type = finding.get("type", "")
            remediation = {
                "finding": finding,
                "auto_remediated": False,
                "action": "none",
            }
            if finding_type == "mfa_not_enabled":
                remediation["action"] = "enforce_mfa"
                remediation["auto_remediated"] = True
                remediation["details"] = "MFA enforcement email sent, 48h deadline"
            elif finding_type == "inactive_account" and severity != "high":
                remediation["action"] = "send_reactivation_notice"
                remediation["auto_remediated"] = True
                remediation["details"] = "Notification sent to user and manager"
            elif finding_type == "excessive_permissions":
                # High-impact change: never auto-apply, route to a human.
                remediation["action"] = "create_ticket"
                remediation["auto_remediated"] = False
                remediation["details"] = "Ticket created for manual review"
            batch.append(remediation)
        self.remediations.extend(batch)
        return {
            "total_findings": len(findings),
            "auto_remediated": sum(1 for r in batch if r["auto_remediated"]),
            "manual_review": sum(1 for r in batch if not r["auto_remediated"]),
            "actions": [{"action": r["action"], "details": r.get("details", "")}
                        for r in batch],
        }
# Run agents
# Demo: onboard one developer, offboard a leaver, then auto-remediate a
# small batch of review findings.
provisioner = ProvisioningAgent()
provisioner.onboard_user({"name": "นพดล", "role": "developer", "department": "engineering"})
provisioner.offboard_user("u005", "resignation")

remediator = RemediationAgent()
# Sample findings in the shape produced by the access-review agent.
findings = [
    {"type": "mfa_not_enabled", "severity": "high", "user": "u003"},
    {"type": "inactive_account", "severity": "medium", "user": "u005"},
    {"type": "excessive_permissions", "severity": "high", "user": "u001"},
]
result = remediator.auto_remediate(findings)
print(json.dumps(result, indent=2))
Threat Detection และ Response Agents
Agents สำหรับตรวจจับภัยคุกคาม
#!/usr/bin/env python3
# threat_detection.py — IAM Threat Detection Agents
import json
import logging
import random
from datetime import datetime, timedelta
from typing import Dict, List
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("threat")
class ThreatDetectionAgent:
    """Rule-based detector for common IAM attack patterns.

    Each detect_* method appends alert dicts to self.alerts;
    get_alert_summary() aggregates them by severity and type.
    """

    def __init__(self):
        self.alerts = []  # accumulated alert dicts

    def detect_brute_force(self, login_events):
        """Alert on source IPs with 5 or more failed login attempts."""
        ip_failures = {}
        for event in login_events:
            if event["status"] == "failed":
                ip = event["source_ip"]
                ip_failures[ip] = ip_failures.get(ip, 0) + 1
        for ip, count in ip_failures.items():
            if count >= 5:
                self.alerts.append({
                    "type": "brute_force",
                    "severity": "high",
                    "source_ip": ip,
                    "failed_attempts": count,
                    "action": "block_ip",
                    "timestamp": datetime.utcnow().isoformat(),
                })

    def detect_impossible_travel(self, login_events):
        """Alert when a user logs in from two countries too close in time."""
        user_logins = {}
        for event in login_events:
            if event["status"] == "success":
                user = event["user_id"]
                if user not in user_logins:
                    user_logins[user] = []
                user_logins[user].append(event)
        for user, logins in user_logins.items():
            for i in range(1, len(logins)):
                prev = logins[i-1]
                curr = logins[i]
                if prev["country"] != curr["country"]:
                    # Placeholder: a real implementation would derive this
                    # from the events' timestamps and geo distance.
                    time_diff_hours = 1  # simplified
                    if time_diff_hours < 4:
                        self.alerts.append({
                            "type": "impossible_travel",
                            "severity": "critical",
                            "user_id": user,
                            "from_country": prev["country"],
                            "to_country": curr["country"],
                            "time_diff_hours": time_diff_hours,
                            "action": "lock_account_and_notify",
                        })

    def detect_privilege_escalation(self, audit_events):
        """Alert on grants of admin-level permissions with no approver."""
        for event in audit_events:
            if event.get("action") == "grant_permission":
                if event.get("target_permission") in ["admin", "root", "superuser"]:
                    if event.get("approved_by") is None:
                        self.alerts.append({
                            "type": "privilege_escalation",
                            "severity": "critical",
                            "user_id": event["user_id"],
                            "permission": event["target_permission"],
                            "action": "revoke_and_investigate",
                        })

    def detect_unusual_access(self, access_events):
        """Alert on users touching an unusually broad set of resources."""
        user_patterns = {}
        for event in access_events:
            user = event["user_id"]
            if user not in user_patterns:
                user_patterns[user] = {"resources": set(), "hours": set()}
            user_patterns[user]["resources"].add(event.get("resource", ""))
            user_patterns[user]["hours"].add(event.get("hour", 0))
        for user, pattern in user_patterns.items():
            if len(pattern["resources"]) > 20:
                self.alerts.append({
                    "type": "unusual_resource_access",
                    "severity": "medium",
                    "user_id": user,
                    "resources_accessed": len(pattern["resources"]),
                    "action": "flag_for_review",
                })

    def get_alert_summary(self):
        """Summarise accumulated alerts by severity and type.

        Fix: "by_type" was previously always an empty dict; it now holds a
        real per-type alert count.
        """
        by_type = {}
        for alert in self.alerts:
            by_type[alert["type"]] = by_type.get(alert["type"], 0) + 1
        return {
            "total_alerts": len(self.alerts),
            "by_severity": {
                "critical": sum(1 for a in self.alerts if a["severity"] == "critical"),
                "high": sum(1 for a in self.alerts if a["severity"] == "high"),
                "medium": sum(1 for a in self.alerts if a["severity"] == "medium"),
            },
            "by_type": by_type,
            "alerts": self.alerts,
        }
# Simulate events
# u999 produces five failed logins from one IP -> brute-force alert;
# u001 logs in from TH then US -> impossible-travel alert;
# the audit event grants "admin" with no approver -> escalation alert.
login_events = [
    {"user_id": "u001", "status": "success", "source_ip": "10.0.0.1", "country": "TH"},
    {"user_id": "u001", "status": "success", "source_ip": "203.0.113.5", "country": "US"},
    {"user_id": "u999", "status": "failed", "source_ip": "192.168.1.100", "country": "CN"},
    {"user_id": "u999", "status": "failed", "source_ip": "192.168.1.100", "country": "CN"},
    {"user_id": "u999", "status": "failed", "source_ip": "192.168.1.100", "country": "CN"},
    {"user_id": "u999", "status": "failed", "source_ip": "192.168.1.100", "country": "CN"},
    {"user_id": "u999", "status": "failed", "source_ip": "192.168.1.100", "country": "CN"},
]
audit_events = [
    {"user_id": "u003", "action": "grant_permission", "target_permission": "admin", "approved_by": None},
]
detector = ThreatDetectionAgent()
detector.detect_brute_force(login_events)
detector.detect_impossible_travel(login_events)
detector.detect_privilege_escalation(audit_events)
print(json.dumps(detector.get_alert_summary(), indent=2))
Production Deployment และ Monitoring
Deploy และ monitor IAM agents
# === IAM Agent Production Deployment ===

# 1. Docker Deployment
# ===================================
cat > Dockerfile << 'EOF'
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ ./src/
COPY configs/ ./configs/
ENV PYTHONPATH=/app
CMD ["python", "src/main.py"]
EOF

cat > requirements.txt << 'EOF'
crewai>=0.30.0
crewai-tools>=0.4.0
langchain-openai>=0.1.0
fastapi>=0.110.0
uvicorn>=0.27.0
prometheus-client>=0.20.0
EOF

# Compose stack: the agent service plus Prometheus/Grafana monitoring.
cat > docker-compose.yml << 'EOF'
version: '3'
services:
  iam-agents:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=
      - LOG_LEVEL=INFO
      - REVIEW_SCHEDULE=0 0 * * 1
    volumes:
      - ./configs:/app/configs
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
EOF

# 2. Kubernetes Deployment
# ===================================
# Always-on agent Deployment plus a weekly access-review CronJob.
# The OpenAI key is pulled from a Secret, never baked into the image.
cat > k8s-deployment.yaml << 'EOF'
apiVersion: apps/v1
kind: Deployment
metadata:
  name: iam-agents
spec:
  replicas: 1
  selector:
    matchLabels:
      app: iam-agents
  template:
    metadata:
      labels:
        app: iam-agents
    spec:
      containers:
      - name: iam-agents
        image: iam-agents:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: openai-secret
              key: api-key
        resources:
          limits:
            memory: 512Mi
            cpu: 500m
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: weekly-access-review
spec:
  schedule: "0 9 * * 1"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: access-review
            image: iam-agents:latest
            command: ["python", "src/run_review.py"]
          restartPolicy: OnFailure
EOF

# 3. Monitoring Metrics
# ===================================
# Prometheus metrics exposed by the agent service:
# iam_access_review_total{status="completed"}
# iam_findings_total{severity="high",type="excessive_permissions"}
# iam_remediation_total{action="auto",status="success"}
# iam_threat_alerts_total{type="brute_force",severity="critical"}
# iam_agent_execution_seconds{agent="review",task="least_privilege"}
# iam_users_reviewed_total
# iam_compliance_score{framework="pdpa"}

# 4. Alert Rules
# ===================================
# - alert: CriticalThreatDetected
#   expr: iam_threat_alerts_total{severity="critical"} > 0
#   for: 0m
#   labels: {severity: critical}
#
# - alert: ComplianceScoreLow
#   expr: iam_compliance_score < 80
#   for: 1h
#   labels: {severity: warning}
echo "Production deployment configured"
FAQ คำถามที่พบบ่อย
Q: CrewAI กับ LangChain Agents ต่างกันอย่างไร?
A: LangChain Agents เป็น single agent ที่ใช้ tools ทำ tasks ส่วน CrewAI เป็น multi-agent framework ที่ agents หลายตัวทำงานร่วมกัน มี role-based design แต่ละ agent มี specialization ของตัวเอง ข้อดีของ CrewAI คือ agents สามารถ delegate งานให้กันได้, process แบบ sequential หรือ hierarchical, แต่ละ agent มี memory และ context ของตัวเอง เหมาะกับ complex workflows ที่ต้องการ multiple perspectives เช่น IAM ที่ต้องการทั้ง security analysis, compliance check และ remediation
Q: AI Agents สำหรับ IAM ปลอดภัยไหม?
A: ต้อง implement safeguards ที่เหมาะสม AI agents ไม่ควรมี direct access ในการเปลี่ยน permissions โดยไม่ผ่าน approval workflow ใช้ human-in-the-loop สำหรับ actions ที่ critical (เช่น revoke admin access), auto-remediation เฉพาะ low-risk actions (เช่น enforce MFA), audit log ทุก agent action, rate limiting ป้องกัน agent ทำ actions มากเกินไป, rollback mechanism สำหรับ undo actions ที่ผิดพลาด
Q: ต้องใช้ LLM อะไรกับ CrewAI?
A: CrewAI รองรับ LLMs หลายตัว GPT-4 เหมาะที่สุดสำหรับ complex reasoning tasks (IAM analysis, compliance), GPT-3.5 Turbo เพียงพอสำหรับ simple tasks (log parsing, report generation), Claude 3 ดีสำหรับ long context analysis, local LLMs (Ollama + Llama 3) สำหรับ data privacy concerns สำหรับ IAM ที่ต้อง analyze sensitive data แนะนำใช้ local LLMs หรือ enterprise API ที่มี data processing agreement
Q: ROI ของ AI-powered IAM เป็นอย่างไร?
A: ลด manual access review time 70-80% (จาก 2-3 วันต่อ quarterly review เหลือ 2-3 ชั่วโมง), ตรวจจับ threats ได้เร็วขึ้น (จากวันเหลือนาที), ลด human error ใน permission management, compliance audit preparation เร็วขึ้น 60% ค่าใช้จ่าย LLM API ประมาณ $50-200/เดือนสำหรับ weekly reviews (1000 users) เทียบกับ manual review ที่ใช้ FTE 0.5-1 คน ROI positive ภายใน 3-6 เดือน
