diff --git a/google/cloud/pubsub/CMakeLists.txt b/google/cloud/pubsub/CMakeLists.txt index 1f5aa454d5f5f..b8937c3689323 100644 --- a/google/cloud/pubsub/CMakeLists.txt +++ b/google/cloud/pubsub/CMakeLists.txt @@ -53,6 +53,8 @@ add_library( internal/default_batch_sink.h internal/default_retry_policies.cc internal/default_retry_policies.h + internal/defaults.cc + internal/defaults.h internal/emulator_overrides.cc internal/emulator_overrides.h internal/flow_controlled_publisher_connection.cc @@ -101,6 +103,7 @@ add_library( internal/subscription_session.h message.cc message.h + options.h publisher.cc publisher.h publisher_connection.cc @@ -233,6 +236,7 @@ function (google_cloud_cpp_pubsub_client_define_tests) ack_handler_test.cc internal/batching_publisher_connection_test.cc internal/default_batch_sink_test.cc + internal/defaults_test.cc internal/emulator_overrides_test.cc internal/flow_controlled_publisher_connection_test.cc internal/ordering_key_publisher_connection_test.cc diff --git a/google/cloud/pubsub/google_cloud_cpp_pubsub.bzl b/google/cloud/pubsub/google_cloud_cpp_pubsub.bzl index e1a844b3bff97..e7fcb1c744526 100644 --- a/google/cloud/pubsub/google_cloud_cpp_pubsub.bzl +++ b/google/cloud/pubsub/google_cloud_cpp_pubsub.bzl @@ -26,6 +26,7 @@ google_cloud_cpp_pubsub_hdrs = [ "internal/create_channel.h", "internal/default_batch_sink.h", "internal/default_retry_policies.h", + "internal/defaults.h", "internal/emulator_overrides.h", "internal/flow_controlled_publisher_connection.h", "internal/ordering_key_publisher_connection.h", @@ -51,6 +52,7 @@ google_cloud_cpp_pubsub_hdrs = [ "internal/subscription_message_source.h", "internal/subscription_session.h", "message.h", + "options.h", "publisher.h", "publisher_connection.h", "publisher_options.h", @@ -82,6 +84,7 @@ google_cloud_cpp_pubsub_srcs = [ "internal/create_channel.cc", "internal/default_batch_sink.cc", "internal/default_retry_policies.cc", + "internal/defaults.cc", "internal/emulator_overrides.cc", "internal/flow_controlled_publisher_connection.cc", "internal/ordering_key_publisher_connection.cc", diff --git a/google/cloud/pubsub/internal/defaults.cc b/google/cloud/pubsub/internal/defaults.cc new file mode 100644 index 0000000000000..fdee88f557871 --- /dev/null +++ b/google/cloud/pubsub/internal/defaults.cc @@ -0,0 +1,58 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "google/cloud/pubsub/internal/defaults.h" +#include "google/cloud/pubsub/options.h" +#include "google/cloud/options.h" +#include +#include + +namespace google { +namespace cloud { +namespace pubsub_internal { +inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS { + +Options DefaultPublisherOptions(Options opts) { + if (!opts.has()) { + opts.set(std::chrono::milliseconds(10)); + } + if (!opts.has()) { + opts.set(100); + } + if (!opts.has()) { + opts.set(1024 * 1024L); + } + if (!opts.has()) { + opts.set( + (std::numeric_limits::max)()); + } + if (!opts.has()) { + opts.set( + (std::numeric_limits::max)()); + } + if (!opts.has()) { + opts.set(false); + } + if (!opts.has()) { + opts.set( + pubsub::FullPublisherAction::kBlocks); + } + + return opts; +} + +} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS +} // namespace pubsub_internal +} // namespace cloud +} // namespace google diff --git a/google/cloud/pubsub/internal/defaults.h b/google/cloud/pubsub/internal/defaults.h new file mode 100644 index 0000000000000..8317cc6dcec83 --- /dev/null +++ b/google/cloud/pubsub/internal/defaults.h @@ -0,0 +1,33 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_DEFAULTS_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_DEFAULTS_H + +#include "google/cloud/pubsub/version.h" +#include "google/cloud/options.h" + +namespace google { +namespace cloud { +namespace pubsub_internal { +inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS { + +Options DefaultPublisherOptions(Options opts); + +} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS +} // namespace pubsub_internal +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_DEFAULTS_H diff --git a/google/cloud/pubsub/internal/defaults_test.cc b/google/cloud/pubsub/internal/defaults_test.cc new file mode 100644 index 0000000000000..b1aad6cedca7a --- /dev/null +++ b/google/cloud/pubsub/internal/defaults_test.cc @@ -0,0 +1,68 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "google/cloud/pubsub/internal/defaults.h" +#include "google/cloud/pubsub/options.h" +#include +#include + +namespace google { +namespace cloud { +namespace pubsub_internal { +inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS { +namespace { + +using ms = std::chrono::milliseconds; + +TEST(OptionsTest, PublisherDefaults) { + auto opts = DefaultPublisherOptions(Options{}); + EXPECT_EQ(ms(10), opts.get()); + EXPECT_EQ(100, opts.get()); + EXPECT_EQ(1024 * 1024L, opts.get()); + EXPECT_EQ((std::numeric_limits::max)(), + opts.get()); + EXPECT_EQ((std::numeric_limits::max)(), + opts.get()); + EXPECT_EQ(false, opts.get()); + EXPECT_EQ(pubsub::FullPublisherAction::kBlocks, + opts.get()); +} + +TEST(OptionsTest, UserSetPublisherOptions) { + auto opts = + DefaultPublisherOptions(Options{} + .set(ms(100)) + .set(1) + .set(2) + .set(3) + .set(4) + .set(true) + .set( + pubsub::FullPublisherAction::kIgnored)); + + EXPECT_EQ(ms(100), opts.get()); + EXPECT_EQ(1U, opts.get()); + EXPECT_EQ(2U, opts.get()); + EXPECT_EQ(3U, opts.get()); + EXPECT_EQ(4U, opts.get()); + EXPECT_EQ(true, opts.get()); + EXPECT_EQ(pubsub::FullPublisherAction::kIgnored, + opts.get()); +} + +} // namespace +} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS +} // namespace pubsub_internal +} // namespace cloud +} // namespace google diff --git a/google/cloud/pubsub/internal/flow_controlled_publisher_connection_test.cc b/google/cloud/pubsub/internal/flow_controlled_publisher_connection_test.cc index a4b458bb7a4d5..d0c7c244022da 100644 --- a/google/cloud/pubsub/internal/flow_controlled_publisher_connection_test.cc +++ b/google/cloud/pubsub/internal/flow_controlled_publisher_connection_test.cc @@ -165,7 +165,8 @@ std::shared_ptr TestFlowControl( return publish.PushBack("Publish()"); }); - auto under_test = FlowControlledPublisherConnection::Create(options, mock); + auto under_test = + FlowControlledPublisherConnection::Create(std::move(options), mock); auto publisher_task = [&](int iterations) { for (int i = 0; i != iterations; ++i) { diff --git a/google/cloud/pubsub/options.h b/google/cloud/pubsub/options.h new file mode 100644 index 0000000000000..18927810f34f6 --- /dev/null +++ b/google/cloud/pubsub/options.h @@ -0,0 +1,169 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_OPTIONS_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_OPTIONS_H + +/** + * @file + * + * This file defines options to be used with instances of + * `google::cloud::Options`. By convention options are named with an "Option" + * suffix. As the name would imply, all options are optional, and leaving them + * unset will result in a reasonable default being chosen. + * + * Not all options are meaningful to all functions that accept a + * `google::cloud::Options` instance. Each function that accepts a + * `google::cloud::Options` should document which options it expects. This is + * typically done by indicating lists of options using "OptionList" aliases. + * For example, a function may indicate that users may set any option in + * `PublisherOptionList`. + * + * @note Unrecognized options are allowed and will be ignored. To debug issues + * with options set `GOOGLE_CLOUD_CPP_ENABLE_CLOG=yes` in the environment + * and unexpected options will be logged. + * + * @see `google::cloud::CommonOptionList` + * @see `google::cloud::GrpcOptionList` + */ + +#include "google/cloud/pubsub/version.h" +#include "google/cloud/options.h" +#include + +namespace google { +namespace cloud { +namespace pubsub { +inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS { + +/** + * The maximum hold time for the messages. + * + * @note The implementation depends on the granularity of your OS timers. It is + * possible that messages are held for slightly longer times than the value + * set here. + * + * @note The first message in a batch starts the hold time counter. New + * messages do not extend the life of the batch. For example, if you have + * set the holding time to 10 milliseconds, start a batch with message 1, + * and publish a second message 5 milliseconds later, the second message + * will be flushed approximately 5 milliseconds after it is published. + */ +struct MaximumHoldTimeOption { + using Type = std::chrono::microseconds; +}; + +/** + * The maximum number of messages in a batch. + * + * @note Application developers should keep in mind that Cloud Pub/Sub + * sets [limits][pubsub-quota-link] on the size of a batch (1,000 messages) + * The library makes no attempt to validate the value provided in this + * option. + * + * [pubsub-quota-link]: https://cloud.google.com/pubsub/quotas#resource_limits + */ +struct MaximumBatchMessagesOption { + using Type = std::size_t; +}; + +/** + * The maximum size for the messages in a batch. + * + * @note Application developers should keep in mind that Cloud Pub/Sub + * sets [limits][pubsub-quota-link] on the size of a batch (10MB). The + * library makes no attempt to validate the value provided in this + * option. + * + * [pubsub-quota-link]: https://cloud.google.com/pubsub/quotas#resource_limits + */ +struct MaximumBatchBytesOption { + using Type = std::size_t; +}; + +/** + * The maximum number of pending messages. + * + * After a publisher flushes a batch of messages the batch is (obviously) not + * received immediately by the service. While the batch remains pending it + * potentially consumes memory resources in the client (and/or the service). + * + * Some applications may have constraints on the number of bytes and/or + * messages they can tolerate in this pending state, and may prefer to block + * or reject messages. + */ +struct MaximumPendingMessagesOption { + using Type = std::size_t; +}; + +/** + * The maximum size for pending messages. + * + * After a publisher flushes a batch of messages the batch is (obviously) not + * received immediately by the service. While the batch remains pending it + * potentially consumes memory resources in the client (and/or the service). + * + * Some applications may have constraints on the number of bytes and/or + * messages they can tolerate in this pending state, and may prefer to block + * or reject messages. + */ +struct MaximumPendingBytesOption { + using Type = std::size_t; +}; + +/** + * Publisher message ordering. + * + * To guarantee messages are received by the service in the same order that + * the application gives them to a publisher, the client library needs to wait + * until a batch of messages is successfully delivered before sending the next + * batch, otherwise batches may arrive out of order as there is no guarantee + * the same channel or network path is used for each batch. + * + * For applications that do not care about message ordering, this can limit + * the throughput. Therefore, the behavior is disabled by default. + * + * @see the documentation for the `Publisher` class for details. + */ +struct MessageOrderingOption { + using Type = bool; +}; + +/// Actions taken by a full publisher. +enum class FullPublisherAction { + /// Ignore full publishers, continue as usual. + kIgnored, + /// Configure the publisher to reject new messages when full. + kRejects, + /// Configure the publisher to block the caller when full. + kBlocks +}; +/// The action taken by a full publisher. +struct FullPublisherActionOption { + using Type = FullPublisherAction; +}; + +/// The list of options specific to publishers +using PublisherOptionList = + OptionList; + +} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS +} // namespace pubsub +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_OPTIONS_H diff --git a/google/cloud/pubsub/publisher.cc b/google/cloud/pubsub/publisher.cc index 84f2b80f664b9..122fe10101697 100644 --- a/google/cloud/pubsub/publisher.cc +++ b/google/cloud/pubsub/publisher.cc @@ -20,7 +20,7 @@ namespace pubsub { inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS { Publisher::Publisher(std::shared_ptr connection, - PublisherOptions) + PublisherOptions const&) : connection_(std::move(connection)) {} } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS diff --git a/google/cloud/pubsub/publisher.h b/google/cloud/pubsub/publisher.h index 2b1616b26957b..e2781e41c2635 100644 --- a/google/cloud/pubsub/publisher.h +++ b/google/cloud/pubsub/publisher.h @@ -98,7 +98,7 @@ inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS { class Publisher { public: explicit Publisher(std::shared_ptr connection, - PublisherOptions options = {}); + PublisherOptions const& options = {}); //@{ Publisher(Publisher const&) = default; diff --git a/google/cloud/pubsub/publisher_options.cc b/google/cloud/pubsub/publisher_options.cc index 1d18bb44728e7..69ad23bf556c0 100644 --- a/google/cloud/pubsub/publisher_options.cc +++ b/google/cloud/pubsub/publisher_options.cc @@ -13,15 +13,17 @@ // limitations under the License. #include "google/cloud/pubsub/publisher_options.h" +#include "google/cloud/pubsub/internal/defaults.h" namespace google { namespace cloud { namespace pubsub { inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS { -std::chrono::milliseconds constexpr PublisherOptions::kDefaultMaximumHoldTime; -std::size_t constexpr PublisherOptions::kDefaultMaximumMessageCount; -std::size_t constexpr PublisherOptions::kDefaultMaximumMessageSize; +PublisherOptions::PublisherOptions(Options opts) { + internal::CheckExpectedOptions(opts, __func__); + opts_ = pubsub_internal::DefaultPublisherOptions(std::move(opts)); +} } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS } // namespace pubsub diff --git a/google/cloud/pubsub/publisher_options.h b/google/cloud/pubsub/publisher_options.h index 2ad285f50233d..8fe88657795a0 100644 --- a/google/cloud/pubsub/publisher_options.h +++ b/google/cloud/pubsub/publisher_options.h @@ -15,7 +15,9 @@ #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_PUBLISHER_OPTIONS_H #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_PUBLISHER_OPTIONS_H +#include "google/cloud/pubsub/options.h" #include "google/cloud/pubsub/version.h" +#include "google/cloud/options.h" #include #include #include @@ -23,6 +25,18 @@ namespace google { namespace cloud { +namespace pubsub { +inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS { +class PublisherOptions; +} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS +} // namespace pubsub + +namespace pubsub_internal { +inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS { +Options MakeOptions(pubsub::PublisherOptions); +} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS +} // namespace pubsub_internal + namespace pubsub { inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS { @@ -46,7 +60,20 @@ inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS { */ class PublisherOptions { public: - PublisherOptions() = default; + PublisherOptions() : PublisherOptions(Options{}) {} + + /** + * Initialize the publisher options. + * + * Expected options are any of the types in the `PublisherOptionList` + * + * @note Unrecognized options will be ignored. To debug issues with options + * set `GOOGLE_CLOUD_CPP_ENABLE_CLOG=yes` in the environment and + * unexpected options will be logged. + * + * @param opts configuration options + */ + explicit PublisherOptions(Options opts); //@{ /** @@ -61,7 +88,7 @@ class PublisherOptions { * [quota limits]: https://cloud.google.com/pubsub/quotas#resource_limits */ std::chrono::microseconds maximum_hold_time() const { - return maximum_hold_time_; + return opts_.get(); } /** @@ -81,13 +108,13 @@ class PublisherOptions { template PublisherOptions& set_maximum_hold_time( std::chrono::duration v) { - maximum_hold_time_ = - std::chrono::duration_cast(v); + opts_.set( + std::chrono::duration_cast(v)); return *this; } std::size_t maximum_batch_message_count() const { - return maximum_batch_message_count_; + return opts_.get(); } /** @@ -101,11 +128,13 @@ class PublisherOptions { * [pubsub-quota-link]: https://cloud.google.com/pubsub/quotas#resource_limits */ PublisherOptions& set_maximum_batch_message_count(std::size_t v) { - maximum_batch_message_count_ = v; + opts_.set(v); return *this; } - std::size_t maximum_batch_bytes() const { return maximum_batch_bytes_; } + std::size_t maximum_batch_bytes() const { + return opts_.get(); + } /** * Set the maximum size for the messages in a batch. @@ -118,7 +147,7 @@ class PublisherOptions { * [pubsub-quota-link]: https://cloud.google.com/pubsub/quotas#resource_limits */ PublisherOptions& set_maximum_batch_bytes(std::size_t v) { - maximum_batch_bytes_ = v; + opts_.set(v); return *this; } //@} @@ -138,7 +167,7 @@ class PublisherOptions { */ /// Return `true` if message ordering is enabled. - bool message_ordering() const { return message_ordering_; } + bool message_ordering() const { return opts_.get(); } /** * Enable message ordering. @@ -146,7 +175,7 @@ class PublisherOptions { * @see the documentation for the `Publisher` class for details. */ PublisherOptions& enable_message_ordering() { - message_ordering_ = true; + opts_.set(true); return *this; } @@ -156,7 +185,7 @@ class PublisherOptions { * @see the documentation for the `Publisher` class for details. */ PublisherOptions& disable_message_ordering() { - message_ordering_ = false; + opts_.set(false); return *this; } //@} @@ -176,73 +205,73 @@ class PublisherOptions { /// Flow control based on pending bytes. PublisherOptions& set_maximum_pending_bytes(std::size_t v) { - maximum_pending_bytes_ = v; + opts_.set(v); return *this; } /// Flow control based on pending messages. PublisherOptions& set_maximum_pending_messages(std::size_t v) { - maximum_pending_messages_ = v; + opts_.set(v); return *this; } - std::size_t maximum_pending_bytes() const { return maximum_pending_bytes_; } + std::size_t maximum_pending_bytes() const { + return opts_.get(); + } std::size_t maximum_pending_messages() const { - return maximum_pending_messages_; + return opts_.get(); } /// The current action for a full publisher bool full_publisher_ignored() const { - return full_publisher_action_ == FullPublisherAction::kIgnored; + return opts_.get() == + FullPublisherAction::kIgnored; } bool full_publisher_rejects() const { - return full_publisher_action_ == FullPublisherAction::kRejects; + return opts_.get() == + FullPublisherAction::kRejects; } bool full_publisher_blocks() const { - return full_publisher_action_ == FullPublisherAction::kBlocks; + return opts_.get() == + FullPublisherAction::kBlocks; } /// Ignore full publishers, continue as usual PublisherOptions& set_full_publisher_ignored() { - full_publisher_action_ = FullPublisherAction::kIgnored; + opts_.set(FullPublisherAction::kIgnored); return *this; } /// Configure the publisher to reject new messages when full. PublisherOptions& set_full_publisher_rejects() { - full_publisher_action_ = FullPublisherAction::kRejects; + opts_.set(FullPublisherAction::kRejects); return *this; } /// Configure the publisher to block the caller when full. PublisherOptions& set_full_publisher_blocks() { - full_publisher_action_ = FullPublisherAction::kBlocks; + opts_.set(FullPublisherAction::kBlocks); return *this; } //@} private: - static auto constexpr kDefaultMaximumHoldTime = std::chrono::milliseconds(10); - static std::size_t constexpr kDefaultMaximumMessageCount = 100; - static std::size_t constexpr kDefaultMaximumMessageSize = 1024 * 1024L; - static std::size_t constexpr kDefaultMaximumPendingBytes = - (std::numeric_limits::max)(); - static std::size_t constexpr kDefaultMaximumPendingMessages = - (std::numeric_limits::max)(); - - enum class FullPublisherAction { kIgnored, kRejects, kBlocks }; - - std::chrono::microseconds maximum_hold_time_ = kDefaultMaximumHoldTime; - std::size_t maximum_batch_message_count_ = kDefaultMaximumMessageCount; - std::size_t maximum_batch_bytes_ = kDefaultMaximumMessageSize; - bool message_ordering_ = false; - std::size_t maximum_pending_bytes_ = kDefaultMaximumPendingBytes; - std::size_t maximum_pending_messages_ = kDefaultMaximumPendingMessages; - FullPublisherAction full_publisher_action_ = FullPublisherAction::kBlocks; + friend Options pubsub_internal::MakeOptions(PublisherOptions); + Options opts_; }; } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS } // namespace pubsub + +namespace pubsub_internal { +inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS { + +inline Options MakeOptions(pubsub::PublisherOptions o) { + return std::move(o.opts_); +} + +} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS +} // namespace pubsub_internal } // namespace cloud } // namespace google diff --git a/google/cloud/pubsub/publisher_options_test.cc b/google/cloud/pubsub/publisher_options_test.cc index 2bfb466ee3d81..e64d974bfe6e3 100644 --- a/google/cloud/pubsub/publisher_options_test.cc +++ b/google/cloud/pubsub/publisher_options_test.cc @@ -79,6 +79,61 @@ TEST(PublisherOptions, FullPublisherAction) { PublisherOptions{}.set_full_publisher_blocks().full_publisher_blocks()); } +TEST(PublisherOptions, OptionsConstructor) { + auto const b = PublisherOptions( + Options{} + .set(std::chrono::seconds(12)) + .set(10) + .set(123) + .set(4) + .set(444) + .set(true) + .set(FullPublisherAction::kRejects)); + + auto const expected = std::chrono::duration_cast( + std::chrono::seconds(12)); + EXPECT_EQ(expected, b.maximum_hold_time()); + EXPECT_EQ(10, b.maximum_batch_message_count()); + EXPECT_EQ(123, b.maximum_batch_bytes()); + EXPECT_EQ(4, b.maximum_pending_messages()); + EXPECT_EQ(444, b.maximum_pending_bytes()); + EXPECT_TRUE(b.message_ordering()); + EXPECT_TRUE(b.full_publisher_rejects()); +} + +TEST(PublisherOptions, MakeOptions) { + auto b = PublisherOptions{} + .set_maximum_hold_time(std::chrono::seconds(12)) + .set_maximum_batch_message_count(10) + .set_maximum_batch_bytes(123) + .set_maximum_pending_messages(4) + .set_maximum_pending_bytes(444) + .enable_message_ordering(); + + auto opts = pubsub_internal::MakeOptions(std::move(b)); + EXPECT_EQ(std::chrono::seconds(12), opts.get()); + EXPECT_EQ(10, opts.get()); + EXPECT_EQ(123, opts.get()); + EXPECT_EQ(4, opts.get()); + EXPECT_EQ(444, opts.get()); + EXPECT_EQ(true, opts.get()); + + auto ignored = PublisherOptions{}.set_full_publisher_ignored(); + opts = pubsub_internal::MakeOptions(std::move(ignored)); + EXPECT_EQ(FullPublisherAction::kIgnored, + opts.get()); + + auto rejects = PublisherOptions{}.set_full_publisher_rejects(); + opts = pubsub_internal::MakeOptions(std::move(rejects)); + EXPECT_EQ(FullPublisherAction::kRejects, + opts.get()); + + auto blocks = PublisherOptions{}.set_full_publisher_blocks(); + opts = pubsub_internal::MakeOptions(std::move(blocks)); + EXPECT_EQ(FullPublisherAction::kBlocks, + opts.get()); +} + } // namespace } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS } // namespace pubsub diff --git a/google/cloud/pubsub/pubsub_client_unit_tests.bzl b/google/cloud/pubsub/pubsub_client_unit_tests.bzl index 8fcdca1af3c39..7bbac7d439bc8 100644 --- a/google/cloud/pubsub/pubsub_client_unit_tests.bzl +++ b/google/cloud/pubsub/pubsub_client_unit_tests.bzl @@ -20,6 +20,7 @@ pubsub_client_unit_tests = [ "ack_handler_test.cc", "internal/batching_publisher_connection_test.cc", "internal/default_batch_sink_test.cc", + "internal/defaults_test.cc", "internal/emulator_overrides_test.cc", "internal/flow_controlled_publisher_connection_test.cc", "internal/ordering_key_publisher_connection_test.cc",