Skip to content

Commit

Permalink
Create and Persist Normalization Output (#11764)
Browse files Browse the repository at this point in the history
Add NormalizationSummary output to StandardSyncOutput
  • Loading branch information
alovew authored Apr 12, 2022
1 parent dfd25f0 commit ba48d40
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
# JSON Schema (draft-07) describing the summary attached to a sync's output
# when a normalization step ran. Both timestamps are mandatory and no other
# properties are accepted.
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/NormalizationSummary.yaml
title: NormalizationSummary
description: information output by syncs for which a normalization step was performed
type: object
required:
- startTime
- endTime
additionalProperties: false
properties:
# DefaultNormalizationWorker.run fills this from System.currentTimeMillis()
# captured on entry, i.e. milliseconds since the Unix epoch.
startTime:
type: integer
# DefaultNormalizationWorker.run fills this from System.currentTimeMillis()
# captured once the normalization attempt is over; same unit as startTime.
endTime:
type: integer
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ required:
properties:
standardSyncSummary:
"$ref": StandardSyncSummary.yaml
normalizationSummary:
"$ref": NormalizationSummary.yaml
state:
"$ref": State.yaml
output_catalog:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.workers.normalization.NormalizationRunner;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -39,7 +40,7 @@ public DefaultNormalizationWorker(final String jobId,
}

@Override
public Void run(final NormalizationInput input, final Path jobRoot) throws WorkerException {
public NormalizationSummary run(final NormalizationInput input, final Path jobRoot) throws WorkerException {
final long startTime = System.currentTimeMillis();

try (normalizationRunner) {
Expand All @@ -64,11 +65,16 @@ public Void run(final NormalizationInput input, final Path jobRoot) throws Worke
LOGGER.info("Normalization was cancelled.");
}

final Duration duration = Duration.ofMillis(System.currentTimeMillis() - startTime);
final long endTime = System.currentTimeMillis();
final Duration duration = Duration.ofMillis(endTime - startTime);
final String durationDescription = DurationFormatUtils.formatDurationWords(duration.toMillis(), true, true);
LOGGER.info("Normalization executed in {}.", durationDescription);

return null;
final NormalizationSummary summary = new NormalizationSummary()
.withStartTime(startTime)
.withEndTime(endTime);

return summary;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@
package io.airbyte.workers;

import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;

public interface NormalizationWorker extends Worker<NormalizationInput, Void> {}
public interface NormalizationWorker extends Worker<NormalizationInput, NormalizationSummary> {}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.workers.temporal.sync;

import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.temporal.activity.ActivityInterface;
Expand All @@ -14,8 +15,8 @@
public interface NormalizationActivity {

@ActivityMethod
Void normalize(JobRunConfig jobRunConfig,
IntegrationLauncherConfig destinationLauncherConfig,
NormalizationInput input);
NormalizationSummary normalize(JobRunConfig jobRunConfig,
IntegrationLauncherConfig destinationLauncherConfig,
NormalizationInput input);

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
Expand Down Expand Up @@ -65,9 +66,9 @@ public NormalizationActivityImpl(final Optional<WorkerApp.ContainerOrchestratorC
}

@Override
public Void normalize(final JobRunConfig jobRunConfig,
final IntegrationLauncherConfig destinationLauncherConfig,
final NormalizationInput input) {
public NormalizationSummary normalize(final JobRunConfig jobRunConfig,
final IntegrationLauncherConfig destinationLauncherConfig,
final NormalizationInput input) {
return TemporalUtils.withBackgroundHeartbeat(() -> {
final var fullDestinationConfig = secretsHydrator.hydrate(input.getDestinationConfiguration());
final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig);
Expand All @@ -77,15 +78,15 @@ public Void normalize(final JobRunConfig jobRunConfig,
return fullInput;
};

final CheckedSupplier<Worker<NormalizationInput, Void>, Exception> workerFactory;
final CheckedSupplier<Worker<NormalizationInput, NormalizationSummary>, Exception> workerFactory;

if (containerOrchestratorConfig.isPresent()) {
workerFactory = getContainerLauncherWorkerFactory(workerConfigs, destinationLauncherConfig, jobRunConfig);
} else {
workerFactory = getLegacyWorkerFactory(workerConfigs, destinationLauncherConfig, jobRunConfig);
}

final TemporalAttemptExecution<NormalizationInput, Void> temporalAttemptExecution = new TemporalAttemptExecution<>(
final TemporalAttemptExecution<NormalizationInput, NormalizationSummary> temporalAttemptExecution = new TemporalAttemptExecution<>(
workspaceRoot, workerEnvironment, logConfigs,
jobRunConfig,
workerFactory,
Expand All @@ -94,14 +95,15 @@ public Void normalize(final JobRunConfig jobRunConfig,
jobPersistence,
airbyteVersion);

return temporalAttemptExecution.get();
final NormalizationSummary normalizationSummary = temporalAttemptExecution.get();
return normalizationSummary;
});
}

private CheckedSupplier<Worker<NormalizationInput, Void>, Exception> getLegacyWorkerFactory(
final WorkerConfigs workerConfigs,
final IntegrationLauncherConfig destinationLauncherConfig,
final JobRunConfig jobRunConfig) {
private CheckedSupplier<Worker<NormalizationInput, NormalizationSummary>, Exception> getLegacyWorkerFactory(
final WorkerConfigs workerConfigs,
final IntegrationLauncherConfig destinationLauncherConfig,
final JobRunConfig jobRunConfig) {
return () -> new DefaultNormalizationWorker(
jobRunConfig.getJobId(),
Math.toIntExact(jobRunConfig.getAttemptId()),
Expand All @@ -113,10 +115,10 @@ private CheckedSupplier<Worker<NormalizationInput, Void>, Exception> getLegacyWo
workerEnvironment);
}

private CheckedSupplier<Worker<NormalizationInput, Void>, Exception> getContainerLauncherWorkerFactory(
final WorkerConfigs workerConfigs,
final IntegrationLauncherConfig destinationLauncherConfig,
final JobRunConfig jobRunConfig)
private CheckedSupplier<Worker<NormalizationInput, NormalizationSummary>, Exception> getContainerLauncherWorkerFactory(
final WorkerConfigs workerConfigs,
final IntegrationLauncherConfig destinationLauncherConfig,
final JobRunConfig jobRunConfig)
throws IOException {
final var jobScope = jobPersistence.getJob(Long.parseLong(jobRunConfig.getJobId())).getScope();
final var connectionId = UUID.fromString(jobScope);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@

import io.airbyte.commons.json.Jsons;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.WorkerApp;
import io.airbyte.workers.WorkerConfigs;
import java.util.Map;
import java.util.UUID;

public class NormalizationLauncherWorker extends LauncherWorker<NormalizationInput, Void> {
public class NormalizationLauncherWorker extends LauncherWorker<NormalizationInput, NormalizationSummary> {

public static final String NORMALIZATION = "normalization-orchestrator";
private static final String POD_NAME_PREFIX = "orchestrator-norm";
Expand All @@ -33,7 +34,7 @@ public NormalizationLauncherWorker(final UUID connectionId,
INIT_FILE_DESTINATION_LAUNCHER_CONFIG, Jsons.serialize(destinationLauncherConfig)),
containerOrchestratorConfig,
workerConfigs.getResourceRequirements(),
Void.class);
NormalizationSummary.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.workers.temporal.sync;

import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.OperatorDbtInput;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.StandardSyncOperation;
Expand Down Expand Up @@ -39,26 +40,28 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
final StandardSyncInput syncInput,
final UUID connectionId) {

final StandardSyncOutput run = replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput);
StandardSyncOutput syncOutput = replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput);

final int version = Workflow.getVersion(VERSION_LABEL, Workflow.DEFAULT_VERSION, CURRENT_VERSION);

if (version > Workflow.DEFAULT_VERSION) {
// the state is persisted immediately after the replication succeeded, because the
// state is a checkpoint of the raw data that has been copied to the destination;
// normalization & dbt do not depend on it
persistActivity.persist(connectionId, run);
persistActivity.persist(connectionId, syncOutput);
}

if (syncInput.getOperationSequence() != null && !syncInput.getOperationSequence().isEmpty()) {
for (final StandardSyncOperation standardSyncOperation : syncInput.getOperationSequence()) {
if (standardSyncOperation.getOperatorType() == OperatorType.NORMALIZATION) {
final NormalizationInput normalizationInput = new NormalizationInput()
.withDestinationConfiguration(syncInput.getDestinationConfiguration())
.withCatalog(run.getOutputCatalog())
.withCatalog(syncOutput.getOutputCatalog())
.withResourceRequirements(syncInput.getDestinationResourceRequirements());

normalizationActivity.normalize(jobRunConfig, destinationLauncherConfig, normalizationInput);
final NormalizationSummary normalizationSummary =
normalizationActivity.normalize(jobRunConfig, destinationLauncherConfig, normalizationInput);
syncOutput = syncOutput.withNormalizationSummary(normalizationSummary);
} else if (standardSyncOperation.getOperatorType() == OperatorType.DBT) {
final OperatorDbtInput operatorDbtInput = new OperatorDbtInput()
.withDestinationConfiguration(syncInput.getDestinationConfiguration())
Expand All @@ -73,7 +76,7 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
}
}

return run;
return syncOutput;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

package io.airbyte.workers;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.workers.normalization.NormalizationRunner;
Expand Down Expand Up @@ -60,7 +62,7 @@ void test() throws Exception {
final DefaultNormalizationWorker normalizationWorker =
new DefaultNormalizationWorker(JOB_ID, JOB_ATTEMPT, normalizationRunner, WorkerEnvironment.DOCKER);

normalizationWorker.run(normalizationInput, jobRoot);
final NormalizationSummary normalizationOutput = normalizationWorker.run(normalizationInput, jobRoot);

verify(normalizationRunner).start();
verify(normalizationRunner).normalize(
Expand All @@ -70,6 +72,8 @@ void test() throws Exception {
normalizationInput.getDestinationConfiguration(),
normalizationInput.getCatalog(), workerConfigs.getResourceRequirements());
verify(normalizationRunner).close();
assertNotNull(normalizationOutput.getStartTime());
assertNotNull(normalizationOutput.getEndTime());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static org.mockito.Mockito.verifyNoInteractions;

import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.OperatorDbtInput;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.StandardSync;
Expand Down Expand Up @@ -81,6 +82,7 @@ class SyncWorkflowTest {
private OperatorDbtInput operatorDbtInput;

private StandardSyncOutput replicationSuccessOutput;
private NormalizationSummary normalizationSummary;

@BeforeEach
public void setUp() {
Expand All @@ -94,6 +96,7 @@ public void setUp() {
sync = syncPair.getKey();
syncInput = syncPair.getValue();
replicationSuccessOutput = new StandardSyncOutput().withOutputCatalog(syncInput.getCatalog());
normalizationSummary = new NormalizationSummary();

normalizationInput = new NormalizationInput()
.withDestinationConfiguration(syncInput.getDestinationConfiguration())
Expand Down Expand Up @@ -127,13 +130,18 @@ void testSuccess() {
DESTINATION_LAUNCHER_CONFIG,
syncInput);

doReturn(normalizationSummary).when(normalizationActivity).normalize(
JOB_RUN_CONFIG,
DESTINATION_LAUNCHER_CONFIG,
normalizationInput);

final StandardSyncOutput actualOutput = execute();
assertEquals(replicationSuccessOutput, actualOutput);

verifyReplication(replicationActivity, syncInput, sync.getConnectionId());
verifyPersistState(persistStateActivity, sync, actualOutput);
verifyPersistState(persistStateActivity, sync, replicationSuccessOutput);
verifyNormalize(normalizationActivity, normalizationInput);
verifyDbtTransform(dbtTransformationActivity, syncInput.getResourceRequirements(), operatorDbtInput);
assertEquals(replicationSuccessOutput.withNormalizationSummary(normalizationSummary), actualOutput);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureOrigin;
import io.airbyte.config.JobOutput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
Expand Down Expand Up @@ -88,7 +89,9 @@ public class JobCreationAndStatusUpdateActivityTest {
private static final StandardSyncOutput standardSyncOutput = new StandardSyncOutput()
.withStandardSyncSummary(
new StandardSyncSummary()
.withStatus(ReplicationStatus.COMPLETED));
.withStatus(ReplicationStatus.COMPLETED))
.withNormalizationSummary(
new NormalizationSummary());

private static final JobOutput jobOutput = new JobOutput().withSync(standardSyncOutput);

Expand Down

0 comments on commit ba48d40

Please sign in to comment.