Technology

Automation Manufacturing คือ — ระบบอัตโนมัติในอุตสาหกรรมการผลิตยุค Industry 4.0

automation manufacturing คอ
automation manufacturing คือ | SiamCafe Blog
2025-09-24· อ. บอม — SiamCafe.net· 1,560 คำ

Automation ในอุตสาหกรรมการผลิตคืออะไร

Manufacturing Automation คือการใช้เทคโนโลยีเพื่อควบคุมและดำเนินกระบวนการผลิตโดยอัตโนมัติ ลดการพึ่งพาแรงงานคน เพิ่มความแม่นยำ ลดต้นทุน และเพิ่มผลผลิต ครอบคลุมตั้งแต่ระบบสายพานอัตโนมัติไปจนถึง AI-driven quality inspection

ระดับของ Manufacturing Automation แบ่งเป็น Fixed Automation สำหรับการผลิตซ้ำๆ เช่น สายการผลิตรถยนต์, Programmable Automation ที่เปลี่ยน program ได้เมื่อเปลี่ยนรุ่นผลิตภัณฑ์, Flexible Automation ที่เปลี่ยน product ได้โดยไม่ต้องหยุดสาย และ Intelligent Automation ที่ใช้ AI/ML ตัดสินใจเอง

เทคโนโลยีหลักใน Manufacturing Automation ได้แก่ PLC (Programmable Logic Controller) สำหรับควบคุมเครื่องจักร, SCADA (Supervisory Control and Data Acquisition) สำหรับ monitor ระบบ, HMI (Human Machine Interface) สำหรับ operator, Industrial IoT สำหรับ sensor data collection, MES (Manufacturing Execution System) สำหรับจัดการการผลิต และ Digital Twin สำหรับจำลองกระบวนการผลิต

Industry 4.0 คือการปฏิวัติอุตสาหกรรมครั้งที่ 4 ที่รวม cyber-physical systems, IoT, cloud computing และ AI เข้ากับการผลิต ทำให้โรงงานเป็น Smart Factory ที่สื่อสารและตัดสินใจได้เอง

สถาปัตยกรรม Industrial Automation Systems

โครงสร้างระบบ automation ในโรงงาน

# === Industrial Automation Architecture (ISA-95/Purdue Model) ===
#
# Level 4-5: Enterprise/Cloud
# ┌─────────────────────────────────────────┐
# │  ERP (SAP/Oracle) | Cloud Analytics    │
# │  Supply Chain | Business Intelligence   │
# └──────────────────┬──────────────────────┘
#                    │ (IT Network / DMZ)
# Level 3: Manufacturing Operations
# ┌──────────────────┼──────────────────────┐
# │  MES | Historian | Quality Management  │
# │  Scheduling | Recipe Management         │
# └──────────────────┬──────────────────────┘
#                    │ (OT Network)
# Level 2: Control Systems
# ┌──────────────────┼──────────────────────┐
# │  SCADA | HMI | DCS                     │
# │  Batch Control | Supervisory Control    │
# └──────────────────┬──────────────────────┘
#                    │ (Industrial Network)
# Level 1: Basic Control
# ┌──────────────────┼──────────────────────┐
# │  PLC | PID Controllers | Safety Systems│
# │  Motion Control | Robot Controllers     │
# └──────────────────┬──────────────────────┘
#                    │ (Field Bus)
# Level 0: Physical Process
# ┌──────────────────┼──────────────────────┐
# │  Sensors | Actuators | Motors          │
# │  Valves | Conveyors | Robots           │
# └─────────────────────────────────────────┘
#
# === Communication Protocols ===
# Level 0-1: Modbus RTU, PROFIBUS, HART
# Level 1-2: Modbus TCP, EtherNet/IP, PROFINET, OPC UA
# Level 2-3: OPC UA, MQTT, REST API
# Level 3-5: MQTT, REST API, gRPC, Kafka
#
# === OPC UA (Open Platform Communications Unified Architecture) ===
# - Standard protocol สำหรับ Industrial IoT
# - Platform independent
# - Secure communication (encryption, authentication)
# - Information modeling (data structures)
# - Pub/Sub support
#
# === MQTT สำหรับ IIoT ===
# - Lightweight messaging protocol
# - Publish/Subscribe model
# - QoS levels (0, 1, 2)
# - Retained messages
# - Last Will and Testament
# - เหมาะสำหรับ bandwidth-limited environments
#
# === Security Zones ===
# IT Network (Corporate) <-> DMZ <-> OT Network (Plant)
# - Firewall between IT and OT
# - Data diode for one-way communication
# - Segmented VLANs per production line
# - No direct internet access for OT devices
# - ICS-specific security (IEC 62443)

PLC Programming และ SCADA Setup

ตัวอย่าง PLC program และ SCADA configuration

# === PLC Structured Text (IEC 61131-3) ===
# ตัวอย่าง conveyor belt control

# PROGRAM ConveyorControl
# VAR
#     StartButton : BOOL := FALSE;
#     StopButton : BOOL := FALSE;
#     EmergencyStop : BOOL := FALSE;
#     SensorEntry : BOOL := FALSE;
#     SensorExit : BOOL := FALSE;
#     MotorRun : BOOL := FALSE;
#     ConveyorSpeed : REAL := 0.0;
#     ItemCount : INT := 0;
#     AlarmActive : BOOL := FALSE;
#     MotorOverload : BOOL := FALSE;
#     RunTimer : TON;
#     TotalRunTime : TIME := T#0s;
# END_VAR
#
# (* Emergency Stop - highest priority *)
# IF EmergencyStop THEN
#     MotorRun := FALSE;
#     ConveyorSpeed := 0.0;
#     AlarmActive := TRUE;
#     RETURN;
# END_IF;
#
# (* Motor overload protection *)
# IF MotorOverload THEN
#     MotorRun := FALSE;
#     ConveyorSpeed := 0.0;
#     AlarmActive := TRUE;
#     RETURN;
# END_IF;
#
# (* Start/Stop logic *)
# IF StartButton AND NOT StopButton THEN
#     MotorRun := TRUE;
#     ConveyorSpeed := 1.5; (* meters per second *)
#     AlarmActive := FALSE;
# ELSIF StopButton THEN
#     MotorRun := FALSE;
#     ConveyorSpeed := 0.0;
# END_IF;
#
# (* Item counting *)
# IF SensorEntry AND MotorRun THEN
#     ItemCount := ItemCount + 1;
# END_IF;
#
# (* Run time tracking *)
# RunTimer(IN := MotorRun, PT := T#24h);
# IF MotorRun THEN
#     TotalRunTime := TotalRunTime + T#1s;
# END_IF;

# === Python OPC UA Client ===
# pip install opcua asyncua

from opcua import Client
import time
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("opc_client")

class PLCMonitor:
    def __init__(self, opc_url="opc.tcp://plc-server:4840"):
        self.client = Client(opc_url)
        self.connected = False
    
    def connect(self):
        try:
            self.client.connect()
            self.connected = True
            logger.info("Connected to OPC UA server")
        except Exception as e:
            logger.error(f"Connection failed: {e}")
    
    def read_tags(self, node_ids):
        results = {}
        for tag_name, node_id in node_ids.items():
            try:
                node = self.client.get_node(node_id)
                value = node.get_value()
                results[tag_name] = value
            except Exception as e:
                results[tag_name] = f"ERROR: {e}"
        return results
    
    def write_tag(self, node_id, value):
        node = self.client.get_node(node_id)
        node.set_value(value)
        logger.info(f"Written {value} to {node_id}")
    
    def monitor_production(self, interval=5):
        tags = {
            "motor_running": "ns=2;s=Conveyor.MotorRun",
            "speed": "ns=2;s=Conveyor.Speed",
            "item_count": "ns=2;s=Conveyor.ItemCount",
            "temperature": "ns=2;s=Motor.Temperature",
            "vibration": "ns=2;s=Motor.Vibration",
            "alarm": "ns=2;s=System.AlarmActive",
        }
        
        while True:
            data = self.read_tags(tags)
            logger.info(f"Production data: {json.dumps(data)}")
            time.sleep(interval)
    
    def disconnect(self):
        if self.connected:
            self.client.disconnect()

# monitor = PLCMonitor()
# monitor.connect()
# monitor.monitor_production()

IoT Sensors และ Data Collection ด้วย Python

ระบบเก็บข้อมูล sensor จากสายการผลิต

#!/usr/bin/env python3
# iot_collector.py — Industrial IoT Data Collection
import paho.mqtt.client as mqtt
import json
import time
import logging
from datetime import datetime
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("iot_collector")

class IndustrialIoTCollector:
    def __init__(self, mqtt_broker="mqtt-broker:1883",
                 influx_url="http://influxdb:8086",
                 influx_token="", influx_org="factory",
                 influx_bucket="production"):
        # MQTT setup
        self.mqtt_client = mqtt.Client(client_id="iot-collector")
        self.mqtt_client.on_connect = self._on_connect
        self.mqtt_client.on_message = self._on_message
        self.mqtt_broker = mqtt_broker
        
        # InfluxDB setup
        self.influx = InfluxDBClient(url=influx_url, token=influx_token, org=influx_org)
        self.write_api = self.influx.write_api(write_options=SYNCHRONOUS)
        self.bucket = influx_bucket
        self.org = influx_org
        
        # Metrics buffer
        self.buffer = []
        self.buffer_size = 100
    
    def _on_connect(self, client, userdata, flags, rc):
        logger.info(f"MQTT connected (rc={rc})")
        # Subscribe to factory topics
        topics = [
            ("factory/line1/+/temperature", 1),
            ("factory/line1/+/vibration", 1),
            ("factory/line1/+/pressure", 1),
            ("factory/line1/+/speed", 1),
            ("factory/line1/+/status", 1),
            ("factory/line1/+/production_count", 1),
            ("factory/line1/+/alarm", 2),
        ]
        client.subscribe(topics)
    
    def _on_message(self, client, userdata, msg):
        try:
            topic_parts = msg.topic.split("/")
            line = topic_parts[1]
            machine = topic_parts[2]
            metric = topic_parts[3]
            
            payload = json.loads(msg.payload.decode())
            value = payload.get("value", payload)
            timestamp = payload.get("timestamp", datetime.utcnow().isoformat())
            
            # Write to InfluxDB
            point = Point("machine_metrics") \
                .tag("line", line) \
                .tag("machine", machine) \
                .tag("metric", metric) \
                .field("value", float(value) if isinstance(value, (int, float)) else 0) \
                .time(timestamp)
            
            self.buffer.append(point)
            
            if len(self.buffer) >= self.buffer_size:
                self._flush_buffer()
            
            # Check for alarms
            if metric == "alarm" and value:
                self._handle_alarm(line, machine, payload)
            
            # Check thresholds
            self._check_thresholds(line, machine, metric, value)
            
        except Exception as e:
            logger.error(f"Message processing error: {e}")
    
    def _flush_buffer(self):
        if self.buffer:
            self.write_api.write(bucket=self.bucket, org=self.org, record=self.buffer)
            logger.debug(f"Flushed {len(self.buffer)} points to InfluxDB")
            self.buffer = []
    
    def _check_thresholds(self, line, machine, metric, value):
        thresholds = {
            "temperature": {"warning": 80, "critical": 95},
            "vibration": {"warning": 5.0, "critical": 8.0},
            "pressure": {"warning": 150, "critical": 180},
        }
        
        if metric in thresholds and isinstance(value, (int, float)):
            t = thresholds[metric]
            if value >= t["critical"]:
                logger.critical(f"CRITICAL: {line}/{machine} {metric}={value}")
                self._send_alert("critical", line, machine, metric, value)
            elif value >= t["warning"]:
                logger.warning(f"WARNING: {line}/{machine} {metric}={value}")
    
    def _handle_alarm(self, line, machine, payload):
        logger.warning(f"ALARM: {line}/{machine} - {payload}")
        self._send_alert("alarm", line, machine, "alarm", payload)
    
    def _send_alert(self, severity, line, machine, metric, value):
        alert = {
            "severity": severity,
            "line": line,
            "machine": machine,
            "metric": metric,
            "value": value,
            "timestamp": datetime.utcnow().isoformat(),
        }
        self.mqtt_client.publish(
            f"factory/alerts/{severity}",
            json.dumps(alert),
            qos=2,
        )
    
    def start(self):
        self.mqtt_client.connect(self.mqtt_broker.split(":")[0],
                                  int(self.mqtt_broker.split(":")[1]))
        logger.info("IoT Collector started")
        self.mqtt_client.loop_forever()

# collector = IndustrialIoTCollector()
# collector.start()

สร้าง Manufacturing Dashboard

Dashboard สำหรับ monitor สายการผลิต

#!/usr/bin/env python3
# dashboard_api.py — Manufacturing Dashboard API
from fastapi import FastAPI, WebSocket
from influxdb_client import InfluxDBClient
import json
import asyncio
import logging
from datetime import datetime, timedelta

app = FastAPI(title="Manufacturing Dashboard API")

INFLUX_URL = "http://influxdb:8086"
INFLUX_TOKEN = ""
INFLUX_ORG = "factory"
INFLUX_BUCKET = "production"

influx = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
query_api = influx.query_api()

@app.get("/api/production/summary")
async def production_summary(line: str = "line1", hours: int = 24):
    query = f'''
    from(bucket: "{INFLUX_BUCKET}")
        |> range(start: -{hours}h)
        |> filter(fn: (r) => r["line"] == "{line}")
        |> filter(fn: (r) => r["metric"] == "production_count")
        |> last()
    '''
    
    tables = query_api.query(query, org=INFLUX_ORG)
    
    machines = {}
    for table in tables:
        for record in table.records:
            machine = record.values.get("machine", "unknown")
            machines[machine] = {
                "count": record.get_value(),
                "time": record.get_time().isoformat(),
            }
    
    return {"line": line, "period_hours": hours, "machines": machines}

@app.get("/api/machine/metrics")
async def machine_metrics(machine: str, metric: str = "temperature", hours: int = 1):
    query = f'''
    from(bucket: "{INFLUX_BUCKET}")
        |> range(start: -{hours}h)
        |> filter(fn: (r) => r["machine"] == "{machine}")
        |> filter(fn: (r) => r["metric"] == "{metric}")
        |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
    '''
    
    tables = query_api.query(query, org=INFLUX_ORG)
    
    data_points = []
    for table in tables:
        for record in table.records:
            data_points.append({
                "time": record.get_time().isoformat(),
                "value": round(record.get_value(), 2),
            })
    
    return {"machine": machine, "metric": metric, "data": data_points}

@app.get("/api/oee")
async def overall_equipment_effectiveness(line: str = "line1"):
    # OEE = Availability x Performance x Quality
    availability = 0.92   # ตัวอย่าง: 92% uptime
    performance = 0.88    # ตัวอย่าง: 88% of ideal speed
    quality = 0.97        # ตัวอย่าง: 97% good parts
    
    oee = availability * performance * quality
    
    return {
        "line": line,
        "oee": round(oee * 100, 1),
        "availability": round(availability * 100, 1),
        "performance": round(performance * 100, 1),
        "quality": round(quality * 100, 1),
        "world_class_target": 85.0,
        "status": "good" if oee >= 0.85 else "needs_improvement",
    }

@app.get("/api/alerts/recent")
async def recent_alerts(hours: int = 24, severity: str = None):
    query = f'''
    from(bucket: "{INFLUX_BUCKET}")
        |> range(start: -{hours}h)
        |> filter(fn: (r) => r["_measurement"] == "alerts")
    '''
    
    return {"alerts": [], "total": 0, "period_hours": hours}

@app.websocket("/ws/realtime")
async def websocket_realtime(websocket: WebSocket):
    await websocket.accept()
    
    try:
        while True:
            data = {
                "timestamp": datetime.utcnow().isoformat(),
                "metrics": {
                    "line1_count": 1234,
                    "line1_speed": 95.5,
                    "line1_temp": 72.3,
                },
            }
            await websocket.send_json(data)
            await asyncio.sleep(1)
    except Exception:
        pass

# uvicorn dashboard_api:app --host 0.0.0.0 --port 8000

Predictive Maintenance ด้วย Machine Learning

ใช้ ML ทำนายการบำรุงรักษาเครื่องจักร

#!/usr/bin/env python3
# predictive_maintenance.py — ML-based Predictive Maintenance
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier, IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import joblib
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pred_maintenance")

class PredictiveMaintenanceModel:
    def __init__(self):
        self.classifier = RandomForestClassifier(n_estimators=100, random_state=42)
        self.anomaly_detector = IsolationForest(contamination=0.05, random_state=42)
        self.scaler = StandardScaler()
        self.is_trained = False
    
    def prepare_features(self, df):
        features = pd.DataFrame()
        
        # Rolling statistics
        for col in ["temperature", "vibration", "pressure", "current"]:
            if col in df.columns:
                features[f"{col}_mean"] = df[col].rolling(60).mean()
                features[f"{col}_std"] = df[col].rolling(60).std()
                features[f"{col}_max"] = df[col].rolling(60).max()
                features[f"{col}_min"] = df[col].rolling(60).min()
                features[f"{col}_trend"] = df[col].diff(30)
        
        # Time-based features
        if "timestamp" in df.columns:
            df["timestamp"] = pd.to_datetime(df["timestamp"])
            features["hour"] = df["timestamp"].dt.hour
            features["day_of_week"] = df["timestamp"].dt.dayofweek
            features["hours_since_maintenance"] = (
                df["timestamp"] - df.get("last_maintenance", df["timestamp"].min())
            ).dt.total_seconds() / 3600
        
        features = features.dropna()
        return features
    
    def train(self, df, target_col="failure"):
        logger.info("Training predictive maintenance model...")
        
        features = self.prepare_features(df)
        target = df.loc[features.index, target_col]
        
        X_train, X_test, y_train, y_test = train_test_split(
            features, target, test_size=0.2, random_state=42
        )
        
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)
        
        # Train classifier
        self.classifier.fit(X_train_scaled, y_train)
        
        # Train anomaly detector
        self.anomaly_detector.fit(X_train_scaled)
        
        # Evaluate
        y_pred = self.classifier.predict(X_test_scaled)
        report = classification_report(y_test, y_pred, output_dict=True)
        
        self.is_trained = True
        logger.info(f"Model trained. Accuracy: {report['accuracy']:.3f}")
        logger.info(f"Failure recall: {report.get('1', {}).get('recall', 0):.3f}")
        
        # Feature importance
        importance = pd.Series(
            self.classifier.feature_importances_,
            index=features.columns
        ).sort_values(ascending=False)
        logger.info(f"Top features: {importance.head(5).to_dict()}")
        
        return report
    
    def predict(self, current_data):
        if not self.is_trained:
            raise RuntimeError("Model not trained")
        
        features = self.prepare_features(current_data)
        if len(features) == 0:
            return {"prediction": "insufficient_data"}
        
        X_scaled = self.scaler.transform(features.iloc[[-1]])
        
        failure_prob = self.classifier.predict_proba(X_scaled)[0]
        is_anomaly = self.anomaly_detector.predict(X_scaled)[0] == -1
        
        result = {
            "failure_probability": round(float(failure_prob[1]) if len(failure_prob) > 1 else 0, 4),
            "is_anomaly": bool(is_anomaly),
            "recommendation": "normal",
            "confidence": round(float(max(failure_prob)), 4),
        }
        
        if result["failure_probability"] > 0.8:
            result["recommendation"] = "immediate_maintenance"
        elif result["failure_probability"] > 0.5:
            result["recommendation"] = "schedule_maintenance"
        elif is_anomaly:
            result["recommendation"] = "investigate"
        
        return result
    
    def save_model(self, path="models/pred_maintenance"):
        import os
        os.makedirs(path, exist_ok=True)
        joblib.dump(self.classifier, f"{path}/classifier.pkl")
        joblib.dump(self.anomaly_detector, f"{path}/anomaly_detector.pkl")
        joblib.dump(self.scaler, f"{path}/scaler.pkl")
        logger.info(f"Model saved to {path}")
    
    def load_model(self, path="models/pred_maintenance"):
        self.classifier = joblib.load(f"{path}/classifier.pkl")
        self.anomaly_detector = joblib.load(f"{path}/anomaly_detector.pkl")
        self.scaler = joblib.load(f"{path}/scaler.pkl")
        self.is_trained = True
        logger.info("Model loaded")

# model = PredictiveMaintenanceModel()
# model.train(training_data)
# result = model.predict(current_sensor_data)
# print(f"Failure probability: {result['failure_probability']:.1%}")
# print(f"Recommendation: {result['recommendation']}")

FAQ คำถามที่พบบ่อย

Q: เริ่มต้น Manufacturing Automation ต้องใช้งบเท่าไหร?

A: ขึ้นอยู่กับ scope เริ่มจาก IoT sensors สำหรับ data collection อาจใช้ 100,000-500,000 บาท (sensors, gateway, server) PLC-based automation สำหรับสายการผลิตเดียวอาจ 1-5 ล้านบาท Full smart factory transformation อาจ 10-100 ล้านบาทขึ้นไป แนะนำเริ่มจาก pilot project เล็กๆ เช่น IoT monitoring สำหรับเครื่องจักรวิกฤต แล้วค่อยขยาย

Q: PLC กับ Raspberry Pi ใช้แทนกันได้ไหม?

A: ไม่ควรใช้แทนกันใน production PLC ออกแบบมาสำหรับ industrial environment ทนอุณหภูมิ -20 ถึง 60 องศา ทนการสั่นสะเทือน มี deterministic response time (milliseconds) และ safety certifications (SIL, PLd) Raspberry Pi เหมาะสำหรับ prototyping, data collection, edge computing แต่ไม่มี safety certification ไม่ทน industrial environment และ OS ไม่ deterministic สำหรับ production ใช้ PLC เสมอ

Q: OPC UA กับ MQTT เลือกอะไร?

A: ใช้ทั้งสองในส่วนต่างกัน OPC UA เหมาะสำหรับ PLC-to-SCADA communication ในระดับ plant floor มี information model ที่ rich กว่า security ดีกว่า MQTT เหมาะสำหรับ sensor-to-cloud communication ใน IIoT scenarios lightweight กว่า scale ได้ดีกว่า ใน smart factory ทั่วไปใช้ OPC UA ใน plant floor และ MQTT สำหรับส่งข้อมูลขึ้น cloud

Q: Predictive Maintenance ต้องมีข้อมูลนานแค่ไหนถึงจะเริ่มใช้ได้?

A: ขั้นต่ำต้องมีข้อมูลครอบคลุม failure events อย่างน้อย 10-20 ครั้ง ซึ่งอาจใช้เวลา 6-12 เดือนของการเก็บข้อมูลสำหรับเครื่องจักรที่ fail ไม่บ่อย เริ่มจาก condition monitoring (alert เมื่อ threshold ถูก cross) ก่อน แล้วค่อยพัฒนาเป็น predictive model เมื่อมีข้อมูลเพียงพอ สำหรับเครื่องจักรทั่วไป 3-6 เดือนของ sensor data เพียงพอสำหรับ anomaly detection

📖 บทความที่เกี่ยวข้อง

Apache Arrow CI CD Automation Pipelineอ่านบทความ → pharmacy automation คืออ่านบทความ → REST API Design CI CD Automation Pipelineอ่านบทความ → RAG Architecture Automation Scriptอ่านบทความ → Grafana Loki LogQL Automation Scriptอ่านบทความ →

📚 ดูบทความทั้งหมด →