首页 > 基础资料 博客日记

一文学习入门 ThingsBoard 开源物联网平台

2026-04-16 22:00:02基础资料围观1

极客资料网推荐一文学习入门 ThingsBoard 开源物联网平台这篇文章给大家,欢迎收藏极客资料网享受知识的乐趣

2026-04-02

ThingsBoard

Wath is ThingsBoard ?

ThingsBoard  is an open-source IoT platform for data collection, processing, visualization, and device management

It enables device connectivity via industry standard IoT protocols - MQTT, CoAP and HTTP and supports both cloud and on-premises deployments. ThingsBoard combines scalability, fault-tolerance and performance so you will never lose your data.

https://thingsboard.io/docs/
https://thingsboard.io/docs/user-guide/install/installation-options/
https://thingsboard.io/docs/pe/guides/

github源码仓库 - https://github.com/thingsboard/
前端Vue3 适配 4.2X - https://gitee.com/oliver225/thingsboard-ui-vue3
前端Vue3 适配 4.3X - https://github.com/oliver225/thingsboard-ui-vue3

主要概念和设计的逻辑

跟国内物联网平台都不太一样, 先理清概念关系..

用户关系图

image

实体-设备/资产关系图

image

https://thingsboard.io/docs/user-guide/device-profiles/

实体关系说明

ThingsBoard 提供用户界面和 REST API,用于在您的物联网应用中配置和管理多种实体类型及其关系。支持的实体包括:

  • 租户——您可以将租户视为一个独立的商业实体:它是拥有或生产设备和资产的个人或组织;租户可能拥有多个租户管理员用户和数百万客户、设备和资产;
  • 客户——客户也是一个独立的商业实体:购买或使用租户设备和/或资产的个人或组织;客户可能拥有多个用户和数百万台设备和/或资产;
  • 用户- 用户可以浏览仪表盘和管理实体;
  • 设备——基本的物联网实体,可以生成遥测数据并处理 RPC 命令。例如,传感器、执行器、开关;
  • 资产——抽象的物联网实体,可能与其他设备和资产相关。例如,工厂、田地、车辆;
  • 实体视图- 如果您只想与客户共享部分设备或资产数据,则此视图非常有用;
  • 警报- 用于识别您的资产、设备或其他实体问题的事件;
  • 仪表盘- 可视化您的物联网数据,并通过用户界面控制特定设备;
  • 规则节点- 用于处理传入消息、实体生命周期事件等的处理单元;
  • 规则链——定义规则引擎中的处理流程。可能包含多个规则节点,并链接到其他规则链;

每个实体都支持:

  • 属性——与实体关联的静态和半静态键值对。例如,序列号、型号、固件版本;
  • 时间序列数据——可用于存储、查询和可视化的时间序列数据点。例如,温度、湿度、电池电量;
  • 关系——与其他实体的定向连接。例如,包含、管理、拥有、生产。

部分实体支持用户配置文件:

  • 租户配置文件- 包含多个租户的通用设置:实体、API 和速率限制等。每个租户在某一时刻都只有一个配置文件。
  • 设备配置文件- 包含多个设备的通用设置:处理和传输配置等。每个设备在某一时刻只有一个配置文件。可以理解为配置 产品(定义设备接入平台的配置, 类似 物模型)
  • 资产配置文件- 包含多个资产的通用设置:处理配置等。每个资产在某一时刻只有一个配置文件。

资产和设备的区别

特性 设备 (Device) 资产 (Asset)
角色定位 代表物理或虚拟的物联网硬件。 代表具有业务意义的物理或逻辑实体。可以理解为抽象出设备数据的逻辑视图,包括但不限于设备
数据产生 可以产生和上传遥测数据(如温度、湿度)。 本身不产生遥测数据。
典型示例 传感器、执行器、网关、智能电表。 建筑物、车辆、工厂、产线、办公室。

设备与资产关联

接口 - /api/relation
实体统一关联表 - relation

{
  "type": "Contains",
  "additionalInfo": null,
  "typeGroup": "COMMON",
  "from": {
    "entityType": "ASSET",
    "id": "db626fb0-37d1-11f1-b2ba-13123c3d838e"
  },
  "to": { "entityType": "DEVICE", "id": "c0a8b350-37d1-11f1-b2ba-13123c3d838e" }
}

整体的架构

https://thingsboard.io/docs/reference/

[[thingsboard-architecture.svg]]
image

ThingsBoard Transports

ThingsBoard 提供基于 MQTT、HTTP、CoAP 及 LwM2M 协议的 API,供设备应用程序 / 固件使用。各类协议 API 均由独立的服务端组件实现,隶属于 ThingsBoard「传输层」。其中,MQTT 传输组件还提供网关 API,用于对接可接入多台设备及 / 或传感器的网关设备。

Once the Transport receives the message from device, it is parsed and pushed to durable Message Queue. The message delivery is acknowledged to device only after corresponding message is acknowledged by the message queue.
传输层接收到设备上报的消息后(这里是Transports),会对消息进行解析并推送至持久化消息队列(这里指kafka)。只有当消息队列确认收到相应的消息后,才会向设备确认消息已送达。

注意一点 如果是非集群模式, 消息上来则是在内存中直接路由。

ThingsBoard Core

ThingsBoard 核心服务负责处理 REST API 调用与 WebSocket 订阅,同时维护当前活跃设备会话的最新信息,并监控设备在线状态。其底层基于 Actor 模型,为租户、设备等核心实体构建对应的 Actor 实例。平台节点可组建集群,各节点负责处理特定分区的入站消息。

ThingsBoard Rule Engine

规则引擎是系统的核心组件,负责处理所有入站消息。其底层同样基于 Actor 模型,为规则链、规则节点等核心实体实现对应的 Actor 实例。规则引擎节点可加入集群,每个节点负责处理特定分区的入站消息。

规则引擎订阅消息队列的入站数据流,仅在消息处理完成后才进行消息确认。系统支持多种策略,用于控制消息处理顺序与消息确认条件,详情可参考提交策略与处理策略相关说明。

ThingsBoard 规则引擎支持两种运行模式:共享模式与隔离模式。

  1. 共享模式下,规则引擎可处理多个租户的消息;
  2. 隔离模式下,规则引擎可配置为仅处理特定租户配置下的租户消息。

ThingsBoard Web UI

ThingsBoard 提供基于 Express.js 框架开发的轻量组件,用于托管静态 Web UI 资源。该组件为完全无状态设计,可配置项较少。静态 Web UI 包含应用程序打包资源,加载完成后,应用将通过 ThingsBoard 核心服务提供的 REST API 与 WebSocket API 运行。

ThingsBoard message queues

ThingsBoard 支持两类消息队列:Kafka 队列与内存队列。

  • Kafka 是一款应用广泛的分布式持久化消息队列系统,专为处理海量数据设计,适用于对高吞吐、容错性与可扩展性有严格要求的生产环境。
    [[N_Kafka]]

  • 内存队列为轻量、高效、简洁的队列实现,面向测试、小规模部署或开发环境使用。消息存储于内存而非磁盘,优先保证性能而非持久化。

使用可持久化、可扩展的 Kafka 队列,可使 ThingsBoard 实现背压控制与负载均衡。背压机制在流量峰值场景下至关重要。系统针对具体队列实现封装了抽象层,并统一维护两大核心概念:主题(Topic)与主题分区(Topic Partition)。一个主题可配置多个分区。由于多数队列实现不原生支持分区,系统采用 主题 + "." + 分区号 的命名格式。

ThingsBoard 消息生产者会根据实体 ID 的哈希值决定目标分区,因此同一实体的所有消息始终被写入同一分区。消息消费者通过 ZooKeeper 进行协调,并使用一致性哈希算法确定每个消费者应订阅的分区列表。在微服务模式下,每个服务还会基于唯一服务 ID 拥有专属的 Notifications 主题,该主题仅含单个分区。

ThingsBoard 使用以下 topics:

  • tb_transport.api.requests:传输层向核心服务发送通用 API 请求,用于校验设备凭证。
  • tb_transport.api.responses:核心服务向传输层返回设备凭证校验结果。
  • tb_core:传输层或规则引擎向核心服务推送消息,包括会话生命周期事件、属性与 RPC 订阅等。
  • tb_rule_engine:传输层或核心服务向规则引擎推送消息,包括上报遥测数据、设备状态、实体生命周期事件等。

设备的数据

https://thingsboard.io/docs/user-guide/attributes/

在平台中, 设备的数据主要分为两种 一种是属性, 一种是遥测数据 简而言之:

  1. 遥测数据 是高频变化的设备的实时数据, 时序数据(类似于 Jetlinks 或者阿里云的设备属性)
  2. 属性主要是为了提高平台的扩展性, 提供额外的元数据给设备, 或者第三方系统读取和写入 (类似于 Jetlinks 或者阿里云的设备标签)
类型 用途 存储 频率 典型数据
遥测 Telemetry 实时监测 存历史 温度、电压、GPS
属性 Attributes 描述 / 配置 只存最新 型号、阈值、版本

遥测数据

https://thingsboard.io/docs/user-guide/telemetry/

For MQTT API - https://thingsboard.io/docs/reference/mqtt-api/#telemetry-upload-api

ThingsBoard 内部将时间序列数据处理为带时间戳的键值对。我们将单个带时间戳的键值对称为一个数据点。这种键值格式的灵活性和简洁性使其能够轻松无缝地与市面上几乎所有物联网设备集成。键始终为字符串,本质上是数据点的键名;而值可以是字符串、布尔值、双精度浮点数、整数或 JSON 格式。

{
// "ts": 1527863043000, 可以不传, 默认使用服务器时间戳
 "temperature": 42.2, 
 "humidity": 70,
 "hvacEnabled": true,
 "hvacState": "IDLE",
 "configuration": {
    "someNumber": 42,
    "someArray": [1,2,3],
    "someNestedObject": {"key": "value"}
 }
}

内置的各种协议格式 - https://thingsboard.io/docs/reference/mqtt-api/#telemetry-upload-api

属性数据

https://thingsboard.io/docs/user-guide/attributes/

ThingsBoard supports three types of attributes: server-sideshared, and client-side. Each type has a specific purpose, access rules, and use cases, which we explain below.

客户端属性

这种类型的属性仅适用于设备。它用于将各种半静态数据从设备(客户端)报告给 ThingsBoard(服务器)。它类似于共享属性,但有一个重要的区别:设备固件/应用程序可以将属性值从设备发送到平台(设备可以修改它)。

客户端属性最常见的用途是报告设备状态。假设我们使用相同的楼宇监控解决方案,并来看几个示例:

  1. currentFirmwareVersion 属性 可用于向平台报告设备已安装的固件/应用程序版本。
  2. currentConfiguration属性 可用于向平台报告当前的固件/应用程序配置。
  3. 如果设备没有持久存储,则可以使用 currentState 过网络持久化和恢复当前的固件/应用程序状态。

用户端和服务器端应用程序可以通过 UI/REST API 浏览客户端属性,但无法更改这些属性。简而言之,客户端属性的值对于 UI/REST API 来说是只读的。

设备自己上传,服务端只读

共享属性

https://thingsboard.io/docs/user-guide/attributes/#shared-attributes
共享属性仅适用于设备类型的实体。其他类型的实体(例如资产或客户)无法使用共享属性。共享属性用于将配置或运行参数(例如: thresholds )从服务器发送到设备。

设备如何与共享属性交互?
设备无法发布或更改共享属性的值。从设备的角度来看,这些属性是只读的。设备的固件或应用程序可以通过以下方式与共享属性进行交互:

服务器下发,设备可读、可订阅

服务属性

几乎所有平台实体都支持此类属性:设备、资产、客户、租户、用户等。服务器端属性是指您可以通过管理界面或 REST API 进行配置的属性。设备固件无法访问服务器端属性。

仅仅平台内部使用,设备不可见, 不可用

平台<>设备(RPC逻辑)

https://thingsboard.io/docs/user-guide/rpc/

Server-side RPC feature allows you to send the request from the platform to the device and optionally get the response back to the platform.

The typical use cases of the server-side RPC calls is all sorts of remote control: reboot, turn the engine on/off, change state of the gpio/actuators, change configuration parameters, etc.

Server-side RPC is divided into one-way and two-way:

  • One-way RPC request does not expect device to provide any reply.
    image

  • Two-way RPC request expects to receive a response from the device within configurable timeout.
    image

Before version 3.3, ThingsBoard supported lightweight RPC only. The lightweight RPC calls are short-lived, typically within 30 seconds which is the default timeout of any REST API call to the platform. Since they are short-lived, there was no reason for storing them to the database. They lived in memory of the server, assuming that if server dies, the dashboard widget will send the same request to other ThingsBoard server in the cluster. The lightweight RPC consume low amount of resources since their processing does not invoke any input/output operations accept storing of the audit logs and rule engine messages.

上行

https://thingsboard.io/docs/user-guide/rpc/#client-side-rpc

设备向平台发送消息,该消息由规则引擎处理。规则引擎可能会使用设备属性、遥测数据或平台中存储的任何其他数据进行一些计算。如有需要,规则引擎还可以调用外部系统。消息处理完毕后,结果将发送回设备

下行

https://thingsboard.io/docs/user-guide/rpc/#server-side-rpc
平台控制设备(下行)的核心机制是 RPC(远程过程调用)

底层逻辑是

  1. 指令发出后,会进入根规则链(Root Rule Chain)进行处理。

    • 消息类型识别:请求会被封装成一个消息,其类型通常标记为 RPC_CALL_FROM_DASHBOARD 或类似的 RPC 类型。
    • 节点路由
      • 消息类型开关:根规则链通常包含一个“消息类型开关”节点,它根据消息类型将流量分发到不同的子规则链(例如“设备控制链”)。
      • 脚本转换:在控制链中,通常会使用脚本节点来构建具体的控制 payload。你需要在这里编写 JavaScript 代码,定义下发给设备的消息格式(例如 {"method": "setPower", "params": {"enabled": true}})。
      • RPC 调用节点:最后,消息会进入一个专门的 RPC 调用节点。这个节点负责将消息标记为 RPC 请求,并指定目标设备。
  2. 规则引擎处理完毕,消息会被推送到消息队列,随后由 MQTT 传输服务(MQTT Transport)消费。

  3. 设备处理完成后,必须向平台发送一条响应消息,告知执行结果。(注意: 响应中的 {requestId} 必须与请求中的一致)

持久化RPC的状态

https://thingsboard.io/docs/user-guide/rpc/#persistent-rpc

States
ThingsBoard tracks state of the persistent RPC. There are 7 available states:

  • QUEUED - RPC was created and saved to the database; No attempt to send the RPC to device yet; ThingsBoard will attempt to send the RPC immediately when device becomes online or if it is already online; The platform will attempt to send all pending RPC calls at once by default. In rare cased of constrained devices and multiple messages in the queue this may lead to overload of the network or device. To avoid the overload, you may enable sequential delivery of RPC calls using “ACTORS_RPC_SEQUENTIAL” configuration parameter.
  • SENT - ThingsBoard performed attempt to send the RPC to device.
  • DELIVERED - device confirmed that the RPC was delivered; This is the last step of processing for one-way RPC;
  • SUCCESSFUL - ThingsBoard received reply for the two-way RPC;
  • TIMEOUT - ThingsBoard transport layer (MQTT/CoAP/LwM2M, etc) detected timeout of the RPC delivery; The timeout is controlled using one of the corresponding configuration parameters: MQTT_TIMEOUT (10 seconds by default), COAP_TIMEOUT (10 seconds by default), LWM2M_TIMEOUT (120 seconds by default) By default, platform will not retry delivery of the RPC, and the state will change to FAILED. You may configure number of retries in the RPC body. The maximum number of retries is controlled by “ACTORS_RPC_MAX_RETRIES” configuration parameter (5 by default).
  • EXPIRED - The RPC was not delivered or platform did not receive the reply from device within configured expiration time;
  • FAILED - failed to deliver the RPC during configurable number of retries or device firmware does not support such a command.

调用链路

向设备发送 RPC 的调用链路

 REST API
    │ POST /api/rpc/twoway/{deviceId}
    ▼
  AbstractRpcController                          [tb-core]
    │ ToDeviceRpcRequest
    ▼
  DefaultTbCoreDeviceRpcService
    │ TbMsg(RPC_CALL_FROM_SERVER_TO_DEVICE)
    │ Kafka: tb_rule_engine.*
    ▼
  TbSendRPCRequestNode (规则链节点)             [tb-rule-engine]
    │ RuleEngineDeviceRpcRequest
    ▼
  DefaultTbRuleEngineRpcService
    │ 一致性哈希 → 找到设备所在 Core 节点
    │ Kafka: tb_core  (或本地直接调用)
    ▼
  DeviceActorMessageProcessor#processRpcRequest  [tb-core]
    │ ToTransportMsg
    │ Kafka: tb_transport.notifications.{nodeId}
    ▼
  DefaultTransportService (消费通知 topic)        [tb-transport]
    │ onToDeviceRpcRequest()
    ▼
  MqttTransportHandler
    │ Netty writeAndFlush
    ▼
  设备 (MQTT publish to v1/devices/me/rpc/request/{requestId})
  • RPC 请求必须经过规则引擎,规则链可以在中间做鉴权、过滤、转换
  • rpcSubscriptions 存储的是设备当前活跃 session 列表,若为空则请求会挂起直到设备上线(pending requests)
  • originServiceId 贯穿全程,确保响应能路由回发起请求的那个 Core 节点

告警系统

设备告警 - https://thingsboard.io/docs/user-guide/alarms/
规则告警 - https://thingsboard.io/docs/user-guide/alarm-rules/

告警状态(二元组)

告警的状态二元组

Together, these dimensions form four possible alarm states:

  • Active & Unacknowledged
  • Active & Acknowledged
  • Cleared & Unacknowledged
  • Cleared & Acknowledged

警报可以由用户手动清除,也可以在规则里条件满足时自动清除(所以有可能是 cleared 但是 unacknown 的状态)。

规则链系统

https://thingsboard.io/docs/user-guide/rule-engine-2-0/re-getting-started/?scriptfunctionfilterconfig=anonymous

规则节点总共分为6大类

Filter nodes

根据不同条件过滤和路由消息
Filter nodes are the routing and conditional logic components of ThingsBoard’s rule engine that examine messages and determine how they should be routed to downstream nodes based on various criteria.

Enrichment nodes

利用数据可给节点增加(丰富)元数据信息?
这些节点可以丰富消息的属性、最新的时间序列值、历史时间序列数据以及从各种来源(包括消息发送者、相关实体、当前租户或客户)获取的实体详细信息。

Transformation nodes

修改现有消息或创建新消息
Transformation nodes are the data processing and manipulation(操纵) components of ThingsBoard’s rule engine that modify the content, structure, or format of incoming messages.

Action nodes

https://thingsboard.io/docs/user-guide/rule-engine-2-0/nodes/action/

Perform various actions based on configuration and message
根据配置和消息执行各种操作

  • 创建警报 — 为消息发送者创建新警报或更新现有的活动警报。
  • 清除警报 — 清除消息发送者现有的活动警报。
  • 计算字段 — 触发时间序列或属性数据的计算字段处理,而不将原始数据持久化到数据库中。
  • 删除关系 — 根据类型和方向,从所选实体到消息发送者的关系中删除关系。

External nodes

https://thingsboard.io/docs/user-guide/rule-engine-2-0/nodes/external/
External nodes are the integration components of ThingsBoard’s rule engine that send messages to third-party services and external systems.

  • AI 请求 — 向大型语言模型发送带有可定制提示和可选文件附件的请求,将 AI 生成的响应作为传出消息数据返回。
  • MQTT — 使用 QoS 1(至少一次)将传入的消息数据发布到外部 MQTT 代理,支持动态主题模式、多种身份验证方法和 TLS/SSL 加密。

Flow nodes

https://thingsboard.io/docs/user-guide/rule-engine-2-0/nodes/flow/

流程节点控制消息如何在规则引擎的不同部分之间移动。 这些节点可以在队列之间传输消息,以实现顺序处理或分离不同的工作负载,将消息转发到其他规则链,并将结果返回给调用规则链。

  • 规则链 — 将传入的消息转发到指定的规则链。
  • 检查点 — 将传入的消息转移到指定的队列。

自定义节点

https://thingsboard.io/docs/user-guide/contribution/rule-node-development/

核心概念

在 ThingsBoard 中:

  • 每条数据 → 被包装成 TbMsg
  • 每个节点 → 处理一条消息
  • 输出 → 发送给下一个节点(带 relation)
ThingsBoard 类比
Rule Chain 工作流 DAG
Rule Node 函数节点
TbMsg 消息对象
relationType 分支条件

节点生命周期

init() → 初始化(解析配置)  
onMsg() → 处理消息(核心写逻辑的地方)  
destroy() → 销毁

核心逻辑(onMsg)
真正写业务逻辑的地方:

@Override  
public void onMsg(TbContext ctx, TbMsg msg) {  
    // 处理 msg  
}

ThingsBoard 不会自动继续流程,你必须手动:
ctx.tellSuccess(msg);
或者:
ctx.tellNext(msg, "True");

节点定义(@RuleNode 注解)

这是“声明元数据”,类似前端组件注册

@RuleNode(  
    type = ComponentType.FILTER,  
    name = "check key",  
    relationTypes = {"True", "False"},  
    configClazz = MyConfig.class  
)
字段 本质
type 节点分类(影响UI分组)
name 节点名字
relationTypes 输出分支(非常重要)
configClazz 配置结构
configDirective UI配置(可选)

relationTypes = “你这个节点会产生几种出口”
例如:

  • True / False
  • Success / Failure

核心能力

获取输入内容能力

msg.getData()       // JSON 数据
msg.getMetadata()   // 元数据
msg.getOriginator() // 来源(设备)

使用平台能力

ctx.getDeviceService()   //设备服务
ctx.getTelemetryService()//遥测数据服务
ctx.getAlarmService()    //告警服务


ctx.getTelemetryService().save(...);

调用通常是异步的, 所以:

  • 不要阻塞, 使用回调

同一个设备的数据 → 永远发到同一个节点实例, 所以:

  • 可以放心做缓存, 状态管理(按设备)

输出消息

生成新消息

TbMsg newMsg = TbMsg.transformMsg(msg, newData);  
ctx.tellNext(newMsg, "Success");

a sample project

Clone the repository and navigate to the repo folder:

git clone -b release-4.3 https://github.com/thingsboard/rule-node-examples
cd rule-node-examples

Getting Started with ThingsBoard

https://thingsboard.io/docs/getting-started-guides/helloworld/

helloworld - 如何接一个设备上去?

Step 1. Provision device

Let's add a device that sends temperature data to ThingsBoard:

  • Log in to ThingsBoard and navigate to the "Devices" page of the "Entities" section.
  • Click on the "+" icon in the top right corner of the table and select "Add new device" from drop-down menu.
  • Enter a device name (e.g., "My New Device") No other changes required at this time. Click "Add".
  • A window for checking the device connection will open — we'll skip this step for now and return to connection checking in the next step.
  • Congratulations, you've added your first device! As you add more devices, they will be added at the top of the table, as the table automatically sorts devices by their creation time, with the newest ones listed first.

Additional provisioning methods

Step 2. Connect device

Let's verify your device's connection to ThingsBoard:

  • Click on your device, then click the "Check connectivity” button in the "Device details" window.

  • In the opened window, choose your messaging protocol and operating system. Install any necessary client tools and copy the provided command.

  • Execute the copied command in Terminal. Once telemetry data (e.g., temperature readings) is successfully published, the device status will change from "Inactive" to "Active," and you'll see the data displayed. You can now close the connectivity window.

  • 接入方式
    MQTT 客户端 id 对应 device 表 的 id
    MQTT 客户端 username 对应 device_credentials 表的 credentials_id

In order to publish telemetry data to ThingsBoard server node, send PUBLISH message to the following topic:

v1/devices/me/telemetry

The simplest supported data formats are:

{"key1":"value1", "key2":"value2"}

or

[{"key1":"value1"}, {"key2":"value2"}]

in case your device is able to get the client-side timestamp, you can use following format:

{"ts":1451649600512, "values":{"key1":"value1", "key2":"value2"}}

Step 3. Create dashboard

A dashboard in ThingsBoard allows users to visualize and monitor data collected from IoT devices.

Let's create a dashboard and add three widgets to it in order to display a list of entities and their latest values, as well as show alarm signals related to the specified entity.

Step 3.1 Create an empty dashboard

To create a new dashboard, follow these steps:

  • Navigate to the "Dashboards" page through the main menu on the left of the screen. Then, click the "+" sign in the upper right corner of the screen, and select "Create new dashboard" from the drop-down menu.
  • In the opened dialog, it is necessary to enter a dashboard title, description is optional. Click "Add".
  • After creating the dashboard, it will open automatically, and you can immediately start adding widgets to it. To save the dashboard, click "Save" button in the upper right corner.
  • Your first dashboard has been successfully created. As you continue to add new dashboards, they will appear at the top of the list. This default sorting is based on the creation timestamp.

设备OTA过程

https://thingsboard.io/docs/user-guide/ota-updates/

image

Provision OTA package to ThingsBoard repository

gateway

Modbus Connector Configuration

https://thingsboard.io/docs/iot-gateway/config/modbus/

服务集群

部署方式

集群节点类型

通过 service.type 配置(thingsboard.yml:2078):

  service:
    type: "${TB_SERVICE_TYPE:monolith}"  # monolith / tb-core / tb-rule-engine
  1. monolith:单体模式,所有服务在一个进程,队列使用内存实现

  2. 微服务模式:tb-core、tb-rule-engine、tb-transport 分开部署(共有五个),通过 Kafka 通信
    msa/ 目录包含各微服务的独立打包配置

名称 对应 ServiceType 枚举 说明
tb-core TB_CORE 核心服务
tb-rule-engine TB_RULE_ENGINE 规则引擎服务
tb-transport TB_TRANSPORT 传输层服务
tb-vc-executor TB_VC_EXECUTOR 版本控制执行器
edqs EDQS 实体数据查询服务
ServiceType 枚举类型
@RequiredArgsConstructor  
@Getter  
public enum ServiceType {  
  
    TB_CORE("TB Core"),  
    TB_RULE_ENGINE("TB Rule Engine"),  
    TB_TRANSPORT("TB Transport"),  
    JS_EXECUTOR("JS Executor"),  
    TB_VC_EXECUTOR("TB VC Executor"),  
    EDQS("TB Entity Data Query Service"),  
    TASK_PROCESSOR("Task Processor");  
  
    private final String label;  
  
    public static ServiceType of(String serviceType) {  
        return ServiceType.valueOf(serviceType.replace("-", "_").toUpperCase());  
    }  
  
}

单独消息队列

这是核心服务间通信的主干,通过 Protobuf 序列化消息传输。

# Queue configuration parameters  
queue:  
  type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka)  
  prefix: "${TB_QUEUE_PREFIX:}" # Global queue prefix. If specified, prefix is added before   
  kafka:  
    # Kafka Bootstrap Servers  

消息格式(transport.proto)定义了完整的消息体系:

  • ToCoreMsg — 发送给 Core 服务的消息
  • ToRuleEngineMsg — 发送给规则引擎的消息
  • ToTransportMsg — 发送给 Transport 服务的消息
  • TransportApiRequestMsg/ResponseMsg — Transport 与 Core 之间的请求/响应

实现类:common/queue/src/main/java/org/thingsboard/server/queue/kafka/

  • TbKafkaProducerTemplate.java — 生产者
  • TbKafkaConsumerTemplate.java — 消费者

集群的核心逻辑

分配设备消息 Actor

一致性哈希 (Consistent Hashing)
这是 ThingsBoard 集群的基石。为了确保同一个设备的所有消息都能被同一个“设备 Actor”处理(这对于维护设备状态、避免竞态条件至关重要),ThingsBoard 使用基于设备 ID 的一致性哈希算法。

无论请求到达哪个节点,系统都能通过哈希计算,将消息路由到负责该设备消息的特定节点上进行处理。

关键代码
在 HashPartitionService.java 中,resolveByPartitionIdx 方法如何根据哈希值找到负责该分区的节点:

// common/cluster-api/src/main/java/org/thingsboard/server/cluster/HashPartitionService.java

@Override
public ServiceInfo resolveByPartitionIdx(List<ServiceInfo> services, int partitionIdx) {
    // 1. 获取哈希函数 (默认 Murmur3)
    HashFunction hashFunction = this.hashFunction;
    
    // 2. 计算哈希值:将 "分区索引" 作为输入进行哈希
    // 注意:这里利用了分区索引作为种子,确保同一个设备的不同分区请求能路由到同一节点
    int hash = hashFunction.hashInt(partitionIdx);
    
    // 3. 取模运算,找到对应的节点索引
    int index = Math.abs(hash % services.size());
    
    return services.get(index);
}

集群异步通信

消息队列 (Message Queue)
在微服务架构中,Apache Kafka 是组件间异步通信的骨干。它负责在各个服务(如传输服务、核心服务、规则引擎)之间传递消息,实现解耦和可扩展性。

ThingsBoard  common/queue 模块,完全抽象了底层的消息队列实现(支持 Kafka, AWS SQS, Google Pub/Sub, Azure Service Bus 等)。

  • 抽象接口TbQueueProducer.java
  • Kafka 实现TbKafkaProducerTemplate.java

负载均衡

分区与负载均衡 (Partitioning & Load Balancing)

ThingsBoard 内部使用“Actor 模型”来处理并发。集群中的每个服务(如 TB_CORETB_RULE_ENGINE)都有其对应的消息主题和分区。

  • PartitionService 负责管理这些分区。当一个节点启动时,它会通过 DiscoveryService 获取集群中所有节点的信息,然后运行 recalculatePartitions 方法。
  • 该方法会重新计算并分配当前节点应该负责消费哪些主题(如 tb_coretb_rule_engine)的哪些分区(Partition),从而实现消息处理的负载均衡。

recalculatePartitions 方法是集群负载均衡的心脏。它遍历所有配置的队列(如 tb_coretb_rule_engine),计算每个分区应该由谁负责。


@Override
public void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) {
    // 1. 合并当前节点和其他节点,形成完整的集群视图
    List<ServiceInfo> allServices = new ArrayList<>();
    allServices.add(currentService);
    allServices.addAll(otherServices);

    // 2. 按服务类型分组 (例如:所有 tb-core 节点一组,所有 tb-rule-engine 节点一组)
    Map<ServiceQueueKey, List<ServiceInfo>> queueServicesMap = new HashMap<>();
    for (ServiceInfo service : allServices) {
        // ... 代码省略:将节点添加到对应的 ServiceQueueKey 列表中
    }

    // 3. 清空旧的分区分配
    myPartitions = new ConcurrentHashMap<>();

    // 4. 遍历所有配置的主题和分区数
    partitionSizes.forEach((serviceQueue, size) -> {
        ServiceQueueKey myServiceQueueKey = new ServiceQueueKey(serviceQueue, myTenantId);
        List<ServiceInfo> services = queueServicesMap.get(myServiceQueueKey);
        
        if (services != null) {
            List<Integer> partitions = new ArrayList<>();
            // 5. 核心循环:检查每一个分区 (0 到 size-1)
            for (int i = 0; i < size; i++) {
                // 调用一致性哈希算法,计算分区 i 应该由哪个节点负责
                ServiceInfo serviceInfo = resolveByPartitionIdx(services, i);
                
                // 6. 如果负责的节点是当前节点,则将该分区加入“我的分区”列表
                if (serviceInfo.getServiceId().equals(currentService.getServiceId())) {
                    partitions.add(i);
                }
            }
            if (!partitions.isEmpty()) {
                myPartitions.put(myServiceQueueKey, partitions);
            }
        }
    });
    
    // 7. 发布事件,通知消费者服务开始消费这些分区
    publishPartitionChangeEvent();
}

服务信息提供者 (DefaultTbServiceInfoProvider)

这个类负责在启动时构建当前节点的服务信息,包括服务ID、服务类型等。


@PostConstruct
public void init() {
    // 1. 获取本机 HostName 作为 serviceId
    if (StringUtils.isEmpty(serviceId)) {
        serviceId = InetAddress.getLocalHost().getHostName();
    }
    
    // 2. 确定服务类型。如果是 "monolith" (单体模式),则包含所有服务类型
    if (serviceType.equalsIgnoreCase("monolith")) {
        serviceTypes = Collections.unmodifiableList(Arrays.asList(ServiceType.values()));
    } else {
        serviceTypes = Collections.singletonList(ServiceType.of(serviceType));
    }

    // 3. 构建 ServiceInfo 对象,用于向集群注册
    ServiceInfo.Builder builder = ServiceInfo.newBuilder()
        .setServiceId(serviceId)
        .addAllServiceTypes(serviceTypes.stream().map(ServiceType::name).collect(Collectors.toList()));
    // ... 设置租户ID等其他信息
    serviceInfo = builder.build();
}

分区服务 (HashPartitionService)

这个服务负责根据配置管理各个服务队列的主题和分区数量。

// HashPartitionService.java 中的 init() 方法
@PostConstruct
public void init() {
    // 1. 初始化哈希函数,默认为 murmur3_128
    this.hashFunction = forName(hashFunctionName);
    
    // 2. 配置核心服务 (TB_CORE) 的主题和分区数
    partitionSizes.put(new ServiceQueue(ServiceType.TB_CORE), corePartitions); // 默认10个分区
    partitionTopics.put(new ServiceQueue(ServiceType.TB_CORE), coreTopic);     // 默认 "tb_core"

    // 3. 配置规则引擎服务 (TB_RULE_ENGINE) 的主题和分区数
    tbQueueRuleEngineSettings.getQueues().forEach(queueConfiguration -> {
        partitionTopics.put(
            new ServiceQueue(ServiceType.TB_RULE_ENGINE, queueConfiguration.getName()), 
            queueConfiguration.getTopic()
        );
        partitionSizes.put(
            new ServiceQueue(ServiceType.TB_RULE_ENGINE, queueConfiguration.getName()), 
            queueConfiguration.getPartitions()
        );
    });
}

服务发现与分区重算 (DummyDiscoveryService)

这个类展示了当应用准备就绪时,如何触发分区重新计算。这里是 DummyDiscoveryService 的实现,它不包含其他节点信息。

// DummyDiscoveryService.java
// 在收到 Spring 的 ApplicationReadyEvent 事件后触发
public void onApplicationEvent(ApplicationReadyEvent event) {
    // 调用分区重算方法
    partitionService.recalculatePartitions(currentServiceInfo, Collections.emptyList());
}

// 在真正的 Zookeeper 实现 (ZkDiscoveryService) 中,
// 会监听 Zookeeper 节点变化,并在变化时调用 recalculatePartitions,
// 并传入所有其他服务节点的信息列表 (otherServices)。

分区重算核心逻辑 (recalculatePartitions)

这是集群协调的核心,决定了当前节点负责处理哪些消息分区。

// PartitionService.java 中的 recalculatePartitions 方法
public void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) {
    // 1. 将所有节点(当前节点+其他节点)按服务队列类型分组
    Map<ServiceQueueKey, List<ServiceInfo>> queueServicesMap = new HashMap<>();
    addNode(queueServicesMap, currentService);
    for (ServiceInfo other : otherServices) {
        addNode(queueServicesMap, other);
    }

    // 2. 清空旧的分区分配
    myPartitions = new ConcurrentHashMap<>();

    // 3. 为当前服务分配分区
    partitionSizes.forEach((serviceQueue, size) -> {
        ServiceQueueKey myServiceQueueKey = new ServiceQueueKey(serviceQueue, myTenantId);
        for (int i = 0; i < size; i++) {
            // 核心:通过一致性哈希,根据分区索引 i 找到负责该分区的节点
            ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(myServiceQueueKey), i);
            
            // 如果负责的节点是当前节点,则将该分区加入 myPartitions
            if (serviceInfo.getServiceId().equals(currentService.getServiceId())) {
                myPartitions.computeIfAbsent(myServiceQueueKey, key -> new ArrayList<>()).add(i);
            }
        }
    });

    // 4. 对比新旧分区分配,如果有变化,则发布 PartitionChangeEvent 事件,
    // 通知消费者服务开始或停止消费相应的 Kafka 分区。
    // ...
}

注册中心是 Zookeeper

org.thingsboard.server.queue.discovery.ZkDiscoveryService

public interface DiscoveryService {
    // 获取所有其他服务的节点信息
    List<ServiceInfo> getOtherServices();
    
    // 获取当前服务自身的信息
    ServiceInfo getCurrentService();
}

org.thingsboard.server.queue.discovery.ZkDiscoveryService#publishCurrentServer中,服务注册的核心逻辑:


public synchronized void publishCurrentServer() {
    TransportProtos.ServiceInfo self = serviceInfoProvider.getServiceInfo();
    if (currentServerExists()) {
        // 更新现有节点
        client.setData().forPath(nodePath, serviceInfoProvider.generateNewServiceInfoWithCurrentSystemInfo().toByteArray());
    } else {
        // 创建新节点
        nodePath = client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(zkNodesDir + "/", self.toByteArray());
    }

从连机制
org.thingsboard.server.queue.discovery.ZkDiscoveryService#reconnect

 private synchronized void reconnect() {
        if (!reconnectInProgress) {
            reconnectInProgress = true;
            try {
                destroyZkClient();
                initZkClient();
                subscribeToEvents();
                publishCurrentServer();
            } catch (Exception e) {
                log.error("Failed to reconnect to ZK: {}", e.getMessage(), e);
            } finally {
                reconnectInProgress = false;
            }
        }
    }

主要技术栈

Hibernate, JPA, Netty, Spring Boot, Kafka 与 Zookeeper

源码环境

需要注意的是

  1. 将工程目录中dao中resources中的sql和cassandra文件夹拷贝至applecation中data文件夹下
  2. 默认端口 8080
  3. 启动的用户/账户密码 如下表

账号

默认超管的账号密码

角色 用户名 密码 用途
系统管理员 sysadmin@thingsboard.org sysadmin 全局管理租户、平台配置

API文档

http://$THINGSBOARD_HOST:PORT/swagger-ui.html
http://127.0.0.1:8080/swagger-ui.html

关键接口

http://127.0.0.1:8080/swagger-ui/index.html

  • 登录接口

http://127.0.0.1:8080/api/auth/login

login-endpoint文档 - http://127.0.0.1:8080/swagger-ui/index.html#/login-endpoint

  • 创建租户配置
    org.thingsboard.server.controller.TenantProfileController#saveTenantProfile

  • 添加设备
    /api/device

源码分析

一个设备消息的处理过程(MQTT)

以MQTT协议为例, 源码版本是: <version>4.3.1.1</version>

启动入口模块是 transport/mqtt 基于Spring 体系的@PostConstruct 注解
源码逻辑入口模块是: common/transport/mqtt 初始化类是: org.thingsboard.server.transport.mqtt.MqttTransportService

底层其实是 Netty..

@Service("MqttTransportService")
@TbMqttTransportComponent
@Slf4j
public class MqttTransportService implements TbTransportService {

    public static AttributeKey<InetSocketAddress> ADDRESS = AttributeKey.newInstance("SRC_ADDRESS");

    @Value("${transport.mqtt.bind_address}")
    private String host;
    @Value("${transport.mqtt.bind_port}")
    private Integer port;

    @Value("${transport.mqtt.ssl.enabled}")
    private boolean sslEnabled;

    @Value("${transport.mqtt.ssl.bind_address}")
    private String sslHost;
    @Value("${transport.mqtt.ssl.bind_port}")
    private Integer sslPort;

    @Value("${transport.mqtt.netty.leak_detector_level}")
    private String leakDetectorLevel;
    @Value("${transport.mqtt.netty.boss_group_thread_count}")
    private Integer bossGroupThreadCount;
    @Value("${transport.mqtt.netty.worker_group_thread_count}")
    private Integer workerGroupThreadCount;
    @Value("${transport.mqtt.netty.so_keep_alive}")
    private boolean keepAlive;

    @Autowired
    private MqttTransportContext context;

    private Channel serverChannel;
    private Channel sslServerChannel;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    @PostConstruct
    public void init() throws Exception {
        log.info("Setting resource leak detector level to {}", leakDetectorLevel);
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));

        log.info("Starting MQTT transport...");
        bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
        workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new MqttTransportServerInitializer(context, false))
                .childOption(ChannelOption.SO_KEEPALIVE, keepAlive);

        serverChannel = b.bind(host, port).sync().channel();
        if (sslEnabled) {
            b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new MqttTransportServerInitializer(context, true))
                    .childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
            sslServerChannel = b.bind(sslHost, sslPort).sync().channel();
        }
        log.info("Mqtt transport started!");
    }
    ...........

消息处理入口 (processMqttMsg)

org.thingsboard.server.transport.mqtt.MqttTransportHandler#processMqttMsg

void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
    // 检查消息头
    if (msg.fixedHeader() == null) {
        closeCtx(ctx, MqttReasonCodes.Disconnect.PROTOCOL_ERROR);
        return;
    }
    deviceSessionCtx.setChannel(ctx);

    // 判断消息类型
    if (CONNECT.equals(msg.fixedHeader().messageType())) {
        processConnect(ctx, (MqttConnectMessage) msg);  // 处理连接
    } else if (deviceSessionCtx.isProvisionOnly()) {
        processProvisionSessionMsg(ctx, msg);           // 处理预注册消息
    } else {
        enqueueRegularSessionMsg(ctx, msg);             // 入队处理常规消息
    }
}

常规会话消息处理 (processRegularSessionMsg)

void processRegularSessionMsg(ChannelHandlerContext ctx, MqttMessage msg) {
    switch (msg.fixedHeader().messageType()) {
        case PUBLISH:
            processPublish(ctx, (MqttPublishMessage) msg);    // 处理发布消息
            break;
        case SUBSCRIBE:
            processSubscribe(ctx, (MqttSubscribeMessage) msg);  // 处理订阅
            break;
        case PINGREQ:
            // 处理心跳
            ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, ...)));
            transportService.recordActivity(deviceSessionCtx.getSessionInfo());
            break;
        case DISCONNECT:
            closeCtx(ctx, MqttReasonCodes.Disconnect.NORMAL_DISCONNECT);
            break;
    }
}

设备发布消息处理 (processDevicePublish)

private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
    MqttTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();

    // 处理设备属性消息
    if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) {
        TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
        transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, ...);
    }
    // 处理设备遥测数据
    else if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) {
        TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
        transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, ...);
    }
    // 处理属性请求
    else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
        TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(...);
        transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, ...);
    }
    // 处理RPC相关消息
    // ...
}

遥测数据处理 (process)

public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg,
                   TbMsgMetaData md, TransportServiceCallback<Void> callback) {
    // 计算数据点数量
    int dataPoints = 0;
    for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
        dataPoints += tsKv.getKvCount();
    }

    // 限流检查
    if (checkLimits(sessionInfo, msg, callback, dataPoints)) {
        recordActivityInternal(sessionInfo);

        // 提取会话信息
        TenantId tenantId = getTenantId(sessionInfo);
        DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
        CustomerId customerId = getCustomerId(sessionInfo);

        // 处理每条时间序列数据
        MsgPackCallback packCallback = new MsgPackCallback(...);
        for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
            // 构建元数据
            TbMsgMetaData metaData = md != null ? md.copy() : new TbMsgMetaData();
            metaData.putValue("deviceName", sessionInfo.getDeviceName());
            metaData.putValue("deviceType", sessionInfo.getDeviceType());
            metaData.putValue("ts", tsKv.getTs() + "");

            // 转换为JSON对象
            JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList());

            // 发送到规则引擎
            sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData,
                           TbMsgType.POST_TELEMETRY_REQUEST, packCallback);
        }
    }
}

发送到规则引擎 (sendToRuleEngine)

private void sendToRuleEngine(TenantId tenantId, DeviceId deviceId, CustomerId customerId,
                              TransportProtos.SessionInfoProto sessionInfo, JsonObject json,
                              TbMsgMetaData metaData, TbMsgType tbMsgType, TbQueueCallback callback) {
    // 获取设备配置
    DeviceProfileId deviceProfileId = new DeviceProfileId(...);
    DeviceProfile deviceProfile = deviceProfileCache.get(deviceProfileId);
    RuleChainId ruleChainId;
    String queueName;

    if (deviceProfile == null) {
        ruleChainId = null;
        queueName = null;
    } else {
        ruleChainId = deviceProfile.getDefaultRuleChainId();  // 获取默认规则链ID
        queueName = deviceProfile.getDefaultQueueName();      // 获取默认队列名称
    }

    // 构建TbMsg消息对象
    TbMsg tbMsg = TbMsg.newMsg()
            .queueName(queueName)
            .type(tbMsgType)
            .originator(deviceId)
            .customerId(customerId)
            .copyMetaData(metaData)
            .data(gson.toJson(json))
            .ruleChainId(ruleChainId)
            .build();

    // 发送到规则引擎
    ruleEngineProducerService.sendToRuleEngine(ruleEngineMsgProducer, tenantId, tbMsg, new StatsCallback(callback, ruleEngineProducerStats));
	
}

org.thingsboard.server.queue.common.TbRuleEngineProducerService

队列路由(Router)

单体是 org.thingsboard.server.queue.memory.InMemoryTbQueueProducer#send 在内存路由
集群是 org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate#send 到kafak去

消息类型与主题映射

见这里的常量 org.thingsboard.server.common.data.device.profile.MqttTopics

消息类型 MQTT 主题
遥测数据 v1/devices/me/telemetry
设备属性 v1/devices/me/attributes
属性请求 v1/devices/me/attributes/request
RPC 响应 v1/devices/me/rpc/response
RPC 请求 v1/devices/me/rpc/request
设备声明 v1/devices/me/claim

离线判定

触发离线的完整流程

  1. 定时轮询(DefaultDeviceStateService.java)

org.thingsboard.server.service.state.DefaultDeviceStateService#init

  // 服务启动时注册,每 60 秒执行一次
  scheduledExecutor.scheduleWithFixedDelay(
      this::checkStates,
      new Random().nextInt(defaultStateCheckIntervalInSec),  // 随机初始延迟,避免集群雪崩
      defaultStateCheckIntervalInSec,
      TimeUnit.SECONDS
  );
  1. 判断每个设备(updateInactivityStateIfExpired)
  // 同时满足以下 3 个条件才触发离线:
  if (!isActive(ts, state)                                           // 1. 超过 inactivityTimeout 未活跃
      && (state.getLastInactivityAlarmTime() == 0L                   // 2. 之前没报过离线
          || state.getLastInactivityAlarmTime() <= state.getLastActivityTime()) // 或者离线后又活跃过
      && stateData.getDeviceCreationTime() + state.getInactivityTimeout() <= ts) { // 3. 设备创建时间也满足(防止新设备误判)
      reportInactivity(ts, stateData);
  }
  1. 触发离线事件
    org.thingsboard.server.service.state.DefaultDeviceStateService#onDeviceActivityStatusChange
// reportInactivity → onDeviceActivityStatusChange(false, ...) // 丢到集群消息队列去
pushRuleEngineMessage(stateData, TbMsgType.INACTIVITY_EVENT);

同时保存 active=false 到属性/时序,并触发通知规则, 活跃时间的来源(何时更新 lastActivityTime)
设备以下任何行为都会刷新 lastActivityTime:

  1. 关键配置
# Device state parameters
state:
  # Device inactivity timeout is a global configuration parameter that defines when the device will be marked as "inactive" by the server.
  # The parameter value is in seconds. A user can overwrite this parameter for an individual device by setting the “inactivityTimeout” server-side attribute (NOTE: expects value in milliseconds).
  # We recommend this parameter to be in sync with session inactivity timeout ("transport.sessions.inactivity_timeout" or TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT) parameter
  # which is responsible for detection of the stale device connection sessions.
  # The value of the session inactivity timeout parameter should be greater or equal to the device inactivity timeout.
  # Note that the session inactivity timeout is set in milliseconds while device inactivity timeout is in seconds.
  defaultInactivityTimeoutInSec: "${DEFAULT_INACTIVITY_TIMEOUT:600}"
  defaultStateCheckIntervalInSec: "${DEFAULT_STATE_CHECK_INTERVAL:60}" # Interval for checking the device state after a specified period. Time in seconds

组件配置参数

https://thingsboard.io/docs/user-guide/install/how-to-change-config/
https://thingsboard.io/docs/user-guide/install/http-transport-config/

SQL、NoSQL 混合数据库配置

ThingsBoard 使用数据库来存储 实体(设备、资产、客户、仪表盘等)和 遥测数据(属性、时间序列传感器读数、统计数据、事件)。平台目前支持三种数据库选项:

  • SQL - 将所有实体和遥测数据存储在 SQL 数据库中。ThingsBoard 的开发者建议使用 PostgreSQL,这也是 ThingsBoard 支持的主流 SQL 数据库。HSQLDB 可用于本地开发。我们不建议将 HSQLDB 用于除运行测试和启动负载尽可能低的开发实例之外的任何用途。
  • 混合型(PostgreSQL + Cassandra) - 将所有实体存储在 PostgreSQL 数据库中,并将时间序列数据存储在 Cassandra 数据库中。
  • 混合型(PostgreSQL + TimescaleDB) - 将所有实体存储在 PostgreSQL 数据库中,并将时间序列数据存储在 Timescale 数据库中。
    可以使用 thingsboard.yml 文件配置这些选项。更多详情请参阅数据库配置页面。

MQTT协议的每个中间件 完整配参数表

https://thingsboard.io/docs/user-guide/install/mqtt-transport-config/

Spring common parameters

Parameter Environment Variable Default Value Description
spring.main.web-environment WEB_APPLICATION_ENABLE false If you enabled process metrics you should also enable 'web-environment'.
spring.main.web-application-type WEB_APPLICATION_TYPE none If you enabled process metrics you should set 'web-application-type' to 'servlet' value.
spring.main.allow-circular-references "true" Spring Boot configuration property that controls whether circular dependencies between beans are allowed.

Server common parameters

Parameter Environment Variable Default Value Description
server.address HTTP_BIND_ADDRESS 0.0.0.0 Server bind address (has no effect if web-environment is disabled).
server.port HTTP_BIND_PORT 8083 Server bind port (has no effect if web-environment is disabled).

Zookeeper connection parameters. Used for service discovery.

Parameter Environment Variable Default Value Description
zk.enabled ZOOKEEPER_ENABLED false Enable/disable zookeeper discovery service.
zk.url ZOOKEEPER_URL localhost:2181 Zookeeper connect string
zk.retry_interval_ms ZOOKEEPER_RETRY_INTERVAL_MS 3000 Zookeeper retry interval in milliseconds
zk.connection_timeout_ms ZOOKEEPER_CONNECTION_TIMEOUT_MS 3000 Zookeeper connection timeout in milliseconds
zk.session_timeout_ms ZOOKEEPER_SESSION_TIMEOUT_MS 3000 Zookeeper session timeout in milliseconds
zk.zk_dir ZOOKEEPER_NODES_DIR /thingsboard Name of the directory in zookeeper 'filesystem'
zk.recalculate_delay ZOOKEEPER_RECALCULATE_DELAY_MS 0 The recalculate_delay property is recommended in a microservices architecture setup for rule-engine services. This property provides a pause to ensure that when a rule-engine service is restarted, other nodes don't immediately attempt to recalculate their partitions. The delay is recommended because the initialization of rule chain actors is time-consuming. Avoiding unnecessary recalculations during a restart can enhance system performance and stability.

Cache parameters

Parameter Environment Variable Default Value Description
cache.type CACHE_TYPE redis caffeine or redis
cache.entityLimits.timeToLiveInMinutes CACHE_SPECS_ENTITY_LIMITS_TTL 5 Entity limits cache TTL
cache.entityLimits.maxSize CACHE_SPECS_ENTITY_LIMITS_MAX_SIZE 100000 0 means the cache is disabled

Redis/Valkey configuration parameters

Parameter Environment Variable Default Value Description
redis.connection.type REDIS_CONNECTION_TYPE standalone Redis deployment type: Standalone (single Redis node deployment) OR Cluster
redis.standalone.host REDIS_HOST localhost Redis connection host
redis.standalone.port REDIS_PORT 6379 Redis connection port
redis.standalone.useDefaultClientConfig REDIS_USE_DEFAULT_CLIENT_CONFIG true Use default Redis configuration file
redis.standalone.clientName REDIS_CLIENT_NAME standalone this value may be used only if you used not default ClientConfig
redis.standalone.connectTimeout REDIS_CLIENT_CONNECT_TIMEOUT 30000 this value may be used only if you used not default ClientConfig
redis.standalone.readTimeout REDIS_CLIENT_READ_TIMEOUT 60000 this value may be used only if you used not default ClientConfig
redis.standalone.usePoolConfig REDIS_CLIENT_USE_POOL_CONFIG false this value may be used only if you used not default ClientConfig
redis.cluster.nodes REDIS_NODES Comma-separated list of "host:port" pairs to bootstrap from.
redis.cluster.max-redirects REDIS_MAX_REDIRECTS 12 Maximum number of redirects to follow when executing commands across the cluster.
redis.cluster.useDefaultPoolConfig REDIS_USE_DEFAULT_POOL_CONFIG true if set false will be used pool config build from values of the pool config section
redis.sentinel.master REDIS_MASTER name of master node
redis.sentinel.sentinels REDIS_SENTINELS comma-separated list of "host:port" pairs of sentinels
redis.sentinel.password REDIS_SENTINEL_PASSWORD password to authenticate with sentinel
redis.sentinel.useDefaultPoolConfig REDIS_USE_DEFAULT_POOL_CONFIG true if set false will be used pool config build from values of the pool config section
redis.db REDIS_DB 0 db index
redis.password REDIS_PASSWORD db password
redis.username REDIS_USERNAME Redis username for ACL authentication (Redis 6.0+). Leave empty for legacy password-only auth
redis.ssl.enabled TB_REDIS_SSL_ENABLED false Enable/disable secure connection
redis.ssl.credentials.cert_file TB_REDIS_SSL_PEM_CERT Path redis server (CA) certificate
redis.ssl.credentials.user_cert_file TB_REDIS_SSL_PEM_KEY Path to user certificate file. This is optional for the client and can be used for two-way authentication for the client
redis.ssl.credentials.user_key_file TB_REDIS_SSL_PEM_KEY_PASSWORD Path to user private key file. This is optional for the client and only needed if ‘user_cert_file’ is configured.
redis.pool_config.maxTotal REDIS_POOL_CONFIG_MAX_TOTAL 128 Maximum number of connections that can be allocated by the connection pool
redis.pool_config.maxIdle REDIS_POOL_CONFIG_MAX_IDLE 128 Maximum number of idle connections that can be maintained in the pool without being closed
redis.pool_config.minIdle REDIS_POOL_CONFIG_MIN_IDLE 16 Minumum number of idle connections that can be maintained in the pool without being closed
redis.pool_config.testOnBorrow REDIS_POOL_CONFIG_TEST_ON_BORROW false Enable/Disable PING command send when a connection is borrowed
redis.pool_config.testOnReturn REDIS_POOL_CONFIG_TEST_ON_RETURN false The property is used to specify whether to test the connection before returning it to the connection pool.
redis.pool_config.testWhileIdle REDIS_POOL_CONFIG_TEST_WHILE_IDLE true The property is used in the context of connection pooling in Redis
redis.pool_config.minEvictableMs REDIS_POOL_CONFIG_MIN_EVICTABLE_MS 60000 Minimum amount of time that an idle connection should be idle before it can be evicted from the connection pool. Value set in milliseconds
redis.pool_config.evictionRunsMs REDIS_POOL_CONFIG_EVICTION_RUNS_MS 30000 Specifies the time interval in milliseconds between two consecutive eviction runs
redis.pool_config.maxWaitMills REDIS_POOL_CONFIG_MAX_WAIT_MS 60000 Maximum time in milliseconds where a client is willing to wait for a connection from the pool when all connections are exhausted
redis.pool_config.numberTestsPerEvictionRun REDIS_POOL_CONFIG_NUMBER_TESTS_PER_EVICTION_RUN 3 Specifies the number of connections to test for eviction during each eviction run
redis.pool_config.blockWhenExhausted REDIS_POOL_CONFIG_BLOCK_WHEN_EXHAUSTED true Determines the behavior when a thread requests a connection from the pool but there are no available connections and the pool cannot create more due to the maxTotal configuration

MQTT server parameters

Parameter Environment Variable Default Value Description
transport.mqtt.bind_address MQTT_BIND_ADDRESS 0.0.0.0 MQTT bind-address
transport.mqtt.bind_port MQTT_BIND_PORT 1883 MQTT bind port
transport.mqtt.proxy_enabled MQTT_PROXY_PROTOCOL_ENABLED false Enable proxy protocol support. Disabled by default. If enabled, supports both v1 and v2. Useful to get the real IP address of the client in the logs and for rate limits.
transport.mqtt.timeout MQTT_TIMEOUT 10000 MQTT processing timeout in milliseconds
transport.mqtt.disconnect_timeout MQTT_DISCONNECT_TIMEOUT 1000 MQTT disconnect timeout in milliseconds. The time to wait for the client to disconnect after the server sends a disconnect message.
transport.mqtt.msg_queue_size_per_device_limit MQTT_MSG_QUEUE_SIZE_PER_DEVICE_LIMIT 100 messages await in the queue before device connected state. This limit works on low level before TenantProfileLimits mechanism
transport.mqtt.gateway_metrics_report_interval_sec MQTT_GATEWAY_METRICS_REPORT_INTERVAL_SEC 60 Interval of periodic report of the gateway metrics
transport.mqtt.netty.leak_detector_level NETTY_LEAK_DETECTOR_LVL DISABLED Netty leak detector level
transport.mqtt.netty.boss_group_thread_count NETTY_BOSS_GROUP_THREADS 1 Netty BOSS threads count
transport.mqtt.netty.worker_group_thread_count NETTY_WORKER_GROUP_THREADS 12 Netty worker threads count
transport.mqtt.netty.max_payload_size NETTY_MAX_PAYLOAD_SIZE 65536 Max payload size in bytes
transport.mqtt.netty.so_keep_alive NETTY_SO_KEEPALIVE false Enables TCP keepalive. This means that TCP starts sending keepalive probes when a connection is idle for some time
transport.mqtt.ssl.enabled MQTT_SSL_ENABLED false Enable/disable SSL support
transport.mqtt.ssl.bind_address MQTT_SSL_BIND_ADDRESS 0.0.0.0 MQTT SSL bind-address
transport.mqtt.ssl.bind_port MQTT_SSL_BIND_PORT 8883 MQTT SSL bind port
transport.mqtt.ssl.protocol MQTT_SSL_PROTOCOL TLSv1.2 SSL protocol: See https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#sslcontext-algorithms
transport.mqtt.ssl.credentials.type MQTT_SSL_CREDENTIALS_TYPE PEM Server credentials type (PEM - pem certificate file; KEYSTORE - java keystore)
transport.mqtt.ssl.credentials.pem.cert_file MQTT_SSL_PEM_CERT mqttserver.pem Path to the server certificate file (holds server certificate or certificate chain, may include server private key)
transport.mqtt.ssl.credentials.pem.key_file MQTT_SSL_PEM_KEY mqttserver_key.pem Path to the server certificate private key file. Optional by default. Required if the private key is not present in server certificate file;
transport.mqtt.ssl.credentials.pem.key_password MQTT_SSL_PEM_KEY_PASSWORD server_key_password Server certificate private key password (optional)
transport.mqtt.ssl.credentials.keystore.type MQTT_SSL_KEY_STORE_TYPE JKS Type of the key store (JKS or PKCS12)
transport.mqtt.ssl.credentials.keystore.store_file MQTT_SSL_KEY_STORE mqttserver.jks Path to the key store that holds the SSL certificate
transport.mqtt.ssl.credentials.keystore.store_password MQTT_SSL_KEY_STORE_PASSWORD server_ks_password Password used to access the key store
transport.mqtt.ssl.credentials.keystore.key_alias MQTT_SSL_KEY_ALIAS Optional alias of the private key; If not set, the platform will load the first private key from the keystore;
transport.mqtt.ssl.credentials.keystore.key_password MQTT_SSL_KEY_PASSWORD server_key_password Password used to access the key
transport.mqtt.ssl.skip_validity_check_for_client_cert MQTT_SSL_SKIP_VALIDITY_CHECK_FOR_CLIENT_CERT false Skip certificate validity check for client certificates.
transport.sessions.inactivity_timeout TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT 600000 Session inactivity timeout is a global configuration parameter that defines how long the device transport session will be opened after the last message arrives from the device. The parameter value is in milliseconds. The last activity time of the device session is updated if the device sends any message, including keepalive messages If there is no activity, the session will be closed, and all subscriptions will be deleted. We recommend this parameter to be in sync with device inactivity timeout ("state.defaultInactivityTimeoutInSec" or DEFAULT_INACTIVITY_TIMEOUT) parameter which is responsible for detection of the device connectivity status in the core service of the platform. The value of the session inactivity timeout parameter should be greater or equal to the device inactivity timeout. Note that the session inactivity timeout is set in milliseconds while device inactivity timeout is in seconds.
transport.sessions.report_timeout TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT 3000 Interval of periodic check for expired sessions and report of the changes to session last activity time
transport.json.type_cast_enabled JSON_TYPE_CAST_ENABLED true Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON
transport.json.max_string_value_length JSON_MAX_STRING_VALUE_LENGTH 0 Maximum allowed string value length when processing Telemetry/Attributes JSON (0 value disables string value length check)
transport.log.enabled TB_TRANSPORT_LOG_ENABLED true Enable/Disable log of transport messages to telemetry. For example, logging of LwM2M registration update
transport.log.max_length TB_TRANSPORT_LOG_MAX_LENGTH 1024 Maximum length of the log message. The content will be truncated to the specified value if needed
transport.stats.enabled TB_TRANSPORT_STATS_ENABLED true Enable/Disable the collection of transport statistics
transport.stats.print-interval-ms TB_TRANSPORT_STATS_PRINT_INTERVAL_MS 60000 Interval of transport statistics logging
transport.client_side_rpc.timeout CLIENT_SIDE_RPC_TIMEOUT 60000 Processing timeout interval of the RPC command on the CLIENT SIDE. Time in milliseconds
transport.rate_limits.ip_limits_enabled TB_TRANSPORT_IP_RATE_LIMITS_ENABLED false Enable or disable generic rate limits. Device and Tenant specific rate limits are controlled in Tenant Profile.
transport.rate_limits.max_wrong_credentials_per_ip TB_TRANSPORT_MAX_WRONG_CREDENTIALS_PER_IP 10 Maximum number of connect attempts with invalid credentials
transport.rate_limits.ip_block_timeout TB_TRANSPORT_IP_BLOCK_TIMEOUT 60000 Timeout to expire block IP addresses

Queue configuration parameters

Parameter Environment Variable Default Value Description
queue.type TB_QUEUE_TYPE kafka kafka (Apache Kafka)
queue.prefix TB_QUEUE_PREFIX Global queue prefix. If specified, prefix is added before default topic name: 'prefix.default_topic_name'. Prefix is applied to all topics (and consumer groups for kafka).
queue.kafka.bootstrap.servers TB_KAFKA_SERVERS localhost:9092 Kafka Bootstrap Servers
queue.kafka.ssl.enabled TB_KAFKA_SSL_ENABLED false Enable/Disable SSL Kafka communication
queue.kafka.ssl.truststore.location TB_KAFKA_SSL_TRUSTSTORE_LOCATION The location of the trust store file
queue.kafka.ssl.truststore.password TB_KAFKA_SSL_TRUSTSTORE_PASSWORD The password of trust store file if specified
queue.kafka.ssl.keystore.location TB_KAFKA_SSL_KEYSTORE_LOCATION The location of the key store file. This is optional for the client and can be used for two-way authentication for the client
queue.kafka.ssl.keystore.password TB_KAFKA_SSL_KEYSTORE_PASSWORD The store password for the key store file. This is optional for the client and only needed if ‘ssl.keystore.location’ is configured. Key store password is not supported for PEM format
queue.kafka.ssl.key.password TB_KAFKA_SSL_KEY_PASSWORD The password of the private key in the key store file or the PEM key specified in ‘keystore.key’
queue.kafka.acks TB_KAFKA_ACKS all The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed:0,1 and all
queue.kafka.retries TB_KAFKA_RETRIES 1 Number of retries. Resend any record whose send fails with a potentially transient error
queue.kafka.compression.type TB_KAFKA_COMPRESSION_TYPE none none or gzip
queue.kafka.batch.size TB_KAFKA_BATCH_SIZE 16384 Default batch size. This setting gives the upper bound of the batch size to be sent
queue.kafka.linger.ms TB_KAFKA_LINGER_MS 1 This variable creates a small amount of artificial delay—that is, rather than immediately sending out a record
queue.kafka.max.request.size TB_KAFKA_MAX_REQUEST_SIZE 1048576 The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests
queue.kafka.max.in.flight.requests.per.connection TB_KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 5 The maximum number of unacknowledged requests the client will send on a single connection before blocking
queue.kafka.buffer.memory TB_BUFFER_MEMORY 33554432 The total bytes of memory the producer can use to buffer records waiting to be sent to the server
queue.kafka.replication_factor TB_QUEUE_KAFKA_REPLICATION_FACTOR 1 The multiple copies of data over the multiple brokers of Kafka
queue.kafka.max_poll_interval_ms TB_QUEUE_KAFKA_MAX_POLL_INTERVAL_MS 300000 The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records
queue.kafka.max_poll_records TB_QUEUE_KAFKA_MAX_POLL_RECORDS 8192 The maximum number of records returned in a single call to poll()
queue.kafka.max_partition_fetch_bytes TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES 16777216 The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer
queue.kafka.fetch_max_bytes TB_QUEUE_KAFKA_FETCH_MAX_BYTES 134217728 The maximum amount of data the server will return. Records are fetched in batches by the consumer
queue.kafka.request.timeout.ms TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS 30000 (30 seconds)
queue.kafka.session.timeout.ms TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS 10000 (10 seconds)
queue.kafka.auto_offset_reset TB_QUEUE_KAFKA_AUTO_OFFSET_RESET earliest earliest, latest or none
queue.kafka.use_confluent_cloud TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD false Enable/Disable using of Confluent Cloud
queue.kafka.confluent.ssl.algorithm TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM https The endpoint identification algorithm used by clients to validate server hostname. The default value is https
queue.kafka.confluent.sasl.mechanism TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM PLAIN The mechanism used to authenticate Schema Registry requests. SASL/PLAIN should only be used with TLS/SSL as a transport layer to ensure that clear passwords are not transmitted on the wire without encryption
queue.kafka.confluent.sasl.config TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG org.apache.kafka.common.security.plain.PlainLoginModule required username="CLUSTER_API_KEY" password="CLUSTER_API_SECRET"; Using JAAS Configuration for specifying multiple SASL mechanisms on a broker
queue.kafka.confluent.security.protocol TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL SASL_SSL Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
queue.kafka.consumer-properties-per-topic-inline TB_QUEUE_KAFKA_CONSUMER_PROPERTIES_PER_TOPIC_INLINE If you override any default Kafka topic name using environment variables, you must also specify the related consumer properties for the new topic in consumer-properties-per-topic-inline. Otherwise, the topic will not inherit its expected configuration (e.g., max.poll.records, timeouts, etc). Format: "topic1:key1=value1,key2=value2;topic2:key=value" Example: "tb_core_modified.notifications:max.poll.records=10;tb_edge_modified:max.poll.records=10,enable.auto.commit=true"
queue.kafka.other-inline TB_QUEUE_KAFKA_OTHER_PROPERTIES In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin
queue.kafka.topic-properties.rule-engine TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1 Kafka properties for Rule Engine
queue.kafka.topic-properties.core TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1 Kafka properties for Core topics
queue.kafka.topic-properties.transport-api TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:10;min.insync.replicas:1 Kafka properties for Transport Api topics
queue.kafka.topic-properties.notifications TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1 Kafka properties for Notifications topics
queue.kafka.topic-properties.housekeeper TB_QUEUE_KAFKA_HOUSEKEEPER_TOPIC_PROPERTIES retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:10;min.insync.replicas:1 Kafka properties for Housekeeper tasks topic
queue.kafka.topics_cache_ttl_ms TB_QUEUE_KAFKA_TOPICS_CACHE_TTL_MS 300000 Topics cache TTL in milliseconds. 5 minutes by default
queue.partitions.hash_function_name TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME murmur3_128 murmur3_32, murmur3_128 or sha256
queue.transport_api.requests_topic TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC tb_transport.api.requests Topic used to consume api requests from transport microservices
queue.transport_api.responses_topic TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC tb_transport.api.responses Topic used to produce api responses to transport microservices
queue.transport_api.max_pending_requests TB_QUEUE_TRANSPORT_MAX_PENDING_REQUESTS 10000 Maximum pending api requests from transport microservices to be handled by server
queue.transport_api.max_requests_timeout TB_QUEUE_TRANSPORT_MAX_REQUEST_TIMEOUT 10000 Maximum timeout in milliseconds to handle api request from transport microservice by server
queue.transport_api.max_callback_threads TB_QUEUE_TRANSPORT_MAX_CALLBACK_THREADS 100 Amount of threads used to invoke callbacks
queue.transport_api.request_poll_interval TB_QUEUE_TRANSPORT_REQUEST_POLL_INTERVAL_MS 25 Interval in milliseconds to poll api requests from transport microservices
queue.transport_api.response_poll_interval TB_QUEUE_TRANSPORT_RESPONSE_POLL_INTERVAL_MS 25 Interval in milliseconds to poll api response from transport microservices
queue.core.topic TB_QUEUE_CORE_TOPIC tb_core Default topic name
queue.core.notifications_topic TB_QUEUE_CORE_NOTIFICATIONS_TOPIC tb_core.notifications For high-priority notifications that require minimum latency and processing time
queue.core.poll-interval TB_QUEUE_CORE_POLL_INTERVAL_MS 25 Interval in milliseconds to poll messages by Core microservices
queue.core.partitions TB_QUEUE_CORE_PARTITIONS 10 Amount of partitions used by Core microservices
queue.core.pack-processing-timeout TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS 60000 Timeout for processing a message pack by Core microservices
queue.core.usage-stats-topic TB_QUEUE_US_TOPIC tb_usage_stats Default topic name
queue.core.stats.enabled TB_QUEUE_CORE_STATS_ENABLED false Enable/disable statistics for Core microservices
queue.core.stats.print-interval-ms TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS 10000 Statistics printing interval for Core microservices
queue.core.housekeeper.topic TB_HOUSEKEEPER_TOPIC tb_housekeeper Topic name for Housekeeper tasks
queue.js.request_topic REMOTE_JS_EVAL_REQUEST_TOPIC js_eval.requests JS Eval request topic
queue.js.response_topic_prefix REMOTE_JS_EVAL_RESPONSE_TOPIC js_eval.responses JS Eval responses topic prefix that is combined with node id
queue.js.max_pending_requests REMOTE_JS_MAX_PENDING_REQUESTS 10000 JS Eval max pending requests
queue.js.max_requests_timeout REMOTE_JS_MAX_REQUEST_TIMEOUT 10000 JS Eval max request timeout
queue.js.response_poll_interval REMOTE_JS_RESPONSE_POLL_INTERVAL_MS 25 JS response poll interval
queue.rule-engine.topic TB_QUEUE_RULE_ENGINE_TOPIC tb_rule_engine Deprecated. It will be removed in the nearest releases
queue.rule-engine.notifications_topic TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC tb_rule_engine.notifications For high-priority notifications that require minimum latency and processing time
queue.rule-engine.poll-interval TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS 25 Interval in milliseconds to poll messages by Rule Engine
queue.rule-engine.pack-processing-timeout TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS 60000 Timeout for processing a message pack of Rule Engine
queue.rule-engine.stats.enabled TB_QUEUE_RULE_ENGINE_STATS_ENABLED true Enable/disable statistics for Rule Engine
queue.rule-engine.stats.print-interval-ms TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS 60000 Statistics printing interval for Rule Engine
queue.transport.notifications_topic TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC tb_transport.notifications For high priority notifications that require minimum latency and processing time
queue.transport.poll_interval TB_QUEUE_TRANSPORT_NOTIFICATIONS_POLL_INTERVAL_MS 25 Interval in milliseconds to poll messages

Service common properties

Parameter Environment Variable Default Value Description
service.type TB_SERVICE_TYPE tb-transport service type
service.id TB_SERVICE_ID Unique id for this service (autogenerated if empty)

Usage statistics parameters

Parameter Environment Variable Default Value Description
usage.stats.report.enabled USAGE_STATS_REPORT_ENABLED true Enable/Disable the collection of statistics about API usage. Collected on a system and tenant level by default
usage.stats.report.enabled_per_customer USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED false Enable/Disable collection of statistics about API usage on a customer level
usage.stats.report.interval USAGE_STATS_REPORT_INTERVAL 60 Interval of reporting the statistics. By default, the summarized statistics are sent every 10 seconds
usage.stats.report.pack_size USAGE_STATS_REPORT_PACK_SIZE 1024 Amount of statistic messages in pack

Metrics parameters

Parameter Environment Variable Default Value Description
metrics.enabled METRICS_ENABLED false Enable/disable actuator metrics.

General management parameters

Parameter Environment Variable Default Value Description
management.endpoints.web.exposure.include METRICS_ENDPOINTS_EXPOSE info Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics).

Notification system parameters

Parameter Environment Variable Default Value Description
notification_system.rules.deduplication_durations TB_NOTIFICATION_RULES_DEDUPLICATION_DURATIONS RATE_LIMITS:14400000; Semicolon-separated deduplication durations (in millis) for trigger types. Format: 'NotificationRuleTriggerType1:123;NotificationRuleTriggerType2:456'

文章来源:https://www.cnblogs.com/dddy/p/19879919
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!

标签:

相关文章

本站推荐

标签云