工业 I/O 桥接:Modbus RTU 转 MQTT 与本地硬件联动
本示例演示如何将标准 Modbus RTU I/O 模块改造为智能 MQTT 网关。以 MA01-AXCX4020 为例,同时实现远程云端控制和本地硬件按钮逻辑,包含边沿触发切换和安全关键的 LED 互斥控制。
本示例实现的功能
- 每 500ms 轮询 2 路离散输入和 2 路线圈,两次请求之间错开 200ms,确保满足设备 100ms 响应时间要求。
- 为本地硬件切换实现边沿检测:DI1(常开)切换绿色 LED;DI2(常闭)切换红色 LED。
- 通过先关后开的 Modbus 写入序列,强制互斥,确保同一时刻只有一个 LED 亮起。
- 桥接至 MQTT:将实时 I/O 状态以 JSON 形式发布至
cycbox/state,并在cycbox/commands上监听远程字符串命令。 - 维护本地状态:将 Lua 脚本内部变量与实际硬件寄存器读值同步,确保远程操作与本地操作的一致性。
端到端数据流
以下流程图展示了一次本地按钮按下如何触发一系列 Modbus 命令,同时通知 MQTT 代理。
设备与线路协议
MA01-AXCX4020 是一款工业 RS485 I/O 模块,具备 4 路数字输入和 2 路继电器输出。
- 物理层:RS485,9600 bps,8-N-1。
- 从站地址:32(0x20)。根据数据手册,由硬件拨码开关设定值
31加上默认软件偏移量1计算得出。 - I/O 映射:
- DI1(常开按钮):离散输入
0x0000。 - DI2(常闭按钮):离散输入
0x0001。 - DO1(绿色 LED):线圈
0x0000。 - DO2(红色 LED):线圈
0x0001。
- DI1(常开按钮):离散输入
- 时序:设备处理一次请求约需 100ms;配置中两次 Modbus 请求之间错开 200ms 以防止帧碰撞。
CycBox 配置
连接 0:串口(Modbus RTU)
该连接直接与 MA01-AXCX4020 模块硬件通信。
{
"app": {
"app_transport": "serial_port_transport",
"app_codec": "modbus_rtu_codec",
"app_transformer": "disable_transformer",
"app_encoding": "UTF-8"
},
"serial_port_transport": {
"serial_port_transport_data_bits": 8,
"serial_port_transport_port": "/dev/ttyUSB0",
"serial_port_transport_baud_rate": 9600,
"serial_port_transport_parity": "none",
"serial_port_transport_stop_bits": "1",
"serial_port_transport_flow_control": "none"
},
"modbus_rtu_codec": {
"with_receive_timeout": 20
}
}
连接 1:MQTT 客户端
该连接作为下游 MQTT 代理的桥接,用于自动化控制与监控。
{
"app": {
"app_transport": "mqtt_transport",
"app_codec": "timeout_codec",
"app_transformer": "disable_transformer",
"app_encoding": "UTF-8"
},
"mqtt_transport": {
"mqtt_transport_subscribe_qos": 1,
"mqtt_transport_client_id": "cycbox-300JSYB6",
"mqtt_transport_broker_url": "mqtt://localhost:1883",
"mqtt_transport_subscribe_topics": "cycbox/commands",
"mqtt_transport_use_tls": false
},
"timeout_codec": {
"with_receive_timeout": 100
}
}
连接索引映射
Lua 脚本通过以下 ID 在现场总线和云端之间路由数据。
| connection_id | 角色 | 协议 |
|---|---|---|
| 0 | 现场设备 | Modbus RTU(RS485) |
| 1 | 上游桥接 | MQTT 客户端 |
| 2 | 内部代理 | MQTT 服务端 |
Lua 管道详解
脚本负责三个独立任务:周期轮询、命令解析以及本地硬件切换状态机。
-- MA01-AXCX4020 DI/DO 轮询、MQTT 与硬件切换脚本
-- 使用 serial_port_transport + modbus_rtu_codec 轮询 IO 状态,
-- 使用 mqtt_transport 发布状态并接收命令。
-- 每 500ms 轮询 2 路 DI 和 2 路 DO,以 JSON 格式发布到 "cycbox/state"。
-- 支持本地硬件切换:DI1(常开)切换绿色 LED,DI2(常闭)切换红色 LED。
-- LED 互斥(同一时刻只能有一个亮起)。
--
-- 识别的命令(MQTT 消息的字符串 payload):
-- "green_on" : 打开绿色 LED(DO1),关闭红色 LED(DO2)
-- "green_off" : 关闭绿色 LED(DO1)
-- "red_on" : 打开红色 LED(DO2),关闭绿色 LED(DO1)
-- "red_off" : 关闭红色 LED(DO2)
--
-- MA01-AXCX4020 设备
-- 从站地址:32(0x20)
-- Discrete 0x0000:DI1(常开按钮)
-- Discrete 0x0001:DI2(常闭按钮)
-- Coil 0x0000:DO1(绿色 LED)
-- Coil 0x0001:DO2(红色 LED)
local SLAVE_ID = 32
local POLL_INTERVAL_MS = 500
local timer_ms = 0
local MQTT_CONN_ID = 1
-- 用于边沿检测和切换的状态跟踪变量
local last_di1 = nil
local last_di2 = nil
local current_do1 = false
local current_do2 = false
function on_start()
log("info", "启动 MA01-AXCX4020 MQTT 轮询与切换脚本")
end
function on_timer(now_ms)
timer_ms = timer_ms + 100
if timer_ms >= POLL_INTERVAL_MS then
-- 从地址 0x0000 开始读取 2 路离散输入(DI1-DI2)
modbus_rtu_read_discrete_inputs(SLAVE_ID, 0x0000, 2, 0, 0)
-- 从地址 0x0000 开始读取 2 路线圈(DO1-DO2)
-- 错开 200ms,等待设备响应上一次请求
modbus_rtu_read_coils(SLAVE_ID, 0x0000, 2, 200, 0)
timer_ms = 0
end
end
function on_receive()
-- 1. 处理连接 1 上的 MQTT 命令
if message.connection_id == MQTT_CONN_ID then
local topic = message:get_metadata("mqtt_topic")
if topic == "cycbox/commands" and message.payload then
local cmd = string.lower(message.payload)
log("info", "收到 MQTT 命令:" .. cmd)
-- 互斥逻辑:打开一个之前,先关闭另一个
if string.find(cmd, "green_on") then
modbus_rtu_write_single_coil(SLAVE_ID, 0x0001, false, 0, 0) -- DO2(红色)关
modbus_rtu_write_single_coil(SLAVE_ID, 0x0000, true, 100, 0) -- DO1(绿色)开
current_do2 = false
current_do1 = true
elseif string.find(cmd, "green_off") then
modbus_rtu_write_single_coil(SLAVE_ID, 0x0000, false, 0, 0) -- DO1(绿色)关
current_do1 = false
elseif string.find(cmd, "red_on") then
modbus_rtu_write_single_coil(SLAVE_ID, 0x0000, false, 0, 0) -- DO1(绿色)关
modbus_rtu_write_single_coil(SLAVE_ID, 0x0001, true, 100, 0) -- DO2(红色)开
current_do1 = false
current_do2 = true
elseif string.find(cmd, "red_off") then
modbus_rtu_write_single_coil(SLAVE_ID, 0x0001, false, 0, 0) -- DO2(红色)关
current_do2 = false
end
end
return false
end
-- 2. 处理连接 0 上的 Modbus 响应
if message.connection_id == 0 then
local modified = false
-- 编解码器自动将有效的 Modbus 响应解析为按字符串 ID 映射的值
local di1 = message:get_value(string.format("modbus_rtu_%d:discrete_0000", SLAVE_ID))
local di2 = message:get_value(string.format("modbus_rtu_%d:discrete_0001", SLAVE_ID))
local do1 = message:get_value(string.format("modbus_rtu_%d:coil_0000", SLAVE_ID))
local do2 = message:get_value(string.format("modbus_rtu_%d:coil_0001", SLAVE_ID))
-- 将本地状态与实际硬件读值同步
if do1 ~= nil then current_do1 = do1 end
if do2 ~= nil then current_do2 = do2 end
-- 本地硬件切换:DI1(常开)边沿检测(false -> true)
if di1 ~= nil then
if last_di1 ~= nil and last_di1 == false and di1 == true then
log("info", "DI1 常开按钮按下,切换绿色 LED")
if current_do1 then
-- 若已亮,则关闭
modbus_rtu_write_single_coil(SLAVE_ID, 0x0000, false, 0, 0)
current_do1 = false
else
-- 若已灭,执行互斥(先关红色,再开绿色)
modbus_rtu_write_single_coil(SLAVE_ID, 0x0001, false, 0, 0)
modbus_rtu_write_single_coil(SLAVE_ID, 0x0000, true, 100, 0)
current_do2 = false
current_do1 = true
end
end
last_di1 = di1
end
-- 本地硬件切换:DI2(常闭)边沿检测(true -> false)
if di2 ~= nil then
if last_di2 ~= nil and last_di2 == true and di2 == false then
log("info", "DI2 常闭按钮按下,切换红色 LED")
if current_do2 then
-- 若已亮,则关闭
modbus_rtu_write_single_coil(SLAVE_ID, 0x0001, false, 0, 0)
current_do2 = false
else
-- 若已灭,执行互斥(先关绿色,再开红色)
modbus_rtu_write_single_coil(SLAVE_ID, 0x0000, false, 0, 0)
modbus_rtu_write_single_coil(SLAVE_ID, 0x0001, true, 100, 0)
current_do1 = false
current_do2 = true
end
end
last_di2 = di2
end
local json_parts = {}
if di1 ~= nil then
message:add_bool_value("Button_NO_DI1", di1)
table.insert(json_parts, string.format('"button_di1":%s', di1 and "true" or "false"))
modified = true
end
if di2 ~= nil then
message:add_bool_value("Button_NC_DI2", di2)
table.insert(json_parts, string.format('"button_di2":%s', di2 and "true" or "false"))
modified = true
end
if do1 ~= nil then
message:add_bool_value("LED_Green_DO1", do1)
table.insert(json_parts, string.format('"led_green":%s', do1 and "true" or "false"))
modified = true
end
if do2 ~= nil then
message:add_bool_value("LED_Red_DO2", do2)
table.insert(json_parts, string.format('"led_red":%s', do2 and "true" or "false"))
modified = true
end
-- 若解析到新值,向 MQTT 发布合并后的 JSON 消息
if modified and #json_parts > 0 then
local json_payload = "{" .. table.concat(json_parts, ",") .. "}"
mqtt_publish("cycbox/state", json_payload, 0, false, 0, MQTT_CONN_ID)
end
return modified
end
return false
end
回调逻辑说明
on_timer:每 100ms 触发一次。每满 500ms 发起一次 DI 读取,随后错开 200ms 发起 DO 读取,避免帧重叠。on_receive(MQTT):解析green_on等命令。在执行ON命令前,强制向对侧 LED 发送OFF命令,确保互斥规则不被破坏。on_receive(Modbus):通过将新 DI 状态与last_diX对比来执行边沿检测。- DI1(常开):在
false → true(上升沿)时触发切换。 - DI2(常闭):在
true → false(下降沿)时触发切换。 - 切换确认后,执行与 MQTT 命令相同的互斥逻辑。
- DI1(常开):在
下游服务契约
MQTT
以下主题和 Schema 用于通信。
- 主题(状态):
cycbox/state - 主题(命令):
cycbox/commands - Payload Schema(状态):包含全部 4 个 I/O 点布尔值的 JSON 对象。
- 示例 Payload:
{
"button_di1": false,
"button_di2": true,
"led_green": true,
"led_red": false
}
切换与互斥逻辑
下表定义了切换事件中用于强制互斥的状态转换规则。
| 触发条件 | 当前状态 | 动作 1 | 动作 2 | 结果 |
|---|---|---|---|---|
| DI1 按下 | 绿色 ON | DO1 → OFF | — | 全部关闭 |
| DI1 按下 | 绿色 OFF | DO2 → OFF | DO1 → ON | 绿色亮起 |
| DI2 按下 | 红色 ON | DO2 → OFF | — | 全部关闭 |
| DI2 按下 | 红色 OFF | DO1 → OFF | DO2 → ON | 红色亮起 |
运维注意事项
- 机械继电器寿命:频繁切换(如快速连续按按钮)会加速 MA01 机械继电器的磨损。
- 轮询延迟:500ms 的轮询间隔意味着按钮至少需要按住 500ms 才能保证被检测到。如需更快响应,可将轮询间隔缩短至 250ms,但需保持错开时序。
- 常闭逻辑反转:请注意
button_di2为常闭信号。JSON 中true表示按钮处于空闲状态,false表示按钮处于按下状态。
复现本示例
- 硬件:一台 MA01-AXCX4020 模块、一个 USB 转 RS485 适配器、一个常开按钮(DI1)、一个常闭按钮(DI2)。
- 接线:将 DI1 接在
DI1端子与GND之间;将 DI2 接在DI2端子与GND之间。 - 配置:将串口和 MQTT 的 JSON 配置块粘贴到 CycBox 引擎中。
- 脚本:粘贴上方提供的 Lua 脚本。
- 环境:确保本地 MQTT 代理(如 Mosquitto)正在
localhost:1883运行。
常见问题与建议
- 启动同步:重启后首次轮询时,脚本会更新
last_diX和current_doX。在收到第一条命令之前,互斥逻辑尚未生效。 - Modbus 地址计算:若设备无响应,请核查拨码开关。全部拨到
ON时硬件地址为 31,加上默认软件偏移量 1,即为本示例使用的从站 ID 32。 - 超时调优:
modbus_rtu_codec的with_receive_timeout设置为 20ms。若使用高延迟无线 RS485 网桥,请将其增大至 100ms。