NFS v4 Kerberos Data Pipeline ETL คืออะไร
NFS v4 (Network File System version 4) เป็น distributed file system protocol สำหรับแชร์ไฟล์ผ่าน network รองรับ strong authentication ผ่าน Kerberos, ACLs, file delegation และ compound operations Kerberos เป็น network authentication protocol ที่ใช้ tickets สำหรับ mutual authentication ระหว่าง client และ server โดยไม่ต้องส่ง password ผ่าน network Data Pipeline ETL (Extract, Transform, Load) คือกระบวนการดึงข้อมูลจากแหล่งต่างๆ แปลงรูปแบบ แล้วโหลดเข้า data warehouse การรวม NFS v4 + Kerberos กับ ETL pipeline ช่วยสร้างระบบ data processing ที่ปลอดภัย authenticated และ scalable
NFS v4 + Kerberos Architecture
# nfs_kerberos.py — NFS v4 Kerberos architecture
import json
class NFSKerberosArch:
    """Describes the moving parts of an NFS v4 + Kerberos ETL deployment."""

    # Architecture components, keyed by a short identifier.
    COMPONENTS = {
        "kdc": {
            "name": "KDC (Key Distribution Center)",
            "description": "Kerberos server — ออก tickets สำหรับ authentication",
            "sub_components": ["Authentication Server (AS)", "Ticket Granting Server (TGS)"],
        },
        "nfs_server": {
            "name": "NFS v4 Server",
            "description": "แชร์ไฟล์ผ่าน network — ใช้ Kerberos authentication",
            "config": "/etc/exports, /etc/krb5.keytab",
        },
        "nfs_client": {
            "name": "NFS v4 Client",
            "description": "Mount NFS shares + authenticate ด้วย Kerberos ticket",
            "config": "/etc/fstab, /etc/krb5.conf",
        },
        "etl_engine": {
            "name": "ETL Engine",
            "description": "อ่านข้อมูลจาก NFS mount → Transform → Load เข้า data warehouse",
            "tools": ["Apache Spark", "Apache Airflow", "Python pandas", "dbt"],
        },
    }

    # RPCSEC_GSS security flavours available to NFS v4 mounts.
    SECURITY_MODES = {
        "krb5": "Authentication only — ตรวจสอบตัวตนอย่างเดียว",
        "krb5i": "Authentication + Integrity — ป้องกัน data tampering",
        "krb5p": "Authentication + Integrity + Privacy — เข้ารหัส data ทั้งหมด",
    }

    def show_architecture(self):
        """Print the name and description of every component."""
        print("=== NFS v4 + Kerberos Architecture ===\n")
        for component in self.COMPONENTS.values():
            print(f"[{component['name']}]")
            print(f" {component['description']}")
            print()

    def show_security(self):
        """Print each security flavour with its one-line summary."""
        print("=== Security Modes ===")
        for flavour, summary in self.SECURITY_MODES.items():
            print(f" [{flavour}] {summary}")
# Demo: print the architecture overview followed by the security modes.
architecture = NFSKerberosArch()
architecture.show_architecture()
architecture.show_security()
NFS v4 Kerberos Setup
# nfs_setup.py — NFS v4 Kerberos setup guide
import json
class NFSSetup:
    """Shell-command setup guides for an NFS v4 + Kerberos deployment.

    Three guides are stored as plain-text class attributes (KDC, NFS server,
    NFS client); the show_* methods print a truncated preview of each.
    """

    # Kerberos KDC installation and realm bootstrap.
    KDC_SETUP = """
# === KDC Setup (Kerberos Server) ===
# Install Kerberos KDC
sudo apt install krb5-kdc krb5-admin-server -y
# Configure /etc/krb5.conf
[libdefaults]
default_realm = EXAMPLE.COM
dns_lookup_realm = false
dns_lookup_kdc = false
[realms]
EXAMPLE.COM = {
kdc = kdc.example.com
admin_server = kdc.example.com
}
[domain_realm]
.example.com = EXAMPLE.COM
example.com = EXAMPLE.COM
# Create Kerberos database
sudo kdb5_util create -s
# Create principals
sudo kadmin.local -q "addprinc admin/admin"
sudo kadmin.local -q "addprinc -randkey nfs/nfs-server.example.com"
sudo kadmin.local -q "addprinc -randkey nfs/nfs-client.example.com"
# Export keytabs (export the client principal's keytab too and copy it
# securely to the client host)
sudo kadmin.local -q "ktadd -k /etc/krb5.keytab nfs/nfs-server.example.com"
# Start services
sudo systemctl enable krb5-kdc krb5-admin-server
sudo systemctl start krb5-kdc krb5-admin-server
"""

    # NFS server export configuration.
    # FIX: /etc/exports options must be a comma-separated list with NO spaces;
    # "sec=krb5p, rw" is parsed as separate fields and the trailing options are lost.
    NFS_SERVER = """
# === NFS v4 Server Setup ===
# Install NFS server
sudo apt install nfs-kernel-server -y
# Configure /etc/exports
# NOTE: export options must contain no spaces
/data/pipeline *(sec=krb5p,rw,sync,no_subtree_check,root_squash)
/data/staging *(sec=krb5i,rw,sync,no_subtree_check)
/data/archive *(sec=krb5,ro,sync,no_subtree_check)
# Enable NFSv4 ID mapping — "Domain" must sit under the [General] section
printf '[General]\\nDomain = example.com\\n' | sudo tee /etc/idmapd.conf
# Set Kerberos keytab
sudo ktutil
ktutil: rkt /etc/krb5.keytab
ktutil: list
ktutil: quit
# Start NFS
sudo systemctl enable nfs-kernel-server
sudo systemctl restart nfs-kernel-server
# Verify exports
sudo exportfs -v
"""

    # NFS client mount instructions.
    # FIX: the fstab options field is also a comma list with no spaces.
    NFS_CLIENT = """
# === NFS v4 Client Setup ===
# Install NFS client
sudo apt install nfs-common -y
# Get Kerberos ticket
kinit admin@EXAMPLE.COM
# Mount with Kerberos security
sudo mount -t nfs4 -o sec=krb5p nfs-server.example.com:/data/pipeline /mnt/pipeline
# Persistent mount in /etc/fstab (mount options must contain no spaces)
nfs-server.example.com:/data/pipeline /mnt/pipeline nfs4 sec=krb5p,defaults 0 0
# Verify mount
mount | grep nfs4
ls -la /mnt/pipeline/
"""

    def show_kdc(self):
        """Print the first 500 characters of the KDC setup guide."""
        print("=== KDC Setup ===")
        print(self.KDC_SETUP[:500])

    def show_server(self):
        """Print the first 500 characters of the NFS server guide."""
        print("\n=== NFS Server ===")
        print(self.NFS_SERVER[:500])

    def show_client(self):
        """Print the first 400 characters of the NFS client guide."""
        print("\n=== NFS Client ===")
        print(self.NFS_CLIENT[:400])
# Demo: walk through the three setup guides in deployment order.
guide = NFSSetup()
guide.show_kdc()
guide.show_server()
guide.show_client()
ETL Pipeline Implementation
# etl_pipeline.py — ETL pipeline reading from NFS
import json
class ETLPipeline:
    """Container for the reference NFS-backed ETL pipeline source code."""

    # Embedded reference implementation; printed (truncated) by show_code().
    # FIXES vs the previous revision:
    #  * _row_hash used builtin hash(), which is randomized per process
    #    (PYTHONHASHSEED) — hashes were not reproducible across runs; md5 is.
    #  * load() with an unknown format wrote nothing yet logged "Loaded";
    #    it now raises ValueError.
    #  * datetime.utcnow() (deprecated, naive) -> datetime.now(timezone.utc).
    CODE = """
# nfs_etl.py — ETL pipeline with NFS v4 Kerberos source
import hashlib
import logging
from datetime import datetime, timedelta, timezone
from pathlib import Path

import pandas as pd

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class NFSETLPipeline:
    def __init__(self, nfs_mount="/mnt/pipeline", output_dir="/data/warehouse"):
        self.nfs_mount = Path(nfs_mount)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def extract(self, pattern="*.csv", date_filter=None):
        '''Extract data from the NFS mount; date_filter keeps files newer than N days.'''
        files = sorted(self.nfs_mount.glob(pattern))
        if date_filter:
            cutoff = datetime.now() - timedelta(days=date_filter)
            files = [f for f in files if datetime.fromtimestamp(f.stat().st_mtime) > cutoff]
        logger.info("Extracting %d files from NFS", len(files))
        dataframes = []
        for f in files:
            try:
                df = pd.read_csv(f)
                df['_source_file'] = f.name
                df['_extracted_at'] = datetime.now(timezone.utc).isoformat()
                dataframes.append(df)
                logger.info(" Extracted: %s (%d rows)", f.name, len(df))
            except Exception as e:
                logger.error(" Failed: %s - %s", f.name, e)
        if dataframes:
            return pd.concat(dataframes, ignore_index=True)
        return pd.DataFrame()

    def transform(self, df):
        '''Transform data — clean, validate, enrich.'''
        logger.info("Transforming %d rows", len(df))
        # Remove duplicates
        before = len(df)
        df = df.drop_duplicates()
        logger.info(" Removed %d duplicates", before - len(df))
        # Handle missing values
        for col in df.select_dtypes(include=['number']).columns:
            df[col] = df[col].fillna(0)
        for col in df.select_dtypes(include=['object']).columns:
            df[col] = df[col].fillna('')
        # Data type conversions
        if 'timestamp' in df.columns:
            df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')
        # Add computed columns
        df['_processed_at'] = datetime.now(timezone.utc).isoformat()
        # md5 over the row repr is stable across interpreter runs, unlike
        # the builtin hash() which is randomized per process.
        df['_row_hash'] = df.apply(
            lambda r: hashlib.md5(repr(tuple(r)).encode()).hexdigest(), axis=1
        )
        logger.info(" Transform complete: %d rows", len(df))
        return df

    def load(self, df, table_name, format='parquet'):
        '''Load data to the warehouse directory; raises ValueError on unknown format.'''
        output_path = self.output_dir / f"{table_name}.{format}"
        if format == 'parquet':
            df.to_parquet(output_path, index=False)
        elif format == 'csv':
            df.to_csv(output_path, index=False)
        else:
            # Fail loudly instead of logging a bogus "Loaded" message.
            raise ValueError(f"Unsupported format: {format}")
        logger.info("Loaded %d rows to %s", len(df), output_path)
        return {"path": str(output_path), "rows": len(df)}

    def run(self, pattern="*.csv", table_name="raw_data"):
        '''Run the full Extract -> Transform -> Load pipeline.'''
        logger.info("=== ETL Pipeline Started ===")
        df = self.extract(pattern, date_filter=7)
        if df.empty:
            logger.warning("No data extracted")
            return
        df = self.transform(df)
        result = self.load(df, table_name)
        logger.info("=== ETL Complete: %d rows ===", result['rows'])
        return result

# pipeline = NFSETLPipeline("/mnt/pipeline", "/data/warehouse")
# pipeline.run("*.csv", "sales_data")
"""

    def show_code(self):
        """Print the first 600 characters of the embedded pipeline code."""
        print("=== ETL Pipeline ===")
        print(self.CODE[:600])
# Demo: print the embedded ETL pipeline source.
pipeline_doc = ETLPipeline()
pipeline_doc.show_code()
Airflow DAG for NFS ETL
# airflow_dag.py — Airflow DAG for scheduled NFS ETL
import json
class AirflowDAG:
    """Container for the reference Airflow DAG that schedules the NFS ETL."""

    # Embedded DAG source; printed (truncated) by show_dag().
    # FIXES vs the previous revision:
    #  * /tmp/etl_staging was written to without being created first —
    #    a fresh worker would raise FileNotFoundError; mkdir added.
    #  * DB credentials were hard-coded in the SQLAlchemy URL; they now
    #    come from the environment.
    #  * kinit result was ignored; check=True fails the task on error.
    DAG_CODE = """
# dags/nfs_etl_dag.py — Airflow DAG for NFS ETL pipeline
import os
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime, timedelta
import pandas as pd
from pathlib import Path

STAGING_DIR = Path('/tmp/etl_staging')

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['data-team@example.com'],
}

dag = DAG(
    'nfs_etl_pipeline',
    default_args=default_args,
    schedule_interval='0 */6 * * *',  # Every 6 hours
    catchup=False,
    tags=['etl', 'nfs', 'kerberos'],
)

def check_kerberos_ticket(**context):
    '''Ensure a valid Kerberos ticket exists, acquiring one from the keytab if not.'''
    import subprocess
    result = subprocess.run(['klist', '-s'], capture_output=True)
    if result.returncode != 0:
        # check=True: fail the task loudly if kinit does not succeed
        subprocess.run(['kinit', '-kt', '/etc/krb5.keytab', 'etl-service@EXAMPLE.COM'], check=True)
    return True

def verify_nfs_mount(**context):
    '''Verify the NFS mount is present and count the CSVs waiting on it.'''
    mount_path = Path('/mnt/pipeline')
    if not mount_path.is_mount():
        raise Exception("NFS mount not available")
    files = list(mount_path.glob('*.csv'))
    return len(files)

def extract_data(**context):
    '''Extract new files from NFS into the local staging directory.'''
    from nfs_etl import NFSETLPipeline
    pipeline = NFSETLPipeline()
    df = pipeline.extract('*.csv', date_filter=1)
    # Staging dir may not exist on a fresh worker
    STAGING_DIR.mkdir(parents=True, exist_ok=True)
    staging_path = str(STAGING_DIR / 'extract.parquet')
    df.to_parquet(staging_path, index=False)
    context['ti'].xcom_push(key='extract_path', value=staging_path)
    context['ti'].xcom_push(key='row_count', value=len(df))

def transform_data(**context):
    '''Transform the extracted data and stage the result.'''
    from nfs_etl import NFSETLPipeline
    staging_path = context['ti'].xcom_pull(key='extract_path')
    df = pd.read_parquet(staging_path)
    pipeline = NFSETLPipeline()
    df = pipeline.transform(df)
    STAGING_DIR.mkdir(parents=True, exist_ok=True)
    transform_path = str(STAGING_DIR / 'transform.parquet')
    df.to_parquet(transform_path, index=False)
    context['ti'].xcom_push(key='transform_path', value=transform_path)

def load_data(**context):
    '''Load the transformed data into the warehouse.'''
    transform_path = context['ti'].xcom_pull(key='transform_path')
    df = pd.read_parquet(transform_path)
    from sqlalchemy import create_engine
    # Credentials come from the environment — never hard-code them in DAG files
    engine = create_engine(os.environ['WAREHOUSE_DB_URL'])
    df.to_sql('nfs_data', engine, if_exists='append', index=False)
    return len(df)

# Tasks
t1 = PythonOperator(task_id='check_kerberos', python_callable=check_kerberos_ticket, dag=dag)
t2 = PythonOperator(task_id='verify_nfs', python_callable=verify_nfs_mount, dag=dag)
t3 = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
t4 = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
t5 = PythonOperator(task_id='load', python_callable=load_data, dag=dag)
t1 >> t2 >> t3 >> t4 >> t5
"""

    def show_dag(self):
        """Print the first 600 characters of the embedded DAG source."""
        print("=== Airflow DAG ===")
        print(self.DAG_CODE[:600])
# Demo: print the embedded Airflow DAG source.
dag_doc = AirflowDAG()
dag_doc.show_dag()
Monitoring & Troubleshooting
# monitoring.py — NFS ETL monitoring
import json
import random
class NFSMonitoring:
    """Health-check reference code plus a troubleshooting lookup table."""

    # Embedded health-check script; printed (truncated) by show_health().
    # FIX vs the previous revision: check_write_access used a bare `except:`,
    # which also swallows KeyboardInterrupt/SystemExit — narrowed to OSError.
    HEALTH_CHECKS = """
# nfs_health.py — NFS + Kerberos health checks
import subprocess
import os
from pathlib import Path

class NFSHealthChecker:
    def __init__(self, mount_point="/mnt/pipeline"):
        self.mount = Path(mount_point)

    def check_kerberos(self):
        result = subprocess.run(['klist', '-s'], capture_output=True)
        return {"kerberos": result.returncode == 0}

    def check_nfs_mount(self):
        return {"nfs_mounted": self.mount.is_mount()}

    def check_read_access(self):
        try:
            list(self.mount.iterdir())
            return {"read_access": True}
        except PermissionError:
            return {"read_access": False}

    def check_write_access(self):
        test_file = self.mount / ".health_check"
        try:
            test_file.write_text("ok")
            test_file.unlink()
            return {"write_access": True}
        except OSError:  # narrow: a bare except would also swallow KeyboardInterrupt
            return {"write_access": False}

    def check_latency(self):
        import time
        start = time.time()
        list(self.mount.glob("*.csv"))
        latency = (time.time() - start) * 1000
        return {"latency_ms": round(latency, 1), "healthy": latency < 1000}

    def full_check(self):
        # NOTE(review): check_write_access is not part of full_check —
        # presumably because shares may be mounted read-only; confirm.
        checks = {}
        checks.update(self.check_kerberos())
        checks.update(self.check_nfs_mount())
        checks.update(self.check_read_access())
        checks.update(self.check_latency())
        all_ok = all(v for k, v in checks.items() if isinstance(v, bool))
        checks["overall"] = "healthy" if all_ok else "unhealthy"
        return checks

# checker = NFSHealthChecker()
# status = checker.full_check()
"""

    # Common failure modes: observed error, likely causes, and the fix.
    TROUBLESHOOTING = {
        "mount_failed": {
            "error": "mount.nfs4: access denied by server",
            "causes": ["Kerberos ticket expired", "keytab ไม่ถูกต้อง", "NFS export config ผิด"],
            "fix": "kinit -kt /etc/krb5.keytab principal → sudo mount -a",
        },
        "permission_denied": {
            "error": "Permission denied reading files",
            "causes": ["UID/GID mapping ไม่ตรง", "idmapd config ผิด", "NFSv4 ACL"],
            "fix": "ตรวจสอบ /etc/idmapd.conf Domain + rpc.idmapd restart",
        },
        "stale_handle": {
            "error": "Stale file handle",
            "causes": ["Server restart แล้ว export เปลี่ยน", "file ถูกลบขณะ mount"],
            "fix": "sudo umount -f /mnt/pipeline && sudo mount -a",
        },
    }

    def show_health(self):
        """Print the first 500 characters of the health-check code."""
        print("=== Health Checks ===")
        print(self.HEALTH_CHECKS[:500])

    def show_troubleshooting(self):
        """Print each known issue with its one-line fix."""
        # plain string: the original used an f-string with no placeholders
        print("\n=== Troubleshooting ===")
        for key, issue in self.TROUBLESHOOTING.items():
            print(f" [{issue['error']}]")
            print(f" Fix: {issue['fix']}")
# Demo: print the health-check code, then the troubleshooting table.
monitor = NFSMonitoring()
monitor.show_health()
monitor.show_troubleshooting()
FAQ - คำถามที่พบบ่อย
Q: ทำไมต้องใช้ Kerberos กับ NFS?
A: NFS v3 ใช้ AUTH_SYS (trust UID/GID) — ไม่ปลอดภัย ปลอม UID ได้ง่าย NFS v4 + Kerberos: mutual authentication (ทั้ง client + server ยืนยันตัวตน), integrity protection (krb5i) และ encryption (krb5p) จำเป็นสำหรับ: enterprise environments, compliance (PCI DSS, HIPAA), data ที่ sensitive
Q: NFS v4 กับ S3/MinIO อันไหนดีสำหรับ ETL?
A: NFS v4: POSIX filesystem, low latency, เหมาะกับ applications ที่ต้อง random access, legacy apps S3/MinIO: object storage, scalable มาก, เหมาะกับ big data, cloud-native ETL ใช้ NFS: เมื่อ data source เป็น file-based, ต้อง low latency, legacy systems ใช้ S3: เมื่อ data volume ใหญ่มาก, ต้อง scalability, cloud-first strategy
Q: Performance ของ NFS v4 + Kerberos ช้าไหม?
A: krb5: overhead น้อยมาก (authentication only) krb5i: overhead ~5-10% (integrity checksums) krb5p: overhead ~15-30% (full encryption) Tuning: เพิ่ม rsize/wsize (1MB), ใช้ async mount, NFS over RDMA สำหรับ HPC แนะนำ: ใช้ krb5p สำหรับ sensitive data, krb5i สำหรับ general purpose
Q: Kerberos ticket expire ทำให้ ETL fail ได้ไหม?
A: ได้ — default ticket lifetime 10 ชั่วโมง ถ้า ETL job รันนานกว่า → fail แก้: ใช้ keytab + cron job renew ticket อัตโนมัติ หรือ k5start daemon ที่ auto-renew ใน Airflow: เพิ่ม task ตรวจสอบ/renew Kerberos ticket ก่อนทุก ETL run
