Skip to content

update mqtt #792

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 50 additions & 35 deletions src/UserGuide/Master/Table/API/Programming-MQTT.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,27 @@

## 1. Overview

[MQTT](Message Queuing Telemetry Transport)(http://mqtt.org/) is a lightweight messaging protocol designed for IoT and low-bandwidth environments. It operates on a Publish/Subscribe (Pub/Sub) model, enabling efficient and reliable bidirectional communication between devices. Its core objectives are low power consumption, minimal bandwidth usage, and high real-time performance, making it ideal for unstable networks or resource-constrained scenarios (e.g., sensors, mobile devices).
MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol designed for IoT and low-bandwidth environments. It operates on a Publish/Subscribe (Pub/Sub) model, enabling efficient and reliable bidirectional communication between devices. Its core objectives are low power consumption, minimal bandwidth usage, and high real-time performance, making it ideal for unstable networks or resource-constrained scenarios (e.g., sensors, mobile devices).

IoTDB provides deep integration with the MQTT protocol, fully compliant with MQTT v3.1 (OASIS International Standard). The IoTDB server includes a built-in high-performance MQTT Broker module, eliminating the need for third-party middleware. Devices can directly write time-series data into the IoTDB storage engine via MQTT messages.

<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="/img/github/78357432-0c71cf80-75e4-11ea-98aa-c43a54d469ce.png">
![](/img/mqtt-table-en-1.png)


## 2. Configuration

## 2. Built-in MQTT Service
The Built-in MQTT Service provide the ability of direct connection to IoTDB through MQTT. It listen the publish messages from MQTT clients
and then write the data into storage immediately.
The MQTT topic corresponds to IoTDB timeseries.The first segment of the MQTT topic (split by `/`) is used as the database name.The table name is derived from the `<measurement>` in the line protocol.
The messages payload can be format to events by `PayloadFormatter` which loaded by java SPI, and the implementation of `PayloadFormatter` for table is `LinePayloadFormatter`.
The following is the line protocol syntax of MQTT message payload and an example:
By default, the IoTDB MQTT service loads configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties`.

| **Property** | **Description** | **Default** |
| ------------------------ | ---------------------------------------------------------------------------------------------------------------------- | ------------------- |
| `enable_mqtt_service` | Enable/ disable the MQTT service. | FALSE |
| `mqtt_host` | Host address bound to the MQTT service. | 127.0.0.1 |
| `mqtt_port` | Port bound to the MQTT service. | 1883 |
| `mqtt_handler_pool_size` | Thread pool size for processing MQTT messages. | 1 |
| **`mqtt_payload_formatter`** | **Formatting method for MQTT message payloads. ​**​**Options: `json` (tree model), `line` (table model).** | **json** |
| `mqtt_max_message_size` | Maximum allowed MQTT message size (bytes). | 1048576 |

## 3. Write Protocol

* Line Protocol Syntax

Expand All @@ -49,23 +56,7 @@ The following is the line protocol syntax of MQTT message payload and an example
myMeasurement,tag1=value1,tag2=value2 attr1=value1,attr2=value2 fieldKey="fieldValue" 1556813561098000000
```



## 3. MQTT Configurations

By default, the IoTDB MQTT service loads configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties`.

Configurations are as follows:

| **Property** | **Description** | **Default** |
| ------------------------ | ---------------------------------------------------------------------------------------------------------------------- |-------------|
| `enable_mqtt_service` | Enable/ disable the MQTT service. | false |
| `mqtt_host` | Host address bound to the MQTT service. | 127.0.0.1 |
| `mqtt_port` | Port bound to the MQTT service. | 1883 |
| `mqtt_handler_pool_size` | Thread pool size for processing MQTT messages. | 1 |
| **`mqtt_payload_formatter`** | **Formatting method for MQTT message payloads. ​**​**Options: `json` (tree model), `line` (table model).** | **json** |
| `mqtt_max_message_size` | Maximum allowed MQTT message size (bytes). | 1048576 |

![](/img/mqtt-table-en-2.png)

## 4. Naming Conventions

Expand Down Expand Up @@ -102,24 +93,47 @@ The table name is derived from the `<measurement>` in the line protocol.
## 5. Coding Examples
The following is an example which a mqtt client send messages to IoTDB server.

```java
```java
MQTT mqtt = new MQTT();
mqtt.setHost("127.0.0.1", 1883);
mqtt.setUserName("root");
mqtt.setPassword("root");

BlockingConnection connection = mqtt.blockingConnection();
String DATABASE = "myMqttTest";
connection.connect();

for (int i = 0; i < 10; i++) {
String payload = String.format("test%d,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"value1\",field2=1i,field3=2u,field4=3i32,field5=t,field6=false,field7=4,field8=5f 1", random.nextDouble());

connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
}
String payload =
"test1,tag1=t1,tag2=t2 attr3=a5,attr4=a4 field1=\"fieldValue1\",field2=1i,field3=1u 1";
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);

payload = "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 2";
connection.publish(DATABASE, payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);

payload = "# It's a remark\n " + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 6";
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);

//batch write example
payload =
"test1,tag1=t1,tag2=t2 field7=t,field8=T,field9=true 3 \n "
+ "test1,tag1=t1,tag2=t2 field7=f,field8=F,field9=FALSE 4";
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);

//batch write example
payload =
"test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"fieldValue1\",field2=1i,field3=1u 4 \n "
+ "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 5";
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);

connection.disconnect();
```


```

## 6. Customize your MQTT Message Format

Expand Down Expand Up @@ -166,10 +180,10 @@ public class CustomizedLinePayloadFormatter implements PayloadFormatter {
for (int i = 0; i < 3; i++) {
long ts = i;
TableMessage message = new TableMessage();

// Parsing Database Name
message.setDatabase("db" + i);

//Parsing Table Names
message.setTable("t" + i);

Expand Down Expand Up @@ -200,7 +214,7 @@ public class CustomizedLinePayloadFormatter implements PayloadFormatter {
message.setFields(fields);
message.setDataTypes(dataTypes);
message.setValues(values);

//// Parsing timestamp
message.setTimestamp(ts);
ret.add(message);
Expand Down Expand Up @@ -232,6 +246,7 @@ Then, in your server:
More: the message format can be anything you want. For example, if it is a binary format,
just use `payload.forEachByte()` or `payload.array` to get bytes content.


## 7. Caution

To avoid compatibility issues caused by a default client_id, always explicitly supply a unique, non-empty client_id in every MQTT client.
Expand Down
Loading