Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PubSub: subscriber.stopAsync().awaitTerminated() does not destroy respective threads #22

Closed
menulis opened this issue May 28, 2019 · 3 comments · Fixed by #74
Closed
Assignees
Labels
api: pubsub Issues related to the googleapis/java-pubsub API. priority: p2 Moderately-important priority. Fix may not be included in next release. 🚨 This issue needs some love. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@menulis
Copy link

menulis commented May 28, 2019

Environment details

OS type and version: macOS 10.14.5 / Debian 4.9.168-1 (App Engine Flex)
Java version: Oracle 1.8.0_181 / openjdk version "1.8.0_212"
google-cloud-java version: 1.75.0

Steps to reproduce

  1. Create a subscriber, call startAsync() followed by stopAsync() as per documentation
  2. Observe the number of threads before and after calling stopAsync()
  3. The number of threads is growing

Code example

import com.google.api.gax.core.ExecutorProvider
import com.google.api.gax.core.FixedExecutorProvider
import com.google.cloud.ServiceOptions
import com.google.cloud.pubsub.v1.Subscriber
import com.google.pubsub.v1.ProjectSubscriptionName
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

fun main() {
    val subscriptionName = ProjectSubscriptionName.of(
        ServiceOptions.getDefaultProjectId(), "test"
    )

    for (x in 1..2) {
        var i = 0
        val subscriber = Subscriber.newBuilder(subscriptionName)
            { message, consumer -> consumer.ack() }
            .build()
        println("$x/${++i}: ${Thread.activeCount()}: ${Thread.getAllStackTraces().entries.map { it.key.name }}")
        subscriber.startAsync().awaitRunning()
        println("$x/${++i}: ${Thread.activeCount()}: ${Thread.getAllStackTraces().entries.map { it.key.name }}")
        subscriber.stopAsync().awaitTerminated()
        println("$x/${++i}: ${Thread.activeCount()}: ${Thread.getAllStackTraces().entries.map { it.key.name }}")
    }

    for (x in 3..8) {
        var i = 0
        val executorProvider: ExecutorProvider =
            FixedExecutorProvider.create(Executors.newSingleThreadScheduledExecutor())
        val subscriber = Subscriber.newBuilder(subscriptionName)
        { message, consumer -> consumer.ack() }
            .setExecutorProvider(executorProvider)
            .setSystemExecutorProvider(executorProvider)
            .build()
        subscriber.startAsync().awaitRunning()
        println("$x/${++i}: ${Thread.activeCount()}: ${Thread.getAllStackTraces().entries.map { it.key.name }}")
        subscriber.stopAsync().awaitTerminated()
        println("$x/${++i}: ${Thread.activeCount()}: ${Thread.getAllStackTraces().entries.map { it.key.name }}")
        executorProvider.executor.shutdown()
        println("$x/${++i}: ${Thread.activeCount()}: ${Thread.getAllStackTraces().entries.map { it.key.name }}")
        if (!executorProvider.executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
            val hangingTasks = executorProvider.executor.shutdownNow()
            println("Hanging tasks: $hangingTasks")
        }
        println("$x/${++i}: ${Thread.activeCount()}: ${Thread.getAllStackTraces().entries}")
    }
}

Expected results

The number of active threads before calling startAsync() and after calling stopAsync() shall be equal.

Actual results

When not explicitly providing executor providers, calling startAsync() created five threads and calling stopAsync() does not stop any of them.

When providing Executors.newSingleThreadScheduledExecutor() as executorProvider and systemExecutorProvider and explicitly calling shutdown() on the executor, the result is "better" in a way that the number of threads grow much slower, but it still grows. Further, if three or more such subscribers with a custom executor provider are started and stopped, "ManagedChannel allocation site ("was not shutdown properly") exceptions appear, which suggests that the underlying gRPC state is not being cleaned up properly upon stopAsync().

1/1: 2: [Monitor Ctrl-Break, Finalizer, Reference Handler, main, Signal Dispatcher]
1/2: 7: [grpc-default-worker-ELG-1-1, pool-1-thread-2, Attach Listener, grpc-default-executor-0, Monitor Ctrl-Break, pool-1-thread-3, pool-1-thread-1, Finalizer, Reference Handler, main, Signal Dispatcher]
1/3: 9: [grpc-default-worker-ELG-1-1, pool-1-thread-5, pool-1-thread-2, Attach Listener, Monitor Ctrl-Break, main, Signal Dispatcher, grpc-default-executor-0, pool-1-thread-4, pool-1-thread-3, pool-1-thread-1, Finalizer, Reference Handler]
2/1: 9: [grpc-default-worker-ELG-1-1, pool-1-thread-5, pool-1-thread-2, Attach Listener, Monitor Ctrl-Break, main, Signal Dispatcher, grpc-default-executor-0, pool-1-thread-4, pool-1-thread-3, pool-1-thread-1, Finalizer, Reference Handler]
2/2: 14: [grpc-default-worker-ELG-1-1, pool-1-thread-5, pool-1-thread-2, pool-4-thread-2, Attach Listener, Monitor Ctrl-Break, grpc-default-executor-1, main, Signal Dispatcher, pool-4-thread-3, grpc-default-worker-ELG-1-2, grpc-default-executor-0, pool-1-thread-4, pool-4-thread-1, pool-1-thread-3, pool-1-thread-1, Finalizer, Reference Handler]
2/3: 16: [grpc-default-worker-ELG-1-1, pool-1-thread-5, pool-1-thread-2, pool-4-thread-2, Attach Listener, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, Signal Dispatcher, pool-4-thread-3, grpc-default-worker-ELG-1-2, grpc-default-executor-0, pool-1-thread-4, pool-4-thread-5, pool-4-thread-1, pool-1-thread-3, pool-1-thread-1, Finalizer, Reference Handler]
3/1: 19: [grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-5, pool-1-thread-2, grpc-default-executor-2, pool-4-thread-2, Attach Listener, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, Signal Dispatcher, pool-4-thread-3, grpc-default-worker-ELG-1-2, grpc-default-executor-0, pool-1-thread-4, pool-4-thread-5, pool-6-thread-1, pool-4-thread-1, pool-1-thread-3, pool-1-thread-1, Finalizer, Reference Handler]
3/2: 19: [grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-5, pool-1-thread-2, grpc-default-executor-2, pool-4-thread-2, Attach Listener, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, Signal Dispatcher, pool-4-thread-3, grpc-default-worker-ELG-1-2, grpc-default-executor-0, pool-1-thread-4, pool-4-thread-5, pool-6-thread-1, pool-4-thread-1, pool-1-thread-3, pool-1-thread-1, Finalizer, Reference Handler]
3/3: 19: [grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-5, pool-1-thread-2, grpc-default-executor-2, pool-4-thread-2, Attach Listener, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, Signal Dispatcher, pool-4-thread-3, grpc-default-worker-ELG-1-2, grpc-default-executor-0, pool-1-thread-4, pool-4-thread-5, pool-6-thread-1, pool-4-thread-1, pool-1-thread-3, pool-1-thread-1, Finalizer, Reference Handler]
3/4: 18: [Thread[grpc-default-worker-ELG-1-1,5,main]=[Ljava.lang.StackTraceElement;@609db546, Thread[grpc-default-worker-ELG-1-3,5,main]=[Ljava.lang.StackTraceElement;@20f5281c, Thread[pool-1-thread-5,5,main]=[Ljava.lang.StackTraceElement;@56c4278e, Thread[pool-1-thread-2,5,main]=[Ljava.lang.StackTraceElement;@301eda63, Thread[grpc-default-executor-2,5,main]=[Ljava.lang.StackTraceElement;@3d246ea3, Thread[pool-4-thread-2,5,main]=[Ljava.lang.StackTraceElement;@341814d3, Thread[Attach Listener,9,system]=[Ljava.lang.StackTraceElement;@4397ad89, Thread[pool-4-thread-4,5,main]=[Ljava.lang.StackTraceElement;@59cba5a, Thread[Monitor Ctrl-Break,5,main]=[Ljava.lang.StackTraceElement;@1bd39d3c, Thread[grpc-default-executor-1,5,main]=[Ljava.lang.StackTraceElement;@6f19ac19, Thread[main,5,main]=[Ljava.lang.StackTraceElement;@119cbf96, Thread[Signal Dispatcher,9,system]=[Ljava.lang.StackTraceElement;@71329995, Thread[pool-4-thread-3,5,main]=[Ljava.lang.StackTraceElement;@768fc0f2, Thread[grpc-default-worker-ELG-1-2,5,main]=[Ljava.lang.StackTraceElement;@5454d35e, Thread[grpc-default-executor-0,5,main]=[Ljava.lang.StackTraceElement;@20c0a64d, Thread[pool-1-thread-4,5,main]=[Ljava.lang.StackTraceElement;@455b6df1, Thread[pool-4-thread-5,5,main]=[Ljava.lang.StackTraceElement;@4ddbbdf8, Thread[pool-4-thread-1,5,main]=[Ljava.lang.StackTraceElement;@3f67593e, Thread[pool-1-thread-3,5,main]=[Ljava.lang.StackTraceElement;@1ab06251, Thread[pool-1-thread-1,5,main]=[Ljava.lang.StackTraceElement;@41ab013, Thread[Finalizer,8,system]=[Ljava.lang.StackTraceElement;@14bee915, Thread[Reference Handler,10,system]=[Ljava.lang.StackTraceElement;@1115ec15]
4/1: 21: [pool-1-thread-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-executor-0, pool-1-thread-3, Finalizer, Reference Handler, pool-8-thread-1]
4/2: 21: [pool-1-thread-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-executor-0, pool-1-thread-3, Finalizer, Reference Handler, pool-8-thread-1]
4/3: 21: [pool-1-thread-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-executor-0, pool-1-thread-3, Finalizer, Reference Handler, pool-8-thread-1]
4/4: 20: [Thread[grpc-default-worker-ELG-1-1,5,main]=[Ljava.lang.StackTraceElement;@58ce9668, Thread[grpc-default-worker-ELG-1-3,5,main]=[Ljava.lang.StackTraceElement;@172b013, Thread[pool-1-thread-5,5,main]=[Ljava.lang.StackTraceElement;@56673b2c, Thread[pool-1-thread-2,5,main]=[Ljava.lang.StackTraceElement;@2796aeae, Thread[grpc-default-executor-2,5,main]=[Ljava.lang.StackTraceElement;@b4711e2, Thread[grpc-default-worker-ELG-1-4,5,main]=[Ljava.lang.StackTraceElement;@1fa1cab1, Thread[pool-4-thread-2,5,main]=[Ljava.lang.StackTraceElement;@70f02c32, Thread[Attach Listener,9,system]=[Ljava.lang.StackTraceElement;@62010f5c, Thread[pool-4-thread-4,5,main]=[Ljava.lang.StackTraceElement;@51fadaff, Thread[Monitor Ctrl-Break,5,main]=[Ljava.lang.StackTraceElement;@401f7633, Thread[grpc-default-executor-1,5,main]=[Ljava.lang.StackTraceElement;@31ff43be, Thread[main,5,main]=[Ljava.lang.StackTraceElement;@5b6ec132, Thread[Signal Dispatcher,9,system]=[Ljava.lang.StackTraceElement;@5c44c582, Thread[pool-4-thread-3,5,main]=[Ljava.lang.StackTraceElement;@67d18ed7, Thread[grpc-default-worker-ELG-1-2,5,main]=[Ljava.lang.StackTraceElement;@2c78d320, Thread[grpc-default-executor-0,5,main]=[Ljava.lang.StackTraceElement;@132e0cc, Thread[pool-1-thread-4,5,main]=[Ljava.lang.StackTraceElement;@7b205dbd, Thread[pool-4-thread-5,5,main]=[Ljava.lang.StackTraceElement;@106cc338, Thread[grpc-default-executor-3,5,main]=[Ljava.lang.StackTraceElement;@7a67e3c6, Thread[pool-4-thread-1,5,main]=[Ljava.lang.StackTraceElement;@6cc558c6, Thread[pool-1-thread-3,5,main]=[Ljava.lang.StackTraceElement;@15713d56, Thread[pool-1-thread-1,5,main]=[Ljava.lang.StackTraceElement;@63f259c3, Thread[Finalizer,8,system]=[Ljava.lang.StackTraceElement;@26ceffa8, Thread[Reference Handler,10,system]=[Ljava.lang.StackTraceElement;@600b90df]
5/1: 23: [pool-1-thread-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, pool-10-thread-1, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler]
5/2: 23: [pool-1-thread-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, pool-10-thread-1, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler]
5/3: 22: [pool-1-thread-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler]
5/4: 22: [Thread[pool-1-thread-5,5,main]=[Ljava.lang.StackTraceElement;@22f31dec, Thread[grpc-default-executor-2,5,main]=[Ljava.lang.StackTraceElement;@34c01041, Thread[grpc-default-worker-ELG-1-4,5,main]=[Ljava.lang.StackTraceElement;@76f4b65, Thread[pool-4-thread-2,5,main]=[Ljava.lang.StackTraceElement;@c94fd30, Thread[pool-4-thread-4,5,main]=[Ljava.lang.StackTraceElement;@36328d33, Thread[Monitor Ctrl-Break,5,main]=[Ljava.lang.StackTraceElement;@2c4d1ac, Thread[grpc-default-executor-1,5,main]=[Ljava.lang.StackTraceElement;@7f0d96f2, Thread[main,5,main]=[Ljava.lang.StackTraceElement;@545b995e, Thread[pool-4-thread-3,5,main]=[Ljava.lang.StackTraceElement;@76a2ddf3, Thread[pool-1-thread-4,5,main]=[Ljava.lang.StackTraceElement;@524f3b3a, Thread[grpc-default-executor-4,5,main]=[Ljava.lang.StackTraceElement;@41e68d87, Thread[pool-4-thread-5,5,main]=[Ljava.lang.StackTraceElement;@49ff7d8c, Thread[grpc-default-executor-3,5,main]=[Ljava.lang.StackTraceElement;@29526c05, Thread[pool-4-thread-1,5,main]=[Ljava.lang.StackTraceElement;@2ef14fe, Thread[pool-1-thread-1,5,main]=[Ljava.lang.StackTraceElement;@77102b91, Thread[grpc-default-worker-ELG-1-1,5,main]=[Ljava.lang.StackTraceElement;@45312be2, Thread[grpc-default-worker-ELG-1-3,5,main]=[Ljava.lang.StackTraceElement;@7fb95505, Thread[pool-1-thread-2,5,main]=[Ljava.lang.StackTraceElement;@58be6e8, Thread[Attach Listener,9,system]=[Ljava.lang.StackTraceElement;@7331196b, Thread[Signal Dispatcher,9,system]=[Ljava.lang.StackTraceElement;@3f9342d4, Thread[grpc-default-worker-ELG-1-2,5,main]=[Ljava.lang.StackTraceElement;@ab7395e, Thread[grpc-default-executor-0,5,main]=[Ljava.lang.StackTraceElement;@50d13246, Thread[grpc-default-worker-ELG-1-5,5,main]=[Ljava.lang.StackTraceElement;@2bd08376, Thread[pool-1-thread-3,5,main]=[Ljava.lang.StackTraceElement;@e70f13a, Thread[Finalizer,8,system]=[Ljava.lang.StackTraceElement;@3d3e5463, Thread[Reference Handler,10,system]=[Ljava.lang.StackTraceElement;@64a40280]
Geg 28, 2019 8:11:54 PM io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=9, target=pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
	at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:94)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:52)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:43)
	at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:449)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:223)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:164)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:156)
	at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:157)
	at com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create(GrpcSubscriberStub.java:263)
	at com.google.cloud.pubsub.v1.Subscriber.doStart(Subscriber.java:268)
	at com.google.api.core.AbstractApiService$InnerService.doStart(AbstractApiService.java:148)
	at com.google.common.util.concurrent.AbstractService.startAsync(AbstractService.java:248)
	at com.google.api.core.AbstractApiService.startAsync(AbstractApiService.java:120)
	at com.google.cloud.pubsub.v1.Subscriber.startAsync(Subscriber.java:260)
	at TestKt.main(Test.kt:33)
	at TestKt.main(Test.kt)

Geg 28, 2019 8:11:54 PM io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=1, target=pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
	at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:94)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:52)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:43)
	at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:449)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:223)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:164)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:156)
	at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:157)
	at com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create(GrpcSubscriberStub.java:263)
	at com.google.cloud.pubsub.v1.Subscriber.doStart(Subscriber.java:268)
	at com.google.api.core.AbstractApiService$InnerService.doStart(AbstractApiService.java:148)
	at com.google.common.util.concurrent.AbstractService.startAsync(AbstractService.java:248)
	at com.google.api.core.AbstractApiService.startAsync(AbstractApiService.java:120)
	at com.google.cloud.pubsub.v1.Subscriber.startAsync(Subscriber.java:260)
	at TestKt.main(Test.kt:18)
	at TestKt.main(Test.kt)

Geg 28, 2019 8:11:54 PM io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=16, target=pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
	at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:94)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:52)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:43)
	at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:449)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:223)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:164)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:156)
	at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:157)
	at com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create(GrpcSubscriberStub.java:263)
	at com.google.cloud.pubsub.v1.Subscriber.doStart(Subscriber.java:268)
	at com.google.api.core.AbstractApiService$InnerService.doStart(AbstractApiService.java:148)
	at com.google.common.util.concurrent.AbstractService.startAsync(AbstractService.java:248)
	at com.google.api.core.AbstractApiService.startAsync(AbstractApiService.java:120)
	at com.google.cloud.pubsub.v1.Subscriber.startAsync(Subscriber.java:260)
	at TestKt.main(Test.kt:33)
	at TestKt.main(Test.kt)

Geg 28, 2019 8:11:54 PM io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=5, target=pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
	at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:94)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:52)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:43)
	at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:449)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:223)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:164)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:156)
	at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:157)
	at com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create(GrpcSubscriberStub.java:263)
	at com.google.cloud.pubsub.v1.Subscriber.doStart(Subscriber.java:268)
	at com.google.api.core.AbstractApiService$InnerService.doStart(AbstractApiService.java:148)
	at com.google.common.util.concurrent.AbstractService.startAsync(AbstractService.java:248)
	at com.google.api.core.AbstractApiService.startAsync(AbstractApiService.java:120)
	at com.google.cloud.pubsub.v1.Subscriber.startAsync(Subscriber.java:260)
	at TestKt.main(Test.kt:18)
	at TestKt.main(Test.kt)

6/1: 25: [pool-1-thread-5, grpc-default-executor-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, pool-12-thread-1, grpc-default-worker-ELG-1-2, grpc-default-worker-ELG-1-6, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler]
6/2: 25: [pool-1-thread-5, grpc-default-executor-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, pool-12-thread-1, grpc-default-worker-ELG-1-2, grpc-default-worker-ELG-1-6, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler]
6/3: 25: [pool-1-thread-5, grpc-default-executor-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-worker-ELG-1-6, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler]
6/4: 24: [Thread[pool-1-thread-5,5,main]=[Ljava.lang.StackTraceElement;@6c0d9d86, Thread[grpc-default-executor-5,5,main]=[Ljava.lang.StackTraceElement;@4ce1d6d0, Thread[grpc-default-executor-2,5,main]=[Ljava.lang.StackTraceElement;@24111ef1, Thread[grpc-default-worker-ELG-1-4,5,main]=[Ljava.lang.StackTraceElement;@1f3f02ee, Thread[pool-4-thread-2,5,main]=[Ljava.lang.StackTraceElement;@1fde5d22, Thread[pool-4-thread-4,5,main]=[Ljava.lang.StackTraceElement;@5dcb4f5f, Thread[Monitor Ctrl-Break,5,main]=[Ljava.lang.StackTraceElement;@71812481, Thread[grpc-default-executor-1,5,main]=[Ljava.lang.StackTraceElement;@16ce702d, Thread[main,5,main]=[Ljava.lang.StackTraceElement;@7b94089b, Thread[pool-4-thread-3,5,main]=[Ljava.lang.StackTraceElement;@7ca20101, Thread[pool-1-thread-4,5,main]=[Ljava.lang.StackTraceElement;@47f9738, Thread[grpc-default-executor-4,5,main]=[Ljava.lang.StackTraceElement;@6155d082, Thread[pool-4-thread-5,5,main]=[Ljava.lang.StackTraceElement;@3a5ecce3, Thread[grpc-default-executor-3,5,main]=[Ljava.lang.StackTraceElement;@561868a0, Thread[pool-4-thread-1,5,main]=[Ljava.lang.StackTraceElement;@2ea6e30c, Thread[pool-1-thread-1,5,main]=[Ljava.lang.StackTraceElement;@6138e79a, Thread[grpc-default-worker-ELG-1-1,5,main]=[Ljava.lang.StackTraceElement;@2dcd168a, Thread[grpc-default-worker-ELG-1-3,5,main]=[Ljava.lang.StackTraceElement;@388526fb, Thread[pool-1-thread-2,5,main]=[Ljava.lang.StackTraceElement;@21a21c64, Thread[Attach Listener,9,system]=[Ljava.lang.StackTraceElement;@7803bfd, Thread[Signal Dispatcher,9,system]=[Ljava.lang.StackTraceElement;@42bc14c1, Thread[grpc-default-worker-ELG-1-2,5,main]=[Ljava.lang.StackTraceElement;@531f4093, Thread[grpc-default-worker-ELG-1-6,5,main]=[Ljava.lang.StackTraceElement;@62ef27a8, Thread[grpc-default-executor-0,5,main]=[Ljava.lang.StackTraceElement;@6436a7db, Thread[grpc-default-worker-ELG-1-5,5,main]=[Ljava.lang.StackTraceElement;@460ebd80, Thread[pool-1-thread-3,5,main]=[Ljava.lang.StackTraceElement;@6f3c660a, Thread[Finalizer,8,system]=[Ljava.lang.StackTraceElement;@74f5ce22, Thread[Reference Handler,10,system]=[Ljava.lang.StackTraceElement;@25aca718]
7/1: 27: [pool-1-thread-5, grpc-default-executor-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, grpc-default-worker-ELG-1-7, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, pool-14-thread-1, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-worker-ELG-1-6, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler, grpc-default-executor-6]
7/2: 27: [pool-1-thread-5, grpc-default-executor-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, grpc-default-worker-ELG-1-7, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, pool-14-thread-1, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-worker-ELG-1-6, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler, grpc-default-executor-6]
7/3: 27: [pool-1-thread-5, grpc-default-executor-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, grpc-default-worker-ELG-1-7, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-worker-ELG-1-6, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler, grpc-default-executor-6]
7/4: 26: [Thread[pool-1-thread-5,5,main]=[Ljava.lang.StackTraceElement;@2a448449, Thread[grpc-default-executor-5,5,main]=[Ljava.lang.StackTraceElement;@32f232a5, Thread[grpc-default-executor-2,5,main]=[Ljava.lang.StackTraceElement;@43f82e78, Thread[grpc-default-worker-ELG-1-4,5,main]=[Ljava.lang.StackTraceElement;@e54303, Thread[pool-4-thread-2,5,main]=[Ljava.lang.StackTraceElement;@e8df99a, Thread[grpc-default-worker-ELG-1-7,5,main]=[Ljava.lang.StackTraceElement;@2dc995f4, Thread[pool-4-thread-4,5,main]=[Ljava.lang.StackTraceElement;@2f40e5db, Thread[Monitor Ctrl-Break,5,main]=[Ljava.lang.StackTraceElement;@517566b, Thread[grpc-default-executor-1,5,main]=[Ljava.lang.StackTraceElement;@64b73e7a, Thread[main,5,main]=[Ljava.lang.StackTraceElement;@530712d, Thread[pool-4-thread-3,5,main]=[Ljava.lang.StackTraceElement;@2df6226d, Thread[pool-1-thread-4,5,main]=[Ljava.lang.StackTraceElement;@12ed9db6, Thread[grpc-default-executor-4,5,main]=[Ljava.lang.StackTraceElement;@4ff4357f, Thread[pool-4-thread-5,5,main]=[Ljava.lang.StackTraceElement;@49cb9cb5, Thread[grpc-default-executor-3,5,main]=[Ljava.lang.StackTraceElement;@55322aab, Thread[pool-4-thread-1,5,main]=[Ljava.lang.StackTraceElement;@2b4c1d96, Thread[pool-1-thread-1,5,main]=[Ljava.lang.StackTraceElement;@45fd9a4d, Thread[grpc-default-worker-ELG-1-1,5,main]=[Ljava.lang.StackTraceElement;@50468873, Thread[grpc-default-worker-ELG-1-3,5,main]=[Ljava.lang.StackTraceElement;@146587a2, Thread[pool-1-thread-2,5,main]=[Ljava.lang.StackTraceElement;@5f0e9815, Thread[Attach Listener,9,system]=[Ljava.lang.StackTraceElement;@76884e4b, Thread[Signal Dispatcher,9,system]=[Ljava.lang.StackTraceElement;@126945f9, Thread[grpc-default-worker-ELG-1-2,5,main]=[Ljava.lang.StackTraceElement;@2a898881, Thread[grpc-default-worker-ELG-1-6,5,main]=[Ljava.lang.StackTraceElement;@16c63f5, Thread[grpc-default-executor-0,5,main]=[Ljava.lang.StackTraceElement;@35229f85, Thread[grpc-default-worker-ELG-1-5,5,main]=[Ljava.lang.StackTraceElement;@6d3c5255, Thread[pool-1-thread-3,5,main]=[Ljava.lang.StackTraceElement;@b1712f3, Thread[Finalizer,8,system]=[Ljava.lang.StackTraceElement;@6986bbaf, Thread[Reference Handler,10,system]=[Ljava.lang.StackTraceElement;@4879dfad, Thread[grpc-default-executor-6,5,main]=[Ljava.lang.StackTraceElement;@4758820d]
8/1: 29: [pool-1-thread-5, grpc-default-executor-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, grpc-default-worker-ELG-1-7, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-16-thread-1, grpc-default-worker-ELG-1-8, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-worker-ELG-1-6, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler, grpc-default-executor-7, grpc-default-executor-6]
8/2: 29: [pool-1-thread-5, grpc-default-executor-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, grpc-default-worker-ELG-1-7, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-16-thread-1, grpc-default-worker-ELG-1-8, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-worker-ELG-1-6, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler, grpc-default-executor-7, grpc-default-executor-6]
8/3: 29: [pool-1-thread-5, grpc-default-executor-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, grpc-default-worker-ELG-1-7, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, grpc-default-worker-ELG-1-8, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-worker-ELG-1-6, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler, grpc-default-executor-7, grpc-default-executor-6]
8/4: 28: [Thread[pool-1-thread-5,5,main]=[Ljava.lang.StackTraceElement;@1e8ce150, Thread[grpc-default-executor-5,5,main]=[Ljava.lang.StackTraceElement;@604f2bd2, Thread[grpc-default-executor-2,5,main]=[Ljava.lang.StackTraceElement;@1d3a

Stack trace

"pool-20-thread-6" googleapis/google-cloud-java#139 prio=5 os_prio=0 tid=0x00007f33ed9f1000 nid=0x184 waiting on condition [0x00007f33a9556000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000007b6a0c940> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

"grpc-default-executor-4" googleapis/google-cloud-java#128 daemon prio=5 os_prio=0 tid=0x00007f33d816e000 nid=0x158 waiting on condition [0x00007f33aa968000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x0000000726f346b0> (a java.util.concurrent.SynchronousQueue$TransferStack)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
	at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
	at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:941)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

"Gax-18" googleapis/google-cloud-java#95 daemon prio=5 os_prio=0 tid=0x00007f33b403f800 nid=0x84 waiting on condition [0x00007f33aaa69000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000007bb5e2f30> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

PubSub documentation used

Related issues

@sduskis
Copy link
Contributor

sduskis commented May 29, 2019

It looks like we need to make a change. We're not cleaning up pool-*-thread-* threads.

This line is problematic (plus a couple of others). I'll submit a fix.

@menulis
Copy link
Author

menulis commented Jun 26, 2019

Just for your information: with google-cloud-pubsub 1.77.0 the number of threads is no longer growing. However, the io.grpc.InternalChannelz instance seems to be growing in size continuously and the number of io.grpc.netty.shaded.io.netty.handler.ssl.ReferenceCountedOpenSslClientContext$ExtendedTrustManagerVerifyCallback instances also grows with time, eventually leading to OOM.

Screenshot 2019-06-26 at 11 47 16

@chingor13 chingor13 transferred this issue from googleapis/google-cloud-java Dec 4, 2019
@chingor13 chingor13 added priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. labels Dec 4, 2019
@yoshi-automation yoshi-automation added the 🚨 This issue needs some love. label Dec 4, 2019
@hannahrogers-google
Copy link
Contributor

This is likely do to the GrpcSubscriberStub not being shut down correctly - fix coming soon.

@hannahrogers-google hannahrogers-google removed the 🚨 This issue needs some love. label Jan 27, 2020
@yoshi-automation yoshi-automation added the 🚨 This issue needs some love. label Jan 27, 2020
@google-cloud-label-sync google-cloud-label-sync bot added the api: pubsub Issues related to the googleapis/java-pubsub API. label Jan 29, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsub Issues related to the googleapis/java-pubsub API. priority: p2 Moderately-important priority. Fix may not be included in next release. 🚨 This issue needs some love. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.
Projects
None yet
6 participants