airbyte-common-workers: Collect trace message on failed connection_status (#20721)
alafanechere authored Jan 12, 2023
1 parent f3c8d83 commit 2dc5b2f
Showing 8 changed files with 214 additions and 133 deletions.
@@ -5,7 +5,7 @@
package io.airbyte.workers;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.commons.io.IOs;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.FailureReason;
import io.airbyte.config.StandardSyncInput;
@@ -20,6 +20,12 @@
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.FailureHelper;
import io.airbyte.workers.helper.FailureHelper.ConnectorCommand;
import io.airbyte.workers.internal.AirbyteStreamFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
@@ -111,6 +117,14 @@ public static WorkerDestinationConfig syncToWorkerDestinationConfig(final Standa
.withState(sync.getState());
}

private static ConnectorCommand getConnectorCommandFromOutputType(final OutputType outputType) {
return switch (outputType) {
case SPEC -> ConnectorCommand.SPEC;
case CHECK_CONNECTION -> ConnectorCommand.CHECK;
case DISCOVER_CATALOG_ID -> ConnectorCommand.DISCOVER;
};
}

public static Optional<AirbyteControlConnectorConfigMessage> getMostRecentConfigControlMessage(final Map<Type, List<AirbyteMessage>> messagesByType) {
return messagesByType.getOrDefault(Type.CONTROL, new ArrayList<>()).stream()
.map(AirbyteMessage::getControl)
@@ -119,28 +133,35 @@ public static Optional<AirbyteControlConnectorConfig
.reduce((first, second) -> second);
}

public static ConnectorJobOutput getJobFailureOutputOrThrow(final OutputType outputType,
final Map<Type, List<AirbyteMessage>> messagesByType,
final String defaultErrorMessage)
throws WorkerException {
final Optional<AirbyteTraceMessage> traceMessage =
messagesByType.getOrDefault(Type.TRACE, new ArrayList<>()).stream()
.map(AirbyteMessage::getTrace)
.filter(trace -> trace.getType() == AirbyteTraceMessage.Type.ERROR)
.findFirst();
private static Optional<AirbyteTraceMessage> getTraceMessageFromMessagesByType(final Map<Type, List<AirbyteMessage>> messagesByType) {
return messagesByType.getOrDefault(Type.TRACE, new ArrayList<>()).stream()
.map(AirbyteMessage::getTrace)
.filter(trace -> trace.getType() == AirbyteTraceMessage.Type.ERROR)
.findFirst();
}

public static Map<Type, List<AirbyteMessage>> getMessagesByType(final Process process, final AirbyteStreamFactory streamFactory, final int timeOut)
throws IOException {
final Map<Type, List<AirbyteMessage>> messagesByType;
try (final InputStream stdout = process.getInputStream()) {
messagesByType = streamFactory.create(IOs.newBufferedReader(stdout))
.collect(Collectors.groupingBy(AirbyteMessage::getType));

WorkerUtils.gentleClose(process, timeOut, TimeUnit.MINUTES);
return messagesByType;
}
}

public static Optional<FailureReason> getJobFailureReasonFromMessages(final OutputType outputType,
final Map<Type, List<AirbyteMessage>> messagesByType) {
final Optional<AirbyteTraceMessage> traceMessage = getTraceMessageFromMessagesByType(messagesByType);
if (traceMessage.isPresent()) {
final ConnectorCommand connectorCommand = switch (outputType) {
case SPEC -> ConnectorCommand.SPEC;
case CHECK_CONNECTION -> ConnectorCommand.CHECK;
case DISCOVER_CATALOG_ID -> ConnectorCommand.DISCOVER;
};

final FailureReason failureReason = FailureHelper.connectorCommandFailure(traceMessage.get(), null, null, connectorCommand);
return new ConnectorJobOutput().withOutputType(outputType).withFailureReason(failureReason);
final ConnectorCommand connectorCommand = getConnectorCommandFromOutputType(outputType);
return Optional.of(FailureHelper.connectorCommandFailure(traceMessage.get(), null, null, connectorCommand));
} else {
return Optional.empty();
}

throw new WorkerException(defaultErrorMessage);
}

public static Map<AirbyteStreamNameNamespacePair, JsonNode> mapStreamNamesToSchemas(final StandardSyncInput syncInput) {
@@ -151,4 +172,25 @@ public static Map<AirbyteStreamNameNamespacePair, JsonNode> mapStreamNamesToSche

}

public static String getStdErrFromErrorStream(final InputStream errorStream) throws IOException {
final BufferedReader reader = new BufferedReader(new InputStreamReader(errorStream, StandardCharsets.UTF_8));
final StringBuilder errorOutput = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
errorOutput.append(line);
errorOutput.append(System.lineSeparator());
}
return errorOutput.toString();
}

public static void throwWorkerException(final String errorMessage, final Process process)
throws WorkerException, IOException {
final String stderr = getStdErrFromErrorStream(process.getErrorStream());
if (stderr.isEmpty()) {
throw new WorkerException(errorMessage);
} else {
throw new WorkerException(errorMessage + ": \n" + stderr);
}
}

}
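
The new WorkerUtils helpers above factor out what each connector worker previously inlined: grouping the connector's stdout messages by type, turning an ERROR trace message into a FailureReason, and attaching stderr to the exception when the connector produced nothing useful. Below is a minimal sketch of how a caller composes them — illustrative only, not part of the commit, using only the method signatures visible in this diff:

// Illustrative sketch, not part of the commit: the pattern now shared by the
// check/discover/spec workers.
static ConnectorJobOutput collectCheckOutput(final Process process,
                                             final AirbyteStreamFactory streamFactory)
    throws IOException, WorkerException {
  // Read the connector's stdout, group messages by type, then close the process gently.
  final Map<Type, List<AirbyteMessage>> messagesByType =
      WorkerUtils.getMessagesByType(process, streamFactory, 30);

  final ConnectorJobOutput jobOutput =
      new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION);

  // An ERROR trace message, if the connector emitted one, becomes the job's FailureReason.
  final Optional<FailureReason> failureReason =
      WorkerUtils.getJobFailureReasonFromMessages(OutputType.CHECK_CONNECTION, messagesByType);
  failureReason.ifPresent(jobOutput::setFailureReason);

  // A real worker would also look for its result message (status, catalog, spec) here.
  // Neither a result nor a failure reason: fail loudly, attaching the connector's stderr.
  if (/* no result message found && */ failureReason.isEmpty()) {
    WorkerUtils.throwWorkerException("Connector produced no output and no failure reason", process);
  }
  return jobOutput;
}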
@@ -9,11 +9,11 @@

import datadog.trace.api.Trace;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.FailureReason;
import io.airbyte.config.StandardCheckConnectionInput;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
@@ -29,14 +29,11 @@
import io.airbyte.workers.internal.AirbyteStreamFactory;
import io.airbyte.workers.internal.DefaultAirbyteStreamFactory;
import io.airbyte.workers.process.IntegrationLauncher;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -73,18 +70,12 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa
WorkerConstants.SOURCE_CONFIG_JSON_FILENAME,
Jsons.serialize(input.getConnectionConfiguration()));

LineGobbler.gobble(process.getErrorStream(), LOGGER::error);

final Map<Type, List<AirbyteMessage>> messagesByType;
try (final InputStream stdout = process.getInputStream()) {
messagesByType = streamFactory.create(IOs.newBufferedReader(stdout))
.collect(Collectors.groupingBy(AirbyteMessage::getType));
final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION);

WorkerUtils.gentleClose(process, 1, TimeUnit.MINUTES);
}
LineGobbler.gobble(process.getErrorStream(), LOGGER::error);

final int exitCode = process.exitValue();
final Optional<AirbyteConnectionStatus> status = messagesByType
final Map<Type, List<AirbyteMessage>> messagesByType = WorkerUtils.getMessagesByType(process, streamFactory, 30);
final Optional<AirbyteConnectionStatus> connectionStatus = messagesByType
.getOrDefault(Type.CONNECTION_STATUS, new ArrayList<>()).stream()
.map(AirbyteMessage::getConnectionStatus)
.findFirst();
@@ -104,25 +95,30 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa
});
}

if (status.isPresent() && exitCode == 0) {
final Optional<FailureReason> failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.CHECK_CONNECTION, messagesByType);
failureReason.ifPresent(jobOutput::setFailureReason);

final int exitCode = process.exitValue();
if (exitCode != 0) {
LOGGER.warn("Check connection job subprocess finished with exit code {}", exitCode);
}

if (connectionStatus.isPresent()) {
final StandardCheckConnectionOutput output = new StandardCheckConnectionOutput()
.withStatus(Enums.convertTo(status.get().getStatus(), Status.class))
.withMessage(status.get().getMessage());

LOGGER.debug("Check connection job subprocess finished with exit code {}", exitCode);
LOGGER.debug("Check connection job received output: {}", output);
LineGobbler.endSection("CHECK");
return new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION).withCheckConnection(output);
} else {
final String message = String.format("Error checking connection, status: %s, exit code: %d", status, exitCode);
LOGGER.error(message);

return WorkerUtils.getJobFailureOutputOrThrow(OutputType.CHECK_CONNECTION, messagesByType, message);
.withStatus(Enums.convertTo(connectionStatus.get().getStatus(), Status.class))
.withMessage(connectionStatus.get().getMessage());
LOGGER.info("Check connection job received output: {}", output);
jobOutput.setCheckConnection(output);
} else if (failureReason.isEmpty()) {
WorkerUtils.throwWorkerException("Error checking connection status: no status nor failure reason were outputted", process);
}
LineGobbler.endSection("CHECK");
return jobOutput;

} catch (final Exception e) {
ApmTraceUtils.addExceptionToTrace(e);
LOGGER.error("Unexpected error while checking connection: ", e);
LineGobbler.endSection("CHECK");
throw new WorkerException("Unexpected error while getting checking connection.", e);
}
}
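
The behavioural change in the check worker: a connector that exits without emitting a CONNECTION_STATUS but does emit an ERROR trace message now yields a job output carrying that failure reason, instead of only a generic WorkerException. A hedged sketch of that path follows — the protocol-model builder calls (AirbyteTraceMessage, AirbyteErrorTraceMessage and their withX setters) are assumptions about the generated models, not code shown in this diff:

// Illustrative only: a connector emitted no CONNECTION_STATUS, just an ERROR trace message.
final AirbyteMessage traceError = new AirbyteMessage()
    .withType(Type.TRACE)
    .withTrace(new AirbyteTraceMessage()
        .withType(AirbyteTraceMessage.Type.ERROR)
        .withEmittedAt(1.0) // assumed field; the failure helper reads a timestamp from the trace
        .withError(new AirbyteErrorTraceMessage().withMessage("Invalid credentials")));

final Map<Type, List<AirbyteMessage>> messagesByType = Map.of(Type.TRACE, List.of(traceError));

// Before this commit the check worker threw a generic WorkerException in this situation;
// now the trace message is converted into a FailureReason on the CHECK_CONNECTION output.
final Optional<FailureReason> failureReason =
    WorkerUtils.getJobFailureReasonFromMessages(OutputType.CHECK_CONNECTION, messagesByType);
assert failureReason.isPresent();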
@@ -10,11 +10,11 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.FailureReason;
import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.metrics.lib.ApmTraceUtils;
@@ -29,16 +29,13 @@
import io.airbyte.workers.internal.AirbyteStreamFactory;
import io.airbyte.workers.internal.DefaultAirbyteStreamFactory;
import io.airbyte.workers.process.IntegrationLauncher;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -80,16 +77,10 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI
WorkerConstants.SOURCE_CONFIG_JSON_FILENAME,
Jsons.serialize(discoverSchemaInput.getConnectionConfiguration()));

final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG_ID);
LineGobbler.gobble(process.getErrorStream(), LOGGER::error);

final Map<Type, List<AirbyteMessage>> messagesByType;

try (final InputStream stdout = process.getInputStream()) {
messagesByType = streamFactory.create(IOs.newBufferedReader(stdout))
.collect(Collectors.groupingBy(AirbyteMessage::getType));

WorkerUtils.gentleClose(process, 30, TimeUnit.MINUTES);
}
final Map<Type, List<AirbyteMessage>> messagesByType = WorkerUtils.getMessagesByType(process, streamFactory, 30);

final Optional<AirbyteCatalog> catalog = messagesByType
.getOrDefault(Type.CATALOG, new ArrayList<>()).stream()
@@ -102,26 +93,27 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI
UUID.fromString(discoverSchemaInput.getSourceId()),
configMessage.getConfig()));

final Optional<FailureReason> failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.DISCOVER_CATALOG_ID, messagesByType);
failureReason.ifPresent(jobOutput::setFailureReason);

final int exitCode = process.exitValue();
if (exitCode == 0) {
if (catalog.isEmpty()) {
throw new WorkerException("Integration failed to output a catalog struct.");
}
if (exitCode != 0) {
LOGGER.warn("Discover job subprocess finished with exit codee {}", exitCode);
}

if (catalog.isPresent()) {
final UUID catalogId =
configRepository.writeActorCatalogFetchEvent(catalog.get(),
// NOTE: sourceId is marked required in the OpenAPI config but the code generator doesn't enforce
// it, so we check again here.
discoverSchemaInput.getSourceId() == null ? null : UUID.fromString(discoverSchemaInput.getSourceId()),
discoverSchemaInput.getConnectorVersion(),
discoverSchemaInput.getConfigHash());
return new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG_ID).withDiscoverCatalogId(catalogId);
} else {
return WorkerUtils.getJobFailureOutputOrThrow(
OutputType.DISCOVER_CATALOG_ID,
messagesByType,
String.format("Discover job subprocess finished with exit code %s", exitCode));
jobOutput.setDiscoverCatalogId(catalogId);
} else if (failureReason.isEmpty()) {
WorkerUtils.throwWorkerException("Integration failed to output a catalog struct and did not output a failure reason", process);
}
return jobOutput;
} catch (final WorkerException e) {
ApmTraceUtils.addExceptionToTrace(e);
throw e;
@@ -9,10 +9,10 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.FailureReason;
import io.airbyte.config.JobGetSpecConfig;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.protocol.models.AirbyteMessage;
@@ -23,14 +23,11 @@
import io.airbyte.workers.internal.AirbyteStreamFactory;
import io.airbyte.workers.internal.DefaultAirbyteStreamFactory;
import io.airbyte.workers.process.IntegrationLauncher;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -40,7 +37,6 @@ public class DefaultGetSpecWorker implements GetSpecWorker {

private final IntegrationLauncher integrationLauncher;
private final AirbyteStreamFactory streamFactory;

private Process process;

public DefaultGetSpecWorker(final IntegrationLauncher integrationLauncher,
@@ -60,38 +56,31 @@ public ConnectorJobOutput run(final JobGetSpecConfig config, final Path jobRoot)
try {
process = integrationLauncher.spec(jobRoot);

final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.SPEC);
LineGobbler.gobble(process.getErrorStream(), LOGGER::error);

final Map<Type, List<AirbyteMessage>> messagesByType;
try (final InputStream stdout = process.getInputStream()) {
messagesByType = streamFactory.create(IOs.newBufferedReader(stdout))
.collect(Collectors.groupingBy(AirbyteMessage::getType));

// todo (cgardens) - let's pre-fetch the images outside of the worker so we don't need account for
// this.
// retrieving spec should generally be instantaneous, but since docker images might not be pulled
// it could take a while longer depending on internet conditions as well.
WorkerUtils.gentleClose(process, 30, TimeUnit.MINUTES);
}
final Map<Type, List<AirbyteMessage>> messagesByType = WorkerUtils.getMessagesByType(process, streamFactory, 30);

final Optional<ConnectorSpecification> spec = messagesByType
.getOrDefault(Type.SPEC, new ArrayList<>()).stream()
.map(AirbyteMessage::getSpec)
.findFirst();

final Optional<FailureReason> failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.SPEC, messagesByType);
failureReason.ifPresent(jobOutput::setFailureReason);

final int exitCode = process.exitValue();
if (exitCode == 0) {
if (spec.isEmpty()) {
throw new WorkerException("integration failed to output a spec struct.");
}

return new ConnectorJobOutput().withOutputType(OutputType.SPEC).withSpec(spec.get());
} else {
return WorkerUtils.getJobFailureOutputOrThrow(
OutputType.SPEC,
messagesByType,
String.format("Spec job subprocess finished with exit code %s", exitCode));
if (exitCode != 0) {
LOGGER.warn("Spec job subprocess finished with exit code {}", exitCode);
}

if (spec.isPresent()) {
jobOutput.setSpec(spec.get());
} else if (failureReason.isEmpty()) {
WorkerUtils.throwWorkerException("Integration failed to output a spec struct and did not output a failure reason", process);
}

return jobOutput;
} catch (final Exception e) {
throw new WorkerException(String.format("Error while getting spec from image %s", config.getDockerImage()), e);
}
@@ -436,7 +436,7 @@ private CheckConnectionRead reportConnectionStatus(final SynchronousResponse<Sta
final CheckConnectionRead checkConnectionRead = new CheckConnectionRead()
.jobInfo(jobConverter.getSynchronousJobRead(response));

if (response.isSuccess()) {
if (response.getOutput() != null) {
checkConnectionRead
.status(Enums.convertTo(response.getOutput().getStatus(), StatusEnum.class))
.message(response.getOutput().getMessage());
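
The one-line handler change above is the user-facing payoff. response.isSuccess() is false when the check job fails, so the connector's own status and message were previously omitted from the API response; now that failed checks can carry an output populated from the trace message, gating on response.getOutput() != null lets that message reach the caller. The guard restated with explanatory comments — an illustrative restatement, not additional code in the commit:

// Before: if (response.isSuccess()) — a failed check dropped the connector's status/message.
// After: any available output is reported, so the connector's failure message
// (now derived from its ERROR trace message, see the worker changes above) is surfaced.
if (response.getOutput() != null) {
  checkConnectionRead
      .status(Enums.convertTo(response.getOutput().getStatus(), StatusEnum.class))
      .message(response.getOutput().getMessage());
}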