ConcurrentModificationException in subscriber callbacks when using exactly-once-delivery #1778

Closed
Sourc opened this issue Oct 18, 2023 · 7 comments · Fixed by #1807
Labels
api: pubsub Issues related to the googleapis/java-pubsub API. priority: p1 Important issue which blocks shipping the next release. Will be fixed prior to next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

@Sourc

Sourc commented Oct 18, 2023

Description

This recently merged PR introduces a concurrent-modification exception when exactly-once-delivery is enabled. I believe the source is this unsynchronized call, as all other usages seem to be synchronized.
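
A minimal sketch of the failure mode, assuming (as the stack trace suggests) that notifyAckSuccess iterates over a LinkedHashMap that another code path modifies. The class and key names below are hypothetical and this is not the library's code. LinkedHashMap iterators are fail-fast: a structural change made outside the iterator during a loop makes the next call to next() throw. Here the change happens on the same thread so the exception is deterministic; in the subscriber it would come from another thread.

```java
import java.util.ConcurrentModificationException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration, not java-pubsub code: a put() made outside the
// iterator while a for-each loop is running makes the iterator's next call to
// next() throw ConcurrentModificationException.
class FailFastIterationDemo {
  static boolean iterationThrows() {
    Map<String, String> outstandingAcks = new LinkedHashMap<>();
    outstandingAcks.put("ack-1", "message-1");
    outstandingAcks.put("ack-2", "message-2");
    outstandingAcks.put("ack-3", "message-3");
    try {
      for (Map.Entry<String, String> entry : outstandingAcks.entrySet()) {
        // Stands in for another thread registering a newly received message.
        outstandingAcks.put("ack-new", "message-new");
      }
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("CME thrown: " + iterationThrows());
  }
}
```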

I've also noticed that the subscriber often stops processing messages entirely shortly after this exception is thrown (usually the 2nd or 3rd time), which makes this even more critical to fix. I haven't fully verified why the subscriber stops: it is still reported as running at the top level, so one possible cause is messages sitting in memory without ever being handed to the user to ack / nack 🤷. It's therefore possible that the soft-crashes are caused by something else, but the pattern is very consistent.

Environment details

Tested using library version 1.125.6, Java 17.

Steps to reproduce

  1. Create a subscriber with exactly-once-delivery enabled as shown in the official docs. I tested by copying the example as-is and removing the shutdown at the end.
  2. Create a publisher, also as shown in the official docs. I changed it to publish 500 messages every 10 seconds.
  3. Start publishing messages; eventually a listener call will throw.

Stack trace

RuntimeException while executing runnable CallbackListener{com.google.api.core.ApiFutures$1@28557790} with executor MoreExecutors.directExecutor() java.util.ConcurrentModificationException: null
	at java.base/java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:756)
	at java.base/java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:788)
	at java.base/java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:786)
	at com.google.cloud.pubsub.v1.MessageDispatcher.notifyAckSuccess(MessageDispatcher.java:423)
	at com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2.onSuccess(StreamingSubscriberConnection.java:530)
	at com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2.onSuccess(StreamingSubscriberConnection.java:523)
	at com.google.api.core.ApiFutures$1.onSuccess(ApiFutures.java:89)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1133)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1286)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1055)
	at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:782)
	at com.google.api.gax.retrying.BasicRetryingFuture.handleAttempt(BasicRetryingFuture.java:203)
	at com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.handle(CallbackChainRetryingFuture.java:135)
	at com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.run(CallbackChainRetryingFuture.java:115)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1286)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1055)
	at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:782)
	at com.google.api.core.AbstractApiFuture$InternalSettableFuture.set(AbstractApiFuture.java:87)
	at com.google.api.core.AbstractApiFuture.set(AbstractApiFuture.java:70)
	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onSuccess(GrpcExceptionCallable.java:88)
	at com.google.api.core.ApiFutures$1.onSuccess(ApiFutures.java:89)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1133)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1286)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1055)
	at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:782)
	at io.grpc.stub.ClientCalls$GrpcFuture.set(ClientCalls.java:563)
	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:536)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:546)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
 [com.google.common.util.concurrent.AbstractFuture.executeListener() @ 1291]
@product-auto-label product-auto-label bot added the api: pubsub Issues related to the googleapis/java-pubsub API. label Oct 18, 2023
@Sourc
Author

Sourc commented Oct 20, 2023

Today I verified that the latest version of the library that did not include the linked PR (version 1.124.2) has no soft-crash issues (and no concurrent-modification issue, duh).

Since the soft-crashes can't be detected without monitoring the number of messages received and knowing the expected number, I would say this issue is quite critical to fix.
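
As a hedged illustration of that kind of monitoring, here is a library-independent sketch; StallDetector, recordMessage and isStalled are hypothetical names, not part of java-pubsub. The receiver counts every delivered message, and a periodic check flags the subscriber when nothing has arrived for longer than a threshold. This only makes sense when the publisher is known to be sending steadily, as in the reproduction steps (500 messages every 10 seconds).

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical watchdog, not part of java-pubsub. The message receiver calls
// recordMessage() for every delivered message; a periodic health check calls
// isStalled() to flag a subscriber that still reports itself as running but
// has delivered nothing for longer than maxSilence.
class StallDetector {
  private final Duration maxSilence;
  private final AtomicLong received = new AtomicLong();
  private final AtomicReference<Instant> lastMessageAt;

  StallDetector(Duration maxSilence, Instant start) {
    this.maxSilence = maxSilence;
    this.lastMessageAt = new AtomicReference<>(start);
  }

  // Safe to call from many receiver threads; keeps the latest timestamp seen.
  void recordMessage(Instant now) {
    received.incrementAndGet();
    lastMessageAt.accumulateAndGet(now, (prev, next) -> next.isAfter(prev) ? next : prev);
  }

  long receivedCount() {
    return received.get();
  }

  // True when no message has arrived for longer than maxSilence.
  boolean isStalled(Instant now) {
    return Duration.between(lastMessageAt.get(), now).compareTo(maxSilence) > 0;
  }

  public static void main(String[] args) {
    Instant t0 = Instant.parse("2023-10-20T00:00:00Z");
    StallDetector detector = new StallDetector(Duration.ofSeconds(30), t0);
    detector.recordMessage(t0.plusSeconds(5));
    System.out.println(detector.isStalled(t0.plusSeconds(20))); // false: 15 s of silence
    System.out.println(detector.isStalled(t0.plusSeconds(60))); // true: 55 s of silence
  }
}
```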

The old version also seems to perform better (based on my limited testing). When publishing 50 small messages per second and consuming them with 5 threads on my local machine, the old version keeps up with no backlog, while the new one easily falls thousands of messages behind on average. I haven't done extensive performance testing, though, since the subscribers usually stop too early with the new version, so it's possible this would not be the case if the subscribers weren't soft-crashing.

@maitrimangal maitrimangal self-assigned this Oct 23, 2023
@maitrimangal maitrimangal added type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. priority: p2 Moderately-important priority. Fix may not be included in next release. priority: p1 Important issue which blocks shipping the next release. Will be fixed prior to next release. and removed priority: p2 Moderately-important priority. Fix may not be included in next release. labels Oct 23, 2023
@maitrimangal
Member

I tried this setup last night and was able to reproduce an error. I ran into the following one every 5-10 seconds:

com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Some acknowledgement ids in the request were invalid. This could be because the acknowledgement ids have expired or the acknowledgement ids were malformed.
        at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92)
        at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98)
        at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
        at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
        at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
        at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1127)
        at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
        at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1286)
        at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1055)
        at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:807)
        at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:568)
        at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:538)
        at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
        at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
        at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
        at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:570)
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567)
        at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)

I tried adding the synchronized keyword to processReceivedMessages() and was able to confirm that this error no longer shows up. I will make a PR for this.
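
A minimal sketch of why that helps, assuming a LinkedHashMap of pending acks shared between the method that registers received messages and the ack-success callback. Everything here is hypothetical and simplified, not the actual MessageDispatcher code: the method names only mirror the ones mentioned in this thread. When the writer and the iterating reader both synchronize on the same object, a concurrent workload completes without ConcurrentModificationException.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, heavily simplified stand-in for the dispatcher. Every access
// to pendingAcks holds the same monitor (this), so iteration can never
// overlap with a structural change made by another thread.
class SynchronizedDispatcherSketch {
  private final Map<String, String> pendingAcks = new LinkedHashMap<>();

  // Writer: registers newly received messages.
  synchronized void processReceivedMessages(List<String> ackIds) {
    for (String ackId : ackIds) {
      pendingAcks.put(ackId, "payload-for-" + ackId);
    }
  }

  // Reader: iterates and drops acked entries; returns how many were removed.
  synchronized int notifyAckSuccess() {
    int acked = 0;
    Iterator<Map.Entry<String, String>> it = pendingAcks.entrySet().iterator();
    while (it.hasNext()) {
      it.next();
      it.remove(); // removing through the iterator itself is allowed
      acked++;
    }
    return acked;
  }

  synchronized int pendingCount() {
    return pendingAcks.size();
  }

  // Runs one writer and one reader concurrently; returns the total acked.
  static long runConcurrently(int batches, int batchSize) {
    SynchronizedDispatcherSketch dispatcher = new SynchronizedDispatcherSketch();
    AtomicLong acked = new AtomicLong();
    AtomicReference<Throwable> failure = new AtomicReference<>();
    Thread writer = new Thread(() -> {
      for (int b = 0; b < batches; b++) {
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
          batch.add("ack-" + b + "-" + i);
        }
        dispatcher.processReceivedMessages(batch);
      }
    });
    Thread reader = new Thread(() -> {
      try {
        // Keep draining until the writer is done and nothing is left.
        while (writer.isAlive() || dispatcher.pendingCount() > 0) {
          acked.addAndGet(dispatcher.notifyAckSuccess());
        }
      } catch (Throwable t) {
        failure.set(t);
      }
    });
    writer.start();
    reader.start();
    try {
      writer.join();
      reader.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    if (failure.get() != null) {
      throw new IllegalStateException("reader failed", failure.get());
    }
    return acked.get();
  }

  public static void main(String[] args) {
    System.out.println(runConcurrently(200, 50)); // 10000
  }
}
```

The cost of this design is that both methods are serialized on one lock, so a long iteration blocks incoming messages for its duration.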

@Sourc
Author

Sourc commented Oct 31, 2023

@maitrimangal Yes, that error also shows up a lot, but it isn't as critical to fix. Adding synchronized would solve the critical issue as well, though. I haven't looked into the code in detail, so I can't judge whether it would prevent the subscriber from processing multiple messages in parallel. If it does, I would look into using a ConcurrentHashMap instead.
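
The ConcurrentHashMap alternative can be sketched as a hypothetical, library-independent illustration (not what the eventual fix does, as far as this thread shows): a put during iteration, which would make a LinkedHashMap iterator fail fast, does not throw here because ConcurrentHashMap iterators are weakly consistent and need no shared lock. The trade-offs are that ConcurrentHashMap does not preserve insertion order (LinkedHashMap does), and an in-progress iteration may or may not see entries added while it runs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration: ConcurrentHashMap iterators are weakly
// consistent, so modifying the map during iteration never throws
// ConcurrentModificationException. The loop may or may not visit "ack-new";
// either way it terminates normally.
class WeaklyConsistentIterationDemo {
  static int iterateWhileInserting() {
    Map<String, String> outstandingAcks = new ConcurrentHashMap<>();
    outstandingAcks.put("ack-1", "message-1");
    outstandingAcks.put("ack-2", "message-2");
    outstandingAcks.put("ack-3", "message-3");
    for (String ackId : outstandingAcks.keySet()) {
      // Stands in for another thread registering a newly received message.
      outstandingAcks.putIfAbsent("ack-new", "message-new");
    }
    return outstandingAcks.size();
  }

  public static void main(String[] args) {
    System.out.println(iterateWhileInserting()); // 4
  }
}
```

Individual operations become thread-safe this way, but compound check-then-act sequences across several map calls would still need their own coordination.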

@maitrimangal
Member

Please reopen if the above PR does not fix the issue.

@Sourc
Author

Sourc commented Nov 24, 2023

@maitrimangal Maybe it's time to do a release so that we can try it out? Is something blocking it?

@maitrimangal maitrimangal reopened this Nov 27, 2023
@maitrimangal
Member

I've merged the new release. Let me know if this solves the problem!

@Sourc
Author

Sourc commented Nov 27, 2023

The issue is solved from what I can see, thanks.

2 participants