eKuiper
产业背景
随着 5G 和物联⽹的兴起和⼴泛应⽤,⼤量的数据以连续数据流的形式持续产⽣。为了实时地处理连续数据流,从海量数据中获取有价值的分析,流式处理系统应运⽽⽣。现在流⾏的流式数据处理框架,⽐如Apache Flink, Apache Spark Streaming 等,⼀般运⾏在资源不受限的云计算中⼼, 有以下四个缺点使其⽆法在边缘运⾏。⼀是消耗资源⼤, ⽆法在边缘资源受限设备上运⾏。⼆是边缘待处理数据需要通过⽹络传⼊流式计算框架,然后将分析结果传回,⽆法适⽤于低时延场景。三是全量数据必须上传云端,有安全⻛险。四是边缘端数据量巨⼤,⽆效数据多,带宽压⼒较⼤。随着⽤户对系统低时延、数据⾼安全和数据低成本传输的要求,越来越多的数据需要在接近⽤户或者设备的边缘节点进⾏处理,因此边缘端的流式数据处理的需求应运⽽⽣。
产品概述
eKuiper 是 Golang 实现的轻量级物联⽹边缘分析、流式处理开源软件,可以运⾏在各类资源受限的边缘设备上。eKuiper 设计的⼀个主要⽬标就是将在云端运⾏的实时流式计算框架(⽐如Apache Spark,Apache Storm 和 Apache Flink 等)迁移到边缘端。eKuiper 参考了上述云端流式处理项⽬的架构与实现,结合边缘流式数据处理的特点,采⽤了编写基于源 (Source),SQL (业务逻辑处理), ⽬标 (Sink) 的规则引擎来实现边缘端的流式数据处理。
eKuiper 是开源基⾦会 LF Edge 下的第⼀阶段项⽬ (Stage 1, at large)。作为边缘计算引擎,eKuiper 可以部署在靠近数据源的边缘端,实现数据就近快速分析,从⽽降低延迟并且提⾼吞吐量。
产品特性
超轻量
○ 核⼼服务安装包约 4.5MB(压缩后),初始运⾏时占⽤内存约 10MB
跨平台
○ 流⾏ CPU 架构:X86 AMD * 32, X86 AMD * 64; ARM * 32, ARM * 64 位; PPC
○ 常⻅ Linux 发⾏版、OpenWrt 嵌⼊式系统、MacOS、Docker
○ ⼯控机、树莓派、⼯业⽹关、家庭⽹关、MEC 边缘云等
轻代码
○ 使⽤ SQL 实现数据处理
○ 完整的数据处理能⼒
开放
○ 与多个开源社区边缘项⽬集成,是 EdgeX Foundry 的参考规则引擎实现
○ ⾼可扩展性
产品功能-流处理功能
消息接⼊能⼒
内置⽀持接⼊多种消息协议和物联⽹平台,包括 mqtt, edgeX Foundry, http, ,file 以及 memory。通过官⽅插件⽀持 zeroMq 和 random。
⽀持多种⽅式⾃定义消息接⼊从⽽可扩展接⼊的消息协议类型以及私有的协议。包括 go 原⽣插件扩展和 portable 插件扩展 (go 语⾔或者 python 语⾔)。
⽀持的消息格式包括 JSON ⽂本和⼆进制串。
⽀持格式化消息预处理。通过定义溜达格式,⽀持接⼊消息的格式验证,⾃动类型转换和格式化,修剪未使⽤的列等。
⽀持⾮格式化或格式不统⼀的消息流的接⼊。
数据处理能⼒
数据过滤功能。通过 WHERE 语句或者 HAVING 语句等,可对数据进⾏过滤操作。过滤条件可为布尔运算,字符串⽐较,jsonPath 条件等任意表达式。
复杂数据访问功能。⽀持多种运算符,访问数组或者 json 数据,包括嵌套结构的数据。同时,⽀持通⽤的 jsonPath 语法,⼤⼤提升数据访问能⼒。
数据抽取功能。通过 SELECT 语句,选择需要的列。或者通过 CASE WHEN 语句,依照条件,选取不同的值。
数据分组聚合能⼒。通过 GROUP By 语句,对数据进⾏分组,并计算每个分组的聚合结果,如平均值,最⼤值等。
数据排序能⼒。⽀持 Order By 语句进⾏排序。
数据变换能⼒。⽀持多种内置函数和逻辑运算符,从⽽实现多种数据计算能⼒和算法,包括:
○ 数学: sin, cos, abs, log, mod etc; 共计 25 个函数
○ 字符串: concat, substring etc; 共计 19 个函数
○ 聚合: avg, count, max, min, sum, collect & deduplicate; 共计 7 个函数
○ Conversion/ Encoding & decoding / Hashing / JSON processing / 其它; 共计 18 个函数
○ GeoHash 函数,共计 10 个函数
⼆进制处理能⼒,可对图像进⾏修改⼤⼩,⽣成缩略图等操作。
⽀持多种⽅法扩展函数,实现更多的数据处理功能和算法。包括 go 原⽣插件,portable 插件(go和 python 语⾔)以及外部服务函数。扩展的函数可以直接在 SQL 种调⽤。
AI 框架整合能⼒。⽀持与 tensorFlow,tensorFlowLite,Pytorch 等 AI 框架进⾏整合,对流数据做AI 推理。
流式分析能⼒
⽀持⽆界的流式数据的实时处理。
⽀持⽆状态的连续查询。
⽀持窗⼝式查询,⽀持的时间窗⼝类型包括 tumbling,hopping,sliding 和 session。同时,⽀持计数窗⼝。
⽀持多种时间模式,包括事件时间(以输⼊事件⾃身所带的时间戳为时间基准)和处理时间。事件时
间基于⽔位线算法实现。
⽀持 Table,可视为流的状态缓存快照。流可与表进⾏ Join,从⽽实现数据补全,动态过滤条件等功能。
多流结合或流表结合能⼒。可使⽤ Join 在⼀条规则⾥处理来⾃多个输⼊的数据。
容错能⼒
⽀持状态⾃动存储和恢复。⽀持⾃定义扩展创建和使⽤状态。
基于渐进式的快照,实现消息处理的 qos 保证。包括 exactly once, at least once 和默认的 at most once.
事件触发,告警和联动能⼒
⽀持规则触发多个动作。
消息推送能⼒,内置⽀持推送结果到多种消息协议和物联⽹平台,包括 mqtt, edgeX Foundry, http以及 memory。通过官⽅插件⽀持 zeroMq 和 random。通过 http 可触发告警服务。
消息持久化能⼒,通过内置或官⽅插件,⽀持消息写⼊ log,file,redis,influxDB 和 tdengine。
⽀持消息推送失败探测和重发。
⽀持基于 go 模板的格式化,可实现复杂的输出格式化语法。
扩展能⼒
⽀持数据接⼊,输出和⾃定义函数的扩展。扩展后的函数可直接在 SQL 中使⽤。
⽀持通过 GO 插件系统编写插件。
⽀持通过⾃研的 Portable 插件系统编写插件。可使⽤ GO 语⾔或者 Python 语⾔编写。以后还会逐步添加其他语⾔的⽀持。
⽀持通过配置的⽅式创建⾃定义函数,调⽤已有的 http,gRpc 和 msgpack rpc 服务,从⽽可以快速整合已有的微服务。
规则整合能⼒
⽀持多规则并⾏执⾏。
⽀持组合多个规则,创建规则流⽔线的能⼒。可实现更复杂的业务逻辑。
与 EMQ X Edge 集成提供了与 EMQ X Neuron 和 EMQ X Edge 的⽆缝集成,实现在边缘端从消息
接⼊到数据分析端到端的场景实现能⼒。
使⽤和部署
流式 SQL 解析
⽀持编写类 SQL 语法来描述业务逻辑,进⾏实时流式数据处理,例如事件清洗,转换和存储。
⽀持 SQL 语句验证。
⽀持⾃动 SQL 优化,例如谓词下推,列修剪等。
系统管理能⼒
⽀持引擎数据,例如流,规则等的持久化保存,以保证系统重启后原有规则可⾃动启动运⾏。可选保存⽅式包括 sqlite 和 redis。
⽀持规则的启动,停⽌,重启等操作。
⽀持获取规则的运⾏状态和指标。
通过 REST API 进⾏流,规则,插件等流计算引擎功能的管理。
通过命令⾏的⽅式进⾏流,规则,插件等流计算引擎功能的管理。
通过 Web 控制台进⾏流,规则,插件等流计算引擎功能的管理。
提供 k8s ⼯具,可使⽤ config map 进⾏流,规则,插件等流计算引擎功能的管理。
系统配置
⽀持通过 yaml ⽂件配置系统属性
⽀持通过环境变量配置系统属性,从⽽⽀持在容器编排中灵活配置。
⽀持 log 配置。log ⽂件⽀持⾃动更新⽂件。
⽀持规则指标写⼊ prometheus,实现系统监控。
⽀持配置鉴权。
⾃动化测试能⼒
项⽬的 Github 源码包含完整单元测试,并且新代码提交或发布版本前会⾃动运⾏所有单元测试。
项⽬提供基于 jmeter 的⾃动化端到端测试框架。 Github 源码包含丰富的端到端测试,并且新代码提交或发布版本前会⾃动运⾏所有单元测试。
部署能⼒
⽀持超轻量部署,核⼼版本安装包初始内存为 10MB 左右。
提供多个平台的预编译安装包,操作系统包括 linux/mac/windows;CPU 架构包括,x86, x64, arm, arm64 等;可运⾏在边缘⽹关,树莓派等。
提供适⽤于多种环境的 docker 镜像。
提供 helm chart
⽀持编译参数,⾃定义选择功能,从⽽可编译轻量化的安装包。
提供 k8s ⼯具,可使⽤ config map 进⾏流,规则,插件等流计算引擎功能的管理。
与 KubeEdge、K3s、Baetyl、OpenYurt 等基于边缘 Kubernetes 框架的集成能⼒。
应⽤场景
eKuiper 适合部署于边缘⽹关或者边缘设备上,在靠近数据源头的⼀侧进⾏低延迟的流式数据分析与计算。
eKuiper 可作为数据管道,实时聚合分析,实时数据补全,异常警报,实时图像处理,实时 AI 计算等领域。典型应⽤场景包括:
⼯业物联⽹
对⽣产线数据进⾏实时处理
⻋联⽹
对来⾃汽⻋总线数据的即时分析
智慧城市
对来⾃于各类城市设施数据的实时分析