Skip to content

Commit 45af7fa

Browse files
committed
mqtt: test fixing
1 parent 810dc2a commit 45af7fa

4 files changed

Lines changed: 60 additions & 12 deletions

File tree

mqtt_streaming_module/tests/test_mqtt_publisher_fb.cpp

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -324,19 +324,28 @@ class MqttPublisherFbHelper : public DaqTestHelper
324324
expectedMsgsForMultimsg(const std::string& signalName0, const std::string& signalName1, std::vector<std::pair<T, uint64_t>> data)
325325
{
326326
std::vector<std::string> msgs;
327-
const std::string PUBLISHER_SINGLE_MSG = "[{<placeholder_signal0> : <placeholder_value0>, \"timestamp\": <placeholder_ts0>}, "
328-
"{<placeholder_signal1> : <placeholder_value1>, \"timestamp\": <placeholder_ts1>}]";
329-
327+
// const std::string PUBLISHER_SINGLE_MSG = "[{<placeholder_signal0> : <placeholder_value0>, \"timestamp\": <placeholder_ts0>}, "
328+
// "{<placeholder_signal1> : <placeholder_value1>, \"timestamp\": <placeholder_ts1>}]";
329+
const std::string PUBLISHER_SINGLE_MSG_FIRST = "{<placeholder_signal0> : <placeholder_value0>, \"timestamp\": <placeholder_ts0>}";
330+
const std::string PUBLISHER_SINGLE_MSG_SECOND = "{<placeholder_signal1> : <placeholder_value1>, \"timestamp\": <placeholder_ts1>}";
331+
332+
std::string tmpStr;
333+
tmpStr = replacePlaceholder(tmpStr, "<placeholder_first>", PUBLISHER_SINGLE_MSG_FIRST);
334+
tmpStr = replacePlaceholder(tmpStr, "<placeholder_second>", PUBLISHER_SINGLE_MSG_SECOND);
335+
const std::string PUBLISHER_SINGLE_MSG = std::move(tmpStr);
330336
std::vector<std::string> messages;
331337
{
332338
for (const auto& [value, ts] : data)
333339
{
334-
auto msg = PUBLISHER_SINGLE_MSG;
340+
auto msg = PUBLISHER_SINGLE_MSG_FIRST;
335341
msg = replacePlaceholder(msg, "<placeholder_signal0>", signalName0);
336-
msg = replacePlaceholder(msg, "<placeholder_signal1>", signalName1);
337342
msg = replacePlaceholder(msg, "<placeholder_value0>", value);
338-
msg = replacePlaceholder(msg, "<placeholder_value1>", value);
339343
msg = replacePlaceholder(msg, "<placeholder_ts0>", ts * 1000);
344+
messages.push_back(std::move(msg));
345+
346+
msg = PUBLISHER_SINGLE_MSG_SECOND;
347+
msg = replacePlaceholder(msg, "<placeholder_signal1>", signalName1);
348+
msg = replacePlaceholder(msg, "<placeholder_value1>", value);
340349
msg = replacePlaceholder(msg, "<placeholder_ts1>", ts * 1000);
341350
messages.push_back(std::move(msg));
342351
}
@@ -405,12 +414,15 @@ class MqttPublisherFbHelper : public DaqTestHelper
405414
bool transfer(const std::string& topic,
406415
const std::vector<std::string>& messages,
407416
SignalHelper<T>& helper,
408-
const std::vector<std::pair<T, uint64_t>>& data)
417+
const std::vector<std::pair<T, uint64_t>>& data, bool isMultimessage = false)
409418
{
410419
std::promise<bool> receivedPromise;
411420
auto receivedFuture = receivedPromise.get_future();
412421
std::atomic<bool> done{false};
413-
subscriber->expectMsgs(topic, messages, receivedPromise, done);
422+
if (isMultimessage)
423+
subscriber->expectMultiMsgs(topic, messages, receivedPromise, done);
424+
else
425+
subscriber->expectMsgs(topic, messages, receivedPromise, done);
414426

415427
helper::utils::Timer receiveTimer(5000);
416428
bool ok = subscriber->subscribe(topic, 2);
@@ -962,7 +974,7 @@ TEST_P(MqttPublisherFbPTest, TransferMultimessage)
962974
const auto data = help.generateTestData(sampleCnt);
963975
const std::vector<std::string> messages =
964976
expectedMsgsForMultimsg(help.signal0.getGlobalId().toStdString(), help.signal1.getGlobalId().toStdString(), data);
965-
auto ok = transfer(topic, messages, help, data);
977+
auto ok = transfer(topic, messages, help, data, true);
966978
ASSERT_TRUE(ok);
967979
ASSERT_EQ(fb.getStatusContainer().getStatus("ComponentStatus"),
968980
Enumeration("ComponentStatusType", "Ok", daqInstance.getContext().getTypeManager()));

mqtt_streaming_module/tests/test_mqtt_raw_fb.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,8 +289,8 @@ TEST_F(MqttRawFbTest, CheckRawFbFullDataTransfer)
289289
mqtt::MqttMessage msg = {topic, data, 1, 0};
290290
ASSERT_TRUE(publisher.publishMsg(msg));
291291
}
292-
293-
while (!reader.getEmpty())
292+
helper::utils::Timer tmr(3000, true);
293+
while ((!reader.getEmpty() || !tmr.expired()) && dataToReceive.size() != dataToSend.size())
294294
{
295295
auto packet = reader.read();
296296
if (const auto eventPacket = packet.asPtrOrNull<IEventPacket>(); eventPacket.assigned())

mqtt_streaming_protocol/tests/MqttAsyncClientWrapper.cpp

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ void MqttAsyncClientWrapper::expectMsgs(const std::string& topic,
143143
std::atomic<bool>& done)
144144
{
145145
instance->setMessageArrivedCb(topic,
146-
[topic, &done, &msgs, &promise, i = 0](const mqtt::MqttAsyncClient& subscriber,
146+
[topic, &done, &msgs, &promise, i = size_t(0)](const mqtt::MqttAsyncClient& subscriber,
147147
mqtt::MqttMessage& receivedMsg) mutable
148148
{
149149
const auto receivedStr = receivedMsg.toString();
@@ -156,3 +156,38 @@ void MqttAsyncClientWrapper::expectMsgs(const std::string& topic,
156156
promise.set_value(true);
157157
});
158158
}
159+
160+
void MqttAsyncClientWrapper::expectMultiMsgs(const std::string& topic,
161+
const std::vector<std::string>& msgs,
162+
std::promise<bool>& promise,
163+
std::atomic<bool>& done)
164+
{
165+
instance->setMessageArrivedCb(topic,
166+
[topic, &done, localMsgs = msgs, &promise](const mqtt::MqttAsyncClient& subscriber,
167+
mqtt::MqttMessage& receivedMsg) mutable
168+
{
169+
const auto receivedStr = receivedMsg.toString();
170+
std::cout << "{topic | msg}: " << receivedMsg.getTopic() << " | " << receivedStr << std::endl;
171+
if (receivedMsg.getTopic() != topic || localMsgs.empty())
172+
return;
173+
if ((std::string("[") + localMsgs.at(0) + "]") == receivedStr)
174+
{
175+
localMsgs.erase(localMsgs.begin());
176+
}
177+
else if (localMsgs.size() >= 2 && (std::string("[") + localMsgs.at(1) + "]") == receivedStr)
178+
{
179+
localMsgs.erase(localMsgs.begin() + 1);
180+
}
181+
else if(localMsgs.size() >= 2 && (std::string("[") + localMsgs.at(0) + ", " + localMsgs.at(1) + "]") == receivedStr)
182+
{
183+
localMsgs.erase(localMsgs.begin(), localMsgs.begin() + 2);
184+
}
185+
else if(localMsgs.size() >= 2 && (std::string("[") + localMsgs.at(1) + ", " + localMsgs.at(0) + "]") == receivedStr)
186+
{
187+
localMsgs.erase(localMsgs.begin(), localMsgs.begin() + 2);
188+
}
189+
bool expected = false;
190+
if (localMsgs.empty() && done.compare_exchange_strong(expected, true))
191+
promise.set_value(true);
192+
});
193+
}

mqtt_streaming_protocol/tests/MqttAsyncClientWrapper.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class MqttAsyncClientWrapper
1717
bool publishMsg(const mqtt::MqttMessage& msg);
1818
bool subscribe(const std::string& topic, int qos);
1919
void expectMsgs(const std::string& topic, const std::vector<std::string>& msgs, std::promise<bool>& promise, std::atomic<bool>& done);
20+
void expectMultiMsgs(const std::string& topic, const std::vector<std::string>& msgs, std::promise<bool>& promise, std::atomic<bool>& done);
2021

2122
std::unique_ptr<mqtt::MqttAsyncClient> instance;
2223
std::promise<bool> connectedPromise;

0 commit comments

Comments
 (0)