Skip to content

Commit 23b60d1

Browse files
authored
Fix incorrect last sequence id when sending messages in batch (#546)
1 parent 0c6a7c0 commit 23b60d1

5 files changed

Lines changed: 73 additions & 17 deletions

File tree

lib/Commands.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -902,7 +902,9 @@ uint64_t Commands::serializeSingleMessagesToBatchPayload(SharedBuffer& batchPayl
902902
batchPayload.write(payload.data(), payload.readableBytes());
903903
}
904904

905-
return messages.back().impl_->metadata.sequence_id();
905+
// Use the first message's sequence_id so that ackReceived can compute
906+
// lastSequenceIdPublished_ = sequenceId + messagesCount - 1 correctly.
907+
return messages.front().impl_->metadata.sequence_id();
906908
}
907909

908910
Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex,

lib/ProducerImpl.cc

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -933,19 +933,24 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
933933
return false;
934934
}
935935

936-
uint64_t expectedSequenceId = op.sendArgs->sequenceId;
937-
if (sequenceId > expectedSequenceId) {
938-
LOG_WARN(getName() << "Got ack for msg " << sequenceId //
939-
<< " expecting: " << expectedSequenceId << " queue size=" //
940-
<< pendingMessagesQueue_.size() << " producer: " << producerId_);
936+
const uint64_t expectedFirstSequenceId = op.sendArgs->sequenceId;
937+
const uint64_t expectedLastSequenceId = expectedFirstSequenceId + op.messagesCount - 1;
938+
// Broker may ack with either the first or the last sequence id of the batch.
939+
if (sequenceId > expectedLastSequenceId) {
940+
LOG_WARN(getName() << "Got ack for msg " << sequenceId
941+
<< " expecting last: " << expectedLastSequenceId
942+
<< " queue size=" << pendingMessagesQueue_.size() << " producer: " << producerId_);
941943
return false;
942-
} else if (sequenceId < expectedSequenceId) {
944+
}
945+
if (sequenceId < expectedFirstSequenceId) {
943946
// Ignoring the ack since it's referring to a message that has already timed out.
944-
LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId //
945-
<< " -- MessageId - " << messageId << " last-seq: " << expectedSequenceId
946-
<< " producer: " << producerId_);
947+
LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId << " -- MessageId - " << messageId
948+
<< " first-seq: " << expectedFirstSequenceId << " producer: " << producerId_);
947949
return true;
948950
}
951+
// sequenceId is in [expectedFirstSequenceId, expectedLastSequenceId]; accept as matching this op.
952+
const bool brokerSentFirst = (sequenceId == expectedFirstSequenceId);
953+
lastSequenceIdPublished_ = brokerSentFirst ? expectedLastSequenceId : sequenceId;
949954

950955
// Message was persisted correctly
951956
LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
@@ -960,7 +965,6 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
960965
}
961966

962967
releaseSemaphoreForSendOp(op);
963-
lastSequenceIdPublished_ = sequenceId + op.messagesCount - 1;
964968

965969
std::unique_ptr<OpSendMsg> opSendMsg{pendingMessagesQueue_.front().release()};
966970
pendingMessagesQueue_.pop_front();

tests/KeyBasedBatchingTest.cc

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,11 @@ TEST_F(KeyBasedBatchingTest, testSequenceId) {
134134
sendAsync("B", "3");
135135
sendAsync("C", "4");
136136
sendAsync("A", "5");
137-
// sequence id: B < C < A, so there are 3 batches in order as following:
137+
// Batches are sent in ascending order of the first message's sequence id (BatchMessageKeyBasedContainer
138+
// sorts by sendArgs->sequenceId). The first message of each key has sequence id A=0, B=1, C=2, so the batch order is A, B, C.
139+
// A: 0, 5
138140
// B: 1, 3
139141
// C: 2, 4
140-
// A: 0, 5
141142
latch.wait();
142143

143144
std::vector<std::string> receivedKeys;
@@ -149,8 +150,8 @@ TEST_F(KeyBasedBatchingTest, testSequenceId) {
149150
receivedValues.emplace_back(msg.getDataAsString());
150151
}
151152

152-
decltype(receivedKeys) expectedKeys{"B", "B", "C", "C", "A", "A"};
153-
decltype(receivedValues) expectedValues{"1", "3", "2", "4", "0", "5"};
153+
decltype(receivedKeys) expectedKeys{"A", "A", "B", "B", "C", "C"};
154+
decltype(receivedValues) expectedValues{"0", "5", "1", "3", "2", "4"};
154155
EXPECT_EQ(receivedKeys, expectedKeys);
155156
EXPECT_EQ(receivedValues, expectedValues);
156157
}

tests/ProducerTest.cc

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,44 @@ TEST_P(ProducerTest, testFlushNoBatch) {
441441
client.close();
442442
}
443443

444+
// Verifies that getLastSequenceId() is correct after sendAsync + flush when batching is enabled.
445+
// Previously the batch used the last message's sequence_id, causing lastSequenceIdPublished_ to be
446+
// too large by messagesCount - 1 (e.g. 3 messages yielded 4 instead of 2). The batch must use the first message's
447+
// sequence_id so that lastSequenceIdPublished_ = sequenceId + messagesCount - 1 is correct.
448+
TEST(ProducerTest, testGetLastSequenceIdAfterBatchFlush) {
449+
Client client(serviceUrl);
450+
451+
const std::string topicName =
452+
"persistent://public/default/testGetLastSequenceIdAfterBatchFlush-" + std::to_string(time(nullptr));
453+
454+
ProducerConfiguration producerConfiguration;
455+
producerConfiguration.setBatchingEnabled(true);
456+
producerConfiguration.setBatchingMaxMessages(10);
457+
producerConfiguration.setBatchingMaxPublishDelayMs(60000);
458+
459+
Producer producer;
460+
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer));
461+
462+
// Send 3 messages in a batch, then flush. Sequence ids are [0, 1, 2], so getLastSequenceId() must be 2.
463+
for (int i = 0; i < 3; i++) {
464+
Message msg = MessageBuilder().setContent("content").build();
465+
producer.sendAsync(msg, nullptr);
466+
}
467+
ASSERT_EQ(ResultOk, producer.flush());
468+
ASSERT_EQ(producer.getLastSequenceId(), 2) << "After 3 messages, last sequence id should be 2";
469+
470+
// Send 2 more (total 5), flush. Sequence ids for these are [3, 4], so getLastSequenceId() must be 4.
471+
for (int i = 0; i < 2; i++) {
472+
Message msg = MessageBuilder().setContent("content").build();
473+
producer.sendAsync(msg, nullptr);
474+
}
475+
ASSERT_EQ(ResultOk, producer.flush());
476+
ASSERT_EQ(producer.getLastSequenceId(), 4) << "After 5 messages total, last sequence id should be 4";
477+
478+
producer.close();
479+
client.close();
480+
}
481+
444482
TEST_P(ProducerTest, testFlushBatch) {
445483
Client client(serviceUrl);
446484

tests/ReaderTest.cc

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <time.h>
2323

2424
#include <atomic>
25+
#include <chrono>
2526
#include <functional>
2627
#include <future>
2728
#include <set>
@@ -863,7 +864,13 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
863864
}
864865

865866
ASSERT_EQ(ResultOk, reader.seek(MessageId::latest()));
866-
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
867+
// After seek-to-end the broker may close the consumer and trigger reconnect; allow a short
868+
// delay (polling for up to ~5 seconds) for hasMessageAvailable to become false, which avoids flakiness while the reconnect completes.
869+
for (int i = 0; i < 50; i++) {
870+
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
871+
if (!hasMessageAvailable) break;
872+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
873+
}
867874
ASSERT_FALSE(hasMessageAvailable);
868875

869876
producer.send(MessageBuilder().setContent("msg-2").build());
@@ -876,7 +883,11 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
876883

877884
// Test the 2nd seek
878885
ASSERT_EQ(ResultOk, reader.seek(MessageId::latest()));
879-
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
886+
for (int i = 0; i < 50; i++) {
887+
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
888+
if (!hasMessageAvailable) break;
889+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
890+
}
880891
ASSERT_FALSE(hasMessageAvailable);
881892
}
882893

0 commit comments

Comments
 (0)