KAFKA-17553 Fix shutdown race condition in StreamThreadTest #17191

Open: wants to merge 8 commits into base: trunk


mumrah
Contributor

@mumrah mumrah commented Sep 13, 2024

Reverting #17180 in order to generate a thread dump of the stalled test.

@mumrah mumrah added the do-not-merge label Sep 13, 2024
@mumrah
Contributor Author

mumrah commented Sep 14, 2024

Thread dump here: https://github.com/apache/kafka/actions/runs/10852637802/artifacts/1932150615


clientId-StreamThread-1-TaskExecutor-1" #12570 prio=5 os_prio=0 cpu=0.21ms elapsed=7481.03s tid=0x00007f4eea2ec410 nid=0x197ee waiting on condition  [0x00007f4ab53fb000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@17.0.12/Native Method)
	- parking to wait for  <0x00000000d60d47d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(java.base@17.0.12/LockSupport.java:341)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(java.base@17.0.12/AbstractQueuedSynchronizer.java:506)
	at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@17.0.12/ForkJoinPool.java:3465)
	at java.util.concurrent.ForkJoinPool.managedBlock(java.base@17.0.12/ForkJoinPool.java:3436)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@17.0.12/AbstractQueuedSynchronizer.java:1625)
	at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.lambda$awaitProcessableTasks$1(DefaultTaskManager.java:142)
	at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager$$Lambda$3587/0x00007f4e888ed980.get(Unknown Source)
	at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.returnWithTasksLocked(DefaultTaskManager.java:353)
	at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.awaitProcessableTasks(DefaultTaskManager.java:129)
	at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskExecutor$TaskExecutorThread.runOnce(DefaultTaskExecutor.java:119)
	at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskExecutor$TaskExecutorThread.run(DefaultTaskExecutor.java:62)
"stream-thread-test-87bf53a8-54f2-485f-a4b6-acdbec0a8b3d-StreamThread-1" #1 prio=5 os_prio=0 cpu=39817.70ms elapsed=10386.12s tid=0x00007f4ee802c700 nid=0x1186 in Object.wait()  [0x00007f4eecf13000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(java.base@17.0.12/Native Method)
	- waiting on <no object reference available>
	at java.lang.Thread.join(java.base@17.0.12/Thread.java:1307)
	- locked <0x00000000d60d4bc0> (a org.apache.kafka.streams.processor.internals.tasks.DefaultTaskExecutor$TaskExecutorThread)
	at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskExecutor.awaitShutdown(DefaultTaskExecutor.java:264)
	at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.shutdown(DefaultTaskManager.java:377)
	at org.apache.kafka.streams.processor.internals.TaskManager.shutdownSchedulingTaskManager(TaskManager.java:1512)
	at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:1444)
	at org.apache.kafka.streams.processor.internals.StreamThreadTest.tearDown(StreamThreadTest.java:233)

@mumrah
Contributor Author

mumrah commented Sep 14, 2024

@mumrah
Contributor Author

mumrah commented Sep 17, 2024

I managed to reproduce the issue in this run: https://github.com/apache/kafka/actions/runs/10893557629/job/30231419064?pr=17191

2024-09-17T01:39:47.4626128Z 
2024-09-17T01:39:47.4627809Z Gradle Test Run :streams:test > Gradle Test Executor 11 > StreamThreadTest > shouldGetProducerInstanceId(boolean, boolean) > "shouldGetProducerInstanceId(boolean, boolean).stateUpdaterEnabled=true, processingThreadsEnabled=true" STARTED
2024-09-17T01:39:57.5507998Z 
2024-09-17T01:39:57.5510603Z Gradle Test Run :streams:test > Gradle Test Executor 11 > StreamThreadTest > shouldGetProducerInstanceId(boolean, boolean) > "shouldGetProducerInstanceId(boolean, boolean).stateUpdaterEnabled=true, processingThreadsEnabled=true" PASSED

Notice the 10-second delay between STARTED and PASSED in this test. It comes from the changes I have in this PR:

    // in DefaultTaskManager
    public void shutdown(final Duration duration) {
        for (final TaskExecutor t: taskExecutors) {
            t.requestShutdown();
        }
        signalTaskExecutors();
        try {
            for (final TaskExecutor t: taskExecutors) {
                t.awaitShutdown(Duration.ofSeconds(10));
            }
        } catch (final Exception e) {
            signalTaskExecutors();
            for (final TaskExecutor t: taskExecutors) {
                t.awaitShutdown(duration);
            }
        }
    }

This is kind of a hacky solution, but it does avoid the deadlock.

@mjsax @chia7712 WDYT?

Contributor

@chia7712 chia7712 left a comment

@mumrah thanks for the quick fix. The approach works, but it may not honor the timeout passed to shutdown. Maybe we can add a Supplier<Boolean> to awaitProcessableTasks so it can return without awaiting? The check would run while holding the lock, so it can fix the race condition. For example:

awaitProcessableTasks(Supplier<Boolean> needToAwait)
taskManager.awaitProcessableTasks(() -> !shutdownRequested.get());
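
To illustrate why checking under the lock would close the race, here is a minimal, self-contained sketch. It is not the actual Kafka Streams code: `SketchTaskManager`, `tasksCondition`, and `executorStopsWithin` are hypothetical names made up for this example, and the real `DefaultTaskManager` has more state than shown. The idea is that the signalling thread needs the same lock, so a signal can only arrive either before the check (the executor sees the flag and skips the await) or after the executor is already parked (the signal wakes it).

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

public class AwaitUnderLockSketch {

    /** Hypothetical stand-in for the task manager; not the real DefaultTaskManager. */
    static final class SketchTaskManager {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition tasksCondition = lock.newCondition();

        /** Blocks only if needToAwait still returns true once the lock is held. */
        void awaitProcessableTasks(final Supplier<Boolean> needToAwait) throws InterruptedException {
            lock.lock();
            try {
                // Checked under the lock: a signal sent after this check cannot be
                // missed, because the signaller needs the same lock and await()
                // releases it atomically while parking.
                if (needToAwait.get()) {
                    tasksCondition.await();
                }
            } finally {
                lock.unlock();
            }
        }

        void signalTaskExecutors() {
            lock.lock();
            try {
                tasksCondition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }

    /** Returns true if the executor thread exits within the given timeout. */
    static boolean executorStopsWithin(final long timeoutMs) throws InterruptedException {
        final SketchTaskManager taskManager = new SketchTaskManager();
        final AtomicBoolean shutdownRequested = new AtomicBoolean(false);

        final Thread executor = new Thread(() -> {
            try {
                // The outer loop also covers spurious wakeups from await().
                while (!shutdownRequested.get()) {
                    taskManager.awaitProcessableTasks(() -> !shutdownRequested.get());
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sketch-task-executor");
        executor.start();

        shutdownRequested.set(true);        // plays the role of requestShutdown()
        taskManager.signalTaskExecutors();  // wakes the executor if it is already parked
        executor.join(timeoutMs);
        return !executor.isAlive();
    }

    public static void main(final String[] args) throws InterruptedException {
        System.out.println("executor stopped: " + executorStopsWithin(TimeUnit.SECONDS.toMillis(5)));
    }
}
```

In this sketch the flag itself is set outside the lock; what matters is that the executor reads it while holding the lock and the signal is sent under the same lock after the flag is set.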

@chia7712 chia7712 changed the title Testing StreamThreadTest KAFKA-17553 Testing StreamThreadTest Sep 17, 2024
@mumrah mumrah changed the title KAFKA-17553 Testing StreamThreadTest KAFKA-17553 Fix shutdown race condition in StreamThreadTest Sep 17, 2024
@mumrah
Contributor Author

mumrah commented Sep 18, 2024

One of the Gradle workers hit an OOM:

org.gradle.api.internal.tasks.testing.TestSuiteExecutionException: Could not complete execution for Gradle Test Executor 10.
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:65)
	at java.base@17.0.12/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base@17.0.12/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base@17.0.12/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base@17.0.12/java.lang.reflect.Method.invoke(Method.java:569)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:92)
	at jdk.proxy1/jdk.proxy1.$Proxy4.stop(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:200)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:132)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:103)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:63)
	at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:121)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
	at app//worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
	at app//worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.base/java.util.Arrays.copyOf(Arrays.java:3537)
	at java.base/java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:228)
	at java.base/java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:582)
	at java.base/java.lang.StringBuilder.append(StringBuilder.java:179)
	at org.mockito.internal.exceptions.Reporter.argumentsAreDifferent(Reporter.java:360)
	at org.mockito.internal.verification.checkers.MissingInvocationChecker.checkMissingInvocation(MissingInvocationChecker.java:54)
	at org.mockito.internal.verification.AtLeast.verify(AtLeast.java:30)
	at org.mockito.internal.verification.VerificationOverTimeImpl.verify(VerificationOverTimeImpl.java:87)
	at org.mockito.internal.verification.VerificationWrapper.verify(VerificationWrapper.java:21)
	at org.mockito.internal.verification.MockAwareVerificationMode.verify(MockAwareVerificationMode.java:30)
	at org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:75)
	at org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
	at org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:34)
	at org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:82)
	at org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:56)
	at org.mockito.internal.creation.bytebuddy.MockMethodInterceptor$DispatcherDefaultingToRealMethod.interceptAbstract(MockMethodInterceptor.java:161)
	at org.apache.kafka.streams.processor.internals.tasks.TaskManager$MockitoMock$OwZ6zTD7.awaitProcessableTasks(Unknown Source)
	at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskExecutorTest.shouldAwaitProcessableTasksIfNoneAssignable(DefaultTaskExecutorTest.java:121)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor$$Lambda$232/0x00007f45cc1c2c30.apply(Unknown Source)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall$$Lambda$233/0x00007f45cc1c3050.apply(Unknown Source)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)

Not sure why, but it looks like Mockito was building a very large string while reporting a verification failure.

@mumrah
Contributor Author

mumrah commented Sep 18, 2024

@chia7712 I've updated the code with the supplier approach. I was wondering about something like this, but was hesitant to change too much code I'm not familiar with 😅. It turned out not to be too bad.

@mumrah mumrah requested review from guozhangwang, ableegoldman and vvcephei and removed request for lucasbru and vvcephei September 18, 2024 01:03
@mumrah mumrah added streams and removed do-not-merge labels Sep 18, 2024