SiamCafe.net Blog

NFS v4 Kerberos Data Pipeline ETL

2025-07-19 · Ajarn Bom — SiamCafe.net · 1,934 words

What is NFS v4 Kerberos Data Pipeline ETL?

NFS v4 (Network File System version 4) is a distributed file system protocol for sharing files over a network; it supports strong authentication via Kerberos, ACLs, file delegation, and compound operations. Kerberos is a network authentication protocol that uses tickets for mutual authentication between client and server, without ever sending a password over the network. A Data Pipeline ETL (Extract, Transform, Load) is the process of pulling data from various sources, transforming it, and loading it into a data warehouse. Combining NFS v4 + Kerberos with an ETL pipeline yields a data-processing system that is secure, authenticated, and scalable.
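Before configuring any of the pieces below, the combination can be sanity-checked from Python. This is a minimal sketch (the `klist -s` check and the mount path are illustrative assumptions) of the two preconditions every NFS + Kerberos ETL run depends on: a live ticket and an actually mounted share.

```python
# nfs_ready.py — minimal readiness sketch for an NFS v4 + Kerberos ETL run
import os
import subprocess

def nfs_etl_ready(mount_point: str) -> dict:
    """Check the two preconditions: a valid Kerberos ticket and a real mount."""
    try:
        # klist -s is silent and exits 0 only while a valid ticket exists
        has_ticket = subprocess.run(["klist", "-s"],
                                    capture_output=True).returncode == 0
    except FileNotFoundError:
        has_ticket = False  # krb5-user tools not installed on this host
    # os.path.ismount distinguishes a mounted share from an empty directory
    return {"kerberos_ticket": has_ticket,
            "nfs_mounted": os.path.ismount(mount_point)}

# print(nfs_etl_ready("/mnt/pipeline"))
```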

NFS v4 + Kerberos Architecture

# nfs_kerberos.py — NFS v4 Kerberos architecture

class NFSKerberosArch:
    COMPONENTS = {
        "kdc": {
            "name": "KDC (Key Distribution Center)",
            "description": "Kerberos server — issues tickets for authentication",
            "sub_components": ["Authentication Server (AS)", "Ticket Granting Server (TGS)"],
        },
        "nfs_server": {
            "name": "NFS v4 Server",
            "description": "Shares files over the network — uses Kerberos authentication",
            "config": "/etc/exports, /etc/krb5.keytab",
        },
        "nfs_client": {
            "name": "NFS v4 Client",
            "description": "Mounts NFS shares and authenticates with a Kerberos ticket",
            "config": "/etc/fstab, /etc/krb5.conf",
        },
        "etl_engine": {
            "name": "ETL Engine",
            "description": "Reads data from the NFS mount → Transform → Load into the data warehouse",
            "tools": ["Apache Spark", "Apache Airflow", "Python pandas", "dbt"],
        },
    }

    SECURITY_MODES = {
        "krb5": "Authentication only — verifies identity, nothing else",
        "krb5i": "Authentication + Integrity — protects against data tampering",
        "krb5p": "Authentication + Integrity + Privacy — encrypts all data",
    }

    def show_architecture(self):
        print("=== NFS v4 + Kerberos Architecture ===\n")
        for comp in self.COMPONENTS.values():
            print(f"[{comp['name']}]")
            print(f"  {comp['description']}")
            print()

    def show_security(self):
        print("=== Security Modes ===")
        for mode, desc in self.SECURITY_MODES.items():
            print(f"  [{mode}] {desc}")

arch = NFSKerberosArch()
arch.show_architecture()
arch.show_security()

NFS v4 Kerberos Setup

# nfs_setup.py — NFS v4 Kerberos setup guide

class NFSSetup:
    KDC_SETUP = """
# === KDC Setup (Kerberos Server) ===

# Install Kerberos KDC
sudo apt install krb5-kdc krb5-admin-server -y

# Configure /etc/krb5.conf
[libdefaults]
    default_realm = EXAMPLE.COM
    dns_lookup_realm = false
    dns_lookup_kdc = false

[realms]
    EXAMPLE.COM = {
        kdc = kdc.example.com
        admin_server = kdc.example.com
    }

[domain_realm]
    .example.com = EXAMPLE.COM
    example.com = EXAMPLE.COM

# Create Kerberos database
sudo kdb5_util create -s

# Create principals
sudo kadmin.local -q "addprinc admin/admin"
sudo kadmin.local -q "addprinc -randkey nfs/nfs-server.example.com"
sudo kadmin.local -q "addprinc -randkey nfs/nfs-client.example.com"

# Export keytabs (repeat on the client host for its nfs/nfs-client principal)
sudo kadmin.local -q "ktadd -k /etc/krb5.keytab nfs/nfs-server.example.com"

# Start services
sudo systemctl enable krb5-kdc krb5-admin-server
sudo systemctl start krb5-kdc krb5-admin-server
"""

    NFS_SERVER = """
# === NFS v4 Server Setup ===

# Install NFS server
sudo apt install nfs-kernel-server -y

# Configure /etc/exports (options must be comma-separated with no spaces)
/data/pipeline  *(sec=krb5p,rw,sync,no_subtree_check,root_squash)
/data/staging   *(sec=krb5i,rw,sync,no_subtree_check)
/data/archive   *(sec=krb5,ro,sync,no_subtree_check)

# Enable NFSv4 ID mapping — set the Domain in /etc/idmapd.conf
# (do not overwrite the whole file; it must keep its [General] section)
sudo sed -i 's/^#\?\s*Domain\s*=.*/Domain = example.com/' /etc/idmapd.conf

# Verify the Kerberos keytab
sudo ktutil
ktutil: rkt /etc/krb5.keytab
ktutil: list
ktutil: quit

# Start NFS
sudo systemctl enable nfs-kernel-server
sudo systemctl restart nfs-kernel-server

# Verify exports
sudo exportfs -v
"""

    NFS_CLIENT = """
# === NFS v4 Client Setup ===

# Install NFS client
sudo apt install nfs-common -y

# Get Kerberos ticket
kinit admin@EXAMPLE.COM

# Mount with Kerberos security
sudo mount -t nfs4 -o sec=krb5p nfs-server.example.com:/data/pipeline /mnt/pipeline

# Persistent mount in /etc/fstab (options comma-separated, no spaces)
nfs-server.example.com:/data/pipeline  /mnt/pipeline  nfs4  defaults,sec=krb5p  0  0

# Verify mount
mount | grep nfs4
ls -la /mnt/pipeline/
"""

    def show_kdc(self):
        print("=== KDC Setup ===")
        print(self.KDC_SETUP[:500])

    def show_server(self):
        print("\n=== NFS Server ===")
        print(self.NFS_SERVER[:500])

    def show_client(self):
        print("\n=== NFS Client ===")
        print(self.NFS_CLIENT[:400])

setup = NFSSetup()
setup.show_kdc()
setup.show_server()
setup.show_client()

ETL Pipeline Implementation

# etl_pipeline.py — ETL pipeline reading from NFS

class ETLPipeline:
    CODE = """
# nfs_etl.py — ETL pipeline with NFS v4 Kerberos source
import os
import pandas as pd
import json
import logging
from datetime import datetime, timedelta
from pathlib import Path

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class NFSETLPipeline:
    def __init__(self, nfs_mount="/mnt/pipeline", output_dir="/data/warehouse"):
        self.nfs_mount = Path(nfs_mount)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
    
    def extract(self, pattern="*.csv", date_filter=None):
        '''Extract data from NFS mount'''
        files = sorted(self.nfs_mount.glob(pattern))
        
        if date_filter:
            cutoff = datetime.now() - timedelta(days=date_filter)
            files = [f for f in files if datetime.fromtimestamp(f.stat().st_mtime) > cutoff]
        
        logger.info(f"Extracting {len(files)} files from NFS")
        
        dataframes = []
        for f in files:
            try:
                df = pd.read_csv(f)
                df['_source_file'] = f.name
                df['_extracted_at'] = datetime.utcnow().isoformat()
                dataframes.append(df)
                logger.info(f"  Extracted: {f.name} ({len(df)} rows)")
            except Exception as e:
                logger.error(f"  Failed: {f.name} - {e}")
        
        if dataframes:
            return pd.concat(dataframes, ignore_index=True)
        return pd.DataFrame()
    
    def transform(self, df):
        '''Transform data — clean, validate, enrich'''
        logger.info(f"Transforming {len(df)} rows")
        
        # Remove duplicates
        before = len(df)
        df = df.drop_duplicates()
        logger.info(f"  Removed {before - len(df)} duplicates")
        
        # Handle missing values
        for col in df.select_dtypes(include=['number']).columns:
            df[col] = df[col].fillna(0)
        
        for col in df.select_dtypes(include=['object']).columns:
            df[col] = df[col].fillna('')
        
        # Data type conversions
        if 'timestamp' in df.columns:
            df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')
        
        # Add computed columns (stable per-row hash via pandas; the builtin
        # hash() is salted per process and changes between runs)
        df['_processed_at'] = datetime.utcnow().isoformat()
        df['_row_hash'] = pd.util.hash_pandas_object(df, index=False)
        
        logger.info(f"  Transform complete: {len(df)} rows")
        return df
    
    def load(self, df, table_name, format='parquet'):
        '''Load data to warehouse'''
        output_path = self.output_dir / f"{table_name}.{format}"
        
        if format == 'parquet':
            df.to_parquet(output_path, index=False)
        elif format == 'csv':
            df.to_csv(output_path, index=False)
        
        logger.info(f"Loaded {len(df)} rows to {output_path}")
        return {"path": str(output_path), "rows": len(df)}
    
    def run(self, pattern="*.csv", table_name="raw_data"):
        '''Run full ETL pipeline'''
        logger.info("=== ETL Pipeline Started ===")
        
        # Extract
        df = self.extract(pattern, date_filter=7)
        if df.empty:
            logger.warning("No data extracted")
            return
        
        # Transform
        df = self.transform(df)
        
        # Load
        result = self.load(df, table_name)
        
        logger.info(f"=== ETL Complete: {result['rows']} rows ===")
        return result

# pipeline = NFSETLPipeline("/mnt/pipeline", "/data/warehouse")
# pipeline.run("*.csv", "sales_data")
"""

    def show_code(self):
        print("=== ETL Pipeline ===")
        print(self.CODE[:600])

etl = ETLPipeline()
etl.show_code()

Airflow DAG for NFS ETL

# airflow_dag.py — Airflow DAG for scheduled NFS ETL

class AirflowDAG:
    DAG_CODE = """
# dags/nfs_etl_dag.py — Airflow DAG for NFS ETL pipeline
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime, timedelta
import pandas as pd
from pathlib import Path

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['data-team@example.com'],
}

dag = DAG(
    'nfs_etl_pipeline',
    default_args=default_args,
    schedule_interval='0 */6 * * *',  # Every 6 hours
    catchup=False,
    tags=['etl', 'nfs', 'kerberos'],
)

def check_kerberos_ticket(**context):
    '''Ensure a valid Kerberos ticket exists — renew from keytab if expired'''
    import subprocess
    result = subprocess.run(['klist', '-s'], capture_output=True)
    if result.returncode != 0:
        renew = subprocess.run(
            ['kinit', '-kt', '/etc/krb5.keytab', 'etl-service@EXAMPLE.COM'],
            capture_output=True)
        if renew.returncode != 0:
            raise RuntimeError(f"kinit failed: {renew.stderr.decode()}")
    return True

def verify_nfs_mount(**context):
    '''Verify NFS mount is accessible'''
    mount_path = Path('/mnt/pipeline')
    if not mount_path.is_mount():
        raise Exception("NFS mount not available")
    files = list(mount_path.glob('*.csv'))
    return len(files)

def extract_data(**context):
    '''Extract new files from NFS'''
    from nfs_etl import NFSETLPipeline
    pipeline = NFSETLPipeline()
    df = pipeline.extract('*.csv', date_filter=1)
    
    # Save to staging (create the staging dir on first run)
    Path('/tmp/etl_staging').mkdir(parents=True, exist_ok=True)
    staging_path = '/tmp/etl_staging/extract.parquet'
    df.to_parquet(staging_path, index=False)
    context['ti'].xcom_push(key='extract_path', value=staging_path)
    context['ti'].xcom_push(key='row_count', value=len(df))

def transform_data(**context):
    '''Transform extracted data'''
    from nfs_etl import NFSETLPipeline
    staging_path = context['ti'].xcom_pull(key='extract_path')
    
    df = pd.read_parquet(staging_path)
    pipeline = NFSETLPipeline()
    df = pipeline.transform(df)
    
    Path('/tmp/etl_staging').mkdir(parents=True, exist_ok=True)
    transform_path = '/tmp/etl_staging/transform.parquet'
    df.to_parquet(transform_path, index=False)
    context['ti'].xcom_push(key='transform_path', value=transform_path)

def load_data(**context):
    '''Load to data warehouse'''
    transform_path = context['ti'].xcom_pull(key='transform_path')
    df = pd.read_parquet(transform_path)
    
    # Load to PostgreSQL / BigQuery / etc. — read credentials from env, not source
    import os
    from sqlalchemy import create_engine
    engine = create_engine(os.environ.get(
        'WAREHOUSE_URL', 'postgresql://user:pass@warehouse:5432/analytics'))
    df.to_sql('nfs_data', engine, if_exists='append', index=False)
    
    return len(df)

# Tasks
t1 = PythonOperator(task_id='check_kerberos', python_callable=check_kerberos_ticket, dag=dag)
t2 = PythonOperator(task_id='verify_nfs', python_callable=verify_nfs_mount, dag=dag)
t3 = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
t4 = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
t5 = PythonOperator(task_id='load', python_callable=load_data, dag=dag)

t1 >> t2 >> t3 >> t4 >> t5
"""

    def show_dag(self):
        print("=== Airflow DAG ===")
        print(self.DAG_CODE[:600])

dag = AirflowDAG()
dag.show_dag()

Monitoring & Troubleshooting

# monitoring.py — NFS ETL monitoring

class NFSMonitoring:
    HEALTH_CHECKS = """
# nfs_health.py — NFS + Kerberos health checks
import subprocess
import os
from pathlib import Path

class NFSHealthChecker:
    def __init__(self, mount_point="/mnt/pipeline"):
        self.mount = Path(mount_point)
    
    def check_kerberos(self):
        result = subprocess.run(['klist', '-s'], capture_output=True)
        return {"kerberos": result.returncode == 0}
    
    def check_nfs_mount(self):
        return {"nfs_mounted": self.mount.is_mount()}
    
    def check_read_access(self):
        try:
            list(self.mount.iterdir())
            return {"read_access": True}
        except PermissionError:
            return {"read_access": False}
    
    def check_write_access(self):
        test_file = self.mount / ".health_check"
        try:
            test_file.write_text("ok")
            test_file.unlink()
            return {"write_access": True}
        except OSError:
            return {"write_access": False}
    
    def check_latency(self):
        import time
        start = time.time()
        list(self.mount.glob("*.csv"))
        latency = (time.time() - start) * 1000
        return {"latency_ms": round(latency, 1), "healthy": latency < 1000}
    
    def full_check(self):
        checks = {}
        checks.update(self.check_kerberos())
        checks.update(self.check_nfs_mount())
        checks.update(self.check_read_access())
        checks.update(self.check_latency())
        
        all_ok = all(v for k, v in checks.items() if isinstance(v, bool))
        checks["overall"] = "healthy" if all_ok else "unhealthy"
        return checks

# checker = NFSHealthChecker()
# status = checker.full_check()
"""

    TROUBLESHOOTING = {
        "mount_failed": {
            "error": "mount.nfs4: access denied by server",
            "causes": ["Kerberos ticket expired", "incorrect keytab", "bad NFS export config"],
            "fix": "kinit -kt /etc/krb5.keytab principal → sudo mount -a",
        },
        "permission_denied": {
            "error": "Permission denied reading files",
            "causes": ["UID/GID mapping mismatch", "bad idmapd config", "NFSv4 ACLs"],
            "fix": "Check the Domain in /etc/idmapd.conf and restart rpc.idmapd",
        },
        "stale_handle": {
            "error": "Stale file handle",
            "causes": ["server restarted with changed exports", "file deleted while mounted"],
            "fix": "sudo umount -f /mnt/pipeline && sudo mount -a",
        },
    }

    def show_health(self):
        print("=== Health Checks ===")
        print(self.HEALTH_CHECKS[:500])

    def show_troubleshooting(self):
        print("\n=== Troubleshooting ===")
        for key, issue in self.TROUBLESHOOTING.items():
            print(f"  [{issue['error']}]")
            print(f"    Fix: {issue['fix']}")

mon = NFSMonitoring()
mon.show_health()
mon.show_troubleshooting()

FAQ - Frequently Asked Questions

Q: Why use Kerberos with NFS?

A: NFS v3 uses AUTH_SYS (it trusts the client-supplied UID/GID), which is insecure: a UID is trivial to spoof. NFS v4 + Kerberos provides mutual authentication (both client and server prove their identity), integrity protection (krb5i), and encryption (krb5p). It is essential for enterprise environments, compliance (PCI DSS, HIPAA), and sensitive data.
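A quick way to confirm a share really negotiated the Kerberos flavor you exported is to read the `sec=` option back out of `/proc/mounts` (Linux-specific; the sample line in the usage comment is illustrative):

```python
# check_sec_flavor.py — map NFS4 mount points to their sec= security flavor
def nfs_sec_flavors(mounts_text: str) -> dict:
    """Parse /proc/mounts text; return {mount_point: sec flavor} for nfs4."""
    flavors = {}
    for line in mounts_text.splitlines():
        fields = line.split()
        # /proc/mounts format: device mount_point fstype options dump pass
        if len(fields) >= 4 and fields[2] == "nfs4":
            opts = dict(opt.split("=", 1) if "=" in opt else (opt, True)
                        for opt in fields[3].split(","))
            # sec= defaults to "sys" (AUTH_SYS) when not negotiated
            flavors[fields[1]] = opts.get("sec", "sys")
    return flavors

# with open("/proc/mounts") as f:
#     print(nfs_sec_flavors(f.read()))
```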

Q: NFS v4 vs S3/MinIO - which is better for ETL?

A: NFS v4 is a POSIX filesystem with low latency; it suits applications that need random access, and legacy apps. S3/MinIO is object storage that scales much further; it suits big data and cloud-native ETL. Use NFS when the data source is file-based, low latency matters, or legacy systems are involved. Use S3 when data volumes are very large, scalability is the priority, or the strategy is cloud-first.

Q: Is NFS v4 + Kerberos slow?

A: krb5 adds very little overhead (authentication only); krb5i roughly 5-10% (integrity checksums); krb5p roughly 15-30% (full encryption). Tuning: increase rsize/wsize (1 MB), use async mounts, and consider NFS over RDMA for HPC. Recommendation: krb5p for sensitive data, krb5i for general-purpose use.
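The tuning advice above can be sketched as a small helper that composes the mount invocation with 1 MB block sizes. A sketch only, not a benchmark; the server and paths are placeholders from this article:

```python
# tuned_mount.py — compose a tuned NFSv4 mount command (sketch)
def tuned_mount_cmd(server: str, export: str, mount_point: str,
                    sec: str = "krb5p", rsize: int = 1048576,
                    wsize: int = 1048576) -> str:
    """Build a mount command with larger read/write block sizes (1 MB)."""
    opts = f"sec={sec},rsize={rsize},wsize={wsize}"
    return f"mount -t nfs4 -o {opts} {server}:{export} {mount_point}"

print(tuned_mount_cmd("nfs-server.example.com", "/data/pipeline", "/mnt/pipeline"))
```

The kernel rounds rsize/wsize to what the server supports, so after mounting, verify the negotiated values in `/proc/mounts`.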

Q: Can Kerberos ticket expiry make an ETL job fail?

A: Yes. The default ticket lifetime is 10 hours, so an ETL job that runs longer than that will fail. Fixes: use a keytab plus a cron job that renews the ticket automatically, or run the k5start daemon for auto-renewal. In Airflow, add a task that checks and renews the Kerberos ticket before every ETL run.
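The keytab renewal pattern described above can be sketched as a helper suitable for a cron job or an Airflow pre-task. The principal and keytab path are assumptions matching the DAG example earlier:

```python
# krb_renew.py — keytab-based ticket renewal for long-running ETL jobs
import subprocess

def ensure_ticket(principal: str = "etl-service@EXAMPLE.COM",
                  keytab: str = "/etc/krb5.keytab") -> bool:
    """Return True if a valid ticket exists or was re-acquired from the keytab."""
    try:
        # klist -s exits 0 only while a non-expired ticket is in the cache
        if subprocess.run(["klist", "-s"], capture_output=True).returncode == 0:
            return True  # still valid — nothing to do
        # Re-acquire non-interactively from the keytab (no password prompt)
        result = subprocess.run(["kinit", "-kt", keytab, principal],
                                capture_output=True)
        return result.returncode == 0
    except FileNotFoundError:
        return False  # krb5-user tools not installed

# Equivalent cron entry (every 4 h, well inside the 10 h ticket lifetime):
# 0 */4 * * * /usr/bin/kinit -kt /etc/krb5.keytab etl-service@EXAMPLE.COM
```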
