
LangChain Agent Micro-segmentation — AI Agent
LangChain Micro-segmentation

LangChain Agent Micro-segmentation AI Network Security Zero Trust Policy Automation ReAct GPT-4 Cilium Calico VMware NSX Lateral Movement Prevention
| Tool | Type | Layer | AI Integration | เหมาะกับ |
|---|---|---|---|---|
| Cilium | eBPF-based | L3-L7 | API | Kubernetes |
| Calico | Network Policy | L3-L4 | API | Kubernetes |
| VMware NSX | SDN | L2-L7 | API | VMware Env |
| Illumio | Host-based | L3-L7 | Built-in | Enterprise |
| Guardicore | Agent-based | L3-L7 | Built-in | Data Center |
LangChain Agent Setup
=== LangChain Security Agent ===
pip install langchain langchain-openai
from langchain.agents import AgentExecutor, create_react_agent
from langchain_openai import ChatOpenAI
from langchain.tools import Tool
from langchain.prompts import PromptTemplate
# Tools for Network Security
def analyze_traffic(query: str) -> str:
"""Analyze network traffic patterns"""
# Query traffic database
return f"Traffic analysis: {query}"
def create_policy(spec: str) -> str:
"""Create Kubernetes NetworkPolicy"""
# Generate and apply policy
return f"Policy created: {spec}"
def check_compliance(namespace: str) -> str:
"""Check security compliance"""
# Verify policies
return f"Compliance check: {namespace}"
def block_ip(ip: str) -> str:
"""Block suspicious IP address"""
# Add to blocklist
return f"Blocked: {ip}"
tools = [
Tool(name="TrafficAnalyzer", func=analyze_traffic,
description="Analyze network traffic patterns and anomalies"),
Tool(name="PolicyCreator", func=create_policy,
description="Create Kubernetes NetworkPolicy YAML"),
Tool(name="ComplianceChecker", func=check_compliance,
description="Check namespace security compliance"),
Tool(name="IPBlocker", func=block_ip,
description="Block suspicious IP address"),
]
llm = ChatOpenAI(model="gpt-4o", temperature=0)
agent = create_react_agent(llm, tools, prompt)
executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
# Example: Auto-analyze and create policy
result = executor.invoke({
"input": "Analyze traffic for namespace 'payment' and create "
"micro-segmentation policy allowing only required connections"
})
from dataclasses import dataclass
@dataclass
class AgentTool:
name: str
function: str
input_type: str
output: str
risk_level: str
agent_tools = [
AgentTool("TrafficAnalyzer", "วิเคราะห์ Traffic Pattern", "Namespace/Pod", "Traffic Report", "Low"),
AgentTool("PolicyCreator", "สร้าง NetworkPolicy", "Policy Spec", "YAML Applied", "Medium"),
AgentTool("ComplianceChecker", "ตรวจ Compliance", "Namespace", "Compliance Score", "Low"),
AgentTool("IPBlocker", "Block IP ที่น่าสงสัย", "IP Address", "Block Confirmed", "High"),
AgentTool("LogAnalyzer", "วิเคราะห์ Security Log", "Time Range", "Threat Report", "Low"),
AgentTool("VulnScanner", "Scan Vulnerabilities", "Target", "CVE List", "Medium"),
]
print("=== Agent Tools ===")
for t in agent_tools:
print(f" [{t.risk_level}] {t.name}")
print(f" Function: {t.function}")
print(f" Input: {t.input_type} | Output: {t.output}")
Kubernetes Network Policy
=== Micro-segmentation with Cilium ===
Kubernetes NetworkPolicy — Deny All Default
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: default-deny-all
namespace: payment
spec:
podSelector: {}
policyTypes:
- Ingress
- Egress
Allow specific traffic — Payment Service
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: allow-payment-api
namespace: payment
spec:
podSelector:
matchLabels:
app: payment-api
policyTypes:
- Ingress
- Egress
ingress:
- from:
- namespaceSelector:
matchLabels:

name: frontend
- podSelector:
matchLabels:
app: checkout
ports:
- protocol: TCP
port: 8080
egress:
- to:
- podSelector:
matchLabels:
app: payment-db
ports:
- protocol: TCP
port: 5432
- to:
- ipBlock:
cidr: 0.0.0.0/0
ports:
- protocol: TCP
port: 443 # Stripe API
Cilium L7 Policy — HTTP Method Filtering
apiVersion: cilium.io/v2
kind: CiliumNetworkPolicy
metadata:
name: payment-l7
spec:
endpointSelector:
matchLabels:
app: payment-api
ingress:
- fromEndpoints:
- matchLabels:
app: checkout
toPorts:
- ports:
- port: "8080"
rules:
http:
- method: POST
path: "/api/v1/charge"
- method: GET
path: "/api/v1/status/.*"
@dataclass
class SegmentPolicy:
namespace: str
service: str
allowed_ingress: str
allowed_egress: str
l7_filter: bool
status: str
policies = [
SegmentPolicy("payment", "payment-api", "checkout only", "payment-db + Stripe", True, "Enforced"),
SegmentPolicy("payment", "payment-db", "payment-api only", "None (isolated)", False, "Enforced"),
SegmentPolicy("frontend", "web-app", "ALB ingress", "api-gateway", False, "Enforced"),
SegmentPolicy("backend", "api-gateway", "web-app + mobile", "All backend services", True, "Enforced"),
SegmentPolicy("monitoring", "prometheus", "All namespaces (scrape)", "alertmanager", False, "Enforced"),
]
print("\n=== Segmentation Policies ===")
for p in policies:
l7 = "L7" if p.l7_filter else "L3/L4"
print(f" [{p.status}] {p.namespace}/{p.service}")
print(f" Ingress: {p.allowed_ingress}")
print(f" Egress: {p.allowed_egress} | Filter: {l7}")
AI-powered Security
# === AI Security Automation ===
# Agent Workflow:
# 1. Collect traffic data from Cilium Hubble
# 2. LLM analyzes patterns and anomalies
# 3. Generate recommended policies
# 4. Human approval (or auto-apply low-risk)
# 5. Monitor and adjust
# Traffic Analysis Prompt
# """
# Analyze the following network traffic for namespace 'payment':
# - Source: checkout-pod -> payment-api:8080 (500 req/min)
# - Source: unknown-pod -> payment-api:8080 (50 req/min)
# - Source: payment-api -> payment-db:5432 (480 req/min)
# - Source: payment-api -> external:443 (100 req/min)
#
# Identify:
# 1. Normal traffic patterns
# 2. Suspicious connections
# 3. Recommended NetworkPolicy
# """
@dataclass
class SecurityEvent:
time: str
source: str
destination: str
action: str
risk: str
ai_decision: str
events = [
SecurityEvent("14:23:15", "checkout", "payment-api:8080", "ALLOW", "Low", "Normal traffic"),
SecurityEvent("14:23:45", "unknown-pod", "payment-api:8080", "ALERT", "High", "Block + investigate"),
SecurityEvent("14:24:00", "payment-api", "payment-db:5432", "ALLOW", "Low", "Normal DB query"),
SecurityEvent("14:24:30", "payment-api", "external:443", "ALLOW", "Medium", "Stripe API call"),
SecurityEvent("14:25:00", "debug-pod", "payment-db:5432", "DENY", "Critical", "Unauthorized DB access"),
SecurityEvent("14:25:30", "scanner-pod", "payment-api:22", "DENY", "Critical", "Port scan detected"),
]
print("AI Security Events:")
for e in events:
print(f" [{e.risk}] {e.time} | {e.source} -> {e.destination}")
print(f" Action: {e.action} | AI: {e.ai_decision}")
security_metrics = {
"Policies Enforced": "28",
"Namespaces Protected": "8",
"Blocked Connections (24h)": "1,250",
"AI-generated Policies": "12",
"False Positive Rate": "2.1%",
"Mean Time to Detect": "15 seconds",
"Mean Time to Respond": "45 seconds",
"Compliance Score": "96%",
}
print(f"\n\nSecurity Dashboard:")
for k, v in security_metrics.items():
print(f" {k}: {v}")
เคล็ดลับ
- Default Deny: เริ่มจาก Deny All แล้วค่อย Allow
- L7: ใช้ Cilium L7 Policy กรอง HTTP Method Path
- AI: ใช้ LangChain Agent วิเคราะห์ Traffic แนะนำ Policy
- Human-in-loop: AI แนะนำ แต่คนอนุมัติ High-risk Policy
- Monitor: ใช้ Hubble ดู Traffic Real-time ทุก Namespace
LangChain Agent คืออะไร
AI Agent LLM ตัดสินใจ Tool ReAct คิด-ทำ-สังเกต API Database Search GPT-4 Claude Autonomous วิเคราะห์ Network สร้าง Policy อัตโนมัติ