Skip to content

Commit

Permalink
feat(pubsub): introduce PublisherOptionList (g::c::Options) (#7308)
Browse files Browse the repository at this point in the history
  • Loading branch information
dbolduc authored Sep 14, 2021
1 parent 1e0f138 commit 480cd16
Show file tree
Hide file tree
Showing 13 changed files with 467 additions and 44 deletions.
4 changes: 4 additions & 0 deletions google/cloud/pubsub/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ add_library(
internal/default_batch_sink.h
internal/default_retry_policies.cc
internal/default_retry_policies.h
internal/defaults.cc
internal/defaults.h
internal/emulator_overrides.cc
internal/emulator_overrides.h
internal/flow_controlled_publisher_connection.cc
Expand Down Expand Up @@ -101,6 +103,7 @@ add_library(
internal/subscription_session.h
message.cc
message.h
options.h
publisher.cc
publisher.h
publisher_connection.cc
Expand Down Expand Up @@ -233,6 +236,7 @@ function (google_cloud_cpp_pubsub_client_define_tests)
ack_handler_test.cc
internal/batching_publisher_connection_test.cc
internal/default_batch_sink_test.cc
internal/defaults_test.cc
internal/emulator_overrides_test.cc
internal/flow_controlled_publisher_connection_test.cc
internal/ordering_key_publisher_connection_test.cc
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/pubsub/google_cloud_cpp_pubsub.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ google_cloud_cpp_pubsub_hdrs = [
"internal/create_channel.h",
"internal/default_batch_sink.h",
"internal/default_retry_policies.h",
"internal/defaults.h",
"internal/emulator_overrides.h",
"internal/flow_controlled_publisher_connection.h",
"internal/ordering_key_publisher_connection.h",
Expand All @@ -51,6 +52,7 @@ google_cloud_cpp_pubsub_hdrs = [
"internal/subscription_message_source.h",
"internal/subscription_session.h",
"message.h",
"options.h",
"publisher.h",
"publisher_connection.h",
"publisher_options.h",
Expand Down Expand Up @@ -82,6 +84,7 @@ google_cloud_cpp_pubsub_srcs = [
"internal/create_channel.cc",
"internal/default_batch_sink.cc",
"internal/default_retry_policies.cc",
"internal/defaults.cc",
"internal/emulator_overrides.cc",
"internal/flow_controlled_publisher_connection.cc",
"internal/ordering_key_publisher_connection.cc",
Expand Down
58 changes: 58 additions & 0 deletions google/cloud/pubsub/internal/defaults.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/pubsub/internal/defaults.h"
#include "google/cloud/pubsub/options.h"
#include "google/cloud/options.h"
#include <chrono>
#include <limits>

namespace google {
namespace cloud {
namespace pubsub_internal {
inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {

Options DefaultPublisherOptions(Options opts) {
if (!opts.has<pubsub::MaximumHoldTimeOption>()) {
opts.set<pubsub::MaximumHoldTimeOption>(std::chrono::milliseconds(10));
}
if (!opts.has<pubsub::MaximumBatchMessagesOption>()) {
opts.set<pubsub::MaximumBatchMessagesOption>(100);
}
if (!opts.has<pubsub::MaximumBatchBytesOption>()) {
opts.set<pubsub::MaximumBatchBytesOption>(1024 * 1024L);
}
if (!opts.has<pubsub::MaximumPendingBytesOption>()) {
opts.set<pubsub::MaximumPendingBytesOption>(
(std::numeric_limits<std::size_t>::max)());
}
if (!opts.has<pubsub::MaximumPendingMessagesOption>()) {
opts.set<pubsub::MaximumPendingMessagesOption>(
(std::numeric_limits<std::size_t>::max)());
}
if (!opts.has<pubsub::MessageOrderingOption>()) {
opts.set<pubsub::MessageOrderingOption>(false);
}
if (!opts.has<pubsub::FullPublisherActionOption>()) {
opts.set<pubsub::FullPublisherActionOption>(
pubsub::FullPublisherAction::kBlocks);
}

return opts;
}

} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
} // namespace pubsub_internal
} // namespace cloud
} // namespace google
33 changes: 33 additions & 0 deletions google/cloud/pubsub/internal/defaults.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_DEFAULTS_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_DEFAULTS_H

#include "google/cloud/pubsub/version.h"
#include "google/cloud/options.h"

namespace google {
namespace cloud {
namespace pubsub_internal {
inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {

Options DefaultPublisherOptions(Options opts);

} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
} // namespace pubsub_internal
} // namespace cloud
} // namespace google

#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_DEFAULTS_H
68 changes: 68 additions & 0 deletions google/cloud/pubsub/internal/defaults_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/pubsub/internal/defaults.h"
#include "google/cloud/pubsub/options.h"
#include <gmock/gmock.h>
#include <chrono>

namespace google {
namespace cloud {
namespace pubsub_internal {
inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
namespace {

using ms = std::chrono::milliseconds;

TEST(OptionsTest, PublisherDefaults) {
auto opts = DefaultPublisherOptions(Options{});
EXPECT_EQ(ms(10), opts.get<pubsub::MaximumHoldTimeOption>());
EXPECT_EQ(100, opts.get<pubsub::MaximumBatchMessagesOption>());
EXPECT_EQ(1024 * 1024L, opts.get<pubsub::MaximumBatchBytesOption>());
EXPECT_EQ((std::numeric_limits<std::size_t>::max)(),
opts.get<pubsub::MaximumPendingBytesOption>());
EXPECT_EQ((std::numeric_limits<std::size_t>::max)(),
opts.get<pubsub::MaximumPendingMessagesOption>());
EXPECT_EQ(false, opts.get<pubsub::MessageOrderingOption>());
EXPECT_EQ(pubsub::FullPublisherAction::kBlocks,
opts.get<pubsub::FullPublisherActionOption>());
}

TEST(OptionsTest, UserSetPublisherOptions) {
auto opts =
DefaultPublisherOptions(Options{}
.set<pubsub::MaximumHoldTimeOption>(ms(100))
.set<pubsub::MaximumBatchMessagesOption>(1)
.set<pubsub::MaximumBatchBytesOption>(2)
.set<pubsub::MaximumPendingBytesOption>(3)
.set<pubsub::MaximumPendingMessagesOption>(4)
.set<pubsub::MessageOrderingOption>(true)
.set<pubsub::FullPublisherActionOption>(
pubsub::FullPublisherAction::kIgnored));

EXPECT_EQ(ms(100), opts.get<pubsub::MaximumHoldTimeOption>());
EXPECT_EQ(1U, opts.get<pubsub::MaximumBatchMessagesOption>());
EXPECT_EQ(2U, opts.get<pubsub::MaximumBatchBytesOption>());
EXPECT_EQ(3U, opts.get<pubsub::MaximumPendingBytesOption>());
EXPECT_EQ(4U, opts.get<pubsub::MaximumPendingMessagesOption>());
EXPECT_EQ(true, opts.get<pubsub::MessageOrderingOption>());
EXPECT_EQ(pubsub::FullPublisherAction::kIgnored,
opts.get<pubsub::FullPublisherActionOption>());
}

} // namespace
} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
} // namespace pubsub_internal
} // namespace cloud
} // namespace google
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ std::shared_ptr<FlowControlledPublisherConnection> TestFlowControl(
return publish.PushBack("Publish()");
});

auto under_test = FlowControlledPublisherConnection::Create(options, mock);
auto under_test =
FlowControlledPublisherConnection::Create(std::move(options), mock);

auto publisher_task = [&](int iterations) {
for (int i = 0; i != iterations; ++i) {
Expand Down
169 changes: 169 additions & 0 deletions google/cloud/pubsub/options.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_OPTIONS_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_OPTIONS_H

/**
* @file
*
* This file defines options to be used with instances of
* `google::cloud::Options`. By convention options are named with an "Option"
* suffix. As the name would imply, all options are optional, and leaving them
* unset will result in a reasonable default being chosen.
*
* Not all options are meaningful to all functions that accept a
* `google::cloud::Options` instance. Each function that accepts a
* `google::cloud::Options` should document which options it expects. This is
* typically done by indicating lists of options using "OptionList" aliases.
* For example, a function may indicate that users may set any option in
* `PublisherOptionList`.
*
* @note Unrecognized options are allowed and will be ignored. To debug issues
* with options set `GOOGLE_CLOUD_CPP_ENABLE_CLOG=yes` in the environment
* and unexpected options will be logged.
*
* @see `google::cloud::CommonOptionList`
* @see `google::cloud::GrpcOptionList`
*/

#include "google/cloud/pubsub/version.h"
#include "google/cloud/options.h"
#include <chrono>

namespace google {
namespace cloud {
namespace pubsub {
inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {

/**
* The maximum hold time for the messages.
*
* @note The implementation depends on the granularity of your OS timers. It is
* possible that messages are held for slightly longer times than the value
* set here.
*
* @note The first message in a batch starts the hold time counter. New
* messages do not extend the life of the batch. For example, if you have
* set the holding time to 10 milliseconds, start a batch with message 1,
* and publish a second message 5 milliseconds later, the second message
* will be flushed approximately 5 milliseconds after it is published.
*/
struct MaximumHoldTimeOption {
using Type = std::chrono::microseconds;
};

/**
* The maximum number of messages in a batch.
*
* @note Application developers should keep in mind that Cloud Pub/Sub
* sets [limits][pubsub-quota-link] on the size of a batch (1,000 messages)
* The library makes no attempt to validate the value provided in this
* option.
*
* [pubsub-quota-link]: https://cloud.google.com/pubsub/quotas#resource_limits
*/
struct MaximumBatchMessagesOption {
using Type = std::size_t;
};

/**
* The maximum size for the messages in a batch.
*
* @note Application developers should keep in mind that Cloud Pub/Sub
* sets [limits][pubsub-quota-link] on the size of a batch (10MB). The
* library makes no attempt to validate the value provided in this
* option.
*
* [pubsub-quota-link]: https://cloud.google.com/pubsub/quotas#resource_limits
*/
struct MaximumBatchBytesOption {
using Type = std::size_t;
};

/**
* The maximum number of pending messages.
*
* After a publisher flushes a batch of messages the batch is (obviously) not
* received immediately by the service. While the batch remains pending it
* potentially consumes memory resources in the client (and/or the service).
*
* Some applications may have constraints on the number of bytes and/or
* messages they can tolerate in this pending state, and may prefer to block
* or reject messages.
*/
struct MaximumPendingMessagesOption {
using Type = std::size_t;
};

/**
* The maximum size for pending messages.
*
* After a publisher flushes a batch of messages the batch is (obviously) not
* received immediately by the service. While the batch remains pending it
* potentially consumes memory resources in the client (and/or the service).
*
* Some applications may have constraints on the number of bytes and/or
* messages they can tolerate in this pending state, and may prefer to block
* or reject messages.
*/
struct MaximumPendingBytesOption {
using Type = std::size_t;
};

/**
* Publisher message ordering.
*
* To guarantee messages are received by the service in the same order that
* the application gives them to a publisher, the client library needs to wait
* until a batch of messages is successfully delivered before sending the next
* batch, otherwise batches may arrive out of order as there is no guarantee
* the same channel or network path is used for each batch.
*
* For applications that do not care about message ordering, this can limit
* the throughput. Therefore, the behavior is disabled by default.
*
* @see the documentation for the `Publisher` class for details.
*/
struct MessageOrderingOption {
using Type = bool;
};

/// Actions taken by a full publisher.
enum class FullPublisherAction {
/// Ignore full publishers, continue as usual.
kIgnored,
/// Configure the publisher to reject new messages when full.
kRejects,
/// Configure the publisher to block the caller when full.
kBlocks
};
/// The action taken by a full publisher.
struct FullPublisherActionOption {
using Type = FullPublisherAction;
};

/// The list of options specific to publishers
using PublisherOptionList =
OptionList<MaximumHoldTimeOption, MaximumBatchMessagesOption,
MaximumBatchBytesOption, MaximumPendingMessagesOption,
MaximumPendingBytesOption, MessageOrderingOption,
FullPublisherActionOption>;

} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
} // namespace pubsub
} // namespace cloud
} // namespace google

#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_OPTIONS_H
2 changes: 1 addition & 1 deletion google/cloud/pubsub/publisher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace pubsub {
inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {

Publisher::Publisher(std::shared_ptr<PublisherConnection> connection,
PublisherOptions)
PublisherOptions const&)
: connection_(std::move(connection)) {}

} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
Expand Down
Loading

0 comments on commit 480cd16

Please sign in to comment.