-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsignal_arr_atomic_sample_handler.cpp
More file actions
159 lines (147 loc) · 6.38 KB
/
signal_arr_atomic_sample_handler.cpp
File metadata and controls
159 lines (147 loc) · 6.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#include <mqtt_streaming_module/signal_arr_atomic_sample_handler.h>
#include <opendaq/custom_log.h>
#include <opendaq/event_packet_ids.h>
#include <opendaq/event_packet_params.h>
#include <opendaq/event_packet_ptr.h>
#include <opendaq/reader_factory.h>
#include <opendaq/reader_utils.h>
#include <opendaq/sample_type_traits.h>
BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE
SignalArrayAtomicSampleHandler::SignalArrayAtomicSampleHandler(WeakRefPtr<IFunctionBlock> parentFb, SignalValueJSONKey signalNamesMode, std::string topic)
: HandlerBase(parentFb, signalNamesMode),
topic(topic)
{
}
MqttData SignalArrayAtomicSampleHandler::processSignalContexts(std::vector<SignalContext>& signalContexts)
{
MqttData messages;
bool dataAvailable = true;
while (dataAvailable)
{
dataAvailable = false;
std::vector<std::string> array;
for (const auto& signalContext : signalContexts)
{
const auto conn = signalContext.inputPort.getConnection();
if (!conn.assigned())
continue;
PacketPtr packet = conn.dequeue();
if (packet.assigned())
{
dataAvailable = true;
if (packet.getType() == PacketType::Event)
{
auto eventPacket = packet.asPtr<IEventPacket>(true);
LOG_T("Processing {} event", eventPacket.getEventId());
}
else if (packet.getType() == PacketType::Data)
{
const auto signal = signalContext.inputPort.getSignal();
std::string valueFieldName = buildValueFieldName(signalNamesMode, signal);
array.emplace_back(toString(valueFieldName, packet));
}
}
}
if (array.empty())
continue;
std::string topic = buildTopicName();
std::string msg = messageFromArray(array);
messages.data.emplace_back(MqttDataSample{signalContexts[0].previewSignal, std::move(topic), std::move(msg)});
}
return messages;
}
ProcedureStatus SignalArrayAtomicSampleHandler::validateSignalContexts(const std::vector<SignalContext>& signalContexts) const
{
ProcedureStatus status{true, {}};
for (const auto& sigCtx : signalContexts)
{
auto signal = sigCtx.inputPort.getSignal();
if (!signal.assigned())
continue;
if (!signal.getDescriptor().assigned())
{
status.addError(fmt::format("Connected signal \"{}\" doesn't contain a descroptor. This is not allowed.",
sigCtx.inputPort.getSignal().getGlobalId()));
}
if (auto demensions = signal.getDescriptor().getDimensions(); demensions.assigned() && demensions.getCount() > 0)
{
status.addError(fmt::format("Connected signal \"{}\" has more then 1 demention. This is not allowed.",
sigCtx.inputPort.getSignal().getGlobalId()));
}
if (auto sampleType = signal.getDescriptor().getSampleType(); allowedSampleTypes.find(sampleType) == allowedSampleTypes.cend())
{
status.addError(fmt::format("Connected signal \"{}\" has an incompatible sample type ({}).",
sigCtx.inputPort.getSignal().getGlobalId(),
convertSampleTypeToString(sampleType)));
}
if (auto dSignal = signal.getDomainSignal(); dSignal.assigned())
{
auto descriptor = dSignal.getDescriptor();
if (!descriptor.assigned())
{
status.addError(fmt::format("Connected signal \"{}\" has a domain signal without descriptor. This is not allowed.",
sigCtx.inputPort.getSignal().getGlobalId()));
}
else if (descriptor.getSampleType() != SampleType::UInt64 && descriptor.getSampleType() != SampleType::Int64)
{
status.addError(fmt::format("Connected signal \"{}\" has an incompatible sample type for its domain signal. "
"Only SampleType::UInt64 and SampleType::Int64 are allowed.",
sigCtx.inputPort.getSignal().getGlobalId()));
}
else if (auto unit = descriptor.getUnit(); !unit.assigned() || unit.getSymbol() != "s")
{
status.addError(fmt::format("Connected signal \"{}\" has an incompatible unit for its domain signal. "
"Only 's' (seconds) is allowed.",
sigCtx.inputPort.getSignal().getGlobalId()));
}
}
}
status.merge(HandlerBase::validateSignalContexts(signalContexts));
return status;
}
ProcedureStatus SignalArrayAtomicSampleHandler::signalListChanged(std::vector<SignalContext>&)
{
return ProcedureStatus{true, {}};
}
ListPtr<IString> SignalArrayAtomicSampleHandler::getTopics(const std::vector<SignalContext>&)
{
auto res = List<IString>(String(buildTopicName()));
return res;
}
std::string SignalArrayAtomicSampleHandler::getSchema()
{
return fmt::format("[{{\"{}\" : <sample_value>, \"timestamp\": <timestamp_ns>}}, ..., {{\"{}\" : <sample_value>, \"timestamp\": <timestamp_ns>}}]", buildValueFieldNameForSchema(signalNamesMode, "_0"), buildValueFieldNameForSchema(signalNamesMode, "_N"));
}
std::string SignalArrayAtomicSampleHandler::toString(const std::string valueFieldName, daq::DataPacketPtr packet)
{
std::string result;
std::string data = HandlerBase::toString(packet, 0);
if (auto domainPacket = packet.getDomainPacket(); domainPacket.assigned())
{
uint64_t ts = convertToEpoch(domainPacket, 0);
result = fmt::format("{{\"{}\" : {}, \"timestamp\": {}}}", valueFieldName, data, ts);
}
else
{
result = fmt::format("{{\"{}\" : {}}}", valueFieldName, data);
}
return result;
}
std::string SignalArrayAtomicSampleHandler::buildTopicName()
{
return topic;
}
std::string SignalArrayAtomicSampleHandler::messageFromArray(const std::vector<std::string>& array)
{
std::ostringstream oss;
oss << "[";
for (size_t i = 0; i < array.size(); ++i)
{
if (i > 0)
oss << ", ";
oss << std::move(array[i]);
}
oss << "]";
return oss.str();
}
END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE