Skip to content

Commit 65cc0d1

Browse files
authored
Merge pull request #2 from openDAQ/client
Brief: MQTT openDAQ streaming client module; Example applications; Reworking MQTT client wrapper (PAHO wrapper); Improving MQTT openDAQ streaming server module; Tests for client and server MQTT modules; Tests for MQTT client wrapper; Description: Implementation of MQTT openDAQ streaming client module allows to receive and parse MQTT messages which adhere to a strict message structure. It allows discovering of MQTT openDAQ devices and communication with them. The example applications demonstrate a typical approach to working with openDAQ MQTT modules. MQTT client wrapper reworking changed the architecture of wrapper classes, removed unused functional and duplication of code. The previous approach with separate publisher and subscriber class was redesigned into single client class. New tests for client and server MQTT modules. New tests for MQTT client wrapper allow to test communication with a MQTT brocker and messaging between clients.
2 parents 900722c + dd0e070 commit 65cc0d1

55 files changed

Lines changed: 3107 additions & 1438 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

CMakeLists.txt

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# CMakeList.txt : CMake project for OpenDAQHistorianFB, include source and define
1+
# CMakeList.txt : CMake project for MQTTStreamingModule, include source and define
22
# project specific logic here.
33
#
44
set(CMAKE_POLICY_VERSION_MINIMUM 3.5)
@@ -32,13 +32,21 @@ list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
3232

3333
option(OPENDAQ_DEVICE_EXAMPLE_ENABLE_EXAMPLE_APPS "Enable building example applications" OFF)
3434

35+
if ((CMAKE_COMPILER_IS_GNUCXX OR CMAKE_COMPILER_IS_CLANGXX) AND NOT MSVC)
36+
if (NOT WIN32)
37+
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
38+
endif()
39+
endif()
40+
3541
include(CommonUtils)
3642
setup_repo(${REPO_OPTION_PREFIX})
3743

3844
if(OPENDAQ_DEVICE_EXAMPLE_ENABLE_EXAMPLE_APPS)
3945
set(DAQMODULES_REF_DEVICE_MODULE ON CACHE BOOL "" FORCE)
4046
endif()
4147

48+
option(OPENDAQ_MQTT_ENABLE_TESTS "Enable module testing" OFF)
49+
option(OPENDAQ_MQTT_ENABLE_EXAMPLE_APPS "Enable example applications building" OFF)
4250

4351
find_package(OpenSSL REQUIRED)
4452
if (OPENSSL_FOUND)
@@ -50,30 +58,9 @@ endif()
5058
add_subdirectory(external)
5159
add_subdirectory(mqtt_streaming_protocol)
5260
add_subdirectory(mqtt_streaming_server_module)
61+
add_subdirectory(mqtt_streaming_client_module)
5362

5463
if(OPENDAQ_DEVICE_EXAMPLE_ENABLE_EXAMPLE_APPS)
5564
message(STATUS "Example applications have been enabled")
5665
add_subdirectory(examples)
5766
endif()
58-
59-
60-
61-
# Set CPack variables
62-
set(CPACK_COMPONENTS_ALL RUNTIME)
63-
set(CPACK_PROJECT_NAME ${PROJECT_NAME})
64-
set(CPACK_PACKAGE_NAME ${PROJECT_NAME})
65-
set(CPACK_PACKAGE_VERSION ${PROJECT_VERSION})
66-
set(CPACK_OUTPUT_FILE_PREFIX "${CMAKE_BINARY_DIR}/package")
67-
68-
# Set the CPack generator based on the platform
69-
if (WIN32)
70-
set(CPACK_GENERATOR "ZIP")
71-
elseif (UNIX AND NOT APPLE)
72-
cmake_host_system_information(RESULT DISTRO_ID QUERY DISTRIB_ID)
73-
cmake_host_system_information(RESULT DISTRO_VERSION_ID QUERY DISTRIB_VERSION_ID)
74-
set(CPACK_SYSTEM_NAME "${DISTRO_ID}${DISTRO_VERSION_ID}")
75-
set(CPACK_GENERATOR "TGZ")
76-
endif()
77-
78-
# Include CPack for packaging
79-
include(CPack)

CMakePresets.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,9 @@
9595
"name": "full",
9696
"hidden": true,
9797
"cacheVariables": {
98-
"OPENDAQ_DEVICE_EXAMPLE_ENABLE_EXAMPLE_APPS": "true"
98+
"OPENDAQ_DEVICE_EXAMPLE_ENABLE_EXAMPLE_APPS": "true",
99+
"OPENDAQ_MQTT_ENABLE_TESTS": "true",
100+
"OPENDAQ_MQTT_ENABLE_EXAMPLE_APPS": "true"
99101
}
100102
},
101103
{

README.md

Lines changed: 4 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -49,102 +49,16 @@ cmake --build .
4949

5050
## Testing
5151

52-
There are several example applications in the *"examples"* folder. These examples are based on OpenDAQ SDK and allow testing of *SimpleMQTTModule* functional blocks with each other and with third-party MQTT tools.
52+
There are several example applications in the *"examples"* folder. These examples are based on OpenDAQ SDK and allow testing of *MQTTStreamingModule* client/server sides with each other and with third-party MQTT tools.
5353

5454
> ***Note:*** *Using the applications involves using a third-party broker. It must be started before example applications. See a **External MQTT tools** section for more details*
5555
5656
> ***Note:*** *The applications depend on **MQTTStreamingModule** and [**RefDeviceModule**](https://github.com/openDAQ/openDAQ/tree/main/examples/modules/ref_device_module).*
5757
5858
#### ref-dev-mqtt-pub
5959

60-
The *ref-dev-mqtt-pub* application is a console example which publishes *ref-device Signal* samples via the *MQTTStreamingModule* server. By default it uses the following *MQTTStreamingModule* server settings:
61-
```
62-
StreamingDataPollingPeriod: 20
63-
MaxPacketReadCount: 1000
64-
BrokerAddress: "127.0.0.1"
65-
MqttUsername: ""
66-
MqttPassword: ""
67-
```
68-
69-
### External MQTT tools
70-
71-
It is suggested to use [***Eclipse Mosquitto***](https://github.com/eclipse-mosquitto/mosquitto) as a third-party MQTT tool set. It includes MQTT broker and MQTT publisher/subscriber clients.
72-
Utilities could be installed on **Ubuntu**:
73-
74-
```shell
75-
sudo apt install mosquitto mosquitto-clients
76-
```
60+
The *ref-dev-mqtt-pub* application is a console example which publishes *ref-device Signal* samples via the *MQTTStreamingModule* server.
7761

78-
The MQTT broker will be run automatically after installing. For simple testing run a subscriber with the following options:
62+
#### ref-dev-mqtt-sub
7963

80-
```shell
81-
mosquitto_sub -h 127.0.0.1 -t "openDAQ/#" -v
82-
```
83-
The subscriber will wait for incoming data and then print it. Then run a publisher with the following options:
84-
```shell
85-
mosquitto_pub -h 127.0.0.1 -t "openDAQ/publisher" -m '{"Input0":2, "Input1":1.2, "Input3":3.3}'
86-
```
87-
This command publishes a message and exits. From the subscriber's side you can see:
88-
89-
```shell
90-
user@machine:$ mosquitto_sub -h 127.0.0.1 -t "openDAQ/publisher" -v
91-
openDAQ/publisher {"Input0":2, "Input1":1.2, "Input3":3.3}
92-
```
93-
Now, you can test examples with 3rd-party tools. For example, run *ref-dev-mqtt-pub* and *mosquitto_sub* in different terminals with proper settings.
94-
```shell
95-
user@machine:$ ./ref-dev-mqtt-pub
96-
[tid: 29784][2025-09-15 11:17:02.663] [ModuleManager] [info] Loaded module [v3.31.0 ReferenceDeviceModule] from "libref_device_module-64-3-debug.module.so".
97-
[tid: 29784][2025-09-15 11:17:02.664] [ModuleManager] [info] DEV [daqref] Reference device: "Reference device"
98-
[tid: 29784][2025-09-15 11:17:02.668] [ModuleManager] [info] Loaded module [v3.4.0 OpenDAQMqttStreamingServerModule] from "libmqtt_stream_srv_module-64-3-debug.module.so".
99-
[tid: 29784][2025-09-15 11:17:02.670] [ModuleManager] [info] SRV [OpenDAQMQTT] openDAQ MQTT Streaming server: "Streams data over MQTT"
100-
[tid: 29784][2025-09-15 11:17:03.196] [ReferenceDevice] [info] Properties: NumberOfChannels 2
101-
[tid: 29784][2025-09-15 11:17:03.201] [/RefDev1/IO/AI/RefCh0] [info] Properties: Waveform Sine, Frequency 10, DC 0, Amplitude 5, NoiseAmplitude 0, ConstantValue 2
102-
[tid: 29784][2025-09-15 11:17:03.201] [/RefDev1/IO/AI/RefCh0] [info] Properties: SampleRate 1000, ClientSideScaling false
103-
[tid: 29784][2025-09-15 11:17:03.207] [/RefDev1/IO/AI/RefCh1] [info] Properties: Waveform Sine, Frequency 10, DC 0, Amplitude 5, NoiseAmplitude 0, ConstantValue 2
104-
[tid: 29784][2025-09-15 11:17:03.207] [/RefDev1/IO/AI/RefCh1] [info] Properties: SampleRate 1000, ClientSideScaling false
105-
[tid: 29784][2025-09-15 11:17:03.209] [ReferenceDevice] [info] Properties: AcquisitionLoopTime 20
106-
[tid: 29784][2025-09-15 11:17:03.216] [Instance] [info] Root device set to daqref://device1
107-
[tid: 29784][2025-09-15 11:17:03.231] [ReferenceDevice] [info] Properties: NumberOfChannels 2
108-
[tid: 29784][2025-09-15 11:17:03.235] [/RefDev1/Dev/RefDev0/IO/AI/RefCh0] [info] Properties: Waveform Sine, Frequency 10, DC 0, Amplitude 5, NoiseAmplitude 0, ConstantValue 2
109-
[tid: 29784][2025-09-15 11:17:03.235] [/RefDev1/Dev/RefDev0/IO/AI/RefCh0] [info] Properties: SampleRate 1000, ClientSideScaling false
110-
[tid: 29784][2025-09-15 11:17:03.240] [/RefDev1/Dev/RefDev0/IO/AI/RefCh1] [info] Properties: Waveform Sine, Frequency 10, DC 0, Amplitude 5, NoiseAmplitude 0, ConstantValue 2
111-
[tid: 29784][2025-09-15 11:17:03.240] [/RefDev1/Dev/RefDev0/IO/AI/RefCh1] [info] Properties: SampleRate 1000, ClientSideScaling false
112-
[tid: 29784][2025-09-15 11:17:03.242] [ReferenceDevice] [info] Properties: AcquisitionLoopTime 20
113-
[tid: 29784][2025-09-15 11:17:03.253] [/RefDev1/Dev/RefDev0/IO/AI/ProtectedChannel] [info] Properties: Waveform Sine, Frequency 10, DC 0, Amplitude 5, NoiseAmplitude 0, ConstantValue 2
114-
[tid: 29784][2025-09-15 11:17:03.253] [/RefDev1/Dev/RefDev0/IO/AI/ProtectedChannel] [info] Properties: SampleRate 1000, ClientSideScaling false
115-
[tid: 29784][2025-09-15 11:17:03.265] [OpenDAQMQTT] [info] MQTT: Trying to connect to MQTT broker (127.0.0.1)
116-
[tid: 29784][2025-09-15 11:17:03.267] [OpenDAQMQTT] [info] Adding the Signal to reader: /RefDev1/IO/AI/RefCh0/Sig/AI0;
117-
[tid: 29784][2025-09-15 11:17:03.268] [OpenDAQMQTT] [info] Signal /RefDev1/IO/AI/RefCh0/Sig/AI0Time doesn't has domain signal assigned, skipping
118-
[tid: 29784][2025-09-15 11:17:03.268] [OpenDAQMQTT] [info] Adding the Signal to reader: /RefDev1/IO/AI/RefCh1/Sig/AI1;
119-
[tid: 29784][2025-09-15 11:17:03.269] [OpenDAQMQTT] [info] Signal /RefDev1/IO/AI/RefCh1/Sig/AI1Time doesn't has domain signal assigned, skipping
120-
[tid: 29784][2025-09-15 11:17:03.269] [OpenDAQMQTT] [info] Signal /RefDev1/Sig/Time doesn't has domain signal assigned, skipping
121-
[tid: 29784][2025-09-15 11:17:03.269] [OpenDAQMQTT] [info] Adding the Signal to reader: /RefDev1/Dev/RefDev0/IO/AI/RefCh0/Sig/AI0;
122-
[tid: 29784][2025-09-15 11:17:03.269] [OpenDAQMQTT] [info] Signal /RefDev1/Dev/RefDev0/IO/AI/RefCh0/Sig/AI0Time doesn't has domain signal assigned, skipping
123-
[tid: 29784][2025-09-15 11:17:03.269] [OpenDAQMQTT] [info] Adding the Signal to reader: /RefDev1/Dev/RefDev0/IO/AI/RefCh1/Sig/AI1;
124-
[tid: 29784][2025-09-15 11:17:03.270] [OpenDAQMQTT] [info] Signal /RefDev1/Dev/RefDev0/IO/AI/RefCh1/Sig/AI1Time doesn't has domain signal assigned, skipping
125-
[tid: 29784][2025-09-15 11:17:03.270] [OpenDAQMQTT] [info] Adding the Signal to reader: /RefDev1/Dev/RefDev0/IO/AI/ProtectedChannel/Sig/AI2;
126-
[tid: 29784][2025-09-15 11:17:03.270] [OpenDAQMQTT] [info] Signal /RefDev1/Dev/RefDev0/IO/AI/ProtectedChannel/Sig/AI2Time doesn't has domain signal assigned, skipping
127-
[tid: 29784][2025-09-15 11:17:03.270] [OpenDAQMQTT] [info] Signal /RefDev1/Dev/RefDev0/Sig/Time doesn't has domain signal assigned, skipping
128-
[tid: 29812][2025-09-15 11:17:03.271] [OpenDAQMQTT] [info] Streaming-to-device read thread started
129-
[tid: 29784][2025-09-15 11:17:03.271] [OpenDAQMQTT] [info] Added Component: /RefDev1/Srv/OpenDAQMQTT;
130-
Press "enter" to exit the application...
131-
[tid: 29811][2025-09-15 11:17:03.276] [OpenDAQMQTT] [info] MQTT: Connection established
132-
133-
```
134-
In this case you can see messages on the *mosquitto_sub* side:
135-
136-
```shell
137-
user@machine:$ mosquitto_sub -h 127.0.0.1 -t "openDAQ/#" -v
138-
openDAQ/RefDev1/$signals ["openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0","openDAQ/RefDev1/IO/AI/RefCh1/Sig/AI1","openDAQ/RefDev1/Dev/RefDev0/IO/AI/RefCh0/Sig/AI0","openDAQ/RefDev1/Dev/RefDev0/IO/AI/RefCh1/Sig/AI1","openDAQ/RefDev1/Dev/RefDev0/IO/AI/ProtectedChannel/Sig/AI2"]
139-
openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0 {"value":1.243449435824274,"timestamp":1757928009227270}
140-
openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0 {"value":0.9369065729286229,"timestamp":1757928009228270}
141-
openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0 {"value":0.6266661678215204,"timestamp":1757928009229270}
142-
openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0 {"value":0.3139525976465657,"timestamp":1757928009230270}
143-
openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0 {"value":-1.6081226496766366e-15,"timestamp":1757928009231270}
144-
openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0 {"value":-0.31395259764656677,"timestamp":1757928009232270}
145-
openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0 {"value":-0.6266661678215214,"timestamp":1757928009233270}
146-
openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0 {"value":-0.9369065729286239,"timestamp":1757928009234270}
147-
openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0 {"value":-1.2434494358242752,"timestamp":1757928009235270}
148-
openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0 {"value":-1.5450849718747386,"timestamp":1757928009236270}
149-
<...>
150-
```
64+
The *ref-dev-mqtt-sub* application is a console example which subscribes to an available MQTT openDAQ device and prints signal samples.

examples/CMakeLists.txt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
11
cmake_minimum_required(VERSION 3.16)
22

3-
add_subdirectory(ref-dev-mqtt-pub)
3+
if (OPENDAQ_MQTT_ENABLE_EXAMPLE_APPS)
4+
add_subdirectory(ref-dev-mqtt-pub)
5+
add_subdirectory(ref-dev-mqtt-sub)
6+
endif()

examples/InputArgs.h

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
#pragma once
2+
#include <string>
3+
#include <vector>
4+
#include <unordered_map>
5+
#include <algorithm>
6+
#include <iostream>
7+
8+
class InputArgs
9+
{
10+
public:
11+
void addArg(const std::string& name, const std::string& description, bool hasValue = false)
12+
{
13+
argDescriptions[name] = {description, hasValue};
14+
}
15+
16+
void parse(int argc, char* argv[])
17+
{
18+
parsedArgs.clear();
19+
argValues.clear();
20+
positionalArgs.clear();
21+
22+
for (int i = 1; i < argc; ++i)
23+
{
24+
std::string arg = argv[i];
25+
if (arg.rfind("--", 0) == 0)
26+
{
27+
auto eqPos = arg.find('=');
28+
if (eqPos != std::string::npos)
29+
{
30+
std::string key = arg.substr(0, eqPos);
31+
std::string value = arg.substr(eqPos + 1);
32+
argValues[key] = value;
33+
parsedArgs.push_back(key);
34+
}
35+
else if (i + 1 < argc && argDescriptions[arg].hasValue)
36+
{
37+
argValues[arg] = argv[i + 1];
38+
parsedArgs.push_back(arg);
39+
++i;
40+
}
41+
else
42+
{
43+
parsedArgs.push_back(arg);
44+
}
45+
}
46+
else
47+
{
48+
positionalArgs.push_back(arg);
49+
}
50+
}
51+
}
52+
53+
bool hasArg(const std::string& name) const
54+
{
55+
return std::find(parsedArgs.begin(), parsedArgs.end(), name) != parsedArgs.end();
56+
}
57+
58+
std::string getArgValue(const std::string& name, const std::string& defaultValue = "") const
59+
{
60+
auto it = argValues.find(name);
61+
if (it != argValues.end())
62+
return it->second;
63+
return defaultValue;
64+
}
65+
66+
const std::vector<std::string>& getPositionalArgs() const
67+
{
68+
return positionalArgs;
69+
}
70+
71+
bool hasUnknownArgs() const
72+
{
73+
for (const auto& arg : parsedArgs)
74+
{
75+
if (argDescriptions.find(arg) == argDescriptions.end())
76+
return true;
77+
}
78+
return false;
79+
}
80+
81+
void printHelp() const
82+
{
83+
std::cout << "Available arguments:" << std::endl;
84+
for (const auto& [name, descStruct] : argDescriptions)
85+
{
86+
std::cout << " " << name;
87+
if (descStruct.hasValue)
88+
std::cout << " <value>";
89+
std::cout << " : " << descStruct.description << std::endl;
90+
}
91+
}
92+
93+
private:
94+
struct ArgDesc
95+
{
96+
std::string description;
97+
bool hasValue;
98+
};
99+
std::unordered_map<std::string, ArgDesc> argDescriptions;
100+
std::vector<std::string> parsedArgs;
101+
std::unordered_map<std::string, std::string> argValues;
102+
std::vector<std::string> positionalArgs;
103+
};

examples/ref-dev-mqtt-pub/src/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ add_compile_definitions(MODULE_PATH="${OPENDAQ_MODULES_DIR}")
44

55
add_executable(${EXAMPLE_PROJECT_NAME} ref-dev-mqtt-pub.cpp)
66
add_dependencies(${EXAMPLE_PROJECT_NAME} daq::ref_device_module)
7-
target_link_libraries(${EXAMPLE_PROJECT_NAME} PRIVATE daq::opendaq)
7+
target_link_libraries(${EXAMPLE_PROJECT_NAME} PRIVATE daq::opendaq)
8+
target_include_directories(${EXAMPLE_PROJECT_NAME} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../../)

0 commit comments

Comments
 (0)