Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28244 ProcedureTestingUtility.restart is broken sometimes after… #5563

Merged
merged 1 commit into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -605,15 +606,23 @@ public void init(int numThreads, boolean abortOnCorruption) throws IOException {
this.threadGroup = new ThreadGroup("PEWorkerGroup");
this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout");
this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor");
ThreadFactory backingThreadFactory = new ThreadFactory() {

@Override
public Thread newThread(Runnable r) {
return new Thread(threadGroup, r);
}
};
int size = Math.max(2, Runtime.getRuntime().availableProcessors());
ThreadPoolExecutor executor = new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat(getClass().getSimpleName() + "-Async-Task-Executor-%d").build());
ThreadPoolExecutor executor =
new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(),
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat(getClass().getSimpleName() + "-Async-Task-Executor-%d")
.setThreadFactory(backingThreadFactory).build());
executor.allowCoreThreadTimeOut(true);
this.asyncTaskExecutor = executor;
forceUpdateExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build());
forceUpdateExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Force-Update-PEWorker-%d").setThreadFactory(backingThreadFactory).build());
store.registerListener(new ProcedureStoreListener() {

@Override
Expand Down Expand Up @@ -684,10 +693,10 @@ public void startWorkers() throws IOException {
}

public void stop() {
if (!running.getAndSet(false)) {
return;
}

// It is possible that we fail in init while loading procedures. In that case running is never
// set to true, but the ProcedureScheduler and the two ExecutorServices have already been
// started, so here we do not check the running state and just stop them.
running.set(false);
LOG.info("Stopping");
scheduler.stop();
timeoutExecutor.sendStopSignal();
Expand All @@ -708,14 +717,29 @@ public void join() {
for (WorkerThread worker : workerThreads) {
worker.awaitTermination();
}
try {
if (!forceUpdateExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
LOG.warn("There are still pending tasks in forceUpdateExecutor");
}
} catch (InterruptedException e) {
LOG.warn("interrupted while waiting for forceUpdateExecutor termination", e);
Thread.currentThread().interrupt();
}
try {
if (!asyncTaskExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
LOG.warn("There are still pending tasks in asyncTaskExecutor");
}
} catch (InterruptedException e) {
LOG.warn("interrupted while waiting for asyncTaskExecutor termination", e);
Thread.currentThread().interrupt();
}

// Destroy the Thread Group for the executors
// TODO: Fix. #join is not the place to destroy resources.
try {
threadGroup.destroy();
} catch (IllegalThreadStateException e) {
LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT", this.threadGroup,
e.getMessage());
LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT", this.threadGroup, e);
// This dumps list of threads on STDOUT.
this.threadGroup.list();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.commons.lang3.mutable.MutableBoolean;
Expand Down Expand Up @@ -61,6 +62,14 @@ public static void suspendIfNecessary(Procedure<?> proc,
throws IOException, ProcedureSuspendedException {
MutableBoolean completed = new MutableBoolean(false);
Thread currentThread = Thread.currentThread();
// This is for testing. In ProcedureTestingUtility, we restart a ProcedureExecutor and reuse it
// for performance, so we need to make sure that all the procedures have been stopped. But
// here, the callback of this future is not executed in a PEWorker, so the ProcedureExecutor
// has no way to stop it. So we get the asyncTaskExecutor first, in the PEWorker thread, where
// the ProcedureExecutor should not have been stopped yet. Then, when the callback is called,
// if the ProcedureExecutor has already been stopped and restarted, this asyncTaskExecutor
// will also have been shut down, so we cannot add anything back to the scheduler.
ExecutorService asyncTaskExecutor = env.getAsyncTaskExecutor();
FutureUtils.addListener(future, (r, e) -> {
if (Thread.currentThread() == currentThread) {
LOG.debug("The future has completed while adding callback, give up suspending procedure {}",
Expand All @@ -77,7 +86,7 @@ public static void suspendIfNecessary(Procedure<?> proc,
// And what makes things worse is that, we persist procedure state to master local region,
// where the AsyncFSWAL implementation will use the same netty's event loop for dealing with
// I/O, which could even cause dead lock.
env.getAsyncTaskExecutor().execute(() -> {
asyncTaskExecutor.execute(() -> {
// should acquire procedure execution lock to make sure that the procedure executor has
// finished putting this procedure to the WAITING_TIMEOUT state, otherwise there could be
// race and cause unexpected result
Expand All @@ -89,7 +98,7 @@ public static void suspendIfNecessary(Procedure<?> proc,
} catch (IOException ioe) {
LOG.error("Error while acquiring execution lock for procedure {}"
+ " when trying to wake it up, aborting...", proc, ioe);
env.getMasterServices().abort("Can not acquire procedure execution lock", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, we didn't realize that this exception was written incorrectly before.

env.getMasterServices().abort("Can not acquire procedure execution lock", ioe);
return;
}
try {
Expand Down