From 4281cae0bc7c4961cf966d488bfc10de9ba2b855 Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Fri, 5 Jun 2026 07:29:52 +0000 Subject: [PATCH 1/4] feat(storage): Add full object checksum validation when resuming async bidi uploads --- .../storage/internal/async/connection_impl.cc | 38 +++++-- .../storage/internal/async/connection_impl.h | 4 +- .../async/connection_impl_upload_hash_test.cc | 101 ++++++++++++++++++ 3 files changed, 136 insertions(+), 7 deletions(-) diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index 8aff6b5fd4c9b..993bbf768a03e 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -137,7 +137,16 @@ StatusOr> MakeAppendableWriter( current, request, std::move(rpc->stream), hash, resource, false); } else { persisted_size = rpc->first_response.persisted_size(); - hash = CreateHashFunction(*current); + if (current->get() && + rpc->first_response.has_persisted_data_checksums() && + rpc->first_response.persisted_data_checksums().has_crc32c()) { + hash = std::make_shared< + ::google::cloud::storage::internal::Crc32cHashFunction>( + rpc->first_response.persisted_data_checksums().crc32c(), + persisted_size); + } else { + hash = CreateHashFunction(*current); + } auto checksums = rpc->first_response.has_persisted_data_checksums() ? absl::make_optional( rpc->first_response.persisted_data_checksums()) @@ -758,10 +767,22 @@ AsyncConnectionImpl::ResumeUnbufferedUploadImpl( // In most cases computing a hash for a resumed upload is not feasible. We // lack the data to initialize the hash functions. The one exception is when // the upload resumes from the beginning of the file. - auto hash_function = storage::internal::CreateNullHashFunction(); + std::shared_ptr hash_function = + storage::internal::CreateNullHashFunction(); if (response->persisted_size() == 0) { hash_function = CreateHashFunction(*current); + } else if (current->get() && + response->has_persisted_data_checksums() && + response->persisted_data_checksums().has_crc32c()) { + hash_function = std::make_shared< + ::google::cloud::storage::internal::Crc32cHashFunction>( + response->persisted_data_checksums().crc32c(), + response->persisted_size()); } + auto checksums = response->has_persisted_data_checksums() + ? absl::make_optional( + response->persisted_data_checksums()) + : absl::nullopt; auto configure = [current, upload_id = query.upload_id()](grpc::ClientContext& context) { internal::ConfigureContext(context, *current); @@ -773,7 +794,8 @@ AsyncConnectionImpl::ResumeUnbufferedUploadImpl( std::move(*query.mutable_common_object_request_params()); return UnbufferedUploadImpl(std::move(current), std::move(configure), std::move(proto), std::move(hash_function), - response->persisted_size()); + response->persisted_size(), + std::move(checksums)); } future>> @@ -782,7 +804,9 @@ AsyncConnectionImpl::UnbufferedUploadImpl( std::function configure_context, google::storage::v2::BidiWriteObjectRequest request, std::shared_ptr hash_function, - std::int64_t persisted_size) { + std::int64_t persisted_size, + absl::optional + persisted_data_checksums) { using StreamingRpc = AsyncWriterConnectionImpl::StreamingRpc; using StreamingRpcTimeout = google::cloud::internal::AsyncStreamingReadWriteRpcTimeout< @@ -819,14 +843,16 @@ AsyncConnectionImpl::UnbufferedUploadImpl( }; auto transform = [current, request = std::move(request), persisted_size, - hash = std::move(hash_function)](auto f) mutable + hash = std::move(hash_function), + checksums = std::move(persisted_data_checksums)]( + auto f) mutable -> StatusOr> { auto rpc = f.get(); if (!rpc) return std::move(rpc).status(); return std::unique_ptr( std::make_unique( current, std::move(request), *std::move(rpc), std::move(hash), - persisted_size, true)); + persisted_size, true, std::move(checksums))); }; auto retry = retry_policy(*current); diff --git a/google/cloud/storage/internal/async/connection_impl.h b/google/cloud/storage/internal/async/connection_impl.h index f52750caa427c..2cc9d5d3a583f 100644 --- a/google/cloud/storage/internal/async/connection_impl.h +++ b/google/cloud/storage/internal/async/connection_impl.h @@ -137,7 +137,9 @@ class AsyncConnectionImpl std::function configure_context, google::storage::v2::BidiWriteObjectRequest request, std::shared_ptr hash_function, - std::int64_t persisted_size); + std::int64_t persisted_size, + absl::optional + persisted_data_checksums = absl::nullopt); future>> AppendableObjectUploadImpl(AppendableUploadParams p); diff --git a/google/cloud/storage/internal/async/connection_impl_upload_hash_test.cc b/google/cloud/storage/internal/async/connection_impl_upload_hash_test.cc index 9106e0fd3ba1b..404ace9028e29 100644 --- a/google/cloud/storage/internal/async/connection_impl_upload_hash_test.cc +++ b/google/cloud/storage/internal/async/connection_impl_upload_hash_test.cc @@ -776,6 +776,107 @@ TEST_P(AsyncConnectionImplUploadHashTest, ResumeBufferedWithPersistedData) { next.first.set_value(true); } +TEST_P(AsyncConnectionImplUploadHashTest, + ResumeUnbufferedWithPersistedDataChecksums) { + auto const& param = GetParam(); + + AsyncSequencer sequencer; + auto mock = std::make_shared(); + EXPECT_CALL(*mock, AsyncQueryWriteStatus) + .WillOnce( + [&](auto&, auto, auto, + google::storage::v2::QueryWriteStatusRequest const& request) { + EXPECT_EQ(request.upload_id(), "resume-upload-id"); + return sequencer.PushBack("QueryWriteStatus(1)").then([](auto) { + auto response = google::storage::v2::QueryWriteStatusResponse{}; + response.set_persisted_size(256 * 1024); + response.mutable_persisted_data_checksums()->set_crc32c(0x12345678); + return make_status_or(response); + }); + }); + EXPECT_CALL(*mock, AsyncBidiWriteObject).WillOnce([&]() { + auto stream = std::make_unique(); + EXPECT_CALL(*stream, Start).WillOnce([&] { + return sequencer.PushBack("Start"); + }); + EXPECT_CALL(*stream, Write) + .WillOnce( + [&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions wopt) { + EXPECT_EQ(request.upload_id(), "resume-upload-id"); + EXPECT_TRUE(request.finish_write()); + if (param.options.get()) { + EXPECT_TRUE(request.object_checksums().has_crc32c()); + } + EXPECT_TRUE(wopt.is_last_message()); + return sequencer.PushBack("Write"); + }); + EXPECT_CALL(*stream, Read).WillOnce([&] { + return sequencer.PushBack("Read").then([](auto) { + auto response = google::storage::v2::BidiWriteObjectResponse{}; + response.mutable_resource()->set_bucket( + "projects/_/buckets/test-bucket"); + response.mutable_resource()->set_name("test-object"); + response.mutable_resource()->set_generation(123456); + return absl::make_optional(std::move(response)); + }); + }); + EXPECT_CALL(*stream, Cancel).Times(1); + EXPECT_CALL(*stream, Finish).WillOnce([&] { + return sequencer.PushBack("Finish").then([](auto) { return Status{}; }); + }); + return std::unique_ptr(std::move(stream)); + }); + + auto mock_cq = std::make_shared(); + EXPECT_CALL(*mock_cq, MakeRelativeTimer).Times(0); + + auto options = + DefaultOptionsAsync(param.options) + .set(1) + .set(std::chrono::seconds(0)); + + auto connection = MakeAsyncConnection(CompletionQueue(mock_cq), + std::move(mock), std::move(options)); + auto request = google::storage::v2::QueryWriteStatusRequest{}; + request.set_upload_id("resume-upload-id"); + auto pending = connection->ResumeUnbufferedUpload( + {std::move(request), connection->options()}); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "QueryWriteStatus(1)"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Start"); + next.first.set_value(true); + + auto r = pending.get(); + ASSERT_STATUS_OK(r); + auto writer = *std::move(r); + EXPECT_EQ(writer->UploadId(), "resume-upload-id"); + EXPECT_EQ(absl::get(writer->PersistedState()), 256 * 1024); + + auto w2 = writer->Finalize(storage::WritePayload(kQuickFox)); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Write"); + next.first.set_value(true); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Read"); + next.first.set_value(true); + + auto response = w2.get(); + ASSERT_STATUS_OK(response); + EXPECT_EQ(response->bucket(), "projects/_/buckets/test-bucket"); + EXPECT_EQ(response->name(), "test-object"); + EXPECT_EQ(response->generation(), 123456); + + writer.reset(); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Finish"); + next.first.set_value(true); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal From 9cf8dbf03c0d6ed9a8c5bef455deaa30d39318dd Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Fri, 5 Jun 2026 07:39:55 +0000 Subject: [PATCH 2/4] test(storage): Dynamically verify cumulative CRC in ResumeAppendableObjectUploadWithChecksum --- .../internal/async/connection_impl_appendable_upload_test.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc b/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc index 49b6348f4be3c..16037e400b379 100644 --- a/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc +++ b/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc @@ -825,7 +825,10 @@ TEST_F(AsyncConnectionImplAppendableTest, EXPECT_TRUE(request.finish_write()); EXPECT_TRUE(wopt.is_last_message()); EXPECT_TRUE(request.has_object_checksums()); - EXPECT_EQ(request.object_checksums().crc32c(), 2901820631); + auto expected_crc = + google::cloud::storage_internal::ExtendCrc32c(kPersistedCrc, + "some data"); + EXPECT_EQ(request.object_checksums().crc32c(), expected_crc); return sequencer.PushBack("Write(Finalize)"); }); From 10e29d0679afb7c206fb84f6c3b27e46b6fbba84 Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Fri, 5 Jun 2026 07:44:22 +0000 Subject: [PATCH 3/4] fix(storage): Address review comments for async bidi upload resume --- google/cloud/storage/internal/async/connection_impl.cc | 10 ++++++---- .../internal/async/connection_impl_upload_hash_test.cc | 2 ++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index 993bbf768a03e..f7eebc1aa01b2 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -137,15 +137,17 @@ StatusOr> MakeAppendableWriter( current, request, std::move(rpc->stream), hash, resource, false); } else { persisted_size = rpc->first_response.persisted_size(); - if (current->get() && - rpc->first_response.has_persisted_data_checksums() && - rpc->first_response.persisted_data_checksums().has_crc32c()) { + if (persisted_size == 0) { + hash = CreateHashFunction(*current); + } else if (current->get() && + rpc->first_response.has_persisted_data_checksums() && + rpc->first_response.persisted_data_checksums().has_crc32c()) { hash = std::make_shared< ::google::cloud::storage::internal::Crc32cHashFunction>( rpc->first_response.persisted_data_checksums().crc32c(), persisted_size); } else { - hash = CreateHashFunction(*current); + hash = storage::internal::CreateNullHashFunction(); } auto checksums = rpc->first_response.has_persisted_data_checksums() ? absl::make_optional( diff --git a/google/cloud/storage/internal/async/connection_impl_upload_hash_test.cc b/google/cloud/storage/internal/async/connection_impl_upload_hash_test.cc index 404ace9028e29..1c960d174d6f6 100644 --- a/google/cloud/storage/internal/async/connection_impl_upload_hash_test.cc +++ b/google/cloud/storage/internal/async/connection_impl_upload_hash_test.cc @@ -807,6 +807,8 @@ TEST_P(AsyncConnectionImplUploadHashTest, EXPECT_TRUE(request.finish_write()); if (param.options.get()) { EXPECT_TRUE(request.object_checksums().has_crc32c()); + } else { + EXPECT_FALSE(request.object_checksums().has_crc32c()); } EXPECT_TRUE(wopt.is_last_message()); return sequencer.PushBack("Write"); From d2dec7f346b6824407584d5846b5067c57ad9f6f Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Fri, 5 Jun 2026 08:14:33 +0000 Subject: [PATCH 4/4] style(storage): Apply exact formatting requested by CI checkers --- .../storage/internal/async/connection_impl.cc | 15 +++++++-------- .../connection_impl_appendable_upload_test.cc | 5 ++--- .../async/connection_impl_upload_hash_test.cc | 3 ++- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index f7eebc1aa01b2..343784819da44 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -781,10 +781,10 @@ AsyncConnectionImpl::ResumeUnbufferedUploadImpl( response->persisted_data_checksums().crc32c(), response->persisted_size()); } - auto checksums = response->has_persisted_data_checksums() - ? absl::make_optional( - response->persisted_data_checksums()) - : absl::nullopt; + auto checksums = + response->has_persisted_data_checksums() + ? absl::make_optional(response->persisted_data_checksums()) + : absl::nullopt; auto configure = [current, upload_id = query.upload_id()](grpc::ClientContext& context) { internal::ConfigureContext(context, *current); @@ -796,8 +796,7 @@ AsyncConnectionImpl::ResumeUnbufferedUploadImpl( std::move(*query.mutable_common_object_request_params()); return UnbufferedUploadImpl(std::move(current), std::move(configure), std::move(proto), std::move(hash_function), - response->persisted_size(), - std::move(checksums)); + response->persisted_size(), std::move(checksums)); } future>> @@ -846,8 +845,8 @@ AsyncConnectionImpl::UnbufferedUploadImpl( auto transform = [current, request = std::move(request), persisted_size, hash = std::move(hash_function), - checksums = std::move(persisted_data_checksums)]( - auto f) mutable + checksums = + std::move(persisted_data_checksums)](auto f) mutable -> StatusOr> { auto rpc = f.get(); if (!rpc) return std::move(rpc).status(); diff --git a/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc b/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc index 16037e400b379..7b30eb39c1805 100644 --- a/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc +++ b/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc @@ -825,9 +825,8 @@ TEST_F(AsyncConnectionImplAppendableTest, EXPECT_TRUE(request.finish_write()); EXPECT_TRUE(wopt.is_last_message()); EXPECT_TRUE(request.has_object_checksums()); - auto expected_crc = - google::cloud::storage_internal::ExtendCrc32c(kPersistedCrc, - "some data"); + auto expected_crc = google::cloud::storage_internal::ExtendCrc32c( + kPersistedCrc, "some data"); EXPECT_EQ(request.object_checksums().crc32c(), expected_crc); return sequencer.PushBack("Write(Finalize)"); }); diff --git a/google/cloud/storage/internal/async/connection_impl_upload_hash_test.cc b/google/cloud/storage/internal/async/connection_impl_upload_hash_test.cc index 1c960d174d6f6..7e985f9599831 100644 --- a/google/cloud/storage/internal/async/connection_impl_upload_hash_test.cc +++ b/google/cloud/storage/internal/async/connection_impl_upload_hash_test.cc @@ -790,7 +790,8 @@ TEST_P(AsyncConnectionImplUploadHashTest, return sequencer.PushBack("QueryWriteStatus(1)").then([](auto) { auto response = google::storage::v2::QueryWriteStatusResponse{}; response.set_persisted_size(256 * 1024); - response.mutable_persisted_data_checksums()->set_crc32c(0x12345678); + response.mutable_persisted_data_checksums()->set_crc32c( + 0x12345678); return make_status_or(response); }); });