diff --git a/google/cloud/pubsub/internal/streaming_subscription_batch_source.cc b/google/cloud/pubsub/internal/streaming_subscription_batch_source.cc index 733092b0b8c96..2b61697068675 100644 --- a/google/cloud/pubsub/internal/streaming_subscription_batch_source.cc +++ b/google/cloud/pubsub/internal/streaming_subscription_batch_source.cc @@ -17,12 +17,38 @@ #include "google/cloud/pubsub/internal/extend_leases_with_retry.h" #include "google/cloud/internal/async_retry_loop.h" #include "google/cloud/log.h" +#include #include namespace google { namespace cloud { namespace pubsub_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +namespace { +// NOLINTNEXTLINE(misc-no-recursion) +future> WaitAll(std::vector> v) { + if (v.empty()) return make_ready_future(std::vector{}); + auto back = std::move(v.back()); + v.pop_back(); + return WaitAll(std::move(v)).then([b = std::move(back)](auto f) mutable { + return b.then([list = f.get()](auto g) mutable { + list.push_back(g.get()); + return list; + }); + }); +} + +future Reduce(std::vector> v) { + return WaitAll(std::move(v)).then([](auto f) { + auto ready = f.get(); + for (auto& s : ready) { + if (!s.ok()) return std::move(s); + } + return Status{}; + }); +} + +} // namespace StreamingSubscriptionBatchSource::StreamingSubscriptionBatchSource( CompletionQueue cq, @@ -119,8 +145,21 @@ future StreamingSubscriptionBatchSource::BulkNack( request.set_subscription(subscription_full_name_); for (auto& a : ack_ids) *request.add_ack_ids() = std::move(a); request.set_ack_deadline_seconds(0); - return stub_->AsyncModifyAckDeadline( - cq_, absl::make_unique(), request); + + auto requests = + SplitModifyAckDeadline(std::move(request), kMaxAckIdsPerMessage); + if (requests.size() == 1) { + return stub_->AsyncModifyAckDeadline( + cq_, absl::make_unique(), requests.front()); + } + + std::vector> pending(requests.size()); + std::transform(requests.begin(), requests.end(), pending.begin(), + [this](auto const& request) { + return stub_->AsyncModifyAckDeadline( + cq_, absl::make_unique(), request); + }); + return Reduce(std::move(pending)); } void StreamingSubscriptionBatchSource::ExtendLeases( @@ -133,14 +172,17 @@ void StreamingSubscriptionBatchSource::ExtendLeases( request.add_ack_ids(std::move(a)); } std::unique_lock lk(mu_); + auto split = SplitModifyAckDeadline(std::move(request), kMaxAckIdsPerMessage); if (exactly_once_delivery_enabled_.value_or(false)) { lk.unlock(); - (void)ExtendLeasesWithRetry(stub_, cq_, std::move(request)); + for (auto& r : split) (void)ExtendLeasesWithRetry(stub_, cq_, std::move(r)); return; } lk.unlock(); - (void)stub_->AsyncModifyAckDeadline( - cq_, absl::make_unique(), request); + for (auto& r : split) { + (void)stub_->AsyncModifyAckDeadline( + cq_, absl::make_unique(), r); + } } void StreamingSubscriptionBatchSource::StartStream( @@ -370,7 +412,7 @@ void StreamingSubscriptionBatchSource::ShutdownStream( lk.unlock(); auto weak = WeakFromThis(); // There are no pending reads or writes, and something (probable a read or - // write error) recommends we shutdown the stream + // write error) recommends we shut down the stream stream->Finish().then([weak, stream](future f) { if (auto self = weak.lock()) self->OnFinish(f.get()); }); @@ -454,6 +496,30 @@ void StreamingSubscriptionBatchSource::ChangeState( stream_state_ = s; } +std::vector +SplitModifyAckDeadline(google::pubsub::v1::ModifyAckDeadlineRequest request, + int max_ack_ids) { + // We expect this to be the common case. + if (request.ack_ids_size() <= max_ack_ids) return {std::move(request)}; + + std::vector result; + auto& source = *request.mutable_ack_ids(); + while (request.ack_ids_size() > max_ack_ids) { + google::pubsub::v1::ModifyAckDeadlineRequest r; + r.set_subscription(request.subscription()); + r.set_ack_deadline_seconds(request.ack_deadline_seconds()); + + auto begin = source.begin(); + auto end = std::next(source.begin(), max_ack_ids); + r.mutable_ack_ids()->Reserve(max_ack_ids); + for (auto i = begin; i != end; ++i) r.add_ack_ids(std::move(*i)); + source.erase(begin, end); + result.push_back(std::move(r)); + } + if (!request.ack_ids().empty()) result.push_back(std::move(request)); + return result; +} + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace pubsub_internal } // namespace cloud diff --git a/google/cloud/pubsub/internal/streaming_subscription_batch_source.h b/google/cloud/pubsub/internal/streaming_subscription_batch_source.h index c571167b4b557..491653c973eba 100644 --- a/google/cloud/pubsub/internal/streaming_subscription_batch_source.h +++ b/google/cloud/pubsub/internal/streaming_subscription_batch_source.h @@ -68,6 +68,16 @@ class StreamingSubscriptionBatchSource kFinishing, }; + // The maximum size for `ModifyAckDeadlineRequest` is 512 KB: + // https://cloud.google.com/pubsub/quotas#resource_limits + // Typical ack ids are less than 200 bytes. This value is safe, but there is + // no need to over optimize it: + // - Google does not charge for these messages + // - The value is reached rarely + // - The CPU costs saved between 2,048 ids per message vs. the theoretical + // maximum are minimal + static int constexpr kMaxAckIdsPerMessage = 2048; + private: // C++17 adds weak_from_this(), we cannot use the same name as (1) some // versions of the standard library include `weak_from_this()` even with @@ -136,6 +146,11 @@ class StreamingSubscriptionBatchSource std::ostream& operator<<(std::ostream& os, StreamingSubscriptionBatchSource::StreamState s); +/// Split @p request such that each request has at most @p max_ack_ids. +std::vector +SplitModifyAckDeadline(google::pubsub::v1::ModifyAckDeadlineRequest request, + int max_ack_ids); + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace pubsub_internal } // namespace cloud diff --git a/google/cloud/pubsub/internal/streaming_subscription_batch_source_test.cc b/google/cloud/pubsub/internal/streaming_subscription_batch_source_test.cc index 1c9122eec8501..bff92c9496da8 100644 --- a/google/cloud/pubsub/internal/streaming_subscription_batch_source_test.cc +++ b/google/cloud/pubsub/internal/streaming_subscription_batch_source_test.cc @@ -17,9 +17,11 @@ #include "google/cloud/pubsub/subscription.h" #include "google/cloud/pubsub/testing/mock_subscriber_stub.h" #include "google/cloud/pubsub/testing/test_retry_policies.h" +#include "google/cloud/credentials.h" #include "google/cloud/internal/background_threads_impl.h" #include "google/cloud/log.h" #include "google/cloud/testing_util/async_sequencer.h" +#include "google/cloud/testing_util/is_proto_equal.h" #include "google/cloud/testing_util/mock_completion_queue_impl.h" #include "google/cloud/testing_util/status_matchers.h" #include @@ -36,6 +38,7 @@ using ::google::cloud::internal::AutomaticallyCreatedBackgroundThreads; using ::google::cloud::internal::RunAsyncBase; using ::google::cloud::testing_util::AsyncSequencer; using ::google::cloud::testing_util::IsOk; +using ::google::cloud::testing_util::IsProtoEqual; using ::google::cloud::testing_util::MockCompletionQueueImpl; using ::google::cloud::testing_util::StatusIs; using ::testing::_; @@ -44,6 +47,7 @@ using ::testing::AtLeast; using ::testing::AtMost; using ::testing::ByMove; using ::testing::ElementsAre; +using ::testing::ElementsAreArray; using ::testing::HasSubstr; using ::testing::Property; using ::testing::Return; @@ -115,6 +119,7 @@ std::shared_ptr MakeTestBatchSource( auto subscription = pubsub::Subscription("test-project", "test-subscription"); auto opts = DefaultSubscriberOptions(pubsub_testing::MakeTestOptions( Options{} + .set(MakeInsecureCredentials()) .set(100) .set(100 * 1024 * 1024L) .set(std::chrono::seconds(300)))); @@ -1120,6 +1125,208 @@ TEST(StreamingSubscriptionBatchSourceTest, ExtendLeasesWithRetry) { EXPECT_THAT(done.get(), IsOk()); } +TEST(StreamingSubscriptionBatchSourceTest, SplitModifyAckDeadlineSmall) { + auto constexpr kMaxIds = 3; + + std::vector bulk_nacks{"fake-001", "fake-002", "fake-003"}; + ModifyRequest request; + request.set_subscription( + "projects/test-project/subscriptions/test-subscription"); + request.set_ack_deadline_seconds(12345); + for (auto id : bulk_nacks) request.add_ack_ids(std::move(id)); + + auto const actual = SplitModifyAckDeadline(request, kMaxIds); + EXPECT_THAT(actual, ElementsAre(IsProtoEqual(request))); +} + +TEST(StreamingSubscriptionBatchSourceTest, SplitModifyAckDeadline) { + auto constexpr kMaxIds = 3; + + std::vector bulk_nacks{ + "fake-001", "fake-002", "fake-003", "fake-004", + "fake-005", "fake-006", "fake-007", + }; + ModifyRequest request; + request.set_subscription( + "projects/test-project/subscriptions/test-subscription"); + request.set_ack_deadline_seconds(12345); + for (auto id : bulk_nacks) request.add_ack_ids(std::move(id)); + + std::vector expected(3); + for (auto& e : expected) { + e.set_subscription(request.subscription()); + e.set_ack_deadline_seconds(request.ack_deadline_seconds()); + } + expected[0].add_ack_ids("fake-001"); + expected[0].add_ack_ids("fake-002"); + expected[0].add_ack_ids("fake-003"); + + expected[1].add_ack_ids("fake-004"); + expected[1].add_ack_ids("fake-005"); + expected[1].add_ack_ids("fake-006"); + + expected[2].add_ack_ids("fake-007"); + + auto const actual = SplitModifyAckDeadline(std::move(request), kMaxIds); + EXPECT_THAT(actual, + ElementsAre(IsProtoEqual(expected[0]), IsProtoEqual(expected[1]), + IsProtoEqual(expected[2]))); +} + +std::unique_ptr MakeUnusedStream( + bool enable_exactly_once) { + auto start_response = []() { return make_ready_future(true); }; + auto write_response = [](google::pubsub::v1::StreamingPullRequest const&, + grpc::WriteOptions const&) { + return make_ready_future(true); + }; + auto read_response = [enable_exactly_once]() { + using Response = ::google::pubsub::v1::StreamingPullResponse; + Response response; + if (enable_exactly_once) { + response.mutable_subscription_properties() + ->set_exactly_once_delivery_enabled(true); + } + return make_ready_future(absl::make_optional(std::move(response))); + }; + auto finish_response = []() { return make_ready_future(Status{}); }; + + auto stream = absl::make_unique(); + EXPECT_CALL(*stream, Start).WillOnce(start_response); + EXPECT_CALL(*stream, Write).WillRepeatedly(write_response); + EXPECT_CALL(*stream, Read).WillRepeatedly(read_response); + EXPECT_CALL(*stream, Cancel).Times(AtMost(1)); + EXPECT_CALL(*stream, Finish).Times(AtMost(1)).WillRepeatedly(finish_response); + return stream; +} + +TEST(StreamingSubscriptionBatchSourceTest, BulkNackMultipleRequests) { + auto constexpr kMaxIds = + StreamingSubscriptionBatchSource::kMaxAckIdsPerMessage; + + std::vector> groups; + auto make_ids = [](std::string const& prefix, int count) { + std::vector ids(count); + std::generate(ids.begin(), ids.end(), [&prefix, count = 0]() mutable { + return prefix + std::to_string(++count); + }); + return ids; + }; + groups.push_back(make_ids("group-1-", kMaxIds)); + groups.push_back(make_ids("group-2-", kMaxIds)); + groups.push_back(make_ids("group-3-", 2)); + + auto make_on_modify = [](std::vector e) { + return [expected_ids = std::move(e)](auto, auto, auto const& request) { + EXPECT_THAT(request.ack_ids(), ElementsAreArray(expected_ids)); + return make_ready_future(Status{}); + }; + }; + + AutomaticallyCreatedBackgroundThreads background; + auto mock = std::make_shared(); + + EXPECT_CALL(*mock, AsyncStreamingPull) + .WillOnce([&](google::cloud::CompletionQueue&, + std::unique_ptr, + google::pubsub::v1::StreamingPullRequest const&) { + return MakeUnusedStream(false); + }); + + EXPECT_CALL( + *mock, + AsyncModifyAckDeadline( + _, _, + Property(&ModifyRequest::subscription, + "projects/test-project/subscriptions/test-subscription"))) + .WillOnce(make_on_modify(groups[0])) + .WillOnce(make_on_modify(groups[1])) + .WillOnce(make_on_modify(groups[2])); + + auto shutdown = std::make_shared(); + auto uut = MakeTestBatchSource(background.cq(), shutdown, mock); + + auto done = shutdown->Start({}); + uut->Start([](StatusOr const&) {}); + + std::vector nacks; + for (auto& ids : groups) { + nacks.insert(nacks.end(), ids.begin(), ids.end()); + } + + uut->BulkNack(nacks); + + shutdown->MarkAsShutdown("test", {}); +} + +void CheckExtendLeasesMultipleRequests(bool enable_exactly_once) { + auto constexpr kMaxIds = + StreamingSubscriptionBatchSource::kMaxAckIdsPerMessage; + + std::vector> groups; + auto make_ids = [](std::string const& prefix, int count) { + std::vector ids(count); + std::generate(ids.begin(), ids.end(), [&prefix, count = 0]() mutable { + return prefix + std::to_string(++count); + }); + return ids; + }; + groups.push_back(make_ids("group-1-", kMaxIds)); + groups.push_back(make_ids("group-2-", kMaxIds)); + groups.push_back(make_ids("group-3-", 2)); + + auto make_on_modify = [](std::vector e) { + return [expected_ids = std::move(e)](auto, auto, auto const& request) { + EXPECT_THAT(request.ack_ids(), ElementsAreArray(expected_ids)); + return make_ready_future(Status{}); + }; + }; + + AutomaticallyCreatedBackgroundThreads background; + auto mock = std::make_shared(); + + EXPECT_CALL(*mock, AsyncStreamingPull) + .WillOnce([&](google::cloud::CompletionQueue&, + std::unique_ptr, + google::pubsub::v1::StreamingPullRequest const&) { + return MakeUnusedStream(enable_exactly_once); + }); + + EXPECT_CALL( + *mock, + AsyncModifyAckDeadline( + _, _, + Property(&ModifyRequest::subscription, + "projects/test-project/subscriptions/test-subscription"))) + .WillOnce(make_on_modify(groups[0])) + .WillOnce(make_on_modify(groups[1])) + .WillOnce(make_on_modify(groups[2])); + + auto shutdown = std::make_shared(); + auto uut = MakeTestBatchSource(background.cq(), shutdown, mock); + + auto done = shutdown->Start({}); + uut->Start([](StatusOr const&) {}); + + std::vector acks; + for (auto& ids : groups) { + acks.insert(acks.end(), ids.begin(), ids.end()); + } + + uut->ExtendLeases(acks, std::chrono::seconds(60)); + + shutdown->MarkAsShutdown("test", {}); +} + +TEST(StreamingSubscriptionBatchSourceTest, ExtendLeasesMultipleRequests) { + CheckExtendLeasesMultipleRequests(false); +} + +TEST(StreamingSubscriptionBatchSourceTest, + ExtendLeasesMultipleRequestsWithExactlyOnce) { + CheckExtendLeasesMultipleRequests(true); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace pubsub_internal