Skip to content

Commit 8bdaf85

Browse files
committed
mqtt: timestamps handling in publisher FB
1 parent 422f559 commit 8bdaf85

9 files changed

Lines changed: 180 additions & 110 deletions

File tree

mqtt_streaming_client_module/include/mqtt_streaming_client_module/handler_base.h

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717
#pragma once
1818

1919
#include "mqtt_streaming_client_module/common.h"
20-
#include <vector>
2120
#include <mqtt_streaming_client_module/types.h>
21+
#include <opendaq/sample_type_traits.h>
22+
#include <vector>
2223

2324
BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_CLIENT_MODULE
2425

@@ -44,12 +45,69 @@ class HandlerBase
4445

4546
static uint64_t convertToEpoch(const DataPacketPtr domainPacket)
4647
{
47-
const auto [ratioNum, ratioDen] = calculateRatio(domainPacket.getDataDescriptor());
4848
constexpr uint64_t US_IN_S = 1'000'000; // amount microseconds in a second
49-
uint64_t ts = *(static_cast<uint64_t*>(domainPacket.getData()));
50-
ts = ts * ratioNum * US_IN_S / ratioDen; // us
49+
const auto tickResolution = domainPacket.getDataDescriptor().getTickResolution().simplify();
50+
uint64_t num = tickResolution.getNumerator() * US_IN_S;
51+
uint64_t den = tickResolution.getDenominator();
52+
const uint64_t g = std::gcd(num, den);
53+
num /= g;
54+
den /= g;
55+
56+
uint64_t ts = 0;
57+
if (domainPacket.getDataDescriptor().getSampleType() == SampleType::UInt64)
58+
ts = *(static_cast<uint64_t*>(domainPacket.getData()));
59+
else if (domainPacket.getDataDescriptor().getSampleType() == SampleType::Int64)
60+
ts = *(static_cast<int64_t*>(domainPacket.getData()));
61+
ts = ts * num / den; // us
5162
return ts;
5263
}
64+
65+
static std::string toString(const DataPacketPtr& dataPacket)
66+
{
67+
auto sampleType = dataPacket.getDataDescriptor().getSampleType();
68+
std::string data("unsupported");
69+
70+
switch (sampleType)
71+
{
72+
case SampleType::Float64:
73+
data = std::to_string(*(static_cast<SampleTypeToType<SampleType::Float64>::Type*>(dataPacket.getData())));
74+
break;
75+
case SampleType::Float32:
76+
data = std::to_string(*(static_cast<SampleTypeToType<SampleType::Float32>::Type*>(dataPacket.getData())));
77+
break;
78+
case SampleType::UInt64:
79+
data = std::to_string(*(static_cast<SampleTypeToType<SampleType::UInt64>::Type*>(dataPacket.getData())));
80+
break;
81+
case SampleType::Int64:
82+
data = std::to_string(*(static_cast<SampleTypeToType<SampleType::Int64>::Type*>(dataPacket.getData())));
83+
break;
84+
case SampleType::UInt32:
85+
data = std::to_string(*(static_cast<SampleTypeToType<SampleType::UInt32>::Type*>(dataPacket.getData())));
86+
break;
87+
case SampleType::Int32:
88+
data = std::to_string(*(static_cast<SampleTypeToType<SampleType::Int32>::Type*>(dataPacket.getData())));
89+
break;
90+
case SampleType::UInt16:
91+
data = std::to_string(*(static_cast<SampleTypeToType<SampleType::UInt16>::Type*>(dataPacket.getData())));
92+
break;
93+
case SampleType::Int16:
94+
data = std::to_string(*(static_cast<SampleTypeToType<SampleType::Int16>::Type*>(dataPacket.getData())));
95+
break;
96+
case SampleType::UInt8:
97+
data = std::to_string(*(static_cast<SampleTypeToType<SampleType::UInt8>::Type*>(dataPacket.getData())));
98+
break;
99+
case SampleType::Int8:
100+
data = std::to_string(*(static_cast<SampleTypeToType<SampleType::Int8>::Type*>(dataPacket.getData())));
101+
break;
102+
case SampleType::String:
103+
case SampleType::Binary:
104+
data = '\"' + std::string(static_cast<char*>(dataPacket.getData()), dataPacket.getDataSize()) + '\"';
105+
break;
106+
default:
107+
break;
108+
}
109+
return data;
110+
}
53111
};
54112

55113
END_NAMESPACE_OPENDAQ_MQTT_STREAMING_CLIENT_MODULE

mqtt_streaming_client_module/include/mqtt_streaming_client_module/multiple_handler.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ class MultipleHandler : public HandlerBase
3434
const std::string topic;
3535

3636
std::string toString(const std::string valueFieldName, daq::DataPacketPtr packet);
37-
std::string toString(const DataPacketPtr& dataPackets);
3837
std::string buildTopicName();
3938
static std::string messageFromArray(const std::vector<std::string>& array);
4039
};

mqtt_streaming_client_module/include/mqtt_streaming_client_module/single_handler.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ class SingleHandler : public HandlerBase
3434
virtual MqttData processSignalContext(SignalContext& signalContext);
3535
void processSignalDescriptorChanged(SignalContext& signalCtx, const DataDescriptorPtr& valueSigDesc, const DataDescriptorPtr& domainSigDesc);
3636
MqttDataSample processDataPacket(SignalContext& signalContext, const DataPacketPtr& dataPacket);
37-
std::string toString(const DataPacketPtr& dataPackets);
3837
std::string toString(const std::string valueFieldName, daq::DataPacketPtr packet);
3938
std::string buildTopicName(const SignalContext& signalContext);
4039
};

mqtt_streaming_client_module/include/mqtt_streaming_client_module/types.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ struct ProcedureStatus
3838
{
3939
bool success;
4040
std::vector<std::string> messages;
41+
42+
void addError(const std::string& msg)
43+
{
44+
success = false;
45+
messages.push_back(msg);
46+
}
4147
};
4248

4349
END_NAMESPACE_OPENDAQ_MQTT_STREAMING_CLIENT_MODULE

mqtt_streaming_client_module/src/multiple_handler.cpp

Lines changed: 25 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -78,22 +78,40 @@ ProcedureStatus MultipleHandler::validateSignalContexts(const std::vector<Signal
7878
continue;
7979
if (!signal.getDescriptor().assigned())
8080
{
81-
status.messages.emplace_back(fmt::format("Connected signal \"{}\" doesn't contain a descroptor. This is not allowed.",
81+
status.addError(fmt::format("Connected signal \"{}\" doesn't contain a descroptor. This is not allowed.",
8282
sigCtx.inputPort.getSignal().getGlobalId()));
83-
status.success = false;
8483
}
8584
if (auto demensions = signal.getDescriptor().getDimensions(); demensions.assigned() && demensions.getCount() > 0)
8685
{
87-
status.messages.emplace_back(fmt::format("Connected signal \"{}\" has more then 1 demention. This is not allowed.",
86+
status.addError(fmt::format("Connected signal \"{}\" has more then 1 demention. This is not allowed.",
8887
sigCtx.inputPort.getSignal().getGlobalId()));
89-
status.success = false;
9088
}
9189
if (auto sampleType = signal.getDescriptor().getSampleType(); allowedSampleTypes.find(sampleType) == allowedSampleTypes.cend())
9290
{
93-
status.messages.emplace_back(fmt::format("Connected signal \"{}\" has an incompatible sample type ({}).",
91+
status.addError(fmt::format("Connected signal \"{}\" has an incompatible sample type ({}).",
9492
sigCtx.inputPort.getSignal().getGlobalId(),
9593
convertSampleTypeToString(sampleType)));
96-
status.success = false;
94+
}
95+
if (auto dSignal = signal.getDomainSignal(); dSignal.assigned())
96+
{
97+
auto descriptor = dSignal.getDescriptor();
98+
if (!descriptor.assigned())
99+
{
100+
status.addError(fmt::format("Connected signal \"{}\" has a domain signal without descriptor. This is not allowed.",
101+
sigCtx.inputPort.getSignal().getGlobalId()));
102+
}
103+
else if (descriptor.getSampleType() != SampleType::UInt64 && descriptor.getSampleType() != SampleType::Int64)
104+
{
105+
status.addError(fmt::format("Connected signal \"{}\" has an incompatible sample type for its domain signal. "
106+
"Only SampleType::UInt64 and SampleType::Int64 are allowed.",
107+
sigCtx.inputPort.getSignal().getGlobalId()));
108+
}
109+
else if (auto unit = descriptor.getUnit(); !unit.assigned() || unit.getSymbol() != "s")
110+
{
111+
status.addError(fmt::format("Connected signal \"{}\" has an incompatible unit for its domain signal. "
112+
"Only 's' (seconds) is allowed.",
113+
sigCtx.inputPort.getSignal().getGlobalId()));
114+
}
97115
}
98116
}
99117
return status;
@@ -107,7 +125,7 @@ ProcedureStatus MultipleHandler::signalListChanged(std::vector<SignalContext>& s
107125
std::string MultipleHandler::toString(const std::string valueFieldName, daq::DataPacketPtr packet)
108126
{
109127
std::string result;
110-
std::string data = toString(packet);
128+
std::string data = HandlerBase::toString(packet);
111129
if (auto domainPacket = packet.getDomainPacket(); domainPacket.assigned())
112130
{
113131
uint64_t ts = convertToEpoch(domainPacket);
@@ -121,32 +139,6 @@ std::string MultipleHandler::toString(const std::string valueFieldName, daq::Dat
121139
return result;
122140
}
123141

124-
std::string MultipleHandler::toString(const DataPacketPtr& dataPacket)
125-
{
126-
auto sampleType = dataPacket.getDataDescriptor().getSampleType();
127-
std::string data;
128-
129-
switch (sampleType)
130-
{
131-
case SampleType::Float64:
132-
data = std::to_string(*(static_cast<double*>(dataPacket.getData())));
133-
break;
134-
case SampleType::UInt64:
135-
data = std::to_string(*(static_cast<uint64_t*>(dataPacket.getData())));
136-
break;
137-
case SampleType::Int64:
138-
data = std::to_string(*(static_cast<int64_t*>(dataPacket.getData())));
139-
break;
140-
case SampleType::Binary:
141-
data = '\"' + std::string(static_cast<char*>(dataPacket.getData()), dataPacket.getDataSize()) + '\"';
142-
break;
143-
default:
144-
break;
145-
}
146-
147-
return data;
148-
}
149-
150142
std::string MultipleHandler::buildTopicName()
151143
{
152144
return topic;

mqtt_streaming_client_module/src/multiple_shared_handler.cpp

Lines changed: 55 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,6 @@ ProcedureStatus MultipleSharedHandler::validateSignalContexts(const std::vector<
5454

5555
static const std::set<SampleType> allowedSampleTypes{SampleType::Float64,
5656
SampleType::Float32,
57-
SampleType::Float32,
58-
SampleType::Float64,
5957
SampleType::UInt8,
6058
SampleType::Int8,
6159
SampleType::UInt16,
@@ -70,43 +68,54 @@ ProcedureStatus MultipleSharedHandler::validateSignalContexts(const std::vector<
7068
auto signal = sigCtx.inputPort.getSignal();
7169
if (!signal.assigned())
7270
continue;
73-
if (!signal.getDomainSignal().assigned())
71+
const auto dSignal = signal.getDomainSignal();
72+
if (!dSignal.assigned())
7473
{
75-
status.messages.emplace_back(fmt::format("Connected signal \"{}\" doesn't contain a domain signal. This is not allowed.",
76-
sigCtx.inputPort.getSignal().getGlobalId()));
77-
status.success = false;
74+
status.addError(fmt::format("Connected signal \"{}\" doesn't contain a domain signal. This is not allowed.",
75+
sigCtx.inputPort.getSignal().getGlobalId()));
7876
}
79-
if (!signal.getDomainSignal().getDescriptor().assigned())
77+
else if (const auto dSignalDesc = dSignal.getDescriptor(); !dSignalDesc.assigned())
8078
{
81-
status.messages.emplace_back(
82-
fmt::format("Connected signal \"{}\" doesn't contain a descroptor for a domain signal. This is not allowed.",
83-
sigCtx.inputPort.getSignal().getGlobalId()));
84-
status.success = false;
79+
status.addError(fmt::format("Connected signal \"{}\" doesn't contain a descroptor for a domain signal. This is not allowed.",
80+
sigCtx.inputPort.getSignal().getGlobalId()));
8581
}
86-
if (auto domainDataRule = signal.getDomainSignal().getDescriptor().getRule(); domainDataRule.getType() != DataRuleType::Linear)
82+
else
8783
{
88-
status.messages.emplace_back(fmt::format("Connected signal \"{}\" has an incompatible data rule for its domain signal.",
89-
sigCtx.inputPort.getSignal().getGlobalId()));
90-
status.success = false;
84+
if (auto domainDataRule = signal.getDomainSignal().getDescriptor().getRule(); domainDataRule.getType() != DataRuleType::Linear)
85+
{
86+
status.addError(fmt::format("Connected signal \"{}\" has an incompatible data rule for its domain signal.",
87+
sigCtx.inputPort.getSignal().getGlobalId()));
88+
}
89+
if (signal.getDomainSignal().getDescriptor().getSampleType() != SampleType::UInt64 &&
90+
signal.getDomainSignal().getDescriptor().getSampleType() != SampleType::Int64)
91+
{
92+
status.addError(fmt::format("Connected signal \"{}\" has an incompatible sample type for its domain signal. "
93+
"Only SampleType::UInt64 and SampleType::Int64 are allowed.",
94+
sigCtx.inputPort.getSignal().getGlobalId()));
95+
}
96+
if (auto unit = signal.getDomainSignal().getDescriptor().getUnit(); !unit.assigned() || unit.getSymbol() != "s")
97+
{
98+
status.addError(fmt::format("Connected signal \"{}\" has an incompatible unit for its domain signal. "
99+
"Only 's' (seconds) is allowed.",
100+
sigCtx.inputPort.getSignal().getGlobalId()));
101+
}
91102
}
103+
92104
if (!signal.getDescriptor().assigned())
93105
{
94-
status.messages.emplace_back(fmt::format("Connected signal \"{}\" doesn't contain a descroptor. This is not allowed.",
95-
sigCtx.inputPort.getSignal().getGlobalId()));
96-
status.success = false;
106+
status.addError(fmt::format("Connected signal \"{}\" doesn't contain a descroptor. This is not allowed.",
107+
sigCtx.inputPort.getSignal().getGlobalId()));
97108
}
98109
if (auto demensions = signal.getDescriptor().getDimensions(); demensions.assigned() && demensions.getCount() > 0)
99110
{
100-
status.messages.emplace_back(fmt::format("Connected signal \"{}\" has more then 1 demention. This is not allowed.",
101-
sigCtx.inputPort.getSignal().getGlobalId()));
102-
status.success = false;
111+
status.addError(fmt::format("Connected signal \"{}\" has more then 1 demention. This is not allowed.",
112+
sigCtx.inputPort.getSignal().getGlobalId()));
103113
}
104114
if (auto sampleType = signal.getDescriptor().getSampleType(); allowedSampleTypes.find(sampleType) == allowedSampleTypes.cend())
105115
{
106-
status.messages.emplace_back(fmt::format("Connected signal \"{}\" has an incompatible sample type ({}).",
107-
sigCtx.inputPort.getSignal().getGlobalId(),
108-
convertSampleTypeToString(sampleType)));
109-
status.success = false;
116+
status.addError(fmt::format("Connected signal \"{}\" has an incompatible sample type ({}).",
117+
sigCtx.inputPort.getSignal().getGlobalId(),
118+
convertSampleTypeToString(sampleType)));
110119
}
111120
}
112121

@@ -131,8 +140,7 @@ ProcedureStatus MultipleSharedHandler::validateSignalContexts(const std::vector<
131140

132141
if (error)
133142
{
134-
status.messages.emplace_back(fmt::format("Connected signals have incompatible sample rates. This is not allowed."));
135-
status.success = false;
143+
status.addError(fmt::format("Connected signals have incompatible sample rates. This is not allowed."));
136144
}
137145
return status;
138146
}
@@ -142,14 +150,13 @@ TimestampTickStruct MultipleSharedHandler::domainToTs(const MultiReaderStatusPtr
142150
TimestampTickStruct res;
143151
const auto descriptor =
144152
status.getMainDescriptor().getParameters().get(event_packet_param::DOMAIN_DATA_DESCRIPTOR).asPtr<IDataDescriptor>();
145-
const auto [ratioNum, ratioDen] = calculateRatio(descriptor);
146153
const uint64_t offset = status.getOffset().getValue<uint64_t>(0);
147154
const uint64_t start = descriptor.getRule().getParameters().get("start").getValue<uint64_t>(0);
148155
const uint64_t refOffset = descriptor.getReferenceDomainInfo().getReferenceDomainOffset().getValue<uint64_t>(0);
149156

150157
res.firstTick = offset + start + refOffset;
151-
res.ratioNum = ratioNum;
152-
res.ratioDen = ratioDen;
158+
res.ratioNum = descriptor.getTickResolution().simplify().getNumerator();
159+
res.ratioDen = descriptor.getTickResolution().getDenominator();
153160
res.multiplier = 1'000'000; // amount of us in a second
154161
res.delta = descriptor.getRule().getParameters().get("delta").getValue<uint64_t>(0);
155162
return res;
@@ -168,14 +175,28 @@ std::string MultipleSharedHandler::toString(const SampleType sampleType, const s
168175
{
169176
case SampleType::Float64:
170177
return fmt::format("\"{}\" : {}", valueFieldName, std::to_string(*(static_cast<SampleTypeToType<SampleType::Float64>::Type*>(data) + offset)));
178+
case SampleType::Float32:
179+
return fmt::format("\"{}\" : {}", valueFieldName, std::to_string(*(static_cast<SampleTypeToType<SampleType::Float32>::Type*>(data) + offset)));
171180
case SampleType::UInt64:
172181
return fmt::format("\"{}\" : {}", valueFieldName, std::to_string(*(static_cast<SampleTypeToType<SampleType::UInt64>::Type*>(data) + offset)));
173182
case SampleType::Int64:
174183
return fmt::format("\"{}\" : {}", valueFieldName, std::to_string(*(static_cast<SampleTypeToType<SampleType::Int64>::Type*>(data) + offset)));
184+
case SampleType::UInt32:
185+
return fmt::format("\"{}\" : {}", valueFieldName, std::to_string(*(static_cast<SampleTypeToType<SampleType::UInt32>::Type*>(data) + offset)));
186+
case SampleType::Int32:
187+
return fmt::format("\"{}\" : {}", valueFieldName, std::to_string(*(static_cast<SampleTypeToType<SampleType::Int32>::Type*>(data) + offset)));
188+
case SampleType::UInt16:
189+
return fmt::format("\"{}\" : {}", valueFieldName, std::to_string(*(static_cast<SampleTypeToType<SampleType::UInt16>::Type*>(data) + offset)));
190+
case SampleType::Int16:
191+
return fmt::format("\"{}\" : {}", valueFieldName, std::to_string(*(static_cast<SampleTypeToType<SampleType::Int16>::Type*>(data) + offset)));
192+
case SampleType::UInt8:
193+
return fmt::format("\"{}\" : {}", valueFieldName, std::to_string(*(static_cast<SampleTypeToType<SampleType::UInt8>::Type*>(data) + offset)));
194+
case SampleType::Int8:
195+
return fmt::format("\"{}\" : {}", valueFieldName, std::to_string(*(static_cast<SampleTypeToType<SampleType::Int8>::Type*>(data) + offset)));
175196
default:
176197
break;
177198
}
178-
return "";
199+
return "unsupported";
179200
}
180201

181202
template <typename T>
@@ -187,6 +208,9 @@ std::string MultipleSharedHandler::toString(const std::string& valueFieldName, v
187208
std::string MultipleSharedHandler::tsToString(TimestampTickStruct tsStruct, SizeT offset)
188209
{
189210
// const uint64_t epochTime = (firstTick + delta * offset) * ratioNum * US_IN_S / ratioDen; // us
211+
const uint64_t g = std::gcd(tsStruct.multiplier, tsStruct.ratioDen);
212+
tsStruct.multiplier /= g;
213+
tsStruct.ratioDen /= g;
190214
return fmt::format("\"timestamp\" : {}",
191215
std::to_string(((tsStruct.firstTick + tsStruct.delta * offset) * tsStruct.ratioNum * tsStruct.multiplier) /
192216
tsStruct.ratioDen));

mqtt_streaming_client_module/src/multisingle_handler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ std::string MultisingleHandler::toString(const std::string valueFieldName, const
6464
tsOss << ", ";
6565
}
6666

67-
dataOss << SingleHandler::toString(dataPackets[i]);
67+
dataOss << HandlerBase::toString(dataPackets[i]);
6868

6969
if (auto domainPacket = dataPackets[i].getDomainPacket(); domainPacket.assigned())
7070
{

0 commit comments

Comments
 (0)