Spark Structured Streaming คืออะไร
Spark Structured Streaming เป็น Stream Processing Engine ที่สร้างอยู่บน Apache Spark SQL Engine ใช้ DataFrame/Dataset API เหมือนกับ Batch Processing แต่ทำงานแบบ Incremental Query Processing บนข้อมูลที่ไหลเข้ามาต่อเนื่อง รองรับ Event-time Processing, Watermarking และ Exactly-once Semantics
สำหรับ AR/VR Development ข้อมูลจาก Sensors ต่างๆเช่น IMU (Inertial Measurement Unit), Camera, LiDAR, Eye Tracking, Hand Tracking สร้างข้อมูลจำนวนมาก Spark Structured Streaming ช่วยประมวลผลข้อมูลเหล่านี้แบบ Near Real-time สำหรับ Analytics, Spatial Mapping, User Behavior Analysis และ Performance Monitoring
สถาปัตยกรรม AR/VR Data Pipeline
| Layer | Component | หน้าที่ | Latency |
|---|---|---|---|
| Edge | AR/VR Device | Sensor Data Collection | < 5ms |
| Ingestion | Kafka / Kinesis | Message Queue | 10-50ms |
| Stream Processing | Spark Structured Streaming | Real-time Analytics | 100ms-5s |
| Storage | Delta Lake / Iceberg | Data Lake | Batch |
| Serving | Redis / API | Real-time Dashboard | < 50ms |
| ML | MLflow / TensorFlow | Model Inference | 10-100ms |
Spark Structured Streaming Pipeline สำหรับ AR/VR
# ar_vr_streaming.py — Spark Structured Streaming สำหรับ AR/VR Data
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
from_json, col, window, avg, max as spark_max, min as spark_min,
count, expr, lit, current_timestamp, when, struct, to_json
)
from pyspark.sql.types import (
StructType, StructField, StringType, FloatType, LongType,
ArrayType, TimestampType, IntegerType
)
# สร้าง SparkSession
spark = SparkSession.builder \
.appName("AR_VR_Streaming_Analytics") \
.config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints") \
.config("spark.sql.shuffle.partitions", "8") \
.config("spark.streaming.backpressure.enabled", "true") \
.getOrCreate()
# Schema สำหรับ AR/VR Sensor Data
sensor_schema = StructType([
StructField("device_id", StringType()),
StructField("user_id", StringType()),
StructField("timestamp", LongType()),
StructField("event_type", StringType()),
StructField("session_id", StringType()),
# Position (x, y, z)
StructField("position", StructType([
StructField("x", FloatType()),
StructField("y", FloatType()),
StructField("z", FloatType()),
])),
# Rotation (quaternion)
StructField("rotation", StructType([
StructField("w", FloatType()),
StructField("x", FloatType()),
StructField("y", FloatType()),
StructField("z", FloatType()),
])),
# Performance Metrics
StructField("fps", IntegerType()),
StructField("latency_ms", FloatType()),
StructField("battery_percent", IntegerType()),
# Eye Tracking
StructField("gaze_point", StructType([
StructField("x", FloatType()),
StructField("y", FloatType()),
])),
StructField("pupil_diameter", FloatType()),
])
# อ่านข้อมูลจาก Kafka
raw_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "ar-vr-sensors") \
.option("startingOffsets", "latest") \
.option("maxOffsetsPerTrigger", 10000) \
.load()
# Parse JSON
sensor_df = raw_stream \
.selectExpr("CAST(value AS STRING) as json_str") \
.select(from_json(col("json_str"), sensor_schema).alias("data")) \
.select("data.*") \
.withColumn("event_time",
(col("timestamp") / 1000).cast(TimestampType()))
# === Analytics 1: Performance Monitoring (5-second windows) ===
performance_stats = sensor_df \
.withWatermark("event_time", "10 seconds") \
.groupBy(
window(col("event_time"), "5 seconds", "1 second"),
col("device_id"),
col("session_id"),
) \
.agg(
avg("fps").alias("avg_fps"),
spark_min("fps").alias("min_fps"),
avg("latency_ms").alias("avg_latency"),
spark_max("latency_ms").alias("max_latency"),
avg("battery_percent").alias("avg_battery"),
count("*").alias("event_count"),
) \
.withColumn("performance_status",
when(col("avg_fps") < 30, "POOR")
.when(col("avg_fps") < 60, "FAIR")
.otherwise("GOOD")
)
# เขียนไป Kafka สำหรับ Real-time Dashboard
perf_query = performance_stats \
.select(
col("device_id").alias("key"),
to_json(struct("*")).alias("value"),
) \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("topic", "ar-vr-performance") \
.option("checkpointLocation", "/tmp/checkpoints/performance") \
.trigger(processingTime="1 second") \
.start()
# === Analytics 2: Spatial Heatmap (30-second windows) ===
spatial_heatmap = sensor_df \
.withWatermark("event_time", "30 seconds") \
.withColumn("grid_x", (col("position.x") / 0.5).cast("int")) \
.withColumn("grid_z", (col("position.z") / 0.5).cast("int")) \
.groupBy(
window(col("event_time"), "30 seconds"),
col("session_id"),
col("grid_x"),
col("grid_z"),
) \
.agg(
count("*").alias("visit_count"),
avg("position.y").alias("avg_height"),
)
# === Analytics 3: Eye Tracking / Gaze Analysis ===
gaze_analysis = sensor_df \
.filter(col("gaze_point").isNotNull()) \
.withWatermark("event_time", "10 seconds") \
.groupBy(
window(col("event_time"), "5 seconds"),
col("user_id"),
col("session_id"),
) \
.agg(
avg("gaze_point.x").alias("avg_gaze_x"),
avg("gaze_point.y").alias("avg_gaze_y"),
avg("pupil_diameter").alias("avg_pupil"),
count("*").alias("gaze_samples"),
)
# เขียนไป Delta Lake สำหรับ Batch Analytics
spatial_query = spatial_heatmap \
.writeStream \
.format("delta") \
.option("checkpointLocation", "/tmp/checkpoints/spatial") \
.option("path", "/data/ar_vr/spatial_heatmap") \
.trigger(processingTime="30 seconds") \
.start()
# รอทุก Query
spark.streams.awaitAnyTermination()
Kafka Producer สำหรับ AR/VR Device
# ar_vr_producer.py — ส่งข้อมูลจาก AR/VR Device ไป Kafka
import json
import time
import random
import math
from kafka import KafkaProducer
class ARVRDataProducer:
"""จำลองการส่งข้อมูลจาก AR/VR Device"""
def __init__(self, bootstrap_servers="kafka:9092"):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
key_serializer=lambda k: k.encode("utf-8"),
acks="all",
compression_type="lz4",
batch_size=16384,
linger_ms=5,
)
self.topic = "ar-vr-sensors"
def generate_sensor_data(self, device_id, user_id, session_id, t):
"""สร้างข้อมูล Sensor จำลอง"""
# Position — เคลื่อนที่เป็นวงกลม
radius = 3.0
speed = 0.5
x = radius * math.cos(t * speed)
z = radius * math.sin(t * speed)
y = 1.7 + 0.1 * math.sin(t * 2) # ความสูงศีรษะ
return {
"device_id": device_id,
"user_id": user_id,
"timestamp": int(time.time() * 1000),
"event_type": "sensor_update",
"session_id": session_id,
"position": {"x": round(x, 3), "y": round(y, 3),
"z": round(z, 3)},
"rotation": {
"w": round(math.cos(t * speed / 2), 4),
"x": 0.0, "y": round(math.sin(t * speed / 2), 4),
"z": 0.0,
},
"fps": random.randint(60, 90),
"latency_ms": round(random.uniform(8, 25), 1),
"battery_percent": max(0, 100 - int(t / 10)),
"gaze_point": {
"x": round(random.gauss(0.5, 0.15), 3),
"y": round(random.gauss(0.5, 0.15), 3),
},
"pupil_diameter": round(random.uniform(3.0, 6.0), 1),
}
def start_streaming(self, device_id="quest3_001", user_id="user_123",
rate_hz=72):
"""ส่งข้อมูลตามอัตรา Frame Rate"""
session_id = f"session_{int(time.time())}"
interval = 1.0 / rate_hz
t = 0
print(f"Streaming: {device_id} @ {rate_hz}Hz")
while True:
data = self.generate_sensor_data(device_id, user_id,
session_id, t)
self.producer.send(self.topic, key=device_id, value=data)
t += interval
time.sleep(interval)
# รัน
producer = ARVRDataProducer()
producer.start_streaming(rate_hz=72)
Optimization สำหรับ Low-latency Pipeline
# spark-submit configuration สำหรับ Low-latency
spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 4 \
--executor-memory 4g \
--executor-cores 2 \
--driver-memory 2g \
--conf spark.sql.shuffle.partitions=8 \
--conf spark.streaming.backpressure.enabled=true \
--conf spark.sql.streaming.minBatchesToRetain=2 \
--conf spark.locality.wait=0 \
--conf spark.sql.codegen.wholeStage=true \
--conf spark.sql.adaptive.enabled=true \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0, io.delta:delta-spark_2.12:3.1.0 \
ar_vr_streaming.py
# === Tuning Tips สำหรับ AR/VR Pipeline ===
# 1. Trigger Interval — กำหนดความถี่ในการประมวลผล
# processingTime="1 second" — ทุก 1 วินาที (Near real-time)
# processingTime="100 milliseconds" — ทุก 100ms (Low latency)
# continuous="1 second" — Continuous Processing (Experimental)
# 2. Watermark — กำหนดว่ารอข้อมูลล่าช้าได้นานเท่าไร
# "10 seconds" — รอ Late Data 10 วินาที
# ยิ่งน้อยยิ่งเร็ว แต่อาจ Drop Late Data
# 3. Kafka Consumer Tuning
# maxOffsetsPerTrigger=10000 — จำกัด Records ต่อ Batch
# fetchOffset.numRetries=3
# fetchOffset.retryIntervalMs=500
# 4. Checkpoint — ใช้ Local SSD หรือ HDFS ที่เร็ว
# checkpointLocation ควรอยู่บน SSD ไม่ใช่ HDD
# 5. State Store — ใช้ RocksDB สำหรับ Large State
# spark.sql.streaming.stateStore.providerClass=
# org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
Use Cases สำหรับ AR/VR Analytics
- Performance Monitoring: ตรวจจับ FPS Drop, High Latency, Battery Drain แบบ Real-time แจ้งเตือนเมื่อประสิทธิภาพต่ำ
- Spatial Heatmap: วิเคราะห์ว่าผู้ใช้เคลื่อนที่ไปที่ไหนบ่อย ใช้ปรับปรุง Virtual Environment
- Eye Tracking Analytics: วิเคราะห์ว่าผู้ใช้มองไปที่ไหน ใช้สำหรับ Foveated Rendering และ UX Research
- Motion Sickness Detection: ตรวจจับ Pattern ที่อาจทำให้เกิด Motion Sickness เช่น Latency สูง FPS ต่ำ การเคลื่อนไหวผิดปกติ
- Content Recommendation: วิเคราะห์ Behavior แล้ว Recommend Content ที่เหมาะสม
- Multiplayer Analytics: วิเคราะห์ Interaction ระหว่างผู้เล่นใน Virtual Space
Spark Structured Streaming คืออะไร
Spark Structured Streaming เป็น Stream Processing Engine ของ Apache Spark ใช้ DataFrame API เหมือน Batch แต่ทำงานแบบ Real-time รองรับ Kafka, Kinesis เป็น Source มี Exactly-once Semantics, Watermarking และ Event-time Processing
ทำไมต้องใช้ Spark Structured Streaming กับ AR/VR
AR/VR สร้างข้อมูลจำนวนมากจาก Sensors ต้องประมวลผล Real-time สำหรับ Analytics, Spatial Mapping, User Behavior Analysis, Performance Monitoring Spark จัดการ Scale ได้ดี รองรับข้อมูลจากหลายพัน Devices พร้อมกัน
Latency ที่ยอมรับได้สำหรับ AR/VR Data Pipeline เท่าไร
Rendering ต้องต่ำกว่า 20ms (ไม่ใช่งาน Spark) Analytics Pipeline 100ms-1s User Behavior Analysis 1-5 วินาที Batch Analytics นาทีถึงชั่วโมง Spark เหมาะกับ Analytics และ Monitoring ไม่ใช่ Real-time Rendering
เริ่มต้นใช้ Spark Structured Streaming อย่างไร
ติดตั้ง Apache Spark หรือใช้ Databricks/EMR สร้าง SparkSession ใช้ readStream อ่านจาก Kafka ทำ Transformation ด้วย DataFrame API ใช้ writeStream เขียนไป Sink กำหนด Trigger, Watermark และ Checkpoint Location
สรุป
Spark Structured Streaming เป็นเครื่องมือที่เหมาะสำหรับ AR/VR Data Pipeline ในส่วน Analytics ไม่ใช่ Real-time Rendering รองรับ Sensor Data จากหลายพัน Devices ประมวลผล Performance Monitoring, Spatial Heatmap, Eye Tracking Analytics แบบ Near Real-time สิ่งสำคัญคือ Tune Trigger Interval, Watermark, Kafka Consumer และ Checkpoint Location ให้เหมาะกับ Latency ที่ต้องการ
