Skip to content

Commit 86915c6

Browse files
authored
Merge pull request #6 from openDAQ/TBBAS-2923-2924
- Reworking the MQTT daq::Device into a MQTT daq::FunctionBlock; - Implemented ConnectionStatus for MQTT root FB ("Connected", "Reconnecting", "Disconnected");
2 parents 76a3213 + 728357c commit 86915c6

25 files changed

Lines changed: 646 additions & 867 deletions

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ cmake_minimum_required(VERSION 3.25)
77
set(SDK_TARGET_NAMESPACE daq)
88
set(REPO_NAME mqtt_module)
99
set(REPO_OPTION_PREFIX MQTT_MODULE)
10+
set(CMAKE_FOLDER "${CMAKE_FOLDER}/${REPO_NAME}")
1011

1112
project (${REPO_NAME} CXX)
1213

README.md

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,17 @@ MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ). The modul
1212
- A set of examples and *gtests* for verifying functionality.
1313

1414
### Key components
15-
1) **MQTT device**:
16-
- **Where**: *mqtt_streaming_client_module/src/mqtt_streaming_device_impl.cpp, include/mqtt_streaming_client_module/...*
17-
- **Purpose**: Represents the MQTT broker as an openDAQ device - the connection point through which function blocks are created.
15+
1) **MQTT root Function Block (@rootMqttFb)**:
16+
- **Where**: *mqtt_streaming_module/src/mqtt_root_fb_impl.cpp, include/mqtt_streaming_module/...*
17+
- **Purpose**: Represents the MQTT broker as an openDAQ function block - the connection point through which function blocks are created.
1818
- **Main properties:**
1919
- *MqttBrokerAddress* (string) - MQTT broker address. It can be an IP address or a hostname. By default, it is set to *"127.0.0.1"*.
2020
- *MqttBrokerPort* (integer) - Port number for the MQTT broker connection. By default, it is set to *1883*.
2121
- *MqttUsername* (string) — Username for MQTT broker authentication. By default, it is empty.
2222
- *MqttPassword* (string) — Password for MQTT broker authentication. By default, it is empty.
2323
- *ConnectTimeout* (integer) — Timeout in milliseconds for the initial connection to the MQTT broker. If the connection fails, an exception is thrown. By default, it is set to *3000 ms*.
2424
2) **Publisher MQTT Function Block (@publisherMqttFb)**:
25-
- **Where**: *include/mqtt_streaming_client_module/mqtt_publisher_fb_impl.h, src/mqtt_publisher_fb_impl.cpp*
25+
- **Where**: *include/mqtt_streaming_module/mqtt_publisher_fb_impl.h, src/mqtt_publisher_fb_impl.cpp*
2626
- **Purpose**: Publishes openDAQ signal data to MQTT topics. There are **four** general data publishing schemes:
2727
1) One MQTT message per signal / one message per sample / one topic per signal / one timestamp for each sample. Example: *{"AI0": 1.1, "timestamp": 1763716736100000}*
2828

@@ -52,13 +52,13 @@ MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ). The modul
5252

5353
3) **Raw MQTT Function Block (@rawMqttFb)**:
5454

55-
- **Where**: *include/mqtt_streaming_client_module/mqtt_raw_receiver_fb_impl.h, src/mqtt_raw_receiver_fb_impl.cpp*
55+
- **Where**: *include/mqtt_streaming_module/mqtt_raw_receiver_fb_impl.h, src/mqtt_raw_receiver_fb_impl.cpp*
5656
- **Purpose**: Subscribes to raw MQTT messages and converts them into openDAQ signals (binary data) without any parsing — suitable for binary/unstructured messages or simple numeric values.
5757
- **Main properties**:
5858
- *SignalList* (list of strings) — List of MQTT topics to subscribe to for receiving raw binary data.
5959

6060
4) **JSON MQTT Function Block (@jsonMqttFb)**:
61-
- **Where**: *include/mqtt_streaming_client_module/mqtt_json_receiver_fb_impl.h, src/mqtt_json_receiver_fb_impl.cpp*
61+
- **Where**: *include/mqtt_streaming_module/mqtt_json_receiver_fb_impl.h, src/mqtt_json_receiver_fb_impl.cpp*
6262
- **Purpose**: Subscribes to MQTT topics, extracts values and timestamps from MQTT JSON messages, and converts them into openDAQ signal data samples.
6363
- **Main properties**:
6464
- *SignalList* (string) — **JSON configuration string** that defines the list of MQTT topics and the corresponding signals to subscribe to. A typical string structure:
@@ -164,15 +164,15 @@ cmake --build .
164164
## Examples
165165

166166
There are 3 example C++ application:
167-
- **custom-mqtt-sub** - demonstrates how to work with the *JSON MQTT FB*. The application creates an *MQTT device* and a *JSON MQTT FB* to receive JSON MQTT messages, parse them, and create openDAQ signals to send the parsed data. The application also creates *packet readers* for all FB signals and prints the samples to standard output. The *SignalList* property of the JSON MQTT FB is set to the value read from a file whose path is provided as a command-line argument when the application starts (see the **Key components** section). Usage:
167+
- **custom-mqtt-sub** - demonstrates how to work with the *JSON MQTT FB*. The application creates an *MQTT root FB* and a *JSON MQTT FB* to receive JSON MQTT messages, parse them, and create openDAQ signals to send the parsed data. The application also creates *packet readers* for all FB signals and prints the samples to standard output. The *SignalList* property of the JSON MQTT FB is set to the value read from a file whose path is provided as a command-line argument when the application starts (see the **Key components** section). Usage:
168168
```bash
169169
./custom-mqtt-sub --address broker.emqx.io examples/custom-mqtt-sub/public-example0.json
170170
```
171-
- **raw-mqtt-sub** - demonstrates how to work with the raw MQTT FB. The application creates an MQTT device and a raw MQTT FB to receive MQTT messages and create openDAQ signals to send the data as binary packets. The application also creates packet readers for all FB signals and prints the binary packets as strings to standard output. The SignalList property of the raw MQTT FB is filled from the application arguments. Usage:
171+
- **raw-mqtt-sub** - demonstrates how to work with the *raw MQTT FB*. The application creates an *MQTT root FB* and a *raw MQTT FB* to receive MQTT messages and create openDAQ signals to send the data as binary packets. The application also creates packet readers for all FB signals and prints the binary packets as strings to standard output. The *SignalList* property of the raw MQTT FB is filled from the application arguments. Usage:
172172
```bash
173173
./raw-mqtt-sub --address broker.emqx.io /agvstate /mirip/UNet3AC2/sensor/data
174174
```
175-
- **ref-dev-mqtt-pub** - demonstrates how to work with the *publisher MQTT FB*. The application creates an *openDAQ ref-device* with four channels, an *MQTT device*, and a *publisher MQTT FB* to publish JSON MQTT messages with the channels’ data. The properties of the *publisher MQTT FB* are set according to the selected mode, which can be specified via the *--mode* option. Posible values are:
175+
- **ref-dev-mqtt-pub** - demonstrates how to work with the *publisher MQTT FB*. The application creates an *openDAQ ref-device* with four channels, an *MQTT root FB*, and a *publisher MQTT FB* to publish JSON MQTT messages with the channels’ data. The properties of the *publisher MQTT FB* are set according to the selected mode, which can be specified via the *--mode* option. Posible values are:
176176
- 0 - One MQTT message per signal / one message per sample / one topic per signal / one timestamp for each sample;
177177
- 1 - One MQTT message per signal / one message containing several samples / one topic per signal / one timestamp per sample (array of samples);
178178
- 2 - One MQTT message for several signals (from 1 to N) / one message per sample for each signal / one topic for all signals / separate timestamps for each signal;

examples/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
cmake_minimum_required(VERSION 3.16)
22

3+
set(CMAKE_FOLDER "${CMAKE_FOLDER}/examples")
4+
35
if (OPENDAQ_MQTT_ENABLE_EXAMPLE_APPS)
46
add_subdirectory(raw-mqtt-sub)
57
add_subdirectory(custom-mqtt-sub)

examples/custom-mqtt-sub/src/custom-mqtt-sub.cpp

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,21 +121,24 @@ int main(int argc, char* argv[])
121121
return appConfig.error;
122122
}
123123

124-
// Create OpenDAQ instance and add MQTT broker device
124+
// Create OpenDAQ instance and add MQTT broker FB
125125
const InstancePtr instance = InstanceBuilder().addModulePath(MODULE_PATH).build();
126-
auto brokerDevice = instance.addDevice("daq.mqtt://" + appConfig.brokerAddress);
127-
auto availableDeviceNodes = brokerDevice.getAvailableFunctionBlockTypes();
126+
const std::string rootFbName = "@rootMqttFb";
127+
auto rootFbConfig = instance.getAvailableFunctionBlockTypes().get(rootFbName).createDefaultConfig();
128+
rootFbConfig.setPropertyValue("MqttBrokerAddress", appConfig.brokerAddress);
129+
auto brokerFB = instance.addFunctionBlock(rootFbName, rootFbConfig);
130+
auto availableFbs = brokerFB.getAvailableFunctionBlockTypes();
128131

129132
const std::string fbName = "@jsonMqttFb";
130133
std::cout << "Try to add the " << fbName << std::endl;
131134

132135
// Read JSON function block configuration from file and fill out the function block config
133136
const std::string jsonConfig = readFileToString(appConfig.configFilePath);
134-
auto config = availableDeviceNodes.get(fbName).createDefaultConfig();
137+
auto config = availableFbs.get(fbName).createDefaultConfig();
135138
config.setPropertyValue("SignalList", jsonConfig);
136139

137-
// Add the JSON function block to the broker device
138-
daq::FunctionBlockPtr jsonFb = brokerDevice.addFunctionBlock(fbName, config);
140+
// Add the JSON function block to the broker FB
141+
daq::FunctionBlockPtr jsonFb = brokerFB.addFunctionBlock(fbName, config);
139142

140143
// Create packet readers for all signals
141144
const auto signals = jsonFb.getSignals();

examples/raw-mqtt-sub/src/raw-mqtt-sub.cpp

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,25 +53,28 @@ int main(int argc, char* argv[])
5353
return appConfig.error;
5454
}
5555

56-
// Create OpenDAQ instance and add MQTT broker device
56+
// Create OpenDAQ instance and add MQTT broker FB
5757
const InstancePtr instance = InstanceBuilder().addModulePath(MODULE_PATH).build();
58-
auto brokerDevice = instance.addDevice("daq.mqtt://" + appConfig.brokerAddress);
59-
auto availableDeviceNodes = brokerDevice.getAvailableFunctionBlockTypes();
58+
const std::string rootFbName = "@rootMqttFb";
59+
auto rootFbConfig = instance.getAvailableFunctionBlockTypes().get(rootFbName).createDefaultConfig();
60+
rootFbConfig.setPropertyValue("MqttBrokerAddress", appConfig.brokerAddress);
61+
auto brokerFB = instance.addFunctionBlock(rootFbName, rootFbConfig);
62+
auto availableFbs = brokerFB.getAvailableFunctionBlockTypes();
6063

6164
const std::string fbName = "@rawMqttFb";
6265
std::cout << "Try to add the " << fbName << std::endl;
6366

6467
// Create RAW function block configuration
65-
auto config = availableDeviceNodes.get(fbName).createDefaultConfig();
68+
auto config = availableFbs.get(fbName).createDefaultConfig();
6669
auto topicList = List<IString>();
6770
for (auto& topic : appConfig.topics)
6871
{
6972
addToList(topicList, std::move(topic));
7073
}
7174
config.setPropertyValue("SignalList", topicList);
7275

73-
// Add the RAW function block to the broker device
74-
daq::FunctionBlockPtr rawFb = brokerDevice.addFunctionBlock(fbName, config);
76+
// Add the RAW function block to the broker FB
77+
daq::FunctionBlockPtr rawFb = brokerFB.addFunctionBlock(fbName, config);
7578

7679
// Create packet readers for all signals
7780
const auto signals = rawFb.getSignals();

examples/ref-dev-mqtt-pub/src/ref-dev-mqtt-pub.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,15 @@ int main(int argc, char* argv[])
8989
channels[3].setPropertyValue("Frequency", 20);
9090

9191
// Create and configure MQTT server
92-
auto brokerDevice = instance.addDevice("daq.mqtt://" + appConfig.brokerAddress);
93-
auto availableDeviceNodes = brokerDevice.getAvailableFunctionBlockTypes();
92+
const std::string rootFbName = "@rootMqttFb";
93+
auto rootFbConfig = instance.getAvailableFunctionBlockTypes().get(rootFbName).createDefaultConfig();
94+
rootFbConfig.setPropertyValue("MqttBrokerAddress", appConfig.brokerAddress);
95+
auto brokerFB = instance.addFunctionBlock(rootFbName, rootFbConfig);
96+
auto availableFbs = brokerFB.getAvailableFunctionBlockTypes();
9497
const std::string fbName = "@publisherMqttFb";
9598
std::cout << "Try to add the " << fbName << std::endl;
9699

97-
auto config = availableDeviceNodes.get(fbName).createDefaultConfig();
100+
auto config = availableFbs.get(fbName).createDefaultConfig();
98101
config.setPropertyValue("MqttQoS", 1);
99102
config.setPropertyValue("ReaderPeriod", 20);
100103
config.setPropertyValue("UseSignalNames", True);
@@ -126,7 +129,7 @@ int main(int argc, char* argv[])
126129

127130

128131
// Add the publisher function block to the broker device
129-
daq::FunctionBlockPtr fb = brokerDevice.addFunctionBlock(fbName, config);
132+
daq::FunctionBlockPtr fb = brokerFB.addFunctionBlock(fbName, config);
130133
const auto signals = refDevice.getSignals(search::Recursive(search::Any()));
131134
for (const auto& s : signals)
132135
{

external/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
set(CMAKE_FOLDER external)
1+
set(CMAKE_FOLDER "${CMAKE_FOLDER}/external")
22
list(APPEND CMAKE_MESSAGE_CONTEXT external)
33

44
if (${CMAKE_SOURCE_DIR} STREQUAL ${CMAKE_BINARY_DIR})

external/mqtt/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ set(PAHO_BUILD_SHARED OFF CACHE BOOL "Build dynamic paho library" FORCE)
2424
set(CMAKE_POSITION_INDEPENDENT_CODE ${PAHO_BUILD_STATIC} CACHE BOOL "" FORCE)
2525
set(PAHO_ENABLE_TESTING ON CACHE BOOL "" FORCE)
2626

27-
# Now add the project
27+
set(CMAKE_FOLDER "${CMAKE_FOLDER}/mqtt")
28+
2829
add_subdirectory(${paho_mqtt_c_SOURCE_DIR} ${paho_mqtt_c_BINARY_DIR})
2930

3031
# Apply WIN32_LEAN_AND_MEAN to the Paho MQTT C library as well

mqtt_streaming_module/include/mqtt_streaming_module/constants.h

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,15 @@
44

55
BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE
66

7-
static const char* DaqMqttDeviceTypeId = "OpenDAQMQTTStreaming";
8-
static const char* DaqMqttProtocolId = "OpenDAQMQTTStreaming";
9-
static const char* DaqMqttDevicePrefix = "daq.mqtt";
10-
static const char* MqttScheme = "mqtt";
11-
12-
static const char* MODULE_NAME = "OpenDAQMQTTClientModule";
13-
static const char* MODULE_ID = "OpenDAQMQTTClientModule";
14-
static const char* SHORT_MODULE_NAME = "MQTTClient";
15-
static const char* PROTOCOL_NAME = "OpenDAQMQTT";
16-
static const char* CONNECTION_TYPE = "TCP/IP";
7+
static const char* MODULE_NAME = "OpenDaqMqttModule";
8+
static const char* MODULE_ID = "OpenDaqMqttModule";
9+
static const char* SHORT_MODULE_NAME = "MqttModule";
1710

1811
static constexpr const char* DEFAULT_BROKER_ADDRESS = "127.0.0.1";
1912
static constexpr uint16_t DEFAULT_PORT = 1883;
2013
static constexpr const char* DEFAULT_USERNAME = "";
2114
static constexpr const char* DEFAULT_PASSWORD = "";
2215
static constexpr uint32_t DEFAULT_INIT_TIMEOUT = 3000; // ms
23-
static constexpr uint32_t DEFAULT_DISCOVERY_TIMEOUT = 1000; // ms
2416

2517
static constexpr uint32_t DEFAULT_PUB_READ_PERIOD = 20; // ms
2618
static constexpr uint32_t DEFAULT_PUB_QOS = 1;
@@ -31,7 +23,6 @@ static constexpr const char* PROPERTY_NAME_MQTT_BROKER_PORT = "MqttBrokerPort";
3123
static constexpr const char* PROPERTY_NAME_MQTT_USERNAME = "MqttUsername";
3224
static constexpr const char* PROPERTY_NAME_MQTT_PASSWORD = "MqttPassword";
3325
static constexpr const char* PROPERTY_NAME_CONNECT_TIMEOUT = "ConnectTimeout";
34-
static constexpr const char* PROPERTY_NAME_DISCOVERY_TIMEOUT = "DiscoveryTimeout";
3526
static constexpr const char* PROPERTY_NAME_SIGNAL_LIST = "SignalList";
3627

3728
static constexpr const char* PROPERTY_NAME_PUB_TOPIC_MODE = "TopicMode";
@@ -42,16 +33,15 @@ static constexpr const char* PROPERTY_NAME_PUB_GROUP_VALUES_PACK_SIZE = "GroupVa
4233
static constexpr const char* PROPERTY_NAME_PUB_QOS = "MqttQoS";
4334
static constexpr const char* PROPERTY_NAME_PUB_READ_PERIOD = "ReaderPeriod";
4435

45-
46-
4736
static constexpr const char* RAW_FB_NAME = "@rawMqttFb";
4837
static constexpr const char* JSON_FB_NAME = "@jsonMqttFb";
4938
static constexpr const char* PUB_FB_NAME = "@publisherMqttFb";
39+
static constexpr const char* ROOT_FB_NAME = "@rootMqttFb";
5040

51-
static const char* TOPIC_ALL_SIGNALS = "openDAQ/+/$signals";
41+
static const char* MQTT_LOCAL_ROOT_FB_ID_PREFIX = "rootMqttFb";
42+
static const char* MQTT_LOCAL_PUB_FB_ID_PREFIX = "publisherMqttFb";
5243

53-
static const char* MQTT_LOCAL_DEVICE_ID_PREFIX = "MqttDevice";
54-
static const char* MQTT_DEVICE_NAME = "MqttStreamingClientPseudoDevice";
5544

56-
static const char* MQTT_LOCAL_PUB_FB_ID_PREFIX = "publisherMqttFb";
45+
static const char* MQTT_ROOT_FB_CON_STATUS_TYPE = "BrokerConnectionStatusType";
46+
5747
END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE

mqtt_streaming_module/include/mqtt_streaming_module/mqtt_streaming_device_impl.h renamed to mqtt_streaming_module/include/mqtt_streaming_module/mqtt_root_fb_impl.h

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,49 +19,54 @@
1919
#include "MqttSettings.h"
2020
#include <future>
2121
#include <mqtt_streaming_module/common.h>
22-
#include <opendaq/device_impl.h>
22+
#include <opendaq/function_block_impl.h>
2323
#include <opendaq/streaming_ptr.h>
2424
#include "MqttDataWrapper.h"
2525

2626

2727
BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE
2828

29-
class MqttStreamingDeviceImpl : public Device
29+
class MqttRootFbImpl : public FunctionBlock
3030
{
31+
enum class ConnectionStatus : EnumType
32+
{
33+
Connected = 0,
34+
Reconnecting,
35+
Disconnected
36+
};
37+
3138
public:
32-
explicit MqttStreamingDeviceImpl(const ContextPtr& ctx,
39+
explicit MqttRootFbImpl(const ContextPtr& ctx,
3340
const ComponentPtr& parent,
3441
const PropertyObjectPtr& config);
3542

43+
static FunctionBlockTypePtr CreateType();
44+
3645
protected:
3746
static std::atomic<int> localIndex;
3847
static std::string getLocalId();
48+
static std::vector<std::pair<MqttRootFbImpl::ConnectionStatus, std::string>> connectionStatusMap;
3949

4050
void removed() override;
41-
DeviceInfoPtr onGetInfo() override;
4251

43-
bool allowAddFunctionBlocksFromModules() override
44-
{
45-
return true;
46-
};
4752
DictPtr<IString, IFunctionBlockType> onGetAvailableFunctionBlockTypes() override;
4853
FunctionBlockPtr onAddFunctionBlock(const StringPtr& typeId, const PropertyObjectPtr& config) override;
4954

5055
void initBaseFunctionalBlocks();
5156
void initMqttSubscriber();
52-
void buildFunctionBlockTypes();
57+
void initConnectionStatus();
58+
void initProperties(const PropertyObjectPtr& config);
59+
void readProperties();
5360
bool waitForConnection(const int timeoutMs);
54-
void receiveSignalTopics(const int timeoutMs);
55-
void onSignalsMessage(const mqtt::MqttAsyncClient& subscriber, mqtt::MqttMessage& msg);
61+
void setConnectionStatus(const ConnectionStatus status, std::string message = "");
5662

57-
DictObjectPtr<IDict, IString, IFunctionBlockType> fbTypes;
5863
DictObjectPtr<IDict, IString, IFunctionBlockType> baseFbTypes;
5964

60-
StringPtr connectionString;
6165
EnumerationPtr connectionStatus;
6266

6367
std::shared_ptr<mqtt::MqttAsyncClient> subscriber;
6468
Mqtt::Utils::Settings::MqttConnectionSettings connectionSettings;
69+
int connectTimeout;
6570

6671
std::promise<bool> connectedPromise;
6772
std::future<bool> connectedFuture;

0 commit comments

Comments
 (0)