RocketMQ 桥接

EMQX 企业版技术支持发表于:2022年05月23日 10:58:12

RocketMQ 桥接

提示:EMQX 3.1 版本后推出强大的规则引擎用于替换插件,建议您前往使用RocketMQ 桥接规则引擎中创建 RocketMQ 桥接

EMQX 桥接转发 MQTT 消息到 RocketMQ 集群:

image

RocketMQ 桥接插件配置文件: etc/plugins/emqx_bridge_rocket.conf。

配置 RocketMQ 集群地址

## RocketMQ 服务器集群配置
## bridge.rocket.servers = 127.0.0.1:9876,127.0.0.2:9876,127.0.0.3:9876
bridge.rocket.servers = 127.0.0.1:9876

bridge.rocket.refresh_topic_route_interval = 5S

## 分区生产者是同步/异步模式选择
bridge.rocket.produce = sync

## 生产者同步模式下的超时时间
## bridge.rocket.produce.sync_timeout = 3s

## 生产者 batch 的消息数量
## bridge.rocket.producer.batch_size = 100

## 采用 base64 编码或不编码
## bridge.rocket.encode_payload_type = base64

## bridge.rocket.sock.buffer = 32KB
## bridge.rocket.sock.recbuf = 32KB
bridge.rocket.sock.sndbuf = 1MB
## bridge.rocket.sock.read_packets = 20

配置 RocketMQ 桥接规则

## Bridge RocketMQ Hooks
## ${topic}: the RocketMQ topics to which the messages will be published.
## ${filter}: the mqtt topic (may contain wildcard) on which the action will be performed .

## Client Connected Record Hook
bridge.rocket.hook.client.connected.1     = {"topic": "ClientConnected"}

## Client Disconnected Record Hook
bridge.rocket.hook.client.disconnected.1  = {"topic": "ClientDisconnected"}

## Session Subscribed Record Hook
bridge.rocket.hook.session.subscribed.1   = {"filter": "#",  "topic": "SessionSubscribed"}

## Session Unsubscribed Record Hook
bridge.rocket.hook.session.unsubscribed.1 = {"filter": "#",  "topic": "SessionUnsubscribed"}

## Message Publish Record Hook
bridge.rocket.hook.message.publish.1      = {"filter": "#",  "topic": "MessagePublish"}

## Message Delivered Record Hook
bridge.rocket.hook.message.delivered.1    = {"filter": "#",  "topic": "MessageDeliver"}

## Message Acked Record Hook
bridge.rocket.hook.message.acked.1        = {"filter": "#",  "topic": "MessageAcked"}

RocketMQ 桥接规则说明

事件说明
bridge.rocket.hook.client.connected.1客户端登录
bridge.rocket.hook.client.disconnected.1客户端退出
bridge.rocket.hook.session.subscribed.1订阅主题
bridge.rocket.hook.session.unsubscribed.1取消订阅主题 
bridge.rocket.hook.message.publish.1发布消息
bridge.rocket.hook.message.delivered.1delivered 消息
bridge.rocket.hook.message.acked.1ACK 消息

客户端上下线事件转发 RocketMQ

设备上线 EMQX 转发上线事件消息到 RocketMQ:

topic = "ClientConnected",
value = {
         "client_id": ${clientid},
         "username": ${username},
         "node": ${node},
         "ts": ${ts}
        }

设备下线 EMQX 转发下线事件消息到 RocketMQ:

topic = "ClientDisconnected",
value = {
         "client_id": ${clientid},
         "username": ${username},
         "reason": ${reason},
         "node": ${node},
         "ts": ${ts}
        }

客户端订阅主题事件转发 RocketMQ

topic = "SessionSubscribed"

value = {
         "client_id": ${clientid},
         "topic": ${topic},
         "qos": ${qos},
         "node": ${node},
         "ts": ${timestamp}
        }

客户端取消订阅主题事件转发 RocketMQ

topic = "SessionUnsubscribed"

value = {
         "client_id": ${clientid},
         "topic": ${topic},
         "qos": ${qos},
         "node": ${node},
         "ts": ${timestamp}
        }

MQTT 消息转发到 RocketMQ

topic = "MessagePublish"

value = {
         "client_id": ${clientid},
         "username": ${username},
         "topic": ${topic},
         "payload": ${payload},
         "qos": ${qos},
         "node": ${node},
         "ts": ${timestamp}
        }

MQTT 消息派发 (Deliver) 事件转发 RocketMQ

topic = "MessageDeliver"

value = {
         "client_id": ${clientid},
         "username": ${username},
         "from": ${fromClientId},
         "topic": ${topic},
         "payload": ${payload},
         "qos": ${qos},
         "node": ${node},
         "ts": ${timestamp}
        }

MQTT 消息确认 (Ack) 事件转发 RocketMQ

topic = "MessageAcked"

value = {
         "client_id": ${clientid},
         "username": ${username},
         "from": ${fromClientId},
         "topic": ${topic},
         "payload": ${payload},
         "qos": ${qos},
         "node": ${node},
         "ts": ${timestamp}
        }

RocketMQ 消费示例

RocketMQ 读取 MQTT 客户端上下线事件消息:

bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ClientConnected
  
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ClientDisconnected

RocketMQ 读取 MQTT 主题订阅事件消息:

bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer SessionSubscribed

bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer SessionUnsubscribed

RocketMQ 读取 MQTT 发布消息:

bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer MessagePublish

RocketMQ 读取 MQTT 消息发布 (Deliver)、确认 (Ack) 事件:

bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer MessageDeliver

bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer MessageAcked

 提示:默认 payload 被 base64 编码,可通过修改配置 bridge.rocket.encode_payload_type 指定 payload 数据格式。 

启用 RocketMQ 桥接插件

./bin/emqx_ctl plugins load emqx_bridge_rocket


    您需要登录后才可以回复