Skip to content

Commit 06f89c1

Browse files
committed
mqtt: formatting only
1 parent 51013eb commit 06f89c1

10 files changed

Lines changed: 77 additions & 55 deletions

File tree

mqtt_streaming_client_module/include/mqtt_streaming_client_module/handler_base.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class HandlerBase
3030
virtual MqttData processSignalContexts(std::vector<SignalContext>& signalContexts) = 0;
3131
virtual ProcedureStatus validateSignalContexts(const std::vector<SignalContext>& signalContexts) const = 0;
3232
virtual ProcedureStatus signalListChanged(std::vector<SignalContext>& signalContexts) = 0;
33+
3334
protected:
3435
static std::pair<uint64_t, uint64_t> calculateRatio(const DataDescriptorPtr descriptor)
3536
{
@@ -58,7 +59,7 @@ class HandlerBase
5859
ts = *(static_cast<uint64_t*>(domainPacket.getData()));
5960
else if (domainPacket.getDataDescriptor().getSampleType() == SampleType::Int64)
6061
ts = *(static_cast<int64_t*>(domainPacket.getData()));
61-
ts = ts * num / den; // us
62+
ts = ts * num / den; // us
6263
return ts;
6364
}
6465

mqtt_streaming_client_module/include/mqtt_streaming_client_module/handler_factory.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717
#pragma once
1818

1919
#include "mqtt_streaming_client_module/types.h"
20-
#include <mqtt_streaming_client_module/single_handler.h>
21-
#include <mqtt_streaming_client_module/multisingle_handler.h>
2220
#include <mqtt_streaming_client_module/multiple_handler.h>
2321
#include <mqtt_streaming_client_module/multiple_shared_handler.h>
22+
#include <mqtt_streaming_client_module/multisingle_handler.h>
23+
#include <mqtt_streaming_client_module/single_handler.h>
2424

2525
BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_CLIENT_MODULE
2626

mqtt_streaming_client_module/include/mqtt_streaming_client_module/mqtt_publisher_fb_impl.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ class MqttPublisherFbImpl final : public FunctionBlock
2828
{
2929
public:
3030
explicit MqttPublisherFbImpl(const ContextPtr& ctx,
31-
const ComponentPtr& parent,
32-
const FunctionBlockTypePtr& type,
33-
std::shared_ptr<mqtt::MqttAsyncClient> mqttClient,
34-
const PropertyObjectPtr& config = nullptr);
31+
const ComponentPtr& parent,
32+
const FunctionBlockTypePtr& type,
33+
std::shared_ptr<mqtt::MqttAsyncClient> mqttClient,
34+
const PropertyObjectPtr& config = nullptr);
3535
~MqttPublisherFbImpl();
3636

3737
static FunctionBlockTypePtr CreateType();
@@ -57,12 +57,11 @@ class MqttPublisherFbImpl final : public FunctionBlock
5757
void readProperties();
5858
void updateInputPorts();
5959
void validateInputPorts();
60-
template<typename retT, typename intfT>
60+
template <typename retT, typename intfT>
6161
retT readProperty(const std::string& propertyName, const retT defaultValue);
6262
void runReaderThread();
6363
void readerLoop();
6464
void sendMessages(const MqttData& data);
65-
//void removed() override;
6665
};
6766

6867
END_NAMESPACE_OPENDAQ_MQTT_STREAMING_CLIENT_MODULE

mqtt_streaming_client_module/include/mqtt_streaming_client_module/multiple_handler.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class MultipleHandler : public HandlerBase
2929
MqttData processSignalContexts(std::vector<SignalContext>& signalContexts) override;
3030
ProcedureStatus validateSignalContexts(const std::vector<SignalContext>& signalContexts) const override;
3131
ProcedureStatus signalListChanged(std::vector<SignalContext>& signalContexts) override;
32+
3233
protected:
3334
bool useSignalNames;
3435
const std::string topic;

mqtt_streaming_client_module/include/mqtt_streaming_client_module/multiple_shared_handler.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,15 @@ class MultipleSharedHandler : public HandlerBase
3838
MqttData processSignalContexts(std::vector<SignalContext>& signalContexts) override;
3939
ProcedureStatus validateSignalContexts(const std::vector<SignalContext>& signalContexts) const override;
4040
ProcedureStatus signalListChanged(std::vector<SignalContext>& signalContexts) override;
41+
4142
protected:
4243
bool useSignalNames;
4344
const size_t buffersSize;
4445
const std::string topic;
4546
std::vector<void*> dataBuffers;
4647
daq::MultiReaderPtr reader;
4748

48-
template<typename T>
49+
template <typename T>
4950
std::string toString(const std::string& valueFieldName, void* data, SizeT offset);
5051
std::string toString(const SampleType sampleType, const std::string& valueFieldName, void* data, SizeT offset);
5152
std::string tsToString(TimestampTickStruct tsStruct, SizeT offset);

mqtt_streaming_client_module/include/mqtt_streaming_client_module/single_handler.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,17 @@ class SingleHandler : public HandlerBase
2727

2828
MqttData processSignalContexts(std::vector<SignalContext>& signalContexts) override;
2929
ProcedureStatus validateSignalContexts(const std::vector<SignalContext>& signalContexts) const override;
30-
ProcedureStatus signalListChanged(std::vector<SignalContext>& signalContexts) override {return ProcedureStatus{true, {}};};
30+
ProcedureStatus signalListChanged(std::vector<SignalContext>& signalContexts) override
31+
{
32+
return ProcedureStatus{true, {}};
33+
};
34+
3135
protected:
3236
bool useSignalNames;
3337

3438
virtual MqttData processSignalContext(SignalContext& signalContext);
35-
void processSignalDescriptorChanged(SignalContext& signalCtx, const DataDescriptorPtr& valueSigDesc, const DataDescriptorPtr& domainSigDesc);
39+
void
40+
processSignalDescriptorChanged(SignalContext& signalCtx, const DataDescriptorPtr& valueSigDesc, const DataDescriptorPtr& domainSigDesc);
3641
MqttDataSample processDataPacket(SignalContext& signalContext, const DataPacketPtr& dataPacket);
3742
std::string toString(const std::string valueFieldName, daq::DataPacketPtr packet);
3843
std::string buildTopicName(const SignalContext& signalContext);

mqtt_streaming_client_module/src/mqtt_publisher_fb_impl.cpp

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_CLIENT_MODULE
1010
std::atomic<int> MqttPublisherFbImpl::localIndex = 0;
1111

1212
MqttPublisherFbImpl::MqttPublisherFbImpl(const ContextPtr& ctx,
13-
const ComponentPtr& parent,
14-
const FunctionBlockTypePtr& type,
15-
std::shared_ptr<mqtt::MqttAsyncClient> mqttClient,
16-
const PropertyObjectPtr& config)
13+
const ComponentPtr& parent,
14+
const FunctionBlockTypePtr& type,
15+
std::shared_ptr<mqtt::MqttAsyncClient> mqttClient,
16+
const PropertyObjectPtr& config)
1717
: FunctionBlock(type, ctx, parent, getLocalId()),
1818
mqttClient(mqttClient),
1919
jsonDataWorker(loggerComponent),
@@ -53,10 +53,7 @@ FunctionBlockTypePtr MqttPublisherFbImpl::CreateType()
5353
.build());
5454
defaultConfig.addProperty(IntProperty(PROPERTY_NAME_PUB_QOS, DEFAULT_PUB_QOS));
5555
defaultConfig.addProperty(IntProperty(PROPERTY_NAME_PUB_READ_PERIOD, DEFAULT_PUB_READ_PERIOD));
56-
const auto fbType = FunctionBlockType(PUB_FB_NAME,
57-
PUB_FB_NAME,
58-
"",
59-
defaultConfig);
56+
const auto fbType = FunctionBlockType(PUB_FB_NAME, PUB_FB_NAME, "", defaultConfig);
6057
return fbType;
6158
}
6259

@@ -98,7 +95,7 @@ void MqttPublisherFbImpl::updateInputPorts()
9895

9996
const auto inputPort = createAndAddInputPort(fmt::format("Input{}", inputPortCount++), PacketReadyNotification::SameThread);
10097

101-
signalContexts.emplace_back(SignalContext{ 0, inputPort });
98+
signalContexts.emplace_back(SignalContext{0, inputPort});
10299
for (size_t i = 0; i < signalContexts.size(); i++)
103100
signalContexts[i].index = i;
104101
}
@@ -159,7 +156,7 @@ void MqttPublisherFbImpl::readProperties()
159156
config.periodMs = DEFAULT_PUB_READ_PERIOD;
160157
}
161158

162-
template<typename retT, typename intfT>
159+
template <typename retT, typename intfT>
163160
retT MqttPublisherFbImpl::readProperty(const std::string& propertyName, const retT defaultValue)
164161
{
165162
retT returnValue{};
@@ -183,7 +180,7 @@ void MqttPublisherFbImpl::runReaderThread()
183180

184181
void MqttPublisherFbImpl::readerLoop()
185182
{
186-
while(running)
183+
while (running)
187184
{
188185
MqttData msgs;
189186
if (hasError == false)

mqtt_streaming_client_module/src/multiple_handler.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -79,38 +79,38 @@ ProcedureStatus MultipleHandler::validateSignalContexts(const std::vector<Signal
7979
if (!signal.getDescriptor().assigned())
8080
{
8181
status.addError(fmt::format("Connected signal \"{}\" doesn't contain a descroptor. This is not allowed.",
82-
sigCtx.inputPort.getSignal().getGlobalId()));
82+
sigCtx.inputPort.getSignal().getGlobalId()));
8383
}
8484
if (auto demensions = signal.getDescriptor().getDimensions(); demensions.assigned() && demensions.getCount() > 0)
8585
{
8686
status.addError(fmt::format("Connected signal \"{}\" has more then 1 demention. This is not allowed.",
87-
sigCtx.inputPort.getSignal().getGlobalId()));
87+
sigCtx.inputPort.getSignal().getGlobalId()));
8888
}
8989
if (auto sampleType = signal.getDescriptor().getSampleType(); allowedSampleTypes.find(sampleType) == allowedSampleTypes.cend())
9090
{
9191
status.addError(fmt::format("Connected signal \"{}\" has an incompatible sample type ({}).",
92-
sigCtx.inputPort.getSignal().getGlobalId(),
93-
convertSampleTypeToString(sampleType)));
92+
sigCtx.inputPort.getSignal().getGlobalId(),
93+
convertSampleTypeToString(sampleType)));
9494
}
9595
if (auto dSignal = signal.getDomainSignal(); dSignal.assigned())
9696
{
9797
auto descriptor = dSignal.getDescriptor();
9898
if (!descriptor.assigned())
9999
{
100100
status.addError(fmt::format("Connected signal \"{}\" has a domain signal without descriptor. This is not allowed.",
101-
sigCtx.inputPort.getSignal().getGlobalId()));
101+
sigCtx.inputPort.getSignal().getGlobalId()));
102102
}
103103
else if (descriptor.getSampleType() != SampleType::UInt64 && descriptor.getSampleType() != SampleType::Int64)
104104
{
105105
status.addError(fmt::format("Connected signal \"{}\" has an incompatible sample type for its domain signal. "
106-
"Only SampleType::UInt64 and SampleType::Int64 are allowed.",
107-
sigCtx.inputPort.getSignal().getGlobalId()));
106+
"Only SampleType::UInt64 and SampleType::Int64 are allowed.",
107+
sigCtx.inputPort.getSignal().getGlobalId()));
108108
}
109109
else if (auto unit = descriptor.getUnit(); !unit.assigned() || unit.getSymbol() != "s")
110110
{
111111
status.addError(fmt::format("Connected signal \"{}\" has an incompatible unit for its domain signal. "
112-
"Only 's' (seconds) is allowed.",
113-
sigCtx.inputPort.getSignal().getGlobalId()));
112+
"Only 's' (seconds) is allowed.",
113+
sigCtx.inputPort.getSignal().getGlobalId()));
114114
}
115115
}
116116
}

mqtt_streaming_client_module/src/multiple_shared_handler.cpp

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -174,25 +174,45 @@ std::string MultipleSharedHandler::toString(const SampleType sampleType, const s
174174
switch (sampleType)
175175
{
176176
case SampleType::Float64:
177-
return fmt::format("\"{}\" : {}", valueFieldName, std::to_string(*(static_cast<SampleTypeToType<SampleType::Float64>::Type*>(data) + offset)));
177+
return fmt::format("\"{}\" : {}",
178+
valueFieldName,
179+
std::to_string(*(static_cast<SampleTypeToType<SampleType::Float64>::Type*>(data) + offset)));
178180
case SampleType::Float32:
179-
return fmt::format("\"{}\" : {}", valueFieldName, std::to_string(*(static_cast<SampleTypeToType<SampleType::Float32>::Type*>(data) + offset)));
181+
return fmt::format("\"{}\" : {}",
182+
valueFieldName,
183+
std::to_string(*(static_cast<SampleTypeToType<SampleType::Float32>::Type*>(data) + offset)));
180184
case SampleType::UInt64:
181-
return fmt::format("\"{}\" : {}", valueFieldName, std::to_string(*(static_cast<SampleTypeToType<SampleType::UInt64>::Type*>(data) + offset)));
185+
return fmt::format("\"{}\" : {}",
186+
valueFieldName,
187+
std::to_string(*(static_cast<SampleTypeToType<SampleType::UInt64>::Type*>(data) + offset)));
182188
case SampleType::Int64:
183-
return fmt::format("\"{}\" : {}", valueFieldName, std::to_string(*(static_cast<SampleTypeToType<SampleType::Int64>::Type*>(data) + offset)));
189+
return fmt::format("\"{}\" : {}",
190+
valueFieldName,
191+
std::to_string(*(static_cast<SampleTypeToType<SampleType::Int64>::Type*>(data) + offset)));
184192
case SampleType::UInt32:
185-
return fmt::format("\"{}\" : {}", valueFieldName, std::to_string(*(static_cast<SampleTypeToType<SampleType::UInt32>::Type*>(data) + offset)));
193+
return fmt::format("\"{}\" : {}",
194+
valueFieldName,
195+
std::to_string(*(static_cast<SampleTypeToType<SampleType::UInt32>::Type*>(data) + offset)));
186196
case SampleType::Int32:
187-
return fmt::format("\"{}\" : {}", valueFieldName, std::to_string(*(static_cast<SampleTypeToType<SampleType::Int32>::Type*>(data) + offset)));
197+
return fmt::format("\"{}\" : {}",
198+
valueFieldName,
199+
std::to_string(*(static_cast<SampleTypeToType<SampleType::Int32>::Type*>(data) + offset)));
188200
case SampleType::UInt16:
189-
return fmt::format("\"{}\" : {}", valueFieldName, std::to_string(*(static_cast<SampleTypeToType<SampleType::UInt16>::Type*>(data) + offset)));
201+
return fmt::format("\"{}\" : {}",
202+
valueFieldName,
203+
std::to_string(*(static_cast<SampleTypeToType<SampleType::UInt16>::Type*>(data) + offset)));
190204
case SampleType::Int16:
191-
return fmt::format("\"{}\" : {}", valueFieldName, std::to_string(*(static_cast<SampleTypeToType<SampleType::Int16>::Type*>(data) + offset)));
205+
return fmt::format("\"{}\" : {}",
206+
valueFieldName,
207+
std::to_string(*(static_cast<SampleTypeToType<SampleType::Int16>::Type*>(data) + offset)));
192208
case SampleType::UInt8:
193-
return fmt::format("\"{}\" : {}", valueFieldName, std::to_string(*(static_cast<SampleTypeToType<SampleType::UInt8>::Type*>(data) + offset)));
209+
return fmt::format("\"{}\" : {}",
210+
valueFieldName,
211+
std::to_string(*(static_cast<SampleTypeToType<SampleType::UInt8>::Type*>(data) + offset)));
194212
case SampleType::Int8:
195-
return fmt::format("\"{}\" : {}", valueFieldName, std::to_string(*(static_cast<SampleTypeToType<SampleType::Int8>::Type*>(data) + offset)));
213+
return fmt::format("\"{}\" : {}",
214+
valueFieldName,
215+
std::to_string(*(static_cast<SampleTypeToType<SampleType::Int8>::Type*>(data) + offset)));
196216
default:
197217
break;
198218
}

mqtt_streaming_client_module/src/single_handler.cpp

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,11 @@ SingleHandler::SingleHandler(bool useSignalNames)
1616
MqttData SingleHandler::processSignalContexts(std::vector<SignalContext>& signalContexts)
1717
{
1818
MqttData messages;
19-
for (auto& sigCtx: signalContexts)
19+
for (auto& sigCtx : signalContexts)
2020
{
2121
auto msgs = processSignalContext(sigCtx);
2222
messages.reserve(messages.size() + msgs.size());
23-
messages.insert(messages.end(),
24-
std::make_move_iterator(msgs.begin()),
25-
std::make_move_iterator(msgs.end()));
23+
messages.insert(messages.end(), std::make_move_iterator(msgs.begin()), std::make_move_iterator(msgs.end()));
2624
}
2725
return messages;
2826
}
@@ -48,38 +46,38 @@ ProcedureStatus SingleHandler::validateSignalContexts(const std::vector<SignalCo
4846
if (!signal.getDescriptor().assigned())
4947
{
5048
status.addError(fmt::format("Connected signal \"{}\" doesn't contain a descroptor. This is not allowed.",
51-
sigCtx.inputPort.getSignal().getGlobalId()));
49+
sigCtx.inputPort.getSignal().getGlobalId()));
5250
}
5351
if (auto demensions = signal.getDescriptor().getDimensions(); demensions.assigned() && demensions.getCount() > 0)
5452
{
5553
status.addError(fmt::format("Connected signal \"{}\" has more then 1 demention. This is not allowed.",
56-
sigCtx.inputPort.getSignal().getGlobalId()));
54+
sigCtx.inputPort.getSignal().getGlobalId()));
5755
}
5856
if (auto sampleType = signal.getDescriptor().getSampleType(); allowedSampleTypes.find(sampleType) == allowedSampleTypes.cend())
5957
{
6058
status.addError(fmt::format("Connected signal \"{}\" has an incompatible sample type ({}).",
61-
sigCtx.inputPort.getSignal().getGlobalId(),
62-
convertSampleTypeToString(sampleType)));
59+
sigCtx.inputPort.getSignal().getGlobalId(),
60+
convertSampleTypeToString(sampleType)));
6361
}
6462
if (auto dSignal = signal.getDomainSignal(); dSignal.assigned())
6563
{
6664
auto descriptor = dSignal.getDescriptor();
6765
if (!descriptor.assigned())
6866
{
6967
status.addError(fmt::format("Connected signal \"{}\" has a domain signal without descriptor. This is not allowed.",
70-
sigCtx.inputPort.getSignal().getGlobalId()));
68+
sigCtx.inputPort.getSignal().getGlobalId()));
7169
}
7270
else if (descriptor.getSampleType() != SampleType::UInt64 && descriptor.getSampleType() != SampleType::Int64)
7371
{
7472
status.addError(fmt::format("Connected signal \"{}\" has an incompatible sample type for its domain signal. "
75-
"Only SampleType::UInt64 and SampleType::Int64 are allowed.",
76-
sigCtx.inputPort.getSignal().getGlobalId()));
73+
"Only SampleType::UInt64 and SampleType::Int64 are allowed.",
74+
sigCtx.inputPort.getSignal().getGlobalId()));
7775
}
7876
else if (auto unit = descriptor.getUnit(); !unit.assigned() || unit.getSymbol() != "s")
7977
{
8078
status.addError(fmt::format("Connected signal \"{}\" has an incompatible unit for its domain signal. "
81-
"Only 's' (seconds) is allowed.",
82-
sigCtx.inputPort.getSignal().getGlobalId()));
79+
"Only 's' (seconds) is allowed.",
80+
sigCtx.inputPort.getSignal().getGlobalId()));
8381
}
8482
}
8583
}

0 commit comments

Comments
 (0)