Skip to content

Commit 8d6a0ee

Browse files
authored
Merge pull request #7 from openDAQ/publisher-improving
MQTT raw and publisher FB improving Brief: -Raw FB: one FB - one topic; "Topic" property; -Raw FB: SubscriptionStatus; -Raw FB: dynamic application of properties; -Raw FB, JSON FB: localId to create multiple objects; -Publisher FB: "Topic" property for Multi mode; -Publisher FB: PublishingStatus; -Publisher FB: SignalStatus; -Publisher FB: dynamic application of properties; -Tests;
2 parents 86915c6 + d5fa5d9 commit 8d6a0ee

28 files changed

Lines changed: 833 additions & 345 deletions

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ if(OPENDAQ_MQTT_MODULE_ENABLE_SSL)
6262
endif()
6363

6464
add_subdirectory(external)
65+
add_subdirectory(helper_utils)
6566
add_subdirectory(mqtt_streaming_protocol)
6667
add_subdirectory(mqtt_streaming_module)
6768

examples/raw-mqtt-sub/src/raw-mqtt-sub.cpp

Lines changed: 22 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ using namespace daq;
99

1010
struct ConfigStruct {
1111
std::string brokerAddress;
12-
std::vector<std::string> topics;
12+
std::string topic;
1313
bool exit = true;
1414
int error = 0;
1515
};
@@ -20,7 +20,7 @@ ConfigStruct StartUp(int argc, char* argv[])
2020
InputArgs args;
2121
args.addArg("--help", "Show help message");
2222
args.addArg("--address", "MQTT broker address", true);
23-
args.setUsageHelp(APP_NAME " [options] <topic1> <topic2> ... <topicN>");
23+
args.setUsageHelp(APP_NAME " [options] <topic>");
2424
args.parse(argc, argv);
2525

2626
if (args.hasArg("--help") || args.hasUnknownArgs())
@@ -31,14 +31,14 @@ ConfigStruct StartUp(int argc, char* argv[])
3131
}
3232

3333
config.brokerAddress = args.getArgValue("--address", "127.0.0.1");
34-
config.topics = args.getPositionalArgs();
35-
36-
if (config.topics.empty())
34+
const auto positionalArgs = args.getPositionalArgs();
35+
if (positionalArgs.empty())
3736
{
38-
std::cout << "MQTT topics are required." << std::endl;
37+
std::cout << "An MQTT topic is required." << std::endl;
3938
config.error = -1;
4039
return config;
4140
}
41+
config.topic = args.getPositionalArgs()[0];;
4242

4343
config.exit = false;
4444
return config;
@@ -66,46 +66,34 @@ int main(int argc, char* argv[])
6666

6767
// Create RAW function block configuration
6868
auto config = availableFbs.get(fbName).createDefaultConfig();
69-
auto topicList = List<IString>();
70-
for (auto& topic : appConfig.topics)
71-
{
72-
addToList(topicList, std::move(topic));
73-
}
74-
config.setPropertyValue("SignalList", topicList);
69+
config.setPropertyValue("Topic", appConfig.topic);
7570

7671
// Add the RAW function block to the broker FB
7772
daq::FunctionBlockPtr rawFb = brokerFB.addFunctionBlock(fbName, config);
7873

79-
// Create packet readers for all signals
80-
const auto signals = rawFb.getSignals();
81-
std::map<std::string, PacketReaderPtr> readers;
82-
for (const auto& s : signals)
83-
{
84-
readers.emplace(std::pair<std::string, PacketReaderPtr>(s.getName().toStdString(), daq::PacketReader(s)));
85-
}
74+
// Create packet readers for a signal
75+
const auto signal = rawFb.getSignals()[0];
76+
PacketReaderPtr reader = daq::PacketReader(signal);
8677

87-
// Start a thread to read packets from the readers
78+
// Start a thread to read packets from the reader
8879
std::atomic<bool> running = true;
8980
std::thread readerThread(
90-
[&readers, &running]()
81+
[&reader, &signal, &running]()
9182
{
9283
while (running)
9384
{
94-
for (const auto& [signalName, reader] : readers)
85+
while (!reader.getEmpty() && running)
9586
{
96-
while (!reader.getEmpty() && running)
87+
auto packet = reader.read();
88+
if (packet.getType() == PacketType::Event)
89+
{
90+
continue;
91+
}
92+
else if (packet.getType() == PacketType::Data)
9793
{
98-
auto packet = reader.read();
99-
if (packet.getType() == PacketType::Event)
100-
{
101-
continue;
102-
}
103-
else if (packet.getType() == PacketType::Data)
104-
{
105-
const auto dataPacket = packet.asPtr<IDataPacket>();
106-
std::string dataStr(static_cast<char*>(dataPacket.getData()), dataPacket.getDataSize());
107-
std::cout << signalName << " - " << dataStr << std::endl;
108-
}
94+
const auto dataPacket = packet.asPtr<IDataPacket>();
95+
std::string dataStr(static_cast<char*>(dataPacket.getData()), dataPacket.getDataSize());
96+
std::cout << signal.getName() << " - " << dataStr << std::endl;
10997
}
11098
}
11199
std::this_thread::sleep_for(std::chrono::milliseconds(20));

helper_utils/CMakeLists.txt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
cmake_minimum_required(VERSION 3.10)
2+
3+
set(HELPER_LIB mqtt_streaming_helper)
4+
5+
set(HELPER_PRJ_VERSION "1.0.0")
6+
set(HELPER_PRJ_NAME "HelperUtils")
7+
8+
project(${HELPER_PRJ_NAME} VERSION ${HELPER_PRJ_VERSION} LANGUAGES C CXX)
9+
10+
11+
12+
add_library(${HELPER_LIB} INTERFACE)
13+
14+
target_include_directories(${HELPER_LIB} INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/include
15+
)
16+
17+
target_sources(${HELPER_LIB} INTERFACE
18+
${CMAKE_CURRENT_SOURCE_DIR}/include/mqtt_streaming_helper/timer.h
19+
)

mqtt_streaming_protocol/tests/Timer.h renamed to helper_utils/include/mqtt_streaming_helper/timer.h

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,31 +2,49 @@
22

33
#include <chrono>
44

5+
namespace helper::utils
6+
{
57
class Timer
68
{
79
public:
8-
Timer(int ms)
10+
Timer(size_t ms, bool start = true)
11+
: period(ms),
12+
firstStart(true)
913
{
10-
start = std::chrono::steady_clock::now();
11-
timeout = std::chrono::milliseconds(ms);
14+
if (start)
15+
restart();
1216
}
17+
1318
std::chrono::milliseconds remain() const
1419
{
1520
auto now = std::chrono::steady_clock::now();
1621
const auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(now - start);
1722
std::chrono::milliseconds newTout = (elapsed_ms >= timeout) ? std::chrono::milliseconds(0) : timeout - elapsed_ms;
1823
return newTout;
1924
}
25+
2026
bool expired()
2127
{
22-
return remain() == std::chrono::milliseconds(0);
28+
return (firstStart) ? true : (remain() == std::chrono::milliseconds(0));
2329
}
30+
2431
explicit operator std::chrono::milliseconds() const noexcept
2532
{
2633
return remain();
2734
}
2835

36+
void restart()
37+
{
38+
firstStart = false;
39+
start = std::chrono::steady_clock::now();
40+
timeout = std::chrono::milliseconds(period);
41+
}
42+
2943
protected:
3044
std::chrono::steady_clock::time_point start;
3145
std::chrono::milliseconds timeout;
46+
std::chrono::milliseconds period;
47+
bool firstStart;
3248
};
49+
50+
} // namespace helper::utils

mqtt_streaming_module/include/mqtt_streaming_module/constants.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,18 @@ static constexpr uint32_t DEFAULT_INIT_TIMEOUT = 3000; // ms
1717
static constexpr uint32_t DEFAULT_PUB_READ_PERIOD = 20; // ms
1818
static constexpr uint32_t DEFAULT_PUB_QOS = 1;
1919
static constexpr uint32_t DEFAULT_PUB_PACK_SIZE = 1;
20+
static constexpr const char* DEFAULT_PUB_TOPIC_NAME = "";
2021

2122
static constexpr const char* PROPERTY_NAME_MQTT_BROKER_ADDRESS = "MqttBrokerAddress";
2223
static constexpr const char* PROPERTY_NAME_MQTT_BROKER_PORT = "MqttBrokerPort";
2324
static constexpr const char* PROPERTY_NAME_MQTT_USERNAME = "MqttUsername";
2425
static constexpr const char* PROPERTY_NAME_MQTT_PASSWORD = "MqttPassword";
2526
static constexpr const char* PROPERTY_NAME_CONNECT_TIMEOUT = "ConnectTimeout";
2627
static constexpr const char* PROPERTY_NAME_SIGNAL_LIST = "SignalList";
28+
static constexpr const char* PROPERTY_NAME_TOPIC = "Topic";
2729

2830
static constexpr const char* PROPERTY_NAME_PUB_TOPIC_MODE = "TopicMode";
31+
static constexpr const char* PROPERTY_NAME_PUB_TOPIC_NAME = "Topic";
2932
static constexpr const char* PROPERTY_NAME_PUB_SHARED_TS = "SharedTimestamp";
3033
static constexpr const char* PROPERTY_NAME_PUB_GROUP_VALUES = "GroupValues";
3134
static constexpr const char* PROPERTY_NAME_PUB_USE_SIGNAL_NAMES = "UseSignalNames";
@@ -40,8 +43,16 @@ static constexpr const char* ROOT_FB_NAME = "@rootMqttFb";
4043

4144
static const char* MQTT_LOCAL_ROOT_FB_ID_PREFIX = "rootMqttFb";
4245
static const char* MQTT_LOCAL_PUB_FB_ID_PREFIX = "publisherMqttFb";
46+
static const char* MQTT_LOCAL_RAW_FB_ID_PREFIX = "rawMqttFb";
47+
static const char* MQTT_LOCAL_JSON_FB_ID_PREFIX = "jsonMqttFb";
4348

4449

4550
static const char* MQTT_ROOT_FB_CON_STATUS_TYPE = "BrokerConnectionStatusType";
51+
static const char* MQTT_RAW_FB_SUB_STATUS_TYPE = "MqttSubscriptionStatusType";
52+
static const char* MQTT_PUB_FB_SIG_STATUS_TYPE = "MqttSignalStatusType";
53+
static const char* MQTT_PUB_FB_PUB_STATUS_TYPE = "MqttPublishingStatusType";
54+
55+
static const char* MQTT_PUB_FB_SIG_STATUS_NAME = "SignalStatus";
56+
static const char* MQTT_PUB_FB_PUB_STATUS_NAME = "PublishingStatus";
4657

4758
END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE

mqtt_streaming_module/include/mqtt_streaming_module/handler_factory.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ class HandlerFactory
3131
{
3232
if (config.sharedTs)
3333
{
34-
return std::make_unique<MultipleSharedHandler>(config.useSignalNames, publisherFbGlobalId);
34+
return std::make_unique<MultipleSharedHandler>(config.useSignalNames,
35+
config.topicName.empty() ? publisherFbGlobalId : config.topicName);
3536
}
3637
else if (config.topicMode == TopicMode::Single)
3738
{
@@ -42,7 +43,8 @@ class HandlerFactory
4243
}
4344
else if (config.topicMode == TopicMode::Multi)
4445
{
45-
return std::make_unique<MultipleHandler>(config.useSignalNames, publisherFbGlobalId);
46+
return std::make_unique<MultipleHandler>(config.useSignalNames,
47+
config.topicName.empty() ? publisherFbGlobalId : config.topicName);
4648
}
4749

4850
return std::make_unique<SingleHandler>(config.useSignalNames);

mqtt_streaming_module/include/mqtt_streaming_module/mqtt_base_fb.h

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,29 @@ BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE
2525
class MqttBaseFb : public FunctionBlock
2626
{
2727
public:
28+
enum class SubscriptionStatus : EnumType
29+
{
30+
InvalidTopicName = 0,
31+
SubscribingError,
32+
WaitingForData,
33+
HasData
34+
};
35+
36+
struct CmdResult
37+
{
38+
bool success = false;
39+
std::string msg;
40+
int token = 0;
41+
42+
CmdResult(bool success = false, const std::string& msg = "", int token = 0)
43+
: success(success),
44+
msg(msg),
45+
token(token)
46+
{
47+
}
48+
};
49+
50+
2851
explicit MqttBaseFb(const ContextPtr& ctx,
2952
const ComponentPtr& parent,
3053
const FunctionBlockTypePtr& type,
@@ -34,7 +57,10 @@ class MqttBaseFb : public FunctionBlock
3457
~MqttBaseFb() = default;
3558

3659
protected:
60+
static std::vector<std::pair<SubscriptionStatus, std::string>> subscriptionStatusMap;
61+
3762
std::shared_ptr<mqtt::MqttAsyncClient> subscriber;
63+
EnumerationPtr subscriptionStatus;
3864

3965
virtual void createSignals() = 0;
4066
virtual void processMessage(const mqtt::MqttMessage& msg) = 0;
@@ -44,12 +70,17 @@ class MqttBaseFb : public FunctionBlock
4470

4571
void onSignalsMessage(const mqtt::MqttAsyncClient& subscriber, const mqtt::MqttMessage& msg);
4672

47-
virtual std::vector<std::string> getSubscribedTopics() const = 0;
48-
virtual void clearSubscribedTopics() = 0;
49-
virtual void subscribeToTopics();
50-
virtual void unsubscribeFromTopics();
73+
virtual std::string getSubscribedTopic() const = 0;
74+
virtual void clearSubscribedTopic() = 0;
75+
virtual CmdResult subscribeToTopic();
76+
virtual CmdResult unsubscribeFromTopic();
77+
78+
virtual void propertyChanged() = 0;
5179

5280
void removed() override;
81+
82+
void initSubscriptionStatus();
83+
void setSubscriptionStatus(const SubscriptionStatus status, std::string message = "");
5384
};
5485

5586
END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE

mqtt_streaming_module/include/mqtt_streaming_module/mqtt_json_receiver_fb_impl.h

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ class MqttJsonReceiverFbImpl final : public MqttBaseFb
3131
explicit MqttJsonReceiverFbImpl(const ContextPtr& ctx,
3232
const ComponentPtr& parent,
3333
const FunctionBlockTypePtr& type,
34-
const StringPtr& localId,
3534
std::shared_ptr<mqtt::MqttAsyncClient> subscriber,
3635
const PropertyObjectPtr& config = nullptr);
3736
~MqttJsonReceiverFbImpl() override;
@@ -42,14 +41,19 @@ class MqttJsonReceiverFbImpl final : public MqttBaseFb
4241
mutable std::mutex sync;
4342
mqtt::MqttDataWrapper jsonDataWorker;
4443
std::unordered_map<mqtt::SignalId, SignalConfigPtr> outputSignals;
45-
std::vector<mqtt::SignalId> signalIdList;
46-
std::unordered_map<mqtt::SignalId, DataDescriptorPtr> subscribedSignals;
44+
std::vector<std::string> signalNameList;
45+
std::unordered_map<std::string, DataDescriptorPtr> subscribedSignals;
46+
std::string topicForSubscribing;
47+
static std::atomic<int> localIndex;
48+
49+
static std::string getLocalId();
4750

4851
void createSignals() override;
49-
void clearSubscribedTopics() override;
50-
std::vector<std::string> getSubscribedTopics() const override;
52+
void clearSubscribedTopic() override;
53+
std::string getSubscribedTopic() const override;
5154
void processMessage(const mqtt::MqttMessage& msg) override;
5255
void readProperties() override;
56+
void propertyChanged() override;
5357

5458
void createDataPacket(const std::string& topic, const std::string& json);
5559
};

mqtt_streaming_module/include/mqtt_streaming_module/mqtt_publisher_fb_impl.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "MqttAsyncClient.h"
1919
#include "MqttDataWrapper.h"
2020
#include "mqtt_streaming_module/handler_base.h"
21+
#include "mqtt_streaming_helper/timer.h"
2122
#include <mqtt_streaming_module/common.h>
2223
#include <mqtt_streaming_module/types.h>
2324
#include <opendaq/function_block_impl.h>
@@ -27,6 +28,19 @@ BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE
2728
class MqttPublisherFbImpl final : public FunctionBlock
2829
{
2930
public:
31+
enum class SignalStatus : EnumType
32+
{
33+
NotConnected = 0,
34+
Invalid,
35+
Valid
36+
};
37+
38+
enum class PublishingStatus : EnumType
39+
{
40+
Ok = 0,
41+
SampleSkipped
42+
};
43+
3044
explicit MqttPublisherFbImpl(const ContextPtr& ctx,
3145
const ComponentPtr& parent,
3246
const FunctionBlockTypePtr& type,
@@ -40,7 +54,12 @@ class MqttPublisherFbImpl final : public FunctionBlock
4054
void onConnected(const InputPortPtr& port) override;
4155
void onDisconnected(const InputPortPtr& port) override;
4256

57+
static void addTypesToTypeManager(daq::TypeManagerPtr manager);
58+
4359
private:
60+
static const std::vector<std::pair<SignalStatus, std::string>> signalStatusMap;
61+
static const std::vector<std::pair<PublishingStatus, std::string>> publishingStatusMap;
62+
4463
static std::atomic<int> localIndex;
4564
std::shared_ptr<mqtt::MqttAsyncClient> mqttClient;
4665
mqtt::MqttDataWrapper jsonDataWorker;
@@ -51,10 +70,20 @@ class MqttPublisherFbImpl final : public FunctionBlock
5170
std::atomic<bool> running;
5271
std::atomic<bool> hasError;
5372
std::unique_ptr<HandlerBase> handler;
73+
EnumerationPtr signalStatus;
74+
EnumerationPtr publishingStatus;
75+
uint64_t skippedMsgCnt;
76+
uint64_t publishedMsgCnt;
77+
std::string lastSkippedReason;
78+
helper::utils::Timer publishingStatusTimer;
5479

5580
static std::string getLocalId();
81+
void setSignalStatus(const SignalStatus status, std::string message = "", bool init = false);
82+
void setPublishingStatus(const PublishingStatus status, std::string message = "", bool init = false);
83+
void updatePublishingStatus();
5684
void initProperties(const PropertyObjectPtr& config);
5785
void readProperties();
86+
void propertyChanged();
5887
void updateInputPorts();
5988
void validateInputPorts();
6089
template <typename retT, typename intfT>

0 commit comments

Comments
 (0)