Skip to content

Commit 100e65c

Browse files
committed
std::async for SignalHelper::send in tests
1 parent bf41a55 commit 100e65c

3 files changed

Lines changed: 41 additions & 33 deletions

File tree

modules/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_client_fb_impl.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ class MqttClientFbImpl : public FunctionBlock
5959
std::shared_ptr<mqtt::MqttAsyncClient> subscriber;
6060
Mqtt::Utils::Settings::MqttConnectionSettings connectionSettings;
6161

62-
std::promise<bool> connectedPromise;
6362
std::future<bool> connectedFuture;
6463
std::atomic<bool> connectedDone{false};
6564
std::unordered_map<std::string, std::string> deviceMap; // device name -> signal list JSON

modules/mqtt_streaming_module/src/mqtt_client_fb_impl.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,17 +75,17 @@ void MqttClientFbImpl::initMqttSubscriber()
7575
subscriber->setUsernamePasswrod(connectionSettings.username, connectionSettings.password);
7676

7777
connectedDone = false;
78-
connectedPromise = std::promise<bool>();
79-
connectedFuture = connectedPromise.get_future();
78+
auto connectedPromise = std::make_shared<std::promise<bool>>();
79+
connectedFuture = connectedPromise->get_future();
8080

8181
subscriber->setConnectedCb(
82-
[this]
82+
[this, connectedPromise]
8383
{
8484
bool expected = false;
8585
if (connectedDone.compare_exchange_strong(expected, true))
8686
{
8787
connectionStatus.setStatus("Connected");
88-
connectedPromise.set_value(true);
88+
connectedPromise->set_value(true);
8989
std::scoped_lock lock(componentStatusSync);
9090
setComponentStatus(ComponentStatus::Ok);
9191
}

modules/mqtt_streaming_module/tests/test_mqtt_publisher_fb.cpp

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -73,33 +73,39 @@ class SignalHelper
7373
return data;
7474
}
7575

76-
void send(const std::vector<std::pair<T, uint64_t>>& data, size_t packSize = 3) const
76+
auto send(const std::vector<std::pair<T, uint64_t>>& data, uint32_t delay = 0, size_t packSize = 3) const
7777
{
78-
if (packSize == 0)
79-
packSize = 1;
80-
auto sendPacket = [this](SignalConfigPtr signal, const std::vector<T>& data, DataPacketPtr domainPacket)
81-
{
82-
auto dataPacket = DataPacketWithDomain(domainPacket, signal0.getDescriptor(), data.size());
83-
copyData(dataPacket, data);
84-
signal.sendPacket(dataPacket);
85-
};
86-
for (size_t i = 0; i < data.size(); i += packSize)
87-
{
88-
std::vector<T> dataPack;
89-
std::vector<uint64_t> tsPack;
90-
for (size_t j = 0; j < packSize && (j + i) < data.size(); j++)
91-
{
92-
dataPack.push_back(data[j + i].first);
93-
tsPack.push_back(data[j + i].second);
94-
}
95-
96-
auto domainPacket = DataPacket(signal0.getDomainSignal().getDescriptor(), tsPack.size(), i);
97-
memcpy(domainPacket.getData(), tsPack.data(), sizeof(uint64_t) * tsPack.size());
98-
SignalConfigPtr dSignal = signal0.getDomainSignal();
99-
dSignal.sendPacket(domainPacket);
100-
sendPacket(signal0, dataPack, domainPacket);
101-
sendPacket(signal1, dataPack, domainPacket);
102-
}
78+
return std::async(std::launch::async,
79+
[this, &data, packSize, delay]() mutable
80+
{
81+
if (packSize == 0)
82+
packSize = 1;
83+
auto sendPacket = [this](SignalConfigPtr signal, const std::vector<T>& data, DataPacketPtr domainPacket)
84+
{
85+
auto dataPacket = DataPacketWithDomain(domainPacket, signal0.getDescriptor(), data.size());
86+
copyData(dataPacket, data);
87+
signal.sendPacket(std::move(dataPacket));
88+
};
89+
for (size_t i = 0; i < data.size(); i += packSize)
90+
{
91+
if (i != 0 && delay != 0)
92+
std::this_thread::sleep_for(std::chrono::milliseconds(delay));
93+
std::vector<T> dataPack;
94+
std::vector<uint64_t> tsPack;
95+
for (size_t j = 0; j < packSize && (j + i) < data.size(); j++)
96+
{
97+
dataPack.push_back(data[j + i].first);
98+
tsPack.push_back(data[j + i].second);
99+
}
100+
101+
auto domainPacket = DataPacket(signal0.getDomainSignal().getDescriptor(), tsPack.size(), i);
102+
memcpy(domainPacket.getData(), tsPack.data(), sizeof(uint64_t) * tsPack.size());
103+
SignalConfigPtr dSignal = signal0.getDomainSignal();
104+
dSignal.sendPacket(domainPacket);
105+
sendPacket(signal0, dataPack, domainPacket);
106+
sendPacket(signal1, dataPack, domainPacket);
107+
}
108+
});
103109
}
104110

105111
protected:
@@ -493,6 +499,8 @@ class MqttPublisherFbHelper : public DaqTestHelper
493499
SignalHelper<T>& helper,
494500
const std::vector<std::pair<T, uint64_t>>& data, bool isMultimessage = false)
495501
{
502+
constexpr uint32_t DELAY_BETWEEN_PACKS= 20;
503+
constexpr uint32_t BASE_TIMEOUT = 5000;
496504
std::promise<bool> receivedPromise;
497505
auto receivedFuture = receivedPromise.get_future();
498506
std::atomic<bool> done{false};
@@ -501,12 +509,13 @@ class MqttPublisherFbHelper : public DaqTestHelper
501509
else
502510
subscriber->expectMsgs(topic, messages, receivedPromise, done);
503511

504-
helper::utils::Timer receiveTimer(5000);
512+
helper::utils::Timer receiveTimer(BASE_TIMEOUT + data.size() * DELAY_BETWEEN_PACKS);
505513
bool ok = subscriber->subscribe(topic, 2);
506514
if (!ok)
507515
return false;
508-
helper.send(data);
516+
auto future = helper.send(data, DELAY_BETWEEN_PACKS);
509517
auto status = receivedFuture.wait_for(receiveTimer.remain());
518+
future.wait();
510519
subscriber = nullptr;
511520
ok = (status == std::future_status::ready) && receivedFuture.get();
512521
return ok;

0 commit comments

Comments
 (0)