SiamCafe · Blog
Spark Structured Streaming กับ AR VR Development
บทความ

Spark Structured Streaming กับ AR VR Development

เผยแพร่ 28 พฤษภาคม 2569

Spark Structured Streaming คืออะไร

Spark Structured Streaming เป็น Stream Processing Engine ที่สร้างอยู่บน Apache Spark SQL Engine ใช้ DataFrame/Dataset API เหมือนกับ Batch Processing แต่ทำงานแบบ Incremental Query Processing บนข้อมูลที่ไหลเข้ามาต่อเนื่อง รองรับ Event-time Processing, Watermarking และ Exactly-once Semantics

สำหรับ AR/VR Development ข้อมูลจาก Sensors ต่างๆเช่น IMU (Inertial Measurement Unit), Camera, LiDAR, Eye Tracking, Hand Tracking สร้างข้อมูลจำนวนมาก Spark Structured Streaming ช่วยประมวลผลข้อมูลเหล่านี้แบบ Near Real-time สำหรับ Analytics, Spatial Mapping, User Behavior Analysis และ Performance Monitoring

สถาปัตยกรรม AR/VR Data Pipeline

LayerComponentหน้าที่Latency
EdgeAR/VR DeviceSensor Data Collection< 5ms
IngestionKafka / KinesisMessage Queue10-50ms
Stream ProcessingSpark Structured StreamingReal-time Analytics100ms-5s
StorageDelta Lake / IcebergData LakeBatch
ServingRedis / APIReal-time Dashboard< 50ms
MLMLflow / TensorFlowModel Inference10-100ms

Spark Structured Streaming Pipeline สำหรับ AR/VR

# ar_vr_streaming.py — Spark Structured Streaming สำหรับ AR/VR Data
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_json, col, window, avg, max as spark_max, min as spark_min,
    count, expr, lit, current_timestamp, when, struct, to_json
)
from pyspark.sql.types import (
    StructType, StructField, StringType, FloatType, LongType,
    ArrayType, TimestampType, IntegerType
)

# สร้าง SparkSession
spark = SparkSession.builder \
    .appName("AR_VR_Streaming_Analytics") \
    .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.streaming.backpressure.enabled", "true") \
    .getOrCreate()

# Schema สำหรับ AR/VR Sensor Data
sensor_schema = StructType([
    StructField("device_id", StringType()),
    StructField("user_id", StringType()),
    StructField("timestamp", LongType()),
    StructField("event_type", StringType()),
    StructField("session_id", StringType()),
    # Position (x, y, z)
    StructField("position", StructType([
        StructField("x", FloatType()),
        StructField("y", FloatType()),
        StructField("z", FloatType()),
    ])),
    # Rotation (quaternion)
    StructField("rotation", StructType([
        StructField("w", FloatType()),
        StructField("x", FloatType()),
        StructField("y", FloatType()),
        StructField("z", FloatType()),
    ])),
    # Performance Metrics
    StructField("fps", IntegerType()),
    StructField("latency_ms", FloatType()),
    StructField("battery_percent", IntegerType()),
    # Eye Tracking
    StructField("gaze_point", StructType([
        StructField("x", FloatType()),
        StructField("y", FloatType()),
    ])),
    StructField("pupil_diameter", FloatType()),
])

# อ่านข้อมูลจาก Kafka
raw_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "ar-vr-sensors") \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", 10000) \
    .load()

# Parse JSON
sensor_df = raw_stream \
    .selectExpr("CAST(value AS STRING) as json_str") \
    .select(from_json(col("json_str"), sensor_schema).alias("data")) \
    .select("data.*") \
    .withColumn("event_time",
                (col("timestamp") / 1000).cast(TimestampType()))

# === Analytics 1: Performance Monitoring (5-second windows) ===
performance_stats = sensor_df \
    .withWatermark("event_time", "10 seconds") \
    .groupBy(
        window(col("event_time"), "5 seconds", "1 second"),
        col("device_id"),
        col("session_id"),
    ) \
    .agg(
        avg("fps").alias("avg_fps"),
        spark_min("fps").alias("min_fps"),
        avg("latency_ms").alias("avg_latency"),
        spark_max("latency_ms").alias("max_latency"),
        avg("battery_percent").alias("avg_battery"),
        count("*").alias("event_count"),
    ) \
    .withColumn("performance_status",
        when(col("avg_fps") < 30, "POOR")
        .when(col("avg_fps") < 60, "FAIR")
        .otherwise("GOOD")
    )

# เขียนไป Kafka สำหรับ Real-time Dashboard
perf_query = performance_stats \
    .select(
        col("device_id").alias("key"),
        to_json(struct("*")).alias("value"),
    ) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "ar-vr-performance") \
    .option("checkpointLocation", "/tmp/checkpoints/performance") \
    .trigger(processingTime="1 second") \
    .start()

# === Analytics 2: Spatial Heatmap (30-second windows) ===
spatial_heatmap = sensor_df \
    .withWatermark("event_time", "30 seconds") \
    .withColumn("grid_x", (col("position.x") / 0.5).cast("int")) \
    .withColumn("grid_z", (col("position.z") / 0.5).cast("int")) \
    .groupBy(
        window(col("event_time"), "30 seconds"),
        col("session_id"),
        col("grid_x"),
        col("grid_z"),
    ) \
    .agg(
        count("*").alias("visit_count"),
        avg("position.y").alias("avg_height"),
    )

# === Analytics 3: Eye Tracking / Gaze Analysis ===
gaze_analysis = sensor_df \
    .filter(col("gaze_point").isNotNull()) \
    .withWatermark("event_time", "10 seconds") \
    .groupBy(
        window(col("event_time"), "5 seconds"),
        col("user_id"),
        col("session_id"),
    ) \
    .agg(
        avg("gaze_point.x").alias("avg_gaze_x"),
        avg("gaze_point.y").alias("avg_gaze_y"),
        avg("pupil_diameter").alias("avg_pupil"),
        count("*").alias("gaze_samples"),
    )

# เขียนไป Delta Lake สำหรับ Batch Analytics
spatial_query = spatial_heatmap \
    .writeStream \
    .format("delta") \
    .option("checkpointLocation", "/tmp/checkpoints/spatial") \
    .option("path", "/data/ar_vr/spatial_heatmap") \
    .trigger(processingTime="30 seconds") \
    .start()

# รอทุก Query
spark.streams.awaitAnyTermination()

Kafka Producer สำหรับ AR/VR Device

# ar_vr_producer.py — ส่งข้อมูลจาก AR/VR Device ไป Kafka
import json
import time
import random
import math
from kafka import KafkaProducer

class ARVRDataProducer:
    """จำลองการส่งข้อมูลจาก AR/VR Device"""

    def __init__(self, bootstrap_servers="kafka:9092"):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),
            key_serializer=lambda k: k.encode("utf-8"),
            acks="all",
            compression_type="lz4",
            batch_size=16384,
            linger_ms=5,
        )
        self.topic = "ar-vr-sensors"

    def generate_sensor_data(self, device_id, user_id, session_id, t):
        """สร้างข้อมูล Sensor จำลอง"""
        # Position — เคลื่อนที่เป็นวงกลม
        radius = 3.0
        speed = 0.5
        x = radius * math.cos(t * speed)
        z = radius * math.sin(t * speed)
        y = 1.7 + 0.1 * math.sin(t * 2)  # ความสูงศีรษะ

        return {
            "device_id": device_id,
            "user_id": user_id,
            "timestamp": int(time.time() * 1000),
            "event_type": "sensor_update",
            "session_id": session_id,
            "position": {"x": round(x, 3), "y": round(y, 3),
                         "z": round(z, 3)},
            "rotation": {
                "w": round(math.cos(t * speed / 2), 4),
                "x": 0.0, "y": round(math.sin(t * speed / 2), 4),
                "z": 0.0,
            },
            "fps": random.randint(60, 90),
            "latency_ms": round(random.uniform(8, 25), 1),
            "battery_percent": max(0, 100 - int(t / 10)),
            "gaze_point": {
                "x": round(random.gauss(0.5, 0.15), 3),
                "y": round(random.gauss(0.5, 0.15), 3),
            },
            "pupil_diameter": round(random.uniform(3.0, 6.0), 1),
        }

    def start_streaming(self, device_id="quest3_001", user_id="user_123",
                        rate_hz=72):
        """ส่งข้อมูลตามอัตรา Frame Rate"""
        session_id = f"session_{int(time.time())}"
        interval = 1.0 / rate_hz
        t = 0

        print(f"Streaming: {device_id} @ {rate_hz}Hz")
        while True:
            data = self.generate_sensor_data(device_id, user_id,
                                             session_id, t)
            self.producer.send(self.topic, key=device_id, value=data)
            t += interval
            time.sleep(interval)

# รัน
producer = ARVRDataProducer()
producer.start_streaming(rate_hz=72)

Optimization สำหรับ Low-latency Pipeline

# spark-submit configuration สำหรับ Low-latency
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 4 \
  --executor-memory 4g \
  --executor-cores 2 \
  --driver-memory 2g \
  --conf spark.sql.shuffle.partitions=8 \
  --conf spark.streaming.backpressure.enabled=true \
  --conf spark.sql.streaming.minBatchesToRetain=2 \
  --conf spark.locality.wait=0 \
  --conf spark.sql.codegen.wholeStage=true \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0, io.delta:delta-spark_2.12:3.1.0 \
  ar_vr_streaming.py

# === Tuning Tips สำหรับ AR/VR Pipeline ===

# 1. Trigger Interval — กำหนดความถี่ในการประมวลผล
# processingTime="1 second"  — ทุก 1 วินาที (Near real-time)
# processingTime="100 milliseconds" — ทุก 100ms (Low latency)
# continuous="1 second" — Continuous Processing (Experimental)

# 2. Watermark — กำหนดว่ารอข้อมูลล่าช้าได้นานเท่าไร
# "10 seconds" — รอ Late Data 10 วินาที
# ยิ่งน้อยยิ่งเร็ว แต่อาจ Drop Late Data

# 3. Kafka Consumer Tuning
# maxOffsetsPerTrigger=10000 — จำกัด Records ต่อ Batch
# fetchOffset.numRetries=3
# fetchOffset.retryIntervalMs=500

# 4. Checkpoint — ใช้ Local SSD หรือ HDFS ที่เร็ว
# checkpointLocation ควรอยู่บน SSD ไม่ใช่ HDD

# 5. State Store — ใช้ RocksDB สำหรับ Large State
# spark.sql.streaming.stateStore.providerClass=
#   org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider

Use Cases สำหรับ AR/VR Analytics

  • Performance Monitoring: ตรวจจับ FPS Drop, High Latency, Battery Drain แบบ Real-time แจ้งเตือนเมื่อประสิทธิภาพต่ำ
  • Spatial Heatmap: วิเคราะห์ว่าผู้ใช้เคลื่อนที่ไปที่ไหนบ่อย ใช้ปรับปรุง Virtual Environment
  • Eye Tracking Analytics: วิเคราะห์ว่าผู้ใช้มองไปที่ไหน ใช้สำหรับ Foveated Rendering และ UX Research
  • Motion Sickness Detection: ตรวจจับ Pattern ที่อาจทำให้เกิด Motion Sickness เช่น Latency สูง FPS ต่ำ การเคลื่อนไหวผิดปกติ
  • Content Recommendation: วิเคราะห์ Behavior แล้ว Recommend Content ที่เหมาะสม
  • Multiplayer Analytics: วิเคราะห์ Interaction ระหว่างผู้เล่นใน Virtual Space

Spark Structured Streaming คืออะไร

Spark Structured Streaming เป็น Stream Processing Engine ของ Apache Spark ใช้ DataFrame API เหมือน Batch แต่ทำงานแบบ Real-time รองรับ Kafka, Kinesis เป็น Source มี Exactly-once Semantics, Watermarking และ Event-time Processing