Skip to content

Commit e240597

Browse files
committed
mqtt: refactoring
1 parent 1a4e0d6 commit e240597

6 files changed

Lines changed: 13 additions & 13 deletions

File tree

mqtt_streaming_client_module/src/mqtt_json_receiver_fb_impl.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ MqttJsonReceiverFbImpl::~MqttJsonReceiverFbImpl()
3333

3434
void MqttJsonReceiverFbImpl::readProperties()
3535
{
36-
auto lock = std::lock_guard<std::mutex>(sync);
36+
auto lock = std::scoped_lock<std::mutex>(sync);
3737
subscribedSignals.clear();
3838
signalIdList.clear();
3939
bool isPresent = false;
@@ -66,7 +66,7 @@ void MqttJsonReceiverFbImpl::readProperties()
6666

6767
void MqttJsonReceiverFbImpl::createDataPacket(const std::string& topic, const std::string& json)
6868
{
69-
auto lock = std::lock_guard<std::mutex>(sync);
69+
auto lock = std::scoped_lock<std::mutex>(sync);
7070
jsonDataWorker.createAndSendDataPacket(topic, json);
7171
}
7272

@@ -79,7 +79,7 @@ void MqttJsonReceiverFbImpl::processMessage(const mqtt::MqttMessage& msg)
7979

8080
void MqttJsonReceiverFbImpl::createSignals()
8181
{
82-
auto lock = std::lock_guard<std::mutex>(sync);
82+
auto lock = std::scoped_lock<std::mutex>(sync);
8383
if (!subscribedSignals.empty())
8484
LOG_I("Creating signals...");
8585

@@ -131,7 +131,7 @@ void MqttJsonReceiverFbImpl::createSignals()
131131

132132
std::vector<std::string> MqttJsonReceiverFbImpl::getSubscribedTopics() const
133133
{
134-
auto lock = std::lock_guard<std::mutex>(sync);
134+
auto lock = std::scoped_lock<std::mutex>(sync);
135135
std::set<std::string> topicsSet;
136136
for (const auto& [signalId, _] : subscribedSignals)
137137
{
@@ -142,7 +142,7 @@ std::vector<std::string> MqttJsonReceiverFbImpl::getSubscribedTopics() const
142142

143143
void MqttJsonReceiverFbImpl::clearSubscribedTopics()
144144
{
145-
auto lock = std::lock_guard<std::mutex>(sync);
145+
auto lock = std::scoped_lock<std::mutex>(sync);
146146
subscribedSignals.clear();
147147
signalIdList.clear();
148148
}

mqtt_streaming_client_module/src/mqtt_raw_receiver_fb_impl.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ MqttRawReceiverFbImpl::~MqttRawReceiverFbImpl()
3333

3434
void MqttRawReceiverFbImpl::readProperties()
3535
{
36-
auto lock = std::lock_guard<std::mutex>(sync);
36+
auto lock = std::scoped_lock<std::mutex>(sync);
3737
topicsForSubscribing.clear();
3838
bool isPresent = false;
3939
if (objPtr.hasProperty(PROPERTY_NAME_SIGNAL_LIST))
@@ -69,7 +69,7 @@ void MqttRawReceiverFbImpl::processMessage(const mqtt::MqttMessage& msg)
6969
{
7070
std::string topic(msg.getTopic());
7171

72-
auto lock = std::lock_guard<std::mutex>(sync);
72+
auto lock = std::scoped_lock<std::mutex>(sync);
7373
auto signalIter = outputSignals.find(topic);
7474
if (signalIter == outputSignals.end())
7575
{
@@ -84,7 +84,7 @@ void MqttRawReceiverFbImpl::processMessage(const mqtt::MqttMessage& msg)
8484

8585
void MqttRawReceiverFbImpl::createSignals()
8686
{
87-
auto lock = std::lock_guard<std::mutex>(sync);
87+
auto lock = std::scoped_lock<std::mutex>(sync);
8888
if (!topicsForSubscribing.empty())
8989
LOG_I("Creating signals...");
9090
for (const auto& topic : topicsForSubscribing)

mqtt_streaming_client_module/src/mqtt_streaming_device_impl.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ FunctionBlockPtr MqttStreamingDeviceImpl::onAddFunctionBlock(const StringPtr& ty
223223
}
224224
else
225225
{
226-
setComponentStatusWithMessage(ComponentStatus::Error, "Function block type is not available: " + typeId.toStdString());
226+
DAQ_THROW_EXCEPTION(NotFoundException, "Function block type is not available: " + typeId.toStdString());
227227
}
228228
}
229229
return nestedFunctionBlock;

mqtt_streaming_client_module/tests/test_daq_test_helper.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ class DaqTestHelper
77
{
88
public:
99
daq::InstancePtr daqInstance;
10-
daq::GenericDevicePtr<daq::IDevice> device;
10+
daq::DevicePtr device;
1111

1212
void StartUp(std::string connectionStr = "daq.mqtt://127.0.0.1", int discoveryTimeoutMs = 0)
1313
{

mqtt_streaming_protocol/include/MqttAsyncClient.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ class MqttAsyncClient final {
116116
std::function<MsgArrivedCb_type> onMsgArrivedCmnCb;
117117
std::unordered_map<std::string, std::function<MsgArrivedCb_type>> onMsgArrivedCbs;
118118

119-
std::lock_guard<std::recursive_mutex> getCbLock();
119+
std::scoped_lock<std::recursive_mutex> getCbLock();
120120

121121
static void onDeliveryCompleted(void *context, MQTTAsync_token token);
122122
static void onConnected(void *context, char *cause);

mqtt_streaming_protocol/src/MqttAsyncClient.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,9 @@ MqttConnectionStatus MqttAsyncClient::isConnected() const
147147
return MQTTAsync_isConnected(client) ? MqttConnectionStatus::connected : MqttConnectionStatus::not_connected;
148148
}
149149

150-
std::lock_guard<std::recursive_mutex> MqttAsyncClient::getCbLock()
150+
std::scoped_lock<std::recursive_mutex> MqttAsyncClient::getCbLock()
151151
{
152-
return std::lock_guard<decltype(cbMtx)>(cbMtx);
152+
return std::scoped_lock<decltype(cbMtx)>(cbMtx);
153153
}
154154

155155
void MqttAsyncClient::setUsernamePasswrod(std::string username, std::string password)

0 commit comments

Comments
 (0)