Skip to content

Commit e966d29

Browse files
committed
mqtt: QoS property for subscribers
1 parent e54456e commit e966d29

7 files changed

Lines changed: 70 additions & 12 deletions

File tree

mqtt_streaming_module/include/mqtt_streaming_module/constants.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ static constexpr uint32_t DEFAULT_INIT_TIMEOUT = 3000; // ms
1616

1717
static constexpr uint32_t DEFAULT_PUB_READ_PERIOD = 20; // ms
1818
static constexpr uint32_t DEFAULT_PUB_QOS = 1;
19+
static constexpr uint32_t DEFAULT_SUB_QOS = 1;
1920
static constexpr uint32_t DEFAULT_PUB_PACK_SIZE = 1;
2021

2122
static constexpr const char* DEFAULT_SIGNAL_NAME = "mqttValueSignal";
@@ -42,6 +43,7 @@ static constexpr const char* PROPERTY_NAME_PUB_GROUP_VALUES = "GroupValues";
4243
static constexpr const char* PROPERTY_NAME_PUB_USE_SIGNAL_NAMES = "UseSignalNames";
4344
static constexpr const char* PROPERTY_NAME_PUB_GROUP_VALUES_PACK_SIZE = "GroupValuesPackSize";
4445
static constexpr const char* PROPERTY_NAME_PUB_QOS = "MqttQoS";
46+
static constexpr const char* PROPERTY_NAME_SUB_QOS = "MqttQoS";
4547
static constexpr const char* PROPERTY_NAME_PUB_READ_PERIOD = "ReaderPeriod";
4648

4749
static constexpr const char* RAW_FB_NAME = "rawMqttFb";

mqtt_streaming_module/include/mqtt_streaming_module/mqtt_base_fb.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <opendaq/function_block_impl.h>
2020

2121
#include "MqttAsyncClient.h"
22+
#include "mqtt_streaming_module/constants.h"
2223
#include "mqtt_streaming_module/status_helper.h"
2324

2425
BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE
@@ -64,6 +65,7 @@ class MqttBaseFb : public FunctionBlock
6465

6566
std::shared_ptr<mqtt::MqttAsyncClient> subscriber;
6667
StatusHelper<SubscriptionStatus> subscriptionStatus;
68+
int qos = DEFAULT_SUB_QOS;
6769

6870
virtual void createSignals() = 0;
6971
virtual void processMessage(const mqtt::MqttMessage& msg) = 0;

mqtt_streaming_module/src/mqtt_base_fb.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ MqttBaseFb::CmdResult MqttBaseFb::subscribeToTopic()
7474
{
7575
LOG_I("Trying to subscribe to the topic : {}", topic);
7676
subscriber->setMessageArrivedCb(topic, lambda);
77-
if (auto subRes = subscriber->subscribe(topic, 1); subRes.success == false)
77+
if (auto subRes = subscriber->subscribe(topic, qos); subRes.success == false)
7878
{
7979
LOG_W("Failed to subscribe to the topic: \"{}\"; reason: {}", topic, subRes.msg);
8080
setComponentStatusWithMessage(ComponentStatus::Warning, "Some topics failed to subscribe!");

mqtt_streaming_module/src/mqtt_json_receiver_fb_impl.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,18 @@ FunctionBlockTypePtr MqttJsonReceiverFbImpl::CreateType()
8686
StringPropertyBuilder(PROPERTY_NAME_TOPIC, String("")).setDescription("An MQTT topic to subscribe to for receiving JSON data.");
8787
defaultConfig.addProperty(builder.build());
8888
}
89+
{
90+
auto builder =
91+
IntPropertyBuilder(PROPERTY_NAME_SUB_QOS, DEFAULT_SUB_QOS)
92+
.setMinValue(0)
93+
.setMaxValue(2)
94+
.setSuggestedValues(List<IInteger>(0, 1, 2))
95+
.setDescription(
96+
fmt::format("MQTT Quality of Service level for subscribing. It can be 0 (at most once), 1 (at least once), or 2 "
97+
"(exactly once). By default it is set to {}.",
98+
DEFAULT_SUB_QOS));
99+
defaultConfig.addProperty(builder.build());
100+
}
89101
{
90102
auto builder = StringPropertyBuilder(PROPERTY_NAME_JSON_CONFIG, String(""))
91103
.setDescription(
@@ -147,6 +159,15 @@ void MqttJsonReceiverFbImpl::readProperties()
147159
setTopic(topicStr.toStdString());
148160
}
149161
}
162+
if (objPtr.hasProperty(PROPERTY_NAME_SUB_QOS))
163+
{
164+
auto qosProp = objPtr.getPropertyValue(PROPERTY_NAME_SUB_QOS).asPtrOrNull<IInteger>();
165+
if (qosProp.assigned())
166+
{
167+
const auto qos = qosProp.getValue(DEFAULT_SUB_QOS);
168+
this->qos = (qos < 0 || qos > 2) ? DEFAULT_SUB_QOS : qos;
169+
}
170+
}
150171
if (!isPresent)
151172
{
152173
LOG_W("\'{}\' property is missing!", PROPERTY_NAME_TOPIC);

mqtt_streaming_module/src/mqtt_raw_receiver_fb_impl.cpp

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,29 @@ MqttRawReceiverFbImpl::~MqttRawReceiverFbImpl()
3535
FunctionBlockTypePtr MqttRawReceiverFbImpl::CreateType()
3636
{
3737
auto defaultConfig = PropertyObject();
38-
auto builder = StringPropertyBuilder(PROPERTY_NAME_TOPIC, "")
39-
.setDescription("An MQTT topic to subscribe to for receiving raw binary data.");
40-
defaultConfig.addProperty(builder.build());
41-
const auto fbType = FunctionBlockType(RAW_FB_NAME,
42-
RAW_FB_NAME,
43-
"The raw MQTT function block allows subscribing to an MQTT topic and converting MQTT payloads into "
44-
"openDAQ signal binary data samples.",
45-
defaultConfig);
38+
{
39+
auto builder =
40+
IntPropertyBuilder(PROPERTY_NAME_SUB_QOS, DEFAULT_SUB_QOS)
41+
.setMinValue(0)
42+
.setMaxValue(2)
43+
.setSuggestedValues(List<IInteger>(0, 1, 2))
44+
.setDescription(
45+
fmt::format("MQTT Quality of Service level for subscribing. It can be 0 (at most once), 1 (at least once), or 2 "
46+
"(exactly once). By default it is set to {}.",
47+
DEFAULT_SUB_QOS));
48+
defaultConfig.addProperty(builder.build());
49+
}
50+
{
51+
auto builder =
52+
StringPropertyBuilder(PROPERTY_NAME_TOPIC, "").setDescription("An MQTT topic to subscribe to for receiving raw binary data.");
53+
defaultConfig.addProperty(builder.build());
54+
}
55+
const auto fbType =
56+
FunctionBlockType(RAW_FB_NAME,
57+
RAW_FB_NAME,
58+
"The raw MQTT function block allows subscribing to an MQTT topic and converting MQTT payloads into "
59+
"openDAQ signal binary data samples.",
60+
defaultConfig);
4661
return fbType;
4762
}
4863

@@ -77,6 +92,17 @@ void MqttRawReceiverFbImpl::readProperties()
7792
}
7893
}
7994
}
95+
96+
if (objPtr.hasProperty(PROPERTY_NAME_SUB_QOS))
97+
{
98+
auto qosProp = objPtr.getPropertyValue(PROPERTY_NAME_SUB_QOS).asPtrOrNull<IInteger>();
99+
if (qosProp.assigned())
100+
{
101+
const auto qos = qosProp.getValue(DEFAULT_SUB_QOS);
102+
this->qos = (qos < 0 || qos > 2) ? DEFAULT_SUB_QOS : qos;
103+
}
104+
}
105+
80106
if (!isPresent)
81107
{
82108
LOG_W("\'{}\' property is missing!", PROPERTY_NAME_TOPIC);

mqtt_streaming_module/tests/test_mqtt_json_fb.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ TEST_F(MqttJsonFbTest, DefaultConfig)
7171

7272
ASSERT_TRUE(defaultConfig.assigned());
7373

74-
ASSERT_EQ(defaultConfig.getAllProperties().getCount(), 3u);
74+
ASSERT_EQ(defaultConfig.getAllProperties().getCount(), 4u);
7575

7676
ASSERT_TRUE(defaultConfig.hasProperty(PROPERTY_NAME_JSON_CONFIG));
7777
ASSERT_EQ(defaultConfig.getProperty(PROPERTY_NAME_JSON_CONFIG).getValueType(), CoreType::ctString);
@@ -84,6 +84,10 @@ TEST_F(MqttJsonFbTest, DefaultConfig)
8484
ASSERT_TRUE(defaultConfig.hasProperty(PROPERTY_NAME_TOPIC));
8585
ASSERT_EQ(defaultConfig.getProperty(PROPERTY_NAME_TOPIC).getValueType(), CoreType::ctString);
8686
ASSERT_EQ(defaultConfig.getPropertyValue(PROPERTY_NAME_TOPIC).asPtr<IString>().getLength(), 0u);
87+
88+
ASSERT_TRUE(defaultConfig.hasProperty(PROPERTY_NAME_SUB_QOS));
89+
ASSERT_EQ(defaultConfig.getProperty(PROPERTY_NAME_SUB_QOS).getValueType(), CoreType::ctInt);
90+
ASSERT_EQ(defaultConfig.getPropertyValue(PROPERTY_NAME_SUB_QOS).asPtr<IInteger>().getValue(DEFAULT_SUB_QOS), DEFAULT_SUB_QOS);
8791
}
8892

8993
TEST_F(MqttJsonFbTest, Config)

mqtt_streaming_module/tests/test_mqtt_raw_fb.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,15 @@ TEST_F(MqttRawFbTest, DefaultRawFbConfig)
5858

5959
ASSERT_TRUE(defaultConfig.assigned());
6060

61-
ASSERT_EQ(defaultConfig.getAllProperties().getCount(), 1u);
61+
ASSERT_EQ(defaultConfig.getAllProperties().getCount(), 2u);
6262

6363
ASSERT_TRUE(defaultConfig.hasProperty(PROPERTY_NAME_TOPIC));
64-
6564
ASSERT_EQ(defaultConfig.getProperty(PROPERTY_NAME_TOPIC).getValueType(), CoreType::ctString);
6665
ASSERT_EQ(defaultConfig.getPropertyValue(PROPERTY_NAME_TOPIC).asPtr<IString>().getLength(), 0u);
66+
67+
ASSERT_TRUE(defaultConfig.hasProperty(PROPERTY_NAME_SUB_QOS));
68+
ASSERT_EQ(defaultConfig.getProperty(PROPERTY_NAME_SUB_QOS).getValueType(), CoreType::ctInt);
69+
ASSERT_EQ(defaultConfig.getPropertyValue(PROPERTY_NAME_SUB_QOS).asPtr<IInteger>().getValue(DEFAULT_SUB_QOS), DEFAULT_SUB_QOS);
6770
}
6871

6972
TEST_F(MqttRawFbTest, Creation)

0 commit comments

Comments
 (0)