Skip to content

Commit 1fe7e97

Browse files
authored
Merge pull request #4 from openDAQ/publisherFb
New publisher FB with 4 schemes: - one MQTT message per a signal/one message per a sample/one topic per a signal/one TS for each sample; - one MQTT message per all signals/one message per a sample for each signal/one topic per all signals/one TS for each sample of all signals (shared TS); - one MQTT message per all signals/one message per a sample for several signals (from 1 to N)/one topic per all signals/one TS for each sample of each signal; - one MQTT message per a signal/one message per several samples/one topic per a signal/one TS for each sample (array of samples); - ref-dev-mqtt-fb-pub example application; - Tests;
2 parents 249eb50 + 569af53 commit 1fe7e97

35 files changed

Lines changed: 2962 additions & 51 deletions

README.md

Lines changed: 165 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,122 @@
22

33
## Description
44

5-
MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ).
6-
5+
MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ). The module is designed for software communication via the *MQTT 3.1.1* protocol using an external broker. It allows publishing and subscribing to openDAQ signal data over MQTT. The module consists of four key openDAQ components: the *MQTT device* and its nested function blocks — the *publisher* (**@publisherMqttFb**), the *raw subscriber* (**@rawMqttFb**), and the *JSON subscriber* (**@jsonMqttFb**).
6+
7+
### Functional
8+
- Connecting to an MQTT broker;
9+
- Publishing openDAQ signals as MQTT messages (*publisher FB*);
10+
- Subscribing to MQTT topics and converting incoming messages into openDAQ signals (*raw FB and JSON FB*);
11+
- Support for multiple message types and formats for both publishing and subscribing;
12+
- A set of examples and *gtests* for verifying functionality.
13+
14+
### Key components
15+
1) **MQTT device**:
16+
- **Where**: *mqtt_streaming_client_module/src/mqtt_streaming_device_impl.cpp, include/mqtt_streaming_client_module/...*
17+
- **Purpose**: Represents the MQTT broker as an openDAQ device - the connection point through which function blocks are created.
18+
- **Main properties:**
19+
- *MqttBrokerAddress* (string) - MQTT broker address. It can be an IP address or a hostname. By default, it is set to *"127.0.0.1"*.
20+
- *MqttBrokerPort* (integer) - Port number for the MQTT broker connection. By default, it is set to *1883*.
21+
- *MqttUsername* (string) — Username for MQTT broker authentication. By default, it is empty.
22+
- *MqttPassword* (string) — Password for MQTT broker authentication. By default, it is empty.
23+
- *ConnectTimeout* (integer) — Timeout in milliseconds for the initial connection to the MQTT broker. If the connection fails, an exception is thrown. By default, it is set to *3000 ms*.
24+
2) **Publisher MQTT Function Block (@publisherMqttFb)**:
25+
- **Where**: *include/mqtt_streaming_client_module/mqtt_publisher_fb_impl.h, src/mqtt_publisher_fb_impl.cpp*
26+
- **Purpose**: Publishes openDAQ signal data to MQTT topics. There are **four** general data publishing schemes:
27+
1) One MQTT message per signal / one message per sample / one topic per signal / one timestamp for each sample. Example: *{"AI0": 1.1, "timestamp": 1763716736100000}*
28+
29+
2) One MQTT message per signal / one message containing several samples / one topic per signal / one timestamp per sample (array of samples). Example: *{"AI0": [1.1, 2.2, 3.3], "timestamps": [1763716736100000, 1763716736200000, 1763716736300000]}*
30+
31+
3) One MQTT message for several signals (from 1 to N) / one message per sample for each signal / one topic for all signals / separate timestamps for each signal. Example: *[{"AI0": 1.1, "timestamp": 1763716736100000}, {"AI1": 2, "timestamp": 1763716736700000}]*
32+
33+
4) One MQTT message for all signals / one message per sample containing all signals / one topic for all signals / one shared timestamp for all signals. Example: *{"AI0": 1.1, "AI1": 2, "timestamp": 1763716736100000}*
34+
35+
The schemes are configured through combinations of properties.
36+
37+
- **Main properties**:
38+
- *TopicMode* (list) — Selects whether to publish all signals to separate MQTT topics (one per signal, *single-topic mode*) or to a single topic (*multiple-topic mode*), one for all signals. Choose *0* for *single-topic* mode and *1* for *multiple-topic* mode. By default, it is set to *single-topic* mode.
39+
- *MqttQoS* (integer) — MQTT Quality of Service level. It can be *0* (at most once), *1* (at least once), or *2* (exactly once). By default, it is set to *1*.
40+
- *SharedTimestamp* (bool) — Enables the use of a shared timestamp for all signals when publishing in *multiple-topic* mode. By default, it is set to *false*.
41+
- *GroupValues* (bool) — Enables the use of a sample pack for a signal when publishing in *single-topic* mode. By default, it is set to *false*.
42+
- *UseSignalNames* (bool) — Uses signal names as JSON field names instead of Global IDs. By default, it is set to *false*.
43+
- *GroupValuesPackSize* (integer) — Sets the size of the sample pack when publishing grouped values in *single-topic* mode. By default, it is set to *1*.
44+
- *ReaderPeriod* (integer) — Polling period in milliseconds, specifying how often the server collects and publishes the connected signals’ data to an MQTT broker. By default, it is set to *20 ms*.
45+
46+
To configure the publishing schemes, set the properties as follows:
47+
1) *TopicMode(0), SharedTimestamp(false), GroupValues(false)*;
48+
2) *TopicMode(0), SharedTimestamp(false), GroupValues(true), GroupValuesPackSize(<pack_size>)*;
49+
3) *TopicMode(1), SharedTimestamp(false), GroupValues(false)*;
50+
4) *TopicMode(1), SharedTimestamp(true), GroupValues(false)*;
51+
52+
53+
3) **Raw MQTT Function Block (@rawMqttFb)**:
54+
55+
- **Where**: *include/mqtt_streaming_client_module/mqtt_raw_receiver_fb_impl.h, src/mqtt_raw_receiver_fb_impl.cpp*
56+
- **Purpose**: Subscribes to raw MQTT messages and converts them into openDAQ signals (binary data) without any parsing — suitable for binary/unstructured messages or simple numeric values.
57+
- **Main properties**:
58+
- *SignalList* (list of strings) — List of MQTT topics to subscribe to for receiving raw binary data.
59+
60+
4) **JSON MQTT Function Block (@jsonMqttFb)**:
61+
- **Where**: *include/mqtt_streaming_client_module/mqtt_json_receiver_fb_impl.h, src/mqtt_json_receiver_fb_impl.cpp*
62+
- **Purpose**: Subscribes to MQTT topics, extracts values and timestamps from MQTT JSON messages, and converts them into openDAQ signal data samples.
63+
- **Main properties**:
64+
- *SignalList* (string) — **JSON configuration string** that defines the list of MQTT topics and the corresponding signals to subscribe to. A typical string structure:
65+
```json
66+
{
67+
"<topic>":[
68+
{
69+
"<signal_name>":{
70+
"Value":"<field_name_in_JSON_MQTT_message_for_extracting_sample_value>",
71+
"Timestamp":"<field_name_in_JSON_MQTT_message_for_extracting_sample_timestamp>",
72+
"Unit":[
73+
"<unit_symbol>",
74+
"<unit_name>",
75+
"<unit_quantity>"
76+
]
77+
}
78+
},
79+
{
80+
<another_signal>
81+
}
82+
]
83+
}
84+
```
85+
The *‘Timestamp’* and *‘Unit’* fields may be omitted. The fields inside *‘Unit’* may also be omitted. Example:
86+
```json
87+
{
88+
"/mirip/UNet3AC2/sensor/data":[
89+
{
90+
"temp":{
91+
"Value":"temp",
92+
"Timestamp":"ts",
93+
"Unit":[
94+
"°C"
95+
]
96+
}
97+
},
98+
{
99+
"humidity":{
100+
"Value":"humi",
101+
"Timestamp":"ts"
102+
}
103+
},
104+
{
105+
"tds":{
106+
"Value":"tds_value",
107+
"Unit":[
108+
"ppm", "parts per million", "Total dissolved solids"
109+
]
110+
}
111+
}
112+
]
113+
}
114+
```
115+
In this example, the *JSON MQTT Function Block* creates 3 signals, subscribes to the *"/mirip/UNet3AC2/sensor/data"* topic, and extracts 3 signal samples from each message (one sample per signal). The signals are named *“temp”*, *“humidity”*, and *“tds”*. The *“temp”* signal is created with a domain signal because the *“Timestamp”* field is present. Each domain-signal sample is extracted from the *“ts”* field of the JSON MQTT message. The value of the *“ts”* field (the timestamp field) may be in **ISO8601** format or **Unix epoch time** in seconds, milliseconds, or microseconds. The value of the *“temp”* signal sample is extracted from the *“temp”* field of the JSON message. The unit of the values is “°C”.
116+
Example of JSON MQTT message for this configuration:
117+
```json
118+
{"ts":"2025-10-08 20:35:43", "bdn":"SanbonFishTank3", "temp":27.20,"humi":72.40, "tds_value":275.22, "fan_status":"off", "auto_mode":"on", "fan_comp":"26.3", "humi_comp":"55"}
119+
```
120+
---
7121

8122
## Building MQTTStreamingModule
9123

@@ -15,7 +129,7 @@ For example, on **Ubuntu**:
15129

16130
```shell
17131
sudo apt-get update
18-
sudo apt-get install -y git build-essential lld cmake ninja-build mono-complete python3
132+
sudo apt-get install -y git build-essential openssh-client wget curl lld cmake ninja-build mono-complete python3 libssl-dev
19133
```
20134

21135
#### 2. Clone the openDAQ repository
@@ -47,18 +161,58 @@ cmake --build .
47161

48162
---
49163

50-
## Testing
164+
## Examples
165+
166+
There are 3 example C++ application:
167+
- **custom-mqtt-sub** - demonstrates how to work with the *JSON MQTT FB*. The application creates an *MQTT device* and a *JSON MQTT FB* to receive JSON MQTT messages, parse them, and create openDAQ signals to send the parsed data. The application also creates *packet readers* for all FB signals and prints the samples to standard output. The *SignalList* property of the JSON MQTT FB is set to the value read from a file whose path is provided as a command-line argument when the application starts (see the **Key components** section). Usage:
168+
```bash
169+
./custom-mqtt-sub --address broker.emqx.io examples/custom-mqtt-sub/public-example0.json
170+
```
171+
- **ref-dev-mqtt-raw-sub** - demonstrates how to work with the raw MQTT FB. The application creates an MQTT device and a raw MQTT FB to receive MQTT messages and create openDAQ signals to send the data as binary packets. The application also creates packet readers for all FB signals and prints the binary packets as strings to standard output. The SignalList property of the raw MQTT FB is filled from the application arguments. Usage:
172+
```bash
173+
./ref-dev-mqtt-raw-sub --address broker.emqx.io /agvstate /mirip/UNet3AC2/sensor/data
174+
```
175+
- **ref-dev-mqtt-fb-pub** - demonstrates how to work with the *publisher MQTT FB*. The application creates an *openDAQ ref-device* with four channels, an *MQTT device*, and a *publisher MQTT FB* to publish JSON MQTT messages with the channels’ data. The properties of the *publisher MQTT FB* are set according to the selected mode, which can be specified via the *--mode* option. Posible values are:
176+
- 0 - One MQTT message per signal / one message per sample / one topic per signal / one timestamp for each sample;
177+
- 1 - One MQTT message per signal / one message containing several samples / one topic per signal / one timestamp per sample (array of samples);
178+
- 2 - One MQTT message for several signals (from 1 to N) / one message per sample for each signal / one topic for all signals / separate timestamps for each signal;
179+
- 3 - One MQTT message for all signals / one message per sample containing all signals / one topic for all signals / one shared timestamp for all signals.
180+
```bash
181+
./ref-dev-mqtt-fb-pub --address broker.emqx.io --mode 1
182+
```
183+
Published messages can be observed using third-party tools (see the **External MQTT tools** section).
184+
For all applications, by default, the IP address *127.0.0.1* is used for the broker connection. It can be set via the *--address* option, for example:
185+
```bash
186+
./<app_name> --address 192.168.0.100 <other_options> <args>
187+
```
188+
189+
They are located in the **examples/** directory.
190+
> ***Note:*** *Using the applications involves using a third-party broker. It must be started before example applications. See a **External MQTT tools** section for more details*
191+
192+
> ***Note:*** *The **ref-dev-mqtt-fb-pub** application depends on [**RefDeviceModule**](https://github.com/openDAQ/openDAQ/tree/main/examples/modules/ref_device_module).*
51193
52-
There are several example applications in the *"examples"* folder. These examples are based on OpenDAQ SDK and allow testing of *MQTTStreamingModule* client/server sides with each other and with third-party MQTT tools.
53194

54-
> ***Note:*** *Using the applications involves using a third-party broker. It must be started before example applications. See a **External MQTT tools** section for more details*
195+
## External MQTT tools
55196

56-
> ***Note:*** *The applications depend on **MQTTStreamingModule** and [**RefDeviceModule**](https://github.com/openDAQ/openDAQ/tree/main/examples/modules/ref_device_module).*
197+
It is suggested to use [***Eclipse Mosquitto***](https://github.com/eclipse-mosquitto/mosquitto) as a third-party MQTT tool set. It includes MQTT broker and MQTT publisher/subscriber clients.
198+
Utilities could be installed on **Ubuntu**:
57199

58-
#### ref-dev-mqtt-pub
200+
```shell
201+
sudo apt install mosquitto mosquitto-clients
202+
```
59203

60-
The *ref-dev-mqtt-pub* application is a console example which publishes *ref-device Signal* samples via the *MQTTStreamingModule* server.
204+
The MQTT broker will be run automatically after installing. For simple testing run a subscriber with the following options:
61205

62-
#### ref-dev-mqtt-sub
206+
```shell
207+
mosquitto_sub -h 127.0.0.1 -t "#" -v
208+
```
209+
The subscriber will wait for incoming data and then print it. Then run a publisher with the following options:
210+
```shell
211+
mosquitto_pub -h 127.0.0.1 -t "openDAQ/publisher" -m '{"Input0":2, "Input1":1.2, "Input3":3.3}'
212+
```
213+
This command publishes a message and exits. From the subscriber's side you can see:
63214

64-
The *ref-dev-mqtt-sub* application is a console example which subscribes to an available MQTT openDAQ device and prints signal samples.
215+
```shell
216+
mosquitto_sub -h 127.0.0.1 -t "openDAQ/publisher" -v
217+
openDAQ/publisher {"Input0":2, "Input1":1.2, "Input3":3.3}
218+
```

examples/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ if (OPENDAQ_MQTT_ENABLE_EXAMPLE_APPS)
55
add_subdirectory(ref-dev-mqtt-sub)
66
add_subdirectory(ref-dev-mqtt-raw-sub)
77
add_subdirectory(custom-mqtt-sub)
8+
add_subdirectory(ref-dev-mqtt-fb-pub)
89
endif()
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
cmake_minimum_required(VERSION 3.16)
2+
3+
set(EXAMPLE_PROJECT_NAME "ref-dev-mqtt-fb-pub")
4+
5+
project(${EXAMPLE_PROJECT_NAME} LANGUAGES CXX)
6+
7+
set(CMAKE_CXX_STANDARD 17)
8+
set(CMAKE_CXX_STANDARD_REQUIRED ON)
9+
10+
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
11+
12+
add_subdirectory(src)
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
cmake_minimum_required(VERSION 3.16)
2+
3+
add_compile_definitions(MODULE_PATH="${OPENDAQ_MODULES_DIR}")
4+
add_compile_definitions(APP_NAME="${EXAMPLE_PROJECT_NAME}")
5+
6+
add_executable(${EXAMPLE_PROJECT_NAME} ref-dev-mqtt-fb-pub.cpp)
7+
add_dependencies(${EXAMPLE_PROJECT_NAME} daq::ref_device_module)
8+
target_link_libraries(${EXAMPLE_PROJECT_NAME} PRIVATE daq::opendaq)
9+
target_include_directories(${EXAMPLE_PROJECT_NAME} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../../)
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
#include "../../InputArgs.h"
2+
#include <opendaq/opendaq.h>
3+
4+
#include <iostream>
5+
6+
using namespace daq;
7+
8+
enum class Mode {
9+
SINGLE = 0,
10+
SINGLE_PACK,
11+
MULTI_SINGLE,
12+
MULTI_SHARED,
13+
_COUNT
14+
};
15+
16+
struct ConfigStruct {
17+
std::string brokerAddress;
18+
Mode mode;
19+
bool exit = true;
20+
int error = 0;
21+
};
22+
23+
ConfigStruct StartUp(int argc, char* argv[])
24+
{
25+
ConfigStruct config;
26+
InputArgs args;
27+
args.addArg("--help", "Show help message");
28+
args.addArg("--address", "MQTT broker address", true);
29+
args.addArg("--mode", "publisher FB mode", true);
30+
args.setUsageHelp(APP_NAME " [options]\n"
31+
"Available modes:\n"
32+
" 0 - Single\n"
33+
" 1 - Single with packing\n"
34+
" 2 - Multi Single\n"
35+
" 3 - Multi Shared");
36+
args.parse(argc, argv);
37+
38+
if (args.hasArg("--help") || args.hasUnknownArgs())
39+
{
40+
args.printHelp();
41+
config.error = 0;
42+
return config;
43+
}
44+
45+
config.brokerAddress = args.getArgValue("--address", "127.0.0.1");
46+
config.exit = false;
47+
int mode = std::stoi(args.getArgValue("--mode", "0"));
48+
if (mode < 0 || mode >= static_cast<int>(Mode::_COUNT))
49+
{
50+
std::cout << "Invalid mode value. Allowed values are from 0 to " << (static_cast<int>(Mode::_COUNT) - 1) << "." << std::endl;
51+
args.printHelp();
52+
config.error = -1;
53+
config.exit = true;
54+
return config;
55+
}
56+
config.mode = static_cast<Mode>(mode);
57+
return config;
58+
}
59+
60+
int main(int argc, char* argv[])
61+
{
62+
// Parse input arguments
63+
auto appConfig = StartUp(argc, argv);
64+
if (appConfig.exit)
65+
{
66+
return appConfig.error;
67+
}
68+
69+
const InstancePtr instance = InstanceBuilder().addModulePath(MODULE_PATH).setRootDevice("daqref://device0").build();
70+
auto refDevice = instance.getRootDevice();
71+
refDevice.setPropertyValue("NumberOfChannels", 4);
72+
refDevice.setPropertyValue("GlobalSampleRate", 100);
73+
74+
// Configure channels
75+
const auto channels = refDevice.getChannelsRecursive();
76+
channels[0].setPropertyValue("UseGlobalSampleRate", appConfig.mode == Mode::MULTI_SHARED);
77+
channels[0].setPropertyValue("SampleRate", 10);
78+
channels[0].setPropertyValue("Frequency", 1);
79+
channels[0].setPropertyValue("Waveform", 1);
80+
channels[1].setPropertyValue("UseGlobalSampleRate", appConfig.mode == Mode::MULTI_SHARED);
81+
channels[1].setPropertyValue("SampleRate", 20);
82+
channels[1].setPropertyValue("Frequency", 1);
83+
channels[1].setPropertyValue("Waveform", 3);
84+
channels[2].setPropertyValue("UseGlobalSampleRate", appConfig.mode == Mode::MULTI_SHARED);
85+
channels[2].setPropertyValue("SampleRate", 50);
86+
channels[2].setPropertyValue("Frequency", 4);
87+
channels[3].setPropertyValue("UseGlobalSampleRate", appConfig.mode == Mode::MULTI_SHARED);
88+
channels[3].setPropertyValue("SampleRate", 100);
89+
channels[3].setPropertyValue("Frequency", 20);
90+
91+
// Create and configure MQTT server
92+
auto brokerDevice = instance.addDevice("daq.mqtt://" + appConfig.brokerAddress);
93+
auto availableDeviceNodes = brokerDevice.getAvailableFunctionBlockTypes();
94+
const std::string fbName = "@publisherMqttFb";
95+
std::cout << "Try to add the " << fbName << std::endl;
96+
97+
auto config = availableDeviceNodes.get(fbName).createDefaultConfig();
98+
config.setPropertyValue("MqttQoS", 1);
99+
config.setPropertyValue("ReaderPeriod", 20);
100+
config.setPropertyValue("UseSignalNames", True);
101+
switch (appConfig.mode) {
102+
case Mode::SINGLE:
103+
config.setPropertyValue("SharedTimestamp", False);
104+
config.setPropertyValue("TopicMode", 0);
105+
config.setPropertyValue("GroupValues", False);
106+
break;
107+
case Mode::SINGLE_PACK:
108+
config.setPropertyValue("SharedTimestamp", False);
109+
config.setPropertyValue("TopicMode", 0);
110+
config.setPropertyValue("GroupValues", True);
111+
config.setPropertyValue("GroupValuesPackSize", 3);
112+
break;
113+
case Mode::MULTI_SINGLE:
114+
config.setPropertyValue("SharedTimestamp", False);
115+
config.setPropertyValue("TopicMode", 1);
116+
config.setPropertyValue("GroupValues", False);
117+
break;
118+
case Mode::MULTI_SHARED:
119+
config.setPropertyValue("SharedTimestamp", True);
120+
config.setPropertyValue("TopicMode", 1);
121+
config.setPropertyValue("GroupValues", False);
122+
break;
123+
default:
124+
break;
125+
}
126+
127+
128+
// Add the publisher function block to the broker device
129+
daq::FunctionBlockPtr fb = brokerDevice.addFunctionBlock(fbName, config);
130+
const auto signals = refDevice.getSignals(search::Recursive(search::Any()));
131+
for (const auto& s : signals)
132+
{
133+
if (s.getDomainSignal().assigned())
134+
{
135+
auto ports = fb.getInputPorts();
136+
ports[ports.getCount() - 1].connect(s);
137+
}
138+
}
139+
140+
auto status = fb.getStatusContainer().getStatus("ComponentStatus");
141+
const auto statusStr = status.getValue();
142+
if (statusStr != "Ok")
143+
return -1;
144+
std::cout << "Press \"enter\" to exit the application..." << std::endl;
145+
std::cin.get();
146+
return 0;
147+
}

0 commit comments

Comments
 (0)