Skip to content

Commit

Permalink
move graceful thread shutdown to a helper (#555)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Oct 13, 2020
1 parent 8dd3933 commit eaa40ed
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,22 @@
* SOFTWARE.
*/

package io.airbyte.scheduler;
package io.airbyte.commons.concurrency;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SchedulerShutdownHandler extends Thread {
public class GracefulShutdownHandler extends Thread {

private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerShutdownHandler.class);
private static final Logger LOGGER = LoggerFactory.getLogger(GracefulShutdownHandler.class);
private final Duration terminateWaitDuration;
private final ExecutorService[] threadPools;

public SchedulerShutdownHandler(final ExecutorService... threadPools) {
public GracefulShutdownHandler(Duration terminateWaitDuration, final ExecutorService... threadPools) {
this.terminateWaitDuration = terminateWaitDuration;
this.threadPools = threadPools;
}

Expand All @@ -44,11 +47,11 @@ public void run() {
threadPool.shutdown();

try {
if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) {
LOGGER.error("Unable to kill worker threads by shutdown timeout.");
if (!threadPool.awaitTermination(terminateWaitDuration.getSeconds(), TimeUnit.SECONDS)) {
LOGGER.error("Unable to kill threads by shutdown timeout.");
}
} catch (InterruptedException e) {
LOGGER.error("Wait for graceful worker thread shutdown interrupted.", e);
LOGGER.error("Wait for graceful thread shutdown interrupted.", e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,23 @@
* SOFTWARE.
*/

package io.airbyte.scheduler;
package io.airbyte.commons.concurrency;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import org.junit.jupiter.api.Test;

class SchedulerShutdownHandlerTest {
class GracefulShutdownHandlerTest {

@Test
public void testRun() throws InterruptedException {
final ExecutorService executorService = mock(ExecutorService.class);
final SchedulerShutdownHandler schedulerShutdownHandler = new SchedulerShutdownHandler(executorService);
schedulerShutdownHandler.start();
schedulerShutdownHandler.join();
final GracefulShutdownHandler gracefulShutdownHandler = new GracefulShutdownHandler(Duration.ofSeconds(30), executorService);
gracefulShutdownHandler.start();
gracefulShutdownHandler.join();

verify(executorService).shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.scheduler;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.airbyte.commons.concurrency.GracefulShutdownHandler;
import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.persistence.ConfigPersistence;
Expand All @@ -36,6 +37,7 @@
import io.airbyte.workers.process.DockerProcessBuilderFactory;
import io.airbyte.workers.process.ProcessBuilderFactory;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -98,7 +100,7 @@ public void start() {
JOB_SUBMITTER_DELAY_MILLIS,
TimeUnit.MILLISECONDS);

Runtime.getRuntime().addShutdownHook(new SchedulerShutdownHandler(workerThreadPool, scheduledPool));
Runtime.getRuntime().addShutdownHook(new GracefulShutdownHandler(Duration.ofSeconds(30), workerThreadPool, scheduledPool));
}

public static void main(String[] args) {
Expand Down

0 comments on commit eaa40ed

Please sign in to comment.