Unity Catalog and DevSecOps for the Data Platform
Databricks Unity Catalog is a unified governance solution for data and AI assets on the Databricks Lakehouse Platform. It provides data access control, data lineage, data discovery, and audit logging from a single place, shared across all workspaces.
DevSecOps for a data platform means building security practices into every step of the data engineering pipeline, from development and testing through deployment and monitoring, so that security is not an afterthought but is embedded in the pipeline from the start.
Key benefits of combining Unity Catalog with DevSecOps: centralized access control, managed from one place; data lineage tracking, so you can see where data comes from and where it goes; automated policy enforcement, instead of applying policies by hand; an audit trail recording who accessed which data; and secure CI/CD for data pipelines.
Getting Started with Unity Catalog
Set up Unity Catalog on Databricks:
# === Unity Catalog Setup ===
# 1. Create Metastore (Terraform)
cat > unity_catalog.tf << 'EOF'
terraform {
required_providers {
databricks = {
source = "databricks/databricks"
version = "~> 1.40"
}
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
}
}
# S3 Bucket for Metastore
resource "aws_s3_bucket" "metastore" {
bucket = "company-unity-catalog-metastore"
tags = {
Environment = "production"
ManagedBy = "terraform"
}
}
resource "aws_s3_bucket_versioning" "metastore" {
bucket = aws_s3_bucket.metastore.id
versioning_configuration {
status = "Enabled"
}
}
# IAM Role for Unity Catalog
resource "aws_iam_role" "unity_catalog" {
name = "unity-catalog-metastore-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
AWS = "arn:aws:iam::414351767826:role/unity-catalog-prod-UCMasterRole"
}
Condition = {
StringEquals = {
"sts:ExternalId" = var.databricks_account_id
}
}
}]
})
}
# Databricks Metastore
resource "databricks_metastore" "main" {
name = "company-metastore"
storage_root = "s3://${aws_s3_bucket.metastore.id}/unity-catalog"
owner = "data-platform-admins"
force_destroy = false
}
# Assign Metastore to Workspace
resource "databricks_metastore_assignment" "default" {
metastore_id = databricks_metastore.main.id
workspace_id = var.databricks_workspace_id
default_catalog_name = "main"
}
# Create Catalog
resource "databricks_catalog" "production" {
metastore_id = databricks_metastore.main.id
name = "production"
comment = "Production data catalog"
}
# Create Schema
resource "databricks_schema" "analytics" {
catalog_name = databricks_catalog.production.name
name = "analytics"
comment = "Analytics data schema"
}
# Grant Permissions
resource "databricks_grants" "production_catalog" {
catalog = databricks_catalog.production.name
grant {
principal = "data-engineers"
privileges = ["USE_CATALOG", "USE_SCHEMA", "SELECT", "MODIFY"]
}
grant {
principal = "data-analysts"
privileges = ["USE_CATALOG", "USE_SCHEMA", "SELECT"]
}
grant {
principal = "data-scientists"
privileges = ["USE_CATALOG", "USE_SCHEMA", "SELECT", "CREATE_TABLE"]
}
}
EOF
terraform init
terraform plan
terraform apply
echo "Unity Catalog setup complete"
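The grants above encode a least-privilege baseline. In CI it can be useful to assert that the grants actually in effect never exceed that baseline. A minimal Python sketch (ALLOWED and validate_grants are illustrative names; real grants would be exported from the Databricks API or Terraform state, not hardcoded):

```python
# Hypothetical CI check: compare the grants in effect against an allowed
# baseline mirroring the Terraform databricks_grants resource above.

ALLOWED = {
    "data-engineers": {"USE_CATALOG", "USE_SCHEMA", "SELECT", "MODIFY"},
    "data-analysts": {"USE_CATALOG", "USE_SCHEMA", "SELECT"},
    "data-scientists": {"USE_CATALOG", "USE_SCHEMA", "SELECT", "CREATE_TABLE"},
}

def validate_grants(actual: dict[str, set[str]]) -> list[str]:
    """Return violations: unknown principals or privileges beyond the baseline."""
    violations = []
    for principal, privileges in actual.items():
        allowed = ALLOWED.get(principal)
        if allowed is None:
            violations.append(f"unknown principal: {principal}")
        elif not privileges <= allowed:
            extra = ", ".join(sorted(privileges - allowed))
            violations.append(f"{principal} has excess privileges: {extra}")
    return violations

if __name__ == "__main__":
    # Example: an analyst group that somehow acquired MODIFY
    for v in validate_grants({"data-analysts": {"USE_CATALOG", "MODIFY"}}):
        print(f"VIOLATION: {v}")
```

A check like this can run right after `terraform apply` and fail the job on any violation.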
DevSecOps Pipeline for the Data Platform
Build a secure CI/CD pipeline for data engineering:
#!/usr/bin/env python3
# devsecops_pipeline.py - DevSecOps for Databricks
import json
import logging
from typing import Dict, List
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("devsecops")
class DataDevSecOpsPipeline:
def __init__(self):
self.stages = []
def pipeline_stages(self):
return {
"1_develop": {
"name": "Development",
"security_controls": [
"Pre-commit hooks for secrets scanning",
"SQL linting (sqlfluff) to catch SQL injection patterns",
"Databricks Asset Bundles (DABs) for version control",
"IDE integration with Unity Catalog permissions",
],
"tools": ["pre-commit", "sqlfluff", "databricks-cli", "gitleaks"],
},
"2_build": {
"name": "Build & Test",
"security_controls": [
"Unit tests for data transformations",
"Schema validation against data contracts",
"Dependency scanning (pip-audit, safety)",
"SAST scanning of PySpark code",
],
"tools": ["pytest", "great_expectations", "pip-audit", "bandit"],
},
"3_security_scan": {
"name": "Security Scanning",
"security_controls": [
"Secrets detection in notebooks and configs",
"IAM policy validation",
"Unity Catalog permission audit",
"Data classification check (PII detection)",
],
"tools": ["trufflehog", "checkov", "custom PII scanner"],
},
"4_deploy": {
"name": "Deployment",
"security_controls": [
"Terraform plan review for infrastructure changes",
"Unity Catalog grants validation",
"Rollback strategy",
"Blue-green deployment for jobs",
],
"tools": ["terraform", "databricks-cli", "GitHub Actions"],
},
"5_monitor": {
"name": "Runtime Monitoring",
"security_controls": [
"Audit log monitoring",
"Anomaly detection on data access patterns",
"Query performance monitoring",
"Cost monitoring and alerts",
],
"tools": ["Databricks SQL Alerts", "Prometheus", "Grafana"],
},
}
def github_actions_workflow(self):
return """
name: Data Pipeline DevSecOps
on:
pull_request:
branches: [main]
push:
branches: [main]
jobs:
security-scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Secrets Scan
uses: trufflesecurity/trufflehog@main
with:
path: ./
- name: Python Security (Bandit)
run: |
pip install bandit
bandit -r src/ -f json -o bandit-report.json || true
- name: SQL Lint
run: |
pip install sqlfluff
sqlfluff lint src/sql/ --dialect databricks
- name: Dependency Audit
run: |
pip install pip-audit
pip-audit -r requirements.txt
deploy:
needs: security-scan
if: github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Databricks CLI
  uses: databricks/setup-cli@main
- name: Deploy bundle
  env:
    DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
    DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
  run: databricks bundle deploy --target production
"""
pipeline = DataDevSecOpsPipeline()
stages = pipeline.pipeline_stages()
print("DevSecOps Pipeline Stages:")
for stage_id, stage in stages.items():
print(f"\n {stage['name']}:")
print(f" Tools: {', '.join(stage['tools'])}")
for control in stage["security_controls"][:2]:
print(f" - {control}")
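The five stages behave as sequential gates: a failure at any stage should stop the pipeline before deployment. A minimal sketch of that fail-fast control flow (run_gates and the lambda gates are illustrative, not part of any Databricks tooling):

```python
from typing import Callable

def run_gates(gates: list[tuple[str, Callable[[], bool]]]) -> tuple[bool, list[str]]:
    """Run named gate functions in order; stop at the first failure.

    Returns (overall_ok, names_of_stages_that_passed)."""
    passed = []
    for name, gate in gates:
        if not gate():
            return False, passed  # fail fast: later stages never run
        passed.append(name)
    return True, passed

ok, completed = run_gates([
    ("develop", lambda: True),
    ("build", lambda: True),
    ("security_scan", lambda: False),  # e.g. a secrets scanner found a hit
    ("deploy", lambda: True),          # never reached
])
print(f"pipeline ok={ok}, completed stages={completed}")
```

In a real pipeline each gate would wrap a tool invocation (pytest, trufflehog, terraform plan) and return its success status.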
Access Control and Data Governance
Manage permissions and data governance:
#!/usr/bin/env python3
# data_governance.py - Unity Catalog Data Governance
import json
import logging
from typing import Dict, List
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("governance")
class DataGovernance:
def __init__(self):
self.policies = {}
def access_control_model(self):
return {
"hierarchy": {
"metastore": "Top level - one per region",
"catalog": "Database grouping (production, staging, sandbox)",
"schema": "Logical grouping within catalog",
"table_view": "Individual data assets",
"column": "Column-level security (masking)",
},
"permission_types": {
"USE_CATALOG": "Access the catalog",
"USE_SCHEMA": "Access the schema",
"SELECT": "Read data",
"MODIFY": "Change data (INSERT, UPDATE, DELETE)",
"CREATE_TABLE": "Create new tables",
"CREATE_SCHEMA": "Create new schemas",
"ALL_PRIVILEGES": "All privileges",
},
"sql_examples": {
"grant_select": "GRANT SELECT ON SCHEMA production.analytics TO `data-analysts`;",
"grant_modify": "GRANT MODIFY ON TABLE production.analytics.orders TO `data-engineers`;",
"column_mask": """
CREATE FUNCTION mask_email(email STRING)
RETURNS STRING
RETURN CONCAT(LEFT(email, 2), '***@', SPLIT(email, '@')[1]);
ALTER TABLE production.customers
ALTER COLUMN email SET MASK mask_email;
""",
"row_filter": """
CREATE FUNCTION region_filter(region STRING)
RETURNS BOOLEAN
RETURN region = current_user_region();
ALTER TABLE production.sales
SET ROW FILTER region_filter ON (region);
""",
},
}
def data_classification(self):
return {
"levels": {
"public": {"label": "Public", "controls": "No restrictions"},
"internal": {"label": "Internal", "controls": "Employee access only"},
"confidential": {"label": "Confidential", "controls": "Need-to-know basis, encryption"},
"restricted": {"label": "Restricted (PII/PHI)", "controls": "Column masking, audit logging, encryption"},
},
"pii_columns": [
{"column": "email", "classification": "restricted", "mask": "mask_email"},
{"column": "phone", "classification": "restricted", "mask": "mask_phone"},
{"column": "national_id", "classification": "restricted", "mask": "mask_id"},
{"column": "name", "classification": "confidential", "mask": "mask_name"},
{"column": "address", "classification": "confidential", "mask": "mask_address"},
],
}
gov = DataGovernance()
model = gov.access_control_model()
print("Unity Catalog Hierarchy:")
for level, desc in model["hierarchy"].items():
print(f" {level}: {desc}")
classification = gov.data_classification()
print("\nData Classification:")
for level, info in classification["levels"].items():
print(f" {info['label']}: {info['controls']}")
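The mask_email SQL UDF above can be mirrored in plain Python, which makes the masking rule easy to unit-test before it is deployed as a Unity Catalog function (a sketch that assumes well-formed addresses containing a single '@'):

```python
def mask_email(email: str) -> str:
    """Python mirror of the SQL UDF:
    CONCAT(LEFT(email, 2), '***@', SPLIT(email, '@')[1])"""
    return email[:2] + "***@" + email.split("@")[1]

print(mask_email("john@example.com"))  # jo***@example.com
```

Keeping the two implementations in lockstep lets a pytest suite verify masking behavior on every commit, long before the SQL function touches production data.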
Automated Security Scanning
Automatically scan for security issues in data pipelines:
# === Automated Security Scanning ===
# 1. PII Detection Script
cat > pii_scanner.py << 'PYEOF'
#!/usr/bin/env python3
"""Scan Databricks tables for PII data"""
import re
import json
class PIIScanner:
def __init__(self):
self.patterns = {
"email": r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
"thai_phone": r'0[689]\d{8}',
"thai_id": r'\d{1}-\d{4}-\d{5}-\d{2}-\d{1}',
"credit_card": r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
"ip_address": r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b',
}
def scan_sample(self, data, column_name):
"""Scan sample data for PII patterns"""
findings = []
for pii_type, pattern in self.patterns.items():
for value in data:
if isinstance(value, str) and re.search(pattern, value):
findings.append({
"column": column_name,
"pii_type": pii_type,
"action": "Apply column masking",
})
break
return findings
scanner = PIIScanner()
sample = ["john@example.com", "0891234567", "normal text"]
findings = scanner.scan_sample(sample, "contact_info")
for f in findings:
print(f" PII Found: {f['column']} contains {f['pii_type']}")
PYEOF
# 2. IaC Security Scan (Checkov)
pip install checkov
# Scan Terraform files
checkov -d terraform/ --framework terraform --output json > checkov-report.json
# Scan Databricks specific
checkov -d . --check CKV_DATABRICKS_1,CKV_DATABRICKS_2
# 3. Notebook Security Scan
cat > notebook_scanner.sh << 'BASH'
#!/bin/bash
# Scan Databricks notebooks for security issues
echo "=== Notebook Security Scan ==="
# Check for hardcoded secrets
echo "Checking for hardcoded secrets..."
grep -rn "password\s*=\s*['\"]" notebooks/ || echo " No hardcoded passwords found"
grep -rn "api_key\s*=\s*['\"]" notebooks/ || echo " No hardcoded API keys found"
grep -rn "secret\s*=\s*['\"]" notebooks/ || echo " No hardcoded secrets found"
# Check for unsafe SQL patterns
echo "Checking for SQL injection patterns..."
grep -rn "f\".*{.*}.*\"" notebooks/ --include="*.py" | grep -i "sql\|query\|select\|insert" || echo " No f-string SQL found"
# Check for broad permissions
echo "Checking for overly broad permissions..."
grep -rn "ALL_PRIVILEGES\|GRANT.*TO.*PUBLIC" notebooks/ || echo " No broad permissions found"
echo "Scan complete"
BASH
chmod +x notebook_scanner.sh
echo "Security scanning configured"
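To turn the PII scan into a hard gate rather than a report, the scan can return a non-zero exit code whenever findings exist, which fails the CI job. A minimal sketch (scan_and_gate is an illustrative wrapper around the same regex approach; the zero-findings threshold is a policy choice):

```python
import re

# Subset of the PIIScanner patterns above, reused as a CI gate
PII_PATTERNS = {
    "email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
    "credit_card": r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
}

def scan_and_gate(samples: dict[str, list[str]]) -> int:
    """Return a process exit code: 0 if no PII found in any column sample, 1 otherwise."""
    findings = 0
    for column, values in samples.items():
        for pii_type, pattern in PII_PATTERNS.items():
            if any(isinstance(v, str) and re.search(pattern, v) for v in values):
                print(f"PII: column '{column}' matches {pii_type}")
                findings += 1
    return 1 if findings else 0

code = scan_and_gate({"notes": ["call me", "jane@example.com"]})
print(f"scan exit code: {code}")  # in CI, pass this to sys.exit() to fail the job
```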
Monitoring and Auditing
Monitor data access and security events:
#!/usr/bin/env python3
# audit_monitoring.py - Unity Catalog Audit Monitoring
import json
import logging
from typing import Dict, List
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("audit")
class AuditMonitor:
def __init__(self):
self.alerts = []
def audit_queries(self):
"""SQL queries for Unity Catalog audit"""
return {
"recent_access": """
-- Recent data access events
SELECT
event_time,
user_identity.email as user_email,
action_name,
request_params.full_name_arg as table_name,
response.status_code
FROM system.access.audit
WHERE event_date >= current_date() - 7
AND action_name IN ('getTable', 'commandSubmit')
ORDER BY event_time DESC
LIMIT 100;
""",
"permission_changes": """
-- Permission changes audit
SELECT
event_time,
user_identity.email as changed_by,
action_name,
request_params.securable_full_name as resource,
request_params.changes as permission_changes
FROM system.access.audit
WHERE action_name LIKE '%Grant%'
AND event_date >= current_date() - 30
ORDER BY event_time DESC;
""",
"failed_access": """
-- Failed access attempts
SELECT
event_time,
user_identity.email as user_email,
action_name,
request_params.full_name_arg as resource,
response.error_message
FROM system.access.audit
WHERE response.status_code != 200
AND event_date >= current_date() - 7
ORDER BY event_time DESC;
""",
"data_lineage": """
-- Data lineage tracking
SELECT
source_table_full_name,
target_table_full_name,
entity_type,
created_at
FROM system.access.table_lineage
WHERE target_table_full_name LIKE 'production.%'
ORDER BY created_at DESC
LIMIT 50;
""",
}
def dashboard(self):
return {
"access_summary_7d": {
"total_queries": 15420,
"unique_users": 45,
"tables_accessed": 230,
"failed_attempts": 12,
"permission_changes": 5,
},
"top_accessed_tables": [
{"table": "production.analytics.orders", "access_count": 2500},
{"table": "production.analytics.customers", "access_count": 1800},
{"table": "production.analytics.products", "access_count": 1200},
],
"security_alerts": [
{"type": "Unusual access pattern", "user": "user@company.com", "detail": "Accessed 50+ tables in 1 hour"},
{"type": "Failed access", "user": "new-user@company.com", "detail": "5 failed attempts to restricted table"},
],
"compliance": {
"pii_tables_masked": "95% (19/20)",
"audit_logging": "enabled",
"encryption_at_rest": "enabled",
"encryption_in_transit": "TLS 1.3",
},
}
monitor = AuditMonitor()
dash = monitor.dashboard()
print(f"Access Summary (7d):")
print(f" Queries: {dash['access_summary_7d']['total_queries']:,}")
print(f" Users: {dash['access_summary_7d']['unique_users']}")
print(f" Failed: {dash['access_summary_7d']['failed_attempts']}")
print(f"\nCompliance:")
for key, val in dash["compliance"].items():
print(f" {key}: {val}")
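The "Unusual access pattern" alert in the dashboard above ("Accessed 50+ tables in 1 hour") can be expressed as a simple rule over audit events. A sketch, assuming events have already been bucketed by hour (the event shape and flag_bulk_access are illustrative, not the system.access.audit schema):

```python
from collections import defaultdict

def flag_bulk_access(events: list[dict], max_tables: int = 50) -> list[str]:
    """Flag users who access more than max_tables distinct tables
    within any single hour bucket."""
    seen: dict[tuple, set] = defaultdict(set)  # (user, hour) -> distinct tables
    for e in events:
        seen[(e["user"], e["hour"])].add(e["table"])
    return sorted({user for (user, _), tables in seen.items()
                   if len(tables) > max_tables})

# 60 distinct tables in one hour trips the 50-table threshold
events = [{"user": "u@company.com", "table": f"t{i}", "hour": 9} for i in range(60)]
print(flag_bulk_access(events))  # ['u@company.com']
```

In production the events would come from the failed_access / recent_access queries above, and flagged users would feed a Databricks SQL Alert.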
FAQ: Frequently Asked Questions
Q: How is Unity Catalog different from the Hive Metastore?
A: The Hive Metastore is the legacy metastore in Databricks: each workspace has its own metastore, permissions are managed at the workspace level, and there is no data lineage or column-level security. Unity Catalog is a unified metastore shared across workspaces, with permissions managed centrally in one place, plus data lineage tracking, column masking and row filters, and audit logging. Migrating from Hive to Unity Catalog is recommended for all production workloads.
Q: How does DevSecOps for a data platform differ from regular DevSecOps?
A: DevSecOps for a data platform adds several data-specific concerns: data privacy, driven by PII/PHI compliance requirements (PDPA, GDPR); fine-grained data access control (column-level, row-level); data lineage, to know where data comes from and where it goes; data quality, which matters as much as security (garbage in, garbage out); notebook security, since notebooks must not contain hardcoded secrets; and cost control, because ungoverned data pipelines can consume enormous resources. Typical tools include Great Expectations (data quality), PII scanners, Terraform for managing Unity Catalog, and Databricks Asset Bundles.
Q: How does column masking work in Unity Catalog?
A: Column masking transforms values at query time, when a user runs a query. You create a masking function (a UDF) such as mask_email, which turns john@example.com into jo***@example.com, and attach it to a column with ALTER TABLE ... SET MASK. A data analyst querying the table sees only masked values, while a data engineer with the right privileges sees the real ones. Because the masking happens at the query engine level, no application code changes are needed, and it can mask dynamically based on the user's group or attributes.
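The group-dependent behavior described here (analysts see masked values, engineers see the raw ones) can be modeled in Python for intuition. In Unity Catalog itself this logic runs inside the query engine, typically via is_account_group_member() in the masking UDF; the sketch below is only a conceptual model:

```python
def mask_email(email: str) -> str:
    # Same rule as the mask_email UDF: keep 2 chars, mask the rest of the local part
    return email[:2] + "***@" + email.split("@")[1]

def apply_column_policy(value: str, user_groups: set[str]) -> str:
    """Conceptual model of a group-aware column mask: privileged groups see raw values."""
    if "data-engineers" in user_groups:
        return value
    return mask_email(value)

print(apply_column_policy("john@example.com", {"data-analysts"}))   # jo***@example.com
print(apply_column_policy("john@example.com", {"data-engineers"}))  # john@example.com
```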
Q: How much does Unity Catalog cost?
A: Unity Catalog is included with the Databricks platform at no extra charge for its core features (access control, grants, metastore). What you do pay for is the storage behind the metastore (S3/ADLS), the compute used to query audit logs, and some advanced features that require the Premium tier (for example, attribute-based access control). The main costs remain compute (DBU) and storage, as usual. The ROI of Unity Catalog comes from reduced data-breach risk, faster compliance audits, and less time spent managing permissions thanks to centralization.
