diff --git a/airbyte-config/models/src/main/resources/types/NormalizationSummary.yaml b/airbyte-config/models/src/main/resources/types/NormalizationSummary.yaml new file mode 100644 index 000000000000..f7bb6dc158f2 --- /dev/null +++ b/airbyte-config/models/src/main/resources/types/NormalizationSummary.yaml @@ -0,0 +1,15 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/NormalizationSummary.yaml +title: NormalizationSummary +description: information output by syncs for which a normalization step was performed +type: object +required: + - startTime + - endTime +additionalProperties: false +properties: + startTime: + type: integer + endTime: + type: integer diff --git a/airbyte-config/models/src/main/resources/types/StandardSyncOutput.yaml b/airbyte-config/models/src/main/resources/types/StandardSyncOutput.yaml index 79b2471b69e0..28fc46a3bfd1 100644 --- a/airbyte-config/models/src/main/resources/types/StandardSyncOutput.yaml +++ b/airbyte-config/models/src/main/resources/types/StandardSyncOutput.yaml @@ -12,6 +12,8 @@ required: properties: standardSyncSummary: "$ref": StandardSyncSummary.yaml + normalizationSummary: + "$ref": NormalizationSummary.yaml state: "$ref": State.yaml output_catalog: diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultNormalizationWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultNormalizationWorker.java index 1a491d466070..38ceb30e8263 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultNormalizationWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultNormalizationWorker.java @@ -6,6 +6,7 @@ import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.NormalizationInput; +import io.airbyte.config.NormalizationSummary; import io.airbyte.workers.normalization.NormalizationRunner; import java.nio.file.Files; import java.nio.file.Path; @@ -39,7 +40,7 @@ public DefaultNormalizationWorker(final String jobId, } @Override - public Void run(final NormalizationInput input, final Path jobRoot) throws WorkerException { + public NormalizationSummary run(final NormalizationInput input, final Path jobRoot) throws WorkerException { final long startTime = System.currentTimeMillis(); try (normalizationRunner) { @@ -64,11 +65,16 @@ public Void run(final NormalizationInput input, final Path jobRoot) throws Worke LOGGER.info("Normalization was cancelled."); } - final Duration duration = Duration.ofMillis(System.currentTimeMillis() - startTime); + final long endTime = System.currentTimeMillis(); + final Duration duration = Duration.ofMillis(endTime - startTime); final String durationDescription = DurationFormatUtils.formatDurationWords(duration.toMillis(), true, true); LOGGER.info("Normalization executed in {}.", durationDescription); - return null; + final NormalizationSummary summary = new NormalizationSummary() + .withStartTime(startTime) + .withEndTime(endTime); + + return summary; } @Override diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/NormalizationWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/NormalizationWorker.java index 1e87fc12677f..643fd1935618 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/NormalizationWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/NormalizationWorker.java @@ -5,5 +5,6 @@ package io.airbyte.workers; import io.airbyte.config.NormalizationInput; +import io.airbyte.config.NormalizationSummary; -public interface NormalizationWorker extends Worker {} +public interface NormalizationWorker extends Worker {} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivity.java index 6cbee1bbe004..1fb27124ca88 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivity.java @@ -5,6 +5,7 @@ package io.airbyte.workers.temporal.sync; import io.airbyte.config.NormalizationInput; +import io.airbyte.config.NormalizationSummary; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.temporal.activity.ActivityInterface; @@ -14,8 +15,8 @@ public interface NormalizationActivity { @ActivityMethod - Void normalize(JobRunConfig jobRunConfig, - IntegrationLauncherConfig destinationLauncherConfig, - NormalizationInput input); + NormalizationSummary normalize(JobRunConfig jobRunConfig, + IntegrationLauncherConfig destinationLauncherConfig, + NormalizationInput input); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index e73dce33e58a..e2475a34fda7 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -10,6 +10,7 @@ import io.airbyte.config.ConfigSchema; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.NormalizationInput; +import io.airbyte.config.NormalizationSummary; import io.airbyte.config.helpers.LogConfigs; import io.airbyte.config.persistence.split_secrets.SecretsHydrator; import io.airbyte.scheduler.models.IntegrationLauncherConfig; @@ -65,9 +66,9 @@ public NormalizationActivityImpl(final Optional { final var fullDestinationConfig = secretsHydrator.hydrate(input.getDestinationConfiguration()); final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig); @@ -77,7 +78,7 @@ public Void normalize(final JobRunConfig jobRunConfig, return fullInput; }; - final CheckedSupplier, Exception> workerFactory; + final CheckedSupplier, Exception> workerFactory; if (containerOrchestratorConfig.isPresent()) { workerFactory = getContainerLauncherWorkerFactory(workerConfigs, destinationLauncherConfig, jobRunConfig); @@ -85,7 +86,7 @@ public Void normalize(final JobRunConfig jobRunConfig, workerFactory = getLegacyWorkerFactory(workerConfigs, destinationLauncherConfig, jobRunConfig); } - final TemporalAttemptExecution temporalAttemptExecution = new TemporalAttemptExecution<>( + final TemporalAttemptExecution temporalAttemptExecution = new TemporalAttemptExecution<>( workspaceRoot, workerEnvironment, logConfigs, jobRunConfig, workerFactory, @@ -94,14 +95,15 @@ public Void normalize(final JobRunConfig jobRunConfig, jobPersistence, airbyteVersion); - return temporalAttemptExecution.get(); + final NormalizationSummary normalizationSummary = temporalAttemptExecution.get(); + return normalizationSummary; }); } - private CheckedSupplier, Exception> getLegacyWorkerFactory( - final WorkerConfigs workerConfigs, - final IntegrationLauncherConfig destinationLauncherConfig, - final JobRunConfig jobRunConfig) { + private CheckedSupplier, Exception> getLegacyWorkerFactory( + final WorkerConfigs workerConfigs, + final IntegrationLauncherConfig destinationLauncherConfig, + final JobRunConfig jobRunConfig) { return () -> new DefaultNormalizationWorker( jobRunConfig.getJobId(), Math.toIntExact(jobRunConfig.getAttemptId()), @@ -113,10 +115,10 @@ private CheckedSupplier, Exception> getLegacyWo workerEnvironment); } - private CheckedSupplier, Exception> getContainerLauncherWorkerFactory( - final WorkerConfigs workerConfigs, - final IntegrationLauncherConfig destinationLauncherConfig, - final JobRunConfig jobRunConfig) + private CheckedSupplier, Exception> getContainerLauncherWorkerFactory( + final WorkerConfigs workerConfigs, + final IntegrationLauncherConfig destinationLauncherConfig, + final JobRunConfig jobRunConfig) throws IOException { final var jobScope = jobPersistence.getJob(Long.parseLong(jobRunConfig.getJobId())).getScope(); final var connectionId = UUID.fromString(jobScope); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationLauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationLauncherWorker.java index 918ca1761f6b..5dea8dfcb53e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationLauncherWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationLauncherWorker.java @@ -6,6 +6,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.config.NormalizationInput; +import io.airbyte.config.NormalizationSummary; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.workers.WorkerApp; @@ -13,7 +14,7 @@ import java.util.Map; import java.util.UUID; -public class NormalizationLauncherWorker extends LauncherWorker { +public class NormalizationLauncherWorker extends LauncherWorker { public static final String NORMALIZATION = "normalization-orchestrator"; private static final String POD_NAME_PREFIX = "orchestrator-norm"; @@ -33,7 +34,7 @@ public NormalizationLauncherWorker(final UUID connectionId, INIT_FILE_DESTINATION_LAUNCHER_CONFIG, Jsons.serialize(destinationLauncherConfig)), containerOrchestratorConfig, workerConfigs.getResourceRequirements(), - Void.class); + NormalizationSummary.class); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index 028ae6fded4d..bcd056b06f2b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -5,6 +5,7 @@ package io.airbyte.workers.temporal.sync; import io.airbyte.config.NormalizationInput; +import io.airbyte.config.NormalizationSummary; import io.airbyte.config.OperatorDbtInput; import io.airbyte.config.StandardSyncInput; import io.airbyte.config.StandardSyncOperation; @@ -39,7 +40,7 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, final StandardSyncInput syncInput, final UUID connectionId) { - final StandardSyncOutput run = replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput); + StandardSyncOutput syncOutput = replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput); final int version = Workflow.getVersion(VERSION_LABEL, Workflow.DEFAULT_VERSION, CURRENT_VERSION); @@ -47,7 +48,7 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, // the state is persisted immediately after the replication succeeded, because the // state is a checkpoint of the raw data that has been copied to the destination; // normalization & dbt does not depend on it - persistActivity.persist(connectionId, run); + persistActivity.persist(connectionId, syncOutput); } if (syncInput.getOperationSequence() != null && !syncInput.getOperationSequence().isEmpty()) { @@ -55,10 +56,12 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, if (standardSyncOperation.getOperatorType() == OperatorType.NORMALIZATION) { final NormalizationInput normalizationInput = new NormalizationInput() .withDestinationConfiguration(syncInput.getDestinationConfiguration()) - .withCatalog(run.getOutputCatalog()) + .withCatalog(syncOutput.getOutputCatalog()) .withResourceRequirements(syncInput.getDestinationResourceRequirements()); - normalizationActivity.normalize(jobRunConfig, destinationLauncherConfig, normalizationInput); + final NormalizationSummary normalizationSummary = + normalizationActivity.normalize(jobRunConfig, destinationLauncherConfig, normalizationInput); + syncOutput = syncOutput.withNormalizationSummary(normalizationSummary); } else if (standardSyncOperation.getOperatorType() == OperatorType.DBT) { final OperatorDbtInput operatorDbtInput = new OperatorDbtInput() .withDestinationConfiguration(syncInput.getDestinationConfiguration()) @@ -73,7 +76,7 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, } } - return run; + return syncOutput; } } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultNormalizationWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultNormalizationWorkerTest.java index dbac45f0a8ca..8c46bbef029a 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultNormalizationWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultNormalizationWorkerTest.java @@ -4,6 +4,7 @@ package io.airbyte.workers; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -11,6 +12,7 @@ import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.EnvConfigs; import io.airbyte.config.NormalizationInput; +import io.airbyte.config.NormalizationSummary; import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncInput; import io.airbyte.workers.normalization.NormalizationRunner; @@ -60,7 +62,7 @@ void test() throws Exception { final DefaultNormalizationWorker normalizationWorker = new DefaultNormalizationWorker(JOB_ID, JOB_ATTEMPT, normalizationRunner, WorkerEnvironment.DOCKER); - normalizationWorker.run(normalizationInput, jobRoot); + final NormalizationSummary normalizationOutput = normalizationWorker.run(normalizationInput, jobRoot); verify(normalizationRunner).start(); verify(normalizationRunner).normalize( @@ -70,6 +72,8 @@ void test() throws Exception { normalizationInput.getDestinationConfiguration(), normalizationInput.getCatalog(), workerConfigs.getResourceRequirements()); verify(normalizationRunner).close(); + assertNotNull(normalizationOutput.getStartTime()); + assertNotNull(normalizationOutput.getEndTime()); } } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/SyncWorkflowTest.java index a9efe7086a1c..b04b4c632f34 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/SyncWorkflowTest.java @@ -14,6 +14,7 @@ import static org.mockito.Mockito.verifyNoInteractions; import io.airbyte.config.NormalizationInput; +import io.airbyte.config.NormalizationSummary; import io.airbyte.config.OperatorDbtInput; import io.airbyte.config.ResourceRequirements; import io.airbyte.config.StandardSync; @@ -81,6 +82,7 @@ class SyncWorkflowTest { private OperatorDbtInput operatorDbtInput; private StandardSyncOutput replicationSuccessOutput; + private NormalizationSummary normalizationSummary; @BeforeEach public void setUp() { @@ -94,6 +96,7 @@ public void setUp() { sync = syncPair.getKey(); syncInput = syncPair.getValue(); replicationSuccessOutput = new StandardSyncOutput().withOutputCatalog(syncInput.getCatalog()); + normalizationSummary = new NormalizationSummary(); normalizationInput = new NormalizationInput() .withDestinationConfiguration(syncInput.getDestinationConfiguration()) @@ -127,13 +130,18 @@ void testSuccess() { DESTINATION_LAUNCHER_CONFIG, syncInput); + doReturn(normalizationSummary).when(normalizationActivity).normalize( + JOB_RUN_CONFIG, + DESTINATION_LAUNCHER_CONFIG, + normalizationInput); + final StandardSyncOutput actualOutput = execute(); - assertEquals(replicationSuccessOutput, actualOutput); verifyReplication(replicationActivity, syncInput, sync.getConnectionId()); - verifyPersistState(persistStateActivity, sync, actualOutput); + verifyPersistState(persistStateActivity, sync, replicationSuccessOutput); verifyNormalize(normalizationActivity, normalizationInput); verifyDbtTransform(dbtTransformationActivity, syncInput.getResourceRequirements(), operatorDbtInput); + assertEquals(replicationSuccessOutput.withNormalizationSummary(normalizationSummary), actualOutput); } @Test diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java index e6eadce820d5..5448541f87cd 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java @@ -9,6 +9,7 @@ import io.airbyte.config.FailureReason; import io.airbyte.config.FailureReason.FailureOrigin; import io.airbyte.config.JobOutput; +import io.airbyte.config.NormalizationSummary; import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary; @@ -88,7 +89,9 @@ public class JobCreationAndStatusUpdateActivityTest { private static final StandardSyncOutput standardSyncOutput = new StandardSyncOutput() .withStandardSyncSummary( new StandardSyncSummary() - .withStatus(ReplicationStatus.COMPLETED)); + .withStatus(ReplicationStatus.COMPLETED)) + .withNormalizationSummary( + new NormalizationSummary()); private static final JobOutput jobOutput = new JobOutput().withSync(standardSyncOutput);