From c862c0b961c6014a323a1604d728681e4e82c1ea Mon Sep 17 00:00:00 2001 From: Michael Siega Date: Fri, 16 Dec 2022 18:00:32 +0100 Subject: [PATCH 1/8] pass workspace id to sync workflow and use it to selectively enable field selection --- .../features/EnvVariableFeatureFlags.java | 16 +++++++++++++++- .../airbyte/commons/features/FeatureFlags.java | 10 +++++++++- .../types/JobResetConnectionConfig.yaml | 4 ++++ .../src/main/resources/types/JobSyncConfig.yaml | 4 ++++ .../main/resources/types/StandardSyncInput.yaml | 4 ++++ .../orchestrator/ReplicationJobOrchestrator.java | 3 ++- .../persistence/job/DefaultJobCreator.java | 6 ++++-- .../io/airbyte/persistence/job/JobCreator.java | 6 ++++-- .../job/factory/DefaultSyncJobFactory.java | 3 ++- .../persistence/job/DefaultJobCreatorTest.java | 11 ++++++----- .../job/factory/DefaultSyncJobFactoryTest.java | 7 ++++--- .../activities/GenerateInputActivityImpl.java | 6 ++++-- 12 files changed, 62 insertions(+), 18 deletions(-) diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java index e09108194518..dfb17b3edb27 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java @@ -4,6 +4,9 @@ package io.airbyte.commons.features; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; import java.util.function.Function; import lombok.extern.slf4j.Slf4j; @@ -16,6 +19,14 @@ public class EnvVariableFeatureFlags implements FeatureFlags { public static final String NEED_STATE_VALIDATION = "NEED_STATE_VALIDATION"; public static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION"; + private final Set FIELD_SELECTION_WORKSPACES = getEnvOrDefault("FIELD_SELECTION_WORKSPACES", Set.of(), (workspaceIdsString) -> { + final Set workspaceIds = new HashSet<>(); + for (final String workspaceId : workspaceIdsString.split(",")) { + workspaceIds.add(UUID.fromString(workspaceId)); + } + return workspaceIds; + }); + @Override public boolean autoDisablesFailingConnections() { log.info("Auto Disable Failing Connections: " + Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS"))); @@ -49,7 +60,10 @@ public boolean needStateValidation() { } @Override - public boolean applyFieldSelection() { + public boolean applyFieldSelection(UUID workspaceId) { + if (workspaceId != null && FIELD_SELECTION_WORKSPACES.contains(workspaceId)) { + return true; + } return getEnvOrDefault(APPLY_FIELD_SELECTION, false, Boolean::parseBoolean); } diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java index 3633f9bb4ed7..d103b5807c71 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java @@ -4,6 +4,8 @@ package io.airbyte.commons.features; +import java.util.UUID; + /** * Interface that describe which features are activated in airbyte. Currently the only * implementation relies on env. Ideally it should be on some DB. @@ -22,6 +24,12 @@ public interface FeatureFlags { boolean needStateValidation(); - boolean applyFieldSelection(); + /** + * Return true if field selection should be applied for the given workspaceId + * + * @param workspaceId that owns the sync + * @return whether field selection should be applied + */ + boolean applyFieldSelection(UUID workspaceId); } diff --git a/airbyte-config/config-models/src/main/resources/types/JobResetConnectionConfig.yaml b/airbyte-config/config-models/src/main/resources/types/JobResetConnectionConfig.yaml index 3bba82085306..462a8ab1229d 100644 --- a/airbyte-config/config-models/src/main/resources/types/JobResetConnectionConfig.yaml +++ b/airbyte-config/config-models/src/main/resources/types/JobResetConnectionConfig.yaml @@ -58,3 +58,7 @@ properties: isDestinationCustomConnector: description: determine if the running image of the destination is a custom connector. type: boolean + workspaceId: + description: The id of the workspace associated with the sync + type: string + format: uuid diff --git a/airbyte-config/config-models/src/main/resources/types/JobSyncConfig.yaml b/airbyte-config/config-models/src/main/resources/types/JobSyncConfig.yaml index 1862f17f884c..652996a9b5c0 100644 --- a/airbyte-config/config-models/src/main/resources/types/JobSyncConfig.yaml +++ b/airbyte-config/config-models/src/main/resources/types/JobSyncConfig.yaml @@ -77,3 +77,7 @@ properties: isDestinationCustomConnector: description: determine if the destination running image is a custom connector. type: boolean + workspaceId: + description: The id of the workspace associated with the sync + type: string + format: uuid diff --git a/airbyte-config/config-models/src/main/resources/types/StandardSyncInput.yaml b/airbyte-config/config-models/src/main/resources/types/StandardSyncInput.yaml index ea84cecce3d4..2e0b943b9104 100644 --- a/airbyte-config/config-models/src/main/resources/types/StandardSyncInput.yaml +++ b/airbyte-config/config-models/src/main/resources/types/StandardSyncInput.yaml @@ -56,3 +56,7 @@ properties: description: optional resource requirements to use in dest container - this is used instead of `resourceRequirements` for the dest container type: object "$ref": ResourceRequirements.yaml + workspaceId: + description: The id of the workspace associated with this sync + type: string + format: uuid diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java index b4d73416e743..b3b141bb724f 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java @@ -138,6 +138,7 @@ public Optional runJob() throws Exception { sourceLauncherConfig.getDockerImage()); log.info("Setting up replication worker..."); + log.debug("Field selection is {}", featureFlags.applyFieldSelection(syncInput.getWorkspaceId()) ? "enabled" : "disabled"); final var replicationWorker = new DefaultReplicationWorker( jobRunConfig.getJobId(), Math.toIntExact(jobRunConfig.getAttemptId()), @@ -148,7 +149,7 @@ public Optional runJob() throws Exception { new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion())), new AirbyteMessageTracker(), new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)), - metricReporter, featureFlags.applyFieldSelection()); + metricReporter, featureFlags.applyFieldSelection(syncInput.getWorkspaceId())); log.info("Running replication worker..."); final var jobRoot = TemporalUtils.getJobRoot(configs.getWorkspaceRoot(), diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java index 408953197783..1d903a458e49 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java @@ -60,7 +60,8 @@ public Optional createSyncJob(final SourceConnection source, final List standardSyncOperations, @Nullable final JsonNode webhookOperationConfigs, final StandardSourceDefinition sourceDefinition, - final StandardDestinationDefinition destinationDefinition) + final StandardDestinationDefinition destinationDefinition, + UUID workspaceId) throws IOException { // reusing this isn't going to quite work. @@ -96,7 +97,8 @@ public Optional createSyncJob(final SourceConnection source, .withSourceResourceRequirements(mergedSrcResourceReq) .withDestinationResourceRequirements(mergedDstResourceReq) .withIsSourceCustomConnector(sourceDefinition.getCustom()) - .withIsDestinationCustomConnector(destinationDefinition.getCustom()); + .withIsDestinationCustomConnector(destinationDefinition.getCustom()) + .withWorkspaceId(workspaceId); getCurrentConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState); diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobCreator.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobCreator.java index 92d285e699a8..d3905878d420 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobCreator.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobCreator.java @@ -16,17 +16,18 @@ import java.io.IOException; import java.util.List; import java.util.Optional; +import java.util.UUID; import javax.annotation.Nullable; public interface JobCreator { /** - * * @param source db model representing where data comes from * @param destination db model representing where data goes * @param standardSync sync options * @param sourceDockerImage docker image to use for the source * @param destinationDockerImage docker image to use for the destination + * @param workspaceId * @return the new job if no other conflicting job was running, otherwise empty * @throws IOException if something wrong happens */ @@ -40,7 +41,8 @@ Optional createSyncJob(SourceConnection source, List standardSyncOperations, @Nullable JsonNode webhookOperationConfigs, StandardSourceDefinition sourceDefinition, - StandardDestinationDefinition destinationDefinition) + StandardDestinationDefinition destinationDefinition, + UUID workspaceId) throws IOException; /** diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/factory/DefaultSyncJobFactory.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/factory/DefaultSyncJobFactory.java index 116114e061cc..e7a69eb64533 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/factory/DefaultSyncJobFactory.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/factory/DefaultSyncJobFactory.java @@ -94,7 +94,8 @@ public Long create(final UUID connectionId) { standardSyncOperations, workspace.getWebhookOperationConfigs(), sourceDefinition, - destinationDefinition) + destinationDefinition, + workspace.getWorkspaceId()) .orElseThrow(() -> new IllegalStateException("We shouldn't be trying to create a new sync job if there is one running already.")); } catch (final IOException | JsonValidationException | ConfigNotFoundException e) { diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java index 1804f3226a67..41a5ca697920 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java @@ -210,7 +210,7 @@ void testCreateSyncJob() throws IOException { List.of(STANDARD_SYNC_OPERATION), PERSISTED_WEBHOOK_CONFIGS, STANDARD_SOURCE_DEFINITION, - STANDARD_DESTINATION_DEFINITION).orElseThrow(); + STANDARD_DESTINATION_DEFINITION, UUID.randomUUID()).orElseThrow(); assertEquals(JOB_ID, jobId); } @@ -247,7 +247,7 @@ void testCreateSyncJobEnsureNoQueuing() throws IOException { DESTINATION_PROTOCOL_VERSION, List.of(STANDARD_SYNC_OPERATION), null, - STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION).isEmpty()); + STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION, UUID.randomUUID()).isEmpty()); } @Test @@ -262,7 +262,7 @@ void testCreateSyncJobDefaultWorkerResourceReqs() throws IOException { DESTINATION_PROTOCOL_VERSION, List.of(STANDARD_SYNC_OPERATION), null, - STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION); + STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION, UUID.randomUUID()); final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig() .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) @@ -310,7 +310,7 @@ void testCreateSyncJobConnectionResourceReqs() throws IOException { DESTINATION_PROTOCOL_VERSION, List.of(STANDARD_SYNC_OPERATION), null, - STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION); + STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION, UUID.randomUUID()); final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig() .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) @@ -364,7 +364,8 @@ void testCreateSyncJobSourceAndDestinationResourceReqs() throws IOException { null, new StandardSourceDefinition().withResourceRequirements(new ActorDefinitionResourceRequirements().withDefault(sourceResourceRequirements)), new StandardDestinationDefinition().withResourceRequirements(new ActorDefinitionResourceRequirements().withJobSpecific(List.of( - new JobTypeResourceLimit().withJobType(JobType.SYNC).withResourceRequirements(destResourceRequirements))))); + new JobTypeResourceLimit().withJobType(JobType.SYNC).withResourceRequirements(destResourceRequirements)))), + UUID.randomUUID()); final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig() .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/factory/DefaultSyncJobFactoryTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/factory/DefaultSyncJobFactoryTest.java index 120f9e4b984f..5c4e439e5542 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/factory/DefaultSyncJobFactoryTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/factory/DefaultSyncJobFactoryTest.java @@ -44,6 +44,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo final UUID destinationId = UUID.randomUUID(); final UUID operationId = UUID.randomUUID(); final UUID workspaceWebhookConfigId = UUID.randomUUID(); + final UUID workspaceId = UUID.randomUUID(); final String workspaceWebhookName = "test-webhook-name"; final JsonNode persistedWebhookConfigs = Jsons.deserialize( String.format("{\"webhookConfigs\": [{\"id\": \"%s\", \"name\": \"%s\", \"authToken\": {\"_secret\": \"a-secret_v1\"}}]}", @@ -87,7 +88,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo when( jobCreator.createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, srcProtocolVersion, dstDockerImage, dstProtocolVersion, operations, - persistedWebhookConfigs, standardSourceDefinition, standardDestinationDefinition)) + persistedWebhookConfigs, standardSourceDefinition, standardDestinationDefinition, workspaceId)) .thenReturn(Optional.of(jobId)); when(configRepository.getStandardSourceDefinition(sourceDefinitionId)) .thenReturn(standardSourceDefinition); @@ -96,7 +97,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo .thenReturn(standardDestinationDefinition); when(configRepository.getStandardWorkspaceNoSecrets(any(), eq(true))).thenReturn( - new StandardWorkspace().withWebhookOperationConfigs(persistedWebhookConfigs)); + new StandardWorkspace().withWorkspaceId(workspaceId).withWebhookOperationConfigs(persistedWebhookConfigs)); final SyncJobFactory factory = new DefaultSyncJobFactory(true, jobCreator, configRepository, mock(OAuthConfigSupplier.class), workspaceHelper); final long actualJobId = factory.create(connectionId); @@ -105,7 +106,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo verify(jobCreator) .createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, srcProtocolVersion, dstDockerImage, dstProtocolVersion, operations, persistedWebhookConfigs, - standardSourceDefinition, standardDestinationDefinition); + standardSourceDefinition, standardDestinationDefinition, workspaceId); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index 2a550a62eb36..bf73a08c2c6c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -79,7 +79,8 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withResourceRequirements(resetConnection.getResourceRequirements()) .withState(resetConnection.getState()) .withIsSourceCustomConnector(resetConnection.getIsSourceCustomConnector()) - .withIsDestinationCustomConnector(resetConnection.getIsDestinationCustomConnector()); + .withIsDestinationCustomConnector(resetConnection.getIsDestinationCustomConnector()) + .withWorkspaceId(resetConnection.getWorkspaceId()); } else { throw new IllegalStateException( String.format("Unexpected config type %s for job %d. The only supported config types for this activity are (%s)", @@ -139,7 +140,8 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withState(config.getState()) .withResourceRequirements(config.getResourceRequirements()) .withSourceResourceRequirements(config.getSourceResourceRequirements()) - .withDestinationResourceRequirements(config.getDestinationResourceRequirements()); + .withDestinationResourceRequirements(config.getDestinationResourceRequirements()) + .withWorkspaceId(config.getWorkspaceId()); return new GeneratedJobInput(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput); From 269e102a7327e367c95a32df890e82fd3a8c1a69 Mon Sep 17 00:00:00 2001 From: Michael Siega Date: Fri, 16 Dec 2022 23:07:43 +0100 Subject: [PATCH 2/8] fix tests around workspace id in job creation --- .../general/DefaultReplicationWorker.java | 1 + .../job/DefaultJobCreatorTest.java | 21 ++++++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index 1d8034479239..1c4abf657239 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -140,6 +140,7 @@ public DefaultReplicationWorker(final String jobId, @Override public final ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRoot) throws WorkerException { LOGGER.info("start sync worker. job id: {} attempt id: {}", jobId, attempt); + LOGGER.info("fieldSelectionEnabled: {}", fieldSelectionEnabled); LineGobbler.startSection("REPLICATION"); // todo (cgardens) - this should not be happening in the worker. this is configuration information diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java index 41a5ca697920..7311ade09c97 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java @@ -75,6 +75,7 @@ class DefaultJobCreatorTest { private static final StandardSourceDefinition STANDARD_SOURCE_DEFINITION; private static final StandardDestinationDefinition STANDARD_DESTINATION_DEFINITION; private static final long JOB_ID = 12L; + private static final UUID WORKSPACE_ID = UUID.randomUUID(); private JobPersistence jobPersistence; private StatePersistence statePersistence; @@ -190,7 +191,8 @@ void testCreateSyncJob() throws IOException { .withDestinationResourceRequirements(workerResourceRequirements) .withWebhookOperationConfigs(PERSISTED_WEBHOOK_CONFIGS) .withIsSourceCustomConnector(false) - .withIsDestinationCustomConnector(false); + .withIsDestinationCustomConnector(false) + .withWorkspaceId(WORKSPACE_ID); final JobConfig jobConfig = new JobConfig() .withConfigType(JobConfig.ConfigType.SYNC) @@ -210,7 +212,7 @@ void testCreateSyncJob() throws IOException { List.of(STANDARD_SYNC_OPERATION), PERSISTED_WEBHOOK_CONFIGS, STANDARD_SOURCE_DEFINITION, - STANDARD_DESTINATION_DEFINITION, UUID.randomUUID()).orElseThrow(); + STANDARD_DESTINATION_DEFINITION, WORKSPACE_ID).orElseThrow(); assertEquals(JOB_ID, jobId); } @@ -262,7 +264,7 @@ void testCreateSyncJobDefaultWorkerResourceReqs() throws IOException { DESTINATION_PROTOCOL_VERSION, List.of(STANDARD_SYNC_OPERATION), null, - STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION, UUID.randomUUID()); + STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION, WORKSPACE_ID); final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig() .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) @@ -280,7 +282,8 @@ void testCreateSyncJobDefaultWorkerResourceReqs() throws IOException { .withSourceResourceRequirements(workerResourceRequirements) .withDestinationResourceRequirements(workerResourceRequirements) .withIsSourceCustomConnector(false) - .withIsDestinationCustomConnector(false); + .withIsDestinationCustomConnector(false) + .withWorkspaceId(WORKSPACE_ID); final JobConfig expectedJobConfig = new JobConfig() .withConfigType(JobConfig.ConfigType.SYNC) @@ -310,7 +313,7 @@ void testCreateSyncJobConnectionResourceReqs() throws IOException { DESTINATION_PROTOCOL_VERSION, List.of(STANDARD_SYNC_OPERATION), null, - STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION, UUID.randomUUID()); + STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION, WORKSPACE_ID); final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig() .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) @@ -328,7 +331,8 @@ void testCreateSyncJobConnectionResourceReqs() throws IOException { .withSourceResourceRequirements(standardSyncResourceRequirements) .withDestinationResourceRequirements(standardSyncResourceRequirements) .withIsSourceCustomConnector(false) - .withIsDestinationCustomConnector(false); + .withIsDestinationCustomConnector(false) + .withWorkspaceId(WORKSPACE_ID); final JobConfig expectedJobConfig = new JobConfig() .withConfigType(JobConfig.ConfigType.SYNC) @@ -365,7 +369,7 @@ void testCreateSyncJobSourceAndDestinationResourceReqs() throws IOException { new StandardSourceDefinition().withResourceRequirements(new ActorDefinitionResourceRequirements().withDefault(sourceResourceRequirements)), new StandardDestinationDefinition().withResourceRequirements(new ActorDefinitionResourceRequirements().withJobSpecific(List.of( new JobTypeResourceLimit().withJobType(JobType.SYNC).withResourceRequirements(destResourceRequirements)))), - UUID.randomUUID()); + WORKSPACE_ID); final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig() .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) @@ -383,7 +387,8 @@ void testCreateSyncJobSourceAndDestinationResourceReqs() throws IOException { .withSourceResourceRequirements(sourceResourceRequirements) .withDestinationResourceRequirements(destResourceRequirements) .withIsSourceCustomConnector(false) - .withIsDestinationCustomConnector(false); + .withIsDestinationCustomConnector(false) + .withWorkspaceId(WORKSPACE_ID); final JobConfig expectedJobConfig = new JobConfig() .withConfigType(JobConfig.ConfigType.SYNC) From a291e49962d723f599f24cb353acb75b17abc323 Mon Sep 17 00:00:00 2001 From: Michael Siega Date: Sat, 17 Dec 2022 01:00:54 +0100 Subject: [PATCH 3/8] make sure field selection environment variables get passed through properly --- .../temporal/sync/OrchestratorConstants.java | 4 ++- .../general/DefaultReplicationWorker.java | 1 + .../process/AirbyteIntegrationLauncher.java | 4 ++- .../AirbyteIntegrationLauncherTest.java | 4 ++- .../features/EnvVariableFeatureFlags.java | 21 +++++---------- .../commons/features/FeatureFlags.java | 15 +++++++---- .../main/java/io/airbyte/config/Configs.java | 4 +++ .../java/io/airbyte/config/EnvConfigs.java | 12 +++++++++ .../ReplicationJobOrchestrator.java | 27 ++++++++++++++++--- ...ontainerOrchestratorConfigBeanFactory.java | 2 ++ .../sync/ReplicationActivityImpl.java | 23 +++++++++++++++- .../airbyte-worker/templates/deployment.yaml | 10 +++++++ docker-compose.yaml | 8 ++++++ 13 files changed, 108 insertions(+), 27 deletions(-) diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java index 61fccbad4e1e..ef8d7452d136 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java @@ -68,7 +68,9 @@ public class OrchestratorConstants { EnvConfigs.STATE_STORAGE_S3_SECRET_ACCESS_KEY, EnvConfigs.STATE_STORAGE_S3_REGION, EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, - EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA)) + EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, + EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, + EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES)) .build(); public static final String INIT_FILE_ENV_MAP = "envMap.json"; diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index 1c4abf657239..e1e3b23ccf1a 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -140,6 +140,7 @@ public DefaultReplicationWorker(final String jobId, @Override public final ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRoot) throws WorkerException { LOGGER.info("start sync worker. job id: {} attempt id: {}", jobId, attempt); + LOGGER.info("syncInput: {}", syncInput); LOGGER.info("fieldSelectionEnabled: {}", fieldSelectionEnabled); LineGobbler.startSection("REPLICATION"); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java index 3d6871786731..a8688b912ea6 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java @@ -227,7 +227,9 @@ private Map getWorkerMetadata() { WorkerEnvConstants.WORKER_JOB_ID, jobId, WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(attempt), EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()), - EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema())); + EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()), + EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection()), + EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces()); } } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java index 30c8779a2449..69632d85bee4 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java @@ -54,7 +54,9 @@ class AirbyteIntegrationLauncherTest { WorkerEnvConstants.WORKER_JOB_ID, JOB_ID, WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT), EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(new EnvVariableFeatureFlags().useStreamCapableState()), - EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(new EnvVariableFeatureFlags().autoDetectSchema())); + EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(new EnvVariableFeatureFlags().autoDetectSchema()), + EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(new EnvVariableFeatureFlags().applyFieldSelection()), + EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, new EnvVariableFeatureFlags().fieldSelectionWorkspaces()); private WorkerConfigs workerConfigs; @Mock diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java index dfb17b3edb27..ce3683e42e98 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java @@ -4,9 +4,6 @@ package io.airbyte.commons.features; -import java.util.HashSet; -import java.util.Set; -import java.util.UUID; import java.util.function.Function; import lombok.extern.slf4j.Slf4j; @@ -19,13 +16,7 @@ public class EnvVariableFeatureFlags implements FeatureFlags { public static final String NEED_STATE_VALIDATION = "NEED_STATE_VALIDATION"; public static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION"; - private final Set FIELD_SELECTION_WORKSPACES = getEnvOrDefault("FIELD_SELECTION_WORKSPACES", Set.of(), (workspaceIdsString) -> { - final Set workspaceIds = new HashSet<>(); - for (final String workspaceId : workspaceIdsString.split(",")) { - workspaceIds.add(UUID.fromString(workspaceId)); - } - return workspaceIds; - }); + public static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES"; @Override public boolean autoDisablesFailingConnections() { @@ -60,13 +51,15 @@ public boolean needStateValidation() { } @Override - public boolean applyFieldSelection(UUID workspaceId) { - if (workspaceId != null && FIELD_SELECTION_WORKSPACES.contains(workspaceId)) { - return true; - } + public boolean applyFieldSelection() { return getEnvOrDefault(APPLY_FIELD_SELECTION, false, Boolean::parseBoolean); } + @Override + public String fieldSelectionWorkspaces() { + return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, "", (arg) -> arg); + } + // TODO: refactor in order to use the same method than the ones in EnvConfigs.java public T getEnvOrDefault(final String key, final T defaultValue, final Function parser) { final String value = System.getenv(key); diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java index d103b5807c71..aa1858acda1e 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java @@ -4,8 +4,6 @@ package io.airbyte.commons.features; -import java.util.UUID; - /** * Interface that describe which features are activated in airbyte. Currently the only * implementation relies on env. Ideally it should be on some DB. @@ -25,11 +23,18 @@ public interface FeatureFlags { boolean needStateValidation(); /** - * Return true if field selection should be applied for the given workspaceId + * Return true if field selection should be applied. See also fieldSelectionWorkspaces. * - * @param workspaceId that owns the sync * @return whether field selection should be applied */ - boolean applyFieldSelection(UUID workspaceId); + boolean applyFieldSelection(); + + /** + * Get the workspaces allow-listed for field selection. This should take precedence over + * applyFieldSelection. + * + * @return a comma-separated list of workspace ids where field selection should be enabled. + */ + String fieldSelectionWorkspaces(); } diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java index 1d4d03139e1e..956d67b9bc4e 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java @@ -740,6 +740,10 @@ public interface Configs { boolean getAutoDetectSchema(); + boolean getApplyFieldSelection(); + + String getFieldSelectionWorkspaces(); + enum TrackingStrategy { SEGMENT, LOGGING diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java index ca175dd8c39f..c183336f620c 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -217,6 +217,8 @@ public class EnvConfigs implements Configs { private static final long DEFAULT_MAX_NOTIFY_WORKERS = 5; private static final String DEFAULT_NETWORK = "host"; private static final String AUTO_DETECT_SCHEMA = "AUTO_DETECT_SCHEMA"; + private static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION"; + private static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES"; public static final Map> JOB_SHARED_ENVS = Map.of( AIRBYTE_VERSION, (instance) -> instance.getAirbyteVersion().serialize(), @@ -1123,6 +1125,16 @@ public boolean getAutoDetectSchema() { return getEnvOrDefault(AUTO_DETECT_SCHEMA, false); } + @Override + public boolean getApplyFieldSelection() { + return getEnvOrDefault(APPLY_FIELD_SELECTION, false); + } + + @Override + public String getFieldSelectionWorkspaces() { + return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, ""); + } + @Override public int getActivityNumberOfAttempt() { return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "5")); diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java index b3b141bb724f..df399ee92e54 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java @@ -43,16 +43,18 @@ import io.airbyte.workers.process.KubePodProcess; import io.airbyte.workers.process.ProcessFactory; import io.airbyte.workers.sync.ReplicationLauncherWorker; -import java.lang.invoke.MethodHandles; import java.nio.file.Path; +import java.util.HashSet; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ReplicationJobOrchestrator implements JobOrchestrator { - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final Logger log = LoggerFactory.getLogger(ReplicationJobOrchestrator.class); private final ProcessFactory processFactory; private final Configs configs; private final FeatureFlags featureFlags; @@ -138,7 +140,8 @@ public Optional runJob() throws Exception { sourceLauncherConfig.getDockerImage()); log.info("Setting up replication worker..."); - log.debug("Field selection is {}", featureFlags.applyFieldSelection(syncInput.getWorkspaceId()) ? "enabled" : "disabled"); + log.debug("Field selection is {}", featureFlags.applyFieldSelection() ? "enabled" : "disabled"); + log.debug("Field selection workspaces are: {}", featureFlags.fieldSelectionWorkspaces()); final var replicationWorker = new DefaultReplicationWorker( jobRunConfig.getJobId(), Math.toIntExact(jobRunConfig.getAttemptId()), @@ -149,7 +152,7 @@ public Optional runJob() throws Exception { new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion())), new AirbyteMessageTracker(), new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)), - metricReporter, featureFlags.applyFieldSelection(syncInput.getWorkspaceId())); + metricReporter, enableFieldSelection(featureFlags, syncInput.getWorkspaceId())); log.info("Running replication worker..."); final var jobRoot = TemporalUtils.getJobRoot(configs.getWorkspaceRoot(), @@ -166,4 +169,20 @@ private AirbyteStreamFactory getStreamFactory(final Version protocolVersion, fin : new DefaultAirbyteStreamFactory(mdcScope); } + private boolean enableFieldSelection(final FeatureFlags featureFlags, final UUID workspaceId) { + final String workspaceIdsString = featureFlags.fieldSelectionWorkspaces(); + final Set workspaceIds = new HashSet<>(); + for (final String id : workspaceIdsString.split(",")) { + workspaceIds.add(UUID.fromString(id)); + } + for (final UUID workspace : workspaceIds) { + log.info("field selection workspace: {}", workspace); + } + if (workspaceId != null && workspaceIds.contains(workspaceId)) { + return true; + } + + return featureFlags.applyFieldSelection(); + } + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java index d9259005b3ce..f8841b007414 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java @@ -77,6 +77,8 @@ public ContainerOrchestratorConfig kubernetesContainerOrchestratorConfig( environmentVariables.put(PUBLISH_METRICS_ENV_VAR, shouldPublishMetrics); environmentVariables.put(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, Boolean.toString(featureFlags.useStreamCapableState())); environmentVariables.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, Boolean.toString(featureFlags.autoDetectSchema())); + environmentVariables.put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, Boolean.toString(featureFlags.applyFieldSelection())); + environmentVariables.put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces()); environmentVariables.put(JAVA_OPTS_ENV_VAR, containerOrchestratorJavaOpts); if (System.getenv(DD_ENV_ENV_VAR) != null) { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index 17a501ff25c9..2ff15d7455a3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -72,8 +72,10 @@ import jakarta.inject.Singleton; import java.nio.file.Path; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -293,6 +295,9 @@ private CheckedSupplier, Exception> final MetricClient metricClient = MetricClientFactory.getMetricClient(); final WorkerMetricReporter metricReporter = new WorkerMetricReporter(metricClient, sourceLauncherConfig.getDockerImage()); + LOGGER.debug("Field selection is {}", featureFlags.applyFieldSelection() ? "enabled" : "disabled"); + LOGGER.debug("Field selection workspaces are: {}", featureFlags.fieldSelectionWorkspaces()); + return new DefaultReplicationWorker( jobRunConfig.getJobId(), Math.toIntExact(jobRunConfig.getAttemptId()), @@ -304,7 +309,7 @@ private CheckedSupplier, Exception> new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion())), new AirbyteMessageTracker(), new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)), - metricReporter, false); + metricReporter, enableFieldSelection(featureFlags, syncInput.getWorkspaceId())); }; } @@ -342,4 +347,20 @@ private boolean isResetJob(final String dockerImage) { return WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB.equalsIgnoreCase(dockerImage); } + private boolean enableFieldSelection(final FeatureFlags featureFlags, final UUID workspaceId) { + final String workspaceIdsString = featureFlags.fieldSelectionWorkspaces(); + final Set workspaceIds = new HashSet<>(); + for (final String id : workspaceIdsString.split(",")) { + workspaceIds.add(UUID.fromString(id)); + } + for (final UUID workspace : workspaceIds) { + LOGGER.info("field selection workspace: {}", workspace); + } + if (workspaceId != null && workspaceIds.contains(workspaceId)) { + return true; + } + + return featureFlags.applyFieldSelection(); + } + } diff --git a/charts/airbyte-worker/templates/deployment.yaml b/charts/airbyte-worker/templates/deployment.yaml index ea026012ad73..d4d8b8af05c7 100644 --- a/charts/airbyte-worker/templates/deployment.yaml +++ b/charts/airbyte-worker/templates/deployment.yaml @@ -296,6 +296,16 @@ spec: configMapKeyRef: name: {{ .Release.Name }}-airbyte-env key: AUTO_DETECT_SCHEMA + - name: APPLY_FIELD_SELECTION + valueFrom: + configMapKeyRef: + name: { { .Release.Name } }-airbyte-env + key: APPLY_FIELD_SELECTION + - name: FIELD_SELECTION_WORKSPACES + valueFrom: + configMapKeyRef: + name: { { .Release.Name } }-airbyte-env + key: FIELD_SELECTION_WORKSPACES - name: USE_STREAM_CAPABLE_STATE valueFrom: configMapKeyRef: diff --git a/docker-compose.yaml b/docker-compose.yaml index 46fbab29492c..3b0bbc96ed04 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -32,6 +32,8 @@ services: - DATABASE_USER=${DATABASE_USER} - LOG_LEVEL=${LOG_LEVEL} - RUN_DATABASE_MIGRATION_ON_STARTUP=${RUN_DATABASE_MIGRATION_ON_STARTUP} + - APPLY_FIELD_SELECTION=${APPLY_FIELD_SELECTION} + - FIELD_SELECTION_WORKSPACES=${FIELD_SELECTION_WORKSPACES} networks: - airbyte_internal db: @@ -108,6 +110,8 @@ services: - AUTO_DETECT_SCHEMA=${AUTO_DETECT_SCHEMA} - USE_STREAM_CAPABLE_STATE=${USE_STREAM_CAPABLE_STATE} - MICRONAUT_ENVIRONMENTS=${WORKERS_MICRONAUT_ENVIRONMENTS} + - APPLY_FIELD_SELECTION=${APPLY_FIELD_SELECTION} + - FIELD_SELECTION_WORKSPACES=${FIELD_SELECTION_WORKSPACES} volumes: - /var/run/docker.sock:/var/run/docker.sock - workspace:${WORKSPACE_ROOT} @@ -149,6 +153,8 @@ services: - WORKSPACE_ROOT=${WORKSPACE_ROOT} - GITHUB_STORE_BRANCH=${GITHUB_STORE_BRANCH} - AUTO_DETECT_SCHEMA=${AUTO_DETECT_SCHEMA} + - APPLY_FIELD_SELECTION=${APPLY_FIELD_SELECTION} + - FIELD_SELECTION_WORKSPACES=${FIELD_SELECTION_WORKSPACES} ports: - 8001 volumes: @@ -189,6 +195,8 @@ services: - POSTGRES_PWD=${DATABASE_PASSWORD} - POSTGRES_SEEDS=${DATABASE_HOST} - POSTGRES_USER=${DATABASE_USER} + - APPLY_FIELD_SELECTION=${APPLY_FIELD_SELECTION} + - FIELD_SELECTION_WORKSPACES=${FIELD_SELECTION_WORKSPACES} volumes: - ./temporal/dynamicconfig:/etc/temporal/config/dynamicconfig networks: From 0febc12441278a8cee2865cd019a00a14070cedc Mon Sep 17 00:00:00 2001 From: Michael Siega Date: Sat, 17 Dec 2022 01:26:01 +0100 Subject: [PATCH 4/8] clean up handling around field selection flags --- .../workers/general/DefaultReplicationWorker.java | 2 -- .../orchestrator/ReplicationJobOrchestrator.java | 5 ++--- .../io/airbyte/persistence/job/DefaultJobCreator.java | 2 +- .../workers/temporal/sync/ReplicationActivityImpl.java | 3 --- charts/airbyte-worker/templates/deployment.yaml | 10 ---------- docker-compose.yaml | 6 ------ 6 files changed, 3 insertions(+), 25 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index e1e3b23ccf1a..1d8034479239 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -140,8 +140,6 @@ public DefaultReplicationWorker(final String jobId, @Override public final ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRoot) throws WorkerException { LOGGER.info("start sync worker. job id: {} attempt id: {}", jobId, attempt); - LOGGER.info("syncInput: {}", syncInput); - LOGGER.info("fieldSelectionEnabled: {}", fieldSelectionEnabled); LineGobbler.startSection("REPLICATION"); // todo (cgardens) - this should not be happening in the worker. this is configuration information diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java index df399ee92e54..8a35fbdc82cc 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java @@ -43,6 +43,7 @@ import io.airbyte.workers.process.KubePodProcess; import io.airbyte.workers.process.ProcessFactory; import io.airbyte.workers.sync.ReplicationLauncherWorker; +import java.lang.invoke.MethodHandles; import java.nio.file.Path; import java.util.HashSet; import java.util.Map; @@ -54,7 +55,7 @@ public class ReplicationJobOrchestrator implements JobOrchestrator { - private static final Logger log = LoggerFactory.getLogger(ReplicationJobOrchestrator.class); + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final ProcessFactory processFactory; private final Configs configs; private final FeatureFlags featureFlags; @@ -140,8 +141,6 @@ public Optional runJob() throws Exception { sourceLauncherConfig.getDockerImage()); log.info("Setting up replication worker..."); - log.debug("Field selection is {}", featureFlags.applyFieldSelection() ? "enabled" : "disabled"); - log.debug("Field selection workspaces are: {}", featureFlags.fieldSelectionWorkspaces()); final var replicationWorker = new DefaultReplicationWorker( jobRunConfig.getJobId(), Math.toIntExact(jobRunConfig.getAttemptId()), diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java index 1d903a458e49..e09056525a28 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java @@ -61,7 +61,7 @@ public Optional createSyncJob(final SourceConnection source, @Nullable final JsonNode webhookOperationConfigs, final StandardSourceDefinition sourceDefinition, final StandardDestinationDefinition destinationDefinition, - UUID workspaceId) + final UUID workspaceId) throws IOException { // reusing this isn't going to quite work. diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index 2ff15d7455a3..9f298c2e6d99 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -295,9 +295,6 @@ private CheckedSupplier, Exception> final MetricClient metricClient = MetricClientFactory.getMetricClient(); final WorkerMetricReporter metricReporter = new WorkerMetricReporter(metricClient, sourceLauncherConfig.getDockerImage()); - LOGGER.debug("Field selection is {}", featureFlags.applyFieldSelection() ? "enabled" : "disabled"); - LOGGER.debug("Field selection workspaces are: {}", featureFlags.fieldSelectionWorkspaces()); - return new DefaultReplicationWorker( jobRunConfig.getJobId(), Math.toIntExact(jobRunConfig.getAttemptId()), diff --git a/charts/airbyte-worker/templates/deployment.yaml b/charts/airbyte-worker/templates/deployment.yaml index d4d8b8af05c7..ea026012ad73 100644 --- a/charts/airbyte-worker/templates/deployment.yaml +++ b/charts/airbyte-worker/templates/deployment.yaml @@ -296,16 +296,6 @@ spec: configMapKeyRef: name: {{ .Release.Name }}-airbyte-env key: AUTO_DETECT_SCHEMA - - name: APPLY_FIELD_SELECTION - valueFrom: - configMapKeyRef: - name: { { .Release.Name } }-airbyte-env - key: APPLY_FIELD_SELECTION - - name: FIELD_SELECTION_WORKSPACES - valueFrom: - configMapKeyRef: - name: { { .Release.Name } }-airbyte-env - key: FIELD_SELECTION_WORKSPACES - name: USE_STREAM_CAPABLE_STATE valueFrom: configMapKeyRef: diff --git a/docker-compose.yaml b/docker-compose.yaml index 3b0bbc96ed04..095f67bc94c6 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -32,8 +32,6 @@ services: - DATABASE_USER=${DATABASE_USER} - LOG_LEVEL=${LOG_LEVEL} - RUN_DATABASE_MIGRATION_ON_STARTUP=${RUN_DATABASE_MIGRATION_ON_STARTUP} - - APPLY_FIELD_SELECTION=${APPLY_FIELD_SELECTION} - - FIELD_SELECTION_WORKSPACES=${FIELD_SELECTION_WORKSPACES} networks: - airbyte_internal db: @@ -153,8 +151,6 @@ services: - WORKSPACE_ROOT=${WORKSPACE_ROOT} - GITHUB_STORE_BRANCH=${GITHUB_STORE_BRANCH} - AUTO_DETECT_SCHEMA=${AUTO_DETECT_SCHEMA} - - APPLY_FIELD_SELECTION=${APPLY_FIELD_SELECTION} - - FIELD_SELECTION_WORKSPACES=${FIELD_SELECTION_WORKSPACES} ports: - 8001 volumes: @@ -195,8 +191,6 @@ services: - POSTGRES_PWD=${DATABASE_PASSWORD} - POSTGRES_SEEDS=${DATABASE_HOST} - POSTGRES_USER=${DATABASE_USER} - - APPLY_FIELD_SELECTION=${APPLY_FIELD_SELECTION} - - FIELD_SELECTION_WORKSPACES=${FIELD_SELECTION_WORKSPACES} volumes: - ./temporal/dynamicconfig:/etc/temporal/config/dynamicconfig networks: From 9c54ed65737d332fed3333dbf8a4677ba16aa83f Mon Sep 17 00:00:00 2001 From: Michael Siega Date: Sat, 17 Dec 2022 09:25:03 +0100 Subject: [PATCH 5/8] debug logging for field selection --- .../io/airbyte/workers/general/DefaultReplicationWorker.java | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index 1d8034479239..4b669bd9e71e 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -158,6 +158,7 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path .stream() .collect(Collectors.toMap(s -> s.getStream().getNamespace() + "." + s.getStream().getName(), s -> String.format("%s - %s", s.getSyncMode(), s.getDestinationSyncMode())))); + LOGGER.debug("field selection enabled: {}", fieldSelectionEnabled); final WorkerSourceConfig sourceConfig = WorkerUtils.syncToWorkerSourceConfig(syncInput); ApmTraceUtils.addTagsToTrace(generateTraceTags(destinationConfig, jobRoot)); From 88f0ca0cd10481d405694826dcaf08144c193e2d Mon Sep 17 00:00:00 2001 From: Michael Siega Date: Sat, 17 Dec 2022 09:53:41 +0100 Subject: [PATCH 6/8] properly handle empty field selection feature flag --- .../temporal/sync/ReplicationActivityImpl.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index 9f298c2e6d99..0d2271a50d5c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -347,11 +347,15 @@ private boolean isResetJob(final String dockerImage) { private boolean enableFieldSelection(final FeatureFlags featureFlags, final UUID workspaceId) { final String workspaceIdsString = featureFlags.fieldSelectionWorkspaces(); final Set workspaceIds = new HashSet<>(); - for (final String id : workspaceIdsString.split(",")) { - workspaceIds.add(UUID.fromString(id)); - } - for (final UUID workspace : workspaceIds) { - LOGGER.info("field selection workspace: {}", workspace); + LOGGER.debug("Field selection enabled for {}", workspaceIdsString); + if (!workspaceIdsString.equals("")) { + for (final String id : workspaceIdsString.split(",")) { + try { + workspaceIds.add(UUID.fromString(id)); + } catch (IllegalArgumentException e) { + LOGGER.warn("Malformed workspace id for field selection: {}", id); + } + } } if (workspaceId != null && workspaceIds.contains(workspaceId)) { return true; From f7efca793c3e87f747e6b8dad2496b53993e55a3 Mon Sep 17 00:00:00 2001 From: Michael Siega Date: Sat, 17 Dec 2022 10:09:27 +0100 Subject: [PATCH 7/8] fix pmd --- .../airbyte/workers/temporal/sync/ReplicationActivityImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index 0d2271a50d5c..aec1714fc1f9 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -348,7 +348,7 @@ private boolean enableFieldSelection(final FeatureFlags featureFlags, final UUID final String workspaceIdsString = featureFlags.fieldSelectionWorkspaces(); final Set workspaceIds = new HashSet<>(); LOGGER.debug("Field selection enabled for {}", workspaceIdsString); - if (!workspaceIdsString.equals("")) { + if (!"".equals(workspaceIdsString)) { for (final String id : workspaceIdsString.split(",")) { try { workspaceIds.add(UUID.fromString(id)); From 710fe88d89099239bc65b9c0d4ac5b047bab266d Mon Sep 17 00:00:00 2001 From: Michael Siega Date: Sat, 17 Dec 2022 10:27:14 +0100 Subject: [PATCH 8/8] actually fix pmd --- .../airbyte/workers/temporal/sync/ReplicationActivityImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index aec1714fc1f9..238fd7da2bc5 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -348,7 +348,7 @@ private boolean enableFieldSelection(final FeatureFlags featureFlags, final UUID final String workspaceIdsString = featureFlags.fieldSelectionWorkspaces(); final Set workspaceIds = new HashSet<>(); LOGGER.debug("Field selection enabled for {}", workspaceIdsString); - if (!"".equals(workspaceIdsString)) { + if (!workspaceIdsString.isEmpty()) { for (final String id : workspaceIdsString.split(",")) { try { workspaceIds.add(UUID.fromString(id));