概览
从 eKuiper 0.9.1 版本开始,每发布一个 eKuiper 新版本,会随之发布对应版本的管理控制台。本文以一个实际例子来说明如何使用管理控制台对 eKuiper 节点进行操作与管理。文中将订阅来自于 MQTT 服务器的数据,通过 eKuiper 写好的规则,经过处理后发送到业务指定平台,演示说明如下:
通过管理控制台创建一个 eKuiper 节点
创建一个流,用于订阅 MQTT 服务器中的数据,本例演示订阅 MQTT 服务器,相关信息如下所示。
地址为:
tcp://broker.emqx.io:1883
,主题为:
emqx/up
,数据为:
{"temperature": 20, "humidity" : 20}
、{"temperature": 30, "humidity" : 20}、
{"temperature": 40, "humidity" : 20}
创建一个规则,用于计算订阅到的数据,并将数据写入目标 (sink) 端「本例演示将订阅到的消息写入到文件中」。
eKuiper 目前已经支持多种源和目标。用户只需安装相对应的插件,便能实现对应的功能「本例的源为 MQTT源是内置支持,无需安装;目标为文件 (file),非内置支持,需要另安装」
数据框架
部署
本文以docker-compose安装方式为例:
version: '3.4' services: nanomq: image: emqx/nanomq:0.13-slim container_name: nanomq hostname: nanomq ports: - "1883:1883" manager: image: emqx/ekuiper-manager:1.7 container_name: kuiper-manager hostname: manager ports: - "9082:9082" kuiper: image: lfedge/ekuiper:1.7-slim-python container_name: kuiper hostname: kuiper ports: - "9081:9081"
登录 kuiper-manager
登录时需要提供 kuiper-manager 的地址,用户名、密码。如下图所示:
地址:
http://$yourhost:9082
用户名:admin
密码:public
创建 eKuiper 服务
创建 eKuiper 服务时需要填写「服务类型」,「服务名称」和「端点 URL 」。
服务类型 : 选择
直接连接服务
(华为 IEF 服务
专用于华为用户)。服务名称 : 自拟,本例为
example
。端点URL:
http://$IP:9081
,IP 获取命令如下:
docker inspect kuiper | grep IPAddress
创建 eKuiper 服务样例如下图所示,如果把端口暴露到了主机,那么也可以直接使用主机上的 9081 端口地址。
创建流
如下图,创建一个名为 nanomq
的流,
用于订阅地址为
tcp://broker.emqx.io:1883
的 MQTT 服务器消息消息主题为
nanomq/up
流结构体定义包含了以下三个字段。
用户也可以去掉「是否为带结构的流」来定义一个 schemaless 的数据源。
temperature: bigint
humidity: bigint
id:bigint
「流类型」可以不选择,不选的话为缺省的「mqtt」,或者如下图所示直接选择「mqtt」
「配置组」,与「流类型」类似,用户不选的话,使用缺省的「default」
「流格式」,与「流类型」类似,用户不选的话,使用缺省的「json」
如上所示用的是缺省的「default」配置组,用户也可以根据需求编写自己的配置。在ekuiper 1.7版本上可以在直接在创建流的时候去添加或修改配置组,如图:
名称:自定义
服务器地址:MQTT 消息代理的服务器
用户名:MQTT 连接用户名
密码:MQTT 连接密码
MQTT协议版本:MQTT 协议版本。3.1 (也被称为 MQTT 3) 或者 3.1.1 (也被称为 MQTT 4)。 如果未指定,缺省值为 3.1
客户端ID:MQTT 连接的客户端 ID。 如果未指定,将使用一个 uuid
QoS级别:默认订阅 QoS 级别
证书路径:证书路径。可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 server 命令的路径
私钥路径:私钥路径。可以为绝对路径,也可以为相对路径
跟证书路径:根证书路径,用以验证服务器证书。可以为绝对路径,也可以为相对路径
跳过证书验证:控制是否跳过证书认证。如果被设置为 true,那么跳过证书认证;否则进行证书验证
Kubeedge版本号:Kubeedge 版本号,不同的版本号对应的文件内容不同
KubeEdge模型文件:KubeEdge 模版文件名,文件指定放在 etc/sources 文件夹中
创建规则
如下图,创建一条名为 rule_001 的规则(这里简单使用了一个时间窗口函数做了下数据过滤处理,更多函数使用详情可参考:https://ekuiper.org/docs/zh/latest/),将30s为一个时间窗口发送过来的数据中 temperature > 30 的数据过滤出来。SQL 编辑器在用户写 SQL 的过程中可以给出提示,方便用户完成 SQL 的编写。
单击「添加」按钮,弹出对话框如下所示。
是否忽略输出:如果选择结果为空,则忽略输出。
将结果数据按条发送:输出消息以数组形式接收,该属性意味着是否将结果一一发送。 如果为 false,则输出消息将为{"result":"${the string of received message}"}。 例如,{"result":"[{"count":30},""count":20}]"}。否则,结果消息将与实际字段名称一一对应发送。 对于与上述相同的示例,它将发送 {"count":30},然后发送{"count":20} 到 RESTful 端点。默认为 false。
流格式:默认是json格式
数据模版:Golang 模板格式字符串,用于指定输出数据格式。 模板的输入是目标消息,该消息始终是 map 数组。 如果未指定数据模板,则将数据作为原始输入。
创建规则后,如果一切正常,那么规则处于运行状态。
验证
通过nanomq_cli发布3条数据模拟传感器数据发送到 MQTT 服务器 tcp://broker.emqx.io:1883
的主题 nanomq/up
中
docker exec -it nanomq bash nanomq_cli pub --url 127.0.0.1:1883 -t nanomq/up -m '{"id":1,temperature": 30, "humidity" : 20}'
订阅
通过MQTTX客户端工具连接EMQX broker去订阅ekuiper上传消息topicekuiper/up
,可以看到根据规则筛选得到30s内temperature大于30的数据消息,如图: