Skip to content

Commit 422f559

Browse files
committed
mqtt: new handling for timestamps in the publisher FB
1 parent dd52566 commit 422f559

6 files changed

Lines changed: 102 additions & 13 deletions

File tree

mqtt_streaming_client_module/include/mqtt_streaming_client_module/handler_base.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,27 @@ class HandlerBase
2929
virtual MqttData processSignalContexts(std::vector<SignalContext>& signalContexts) = 0;
3030
virtual ProcedureStatus validateSignalContexts(const std::vector<SignalContext>& signalContexts) const = 0;
3131
virtual ProcedureStatus signalListChanged(std::vector<SignalContext>& signalContexts) = 0;
32+
protected:
33+
static std::pair<uint64_t, uint64_t> calculateRatio(const DataDescriptorPtr descriptor)
34+
{
35+
const auto tickResolution = descriptor.getTickResolution().simplify();
36+
const uint64_t packetDelta = descriptor.getRule().getParameters().get("delta");
37+
uint64_t num = tickResolution.getNumerator() * packetDelta;
38+
uint64_t den = tickResolution.getDenominator();
39+
const uint64_t g = std::gcd(num, den);
40+
num /= g;
41+
den /= g;
42+
return std::pair<uint64_t, uint64_t>{num, den};
43+
}
44+
45+
static uint64_t convertToEpoch(const DataPacketPtr domainPacket)
46+
{
47+
const auto [ratioNum, ratioDen] = calculateRatio(domainPacket.getDataDescriptor());
48+
constexpr uint64_t US_IN_S = 1'000'000; // amount microseconds in a second
49+
uint64_t ts = *(static_cast<uint64_t*>(domainPacket.getData()));
50+
ts = ts * ratioNum * US_IN_S / ratioDen; // us
51+
return ts;
52+
}
3253
};
3354

3455
END_NAMESPACE_OPENDAQ_MQTT_STREAMING_CLIENT_MODULE

mqtt_streaming_client_module/include/mqtt_streaming_client_module/multiple_shared_handler.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,15 @@
2121

2222
BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_CLIENT_MODULE
2323

24+
struct TimestampTickStruct
25+
{
26+
uint64_t firstTick;
27+
uint64_t delta;
28+
uint64_t ratioNum;
29+
uint64_t ratioDen;
30+
uint64_t multiplier;
31+
};
32+
2433
class MultipleSharedHandler : public HandlerBase
2534
{
2635
public:
@@ -33,18 +42,18 @@ class MultipleSharedHandler : public HandlerBase
3342
bool useSignalNames;
3443
const size_t buffersSize;
3544
const std::string topic;
36-
std::vector<void*> domainBuffers;
3745
std::vector<void*> dataBuffers;
3846
daq::MultiReaderPtr reader;
3947

4048
template<typename T>
4149
std::string toString(const std::string& valueFieldName, void* data, SizeT offset);
4250
std::string toString(const SampleType sampleType, const std::string& valueFieldName, void* data, SizeT offset);
43-
std::string tsToString(void* data, SizeT offset);
51+
std::string tsToString(TimestampTickStruct tsStruct, SizeT offset);
4452
std::string buildTopicName();
4553
void createReader(const std::vector<SignalContext>& signalContexts);
4654
void allocateBuffers(const std::vector<SignalContext>& signalContexts);
4755
static std::string messageFromFields(const std::vector<std::string>& fields);
56+
static TimestampTickStruct domainToTs(const MultiReaderStatusPtr status);
4857
};
4958

5059
END_NAMESPACE_OPENDAQ_MQTT_STREAMING_CLIENT_MODULE

mqtt_streaming_client_module/src/multiple_handler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ std::string MultipleHandler::toString(const std::string valueFieldName, daq::Dat
110110
std::string data = toString(packet);
111111
if (auto domainPacket = packet.getDomainPacket(); domainPacket.assigned())
112112
{
113-
uint64_t ts = *(static_cast<uint64_t*>(domainPacket.getData()));
113+
uint64_t ts = convertToEpoch(domainPacket);
114114
result = fmt::format("{{\"{}\" : {}, \"timestamp\": {}}}", valueFieldName, data, ts);
115115
}
116116
else

mqtt_streaming_client_module/src/multiple_shared_handler.cpp

Lines changed: 67 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <opendaq/reader_factory.h>
77
#include <opendaq/reader_utils.h>
88
#include <opendaq/sample_type_traits.h>
9+
#include <optional>
910
#include <set>
1011

1112
BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_CLIENT_MODULE
@@ -24,9 +25,10 @@ MqttData MultipleSharedHandler::processSignalContexts(std::vector<SignalContext>
2425
return messages;
2526
auto dataAvailable = reader.getAvailableCount();
2627
auto count = std::min(SizeT{buffersSize}, dataAvailable);
27-
auto status = reader.readWithDomain(dataBuffers.data(), domainBuffers.data(), &count);
28+
auto status = reader.read(dataBuffers.data(), &count);
2829
if (status.getReadStatus() == ReadStatus::Ok && count > 0)
2930
{
31+
const auto tsStruct = domainToTs(status);
3032
for (SizeT sampleCnt = 0; sampleCnt < count; ++sampleCnt)
3133
{
3234
std::vector<std::string> fields;
@@ -36,7 +38,8 @@ MqttData MultipleSharedHandler::processSignalContexts(std::vector<SignalContext>
3638
std::string valueFieldName = (useSignalNames ? signal.getName() : signal.getGlobalId()).toStdString();
3739
fields.emplace_back(toString(signal.getDescriptor().getSampleType(), valueFieldName, dataBuffers[signalCnt], sampleCnt));
3840
}
39-
fields.emplace_back(tsToString(domainBuffers[0], sampleCnt));
41+
42+
fields.emplace_back(tsToString(tsStruct, sampleCnt));
4043
std::string topic = buildTopicName();
4144
std::string msg = messageFromFields(fields);
4245
messages.emplace_back(std::move(topic), std::move(msg));
@@ -73,6 +76,19 @@ ProcedureStatus MultipleSharedHandler::validateSignalContexts(const std::vector<
7376
sigCtx.inputPort.getSignal().getGlobalId()));
7477
status.success = false;
7578
}
79+
if (!signal.getDomainSignal().getDescriptor().assigned())
80+
{
81+
status.messages.emplace_back(
82+
fmt::format("Connected signal \"{}\" doesn't contain a descroptor for a domain signal. This is not allowed.",
83+
sigCtx.inputPort.getSignal().getGlobalId()));
84+
status.success = false;
85+
}
86+
if (auto domainDataRule = signal.getDomainSignal().getDescriptor().getRule(); domainDataRule.getType() != DataRuleType::Linear)
87+
{
88+
status.messages.emplace_back(fmt::format("Connected signal \"{}\" has an incompatible data rule for its domain signal.",
89+
sigCtx.inputPort.getSignal().getGlobalId()));
90+
status.success = false;
91+
}
7692
if (!signal.getDescriptor().assigned())
7793
{
7894
status.messages.emplace_back(fmt::format("Connected signal \"{}\" doesn't contain a descroptor. This is not allowed.",
@@ -93,9 +109,52 @@ ProcedureStatus MultipleSharedHandler::validateSignalContexts(const std::vector<
93109
status.success = false;
94110
}
95111
}
112+
113+
std::optional<std::pair<uint64_t, uint64_t>> ratio0;
114+
bool error = false;
115+
for (size_t i = 0; i < signalContexts.size() && !error; ++i)
116+
{
117+
auto signal = signalContexts[i].inputPort.getSignal();
118+
if (!signal.assigned())
119+
continue;
120+
121+
if (!ratio0.has_value())
122+
{
123+
ratio0 = calculateRatio(signal.getDomainSignal().getDescriptor());
124+
}
125+
else
126+
{
127+
auto ratio = calculateRatio(signal.getDomainSignal().getDescriptor());
128+
error = (ratio != ratio0);
129+
}
130+
}
131+
132+
if (error)
133+
{
134+
status.messages.emplace_back(fmt::format("Connected signals have incompatible sample rates. This is not allowed."));
135+
status.success = false;
136+
}
96137
return status;
97138
}
98139

140+
TimestampTickStruct MultipleSharedHandler::domainToTs(const MultiReaderStatusPtr status)
141+
{
142+
TimestampTickStruct res;
143+
const auto descriptor =
144+
status.getMainDescriptor().getParameters().get(event_packet_param::DOMAIN_DATA_DESCRIPTOR).asPtr<IDataDescriptor>();
145+
const auto [ratioNum, ratioDen] = calculateRatio(descriptor);
146+
const uint64_t offset = status.getOffset().getValue<uint64_t>(0);
147+
const uint64_t start = descriptor.getRule().getParameters().get("start").getValue<uint64_t>(0);
148+
const uint64_t refOffset = descriptor.getReferenceDomainInfo().getReferenceDomainOffset().getValue<uint64_t>(0);
149+
150+
res.firstTick = offset + start + refOffset;
151+
res.ratioNum = ratioNum;
152+
res.ratioDen = ratioDen;
153+
res.multiplier = 1'000'000; // amount of us in a second
154+
res.delta = descriptor.getRule().getParameters().get("delta").getValue<uint64_t>(0);
155+
return res;
156+
}
157+
99158
ProcedureStatus MultipleSharedHandler::signalListChanged(std::vector<SignalContext>& signalContexts)
100159
{
101160
ProcedureStatus status{true, {}};
@@ -125,9 +184,12 @@ std::string MultipleSharedHandler::toString(const std::string& valueFieldName, v
125184
return fmt::format("\"{}\" : {}", valueFieldName, std::to_string(*(static_cast<T*>(data) + offset)));
126185
}
127186

128-
std::string MultipleSharedHandler::tsToString(void* data, SizeT offset)
187+
std::string MultipleSharedHandler::tsToString(TimestampTickStruct tsStruct, SizeT offset)
129188
{
130-
return fmt::format("\"timestamp\" : {}", std::to_string(*(static_cast<SampleTypeToType<SampleType::UInt64>::Type*>(data) + offset)));
189+
// const uint64_t epochTime = (firstTick + delta * offset) * ratioNum * US_IN_S / ratioDen; // us
190+
return fmt::format("\"timestamp\" : {}",
191+
std::to_string(((tsStruct.firstTick + tsStruct.delta * offset) * tsStruct.ratioNum * tsStruct.multiplier) /
192+
tsStruct.ratioDen));
131193
}
132194

133195
std::string MultipleSharedHandler::buildTopicName()
@@ -159,19 +221,16 @@ void MultipleSharedHandler::allocateBuffers(const std::vector<SignalContext>& si
159221
{
160222
// Allocate buffers for each signal
161223
auto signalsCount = signalContexts.size() - 1;
162-
for (size_t i = 0; i < domainBuffers.size(); ++i)
224+
for (size_t i = 0; i < dataBuffers.size(); ++i)
163225
{
164-
std::free(domainBuffers[i]);
165226
std::free(dataBuffers[i]);
166227
}
167228

168-
domainBuffers = std::vector<void*>(signalsCount, nullptr);
169229
dataBuffers = std::vector<void*>(signalsCount, nullptr);
170230

171231
for (size_t i = 0; i < signalsCount; ++i)
172232
{
173233
dataBuffers[i] = std::malloc(buffersSize * getSampleSize(signalContexts[i].inputPort.getSignal().getDescriptor().getSampleType()));
174-
domainBuffers[i] = std::malloc(buffersSize * getSampleSize(SampleType::UInt64));
175234
}
176235
}
177236

mqtt_streaming_client_module/src/multisingle_handler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ std::string MultisingleHandler::toString(const std::string valueFieldName, const
6868

6969
if (auto domainPacket = dataPackets[i].getDomainPacket(); domainPacket.assigned())
7070
{
71-
uint64_t ts = *(static_cast<uint64_t*>(domainPacket.getData()));
71+
uint64_t ts = convertToEpoch(domainPacket);
7272
tsOss << std::to_string(ts);
7373
}
7474
else

mqtt_streaming_client_module/src/single_handler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ std::string SingleHandler::toString(const std::string valueFieldName, daq::DataP
142142
std::string data = toString(packet);
143143
if (auto domainPacket = packet.getDomainPacket(); domainPacket.assigned())
144144
{
145-
uint64_t ts = *(static_cast<uint64_t*>(domainPacket.getData()));
145+
uint64_t ts = convertToEpoch(domainPacket);
146146
result = fmt::format("{{\"{}\" : {}, \"timestamp\": {}}}", valueFieldName, data, ts);
147147
}
148148
else

0 commit comments

Comments
 (0)