From bc07a352e1976d29ebe083c15cce6f227b4e0e50 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Mon, 12 Mar 2018 11:45:38 +1100 Subject: [PATCH] pubsub: make Publisher/Subscriber accept plain strings (#3018) --- .../com/google/cloud/pubsub/v1/Publisher.java | 47 ++++++++++++++----- .../google/cloud/pubsub/v1/Subscriber.java | 33 ++++++++----- .../cloud/pubsub/v1/PublisherImplTest.java | 23 +++++---- 3 files changed, 67 insertions(+), 36 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 4db17723937e..e09cacbb40cc 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -40,19 +40,17 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; -import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PublisherGrpc; import com.google.pubsub.v1.PublisherGrpc.PublisherFutureStub; import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.TopicName; +import com.google.pubsub.v1.TopicNames; import io.grpc.CallCredentials; import io.grpc.Channel; import io.grpc.Status; import io.grpc.auth.MoreCallCredentials; -import org.threeten.bp.Duration; - -import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; @@ -68,6 +66,8 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; +import org.threeten.bp.Duration; /** * A Cloud Pub/Sub publisher, that is @@ -90,8 +90,7 @@ public class Publisher { private static final Logger logger = Logger.getLogger(Publisher.class.getName()); - private final ProjectTopicName topicName; - private final String cachedTopicNameString; + private final String topicName; private final BatchingSettings batchingSettings; private final RetrySettings retrySettings; @@ -124,7 +123,6 @@ public static long getApiMaxRequestBytes() { private Publisher(Builder builder) throws IOException { topicName = builder.topicName; - cachedTopicNameString = topicName.toString(); this.batchingSettings = builder.batchingSettings; this.retrySettings = builder.retrySettings; @@ -167,7 +165,12 @@ private Publisher(Builder builder) throws IOException { } /** Topic which the publisher publishes to. */ - public ProjectTopicName getTopicName() { + public TopicName getTopicName() { + return TopicNames.parse(topicName); + } + + /** Topic which the publisher publishes to. */ + public String getTopicNameString() { return topicName; } @@ -312,7 +315,7 @@ private void publishAllOutstanding() { private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { PublishRequest.Builder publishRequest = PublishRequest.newBuilder(); - publishRequest.setTopic(cachedTopicNameString); + publishRequest.setTopic(topicName); for (OutstandingPublish outstandingPublish : outstandingBatch.outstandingPublishes) { publishRequest.addMessages(outstandingPublish.message); } @@ -497,6 +500,7 @@ interface LongRandom { * Constructs a new {@link Builder} using the given topic. * *

Example of creating a {@code Publisher}. + * *

{@code
    * String projectName = "my_project";
    * String topicName = "my_topic";
@@ -509,9 +513,28 @@ interface LongRandom {
    *   publisher.shutdown();
    * }
    * }
+ */ + public static Builder newBuilder(TopicName topicName) { + return newBuilder(topicName.toString()); + } + + /** + * Constructs a new {@link Builder} using the given topic. + * + *

Example of creating a {@code Publisher}. * + *

{@code
+   * String topic = "projects/my_project/topics/my_topic";
+   * Publisher publisher = Publisher.newBuilder(topic).build();
+   * try {
+   *   // ...
+   * } finally {
+   *   // When finished with the publisher, make sure to shutdown to free up resources.
+   *   publisher.shutdown();
+   * }
+   * }
*/ - public static Builder newBuilder(ProjectTopicName topicName) { + public static Builder newBuilder(String topicName) { return new Builder(topicName); } @@ -556,7 +579,7 @@ public long nextLong(long least, long bound) { .setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors()) .build(); - ProjectTopicName topicName; + String topicName; // Batching options BatchingSettings batchingSettings = DEFAULT_BATCHING_SETTINGS; @@ -574,7 +597,7 @@ public long nextLong(long least, long bound) { CredentialsProvider credentialsProvider = TopicAdminSettings.defaultCredentialsProviderBuilder().build(); - private Builder(ProjectTopicName topic) { + private Builder(String topic) { this.topicName = Preconditions.checkNotNull(topic); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index bd1d87c910c2..610885273058 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -107,8 +107,7 @@ public class Subscriber extends AbstractApiService { private static final Logger logger = Logger.getLogger(Subscriber.class.getName()); - private final ProjectSubscriptionName subscriptionName; - private final String cachedSubscriptionNameString; + private final String subscriptionName; private final FlowControlSettings flowControlSettings; private final Duration ackExpirationPadding; private final Duration maxAckExtensionPeriod; @@ -135,7 +134,6 @@ private Subscriber(Builder builder) { receiver = builder.receiver; flowControlSettings = builder.flowControlSettings; subscriptionName = builder.subscriptionName; - cachedSubscriptionNameString = subscriptionName.toString(); Preconditions.checkArgument( builder.ackExpirationPadding.compareTo(Duration.ZERO) > 0, "padding must be positive"); @@ -204,19 +202,32 @@ public void close() throws IOException { /** * Constructs a new {@link Builder}. * - *

Once {@link Builder#build} is called a gRPC stub will be created for use of the {@link - * Subscriber}. - * * @param subscription Cloud Pub/Sub subscription to bind the subscriber to * @param receiver an implementation of {@link MessageReceiver} used to process the received * messages */ public static Builder newBuilder(ProjectSubscriptionName subscription, MessageReceiver receiver) { + return newBuilder(subscription.toString(), receiver); + } + + /** + * Constructs a new {@link Builder}. + * + * @param subscription Cloud Pub/Sub subscription to bind the subscriber to + * @param receiver an implementation of {@link MessageReceiver} used to process the received + * messages + */ + public static Builder newBuilder(String subscription, MessageReceiver receiver) { return new Builder(subscription, receiver); } /** Subscription which the subscriber is subscribed to. */ public ProjectSubscriptionName getSubscriptionName() { + return ProjectSubscriptionName.parse(subscriptionName); + } + + /** Subscription which the subscriber is subscribed to. */ + public String getSubscriptionNameString() { return subscriptionName; } @@ -343,9 +354,7 @@ private void startPollingConnections() throws IOException { } Subscription subscriptionInfo = getSubStub.getSubscription( - GetSubscriptionRequest.newBuilder() - .setSubscription(cachedSubscriptionNameString) - .build()); + GetSubscriptionRequest.newBuilder().setSubscription(subscriptionName).build()); for (Channel channel : channels) { SubscriberFutureStub stub = SubscriberGrpc.newFutureStub(channel); @@ -401,7 +410,7 @@ private void startStreamingConnections() throws IOException { } streamingSubscriberConnections.add( new StreamingSubscriberConnection( - cachedSubscriptionNameString, + subscriptionName, receiver, ackExpirationPadding, maxAckExtensionPeriod, @@ -491,7 +500,7 @@ public static final class Builder { * Runtime.getRuntime().availableProcessors()) .build(); - ProjectSubscriptionName subscriptionName; + String subscriptionName; MessageReceiver receiver; Duration ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING; @@ -519,7 +528,7 @@ public static final class Builder { boolean useStreaming = true; int parallelPullCount = Runtime.getRuntime().availableProcessors() * CHANNELS_PER_CORE; - Builder(ProjectSubscriptionName subscriptionName, MessageReceiver receiver) { + Builder(String subscriptionName, MessageReceiver receiver) { this.subscriptionName = subscriptionName; this.receiver = receiver; } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 1495c5f32071..79e0015275f7 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -16,6 +16,12 @@ package com.google.cloud.pubsub.v1; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import com.google.api.core.ApiFuture; import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.core.ExecutorProvider; @@ -35,6 +41,10 @@ import io.grpc.StatusException; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -42,17 +52,6 @@ import org.junit.runners.JUnit4; import org.threeten.bp.Duration; -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledExecutorService; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - @RunWith(JUnit4.class) public class PublisherImplTest { @@ -472,7 +471,7 @@ public void testPublisherGetters() throws Exception { @Test public void testBuilderParametersAndDefaults() { Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC); - assertEquals(TEST_TOPIC, builder.topicName); + assertEquals(TEST_TOPIC.toString(), builder.topicName); assertEquals(Publisher.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider); assertEquals( Publisher.Builder.DEFAULT_REQUEST_BYTES_THRESHOLD,