Thingsboard Thingsboard 之 MQTT 协议介绍

sds · 2020年03月03日 · 178 次阅读
本帖已被设为精华帖!

Thingsboard中文社区群聊二维码.png 可复制:121202538

中文社区:http://thingsboard.org.cn

TB 的 MQTT 设备协议
TB 官网: https://thingsboard.io/

TB GitHub: https://github.com/thingsboard/thingsboard

TB 提供的体验地址: http://demo.thingsboard.io/

BY Thingsboard team

以下内容是在原文基础上演绎的译文。除非另行注明,页面上所有内容采用知识共享-[署名(CC BY 2.5 AU)协议]
(http://creativecommons.org/licenses/by/2.5/au/deed.zh) 共享。

原文地址: ThingsBoard API 参考:MQTT 设备 API

视频演示:

https://cdn.iotschool.com/uploads/20200301/b7ba8c5eb7f562563bf7728dc8fdcde8.mp4

MQTT 基础知识

MQTT 是一种轻量级的发布 - 订阅消息传递协议,可能使其最适合各种物联网设备。您可以在此处找到有关 MQTT 的更多信息。
ThingsBoard 服务器节点充当 MQTT Broker,支持 QoS 级别 0(最多一次)和 1(至少一次)以及一组预定义主题。

客户端库设置

您可以在 Web 上找到大量 MQTT 客户端库。本文中的示例将基于Mosquitto,MQTT.jsPaho,要设置其中一个工具。

客户端库设置

您可以在 Web 上找到大量 MQTT 客户端库。本文中的示例将基于 Mosquitto,MQTT.js 和 Paho,要设置其中一个工具。

键值格式

默认情况下,ThingsBoard 支持 JSON 中的键值内容。Key 始终是一个字符串,而 value 可以是 string,boolean,double 或 long。也可以使用自定义二进制格式或某些序列化框架。有关详细信息,请参阅物模型。例如:

{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}

遥测上传 API

为了将遥测数据发布到 ThingsBoard 服务器节点,请将 PUBLISH 消息发送到以下主题:

v1/devices/me/telemetry

最简单的支持数据格式是:

{"key1":"value1", "key2":"value2"}

要么

[{"key1":"value1"}, {"key2":"value2"}]

请注意,在这种情况下,服务器端时间戳将分配给上传的数据!
如果您的设备能够获取客户端时间戳,您可以使用以下格式:

{"ts":1451649600512, "values":{"key1":"value1", "key2":"value2"}}

在上面的示例中,我们假设 “1451649600512” 是具有毫秒精度的 unix 时间戳。例如,值'1451649600512'对应于'Fri,2016年1月1日12:00:00.512 GMT'

属性 API

ThingsBoard 属性 API 允许设备
将客户端设备属性上载到服务器。
将属性更新发布到服务器
要将客户端设备属性发布到 ThingsBoard 服务器节点,请将 PUBLISH 消息发送到以下主题:

v1/devices/me/attributes

Thingsboard 的 MQTT 传输协议架构

因为 Thingsboard 最新 release,是基于微服务架构,不利用单独理解代码。
Thingsboard 源代码: https://github.com/thingsboard/thingsboard/tree/release-2.0/transport/mqtt
本文基于上面源代码后,剔除相关的安全验证和处理之后搭建简易的讲解项目:
https://github.com/sanshengshui/IOT-Technical-Guide/tree/master/IOT-Guide-MQTT

MQTT 框架

因为 Thingsboard 是一个 JVM 技术栈的 PaaS 平台,所以使用的是基于 Java 通讯框架的 Netty,如果有对 Netty 不太熟悉的同学,可以参考我之前搭建的 Netty 实践学习案例: https://github.com/sanshengshui/netty-learning-example

项目结构

├── IOT-Guide-MQTT.iml
├── pom.xml
└── src
    └── main
        └── java
            └── com
                └── sanshengshui
                    └── mqtt
                        ├── adapter
                        │   └── JsonMqttAdaptor.java // MQTT json转换器,在跟Thingsboard学习IOT-物模型有所讲解
                        ├── IOTMqttServer.java // MQTT服务
                        ├── MqttTopicMatcher.java
                        ├── MqttTopics.java 
                        ├── MqttTransportHandler.java //MQTT处理类
                        └── MqttTransportServerInitializer.java

项目代码讲解

IOTMqttServer

private static final int PORT = 1884;
   private static final String leakDetectorLevel = "DISABLED";
   private static final Integer bossGroupThreadCount = 1;
   private static final Integer workerGroupThreadCount = 12;
   private static final Integer maxPayloadSize = 65536;

   public static void main(String[] args) throws Exception {
       ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));

       EventLoopGroup bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
       EventLoopGroup workerGroup = new NioEventLoopGroup(workerGroupThreadCount);

       try {

           ServerBootstrap b = new ServerBootstrap();
           b.group(bossGroup,workerGroup)
                   .channel(NioServerSocketChannel.class)
                   .handler(new LoggingHandler(LogLevel.INFO))
                   .childHandler(new MqttTransportServerInitializer(maxPayloadSize));
           ChannelFuture f = b.bind(PORT);
           f.channel().closeFuture().sync();
       } finally {
           bossGroup.shutdownGracefully();
           workerGroup.shutdownGracefully();
       }
   }

第 8 行,设置服务端 Netty 内存读写泄漏级别,缺省条件下为:DISABLED

第 10 行和第 11 行,设置 boss 线程组和 work 线程组的线程数量。默认情况下,boss 线程组的线程数量为 1,work 线程组的数量为运行服务机器内核数量的 2 倍。

第 15 行,通过创建 ServerBootstrap 对象,在第 16 行设置使用 EventLoopGroup。

在 17 和 19 行,设置要被实例化的 NioServerSockerChannel 类,并设置最大的负载内容数量。

最后我们通过 shutdowGracefully() 函数优雅的关闭 bossGroup 和 workGroup。

MqttTransportHandler#processMqttMsg()

private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
        address = (InetSocketAddress) ctx.channel().remoteAddress();
        if (msg.fixedHeader() == null) {
            processDisconnect(ctx);
            return;
        }

        switch (msg.fixedHeader().messageType()) {
            case CONNECT:
                processConnect(ctx, (MqttConnectMessage) msg);
                break;
            case PUBLISH:
                processPublish(ctx, (MqttPublishMessage) msg);
                break;
            case SUBSCRIBE:
                processSubscribe(ctx, (MqttSubscribeMessage) msg);
                break;
            case UNSUBSCRIBE:
                processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
                break;
            case PINGREQ:
                if (checkConnected(ctx)) {
                    ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP,false,AT_MOST_ONCE, false, 0)));
                }
                break;
            case DISCONNECT:
                if (checkConnected(ctx)) {
                    processDisconnect(ctx);
                }
                break;
            default:
                break;

        }
    }

第3行,通过判断消息的固定头部是否为空,如果空;则通过 processDisconnect(ctx) 将设备连接关闭。

processDisconnect(channelHandlerContext ctx)
private void processDisconnect(ChannelHandlerContext ctx) {
        ctx.close();  // 关闭socket通道
    }

第 8 行,通过判断固定头部的 MQTT 消息类型,针对不同消息做相应的处理。
MqttTransportHandler#PublishDevicePublish
以下是对发布消息进行相关的解读,更多消息类型的处理类,大家请参考我上面的 IOT-Guide-MQTT 进行阅读。

private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
        try {
            if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_TOPIC)) { //如果主题为v1/devices/me/attributes
                JsonMqttAdaptor.convertToMsg(POST_TELEMETRY_REQUEST, mqttMsg);
            } else if(topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
                JsonMqttAdaptor.convertToMsg(POST_ATTRIBUTES_REQUEST, mqttMsg);
            } else if(topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
                JsonMqttAdaptor.convertToMsg(GET_ATTRIBUTES_REQUEST, mqttMsg);
            }
        } catch (AdaptorException e) {

        }

    }

我上面的代码仅是对消息的主题进行判断,然后对主题内的内容进行物模型的解析,得到相关属性或者遥测数据的获得。

演示效果

我们通过 Paho 或者 MQTT.js 和服务进行连接,发布消息到以下主题:

v1/devices/me/telemetry

简易的数据格式如下:

{"key1":"value1", "key2":"value2"}

Paho 图示: image.png

服务器控制台打印数据:

七月 24, 2019 1:37:18 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0xf2bfb3a8] REGISTERED
七月 24, 2019 1:37:18 下午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0xf2bfb3a8] BIND: 0.0.0.0/0.0.0.0:1884
七月 24, 2019 1:37:18 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0xf2bfb3a8, L:/0:0:0:0:0:0:0:0:1884] ACTIVE
七月 24, 2019 1:37:22 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xf2bfb3a8, L:/0:0:0:0:0:0:0:0:1884] RECEIVED: [id: 0xe08abd12, L:/127.0.0.1:1884 - R:/127.0.0.1:48816]
key= 1563946708305
属性名=temperature 属性值=38
属性名=humidity 属性值=60

如上所示,希望大家对 Thingsboard 的 IOT 架构-MQTT 设备协议这块有所了解!

本文作者: 穆书伟
本文链接: https://blog.grozacloud.com/2019/07/24/-跟着 Thingsboard 学 IOT 架构-MQTT 设备协议/物联网时代
版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

sds 将本帖设为了精华贴 03月03日 03:46
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册