Skip to content

Commit

Permalink
docs(samples): Optimistic subscribe sample (#2063)
Browse files Browse the repository at this point in the history
* docs(samples): Add code sample for optimistic subscribe

* docs(samples): Fix formatting on test

* docs(samples): Use an error listener instead of catching an exception for the OptimisticSubscribeExample

* test: Add exception handler to OptimisticSubscribeExample

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
michaelpri10 and gcf-owl-bot[bot] authored Jul 1, 2024
1 parent 5456b52 commit 53a4844
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 3 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ implementation 'com.google.cloud:google-cloud-pubsub'
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-pubsub:1.130.1'
implementation 'com.google.cloud:google-cloud-pubsub:1.131.0'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.130.1"
libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.131.0"
```
<!-- {x-version-update-end} -->

Expand Down Expand Up @@ -275,6 +275,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-pubsub/tree/m
| List Subscriptions In Project Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/ListSubscriptionsInProjectExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/ListSubscriptionsInProjectExample.java) |
| List Subscriptions In Topic Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/ListSubscriptionsInTopicExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/ListSubscriptionsInTopicExample.java) |
| List Topics Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/ListTopicsExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/ListTopicsExample.java) |
| Optimistic Subscribe Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/OptimisticSubscribeExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/OptimisticSubscribeExample.java) |
| Publish Avro Records Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/PublishAvroRecordsExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/PublishAvroRecordsExample.java) |
| Publish Protobuf Messages Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/PublishProtobufMessagesExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/PublishProtobufMessagesExample.java) |
| Publish With Batch Settings Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/PublishWithBatchSettingsExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/PublishWithBatchSettingsExample.java) |
Expand Down Expand Up @@ -411,7 +412,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-pubsub/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-pubsub.svg
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-pubsub/1.130.1
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-pubsub/1.131.0
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
Expand Down
103 changes: 103 additions & 0 deletions samples/snippets/src/main/java/pubsub/OptimisticSubscribeExample.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package pubsub;

// [START pubsub_optimistic_subscribe]

import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OptimisticSubscribeExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String subscriptionId = "your-subscription-id";
String topicId = "your-topic-id";

optimisticSubscribeExample(projectId, subscriptionId, topicId);
}

public static void optimisticSubscribeExample(
String projectId, String subscriptionId, String topicId) throws IOException {
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);

// Instantiate an asynchronous message receiver.
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
// Handle incoming message, then ack the received message.
System.out.println("Id: " + message.getMessageId());
System.out.println("Data: " + message.getData().toStringUtf8());
consumer.ack();
};

Subscriber subscriber = null;
try {
subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

// Listen for resource NOT_FOUND errors and rebuild the subscriber and restart subscribing
// when the current subscriber encounters these errors.
subscriber.addListener(
new Subscriber.Listener() {
public void failed(Subscriber.State from, Throwable failure) {
System.out.println(failure.getStackTrace());
if (failure instanceof NotFoundException) {
try (SubscriptionAdminClient subscriptionAdminClient =
SubscriptionAdminClient.create()) {
TopicName topicName = TopicName.of(projectId, topicId);
// Create a pull subscription with default acknowledgement deadline of 10 seconds.
// The client library will automatically extend acknowledgement deadlines.
Subscription subscription =
subscriptionAdminClient.createSubscription(
subscriptionName, topicName, PushConfig.getDefaultInstance(), 10);
System.out.println("Created pull subscription: " + subscription.getName());
optimisticSubscribeExample(projectId, subscriptionId, topicId);
} catch (IOException err) {
System.out.println("Failed to create pull subscription: " + err.getMessage());
}
}
}
},
MoreExecutors.directExecutor());

subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (IllegalStateException e) {
// Prevent an exception from being thrown if it is the expected NotFoundException
if (!(subscriber.failureCause() instanceof NotFoundException)) {
throw e;
}
} catch (TimeoutException e) {
subscriber.stopAsync();
}
}
}

// [END pubsub_optimistic_subscribe]
19 changes: 19 additions & 0 deletions samples/snippets/src/test/java/pubsub/SubscriberIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,16 @@ public class SubscriberIT {
private static final String subscriptionId = "subscriber-test-subscription-" + _suffix;
// For a subscription with exactly once delivery enabled.
private static final String subscriptionEodId = "subscriber-test-subscription-eod" + _suffix;
private static final String subscriptionOptimisticId =
"subscriber-test-subscription-optimistic" + _suffix;
private static final TopicName topicName = TopicName.of(projectId, topicId);
private static final TopicName topicNameEod = TopicName.of(projectId, topicIdEod);
private static final ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
private static final ProjectSubscriptionName subscriptionEodName =
ProjectSubscriptionName.of(projectId, subscriptionEodId);
private static final ProjectSubscriptionName subscriptionOptimisticName =
ProjectSubscriptionName.of(projectId, subscriptionOptimisticId);

private static void requireEnvVar(String varName) {
assertNotNull(
Expand Down Expand Up @@ -163,6 +167,11 @@ public void tearDown() throws Exception {
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
subscriptionAdminClient.deleteSubscription(subscriptionName.toString());
subscriptionAdminClient.deleteSubscription(subscriptionEodName.toString());
try {
subscriptionAdminClient.deleteSubscription(subscriptionOptimisticName.toString());
} catch (Exception e) {
// Ignore exception.
}
}

try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
Expand Down Expand Up @@ -240,4 +249,14 @@ public void testSubscriberExactlyOnceDelivery() throws Exception {
assertThat(bout.toString()).contains("Message successfully acked: " + messageId);
}
}

@Test
public void testOptimisticSubscriber() throws Exception {
bout.reset();
OptimisticSubscribeExample.optimisticSubscribeExample(
projectId, subscriptionOptimisticId, topicId);
assertThat(
bout.toString()
.contains("Created pull subscription: " + subscriptionOptimisticName.toString()));
}
}

0 comments on commit 53a4844

Please sign in to comment.