Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

com.google.cloud.pubsub.v1.Subscriber how to await stop signal? #2485

Closed
peter-gerhard opened this issue Oct 5, 2017 · 2 comments · Fixed by #2498
Closed

com.google.cloud.pubsub.v1.Subscriber how to await stop signal? #2485

peter-gerhard opened this issue Oct 5, 2017 · 2 comments · Fixed by #2498
Assignees
Labels
api: pubsub Issues related to the Pub/Sub API. priority: p2 Moderately-important priority. Fix may not be included in next release.

Comments

@peter-gerhard
Copy link

peter-gerhard commented Oct 5, 2017

Hi,
when im calling stopAsync on a subscriber i get the following exception:

com.google.common.util.concurrent.AbstractFuture executeListener
SEVERE: RuntimeException while executing runnable com.google.common.util.concurrent.Futures$4@22553be0 with executor java.util.concurrent.Executors$DelegatedScheduledExecutorService@6e1d503c
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@2c92d002 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@31c73eaa[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 18]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
	at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
	at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
	at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:900)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:811)
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:675)
	at com.google.common.util.concurrent.SettableFuture.setException(SettableFuture.java:53)
	at com.google.cloud.pubsub.v1.StreamingSubscriberConnection$StreamingPullResponseObserver.onError(StreamingSubscriberConnection.java:141)
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:385)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:422)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:61)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:504)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:425)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:536)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:102)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

the doc of the startAsync suggests to wait for a stop signal:

  /**
   * Initiates service startup and returns immediately.
   *
   * <p>Example of receiving a specific number of messages.
   *
   * <pre>{@code
   * Subscriber subscriber = Subscriber.defaultBuilder(subscription, receiver).build();
   * subscriber.addListener(new Subscriber.Listener() {
   *   public void failed(Subscriber.State from, Throwable failure) {
   *     // Handle error.
   *   }
   * }, executor);
   * subscriber.startAsync();
   *
   * // Wait for a stop signal.
   * done.get();
   * subscriber.stopAsync().awaitTerminated();
   * }</pre>
   */

But i don't know what is meant by done.get();

version 0.25.0-beta

@pongad
Copy link
Contributor

pongad commented Oct 6, 2017

@peter-gerhard You're not doing anything wrong. The done here is a dummy Future that should return whenever you want to shut down the subscriber. For example, if you're writing a server, you might schedule for the future to complete when you send it a shutdown signal etc. The stopAsync method can be called whenever.

The exception isn't actually thrown from stopAsync but just logged right?
Subscriber uses the executor for "house-keeping" tasks like reestablishing lost connections, etc. When we shutdown the executor, some of these tasks are still pending. It shouldn't do anything bad, since the Subscriber is about to go away anyway. I'll track this bug at P2 since the error messages still look confusing.

@pongad pongad self-assigned this Oct 6, 2017
@pongad pongad added api: pubsub Issues related to the Pub/Sub API. priority: p2 Moderately-important priority. Fix may not be included in next release. labels Oct 6, 2017
@peter-gerhard
Copy link
Author

Hello @pongad, thx for the reply.

The exception isn't actually thrown from stopAsync but just logged right?

Exactly.

When we shutdown the executor, some of these tasks are still pending. It shouldn't do anything bad, since the Subscriber is about to go away anyway.

Thats what it looked like to me. It doesn't do anything bad but the output mixed with my test logs and it was confusing since the error suggests a high severity.

Thx for taking care.

pongad added a commit that referenced this issue Oct 10, 2017
In the doc, we just wait for done.get() to return before closing the
Subscriber.
This commit makes clear that done.get() in the doc is an example
condition.

Updates #2485.
pongad added a commit that referenced this issue Oct 11, 2017
We register a callback to reconnect connection when old connection closes.
When we shut down the subscriber, we close all connections and
begin to shutdown executor.
There is a race: if the callback is called after executor closes,
an exception occurs and we print a scary stack trace.
It doesn't do anything bad; the subscriber is going to go away anyway,
but the stack trace is still confusing.

This commit avoids registering new jobs on executors.
When a connection closes, the callback to determine whether we should
reconnect is called in the RPC thread.

If the connection closes due to some error, the callback should quickly
determine whether we should reconnection. If so, we register the actual
reconnection job on a separate thread. This does not block RPC thread,
and everyone should be happy.

If the connection closes because we're shutting down,
the callback running on RPC thread should determine that we should NOT
reconnect, not register a reconnection job, and we shouldn't see
a stack trace.

Fixes #2485.
schmidt-sebastian pushed a commit to FirebasePrivate/google-cloud-java that referenced this issue Nov 9, 2017
In the doc, we just wait for done.get() to return before closing the
Subscriber.
This commit makes clear that done.get() in the doc is an example
condition.

Updates googleapis#2485.
schmidt-sebastian pushed a commit to FirebasePrivate/google-cloud-java that referenced this issue Nov 9, 2017
We register a callback to reconnect connection when old connection closes.
When we shut down the subscriber, we close all connections and
begin to shutdown executor.
There is a race: if the callback is called after executor closes,
an exception occurs and we print a scary stack trace.
It doesn't do anything bad; the subscriber is going to go away anyway,
but the stack trace is still confusing.

This commit avoids registering new jobs on executors.
When a connection closes, the callback to determine whether we should
reconnect is called in the RPC thread.

If the connection closes due to some error, the callback should quickly
determine whether we should reconnection. If so, we register the actual
reconnection job on a separate thread. This does not block RPC thread,
and everyone should be happy.

If the connection closes because we're shutting down,
the callback running on RPC thread should determine that we should NOT
reconnect, not register a reconnection job, and we shouldn't see
a stack trace.

Fixes googleapis#2485.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsub Issues related to the Pub/Sub API. priority: p2 Moderately-important priority. Fix may not be included in next release.
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants