Skip to content

Commit 569af53

Browse files
committed
mqtt: README.md
1 parent 06f89c1 commit 569af53

7 files changed

Lines changed: 301 additions & 37 deletions

File tree

README.md

Lines changed: 165 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,122 @@
22

33
## Description
44

5-
MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ).
6-
5+
MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ). The module is designed for software communication via the *MQTT 3.1.1* protocol using an external broker. It allows publishing and subscribing to openDAQ signal data over MQTT. The module consists of four key openDAQ components: the *MQTT device* and its nested function blocks — the *publisher* (**@publisherMqttFb**), the *raw subscriber* (**@rawMqttFb**), and the *JSON subscriber* (**@jsonMqttFb**).
6+
7+
### Functional
8+
- Connecting to an MQTT broker;
9+
- Publishing openDAQ signals as MQTT messages (*publisher FB*);
10+
- Subscribing to MQTT topics and converting incoming messages into openDAQ signals (*raw FB and JSON FB*);
11+
- Support for multiple message types and formats for both publishing and subscribing;
12+
- A set of examples and *gtests* for verifying functionality.
13+
14+
### Key components
15+
1) **MQTT device**:
16+
- **Where**: *mqtt_streaming_client_module/src/mqtt_streaming_device_impl.cpp, include/mqtt_streaming_client_module/...*
17+
- **Purpose**: Represents the MQTT broker as an openDAQ device - the connection point through which function blocks are created.
18+
- **Main properties:**
19+
- *MqttBrokerAddress* (string) - MQTT broker address. It can be an IP address or a hostname. By default, it is set to *"127.0.0.1"*.
20+
- *MqttBrokerPort* (integer) - Port number for the MQTT broker connection. By default, it is set to *1883*.
21+
- *MqttUsername* (string) — Username for MQTT broker authentication. By default, it is empty.
22+
- *MqttPassword* (string) — Password for MQTT broker authentication. By default, it is empty.
23+
- *ConnectTimeout* (integer) — Timeout in milliseconds for the initial connection to the MQTT broker. If the connection fails, an exception is thrown. By default, it is set to *3000 ms*.
24+
2) **Publisher MQTT Function Block (@publisherMqttFb)**:
25+
- **Where**: *include/mqtt_streaming_client_module/mqtt_publisher_fb_impl.h, src/mqtt_publisher_fb_impl.cpp*
26+
- **Purpose**: Publishes openDAQ signal data to MQTT topics. There are **four** general data publishing schemes:
27+
1) One MQTT message per signal / one message per sample / one topic per signal / one timestamp for each sample. Example: *{"AI0": 1.1, "timestamp": 1763716736100000}*
28+
29+
2) One MQTT message per signal / one message containing several samples / one topic per signal / one timestamp per sample (array of samples). Example: *{"AI0": [1.1, 2.2, 3.3], "timestamps": [1763716736100000, 1763716736200000, 1763716736300000]}*
30+
31+
3) One MQTT message for several signals (from 1 to N) / one message per sample for each signal / one topic for all signals / separate timestamps for each signal. Example: *[{"AI0": 1.1, "timestamp": 1763716736100000}, {"AI1": 2, "timestamp": 1763716736700000}]*
32+
33+
4) One MQTT message for all signals / one message per sample containing all signals / one topic for all signals / one shared timestamp for all signals. Example: *{"AI0": 1.1, "AI1": 2, "timestamp": 1763716736100000}*
34+
35+
The schemes are configured through combinations of properties.
36+
37+
- **Main properties**:
38+
- *TopicMode* (list) — Selects whether to publish all signals to separate MQTT topics (one per signal, *single-topic mode*) or to a single topic (*multiple-topic mode*), one for all signals. Choose *0* for *single-topic* mode and *1* for *multiple-topic* mode. By default, it is set to *single-topic* mode.
39+
- *MqttQoS* (integer) — MQTT Quality of Service level. It can be *0* (at most once), *1* (at least once), or *2* (exactly once). By default, it is set to *1*.
40+
- *SharedTimestamp* (bool) — Enables the use of a shared timestamp for all signals when publishing in *multiple-topic* mode. By default, it is set to *false*.
41+
- *GroupValues* (bool) — Enables the use of a sample pack for a signal when publishing in *single-topic* mode. By default, it is set to *false*.
42+
- *UseSignalNames* (bool) — Uses signal names as JSON field names instead of Global IDs. By default, it is set to *false*.
43+
- *GroupValuesPackSize* (integer) — Sets the size of the sample pack when publishing grouped values in *single-topic* mode. By default, it is set to *1*.
44+
- *ReaderPeriod* (integer) — Polling period in milliseconds, specifying how often the server collects and publishes the connected signals’ data to an MQTT broker. By default, it is set to *20 ms*.
45+
46+
To configure the publishing schemes, set the properties as follows:
47+
1) *TopicMode(0), SharedTimestamp(false), GroupValues(false)*;
48+
2) *TopicMode(0), SharedTimestamp(false), GroupValues(true), GroupValuesPackSize(<pack_size>)*;
49+
3) *TopicMode(1), SharedTimestamp(false), GroupValues(false)*;
50+
4) *TopicMode(1), SharedTimestamp(true), GroupValues(false)*;
51+
52+
53+
3) **Raw MQTT Function Block (@rawMqttFb)**:
54+
55+
- **Where**: *include/mqtt_streaming_client_module/mqtt_raw_receiver_fb_impl.h, src/mqtt_raw_receiver_fb_impl.cpp*
56+
- **Purpose**: Subscribes to raw MQTT messages and converts them into openDAQ signals (binary data) without any parsing — suitable for binary/unstructured messages or simple numeric values.
57+
- **Main properties**:
58+
- *SignalList* (list of strings) — List of MQTT topics to subscribe to for receiving raw binary data.
59+
60+
4) **JSON MQTT Function Block (@jsonMqttFb)**:
61+
- **Where**: *include/mqtt_streaming_client_module/mqtt_json_receiver_fb_impl.h, src/mqtt_json_receiver_fb_impl.cpp*
62+
- **Purpose**: Subscribes to MQTT topics, extracts values and timestamps from MQTT JSON messages, and converts them into openDAQ signal data samples.
63+
- **Main properties**:
64+
- *SignalList* (string) — **JSON configuration string** that defines the list of MQTT topics and the corresponding signals to subscribe to. A typical string structure:
65+
```json
66+
{
67+
"<topic>":[
68+
{
69+
"<signal_name>":{
70+
"Value":"<field_name_in_JSON_MQTT_message_for_extracting_sample_value>",
71+
"Timestamp":"<field_name_in_JSON_MQTT_message_for_extracting_sample_timestamp>",
72+
"Unit":[
73+
"<unit_symbol>",
74+
"<unit_name>",
75+
"<unit_quantity>"
76+
]
77+
}
78+
},
79+
{
80+
<another_signal>
81+
}
82+
]
83+
}
84+
```
85+
The *‘Timestamp’* and *‘Unit’* fields may be omitted. The fields inside *‘Unit’* may also be omitted. Example:
86+
```json
87+
{
88+
"/mirip/UNet3AC2/sensor/data":[
89+
{
90+
"temp":{
91+
"Value":"temp",
92+
"Timestamp":"ts",
93+
"Unit":[
94+
"°C"
95+
]
96+
}
97+
},
98+
{
99+
"humidity":{
100+
"Value":"humi",
101+
"Timestamp":"ts"
102+
}
103+
},
104+
{
105+
"tds":{
106+
"Value":"tds_value",
107+
"Unit":[
108+
"ppm", "parts per million", "Total dissolved solids"
109+
]
110+
}
111+
}
112+
]
113+
}
114+
```
115+
In this example, the *JSON MQTT Function Block* creates 3 signals, subscribes to the *"/mirip/UNet3AC2/sensor/data"* topic, and extracts 3 signal samples from each message (one sample per signal). The signals are named *“temp”*, *“humidity”*, and *“tds”*. The *“temp”* signal is created with a domain signal because the *“Timestamp”* field is present. Each domain-signal sample is extracted from the *“ts”* field of the JSON MQTT message. The value of the *“ts”* field (the timestamp field) may be in **ISO8601** format or **Unix epoch time** in seconds, milliseconds, or microseconds. The value of the *“temp”* signal sample is extracted from the *“temp”* field of the JSON message. The unit of the values is “°C”.
116+
Example of JSON MQTT message for this configuration:
117+
```json
118+
{"ts":"2025-10-08 20:35:43", "bdn":"SanbonFishTank3", "temp":27.20,"humi":72.40, "tds_value":275.22, "fan_status":"off", "auto_mode":"on", "fan_comp":"26.3", "humi_comp":"55"}
119+
```
120+
---
7121

8122
## Building MQTTStreamingModule
9123

@@ -15,7 +129,7 @@ For example, on **Ubuntu**:
15129

16130
```shell
17131
sudo apt-get update
18-
sudo apt-get install -y git build-essential lld cmake ninja-build mono-complete python3
132+
sudo apt-get install -y git build-essential openssh-client wget curl lld cmake ninja-build mono-complete python3 libssl-dev
19133
```
20134

21135
#### 2. Clone the openDAQ repository
@@ -47,18 +161,58 @@ cmake --build .
47161

48162
---
49163

50-
## Testing
164+
## Examples
165+
166+
There are 3 example C++ application:
167+
- **custom-mqtt-sub** - demonstrates how to work with the *JSON MQTT FB*. The application creates an *MQTT device* and a *JSON MQTT FB* to receive JSON MQTT messages, parse them, and create openDAQ signals to send the parsed data. The application also creates *packet readers* for all FB signals and prints the samples to standard output. The *SignalList* property of the JSON MQTT FB is set to the value read from a file whose path is provided as a command-line argument when the application starts (see the **Key components** section). Usage:
168+
```bash
169+
./custom-mqtt-sub --address broker.emqx.io examples/custom-mqtt-sub/public-example0.json
170+
```
171+
- **ref-dev-mqtt-raw-sub** - demonstrates how to work with the raw MQTT FB. The application creates an MQTT device and a raw MQTT FB to receive MQTT messages and create openDAQ signals to send the data as binary packets. The application also creates packet readers for all FB signals and prints the binary packets as strings to standard output. The SignalList property of the raw MQTT FB is filled from the application arguments. Usage:
172+
```bash
173+
./ref-dev-mqtt-raw-sub --address broker.emqx.io /agvstate /mirip/UNet3AC2/sensor/data
174+
```
175+
- **ref-dev-mqtt-fb-pub** - demonstrates how to work with the *publisher MQTT FB*. The application creates an *openDAQ ref-device* with four channels, an *MQTT device*, and a *publisher MQTT FB* to publish JSON MQTT messages with the channels’ data. The properties of the *publisher MQTT FB* are set according to the selected mode, which can be specified via the *--mode* option. Posible values are:
176+
- 0 - One MQTT message per signal / one message per sample / one topic per signal / one timestamp for each sample;
177+
- 1 - One MQTT message per signal / one message containing several samples / one topic per signal / one timestamp per sample (array of samples);
178+
- 2 - One MQTT message for several signals (from 1 to N) / one message per sample for each signal / one topic for all signals / separate timestamps for each signal;
179+
- 3 - One MQTT message for all signals / one message per sample containing all signals / one topic for all signals / one shared timestamp for all signals.
180+
```bash
181+
./ref-dev-mqtt-fb-pub --address broker.emqx.io --mode 1
182+
```
183+
Published messages can be observed using third-party tools (see the **External MQTT tools** section).
184+
For all applications, by default, the IP address *127.0.0.1* is used for the broker connection. It can be set via the *--address* option, for example:
185+
```bash
186+
./<app_name> --address 192.168.0.100 <other_options> <args>
187+
```
188+
189+
They are located in the **examples/** directory.
190+
> ***Note:*** *Using the applications involves using a third-party broker. It must be started before example applications. See a **External MQTT tools** section for more details*
191+
192+
> ***Note:*** *The **ref-dev-mqtt-fb-pub** application depends on [**RefDeviceModule**](https://github.com/openDAQ/openDAQ/tree/main/examples/modules/ref_device_module).*
51193
52-
There are several example applications in the *"examples"* folder. These examples are based on OpenDAQ SDK and allow testing of *MQTTStreamingModule* client/server sides with each other and with third-party MQTT tools.
53194

54-
> ***Note:*** *Using the applications involves using a third-party broker. It must be started before example applications. See a **External MQTT tools** section for more details*
195+
## External MQTT tools
55196

56-
> ***Note:*** *The applications depend on **MQTTStreamingModule** and [**RefDeviceModule**](https://github.com/openDAQ/openDAQ/tree/main/examples/modules/ref_device_module).*
197+
It is suggested to use [***Eclipse Mosquitto***](https://github.com/eclipse-mosquitto/mosquitto) as a third-party MQTT tool set. It includes MQTT broker and MQTT publisher/subscriber clients.
198+
Utilities could be installed on **Ubuntu**:
57199

58-
#### ref-dev-mqtt-pub
200+
```shell
201+
sudo apt install mosquitto mosquitto-clients
202+
```
59203

60-
The *ref-dev-mqtt-pub* application is a console example which publishes *ref-device Signal* samples via the *MQTTStreamingModule* server.
204+
The MQTT broker will be run automatically after installing. For simple testing run a subscriber with the following options:
61205

62-
#### ref-dev-mqtt-sub
206+
```shell
207+
mosquitto_sub -h 127.0.0.1 -t "#" -v
208+
```
209+
The subscriber will wait for incoming data and then print it. Then run a publisher with the following options:
210+
```shell
211+
mosquitto_pub -h 127.0.0.1 -t "openDAQ/publisher" -m '{"Input0":2, "Input1":1.2, "Input3":3.3}'
212+
```
213+
This command publishes a message and exits. From the subscriber's side you can see:
63214

64-
The *ref-dev-mqtt-sub* application is a console example which subscribes to an available MQTT openDAQ device and prints signal samples.
215+
```shell
216+
mosquitto_sub -h 127.0.0.1 -t "openDAQ/publisher" -v
217+
openDAQ/publisher {"Input0":2, "Input1":1.2, "Input3":3.3}
218+
```

mqtt_streaming_client_module/include/mqtt_streaming_client_module/constants.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ static constexpr uint16_t DEFAULT_PORT = 1883;
2020
static constexpr const char* DEFAULT_USERNAME = "";
2121
static constexpr const char* DEFAULT_PASSWORD = "";
2222
static constexpr uint32_t DEFAULT_INIT_TIMEOUT = 3000; // ms
23-
static constexpr uint32_t DEFAULT_DISCOVERY_TIMEOUT = 3000; // ms
23+
static constexpr uint32_t DEFAULT_DISCOVERY_TIMEOUT = 1000; // ms
2424

2525
static constexpr uint32_t DEFAULT_PUB_READ_PERIOD = 20; // ms
2626
static constexpr uint32_t DEFAULT_PUB_QOS = 1;

mqtt_streaming_client_module/src/mqtt_json_receiver_fb_impl.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,18 @@ MqttJsonReceiverFbImpl::~MqttJsonReceiverFbImpl()
3131
unsubscribeFromTopics();
3232
}
3333

34-
3534
FunctionBlockTypePtr MqttJsonReceiverFbImpl::CreateType()
3635
{
3736
auto defaultConfig = PropertyObject();
38-
defaultConfig.addProperty(StringProperty(PROPERTY_NAME_SIGNAL_LIST, String("")));
37+
auto builder =
38+
StringPropertyBuilder(PROPERTY_NAME_SIGNAL_LIST, String(""))
39+
.setDescription("JSON configuration string that defines the list of MQTT topics and corresponding signals to subscribe to.");
40+
defaultConfig.addProperty(builder.build());
3941

4042
const auto fbType = FunctionBlockType(JSON_FB_NAME,
4143
JSON_FB_NAME,
42-
"",
44+
"The JSON MQTT function block allows subscribing to MQTT topics, extracting values and "
45+
"timestamps from MQTT JSON messages, and converting them into openDAQ signal data samples.",
4346
defaultConfig);
4447
return fbType;
4548
}

0 commit comments

Comments
 (0)