Skip to content

Commit 81a03d6

Browse files
committed
refactoring
1 parent 5aa0b46 commit 81a03d6

13 files changed

Lines changed: 736 additions & 682 deletions

File tree

modules/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_subscriber_fb_impl.h

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,6 @@ class MqttSubscriberFbImpl final : public FunctionBlock
3030
friend class MqttJsonDecoderFbHelper;
3131

3232
public:
33-
struct CmdResult
34-
{
35-
bool success = false;
36-
std::string msg;
37-
int token = 0;
38-
39-
CmdResult(bool success = false, const std::string& msg = "", int token = 0)
40-
: success(success),
41-
msg(msg),
42-
token(token)
43-
{
44-
}
45-
};
46-
4733
enum class DomainSignalMode : EnumType
4834
{
4935
None = 0,
@@ -67,7 +53,6 @@ class MqttSubscriberFbImpl final : public FunctionBlock
6753

6854
std::shared_ptr<mqtt::MqttAsyncClient> subscriber;
6955
int qos = DEFAULT_SUB_QOS;
70-
mqtt::MqttDataWrapper jsonDataWorker;
7156
std::string topicForSubscribing;
7257
DictObjectPtr<IDict, IString, IFunctionBlockType> nestedFbTypes;
7358
std::vector<FunctionBlockPtr> nestedFunctionBlocks;
@@ -103,8 +88,8 @@ class MqttSubscriberFbImpl final : public FunctionBlock
10388
void propertyChanged();
10489

10590
bool setTopic(std::string topic);
106-
CmdResult subscribeToTopic();
107-
CmdResult unsubscribeFromTopic();
91+
mqtt::CmdResult subscribeToTopic();
92+
mqtt::CmdResult unsubscribeFromTopic();
10893

10994
void removed() override;
11095
DictPtr<IString, IFunctionBlockType> onGetAvailableFunctionBlockTypes() override;

modules/mqtt_streaming_module/src/mqtt_json_decoder_fb_impl.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ MqttJsonDecoderFbImpl::MqttJsonDecoderFbImpl(const ContextPtr& ctx,
1313
const FunctionBlockTypePtr& type,
1414
const PropertyObjectPtr& config)
1515
: FunctionBlock(type, ctx, parent, generateLocalId()),
16-
jsonDataWorker(loggerComponent)
16+
jsonDataWorker()
1717
{
1818
initComponentStatus();
1919
if (config.assigned())

modules/mqtt_streaming_module/src/mqtt_publisher_fb_impl.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <opendaq/binary_data_packet_factory.h>
77
#include <opendaq/event_packet_params.h>
88
#include <mqtt_streaming_module/property_helper.h>
9+
#include "JsonConfigWrapper.h"
910

1011
BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE
1112

@@ -18,7 +19,7 @@ MqttPublisherFbImpl::MqttPublisherFbImpl(const ContextPtr& ctx,
1819
const PropertyObjectPtr& config)
1920
: FunctionBlock(type, ctx, parent, generateLocalId()),
2021
mqttClient(mqttClient),
21-
jsonDataWorker(loggerComponent),
22+
jsonDataWorker(),
2223
running(true),
2324
hasSignalError(false),
2425
signalDescriptorChanged(false),
@@ -512,7 +513,7 @@ void MqttPublisherFbImpl::readProperties()
512513

513514
if (config.mode == PublisherMode::Json && config.topicMode == TopicMode::Single)
514515
{
515-
auto result = mqtt::MqttDataWrapper::validateTopic(config.topicName, loggerComponent);
516+
auto result = mqtt::JsonConfigWrapper::validateTopic(config.topicName, loggerComponent);
516517
hasSettingError = !result.success;
517518
settingErrors.push_back(std::move(result.msg));
518519
}

modules/mqtt_streaming_module/src/mqtt_subscriber_fb_impl.cpp

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <mqtt_streaming_module/mqtt_subscriber_fb_impl.h>
88
#include <opendaq/binary_data_packet_factory.h>
99
#include <sstream>
10+
#include "JsonConfigWrapper.h"
1011

1112
BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE
1213

@@ -21,7 +22,6 @@ MqttSubscriberFbImpl::MqttSubscriberFbImpl(const ContextPtr& ctx,
2122
const PropertyObjectPtr& config)
2223
: FunctionBlock(type, ctx, parent, generateLocalId()),
2324
subscriber(subscriber),
24-
jsonDataWorker(loggerComponent),
2525
topicForSubscribing(""),
2626
nestedFbTypes(nullptr),
2727
enablePreview(false),
@@ -259,11 +259,11 @@ std::pair<bool, std::string> MqttSubscriberFbImpl::readFileToString(const std::s
259259

260260
void MqttSubscriberFbImpl::setJsonConfig(const std::string config)
261261
{
262-
jsonDataWorker.setConfig(config);
263-
auto result = jsonDataWorker.isJsonValid();
262+
mqtt::JsonConfigWrapper jsonConfigWrapper(config);
263+
auto result = jsonConfigWrapper.isJsonValid();
264264
if (result.success)
265265
{
266-
auto topic = jsonDataWorker.extractTopic();
266+
auto topic = jsonConfigWrapper.extractTopic();
267267
result.success = setTopic(topic);
268268
if (result.success)
269269
{
@@ -273,7 +273,7 @@ void MqttSubscriberFbImpl::setJsonConfig(const std::string config)
273273
objPtr.setPropertyValue(PROPERTY_NAME_SUB_TOPIC, String(topic));
274274
event.unmute();
275275
}
276-
if (const auto signalDscs = jsonDataWorker.extractDescription(); !signalDscs.empty())
276+
if (const auto signalDscs = jsonConfigWrapper.extractDescription(); !signalDscs.empty())
277277
{
278278
auto fbConfig = MqttJsonDecoderFbImpl::CreateType().createDefaultConfig();
279279
for (const auto& [signalName, descriptor] : signalDscs)
@@ -316,7 +316,7 @@ void MqttSubscriberFbImpl::propertyChanged()
316316

317317
bool MqttSubscriberFbImpl::setTopic(std::string topic)
318318
{
319-
const auto validationStatus = mqtt::MqttDataWrapper::validateTopic(topic, loggerComponent);
319+
const auto validationStatus = mqtt::JsonConfigWrapper::validateTopic(topic, loggerComponent);
320320
if (validationStatus.success)
321321
{
322322
LOG_I("An MQTT topic: {}", topic);
@@ -539,9 +539,9 @@ void MqttSubscriberFbImpl::clearSubscribedTopic()
539539
topicForSubscribing.clear();
540540
}
541541

542-
MqttSubscriberFbImpl::CmdResult MqttSubscriberFbImpl::subscribeToTopic()
542+
mqtt::CmdResult MqttSubscriberFbImpl::subscribeToTopic()
543543
{
544-
CmdResult result{false};
544+
mqtt::CmdResult result{false};
545545
if (subscriber)
546546
{
547547
auto lambda = [this](const mqtt::MqttAsyncClient& client, mqtt::MqttMessage& msg) { this->onSignalsMessage(client, msg); };
@@ -563,7 +563,7 @@ MqttSubscriberFbImpl::CmdResult MqttSubscriberFbImpl::subscribeToTopic()
563563
LOG_D("Trying to subscribe to the topic: {}", topic);
564564
setComponentStatusWithMessage(ComponentStatus::Ok, "Waiting for data for the topic: " + topicForSubscribing);
565565
waitingForData = true;
566-
result = {true, "", result.token};
566+
result = {true, ""};
567567
}
568568
}
569569
else
@@ -581,16 +581,16 @@ MqttSubscriberFbImpl::CmdResult MqttSubscriberFbImpl::subscribeToTopic()
581581
return result;
582582
}
583583

584-
MqttSubscriberFbImpl::CmdResult MqttSubscriberFbImpl::unsubscribeFromTopic()
584+
mqtt::CmdResult MqttSubscriberFbImpl::unsubscribeFromTopic()
585585
{
586-
CmdResult result{true};
586+
mqtt::CmdResult result{true};
587587
if (subscriber)
588588
{
589589
const auto topic = getSubscribedTopic();
590590
if (!topic.empty())
591591
{
592592
subscriber->setMessageArrivedCb(topic, nullptr);
593-
mqtt::CmdResult unsubRes = subscriber->unsubscribe(topic);
593+
mqtt::MqttAsyncClient::CmdResultWithToken unsubRes = subscriber->unsubscribe(topic);
594594
if (unsubRes.success)
595595
unsubRes = subscriber->waitForCompletion(unsubRes.token, MQTT_FB_UNSUBSCRIBE_TOUT);
596596

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#pragma once
2+
3+
#include <rapidjson/document.h>
4+
#include <opendaq/logger_component_ptr.h>
5+
#include <coreobjects/unit_ptr.h>
6+
#include <coretypes/string_ptr.h>
7+
#include "common.h"
8+
9+
namespace mqtt
10+
{
11+
12+
class JsonConfigWrapper final
13+
{
14+
public:
15+
JsonConfigWrapper(const std::string& config);
16+
17+
static CmdResult validateTopic(const daq::StringPtr topic, const daq::LoggerComponentPtr loggerComponent = nullptr);
18+
19+
std::vector<std::pair<std::string, MqttMsgDescriptor>> extractDescription();
20+
std::string extractTopic();
21+
CmdResult isJsonValid();
22+
23+
private:
24+
rapidjson::Document doc;
25+
std::string config;
26+
27+
daq::UnitPtr extractSignalUnit(const rapidjson::Value& signalObj);
28+
std::string extractValueFieldName(const rapidjson::Value& signalObj);
29+
std::string extractTimestampFieldName(const rapidjson::Value& signalObj);
30+
std::string extractFieldName(const rapidjson::Value& signalObj, const std::string& field);
31+
32+
};
33+
} // namespace mqtt

shared/mqtt_streaming_protocol/include/MqttAsyncClient.h

Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,44 +16,44 @@ enum class MqttConnectionStatus {
1616
pending,
1717
};
1818

19-
struct CmdResult
20-
{
21-
bool success = false;
22-
std::string msg;
23-
int token = 0;
24-
25-
CmdResult()
26-
: success(false),
27-
msg(""),
28-
token(0)
29-
{
30-
}
31-
CmdResult(bool success)
32-
: success(success),
33-
msg(""),
34-
token(0)
35-
{
36-
}
37-
38-
CmdResult(bool success, const std::string& msg)
39-
: success(success),
40-
msg(msg),
41-
token(0)
42-
{
43-
}
44-
45-
CmdResult(bool success, const std::string& msg, int token)
46-
: success(success),
47-
msg(msg),
48-
token(token)
49-
{
50-
}
51-
};
52-
5319
class MqttAsyncClient final {
5420
public:
5521
using MsgArrivedCb_type = void(const MqttAsyncClient &, mqtt::MqttMessage &msg);
5622

23+
struct CmdResultWithToken
24+
{
25+
bool success = false;
26+
std::string msg;
27+
int token = 0;
28+
29+
CmdResultWithToken()
30+
: success(false),
31+
msg(""),
32+
token(0)
33+
{
34+
}
35+
CmdResultWithToken(bool success)
36+
: success(success),
37+
msg(""),
38+
token(0)
39+
{
40+
}
41+
42+
CmdResultWithToken(bool success, const std::string& msg)
43+
: success(success),
44+
msg(msg),
45+
token(0)
46+
{
47+
}
48+
49+
CmdResultWithToken(bool success, const std::string& msg, int token)
50+
: success(success),
51+
msg(msg),
52+
token(token)
53+
{
54+
}
55+
};
56+
5757
MqttAsyncClient();
5858
MqttAsyncClient(std::string serverUrl, std::string clientId, bool cleanSession);
5959
MqttAsyncClient(const MqttAsyncClient &) = delete;
@@ -62,20 +62,20 @@ class MqttAsyncClient final {
6262
MqttAsyncClient &operator=(MqttAsyncClient &&) = delete;
6363
~MqttAsyncClient();
6464

65-
CmdResult connect();
66-
CmdResult disconnect();
65+
CmdResultWithToken connect();
66+
CmdResultWithToken disconnect();
6767
bool syncDisconnect(int timeoutMs);
6868

69-
CmdResult publish(const std::string &topic,
69+
CmdResultWithToken publish(const std::string &topic,
7070
void *data,
7171
size_t dataLen,
7272
int qos = 1,
7373
bool retained = false);
7474

75-
CmdResult subscribe(std::string topic, int qos);
76-
CmdResult unsubscribe(std::string topic);
77-
CmdResult unsubscribe(const std::vector<std::string>& topics);
78-
CmdResult waitForCompletion(int token, unsigned long toutMs);
75+
CmdResultWithToken subscribe(std::string topic, int qos);
76+
CmdResultWithToken unsubscribe(std::string topic);
77+
CmdResultWithToken unsubscribe(const std::vector<std::string>& topics);
78+
CmdResultWithToken waitForCompletion(int token, unsigned long toutMs);
7979

8080
void setConnectedCb(std::function<void()> cb);
8181
void setMessageArrivedCb(std::string topic, std::function<MsgArrivedCb_type> cb);

0 commit comments

Comments
 (0)