Skip to content

Commit 033c406

Browse files
committed
mqtt: disconnecting on device removing
1 parent 024af35 commit 033c406

4 files changed

Lines changed: 55 additions & 5 deletions

File tree

mqtt_streaming_client_module/include/mqtt_streaming_client_module/mqtt_streaming_device_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class MqttStreamingDeviceImpl : public Device
4747
DictPtr<IString, IFunctionBlockType> onGetAvailableFunctionBlockTypes() override;
4848
FunctionBlockPtr onAddFunctionBlock(const StringPtr& typeId, const PropertyObjectPtr& config) override;
4949

50-
void setupMqttSubscriber();
50+
void initMqttSubscriber();
5151
bool waitForConnection(const int timeoutMs);
5252
void receiveSignalTopics(const int timeoutMs);
5353
void onSignalsMessage(const mqtt::MqttAsyncClient& subscriber, mqtt::MqttMessage& msg);

mqtt_streaming_client_module/src/mqtt_streaming_device_impl.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_CLIENT_MODULE
1717

1818
std::atomic<int> MqttStreamingDeviceImpl::localIndex = 0;
1919

20+
constexpr int MQTT_CLIENT_SYNC_DISCONNECT_TOUT = 3000;
21+
2022
MqttStreamingDeviceImpl::MqttStreamingDeviceImpl(const ContextPtr& ctx, const ComponentPtr& parent, const PropertyObjectPtr& config)
2123
: Device(ctx, parent, getLocalId()),
2224
connectionStatus(Enumeration("ConnectionStatusType", "Connected", this->context.getTypeManager())),
@@ -37,7 +39,7 @@ MqttStreamingDeviceImpl::MqttStreamingDeviceImpl(const ContextPtr& ctx, const Co
3739

3840
initComponentStatus();
3941

40-
setupMqttSubscriber();
42+
initMqttSubscriber();
4143
if (!waitForConnection(initTimeout))
4244
{
4345
LOG_E("MQTT: could not connect to MQTT broker within {} ms", initTimeout);
@@ -50,6 +52,9 @@ MqttStreamingDeviceImpl::MqttStreamingDeviceImpl(const ContextPtr& ctx, const Co
5052

5153
void MqttStreamingDeviceImpl::removed()
5254
{
55+
bool disRes = subscriber->syncDisconnect(MQTT_CLIENT_SYNC_DISCONNECT_TOUT);
56+
if (!disRes)
57+
LOG_E("MQTT: disconnection was unsuccessful");
5358
Device::removed();
5459
}
5560

@@ -58,9 +63,8 @@ DeviceInfoPtr MqttStreamingDeviceImpl::onGetInfo()
5863
return DeviceInfo(connectionString, MQTT_DEVICE_NAME);
5964
}
6065

61-
void MqttStreamingDeviceImpl::setupMqttSubscriber()
66+
void MqttStreamingDeviceImpl::initMqttSubscriber()
6267
{
63-
subscriber->disconnect();
6468
subscriber->setServerURL(connectionSettings.mqttUrl);
6569
subscriber->setClientId(connectionSettings.clientId);
6670
subscriber->setUsernamePasswrod(connectionSettings.username, connectionSettings.password);

mqtt_streaming_protocol/include/MqttAsyncClient.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class MqttAsyncClient final {
4040

4141
bool connect();
4242
bool disconnect();
43+
bool syncDisconnect(int timeoutMs);
4344

4445
bool publish(const std::string &topic,
4546
void *data,
@@ -84,6 +85,7 @@ class MqttAsyncClient final {
8485
std::function<void()> onConnectedCb;
8586
std::function<void(int, bool)> onSentCb;
8687
std::function<void(bool)> onDisconnectCb;
88+
std::function<void(bool)> onInternalDisconnectCb;
8789
std::function<void(int)> onDeliveryCompletedCb;
8890
std::function<MsgArrivedCb_type> onMsgArrivedCmnCb;
8991
std::unordered_map<std::string, std::function<MsgArrivedCb_type>> onMsgArrivedCbs;

mqtt_streaming_protocol/src/MqttAsyncClient.cpp

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "MqttAsyncClient.h"
2+
#include <future>
23

34
namespace mqtt
45
{
@@ -90,8 +91,47 @@ bool MqttAsyncClient::connect()
9091

9192
bool MqttAsyncClient::disconnect()
9293
{
94+
if (client == nullptr)
95+
return true;
96+
9397
// It is only the result of the request to disconnect (queuing)
94-
return MQTTAsync_disconnect(client, &disconnOpts) == MQTTASYNC_SUCCESS;
98+
auto status = MQTTAsync_disconnect(client, &disconnOpts);
99+
bool result = (status == MQTTASYNC_SUCCESS || status == MQTTASYNC_DISCONNECTED);
100+
return result;
101+
}
102+
103+
bool MqttAsyncClient::syncDisconnect(int timeoutMs)
104+
{
105+
if (client == nullptr)
106+
return true;
107+
108+
bool result = disconnect();
109+
if (result)
110+
{
111+
std::atomic<bool> done{false};
112+
std::promise<bool> disconnectedPromise;
113+
auto disconnectedFuture = disconnectedPromise.get_future();
114+
{
115+
auto lock = getCbLock();
116+
onInternalDisconnectCb = [promise = &disconnectedPromise, &done](bool result) {
117+
bool expected = false;
118+
if (done.compare_exchange_strong(expected, true)) {
119+
promise->set_value(result);
120+
}
121+
};
122+
}
123+
auto status = disconnectedFuture.wait_for(std::chrono::milliseconds(timeoutMs));
124+
{
125+
auto lock = getCbLock();
126+
onInternalDisconnectCb = nullptr;
127+
}
128+
if (status == std::future_status::ready && disconnectedFuture.get() == true)
129+
{
130+
MQTTAsync_destroy(&client);
131+
return true;
132+
}
133+
}
134+
return false;
95135
}
96136

97137
MqttConnectionStatus MqttAsyncClient::isConnected() const
@@ -336,6 +376,8 @@ void MqttAsyncClient::onDisconnectSuccess(void* context, MQTTAsync_successData*
336376
{
337377
auto clienttInst = (MqttAsyncClient*)context;
338378
auto lock = clienttInst->getCbLock();
379+
if (clienttInst->onInternalDisconnectCb)
380+
clienttInst->onInternalDisconnectCb(true);
339381
if (clienttInst->onDisconnectCb)
340382
clienttInst->onDisconnectCb(true);
341383
}
@@ -348,6 +390,8 @@ void MqttAsyncClient::onDisconnectFailure(void* context, MQTTAsync_failureData*
348390
{
349391
auto clienttInst = (MqttAsyncClient*)context;
350392
auto lock = clienttInst->getCbLock();
393+
if (clienttInst->onInternalDisconnectCb)
394+
clienttInst->onInternalDisconnectCb(false);
351395
if (clienttInst->onDisconnectCb)
352396
clienttInst->onDisconnectCb(false);
353397
}

0 commit comments

Comments
 (0)