diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index 8aff6b5fd4c9b..343784819da44 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -137,7 +137,18 @@ StatusOr> MakeAppendableWriter( current, request, std::move(rpc->stream), hash, resource, false); } else { persisted_size = rpc->first_response.persisted_size(); - hash = CreateHashFunction(*current); + if (persisted_size == 0) { + hash = CreateHashFunction(*current); + } else if (current->get() && + rpc->first_response.has_persisted_data_checksums() && + rpc->first_response.persisted_data_checksums().has_crc32c()) { + hash = std::make_shared< + ::google::cloud::storage::internal::Crc32cHashFunction>( + rpc->first_response.persisted_data_checksums().crc32c(), + persisted_size); + } else { + hash = storage::internal::CreateNullHashFunction(); + } auto checksums = rpc->first_response.has_persisted_data_checksums() ? absl::make_optional( rpc->first_response.persisted_data_checksums()) @@ -758,10 +769,22 @@ AsyncConnectionImpl::ResumeUnbufferedUploadImpl( // In most cases computing a hash for a resumed upload is not feasible. We // lack the data to initialize the hash functions. The one exception is when // the upload resumes from the beginning of the file. - auto hash_function = storage::internal::CreateNullHashFunction(); + std::shared_ptr hash_function = + storage::internal::CreateNullHashFunction(); if (response->persisted_size() == 0) { hash_function = CreateHashFunction(*current); + } else if (current->get() && + response->has_persisted_data_checksums() && + response->persisted_data_checksums().has_crc32c()) { + hash_function = std::make_shared< + ::google::cloud::storage::internal::Crc32cHashFunction>( + response->persisted_data_checksums().crc32c(), + response->persisted_size()); } + auto checksums = + response->has_persisted_data_checksums() + ? absl::make_optional(response->persisted_data_checksums()) + : absl::nullopt; auto configure = [current, upload_id = query.upload_id()](grpc::ClientContext& context) { internal::ConfigureContext(context, *current); @@ -773,7 +796,7 @@ AsyncConnectionImpl::ResumeUnbufferedUploadImpl( std::move(*query.mutable_common_object_request_params()); return UnbufferedUploadImpl(std::move(current), std::move(configure), std::move(proto), std::move(hash_function), - response->persisted_size()); + response->persisted_size(), std::move(checksums)); } future>> @@ -782,7 +805,9 @@ AsyncConnectionImpl::UnbufferedUploadImpl( std::function configure_context, google::storage::v2::BidiWriteObjectRequest request, std::shared_ptr hash_function, - std::int64_t persisted_size) { + std::int64_t persisted_size, + absl::optional + persisted_data_checksums) { using StreamingRpc = AsyncWriterConnectionImpl::StreamingRpc; using StreamingRpcTimeout = google::cloud::internal::AsyncStreamingReadWriteRpcTimeout< @@ -819,14 +844,16 @@ AsyncConnectionImpl::UnbufferedUploadImpl( }; auto transform = [current, request = std::move(request), persisted_size, - hash = std::move(hash_function)](auto f) mutable + hash = std::move(hash_function), + checksums = + std::move(persisted_data_checksums)](auto f) mutable -> StatusOr> { auto rpc = f.get(); if (!rpc) return std::move(rpc).status(); return std::unique_ptr( std::make_unique( current, std::move(request), *std::move(rpc), std::move(hash), - persisted_size, true)); + persisted_size, true, std::move(checksums))); }; auto retry = retry_policy(*current); diff --git a/google/cloud/storage/internal/async/connection_impl.h b/google/cloud/storage/internal/async/connection_impl.h index f52750caa427c..2cc9d5d3a583f 100644 --- a/google/cloud/storage/internal/async/connection_impl.h +++ b/google/cloud/storage/internal/async/connection_impl.h @@ -137,7 +137,9 @@ class AsyncConnectionImpl std::function configure_context, google::storage::v2::BidiWriteObjectRequest request, std::shared_ptr hash_function, - std::int64_t persisted_size); + std::int64_t persisted_size, + absl::optional + persisted_data_checksums = absl::nullopt); future>> AppendableObjectUploadImpl(AppendableUploadParams p); diff --git a/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc b/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc index 49b6348f4be3c..7b30eb39c1805 100644 --- a/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc +++ b/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc @@ -825,7 +825,9 @@ TEST_F(AsyncConnectionImplAppendableTest, EXPECT_TRUE(request.finish_write()); EXPECT_TRUE(wopt.is_last_message()); EXPECT_TRUE(request.has_object_checksums()); - EXPECT_EQ(request.object_checksums().crc32c(), 2901820631); + auto expected_crc = google::cloud::storage_internal::ExtendCrc32c( + kPersistedCrc, "some data"); + EXPECT_EQ(request.object_checksums().crc32c(), expected_crc); return sequencer.PushBack("Write(Finalize)"); }); diff --git a/google/cloud/storage/internal/async/connection_impl_upload_hash_test.cc b/google/cloud/storage/internal/async/connection_impl_upload_hash_test.cc index 9106e0fd3ba1b..7e985f9599831 100644 --- a/google/cloud/storage/internal/async/connection_impl_upload_hash_test.cc +++ b/google/cloud/storage/internal/async/connection_impl_upload_hash_test.cc @@ -776,6 +776,110 @@ TEST_P(AsyncConnectionImplUploadHashTest, ResumeBufferedWithPersistedData) { next.first.set_value(true); } +TEST_P(AsyncConnectionImplUploadHashTest, + ResumeUnbufferedWithPersistedDataChecksums) { + auto const& param = GetParam(); + + AsyncSequencer sequencer; + auto mock = std::make_shared(); + EXPECT_CALL(*mock, AsyncQueryWriteStatus) + .WillOnce( + [&](auto&, auto, auto, + google::storage::v2::QueryWriteStatusRequest const& request) { + EXPECT_EQ(request.upload_id(), "resume-upload-id"); + return sequencer.PushBack("QueryWriteStatus(1)").then([](auto) { + auto response = google::storage::v2::QueryWriteStatusResponse{}; + response.set_persisted_size(256 * 1024); + response.mutable_persisted_data_checksums()->set_crc32c( + 0x12345678); + return make_status_or(response); + }); + }); + EXPECT_CALL(*mock, AsyncBidiWriteObject).WillOnce([&]() { + auto stream = std::make_unique(); + EXPECT_CALL(*stream, Start).WillOnce([&] { + return sequencer.PushBack("Start"); + }); + EXPECT_CALL(*stream, Write) + .WillOnce( + [&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions wopt) { + EXPECT_EQ(request.upload_id(), "resume-upload-id"); + EXPECT_TRUE(request.finish_write()); + if (param.options.get()) { + EXPECT_TRUE(request.object_checksums().has_crc32c()); + } else { + EXPECT_FALSE(request.object_checksums().has_crc32c()); + } + EXPECT_TRUE(wopt.is_last_message()); + return sequencer.PushBack("Write"); + }); + EXPECT_CALL(*stream, Read).WillOnce([&] { + return sequencer.PushBack("Read").then([](auto) { + auto response = google::storage::v2::BidiWriteObjectResponse{}; + response.mutable_resource()->set_bucket( + "projects/_/buckets/test-bucket"); + response.mutable_resource()->set_name("test-object"); + response.mutable_resource()->set_generation(123456); + return absl::make_optional(std::move(response)); + }); + }); + EXPECT_CALL(*stream, Cancel).Times(1); + EXPECT_CALL(*stream, Finish).WillOnce([&] { + return sequencer.PushBack("Finish").then([](auto) { return Status{}; }); + }); + return std::unique_ptr(std::move(stream)); + }); + + auto mock_cq = std::make_shared(); + EXPECT_CALL(*mock_cq, MakeRelativeTimer).Times(0); + + auto options = + DefaultOptionsAsync(param.options) + .set(1) + .set(std::chrono::seconds(0)); + + auto connection = MakeAsyncConnection(CompletionQueue(mock_cq), + std::move(mock), std::move(options)); + auto request = google::storage::v2::QueryWriteStatusRequest{}; + request.set_upload_id("resume-upload-id"); + auto pending = connection->ResumeUnbufferedUpload( + {std::move(request), connection->options()}); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "QueryWriteStatus(1)"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Start"); + next.first.set_value(true); + + auto r = pending.get(); + ASSERT_STATUS_OK(r); + auto writer = *std::move(r); + EXPECT_EQ(writer->UploadId(), "resume-upload-id"); + EXPECT_EQ(absl::get(writer->PersistedState()), 256 * 1024); + + auto w2 = writer->Finalize(storage::WritePayload(kQuickFox)); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Write"); + next.first.set_value(true); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Read"); + next.first.set_value(true); + + auto response = w2.get(); + ASSERT_STATUS_OK(response); + EXPECT_EQ(response->bucket(), "projects/_/buckets/test-bucket"); + EXPECT_EQ(response->name(), "test-object"); + EXPECT_EQ(response->generation(), 123456); + + writer.reset(); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Finish"); + next.first.set_value(true); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal