Skip to content

Commit

Permalink
Build failure reasons for synchronous jobs (check/spec/discover) (#14715
Browse files Browse the repository at this point in the history
)

* demo for surfacing synchronous job failures

* add missing changes for StandardDiscoverCatalogOutput impl

* extract trace message failure reason for discover job

* move to using a single pojo to represent synchronous job outputs

* format

* handle new output type in check before sync

* re-genericize DefaultSynchronousSchedulerClient.execute

* fix failing tests

* fix failing scheduler client tests

* get spec returns failure reason from trace message

* build failure reason from trace message for check job

* type safety

* only consider error-type trace messages

* add more tests

* just use nulls

* this was removed but incorrectly re-added when merging master into the branch

* check output version for workflow replay support

* refactor trace message finding to util method

* additionalProperties: true

* add versioning for CheckConnectionWorkflow

* update comment
  • Loading branch information
pedroslopez authored Jul 19, 2022
1 parent 36c659d commit 198e580
Show file tree
Hide file tree
Showing 35 changed files with 826 additions and 161 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/ConnectorJobOutput.yaml
title: ConnectorJobOutput
description: connector command job output
type: object
additionalProperties: true
required:
- outputType
properties:
outputType:
type: string
enum:
- checkConnection
- discoverCatalog
- spec
checkConnection:
"$ref": StandardCheckConnectionOutput.yaml
discoverCatalog:
existingJavaType: io.airbyte.protocol.models.AirbyteCatalog
spec:
existingJavaType: io.airbyte.protocol.models.ConnectorSpecification
failureReason:
"$ref": FailureReason.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.JobCheckConnectionConfig;
import io.airbyte.config.JobConfig.ConfigType;
Expand All @@ -22,6 +23,7 @@
import io.airbyte.workers.temporal.TemporalResponse;
import java.io.IOException;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -55,6 +57,7 @@ public SynchronousResponse<StandardCheckConnectionOutput> createSourceCheckConne
ConfigType.CHECK_CONNECTION_SOURCE,
source.getSourceDefinitionId(),
jobId -> temporalClient.submitCheckConnection(UUID.randomUUID(), 0, jobCheckConnectionConfig),
ConnectorJobOutput::getCheckConnection,
source.getWorkspaceId());
}

Expand All @@ -74,11 +77,13 @@ public SynchronousResponse<StandardCheckConnectionOutput> createDestinationCheck
ConfigType.CHECK_CONNECTION_DESTINATION,
destination.getDestinationDefinitionId(),
jobId -> temporalClient.submitCheckConnection(UUID.randomUUID(), 0, jobCheckConnectionConfig),
ConnectorJobOutput::getCheckConnection,
destination.getWorkspaceId());
}

@Override
public SynchronousResponse<AirbyteCatalog> createDiscoverSchemaJob(final SourceConnection source, final String dockerImage) throws IOException {
public SynchronousResponse<AirbyteCatalog> createDiscoverSchemaJob(final SourceConnection source, final String dockerImage)
throws IOException {
final JsonNode sourceConfiguration = oAuthConfigSupplier.injectSourceOAuthParameters(
source.getSourceDefinitionId(),
source.getWorkspaceId(),
Expand All @@ -91,6 +96,7 @@ public SynchronousResponse<AirbyteCatalog> createDiscoverSchemaJob(final SourceC
ConfigType.DISCOVER_SCHEMA,
source.getSourceDefinitionId(),
jobId -> temporalClient.submitDiscoverSchema(UUID.randomUUID(), 0, jobDiscoverCatalogConfig),
ConnectorJobOutput::getDiscoverCatalog,
source.getWorkspaceId());
}

Expand All @@ -102,25 +108,32 @@ public SynchronousResponse<ConnectorSpecification> createGetSpecJob(final String
ConfigType.GET_SPEC,
null,
jobId -> temporalClient.submitGetSpec(UUID.randomUUID(), 0, jobSpecConfig),
ConnectorJobOutput::getSpec,
null);
}

@VisibleForTesting
<T> SynchronousResponse<T> execute(final ConfigType configType,
@Nullable final UUID connectorDefinitionId,
final Function<UUID, TemporalResponse<T>> executor,
final UUID workspaceId) {
<T, U> SynchronousResponse<T> execute(final ConfigType configType,
@Nullable final UUID connectorDefinitionId,
final Function<UUID, TemporalResponse<U>> executor,
final Function<U, T> outputMapper,
final UUID workspaceId) {
final long createdAt = Instant.now().toEpochMilli();
final UUID jobId = UUID.randomUUID();
try {
track(jobId, configType, connectorDefinitionId, workspaceId, JobState.STARTED, null);
final TemporalResponse<T> operationOutput = executor.apply(jobId);
final JobState outputState = operationOutput.getMetadata().isSucceeded() ? JobState.SUCCEEDED : JobState.FAILED;
track(jobId, configType, connectorDefinitionId, workspaceId, outputState, operationOutput.getOutput().orElse(null));
final long endedAt = Instant.now().toEpochMilli();
final TemporalResponse<U> temporalResponse = executor.apply(jobId);
final Optional<U> jobOutput = temporalResponse.getOutput();
final T mappedOutput = jobOutput.map(outputMapper).orElse(null);
final JobState outputState = temporalResponse.getMetadata().isSucceeded() ? JobState.SUCCEEDED : JobState.FAILED;

track(jobId, configType, connectorDefinitionId, workspaceId, outputState, mappedOutput);
// TODO(pedro): report ConnectorJobOutput's failureReason to the JobErrorReporter, like the above

final long endedAt = Instant.now().toEpochMilli();
return SynchronousResponse.fromTemporalResponse(
operationOutput,
temporalResponse,
mappedOutput,
jobId,
configType,
connectorDefinitionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ public static <T> SynchronousResponse<T> success(final T output, final Synchrono
return new SynchronousResponse<>(output, metadata);
}

public static <T> SynchronousResponse<T> fromTemporalResponse(final TemporalResponse<T> temporalResponse,
final UUID id,
final ConfigType configType,
final UUID configId,
final long createdAt,
final long endedAt) {
public static <T, U> SynchronousResponse<T> fromTemporalResponse(final TemporalResponse<U> temporalResponse,
final T output,
final UUID id,
final ConfigType configType,
final UUID configId,
final long createdAt,
final long endedAt) {

final SynchronousJobMetadata metadata = SynchronousJobMetadata.fromJobMetadata(
temporalResponse.getMetadata(),
Expand All @@ -36,7 +37,7 @@ public static <T> SynchronousResponse<T> fromTemporalResponse(final TemporalResp
configId,
createdAt,
endedAt);
return new SynchronousResponse<>(temporalResponse.getOutput().orElse(null), metadata);
return new SynchronousResponse<>(output, metadata);
}

public SynchronousResponse(final T output, final SynchronousJobMetadata metadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.JobCheckConnectionConfig;
import io.airbyte.config.JobConfig.ConfigType;
Expand Down Expand Up @@ -97,10 +98,11 @@ class ExecuteSynchronousJob {
void testExecuteJobSuccess() {
final UUID sourceDefinitionId = UUID.randomUUID();
final Function<UUID, TemporalResponse<String>> function = mock(Function.class);
final Function<String, String> mapperFunction = output -> output;
when(function.apply(any(UUID.class))).thenReturn(new TemporalResponse<>("hello", createMetadata(true)));

final SynchronousResponse<String> response = schedulerClient
.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, WORKSPACE_ID);
.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID);

assertNotNull(response);
assertEquals("hello", response.getOutput());
Expand All @@ -114,15 +116,36 @@ void testExecuteJobSuccess() {
verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.SUCCEEDED));
}

@SuppressWarnings("unchecked")
@Test
void testExecuteMappedOutput() {
final UUID sourceDefinitionId = UUID.randomUUID();
final Function<UUID, TemporalResponse<Integer>> function = mock(Function.class);
final Function<Integer, String> mapperFunction = Object::toString;
when(function.apply(any(UUID.class))).thenReturn(new TemporalResponse<>(42, createMetadata(true)));

final SynchronousResponse<String> response = schedulerClient
.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID);

assertNotNull(response);
assertEquals("42", response.getOutput());
assertEquals(ConfigType.DISCOVER_SCHEMA, response.getMetadata().getConfigType());
assertTrue(response.getMetadata().getConfigId().isPresent());
assertEquals(sourceDefinitionId, response.getMetadata().getConfigId().get());
assertTrue(response.getMetadata().isSucceeded());
assertEquals(LOG_PATH, response.getMetadata().getLogPath());
}

@SuppressWarnings("unchecked")
@Test
void testExecuteJobFailure() {
final UUID sourceDefinitionId = UUID.randomUUID();
final Function<UUID, TemporalResponse<String>> function = mock(Function.class);
final Function<String, String> mapperFunction = output -> output;
when(function.apply(any(UUID.class))).thenReturn(new TemporalResponse<>(null, createMetadata(false)));

final SynchronousResponse<String> response = schedulerClient
.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, WORKSPACE_ID);
.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID);

assertNotNull(response);
assertNull(response.getOutput());
Expand All @@ -141,11 +164,12 @@ void testExecuteJobFailure() {
void testExecuteRuntimeException() {
final UUID sourceDefinitionId = UUID.randomUUID();
final Function<UUID, TemporalResponse<String>> function = mock(Function.class);
final Function<String, String> mapperFunction = output -> output;
when(function.apply(any(UUID.class))).thenThrow(new RuntimeException());

assertThrows(
RuntimeException.class,
() -> schedulerClient.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, WORKSPACE_ID));
() -> schedulerClient.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID));

verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.STARTED));
verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.FAILED));
Expand All @@ -164,8 +188,9 @@ void testCreateSourceCheckConnectionJob() throws IOException {
.withDockerImage(DOCKER_IMAGE);

final StandardCheckConnectionOutput mockOutput = mock(StandardCheckConnectionOutput.class);
final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withCheckConnection(mockOutput);
when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(jobCheckConnectionConfig)))
.thenReturn(new TemporalResponse<>(mockOutput, createMetadata(true)));
.thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true)));
final SynchronousResponse<StandardCheckConnectionOutput> response =
schedulerClient.createSourceCheckConnectionJob(SOURCE_CONNECTION, DOCKER_IMAGE);
assertEquals(mockOutput, response.getOutput());
Expand All @@ -178,8 +203,9 @@ void testCreateDestinationCheckConnectionJob() throws IOException {
.withDockerImage(DOCKER_IMAGE);

final StandardCheckConnectionOutput mockOutput = mock(StandardCheckConnectionOutput.class);
final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withCheckConnection(mockOutput);
when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(jobCheckConnectionConfig)))
.thenReturn(new TemporalResponse<>(mockOutput, createMetadata(true)));
.thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true)));
final SynchronousResponse<StandardCheckConnectionOutput> response =
schedulerClient.createDestinationCheckConnectionJob(DESTINATION_CONNECTION, DOCKER_IMAGE);
assertEquals(mockOutput, response.getOutput());
Expand All @@ -192,8 +218,9 @@ void testCreateDiscoverSchemaJob() throws IOException {
.withDockerImage(DOCKER_IMAGE);

final AirbyteCatalog mockOutput = mock(AirbyteCatalog.class);
final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withDiscoverCatalog(mockOutput);
when(temporalClient.submitDiscoverSchema(any(UUID.class), eq(0), eq(jobDiscoverCatalogConfig)))
.thenReturn(new TemporalResponse<>(mockOutput, createMetadata(true)));
.thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true)));
final SynchronousResponse<AirbyteCatalog> response = schedulerClient.createDiscoverSchemaJob(SOURCE_CONNECTION, DOCKER_IMAGE);
assertEquals(mockOutput, response.getOutput());
}
Expand All @@ -203,8 +230,9 @@ void testCreateGetSpecJob() throws IOException {
final JobGetSpecConfig jobSpecConfig = new JobGetSpecConfig().withDockerImage(DOCKER_IMAGE);

final ConnectorSpecification mockOutput = mock(ConnectorSpecification.class);
final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withSpec(mockOutput);
when(temporalClient.submitGetSpec(any(UUID.class), eq(0), eq(jobSpecConfig)))
.thenReturn(new TemporalResponse<>(mockOutput, createMetadata(true)));
.thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true)));
final SynchronousResponse<ConnectorSpecification> response = schedulerClient.createGetSpecJob(DOCKER_IMAGE);
assertEquals(mockOutput, response.getOutput());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source
final SynchronousResponse<AirbyteCatalog> response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName);
final SourceDiscoverSchemaRead returnValue = discoverJobToOutput(response);
if (response.isSuccess()) {
final UUID catalogId = configRepository.writeActorCatalogFetchEvent(response.getOutput(), source.getSourceId(), connectorVersion, configHash);
final UUID catalogId =
configRepository.writeActorCatalogFetchEvent(response.getOutput(), source.getSourceId(), connectorVersion, configHash);
returnValue.catalogId(catalogId);
}
return returnValue;
Expand Down
29 changes: 29 additions & 0 deletions airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,27 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.FailureReason;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.FailureHelper;
import java.nio.file.Path;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -100,6 +111,24 @@ public static WorkerDestinationConfig syncToWorkerDestinationConfig(final Standa
.withState(sync.getState());
}

public static ConnectorJobOutput getJobFailureOutputOrThrow(final OutputType outputType,
final Map<Type, List<AirbyteMessage>> messagesByType,
final String defaultErrorMessage)
throws WorkerException {
final Optional<AirbyteTraceMessage> traceMessage =
messagesByType.getOrDefault(Type.TRACE, new ArrayList<>()).stream()
.map(AirbyteMessage::getTrace)
.filter(trace -> trace.getType() == AirbyteTraceMessage.Type.ERROR)
.findFirst();

if (traceMessage.isPresent()) {
final FailureReason failureReason = FailureHelper.genericFailure(traceMessage.get(), null, null);
return new ConnectorJobOutput().withOutputType(outputType).withFailureReason(failureReason);
}

throw new WorkerException(defaultErrorMessage);
}

public static Map<String, JsonNode> mapStreamNamesToSchemas(final StandardSyncInput syncInput) {
return syncInput.getCatalog().getStreams().stream().collect(
Collectors.toMap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

package io.airbyte.workers.general;

import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.StandardCheckConnectionInput;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.workers.Worker;

public interface CheckConnectionWorker extends Worker<StandardCheckConnectionInput, StandardCheckConnectionOutput> {}
public interface CheckConnectionWorker extends Worker<StandardCheckConnectionInput, ConnectorJobOutput> {}
Loading

0 comments on commit 198e580

Please sign in to comment.