pubsub async subscriber listener logging error on stop #2588

Closed
rcoy-v opened this issue Nov 7, 2017 · 7 comments
Labels
api: pubsub Issues related to the Pub/Sub API. priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@rcoy-v

rcoy-v commented Nov 7, 2017

I have the same situation as described in #2485. Using version 0.26.0.

A listener for the failed state logs an error message when an async subscriber is stopped:

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@779f4acb rejected from java.util.concurrent.ScheduledThreadPoolExecutor@7fb36e19[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 41]

Although this error is described as harmless in the referenced issue, it pollutes our logs during shutdown and deployments. The only workaround I can find is to detect whether the from state is STOPPING and ignore any failure in that case. But I feel this could miss real errors if they occurred during shutdown.

Are there any suggested ways to better handle this?
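
One way to narrow the workaround described above is to suppress a failure only when the from state is STOPPING and the failure is (or wraps) a RejectedExecutionException. The following is a minimal sketch under stated assumptions, not part of the client library: `ShutdownNoiseFilter` and its `State` enum are hypothetical names, and the enum is a stand-in for the state passed to the listener's failed callback so that the example needs only the JDK.

```java
import java.util.concurrent.RejectedExecutionException;

/** Hypothetical helper; not part of google-cloud-java. */
final class ShutdownNoiseFilter {
    // Assumption: stand-in for the service state handed to a failed(from, failure) callback.
    enum State { NEW, STARTING, RUNNING, STOPPING, TERMINATED, FAILED }

    /** True only for a rejected-task failure that surfaced while the service was stopping. */
    static boolean isShutdownNoise(State from, Throwable failure) {
        if (from != State.STOPPING) {
            return false;
        }
        // Walk the cause chain: the rejection may be wrapped by another exception.
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof RejectedExecutionException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable rejected = new RejectedExecutionException("executor terminated");
        System.out.println(isShutdownNoise(State.STOPPING, rejected));
        System.out.println(isShutdownNoise(State.STOPPING, new IllegalStateException("real error")));
        System.out.println(isShutdownNoise(State.RUNNING, rejected));
    }
}
```

A listener could call such a check first and log at a lower level when it returns true, so other failures during shutdown are still reported as errors.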

@anthmgoogle anthmgoogle added api: pubsub Issues related to the Pub/Sub API. priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. labels Nov 8, 2017
@pongad
Contributor

pongad commented Nov 9, 2017

@rcoy-v Would you be able to share a little more of the stack trace with us? It would be really helpful to see what's trying to execute.

@rcoy-v
Author

rcoy-v commented Nov 9, 2017

Stack trace:

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@7d9f7028 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@35e8a713[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 32]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
	at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
	at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
	at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
	at com.google.common.util.concurrent.ListenerCallQueue.execute(ListenerCallQueue.java:87)
	at com.google.common.util.concurrent.AbstractService.executeListeners(AbstractService.java:479)
	at com.google.common.util.concurrent.AbstractService.stopAsync(AbstractService.java:256)
	at com.google.api.core.AbstractApiService.stopAsync(AbstractApiService.java:130)
	at com.google.cloud.pubsub.v1.Subscriber.stopConnections(Subscriber.java:471)
	at com.google.cloud.pubsub.v1.Subscriber.stopAllStreamingConnections(Subscriber.java:447)
	at com.google.cloud.pubsub.v1.Subscriber.doStop(Subscriber.java:293)
	at com.google.api.core.AbstractApiService$InnerService.doStop(AbstractApiService.java:154)
	at com.google.common.util.concurrent.AbstractService.stopAsync(AbstractService.java:242)
	at com.google.api.core.AbstractApiService.stopAsync(AbstractApiService.java:130)
	at com.myapp.Application.shutdown(Application.kt:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:369)
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeDestroyMethods(InitDestroyAnnotationBeanPostProcessor.java:327)
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeDestruction(InitDestroyAnnotationBeanPostProcessor.java:155)
	at org.springframework.beans.factory.support.DisposableBeanAdapter.destroy(DisposableBeanAdapter.java:240)
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroyBean(DefaultSingletonBeanRegistry.java:576)
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingleton(DefaultSingletonBeanRegistry.java:552)
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.destroySingleton(DefaultListableBeanFactory.java:953)
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingletons(DefaultSingletonBeanRegistry.java:521)
	at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.destroySingletons(FactoryBeanRegistrySupport.java:227)
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.destroySingletons(DefaultListableBeanFactory.java:960)
	at org.springframework.context.support.AbstractApplicationContext.destroyBeans(AbstractApplicationContext.java:1035)
	at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1011)
	at org.springframework.context.support.AbstractApplicationContext$1.run(AbstractApplicationContext.java:933)

The stack trace is not logged by the subscriber itself, but by a listener implementing failed.

We are running this with Spring Boot. The parts related to Google pubsub are still pretty basic, following close to the async example here.

Testing this further, the error is logged if the app is stopped by SIGINT/SIGTERM. If I programmatically stop the subscriber after a sleep of 5 seconds, no error is logged. To be clear, subscriber.stopAsync is called in both situations. I am still investigating whether this is an issue with our Spring Boot app not handling shutdown properly, or with the Google client library.

@neuromantik33

I'm having the same problem with the latest version of pubsub, 0.30.0-beta. Here is my complete stack trace (don't mind the Clojure code).

I can reproduce at any time by running a kill -2 <pid> on the process (which is what happens for gracefully shutting down docker containers).

If you need any more info please let me know.

java.lang.IllegalStateException: Expected the service InnerService [FAILED] to be TERMINATED, but the service has FAILED
	at com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:328)
	at com.google.common.util.concurrent.AbstractService.awaitTerminated(AbstractService.java:293)
	at com.google.api.core.AbstractApiService.awaitTerminated(AbstractApiService.java:104)
	at com.oscaro.clj_gcloud.sub$shutdown_BANG_.invokeStatic(sub.clj:127)
	at com.oscaro.clj_gcloud.sub$shutdown_BANG_.invoke(sub.clj:123)
	at punchcard.components.pubsub$eval1042$fn__1044$fn__1046.invoke(pubsub.clj:77)
	at punchcard.components.pubsub$eval1042$fn__1044.invoke(pubsub.clj:76)
	at clojure.lang.MultiFn.invoke(MultiFn.java:233)
	at integrant.core$try_run_action.invokeStatic(core.cljc:176)
	at integrant.core$try_run_action.invoke(core.cljc:174)
	at integrant.core$run_loop.invokeStatic(core.cljc:184)
	at integrant.core$run_loop.invoke(core.cljc:180)
	at integrant.core$reverse_run_BANG_.invokeStatic(core.cljc:201)
	at integrant.core$reverse_run_BANG_.invoke(core.cljc:195)
	at integrant.core$halt_BANG_.invokeStatic(core.cljc:325)
	at integrant.core$halt_BANG_.invoke(core.cljc:319)
	at integrant.core$halt_BANG_.invokeStatic(core.cljc:322)
	at integrant.core$halt_BANG_.invoke(core.cljc:319)
	at punchcard.core$shutdown.invokeStatic(core.clj:33)
	at punchcard.core$shutdown.invoke(core.clj:30)
	at punchcard.core$_main$fn__28729.invoke(core.clj:56)
	at punchcard.core.proxy$java.lang.Thread$ff19274a.run(Unknown Source)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@78d98c1e rejected from java.util.concurrent.ScheduledThreadPoolExecutor@4d743a6b[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 4]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
	at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
	at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
	at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
	at com.google.common.util.concurrent.ListenerCallQueue.execute(ListenerCallQueue.java:87)
	at com.google.common.util.concurrent.AbstractService.executeListeners(AbstractService.java:479)
	at com.google.common.util.concurrent.AbstractService.stopAsync(AbstractService.java:256)
	at com.google.api.core.AbstractApiService.stopAsync(AbstractApiService.java:129)
	at com.google.cloud.pubsub.v1.Subscriber.stopConnections(Subscriber.java:506)
	at com.google.cloud.pubsub.v1.Subscriber.stopAllStreamingConnections(Subscriber.java:482)
	at com.google.cloud.pubsub.v1.Subscriber.doStop(Subscriber.java:319)
	at com.google.api.core.AbstractApiService$InnerService.doStop(AbstractApiService.java:153)
	at com.google.common.util.concurrent.AbstractService.stopAsync(AbstractService.java:242)
	at com.google.api.core.AbstractApiService.stopAsync(AbstractApiService.java:129)
	... 19 common frames omitted

@pongad
Contributor

pongad commented Nov 20, 2017

@neuromantik33 This is rather surprising. In Subscriber, we shut down all connections before shutting down the executor, so all callbacks should have been scheduled before the executor shuts down. (You have the option to provide your own executor; in that case Subscriber doesn't shut it down.)

Is it possible that you're providing an executor and shutting it down before calling stopAsync?

@neuromantik33

@pongad As far as I know, I'm not providing any executor other than the one required by com.google.api.core.ApiService$Listener, which is not a real one (it is a direct executor), and I'm definitely not shutting it down beforehand. Here is a snippet of my Clojure code showing my Subscriber factory. I've also included my shutdown function, which is invoked in a shutdown hook of my application. If you want me to put together a Java project that reproduces this bug, I'll try to find the time, but I'm not doing anything exotic that isn't in the docs. I've also been using the pubsub emulator, which exhibits the same behaviour (hence the setChannelProvider).

(def ^:private direct-executor (MoreExecutors/directExecutor))
(defn- create-subscriber!
  "Returns a new subscriber using the default configuration."
  ^Subscriber
  [{:keys [project-id credentials channel-provider]} sub receiver]
  (let [sub-name (make-sub-name project-id sub)
        builder  (cond-> (Subscriber/defaultBuilder sub-name receiver)
                         credentials (.setCredentialsProvider (c/fixed-credentials credentials))
                         channel-provider (.setChannelProvider channel-provider))]
    (log/info "Creating subscriber:" (->clj sub-name))
    (doto (.build builder)
      (.addListener (proxy [ApiService$Listener] []
                      (failed [from ex]
                        (log/error ex "An unexpected error occurred")))
                    direct-executor))))

(defn shutdown!
  "Shuts down the subscriber and waits for the latter to reach the terminated state."
  [^Subscriber subscriber]
  (log/info "Shutting down:" (->clj subscriber))
  (-> subscriber .stopAsync .awaitTerminated))

@pongad
Contributor

pongad commented Nov 22, 2017

I found the problem. I'll put the diagnosis in the PR description.

pongad added a commit to googleapis/gax-java that referenced this issue Dec 7, 2017
Fixes googleapis/google-cloud-java#2588 .

Background
==========

We previously decided that executors created from
InstantiatingExecutorProvider should not block JVM from exiting
if user forgets to call shutdown().

To implement this, we used Guava's
MoreExecutors.getExitingScheduledExecutorService.
It works by
1. making all executor threads daemon; the JVM exits after the last
non-daemon thread exits, so these threads don't block termination
2. adding a shutdown hook; when shutting down, we create one
*non-daemon* thread. This thread shuts down the executor, preventing
more jobs from being added, then waits for a maximum of 2 minutes
for existing jobs to complete. In either case, the
thread simply exits. Since (hopefully) there are no non-daemon threads
left, the JVM exits due to (1).
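
The two steps above can be sketched with plain JDK classes. This is only an illustration of the idea, not Guava's actual implementation; the class and method names are hypothetical, and the 2-minute timeout is taken from the description above.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of an "exiting" executor; not Guava's code. */
final class ExitingExecutorSketch {
    /** Step 1: every worker thread is a daemon, so it cannot keep the JVM alive. */
    static ThreadFactory daemonFactory() {
        return runnable -> {
            Thread t = new Thread(runnable, "exiting-executor-worker");
            t.setDaemon(true);
            return t;
        };
    }

    /** Step 2: a non-daemon hook thread stops new submissions, then waits for existing jobs. */
    static Thread shutdownHook(ScheduledThreadPoolExecutor executor, long timeout, TimeUnit unit) {
        Thread hook = new Thread(() -> {
            executor.shutdown(); // from here on, new tasks are rejected
            try {
                executor.awaitTermination(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "exiting-executor-hook");
        hook.setDaemon(false);
        return hook;
    }

    /** Combines both steps: daemon workers plus a registered shutdown hook. */
    static ScheduledThreadPoolExecutor create() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, daemonFactory());
        Runtime.getRuntime().addShutdownHook(shutdownHook(executor, 2, TimeUnit.MINUTES));
        return executor;
    }
}
```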

Problem
=======

Frameworks, like Spring, use shutdown hooks to gracefully exit.
However, JVM runs shutdown hooks in unspecified order.
The executor's shutdown hook might run first and shut down the executor.
Then, the graceful-exit logic might try to execute more tasks,
causing RejectedExecutionExceptions
and making graceful-exit not very graceful.

This problem isn't isolated to shutdown hooks.
For example, if we create two non-daemon threads A and B;
A calls System.exit();
and B calls executor.execute();
B might also get the exception.
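
The failure mode can be reproduced deterministically with only the JDK. In this minimal sketch (the class name is hypothetical), the explicit shutdown() call stands in for the executor's shutdown hook having run first, and the later execute() stands in for the graceful-exit logic scheduling a callback.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;

/** Hypothetical demo class; uses only JDK executors. */
final class RejectedAfterShutdown {
    /** Returns true if submitting to an already shut-down executor is rejected. */
    static boolean submitAfterShutdownIsRejected() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.shutdown(); // stands in for the executor's own shutdown hook running first
        try {
            executor.execute(() -> { }); // stands in for a callback scheduled during graceful exit
            return false;
        } catch (RejectedExecutionException e) {
            return true; // the default AbortPolicy rejects tasks after shutdown
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + submitAfterShutdownIsRejected());
    }
}
```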

Proposed Solution
=================

This PR implements (1) but not (2).
On exit, we'd no longer shut down the executor.
User code may still use executor to perform any shutdown logic it needs.
However, when all user threads exit, executor threads will abruptly terminate
with no grace period.
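
As a rough illustration of the proposed behaviour (not the actual gax-java change), an executor whose workers are daemon threads and which registers no shutdown hook could look like the sketch below. All names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;

/** Hypothetical sketch: daemon worker threads, no shutdown hook. */
final class DaemonOnlyExecutor {
    static ScheduledExecutorService create(int threads) {
        // Daemon workers never block JVM exit; nothing here shuts the executor down on exit,
        // so shutdown logic in user code can keep submitting tasks until the JVM halts.
        return new ScheduledThreadPoolExecutor(threads, runnable -> {
            Thread t = new Thread(runnable, "daemon-only-worker");
            t.setDaemon(true);
            return t;
        });
    }

    /** Reports whether tasks submitted to the executor run on a daemon thread. */
    static boolean runsOnDaemonThread(ScheduledExecutorService executor) {
        Callable<Boolean> probe = () -> Thread.currentThread().isDaemon();
        try {
            return executor.submit(probe).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

The trade-off is the one stated above: tasks still running when the last user thread exits get no grace period.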
@rcoy-v
Author

rcoy-v commented Jan 12, 2018

@pongad Is there a timeframe when this fix will be available? 1.16.0 of gax-java is still not used in a released version of pubsub, unless I'm mistaken. It seems I should not explicitly use a different version of gax-java that is out of step with google-cloud-java either.
