Skip to content

Commit ccde7dc

Browse files
committed
domain value duplication error; tests
1 parent 7e3039b commit ccde7dc

6 files changed

Lines changed: 142 additions & 7 deletions

File tree

modules/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_json_decoder_fb_impl.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ class MqttJsonDecoderFbImpl final : public FunctionBlock
3939
const PropertyObjectPtr& config = nullptr);
4040

4141
DAQ_MQTT_STREAM_MODULE_API static FunctionBlockTypePtr CreateType();
42-
void processMessage(const std::string& json, const uint64_t externalTs);
42+
DAQ_MQTT_STREAM_MODULE_API void processMessage(const std::string& json, const uint64_t externalTs);
4343
protected:
4444

4545
struct FbConfig {
@@ -58,7 +58,9 @@ class MqttJsonDecoderFbImpl final : public FunctionBlock
5858
std::atomic<bool> configValid;
5959
std::string configMsg;
6060
std::atomic<bool> parsingSucceeded;
61+
std::atomic<bool> externalTsDuplicate;
6162
std::string parsingMsg;
63+
uint64_t lastExternalTs;
6264

6365
FbConfig config;
6466

@@ -73,6 +75,7 @@ class MqttJsonDecoderFbImpl final : public FunctionBlock
7375
void propertyChanged();
7476

7577
void updateStatuses();
78+
void checkExternalTs(const uint64_t externalTs);
7679
};
7780

7881
END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE

modules/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_subscriber_fb_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class MqttSubscriberFbImpl final : public FunctionBlock
7777
void reconfigureSignal();
7878
void clearSubscribedTopic();
7979

80-
DataPacketPtr createDomainDataPacket(const uint64_t epochTime);
80+
DAQ_MQTT_STREAM_MODULE_API DataPacketPtr createDomainDataPacket(const uint64_t epochTime);
8181

8282
void processMessage(const mqtt::MqttMessage& msg);
8383
void initProperties(const PropertyObjectPtr& config);

modules/mqtt_streaming_module/src/mqtt_json_decoder_fb_impl.cpp

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ MqttJsonDecoderFbImpl::MqttJsonDecoderFbImpl(const ContextPtr& ctx,
1313
const FunctionBlockTypePtr& type,
1414
const PropertyObjectPtr& config)
1515
: FunctionBlock(type, ctx, parent, generateLocalId()),
16-
jsonDataWorker()
16+
jsonDataWorker(),
17+
lastExternalTs(0)
1718
{
1819
initComponentStatus();
1920
if (config.assigned())
@@ -147,6 +148,7 @@ void MqttJsonDecoderFbImpl::readProperties()
147148
jsonDataWorker.setTimestampFieldName(config.tsFieldName);
148149
jsonDataWorker.setDomainSignalMode(config.tsMode);
149150
waitingData = configValid.load();
151+
externalTsDuplicate = false;
150152
updateStatuses();
151153
}
152154

@@ -168,13 +170,29 @@ void MqttJsonDecoderFbImpl::updateStatuses()
168170
{
169171
setComponentStatusWithMessage(ComponentStatus::Ok, "Waiting for data");
170172
}
171-
else if (parsingSucceeded)
173+
else if (parsingSucceeded == false)
172174
{
173-
setComponentStatusWithMessage(ComponentStatus::Ok, "Parsing succeeded");
175+
setComponentStatusWithMessage(ComponentStatus::Error, "Parsing failed: " + parsingMsg);
176+
}
177+
else if (externalTsDuplicate)
178+
{
179+
setComponentStatusWithMessage(ComponentStatus::Warning,
180+
"Domain signal value for one of the received messages is the same as previous. "
181+
"Data may be lost!");
174182
}
175183
else
176184
{
177-
setComponentStatusWithMessage(ComponentStatus::Error, "Parsing failed: " + parsingMsg);
185+
setComponentStatusWithMessage(ComponentStatus::Ok, "Parsing succeeded");
186+
}
187+
}
188+
189+
void MqttJsonDecoderFbImpl::checkExternalTs(const uint64_t externalTs)
190+
{
191+
if (config.tsMode == mqtt::MqttDataWrapper::DomainSignalMode::ExternalTimestamp)
192+
{
193+
if (externalTs == lastExternalTs)
194+
externalTsDuplicate = true;
195+
lastExternalTs = externalTs;
178196
}
179197
}
180198

@@ -184,6 +202,7 @@ void MqttJsonDecoderFbImpl::processMessage(const std::string& json, const uint64
184202
{
185203
auto lock = this->getRecursiveConfigLock();
186204
waitingData = false;
205+
checkExternalTs(externalTs);
187206
auto status = jsonDataWorker.createAndSendDataPacket(json, externalTs);
188207
parsingSucceeded = status.success;
189208
if (status.success)

modules/mqtt_streaming_module/src/mqtt_subscriber_fb_impl.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,11 @@ DataPacketPtr MqttSubscriberFbImpl::createDomainDataPacket(const uint64_t epochT
431431
if (lastTsValue == epochTime)
432432
{
433433
if (statusContainer.getStatus("ComponentStatus") != ComponentStatus::Error)
434-
setComponentStatusWithMessage(ComponentStatus::Warning, "Domain signal value is the same as previous. Data may be lost");
434+
{
435+
setComponentStatusWithMessage(ComponentStatus::Warning,
436+
"Domain signal value for one of the received messages is the same as previous. "
437+
"Data may be lost!");
438+
}
435439
}
436440
lastTsValue = epochTime;
437441
}

modules/mqtt_streaming_module/tests/test_mqtt_json_decoder_fb.cpp

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1127,3 +1127,54 @@ TEST_F(MqttJsonDecoderFbTest, RemovingNestedFunctionBlock)
11271127
ASSERT_EQ(subMqttFb.getStatusContainer().getStatus("ComponentStatus"),
11281128
Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()));
11291129
}
1130+
1131+
TEST_F(MqttJsonDecoderFbTest, PacketWithTheSameTS)
1132+
{
1133+
using namespace std::chrono;
1134+
const std::string warnMsg("Domain signal value for one of the received messages is the same as previous.");
1135+
StartUp();
1136+
const std::string topic = buildTopicName();
1137+
const auto msgTemplate = VALID_JSON_DATA_1;
1138+
const std::string valueF = extractFieldName(msgTemplate, "<placeholder_value>");
1139+
AddSubFb(topic);
1140+
AddDecoderFb(valueF, DDSM::ExternalTimestamp, "");
1141+
1142+
auto fb = reinterpret_cast<MqttJsonDecoderFbImpl*>(*decoderObj);
1143+
auto buildMsg = [](int64_t value)
1144+
{
1145+
return replacePlaceholder(VALID_JSON_DATA_1, "<placeholder_value>", value);
1146+
};
1147+
auto getTime = []() { return duration_cast<microseconds>(system_clock::now().time_since_epoch()).count(); };
1148+
auto getComponentStatus = [&]() { return decoderObj.getStatusContainer().getStatus("ComponentStatus"); };
1149+
auto getStatusMsg = [&]() { return decoderObj.getStatusContainer().getStatusMessage("ComponentStatus").toStdString(); };
1150+
1151+
const auto warning = Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager());
1152+
const auto ok = Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager());
1153+
1154+
const auto ts = getTime();
1155+
1156+
fb->processMessage(buildMsg(1), ts);
1157+
1158+
ASSERT_EQ(getComponentStatus(), ok);
1159+
1160+
fb->processMessage(buildMsg(2), ts);
1161+
EXPECT_EQ(getComponentStatus(), warning);
1162+
EXPECT_NE(getStatusMsg().find(warnMsg), std::string::npos);
1163+
1164+
fb->processMessage(buildMsg(3), getTime());
1165+
EXPECT_EQ(getComponentStatus(), warning);
1166+
EXPECT_NE(getStatusMsg().find(warnMsg), std::string::npos);
1167+
1168+
// reconfiguring should reset warning
1169+
decoderObj.setPropertyValue(PROPERTY_NAME_DEC_UNIT, "ppm");
1170+
EXPECT_EQ(getComponentStatus(), ok);
1171+
EXPECT_EQ(getStatusMsg().find(warnMsg), std::string::npos);
1172+
1173+
fb->processMessage(buildMsg(3), getTime());
1174+
EXPECT_EQ(getComponentStatus(), ok);
1175+
EXPECT_EQ(getStatusMsg().find(warnMsg), std::string::npos);
1176+
1177+
fb->processMessage(buildMsg(3), getTime());
1178+
EXPECT_EQ(getComponentStatus(), ok);
1179+
EXPECT_EQ(getStatusMsg().find(warnMsg), std::string::npos);
1180+
}

modules/mqtt_streaming_module/tests/test_mqtt_subscriber_fb.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@ class MqttSubscriberFbHelper
6161
mqtt::MqttAsyncClient unused;
6262
obj->onSignalsMessage(unused, msg);
6363
}
64+
65+
DataPacketPtr createDomainDataPacket(daq::FunctionBlockPtr subFb, const uint64_t epochTime)
66+
{
67+
auto fb = reinterpret_cast<MqttSubscriberFbImpl*>(*subFb);
68+
return fb->createDomainDataPacket(epochTime);
69+
}
6470
};
6571

6672
class MqttSubscriberFbTest : public testing::Test, public DaqTestHelper, public MqttSubscriberFbHelper
@@ -658,3 +664,55 @@ TEST_F(MqttSubscriberFbTest, CheckRawFbFullDataTransferWithReconfiguring)
658664
ASSERT_EQ(dataToSend[1], dataToReceive[0]);
659665
}
660666

667+
TEST_F(MqttSubscriberFbTest, DomainDataPacketWithTheSameTS)
668+
{
669+
using namespace std::chrono;
670+
const std::string warnMsg("Domain signal value for one of the received messages is the same as previous.");
671+
const auto topic = buildTopicName();
672+
673+
StartUp();
674+
675+
auto config = clientMqttFb.getAvailableFunctionBlockTypes().get(SUB_FB_NAME).createDefaultConfig();
676+
config.setPropertyValue(PROPERTY_NAME_SUB_TOPIC, topic);
677+
config.setPropertyValue(PROPERTY_NAME_SUB_PREVIEW_SIGNAL, True);
678+
config.setPropertyValue(PROPERTY_NAME_SUB_PREVIEW_SIGNAL_TS_MODE, static_cast<int>(SDSM::SystemTime));
679+
auto fb = clientMqttFb.addFunctionBlock(SUB_FB_NAME, config);
680+
681+
auto getTime = []() { return duration_cast<microseconds>(system_clock::now().time_since_epoch()).count(); };
682+
auto getComponentStatus = [&]() { return fb.getStatusContainer().getStatus("ComponentStatus"); };
683+
auto getStatusMsg = [&]() { return fb.getStatusContainer().getStatusMessage("ComponentStatus").toStdString(); };
684+
685+
const auto warning = Enumeration("ComponentStatusType", "Warning", daqInstance.getContext().getTypeManager());
686+
const auto ok = Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager());
687+
688+
const auto ts = getTime();
689+
auto packet = createDomainDataPacket(fb, ts);
690+
691+
ASSERT_EQ(getComponentStatus(), ok);
692+
693+
packet = createDomainDataPacket(fb, ts);
694+
695+
EXPECT_EQ(getComponentStatus(), warning);
696+
EXPECT_NE(getStatusMsg().find(warnMsg), std::string::npos);
697+
698+
packet = createDomainDataPacket(fb, getTime());
699+
700+
EXPECT_EQ(getComponentStatus(), warning);
701+
EXPECT_NE(getStatusMsg().find(warnMsg), std::string::npos);
702+
703+
// reconfiguring should reset warning
704+
fb.setPropertyValue(PROPERTY_NAME_SUB_QOS, 2);
705+
706+
EXPECT_EQ(getComponentStatus(), ok);
707+
EXPECT_EQ(getStatusMsg().find(warnMsg), std::string::npos);
708+
709+
packet = createDomainDataPacket(fb, getTime());
710+
711+
EXPECT_EQ(getComponentStatus(), ok);
712+
EXPECT_EQ(getStatusMsg().find(warnMsg), std::string::npos);
713+
714+
packet = createDomainDataPacket(fb, getTime());
715+
716+
EXPECT_EQ(getComponentStatus(), ok);
717+
EXPECT_EQ(getStatusMsg().find(warnMsg), std::string::npos);
718+
}

0 commit comments

Comments
 (0)