Skip to content

Commit 42a3798

Browse files
committed
mqtt: SignalStatus for publisher FB
1 parent dd9d234 commit 42a3798

4 files changed

Lines changed: 111 additions & 30 deletions

File tree

mqtt_streaming_module/include/mqtt_streaming_module/constants.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,5 +49,6 @@ static const char* MQTT_LOCAL_JSON_FB_ID_PREFIX = "jsonMqttFb";
4949

5050
static const char* MQTT_ROOT_FB_CON_STATUS_TYPE = "BrokerConnectionStatusType";
5151
static const char* MQTT_RAW_FB_SUB_STATUS_TYPE = "MqttSubscriptionStatusType";
52+
static const char* MQTT_PUB_FB_SIG_STATUS_TYPE = "MqttSignalStatusType";
5253

5354
END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE

mqtt_streaming_module/include/mqtt_streaming_module/mqtt_publisher_fb_impl.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@ BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE
2727
class MqttPublisherFbImpl final : public FunctionBlock
2828
{
2929
public:
30+
enum class SignalStatus : EnumType
31+
{
32+
NotConnected = 0,
33+
Invalid,
34+
Valid
35+
};
36+
3037
explicit MqttPublisherFbImpl(const ContextPtr& ctx,
3138
const ComponentPtr& parent,
3239
const FunctionBlockTypePtr& type,
@@ -40,7 +47,11 @@ class MqttPublisherFbImpl final : public FunctionBlock
4047
void onConnected(const InputPortPtr& port) override;
4148
void onDisconnected(const InputPortPtr& port) override;
4249

50+
static void addTypeToTypeManager(daq::TypeManagerPtr manager);
51+
4352
private:
53+
static const std::vector<std::pair<SignalStatus, std::string>> signalStatusMap;
54+
4455
static std::atomic<int> localIndex;
4556
std::shared_ptr<mqtt::MqttAsyncClient> mqttClient;
4657
mqtt::MqttDataWrapper jsonDataWorker;
@@ -51,8 +62,11 @@ class MqttPublisherFbImpl final : public FunctionBlock
5162
std::atomic<bool> running;
5263
std::atomic<bool> hasError;
5364
std::unique_ptr<HandlerBase> handler;
65+
EnumerationPtr signalStatus;
5466

5567
static std::string getLocalId();
68+
void initSignalStatus();
69+
void setSignalStatus(const SignalStatus status, std::string message = "");
5670
void initProperties(const PropertyObjectPtr& config);
5771
void readProperties();
5872
void propertyChanged();

mqtt_streaming_module/src/mqtt_publisher_fb_impl.cpp

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@ BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE
99

1010
std::atomic<int> MqttPublisherFbImpl::localIndex = 0;
1111

12+
const std::vector<std::pair<MqttPublisherFbImpl::SignalStatus, std::string>> MqttPublisherFbImpl::signalStatusMap =
13+
{{SignalStatus::NotConnected, "NotConnected"},
14+
{SignalStatus::Invalid, "Invalid"},
15+
{SignalStatus::Valid, "Valid"}};
16+
1217
MqttPublisherFbImpl::MqttPublisherFbImpl(const ContextPtr& ctx,
1318
const ComponentPtr& parent,
1419
const FunctionBlockTypePtr& type,
@@ -22,6 +27,7 @@ MqttPublisherFbImpl::MqttPublisherFbImpl(const ContextPtr& ctx,
2227
hasError(false)
2328
{
2429
initComponentStatus();
30+
initSignalStatus();
2531
if (config.assigned())
2632
initProperties(populateDefaultConfig(type.createDefaultConfig(), config));
2733
else
@@ -165,20 +171,31 @@ void MqttPublisherFbImpl::updateInputPorts()
165171

166172
void MqttPublisherFbImpl::validateInputPorts()
167173
{
168-
const auto status = handler->validateSignalContexts(signalContexts);
169-
hasError = !status.success;
170-
if (!status.success)
174+
if (signalContexts.size() == 1) // no one input port is connected
171175
{
172-
setComponentStatusWithMessage(ComponentStatus::Error, "Some connected signals were invalidated!");
173-
for (const auto& msg : status.messages)
174-
{
175-
LOG_E("{}", msg);
176-
}
176+
setSignalStatus(SignalStatus::NotConnected);
177177
}
178178
else
179179
{
180-
setComponentStatus(ComponentStatus::Ok);
181-
handler->signalListChanged(signalContexts);
180+
const auto status = handler->validateSignalContexts(signalContexts);
181+
hasError = !status.success;
182+
if (!status.success)
183+
{
184+
setComponentStatusWithMessage(ComponentStatus::Error, "Some connected signals were invalidated!");
185+
std::string allMessages;
186+
for (const auto& msg : status.messages)
187+
{
188+
LOG_E("{}", msg);
189+
allMessages += msg + "; ";
190+
}
191+
setSignalStatus(SignalStatus::Invalid, allMessages);
192+
}
193+
else
194+
{
195+
setComponentStatus(ComponentStatus::Ok);
196+
setSignalStatus(SignalStatus::Valid);
197+
handler->signalListChanged(signalContexts);
198+
}
182199
}
183200
}
184201

@@ -283,4 +300,36 @@ std::string MqttPublisherFbImpl::getLocalId()
283300
{
284301
return std::string(MQTT_LOCAL_PUB_FB_ID_PREFIX + std::to_string(localIndex++));
285302
}
303+
304+
void MqttPublisherFbImpl::initSignalStatus()
305+
{
306+
307+
addTypeToTypeManager(context.getTypeManager());
308+
309+
signalStatus = EnumerationWithIntValue(MQTT_PUB_FB_SIG_STATUS_TYPE,
310+
static_cast<Int>(SignalStatus::NotConnected),
311+
this->context.getTypeManager());
312+
statusContainer.template asPtr<IComponentStatusContainerPrivate>(true).addStatus("SignalStatus",
313+
signalStatus);
314+
}
315+
316+
void MqttPublisherFbImpl::setSignalStatus(const SignalStatus status, std::string message)
317+
{
318+
signalStatus = EnumerationWithIntValue(MQTT_PUB_FB_SIG_STATUS_TYPE, static_cast<Int>(status), this->context.getTypeManager());
319+
statusContainer.template asPtr<IComponentStatusContainerPrivate>(true).setStatusWithMessage("SignalStatus",
320+
signalStatus,
321+
message);
322+
}
323+
324+
void MqttPublisherFbImpl::addTypeToTypeManager(daq::TypeManagerPtr manager)
325+
{
326+
if (!manager.hasType(MQTT_PUB_FB_SIG_STATUS_TYPE))
327+
{
328+
auto list = List<IString>();
329+
for (const auto& [_, st] : signalStatusMap)
330+
list.pushBack(st);
331+
332+
manager.addType(EnumerationType(MQTT_PUB_FB_SIG_STATUS_TYPE, list));
333+
}
334+
}
286335
END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE

mqtt_streaming_module/tests/test_mqtt_publisher_fb.cpp

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -665,36 +665,53 @@ TEST_F(MqttPublisherFbTest, CreationWithPartialConfig)
665665
TEST_F(MqttPublisherFbTest, ConnectToPort)
666666
{
667667
StartUp();
668+
MqttPublisherFbImpl::addTypeToTypeManager(rootMqttFb.getContext().getTypeManager());
669+
const auto sigStValid = EnumerationWithIntValue(MQTT_PUB_FB_SIG_STATUS_TYPE,
670+
static_cast<Int>(MqttPublisherFbImpl::SignalStatus::Valid),
671+
daqInstance.getContext().getTypeManager());
672+
const auto sigStInvalid = EnumerationWithIntValue(MQTT_PUB_FB_SIG_STATUS_TYPE,
673+
static_cast<Int>(MqttPublisherFbImpl::SignalStatus::Invalid),
674+
daqInstance.getContext().getTypeManager());
675+
const auto sigStNotConnected = EnumerationWithIntValue(MQTT_PUB_FB_SIG_STATUS_TYPE,
676+
static_cast<Int>(MqttPublisherFbImpl::SignalStatus::NotConnected),
677+
daqInstance.getContext().getTypeManager());
678+
const auto comStOk = Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager());
679+
const auto comStError = Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager());
668680

669681
{
670682
daq::FunctionBlockPtr fb;
671683
ASSERT_NO_THROW(fb = rootMqttFb.addFunctionBlock(PUB_FB_NAME));
684+
ASSERT_EQ(fb.getStatusContainer().getStatus("SignalStatus"), sigStNotConnected);
672685
auto help = SignalHelper<double>();
673686

674687
ASSERT_EQ(fb.getInputPorts().getCount(), 1u);
675688
fb.getInputPorts()[0].connect(help.signal0);
676689
ASSERT_EQ(fb.getInputPorts().getCount(), 2u);
677-
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"),
678-
Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()));
690+
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStOk);
691+
ASSERT_EQ(fb.getStatusContainer().getStatus("SignalStatus"), sigStValid);
692+
679693
fb.getInputPorts()[1].connect(help.signal0);
680694
ASSERT_EQ(fb.getInputPorts().getCount(), 3u);
681-
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"),
682-
Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()));
695+
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStOk);
696+
ASSERT_EQ(fb.getStatusContainer().getStatus("SignalStatus"), sigStValid);
683697
// disconnection
684698
fb.getInputPorts()[1].disconnect();
685699
ASSERT_EQ(fb.getInputPorts().getCount(), 2u);
686-
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"),
687-
Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()));
700+
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStOk);
701+
ASSERT_EQ(fb.getStatusContainer().getStatus("SignalStatus"), sigStValid);
688702
// connection without a domain signal
689703
fb.getInputPorts()[1].connect(help.signalWithoutDomain);
690704
ASSERT_EQ(fb.getInputPorts().getCount(), 3u);
691-
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"),
692-
Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()));
705+
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStOk);
706+
ASSERT_EQ(fb.getStatusContainer().getStatus("SignalStatus"), sigStValid);
693707
// disconnection
694708
fb.getInputPorts()[1].disconnect();
695709
ASSERT_EQ(fb.getInputPorts().getCount(), 2u);
696-
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"),
697-
Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()));
710+
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStOk);
711+
ASSERT_EQ(fb.getStatusContainer().getStatus("SignalStatus"), sigStValid);
712+
fb.getInputPorts()[0].disconnect();
713+
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStOk);
714+
ASSERT_EQ(fb.getStatusContainer().getStatus("SignalStatus"), sigStNotConnected);
698715
}
699716

700717
{
@@ -709,8 +726,8 @@ TEST_F(MqttPublisherFbTest, ConnectToPort)
709726

710727
fb.getInputPorts()[0].connect(signal0);
711728
fb.getInputPorts()[1].connect(signal1);
712-
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"),
713-
Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()));
729+
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStOk);
730+
ASSERT_EQ(fb.getStatusContainer().getStatus("SignalStatus"), sigStValid);
714731
}
715732

716733
{
@@ -725,11 +742,11 @@ TEST_F(MqttPublisherFbTest, ConnectToPort)
725742

726743
fb.getInputPorts()[0].connect(signal0);
727744
fb.getInputPorts()[1].connect(signal1);
728-
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"),
729-
Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager()));
745+
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStError);
746+
ASSERT_EQ(fb.getStatusContainer().getStatus("SignalStatus"), sigStInvalid);
730747
fb.getInputPorts()[1].disconnect();
731-
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"),
732-
Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()));
748+
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStOk);
749+
ASSERT_EQ(fb.getStatusContainer().getStatus("SignalStatus"), sigStValid);
733750
}
734751

735752
{
@@ -744,11 +761,11 @@ TEST_F(MqttPublisherFbTest, ConnectToPort)
744761

745762
fb.getInputPorts()[0].connect(signal0);
746763
fb.getInputPorts()[1].connect(signal1);
747-
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"),
748-
Enumeration("ComponentStatusType", "Error", daqInstance.getContext().getTypeManager()));
764+
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStError);
765+
ASSERT_EQ(fb.getStatusContainer().getStatus("SignalStatus"), sigStInvalid);
749766
fb.getInputPorts()[1].disconnect();
750-
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"),
751-
Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()));
767+
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"), comStOk);
768+
ASSERT_EQ(fb.getStatusContainer().getStatus("SignalStatus"), sigStValid);
752769
}
753770
}
754771

0 commit comments

Comments
 (0)