EMQX
# EMQX简介
MQTT属于是物联网的通信协议,在MQTT协议中有两大角色:客户端(发布者/订阅者),服务端(Mqtt broker);针对客户端和服务端需要有遵循该协议的的具体实现,EMQ/EMQ X就是MQTT Broker的一种实现。
EMQ官网:https://www.emqx.io/zh
# EMQ X是什么
EMQ X 基于 Erlang/OTP 平台开发的 MQTT 消息服务器,是开源社区中最流行的 MQTT 消息服务器
EMQ X 是开源百万级分布式 MQTT 消息服务器(MQTT Messaging Broker),用于支持各种接入标准 MQTT 协议的设备,实现从设备端到服务器端的消息传递,以及从服务器端到设备端的设备控制消息转发。从而实现物联 网设备的数据采集,和对设备的操作和控制
# 为什么选择EMQ X
到目前为止,比较流行的 MQTT Broker 有几个:
Eclipse Mosquitto: https://github.com/eclipse/mosquitto 使用 C 语言实现的 MQTT Broker。Eclipse 组织还还包含了大量的 MQTT 客户端项目: https://www.eclipse.org/paho/#
EMQX: https://github.com/emqx/emqx
使用 Erlang 语言开发的 MQTT Broker,支持许多其他 IoT 协议比如 CoAP、LwM2M 等
Mosca: https://github.com/mcollina/mosca
使用 Node.JS 开发的 MQTT Broker,简单易用
VerneMQ: https://github.com/vernemq/vernemq
同样使用 Erlang 开发的 MQTT Broker
从支持 MQTT5.0、稳定性、扩展性、集群能力等方面考虑,EMQX 的表现应该是最好的。 与别的MQTT服务器相比EMQ X 主要有以下的特点:
- 经过100+版本的迭代,EMQ X 目前为开源社区中最流行的 MQTT 消息中间件,在各种客户严格的生产环 境上经受了严苛的考验
- 优化的架构设计,支持超大规模的设备连接。企业版单机能支持百万的 MQTT 连接;集群能支持千万级 别的 MQTT 连接
- 易于安装和使用
- 灵活的扩展性,支持企业的一些定制场景
- 中国本地的技术支持服务,通过微信、QQ等线上渠道快速响应客户需求
- 基于 Apache 2.0 协议许可,完全开源。EMQ X 的代码都放在 Github 中,用户可以查看所有源代码
- EMQ X 3.0 支持 MQTT 5.0 协议,是开源社区中第一个支持 5.0协议规范的消息服务器,并且完全兼容 MQTT V3.1 和 V3.1.1 协议。除了 MQTT 协议之外,EMQ X 还支持别的一些物联网协议
- 单机支持百万连接,集群支持千万级连接;毫秒级消息转发。EMQ X 中应用了多种技术以实现上述功 能,
- 利用 Erlang/OTP 平台的软实时、高并发和容错(电信领域久经考验的语言)
- 全异步架构
- 连接、会话、路由、集群的分层设计
- 消息平面和控制平面的分离等
- 扩展模块和插件,EMQ X 提供了灵活的扩展机制,可以实现私有协议、认证鉴权、数据持久化、桥接转 发和管理控制台等的扩展
- 桥接:EMQ X 可以跟别的消息系统进行对接,比如 EMQ X Enterprise 版本中可以支持将消息转发到 Kafka、RabbitMQ 或者别的 EMQ 节点等
- 共享订阅:共享订阅支持通过负载均衡的方式在多个订阅者之间来分发 MQTT 消息。比如针对物联网等 数据采集场景,会有比较多的设备在发送数据,通过共享订阅的方式可以在订阅端设置多个订阅者来实现这 几个订阅者之间的工作负载均衡
# EMQ X 与物联网平台的关系是什么
典型的物联网平台包括设备硬件、数据采集、数据存储、分析、Web / 移动应用等。EMQ X 位于数据采集这 一层,分别与硬件和数据存储、分析进行交互,是物联网平台的核心:前端的硬件通过 MQTT 协议与位于数据采 集层的 EMQ X 交互,通过 EMQ X 将数据采集后,通过 EMQ X 提供的数据接口,将数据保存到后台的持久化平台 中(各种关系型数据库和 NOSQL 数据库),或者流式数据处理框架等,上层应用通过这些数据分析后得到的结果 呈现给最终用户
# EMQ X 消息服务器功能列表
- EMQ X 消息服务器功能列表
- QoS0, QoS1, QoS2 消息支持
- 持久会话与离线消息支持
- Retained 消息支持
- Last Will 消息支持
- TCP/SSL 连接支持
- MQTT/WebSocket/SSL 支持
- HTTP 消息发布接口支持
- $SYS/# 系统主题支持
- 客户端在线状态查询与订阅支持
- 客户端 ID 或 IP 地址认证支持
- 用户名密码认证支持
- LDAP 认证 Redis、MySQL、PostgreSQL、MongoDB、HTTP 认证集成
- 浏览器 Cookie 认证
- 基于客户端 ID、IP 地址、用户名的访问控制 (ACL)
- 多服务器节点集群 (Cluster) 支持
- manual、mcast、dns、etcd、k8s 等多种集群发现方式
- 网络分区自动愈合
- 消息速率限制
- 连接速率限制
- 按分区配置节点
- 多服务器节点桥接 (Bridge)
- MQTT Broker 桥接支持
- Stomp 协议支持
- MQTT-SN 协议支持
- CoAP 协议支持
- Stomp/SockJS 支持
- 延时 Publish ($delay/topic)
- Flapping 检测
- 黑名单支持
- 共享订阅 ($share/:group/topic)
- TLS/PSK 支持
- 规则引擎
- 空动作 (调试) 消息重新发
- 布 桥接数据到
- MQTT Broker
- 检查 (调试)
- 发送数据到 Web 服务
# EMQ X服务端环境搭建与配置
# 二进制包安装
从该地址下载最新版本:https://www.emqx.io/cn/downloads#broker
执行如下命令执行安装
rpm -ivh emqx-centos7-v4.0.5.x86_64.rpm
1安装完成后直接使用如下命令启动emqx
emqx start
1查看emqx broker的启动状态
emqx_ctl status
1web端访问
http://192.168.200.129:18083
1默认用户名:admin,默认密码:public
停止emqx broker请使用如下命令
emqx stop
1卸载 EMQ X Broker
rpm -e emqx
1
# Docker安装
首先拉取emqx的镜像
docker pull emqx/emqx:v4.0.5
1使用docker命令运行得到docker容器
docker run -tid --name emqx -p 1883:1883 -p 8083:8083 -p 8081:8081 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v4.0.5
1
2访问Dashboard 查看启动效果!
# 基本命令
如果采用的是非docker部署的,那么EMQ X提供了一些常用的命令行工具,方便用户对EMQ X进行启动、关 闭、进入控制台等操作。
- emq start 启动 EMQ X Broker
- emqx stop 关闭 EMQ X Broker
- emqx restart 重启 EMQ X Broker
- emqx console 使用控制台启动 EMQ X Broker
- emqx foreground 使用控制台启动 EMQ X Broker,与 emqx console 不同, emqx foreground 不支持输入 Erlang 命令
- emqx ping Ping EMQ X Broke
# 目录结构
不同安装方式得到的 EMQ X 其目录结构会有所不同,具体如下:
以上目录中,用户经常接触与使用的是 bin 、 etc 、 data 、 log 目录。
bin 目录
emqx、emqx.cmd:EMQ X 的可执行文件
emqx_ctl、emqx_ctl.cmd:EMQ X 管理命令的可执行文件
etc 目录 EMQ X 通过 etc 目录下配置文件进行设置,主要配置文件包括:
data 目录
EMQ X 将运行数据存储在 data 目录下,主要的文件包括:
configs/app.*.config
EMQ X 读取 etc/emqx.conf 和 etc/plugins/*.conf 中的配置后,转换为 Erlang 原生配置文件格式,并在 运行时读取其中的配置。
loaded_plugins
loaded_plugins 文件记录了 EMQ X 默认启动的插件列表,可以修改此文件以增删默认启动的插件。 loaded_plugins 中启动项格式为 {, }. , 字段为布尔类型,EMQ X 会在启 动时根据 的值判断是否需要启动该插件。
$ cat loaded_plugins
{emqx_management,true}.
{emqx_recon,true}.
{emqx_retainer,true}.
{emqx_dashboard,true}.
{emqx_rule_engine,true}.
{emqx_bridge_mqtt,false}.
2
3
4
5
6
7
mnesia
Mnesia 数据库是 Erlang 内置的一个分布式 DBMS,可以直接存储 Erlang 的各种数据结构
EMQ X 使用 Mnesia 数据库存储自身运行数据,例如告警记录、规则引擎已创建的资源和规则、Dashbaord 用户信息
可以通过 emqx_ctl mnesia 命令查询 EMQ X 中 Mnesia 数据库的系统信息。
log 目录
emqx.log.*:EMQ X 运行时产生的日志文件 *
*crash.dump:EMQ X 的崩溃转储文件,可以通过 etc/emqx.conf 修改配置。 *
erlang.log.:以 emqx start 方式后台启动 EMQ X 时,控制台日志的副本文件。
# 配置说明
EMQ X 的配置文件通常以 .conf 作为后缀名,你可以在 etc 目录找到这些配置文件,主要配置文件包括:
需要注意的是,安装方式不同 etc 目录所处的路径可能不同,
语法规则
- 采用类似 sysctl 的 k = v 通用格式
- 单个配置项的所有信息都在同一行内,换行意味着创建一个新的配置项
- 键可以通过 . 进行分层,支持按树形结构管理配置项
- 值的类型可以是 integer , fload , percent , enum , ip , string , atom , flag , duration and bytesize
- 任何以#开头的行均被视为注释
# 客户端websocket消息收发
在EMQ X Broker提供的 Dashboard 中 TOOLS 导航下的 Websocket 页面提供了一个简易但有效的 WebSocket 客户端工具,它包含了连接、订阅和发布功能,同时还能查看自己发送和接收的报文数据,我们期望 它可以帮助您快速地完成某些场景或功能的测试验证:
MQTT是为了物联网场景设计的基于TCP的Pub/Sub协议,有许多为物联网优化的特性,比如适应 不同网络的QoS、层级主题、遗言等等。
WebSocket是为了HTML5应用方便与服务器双向通讯而设计的协议,HTTP握手然后转TCP协议, 用于取代之前web服务器推送数据的Server Push、Comet、长轮询等老旧实现。
两者之所有有交集,是因为一个应用场景:如何通过HTML5应用来作为MQTT的客户端,以便接受设备 消息或者向设备发送信息
# 认证
# 认证简介
身份认证是大多数应用的重要组成部分,MQTT 协议支持用户名密码认证,启用身份认证能有效阻止非法客户 端的连接。
EMQ X 中的认证指的是当一个客户端连接到 EMQ X 的时候,通过服务器端的配置来控制客户端连接服务器的 权限
EMQ X 的认证支持包括两个层面:
- MQTT 协议本身在 CONNECT 报文中指定用户名和密码,EMQ X 以插件形式支持基于 Username、 ClientID、HTTP、JWT、LDAP 及各类数据库如 MongoDB、MySQL、PostgreSQL、Redis 等多种形式的认 证。
- 在传输层上,TLS 可以保证使用客户端证书的客户端到服务器的身份验证,并确保服务器向客户端验证服 务器证书。也支持基于 PSK 的 TLS/DTLS 认证。
# 认证方式
EMQ X 支持使用内置数据源(文件、内置数据库)、JWT、外部主流数据库和自定义 HTTP API 作为身份认证 数据源。
连接数据源、进行认证逻辑通过插件实现的,每个插件对应一种认证方式,使用前需要启用相应的插件。
客户端连接时插件通过检查其 username/clientid 和 password 是否与指定数据源的信息一致来实现对客户端 的身份认证。
EMQ X 支持的认证方式:
内置数据源
- Username 认证
- Cliend ID 认证
使用配置文件与 EMQ X 内置数据库提供认证数据源,通过 HTTP API 进行管理,足够简单轻量。
外部数据库
- LDAP 认证
- MySQL 认证
- PostgreSQL 认证
- Redis 认证
- MongoDB 认证
外部数据库可以存储大量数据,同时方便与外部设备管理系统集成。
其他
- HTTP 认证
- JWT 认证
JWT 认证可以批量签发认证信息,HTTP 认证能够实现复杂的认证鉴权逻辑。
更改插件配置后需要重启插件才能生效,部分认证鉴权插件包含 ACL 功能
认证结果
任何一种认证方式最终都会返回一个结果:
- 认证成功:经过比对客户端认证成功
- 认证失败:经过比对客户端认证失败,数据源中密码与当前密码不一致
- 忽略认证(ignore):当前认证方式中未查找到认证数据,无法显式判断结果是成功还是失败,交由认 证链下一认证方式或匿名认证来判断
匿名认证
EMQ X 默认配置中启用了匿名认证,任何客户端都能接入 EMQ X。没有启用认证插件或认证插件没有显式允 许/拒绝(ignore)连接请求时,EMQ X 将根据匿名认证启用情况决定是否允许客户端连接。
配置匿名认证开关:
# /etc/emqx/emqx.conf
## Value: true | false
allow_anonymous = true
2
3
生产环境中请禁用匿名认证。
密码加盐规则与哈希方法
EMQ X 多数认证插件中可以启用哈希方法,数据源中仅保存密码密文,保证数据安全。
启用哈希方法时,用户可以为每个客户端都指定一个 salt(盐)并配置加盐规则,数据库中存储的密码是按照 加盐规则与哈希方法处理后的密文。
以 MySQL 认证为例:
# etc/plugins/emqx_auth_mysql.conf
## 不加盐,仅做哈希处理
auth.mysql.password_hash = sha256
## salt 前缀:使用 sha256 加密 salt + 密码 拼接的字符串
auth.mysql.password_hash = salt,sha256
## salt 后缀:使用 sha256 加密 密码 + salt 拼接的字符串
auth.mysql.password_hash = sha256,salt
## pbkdf2 with macfun iterations dklen
## macfun: md4, md5, ripemd160, sha, sha224, sha256, sha384, sha512
## auth.mysql.password_hash = pbkdf2,sha256,1000,20
2
3
4
5
6
7
8
9
10
如何生成认证信息
- 为每个客户端分用户名、Client ID、密码以及 salt(盐)等信息
- 使用与 MySQL 认证相同加盐规则与哈希方法处理客户端信息得到密文
- 将客户端信息写入数据库,客户端的密码应当为密文信息
# EMQ X 身份认证流程
- 根据配置的认证 SQL 结合客户端传入的信息,查询出密码(密文)和 salt(盐)等认证数据,没有查询 结果时,认证将终止并返回 ignore 结果
- 根据配置的加盐规则与哈希方法计算得到密文,没有启用哈希方法则跳过此步
- 将数据库中存储的密文与当前客户端计算的到的密文进行比对,比对成功则认证通过,否则认证失败
PostgreSQL 认证功能逻辑图:
写入数据的加盐规则、哈希方法与对应插件的配置一致时认证才能正常进行。更改哈希方法会造成现有认证数 据失效。
认证链
当同时启用多个认证方式时,EMQ X 将按照插件开启先后顺序进行链式认证:
一旦认证成功,终止认证链并允许客户端接入
一旦认证失败,终止认证链并禁止客户端接入
直到最后一个认证方式仍未通过,根据
匿名认证
配置判定
- 匿名认证开启时,允许客户端接入
- 匿名认证关闭时,禁止客户端接入
同时只启用一个认证插件可以提高客户端身份认证效率。
# Username 认证
4.3版本中emqx_auth_clientid 与 emqx_auth_usernmae 合并为 emqx_auth_mnesia
Username 认证使用配置文件预设客户端用户名与密码,支持通过 HTTP API 管理认证数据。
Username 认证不依赖外部数据源,使用上足够简单轻量。使用这种认证方式前需要开启插件,我们可以在 Dashboard里找到这个插件并开启。
哈希方法
Username 认证默认使用 sha256 进行密码哈希加密,可在 /etc/emqx/plugins/emqx_auth_username.conf 中更 改:
# etc/plugins/emqx_auth_username.conf
## Value: plain | md5 | sha | sha256
auth.user.password_hash = sha256
2
3
配置哈希方法后,新增的预设认证数据与通过 HTTP API 添加的认证数据将以哈希密文存储在 EMQ X 内置数 据库中。
# 预设认证数据
可以通过配置文件预设认证数据,编辑配置文件:/etc/emqx/plugins/emqx_auth_mnesia.conf
## Password hash.
##
## Value: plain | md5 | sha | sha256 | sha512
auth.mnesia.password_hash = sha256
##--------------------------------------------------------------------
## ClientId Authentication
##--------------------------------------------------------------------
## Examples
##auth.client.1.clientid = id
##auth.client.1.password = passwd
##auth.client.2.clientid = dev:devid
##auth.client.2.password = passwd2
##auth.client.3.clientid = app:appid
##auth.client.3.password = passwd3
##auth.client.4.clientid = client~!@#$%^&*()_+
##auth.client.4.password = passwd~!@#$%^&*()_+
##--------------------------------------------------------------------
## Username Authentication
##--------------------------------------------------------------------
## Examples:
##auth.user.1.username = admin
##auth.user.1.password = public
##auth.user.2.username = feng@emqtt.io
##auth.user.2.password = public
##auth.user.3.username = name~!@#$%^&*()_+
##auth.user.3.password = pwsswd~!@#$%^&*()_+
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
插件启动时将读取预设认证数据并加载到 EMQ X 内置数据库中,节点上的认证数据会在此阶段同步至集群 中。
预设认证数据在配置文件中使用了明文密码,出于安全性与可维护性考虑应当避免使用该功能
# HTTP API 管理认证数据
EMQ X提供了对应的HTTP API用以维护内置数据源中的认证信息,我们可以添加/查看/取消/更改认证数据
我们通过VSCode来访问EMQ X的API /auth_username 完成认证数据的相关操作
Basic 后面的数据需要Base64 加密
查看已有认证用户数据:
GET api/v4/auth_username
@hostname = 192.168.200.129 @port=18083 @contentType=application/json @userName=admin @password=public #############查看已有用户认证数据############## GET http://{{hostname}}:{{port}}/api/v4/auth_username HTTP/1.1 Content-Type: {{contentType}} Authorization: Basic {{userName}}:{{password}} // Basic 后面的数据需要Base64 加密
1
2
3
4
5
6
7
8
9添加认证数据API 定义:
POST api/v4/auth_username{ "username": "emqx_u", "password": "emqx_p"}
########添加用户认证数据############## POST http://{{hostname}}:{{port}}/api/v4/auth_username HTTP/1.1 Content-Type: {{contentType}} Authorization: Basic {{userName}}:{{password}} // Basic 后面的数据需要Base64 加密 { "username": "user", "password": "123456" }
1
2
3
4
5
6
7
8更改指定用户名的密码API 定义:
PUT api/v4/auth_username/${username}{ "password": "emqx_new_p"}
指定用户名,传递新密码进行更改,再次连接时需要使用新密码进行连接:
###########更改指定用户名的密码############# PUT http://{{hostname}}:{{port}}/api/v4/auth_username/user HTTP/1.1 Content-Type: {{contentType}} Authorization: Basic {{userName}}:{{password}} // Basic 后面的数据需要Base64 加密 { "password": "user" }
1
2
3
4
5
6
7查看指定用户名信息API 定义:
GET api/v4/auth_username/${username}
###########查看指定用户名信息############# GET http://{{hostname}}:{{port}}/api/v4/auth_username/user HTTP/1.1 Content-Type: {{contentType}} Authorization: Basic {{userName}}:{{password}}
1
2
3
4删除认证数据API 定义:
DELETE api/v4/auth_username/${username}
用以删除指定认证数据
###########删除指定的用户信息############# DELETE http://{{hostname}}:{{port}}/api/v4/auth_username/user HTTP/1.1 Content-Type: {{contentType}} Authorization: Basic {{userName}}:{{password}} // Basic 后面的数据需要Base64 加密
1
2
3
4
# MQTTX客户端验证
使用mqtt客户端工具验证使用username连接登录的功能。从 https://github.com/emqx/MQTTX 这个地址下载 对应操作系统的mqtt客户端工具。
# Client ID 认证
Client ID 认证使用配置文件预设客户端Client ID 与密码,支持通过 HTTP API 管理认证数据。 Client ID 认证不依赖外部数据源,使用上足够简单轻量,使用该种认证方式时需要开启 emqx_auth_clientid插件,直接在DashBoard中开启即可。
哈希方法 Client ID 认证默认使用 sha256 进行密码哈希加密,可在 etc/plugins/emqx_auth_mnesia.conf 中更改:
# etc/plugins/emqx_auth_clientid.conf
## Value: plain | md5 | sha | sha256
auth.client.password_hash = sha256
2
3
配置哈希方法后,新增的预设认证数据与通过 HTTP API 添加的认证数据将以哈希密文存储在 EMQ X 内置数据库中。
# 预设认证数据
可以通过配置文件预设认证数据,编辑配置文件: etc/plugins/emqx_auth_mnesia.conf
## Password hash.
##
## Value: plain | md5 | sha | sha256 | sha512
auth.mnesia.password_hash = sha256
##--------------------------------------------------------------------
## ClientId Authentication
##--------------------------------------------------------------------
## Examples
##auth.client.1.clientid = id
##auth.client.1.password = passwd
##auth.client.2.clientid = dev:devid
##auth.client.2.password = passwd2
##auth.client.3.clientid = app:appid
##auth.client.3.password = passwd3
##auth.client.4.clientid = client~!@#$%^&*()_+
##auth.client.4.password = passwd~!@#$%^&*()_+
##--------------------------------------------------------------------
## Username Authentication
##--------------------------------------------------------------------
## Examples:
##auth.user.1.username = admin
##auth.user.1.password = public
##auth.user.2.username = feng@emqtt.io
##auth.user.2.password = public
##auth.user.3.username = name~!@#$%^&*()_+
##auth.user.3.password = pwsswd~!@#$%^&*()_+
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
插件启动时将读取预设认证数据并加载到 EMQ X 内置数据库中,节点上的认证数据会在此阶段同步至集群中。
预设认证数据在配置文件中使用了明文密码,出于安全性与可维护性考虑应当避免使用该功能
# HTTP API 管理认证数据
我们使用VSCode来通过EMQ X的API来添加和查看Client ID的认证数据。
添加认证数据API 定义:
POST api/v4/auth_clientid{ "clientid": "emqx_c", "password": "emqx_p"}
####添加clientId和密码##### POST http://{{hostname}}:{{port}}/api/v4/auth_clientid HTTP/1.1 Content-Type: {{contentType}} Authorization: Basic {{userName}}:{{password}} // {{userName}}:{{password}}需要Base64加密 { "clientid": "emq-client1", "password": "123456" }
1
2
3
4
5
6
7
8查看已经添加的认证数据API 定义:
GET api/v4/auth_clientid
#############获取所有详细信息######## GET http://{{hostname}}:{{port}}/api/v4/auth_clientid HTTP/1.1 Content-Type: {{contentType}} Authorization: Basic {{userName}}:{{password}} // {{userName}}:{{password}}需要Base64加密
1
2
3
4更改指定 Client ID 的密码API 定义:
PUT api/v4/auth_clientid/${clientid}{ "password":"emqx_new_p"}
#############更改指定 Client ID 的密码######## PUT http://{{hostname}}:{{port}}/api/v4/auth_clientid/emq-client1 HTTP/1.1 Content-Type: {{contentType}} Authorization: Basic {{userName}}:{{password}} // {{userName}}:{{password}}需要Base64加密 { "password": "654321" }
1
2
3
4
5
6
7查看指定 Client ID 信息API 定义:
GET api/v4/auth_clientid/${clientid}
#############获取指定ClientId详细信息######## GET http://{{hostname}}:{{port}}/api/v4/auth_clientid/emq-client1 HTTP/1.1 Content-Type: {{contentType}} Authorization: Basic {{userName}}:{{password}}
1
2
3
4删除认证数据API 定义:
DELETE api/v4/auth_clientid/${clientid}
#############删除指定的client信息######## DELETE http://{{hostname}}:{{port}}/api/v4/auth_clientid/emq-client1 HTTP/1.1 Content-Type: {{contentType}} Authorization: Basic {{userName}}:{{password}}
1
2
3
4
# MQTTX客户端验证
此时用户名字段需要输入一个,但是可以随便填写
# HTTP认证
HTTP 认证使用外部自建 HTTP 应用认证数据源,根据 HTTP API 返回的数据判定认证结果,能够实现复杂的认证鉴权逻辑。启用该功能需要将 emqx_auth_http 插件启用,并且修改该插件的配置文件,在里面指定HTTP认证接口的url。 emqx_auth_http 插件同时还包含了ACL的功能,我们暂时还用不上,通过注释将其禁用
在Dashboard中中开启 emqx_auth_http 插件,同时为了避免误判我们可以停止通过username,clientID进行认证的插件 emqx_auth_clientid , emqx_auth_username
# 认证原理
EMQ X 在设备连接事件中使用当前客户端相关信息作为参数,向用户自定义的认证服务发起请求查询权限,通过返回的 HTTP 响应状态码 (HTTP statusCode) 来处理认证请求
- 认证失败:API 返回 4xx 状态码
- 认证成功:API 返回 200 状态码
- 忽略认证:API 返回 200 状态码且消息体 ignore
# HTTP 请求信息
HTTP API 基础请求信息,配置证书、请求头与重试规则
# etc/plugins/emqx_auth_http.conf
## 启用 HTTPS 所需证书信息
## auth.http.ssl.cacertfile = etc/certs/ca.pem
## auth.http.ssl.certfile = etc/certs/client-cert.pem
## auth.http.ssl.keyfile = etc/certs/client-key.pem
## 请求头设置
## auth.http.header.Accept = */*
## 重试设置
auth.http.request.retry_times = 3
auth.http.request.retry_interval = 1s
auth.http.request.retry_backoff = 2.0
2
3
4
5
6
7
8
9
10
11
加盐规则与哈希方法
HTTP 在请求中传递明文密码,加盐规则与哈希方法取决于 HTTP 应用
# 认证请求
进行身份认证时,EMQ X 将使用当前客户端信息填充并发起用户配置的认证查询请求,查询出该客户端在HTTP 服务器端的认证数据
打开etc/plugins/emqx_auth_http.conf配置文件,通过修改如下内容:修改完成后需要重启EMQX服务
# etc/plugins/emqx_auth_http.conf
## 请求地址
auth.http.auth_req = http://150.158.78.149:8991/mqtt/auth
## HTTP 请求方法
## Value: post | get | put
auth.http.auth_req.method = post
## 请求参数
auth.http.auth_req.params = clientid=%c,username=%u,password=%P
2
3
4
5
6
7
8
HTTP 请求方法为 GET 时,请求参数将以 URL 查询字符串的形式传递;POST、PUT 请求则将请求参数以普通表单形式提交(content-type 为 x-www-form-urlencoded)
你可以在认证请求中使用以下占位符,请求时 EMQ X 将自动填充为客户端信息:
- %u:用户名
- %c:Client ID
- %a:客户端 IP 地址
- %r:客户端接入协议
- %P:明文密码
- %p:客户端端口
- %C:TLS 证书公用名(证书的域名或子域名),仅当 TLS 连接时有效
- %d:TLS 证书 subject,仅当 TLS 连接时有效
推荐使用 POST 与 PUT 方法,使用 GET 方法时明文密码可能会随 URL 被记录到传输过程中的服务器日志中
# 创建基于springboot的应用程序: emq-demo
pom
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36创建application.yml配置文件并配置
server: port: 8991 spring: application: name: emq-demo
1
2
3
4
5创建Controller
@RestController @RequestMapping("/mqtt") public class HttpAuthController { private Map<String,String> users; @PostConstruct public void init(){ users = new HashMap<>(); users.put("user","123456");//实际的密码应该是密文,mqtt的http认证组件传输过来的密码是明,我们需要自己进行加密验证 users.put("emq-client2","123456"); users.put("emq-client3","123456"); } @PostMapping("/auth") public ResponseEntity<?> auth(@RequestParam("clientid") String clientid, @RequestParam("username") String username, @RequestParam("password") String password){ System.out.println("clientid:" + clientid); System.out.println("username:" + username); System.out.println("password:" + password); String value = users.get(username); if(StringUtils.isEmpty(value)){ return new ResponseEntity<Object>(HttpStatus.UNAUTHORIZED); } if(!value.equals(password)){ return new ResponseEntity<Object>(HttpStatus.UNAUTHORIZED); } return new ResponseEntity<Object>(HttpStatus.OK); } }pA
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 客户端SDK
在实际项目中我们要针对接MQTT消息代理服务端,从而向其发布消息、订阅消息等来完成我们自己的业务逻辑的开发。EMQ X针对不同的客户端语言都提供了不同的SDK工具包,可以在官网上查看并下下载:
https://www.emqx.io/cn/products/broker
# Eclipse Paho Java
# Paho介绍
Paho Java客户端是用Java编写的MQTT客户端库,用于开发在JVM或其他Java兼容平台(例如Android)上运行的应用程序。
Paho不仅可以对接EMQ X Broker,还可以对接满足符合MQTT协议规范的消息代理服务端,目前Paho可以支持到MQTT5.0以下版本。MQTT3.3.1协议版本基本能满足百分之九十多的接入场景
Paho Java客户端提供了两个API:
- MqttAsyncClient提供了一个完全异步的API,其中活动的完成是通过注册的回调通知的。
- MqttClient是MqttAsyncClient周围的同步包装器,在这里,功能似乎与应用程序同步。
# Paho实现消息收发
创建工程
POM依赖
<dependencies> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.16</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46详细Demo
https://github.com/insist-backend/springboot-example-damoncai-V2/tree/master/SpringBoot-Emqx/Emqx-Client-Paho-Java
# MQTT.js
MQTT.js是MQTT协议的客户端JS库,是用JavaScript为node.js和浏览器编写的
GitHub项目地址:https://github.com/mqttjs/MQTT.js
# API列表
- mqtt.connect()
- mqtt.Client()
- mqtt.Client#publish()
- mqtt.Client#subscribe()
- mqtt.Client#unsubscribe()
- mqtt.Client#end()
- mqtt.Client#removeOutgoingMessage()
- mqtt.Client#reconnect()
- mqtt.Client#handleMessage()
- mqtt.Client#connected
- mqtt.Client#reconnecting
- mqtt.Client#getLastMessageId()
- mqtt.Store()
- mqtt.Store#put()
- mqtt.Store#del()
- mqtt.Store#createStream()
- mqtt.Store#close()
# MQTT.js实现消息收发
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>mqtt.js测试</title>
<style type="text/css">
div{
width: 300px;
height: 300px;
float: left;
border: red solid 1px;
}
</style>
<script src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js" ></script>
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js" ></script>
<script type="text/javascript">
$(function () {
//页面加载完成后
// 连接选项
const options = {
clean: true, // 保留回话
connectTimeout: 4000, // 超时时间
// 认证信息
clientId: 'emqx_client_h5',
username: 'damoncai',
password: 'damoncai',
}
// 连接字符串, 通过协议指定使用的连接方式
// ws 未加密 WebSocket 连接 8083端口
// wss 加密 WebSocket 连接 8084端口
// mqtt 未加密 TCP 连接
// mqtts 加密 TCP 连接
// wxs 微信小程序连接
// alis 支付宝小程序连接
const connectUrl = 'ws://150.158.78.149:8083/mqtt'
const client = mqtt.connect(connectUrl, options)
/**
* mqtt.Client相关事件
*/
//当重新连接启动触发回调
client.on('reconnect', () => {
$("#div1").text("正在重连.....");
});
//连接断开后触发的回调
client.on("close",function () {
$("#div1").text("客户端已断开连接.....");
});
//从broker接收到断开连接的数据包后发出。MQTT 5.0特性
client.on("disconnect",function (packet) {
$("#div1").text("从broker接收到断开连接的数据包....."+packet);
});
//客户端脱机下线触发回调
client.on("offline",function () {
$("#div1").text("客户端脱机下线.....");
});
//当客户端无法连接或出现错误时触发回调
client.on("error",(error) =>{
$("#div1").text("客户端出现错误....."+error);
});
//以下两个事件监听粒度细
//当客户端发送任何数据包时发出。这包括published()包以及MQTT用于管理订阅和连接的包
client.on("packetsend",(packet)=>{
$("#div1").text("客户端已发出数据包....."+packet);
});
//当客户端接收到任何数据包时发出。这包括来自订阅主题的信息包以及MQTT用于管理订阅和连接的信息包
client.on("packetreceive",(packet)=>{
$("#div1").text("客户端已收到数据包....."+packet);
});
//成功连接后触发的回调
client.on("connect",function (connack) {
$("#div1").text("成功连接上服务器"+new Date());
//订阅某主题
/**
* client.subscribe(topic/topic array/topic object, [options], [callback])
* topic:一个string类型的topic或者一个topic数组,也可以是一个对象
* options
*/
client.subscribe("testtopic/#",{qos:2});
//每隔2秒发布一次数据
setInterval(publish,2000)
});
function publish() {
//发布数据
/**
* client.publish(topic,message,[options], [callback])
*
* message: Buffer or String
* options:{
* qos:0, //默认0
* retain:false, //默认false
* dup:false, //默认false
* properties:{}
* }
* callback:function (err){}
*/
const message = "h5 message "+Math.random()+new Date();
client.publish("testtopic/123",message,{qos:2});
$("#div2").text("客户端发布了数据:"+message);
}
//当客户端接收到发布消息时触发回调
/**
* topic:收到的数据包的topic
* message:收到的数据包的负载playload
* packet:收到的数据包
*/
client.on('message', (topic, message,packet) => {
$("#div3").text("客户端收到订阅消息,topic="+topic+";消息数据:"+message+";数据包:"+packet);
});
//页面离开自动断开连接
$(window).bind("beforeunload",()=>{
$("#div1").text("客户端窗口关闭,断开连接");
client.disconnect();
})
})
</script>
</head>
<body>
<div id="div1"></div>
<div id="div2"></div>
<div id="div3"></div>
</body>
</html>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# 日志与追踪
# 控制日志输出
EMQ X 支持将日志输出到控制台或者日志文件,或者同时使用两者。可在 emqx.conf 中配置 :
log.to = both
log.to 默认值是 both,可选的值为:
- off: 完全关闭日志功能
- file: 仅将日志输出到文件
- console: 仅将日志输出到标准输出(emqx 控制台)
- both: 同时将日志输出到文件和标准输出(emqx 控制台)
# 日志级别
EMQ X 的日志分 8 个等级, 由低到高分别为:
debug < info < notice < warning < error < critical < alert < emergency
EMQ X 的默认日志级别为 warning,可在 emqx.conf 中修改:
log.level = warning
此配置将所有 log handler 的配置设置为 warning。
# 日志文件和日志滚动
EMQ X 的默认日志文件目录在 ./log (zip包解压安装) 或者 /var/log/emqx (二进制包安装)。可在emqx.conf 中配置:
log.dir = log
在文件日志启用的情况下 (log.to = file 或 both),日志目录下会有如下几种文件:
- emqx.log.N: 以 emqx.log 为前缀的文件为日志文件,包含了 EMQ X 的所有日志消息。比如emqx.log.1 , emqx.log.2 ...
- emqx.log.siz 和 emqx.log.idx: 用于记录日志滚动信息的系统文件
- run_erl.log: 以 emqx start 方式后台启动 EMQ X 时,用于记录启动信息的系统文件
- erlang.log.N: 以 erlang.log 为前缀的文件为日志文件,是以 emqx start 方式后台启动 EMQ X 时,控制台日志的副本文件。比如 erlang.log.1 , erlang.log.2 ...
可在 emqx.conf 中修改日志文件的前缀,默认为 emqx.log :
log.file = emqx.log
EMQ X 默认在单日志文件超过 10MB 的情况下,滚动日志文件,最多可有 5 个日志文件:第 1 个日志文件为emqx.log.1,第 2 个为 emqx.log.2,并以此类推。当最后一个日志文件也写满 10MB 的时候,将从序号最小的日志的文件开始覆盖。文件大小限制和最大日志文件个数可在 emqx.conf 中修改:
log.rotation.size = 10MB
log.rotation.count = 5
2
# 针对日志级别输出日志文件
如果想把大于或等于某个级别的日志写入到单独的文件,可以在 emqx.conf 中配置 log..file : 将 info 及info 以上的日志单独输出到 info.log.N 文件中:
log.info.file = info.log
将 error 及 error 以上的日志单独输出到 error.log.N 文件中
log.error.file = error.log
# 日志格式
可在 emqx.conf 中修改单个日志消息的最大字符长度,如长度超过限制则截断日志消息并用 ... 填充。默认不限制长度:
将单个日志消息的最大字符长度设置为 8192:
log.chars_limit = 8192
日志消息的格式为(各个字段之间用空格分隔)
date time level client_info module_info msg
- date: 当地时间的日期。格式为:YYYY-MM-DD
- time: 当地时间,精确到毫秒。格式为:hh:mm:ss.ms
- level: 日志级别,使用中括号包裹。格式为:[Level]
- client_info: 可选字段,仅当此日志消息与某个客户端相关时存在。其格式为:ClientId@Peername 或ClientId 或 Peername
- module_info: 可选字段,仅当此日志消息与某个模块相关时存在。其格式为:[Module Info]
- msg: 日志消息内容。格式任意,可包含空格。
日志消息举例 1:
2020-05-18 16:10:03.872 [debug] <<"mqttjs_9e49354bb3">>@127.0.0.1:57105 [MQTT/WS] SEND CONNACK(Q0, R0, D0, AckFlags=0, ReasonCode=0)
此日志消息里各个字段分别为:
- date: 2020-02-18
- time: 16:10:03.872
- level: [debug]
- client_info: <<"mqttjs_9e49354bb3">>@127.0.0.1:57105
- module_info: [MQTT/WS]
- msg: SEND CONNACK(Q0, R0, D0, AckFlags=0, ReasonCode=0)
日志消息举例 2:
2020-05-18 16:10:08.474 [warning] [Alarm Handler] New Alarm: system_memory_high_watermark,Alarm Info: []
此日志消息里各个字段分别为:
- date: 2020-02-18
- time: 16:10:08.474
- level: [warning]
- module_info: [Alarm Handler]
- msg: New Alarm: system_memory_high_watermark, Alarm Info: [ ]
注意此日志消息中,client_info 字段不存在
# 日志级别和log handlers
EMQ X 使用了分层的日志系统,在日志级别上,包括全局日志级别 (primary log level)、以及各 log hanlder的日志级别
[Primary Level] -- global log level and filters
/ \
[Handler 1] [Handler 2] -- log levels and filters at each handler
2
3
log handler 是负责日志处理和输出的工作进程,它由 log handler id 唯一标识,并负有如下任务:
- 接收什么级别的日志
- 如何过滤日志消息
- 将日志输出到什么地方
我们来看一下 emqx 默认安装的 log handlers:
/opt/emqx $ emqx_ctl log handlers list
LogHandler(id=ssl_handler, level=debug, destination=console)
LogHandler(id=file, level=warning, destination=log/emqx.log)
LogHandler(id=default, level=warning, destination=console)
2
3
4
file: 负责输出到日志文件的 log handler。它没有设置特殊过滤条件,即所有日志消息只要级别满足要求就输出。输出目的地为日志文件。
default: 负责输出到控制台的 log handler。它没有设置特殊过滤条件,即所有日志消息只要级别满足要求就输出。输出目的地为控制台
ssl_handler: ssl 的 log handler。它的过滤条件设置为当日志是来自 ssl 模块时输出。输出目的地为控制台。
日志消息输出前,首先检查消息是否高于 primary log level,日志消息通过检查后流入各 log handler,再检查各 handler 的日志级别,如果日志消息也高于 handler level,则由对应的 handler 执行相应的过滤条件,过滤条件通过则输出。
设想一个场景,假设 primary log level 设置为 info,log handler default (负责输出到控制台) 的级别设置为debug,log handler file (负责输出到文件) 的级别设置为 warning:
- 虽然 console 日志是 debug 级别,但此时 console 日志只能输出 info 以及 info 以上的消息,因为经过primary level 过滤之后,流到 default 和 file 的日志只剩下 info 及以上的级别;
- emqx.log.N 文件里面,包含了 warning 以及 warning 以上的日志消息。
在日志级别小节中提到的 log.level 是修改了全局的日志级别。这包括 primary log level 和各个 handlers的日志级别,都设置为了同一个值。
Primary Log Level 相当于一个自来水管道系统的总开关,一旦关闭则各个分支管道都不再有水流通过。这个机制保证了日志系统的高性能运作。
# 运行时修改日志级别
可以使用 EMQ X 的命令行工具 emqx_ctl 在运行时修改 emqx 的日志级别:
修改全局日志级别:
例如,将 primary log level 以及所有 log handlers 的级别设置为 debug:
$ emqx_ctl log set-level debug
修改主日志级别:
例如,将 primary log level 设置为 debug:
$ emqx_ctl log primary-level debug
修改某个log handler的日志级别:
例如,将 log handler file 设置为 debug:
$ emqx_ctl log handlers set-level file debug
# 日志追踪
EMQ X 支持针对 ClientID 或 Topic 过滤日志并输出到文件。在使用日志追踪功能之前,必须将 primary loglevel 设置为 debug:
$ emqx_ctl log primary-level debug
开启 ClientID 日志追踪,将所有 ClientID 为 emq-demo 的日志都输出到 log/my_client.log:
$ emqx_ctl log primary-level debug
debug
$ emqx_ctl trace start client emq-demo log/emq-demo.log
trace clientid emq-demo successfully
2
3
4
开启 Topic 日志追踪,将主题能匹配到 'testtopic/#' 的消息发布日志输出到 log/topic_testtopic.log:
$ emqx_ctl log primary-level debug
debug
$ emqx_ctl trace start topic 'testtopic/#' log/topic_testtopic.log
trace topic testtopic/# successfully
2
3
4
提示:即使 emqx.conf 中, log.level 设置为 error,使用消息追踪功能仍然能够打印出某 client 或 topic的 debug 级别的信息。这在生产环境中非常有用。
# 发布订阅ACL
# 发布订阅ACL简介
发布订阅ACL是指对发布(publish)/订阅(subscribe)操作的权限控制。例如拒绝用户 emq-demo 向 testTopic/a主题发布消息。 EMQ X 支持通过客户端发布订阅 ACL 进行客户端权限的管理。
# ACL 插件
EMQ X 支持使用配置文件、外部主流数据库和自定义 HTTP API 作为 ACL 数据源。
连接数据源、进行访问控制功能是通过插件实现的,使用前需要启用相应的插件。
客户端订阅主题、发布消息时插件通过检查目标主题(Topic)是否在指定数据源允许/禁止列表内来实现对客户端的发布、订阅权限管理。
配置文件
- 内置 ACL
使用配置文件提供认证数据源,适用于变动较小的 ACL 管理。
外部数据库
- MySQL ACL
- PostgreSQL ACL
- Redis ACL
- MongoDB ACL
外部数据库可以存储大量数据、动态管理 ACL,方便与外部设备管理系统集成。
其他
- HTTP ACL
HTTP ACL 能够实现复杂的 ACL 管理
# ACL规则详解
ACL 是允许与拒绝条件的集合,EMQ X 中使用以下元素描述 ACL 规则:
## Allow-Deny Who Pub-Sub Topic
"允许(Allow) / 拒绝(Deny)" "谁(Who)" "订阅(Subscribe) / 发布(Publish)" "主题列表(Topics)"
2
同时具有多条 ACL 规则时,EMQ X 将按照规则排序进行合并,以 ACL 文件中的默认 ACL 为例,ACL 文件中配置了默认的 ACL 规则,规则从下至上加载:
- 第一条规则允许客户端发布订阅所有主题
- 第二条规则禁止全部客户端订阅 $SYS/# 与 # 主题
- 第三条规则允许 ip 地址为 127.0.0.1 的客户端发布/订阅 $SYS/# 与 # 主题,为第二条开了特例
- 第四条规则允许用户名为 dashboard 的客户端订阅 $SYS/# 主题,为第二条开了特例
{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
{allow, all}.
2
3
4
# 授权结果
任何一次 ACL 授权最终都会返回一个结果:
- 允许:经过检查允许客户端进行操作
- 禁止:经过检查禁止客户端操作
- 忽略(ignore):未查找到 ACL 权限信息(no match),无法显式判断结果是允许还是禁止,交由下一sACL 插件或默认 ACL 规则来判断
# 全局配置
默认配置中 ACL 是开放授权的,即授权结果为忽略(ignore)时允许客户端通过授权
通过 etc/emqx.conf 中的 ACL 配置可以更改该属性
# etc/emqx.conf
## ACL 未匹配时默认授权
## Value: allow | deny
acl_nomatch = allow
2
3
4
此处我们需要修改全局配置文件中关于acl的配置,将 acl_nomatch 配置项的值改为: deny
完成配置后使用 emqx restart 重启emqx broker服务
配置默认 ACL 文件,使用文件定义默认 ACL 规则:
# etc/emqx.conf
acl_file = etc/acl.conf
2
配置 ACL 授权结果为禁止的响应动作,为 disconnect 时将断开设备:
# etc/emqx.conf
## Value: ignore | disconnect
acl_deny_action = ignore
2
3
在 MQTT v3.1 和 v3.1.1 协议中,发布操作被拒绝后服务器无任何报文错误返回,这是协议设计的一个缺陷。但在 MQTT v5.0 协议上已经支持应答一个相应的错误报文。
# 超级用户
客户端在进行认证的时候客户端可拥有“超级用户”身份,超级用户拥有最高权限不受 ACL 限制
- 认证鉴权插件启用超级用户功能后,发布订阅时 EMQ X 将优先检查客户端超级用户身份
- 客户端为超级用户时,通过授权并跳过后续 ACL 检查
# ACL缓存
ACL 缓存允许客户端在命中某条 ACL 规则后,便将其缓存至内存中,以便下次直接使用,客户端发布、订阅频率较高的情况下开启 ACL 缓存可以提高 ACL 检查性能
在 etc/emqx.conf 可以配置 ACL 缓存大小与缓存时间:
# etc/emqx.conf
## 是否启用
enable_acl_cache = on
## 单个客户端最大缓存规则数量
acl_cache_max_size = 32
## 缓存失效时间,超时后缓存将被清除,默认1分钟
acl_cache_ttl = 1m
2
3
4
5
6
7
清除缓存:
在更新 ACL 规则后,某些客户端由于已经存在缓存,则无法立即生效。若要立即生效,则需手动清除所有的ACL 缓存,清除缓存需要需要使用EMQ X Broker提供的监控管理的HTTP API
查询指定客户端的 ACL 缓存: GET /api/v4/clients/{clientid}/acl_cache
##############查询指定客户端的 ACL 缓存####################
GET http://{{hostname}}:{{port}}/api/v4/clients/emq-client1/acl_cache HTTP/1.1
Content-Type: {{contentType}}
Authorization: Basic {{userName}}:{{password}}
2
3
4
清除指定客户端的ACL缓存: DELETE /api/v4/clients/{clientid}/acl_cache
##############清除指定客户端的 ACL 缓存####################
DELETE http://{{hostname}}:{{port}}/api/v4/clients/emq-client1/acl_cache HTTP/1.1
Content-Type: {{contentType}}
Authorization: Basic {{userName}}:{{password}}
2
3
4
测试时注意开启对应的插件
# ACL 鉴权链
当同时启用多个 ACL 插件时,EMQ X 将按照插件开启先后顺序进行链式鉴权:
一通过授权,终止链并允许客户端通过验证
一旦授权失败,终止链并禁止客户端通过验证
直到最后一个 ACL 插件仍未通过,根据
默认授权
配置判定
- 默认授权为允许时,允许客户端通过验证
- 默认授权为禁止时,禁止客户端通过验证
同时只启用一个 ACL 插件可以提高客户端 ACL 检查性能
# 内置ACL
内置 ACL 通过文件设置规则,使用上足够简单轻量,适用于规则数量可预测、无变动需求或变动较小的项目。
ACL 规则文件:
etc/acl.conf
内置 ACL 优先级最低,可以被 ACL 插件覆盖,如需禁用全部注释即可。规则文件更改后需重启 EMQ X 以应用生效。
# 定义ACL
内置 ACL 是优先级最低规则表,在所有的 ACL 检查完成后,如果仍然未命中则检查默认的 ACL 规则。该规则文件以 Erlang 语法的格式进行描述:
%% 允许 "dashboard" 用户 订阅 "$SYS/#" 主题
{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
%% 允许 IP 地址为 "127.0.0.1" 的用户 发布/订阅 "#SYS/#","#" 主题
{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
%% 拒绝 "所有用户" 订阅 "$SYS/#" "#" 主题
{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
%% 允许其它任意的发布订阅操作
{allow, all}.
2
3
4
5
6
7
8
# acl.conf 编写规则
文件中的规则按书写顺序从上往下匹配。注意,匹配是从上往下,规则加载是从下往上加载
acl.conf 的语法规则包含在顶部的注释中,熟悉 Erlang 语法的可直接阅读文件顶部的注释。或参考以下的释义:
- 以 %% 表示行注释。
- 每条规则由四元组组成,以 . 结束。
- 元组第一位:表示规则命中成功后,执行权限控制操作,可取值为:
- allow :表示 允许
- deny : 表示 拒绝
- 元组第二位:表示规则所生效的用户,可使用的格式为:
- {user, "dashboard"} :表明规则仅对 用户名 (Username) 为 "dashboard" 的用户生效
- {clientid, "dashboard"} :表明规则仅对 客户端标识 (ClientId) 为 "dashboard" 的用户生效
- {ipaddr, "127.0.0.1"} :表明规则仅对 源地址 为 "127.0.0.1" 的用户生效
- all :表明规则对所有的用户都生效
- 元组第三位:表示规则所控制的操作,可取值为:
- publish :表明规则应用在 PUBLISH 操作上
- subscribe :表明规则应用在 SUBSCRIBE 操作上
- pubsub :表明规则对 PUBLISH 和 SUBSCRIBE 操作都有效
- 元组第四位:表示规则所限制的主题列表,内容以数组的格式给出,例如:
- "$SYS/#" :为一个 主题过滤器 (Topic Filter);表示规则可命中与 $SYS/# 匹配的主题;如:可命中 $SYS/# ,也可命中 $SYS/a/b/c
- {eq, "#"} :表示字符的全等,规则仅可命中主题为 # 的字串,不能命中 /a/b/c 等
- 除此之外还存在两条特殊的规则:
- {allow, all} :允许所有操作
- {deny, all} :拒绝所有操作
在 acl.conf 修改完成后,并不会自动加载至 EMQ X 系统。需要手动执行:
./bin/emqx_ctl acl reload
acl.conf 中应只包含一些简单而通用的规则,使其成为系统基础的 ACL 原则。如果需要支持复杂、大量的 ACL内容,需要使用认证插件。
# HTTP ACL
HTTP 认证使用外部自建 HTTP 应用认证授权数据源,根据 HTTP API 返回的数据判定授权结果,能够实现复杂的 ACL 校验逻辑。
emqx_auth_http
注意:emqx_auth_http 插件同时包含认证功能,可通过注释禁用。
# ACL授权原理
EMQ X 在设备发布、订阅事件中使用当前客户端相关信息作为参数,向用户自定义的认证服务发起请求权限,通过返回的 HTTP 响应状态码 (HTTP statusCode) 来处理 ACL 授权请求。
- 无权限:API 返回 4xx 状态码
- 授权成功:API 返回 200 状态码
- 忽略授权:API 返回 200 状态码且消息体 ignore
# HTTP 请求信息
要启用 HTTP ACL,需要在 etc/plugins/emqx_auth_http.conf 中配置
与认证HTTP请求相同的HTTP API 基础请求信息,配置证书、请求头与重试规则。
# etc/plugins/emqx_auth_http.conf
## 启用 HTTPS 所需证书信息
## auth.http.ssl.cacertfile = etc/certs/ca.pem
## auth.http.ssl.certfile = etc/certs/client-cert.pem
## auth.http.ssl.keyfile = etc/certs/client-key.pem
## 请求头设置
## auth.http.header.Accept = */*
## 重试设置
auth.http.request.retry_times = 3
auth.http.request.retry_interval = 1s
auth.http.request.retry_backoff = 2.0
2
3
4
5
6
7
8
9
10
11
进行发布、订阅认证时,EMQ X 将使用当前客户端信息填充并发起用户配置的 ACL 授权查询请求,查询出该客户端在 HTTP 服务器端的授权数据。
# superuser 请求
首先查询客户端是否为超级用户,客户端为超级用户时将跳过 ACL 查询。
# etc/plugins/emqx_auth_http.conf
####使用vi编辑该配置,修改URL请求地址
##--------------------------------------------------------------------
## Superuser request.
##
## Variables:
## - %u: username
## - %c: clientid
## - %a: ipaddress
## - %r: protocol
## - %P: password
## - %p: sockport of server accepted
## - %C: common name of client TLS cert
## - %d: subject of client TLS cert
##
## Value: URL 请求地址
auth.http.super_req = http://150.158.78.149:8991/mqtt/superuser
## Value: post | get | put 请求方法
auth.http.super_req.method = post
## Value: Params 请求参数
auth.http.super_req.params = clientid=%c,username=%u
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# ACL 授权查询请求
# etc/plugins/emqx_auth_http.conf
####使用vi编辑该配置,修改URL请求地址以及请求方式
##--------------------------------------------------------------------
## ACL request.
##
## Variables:
## - %A: 1 | 2, 1 = sub, 2 = pub
## - %u: username
## - %c: clientid
## - %a: ipaddress
## - %r: protocol
## - %m: mountpoint
## - %t: topic
##
## Value: URL
auth.http.acl_req = http://150.158.78.149:8991/mqtt/acl
## Value: post | get | put
auth.http.acl_req.method = post
## Value: Params
auth.http.acl_req.params =access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t,mountpoint=%m
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
请求说明
HTTP 请求方法为 GET 时,请求参数将以 URL 查询字符串的形式传递;POST、PUT 请求则将请求参数以普通表单形式提交(content-type 为 x-www-form-urlencoded)。
你可以在认证请求中使用以下占位符,请求时 EMQ X 将自动填充为客户端信息:
- %u:用户名
- %c:Client ID
- %a:客户端 IP 地址
- %r:客户端接入协议
- %P:明文密码
- %p:客户端端口
- %C:TLS 证书公用名(证书的域名或子域名),仅当 TLS 连接时有效
- %d:TLS 证书 subject,仅当 TLS 连接时有效
- %m:topic的安装点,是桥接的连接属性
推荐使用 POST 与 PUT 方法,使用 GET 方法时明文密码可能会随 URL 被记录到传输过程中的服务器日志中
# HTTP ACL接口开发
在原有的项目 emq-demo 中我们已经开发了基于HTTP API的认证Controller: AuthController ,按照我们的请求URL配置,我们需要在该Controller中添加两个接口方法,一个是用于查询superuser的,一个是用于进行ACL授权查询的,这两个方法分别如下:
查询客户端是否为超级用户
@PostMapping("/superuser") public ResponseEntity<?> auth(@RequestParam("clientid") String clientid, @RequestParam("username") String username){ log.info("clientid:{}",clientid); log.info("username:{}",username); if(clientid.contains("admin") || username.contains("admin")){ log.info("用户{}是超级用户",username); //是超级用户 return new ResponseEntity<Object>(HttpStatus.OK); }else { log.info("用户{}不是超级用户",username); //不是超级用户 return new ResponseEntity<Object>(HttpStatus.UNAUTHORIZED); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15注意,我们在初始化方法init中添加一个超级用户:admin/admin
ACL 授权查询请求
@PostMapping("/acl") public ResponseEntity acl(@RequestParam("access")int access, @RequestParam("username")String username, @RequestParam("clientid")String clientid, @RequestParam("ipaddr")String ipaddr, @RequestParam("topic")String topic, @RequestParam("mountpoint")String mountpoint){ log.info("EMQX发起客户端操作授权查询请求,access={},username={},clientid={},ipaddr={},topic={},mountpoint={}", access,username,clientid,ipaddr,topic,mountpoint); if(username.equals("emq-client2") && topic.equals("testtopic/#") && access == 1){ log.info("客户端{}有权限订阅{}",username,topic); return new ResponseEntity<>(HttpStatus.OK); } if(username.equals("emq-client3") && topic.equals("testtopic/123") && access == 2){ log.info("客户端{}有权限向{}发布消息",username,topic); return new ResponseEntity<>(null, HttpStatus.OK); } log.info("客户端{},username={},没有权限对主题{}进行{}操作",clientid,username,topic,access==1?"订阅":"发布"); return new ResponseEntity(HttpStatus.UNAUTHORIZED);//无权限 }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# HTTP ACL接口测试
# WebHook
# WebHook简介
WebHook 是由 emqx_web_hook 插件提供的 将 EMQ X 中的钩子事件通知到某个 Web 服务 的功能
钩子(Hooks) 是 EMQ X 提供的一种机制,它通过拦截模块间的函数调用、消息传递、事件传递来修改或扩展系统功能。
简单来讲,该机制目的在于增强软件系统的扩展性、方便与其他三方系统的集成、或者改变其系统原有的默认行为。如下图:
当系统中不存在 钩子 (Hooks) 机制时,整个事件处理流程 从 事件 (Event) 的输入,到 处理 (Handler),再到完成后的返回 结果 (Result) 对于系统外部而讲,都是不可见、且无法修改的。
而在这个过程中加入一个可挂载函数的点 (HookPoint),允许外部插件挂载多个回调函数,形成一个调用链。达到对内部事件处理过程的扩展和修改。系统中常用到的认证插件则是按照该逻辑进行实现的。
因此,在 EMQ X 中,钩子 (Hooks) 这种机制极大地方便了系统的扩展。我们不需要修改 emqx 核心代码,仅需要在特定的位置埋下 挂载点 (HookPoint) ,便能允许外部插件扩展 EMQ X 的各种行为。
对于实现者来说仅需要关注:
- 挂载点 (HookPoint) 的位置:包括其作用、执行的时机、和如何挂载和取消挂载。
- 回调函数 的实现:包括回调函数的入参个数、作用、数据结构等,及返回值代表的含义。
- 了解回调函数在 链 上执行的机制:包括回调函数执行的顺序,及如何提前终止链的执行。
如果你是在开发扩展插件中使用钩子,你应该能 完全地明白这三点,且尽量不要在钩子内部使用阻塞函数,这会影响系统的吞吐。
WebHook 的内部实现是基于钩子,但它更靠近顶层一些。它通过在钩子上的挂载回调函数,获取到 EMQ X中的各种事件,并转发至 emqx_web_hook 中配置的 Web 服务器
以 客户端成功接入(client.connected) 事件为例,其事件的传递流程如下
WebHook 对于事件的处理是单向的,它仅支持将 EMQ X 中的事件推送给 Web 服务,并不关心 Web 服务的返回。 借助 Webhook 可以完成设备在线、上下线记录,订阅与消息存储、消息送达确认等诸多业务。
# 配置项说明
Webhook 的配置文件位于 etc/plugins/emqx_web_hook.conf :
说明:当消息内容是不可见字符(如二进制数据)时,为了能够在 HTTP 协议中传输,使用 encode_payload是十分有用的。
配置触发规则:
在 etc/plugins/emqx_web_hooks.conf 可配置触发规则,其配置的格式如下:
## 格式示例
web.hook.rule.<Event>.<Number> = <Rule>
## 示例值
web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}
2
3
4
5
Event 触发事件:
目前支持以下事件:
Number
同一个事件可以配置多个触发规则,配置相同的事件应当依次递增。
Rule
触发规则,其值为一个 JSON 字符串,其中可用的 Key 有:
action:字符串,取固定值,每种事件下规则中的action在 etc/plugins/emqx_web_hooks.conf 文件中有定义
web.hook.rule.client.connect.1 = {"action": "on_client_connect"} web.hook.rule.client.connack.1 = {"action": "on_client_connack"} web.hook.rule.client.connected.1 = {"action": "on_client_connected"} web.hook.rule.client.disconnected.1 = {"action": "on_client_disconnected"} web.hook.rule.client.subscribe.1 = {"action": "on_client_subscribe"} web.hook.rule.client.unsubscribe.1 = {"action": "on_client_unsubscribe"} web.hook.rule.session.subscribed.1 = {"action": "on_session_subscribed"} web.hook.rule.session.unsubscribed.1 = {"action": "on_session_unsubscribed"} web.hook.rule.session.terminated.1 = {"action": "on_session_terminated"} web.hook.rule.message.publish.1 = {"action": "on_message_publish"} web.hook.rule.message.delivered.1 = {"action": "on_message_delivered"} web.hook.rule.message.acked.1 = {"action": "on_message_acked"}
1
2
3
4
5
6
7
8
9
10
11
12topic:字符串,表示一个主题过滤器,操作的主题只有与该主题匹配才能触发事件的转发
例如,我们只将与 a/b/c 和 foo/# 主题匹配的消息转发到 Web 服务器上,其配置应该为:
web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"} web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}
1
2这样 Webhook 仅会转发与 a/b/c 和 foo/# 主题匹配的消息,例如 foo/bar 等,而不是转发 a/b/d 或fo/bar 。
# Webhook事件参数
事件触发时 Webhook 会按照配置将每个事件组成一个 HTTP 请求发送到 api.url 所配置的 Web 服务器上。其请求格式为:
URL: <api.url> # 来自于配置中的 `api.url` 字段
Method: POST # 固定为 POST 方法
Body: <JSON> # Body 为 JSON 格式字符串
2
3
4
对于不同的事件,请求 Body 体内容有所不同,下表列举了各个事件中 Body 的参数列表:
client.connect
client.connack
client.connected
client.disconnected
client.subscribe
opts 包含
client.unsubscribe
message.publish
message.delivered
# Webhook实现客户端断连监控
断连监控需求
# 代码实现
@Slf4j @RestController @RequestMapping("mqtt") public class WebhookController { @PostMapping("webhook") public void webhook(@RequestBody Map map) { log.info("data: " + map); } }
1
2
3
4
5
6
7
8
9
10
# 测试
# EMQ X集群
# EMQ X 集群概述
EMQ X是由Erlang语言编写的,Erlang/OTP 最初是爱立信为开发电信设备系统设计的编程语言平台。Erlang分布式的定义为:由分布互联的 Erlang 运行时系统组成,每个 Erlang 运行时系统被称为节点(Node),节点间通过TCP 两两互联,组成一个网状结构。
Erlang 节点有着唯一的节点名称标识,节点名称由 @ 分隔的两部分组成
<name>@<ip-address>
节点间通过节点名称进行通信寻址,所有节点组成一个集群后,每个节点都会与其他节点建立一个TCP连接,每当一个新的节点加入集群时,它也会与集群中所有的节点都建立一个 TCP 连接,最终构成一个网状结构如下:
Erlang 节点间通过 cookie 进行互连认证。cookie 是一个字符串,只有 cookie 相同的两个节点才能建立连接cookie 的配置在 etc/emqx.conf 配置文件中,默认配置如下
## Cookie for distributed node communication.
## 采用默认配置即可
## Value: String
node.cookie = emqxsecretcookie
2
3
4
另外可查看该配置文件中默认的节点名称如下:
## Node name.
##
## See: http://erlang.org/doc/reference_manual/distributed.html
##
## Value: <name>@<host>
##
## Default: emqx@127.0.0.1
node.name = 14332431afdad9@172.17.0.2
2
3
4
5
6
7
8
EMQ X 集群协议设置
Erlang 集群中各节点可通过 TCPv4、TCPv6 或 TLS 方式连接,可在 etc/emqx.conf 中配置连接方式:
# EMQ X 分布式集群设计
EMQ X 分布式的基本功能是将消息转发和投递给各节点上的订阅者,如下图所示:
为实现此过程,EMQ X 维护了几个与之相关的数据结构:订阅表,路由表,主题树。
# 订阅表: 主题 - 订阅者
MQTT 客户端订阅主题时,EMQ X 会维护主题(Topic) -> 订阅者(Subscriber) 映射的订阅表。订阅表只存在于订阅者所在的 EMQ X 节点上,例如:
node1:
topic1 -> client1, client2
topic2 -> client3`
node2:
topic1 -> client4
2
3
4
5
# 路由表: 主题 - 节点
而同一集群的所有节点,都会复制一份主题(Topic) -> 节点(Node) 映射的路由表,例如:
topic1 -> node1, node2
topic2 -> node3
topic3 -> node2, node4
2
3
这样的话:一旦某一个主题有消息发布过来,我们可以通过路由表将消息路由到该主题对应的节点上,然后根据订阅表将该主题的消息推送给具体的订阅者
# 主题树: 带通配符的主题匹配
除路由表之外,EMQ X 集群中的每个节点也会维护一份主题树(Topic Trie) 的备份。
例如下述主题订阅关系:
# 消息派发过程
当 MQTT 客户端发布消息时,所在节点会根据消息主题,检索路由表并转发消息到相关节点,再由相关节点检索本地的订阅表并将消息发送给相关订阅者。
例如 client1 向主题 t/a 发布消息,消息在节点间的路由与派发流程:
- client1 发布主题为 t/a 的消息到节点 node1
- node1 通过查询主题树,得知 t/a 可匹配到现有的 t/a 、 t/# 这两个主题。
- node1 通过查询路由表,得知主题 t/a 只在 node3 上有订阅者,而主题 t/# 只在 node2 上有订阅者。故 node1 将消息转发给 node2 和 node3。
- node2 收到转发来的 t/a 消息后,查询本地订阅表,获取本节点上订阅了 t/# 的订阅者,并把消息投递给他们。
- node3 收到转发来的 t/a 消息后,查询本地订阅表,获取本节点上订阅了 t/a 的订阅者,并把消息投递给他们。
- 消息转发和投递结束。
数据分片与共享方式
EMQ X 的订阅表在集群中是分片(partitioned)的,而主题树和路由表是共享(replicated)的
# 节点发现与自动集群
EMQ X 支持基于 Ekka 库的集群自动发现 (Autocluster)。Ekka 是为 Erlang/OTP 应用开发的集群管理库,支持 Erlang 节点自动发现 (Service Discovery)、自动集群 (Autocluster)、脑裂自动愈合 (Network PartitionAutoheal)、自动删除宕机节点 (Autoclean)。
EMQ X 支持多种节点发现策略:
节点发现策略在配置文件: etc/emqx.conf 中的配置项 cluster.discovery
可通过如下命令查看默认的配置:
/opt/emqx/etc $ cat emqx.conf | grep cluster.discovery
cluster.discovery = manual
2
manual 手动创建集群
默认配置为手动创建集群,节点须通过 ./bin/emqx_ctl join \ 命令加入:
cluster.discovery = manual
基于 static 节点列表自动集群
配置固定的节点列表,自动发现并创建集群:
cluster.discovery = static
##节点名称列表
cluster.static.seeds = emqx1@127.0.0.1,emqx2@127.0.0.1
2
3
基于 mcast 组播自动集群
基于 UDP 组播自动发现并创建集群:
cluster.discovery = mcast
cluster.mcast.addr = 239.192.0.1
cluster.mcast.ports = 4369,4370
cluster.mcast.iface = 0.0.0.0
cluster.mcast.ttl = 255
cluster.mcast.loop = on
2
3
4
5
6
基于 DNS A 记录自动集群
基于 DNS A 记录自动发现并创建集群:
cluster.discovery = dns
cluster.dns.name = localhost
cluster.dns.app = ekka
2
3
基于 etcd 自动集群 基于 etcd 自动发现并创建集群:
cluster.discovery = etcd
cluster.etcd.server = http://127.0.0.1:2379
cluster.etcd.prefix = emqcl
cluster.etcd.node_ttl = 1m
2
3
4
基于 kubernetes 自动集群
Kubernetes 下自动发现并创建集群:
cluster.discovery = k8s
cluster.k8s.apiserver = http://10.110.111.204:8080
cluster.k8s.service_name = ekka
cluster.k8s.address_type = ip
cluster.k8s.app_name = ekka
2
3
4
5
# manual方式管理集群实践
在两台主机上搭建Emqx服务
注意: 节点名格式为 Name@Host, Host 必须是 IP 地址或 FQDN (主机名。域名)
修改集群发现方式 cluster.discovery = manual
同时确保 node.cookie = emqxsecretcookie ,同一集群中的节点需要通过cookie进行互联认证,要保证和另一节点上的一致,因为采用的是默认值故是一致的
防火墙设置
如果集群节点间存在防火墙,防火墙需要开启 4369 端口和一个 TCP 端口段。4369 由 epmd 端口映射服务使用,TCP 端口段用于节点间建立连接与通信。
防火墙设置后,需要在 emqx/etc/emqx.conf 中配置相同的端口段:
## Distributed node port range node.dist_listen_min = 6369 node.dist_listen_max = 7369
1
2
3节点加入集群
emqx_ctl cluster join emqx2@127.0.0.1 // 在一台机器上添加另外一台机器
1退出集群
// 自己退出 ./bin/emqx_ctl cluster leave // 踢人 ./bin/emqx_ctl cluster force-leave emqx2@127.0.0.1
1
2
3
4
5
# 集群脑裂与自动愈合
EMQ X 支持集群脑裂自动恢复(Network Partition Autoheal),可在 etc/emqx.conf 中配置:
cluster.autoheal = on
集群脑裂自动恢复流程:
- 节点收到 Mnesia 的 inconsistent_database 事件 3 秒后进行集群脑裂确认;
- 节点确认集群脑裂发生后,向 Leader 节点 (集群中最早启动节点) 上报脑裂消息;
- Leader 节点延迟一段时间后,在全部节点在线状态下创建脑裂视图 (SplitView);
- Leader 节点在多数派 (majority) 分区选择集群自愈的 Coordinator 节点;
- Coordinator 节点重启少数派 (minority) 分区节点恢复集群。
# 集群节点自动清除
EMQ X 支持从集群自动删除宕机节点 (Autoclean),可在 etc/emqx.conf 中配置:
cluster.autoclean = 5m
# 管理监控API的使用
EMQ X 提供了 HTTP API 以实现与外部系统的集成,例如查询客户端信息、发布消息和创建规则等。
EMQ X 的 HTTP API 服务默认监听 8081 端口,可通过 etc/plugins/emqx_management.conf 配置文件修改监听端口,或启用 HTTPS 监听。EMQ X 4.0.0 以后的所有 API 调用均以 api/v4 开头。
# 接口安全及响应码
EMQ X 的 HTTP API 使用 Basic 认证方式, id 和 password 须分别填写 AppID 和 AppSecret。 默认的AppID 和 AppSecret 是: amdin/public 。你可以在 Dashboard 的左侧菜单栏里,选择 "MANAGEMENT" ->"Applications" 来修改和添加 AppID/AppSecret。
响应码
EMQ X 接口在调用成功时总是通过HTTP status code返回 200 OK,响应内容则以 JSON 格式返回, 可能的状态码如下:
返回码
EMQ X 接口的响应消息体为 JSON 格式,其中总是包含返回码 code 。
可能的返回码如下:
# 保留消息
# 简介
服务端收到 Retain 标志为 1 的 PUBLISH 报文时,会将该报文视为保留消息,除了被正常转发以外,保留消息会被存储在服务端,每个主题下只能存在一份保留消息,因此如果已经存在相同主题的保留消息,则该保留消息被替换。
当客户端建立订阅时,如果服务端存在主题匹配的保留消息,则这些保留消息将被立即发送给该客户端。借助保留消息,新的订阅者能够立即获取最近的状态,而不需要等待无法预期的时间,这在很多场景下非常重要的。
EMQ X 默认开启保留消息的功能,可以在 etc/emqx.conf 中修改 mqtt.retain_available 为 false 以禁用保留消息功能。如果 EMQ X 在保留消息功能被禁用的情况下依然收到了保留消息,那么将返回原因码为0x9A(不支持保留消息)的 DISCONNECT 报文。
应用场景举例:
某车联网项目,车辆出租公司会实时监控所有车辆的GPS地理位置信息,这些信息是通过每个车辆每10分钟定时上报的GPS信息,这些信息需要展示在某调度系统的大屏上,该调度系统因为其他模块升级需要重新部署,升级后也需要去订阅获取所有车辆的GPS信息,上线完成后刚好错过了车辆最近一次上报的GPS信息,如果这些消息不是保留消息,该调度系统大屏上是空白的,必须等10分钟后才能调度这些车辆,10分钟内无法做出任何操作,用户体验非常差,但是如果这些信息是保留消息,该系统上线后立即就会收到最近所有车辆的位置信息,立即就可以展示然后进行调度。
# 保留消息配置
EMQ X 的保留消息功能是由 emqx_retainer 插件实现,该插件默认开启,通过修改 emqx_retainer 插件的配置,可以调整 EMQ X 储存保留消息的位置,限制接收保留消息数量和 Payload 最大长度,以及调整保留消息的过期时间。
emqx_retainer 插件默认开启,插件的配置路径为 etc/plugins/emqx_retainer.conf 。
EMQ X Enterprise 中可将保留消息存储到多种外部数据库。
# 共享订阅
# 简介
共享订阅是在多个订阅者之间实现负载均衡的订阅方式:
上图中,3 个 subscriber 用共享订阅的方式订阅了同一个主题 $share/g/topic ,其中 topic 才是它们订阅的真实主题名,而 $share/g/ 只是共享订阅前缀。
EMQ X 支持两种格式的共享订阅前缀:
注意:共享订阅的主题格式是针对订阅端来指定的,例如: $share/g/t/a ;而消息的发布方是向主题: t/a发布消息。这样在订阅方才能达到负载均衡的效果。
应用场景举例说明:
某智能售货机平台下在全国有50万台售货机设备,在实际运营过程中平均每秒中会收到5万台设备上报过来的出货结果数据,假如用普通主题订阅来处理的话,消息的消费节点会有5万的并发,极有可能会导致该节点宕机,造成出货数据的丢失,对后续结算等业务操作造成极大困扰;如果只是简单增加消费节点的话也无法解决该问题,因为每个节点都会收到所有同样的数据,在这种业务场景下我们,我们希望通过增加消费节点并且节点之间是分摊消息的消费,以此来增强整个系统的负载能力和可用性,那我们就可以通过共享订阅来满足这种业务场景。
# 带群组的共享订阅
以 $share/
group-name 可以为任意字符串,属于同一个群组内部的订阅者将以负载均衡接收消息,但 EMQ X 会向不同群组广播消息。
例如,假设订阅者 s1,s2,s3 属于群组 g1,订阅者 s4,s5 属于群组 g2。那么当 EMQ X 向这个主题发布消息 msg1 的时候:
- EMQ X 会向两个群组 g1 和 g2 同时发送 msg1
- s1,s2,s3 中只有一个会收到 msg1
- s4,s5 中只有一个会收到 msg1
# 不带群组的共享订阅
以 $queue/ 为前缀的共享订阅是不带群组的共享订阅。它是 $share 订阅的一种特例,相当与所有订阅者都在一个订阅组里面:
# 均衡策略与派发 Ack 配置
EMQ X 的共享订阅支持均衡策略与派发 Ack 配置:
# etc/emqx.conf
# 均衡策略
## Dispatch strategy for shared subscription
##
## Value: Enum
## - random
## - round_robin
## - sticky
## - hash
broker.shared_subscription_strategy = random
# 共享分发时是否需要 ACK,适用于 QoS1 QoS2 消息,启用时,当通过shared_subscription_strategy选中的
一个订阅者离线时,应该允许将消息发送到组中的另一个订阅者
broker.shared_dispatch_ack_enabled = false
2
3
4
5
6
7
8
9
10
11
12
13
特别提示:
无论是单客户端订阅还是共享订阅都要注意客户端性能与消息接收速率,否则会引发消息堆积、客户端崩溃等错误。
# 延迟发布
# 简介
EMQ X 的延迟发布功能可以实现按照用户配置的时间间隔延迟发布 PUBLISH 报文的功能。当客户端使用特殊主题前缀 $delayed/{DelayInteval} 发布消息到 EMQ X 时,将触发延迟发布功能。延迟发布的功能是针对消息发布者而言的,订阅方只需要按照正常的主题订阅即可。
延迟发布主题的具体格式如下:
$delayed/{DelayInterval}/{TopicName}
- $delayed : 使用 $delayed 作为主题前缀的消息都将被视为需要延迟发布的消息。延迟间隔由下一主题层级中的内容决定。
- {DelayInterval} : 指定该 MQTT 消息延迟发布的时间间隔,单位是秒,允许的最大间隔是 4294967秒。如果 {DelayInterval} 无法被解析为一个整型数字,EMQ X 将丢弃该消息,客户端不会收到任何信息。
- {TopicName} : MQTT 消息的主题名称。
例如:
$delayed/15/x/y : 15 秒后将 MQTT 消息发布到主题 x/y 。
$delayed/60/a/b : 1 分钟后将 MQTT 消息发布到 a/b 。
$delayed/3600/$SYS/topic : 1 小时后将 MQTT 消息发布到 $SYS/topic
2
3
此功能由 emqx-delay-publish 插件提供,该插件默认关闭,需要开启插件后才能使用此功能。
# 代理订阅
EMQ X 的代理订阅功能使得客户端在连接建立时,不需要发送额外的 SUBSCRIBE 报文,便能自动建立用户预设的订阅关系。
# 内置代理订阅
# 开启代理订阅
EMQ X 通过内置代理订阅模块就可以通过配置文件来指定代理订阅规则从而实现代理订阅,适用于有规律可循的静态的代理订阅需求。
代理订阅功能默认关闭,开启此功能需要修改 etc/emqx.conf 文件中的 module.subscription 配置项。默认 off 表示关闭,如需开启请修改为 on 。
module.subscription = off
# 配置代理订阅规则
仅仅开启并不意味代理订阅已经工作,你还需要配置相应的规则,EMQ X 的代理订阅规则支持用户自行配置,用户可以自行添加多条代理订阅规则,每条代理订阅规则都需要指定 Topic 和 QoS,规则的数量没有限制,代理订阅规则的格式如下:
module.subscription.<number>.topic = <topic>
module.subscription.<number>.qos = <qos>
2
在配置代理订阅的主题时,EMQ X 提供了 %c 和 %u 两个占位符供用户使用,EMQ X 会在执行代理订阅时将配置中的 %c 和 %u 分别替换为客户端的 Client ID 和 Username ,需要注意的是, %c 和 %u 必须占用一整个主题层级。
例如,在 etc/emqx.conf 文件中添加以下代理订阅规则:
module.subscription.1.topic = client/%c
module.subscription.1.qos = 1
module.subscription.2.topic = user/%u
module.subscription.2.qos = 2
module.subscription.3.topic = testtopic/#
module.subscription.3.qos = 2
2
3
4
5
6
当一个客户端连接 EMQ X 的时候,假设客户端的 Client ID 为 testclient , Username 为 tester ,根据上文的配置规则,代理订阅功能会主动帮客户端订阅 QoS 为 1 的 client/testclient 和 QoS 为 2 的user/tester 这两个主题。
这些配置都是基于配置文件实现的静态代理订阅,开源版本的EMQ X目前只支持这种静态代理订阅,收费的EMQ X Enterprise 版本中支持动态代理订阅,通过外部数据库设置主题列表在设备连接时读取列表实现代理订阅。
但是我们也可以结合使用Webhook和管理监控的API对处于连接状态的客户端实现动态的订阅和取消订阅的操作。比如我们在客户端连接上来之后通过程序调用API来实现为客户端自动订阅主题。
# 基于Webhook和API实现动态代理订阅
# 动态代理订阅需求
客户端连接上来之后动态订阅主题,客户端下线后自动取消订阅
# 代码实现
实现思路分析:
开启了 emqx_web_hook 组件后,EMQ X的事件都会勾起对我们配置的webhook接口进行回调,在该webhook接口中我们能够获取客户端的相关信息比如 clientId,username 等,然后我们可以在该接口方法中针对该客户端自动订阅某一主题,订阅的实现我们基于EMQ X给我们提供的监控管理的相关HTTP API,意味着我们调用相关的HTTP API可完成客户端订阅的功能,相关的HTTP API可在Dashboard中查看,也可以在官方的产品文档中查找: https://docs.emqx.io/broker/latest/cn/advanced/http-api.html
@Slf4j
@RestController
@RequestMapping("mqtt")
public class DynamicProxyController {
private Map clientStatusMap = new HashMap();
@PostMapping("webhook")
public void webhook(@RequestBody Map params) {
log.info("data: " + params);
String action = (String) params.get("action");
String clientId = (String) params.get("clientid");
if(action.equals("client_connected")){
//客户端成功接入
clientStatusMap.put(clientId,true);
//自动订阅autosub主题
autoSub(clientId,"autosub/#",QosEnum.QoS2,true);
}
if(action.equals("client_disconnected")){
//客户端断开连接
clientStatusMap.put(clientId,false);
//自动取消订阅autosub主题
autoSub(clientId,"autosub/#",QosEnum.QoS2,false);
}
}
/**
* 自动订阅或取消订阅
* @param clientId
* @param topicfilter
* @param qos
* @param sub
*/
private void autoSub(String clientId, String topicfilter, QosEnum qos, boolean sub) {
RestTemplate restTemplate = new RestTemplateBuilder()
.basicAuthentication("admin", "public")
.defaultHeader(HttpHeaders.CONTENT_TYPE,
org.springframework.http.MediaType.APPLICATION_JSON_VALUE)
.build();
//装配参数
Map param = new HashMap();
param.put("clientid", clientId);
param.put("qos", qos.value());
param.put("topic", topicfilter);
log.info("请求emq的相关参数:{}", param);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(org.springframework.http.MediaType.APPLICATION_JSON);
HttpEntity<Object> entity = new HttpEntity<Object>(param, headers);
//自动订阅
if (sub) {
new Thread(() -> {
ResponseEntity<String> responseEntity =
restTemplate.postForEntity("http://192.168.200.129:8081/api/v4/mqtt/subscribe", entity,
String.class);
log.info("自动订阅的结果:{}", responseEntity.getBody());
}).start();
return;
}
//自动取消订阅
ResponseEntity<String> responseEntity =
restTemplate.postForEntity("http://192.168.200.129:8081/api/v4/mqtt/unsubscribe", entity,
String.class);
log.info("自动取消订阅的结果:{}", responseEntity.getBody());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# 主题重写
# 简介
EMQ X 的主题重写功能支持根据用户配置的规则在客户端订阅主题、发布消息、取消订阅的时候将 A 主题重写为 B 主题。
EMQ X 的保留消息和延迟发布可以与主题重写配合使用,例如,当用户想使用延迟发布功能,但不方便修改客户端发布的主题时,可以使用主题重写将相关主题重写为延迟发布的主题格式。
主题重写功能默认关闭,开启此功能需要修改 etc/emqx.conf 文件中的 module.rewrite 配置项。默认off 表示关闭,如需开启请修改为 on 。
module.rewrite = off
# 配置主题重写规则
EMQ X 的主题重写规则需要用户自行配置,用户可以自行添加多条主题重写规则,规则的数量没有限制,但由于任何携带主题的 MQTT 报文都需要匹配一遍重写规则,因此此功能在高吞吐场景下带来的性能损耗与规则数量是成正比的,用户需要谨慎地使用此功能。
每条主题重写规则的格式如下:
module.rewrite.rule.<number> = 主题过滤器 正则表达式 目标表达式
每条重写规则都由以空格分隔的主题过滤器、正则表达式、目标表达式三部分组成。在主题重写功能开启的前提下,EMQ X 在收到诸如 PUBLISH 报文等带有主题的 MQTT 报文时,将使用报文中的主题去依次匹配配置文件中规则的主题过滤器部分,一旦成功匹配,则使用正则表达式提取主题中的信息,然后替换至目标表达式以构成新的主题。
目标表达式中可以使用 $N 这种格式的变量匹配正则表达中提取出来的元素, $N 的值为正则表达式中提取出来的第 N 个元素,比如 $1 即为正则表达式提取的第一个元素。
需要注意的是,EMQ X 使用倒序读取配置文件中的重写规则,当一条主题可以同时匹配多条主题重写规则的主题过滤器时,EMQ X 仅会使用它匹配到的第一条规则进行重写,如果该条规则中的正则表达式与 MQTT 报文主题不匹配,则重写失败,不会再尝试使用其他的规则进行重写。因此用户在使用时需要谨慎的设计 MQTT 报文主题以及主题重写规则。
# 主题重写配置
假设 etc/emqx.conf 文件中已经添加了以下主题重写规则:
module.rewrite.rule.1 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2
module.rewrite.rule.2 = x/# ^x/y/(.+)$ z/y/x/$1
module.rewrite.rule.3 = x/y/+ ^x/y/(\d+)$ z/y/$1
2
3
正则表达式解析: ^ 匹配输入字符串的开始位置,除非在方括号表达式中使用,当该符号在方括号表达式中使用时,表示 不接受该方括号表达式中的字符集合 $ 匹配输入字符串的结尾位置 ( ) 表示一个标记一个子表达式的开始和结束位置, [ 标记一个中括号表达式的开始 . 匹配除换行符 \n 之外的任何单字符,
- 匹配前面的子表达式一次或多次
- 匹配前面的子表达式零次或多次 ? 匹配前面的子表达式零次或一次 | 指明两项之间的一个选择 {n} n 是一个非负整数。匹配确定的 n 次 {n,} n 是一个非负整数。至少匹配n 次 {n,m} m 和 n 均为非负整数,其中n <= m。最少匹配 n 次且最多匹配 m 次 \d 匹配一个数字字符。等价于 [0-9]
验证:
我们在 emqx.conf 配置文件中添加如下配置:
## Rewrite Module
## Enable Rewrite Module.
##
## Value: on | off
module.rewrite = on
## {rewrite, Topic, Re, Dest}
module.rewrite.rule.1 = x/# ^x/y/(.+)$ z/y/$1
## x/y/1 z/y/1
## module.rewrite.rule.2 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2
2
3
4
5
6
7
8
9
# 黑名单
# 简介
EMQ X 为用户提供了黑名单功能,用户可以通过相关的 HTTP API 将指定客户端加入黑名单以拒绝该客户端访问,除了客户端标识符以外,还支持直接封禁用户名甚至 IP 地址。
黑名单只适用于少量客户端封禁需求,如果有大量客户端需要认证管理,我们需要使用认证功能来实现。
在黑名单功能的基础上,EMQ X 支持自动封禁那些被检测到短时间内频繁登录的客户端,并且在一段时间内拒绝这些客户端的登录,以避免此类客户端过多占用服务器资源而影响其他客户端的正常使用。
需要注意的是,自动封禁功能只封禁客户端标识符,并不封禁用户名和 IP 地址,即该机器只要更换客户端标识符就能够继续登录。
此功能默认关闭,用户可以在 emqx.conf 配置文件中将 enable_flapping_detect 配置项设为 on 以启用此功能。
zone.external.enable_flapping_detect = off
用户可以在 emqx.conf 配置文件中调整触发阈值和封禁时长等配置:
flapping_detect_policy = 30, 1m, 5m
此配置项的值以 , 分隔,依次表示客户端离线次数,检测的时间范围以及封禁时长,因此上述默认配置即表示如果客户端在 1 分钟内离线次数达到 30 次,那么该客户端使用的客户端标识符将被封禁 5 分钟。
# 提供黑名单API接口
# 速率限制
# 速率限制简介和配置
EMQ X 提供对接入速度、消息速度的限制:当客户端连接请求速度超过指定限制的时候,暂停新连接的建立;当消息接收速度超过指定限制的时候,暂停接收消息。
速率限制是一种 backpressure 方案,从入口处避免了系统过载,保证了系统的稳定和可预测的吞吐。速率限制可在 etc/emqx.conf 中配置:
backpressure :背压,反向压力
- max_conn_rate 是单个 emqx 节点上连接建立的速度限制。 1000 代表每秒最多允许 1000 个客户端接入。
- publish_limit 是单个连接上接收 PUBLISH 报文的速率限制。 100,10s 代表每个连接上允许收到的最大PUBLISH 消息速率是每 10 秒 100 个。
- rate_limit 是单个连接上接收 TCP数据包的速率限制。 100KB,10s 代表每个连接上允许收到的最大 TCP报文速率是每 10 秒 100KB。
publish_limit 和 rate_limit 提供的都是针对单个连接的限制,EMQ X 目前没有提供全局的消息速率限制。
# 速率限制原理
EMQ X 使⽤令牌桶Token Bucket算法来对所有的 Rate Limit 来做控制。令牌桶算法 的逻辑如下图:
- 存在一个可容纳令牌(Token) 的最大值 burst 的桶(Bucket),最大值 burst 简记为 b 。
- 存在一个 rate 为每秒向桶添加令牌的速率,简记为 r 。当桶满时则不不再向桶中加⼊入令牌。
- 每当有 1 个(或 N 个)请求抵达时,则从桶中拿出 1 个 (或 N 个) 令牌。如果令牌不不够则阻塞,等待令牌的⽣生成。 由此可知该算法中:
- 长期来看,所限制的请求速率的平均值等于 rate 的值。
- 记实际请求达到速度为 M,且 M > r,那么,实际运⾏中能达到的最大(峰值)速率为 M = b + r,证明:容易想到,最大速率 M 为:能在1个单位时间内消耗完满状态令牌桶的速度。而桶中令牌的消耗速度为M - r,故可知:b / (M - r) = 1,得 M = b + r
# 令牌桶算法在EMQX中的应用
当使用如下配置做报文速率限制的时候:
listener.tcp.external.rate_limit = 100KB,10s
EMQ X 将使用两个值初始化每个连接的 rate-limit 处理器:
- rate = 100 KB / 10s = 10240 B/s
- burst = 100 KB = 102400 B
根据消息速率限制原理中的算法,可知:
- 长期来看允许的平均速率限制为 10240 B/s
- 允许的峰值速率为 102400 + 10240 = 112640 B/s
为提高系统吞吐,EMQ X 的接入模块不会一条一条的从 socket 读取报文,而是每次从 socket 读取 N 条报文。rate-limit 检查的时机就是在收到这 N 条报文之后,准备继续收取下个 N 条报文之前。故实际的限制速率不会如算法一样精准。EMQ X 只提供了一个大概的速率限制。 N 的值可以在 etc/emqx.conf 中配置:
# 飞行窗口和消息队列
# 简介
为了提高消息吞吐效率和减少网络波动带来的影响,EMQ X 允许多个未确认的 QoS 1 和 QoS 2 报文同时存在于网路链路上。这些已发送但未确认的报文将被存放在飞行窗口(Inflight Window)中直至完成确认。
当网络链路中同时存在的报文超出限制,即飞行窗口到达长度限制(在emqx.conf配置文件中的max_inflight 相关配置)时,EMQ X 将不再发送后续的报文,而是将这些报文存储在 Message Queue 中。一旦飞行窗口中有报文完成确认,Message Queue 中的报文就会以先入先出的顺序被发送,同时存储到飞行窗口中。
需要注意的是,如果 Message Queue 也到达了长度限制,后续的报文将依然缓存到 Message Queue,但相应的 Message Queue 中最先缓存的消息将被丢弃。因此,根据你的实际情况配置一个合适的 Message Queue 长度限制(在emqx.conf配置文件中的 max_mqueue_len 相关配置)是非常重要的。
# 飞行队列与Receive Maximum
MQTT v5.0 协议为 CONNECT 报文新增了一个 Receive Maximum 的属性,官方对它的解释是:
客户端使用此值限制客户端愿意同时处理的 QoS 为 1 和 QoS 为 2 的发布消息最大数量。没有机制可以限制服务端试图发送的 QoS 为 0 的发布消息 。
也就是说,服务端可以在等待确认时使用不同的报文标识符向客户端发送后续的 PUBLISH 报文,直到未被确认的报文数量到达 Receive Maximum 限制。
不难看出, Receive Maximum 其实与 EMQ X 中的飞行窗口机制如出一辙,只是在 MQTT v5.0 协议发布前,EMQ X 就已经对接入的 MQTT 客户端提供了这一功能。现在,使用 MQTT v5.0 协议的客户端将按照 ReceiveMaximum 的规范来设置飞行窗口的最大长度,而更低版本 MQTT 协议的客户端则依然按照配置来设置。
配置项说明:
# 消息重传
# 简介
消息重传 (Message Retransmission) 是属于 MQTT 协议标准规范的一部分。
协议中规定了作为通信的双方 服务端 和 客户端 对于自己发送到对端的 PUBLISH 消息都应满足其 服务质量(Quality of Service levels) 的要求。如:
QoS 1:表示 消息至少送达一次 (At least once delivery);即发送端会一直重发该消息,除非收到了对端对该消息的确认。意思是在 MQTT 协议的上层(即业务的应用层)相同的 QoS 1 消息可能会收到多次
QoS 2:表示 消息只送达一次 (Exactly once delivery);即该消息在上层仅会接收到一次。
虽然,QoS 1 和 QoS 2 的 PUBLISH 报文在 MQTT 协议栈这一层都会发生重传,但请你谨记的是:
- QoS 1 消息发生重传后,在 MQTT 协议栈上层,也会收到这些重发的 PUBLISH 消息。
- QoS 2 消息无论如何重传,最终在 MQTT 协议栈上层,都只会收到一条 PUBLISH 消息
# 基础配置
有两种场景会导致消息重发:
- PUBLISH 报文发送给对端后,规定时间内未收到应答。则重发这个报文。
- 在保持会话的情况下,客户端重连后;EMQ X 会自动重发 未应答的消息,以确保 QoS 流程的正确。在 etc/emqx.conf 中可配置:
# 规则引擎
# 规则引擎概述
# 简介
EMQ X Rule Engine (以下简称规则引擎) 用于配置 EMQ X 消息流与设备事件的处理、响应规则。
规则引擎用于配置一套规则,该规则是针对EMQ X的消息流和设备事件如何处理的一套细则。
规则引擎不仅提供了清晰、灵活的 "配置式" 的业务集成方案,简化了业务开发流程,提升用户易用性,降低业务系统与 EMQ X 的耦合度;也为 EMQ X 的私有功能定制提供了一个更优秀的基础架构。
EMQ X 在 消息发布或事件触发 时将触发规则引擎,满足触发条件的规则将执行各自的 SQL 语句筛选并处理消息和事件的上下文信息。
消息发布
规则引擎借助响应动作可将特定主题的消息处理结果存储到关系型数据库(mysql,PostgreSQL),NoSql(Redis,MongoDB),发送到 HTTP Server,转发到消息队列 Kafka 或 RabbitMQ,重新发布到新的主题甚至是另一个 Broker 集群中,每个规则可以配置多个响应动作。
事件触发
规则引擎使用 $events/ 开头的虚拟主题(事件主题)处理 EMQ X 内置事件,内置事件提供更精细的消息控制和客户端动作处理能力,可用在 QoS 1 QoS 2 的消息抵达记录、设备上下线记录等业务中。
# 应用场景
- 动作监听:智慧家庭智能门锁开发中,门锁会因为网络、电源故障、人为破坏等原因离线导致功能异常,使用规则引擎配置监听离线事件向应用服务推送该故障信息,可以在接入层实现第一时间的故障检测的能力;
- 数据筛选:车辆网的卡车车队管理,车辆传感器采集并上报了大量运行数据,应用平台仅关注车速大于40 km/h 时的数据,此场景下可以使用规则引擎对消息进行条件过滤,向业务消息队列写入满足条件的数据
- 消息路由:智能计费应用中,终端设备通过不同主题区分业务类型,可通过配置规则引擎将计费业务的消息接入计费消息队列并在消息抵达设备端后发送确认通知到业务系统,非计费信息接入其他消息队列,实现业务消息路由配置;
- 消息编解码:其他公共协议 / 私有 TCP 协议接入、工控行业等应用场景下,可以通过规则引擎的本地处理函数(可在 EMQ X 上定制开发)做二进制 / 特殊格式消息体的编解码工作;亦可通过规则引擎的消息路由将相关消息流向外部计算资源如函数计算进行处理(可由用户自行开发处理逻辑),将消息转为业务易于处理的 JSON 格式,简化项目集成难度、提升应用快速开发交付能力。
# 规则引擎的组成
使用 EMQ X 的规则引擎可以灵活地处理消息和事件。使用规则引擎可以方便地实现诸如将消息转换成指定格式,然后存入数据库表,或者发送到消息队列等
与 EMQ X 规则引擎相关的概念包括: 规则(rule)、动作(action)、资源(resource) 和 资源类型(resource-type)。 规则、动作、资源的关系:
规则: {
SQL 语句,
动作列表: [
{
动作1,
动作参数,
绑定资源: {
资源配置
}
},
{
动作2,
动作参数,
绑定资源: {
资源配置
}
}
]
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
- 规则(Rule): 规则由 SQL 语句和动作列表组成。动作列表包含一个或多个动作及其参数
- SQL 语句用于筛选或转换消息中的数据
- 动作(Action) 是 SQL 语句匹配通过之后,所执行的任务。动作定义了一个针对数据的操作。 动作可以绑定资源,也可以不绑定。例如,“inspect” 动作不需要绑定资源,它只是简单打印数据内容和动作参数。而“data_to_webserver” 动作需要绑定一个 web_hook 类型的资源,此资源中配置了 URL。
- 资源(Resource): 资源是通过资源类型为模板实例化出来的对象,保存了与资源相关的配置(比如数据库连接地址和端口、用户名和密码等) 和系统资源(如文件句柄,连接套接字等)。
- 资源类型 (Resource Type): 资源类型是资源的静态定义,描述了此类型资源需要的配置项。
注意: 动作和资源类型是由 emqx 或插件的代码提供的,不能通过 API 和 CLI 动态创建。
总的来说:规则描述了 数据从哪里来、如何筛选并处理数据、处理结果到哪里去 三个配置,即一条可用的规则包含三个要素:
- 触发事件:规则通过事件触发,触发时事件给规则注入事件的上下文信息(数据源),通过 SQL 的FROM 子句指定事件类型;
- 处理规则(SQL):使用 SELECT 子句 和 WHERE 子句以及内置处理函数, 从上下文信息中过滤和处理数据;
- 响应动作:如果有处理结果输出,规则将执行相应的动作,如持久化到数据库、重新发布处理后的消息、转发消息到消息队列等。一条规则可以配置多个响应动作。
# SQL语句
# SQL语法
FROM、SELECT 和 WHERE 子句:
SQL 语句用于从原始数据中,根据条件筛选出字段,并进行预处理和转换,基本格式为:
SELECT <字段名> FROM <主题> [WHERE <条件>]
FROM、SELECT 和 WHERE 子句:
- FROM 子句将规则挂载到某个主题上(向该主题发布消息时触发,该主题是事件主题则事件发生时触发)
- SELECT 子句用于选择输出结果中的字段
- WHERE 子句用于根据条件筛选消息
FOREACH、DO 和 INCASE 子句:
如果对于一个数组数据,想针对数组中的每个元素分别执行一些操作并执行 Actions,需要使用 FOREACH-DO-INCASE 语法。其基本格式为:
FOREACH <字段名> [DO <条件>] [INCASE <条件>] FROM <主题> [WHERE <条件>]
- FOREACH 子句用于选择需要做 foreach 操作的字段,注意选择出的字段必须为数组类型
- DO 子句用于对 FOREACH 选择出来的数组中的每个元素进行变换,并选择出感兴趣的字段
- INCASE 子句用于对 DO 选择出来的某个字段施加条件过滤
其中 DO 和 INCASE 子句都是可选的。DO 相当于针对当前循环中对象的 SELECT 子句,而 INCASE 相当于针对当前循环中对象的 WHERE 语句。
{
"time": "2020-04-24",
"users": [
{"name": "a", "idx":0},
{"name": "b", "idx":1},
{"name": "c", "idx":2}
]
}
2
3
4
5
6
7
8
# SQL语句相关示例
# 基本语法举例
从 topic 为 "t/a" 的消息中提取所有字段:
SELECT * FROM "t/a"
1从 topic 为 "t/a" 或 "t/b" 的消息中提取所有字段:
SELECT * FROM "t/a","t/b"
1从 topic 能够匹配到 't/#' 的消息中提取所有字段。
SELECT * FROM "t/#"
1从 topic 能够匹配到 't/#' 的消息中提取 qos, username 和 clientid 字段:
SELECT qos, username, clientid FROM "t/#"
1从任意 topic 的消息中提取 username 字段,并且筛选条件为 username = 'Steven':
SELECT username FROM "#" WHERE username='Steven'
1从任意 topic 的 JSON 消息体(payload) 中提取 x 字段,并创建别名 x 以便在 WHERE 子句中使用。WHERE 子句限定条件为 x = 1。下面这个 SQL 语句可以匹配到消息体 {"x": 1}, 但不能匹配到消息体 {"x": 2}:
SELECT payload as p FROM "#" WHERE p.x = 1
1类似于上面的 SQL 语句,但嵌套地提取消息体中的数据,下面的 SQL 语句可以匹配到 JSON 消息体 {"x":{"y": 1}}:
SELECT payload as a FROM "#" WHERE a.x.y = 1
1在 clientid = 'c1' 尝试连接时,提取其来源 IP 地址和端口号:
SELECT peername as ip_port FROM "$events/client_connected" WHERE clientid = 'c1'
1筛选所有订阅 't/#' 主题且订阅级别为 QoS1 的 clientid:
SELECT clientid FROM "$events/session_subscribed" WHERE topic = 't/#' and qos = 1
1筛选所有订阅主题能匹配到 't/#' 且订阅级别为 QoS1 的 clientid。注意与上例不同的是,这里用的是主题匹配操作符 '=~',所以会匹配订阅 't' 或 't/+/a' 的订阅事件:
SELECT clientid FROM "$events/session_subscribed" WHERE topic =~ 't/#' and qos = 1
1FROM 子句后面的主题需要用双引号 "" 引起来。
WHERE 子句后面接筛选条件,如果使用到字符串需要用单引号 '' 引起来。
FROM 子句里如有多个主题,需要用逗号 "," 分隔。例如 SELECT * FROM "t/1", "t/2" 。
可以使用使用 "." 符号对 payload 进行嵌套选择
# 遍历语法举例
假设有 ClientID 为 c_steve 、主题为 t/1 的消息,消息体为 JSON 格式,其中 sensors 字段为包含多个Object 的数组:
{
"date": "2020-04-24",
"sensors": [
{"name": "a", "idx":0},
{"name": "b", "idx":1},
{"name": "c", "idx":2}
]
}
2
3
4
5
6
7
8
示例1: 要求将 sensors 里的各个对象,分别作为数据输入重新发布消息到 sensors/${idx} 主题,内容为${name} 。即最终规则引擎将会发出 3 条消息:
- 主题:sensors/0 内容:a
- 主题:sensors/1 内容:b
- 主题:sensors/2 内容:c
要完成这个规则,我们需要配置如下动作:
- 动作类型:消息重新发布 (republish)
- 目的主题:sensors/${idx}
- 目的 QoS:0
- 消息内容模板:${name}
以及如下 SQL 语句:
FOREACH
payload.sensors
FROM "t/#"
2
3
示例解析:
这个 SQL 中,FOREACH 子句指定需要进行遍历的数组 sensors,则选取结果为:
[
{
"name": "a",
"idx": 0
},
{
"name": "b",
"idx": 1
},
{
"name": "c",
"idx": 2
}
]
2
3
4
5
6
7
8
9
10
11
12
13
14
因为选出来的结果中所有字段我们都需要因此不需要DO子句,没有条件过滤因为不需要INCASE子句
FOREACH 语句将会对于结果数组里的每个对象分别执行 "消息重新发布" 动作,所以将会执行重新发布动作 3次。
示例2: 要求将 sensors 里的 idx 值大于或等于 1 的对象,分别作为数据输入重新发布消息到sensors/${idx} 主题,内容为clientid=${clientid},name=${name},date=${date} 。即最终规则引擎将会发出 2条消息:
{
"date": "2020-04-24",
"sensors": [
{"name": "a", "idx":0},
{"name": "b", "idx":1},
{"name": "c", "idx":2}
]
}
2
3
4
5
6
7
8
- 主题:sensors/1 内容:clientid=c_steve,name=b,date=2020-04-24
- 主题:sensors/2 内容:clientid=c_steve,name=c,date=2020-04-24
要完成这个规则,我们需要配置如下动作:
- 动作类型:消息重新发布 (republish)
- 目的主题:sensors/${idx}
- 目的 QoS:0
- 消息内容模板: clientid=${clientid},name=${name},date=${date}
以及如下 SQL 语句:
FOREACH
payload.sensors
DO
clientid,
item.name as name,
item.idx as idx
INCASE
item.idx >= 1
FROM "t/#"
2
3
4
5
6
7
8
9
示例解析:
这个 SQL 中,FOREACH 子句指定需要进行遍历的数组 sensors ; DO 子句选取每次操作需要的字段,这里我们选了外层的 clientid 字段,以及当前 sensor 对象的 name 和 idx 两个字段,注意 item 代表 sensors 数组中本次循环的对象。INCASE 子句是针对 DO 语句中字段的筛选条件,仅仅当 idx >= 1 满足条件。所以 SQL 的选取结果为:
[
{
"name": "b",
"idx": 1,
"clientid": "c_steve"
},
{
"name": "c",
"idx": 2,
"clientid": "c_steve"
}
]
2
3
4
5
6
7
8
9
10
11
12
FOREACH 语句将会对于结果数组里的每个对象分别执行 "消息重新发布" 动作,所以将会执行重新发布动作 2次。
在 DO 和 INCASE 语句里,可以使用 item 访问当前循环的对象,也可以通过在 FOREACH 使用 as 语法自定义一个变量名。所以本例中的 SQL 语句又可以写为:
FOREACH
payload.sensors as s
DO
clientid,
s.name as name,
s.idx as idx
INCASE
s.idx >= 1
FROM "t/#"
2
3
4
5
6
7
8
9
示例3: 在示例2 的基础上,去掉 clientid 字段 c_steve 中的 c_ 前缀
在 FOREACH 和 DO 语句中可以调用各类 SQL 函数,若要将 c_steve 变为 steve ,则可以把例2 中的 SQL改为:
FOREACH
payload.sensors as s
DO
nth(2, tokens(clientid,'_')) as clientid,
s.name as name,
s.idx as idx
INCASE
s.idx >= 1
FROM "t/#"
2
3
4
5
6
7
8
9
CASE-WHEN 语法示例
示例1: 将消息中 x 字段的值范围限定在 0~7 之间
SELECT
CASE WHEN payload.x < 0 THEN 0
WHEN payload.x > 7 THEN 7
ELSE payload.x
END as x
FROM "t/#"
2
3
4
5
6
SQL事件和字段 FROM 子句可用的事件主题
From子句除了写事件主题外就是普通的主题了,譬如 t/+,q/# 等。
SELECT 和 WHERE 子句可用的字段
SELECT 和 WHERE 子句可用的字段与事件的类型相关。其中 clientid , username 和 event 是通用字段,每种事件类型都有。
- 普通主题 (消息发布),比如 t/+,q/#等
- $events/message_delivered (消息投递)
$events/message_acked (消息确认)
$events/message_dropped (消息丢弃)
$events/client_connected (终端连接成功)
$events/client_disconnected (终端连接断开)
$events/session_subscribed (终端订阅成功)
$events/session_unsubscribed (取消终端订阅成功)
# SQL 运算符和函数
运算符号
SQL 语句中可用的函数
数学函数
数据类型判断函数
数据类型转换函数
字符串函数
Map 函数
数组函数
哈希函数
编解码函数
# Dashboard中测试SQL语句
Dashboard 界面提供了 SQL 语句测试功能,通过给定的 SQL 语句和事件参数,展示 SQL 测试结果。
在创建规则界面,输入 规则SQL,
SELECT CASE WHEN payload.x < 0 THEN 0 WHEN payload.x > 7 THEN 7 ELSE payload.x END as x FROM "t/#"
1
2
3
4
5
6
# 规则引擎案例
需求:现需要通过规则引擎提取出从 username=emq-client2 该客户端发送过来原始数据中的msg,user,orderNo 等数据,需要过滤 password 字段,同时还需要提取消息发布的qos信息,然后将最终过滤出来的消息通知到我们的web服务上。 消息主体是: rule/# ,消息数据模板为
{
"msg": "hello",
"user":"emq-client2",
"password":"123456",
"orderNo":"12345sfd"
}
2
3
4
5
6
# 创建资源
打开 emqx dashboard ,选择左侧的 “规则引擎” 选项卡,打开资源页面
点击新建,创建资源
开源的EMQ X Broker默认支持的资源类型只有这三种,EMQ X 企业版支持的更多,这些资源类型其实对应了规则匹配后的具体动作 Action ,
填写资源信
# 创建规则
打开 emqx dashboard ,选择左侧的 “规则引擎” 选项卡,打开规则页面
右上角点击 新建 创建规则,填写规则 SQL,消息数据模板为
{ "msg": "hello", "user":"emq-client2", "password":"123456", "orderNo":"12345sfd" }
1
2
3
4
5
6需要通过规则引擎提取出从 username=emq-client2 该客户端发送过来原始数据中的 msg,user,orderNo 等数据,需要过滤 password 字段,同时还需要提取消息发布的qos信息。
sql
SELECT payload.msg as msg, payload.user as user, payload.orderNo as orderNo, qos FROM "rule/#" WHERE username = 'emq-client2'
1
2
3
4
5
6
7
8
9关联动作:
# 创建资源webhook接口
修改原有代码,添加一个http接口用来接收emq通过Post发送过来的数据,在这里我们只是简单的输出到控制台,证明我们的程序已经接收到了通过自己创建的规则引擎转发过来的数据。在实际业务中,我们会将接收到的数据进行后续复杂的业务处理,这里只是简单演示。
测试
# 系统调优
EMQ X 消息服务器 4.x 版本 MQTT 连接压力测试到 130 万,在一台 8 核心、32G 内存的 CentOS 服务器上。100 万连接测试所需的 Linux 内核参数,网络协议栈参数,Erlang 虚拟机参数, EMQ X 消息服务器参数设置如下:
# Linux 操作系统参数
系统全局允许分配的最大文件句柄数:
在文件I/O中,要从一个文件读取数据,应用程序首先要调用操作系统函数并传送文件名,并选一个到该文件的路径来打开文件。该函数取回一个顺序号,即文件句柄(file handle),该文件句柄对于打开的文件是唯一的识别依据。要从文件中读取一块数据,应用程序需要调用函数ReadFile,并将文件句柄在内存中的地址和要拷贝的字节数传送给操作系统。当完成任务后,再通过调用系统函数来关闭该文件。
# 2 millions system-wide sysctl -w fs.file-max=2097152 sysctl -w fs.nr_open=2097152 echo 2097152 > /proc/sys/fs/nr_open
1
2
3
4file-max是所有进程最大的文件数 nr_open是单个进程可分配的最大文件数
允许当前会话 / 进程打开文件句柄数:
ulimit -n 1048576
1ulimit其实就是对单一程序的限制,进程级别的,ulimit 参数说明如下
/etc/sysctl.conf
持久化 'fs.file-max' 设置到 /etc/sysctl.conf 文件:
fs.file-max = 1048576
/etc/systemd/system.conf 设置服务最大文件句柄数:
DefaultLimitNOFILE=104857
/etc/security/limits.conf 是linux资源限制配置文件,限制用户进程的数量,limits.conf文件限制着用户可以使用的最大文件数,最大线程,最大内存等资源使用量。 /etc/security/limits.conf 持久化设置允许用户 / 进程打开文件句柄数:
#<domain> <type> <item> <value>
#
#* soft core 0
#* hard rss 10000
* soft nofile 1048576
* hard nofile 1048576
2
3
4
5
6
* soft nofile 655350 #任何用户可以打开的最大的文件描述符数量,默认1024,这里的数值会限制tcp连
接
* hard nofile 655350
* soft nproc 655350 #任何用户可以打开的最大进程数
* hard nproc 650000
@student hard nofile 65535
@student soft nofile 4096
@student hard nproc 50 #学生组中的任何人不能拥有超过50个进程,并且会在拥有30个进程时发出警告
@student soft nproc 30
2
3
4
5
6
7
8
9
hard和soft两个值都代表什么意思呢?
soft是一个警告值,而hard则是一个真正意义的阀值,超过就会报错
2
# TCP 协议栈网络参数
并发连接 backlog 设置:
sysctl -w net.core.somaxconn=32768 #backlog值
sysctl -w net.ipv4.tcp_max_syn_backlog=16384 #控制半连接的队列的长度
sysctl -w net.core.netdev_max_backlog=16384 #网卡设备的backlog
2
3
TCP 通过三次握手建立连接的过程应该都不陌生了。从服务器的角度看,它分为以下几步
- 将 TCP 状态设置为 LISTEN 状态,开启监听客户端的连接请求
- 收到客户端发送的 SYN 报文后, TCP 状态切换为 SYN RECEIVED ,并发送 SYN ACK 报文
- 收到客户端发送的 ACK 报文后, TCP 三次握手完成,状态切换为 ESTABLISHED在 Unix 系统中,开启监听是通过 listen 完成。
int listen(int sockfd, int backlog)
1listen 有两个参数,第一个参数 sockfd 表示要设置的套接字,本文主要关注的是其第二个参数backlog ;
backlog :将其描述为已完成的连接队列( ESTABLISHED )与未完成连接队列( SYN_RCVD )之和的上限
一般我们将 ESTABLISHED 状态的连接称为全连接,而将 SYN_RCVD 状态的连接称为半连接
当服务器收到一个 SYN 后,它创建一个子连接加入到 SYN_RCVD 队列。在收到 ACK 后,它将这个子连接移动到 ESTABLISHED 队列。最后当用户调用 accept() 时,会将连接从 ESTABLISHED 队列取出。 backlog对应参数是:net.core.somaxconn tcp_max_syn_backlog :这个参数是控制半连接的队列的长度;这个表现出来的故障是: 无法建立连接。 netdev_max_backlog:表示网卡设备的backlog, 因为网卡接收数据包的速度远大于内核处理这些数据包的速度,所以,就出现了网卡设备的backlog.
可用端口范围:
sysctl -w net.ipv4.ip_local_port_range='1000 65535'
在对于繁忙的网络服务器,如代理服务器或负载均衡器,我们可能需要增加网络端口范围来增强它的处理能力 在Linux上,有一个sysctl参数 ip_local_port_range ,可用于定义网络连接可用作其源(本地)端口的最小和最大端口的限制,同时适用于TCP和UDP连接。
TCP Socket 读写 Buffer 设置:
sysctl -w net.core.rmem_default=262144 #默认的接收窗口大小
sysctl -w net.core.wmem_default=262144 #默认的发送窗口大小
sysctl -w net.core.rmem_max=16777216 #最大的TCP数据接收缓冲
sysctl -w net.core.wmem_max=16777216 #最大的TCP数据发送缓冲
sysctl -w net.core.optmem_max=16777216 #每个套接字所允许的最大缓冲区的大小。
#sysctl -w net.ipv4.tcp_mem='16777216 16777216 16777216'
sysctl -w net.ipv4.tcp_rmem='1024 4096 16777216'
#定义socket使用的内存。第一个值是为socket接收缓冲区分配的最少字节数;第二个值是默认值(该值会被rmem_default覆盖),缓冲区在系统负载不重的情况下可以增长到这个值;第三个值是接收缓冲区空间的最大字节数(该值会被rmem_max覆盖)
sysctl -w net.ipv4.tcp_wmem='1024 4096 16777216'
#定义socket使用的内存。第一个值是为socket发送缓冲区分配的最少字节数;第二个值是默认值(该值会被wmem_default覆盖),缓冲区在系统负载不重的情况下可以增长到这个值;第三个值是发送缓冲区空间的最大字节数(该值会被wmem_max覆盖)。
2
3
4
5
6
7
8
9
10
TCP 连接追踪设置:
sysctl -w net.nf_conntrack_max=1000000
sysctl -w net.netfilter.nf_conntrack_max=1000000
sysctl -w net.netfilter.nf_conntrack_tcp_timeout_time_wait=30
2
3
TIME-WAIT Socket 最大数量、回收与重用设置:
sysctl -w net.ipv4.tcp_max_tw_buckets=1048576
#在 TIME_WAIT 数量等于 tcp_max_tw_buckets 时,不会有新的 TIME_WAIT 产生
# 注意:不建议开启該设置,NAT 模式下可能引起连接 RST
# sysctl -w net.ipv4.tcp_tw_recycle=1
# sysctl -w net.ipv4.tcp_tw_reuse=1
2
3
4
5
FIN-WAIT-2 Socket 超时设置:
sysctl -w net.ipv4.tcp_fin_timeout=15
# Erlang 虚拟机参数
优化设置 Erlang 虚拟机启动参数,配置文件 emqx/etc/emqx.conf;
## Erlang Process Limit
node.process_limit = 2097152
## Sets the maximum number of simultaneously existing ports for this system
node.max_ports = 1048576
2
3
4
# EMQ X 消息服务器参数
设置 TCP 监听器的 Acceptor 池大小,最大允许连接数。配置文件 emqx/etc/emqx.conf:
## TCP Listener
listener.tcp.external = 0.0.0.0:1883
listener.tcp.external.acceptors = 64
listener.tcp.external.max_connections = 1024000
2
3
4
测试客户端设置
测试客户端服务器在一个接口上,最多只能创建 65000 连接:
sysctl -w net.ipv4.ip_local_port_range="500 65535"
echo 1000000 > /proc/sys/fs/nr_open
ulimit -n 100000
2
3
emqtt_bench 并发连接测试工具: http://github.com/emqx/emqtt_bench