SiamCafe.net Blog
Cybersecurity

SQLite Litestream RBAC ABAC Policy

sqlite litestream rbac abac policy
SQLite Litestream RBAC ABAC Policy | SiamCafe Blog
2025-08-03 · อ. บอม — SiamCafe.net · 11,891 คำ

RBAC ABAC Policy

SQLite Litestream RBAC ABAC Policy Role-Based Attribute-Based Access Control Permission Policy Engine Backup S3 Zero Trust Enforcement Production Security

Model | Based On | Flexibility | Complexity | Scale | เหมาะกับ
RBAC | Roles | ปานกลาง | ง่าย | 1K-10K users | องค์กร Role ชัดเจน
ABAC | Attributes | สูงมาก | สูง | 10K+ users | Zero Trust Complex
ReBAC | Relationships | สูง | สูง | 1M+ objects | Social Graph Docs
ACL | List per resource | ต่ำ | ง่าย | 100-1K | File System
PBAC | Policies | สูงมาก | สูงมาก | Enterprise | Compliance Heavy

SQLite Schema

# === RBAC + ABAC Schema in SQLite ===

# -- RBAC Tables
# CREATE TABLE users (
#   id INTEGER PRIMARY KEY,
#   username TEXT UNIQUE NOT NULL,
#   email TEXT UNIQUE NOT NULL,
#   department TEXT,
#   is_active BOOLEAN DEFAULT 1,
#   created_at DATETIME DEFAULT CURRENT_TIMESTAMP
# );
#
# CREATE TABLE roles (
#   id INTEGER PRIMARY KEY,
#   name TEXT UNIQUE NOT NULL,
#   description TEXT,
#   priority INTEGER DEFAULT 0
# );
#
# CREATE TABLE permissions (
#   id INTEGER PRIMARY KEY,
#   resource TEXT NOT NULL,
#   action TEXT NOT NULL CHECK(action IN ('create','read','update','delete','admin')),
#   UNIQUE(resource, action)
# );
#
# CREATE TABLE user_roles (
#   user_id INTEGER REFERENCES users(id),
#   role_id INTEGER REFERENCES roles(id),
#   PRIMARY KEY (user_id, role_id)
# );
#
# CREATE TABLE role_permissions (
#   role_id INTEGER REFERENCES roles(id),
#   permission_id INTEGER REFERENCES permissions(id),
#   PRIMARY KEY (role_id, permission_id)
# );
#
# -- ABAC Tables
# CREATE TABLE policies (
#   id INTEGER PRIMARY KEY,
#   name TEXT NOT NULL,
#   effect TEXT CHECK(effect IN ('allow', 'deny')),
#   priority INTEGER DEFAULT 0,
#   conditions JSON NOT NULL,
#   description TEXT,
#   is_active BOOLEAN DEFAULT 1
# );
#
# -- Indexes
# CREATE INDEX idx_user_roles ON user_roles(user_id);
# CREATE INDEX idx_role_perms ON role_permissions(role_id);
# CREATE INDEX idx_policies_active ON policies(is_active, priority);

# -- Seed Data
# INSERT INTO roles (name, description) VALUES
#   ('admin', 'Full access'),
#   ('editor', 'Create and edit content'),
#   ('viewer', 'Read-only access');
#
# INSERT INTO permissions (resource, action) VALUES
#   ('articles', 'create'), ('articles', 'read'),
#   ('articles', 'update'), ('articles', 'delete'),
#   ('users', 'read'), ('users', 'update'), ('users', 'admin');

from dataclasses import dataclass

@dataclass
class RBACRole:
    """Summary record for one RBAC role shown in the example report.

    A plain data holder: the generated ``__init__`` takes the four fields
    positionally in the order declared below and performs no validation.
    """

    role: str  # role name, printed in square brackets by the report
    permissions: str  # free-form permission summary text
    users: int  # count printed after "Users:" in the report
    description: str  # short human-readable description of the role

# Sample RBAC roles used only to render the report below.
roles = [
    RBACRole(
        role="admin",
        permissions="articles:* users:* settings:*",
        users=3,
        description="Full system access",
    ),
    RBACRole(
        role="editor",
        permissions="articles:create, read, update users:read",
        users=12,
        description="Content management",
    ),
    RBACRole(
        role="viewer",
        permissions="articles:read users:read",
        users=45,
        description="Read-only access",
    ),
    RBACRole(
        role="moderator",
        permissions="articles:read, update comments:*",
        users=8,
        description="Content moderation",
    ),
]

# Emit a three-line entry per role; sep="\n" keeps one line per field.
print("=== RBAC Roles ===")
for r in roles:
    print(
        f"  [{r.role}] Users: {r.users}",
        f"    Permissions: {r.permissions}",
        f"    Description: {r.description}",
        sep="\n",
    )

Policy Engine

# === ABAC Policy Engine ===

# import sqlite3
# import json
# from datetime import datetime
#
# class PolicyEngine:
#     def __init__(self, db_path):
#         self.conn = sqlite3.connect(db_path)
#         self.conn.execute("PRAGMA journal_mode=WAL")
#         self._cache = {}
#
#     def check_rbac(self, user_id, resource, action):
#         query = """
#         SELECT COUNT(*) FROM user_roles ur
#         JOIN role_permissions rp ON ur.role_id = rp.role_id
#         JOIN permissions p ON rp.permission_id = p.id
#         WHERE ur.user_id = ? AND p.resource = ? AND p.action = ?
#         """
#         result = self.conn.execute(query, (user_id, resource, action)).fetchone()
#         return result[0] > 0
#
#     def check_abac(self, context):
#         policies = self.conn.execute(
#             "SELECT * FROM policies WHERE is_active = 1 ORDER BY priority DESC"
#         ).fetchall()
#
#         for policy in policies:
#             conditions = json.loads(policy[4])
#             if self._evaluate_conditions(conditions, context):
#                 return policy[2] == 'allow'  # effect
#         return False  # Default deny
#
#     def _evaluate_conditions(self, conditions, context):
#         for key, expected in conditions.items():
#             actual = context.get(key)
#             if isinstance(expected, dict):
#                 op = expected.get("op", "eq")
#                 val = expected.get("value")
#                 if op == "eq" and actual != val: return False
#                 if op == "in" and actual not in val: return False
#                 if op == "gte" and actual < val: return False
#             elif actual != expected:
#                 return False
#         return True
#
#     def authorize(self, user_id, resource, action, context=None):
#         if not self.check_rbac(user_id, resource, action):
#             return False
#         if context:
#             return self.check_abac(context)
#         return True

@dataclass
class ABACPolicy:
    """Summary record for one ABAC policy shown in the example report.

    A plain data holder: the generated ``__init__`` takes the four fields
    positionally in the order declared below and performs no validation.
    """

    name: str  # policy identifier printed next to the effect
    effect: str  # upper-cased by the report; sample data uses "allow"/"deny"
    conditions: str  # free-form text describing when the policy applies
    description: str  # short human-readable explanation

# Sample ABAC policies used only to render the report below.
policies = [
    ABACPolicy(
        name="business-hours",
        effect="allow",
        conditions="time: 08:00-18:00, day: Mon-Fri",
        description="เข้าถึงได้เฉพาะเวลาทำการ",
    ),
    ABACPolicy(
        name="internal-only",
        effect="allow",
        conditions="ip: 10.0.0.0/8, 172.16.0.0/12",
        description="เข้าถึงได้จาก Internal Network เท่านั้น",
    ),
    ABACPolicy(
        name="same-department",
        effect="allow",
        conditions="user.dept == resource.dept",
        description="แก้ไขได้เฉพาะ Resource ของ Department ตัวเอง",
    ),
    ABACPolicy(
        name="sensitivity-limit",
        effect="deny",
        conditions="resource.sensitivity > user.clearance",
        description="ไม่อนุญาตถ้า Sensitivity สูงกว่า Clearance",
    ),
    ABACPolicy(
        name="mfa-required",
        effect="deny",
        conditions="resource.sensitivity >= high AND NOT user.mfa",
        description="Resource สำคัญต้อง MFA",
    ),
]

# Emit a three-line entry per policy, with the effect shown in upper case.
print("\n=== ABAC Policies ===")
for p in policies:
    print(
        f"  [{p.effect.upper()}] {p.name}",
        f"    Conditions: {p.conditions}",
        f"    Description: {p.description}",
        sep="\n",
    )

Litestream Backup

# === Litestream Backup for Policy DB ===

# litestream.yml
# dbs:
#   - path: /data/policy.db
#     replicas:
#       - type: s3
#         bucket: policy-backup
#         path: policy.db
#         region: ap-southeast-1
#         retention: 720h
#         sync-interval: 1s
#
# Run: litestream replicate -config litestream.yml
# Restore: litestream restore -o /data/policy.db s3://policy-backup/policy.db

# Docker Compose
# services:
#   app:
#     image: my-app:latest
#     volumes: [policy-data:/data]
#   litestream:
#     image: litestream/litestream:latest
#     volumes:
#       - policy-data:/data
#       - ./litestream.yml:/etc/litestream.yml
#     command: replicate -config /etc/litestream.yml

@dataclass
class SecurityCheck:
    """Summary record for one line item of the example security audit.

    A plain data holder: the generated ``__init__`` takes the four fields
    positionally in the order declared below and performs no validation.
    """

    check: str  # name of the audit item
    status: str  # printed in square brackets; sample data uses "Pass"/"Warning"
    detail: str  # free-form finding text
    frequency: str  # how often the check runs, as free-form text

# Sample audit results used only to render the report below.
checks = [
    SecurityCheck(
        check="RBAC Roles Reviewed",
        status="Pass",
        detail="All roles have least-privilege permissions",
        frequency="Quarterly",
    ),
    SecurityCheck(
        check="ABAC Policies Active",
        status="Pass",
        detail="5 policies active, 0 disabled",
        frequency="Weekly",
    ),
    SecurityCheck(
        check="Orphan Permissions",
        status="Pass",
        detail="No permissions without role assignment",
        frequency="Monthly",
    ),
    SecurityCheck(
        check="Inactive Users",
        status="Warning",
        detail="3 users inactive > 90 days",
        frequency="Monthly",
    ),
    SecurityCheck(
        check="Litestream Backup",
        status="Pass",
        detail="Last backup 2s ago, S3 healthy",
        frequency="Real-time",
    ),
    SecurityCheck(
        check="Policy DB Integrity",
        status="Pass",
        detail="WAL checksum verified",
        frequency="Daily",
    ),
    SecurityCheck(
        check="Access Audit Log",
        status="Pass",
        detail="All access logged, 30-day retention",
        frequency="Real-time",
    ),
    SecurityCheck(
        check="MFA Enforcement",
        status="Pass",
        detail="All admin users have MFA",
        frequency="Daily",
    ),
]

# Emit a two-line entry per check: status and name, then detail and cadence.
print("Security Audit:")
for c in checks:
    print(
        f"  [{c.status}] {c.check}",
        f"    Detail: {c.detail} | Frequency: {c.frequency}",
        sep="\n",
    )

# Practice name -> one-line recommendation (Thai); printed in insertion order.
best_practices = {
    "Least Privilege": "กำหนด Permission น้อยที่สุดที่จำเป็น",
    "Default Deny": "ไม่อนุญาตเป็น Default ต้อง Allow เท่านั้น",
    "Audit Log": "บันทึกทุก Access Decision สำหรับ Forensics",
    "Review": "Review Role Permission ทุก Quarter",
    "MFA": "บังคับ MFA สำหรับ Role ที่มี Privilege สูง",
    "Backup": "Litestream Backup Policy DB ทุกวินาที",
}

# Plain string literal: the header has no placeholders, so the original
# f-prefix was a no-op (flagged by linters as F541). Output is unchanged.
print("\n\nBest Practices:")
for k, v in best_practices.items():
    print(f"  [{k}]: {v}")

เคล็ดลับ

RBAC คืออะไร

Role-Based Access Control สิทธิ์ตาม Role Admin Editor Viewer Permission กำหนดที่ Role ไม่ใช่ User จัดการง่าย องค์กร Role ชัดเจน

ABAC คืออะไร

Attribute-Based Access Control สิทธิ์ตาม Attribute User Resource Environment Policy Engine ยืดหยุ่น Zero Trust Complex Fine-grained

ใช้ SQLite เก็บ Policy อย่างไร

Table Users Roles Permissions Policies SQL JOIN Query WAL Mode Cache Litestream Backup S3 Trigger Update Single Server Application เบา

RBAC กับ ABAC ใช้ร่วมกันอย่างไร

RBAC Base Role Admin Editor Viewer ABAC Fine-grained Department Time IP Sensitivity MFA Policy Engine ตรวจทั้ง Role Attribute

สรุป

SQLite Litestream RBAC ABAC Policy Role Attribute Access Control Permission Engine Backup S3 Zero Trust Audit Default Deny Production Security

📖 บทความที่เกี่ยวข้อง

Text Generation WebUI RBAC ABAC Policy — อ่านบทความ →
Passkeys WebAuthn RBAC ABAC Policy — อ่านบทความ →
SQLite Litestream Network Segmentation — อ่านบทความ →
SQLite Litestream Blue Green Canary Deploy — อ่านบทความ →
SQLite Litestream Docker Container Deploy — อ่านบทความ →

📚 ดูบทความทั้งหมด →