diff --git a/airbyte-config/config-models/src/main/resources/types/ConnectorJobOutput.yaml b/airbyte-config/config-models/src/main/resources/types/ConnectorJobOutput.yaml new file mode 100644 index 000000000000..7611cae277c0 --- /dev/null +++ b/airbyte-config/config-models/src/main/resources/types/ConnectorJobOutput.yaml @@ -0,0 +1,24 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/ConnectorJobOutput.yaml +title: ConnectorJobOutput +description: connector command job output +type: object +additionalProperties: true +required: + - outputType +properties: + outputType: + type: string + enum: + - checkConnection + - discoverCatalog + - spec + checkConnection: + "$ref": StandardCheckConnectionOutput.yaml + discoverCatalog: + existingJavaType: io.airbyte.protocol.models.AirbyteCatalog + spec: + existingJavaType: io.airbyte.protocol.models.ConnectorSpecification + failureReason: + "$ref": FailureReason.yaml diff --git a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/DefaultSynchronousSchedulerClient.java b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/DefaultSynchronousSchedulerClient.java index d110b9e507d3..2a8dc2929407 100644 --- a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/DefaultSynchronousSchedulerClient.java +++ b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/DefaultSynchronousSchedulerClient.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; +import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.DestinationConnection; import io.airbyte.config.JobCheckConnectionConfig; import io.airbyte.config.JobConfig.ConfigType; @@ -22,6 +23,7 @@ import io.airbyte.workers.temporal.TemporalResponse; import java.io.IOException; import java.time.Instant; +import java.util.Optional; import java.util.UUID; import java.util.function.Function; import javax.annotation.Nullable; @@ -55,6 +57,7 @@ public SynchronousResponse createSourceCheckConne ConfigType.CHECK_CONNECTION_SOURCE, source.getSourceDefinitionId(), jobId -> temporalClient.submitCheckConnection(UUID.randomUUID(), 0, jobCheckConnectionConfig), + ConnectorJobOutput::getCheckConnection, source.getWorkspaceId()); } @@ -74,11 +77,13 @@ public SynchronousResponse createDestinationCheck ConfigType.CHECK_CONNECTION_DESTINATION, destination.getDestinationDefinitionId(), jobId -> temporalClient.submitCheckConnection(UUID.randomUUID(), 0, jobCheckConnectionConfig), + ConnectorJobOutput::getCheckConnection, destination.getWorkspaceId()); } @Override - public SynchronousResponse createDiscoverSchemaJob(final SourceConnection source, final String dockerImage) throws IOException { + public SynchronousResponse createDiscoverSchemaJob(final SourceConnection source, final String dockerImage) + throws IOException { final JsonNode sourceConfiguration = oAuthConfigSupplier.injectSourceOAuthParameters( source.getSourceDefinitionId(), source.getWorkspaceId(), @@ -91,6 +96,7 @@ public SynchronousResponse createDiscoverSchemaJob(final SourceC ConfigType.DISCOVER_SCHEMA, source.getSourceDefinitionId(), jobId -> temporalClient.submitDiscoverSchema(UUID.randomUUID(), 0, jobDiscoverCatalogConfig), + ConnectorJobOutput::getDiscoverCatalog, source.getWorkspaceId()); } @@ -102,25 +108,32 @@ public SynchronousResponse createGetSpecJob(final String ConfigType.GET_SPEC, null, jobId -> temporalClient.submitGetSpec(UUID.randomUUID(), 0, jobSpecConfig), + ConnectorJobOutput::getSpec, null); } @VisibleForTesting - SynchronousResponse execute(final ConfigType configType, - @Nullable final UUID connectorDefinitionId, - final Function> executor, - final UUID workspaceId) { + SynchronousResponse execute(final ConfigType configType, + @Nullable final UUID connectorDefinitionId, + final Function> executor, + final Function outputMapper, + final UUID workspaceId) { final long createdAt = Instant.now().toEpochMilli(); final UUID jobId = UUID.randomUUID(); try { track(jobId, configType, connectorDefinitionId, workspaceId, JobState.STARTED, null); - final TemporalResponse operationOutput = executor.apply(jobId); - final JobState outputState = operationOutput.getMetadata().isSucceeded() ? JobState.SUCCEEDED : JobState.FAILED; - track(jobId, configType, connectorDefinitionId, workspaceId, outputState, operationOutput.getOutput().orElse(null)); - final long endedAt = Instant.now().toEpochMilli(); + final TemporalResponse temporalResponse = executor.apply(jobId); + final Optional jobOutput = temporalResponse.getOutput(); + final T mappedOutput = jobOutput.map(outputMapper).orElse(null); + final JobState outputState = temporalResponse.getMetadata().isSucceeded() ? JobState.SUCCEEDED : JobState.FAILED; + track(jobId, configType, connectorDefinitionId, workspaceId, outputState, mappedOutput); + // TODO(pedro): report ConnectorJobOutput's failureReason to the JobErrorReporter, like the above + + final long endedAt = Instant.now().toEpochMilli(); return SynchronousResponse.fromTemporalResponse( - operationOutput, + temporalResponse, + mappedOutput, jobId, configType, connectorDefinitionId, diff --git a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/SynchronousResponse.java b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/SynchronousResponse.java index 515ee8c08569..524eca3a28b1 100644 --- a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/SynchronousResponse.java +++ b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/SynchronousResponse.java @@ -22,12 +22,13 @@ public static SynchronousResponse success(final T output, final Synchrono return new SynchronousResponse<>(output, metadata); } - public static SynchronousResponse fromTemporalResponse(final TemporalResponse temporalResponse, - final UUID id, - final ConfigType configType, - final UUID configId, - final long createdAt, - final long endedAt) { + public static SynchronousResponse fromTemporalResponse(final TemporalResponse temporalResponse, + final T output, + final UUID id, + final ConfigType configType, + final UUID configId, + final long createdAt, + final long endedAt) { final SynchronousJobMetadata metadata = SynchronousJobMetadata.fromJobMetadata( temporalResponse.getMetadata(), @@ -36,7 +37,7 @@ public static SynchronousResponse fromTemporalResponse(final TemporalResp configId, createdAt, endedAt); - return new SynchronousResponse<>(temporalResponse.getOutput().orElse(null), metadata); + return new SynchronousResponse<>(output, metadata); } public SynchronousResponse(final T output, final SynchronousJobMetadata metadata) { diff --git a/airbyte-scheduler/client/src/test/java/io/airbyte/scheduler/client/DefaultSynchronousSchedulerClientTest.java b/airbyte-scheduler/client/src/test/java/io/airbyte/scheduler/client/DefaultSynchronousSchedulerClientTest.java index f6b24f5c9db0..55687e5cfcc0 100644 --- a/airbyte-scheduler/client/src/test/java/io/airbyte/scheduler/client/DefaultSynchronousSchedulerClientTest.java +++ b/airbyte-scheduler/client/src/test/java/io/airbyte/scheduler/client/DefaultSynchronousSchedulerClientTest.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.DestinationConnection; import io.airbyte.config.JobCheckConnectionConfig; import io.airbyte.config.JobConfig.ConfigType; @@ -97,10 +98,11 @@ class ExecuteSynchronousJob { void testExecuteJobSuccess() { final UUID sourceDefinitionId = UUID.randomUUID(); final Function> function = mock(Function.class); + final Function mapperFunction = output -> output; when(function.apply(any(UUID.class))).thenReturn(new TemporalResponse<>("hello", createMetadata(true))); final SynchronousResponse response = schedulerClient - .execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, WORKSPACE_ID); + .execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID); assertNotNull(response); assertEquals("hello", response.getOutput()); @@ -114,15 +116,36 @@ void testExecuteJobSuccess() { verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.SUCCEEDED)); } + @SuppressWarnings("unchecked") + @Test + void testExecuteMappedOutput() { + final UUID sourceDefinitionId = UUID.randomUUID(); + final Function> function = mock(Function.class); + final Function mapperFunction = Object::toString; + when(function.apply(any(UUID.class))).thenReturn(new TemporalResponse<>(42, createMetadata(true))); + + final SynchronousResponse response = schedulerClient + .execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID); + + assertNotNull(response); + assertEquals("42", response.getOutput()); + assertEquals(ConfigType.DISCOVER_SCHEMA, response.getMetadata().getConfigType()); + assertTrue(response.getMetadata().getConfigId().isPresent()); + assertEquals(sourceDefinitionId, response.getMetadata().getConfigId().get()); + assertTrue(response.getMetadata().isSucceeded()); + assertEquals(LOG_PATH, response.getMetadata().getLogPath()); + } + @SuppressWarnings("unchecked") @Test void testExecuteJobFailure() { final UUID sourceDefinitionId = UUID.randomUUID(); final Function> function = mock(Function.class); + final Function mapperFunction = output -> output; when(function.apply(any(UUID.class))).thenReturn(new TemporalResponse<>(null, createMetadata(false))); final SynchronousResponse response = schedulerClient - .execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, WORKSPACE_ID); + .execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID); assertNotNull(response); assertNull(response.getOutput()); @@ -141,11 +164,12 @@ void testExecuteJobFailure() { void testExecuteRuntimeException() { final UUID sourceDefinitionId = UUID.randomUUID(); final Function> function = mock(Function.class); + final Function mapperFunction = output -> output; when(function.apply(any(UUID.class))).thenThrow(new RuntimeException()); assertThrows( RuntimeException.class, - () -> schedulerClient.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, WORKSPACE_ID)); + () -> schedulerClient.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID)); verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.STARTED)); verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.FAILED)); @@ -164,8 +188,9 @@ void testCreateSourceCheckConnectionJob() throws IOException { .withDockerImage(DOCKER_IMAGE); final StandardCheckConnectionOutput mockOutput = mock(StandardCheckConnectionOutput.class); + final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withCheckConnection(mockOutput); when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(jobCheckConnectionConfig))) - .thenReturn(new TemporalResponse<>(mockOutput, createMetadata(true))); + .thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true))); final SynchronousResponse response = schedulerClient.createSourceCheckConnectionJob(SOURCE_CONNECTION, DOCKER_IMAGE); assertEquals(mockOutput, response.getOutput()); @@ -178,8 +203,9 @@ void testCreateDestinationCheckConnectionJob() throws IOException { .withDockerImage(DOCKER_IMAGE); final StandardCheckConnectionOutput mockOutput = mock(StandardCheckConnectionOutput.class); + final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withCheckConnection(mockOutput); when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(jobCheckConnectionConfig))) - .thenReturn(new TemporalResponse<>(mockOutput, createMetadata(true))); + .thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true))); final SynchronousResponse response = schedulerClient.createDestinationCheckConnectionJob(DESTINATION_CONNECTION, DOCKER_IMAGE); assertEquals(mockOutput, response.getOutput()); @@ -192,8 +218,9 @@ void testCreateDiscoverSchemaJob() throws IOException { .withDockerImage(DOCKER_IMAGE); final AirbyteCatalog mockOutput = mock(AirbyteCatalog.class); + final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withDiscoverCatalog(mockOutput); when(temporalClient.submitDiscoverSchema(any(UUID.class), eq(0), eq(jobDiscoverCatalogConfig))) - .thenReturn(new TemporalResponse<>(mockOutput, createMetadata(true))); + .thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true))); final SynchronousResponse response = schedulerClient.createDiscoverSchemaJob(SOURCE_CONNECTION, DOCKER_IMAGE); assertEquals(mockOutput, response.getOutput()); } @@ -203,8 +230,9 @@ void testCreateGetSpecJob() throws IOException { final JobGetSpecConfig jobSpecConfig = new JobGetSpecConfig().withDockerImage(DOCKER_IMAGE); final ConnectorSpecification mockOutput = mock(ConnectorSpecification.class); + final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withSpec(mockOutput); when(temporalClient.submitGetSpec(any(UUID.class), eq(0), eq(jobSpecConfig))) - .thenReturn(new TemporalResponse<>(mockOutput, createMetadata(true))); + .thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true))); final SynchronousResponse response = schedulerClient.createGetSpecJob(DOCKER_IMAGE); assertEquals(mockOutput, response.getOutput()); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index cbdb54bd451d..c3c06fb21463 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -221,7 +221,8 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source final SynchronousResponse response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName); final SourceDiscoverSchemaRead returnValue = discoverJobToOutput(response); if (response.isSuccess()) { - final UUID catalogId = configRepository.writeActorCatalogFetchEvent(response.getOutput(), source.getSourceId(), connectorVersion, configHash); + final UUID catalogId = + configRepository.writeActorCatalogFetchEvent(response.getOutput(), source.getSourceId(), connectorVersion, configHash); returnValue.catalogId(catalogId); } return returnValue; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java index e6b58e580696..6948c70e8fb3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java @@ -6,16 +6,27 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.config.Configs.WorkerEnvironment; +import io.airbyte.config.ConnectorJobOutput; +import io.airbyte.config.ConnectorJobOutput.OutputType; +import io.airbyte.config.FailureReason; import io.airbyte.config.StandardSyncInput; import io.airbyte.config.WorkerDestinationConfig; import io.airbyte.config.WorkerSourceConfig; import io.airbyte.config.helpers.LogClientSingleton; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.AirbyteTraceMessage; import io.airbyte.scheduler.models.JobRunConfig; +import io.airbyte.workers.exception.WorkerException; +import io.airbyte.workers.helper.FailureHelper; import java.nio.file.Path; import java.time.Duration; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -100,6 +111,24 @@ public static WorkerDestinationConfig syncToWorkerDestinationConfig(final Standa .withState(sync.getState()); } + public static ConnectorJobOutput getJobFailureOutputOrThrow(final OutputType outputType, + final Map> messagesByType, + final String defaultErrorMessage) + throws WorkerException { + final Optional traceMessage = + messagesByType.getOrDefault(Type.TRACE, new ArrayList<>()).stream() + .map(AirbyteMessage::getTrace) + .filter(trace -> trace.getType() == AirbyteTraceMessage.Type.ERROR) + .findFirst(); + + if (traceMessage.isPresent()) { + final FailureReason failureReason = FailureHelper.genericFailure(traceMessage.get(), null, null); + return new ConnectorJobOutput().withOutputType(outputType).withFailureReason(failureReason); + } + + throw new WorkerException(defaultErrorMessage); + } + public static Map mapStreamNamesToSchemas(final StandardSyncInput syncInput) { return syncInput.getCatalog().getStreams().stream().collect( Collectors.toMap( diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/general/CheckConnectionWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/general/CheckConnectionWorker.java index ae93ad544b4c..559838b5690d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/general/CheckConnectionWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/general/CheckConnectionWorker.java @@ -4,8 +4,8 @@ package io.airbyte.workers.general; +import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.StandardCheckConnectionInput; -import io.airbyte.config.StandardCheckConnectionOutput; import io.airbyte.workers.Worker; -public interface CheckConnectionWorker extends Worker {} +public interface CheckConnectionWorker extends Worker {} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java index 42c28c3348bd..08bc8e8513e9 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java @@ -8,6 +8,8 @@ import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ConnectorJobOutput; +import io.airbyte.config.ConnectorJobOutput.OutputType; import io.airbyte.config.StandardCheckConnectionInput; import io.airbyte.config.StandardCheckConnectionOutput; import io.airbyte.config.StandardCheckConnectionOutput.Status; @@ -23,8 +25,12 @@ import io.airbyte.workers.process.IntegrationLauncher; import java.io.InputStream; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +57,7 @@ public DefaultCheckConnectionWorker(final WorkerConfigs workerConfigs, final Int } @Override - public StandardCheckConnectionOutput run(final StandardCheckConnectionInput input, final Path jobRoot) throws WorkerException { + public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Path jobRoot) throws WorkerException { try { process = integrationLauncher.check( @@ -61,16 +67,19 @@ public StandardCheckConnectionOutput run(final StandardCheckConnectionInput inpu LineGobbler.gobble(process.getErrorStream(), LOGGER::error); - final Optional status; + final Map> messagesByType; try (final InputStream stdout = process.getInputStream()) { - status = streamFactory.create(IOs.newBufferedReader(stdout)) - .filter(message -> message.getType() == Type.CONNECTION_STATUS) - .map(AirbyteMessage::getConnectionStatus).findFirst(); + messagesByType = streamFactory.create(IOs.newBufferedReader(stdout)) + .collect(Collectors.groupingBy(AirbyteMessage::getType)); WorkerUtils.gentleClose(workerConfigs, process, 1, TimeUnit.MINUTES); } final int exitCode = process.exitValue(); + final Optional status = messagesByType + .getOrDefault(Type.CONNECTION_STATUS, new ArrayList<>()).stream() + .map(AirbyteMessage::getConnectionStatus) + .findFirst(); if (status.isPresent() && exitCode == 0) { final StandardCheckConnectionOutput output = new StandardCheckConnectionOutput() @@ -79,14 +88,12 @@ public StandardCheckConnectionOutput run(final StandardCheckConnectionInput inpu LOGGER.debug("Check connection job subprocess finished with exit code {}", exitCode); LOGGER.debug("Check connection job received output: {}", output); - return output; + return new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION).withCheckConnection(output); } else { final String message = String.format("Error checking connection, status: %s, exit code: %d", status, exitCode); - LOGGER.error(message); - return new StandardCheckConnectionOutput() - .withStatus(Status.FAILED) - .withMessage(message); + + return WorkerUtils.getJobFailureOutputOrThrow(OutputType.CHECK_CONNECTION, messagesByType, message); } } catch (final Exception e) { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java index 1bc624d94daa..474d206137f3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java @@ -7,6 +7,8 @@ import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ConnectorJobOutput; +import io.airbyte.config.ConnectorJobOutput.OutputType; import io.airbyte.config.StandardDiscoverCatalogInput; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; @@ -18,8 +20,12 @@ import io.airbyte.workers.process.IntegrationLauncher; import java.io.InputStream; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +53,7 @@ public DefaultDiscoverCatalogWorker(final WorkerConfigs workerConfigs, final Int } @Override - public AirbyteCatalog run(final StandardDiscoverCatalogInput discoverSchemaInput, final Path jobRoot) throws WorkerException { + public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaInput, final Path jobRoot) throws WorkerException { try { process = integrationLauncher.discover( jobRoot, @@ -56,16 +62,20 @@ public AirbyteCatalog run(final StandardDiscoverCatalogInput discoverSchemaInput LineGobbler.gobble(process.getErrorStream(), LOGGER::error); - final Optional catalog; + final Map> messagesByType; + try (final InputStream stdout = process.getInputStream()) { - catalog = streamFactory.create(IOs.newBufferedReader(stdout)) - .filter(message -> message.getType() == Type.CATALOG) - .map(AirbyteMessage::getCatalog) - .findFirst(); + messagesByType = streamFactory.create(IOs.newBufferedReader(stdout)) + .collect(Collectors.groupingBy(AirbyteMessage::getType)); WorkerUtils.gentleClose(workerConfigs, process, 30, TimeUnit.MINUTES); } + final Optional catalog = messagesByType + .getOrDefault(Type.CATALOG, new ArrayList<>()).stream() + .map(AirbyteMessage::getCatalog) + .findFirst(); + final int exitCode = process.exitValue(); if (exitCode == 0) { if (catalog.isEmpty()) { @@ -78,9 +88,12 @@ public AirbyteCatalog run(final StandardDiscoverCatalogInput discoverSchemaInput throw new WorkerException("Output a catalog struct bigger than 4mb. Larger than grpc max message limit."); } - return catalog.get(); + return new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG).withDiscoverCatalog(catalog.get()); } else { - throw new WorkerException(String.format("Discover job subprocess finished with exit code %s", exitCode)); + return WorkerUtils.getJobFailureOutputOrThrow( + OutputType.DISCOVER_CATALOG, + messagesByType, + String.format("Discover job subprocess finished with exit code %s", exitCode)); } } catch (final WorkerException e) { throw e; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java index 5021ac3ac4e8..711ce2ca18b7 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java @@ -6,6 +6,8 @@ import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; +import io.airbyte.config.ConnectorJobOutput; +import io.airbyte.config.ConnectorJobOutput.OutputType; import io.airbyte.config.JobGetSpecConfig; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; @@ -18,8 +20,12 @@ import io.airbyte.workers.process.IntegrationLauncher; import java.io.InputStream; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,18 +52,16 @@ public DefaultGetSpecWorker(final WorkerConfigs workerConfigs, final Integration } @Override - public ConnectorSpecification run(final JobGetSpecConfig config, final Path jobRoot) throws WorkerException { + public ConnectorJobOutput run(final JobGetSpecConfig config, final Path jobRoot) throws WorkerException { try { process = integrationLauncher.spec(jobRoot); LineGobbler.gobble(process.getErrorStream(), LOGGER::error); - final Optional spec; + final Map> messagesByType; try (final InputStream stdout = process.getInputStream()) { - spec = streamFactory.create(IOs.newBufferedReader(stdout)) - .filter(message -> message.getType() == Type.SPEC) - .map(AirbyteMessage::getSpec) - .findFirst(); + messagesByType = streamFactory.create(IOs.newBufferedReader(stdout)) + .collect(Collectors.groupingBy(AirbyteMessage::getType)); // todo (cgardens) - let's pre-fetch the images outside of the worker so we don't need account for // this. @@ -66,16 +70,23 @@ public ConnectorSpecification run(final JobGetSpecConfig config, final Path jobR WorkerUtils.gentleClose(workerConfigs, process, 30, TimeUnit.MINUTES); } + final Optional spec = messagesByType + .getOrDefault(Type.SPEC, new ArrayList<>()).stream() + .map(AirbyteMessage::getSpec) + .findFirst();; + final int exitCode = process.exitValue(); if (exitCode == 0) { if (spec.isEmpty()) { throw new WorkerException("integration failed to output a spec struct."); } - return spec.get(); - + return new ConnectorJobOutput().withOutputType(OutputType.SPEC).withSpec(spec.get()); } else { - throw new WorkerException(String.format("Spec job subprocess finished with exit code %s", exitCode)); + return WorkerUtils.getJobFailureOutputOrThrow( + OutputType.SPEC, + messagesByType, + String.format("Spec job subprocess finished with exit code %s", exitCode)); } } catch (final Exception e) { throw new WorkerException(String.format("Error while getting spec from image %s", config.getDockerImage()), e); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/general/DiscoverCatalogWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/general/DiscoverCatalogWorker.java index d5b5f9167c03..c2f7eccc3cdf 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/general/DiscoverCatalogWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/general/DiscoverCatalogWorker.java @@ -4,8 +4,8 @@ package io.airbyte.workers.general; +import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.StandardDiscoverCatalogInput; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.workers.Worker; -public interface DiscoverCatalogWorker extends Worker {} +public interface DiscoverCatalogWorker extends Worker {} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/general/GetSpecWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/general/GetSpecWorker.java index c184dcec9ffd..58dae6388419 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/general/GetSpecWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/general/GetSpecWorker.java @@ -4,8 +4,8 @@ package io.airbyte.workers.general; +import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.JobGetSpecConfig; -import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.workers.Worker; -public interface GetSpecWorker extends Worker {} +public interface GetSpecWorker extends Worker {} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/helper/FailureHelper.java b/airbyte-workers/src/main/java/io/airbyte/workers/helper/FailureHelper.java index b3560401bd19..d65f4c54ed47 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/helper/FailureHelper.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/helper/FailureHelper.java @@ -56,6 +56,7 @@ public static FailureReason genericFailure(final AirbyteTraceMessage m, final Lo } return new FailureReason() .withInternalMessage(m.getError().getInternalMessage()) + .withExternalMessage(m.getError().getMessage()) .withStacktrace(m.getError().getStackTrace()) .withTimestamp(m.getEmittedAt().longValue()) .withFailureType(failureType) @@ -70,8 +71,7 @@ public static FailureReason sourceFailure(final Throwable t, final Long jobId, f public static FailureReason sourceFailure(final AirbyteTraceMessage m, final Long jobId, final Integer attemptNumber) { return genericFailure(m, jobId, attemptNumber) - .withFailureOrigin(FailureOrigin.SOURCE) - .withExternalMessage(m.getError().getMessage()); + .withFailureOrigin(FailureOrigin.SOURCE); } public static FailureReason destinationFailure(final Throwable t, final Long jobId, final Integer attemptNumber) { @@ -82,11 +82,13 @@ public static FailureReason destinationFailure(final Throwable t, final Long job public static FailureReason destinationFailure(final AirbyteTraceMessage m, final Long jobId, final Integer attemptNumber) { return genericFailure(m, jobId, attemptNumber) - .withFailureOrigin(FailureOrigin.DESTINATION) - .withExternalMessage(m.getError().getMessage()); + .withFailureOrigin(FailureOrigin.DESTINATION); } - public static FailureReason checkFailure(final Throwable t, final Long jobId, final Integer attemptNumber, FailureReason.FailureOrigin origin) { + public static FailureReason checkFailure(final Throwable t, + final Long jobId, + final Integer attemptNumber, + final FailureReason.FailureOrigin origin) { return genericFailure(t, jobId, attemptNumber) .withFailureOrigin(origin) .withFailureType(FailureReason.FailureType.CONFIG_ERROR) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index 6332eb639fba..2689c281e09d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -8,18 +8,16 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; +import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.JobCheckConnectionConfig; import io.airbyte.config.JobDiscoverCatalogConfig; import io.airbyte.config.JobGetSpecConfig; import io.airbyte.config.JobSyncConfig; import io.airbyte.config.StandardCheckConnectionInput; -import io.airbyte.config.StandardCheckConnectionOutput; import io.airbyte.config.StandardDiscoverCatalogInput; import io.airbyte.config.StandardSyncInput; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.persistence.StreamResetPersistence; -import io.airbyte.protocol.models.AirbyteCatalog; -import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.protocol.models.StreamDescriptor; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; @@ -89,7 +87,7 @@ public void dangerouslyTerminateWorkflow(final String workflowId, final String r this.client.newUntypedWorkflowStub(workflowId).terminate(reason); } - public TemporalResponse submitGetSpec(final UUID jobId, final int attempt, final JobGetSpecConfig config) { + public TemporalResponse submitGetSpec(final UUID jobId, final int attempt, final JobGetSpecConfig config) { final JobRunConfig jobRunConfig = TemporalUtils.createJobRunConfig(jobId, attempt); final IntegrationLauncherConfig launcherConfig = new IntegrationLauncherConfig() @@ -101,9 +99,9 @@ public TemporalResponse submitGetSpec(final UUID jobId, } - public TemporalResponse submitCheckConnection(final UUID jobId, - final int attempt, - final JobCheckConnectionConfig config) { + public TemporalResponse submitCheckConnection(final UUID jobId, + final int attempt, + final JobCheckConnectionConfig config) { final JobRunConfig jobRunConfig = TemporalUtils.createJobRunConfig(jobId, attempt); final IntegrationLauncherConfig launcherConfig = new IntegrationLauncherConfig() .withJobId(jobId.toString()) @@ -115,7 +113,9 @@ public TemporalResponse submitCheckConnection(fin () -> getWorkflowStub(CheckConnectionWorkflow.class, TemporalJobType.CHECK_CONNECTION).run(jobRunConfig, launcherConfig, input)); } - public TemporalResponse submitDiscoverSchema(final UUID jobId, final int attempt, final JobDiscoverCatalogConfig config) { + public TemporalResponse submitDiscoverSchema(final UUID jobId, + final int attempt, + final JobDiscoverCatalogConfig config) { final JobRunConfig jobRunConfig = TemporalUtils.createJobRunConfig(jobId, attempt); final IntegrationLauncherConfig launcherConfig = new IntegrationLauncherConfig() .withJobId(jobId.toString()) @@ -450,6 +450,10 @@ private T getWorkflowStub(final Class workflowClass, final TemporalJobTyp return client.newWorkflowStub(workflowClass, TemporalUtils.getWorkflowOptions(jobType)); } + private boolean getConnectorJobSucceeded(final ConnectorJobOutput output) { + return output.getFailureReason() == null; + } + @VisibleForTesting TemporalResponse execute(final JobRunConfig jobRunConfig, final Supplier executor) { final Path jobRoot = WorkerUtils.getJobRoot(workspaceRoot, jobRunConfig); @@ -464,7 +468,12 @@ TemporalResponse execute(final JobRunConfig jobRunConfig, final Supplier< exception = e; } - final JobMetadata metadata = new JobMetadata(exception == null, logPath); + boolean succeeded = exception == null; + if (succeeded && operationOutput instanceof ConnectorJobOutput) { + succeeded = getConnectorJobSucceeded((ConnectorJobOutput) operationOutput); + } + + final JobMetadata metadata = new JobMetadata(succeeded, logPath); return new TemporalResponse<>(operationOutput, metadata); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivity.java index ac0569f23259..c42a71deae60 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivity.java @@ -4,6 +4,7 @@ package io.airbyte.workers.temporal.check.connection; +import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.StandardCheckConnectionInput; import io.airbyte.config.StandardCheckConnectionOutput; import io.airbyte.scheduler.models.IntegrationLauncherConfig; @@ -28,6 +29,9 @@ class CheckConnectionInput { } + @ActivityMethod + ConnectorJobOutput runWithJobOutput(CheckConnectionInput input); + @ActivityMethod StandardCheckConnectionOutput run(CheckConnectionInput input); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java index 74d3e3336216..f37011753113 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java @@ -7,8 +7,10 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.config.Configs.WorkerEnvironment; +import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.StandardCheckConnectionInput; import io.airbyte.config.StandardCheckConnectionOutput; +import io.airbyte.config.StandardCheckConnectionOutput.Status; import io.airbyte.config.helpers.LogConfigs; import io.airbyte.config.persistence.split_secrets.SecretsHydrator; import io.airbyte.scheduler.models.IntegrationLauncherConfig; @@ -54,7 +56,7 @@ public CheckConnectionActivityImpl(final WorkerConfigs workerConfigs, this.airbyteVersion = airbyteVersion; } - public StandardCheckConnectionOutput run(final CheckConnectionInput args) { + public ConnectorJobOutput runWithJobOutput(final CheckConnectionInput args) { final JsonNode fullConfig = secretsHydrator.hydrate(args.getConnectionConfiguration().getConnectionConfiguration()); final StandardCheckConnectionInput input = new StandardCheckConnectionInput() @@ -62,7 +64,7 @@ public StandardCheckConnectionOutput run(final CheckConnectionInput args) { final ActivityExecutionContext context = Activity.getExecutionContext(); - final TemporalAttemptExecution temporalAttemptExecution = + final TemporalAttemptExecution temporalAttemptExecution = new TemporalAttemptExecution<>( workspaceRoot, workerEnvironment, logConfigs, args.getJobRunConfig(), @@ -76,8 +78,17 @@ public StandardCheckConnectionOutput run(final CheckConnectionInput args) { return temporalAttemptExecution.get(); } - private CheckedSupplier, Exception> getWorkerFactory( - final IntegrationLauncherConfig launcherConfig) { + public StandardCheckConnectionOutput run(final CheckConnectionInput args) { + final ConnectorJobOutput output = runWithJobOutput(args); + if (output.getFailureReason() != null) { + return new StandardCheckConnectionOutput().withStatus(Status.FAILED).withMessage("Error checking connection"); + } + + return output.getCheckConnection(); + } + + private CheckedSupplier, Exception> getWorkerFactory( + final IntegrationLauncherConfig launcherConfig) { return () -> { final IntegrationLauncher integrationLauncher = new AirbyteIntegrationLauncher( launcherConfig.getJobId(), diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflow.java index 9d9290582022..009a361d1022 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflow.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflow.java @@ -4,8 +4,8 @@ package io.airbyte.workers.temporal.check.connection; +import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.StandardCheckConnectionInput; -import io.airbyte.config.StandardCheckConnectionOutput; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.temporal.workflow.WorkflowInterface; @@ -15,8 +15,8 @@ public interface CheckConnectionWorkflow { @WorkflowMethod - StandardCheckConnectionOutput run(JobRunConfig jobRunConfig, - IntegrationLauncherConfig launcherConfig, - StandardCheckConnectionInput connectionConfiguration); + ConnectorJobOutput run(JobRunConfig jobRunConfig, + IntegrationLauncherConfig launcherConfig, + StandardCheckConnectionInput connectionConfiguration); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java index 015729d45083..4728ba07f9d3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java @@ -4,6 +4,8 @@ package io.airbyte.workers.temporal.check.connection; +import io.airbyte.config.ConnectorJobOutput; +import io.airbyte.config.ConnectorJobOutput.OutputType; import io.airbyte.config.StandardCheckConnectionInput; import io.airbyte.config.StandardCheckConnectionOutput; import io.airbyte.scheduler.models.IntegrationLauncherConfig; @@ -17,12 +19,24 @@ public class CheckConnectionWorkflowImpl implements CheckConnectionWorkflow { private final CheckConnectionActivity activity = Workflow.newActivityStub(CheckConnectionActivity.class, ActivityConfiguration.CHECK_ACTIVITY_OPTIONS); + private static final String CHECK_JOB_OUTPUT_TAG = "check_job_output"; + private static final int CHECK_JOB_OUTPUT_TAG_CURRENT_VERSION = 1; + @Override - public StandardCheckConnectionOutput run(final JobRunConfig jobRunConfig, - final IntegrationLauncherConfig launcherConfig, - final StandardCheckConnectionInput connectionConfiguration) { + public ConnectorJobOutput run(final JobRunConfig jobRunConfig, + final IntegrationLauncherConfig launcherConfig, + final StandardCheckConnectionInput connectionConfiguration) { + final CheckConnectionInput checkInput = new CheckConnectionInput(jobRunConfig, launcherConfig, connectionConfiguration); + + final int jobOutputVersion = + Workflow.getVersion(CHECK_JOB_OUTPUT_TAG, Workflow.DEFAULT_VERSION, CHECK_JOB_OUTPUT_TAG_CURRENT_VERSION); + + if (jobOutputVersion < CHECK_JOB_OUTPUT_TAG_CURRENT_VERSION) { + final StandardCheckConnectionOutput checkOutput = activity.run(checkInput); + return new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION).withCheckConnection(checkOutput); + } - return activity.run(new CheckConnectionInput(jobRunConfig, launcherConfig, connectionConfiguration)); + return activity.runWithJobOutput(checkInput); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivity.java index 625aa340644d..5a006b77879d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivity.java @@ -4,8 +4,8 @@ package io.airbyte.workers.temporal.discover.catalog; +import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.StandardDiscoverCatalogInput; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.temporal.activity.ActivityInterface; @@ -15,8 +15,8 @@ public interface DiscoverCatalogActivity { @ActivityMethod - AirbyteCatalog run(JobRunConfig jobRunConfig, - IntegrationLauncherConfig launcherConfig, - StandardDiscoverCatalogInput config); + ConnectorJobOutput run(JobRunConfig jobRunConfig, + IntegrationLauncherConfig launcherConfig, + StandardDiscoverCatalogInput config); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java index ebe914bb8807..69903615e9a0 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java @@ -7,10 +7,10 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.config.Configs.WorkerEnvironment; +import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.StandardDiscoverCatalogInput; import io.airbyte.config.helpers.LogConfigs; import io.airbyte.config.persistence.split_secrets.SecretsHydrator; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.scheduler.persistence.JobPersistence; @@ -57,9 +57,9 @@ public DiscoverCatalogActivityImpl(final WorkerConfigs workerConfigs, this.airbyteVersion = airbyteVersion; } - public AirbyteCatalog run(final JobRunConfig jobRunConfig, - final IntegrationLauncherConfig launcherConfig, - final StandardDiscoverCatalogInput config) { + public ConnectorJobOutput run(final JobRunConfig jobRunConfig, + final IntegrationLauncherConfig launcherConfig, + final StandardDiscoverCatalogInput config) { final JsonNode fullConfig = secretsHydrator.hydrate(config.getConnectionConfiguration()); @@ -68,22 +68,23 @@ public AirbyteCatalog run(final JobRunConfig jobRunConfig, final ActivityExecutionContext context = Activity.getExecutionContext(); - final TemporalAttemptExecution temporalAttemptExecution = new TemporalAttemptExecution<>( - workspaceRoot, - workerEnvironment, - logConfigs, - jobRunConfig, - getWorkerFactory(launcherConfig), - () -> input, - new CancellationHandler.TemporalCancellationHandler(context), - jobPersistence, - airbyteVersion, - () -> context); + final TemporalAttemptExecution temporalAttemptExecution = + new TemporalAttemptExecution<>( + workspaceRoot, + workerEnvironment, + logConfigs, + jobRunConfig, + getWorkerFactory(launcherConfig), + () -> input, + new CancellationHandler.TemporalCancellationHandler(context), + jobPersistence, + airbyteVersion, + () -> context); return temporalAttemptExecution.get(); } - private CheckedSupplier, Exception> getWorkerFactory(final IntegrationLauncherConfig launcherConfig) { + private CheckedSupplier, Exception> getWorkerFactory(final IntegrationLauncherConfig launcherConfig) { return () -> { final IntegrationLauncher integrationLauncher = new AirbyteIntegrationLauncher(launcherConfig.getJobId(), launcherConfig.getAttemptId().intValue(), launcherConfig.getDockerImage(), diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflow.java index 75a23b1ef630..243300e76f45 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflow.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflow.java @@ -4,8 +4,8 @@ package io.airbyte.workers.temporal.discover.catalog; +import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.StandardDiscoverCatalogInput; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.temporal.workflow.WorkflowInterface; @@ -15,8 +15,8 @@ public interface DiscoverCatalogWorkflow { @WorkflowMethod - AirbyteCatalog run(JobRunConfig jobRunConfig, - IntegrationLauncherConfig launcherConfig, - StandardDiscoverCatalogInput config); + ConnectorJobOutput run(JobRunConfig jobRunConfig, + IntegrationLauncherConfig launcherConfig, + StandardDiscoverCatalogInput config); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflowImpl.java index 17bcab75a5de..d0c68d7470bf 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflowImpl.java @@ -4,8 +4,8 @@ package io.airbyte.workers.temporal.discover.catalog; +import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.StandardDiscoverCatalogInput; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.workers.temporal.TemporalUtils; @@ -22,9 +22,9 @@ public class DiscoverCatalogWorkflowImpl implements DiscoverCatalogWorkflow { private final DiscoverCatalogActivity activity = Workflow.newActivityStub(DiscoverCatalogActivity.class, options); @Override - public AirbyteCatalog run(final JobRunConfig jobRunConfig, - final IntegrationLauncherConfig launcherConfig, - final StandardDiscoverCatalogInput config) { + public ConnectorJobOutput run(final JobRunConfig jobRunConfig, + final IntegrationLauncherConfig launcherConfig, + final StandardDiscoverCatalogInput config) { return activity.run(jobRunConfig, launcherConfig, config); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index d294a2019c09..543ca47c4ec8 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -5,12 +5,13 @@ package io.airbyte.workers.temporal.scheduling; import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.config.ConnectorJobOutput; +import io.airbyte.config.ConnectorJobOutput.OutputType; import io.airbyte.config.EnvConfigs; import io.airbyte.config.FailureReason; import io.airbyte.config.FailureReason.FailureType; import io.airbyte.config.StandardCheckConnectionInput; import io.airbyte.config.StandardCheckConnectionOutput; -import io.airbyte.config.StandardCheckConnectionOutput.Status; import io.airbyte.config.StandardSyncInput; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary; @@ -91,6 +92,9 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow private static final String CHECK_BEFORE_SYNC_TAG = "check_before_sync"; private static final int CHECK_BEFORE_SYNC_CURRENT_VERSION = 1; + private static final String CHECK_JOB_OUTPUT_TAG = "check_job_output"; + private static final int CHECK_JOB_OUTPUT_TAG_CURRENT_VERSION = 1; + private static final String DELETE_RESET_JOB_STREAMS_TAG = "delete_reset_job_streams"; private static final int DELETE_RESET_JOB_STREAMS_CURRENT_VERSION = 1; @@ -325,6 +329,18 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput, } } + private ConnectorJobOutput getCheckResponse(final CheckConnectionInput checkInput) { + final int checkJobOutputVersion = + Workflow.getVersion(CHECK_JOB_OUTPUT_TAG, Workflow.DEFAULT_VERSION, CHECK_JOB_OUTPUT_TAG_CURRENT_VERSION); + + if (checkJobOutputVersion < CHECK_JOB_OUTPUT_TAG_CURRENT_VERSION) { + final StandardCheckConnectionOutput checkOutput = runMandatoryActivityWithOutput(checkActivity::run, checkInput); + return new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION).withCheckConnection(checkOutput); + } + + return runMandatoryActivityWithOutput(checkActivity::runWithJobOutput, checkInput); + } + private SyncCheckConnectionFailure checkConnections(final GenerateInputActivity.GeneratedJobInput jobInputs) { final JobRunConfig jobRunConfig = jobInputs.getJobRunConfig(); final StandardSyncInput syncInput = jobInputs.getSyncInput(); @@ -350,8 +366,8 @@ private SyncCheckConnectionFailure checkConnections(final GenerateInputActivity. log.info("SOURCE CHECK: Skipped"); } else { log.info("SOURCE CHECK: Starting"); - final StandardCheckConnectionOutput sourceCheckResponse = runMandatoryActivityWithOutput(checkActivity::run, checkSourceInput); - if (sourceCheckResponse.getStatus() == Status.FAILED) { + final ConnectorJobOutput sourceCheckResponse = getCheckResponse(checkSourceInput); + if (SyncCheckConnectionFailure.isOutputFailed(sourceCheckResponse)) { checkFailure.setFailureOrigin(FailureReason.FailureOrigin.SOURCE); checkFailure.setFailureOutput(sourceCheckResponse); log.info("SOURCE CHECK: Failed"); @@ -367,8 +383,8 @@ private SyncCheckConnectionFailure checkConnections(final GenerateInputActivity. log.info("DESTINATION CHECK: Skipped"); } else { log.info("DESTINATION CHECK: Starting"); - final StandardCheckConnectionOutput destinationCheckResponse = runMandatoryActivityWithOutput(checkActivity::run, checkDestinationInput); - if (destinationCheckResponse.getStatus() == Status.FAILED) { + final ConnectorJobOutput destinationCheckResponse = getCheckResponse(checkDestinationInput); + if (SyncCheckConnectionFailure.isOutputFailed(destinationCheckResponse)) { checkFailure.setFailureOrigin(FailureReason.FailureOrigin.DESTINATION); checkFailure.setFailureOutput(destinationCheckResponse); log.info("DESTINATION CHECK: Failed"); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/SyncCheckConnectionFailure.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/SyncCheckConnectionFailure.java index f4f4c94627f0..ac3c623bbe0f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/SyncCheckConnectionFailure.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/SyncCheckConnectionFailure.java @@ -4,6 +4,8 @@ package io.airbyte.workers.temporal.scheduling; +import io.airbyte.config.ConnectorJobOutput; +import io.airbyte.config.ConnectorJobOutput.OutputType; import io.airbyte.config.FailureReason; import io.airbyte.config.StandardCheckConnectionOutput; import io.airbyte.config.StandardSyncOutput; @@ -19,17 +21,17 @@ public class SyncCheckConnectionFailure { private final Long jobId; private final Integer attemptId; - private StandardCheckConnectionOutput failureOutput; + private ConnectorJobOutput failureOutput; private FailureReason.FailureOrigin origin = null; - public SyncCheckConnectionFailure(JobRunConfig jobRunConfig) { + public SyncCheckConnectionFailure(final JobRunConfig jobRunConfig) { Long jobId = 0L; Integer attemptId = 0; try { jobId = Long.valueOf(jobRunConfig.getJobId()); attemptId = Math.toIntExact(jobRunConfig.getAttemptId()); - } catch (Exception e) { + } catch (final Exception e) { // In tests, the jobId and attemptId may not be available log.warn("Cannot determine jobId or attemptId: " + e.getMessage()); } @@ -42,11 +44,11 @@ public boolean isFailed() { return this.origin != null && this.failureOutput != null; } - public void setFailureOrigin(FailureReason.FailureOrigin origin) { + public void setFailureOrigin(final FailureReason.FailureOrigin origin) { this.origin = origin; } - public void setFailureOutput(StandardCheckConnectionOutput failureOutput) { + public void setFailureOutput(final ConnectorJobOutput failureOutput) { this.failureOutput = failureOutput; } @@ -55,10 +57,7 @@ public StandardSyncOutput buildFailureOutput() { throw new RuntimeException("Cannot build failure output without a failure origin and output"); } - final Exception ex = new IllegalArgumentException(failureOutput.getMessage()); - final FailureReason checkFailureReason = FailureHelper.checkFailure(ex, jobId, attemptId, origin); - return new StandardSyncOutput() - .withFailures(List.of(checkFailureReason)) + final StandardSyncOutput syncOutput = new StandardSyncOutput() .withStandardSyncSummary( new StandardSyncSummary() .withStatus(StandardSyncSummary.ReplicationStatus.FAILED) @@ -70,7 +69,26 @@ public StandardSyncOutput buildFailureOutput() { .withRecordsEmitted(0L) .withBytesEmitted(0L) .withStateMessagesEmitted(0L) - .withRecordsCommitted(0L))); + .withRecordsCommitted(0L)));; + + if (failureOutput.getFailureReason() != null) { + syncOutput.setFailures(List.of(failureOutput.getFailureReason().withFailureOrigin(origin))); + } else { + final StandardCheckConnectionOutput checkOutput = failureOutput.getCheckConnection(); + final Exception ex = new IllegalArgumentException(checkOutput.getMessage()); + final FailureReason checkFailureReason = FailureHelper.checkFailure(ex, jobId, attemptId, origin); + syncOutput.setFailures(List.of(checkFailureReason)); + } + + return syncOutput; + } + + public static boolean isOutputFailed(final ConnectorJobOutput output) { + if (output.getOutputType() != OutputType.CHECK_CONNECTION) { + throw new IllegalArgumentException("Output type must be CHECK_CONNECTION"); + } + + return output.getFailureReason() != null || output.getCheckConnection().getStatus() == StandardCheckConnectionOutput.Status.FAILED; } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivity.java index ed81459e07ec..c2ee086a7b7b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivity.java @@ -4,7 +4,7 @@ package io.airbyte.workers.temporal.spec; -import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.config.ConnectorJobOutput; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.temporal.activity.ActivityInterface; @@ -14,6 +14,6 @@ public interface SpecActivity { @ActivityMethod - ConnectorSpecification run(JobRunConfig jobRunConfig, IntegrationLauncherConfig launcherConfig); + ConnectorJobOutput run(JobRunConfig jobRunConfig, IntegrationLauncherConfig launcherConfig); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java index 0b724ea832af..41d19deb7e50 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java @@ -6,9 +6,9 @@ import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.config.Configs.WorkerEnvironment; +import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.JobGetSpecConfig; import io.airbyte.config.helpers.LogConfigs; -import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.scheduler.persistence.JobPersistence; @@ -51,12 +51,12 @@ public SpecActivityImpl(final WorkerConfigs workerConfigs, this.airbyteVersion = airbyteVersion; } - public ConnectorSpecification run(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig launcherConfig) { + public ConnectorJobOutput run(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig launcherConfig) { final Supplier inputSupplier = () -> new JobGetSpecConfig().withDockerImage(launcherConfig.getDockerImage()); final ActivityExecutionContext context = Activity.getExecutionContext(); - final TemporalAttemptExecution temporalAttemptExecution = new TemporalAttemptExecution<>( + final TemporalAttemptExecution temporalAttemptExecution = new TemporalAttemptExecution<>( workspaceRoot, workerEnvironment, logConfigs, @@ -71,8 +71,8 @@ public ConnectorSpecification run(final JobRunConfig jobRunConfig, final Integra return temporalAttemptExecution.get(); } - private CheckedSupplier, Exception> getWorkerFactory( - final IntegrationLauncherConfig launcherConfig) { + private CheckedSupplier, Exception> getWorkerFactory( + final IntegrationLauncherConfig launcherConfig) { return () -> { final IntegrationLauncher integrationLauncher = new AirbyteIntegrationLauncher( launcherConfig.getJobId(), diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflow.java index eccaee46f15e..9e0921094977 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflow.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflow.java @@ -4,7 +4,7 @@ package io.airbyte.workers.temporal.spec; -import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.config.ConnectorJobOutput; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.temporal.workflow.WorkflowInterface; @@ -14,6 +14,6 @@ public interface SpecWorkflow { @WorkflowMethod - ConnectorSpecification run(JobRunConfig jobRunConfig, IntegrationLauncherConfig launcherConfig); + ConnectorJobOutput run(JobRunConfig jobRunConfig, IntegrationLauncherConfig launcherConfig); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflowImpl.java index 19f98d26cd1a..2ed95b062c1f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflowImpl.java @@ -4,7 +4,7 @@ package io.airbyte.workers.temporal.spec; -import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.config.ConnectorJobOutput; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.workers.temporal.TemporalUtils; @@ -21,7 +21,7 @@ public class SpecWorkflowImpl implements SpecWorkflow { private final SpecActivity activity = Workflow.newActivityStub(SpecActivity.class, options); @Override - public ConnectorSpecification run(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig launcherConfig) { + public ConnectorJobOutput run(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig launcherConfig) { return activity.run(jobRunConfig, launcherConfig); } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultCheckConnectionWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultCheckConnectionWorkerTest.java index c5ab94a134df..c326235e5eb3 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultCheckConnectionWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultCheckConnectionWorkerTest.java @@ -18,7 +18,10 @@ import com.google.common.collect.Lists; import io.airbyte.commons.enums.Enums; import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ConnectorJobOutput; +import io.airbyte.config.ConnectorJobOutput.OutputType; import io.airbyte.config.EnvConfigs; +import io.airbyte.config.FailureReason; import io.airbyte.config.StandardCheckConnectionInput; import io.airbyte.config.StandardCheckConnectionOutput; import io.airbyte.config.StandardCheckConnectionOutput.Status; @@ -28,6 +31,7 @@ import io.airbyte.workers.WorkerConfigs; import io.airbyte.workers.WorkerConstants; import io.airbyte.workers.exception.WorkerException; +import io.airbyte.workers.internal.AirbyteMessageUtils; import io.airbyte.workers.internal.AirbyteStreamFactory; import io.airbyte.workers.process.IntegrationLauncher; import java.io.ByteArrayInputStream; @@ -50,6 +54,7 @@ public class DefaultCheckConnectionWorkerTest { private Process process; private AirbyteStreamFactory successStreamFactory; private AirbyteStreamFactory failureStreamFactory; + private AirbyteStreamFactory traceMessageStreamFactory; @BeforeEach public void setup() throws IOException, WorkerException { @@ -74,6 +79,9 @@ public void setup() throws IOException, WorkerException { .withType(Type.CONNECTION_STATUS) .withConnectionStatus(new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.FAILED).withMessage("failed to connect")); failureStreamFactory = noop -> Lists.newArrayList(failureMessage).stream(); + + final AirbyteMessage traceMessage = AirbyteMessageUtils.createTraceMessage("some error from the connector", 123.0); + traceMessageStreamFactory = noop -> Lists.newArrayList(traceMessage).stream(); } @Test @@ -84,30 +92,49 @@ public void testEnums() { @Test public void testSuccessfulConnection() throws WorkerException { final DefaultCheckConnectionWorker worker = new DefaultCheckConnectionWorker(workerConfigs, integrationLauncher, successStreamFactory); - final StandardCheckConnectionOutput output = worker.run(input, jobRoot); + final ConnectorJobOutput output = worker.run(input, jobRoot); - assertEquals(Status.SUCCEEDED, output.getStatus()); - assertNull(output.getMessage()); + assertEquals(output.getOutputType(), OutputType.CHECK_CONNECTION); + assertNull(output.getFailureReason()); + final StandardCheckConnectionOutput checkOutput = output.getCheckConnection(); + assertEquals(Status.SUCCEEDED, checkOutput.getStatus()); + assertNull(checkOutput.getMessage()); } @Test public void testFailedConnection() throws WorkerException { final DefaultCheckConnectionWorker worker = new DefaultCheckConnectionWorker(workerConfigs, integrationLauncher, failureStreamFactory); - final StandardCheckConnectionOutput output = worker.run(input, jobRoot); + final ConnectorJobOutput output = worker.run(input, jobRoot); + + assertEquals(output.getOutputType(), OutputType.CHECK_CONNECTION); + assertNull(output.getFailureReason()); - assertEquals(Status.FAILED, output.getStatus()); - assertEquals("failed to connect", output.getMessage()); + final StandardCheckConnectionOutput checkOutput = output.getCheckConnection(); + assertEquals(Status.FAILED, checkOutput.getStatus()); + assertEquals("failed to connect", checkOutput.getMessage()); } @Test - public void testProcessFail() throws WorkerException { + public void testProcessFail() { when(process.exitValue()).thenReturn(1); final DefaultCheckConnectionWorker worker = new DefaultCheckConnectionWorker(workerConfigs, integrationLauncher, failureStreamFactory); - final StandardCheckConnectionOutput output = worker.run(input, jobRoot); + assertThrows(WorkerException.class, () -> worker.run(input, jobRoot)); + } + + @Test + public void testProcessFailWithTraceMessage() throws WorkerException { + when(process.exitValue()).thenReturn(1); + + final DefaultCheckConnectionWorker worker = new DefaultCheckConnectionWorker(workerConfigs, integrationLauncher, traceMessageStreamFactory); + final ConnectorJobOutput output = worker.run(input, jobRoot); + + assertEquals(output.getOutputType(), OutputType.CHECK_CONNECTION); + assertNull(output.getCheckConnection()); - assertEquals(Status.FAILED, output.getStatus()); + final FailureReason failureReason = output.getFailureReason(); + assertEquals("some error from the connector", failureReason.getExternalMessage()); } @Test diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java index 414a3eb3ccb3..299db6c823f4 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java @@ -5,6 +5,8 @@ package io.airbyte.workers.general; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; @@ -17,7 +19,10 @@ import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; +import io.airbyte.config.ConnectorJobOutput; +import io.airbyte.config.ConnectorJobOutput.OutputType; import io.airbyte.config.EnvConfigs; +import io.airbyte.config.FailureReason; import io.airbyte.config.StandardDiscoverCatalogInput; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; @@ -28,6 +33,7 @@ import io.airbyte.workers.WorkerConfigs; import io.airbyte.workers.WorkerConstants; import io.airbyte.workers.exception.WorkerException; +import io.airbyte.workers.internal.AirbyteMessageUtils; import io.airbyte.workers.internal.AirbyteStreamFactory; import io.airbyte.workers.process.IntegrationLauncher; import java.io.ByteArrayInputStream; @@ -81,9 +87,11 @@ public void setup() throws Exception { @Test public void testDiscoverSchema() throws Exception { final DefaultDiscoverCatalogWorker worker = new DefaultDiscoverCatalogWorker(workerConfigs, integrationLauncher, streamFactory); - final AirbyteCatalog output = worker.run(INPUT, jobRoot); + final ConnectorJobOutput output = worker.run(INPUT, jobRoot); - assertEquals(CATALOG, output); + assertNull(output.getFailureReason()); + assertEquals(OutputType.DISCOVER_CATALOG, output.getOutputType()); + assertEquals(CATALOG, output.getDiscoverCatalog()); Assertions.assertTimeout(Duration.ofSeconds(5), () -> { while (process.getErrorStream().available() != 0) { @@ -111,6 +119,32 @@ public void testDiscoverSchemaProcessFail() throws Exception { verify(process).exitValue(); } + @SuppressWarnings("BusyWait") + @Test + public void testDiscoverSchemaProcessFailWithTraceMessage() throws Exception { + final AirbyteStreamFactory traceStreamFactory = noop -> Lists.newArrayList( + AirbyteMessageUtils.createTraceMessage("some error from the connector", 123.0)).stream(); + + when(process.exitValue()).thenReturn(1); + + final DefaultDiscoverCatalogWorker worker = new DefaultDiscoverCatalogWorker(workerConfigs, integrationLauncher, traceStreamFactory); + final ConnectorJobOutput output = worker.run(INPUT, jobRoot); + assertEquals(OutputType.DISCOVER_CATALOG, output.getOutputType()); + assertNull(output.getDiscoverCatalog()); + assertNotNull(output.getFailureReason()); + + final FailureReason failureReason = output.getFailureReason(); + assertEquals("some error from the connector", failureReason.getExternalMessage()); + + Assertions.assertTimeout(Duration.ofSeconds(5), () -> { + while (process.getErrorStream().available() != 0) { + Thread.sleep(50); + } + }); + + verify(process).exitValue(); + } + @Test public void testDiscoverSchemaException() throws WorkerException { when(integrationLauncher.discover(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(CREDENTIALS))) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java index 2bcf857e6124..b481d3f1e0a9 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java @@ -6,6 +6,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; @@ -15,13 +18,17 @@ import com.google.common.base.Charsets; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; +import io.airbyte.config.ConnectorJobOutput; +import io.airbyte.config.ConnectorJobOutput.OutputType; import io.airbyte.config.EnvConfigs; +import io.airbyte.config.FailureReason; import io.airbyte.config.JobGetSpecConfig; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.workers.WorkerConfigs; import io.airbyte.workers.exception.WorkerException; +import io.airbyte.workers.internal.AirbyteMessageUtils; import io.airbyte.workers.process.IntegrationLauncher; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -65,8 +72,9 @@ public void testSuccessfulRun() throws IOException, InterruptedException, Worker when(process.waitFor(anyLong(), any())).thenReturn(true); when(process.exitValue()).thenReturn(0); - final ConnectorSpecification actualOutput = worker.run(config, jobRoot); - final ConnectorSpecification expectedOutput = Jsons.deserialize(expectedSpecString, ConnectorSpecification.class); + final ConnectorJobOutput actualOutput = worker.run(config, jobRoot); + final ConnectorJobOutput expectedOutput = new ConnectorJobOutput().withOutputType(OutputType.SPEC) + .withSpec(Jsons.deserialize(expectedSpecString, ConnectorSpecification.class)); assertThat(actualOutput).isEqualTo(expectedOutput); } @@ -106,4 +114,21 @@ public void testFailureOnNonzeroExitCode() throws InterruptedException, IOExcept .hasNoCause(); } + @Test + public void testFailureOnNonzeroExitCodeWithTraceMessage() throws WorkerException, InterruptedException { + final AirbyteMessage message = AirbyteMessageUtils.createTraceMessage("some error from the connector", 123.0); + + when(process.getInputStream()).thenReturn(new ByteArrayInputStream(Jsons.serialize(message).getBytes(Charsets.UTF_8))); + when(process.waitFor(anyLong(), any())).thenReturn(true); + when(process.exitValue()).thenReturn(1); + + final ConnectorJobOutput output = worker.run(config, jobRoot); + assertEquals(OutputType.SPEC, output.getOutputType()); + assertNull(output.getSpec()); + assertNotNull(output.getFailureReason()); + + final FailureReason failureReason = output.getFailureReason(); + assertEquals("some error from the connector", failureReason.getExternalMessage()); + } + } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java index 746329b141fc..267099a66d2a 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java @@ -22,6 +22,8 @@ import com.google.common.collect.Sets; import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ConnectorJobOutput; +import io.airbyte.config.FailureReason; import io.airbyte.config.JobCheckConnectionConfig; import io.airbyte.config.JobDiscoverCatalogConfig; import io.airbyte.config.JobGetSpecConfig; @@ -144,6 +146,23 @@ void testExecuteWithException() { assertEquals(logPath, response.getMetadata().getLogPath()); } + @Test + void testExecuteWithConnectorJobFailure() { + final Supplier supplier = mock(Supplier.class); + final FailureReason mockFailureReason = mock(FailureReason.class); + final ConnectorJobOutput connectorJobOutput = new ConnectorJobOutput() + .withFailureReason(mockFailureReason); + when(supplier.get()).thenReturn(connectorJobOutput); + + final TemporalResponse response = temporalClient.execute(JOB_RUN_CONFIG, supplier); + + assertNotNull(response); + assertTrue(response.getOutput().isPresent()); + assertEquals(connectorJobOutput, response.getOutput().get()); + assertFalse(response.getMetadata().isSucceeded()); + assertEquals(logPath, response.getMetadata().getLogPath()); + } + } @Nested diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowTest.java new file mode 100644 index 000000000000..b1681681599b --- /dev/null +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowTest.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.temporal.check.connection; + +import io.temporal.testing.WorkflowReplayer; +import org.junit.jupiter.api.Test; + +public class CheckConnectionWorkflowTest { + + @Test + public void replayOldWorkflow() throws Exception { + // This test ensures that a new version of the workflow doesn't break an in-progress execution + // This JSON file is exported from Temporal directly (e.g. + // `http://${temporal-ui}/namespaces/default/workflows/${uuid}/${uuid}/history`) and export + + WorkflowReplayer.replayWorkflowExecutionFromResource("checkWorkflowHistory.json", CheckConnectionWorkflowImpl.class); + } + +} diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index 9f4248b4124a..f83e797c11cb 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -6,6 +6,8 @@ import static org.mockito.Mockito.atLeastOnce; +import io.airbyte.config.ConnectorJobOutput; +import io.airbyte.config.ConnectorJobOutput.OutputType; import io.airbyte.config.FailureReason; import io.airbyte.config.FailureReason.FailureOrigin; import io.airbyte.config.FailureReason.FailureType; @@ -151,8 +153,9 @@ public void setUp() { new IntegrationLauncherConfig(), new StandardSyncInput())); - Mockito.when(mCheckConnectionActivity.run(Mockito.any())) - .thenReturn(new StandardCheckConnectionOutput().withStatus(Status.SUCCEEDED).withMessage("check worked")); + Mockito.when(mCheckConnectionActivity.runWithJobOutput(Mockito.any())) + .thenReturn(new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION) + .withCheckConnection(new StandardCheckConnectionOutput().withStatus(Status.SUCCEEDED).withMessage("check worked"))); Mockito.when(mAutoDisableConnectionActivity.autoDisableFailingConnection(Mockito.any())) .thenReturn(new AutoDisableConnectionOutput(false)); @@ -917,8 +920,9 @@ public void testSourceCheckFailuresRecorded() throws InterruptedException { .thenReturn(new JobCreationOutput(JOB_ID)); Mockito.when(mJobCreationAndStatusUpdateActivity.createNewAttemptNumber(Mockito.any())) .thenReturn(new AttemptNumberCreationOutput(ATTEMPT_ID)); - Mockito.when(mCheckConnectionActivity.run(Mockito.any())) - .thenReturn(new StandardCheckConnectionOutput().withStatus(Status.FAILED).withMessage("nope")); + Mockito.when(mCheckConnectionActivity.runWithJobOutput(Mockito.any())) + .thenReturn(new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION) + .withCheckConnection(new StandardCheckConnectionOutput().withStatus(Status.FAILED).withMessage("nope"))); testEnv.start(); @@ -946,6 +950,45 @@ public void testSourceCheckFailuresRecorded() throws InterruptedException { .attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOriginWithType(FailureOrigin.SOURCE, FailureType.CONFIG_ERROR))); } + @Test + @Timeout(value = 10, + unit = TimeUnit.SECONDS) + @DisplayName("Test that Source CHECK failure reasons are recorded") + public void testSourceCheckFailureReasonsRecorded() throws InterruptedException { + Mockito.when(mJobCreationAndStatusUpdateActivity.createNewJob(Mockito.any())) + .thenReturn(new JobCreationOutput(JOB_ID)); + Mockito.when(mJobCreationAndStatusUpdateActivity.createNewAttemptNumber(Mockito.any())) + .thenReturn(new AttemptNumberCreationOutput(ATTEMPT_ID)); + Mockito.when(mCheckConnectionActivity.runWithJobOutput(Mockito.any())) + .thenReturn(new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION) + .withFailureReason(new FailureReason().withFailureType(FailureType.SYSTEM_ERROR))); + + testEnv.start(); + + final UUID testId = UUID.randomUUID(); + final TestStateListener testStateListener = new TestStateListener(); + final WorkflowState workflowState = new WorkflowState(testId, testStateListener); + final ConnectionUpdaterInput input = ConnectionUpdaterInput.builder() + .connectionId(UUID.randomUUID()) + .jobId(JOB_ID) + .attemptId(ATTEMPT_ID) + .fromFailure(false) + .attemptNumber(1) + .workflowState(workflowState) + .build(); + + startWorkflowAndWaitUntilReady(workflow, input); + + // wait for workflow to initialize + testEnv.sleep(Duration.ofMinutes(1)); + + workflow.submitManualSync(); + Thread.sleep(500); // any time after no-waiting manual run + + Mockito.verify(mJobCreationAndStatusUpdateActivity) + .attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOriginWithType(FailureOrigin.SOURCE, FailureType.SYSTEM_ERROR))); + } + @Test @Timeout(value = 10, unit = TimeUnit.SECONDS) @@ -955,9 +998,14 @@ public void testDestinationCheckFailuresRecorded() throws InterruptedException { .thenReturn(new JobCreationOutput(JOB_ID)); Mockito.when(mJobCreationAndStatusUpdateActivity.createNewAttemptNumber(Mockito.any())) .thenReturn(new AttemptNumberCreationOutput(ATTEMPT_ID)); - Mockito.when(mCheckConnectionActivity.run(Mockito.any())) - .thenReturn(new StandardCheckConnectionOutput().withStatus(Status.SUCCEEDED).withMessage("all good")) // First call (source) succeeds - .thenReturn(new StandardCheckConnectionOutput().withStatus(Status.FAILED).withMessage("nope")); // Second call (destination) fails + Mockito.when(mCheckConnectionActivity.runWithJobOutput(Mockito.any())) + // First call (source) succeeds + .thenReturn(new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION) + .withCheckConnection(new StandardCheckConnectionOutput().withStatus(Status.SUCCEEDED).withMessage("all good"))) + + // Second call (destination) fails + .thenReturn(new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION) + .withCheckConnection(new StandardCheckConnectionOutput().withStatus(Status.FAILED).withMessage("nope"))); testEnv.start(); @@ -985,6 +1033,50 @@ public void testDestinationCheckFailuresRecorded() throws InterruptedException { .attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOriginWithType(FailureOrigin.DESTINATION, FailureType.CONFIG_ERROR))); } + @Test + @Timeout(value = 10, + unit = TimeUnit.SECONDS) + @DisplayName("Test that Destination CHECK failure reasons are recorded") + public void testDestinationCheckFailureReasonsRecorded() throws InterruptedException { + Mockito.when(mJobCreationAndStatusUpdateActivity.createNewJob(Mockito.any())) + .thenReturn(new JobCreationOutput(JOB_ID)); + Mockito.when(mJobCreationAndStatusUpdateActivity.createNewAttemptNumber(Mockito.any())) + .thenReturn(new AttemptNumberCreationOutput(ATTEMPT_ID)); + Mockito.when(mCheckConnectionActivity.runWithJobOutput(Mockito.any())) + // First call (source) succeeds + .thenReturn(new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION) + .withCheckConnection(new StandardCheckConnectionOutput().withStatus(Status.SUCCEEDED).withMessage("all good"))) + + // Second call (destination) fails + .thenReturn(new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION) + .withFailureReason(new FailureReason().withFailureType(FailureType.SYSTEM_ERROR))); + + testEnv.start(); + + final UUID testId = UUID.randomUUID(); + final TestStateListener testStateListener = new TestStateListener(); + final WorkflowState workflowState = new WorkflowState(testId, testStateListener); + final ConnectionUpdaterInput input = ConnectionUpdaterInput.builder() + .connectionId(UUID.randomUUID()) + .jobId(JOB_ID) + .attemptId(ATTEMPT_ID) + .fromFailure(false) + .attemptNumber(1) + .workflowState(workflowState) + .build(); + + startWorkflowAndWaitUntilReady(workflow, input); + + // wait for workflow to initialize + testEnv.sleep(Duration.ofMinutes(1)); + + workflow.submitManualSync(); + Thread.sleep(500); // any time after no-waiting manual run + + Mockito.verify(mJobCreationAndStatusUpdateActivity) + .attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOriginWithType(FailureOrigin.DESTINATION, FailureType.SYSTEM_ERROR))); + } + @Test @Timeout(value = 10, unit = TimeUnit.SECONDS) @@ -995,9 +1087,10 @@ public void testSourceCheckSkippedWhenReset() throws InterruptedException { Mockito.when(mJobCreationAndStatusUpdateActivity.createNewAttemptNumber(Mockito.any())) .thenReturn(new AttemptNumberCreationOutput(ATTEMPT_ID)); mockResetJobInput(); - Mockito.when(mCheckConnectionActivity.run(Mockito.any())) - .thenReturn(new StandardCheckConnectionOutput().withStatus(Status.FAILED).withMessage("nope")); // first call, but should fail destination - // because source check is skipped + Mockito.when(mCheckConnectionActivity.runWithJobOutput(Mockito.any())) + // first call, but should fail destination because source check is skipped + .thenReturn(new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION) + .withCheckConnection(new StandardCheckConnectionOutput().withStatus(Status.FAILED).withMessage("nope"))); testEnv.start(); diff --git a/airbyte-workers/src/test/resources/checkWorkflowHistory.json b/airbyte-workers/src/test/resources/checkWorkflowHistory.json new file mode 100644 index 000000000000..a73babefdd1c --- /dev/null +++ b/airbyte-workers/src/test/resources/checkWorkflowHistory.json @@ -0,0 +1,244 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2022-07-13T15:12:21.000Z", + "eventType": "WorkflowExecutionStarted", + "version": "0", + "taskId": "6291547", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "CheckConnectionWorkflow" + }, + "parentWorkflowNamespace": "", + "parentInitiatedEventId": "0", + "taskQueue": { + "name": "CHECK_CONNECTION", + "kind": "Normal" + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "eyJqb2JJZCI6IjE5YjU5ZDAwLTBjM2UtNDUyYy1iMmNhLTdhNjg0YTgxMzcxZiIsImF0dGVtcHRJZCI6MH0=" + }, + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "eyJqb2JJZCI6IjE5YjU5ZDAwLTBjM2UtNDUyYy1iMmNhLTdhNjg0YTgxMzcxZiIsImF0dGVtcHRJZCI6MCwiZG9ja2VySW1hZ2UiOiJhaXJieXRlL2Rlc3RpbmF0aW9uLWUyZS10ZXN0OjAuMi4yIn0=" + }, + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "eyJjb25uZWN0aW9uQ29uZmlndXJhdGlvbiI6eyJ0eXBlIjoiTE9HR0lORyIsImxvZ2dpbmdfY29uZmlnIjp7ImxvZ2dpbmdfdHlwZSI6IkZpcnN0TiIsIm1heF9lbnRyeV9jb3VudCI6MTAwfX19" + } + ] + }, + "workflowExecutionTimeout": "0s", + "workflowRunTimeout": "0s", + "workflowTaskTimeout": "27s", + "continuedExecutionRunId": "", + "initiator": "Unspecified", + "originalExecutionRunId": "dc21093b-f8d8-40ec-a573-5ecd058d3d53", + "identity": "1@9c8b9c2820a1", + "firstExecutionRunId": "dc21093b-f8d8-40ec-a573-5ecd058d3d53", + "retryPolicy": { + "nonRetryableErrorTypes": [], + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s", + "maximumAttempts": 1 + }, + "attempt": 1, + "cronSchedule": "", + "firstWorkflowTaskBackoff": "0s", + "header": { + "fields": {} + } + } + }, + { + "eventId": "2", + "eventTime": "2022-07-13T15:12:21.000Z", + "eventType": "WorkflowTaskScheduled", + "version": "0", + "taskId": "6291548", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "CHECK_CONNECTION", + "kind": "Normal" + }, + "startToCloseTimeout": "27s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2022-07-13T15:12:21.000Z", + "eventType": "WorkflowTaskStarted", + "version": "0", + "taskId": "6291552", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "1@3cc9ad00eeaf", + "requestId": "cee7f88e-daa6-4152-af1e-92768691b598" + } + }, + { + "eventId": "4", + "eventTime": "2022-07-13T15:12:21.000Z", + "eventType": "WorkflowTaskCompleted", + "version": "0", + "taskId": "6291555", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "3", + "identity": "1@3cc9ad00eeaf", + "binaryChecksum": "" + } + }, + { + "eventId": "5", + "eventTime": "2022-07-13T15:12:21.000Z", + "eventType": "ActivityTaskScheduled", + "version": "0", + "taskId": "6291556", + "activityTaskScheduledEventAttributes": { + "activityId": "06775a2e-9dd7-3c66-8316-b10644c119be", + "activityType": { + "name": "Run" + }, + "namespace": "", + "taskQueue": { + "name": "CHECK_CONNECTION", + "kind": "Normal" + }, + "header": { + "fields": {} + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "eyJqb2JSdW5Db25maWciOnsiam9iSWQiOiIxOWI1OWQwMC0wYzNlLTQ1MmMtYjJjYS03YTY4NGE4MTM3MWYiLCJhdHRlbXB0SWQiOjB9LCJsYXVuY2hlckNvbmZpZyI6eyJqb2JJZCI6IjE5YjU5ZDAwLTBjM2UtNDUyYy1iMmNhLTdhNjg0YTgxMzcxZiIsImF0dGVtcHRJZCI6MCwiZG9ja2VySW1hZ2UiOiJhaXJieXRlL2Rlc3RpbmF0aW9uLWUyZS10ZXN0OjAuMi4yIn0sImNvbm5lY3Rpb25Db25maWd1cmF0aW9uIjp7ImNvbm5lY3Rpb25Db25maWd1cmF0aW9uIjp7InR5cGUiOiJMT0dHSU5HIiwibG9nZ2luZ19jb25maWciOnsibG9nZ2luZ190eXBlIjoiRmlyc3ROIiwibWF4X2VudHJ5X2NvdW50IjoxMDB9fX19" + } + ] + }, + "scheduleToCloseTimeout": "300s", + "scheduleToStartTimeout": "300s", + "startToCloseTimeout": "300s", + "heartbeatTimeout": "0s", + "workflowTaskCompletedEventId": "4", + "retryPolicy": { + "nonRetryableErrorTypes": [], + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s", + "maximumAttempts": 1 + } + } + }, + { + "eventId": "6", + "eventTime": "2022-07-13T15:12:21.000Z", + "eventType": "ActivityTaskStarted", + "version": "0", + "taskId": "6291561", + "activityTaskStartedEventAttributes": { + "scheduledEventId": "5", + "identity": "1@3cc9ad00eeaf", + "requestId": "a2e7c24b-c98a-485c-91f7-8b6f29065e35", + "attempt": 1 + } + }, + { + "eventId": "7", + "eventTime": "2022-07-13T15:12:42.000Z", + "eventType": "ActivityTaskCompleted", + "version": "0", + "taskId": "6291562", + "activityTaskCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "eyJzdGF0dXMiOiJzdWNjZWVkZWQifQ==" + } + ] + }, + "scheduledEventId": "5", + "startedEventId": "6", + "identity": "1@3cc9ad00eeaf" + } + }, + { + "eventId": "8", + "eventTime": "2022-07-13T15:12:42.000Z", + "eventType": "WorkflowTaskScheduled", + "version": "0", + "taskId": "6291563", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "1@3cc9ad00eeaf:5b9e5849-cb52-49a4-88ee-a03dfa2a32a6", + "kind": "Sticky" + }, + "startToCloseTimeout": "27s", + "attempt": 1 + } + }, + { + "eventId": "9", + "eventTime": "2022-07-13T15:12:42.000Z", + "eventType": "WorkflowTaskStarted", + "version": "0", + "taskId": "6291567", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "8", + "identity": "5b9e5849-cb52-49a4-88ee-a03dfa2a32a6", + "requestId": "97f3c1b2-c8dc-4c4e-b728-3e972726d2c8" + } + }, + { + "eventId": "10", + "eventTime": "2022-07-13T15:12:42.000Z", + "eventType": "WorkflowTaskCompleted", + "version": "0", + "taskId": "6291570", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "8", + "startedEventId": "9", + "identity": "1@3cc9ad00eeaf", + "binaryChecksum": "" + } + }, + { + "eventId": "11", + "eventTime": "2022-07-13T15:12:42.000Z", + "eventType": "WorkflowExecutionCompleted", + "version": "0", + "taskId": "6291571", + "workflowExecutionCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "eyJzdGF0dXMiOiJzdWNjZWVkZWQifQ==" + } + ] + }, + "workflowTaskCompletedEventId": "10", + "newExecutionRunId": "" + } + } + ] +}