Skip to content

Commit 0b1ae5e

Browse files
committed
mqtt: publishingStatus for publiher FB
1 parent 42a3798 commit 0b1ae5e

13 files changed

Lines changed: 167 additions & 46 deletions

File tree

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ if(OPENDAQ_MQTT_MODULE_ENABLE_SSL)
6262
endif()
6363

6464
add_subdirectory(external)
65+
add_subdirectory(helper_utils)
6566
add_subdirectory(mqtt_streaming_protocol)
6667
add_subdirectory(mqtt_streaming_module)
6768

helper_utils/CMakeLists.txt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
cmake_minimum_required(VERSION 3.10)
2+
3+
set(HELPER_LIB mqtt_streaming_helper)
4+
5+
set(HELPER_PRJ_VERSION "1.0.0")
6+
set(HELPER_PRJ_NAME "HelperUtils")
7+
8+
project(${HELPER_PRJ_NAME} VERSION ${HELPER_PRJ_VERSION} LANGUAGES C CXX)
9+
10+
11+
12+
add_library(${HELPER_LIB} INTERFACE)
13+
14+
target_include_directories(${HELPER_LIB} INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/include
15+
)
16+
17+
target_sources(${HELPER_LIB} INTERFACE
18+
${CMAKE_CURRENT_SOURCE_DIR}/include/mqtt_streaming_helper/timer.h
19+
)

mqtt_streaming_protocol/tests/Timer.h renamed to helper_utils/include/mqtt_streaming_helper/timer.h

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,31 +2,49 @@
22

33
#include <chrono>
44

5+
namespace helper::utils
6+
{
57
class Timer
68
{
79
public:
8-
Timer(int ms)
10+
Timer(size_t ms, bool start = true)
11+
: period(ms),
12+
firstStart(true)
913
{
10-
start = std::chrono::steady_clock::now();
11-
timeout = std::chrono::milliseconds(ms);
14+
if (start)
15+
restart();
1216
}
17+
1318
std::chrono::milliseconds remain() const
1419
{
1520
auto now = std::chrono::steady_clock::now();
1621
const auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(now - start);
1722
std::chrono::milliseconds newTout = (elapsed_ms >= timeout) ? std::chrono::milliseconds(0) : timeout - elapsed_ms;
1823
return newTout;
1924
}
25+
2026
bool expired()
2127
{
22-
return remain() == std::chrono::milliseconds(0);
28+
return (firstStart) ? true : (remain() == std::chrono::milliseconds(0));
2329
}
30+
2431
explicit operator std::chrono::milliseconds() const noexcept
2532
{
2633
return remain();
2734
}
2835

36+
void restart()
37+
{
38+
firstStart = false;
39+
start = std::chrono::steady_clock::now();
40+
timeout = std::chrono::milliseconds(period);
41+
}
42+
2943
protected:
3044
std::chrono::steady_clock::time_point start;
3145
std::chrono::milliseconds timeout;
46+
std::chrono::milliseconds period;
47+
bool firstStart;
3248
};
49+
50+
} // namespace helper::utils

mqtt_streaming_module/include/mqtt_streaming_module/constants.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,9 @@ static const char* MQTT_LOCAL_JSON_FB_ID_PREFIX = "jsonMqttFb";
5050
static const char* MQTT_ROOT_FB_CON_STATUS_TYPE = "BrokerConnectionStatusType";
5151
static const char* MQTT_RAW_FB_SUB_STATUS_TYPE = "MqttSubscriptionStatusType";
5252
static const char* MQTT_PUB_FB_SIG_STATUS_TYPE = "MqttSignalStatusType";
53+
static const char* MQTT_PUB_FB_PUB_STATUS_TYPE = "MqttPublishingStatusType";
54+
55+
static const char* MQTT_PUB_FB_SIG_STATUS_NAME = "SignalStatus";
56+
static const char* MQTT_PUB_FB_PUB_STATUS_NAME = "PublishingStatus";
5357

5458
END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE

mqtt_streaming_module/include/mqtt_streaming_module/mqtt_publisher_fb_impl.h

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "MqttAsyncClient.h"
1919
#include "MqttDataWrapper.h"
2020
#include "mqtt_streaming_module/handler_base.h"
21+
#include "mqtt_streaming_helper/timer.h"
2122
#include <mqtt_streaming_module/common.h>
2223
#include <mqtt_streaming_module/types.h>
2324
#include <opendaq/function_block_impl.h>
@@ -34,6 +35,12 @@ class MqttPublisherFbImpl final : public FunctionBlock
3435
Valid
3536
};
3637

38+
enum class PublishingStatus : EnumType
39+
{
40+
Ok = 0,
41+
SampleSkipped
42+
};
43+
3744
explicit MqttPublisherFbImpl(const ContextPtr& ctx,
3845
const ComponentPtr& parent,
3946
const FunctionBlockTypePtr& type,
@@ -47,10 +54,11 @@ class MqttPublisherFbImpl final : public FunctionBlock
4754
void onConnected(const InputPortPtr& port) override;
4855
void onDisconnected(const InputPortPtr& port) override;
4956

50-
static void addTypeToTypeManager(daq::TypeManagerPtr manager);
57+
static void addTypesToTypeManager(daq::TypeManagerPtr manager);
5158

5259
private:
5360
static const std::vector<std::pair<SignalStatus, std::string>> signalStatusMap;
61+
static const std::vector<std::pair<PublishingStatus, std::string>> publishingStatusMap;
5462

5563
static std::atomic<int> localIndex;
5664
std::shared_ptr<mqtt::MqttAsyncClient> mqttClient;
@@ -63,10 +71,16 @@ class MqttPublisherFbImpl final : public FunctionBlock
6371
std::atomic<bool> hasError;
6472
std::unique_ptr<HandlerBase> handler;
6573
EnumerationPtr signalStatus;
74+
EnumerationPtr publishingStatus;
75+
uint64_t skippedMsgCnt;
76+
uint64_t publishedMsgCnt;
77+
std::string lastSkippedReason;
78+
helper::utils::Timer publishingStatusTimer;
6679

6780
static std::string getLocalId();
68-
void initSignalStatus();
69-
void setSignalStatus(const SignalStatus status, std::string message = "");
81+
void setSignalStatus(const SignalStatus status, std::string message = "", bool init = false);
82+
void setPublishingStatus(const PublishingStatus status, std::string message = "", bool init = false);
83+
void updatePublishingStatus();
7084
void initProperties(const PropertyObjectPtr& config);
7185
void readProperties();
7286
void propertyChanged();

mqtt_streaming_module/src/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ endif()
8686

8787
target_link_libraries(${LIB_NAME} PUBLIC daq::opendaq
8888
mqtt_streaming_protocol
89+
mqtt_streaming_helper
8990
PRIVATE $<BUILD_INTERFACE:Boost::algorithm>
9091
)
9192

mqtt_streaming_module/src/mqtt_publisher_fb_impl.cpp

Lines changed: 73 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ const std::vector<std::pair<MqttPublisherFbImpl::SignalStatus, std::string>> Mqt
1414
{SignalStatus::Invalid, "Invalid"},
1515
{SignalStatus::Valid, "Valid"}};
1616

17+
const std::vector<std::pair<MqttPublisherFbImpl::PublishingStatus, std::string>> MqttPublisherFbImpl::publishingStatusMap =
18+
{{PublishingStatus::Ok, "Ok"},
19+
{PublishingStatus::SampleSkipped, "SampleSkipped"}};
20+
1721
MqttPublisherFbImpl::MqttPublisherFbImpl(const ContextPtr& ctx,
1822
const ComponentPtr& parent,
1923
const FunctionBlockTypePtr& type,
@@ -24,10 +28,14 @@ MqttPublisherFbImpl::MqttPublisherFbImpl(const ContextPtr& ctx,
2428
jsonDataWorker(loggerComponent),
2529
inputPortCount(0),
2630
running(true),
27-
hasError(false)
31+
hasError(false),
32+
skippedMsgCnt(0),
33+
publishingStatusTimer(helper::utils::Timer(1000, false))
2834
{
2935
initComponentStatus();
30-
initSignalStatus();
36+
addTypesToTypeManager(context.getTypeManager());
37+
setSignalStatus(SignalStatus::NotConnected, "", true);
38+
setPublishingStatus(PublishingStatus::Ok, "", true);
3139
if (config.assigned())
3240
initProperties(populateDefaultConfig(type.createDefaultConfig(), config));
3341
else
@@ -171,8 +179,12 @@ void MqttPublisherFbImpl::updateInputPorts()
171179

172180
void MqttPublisherFbImpl::validateInputPorts()
173181
{
182+
skippedMsgCnt = 0;
183+
publishedMsgCnt = 0;
184+
setPublishingStatus(PublishingStatus::Ok);
174185
if (signalContexts.size() == 1) // no one input port is connected
175186
{
187+
setComponentStatus(ComponentStatus::Ok);
176188
setSignalStatus(SignalStatus::NotConnected);
177189
}
178190
else
@@ -280,6 +292,7 @@ void MqttPublisherFbImpl::readerLoop()
280292
msgs = handler->processSignalContexts(signalContexts);
281293
}
282294
sendMessages(msgs);
295+
updatePublishingStatus();
283296
std::this_thread::sleep_for(std::chrono::milliseconds(config.periodMs));
284297
}
285298
}
@@ -291,7 +304,13 @@ void MqttPublisherFbImpl::sendMessages(const MqttData& data)
291304
auto status = mqttClient->publish(topic, (void*)msg.c_str(), msg.length(), config.qos);
292305
if (!status.success)
293306
{
294-
LOG_W("Failed to publish data to {}; reason - {}", topic, status.msg);
307+
++skippedMsgCnt;
308+
lastSkippedReason = std::move(status.msg);
309+
LOG_W("Failed to publish data to {}; reason - {}", topic, lastSkippedReason);
310+
}
311+
else
312+
{
313+
++publishedMsgCnt;
295314
}
296315
}
297316
}
@@ -301,27 +320,55 @@ std::string MqttPublisherFbImpl::getLocalId()
301320
return std::string(MQTT_LOCAL_PUB_FB_ID_PREFIX + std::to_string(localIndex++));
302321
}
303322

304-
void MqttPublisherFbImpl::initSignalStatus()
323+
void MqttPublisherFbImpl::setSignalStatus(const SignalStatus status, std::string message, bool init)
305324
{
325+
signalStatus = EnumerationWithIntValue(MQTT_PUB_FB_SIG_STATUS_TYPE, static_cast<Int>(status), context.getTypeManager());
326+
if (init)
327+
statusContainer.template asPtr<IComponentStatusContainerPrivate>(true).addStatusWithMessage(MQTT_PUB_FB_SIG_STATUS_NAME,
328+
signalStatus,
329+
message);
330+
else
331+
statusContainer.template asPtr<IComponentStatusContainerPrivate>(true).setStatusWithMessage(MQTT_PUB_FB_SIG_STATUS_NAME,
332+
signalStatus,
333+
message);
334+
}
306335

307-
addTypeToTypeManager(context.getTypeManager());
308-
309-
signalStatus = EnumerationWithIntValue(MQTT_PUB_FB_SIG_STATUS_TYPE,
310-
static_cast<Int>(SignalStatus::NotConnected),
311-
this->context.getTypeManager());
312-
statusContainer.template asPtr<IComponentStatusContainerPrivate>(true).addStatus("SignalStatus",
313-
signalStatus);
336+
void MqttPublisherFbImpl::updatePublishingStatus()
337+
{
338+
if (publishingStatusTimer.expired())
339+
{
340+
publishingStatusTimer.restart();
341+
if (skippedMsgCnt != 0)
342+
{
343+
if (statusContainer.getStatus("ComponentStatus") == ComponentStatus::Ok)
344+
setComponentStatusWithMessage(ComponentStatus::Warning, "Some messages were not published!");
345+
setPublishingStatus(PublishingStatus::SampleSkipped,
346+
fmt::format("Published: {}; Skipped: {}; last reason - {}",
347+
publishedMsgCnt,
348+
skippedMsgCnt,
349+
lastSkippedReason));
350+
}
351+
else
352+
{
353+
setPublishingStatus(PublishingStatus::Ok, fmt::format("Published: {};", publishedMsgCnt));
354+
}
355+
}
314356
}
315357

316-
void MqttPublisherFbImpl::setSignalStatus(const SignalStatus status, std::string message)
358+
void MqttPublisherFbImpl::setPublishingStatus(const PublishingStatus status, std::string message, bool init)
317359
{
318-
signalStatus = EnumerationWithIntValue(MQTT_PUB_FB_SIG_STATUS_TYPE, static_cast<Int>(status), this->context.getTypeManager());
319-
statusContainer.template asPtr<IComponentStatusContainerPrivate>(true).setStatusWithMessage("SignalStatus",
320-
signalStatus,
321-
message);
360+
publishingStatus = EnumerationWithIntValue(MQTT_PUB_FB_PUB_STATUS_TYPE, static_cast<Int>(status), context.getTypeManager());
361+
if (init)
362+
statusContainer.template asPtr<IComponentStatusContainerPrivate>(true).addStatusWithMessage(MQTT_PUB_FB_PUB_STATUS_NAME,
363+
publishingStatus,
364+
message);
365+
else
366+
statusContainer.template asPtr<IComponentStatusContainerPrivate>(true).setStatusWithMessage(MQTT_PUB_FB_PUB_STATUS_NAME,
367+
publishingStatus,
368+
message);
322369
}
323370

324-
void MqttPublisherFbImpl::addTypeToTypeManager(daq::TypeManagerPtr manager)
371+
void MqttPublisherFbImpl::addTypesToTypeManager(daq::TypeManagerPtr manager)
325372
{
326373
if (!manager.hasType(MQTT_PUB_FB_SIG_STATUS_TYPE))
327374
{
@@ -331,5 +378,14 @@ void MqttPublisherFbImpl::addTypeToTypeManager(daq::TypeManagerPtr manager)
331378

332379
manager.addType(EnumerationType(MQTT_PUB_FB_SIG_STATUS_TYPE, list));
333380
}
381+
382+
if (!manager.hasType(MQTT_PUB_FB_PUB_STATUS_TYPE))
383+
{
384+
auto list = List<IString>();
385+
for (const auto& [_, st] : publishingStatusMap)
386+
list.pushBack(st);
387+
388+
manager.addType(EnumerationType(MQTT_PUB_FB_PUB_STATUS_TYPE, list));
389+
}
334390
}
335391
END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE

mqtt_streaming_module/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ target_link_libraries(${TEST_APP} PRIVATE daq::test_utils
1818
${MODULE_NAME}
1919
mqtt_stream_module
2020
mqtt_streaming_protocol_test_helper
21+
mqtt_streaming_helper
2122
)
2223

2324
add_test(NAME ${TEST_APP}

mqtt_streaming_module/tests/test_mqtt_json_fb.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#include "MqttAsyncClientWrapper.h"
2-
#include "Timer.h"
2+
#include "mqtt_streaming_helper/timer.h"
33
#include "mqtt_streaming_module/mqtt_json_receiver_fb_impl.h"
44
#include "test_daq_test_helper.h"
55
#include "test_data.h"
@@ -236,7 +236,7 @@ class MqttJsonFbHelper
236236
{
237237
std::vector<T> result;
238238

239-
auto timer = Timer(timeoutMs);
239+
auto timer = helper::utils::Timer(timeoutMs);
240240
while (!reader.getEmpty() || !timer.expired())
241241
{
242242
if (reader.getEmpty())

0 commit comments

Comments
 (0)