Skip to content

Commit 428ab4b

Browse files
author
Hemant Kumar
committed
Merge pull request #126 from code-mancers/implement-checks-around-message-counter
Implement checks around message counter
2 parents 6e3b1f3 + 56a77ca commit 428ab4b

14 files changed

Lines changed: 38 additions & 184 deletions

rbkit-lib/model/objectaggregator.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,12 @@ RBKit::ObjectAggregator::ObjectAggregator()
1212

1313
void RBKit::ObjectAggregator::objCreated(RBKit::ObjectDetailPtr object)
1414
{
15+
if (idToName.end() == idToName.find(object->objectId)) {
16+
++typeToCount[object->className];
17+
++totalObjects;
18+
}
19+
1520
idToName[object->objectId] = object->className;
16-
++typeToCount[object->className];
17-
++totalObjects;
1821
}
1922

2023
void RBKit::ObjectAggregator::objDeleted(quint64 key)

rbkit-lib/model/objectdetail.cpp

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -53,32 +53,3 @@ QString RBKit::ObjectDetail::getFileLine()
5353
else
5454
return QString("%0:%1").arg(fileName).arg(lineNumber);
5555
}
56-
57-
58-
// ============================== static helper methods ==============================
59-
60-
RBKit::ObjectDetailPtr RBKit::payloadToObject(const QVariantMap& map)
61-
{
62-
auto objectId = map["object_id"].toULongLong();
63-
auto className = map["class_name"].toString();
64-
65-
RBKit::ObjectDetailPtr object(new RBKit::ObjectDetail(className, objectId));
66-
object->fileName = map["file"].toString();
67-
object->lineNumber = map["line"].toInt();
68-
object->addReferences(map["references"].toList());
69-
object->size = map["size"].toInt();
70-
71-
return object;
72-
}
73-
74-
QList<RBKit::ObjectDetailPtr> RBKit::payloadToObjects(const QVariantList& list)
75-
{
76-
QList<RBKit::ObjectDetailPtr> objects;
77-
78-
for (auto& entry : list) {
79-
auto object = RBKit::payloadToObject(entry.toMap());
80-
objects.append(object);
81-
}
82-
83-
return objects;
84-
}

rbkit-lib/model/objectdetail.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,6 @@ namespace RBKit
4242

4343
// typedef for the pointer.
4444
typedef QSharedPointer<ObjectDetail> ObjectDetailPtr;
45-
46-
QList<ObjectDetailPtr> payloadToObjects(const QVariantList& list);
47-
ObjectDetailPtr payloadToObject(const QVariantMap& map);
4845
}
4946

5047
Q_DECLARE_METATYPE(RBKit::ObjectDetail)

rbkit-lib/rbeventparser.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ RBKit::EventParser::parseEvents(const msgpack::object& objarray) const
7676
RBKit::EventParser::EventParser(const QByteArray& message)
7777
: rawMessage(message)
7878
{
79-
if (1*1024*1024 < message.size()) {
79+
if (5*1024*1024 < message.size()) {
8080
qDebug() << "probably got objdump" << QTime::currentTime();
8181
}
8282

@@ -94,8 +94,10 @@ RBKit::EventDataBase* RBKit::EventParser::parseEvent() const
9494
auto timestamp = map[RBKit::EfTimestamp].as<double>();
9595
auto ts = QDateTime::fromMSecsSinceEpoch(timestamp);
9696

97+
auto counter = map[RBKit::EfMessageCounter].as<unsigned long long>();
98+
9799
auto events = parseEvents(map[RBKit::EfPayload]);
98-
return new RBKit::EvtCollection(ts, eventType, events);
100+
return new RBKit::EvtCollection(ts, eventType, events, counter);
99101
}
100102

101103

rbkit-lib/rbevents.cpp

Lines changed: 3 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -1,147 +1,5 @@
1-
#include <msgpack.hpp>
21
#include "subscriber.h"
32
#include "rbevents.h"
4-
#include "mpparser.h"
5-
6-
7-
static QVariantList parseMsgpackObjectArray(const msgpack::object_array&);
8-
static QVariantMap parseMsgpackObjectMap(const msgpack::object_map&);
9-
static QList<RBKit::EventPtr> parseEventCollection(const QVariantList&);
10-
11-
12-
static QVariant parseMsgpackObject(const msgpack::object& obj)
13-
{
14-
switch (obj.type) {
15-
case msgpack::type::ARRAY :
16-
return QVariant(parseMsgpackObjectArray(obj.via.array));
17-
case msgpack::type::MAP :
18-
return QVariant(parseMsgpackObjectMap(obj.via.map));
19-
20-
case msgpack::type::RAW :
21-
return QVariant(RBKit::StringUtil::rawToQString(obj));
22-
case msgpack::type::DOUBLE :
23-
return QVariant(obj.via.dec);
24-
case msgpack::type::POSITIVE_INTEGER :
25-
return QVariant((unsigned long long int)(obj.via.u64));
26-
case msgpack::type::NIL :
27-
return QVariant("");
28-
29-
default:
30-
qDebug() << "throwing error while parsing event" << obj.type;
31-
throw "unknown object type";
32-
}
33-
}
34-
35-
// NOTE: This can be improved with the version that hemant is writing for GCStats.
36-
static QVariantMap parseMsgpackObjectMap(const msgpack::object_map& obj)
37-
{
38-
QVariantMap map;
39-
40-
msgpack::object_kv* list = obj.ptr;
41-
for (uint32_t iter = 0; iter != obj.size; ++iter) {
42-
msgpack::object key = list->key;
43-
msgpack::object val = list->val;
44-
45-
// qDebug() << key.type << val.type;
46-
47-
QString keyStr = RBKit::StringUtil::rawToQString(key);
48-
map[keyStr] = parseMsgpackObject(val);
49-
50-
++list;
51-
}
52-
53-
return map;
54-
}
55-
56-
RBKit::EventDataBase* RBKit::makeEventFromQVariantMap(const QVariantMap &map) {
57-
QDateTime timestamp = QDateTime::fromMSecsSinceEpoch(map["timestamp"].toULongLong());
58-
auto eventType = static_cast<RBKit::EventType>( map["event_type"].toInt() );
59-
60-
RBKit::EventDataBase* event(nullptr);
61-
switch (eventType) {
62-
case RBKit::EtObjCreated:
63-
{
64-
auto object = RBKit::payloadToObject(map["payload"].toMap());
65-
event = new RBKit::EvtNewObject(timestamp, eventType, object);
66-
}
67-
break;
68-
69-
case RBKit::EtObjDestroyed:
70-
// event = new RBKit::EvtDelObject(timestamp, eventType, map["payload"].toMap());
71-
break;
72-
73-
case RBKit::EtGcStats:
74-
event = new RBKit::EvtGcStats(timestamp, eventType, map["payload"].toMap());
75-
break;
76-
77-
case RBKit::EtGcStart:
78-
event = new RBKit::EvtGcStart(timestamp, eventType);
79-
break;
80-
81-
case RBKit::EtGcStartM:
82-
event = new RBKit::EvtGcStartM(timestamp, eventType);
83-
break;
84-
85-
case RBKit::EtGcEndS:
86-
event = new RBKit::EvtGcStop(timestamp, eventType);
87-
break;
88-
89-
case RBKit::EtObjectSpaceDump:
90-
{
91-
auto objects = RBKit::payloadToObjects(map["payload"].toList());
92-
event = new RBKit::EvtObjectDump(timestamp, eventType, objects);
93-
}
94-
break;
95-
96-
case RBKit::EtEventCollection:
97-
{
98-
auto events = parseEventCollection(map["payload"].toList());
99-
event = new RBKit::EvtCollection(timestamp, eventType, events);
100-
}
101-
break;
102-
103-
default:
104-
qDebug() << "Unable to parse event of type: " << eventType;
105-
}
106-
107-
return event;
108-
}
109-
110-
111-
static QVariantList parseMsgpackObjectArray(const msgpack::object_array& array)
112-
{
113-
QVariantList objList;
114-
115-
for (uint32_t iter = 0; iter != array.size; ++iter) {
116-
objList.append(parseMsgpackObject(array.ptr[iter]));
117-
}
118-
119-
return objList;
120-
}
121-
122-
123-
RBKit::EventDataBase* RBKit::parseEvent(const QByteArray& message)
124-
{
125-
msgpack::unpacked unpackedMessage;
126-
msgpack::unpack(&unpackedMessage, message.data(), message.size());
127-
128-
msgpack::object_map obj = unpackedMessage.get().via.map;
129-
130-
QVariantMap map = parseMsgpackObjectMap(obj);
131-
return makeEventFromQVariantMap(map);
132-
}
133-
134-
static QList<RBKit::EventPtr> parseEventCollection(const QVariantList& list)
135-
{
136-
QList<RBKit::EventPtr> events;
137-
138-
for (auto& eventMap : list) {
139-
auto event = RBKit::makeEventFromQVariantMap(eventMap.toMap());
140-
events.append(RBKit::EventPtr(event));
141-
}
142-
143-
return events;
144-
}
1453

1464

1475
// ============================== different events ==============================
@@ -231,9 +89,11 @@ void RBKit::EvtObjectDump::process(Subscriber& processor) const
23189
}
23290

23391
RBKit::EvtCollection::EvtCollection(QDateTime ts, RBKit::EventType eventType,
234-
QList<RBKit::EventPtr> _events)
92+
QList<RBKit::EventPtr> _events,
93+
quint64 _counter)
23594
: EventDataBase(ts, eventType)
23695
, events(_events)
96+
, messageCounter(_counter)
23797
{}
23898

23999
void RBKit::EvtCollection::process(Subscriber& processor) const

rbkit-lib/rbevents.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,14 +103,13 @@ namespace RBKit
103103
class EvtCollection : public EventDataBase
104104
{
105105
public:
106-
EvtCollection(QDateTime ts, EventType eventType, QList<RBKit::EventPtr>);
106+
EvtCollection(QDateTime ts, EventType eventType, QList<RBKit::EventPtr>,
107+
quint64 counter);
107108
void process(Subscriber& process) const;
108109

109110
QList<RBKit::EventPtr> events;
111+
quint64 messageCounter;
110112
};
111-
112-
EventDataBase* parseEvent(const QByteArray& rawMessage);
113-
EventDataBase* makeEventFromQVariantMap(const QVariantMap& map);
114113
}
115114

116115

rbkit-lib/subscriber.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,17 @@ void Subscriber::setContext(nzmqt::ZMQContext *value)
4040
{
4141
context = value;
4242
}
43+
44+
4345
Subscriber::Subscriber(RBKit::JsBridge* bridge)
44-
:jsBridge(bridge), connectionEstablished(false)
46+
: jsBridge(bridge)
47+
, connectionEstablished(false)
48+
, messageCounter(0)
4549
{
4650
qDebug() << "** Thread is is : " << QThread::currentThreadId();
4751
}
4852

53+
4954
void Subscriber::triggerGc() {
5055
RBKit::CmdTriggerGC triggerGC_Command;
5156
qDebug() << "Triggering GC";
@@ -209,6 +214,8 @@ void Subscriber::processEvent(const RBKit::EvtObjectDump& dump)
209214

210215
void Subscriber::processEvent(const RBKit::EvtCollection& evtCollection)
211216
{
217+
checkForMissingMessages(evtCollection.messageCounter);
218+
212219
for (auto& event : evtCollection.events) {
213220
event->process(*this);
214221
}
@@ -241,3 +248,13 @@ void Subscriber::onTimerExpiry()
241248
QVariantMap map = hashToQVarMap(objectStore->liveStats());
242249
jsBridge->sendMapToJs(eventName, QDateTime(), map);
243250
}
251+
252+
253+
void Subscriber::checkForMissingMessages(const quint64 counter)
254+
{
255+
if (counter != ++messageCounter) {
256+
qDebug() << "missed message pack event messages from"
257+
<< messageCounter << "to" << counter - 1;
258+
messageCounter = counter;
259+
}
260+
}

rbkit-lib/subscriber.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ class Subscriber : public QObject
4242
nzmqt::ZMQContext *context;
4343
bool connectionEstablished;
4444

45+
quint64 messageCounter;
46+
4547
public:
4648
explicit Subscriber(RBKit::JsBridge* jsBridge);
4749
~Subscriber();
@@ -76,6 +78,9 @@ public slots:
7678
void triggerGc();
7779
void takeSnapshot();
7880
void startSubscriber();
81+
82+
private:
83+
void checkForMissingMessages(const quint64 counter);
7984
};
8085

8186
#endif // SUBSCRIBER_H

tests/msgpack/gc_start

2 Bytes
Binary file not shown.

tests/msgpack/gcstats

2 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)