Skip to content

Commit

Permalink
Surface any active child thread of dying connectors (#10660)
Browse files Browse the repository at this point in the history
* Interrupt child thread of dying connectors to avoid getting stuck

* Catch and print stacktrace

* Add test on interrupt/kill timeouts

* Send message to sentry too
  • Loading branch information
ChristopheDuong authored Mar 3, 2022
1 parent 698ca5e commit 0d38f27
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.google.common.base.Preconditions;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions.Procedure;
import io.airbyte.commons.string.Strings;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.integrations.base.sentry.AirbyteSentry;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
Expand All @@ -18,13 +20,21 @@
import io.airbyte.validation.json.JsonSchemaValidator;
import io.sentry.ITransaction;
import io.sentry.Sentry;
import io.sentry.SentryLevel;
import io.sentry.SpanStatus;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ThreadUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,6 +47,11 @@ public class IntegrationRunner {

private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationRunner.class);

/** Delay, in minutes, that runConsumer passes as the interrupt delay for non-daemon threads still alive after the write stream is consumed. */
public static final int INTERRUPT_THREAD_DELAY_MINUTES = 60;
/** Delay, in minutes, that runConsumer passes as the exit delay, after which the exit hook runs if such threads are still alive. */
public static final int EXIT_THREAD_DELAY_MINUTES = 70;

/** Status code handed to System.exit by the exit hook that runConsumer installs. */
public static final int FORCED_EXIT_CODE = 2;

private final IntegrationCliParser cliParser;
private final Consumer<AirbyteMessage> outputRecordCollector;
private final Integration integration;
Expand Down Expand Up @@ -74,7 +89,7 @@ public IntegrationRunner(final Source source) {
final Source source,
final JsonSchemaValidator jsonSchemaValidator) {
this(cliParser, outputRecordCollector, destination, source);
this.validator = jsonSchemaValidator;
validator = jsonSchemaValidator;
}

public void run(final String[] args) throws Exception {
Expand Down Expand Up @@ -145,7 +160,7 @@ public void runInternal(final ITransaction transaction, final IntegrationConfig
validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE");
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
try (final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector)) {
AirbyteSentry.executeWithTracing("WriteDestination", () -> consumeWriteStream(consumer));
AirbyteSentry.executeWithTracing("WriteDestination", () -> runConsumer(consumer));
}
}
default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand());
Expand All @@ -171,6 +186,88 @@ static void consumeWriteStream(final AirbyteMessageConsumer consumer) throws Exc
}
}

/**
 * Consumes the destination write stream under orphan-thread supervision.
 *
 * Delegates to {@code watchForOrphanThreads} with the production settings: the Sentry flag set to
 * {@code true}, the interrupt/exit delays from the class constants, and an exit hook that
 * terminates the JVM with {@code FORCED_EXIT_CODE}.
 *
 * @param consumer the consumer fed by {@code consumeWriteStream}
 * @throws Exception anything raised while consuming the write stream
 */
private static void runConsumer(final AirbyteMessageConsumer consumer) throws Exception {
  final Procedure writeStream = () -> consumeWriteStream(consumer);
  final Runnable forcedExit = () -> System.exit(FORCED_EXIT_CODE);
  final boolean sentryFlag = true;
  watchForOrphanThreads(
      writeStream,
      forcedExit,
      sentryFlag,
      INTERRUPT_THREAD_DELAY_MINUTES, TimeUnit.MINUTES,
      EXIT_THREAD_DELAY_MINUTES, TimeUnit.MINUTES);
}

/**
 * Calls {@code runMethod} and makes sure that it does not leave orphan non-daemon threads active
 * once it is done. Active non-daemon threads block the JVM from exiting when the main thread is
 * done, whereas daemon ones don't.
 *
 * If any active non-daemon threads would be left as orphans, this method schedules interrupt and
 * exit hooks after giving them some time to close up properly. It is generally preferred to have a
 * proper closing sequence in the child threads instead of interrupting them or force exiting the
 * process, so this mechanism serves as a fallback while surfacing warnings in the logs (and in
 * Sentry when enabled) for maintainers to fix the code behavior instead.
 *
 * Threads are told apart by name: any thread sharing the calling thread's name is treated as the
 * calling thread itself and is never reported, interrupted or counted.
 *
 * @param runMethod the work to execute on the calling thread
 * @param exitHook invoked on the scheduler thread if non-daemon threads are still alive once the
 *        exit delay has elapsed
 * @param sentryEnabled whether the orphan-thread report is also sent to Sentry as a warning
 * @param interruptTimeDelay delay before each orphan thread is interrupted
 * @param interruptTimeUnit unit of {@code interruptTimeDelay}
 * @param exitTimeDelay delay before the exit hook is considered
 * @param exitTimeUnit unit of {@code exitTimeDelay}
 * @throws Exception whatever {@code runMethod} throws; the orphan-thread check runs regardless
 */
@VisibleForTesting
static void watchForOrphanThreads(final Procedure runMethod,
                                  final Runnable exitHook,
                                  final boolean sentryEnabled,
                                  final int interruptTimeDelay,
                                  final TimeUnit interruptTimeUnit,
                                  final int exitTimeDelay,
                                  final TimeUnit exitTimeUnit)
    throws Exception {
  final Thread currentThread = Thread.currentThread();
  try {
    runMethod.call();
  } finally {
    final List<Thread> runningThreads = ThreadUtils.getAllThreads()
        .stream()
        // daemon threads don't block the JVM if the main `currentThread` exits, so they are not problematic
        .filter(runningThread -> !runningThread.getName().equals(currentThread.getName()) && !runningThread.isDaemon())
        .collect(Collectors.toList());
    if (!runningThreads.isEmpty()) {
      final StringBuilder sentryMessageBuilder = new StringBuilder();
      LOGGER.warn("""
                  The main thread is exiting while children non-daemon threads from a connector are still active.
                  Ideally, this situation should not happen...
                  Please check with maintainers if the connector or library code should safely clean up its threads before quitting instead.
                  The main thread is: {}""", dumpThread(currentThread));
      sentryMessageBuilder.append("The main thread is exiting while children non-daemon threads are still active.\nMain Thread:")
          .append(dumpThread(currentThread));
      final ScheduledExecutorService scheduledExecutorService = Executors
          .newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder()
              // this thread executor will create daemon threads, so it does not block exiting if all other active
              // threads are already stopped.
              .daemon(true).build());
      for (final Thread runningThread : runningThreads) {
        final String str = "Active non-daemon thread: " + dumpThread(runningThread);
        LOGGER.warn(str);
        sentryMessageBuilder.append(str);
        // even though the main thread is already shutting down, we still leave some chances to the children
        // threads to close properly on their own.
        // So, we schedule an interrupt hook after a fixed time delay instead...
        scheduledExecutorService.schedule(runningThread::interrupt, interruptTimeDelay, interruptTimeUnit);
      }
      // Report to Sentry only when the caller enabled it (the guard used to be inverted).
      if (sentryEnabled) {
        Sentry.captureMessage(sentryMessageBuilder.toString(), SentryLevel.WARNING);
      }
      // Last resort: if interrupting was not enough by the exit deadline, hand over to the exit hook.
      scheduledExecutorService.schedule(() -> {
        if (ThreadUtils.getAllThreads().stream()
            .anyMatch(runningThread -> !runningThread.isDaemon() && !runningThread.getName().equals(currentThread.getName()))) {
          LOGGER.error("Failed to interrupt children non-daemon threads, forcefully exiting NOW...\n");
          exitHook.run();
        }
      }, exitTimeDelay, exitTimeUnit);
    }
  }
}

/**
 * Renders a thread for log output: its name, its state in parentheses, then its current stack
 * trace with the frames joined by a newline followed by " at ".
 *
 * @param thread the thread to describe
 * @return the formatted description
 */
private static String dumpThread(final Thread thread) {
  // Read name and state before the stack trace, matching the original argument evaluation order.
  final String threadName = thread.getName();
  final Thread.State threadState = thread.getState();
  final String frames = Strings.join(List.of(thread.getStackTrace()), "\n at ");
  return String.format("%s (%s)\n Thread stacktrace: %s", threadName, threadState, frames);
}

private static void validateConfig(final JsonNode schemaJson, final JsonNode objectJson, final String operationType) throws Exception {
final Set<String> validationResult = validator.validate(schemaJson, objectJson);
if (!validationResult.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

package io.airbyte.integrations.base;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
Expand Down Expand Up @@ -38,14 +40,27 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ThreadUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class IntegrationRunnerTest {

private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationRunnerTest.class);

private static final String CONFIG_FILE_NAME = "config.json";
private static final String CONFIGURED_CATALOG_FILE_NAME = "configured_catalog.json";
private static final String STATE_FILE_NAME = "state.json";
Expand Down Expand Up @@ -82,6 +97,12 @@ void setup() throws IOException {
configPath = IOs.writeFile(configDir, CONFIG_FILE_NAME, CONFIG_STRING);
configuredCatalogPath = IOs.writeFile(configDir, CONFIGURED_CATALOG_FILE_NAME, Jsons.serialize(CONFIGURED_CATALOG));
statePath = IOs.writeFile(configDir, STATE_FILE_NAME, Jsons.serialize(STATE));

final String testName = Thread.currentThread().getName();
ThreadUtils.getAllThreads()
.stream()
.filter(runningThread -> !runningThread.isDaemon())
.forEach(runningThread -> runningThread.setName(testName));
}

@Test
Expand Down Expand Up @@ -275,4 +296,76 @@ void testDestinationConsumerLifecycleFailure() throws Exception {
}
}

/**
 * An orphan thread that honours interruption must be gone after the interrupt delay, and the exit
 * hook must never be invoked.
 */
@Test
void testInterruptOrphanThreadFailure() throws InterruptedException {
  final String testName = Thread.currentThread().getName();
  final List<Exception> caughtExceptions = new ArrayList<>();
  final AtomicBoolean exitCalled = new AtomicBoolean(false);
  startSleepingThread(caughtExceptions, false);
  assertThrows(IOException.class, () -> IntegrationRunner.watchForOrphanThreads(
      () -> {
        throw new IOException("random error");
      },
      // The hook runs on the scheduler thread, where a thrown assertion failure would be captured by the
      // scheduled future and never reach the test; record the call and assert on the test thread instead.
      () -> exitCalled.set(true),
      false,
      3, TimeUnit.SECONDS,
      10, TimeUnit.SECONDS));
  // wait slightly past the 10 seconds exit deadline so that a wrongly fired exit hook is observed
  TimeUnit.SECONDS.sleep(11);
  final List<Thread> runningThreads = ThreadUtils.getAllThreads().stream()
      .filter(runningThread -> !runningThread.isDaemon() && !runningThread.getName().equals(testName))
      .collect(Collectors.toList());
  // all threads should be interrupted
  assertEquals(List.of(), runningThreads);
  assertEquals(1, caughtExceptions.size());
  Assertions.assertFalse(exitCalled.get(), "exit hook must not run when every orphan thread was interrupted");
}

/**
 * An orphan thread that ignores interruption must still be alive after the exit delay, and the
 * exit hook must have been invoked.
 */
@Test
void testNoInterruptOrphanThreadFailure() throws InterruptedException {
  final String testName = Thread.currentThread().getName();
  final List<Exception> caughtExceptions = new ArrayList<>();
  final AtomicBoolean exitCalled = new AtomicBoolean(false);
  startSleepingThread(caughtExceptions, true);
  assertThrows(IOException.class, () -> IntegrationRunner.watchForOrphanThreads(
      () -> {
        throw new IOException("random error");
      },
      () -> exitCalled.set(true),
      false,
      3, TimeUnit.SECONDS,
      10, TimeUnit.SECONDS));
  // Sleeping exactly as long as the exit delay races with the scheduler thread that runs the exit
  // hook; wait slightly longer so the hook has reliably fired before asserting.
  TimeUnit.SECONDS.sleep(11);
  final List<Thread> runningThreads = ThreadUtils.getAllThreads().stream()
      .filter(runningThread -> !runningThread.isDaemon() && !runningThread.getName().equals(testName))
      .collect(Collectors.toList());
  // a thread that refuses to be interrupted should remain
  assertEquals(1, runningThreads.size());
  assertEquals(1, caughtExceptions.size());
  assertTrue(exitCalled.get());
}

/**
 * Starts a pool thread that sleeps for up to three periods of five minutes, simulating a connector
 * child thread that outlives the main thread.
 *
 * @param caughtExceptions receives every interruption caught by the sleeping thread
 * @param ignoreInterrupt when {@code true} the thread records the interruption and goes back to
 *        sleep; when {@code false} it shuts its executor down and stops at the first interruption
 */
private void startSleepingThread(final List<Exception> caughtExceptions, final boolean ignoreInterrupt) {
  final ExecutorService executorService = Executors.newFixedThreadPool(1);
  executorService.submit(() -> {
    for (int tries = 0; tries < 3; tries++) {
      try {
        TimeUnit.MINUTES.sleep(5);
      } catch (final InterruptedException e) {
        LOGGER.info("Caught Exception", e);
        caughtExceptions.add(e);
        if (!ignoreInterrupt) {
          executorService.shutdownNow();
          break;
        }
        // deliberately not re-interrupting: this thread simulates code that ignores interruption
      }
    }
  });
  // Orderly shutdown: the submitted task still runs to completion, but the pool worker terminates
  // afterwards instead of idling forever as a leaked non-daemon thread.
  executorService.shutdown();
}

}

0 comments on commit 0d38f27

Please sign in to comment.