SiamCafe.net Blog
Technology

Spark Structured Streaming AR VR Development

spark structured streaming ar vr development
Spark Structured Streaming AR VR Development | SiamCafe Blog
2026-01-08· อ. บอม — SiamCafe.net· 11,687 คำ

Spark Structured Streaming คืออะไร

Spark Structured Streaming เป็น Stream Processing Engine ที่สร้างอยู่บน Apache Spark SQL Engine ใช้ DataFrame/Dataset API เหมือนกับ Batch Processing แต่ทำงานแบบ Incremental Query Processing บนข้อมูลที่ไหลเข้ามาต่อเนื่อง รองรับ Event-time Processing, Watermarking และ Exactly-once Semantics

สำหรับ AR/VR Development ข้อมูลจาก Sensors ต่างๆเช่น IMU (Inertial Measurement Unit), Camera, LiDAR, Eye Tracking, Hand Tracking สร้างข้อมูลจำนวนมาก Spark Structured Streaming ช่วยประมวลผลข้อมูลเหล่านี้แบบ Near Real-time สำหรับ Analytics, Spatial Mapping, User Behavior Analysis และ Performance Monitoring

สถาปัตยกรรม AR/VR Data Pipeline

LayerComponentหน้าที่Latency
EdgeAR/VR DeviceSensor Data Collection< 5ms
IngestionKafka / KinesisMessage Queue10-50ms
Stream ProcessingSpark Structured StreamingReal-time Analytics100ms-5s
StorageDelta Lake / IcebergData LakeBatch
ServingRedis / APIReal-time Dashboard< 50ms
MLMLflow / TensorFlowModel Inference10-100ms

Spark Structured Streaming Pipeline สำหรับ AR/VR

# ar_vr_streaming.py — Spark Structured Streaming สำหรับ AR/VR Data
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_json, col, window, avg, max as spark_max, min as spark_min,
    count, expr, lit, current_timestamp, when, struct, to_json
)
from pyspark.sql.types import (
    StructType, StructField, StringType, FloatType, LongType,
    ArrayType, TimestampType, IntegerType
)

# สร้าง SparkSession
spark = SparkSession.builder \
    .appName("AR_VR_Streaming_Analytics") \
    .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.streaming.backpressure.enabled", "true") \
    .getOrCreate()

# Schema สำหรับ AR/VR Sensor Data
sensor_schema = StructType([
    StructField("device_id", StringType()),
    StructField("user_id", StringType()),
    StructField("timestamp", LongType()),
    StructField("event_type", StringType()),
    StructField("session_id", StringType()),
    # Position (x, y, z)
    StructField("position", StructType([
        StructField("x", FloatType()),
        StructField("y", FloatType()),
        StructField("z", FloatType()),
    ])),
    # Rotation (quaternion)
    StructField("rotation", StructType([
        StructField("w", FloatType()),
        StructField("x", FloatType()),
        StructField("y", FloatType()),
        StructField("z", FloatType()),
    ])),
    # Performance Metrics
    StructField("fps", IntegerType()),
    StructField("latency_ms", FloatType()),
    StructField("battery_percent", IntegerType()),
    # Eye Tracking
    StructField("gaze_point", StructType([
        StructField("x", FloatType()),
        StructField("y", FloatType()),
    ])),
    StructField("pupil_diameter", FloatType()),
])

# อ่านข้อมูลจาก Kafka
raw_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "ar-vr-sensors") \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", 10000) \
    .load()

# Parse JSON
sensor_df = raw_stream \
    .selectExpr("CAST(value AS STRING) as json_str") \
    .select(from_json(col("json_str"), sensor_schema).alias("data")) \
    .select("data.*") \
    .withColumn("event_time",
                (col("timestamp") / 1000).cast(TimestampType()))

# === Analytics 1: Performance Monitoring (5-second windows) ===
performance_stats = sensor_df \
    .withWatermark("event_time", "10 seconds") \
    .groupBy(
        window(col("event_time"), "5 seconds", "1 second"),
        col("device_id"),
        col("session_id"),
    ) \
    .agg(
        avg("fps").alias("avg_fps"),
        spark_min("fps").alias("min_fps"),
        avg("latency_ms").alias("avg_latency"),
        spark_max("latency_ms").alias("max_latency"),
        avg("battery_percent").alias("avg_battery"),
        count("*").alias("event_count"),
    ) \
    .withColumn("performance_status",
        when(col("avg_fps") < 30, "POOR")
        .when(col("avg_fps") < 60, "FAIR")
        .otherwise("GOOD")
    )

# เขียนไป Kafka สำหรับ Real-time Dashboard
perf_query = performance_stats \
    .select(
        col("device_id").alias("key"),
        to_json(struct("*")).alias("value"),
    ) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "ar-vr-performance") \
    .option("checkpointLocation", "/tmp/checkpoints/performance") \
    .trigger(processingTime="1 second") \
    .start()

# === Analytics 2: Spatial Heatmap (30-second windows) ===
spatial_heatmap = sensor_df \
    .withWatermark("event_time", "30 seconds") \
    .withColumn("grid_x", (col("position.x") / 0.5).cast("int")) \
    .withColumn("grid_z", (col("position.z") / 0.5).cast("int")) \
    .groupBy(
        window(col("event_time"), "30 seconds"),
        col("session_id"),
        col("grid_x"),
        col("grid_z"),
    ) \
    .agg(
        count("*").alias("visit_count"),
        avg("position.y").alias("avg_height"),
    )

# === Analytics 3: Eye Tracking / Gaze Analysis ===
gaze_analysis = sensor_df \
    .filter(col("gaze_point").isNotNull()) \
    .withWatermark("event_time", "10 seconds") \
    .groupBy(
        window(col("event_time"), "5 seconds"),
        col("user_id"),
        col("session_id"),
    ) \
    .agg(
        avg("gaze_point.x").alias("avg_gaze_x"),
        avg("gaze_point.y").alias("avg_gaze_y"),
        avg("pupil_diameter").alias("avg_pupil"),
        count("*").alias("gaze_samples"),
    )

# เขียนไป Delta Lake สำหรับ Batch Analytics
spatial_query = spatial_heatmap \
    .writeStream \
    .format("delta") \
    .option("checkpointLocation", "/tmp/checkpoints/spatial") \
    .option("path", "/data/ar_vr/spatial_heatmap") \
    .trigger(processingTime="30 seconds") \
    .start()

# รอทุก Query
spark.streams.awaitAnyTermination()

Kafka Producer สำหรับ AR/VR Device

# ar_vr_producer.py — ส่งข้อมูลจาก AR/VR Device ไป Kafka
import json
import time
import random
import math
from kafka import KafkaProducer

class ARVRDataProducer:
    """จำลองการส่งข้อมูลจาก AR/VR Device"""

    def __init__(self, bootstrap_servers="kafka:9092"):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),
            key_serializer=lambda k: k.encode("utf-8"),
            acks="all",
            compression_type="lz4",
            batch_size=16384,
            linger_ms=5,
        )
        self.topic = "ar-vr-sensors"

    def generate_sensor_data(self, device_id, user_id, session_id, t):
        """สร้างข้อมูล Sensor จำลอง"""
        # Position — เคลื่อนที่เป็นวงกลม
        radius = 3.0
        speed = 0.5
        x = radius * math.cos(t * speed)
        z = radius * math.sin(t * speed)
        y = 1.7 + 0.1 * math.sin(t * 2)  # ความสูงศีรษะ

        return {
            "device_id": device_id,
            "user_id": user_id,
            "timestamp": int(time.time() * 1000),
            "event_type": "sensor_update",
            "session_id": session_id,
            "position": {"x": round(x, 3), "y": round(y, 3),
                         "z": round(z, 3)},
            "rotation": {
                "w": round(math.cos(t * speed / 2), 4),
                "x": 0.0, "y": round(math.sin(t * speed / 2), 4),
                "z": 0.0,
            },
            "fps": random.randint(60, 90),
            "latency_ms": round(random.uniform(8, 25), 1),
            "battery_percent": max(0, 100 - int(t / 10)),
            "gaze_point": {
                "x": round(random.gauss(0.5, 0.15), 3),
                "y": round(random.gauss(0.5, 0.15), 3),
            },
            "pupil_diameter": round(random.uniform(3.0, 6.0), 1),
        }

    def start_streaming(self, device_id="quest3_001", user_id="user_123",
                        rate_hz=72):
        """ส่งข้อมูลตามอัตรา Frame Rate"""
        session_id = f"session_{int(time.time())}"
        interval = 1.0 / rate_hz
        t = 0

        print(f"Streaming: {device_id} @ {rate_hz}Hz")
        while True:
            data = self.generate_sensor_data(device_id, user_id,
                                             session_id, t)
            self.producer.send(self.topic, key=device_id, value=data)
            t += interval
            time.sleep(interval)

# รัน
producer = ARVRDataProducer()
producer.start_streaming(rate_hz=72)

Optimization สำหรับ Low-latency Pipeline

# spark-submit configuration สำหรับ Low-latency
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 4 \
  --executor-memory 4g \
  --executor-cores 2 \
  --driver-memory 2g \
  --conf spark.sql.shuffle.partitions=8 \
  --conf spark.streaming.backpressure.enabled=true \
  --conf spark.sql.streaming.minBatchesToRetain=2 \
  --conf spark.locality.wait=0 \
  --conf spark.sql.codegen.wholeStage=true \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0, io.delta:delta-spark_2.12:3.1.0 \
  ar_vr_streaming.py

# === Tuning Tips สำหรับ AR/VR Pipeline ===

# 1. Trigger Interval — กำหนดความถี่ในการประมวลผล
# processingTime="1 second"  — ทุก 1 วินาที (Near real-time)
# processingTime="100 milliseconds" — ทุก 100ms (Low latency)
# continuous="1 second" — Continuous Processing (Experimental)

# 2. Watermark — กำหนดว่ารอข้อมูลล่าช้าได้นานเท่าไร
# "10 seconds" — รอ Late Data 10 วินาที
# ยิ่งน้อยยิ่งเร็ว แต่อาจ Drop Late Data

# 3. Kafka Consumer Tuning
# maxOffsetsPerTrigger=10000 — จำกัด Records ต่อ Batch
# fetchOffset.numRetries=3
# fetchOffset.retryIntervalMs=500

# 4. Checkpoint — ใช้ Local SSD หรือ HDFS ที่เร็ว
# checkpointLocation ควรอยู่บน SSD ไม่ใช่ HDD

# 5. State Store — ใช้ RocksDB สำหรับ Large State
# spark.sql.streaming.stateStore.providerClass=
#   org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider

Use Cases สำหรับ AR/VR Analytics

Spark Structured Streaming คืออะไร

Spark Structured Streaming เป็น Stream Processing Engine ของ Apache Spark ใช้ DataFrame API เหมือน Batch แต่ทำงานแบบ Real-time รองรับ Kafka, Kinesis เป็น Source มี Exactly-once Semantics, Watermarking และ Event-time Processing

ทำไมต้องใช้ Spark Structured Streaming กับ AR/VR

AR/VR สร้างข้อมูลจำนวนมากจาก Sensors ต้องประมวลผล Real-time สำหรับ Analytics, Spatial Mapping, User Behavior Analysis, Performance Monitoring Spark จัดการ Scale ได้ดี รองรับข้อมูลจากหลายพัน Devices พร้อมกัน

Latency ที่ยอมรับได้สำหรับ AR/VR Data Pipeline เท่าไร

Rendering ต้องต่ำกว่า 20ms (ไม่ใช่งาน Spark) Analytics Pipeline 100ms-1s User Behavior Analysis 1-5 วินาที Batch Analytics นาทีถึงชั่วโมง Spark เหมาะกับ Analytics และ Monitoring ไม่ใช่ Real-time Rendering

เริ่มต้นใช้ Spark Structured Streaming อย่างไร

ติดตั้ง Apache Spark หรือใช้ Databricks/EMR สร้าง SparkSession ใช้ readStream อ่านจาก Kafka ทำ Transformation ด้วย DataFrame API ใช้ writeStream เขียนไป Sink กำหนด Trigger, Watermark และ Checkpoint Location

สรุป

Spark Structured Streaming เป็นเครื่องมือที่เหมาะสำหรับ AR/VR Data Pipeline ในส่วน Analytics ไม่ใช่ Real-time Rendering รองรับ Sensor Data จากหลายพัน Devices ประมวลผล Performance Monitoring, Spatial Heatmap, Eye Tracking Analytics แบบ Near Real-time สิ่งสำคัญคือ Tune Trigger Interval, Watermark, Kafka Consumer และ Checkpoint Location ให้เหมาะกับ Latency ที่ต้องการ

📖 บทความที่เกี่ยวข้อง

Spark Structured Streaming Infrastructure as Codeอ่านบทความ → Spark Structured Streaming Architecture Design Patternอ่านบทความ → Spark Structured Streaming Event Driven Designอ่านบทความ → Spark Structured Streaming RBAC ABAC Policyอ่านบทความ → Spark Structured Streaming 12 Factor Appอ่านบทความ →

📚 ดูบทความทั้งหมด →