Skip to content

Commit

Permalink
pubsub: make Publisher/Subscriber accept plain strings (#3018)
Browse files Browse the repository at this point in the history
  • Loading branch information
pongad authored Mar 12, 2018
1 parent 848245e commit bc07a35
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,17 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PublisherGrpc;
import com.google.pubsub.v1.PublisherGrpc.PublisherFutureStub;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.TopicNames;
import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.auth.MoreCallCredentials;
import org.threeten.bp.Duration;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
Expand All @@ -68,6 +66,8 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* A Cloud Pub/Sub <a href="https://cloud.google.com/pubsub/docs/publisher">publisher</a>, that is
Expand All @@ -90,8 +90,7 @@
public class Publisher {
private static final Logger logger = Logger.getLogger(Publisher.class.getName());

private final ProjectTopicName topicName;
private final String cachedTopicNameString;
private final String topicName;

private final BatchingSettings batchingSettings;
private final RetrySettings retrySettings;
Expand Down Expand Up @@ -124,7 +123,6 @@ public static long getApiMaxRequestBytes() {

private Publisher(Builder builder) throws IOException {
topicName = builder.topicName;
cachedTopicNameString = topicName.toString();

this.batchingSettings = builder.batchingSettings;
this.retrySettings = builder.retrySettings;
Expand Down Expand Up @@ -167,7 +165,12 @@ private Publisher(Builder builder) throws IOException {
}

/** Topic which the publisher publishes to. */
public ProjectTopicName getTopicName() {
public TopicName getTopicName() {
return TopicNames.parse(topicName);
}

/** Topic which the publisher publishes to. */
public String getTopicNameString() {
return topicName;
}

Expand Down Expand Up @@ -312,7 +315,7 @@ private void publishAllOutstanding() {

private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
PublishRequest.Builder publishRequest = PublishRequest.newBuilder();
publishRequest.setTopic(cachedTopicNameString);
publishRequest.setTopic(topicName);
for (OutstandingPublish outstandingPublish : outstandingBatch.outstandingPublishes) {
publishRequest.addMessages(outstandingPublish.message);
}
Expand Down Expand Up @@ -497,6 +500,7 @@ interface LongRandom {
* Constructs a new {@link Builder} using the given topic.
*
* <p>Example of creating a {@code Publisher}.
*
* <pre>{@code
* String projectName = "my_project";
* String topicName = "my_topic";
Expand All @@ -509,9 +513,28 @@ interface LongRandom {
* publisher.shutdown();
* }
* }</pre>
*/
public static Builder newBuilder(TopicName topicName) {
return newBuilder(topicName.toString());
}

/**
* Constructs a new {@link Builder} using the given topic.
*
* <p>Example of creating a {@code Publisher}.
*
* <pre>{@code
* String topic = "projects/my_project/topics/my_topic";
* Publisher publisher = Publisher.newBuilder(topic).build();
* try {
* // ...
* } finally {
* // When finished with the publisher, make sure to shutdown to free up resources.
* publisher.shutdown();
* }
* }</pre>
*/
public static Builder newBuilder(ProjectTopicName topicName) {
public static Builder newBuilder(String topicName) {
return new Builder(topicName);
}

Expand Down Expand Up @@ -556,7 +579,7 @@ public long nextLong(long least, long bound) {
.setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors())
.build();

ProjectTopicName topicName;
String topicName;

// Batching options
BatchingSettings batchingSettings = DEFAULT_BATCHING_SETTINGS;
Expand All @@ -574,7 +597,7 @@ public long nextLong(long least, long bound) {
CredentialsProvider credentialsProvider =
TopicAdminSettings.defaultCredentialsProviderBuilder().build();

private Builder(ProjectTopicName topic) {
private Builder(String topic) {
this.topicName = Preconditions.checkNotNull(topic);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ public class Subscriber extends AbstractApiService {

private static final Logger logger = Logger.getLogger(Subscriber.class.getName());

private final ProjectSubscriptionName subscriptionName;
private final String cachedSubscriptionNameString;
private final String subscriptionName;
private final FlowControlSettings flowControlSettings;
private final Duration ackExpirationPadding;
private final Duration maxAckExtensionPeriod;
Expand All @@ -135,7 +134,6 @@ private Subscriber(Builder builder) {
receiver = builder.receiver;
flowControlSettings = builder.flowControlSettings;
subscriptionName = builder.subscriptionName;
cachedSubscriptionNameString = subscriptionName.toString();

Preconditions.checkArgument(
builder.ackExpirationPadding.compareTo(Duration.ZERO) > 0, "padding must be positive");
Expand Down Expand Up @@ -204,19 +202,32 @@ public void close() throws IOException {
/**
* Constructs a new {@link Builder}.
*
* <p>Once {@link Builder#build} is called a gRPC stub will be created for use of the {@link
* Subscriber}.
*
* @param subscription Cloud Pub/Sub subscription to bind the subscriber to
* @param receiver an implementation of {@link MessageReceiver} used to process the received
* messages
*/
public static Builder newBuilder(ProjectSubscriptionName subscription, MessageReceiver receiver) {
return newBuilder(subscription.toString(), receiver);
}

/**
* Constructs a new {@link Builder}.
*
* @param subscription Cloud Pub/Sub subscription to bind the subscriber to
* @param receiver an implementation of {@link MessageReceiver} used to process the received
* messages
*/
public static Builder newBuilder(String subscription, MessageReceiver receiver) {
return new Builder(subscription, receiver);
}

/** Subscription which the subscriber is subscribed to. */
public ProjectSubscriptionName getSubscriptionName() {
return ProjectSubscriptionName.parse(subscriptionName);
}

/** Subscription which the subscriber is subscribed to. */
public String getSubscriptionNameString() {
return subscriptionName;
}

Expand Down Expand Up @@ -343,9 +354,7 @@ private void startPollingConnections() throws IOException {
}
Subscription subscriptionInfo =
getSubStub.getSubscription(
GetSubscriptionRequest.newBuilder()
.setSubscription(cachedSubscriptionNameString)
.build());
GetSubscriptionRequest.newBuilder().setSubscription(subscriptionName).build());

for (Channel channel : channels) {
SubscriberFutureStub stub = SubscriberGrpc.newFutureStub(channel);
Expand Down Expand Up @@ -401,7 +410,7 @@ private void startStreamingConnections() throws IOException {
}
streamingSubscriberConnections.add(
new StreamingSubscriberConnection(
cachedSubscriptionNameString,
subscriptionName,
receiver,
ackExpirationPadding,
maxAckExtensionPeriod,
Expand Down Expand Up @@ -491,7 +500,7 @@ public static final class Builder {
* Runtime.getRuntime().availableProcessors())
.build();

ProjectSubscriptionName subscriptionName;
String subscriptionName;
MessageReceiver receiver;

Duration ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING;
Expand Down Expand Up @@ -519,7 +528,7 @@ public static final class Builder {
boolean useStreaming = true;
int parallelPullCount = Runtime.getRuntime().availableProcessors() * CHANNELS_PER_CORE;

Builder(ProjectSubscriptionName subscriptionName, MessageReceiver receiver) {
Builder(String subscriptionName, MessageReceiver receiver) {
this.subscriptionName = subscriptionName;
this.receiver = receiver;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@

package com.google.cloud.pubsub.v1;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.core.ExecutorProvider;
Expand All @@ -35,24 +41,17 @@
import io.grpc.StatusException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@RunWith(JUnit4.class)
public class PublisherImplTest {

Expand Down Expand Up @@ -472,7 +471,7 @@ public void testPublisherGetters() throws Exception {
@Test
public void testBuilderParametersAndDefaults() {
Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);
assertEquals(TEST_TOPIC, builder.topicName);
assertEquals(TEST_TOPIC.toString(), builder.topicName);
assertEquals(Publisher.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider);
assertEquals(
Publisher.Builder.DEFAULT_REQUEST_BYTES_THRESHOLD,
Expand Down

0 comments on commit bc07a35

Please sign in to comment.