Skip to content

Commit cbc7079

Browse files
authored
Merge pull request #8 from openDAQ/json-subscriber-reworking
Json subscriber reworking - Rework of JSON subscription flow: JSON receiver FB no longer parses all signals itself — it now creates nested JSON decoder function blocks (jsonDecoderMqttFb) that parse individual values/timestamps and produce signals. - New JSON decoder FB: added mqtt_json_decoder_fb_impl.{h,cpp} to extract a single value + optional timestamp from incoming JSON and emit data/domain samples. - JSON config handling changed: SignalList (big JSON string) → JsonConfig (string) and JsonConfigFile (path). JSON FB reads config (or file), extracts topic and per-signal descriptors and creates nested decoders. New "Topic" property. - Stronger topic validation and subscription QoS: added PROPERTY_NAME_SUB_QOS / DEFAULT_SUB_QOS; subscription/unsubscribe now use the configured QoS. - Status system refactor: introduced a generic StatusHelper to manage enumerated statuses (component / subscription / parsing / publishing / setting), replacing many manual EnumerationWithIntValue usages. Publisher FB improvements: cleaner status reporting (SignalStatus / PublishingStatus / SettingStatus), validation of properties (QoS, topic when sharedTs or multi), rename of publishing modes to descriptive enum names, and more robust publishing status tracking. - Raw FB changes: Topic property replaced SignalList. - MqttDataWrapper changes: JSON parsing & descriptor extraction reworked (now returns per-signal MqttMsgDescriptor with unit info), value/timestamp extraction refactored; API signatures changed. - Tests and examples: many unit tests added / updated (including new test for json decoder), new example JSON files for tests, updates to examples (custom-mqtt-sub, raw-mqtt-sub, ref-dev-mqtt-pub) to use the new FB/type/property names and behavior. - Renames & cleanups: several handler/source/header files renamed to clearer names (single_handler → atomic_signal_atomic_sample_handler, multisingle → atomic_signal_sample_arr_handler, multiple_shared → group_signal_shared_ts_handler, multiple → signal_arr_atomic_sample_handler), constants and FB type IDs removed leading '@' (e.g. @rootMqttFb → rootMqttFb), and general code cleanup (status helper, removal of old helper functions).
2 parents 8d6a0ee + ba87f4a commit cbc7079

47 files changed

Lines changed: 2722 additions & 1553 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

README.md

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,17 @@
22

33
## Description
44

5-
MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ). The module is designed for software communication via the *MQTT 3.1.1* protocol using an external broker. It allows publishing and subscribing to openDAQ signal data over MQTT. The module consists of four key openDAQ components: the *MQTT device* and its nested function blocks — the *publisher* (**@publisherMqttFb**), the *raw subscriber* (**@rawMqttFb**), and the *JSON subscriber* (**@jsonMqttFb**).
5+
MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ). The module is designed for software communication via the *MQTT 3.1.1* protocol using an external broker. It allows publishing and subscribing to openDAQ signal data over MQTT. The module consists of five key openDAQ components: the *MQTT root function block* (**rootMqttFb**) and its nested function blocks — the *publisher* (**publisherMqttFb**) with its nested block *JSON decoder* (**jsonDecoderMqttFb**) , the *raw subscriber* (**rawMqttFb**), and the *JSON subscriber* (**jsonMqttFb**).
66

77
### Functional
88
- Connecting to an MQTT broker;
99
- Publishing openDAQ signals as MQTT messages (*publisher FB*);
10-
- Subscribing to MQTT topics and converting incoming messages into openDAQ signals (*raw FB and JSON FB*);
10+
- Subscribing to MQTT topics and converting incoming messages into openDAQ signals (*raw FB and JSON FB + JSON decoder FB*);
1111
- Support for multiple message types and formats for both publishing and subscribing;
1212
- A set of examples and *gtests* for verifying functionality.
1313

1414
### Key components
15-
1) **MQTT root Function Block (@rootMqttFb)**:
15+
1) **MQTT root Function Block (rootMqttFb)**:
1616
- **Where**: *mqtt_streaming_module/src/mqtt_root_fb_impl.cpp, include/mqtt_streaming_module/...*
1717
- **Purpose**: Represents the MQTT broker as an openDAQ function block - the connection point through which function blocks are created.
1818
- **Main properties:**
@@ -21,7 +21,7 @@ MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ). The modul
2121
- *MqttUsername* (string) — Username for MQTT broker authentication. By default, it is empty.
2222
- *MqttPassword* (string) — Password for MQTT broker authentication. By default, it is empty.
2323
- *ConnectTimeout* (integer) — Timeout in milliseconds for the initial connection to the MQTT broker. If the connection fails, an exception is thrown. By default, it is set to *3000 ms*.
24-
2) **Publisher MQTT Function Block (@publisherMqttFb)**:
24+
2) **Publisher MQTT Function Block (publisherMqttFb)**:
2525
- **Where**: *include/mqtt_streaming_module/mqtt_publisher_fb_impl.h, src/mqtt_publisher_fb_impl.cpp*
2626
- **Purpose**: Publishes openDAQ signal data to MQTT topics. There are **four** general data publishing schemes:
2727
1) One MQTT message per signal / one message per sample / one topic per signal / one timestamp for each sample. Example: *{"AI0": 1.1, "timestamp": 1763716736100000}*
@@ -37,6 +37,7 @@ MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ). The modul
3737
- **Main properties**:
3838
- *TopicMode* (list) — Selects whether to publish all signals to separate MQTT topics (one per signal, *single-topic mode*) or to a single topic (*multiple-topic mode*), one for all signals. Choose *0* for *single-topic* mode and *1* for *multiple-topic* mode. By default, it is set to *single-topic* mode.
3939
- *MqttQoS* (integer) — MQTT Quality of Service level. It can be *0* (at most once), *1* (at least once), or *2* (exactly once). By default, it is set to *1*.
40+
- *Topic* (string) — Topic name for publishing in multiple-topic mode. If left empty, the Publisher's *Global ID* is used as the topic name.
4041
- *SharedTimestamp* (bool) — Enables the use of a shared timestamp for all signals when publishing in *multiple-topic* mode. By default, it is set to *false*.
4142
- *GroupValues* (bool) — Enables the use of a sample pack for a signal when publishing in *single-topic* mode. By default, it is set to *false*.
4243
- *UseSignalNames* (bool) — Uses signal names as JSON field names instead of Global IDs. By default, it is set to *false*.
@@ -50,18 +51,22 @@ MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ). The modul
5051
4) *TopicMode(1), SharedTimestamp(true), GroupValues(false)*;
5152

5253

53-
3) **Raw MQTT Function Block (@rawMqttFb)**:
54+
3) **Raw MQTT Function Block (rawMqttFb)**:
5455

5556
- **Where**: *include/mqtt_streaming_module/mqtt_raw_receiver_fb_impl.h, src/mqtt_raw_receiver_fb_impl.cpp*
5657
- **Purpose**: Subscribes to raw MQTT messages and converts them into openDAQ signals (binary data) without any parsing — suitable for binary/unstructured messages or simple numeric values.
5758
- **Main properties**:
58-
- *SignalList* (list of strings) — List of MQTT topics to subscribe to for receiving raw binary data.
59+
- *Topic* (string) — MQTT topic to subscribe to for receiving raw binary data.
60+
- *MqttQoS* (integer) — MQTT Quality of Service level. It can be *0* (at most once), *1* (at least once), or *2* (exactly once). By default, it is set to *1*.
5961

60-
4) **JSON MQTT Function Block (@jsonMqttFb)**:
62+
4) **JSON MQTT Function Block (jsonMqttFb)**:
6163
- **Where**: *include/mqtt_streaming_module/mqtt_json_receiver_fb_impl.h, src/mqtt_json_receiver_fb_impl.cpp*
62-
- **Purpose**: Subscribes to MQTT topics, extracts values and timestamps from MQTT JSON messages, and converts them into openDAQ signal data samples.
64+
- **Purpose**: Subscribes to MQTT topics, extracts values and timestamps from MQTT JSON messages via nested *JSON decoder MQTT Function Blocks*.
6365
- **Main properties**:
64-
- *SignalList* (string) — **JSON configuration string** that defines the list of MQTT topics and the corresponding signals to subscribe to. A typical string structure:
66+
- *Topic* (string) — MQTT topic to subscribe to for receiving JSON data.
67+
- *MqttQoS* (integer) — MQTT Quality of Service level. It can be *0* (at most once), *1* (at least once), or *2* (exactly once). By default, it is set to *1*.
68+
- *JsonConfigFile* (string) — path to file with **JSON configuration string**. See the *JsonConfig* property for more details. This property could be set only at creation. It is not visible.
69+
- *JsonConfig* (string) — **JSON configuration string** that defines the MQTT topic and the corresponding signals to subscribe to. This property could be set only at creation. It is not visible. A typical string structure:
6570
```json
6671
{
6772
"<topic>":[
@@ -71,8 +76,8 @@ MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ). The modul
7176
"Timestamp":"<field_name_in_JSON_MQTT_message_for_extracting_sample_timestamp>",
7277
"Unit":[
7378
"<unit_symbol>",
74-
"<unit_name>",
75-
"<unit_quantity>"
79+
"<unit_name>", // is not used
80+
"<unit_quantity>" // is not used
7681
]
7782
}
7883
},
@@ -112,11 +117,21 @@ MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ). The modul
112117
]
113118
}
114119
```
115-
In this example, the *JSON MQTT Function Block* creates 3 signals, subscribes to the *"/mirip/UNet3AC2/sensor/data"* topic, and extracts 3 signal samples from each message (one sample per signal). The signals are named *“temp”*, *“humidity”*, and *“tds”*. The *“temp”* signal is created with a domain signal because the *“Timestamp”* field is present. Each domain-signal sample is extracted from the *“ts”* field of the JSON MQTT message. The value of the *“ts”* field (the timestamp field) may be in **ISO8601** format or **Unix epoch time** in seconds, milliseconds, or microseconds. The value of the *“temp”* signal sample is extracted from the *“temp”* field of the JSON message. The unit of the values is “°C”.
120+
In this example, the *JSON MQTT Function Block* creates 3 nested *jsonDecoderMqttFb*, subscribes to the *"/mirip/UNet3AC2/sensor/data"* topic, and extracts 3 signal samples from each message (one sample per *jsonDecoderMqttFb*). The signals are named *“temp”*, *“humidity”*, and *“tds”*. The *“temp”* signal is created with a domain signal because the *“Timestamp”* field is present. Each domain-signal sample is extracted from the *“ts”* field of the JSON MQTT message. The value of the *“ts”* field (the timestamp field) may be in **ISO8601** format or **Unix epoch time** in seconds, milliseconds, or microseconds. The value of the *“temp”* signal sample is extracted from the *“temp”* field of the JSON message. The unit of the values is “°C”.
116121
Example of JSON MQTT message for this configuration:
117122
```json
118123
{"ts":"2025-10-08 20:35:43", "bdn":"SanbonFishTank3", "temp":27.20,"humi":72.40, "tds_value":275.22, "fan_status":"off", "auto_mode":"on", "fan_comp":"26.3", "humi_comp":"55"}
119124
```
125+
126+
5) **JSON decoder MQTT Function Block (jsonDecoderMqttFb)**:
127+
128+
- **Where**: *include/mqtt_streaming_module/mqtt_json_decoder_fb_impl.h, src/mqtt_json_decoder_fb_impl.cpp*
129+
- **Purpose**: To parse JSON string data to extract a value and a timestamp, and to send data and domain samples based on this data.
130+
- **Main properties**:
131+
- *ValueName* (string) — indicates which JSON field contains the sample value.
132+
- *TimestampName* (string) — indicates which JSON field contains the timestamp.
133+
- *Unit* (string) — describes the unit symbol of the decoded signal value.
134+
- *SignalName* (string) — specifies the name to assign to the signal created by a *jsonDecoderMqttFb*.
120135
---
121136

122137
## Building MQTTStreamingModule
@@ -164,13 +179,13 @@ cmake --build .
164179
## Examples
165180

166181
There are 3 example C++ application:
167-
- **custom-mqtt-sub** - demonstrates how to work with the *JSON MQTT FB*. The application creates an *MQTT root FB* and a *JSON MQTT FB* to receive JSON MQTT messages, parse them, and create openDAQ signals to send the parsed data. The application also creates *packet readers* for all FB signals and prints the samples to standard output. The *SignalList* property of the JSON MQTT FB is set to the value read from a file whose path is provided as a command-line argument when the application starts (see the **Key components** section). Usage:
182+
- **custom-mqtt-sub** - demonstrates how to work with the *JSON receiver MQTT FB* and *JSON decoder MQTT FB*. The application creates an *MQTT root FB* and a *JSON MQTT FB* to receive JSON MQTT messages, parse them, and create openDAQ signals to send the parsed data. The application also creates *packet readers* for all FB signals and prints the samples to standard output. The *JsonConfigFile* property of the JSON MQTT FB is set to the value of path whose is provided as a command-line argument when the application starts (see the **Key components** section). Usage:
168183
```bash
169184
./custom-mqtt-sub --address broker.emqx.io examples/custom-mqtt-sub/public-example0.json
170185
```
171-
- **raw-mqtt-sub** - demonstrates how to work with the *raw MQTT FB*. The application creates an *MQTT root FB* and a *raw MQTT FB* to receive MQTT messages and create openDAQ signals to send the data as binary packets. The application also creates packet readers for all FB signals and prints the binary packets as strings to standard output. The *SignalList* property of the raw MQTT FB is filled from the application arguments. Usage:
186+
- **raw-mqtt-sub** - demonstrates how to work with the *raw MQTT FB*. The application creates an *MQTT root FB* and a *raw MQTT FB* to receive MQTT messages and create openDAQ signals to send the data as binary packets. The application also creates packet readers for all FB signals and prints the binary packets as strings to standard output. The *Topic* property of the raw MQTT FB is filled from the application arguments. Usage:
172187
```bash
173-
./raw-mqtt-sub --address broker.emqx.io /agvstate /mirip/UNet3AC2/sensor/data
188+
./raw-mqtt-sub --address broker.emqx.io /mirip/UNet3AC2/sensor/data
174189
```
175190
- **ref-dev-mqtt-pub** - demonstrates how to work with the *publisher MQTT FB*. The application creates an *openDAQ ref-device* with four channels, an *MQTT root FB*, and a *publisher MQTT FB* to publish JSON MQTT messages with the channels’ data. The properties of the *publisher MQTT FB* are set according to the selected mode, which can be specified via the *--mode* option. Posible values are:
176191
- 0 - One MQTT message per signal / one message per sample / one topic per signal / one timestamp for each sample;

examples/custom-mqtt-sub/pub-config.json

Lines changed: 0 additions & 54 deletions
This file was deleted.

examples/custom-mqtt-sub/src/custom-mqtt-sub.cpp

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ std::string to_string(uint64_t ts)
2222
system_clock::time_point tp = system_clock::time_point(microseconds(ts));
2323

2424
auto tt = system_clock::to_time_t(tp);
25-
std::tm tm = *std::localtime(&tt);
25+
std::tm tm = *std::gmtime(&tt);
2626

2727
auto us = duration_cast<milliseconds>(tp.time_since_epoch()) % 1000;
2828

@@ -123,29 +123,37 @@ int main(int argc, char* argv[])
123123

124124
// Create OpenDAQ instance and add MQTT broker FB
125125
const InstancePtr instance = InstanceBuilder().addModulePath(MODULE_PATH).build();
126-
const std::string rootFbName = "@rootMqttFb";
126+
const std::string rootFbName = "RootMqttFb";
127127
auto rootFbConfig = instance.getAvailableFunctionBlockTypes().get(rootFbName).createDefaultConfig();
128128
rootFbConfig.setPropertyValue("MqttBrokerAddress", appConfig.brokerAddress);
129129
auto brokerFB = instance.addFunctionBlock(rootFbName, rootFbConfig);
130130
auto availableFbs = brokerFB.getAvailableFunctionBlockTypes();
131131

132-
const std::string fbName = "@jsonMqttFb";
133-
std::cout << "Try to add the " << fbName << std::endl;
132+
const std::string jsonFbName = "JsonSubscriberMqttFb";
133+
std::cout << "Try to add the " << jsonFbName << std::endl;
134134

135-
// Read JSON function block configuration from file and fill out the function block config
136-
const std::string jsonConfig = readFileToString(appConfig.configFilePath);
137-
auto config = availableFbs.get(fbName).createDefaultConfig();
138-
config.setPropertyValue("SignalList", jsonConfig);
135+
auto config = availableFbs.get(jsonFbName).createDefaultConfig();
136+
config.setPropertyValue("JsonConfigFile", appConfig.configFilePath);
139137

140138
// Add the JSON function block to the broker FB
141-
daq::FunctionBlockPtr jsonFb = brokerFB.addFunctionBlock(fbName, config);
139+
daq::FunctionBlockPtr jsonFb = brokerFB.addFunctionBlock(jsonFbName, config);
142140

143141
// Create packet readers for all signals
144-
const auto signals = jsonFb.getSignals();
145-
std::map<std::string, PacketReaderPtr> packetReaders;
142+
auto signals = List<daq::ISignal>();
143+
const auto fbs = jsonFb.getFunctionBlocks();
144+
for (const auto& fb : fbs)
145+
{
146+
const auto sig = fb.getSignals();
147+
for (const auto& s : sig)
148+
{
149+
signals.pushBack(s);
150+
}
151+
}
152+
153+
std::vector<std::pair<std::string, PacketReaderPtr>> packetReaders;
146154
for (const auto& s : signals)
147155
{
148-
packetReaders.emplace(std::pair<std::string, PacketReaderPtr>(s.getName().toStdString(), daq::PacketReader(s)));
156+
packetReaders.push_back(std::pair<std::string, PacketReaderPtr>(s.getName().toStdString(), daq::PacketReader(s)));
149157
}
150158

151159
// Start a thread to read packets from the readers

examples/raw-mqtt-sub/src/raw-mqtt-sub.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,13 @@ int main(int argc, char* argv[])
5555

5656
// Create OpenDAQ instance and add MQTT broker FB
5757
const InstancePtr instance = InstanceBuilder().addModulePath(MODULE_PATH).build();
58-
const std::string rootFbName = "@rootMqttFb";
58+
const std::string rootFbName = "RootMqttFb";
5959
auto rootFbConfig = instance.getAvailableFunctionBlockTypes().get(rootFbName).createDefaultConfig();
6060
rootFbConfig.setPropertyValue("MqttBrokerAddress", appConfig.brokerAddress);
6161
auto brokerFB = instance.addFunctionBlock(rootFbName, rootFbConfig);
6262
auto availableFbs = brokerFB.getAvailableFunctionBlockTypes();
6363

64-
const std::string fbName = "@rawMqttFb";
64+
const std::string fbName = "RawSubscriberMqttFb";
6565
std::cout << "Try to add the " << fbName << std::endl;
6666

6767
// Create RAW function block configuration

0 commit comments

Comments
 (0)