1515
1616using namespace daq ;
1717using namespace daq ::modules::mqtt_streaming_module;
18+ using DDSM = mqtt::MqttDataWrapper::DomainSignalMode;
1819
1920namespace
2021{
@@ -372,19 +373,18 @@ class MqttJsonDecoderFbHelper : public DaqTestHelper
372373 subMqttFb = new MqttSubscriberFbImpl (NullContext (), nullptr , fbType, nullptr , config);
373374 }
374375
375- void CreateDecoderFB (std::string topic, std::string valueF, std::string tsF, std::string unitSymbol = " " )
376+ void CreateDecoderFB (std::string topic, std::string valueF, DDSM mode, std::string tsF, std::string unitSymbol = " " )
376377 {
377378 CreateJsonFb (topic);
378- AddDecoderFb (valueF, tsF, unitSymbol);
379+ AddDecoderFb (valueF, mode, tsF, unitSymbol);
379380 }
380381
381- daq::FunctionBlockPtr AddDecoderFb (std::string valueF, std::string tsF, std::string unitSymbol = " " )
382+ daq::FunctionBlockPtr AddDecoderFb (std::string valueF, DDSM mode, std::string tsF, std::string unitSymbol = " " )
382383 {
383384 daq::StringPtr typeId = daq::String (JSON_DECODER_FB_NAME);
384385 auto config = subMqttFb.getAvailableFunctionBlockTypes ().get (JSON_DECODER_FB_NAME).createDefaultConfig ();
385386
386- if (!tsF.empty ())
387- config.setPropertyValue (PROPERTY_NAME_DEC_TS_MODE, 1 );
387+ config.setPropertyValue (PROPERTY_NAME_DEC_TS_MODE, static_cast <int >(mode));
388388 config.setPropertyValue (PROPERTY_NAME_DEC_VALUE_NAME, valueF);
389389 config.setPropertyValue (PROPERTY_NAME_DEC_TS_NAME, tsF);
390390 config.setPropertyValue (PROPERTY_NAME_DEC_UNIT, unitSymbol);
@@ -426,6 +426,12 @@ class MqttJsonDecoderFbHelper : public DaqTestHelper
426426 return transferData<vT, tsT, vT>(data, jsonDataTemplate);
427427 }
428428
429+ template <typename vT, typename tsT>
430+ std::vector<std::pair<vT, tsT>> transferDataWithSystemTime (const std::vector<std::pair<vT, tsT>>& data, const std::string& jsonDataTemplate, std::vector<uint64_t >& timePoints)
431+ {
432+ return transferDataWithSystemTime<vT, tsT, std::pair<vT, tsT>>(data, jsonDataTemplate, timePoints);
433+ }
434+
429435private:
430436 template <typename vT, typename tsT, typename returnT> std::vector<returnT> transferData (const std::vector<std::pair<vT, tsT>>& data, const std::string& jsonDataTemplate)
431437 {
@@ -436,16 +442,46 @@ class MqttJsonDecoderFbHelper : public DaqTestHelper
436442 {
437443 tsF = extractFieldName (jsonDataTemplate, " <placeholder_ts>" );
438444 }
445+ DDSM mode = DDSM::None;
446+ if (!tsF.empty ())
447+ {
448+ mode = DDSM::ExtractFromMessage;
449+ }
450+ CreateDecoderFB (topic, valueF, mode, tsF);
451+
452+ auto signal = getSignals ()[0 ];
453+ auto reader = daq::PacketReader (signal);
454+
455+ auto msgs = replacePlaceholders (data, jsonDataTemplate);
456+ for (const auto & str : msgs)
457+ {
458+ onSignalsMessage ({topic, std::vector<uint8_t >(str.begin (), str.end ()), 1 , 0 });
459+ }
439460
440- CreateDecoderFB (topic, valueF, tsF);
461+ std::vector<returnT> dataToReceive = read<returnT>(reader, signal, 0 );
462+ return dataToReceive;
463+ }
464+
465+ template <typename vT, typename tsT, typename returnT> std::vector<returnT> transferDataWithSystemTime (const std::vector<std::pair<vT, tsT>>& data, const std::string& jsonDataTemplate, std::vector<uint64_t >& timePoints)
466+ {
467+ const auto topic = buildTopicName ();
468+ std::string valueF = extractFieldName (jsonDataTemplate, " <placeholder_value>" );
469+ CreateDecoderFB (topic, valueF, DDSM::ExternalTimestamp, " " );
441470
442471 auto signal = getSignals ()[0 ];
443472 auto reader = daq::PacketReader (signal);
444473
474+ auto getTime = []()
475+ {
476+ using namespace std ::chrono;
477+ return duration_cast<microseconds>(system_clock::now ().time_since_epoch ()).count ();
478+ };
445479 auto msgs = replacePlaceholders (data, jsonDataTemplate);
480+ timePoints.push_back (getTime ());
446481 for (const auto & str : msgs)
447482 {
448483 onSignalsMessage ({topic, std::vector<uint8_t >(str.begin (), str.end ()), 1 , 0 });
484+ timePoints.push_back (getTime ());
449485 }
450486
451487 std::vector<returnT> dataToReceive = read<returnT>(reader, signal, 0 );
@@ -525,14 +561,7 @@ class MqttJsonFbUnitPTest : public ::testing::TestWithParam<std::pair<std::strin
525561
526562TEST_F (MqttJsonDecoderFbTest, DefaultConfig)
527563{
528- StartUp ();
529- AddSubFb (buildTopicName ());
530- daq::DictPtr<daq::IString, daq::IFunctionBlockType> fbTypes;
531- daq::FunctionBlockTypePtr fbt;
532- daq::PropertyObjectPtr defaultConfig;
533- ASSERT_NO_THROW (fbTypes = subMqttFb.getAvailableFunctionBlockTypes ());
534- ASSERT_NO_THROW (fbt = fbTypes.get (JSON_DECODER_FB_NAME));
535- ASSERT_NO_THROW (defaultConfig = fbt.createDefaultConfig ());
564+ daq::PropertyObjectPtr defaultConfig = MqttJsonDecoderFbImpl::CreateType ().createDefaultConfig ();
536565
537566 ASSERT_TRUE (defaultConfig.assigned ());
538567
@@ -545,7 +574,7 @@ TEST_F(MqttJsonDecoderFbTest, DefaultConfig)
545574
546575 ASSERT_TRUE (defaultConfig.hasProperty (PROPERTY_NAME_DEC_TS_MODE));
547576 ASSERT_EQ (defaultConfig.getProperty (PROPERTY_NAME_DEC_TS_MODE).getValueType (), CoreType::ctInt);
548- EXPECT_EQ (defaultConfig.getPropertyValue (PROPERTY_NAME_DEC_TS_MODE).asPtr <IInteger>(), static_cast <int >(mqtt::MqttDataWrapper::DomainSignalMode ::None));
577+ EXPECT_EQ (defaultConfig.getPropertyValue (PROPERTY_NAME_DEC_TS_MODE).asPtr <IInteger>(), static_cast <int >(DDSM ::None));
549578 EXPECT_TRUE (defaultConfig.getProperty (PROPERTY_NAME_DEC_TS_MODE).getVisible ());
550579
551580 ASSERT_TRUE (defaultConfig.hasProperty (PROPERTY_NAME_DEC_TS_NAME));
@@ -561,28 +590,26 @@ TEST_F(MqttJsonDecoderFbTest, DefaultConfig)
561590
562591TEST_F (MqttJsonDecoderFbTest, PropertyVisibility)
563592{
564- daq::DictPtr<daq::IString, daq::IFunctionBlockType> fbTypes;
565- daq::FunctionBlockTypePtr fbt = MqttJsonDecoderFbImpl::CreateType ();
566- daq::PropertyObjectPtr defaultConfig = fbt.createDefaultConfig ();
593+ daq::PropertyObjectPtr defaultConfig = MqttJsonDecoderFbImpl::CreateType ().createDefaultConfig ();
567594
568595 {
569- defaultConfig.setPropertyValue (PROPERTY_NAME_DEC_TS_MODE, static_cast <int >(mqtt::MqttDataWrapper::DomainSignalMode ::None));
596+ defaultConfig.setPropertyValue (PROPERTY_NAME_DEC_TS_MODE, static_cast <int >(DDSM ::None));
570597 EXPECT_TRUE (defaultConfig.getProperty (PROPERTY_NAME_DEC_VALUE_NAME).getVisible ());
571598 EXPECT_TRUE (defaultConfig.getProperty (PROPERTY_NAME_DEC_TS_MODE).getVisible ());
572599 EXPECT_FALSE (defaultConfig.getProperty (PROPERTY_NAME_DEC_TS_NAME).getVisible ());
573600 EXPECT_TRUE (defaultConfig.getProperty (PROPERTY_NAME_DEC_UNIT).getVisible ());
574601 }
575602
576603 {
577- defaultConfig.setPropertyValue (PROPERTY_NAME_DEC_TS_MODE, static_cast <int >(mqtt::MqttDataWrapper::DomainSignalMode ::ExtractFromMessage));
604+ defaultConfig.setPropertyValue (PROPERTY_NAME_DEC_TS_MODE, static_cast <int >(DDSM ::ExtractFromMessage));
578605 EXPECT_TRUE (defaultConfig.getProperty (PROPERTY_NAME_DEC_VALUE_NAME).getVisible ());
579606 EXPECT_TRUE (defaultConfig.getProperty (PROPERTY_NAME_DEC_TS_MODE).getVisible ());
580607 EXPECT_TRUE (defaultConfig.getProperty (PROPERTY_NAME_DEC_TS_NAME).getVisible ());
581608 EXPECT_TRUE (defaultConfig.getProperty (PROPERTY_NAME_DEC_UNIT).getVisible ());
582609 }
583610
584611 {
585- defaultConfig.setPropertyValue (PROPERTY_NAME_DEC_TS_MODE, static_cast <int >(mqtt::MqttDataWrapper::DomainSignalMode ::ExternalTimestamp));
612+ defaultConfig.setPropertyValue (PROPERTY_NAME_DEC_TS_MODE, static_cast <int >(DDSM ::ExternalTimestamp));
586613 EXPECT_TRUE (defaultConfig.getProperty (PROPERTY_NAME_DEC_VALUE_NAME).getVisible ());
587614 EXPECT_TRUE (defaultConfig.getProperty (PROPERTY_NAME_DEC_TS_MODE).getVisible ());
588615 EXPECT_FALSE (defaultConfig.getProperty (PROPERTY_NAME_DEC_TS_NAME).getVisible ());
@@ -598,8 +625,10 @@ TEST_F(MqttJsonDecoderFbTest, Config)
598625 auto config = subMqttFb.getAvailableFunctionBlockTypes ().get (JSON_DECODER_FB_NAME).createDefaultConfig ();
599626
600627 config.setPropertyValue (PROPERTY_NAME_DEC_VALUE_NAME, " value" );
628+ config.setPropertyValue (PROPERTY_NAME_DEC_TS_MODE, static_cast <int >(DDSM::ExtractFromMessage));
601629 config.setPropertyValue (PROPERTY_NAME_DEC_TS_NAME, " timestamp" );
602630 config.setPropertyValue (PROPERTY_NAME_DEC_UNIT, " ppm" );
631+
603632 daq::FunctionBlockPtr fb;
604633 ASSERT_NO_THROW (fb = subMqttFb.addFunctionBlock (JSON_DECODER_FB_NAME, config));
605634 EXPECT_EQ (fb.getSignals ().getCount (), 1u );
@@ -638,21 +667,49 @@ TEST_F(MqttJsonDecoderFbTest, CreationWithPartialConfig)
638667 auto config = PropertyObject ();
639668 config.addProperty (StringProperty (PROPERTY_NAME_DEC_VALUE_NAME, String (" value" )));
640669 ASSERT_NO_THROW (fb = subMqttFb.addFunctionBlock (JSON_DECODER_FB_NAME, config));
670+
671+ EXPECT_EQ (fb.getSignals ().getCount (), 1u );
672+ EXPECT_EQ (fb.getSignals (search::Any ()).getCount (), 1u );
673+ EXPECT_EQ (fb.getStatusContainer ().getStatus (" ComponentStatus" ),
674+ Enumeration (" ComponentStatusType" , " Ok" , daqInstance.getContext ().getTypeManager ()));
675+ EXPECT_NE (fb.getStatusContainer ().getStatusMessage (" ComponentStatus" ).toStdString ().find (" Waiting for data" ), std::string::npos);
676+
677+ fb.setPropertyValue (PROPERTY_NAME_DEC_TS_MODE, static_cast <int >(DDSM::ExtractFromMessage));
678+ EXPECT_EQ (fb.getSignals ().getCount (), 1u );
679+ EXPECT_EQ (fb.getSignals (search::Any ()).getCount (), 2u );
680+ EXPECT_EQ (fb.getStatusContainer ().getStatus (" ComponentStatus" ),
681+ Enumeration (" ComponentStatusType" , " Error" , daqInstance.getContext ().getTypeManager ()));
682+ EXPECT_NE (fb.getStatusContainer ().getStatusMessage (" ComponentStatus" ).toStdString ().find (" Configuration is invalid" ), std::string::npos);
683+
684+ fb.setPropertyValue (PROPERTY_NAME_DEC_TS_NAME, String (" ts" ));
641685 EXPECT_EQ (fb.getSignals ().getCount (), 1u );
642- ASSERT_EQ (fb.getStatusContainer ().getStatus (" ComponentStatus" ),
686+ EXPECT_EQ (fb.getSignals (search::Any ()).getCount (), 2u );
687+ EXPECT_EQ (fb.getStatusContainer ().getStatus (" ComponentStatus" ),
643688 Enumeration (" ComponentStatusType" , " Ok" , daqInstance.getContext ().getTypeManager ()));
644689 EXPECT_NE (fb.getStatusContainer ().getStatusMessage (" ComponentStatus" ).toStdString ().find (" Waiting for data" ), std::string::npos);
690+
645691 subMqttFb.removeFunctionBlock (fb);
646692 }
647693 {
648694 daq::FunctionBlockPtr fb;
649- auto config = PropertyObject ();
650- config.addProperty (StringProperty (PROPERTY_NAME_DEC_TS_NAME, String (" ts" )));
651- ASSERT_NO_THROW (fb = subMqttFb.addFunctionBlock (JSON_DECODER_FB_NAME, config));
695+ ASSERT_NO_THROW (fb = subMqttFb.addFunctionBlock (JSON_DECODER_FB_NAME));
696+
697+ fb.setPropertyValue (PROPERTY_NAME_DEC_TS_MODE, static_cast <int >(DDSM::ExtractFromMessage));
698+ fb.setPropertyValue (PROPERTY_NAME_DEC_TS_NAME, String (" ts" ));
699+
652700 EXPECT_EQ (fb.getSignals ().getCount (), 1u );
653- ASSERT_EQ (fb.getStatusContainer ().getStatus (" ComponentStatus" ),
701+ EXPECT_EQ (fb.getSignals (search::Any ()).getCount (), 2u );
702+ EXPECT_EQ (fb.getStatusContainer ().getStatus (" ComponentStatus" ),
654703 Enumeration (" ComponentStatusType" , " Error" , daqInstance.getContext ().getTypeManager ()));
655704 EXPECT_NE (fb.getStatusContainer ().getStatusMessage (" ComponentStatus" ).toStdString ().find (" Configuration is invalid" ), std::string::npos);
705+
706+ fb.setPropertyValue (PROPERTY_NAME_DEC_VALUE_NAME, String (" value" ));
707+ EXPECT_EQ (fb.getSignals ().getCount (), 1u );
708+ EXPECT_EQ (fb.getSignals (search::Any ()).getCount (), 2u );
709+ EXPECT_EQ (fb.getStatusContainer ().getStatus (" ComponentStatus" ),
710+ Enumeration (" ComponentStatusType" , " Ok" , daqInstance.getContext ().getTypeManager ()));
711+ EXPECT_NE (fb.getStatusContainer ().getStatusMessage (" ComponentStatus" ).toStdString ().find (" Waiting for data" ), std::string::npos);
712+
656713 subMqttFb.removeFunctionBlock (fb);
657714 }
658715}
@@ -679,6 +736,27 @@ TEST_P(MqttJsonFbDoubleDataPTest, DataTransferOneSignalDoubleWithoutDomain)
679736 EXPECT_NE (decoderObj.getStatusContainer ().getStatusMessage (" ComponentStatus" ).toStdString ().find (" Parsing succeeded" ), std::string::npos);
680737}
681738
739+ TEST_P (MqttJsonFbDoubleDataPTest, DataTransferOneSignalDoubleWithSystemTime)
740+ {
741+ const auto dataToSend = GetParam ();
742+ std::vector<uint64_t > timePoints;
743+ auto dataToReceive = transferDataWithSystemTime (dataToSend, VALID_JSON_DATA_1, timePoints);
744+ ASSERT_EQ (dataToSend.size (), dataToReceive.size ());
745+ ASSERT_TRUE (compareData (dataToSend, dataToReceive, false ));
746+
747+ ASSERT_EQ (dataToSend.size () + 1 , timePoints.size ());
748+
749+ for (size_t i = 0 ; i < dataToReceive.size (); ++i)
750+ {
751+ EXPECT_GE (dataToReceive[i].second , timePoints[i]);
752+ EXPECT_LE (dataToReceive[i].second , timePoints[i + 1 ]);
753+ }
754+
755+ ASSERT_EQ (decoderObj.getStatusContainer ().getStatus (" ComponentStatus" ),
756+ Enumeration (" ComponentStatusType" , " Ok" , decoderObj.getContext ().getTypeManager ()));
757+ EXPECT_NE (decoderObj.getStatusContainer ().getStatusMessage (" ComponentStatus" ).toStdString ().find (" Parsing succeeded" ), std::string::npos);
758+ }
759+
682760INSTANTIATE_TEST_SUITE_P (DataTransferOneSignalDouble,
683761 MqttJsonFbDoubleDataPTest,
684762 ::testing::Values (DATA_DOUBLE_INT_0, DATA_DOUBLE_INT_1, DATA_DOUBLE_INT_2));
@@ -837,9 +915,9 @@ TEST_F(MqttJsonDecoderFbTest, DataTransferSeveralSignals)
837915 DaqInstanceInit ();
838916 auto clientFb0 = DaqAddClientMqttFb (" 127.0.0.1" , DEFAULT_PORT);
839917 auto jsonFb0 = AddSubFb (topic);
840- auto decoderFb0 = AddDecoderFb (valueF0, tsF);
841- auto decoderFb1 = AddDecoderFb (valueF1, tsF);
842- auto decoderFb2 = AddDecoderFb (valueF2, " " );
918+ auto decoderFb0 = AddDecoderFb (valueF0, DDSM::ExtractFromMessage, tsF);
919+ auto decoderFb1 = AddDecoderFb (valueF1, DDSM::ExtractFromMessage, tsF);
920+ auto decoderFb2 = AddDecoderFb (valueF2, DDSM::None, " " );
843921
844922 auto signalList = List<ISignal>();
845923 signalList.pushBack (decoderFb0.getSignals ()[0 ]);
@@ -894,7 +972,7 @@ TEST_F(MqttJsonDecoderFbTest, DataTransferMissingFieldOneSignal)
894972 const auto topic = buildTopicName ();
895973 std::string valueF = extractFieldName (VALID_JSON_DATA_1, " <placeholder_value>" );
896974 std::string tsF = " ts" ;
897- CreateDecoderFB (topic, valueF, tsF);
975+ CreateDecoderFB (topic, valueF, DDSM::ExtractFromMessage, tsF);
898976
899977 auto signal = getSignals ()[0 ];
900978 auto reader = daq::PacketReader (signal);
@@ -924,9 +1002,9 @@ TEST_F(MqttJsonDecoderFbTest, DataTransferMissingFieldSeveralSignals)
9241002 DaqInstanceInit ();
9251003 auto clientFb0 = DaqAddClientMqttFb (" 127.0.0.1" , DEFAULT_PORT);
9261004 auto jsonFb0 = AddSubFb (topic);
927- auto decoderFb0 = AddDecoderFb (valueF0, tsF);
928- auto decoderFb1 = AddDecoderFb (valueF1, tsF);
929- auto decoderFb2 = AddDecoderFb (valueF2, " " );
1005+ auto decoderFb0 = AddDecoderFb (valueF0, DDSM::ExtractFromMessage, tsF);
1006+ auto decoderFb1 = AddDecoderFb (valueF1, DDSM::ExtractFromMessage, tsF);
1007+ auto decoderFb2 = AddDecoderFb (valueF2, DDSM::None, " " );
9301008
9311009 auto signalList = List<ISignal>();
9321010 signalList.pushBack (decoderFb0.getSignals ()[0 ]);
@@ -981,7 +1059,7 @@ TEST_F(MqttJsonFbCommunicationTest, FullDataTransfer)
9811059 const std::string valueF = extractFieldName (msgTemplate, " <placeholder_value>" );
9821060 const std::string tsF = extractFieldName (msgTemplate, " <placeholder_ts>" );
9831061 AddSubFb (topic);
984- AddDecoderFb (valueF, tsF);
1062+ AddDecoderFb (valueF, DDSM::ExtractFromMessage, tsF);
9851063
9861064 const auto result = processTransfer (" 127.0.0.1" , DEFAULT_PORT, topic, DATA_DOUBLE_INT_0, decoderObj.getSignals ()[0 ]);
9871065
@@ -1005,11 +1083,11 @@ TEST_F(MqttJsonFbCommunicationTest, DISABLED_FullDataTransferFor2MqttFbs)
10051083 DaqInstanceInit ();
10061084 auto clientFb0 = DaqAddClientMqttFb (" 127.0.0.1" , 1883 );
10071085 auto jsonFb0 = AddSubFb (topic0);
1008- auto decoderFb0 = AddDecoderFb (valueF, tsF);
1086+ auto decoderFb0 = AddDecoderFb (valueF, DDSM::ExtractFromMessage, tsF);
10091087
10101088 auto clientFb1 = DaqAddClientMqttFb (" 127.0.0.1" , 1884 );
10111089 auto jsonFb1 = AddSubFb (topic1);
1012- auto decoderFb1 = AddDecoderFb (valueF, tsF);
1090+ auto decoderFb1 = AddDecoderFb (valueF, DDSM::ExtractFromMessage, tsF);
10131091
10141092 const auto result0 = processTransfer (" 127.0.0.1" , 1883 , topic0, DATA_DOUBLE_INT_0, decoderFb0.getSignals ()[0 ]);
10151093 const auto result1 = processTransfer (" 127.0.0.1" , 1884 , topic1, DATA_DOUBLE_INT_1, decoderFb1.getSignals ()[0 ]);
0 commit comments