跳到主要内容

工业 I/O 桥接:Modbus RTU 转 MQTT 与本地硬件联动

本示例演示如何将标准 Modbus RTU I/O 模块改造为智能 MQTT 网关。以 MA01-AXCX4020 为例,同时实现远程云端控制和本地硬件按钮逻辑,包含边沿触发切换和安全关键的 LED 互斥控制。

本示例实现的功能

  • 500ms 轮询 2 路离散输入和 2 路线圈,两次请求之间错开 200ms,确保满足设备 100ms 响应时间要求。
  • 为本地硬件切换实现边沿检测:DI1(常开)切换绿色 LED;DI2(常闭)切换红色 LED。
  • 通过先关后开的 Modbus 写入序列,强制互斥,确保同一时刻只有一个 LED 亮起。
  • 桥接至 MQTT:将实时 I/O 状态以 JSON 形式发布至 cycbox/state,并在 cycbox/commands 上监听远程字符串命令。
  • 维护本地状态:将 Lua 脚本内部变量与实际硬件寄存器读值同步,确保远程操作与本地操作的一致性。

端到端数据流

以下流程图展示了一次本地按钮按下如何触发一系列 Modbus 命令,同时通知 MQTT 代理。

设备与线路协议

MA01-AXCX4020 是一款工业 RS485 I/O 模块,具备 4 路数字输入和 2 路继电器输出。

  • 物理层:RS485,9600 bps,8-N-1。
  • 从站地址32(0x20)。根据数据手册,由硬件拨码开关设定值 31 加上默认软件偏移量 1 计算得出。
  • I/O 映射
    • DI1(常开按钮):离散输入 0x0000
    • DI2(常闭按钮):离散输入 0x0001
    • DO1(绿色 LED):线圈 0x0000
    • DO2(红色 LED):线圈 0x0001
  • 时序:设备处理一次请求约需 100ms;配置中两次 Modbus 请求之间错开 200ms 以防止帧碰撞。

CycBox 配置

连接 0:串口(Modbus RTU)

该连接直接与 MA01-AXCX4020 模块硬件通信。

{
"app": {
"app_transport": "serial_port_transport",
"app_codec": "modbus_rtu_codec",
"app_transformer": "disable_transformer",
"app_encoding": "UTF-8"
},
"serial_port_transport": {
"serial_port_transport_data_bits": 8,
"serial_port_transport_port": "/dev/ttyUSB0",
"serial_port_transport_baud_rate": 9600,
"serial_port_transport_parity": "none",
"serial_port_transport_stop_bits": "1",
"serial_port_transport_flow_control": "none"
},
"modbus_rtu_codec": {
"with_receive_timeout": 20
}
}

连接 1:MQTT 客户端

该连接作为下游 MQTT 代理的桥接,用于自动化控制与监控。

{
"app": {
"app_transport": "mqtt_transport",
"app_codec": "timeout_codec",
"app_transformer": "disable_transformer",
"app_encoding": "UTF-8"
},
"mqtt_transport": {
"mqtt_transport_subscribe_qos": 1,
"mqtt_transport_client_id": "cycbox-300JSYB6",
"mqtt_transport_broker_url": "mqtt://localhost:1883",
"mqtt_transport_subscribe_topics": "cycbox/commands",
"mqtt_transport_use_tls": false
},
"timeout_codec": {
"with_receive_timeout": 100
}
}

连接索引映射

Lua 脚本通过以下 ID 在现场总线和云端之间路由数据。

connection_id角色协议
0现场设备Modbus RTU(RS485)
1上游桥接MQTT 客户端
2内部代理MQTT 服务端

Lua 管道详解

脚本负责三个独立任务:周期轮询、命令解析以及本地硬件切换状态机。

-- MA01-AXCX4020 DI/DO 轮询、MQTT 与硬件切换脚本
-- 使用 serial_port_transport + modbus_rtu_codec 轮询 IO 状态,
-- 使用 mqtt_transport 发布状态并接收命令。
-- 每 500ms 轮询 2 路 DI 和 2 路 DO,以 JSON 格式发布到 "cycbox/state"。
-- 支持本地硬件切换:DI1(常开)切换绿色 LED,DI2(常闭)切换红色 LED。
-- LED 互斥(同一时刻只能有一个亮起)。
--
-- 识别的命令(MQTT 消息的字符串 payload):
-- "green_on" : 打开绿色 LED(DO1),关闭红色 LED(DO2)
-- "green_off" : 关闭绿色 LED(DO1)
-- "red_on" : 打开红色 LED(DO2),关闭绿色 LED(DO1)
-- "red_off" : 关闭红色 LED(DO2)
--
-- MA01-AXCX4020 设备
-- 从站地址:32(0x20)
-- Discrete 0x0000:DI1(常开按钮)
-- Discrete 0x0001:DI2(常闭按钮)
-- Coil 0x0000:DO1(绿色 LED)
-- Coil 0x0001:DO2(红色 LED)

local SLAVE_ID = 32
local POLL_INTERVAL_MS = 500
local timer_ms = 0
local MQTT_CONN_ID = 1

-- 用于边沿检测和切换的状态跟踪变量
local last_di1 = nil
local last_di2 = nil
local current_do1 = false
local current_do2 = false

function on_start()
log("info", "启动 MA01-AXCX4020 MQTT 轮询与切换脚本")
end

function on_timer(now_ms)
timer_ms = timer_ms + 100

if timer_ms >= POLL_INTERVAL_MS then
-- 从地址 0x0000 开始读取 2 路离散输入(DI1-DI2)
modbus_rtu_read_discrete_inputs(SLAVE_ID, 0x0000, 2, 0, 0)

-- 从地址 0x0000 开始读取 2 路线圈(DO1-DO2)
-- 错开 200ms,等待设备响应上一次请求
modbus_rtu_read_coils(SLAVE_ID, 0x0000, 2, 200, 0)

timer_ms = 0
end
end

function on_receive()
-- 1. 处理连接 1 上的 MQTT 命令
if message.connection_id == MQTT_CONN_ID then
local topic = message:get_metadata("mqtt_topic")
if topic == "cycbox/commands" and message.payload then
local cmd = string.lower(message.payload)
log("info", "收到 MQTT 命令:" .. cmd)

-- 互斥逻辑:打开一个之前,先关闭另一个
if string.find(cmd, "green_on") then
modbus_rtu_write_single_coil(SLAVE_ID, 0x0001, false, 0, 0) -- DO2(红色)关
modbus_rtu_write_single_coil(SLAVE_ID, 0x0000, true, 100, 0) -- DO1(绿色)开
current_do2 = false
current_do1 = true
elseif string.find(cmd, "green_off") then
modbus_rtu_write_single_coil(SLAVE_ID, 0x0000, false, 0, 0) -- DO1(绿色)关
current_do1 = false
elseif string.find(cmd, "red_on") then
modbus_rtu_write_single_coil(SLAVE_ID, 0x0000, false, 0, 0) -- DO1(绿色)关
modbus_rtu_write_single_coil(SLAVE_ID, 0x0001, true, 100, 0) -- DO2(红色)开
current_do1 = false
current_do2 = true
elseif string.find(cmd, "red_off") then
modbus_rtu_write_single_coil(SLAVE_ID, 0x0001, false, 0, 0) -- DO2(红色)关
current_do2 = false
end
end
return false
end

-- 2. 处理连接 0 上的 Modbus 响应
if message.connection_id == 0 then
local modified = false

-- 编解码器自动将有效的 Modbus 响应解析为按字符串 ID 映射的值
local di1 = message:get_value(string.format("modbus_rtu_%d:discrete_0000", SLAVE_ID))
local di2 = message:get_value(string.format("modbus_rtu_%d:discrete_0001", SLAVE_ID))
local do1 = message:get_value(string.format("modbus_rtu_%d:coil_0000", SLAVE_ID))
local do2 = message:get_value(string.format("modbus_rtu_%d:coil_0001", SLAVE_ID))

-- 将本地状态与实际硬件读值同步
if do1 ~= nil then current_do1 = do1 end
if do2 ~= nil then current_do2 = do2 end

-- 本地硬件切换:DI1(常开)边沿检测(false -> true)
if di1 ~= nil then
if last_di1 ~= nil and last_di1 == false and di1 == true then
log("info", "DI1 常开按钮按下,切换绿色 LED")
if current_do1 then
-- 若已亮,则关闭
modbus_rtu_write_single_coil(SLAVE_ID, 0x0000, false, 0, 0)
current_do1 = false
else
-- 若已灭,执行互斥(先关红色,再开绿色)
modbus_rtu_write_single_coil(SLAVE_ID, 0x0001, false, 0, 0)
modbus_rtu_write_single_coil(SLAVE_ID, 0x0000, true, 100, 0)
current_do2 = false
current_do1 = true
end
end
last_di1 = di1
end

-- 本地硬件切换:DI2(常闭)边沿检测(true -> false)
if di2 ~= nil then
if last_di2 ~= nil and last_di2 == true and di2 == false then
log("info", "DI2 常闭按钮按下,切换红色 LED")
if current_do2 then
-- 若已亮,则关闭
modbus_rtu_write_single_coil(SLAVE_ID, 0x0001, false, 0, 0)
current_do2 = false
else
-- 若已灭,执行互斥(先关绿色,再开红色)
modbus_rtu_write_single_coil(SLAVE_ID, 0x0000, false, 0, 0)
modbus_rtu_write_single_coil(SLAVE_ID, 0x0001, true, 100, 0)
current_do1 = false
current_do2 = true
end
end
last_di2 = di2
end

local json_parts = {}

if di1 ~= nil then
message:add_bool_value("Button_NO_DI1", di1)
table.insert(json_parts, string.format('"button_di1":%s', di1 and "true" or "false"))
modified = true
end
if di2 ~= nil then
message:add_bool_value("Button_NC_DI2", di2)
table.insert(json_parts, string.format('"button_di2":%s', di2 and "true" or "false"))
modified = true
end
if do1 ~= nil then
message:add_bool_value("LED_Green_DO1", do1)
table.insert(json_parts, string.format('"led_green":%s', do1 and "true" or "false"))
modified = true
end
if do2 ~= nil then
message:add_bool_value("LED_Red_DO2", do2)
table.insert(json_parts, string.format('"led_red":%s', do2 and "true" or "false"))
modified = true
end

-- 若解析到新值,向 MQTT 发布合并后的 JSON 消息
if modified and #json_parts > 0 then
local json_payload = "{" .. table.concat(json_parts, ",") .. "}"
mqtt_publish("cycbox/state", json_payload, 0, false, 0, MQTT_CONN_ID)
end

return modified
end

return false
end

回调逻辑说明

  1. on_timer:每 100ms 触发一次。每满 500ms 发起一次 DI 读取,随后错开 200ms 发起 DO 读取,避免帧重叠。
  2. on_receive(MQTT):解析 green_on 等命令。在执行 ON 命令前,强制向对侧 LED 发送 OFF 命令,确保互斥规则不被破坏。
  3. on_receive(Modbus):通过将新 DI 状态与 last_diX 对比来执行边沿检测。
    • DI1(常开):在 false → true(上升沿)时触发切换。
    • DI2(常闭):在 true → false(下降沿)时触发切换。
    • 切换确认后,执行与 MQTT 命令相同的互斥逻辑。

下游服务契约

MQTT

以下主题和 Schema 用于通信。

  • 主题(状态)cycbox/state
  • 主题(命令)cycbox/commands
  • Payload Schema(状态):包含全部 4 个 I/O 点布尔值的 JSON 对象。
  • 示例 Payload
{
"button_di1": false,
"button_di2": true,
"led_green": true,
"led_red": false
}

切换与互斥逻辑

下表定义了切换事件中用于强制互斥的状态转换规则。

触发条件当前状态动作 1动作 2结果
DI1 按下绿色 ONDO1 → OFF全部关闭
DI1 按下绿色 OFFDO2 → OFFDO1 → ON绿色亮起
DI2 按下红色 ONDO2 → OFF全部关闭
DI2 按下红色 OFFDO1 → OFFDO2 → ON红色亮起

运维注意事项

  • 机械继电器寿命:频繁切换(如快速连续按按钮)会加速 MA01 机械继电器的磨损。
  • 轮询延迟:500ms 的轮询间隔意味着按钮至少需要按住 500ms 才能保证被检测到。如需更快响应,可将轮询间隔缩短至 250ms,但需保持错开时序。
  • 常闭逻辑反转:请注意 button_di2 为常闭信号。JSON 中 true 表示按钮处于空闲状态,false 表示按钮处于按下状态。

复现本示例

  1. 硬件:一台 MA01-AXCX4020 模块、一个 USB 转 RS485 适配器、一个常开按钮(DI1)、一个常闭按钮(DI2)。
  2. 接线:将 DI1 接在 DI1 端子与 GND 之间;将 DI2 接在 DI2 端子与 GND 之间。
  3. 配置:将串口和 MQTT 的 JSON 配置块粘贴到 CycBox 引擎中。
  4. 脚本:粘贴上方提供的 Lua 脚本。
  5. 环境:确保本地 MQTT 代理(如 Mosquitto)正在 localhost:1883 运行。

常见问题与建议

  • 启动同步:重启后首次轮询时,脚本会更新 last_diXcurrent_doX。在收到第一条命令之前,互斥逻辑尚未生效。
  • Modbus 地址计算:若设备无响应,请核查拨码开关。全部拨到 ON 时硬件地址为 31,加上默认软件偏移量 1,即为本示例使用的从站 ID 32。
  • 超时调优modbus_rtu_codecwith_receive_timeout 设置为 20ms。若使用高延迟无线 RS485 网桥,请将其增大至 100ms。