Elixir Nerves คืออะไรและเหมาะกับ IoT อย่างไร
Nerves เป็น framework สำหรับสร้าง embedded systems และ IoT devices ด้วยภาษา Elixir ที่ทำงานบน Erlang/OTP virtual machine (BEAM) Nerves สร้าง custom Linux firmware ที่มีขนาดเล็ก boot เร็ว และมี Elixir runtime พร้อมใช้งาน ทำให้พัฒนา IoT applications ได้อย่างมีประสิทธิภาพ
จุดเด่นของ Nerves สำหรับ IoT คือ Fault Tolerance จาก OTP supervision trees ที่ restart processes อัตโนมัติเมื่อเกิดข้อผิดพลาด Concurrency จาก lightweight BEAM processes ที่รองรับ connections จำนวนมาก OTA Updates ที่อัปเดต firmware ผ่านเครือข่ายได้ Hot Code Reloading ที่เปลี่ยนโค้ดได้โดยไม่ต้อง restart และ Small Footprint ที่ firmware มีขนาดเพียง 30-50 MB
Hardware ที่ Nerves รองรับได้แก่ Raspberry Pi (ทุกรุ่น), BeagleBone, GRiSP boards และ custom hardware ที่ใช้ ARM processors Nerves ถูกใช้จริงในอุตสาหกรรมเช่น smart agriculture, industrial monitoring, home automation และ fleet management
สำหรับ Incident Management ใน IoT, Nerves มีข้อได้เปรียบเพราะ BEAM VM ออกแบบมาสำหรับระบบ telecom ที่ต้องมี uptime 99.999% (five nines) ทำให้ระบบสามารถ detect, respond และ recover จาก incidents ได้อัตโนมัติ
ติดตั้ง Nerves และสร้างโปรเจกต์ IoT แรก
ขั้นตอนการตั้งค่า development environment และสร้างโปรเจกต์
# ติดตั้ง Elixir และ Nerves
# macOS
brew install elixir
brew install fwup squashfs coreutils xz pkg-config
# Ubuntu/Debian
sudo apt-get install elixir erlang-dev erlang-dialyzer
sudo apt-get install fwup squashfs-tools ssh-askpass
# ติดตั้ง Nerves archive
mix archive.install hex nerves_bootstrap
# สร้างโปรเจกต์ใหม่ (target: Raspberry Pi 4)
mix nerves.new iot_monitor --target rpi4
cd iot_monitor
# โครงสร้างโปรเจกต์
# iot_monitor/
# ├── config/
# │ ├── config.exs # shared config
# │ ├── host.exs # development config
# │ └── target.exs # device config
# ├── lib/
# │ ├── iot_monitor.ex # main application
# │ └── iot_monitor/
# │ ├── application.ex # OTP application
# │ ├── sensor.ex # sensor reading
# │ ├── incident.ex # incident detection
# │ └── reporter.ex # incident reporting
# ├── rel/
# │ └── vm.args.eex # VM arguments
# ├── rootfs_overlay/ # custom files for firmware
# ├── test/
# ├── mix.exs
# └── mix.lock
# config/target.exs
# import Config
# config :iot_monitor, target: Mix.target()
# config :nerves, :firmware,
# rootfs_overlay: "rootfs_overlay",
# provisioning: :nerves_hub_link
#
# config :nerves_ssh,
# authorized_keys: [
# File.read!(Path.join(System.user_home!(), ".ssh/id_rsa.pub"))
# ]
#
# config :vintage_net,
# regulatory_domain: "TH",
# config: [
# {"wlan0", %{
# type: VintageNetWiFi,
# vintage_net_wifi: %{
# networks: [%{ssid: "MyWiFi", psk: "password123", key_mgmt: :wpa_psk}]
# },
# ipv4: %{method: :dhcp}
# }}
# ]
# Build firmware
export MIX_TARGET=rpi4
mix deps.get
mix firmware
# Burn to SD card
mix firmware.burn
# OTA update (after first boot)
mix firmware
mix upload iot_monitor.local
สร้างระบบ Incident Detection สำหรับ IoT Devices
โค้ด Elixir สำหรับตรวจจับและจัดการ incidents
# lib/iot_monitor/application.ex
defmodule IotMonitor.Application do
use Application
@impl true
def start(_type, _args) do
children = [
{IotMonitor.SensorHub, []},
{IotMonitor.IncidentDetector, []},
{IotMonitor.IncidentManager, []},
{IotMonitor.AlertDispatcher, []},
{IotMonitor.MetricsCollector, interval: 5_000},
{IotMonitor.HealthChecker, interval: 10_000},
]
opts = [strategy: :one_for_one, name: IotMonitor.Supervisor]
Supervisor.start_link(children, opts)
end
end
# lib/iot_monitor/sensor_hub.ex
defmodule IotMonitor.SensorHub do
use GenServer
require Logger
defstruct [:readings, :last_update]
def start_link(_opts) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def get_reading(sensor_id) do
GenServer.call(__MODULE__, {:get_reading, sensor_id})
end
@impl true
def init(_) do
schedule_read()
{:ok, %{readings: %{}, last_update: nil}}
end
@impl true
def handle_info(:read_sensors, state) do
readings = %{
temperature: read_temperature(),
humidity: read_humidity(),
cpu_temp: read_cpu_temp(),
memory_usage: read_memory(),
disk_usage: read_disk(),
network_status: check_network(),
}
new_state = %{state | readings: readings, last_update: DateTime.utc_now()}
Phoenix.PubSub.broadcast(IotMonitor.PubSub, "sensors", {:new_readings, readings})
schedule_read()
{:noreply, new_state}
end
@impl true
def handle_call({:get_reading, sensor_id}, _from, state) do
{:reply, Map.get(state.readings, sensor_id), state}
end
defp schedule_read, do: Process.send_after(self(), :read_sensors, 5_000)
defp read_temperature do
case File.read("/sys/bus/w1/devices/28-*/temperature") do
{:ok, data} -> String.trim(data) |> String.to_integer() |> Kernel./(1000)
_ -> nil
end
end
defp read_cpu_temp do
case File.read("/sys/class/thermal/thermal_zone0/temp") do
{:ok, data} -> String.trim(data) |> String.to_integer() |> Kernel./(1000)
_ -> nil
end
end
defp read_humidity, do: Enum.random(40..80) / 1.0
defp read_memory, do: :erlang.memory(:total) / 1_048_576
defp read_disk do
{output, 0} = System.cmd("df", ["-h", "/"])
output |> String.split("\n") |> Enum.at(1) |> String.split() |> Enum.at(4)
end
defp check_network, do: match?({:ok, _}, :inet.gethostbyname('google.com'))
end
# lib/iot_monitor/incident_detector.ex
defmodule IotMonitor.IncidentDetector do
use GenServer
require Logger
@thresholds %{
cpu_temp_critical: 80.0,
cpu_temp_warning: 70.0,
memory_usage_critical: 900,
temperature_high: 40.0,
temperature_low: 5.0,
humidity_high: 85.0,
}
def start_link(_opts) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
@impl true
def init(_) do
Phoenix.PubSub.subscribe(IotMonitor.PubSub, "sensors")
{:ok, %{active_incidents: %{}}}
end
@impl true
def handle_info({:new_readings, readings}, state) do
incidents = detect_incidents(readings)
Enum.each(incidents, fn incident ->
unless Map.has_key?(state.active_incidents, incident.id) do
IotMonitor.IncidentManager.create_incident(incident)
Logger.warning("Incident detected: #{incident.type} — #{incident.message}")
end
end)
resolved = state.active_incidents
|> Enum.filter(fn {id, _} -> not Enum.any?(incidents, &(&1.id == id)) end)
|> Enum.each(fn {id, _} ->
IotMonitor.IncidentManager.resolve_incident(id)
Logger.info("Incident resolved: #{id}")
end)
active = Map.new(incidents, fn i -> {i.id, i} end)
{:noreply, %{state | active_incidents: active}}
end
defp detect_incidents(readings) do
[]
|> maybe_add(check_cpu_temp(readings[:cpu_temp]))
|> maybe_add(check_temperature(readings[:temperature]))
|> maybe_add(check_memory(readings[:memory_usage]))
|> maybe_add(check_network(readings[:network_status]))
end
defp check_cpu_temp(nil), do: nil
defp check_cpu_temp(temp) when temp >= 80.0 do
%{id: :cpu_critical, type: :hardware, severity: :critical,
message: "CPU temperature critical: #{temp}C", value: temp}
end
defp check_cpu_temp(temp) when temp >= 70.0 do
%{id: :cpu_warning, type: :hardware, severity: :warning,
message: "CPU temperature high: #{temp}C", value: temp}
end
defp check_cpu_temp(_), do: nil
defp check_temperature(nil), do: nil
defp check_temperature(temp) when temp >= 40.0 do
%{id: :temp_high, type: :environment, severity: :warning,
message: "Temperature high: #{temp}C", value: temp}
end
defp check_temperature(_), do: nil
defp check_memory(nil), do: nil
defp check_memory(mem) when mem >= 900 do
%{id: :mem_critical, type: :system, severity: :critical,
message: "Memory usage critical: #{round(mem)}MB", value: mem}
end
defp check_memory(_), do: nil
defp check_network(false) do
%{id: :network_down, type: :connectivity, severity: :critical,
message: "Network connectivity lost", value: false}
end
defp check_network(_), do: nil
defp maybe_add(list, nil), do: list
defp maybe_add(list, item), do: [item | list]
end
OTP Supervision Tree สำหรับ Fault Tolerance
ออกแบบ supervision tree ที่ทำให้ระบบ recover อัตโนมัติ
# lib/iot_monitor/incident_manager.ex
defmodule IotMonitor.IncidentManager do
use GenServer
require Logger
defmodule Incident do
defstruct [:id, :type, :severity, :message, :value,
:created_at, :resolved_at, :status, :actions_taken]
end
def start_link(_opts) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def create_incident(incident_data) do
GenServer.cast(__MODULE__, {:create, incident_data})
end
def resolve_incident(incident_id) do
GenServer.cast(__MODULE__, {:resolve, incident_id})
end
def list_incidents do
GenServer.call(__MODULE__, :list)
end
@impl true
def init(_) do
{:ok, %{incidents: %{}, history: []}}
end
@impl true
def handle_cast({:create, data}, state) do
incident = %Incident{
id: data.id,
type: data.type,
severity: data.severity,
message: data.message,
value: data.value,
created_at: DateTime.utc_now(),
status: :active,
actions_taken: [],
}
# Auto-remediation
actions = auto_remediate(incident)
incident = %{incident | actions_taken: actions}
# Dispatch alerts
IotMonitor.AlertDispatcher.dispatch(incident)
new_state = put_in(state, [:incidents, incident.id], incident)
{:noreply, new_state}
end
@impl true
def handle_cast({:resolve, id}, state) do
case Map.get(state.incidents, id) do
nil -> {:noreply, state}
incident ->
resolved = %{incident | status: :resolved, resolved_at: DateTime.utc_now()}
history = [resolved | state.history] |> Enum.take(100)
incidents = Map.delete(state.incidents, id)
{:noreply, %{state | incidents: incidents, history: history}}
end
end
@impl true
def handle_call(:list, _from, state) do
{:reply, Map.values(state.incidents), state}
end
defp auto_remediate(%{id: :cpu_critical} = _incident) do
Logger.info("Auto-remediation: reducing CPU load")
# ลด workload อัตโนมัติ
System.cmd("renice", ["+10", "-p", "#{System.pid()}"])
["Reduced process priority"]
end
defp auto_remediate(%{id: :mem_critical} = _incident) do
Logger.info("Auto-remediation: clearing memory")
:erlang.garbage_collect()
["Triggered garbage collection"]
end
defp auto_remediate(%{id: :network_down} = _incident) do
Logger.info("Auto-remediation: restarting network")
System.cmd("ip", ["link", "set", "wlan0", "down"])
Process.sleep(2_000)
System.cmd("ip", ["link", "set", "wlan0", "up"])
["Restarted network interface"]
end
defp auto_remediate(_), do: []
end
# lib/iot_monitor/alert_dispatcher.ex
defmodule IotMonitor.AlertDispatcher do
use GenServer
require Logger
def start_link(_opts) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def dispatch(incident) do
GenServer.cast(__MODULE__, {:dispatch, incident})
end
@impl true
def init(_) do
{:ok, %{sent_alerts: %{}}}
end
@impl true
def handle_cast({:dispatch, incident}, state) do
# Deduplicate: ส่ง alert เดียวกันไม่เกิน 1 ครั้งต่อ 5 นาที
key = "#{incident.id}_#{incident.severity}"
last_sent = Map.get(state.sent_alerts, key)
now = System.monotonic_time(:second)
if is_nil(last_sent) or (now - last_sent) > 300 do
send_alert(incident)
new_state = put_in(state, [:sent_alerts, key], now)
{:noreply, new_state}
else
{:noreply, state}
end
end
defp send_alert(%{severity: :critical} = incident) do
# ส่งทุกช่องทาง
send_webhook(incident)
send_mqtt(incident)
Logger.error("CRITICAL ALERT: #{incident.message}")
end
defp send_alert(%{severity: :warning} = incident) do
send_mqtt(incident)
Logger.warning("WARNING: #{incident.message}")
end
defp send_alert(incident) do
Logger.info("INFO: #{incident.message}")
end
defp send_webhook(incident) do
payload = Jason.encode!(%{
text: "[#{incident.severity}] #{incident.message}",
device: node(),
timestamp: DateTime.utc_now(),
})
HTTPoison.post(webhook_url(), payload, [{"Content-Type", "application/json"}])
end
defp send_mqtt(incident) do
topic = "iot/#{node()}/incidents/#{incident.type}"
Tortoise.publish("iot_client", topic, Jason.encode!(incident), qos: 1)
end
defp webhook_url, do: Application.get_env(:iot_monitor, :webhook_url, "")
end
# Supervision Tree Structure:
#
# IotMonitor.Supervisor (one_for_one)
# ├── IotMonitor.SensorHub — อ่าน sensor data
# ├── IotMonitor.IncidentDetector — ตรวจจับ incidents
# ├── IotMonitor.IncidentManager — จัดการ incidents
# ├── IotMonitor.AlertDispatcher — ส่ง alerts
# ├── IotMonitor.MetricsCollector — เก็บ metrics
# └── IotMonitor.HealthChecker — ตรวจสอบ health
#
# ถ้า SensorHub crash → restart อัตโนมัติ
# ถ้า IncidentDetector crash → restart, subscribe ใหม่
# ถ้า AlertDispatcher crash → restart, alerts queue ไม่หาย
Remote Device Management และ OTA Updates
จัดการ devices ระยะไกลและอัปเดต firmware
# mix.exs — Dependencies สำหรับ OTA และ Device Management
defp deps do
[
{:nerves, "~> 1.10", runtime: false},
{:nerves_system_rpi4, "~> 1.24", runtime: false, targets: :rpi4},
{:nerves_pack, "~> 0.7"},
{:nerves_ssh, "~> 0.4"},
{:nerves_hub_link, "~> 2.0"}, # OTA updates via NervesHub
{:vintage_net, "~> 0.13"},
{:vintage_net_wifi, "~> 0.12"},
{:phoenix_pubsub, "~> 2.1"},
{:tortoise, "~> 0.10"}, # MQTT client
{:jason, "~> 1.4"},
{:httpoison, "~> 2.0"},
{:circuits_gpio, "~> 1.1"}, # GPIO access
{:circuits_i2c, "~> 2.0"}, # I2C sensors
]
end
# lib/iot_monitor/health_checker.ex
defmodule IotMonitor.HealthChecker do
use GenServer
require Logger
def start_link(opts) do
interval = Keyword.get(opts, :interval, 10_000)
GenServer.start_link(__MODULE__, interval, name: __MODULE__)
end
def get_health do
GenServer.call(__MODULE__, :get_health)
end
@impl true
def init(interval) do
schedule_check(interval)
{:ok, %{interval: interval, health: %{}, uptime_start: System.monotonic_time(:second)}}
end
@impl true
def handle_info(:check, state) do
health = %{
status: :healthy,
uptime_seconds: System.monotonic_time(:second) - state.uptime_start,
erlang_processes: :erlang.system_info(:process_count),
memory_mb: :erlang.memory(:total) / 1_048_576,
cpu_temp: read_cpu_temp(),
firmware_version: Application.spec(:iot_monitor, :vsn) |> to_string(),
nerves_target: Nerves.Runtime.KV.get("nerves_serial_number"),
network: check_network_health(),
disk_free: check_disk_free(),
last_check: DateTime.utc_now() |> DateTime.to_iso8601(),
}
health = if health.cpu_temp > 80 or health.memory_mb > 900 do
%{health | status: :degraded}
else
health
end
# Publish health status via MQTT
publish_health(health)
schedule_check(state.interval)
{:noreply, %{state | health: health}}
end
@impl true
def handle_call(:get_health, _from, state) do
{:reply, state.health, state}
end
defp read_cpu_temp do
case File.read("/sys/class/thermal/thermal_zone0/temp") do
{:ok, val} -> String.trim(val) |> String.to_integer() |> Kernel./(1000)
_ -> 0.0
end
end
defp check_network_health do
case :inet.gethostbyname('google.com') do
{:ok, _} -> :connected
_ -> :disconnected
end
end
defp check_disk_free do
{output, 0} = System.cmd("df", ["-m", "/"])
output |> String.split("\n") |> Enum.at(1)
|> String.split() |> Enum.at(3) |> String.to_integer()
rescue
_ -> 0
end
defp publish_health(health) do
topic = "iot/#{node()}/health"
Tortoise.publish("iot_client", topic, Jason.encode!(health), qos: 0)
end
defp schedule_check(interval) do
Process.send_after(self(), :check, interval)
end
end
# OTA Update via NervesHub:
# 1. สร้าง firmware: mix firmware
# 2. Sign firmware: mix nerves_hub.firmware.sign
# 3. Upload: mix nerves_hub.firmware.publish
# 4. Deploy: mix nerves_hub.deployment.create
# Devices จะดาวน์โหลดและ install อัตโนมัติ
Monitoring Dashboard และ Alerting
สร้างระบบ monitoring สำหรับ fleet ของ IoT devices
# lib/iot_monitor/metrics_collector.ex
defmodule IotMonitor.MetricsCollector do
use GenServer
def start_link(opts) do
interval = Keyword.get(opts, :interval, 5_000)
GenServer.start_link(__MODULE__, interval, name: __MODULE__)
end
@impl true
def init(interval) do
:telemetry.attach_many(
"iot-metrics",
[
[:iot_monitor, :sensor, :read],
[:iot_monitor, :incident, :created],
[:iot_monitor, :incident, :resolved],
],
&handle_event/4,
nil
)
schedule_collect(interval)
{:ok, %{interval: interval, metrics: []}}
end
@impl true
def handle_info(:collect, state) do
metrics = %{
timestamp: DateTime.utc_now() |> DateTime.to_iso8601(),
system: %{
processes: :erlang.system_info(:process_count),
memory_total: :erlang.memory(:total),
memory_processes: :erlang.memory(:processes),
run_queue: :erlang.statistics(:run_queue),
schedulers: :erlang.system_info(:schedulers_online),
},
vm: %{
reductions: elem(:erlang.statistics(:reductions), 0),
gc_count: elem(:erlang.statistics(:garbage_collection), 0),
io_input: elem(:erlang.statistics(:io), 0) |> elem(1),
io_output: elem(:erlang.statistics(:io), 1) |> elem(1),
}
}
# ส่งไปยัง InfluxDB หรือ Prometheus
send_metrics(metrics)
schedule_collect(state.interval)
{:noreply, %{state | metrics: [metrics | state.metrics] |> Enum.take(720)}}
end
def handle_event([:iot_monitor, :sensor, :read], measurements, metadata, _config) do
# Log sensor read latency
:telemetry.execute(
[:iot_monitor, :metrics],
%{sensor_read_us: measurements.duration},
metadata
)
end
def handle_event([:iot_monitor, :incident, event], _measurements, metadata, _config) do
IO.puts("Incident #{event}: #{inspect(metadata)}")
end
defp send_metrics(metrics) do
# InfluxDB line protocol
line = "iot_system, device=#{node()} " <>
"processes=#{metrics.system.processes}," <>
"memory=#{metrics.system.memory_total}," <>
"run_queue=#{metrics.system.run_queue} " <>
"#{DateTime.utc_now() |> DateTime.to_unix(:nanosecond)}"
HTTPoison.post(
"http://influxdb:8086/write?db=iot_metrics",
line,
[{"Content-Type", "text/plain"}]
)
end
defp schedule_collect(interval) do
Process.send_after(self(), :collect, interval)
end
end
# Grafana Dashboard JSON สำหรับ IoT Monitoring
# {
# "dashboard": {
# "title": "IoT Fleet Monitoring",
# "panels": [
# {
# "title": "Device Temperature",
# "type": "timeseries",
# "targets": [{"query": "SELECT mean(cpu_temp) FROM iot_system GROUP BY device, time(1m)"}]
# },
# {
# "title": "Active Incidents",
# "type": "stat",
# "targets": [{"query": "SELECT count(*) FROM incidents WHERE status='active'"}]
# },
# {
# "title": "Device Health Map",
# "type": "geomap",
# "targets": [{"query": "SELECT last(status), lat, lon FROM device_health GROUP BY device"}]
# }
# ]
# }
# }
FAQ คำถามที่พบบ่อย
Q: Nerves เหมาะกับ production IoT มากแค่ไหน?
A: Nerves ใช้ใน production จริงในหลายบริษัท เช่น FarmBot (smart agriculture), Rose Point Navigation (maritime systems) และ Le Tote (fashion logistics) ข้อดีคือ BEAM VM มีความเสถียรสูงมาก มี supervision trees สำหรับ self-healing และ OTA updates ลดค่าใช้จ่ายในการ maintain devices ที่ติดตั้งในสถานที่ห่างไกล
Q: Nerves รองรับ sensors อะไรบ้าง?
A: Nerves รองรับ sensors ผ่าน GPIO, I2C, SPI และ UART libraries เช่น circuits_gpio, circuits_i2c, circuits_spi sensors ที่ใช้ได้เช่น DHT22 (temperature/humidity), BMP280 (barometric pressure), MCP3008 (ADC), DS18B20 (temperature) และ camera modules นอกจากนี้ยังรองรับ USB devices ผ่าน Linux kernel drivers
Q: OTA update ปลอดภัยไหมถ้า update ล้มเหลว?
A: Nerves ใช้ A/B partition scheme ที่มี firmware slot สอง slots เมื่อ update จะเขียนลง slot ที่ไม่ได้ใช้งาน ถ้า boot สำเร็จจะ confirm slot ใหม่ ถ้า boot ล้มเหลวจะ rollback กลับไป slot เดิมอัตโนมัติ ทำให้ device ไม่มีวัน brick จาก bad update NervesHub เพิ่ม signing และ encryption สำหรับ firmware integrity
Q: Elixir/Nerves กับ MicroPython หรือ Arduino ต่างกันอย่างไร?
A: Arduino เหมาะกับ microcontrollers ง่ายๆ ทรัพยากรจำกัดมาก MicroPython เหมาะกับ rapid prototyping บน microcontrollers ขนาดกลาง Nerves เหมาะกับ Linux-capable boards (Raspberry Pi ขึ้นไป) ที่ต้องการ networking, concurrency, fault tolerance และ OTA updates ข้อเด่นของ Nerves คือ production-grade reliability ที่มาจาก Erlang/OTP
