Skip to content

[Bug]: Error in SolaceIO during autoscaling events ("Tried to start a closed message consumer") #35304

@ppawel

Description

@ppawel

What happened?

We observe this error frequently during peak hours in a streaming job when Dataflow decides to autoscale workers. It looks like a race condition inside SolaceIO regarding management of Solace JCSMP resources - see stack trace below:

org.apache.beam.sdk.util.UserCodeException: com.google.cloud.RetryHelper$RetryHelperException: com.solacesystems.jcsmp.StaleSessionException: Tried to start a closed message consumer.
	at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
	at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1131)
	at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:145)
	at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:666)
	at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:661)
	at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
	at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
	at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
	at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:172)
	at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:539)
	at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
	at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: com.google.cloud.RetryHelper$RetryHelperException: com.solacesystems.jcsmp.StaleSessionException: Tried to start a closed message consumer.
	at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:54)
	at org.apache.beam.sdk.io.solace.RetryCallableManager.retryCallable(RetryCallableManager.java:66)
	at org.apache.beam.sdk.io.solace.broker.SolaceMessageReceiver.startFlowReceiver(SolaceMessageReceiver.java:43)
	at org.apache.beam.sdk.io.solace.broker.SolaceMessageReceiver.receive(SolaceMessageReceiver.java:56)
	at org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader.advance(UnboundedSolaceReader.java:143)
	at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.tryClaimOrThrow(Read.java:933)
	at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.tryClaim(Read.java:903)
	at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.tryClaim(Read.java:836)
	at org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.tryClaim(RestrictionTrackers.java:59)
	at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.processElement(Read.java:578)
Caused by: com.solacesystems.jcsmp.StaleSessionException: Tried to start a closed message consumer.
	at com.solacesystems.jcsmp.impl.flow.FlowHandleImpl.throwClosedException(FlowHandleImpl.java:2239)
	at com.solacesystems.jcsmp.impl.flow.FlowHandleImpl.startImpl(FlowHandleImpl.java:1140)
	at com.solacesystems.jcsmp.impl.flow.FlowHandleImpl.start(FlowHandleImpl.java:1125)
	at org.apache.beam.sdk.io.solace.broker.SolaceMessageReceiver.lambda$startFlowReceiver$0(SolaceMessageReceiver.java:45)
	at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:102)
	at com.google.cloud.RetryHelper.run(RetryHelper.java:76)
	at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
	at org.apache.beam.sdk.io.solace.RetryCallableManager.retryCallable(RetryCallableManager.java:66)
	at org.apache.beam.sdk.io.solace.broker.SolaceMessageReceiver.startFlowReceiver(SolaceMessageReceiver.java:43)
	at org.apache.beam.sdk.io.solace.broker.SolaceMessageReceiver.receive(SolaceMessageReceiver.java:56)
	at org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader.advance(UnboundedSolaceReader.java:143)
	at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.tryClaimOrThrow(Read.java:933)
	at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.tryClaim(Read.java:903)
	at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.tryClaim(Read.java:836)
	at org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.tryClaim(RestrictionTrackers.java:59)
	at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.processElement(Read.java:578)
	at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1131)
	at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:145)
	at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:666)
	at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:661)
	at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
	at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
	at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
	at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:172)
	at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:539)
	at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
	at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: com.solacesystems.jcsmp.JCSMPTransportException: (JCSMPTransportException) Error receiving data from underlying connection.
	at com.solacesystems.jcsmp.protocol.impl.TcpClientChannel$ClientChannelReconnect.call(TcpClientChannel.java:2559)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	... 3 more

CC @bzablocki

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions