SiamCafe · Blog
SQLite Litestream RBAC ABAC Policy —
บทความ

SQLite Litestream RBAC ABAC Policy —

เผยแพร่ 28 พฤษภาคม 2569

RBAC ABAC Policy

SQLite Litestream RBAC ABAC Policy Role-Based Attribute-Based Access Control Permission Policy Engine Backup S3 Zero Trust Enforcement Production Security

ModelBased OnFlexibilityComplexityScaleเหมาะกับ
RBACRolesปานกลางง่าย1K-10K usersองค์กร Role ชัดเจน
ABACAttributesสูงมากสูง10K+ usersZero Trust Complex
ReBACRelationshipsสูงสูง1M+ objectsSocial Graph Docs
ACLList per resourceต่ำง่าย100-1KFile System
PBACPoliciesสูงมากสูงมากEnterpriseCompliance Heavy

SQLite Schema

=== RBAC + ABAC Schema in SQLite ===

-- RBAC Tables

CREATE TABLE users (

id INTEGER PRIMARY KEY,

username TEXT UNIQUE NOT NULL,

email TEXT UNIQUE NOT NULL,

department TEXT,

is_active BOOLEAN DEFAULT 1,

created_at DATETIME DEFAULT CURRENT_TIMESTAMP

);

CREATE TABLE roles (

id INTEGER PRIMARY KEY,

name TEXT UNIQUE NOT NULL,

description TEXT,

priority INTEGER DEFAULT 0

);

CREATE TABLE permissions (

id INTEGER PRIMARY KEY,

resource TEXT NOT NULL,

action TEXT NOT NULL CHECK(action IN ('create','read','update','delete','admin')),

UNIQUE(resource, action)

);

CREATE TABLE user_roles (

user_id INTEGER REFERENCES users(id),

role_id INTEGER REFERENCES roles(id),

PRIMARY KEY (user_id, role_id)

);

CREATE TABLE role_permissions (

role_id INTEGER REFERENCES roles(id),

permission_id INTEGER REFERENCES permissions(id),

PRIMARY KEY (role_id, permission_id)

);

-- ABAC Tables

CREATE TABLE policies (

id INTEGER PRIMARY KEY,

name TEXT NOT NULL,

effect TEXT CHECK(effect IN ('allow', 'deny')),

priority INTEGER DEFAULT 0,

conditions JSON NOT NULL,

description TEXT,

is_active BOOLEAN DEFAULT 1

);

-- Indexes

CREATE INDEX idx_user_roles ON user_roles(user_id);

CREATE INDEX idx_role_perms ON role_permissions(role_id);

CREATE INDEX idx_policies_active ON policies(is_active, priority);

-- Seed Data

INSERT INTO roles (name, description) VALUES

('admin', 'Full access'),

('editor', 'Create and edit content'),

('viewer', 'Read-only access');

INSERT INTO permissions (resource, action) VALUES

('articles', 'create'), ('articles', 'read'),

('articles', 'update'), ('articles', 'delete'),

('users', 'read'), ('users', 'update'), ('users', 'admin');

from dataclasses import dataclass

@dataclass

class RBACRole:

role: str

permissions: str

users: int

description: str

roles = [

RBACRole("admin", "articles:* users:* settings:*", 3, "Full system access"),

RBACRole("editor", "articles:create, read, update users:read", 12, "Content management"),

RBACRole("viewer", "articles:read users:read", 45, "Read-only access"),

RBACRole("moderator", "articles:read, update comments:*", 8, "Content moderation"),

]

print("=== RBAC Roles ===")

for r in roles:

print(f" [{r.role}] Users: {r.users}")

print(f" Permissions: {r.permissions}")

print(f" Description: {r.description}")

Policy Engine

=== ABAC Policy Engine ===

import sqlite3

import json

from datetime import datetime

class PolicyEngine:

def __init__(self, db_path):

self.conn = sqlite3.connect(db_path)

self.conn.execute("PRAGMA journal_mode=WAL")

self._cache = {}

def check_rbac(self, user_id, resource, action):

query = """

SELECT COUNT(*) FROM user_roles ur

JOIN role_permissions rp ON ur.role_id = rp.role_id

JOIN permissions p ON rp.permission_id = p.id

WHERE ur.user_id = ? AND p.resource = ? AND p.action = ?

"""

result = self.conn.execute(query, (user_id, resource, action)).fetchone()

return result[0] > 0

def check_abac(self, context):

policies = self.conn.execute(

"SELECT * FROM policies WHERE is_active = 1 ORDER BY priority DESC"

).fetchall()

for policy in policies:

conditions = json.loads(policy[4])

if self._evaluate_conditions(conditions, context):

return policy[2] == 'allow' # effect

return False # Default deny

def _evaluate_conditions(self, conditions, context):

for key, expected in conditions.items():

actual = context.get(key)

if isinstance(expected, dict):

op = expected.get("op", "eq")

val = expected.get("value")

if op == "eq" and actual != val: return False

if op == "in" and actual not in val: return False

if op == "gte" and actual < val: return False

elif actual != expected:

return False

return True

def authorize(self, user_id, resource, action, context=None):

if not self.check_rbac(user_id, resource, action):

return False

if context:

return self.check_abac(context)

return True

@dataclass

class ABACPolicy:

name: str

effect: str

conditions: str

description: str

policies = [

ABACPolicy("business-hours", "allow", "time: 08:00-18:00, day: Mon-Fri", "เข้าถึงได้เฉพาะเวลาทำการ"),

ABACPolicy("internal-only", "allow", "ip: 10.0.0.0/8, 172.16.0.0/12", "เข้าถึงได้จาก Internal Network เท่านั้น"),

ABACPolicy("same-department", "allow", "user.dept == resource.dept", "แก้ไขได้เฉพาะ Resource ของ Department ตัวเอง"),

ABACPolicy("sensitivity-limit", "deny", "resource.sensitivity > user.clearance", "ไม่อนุญาตถ้า Sensitivity สูงกว่า Clearance"),

ABACPolicy("mfa-required", "deny", "resource.sensitivity >= high AND NOT user.mfa", "Resource สำคัญต้อง MFA"),

]

print("\n=== ABAC Policies ===")

for p in policies:

print(f" [{p.effect.upper()}] {p.name}")

print(f" Conditions: {p.conditions}")

print(f" Description: {p.description}")

Litestream Backup

# === Litestream Backup for Policy DB ===

# litestream.yml
# dbs:
#   - path: /data/policy.db
#     replicas:
#       - type: s3
#         bucket: policy-backup
#         path: policy.db
#         region: ap-southeast-1
#         retention: 720h
#         sync-interval: 1s
#
# Run: litestream replicate -config litestream.yml
# Restore: litestream restore -o /data/policy.db s3://policy-backup/policy.db

# Docker Compose
# services:
#   app:
#     image: my-app:latest
#     volumes: [policy-data:/data]
#   litestream:
#     image: litestream/litestream:latest
#     volumes:
#       - policy-data:/data
#       - ./litestream.yml:/etc/litestream.yml
#     command: replicate -config /etc/litestream.yml

@dataclass
class SecurityCheck:
    check: str
    status: str
    detail: str
    frequency: str

checks = [
    SecurityCheck("RBAC Roles Reviewed", "Pass", "All roles have least-privilege permissions", "Quarterly"),
    SecurityCheck("ABAC Policies Active", "Pass", "5 policies active, 0 disabled", "Weekly"),
    SecurityCheck("Orphan Permissions", "Pass", "No permissions without role assignment", "Monthly"),
    SecurityCheck("Inactive Users", "Warning", "3 users inactive > 90 days", "Monthly"),
    SecurityCheck("Litestream Backup", "Pass", "Last backup 2s ago, S3 healthy", "Real-time"),
    SecurityCheck("Policy DB Integrity", "Pass", "WAL checksum verified", "Daily"),
    SecurityCheck("Access Audit Log", "Pass", "All access logged, 30-day retention", "Real-time"),
    SecurityCheck("MFA Enforcement", "Pass", "All admin users have MFA", "Daily"),
]

print("Security Audit:")
for c in checks:
    print(f"  [{c.status}] {c.check}")
    print(f"    Detail: {c.detail} | Frequency: {c.frequency}")

best_practices = {
    "Least Privilege": "กำหนด Permission น้อยที่สุดที่จำเป็น",
    "Default Deny": "ไม่อนุญาตเป็น Default ต้อง Allow เท่านั้น",
    "Audit Log": "บันทึกทุก Access Decision สำหรับ Forensics",
    "Review": "Review Role Permission ทุก Quarter",
    "MFA": "บังคับ MFA สำหรับ Role ที่มี Privilege สูง",
    "Backup": "Litestream Backup Policy DB ทุกวินาที",
}

print(f"\n\nBest Practices:")
for k, v in best_practices.items():
    print(f"  [{k}]: {v}")

เคล็ดลับ

  • RBAC First: เริ่มจาก RBAC ง่ายก่อน เพิ่ม ABAC เมื่อต้องการ
  • Default Deny: Policy เป็น Deny เสมอ ต้อง Allow เท่านั้น
  • Litestream: Backup Policy DB ด้วย Litestream ทุกวินาที
  • Cache: Cache Policy ใน Memory ลด Query Load
  • Audit: บันทึกทุก Authorization Decision

RBAC คืออะไร

Role-Based Access Control สิทธิ์ตาม Role Admin Editor Viewer Permission กำหนดที่ Role ไม่ใช่ User จัดการง่าย องค์กร Role ชัดเจน