SQLite Litestream RBAC ABAC Policy —
RBAC ABAC Policy
SQLite Litestream RBAC ABAC Policy Role-Based Attribute-Based Access Control Permission Policy Engine Backup S3 Zero Trust Enforcement Production Security
| Model | Based On | Flexibility | Complexity | Scale | เหมาะกับ |
|---|---|---|---|---|---|
| RBAC | Roles | ปานกลาง | ง่าย | 1K-10K users | องค์กร Role ชัดเจน |
| ABAC | Attributes | สูงมาก | สูง | 10K+ users | Zero Trust Complex |
| ReBAC | Relationships | สูง | สูง | 1M+ objects | Social Graph Docs |
| ACL | List per resource | ต่ำ | ง่าย | 100-1K | File System |
| PBAC | Policies | สูงมาก | สูงมาก | Enterprise | Compliance Heavy |
SQLite Schema
=== RBAC + ABAC Schema in SQLite ===
-- RBAC Tables
CREATE TABLE users (
id INTEGER PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
department TEXT,
is_active BOOLEAN DEFAULT 1,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE roles (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
description TEXT,
priority INTEGER DEFAULT 0
);
CREATE TABLE permissions (
id INTEGER PRIMARY KEY,
resource TEXT NOT NULL,
action TEXT NOT NULL CHECK(action IN ('create','read','update','delete','admin')),
UNIQUE(resource, action)
);
CREATE TABLE user_roles (
user_id INTEGER REFERENCES users(id),
role_id INTEGER REFERENCES roles(id),
PRIMARY KEY (user_id, role_id)
);
CREATE TABLE role_permissions (
role_id INTEGER REFERENCES roles(id),
permission_id INTEGER REFERENCES permissions(id),
PRIMARY KEY (role_id, permission_id)
);
-- ABAC Tables
CREATE TABLE policies (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
effect TEXT CHECK(effect IN ('allow', 'deny')),
priority INTEGER DEFAULT 0,
conditions JSON NOT NULL,
description TEXT,
is_active BOOLEAN DEFAULT 1
);
-- Indexes
CREATE INDEX idx_user_roles ON user_roles(user_id);
CREATE INDEX idx_role_perms ON role_permissions(role_id);
CREATE INDEX idx_policies_active ON policies(is_active, priority);
-- Seed Data
INSERT INTO roles (name, description) VALUES
('admin', 'Full access'),
('editor', 'Create and edit content'),
('viewer', 'Read-only access');
INSERT INTO permissions (resource, action) VALUES
('articles', 'create'), ('articles', 'read'),
('articles', 'update'), ('articles', 'delete'),
('users', 'read'), ('users', 'update'), ('users', 'admin');
from dataclasses import dataclass
@dataclass
class RBACRole:
role: str
permissions: str
users: int
description: str
roles = [
RBACRole("admin", "articles:* users:* settings:*", 3, "Full system access"),
RBACRole("editor", "articles:create, read, update users:read", 12, "Content management"),
RBACRole("viewer", "articles:read users:read", 45, "Read-only access"),
RBACRole("moderator", "articles:read, update comments:*", 8, "Content moderation"),
]
print("=== RBAC Roles ===")
for r in roles:
print(f" [{r.role}] Users: {r.users}")
print(f" Permissions: {r.permissions}")
print(f" Description: {r.description}")
Policy Engine
=== ABAC Policy Engine ===
import sqlite3
import json
from datetime import datetime
class PolicyEngine:
def __init__(self, db_path):
self.conn = sqlite3.connect(db_path)
self.conn.execute("PRAGMA journal_mode=WAL")
self._cache = {}
def check_rbac(self, user_id, resource, action):
query = """
SELECT COUNT(*) FROM user_roles ur
JOIN role_permissions rp ON ur.role_id = rp.role_id
JOIN permissions p ON rp.permission_id = p.id
WHERE ur.user_id = ? AND p.resource = ? AND p.action = ?
"""
result = self.conn.execute(query, (user_id, resource, action)).fetchone()
return result[0] > 0
def check_abac(self, context):
policies = self.conn.execute(
"SELECT * FROM policies WHERE is_active = 1 ORDER BY priority DESC"
).fetchall()
for policy in policies:
conditions = json.loads(policy[4])
if self._evaluate_conditions(conditions, context):
return policy[2] == 'allow' # effect
return False # Default deny
def _evaluate_conditions(self, conditions, context):
for key, expected in conditions.items():
actual = context.get(key)
if isinstance(expected, dict):
op = expected.get("op", "eq")
val = expected.get("value")
if op == "eq" and actual != val: return False
if op == "in" and actual not in val: return False
if op == "gte" and actual < val: return False
elif actual != expected:
return False
return True
def authorize(self, user_id, resource, action, context=None):
if not self.check_rbac(user_id, resource, action):
return False
if context:
return self.check_abac(context)
return True
@dataclass
class ABACPolicy:
name: str
effect: str
conditions: str
description: str
policies = [
ABACPolicy("business-hours", "allow", "time: 08:00-18:00, day: Mon-Fri", "เข้าถึงได้เฉพาะเวลาทำการ"),
ABACPolicy("internal-only", "allow", "ip: 10.0.0.0/8, 172.16.0.0/12", "เข้าถึงได้จาก Internal Network เท่านั้น"),
ABACPolicy("same-department", "allow", "user.dept == resource.dept", "แก้ไขได้เฉพาะ Resource ของ Department ตัวเอง"),
ABACPolicy("sensitivity-limit", "deny", "resource.sensitivity > user.clearance", "ไม่อนุญาตถ้า Sensitivity สูงกว่า Clearance"),
ABACPolicy("mfa-required", "deny", "resource.sensitivity >= high AND NOT user.mfa", "Resource สำคัญต้อง MFA"),
]
print("\n=== ABAC Policies ===")
for p in policies:
print(f" [{p.effect.upper()}] {p.name}")
print(f" Conditions: {p.conditions}")
print(f" Description: {p.description}")
Litestream Backup
# === Litestream Backup for Policy DB ===
# litestream.yml
# dbs:
# - path: /data/policy.db
# replicas:
# - type: s3
# bucket: policy-backup
# path: policy.db
# region: ap-southeast-1
# retention: 720h
# sync-interval: 1s
#
# Run: litestream replicate -config litestream.yml
# Restore: litestream restore -o /data/policy.db s3://policy-backup/policy.db
# Docker Compose
# services:
# app:
# image: my-app:latest
# volumes: [policy-data:/data]
# litestream:
# image: litestream/litestream:latest
# volumes:
# - policy-data:/data
# - ./litestream.yml:/etc/litestream.yml
# command: replicate -config /etc/litestream.yml
@dataclass
class SecurityCheck:
check: str
status: str
detail: str
frequency: str
checks = [
SecurityCheck("RBAC Roles Reviewed", "Pass", "All roles have least-privilege permissions", "Quarterly"),
SecurityCheck("ABAC Policies Active", "Pass", "5 policies active, 0 disabled", "Weekly"),
SecurityCheck("Orphan Permissions", "Pass", "No permissions without role assignment", "Monthly"),
SecurityCheck("Inactive Users", "Warning", "3 users inactive > 90 days", "Monthly"),
SecurityCheck("Litestream Backup", "Pass", "Last backup 2s ago, S3 healthy", "Real-time"),
SecurityCheck("Policy DB Integrity", "Pass", "WAL checksum verified", "Daily"),
SecurityCheck("Access Audit Log", "Pass", "All access logged, 30-day retention", "Real-time"),
SecurityCheck("MFA Enforcement", "Pass", "All admin users have MFA", "Daily"),
]
print("Security Audit:")
for c in checks:
print(f" [{c.status}] {c.check}")
print(f" Detail: {c.detail} | Frequency: {c.frequency}")
best_practices = {
"Least Privilege": "กำหนด Permission น้อยที่สุดที่จำเป็น",
"Default Deny": "ไม่อนุญาตเป็น Default ต้อง Allow เท่านั้น",
"Audit Log": "บันทึกทุก Access Decision สำหรับ Forensics",
"Review": "Review Role Permission ทุก Quarter",
"MFA": "บังคับ MFA สำหรับ Role ที่มี Privilege สูง",
"Backup": "Litestream Backup Policy DB ทุกวินาที",
}
print(f"\n\nBest Practices:")
for k, v in best_practices.items():
print(f" [{k}]: {v}")
เคล็ดลับ
- RBAC First: เริ่มจาก RBAC ง่ายก่อน เพิ่ม ABAC เมื่อต้องการ
- Default Deny: Policy เป็น Deny เสมอ ต้อง Allow เท่านั้น
- Litestream: Backup Policy DB ด้วย Litestream ทุกวินาที
- Cache: Cache Policy ใน Memory ลด Query Load
- Audit: บันทึกทุก Authorization Decision
RBAC คืออะไร
Role-Based Access Control สิทธิ์ตาม Role Admin Editor Viewer Permission กำหนดที่ Role ไม่ใช่ User จัดการง่าย องค์กร Role ชัดเจน