Skip to content

Commit 249eb50

Browse files
authored
Merge pull request #3 from openDAQ/newApproach
New approach for MQTT module. Brief: -New MQTT functional block - @rawMqttFb; -New MQTT functional block - @jsonMqttFb; -New "SignalList" property approach; -New "signals" topic approach; -New example applications: custom-mqtt-sub, ref-dev-mqtt-raw-sub; -New device property - DiscoveryTimeout; -Using of MQTT brocker port; -Unsubscribing when a FB is removed or deleted; -Supporting for different value types: int64_t, double, string; -New logs; -Improved stability; -Tests;
2 parents 65cc0d1 + 0d94e9e commit 249eb50

50 files changed

Lines changed: 4084 additions & 763 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

CMakeLists.txt

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
set(CMAKE_POLICY_VERSION_MINIMUM 3.5)
55
cmake_minimum_required(VERSION 3.25)
66

7+
set(SDK_TARGET_NAMESPACE daq)
78
set(REPO_NAME mqtt_module)
89
set(REPO_OPTION_PREFIX MQTT_MODULE)
910

@@ -30,6 +31,7 @@ set(CMAKE_MESSAGE_CONTEXT_SHOW ON CACHE BOOL "Show CMake message context")
3031

3132
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
3233

34+
option(OPENDAQ_MQTT_MODULE_ENABLE_SSL "Enable building with openSSL" OFF)
3335
option(OPENDAQ_DEVICE_EXAMPLE_ENABLE_EXAMPLE_APPS "Enable building example applications" OFF)
3436

3537
if ((CMAKE_COMPILER_IS_GNUCXX OR CMAKE_COMPILER_IS_CLANGXX) AND NOT MSVC)
@@ -42,17 +44,20 @@ include(CommonUtils)
4244
setup_repo(${REPO_OPTION_PREFIX})
4345

4446
if(OPENDAQ_DEVICE_EXAMPLE_ENABLE_EXAMPLE_APPS)
45-
set(DAQMODULES_REF_DEVICE_MODULE ON CACHE BOOL "" FORCE)
47+
set(DAQMODULES_REF_DEVICE_MODULE ON CACHE BOOL "" FORCE)
4648
endif()
4749

4850
option(OPENDAQ_MQTT_ENABLE_TESTS "Enable module testing" OFF)
4951
option(OPENDAQ_MQTT_ENABLE_EXAMPLE_APPS "Enable example applications building" OFF)
5052

51-
find_package(OpenSSL REQUIRED)
52-
if (OPENSSL_FOUND)
53-
message(STATUS "Found OpenSSL ${OPENSSL_VERSION}")
54-
else()
55-
message(STATUS "OpenSSL Not Found")
53+
if(OPENDAQ_MQTT_MODULE_ENABLE_SSL)
54+
find_package(OpenSSL REQUIRED)
55+
if (OPENSSL_FOUND)
56+
message(STATUS "Found OpenSSL ${OPENSSL_VERSION}")
57+
else()
58+
message(FATAL_ERROR "OpenSSL Not Found")
59+
endif()
60+
add_compile_definitions(OPENDAQ_MQTT_MODULE_ENABLE_SSL)
5661
endif()
5762

5863
add_subdirectory(external)

examples/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,6 @@ cmake_minimum_required(VERSION 3.16)
33
if (OPENDAQ_MQTT_ENABLE_EXAMPLE_APPS)
44
add_subdirectory(ref-dev-mqtt-pub)
55
add_subdirectory(ref-dev-mqtt-sub)
6+
add_subdirectory(ref-dev-mqtt-raw-sub)
7+
add_subdirectory(custom-mqtt-sub)
68
endif()

examples/InputArgs.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,15 @@ class InputArgs
7878
return false;
7979
}
8080

81+
void setUsageHelp(const std::string& str)
82+
{
83+
usageString = str;
84+
}
85+
8186
void printHelp() const
8287
{
88+
if (!usageString.empty())
89+
std::cout << "Usage: " << usageString << std::endl;
8390
std::cout << "Available arguments:" << std::endl;
8491
for (const auto& [name, descStruct] : argDescriptions)
8592
{
@@ -100,4 +107,5 @@ class InputArgs
100107
std::vector<std::string> parsedArgs;
101108
std::unordered_map<std::string, std::string> argValues;
102109
std::vector<std::string> positionalArgs;
103-
};
110+
std::string usageString;
111+
};
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
cmake_minimum_required(VERSION 3.16)
2+
3+
set(EXAMPLE_PROJECT_NAME "custom-mqtt-sub")
4+
5+
project(${EXAMPLE_PROJECT_NAME} LANGUAGES CXX)
6+
7+
set(CMAKE_CXX_STANDARD 17)
8+
set(CMAKE_CXX_STANDARD_REQUIRED ON)
9+
10+
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
11+
12+
add_subdirectory(src)
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
{
2+
"openDAQ/RefDev0/IO/AI/RefCh0/Sig/AI0": [
3+
{
4+
"AI0": {
5+
"Value": "value",
6+
"Timestamp": "timestamp",
7+
"Unit": [
8+
"V",
9+
"volts",
10+
"voltage"
11+
]
12+
}
13+
}
14+
],
15+
"openDAQ/RefDev0/IO/AI/RefCh1/Sig/AI1": [
16+
{
17+
"AI1": {
18+
"Value": "value",
19+
"Timestamp": "timestamp",
20+
"Unit": [
21+
"V",
22+
"volts",
23+
"voltage"
24+
]
25+
}
26+
}
27+
],
28+
"openDAQ/RefDev0/IO/AI/RefCh2/Sig/AI2": [
29+
{
30+
"AI2": {
31+
"Value": "value",
32+
"Timestamp": "timestamp",
33+
"Unit": [
34+
"V",
35+
"volts",
36+
"voltage"
37+
]
38+
}
39+
}
40+
],
41+
"openDAQ/RefDev0/IO/AI/RefCh3/Sig/AI3": [
42+
{
43+
"AI3": {
44+
"Value": "value",
45+
"Timestamp": "timestamp",
46+
"Unit": [
47+
"V",
48+
"volts",
49+
"voltage"
50+
]
51+
}
52+
}
53+
]
54+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
cmake_minimum_required(VERSION 3.16)
2+
3+
add_compile_definitions(MODULE_PATH="${OPENDAQ_MODULES_DIR}")
4+
add_compile_definitions(APP_NAME="${EXAMPLE_PROJECT_NAME}")
5+
6+
add_executable(${EXAMPLE_PROJECT_NAME} custom-mqtt-sub.cpp)
7+
target_link_libraries(${EXAMPLE_PROJECT_NAME} PRIVATE daq::opendaq)
8+
target_include_directories(${EXAMPLE_PROJECT_NAME} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../../)
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
#include "../../InputArgs.h"
2+
#include <iomanip>
3+
#include <opendaq/opendaq.h>
4+
5+
#include <fstream>
6+
#include <iostream>
7+
#include <sstream>
8+
9+
using namespace daq;
10+
11+
struct ConfigStruct {
12+
std::string brokerAddress;
13+
std::string configFilePath;
14+
bool exit = true;
15+
int error = 0;
16+
};
17+
18+
std::string to_string(uint64_t ts)
19+
{
20+
using namespace std::chrono;
21+
22+
system_clock::time_point tp = system_clock::time_point(microseconds(ts));
23+
24+
auto tt = system_clock::to_time_t(tp);
25+
std::tm tm = *std::localtime(&tt);
26+
27+
auto us = duration_cast<milliseconds>(tp.time_since_epoch()) % 1000;
28+
29+
std::ostringstream oss;
30+
oss << std::put_time(&tm, "%Y-%m-%d %H:%M:%S") << '.' << std::setfill('0') << std::setw(3) << us.count();
31+
return oss.str();
32+
}
33+
34+
std::string to_string(daq::DataPacketPtr packet)
35+
{
36+
std::string result;
37+
std::string data;
38+
switch (packet.getDataDescriptor().getSampleType())
39+
{
40+
case SampleType::Float64:
41+
data = std::to_string(*(static_cast<double*>(packet.getData())));
42+
break;
43+
case SampleType::UInt64:
44+
data = std::to_string(*(static_cast<uint64_t*>(packet.getData())));
45+
break;
46+
case SampleType::Int64:
47+
data = std::to_string(*(static_cast<int64_t*>(packet.getData())));
48+
break;
49+
case SampleType::Binary:
50+
data = '\"' + std::string(static_cast<char*>(packet.getData()), packet.getDataSize()) + '\"';
51+
break;
52+
default:
53+
break;
54+
}
55+
std::string unitStr;
56+
if (auto unit = packet.getDataDescriptor().getUnit(); unit.assigned())
57+
{
58+
if (auto s = unit.getSymbol(); s.assigned())
59+
unitStr = " " + s.toStdString();
60+
}
61+
62+
result = fmt::format("SampleType : {}; Data: {}{};", convertSampleTypeToString(packet.getDataDescriptor().getSampleType()), data, unitStr);
63+
if (auto domainPacket = packet.getDomainPacket(); domainPacket.assigned())
64+
{
65+
uint64_t ts = *(static_cast<uint64_t*>(domainPacket.getData()));
66+
result += fmt::format(" Time : {};", to_string(ts));
67+
}
68+
return result;
69+
}
70+
71+
std::string readFileToString(const std::string& filePath)
72+
{
73+
std::ifstream file(filePath);
74+
if (!file)
75+
throw std::runtime_error("Failed to open file: " + filePath);
76+
77+
std::ostringstream buffer;
78+
buffer << file.rdbuf(); // Read the entire file buffer
79+
return buffer.str();
80+
}
81+
82+
ConfigStruct StartUp(int argc, char* argv[])
83+
{
84+
ConfigStruct config;
85+
InputArgs args;
86+
args.addArg("--help", "Show help message");
87+
args.addArg("--address", "MQTT broker address", true);
88+
args.setUsageHelp(APP_NAME " [options] <config file>");
89+
args.parse(argc, argv);
90+
91+
if (args.hasArg("--help") || args.hasUnknownArgs())
92+
{
93+
args.printHelp();
94+
config.error = 0;
95+
return config;
96+
}
97+
98+
config.brokerAddress = args.getArgValue("--address", "127.0.0.1");
99+
auto configFilePath = args.getPositionalArgs();
100+
if (configFilePath.size() != 1)
101+
{
102+
std::cout << "Configuration file path is required." << std::endl;
103+
config.error = -1;
104+
return config;
105+
}
106+
if (configFilePath.size() > 1)
107+
{
108+
std::cout << "Only one configuration file path is allowed. The first one will be used - " << configFilePath[0] << std::endl;
109+
}
110+
config.configFilePath = std::move(configFilePath[0]);
111+
config.exit = false;
112+
return config;
113+
}
114+
115+
int main(int argc, char* argv[])
116+
{
117+
// Parse input arguments
118+
auto appConfig = StartUp(argc, argv);
119+
if (appConfig.exit)
120+
{
121+
return appConfig.error;
122+
}
123+
124+
// Create OpenDAQ instance and add MQTT broker device
125+
const InstancePtr instance = InstanceBuilder().addModulePath(MODULE_PATH).build();
126+
auto brokerDevice = instance.addDevice("daq.mqtt://" + appConfig.brokerAddress);
127+
auto availableDeviceNodes = brokerDevice.getAvailableFunctionBlockTypes();
128+
129+
const std::string fbName = "@jsonMqttFb";
130+
std::cout << "Try to add the " << fbName << std::endl;
131+
132+
// Read JSON function block configuration from file and fill out the function block config
133+
const std::string jsonConfig = readFileToString(appConfig.configFilePath);
134+
auto config = availableDeviceNodes.get(fbName).createDefaultConfig();
135+
config.setPropertyValue("SignalList", jsonConfig);
136+
137+
// Add the JSON function block to the broker device
138+
daq::FunctionBlockPtr jsonFb = brokerDevice.addFunctionBlock(fbName, config);
139+
140+
// Create packet readers for all signals
141+
const auto signals = jsonFb.getSignals();
142+
std::map<std::string, PacketReaderPtr> packetReaders;
143+
for (const auto& s : signals)
144+
{
145+
packetReaders.emplace(std::pair<std::string, PacketReaderPtr>(s.getName().toStdString(), daq::PacketReader(s)));
146+
}
147+
148+
// Start a thread to read packets from the readers
149+
std::atomic<bool> running = true;
150+
std::thread readerThread(
151+
[&packetReaders, &running]()
152+
{
153+
while (running)
154+
{
155+
for (const auto& [signal, reader] : packetReaders)
156+
{
157+
while (!reader.getEmpty() && running)
158+
{
159+
auto packet = reader.read();
160+
if (packet.getType() == PacketType::Event)
161+
{
162+
std::cout << "Event packet is skipped!" << std::endl;
163+
}
164+
else if (packet.getType() == PacketType::Data)
165+
{
166+
const auto dataPacket = packet.asPtr<IDataPacket>();
167+
std::cout << signal << " - " << to_string(dataPacket) << std::endl;
168+
}
169+
}
170+
}
171+
std::this_thread::sleep_for(std::chrono::milliseconds(20));
172+
}
173+
});
174+
175+
std::cout << "Press \"enter\" to exit the application..." << std::endl;
176+
std::cin.get();
177+
178+
running = false;
179+
readerThread.join();
180+
std::cout << "Reader thread finished. Exiting.\n";
181+
182+
return 0;
183+
}

0 commit comments

Comments
 (0)