Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pass workspace id to sync workflow and use it to selectively enable field selection #20589

Merged
merged 14 commits into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.commons.features;

import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -16,6 +19,14 @@ public class EnvVariableFeatureFlags implements FeatureFlags {
public static final String NEED_STATE_VALIDATION = "NEED_STATE_VALIDATION";
public static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION";

private final Set<UUID> FIELD_SELECTION_WORKSPACES = getEnvOrDefault("FIELD_SELECTION_WORKSPACES", Set.of(), (workspaceIdsString) -> {
final Set<UUID> workspaceIds = new HashSet<>();
for (final String workspaceId : workspaceIdsString.split(",")) {
workspaceIds.add(UUID.fromString(workspaceId));
}
return workspaceIds;
});

@Override
public boolean autoDisablesFailingConnections() {
log.info("Auto Disable Failing Connections: " + Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS")));
Expand Down Expand Up @@ -49,7 +60,10 @@ public boolean needStateValidation() {
}

@Override
public boolean applyFieldSelection() {
public boolean applyFieldSelection(UUID workspaceId) {
if (workspaceId != null && FIELD_SELECTION_WORKSPACES.contains(workspaceId)) {
return true;
}
return getEnvOrDefault(APPLY_FIELD_SELECTION, false, Boolean::parseBoolean);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.commons.features;

import java.util.UUID;

/**
* Interface that describe which features are activated in airbyte. Currently the only
* implementation relies on env. Ideally it should be on some DB.
Expand All @@ -22,6 +24,12 @@ public interface FeatureFlags {

boolean needStateValidation();

boolean applyFieldSelection();
/**
* Return true if field selection should be applied for the given workspaceId
*
* @param workspaceId that owns the sync
* @return whether field selection should be applied
*/
boolean applyFieldSelection(UUID workspaceId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,7 @@ properties:
isDestinationCustomConnector:
description: determine if the running image of the destination is a custom connector.
type: boolean
workspaceId:
description: The id of the workspace associated with the sync
type: string
format: uuid
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,7 @@ properties:
isDestinationCustomConnector:
description: determine if the destination running image is a custom connector.
type: boolean
workspaceId:
description: The id of the workspace associated with the sync
type: string
format: uuid
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,7 @@ properties:
description: optional resource requirements to use in dest container - this is used instead of `resourceRequirements` for the dest container
type: object
"$ref": ResourceRequirements.yaml
workspaceId:
description: The id of the workspace associated with this sync
type: string
format: uuid
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public Optional<String> runJob() throws Exception {
sourceLauncherConfig.getDockerImage());

log.info("Setting up replication worker...");
log.debug("Field selection is {}", featureFlags.applyFieldSelection(syncInput.getWorkspaceId()) ? "enabled" : "disabled");
final var replicationWorker = new DefaultReplicationWorker(
jobRunConfig.getJobId(),
Math.toIntExact(jobRunConfig.getAttemptId()),
Expand All @@ -148,7 +149,7 @@ public Optional<String> runJob() throws Exception {
new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion())),
new AirbyteMessageTracker(),
new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)),
metricReporter, featureFlags.applyFieldSelection());
metricReporter, featureFlags.applyFieldSelection(syncInput.getWorkspaceId()));

log.info("Running replication worker...");
final var jobRoot = TemporalUtils.getJobRoot(configs.getWorkspaceRoot(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public Optional<Long> createSyncJob(final SourceConnection source,
final List<StandardSyncOperation> standardSyncOperations,
@Nullable final JsonNode webhookOperationConfigs,
final StandardSourceDefinition sourceDefinition,
final StandardDestinationDefinition destinationDefinition)
final StandardDestinationDefinition destinationDefinition,
UUID workspaceId)
mfsiega-airbyte marked this conversation as resolved.
Show resolved Hide resolved
throws IOException {
// reusing this isn't going to quite work.

Expand Down Expand Up @@ -96,7 +97,8 @@ public Optional<Long> createSyncJob(final SourceConnection source,
.withSourceResourceRequirements(mergedSrcResourceReq)
.withDestinationResourceRequirements(mergedDstResourceReq)
.withIsSourceCustomConnector(sourceDefinition.getCustom())
.withIsDestinationCustomConnector(destinationDefinition.getCustom());
.withIsDestinationCustomConnector(destinationDefinition.getCustom())
.withWorkspaceId(workspaceId);

getCurrentConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import javax.annotation.Nullable;

public interface JobCreator {

/**
*
* @param source db model representing where data comes from
* @param destination db model representing where data goes
* @param standardSync sync options
* @param sourceDockerImage docker image to use for the source
* @param destinationDockerImage docker image to use for the destination
* @param workspaceId
* @return the new job if no other conflicting job was running, otherwise empty
* @throws IOException if something wrong happens
*/
Expand All @@ -40,7 +41,8 @@ Optional<Long> createSyncJob(SourceConnection source,
List<StandardSyncOperation> standardSyncOperations,
@Nullable JsonNode webhookOperationConfigs,
StandardSourceDefinition sourceDefinition,
StandardDestinationDefinition destinationDefinition)
StandardDestinationDefinition destinationDefinition,
UUID workspaceId)
throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public Long create(final UUID connectionId) {
standardSyncOperations,
workspace.getWebhookOperationConfigs(),
sourceDefinition,
destinationDefinition)
destinationDefinition,
workspace.getWorkspaceId())
.orElseThrow(() -> new IllegalStateException("We shouldn't be trying to create a new sync job if there is one running already."));

} catch (final IOException | JsonValidationException | ConfigNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ void testCreateSyncJob() throws IOException {
List.of(STANDARD_SYNC_OPERATION),
PERSISTED_WEBHOOK_CONFIGS,
STANDARD_SOURCE_DEFINITION,
STANDARD_DESTINATION_DEFINITION).orElseThrow();
STANDARD_DESTINATION_DEFINITION, UUID.randomUUID()).orElseThrow();
assertEquals(JOB_ID, jobId);
}

Expand Down Expand Up @@ -247,7 +247,7 @@ void testCreateSyncJobEnsureNoQueuing() throws IOException {
DESTINATION_PROTOCOL_VERSION,
List.of(STANDARD_SYNC_OPERATION),
null,
STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION).isEmpty());
STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION, UUID.randomUUID()).isEmpty());
}

@Test
Expand All @@ -262,7 +262,7 @@ void testCreateSyncJobDefaultWorkerResourceReqs() throws IOException {
DESTINATION_PROTOCOL_VERSION,
List.of(STANDARD_SYNC_OPERATION),
null,
STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION);
STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION, UUID.randomUUID());

final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig()
.withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition())
Expand Down Expand Up @@ -310,7 +310,7 @@ void testCreateSyncJobConnectionResourceReqs() throws IOException {
DESTINATION_PROTOCOL_VERSION,
List.of(STANDARD_SYNC_OPERATION),
null,
STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION);
STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION, UUID.randomUUID());

final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig()
.withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition())
Expand Down Expand Up @@ -364,7 +364,8 @@ void testCreateSyncJobSourceAndDestinationResourceReqs() throws IOException {
null,
new StandardSourceDefinition().withResourceRequirements(new ActorDefinitionResourceRequirements().withDefault(sourceResourceRequirements)),
new StandardDestinationDefinition().withResourceRequirements(new ActorDefinitionResourceRequirements().withJobSpecific(List.of(
new JobTypeResourceLimit().withJobType(JobType.SYNC).withResourceRequirements(destResourceRequirements)))));
new JobTypeResourceLimit().withJobType(JobType.SYNC).withResourceRequirements(destResourceRequirements)))),
UUID.randomUUID());

final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig()
.withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo
final UUID destinationId = UUID.randomUUID();
final UUID operationId = UUID.randomUUID();
final UUID workspaceWebhookConfigId = UUID.randomUUID();
final UUID workspaceId = UUID.randomUUID();
final String workspaceWebhookName = "test-webhook-name";
final JsonNode persistedWebhookConfigs = Jsons.deserialize(
String.format("{\"webhookConfigs\": [{\"id\": \"%s\", \"name\": \"%s\", \"authToken\": {\"_secret\": \"a-secret_v1\"}}]}",
Expand Down Expand Up @@ -87,7 +88,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo
when(
jobCreator.createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, srcProtocolVersion, dstDockerImage,
dstProtocolVersion, operations,
persistedWebhookConfigs, standardSourceDefinition, standardDestinationDefinition))
persistedWebhookConfigs, standardSourceDefinition, standardDestinationDefinition, workspaceId))
.thenReturn(Optional.of(jobId));
when(configRepository.getStandardSourceDefinition(sourceDefinitionId))
.thenReturn(standardSourceDefinition);
Expand All @@ -96,7 +97,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo
.thenReturn(standardDestinationDefinition);

when(configRepository.getStandardWorkspaceNoSecrets(any(), eq(true))).thenReturn(
new StandardWorkspace().withWebhookOperationConfigs(persistedWebhookConfigs));
new StandardWorkspace().withWorkspaceId(workspaceId).withWebhookOperationConfigs(persistedWebhookConfigs));

final SyncJobFactory factory = new DefaultSyncJobFactory(true, jobCreator, configRepository, mock(OAuthConfigSupplier.class), workspaceHelper);
final long actualJobId = factory.create(connectionId);
Expand All @@ -105,7 +106,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo
verify(jobCreator)
.createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, srcProtocolVersion, dstDockerImage, dstProtocolVersion,
operations, persistedWebhookConfigs,
standardSourceDefinition, standardDestinationDefinition);
standardSourceDefinition, standardDestinationDefinition, workspaceId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) {
.withResourceRequirements(resetConnection.getResourceRequirements())
.withState(resetConnection.getState())
.withIsSourceCustomConnector(resetConnection.getIsSourceCustomConnector())
.withIsDestinationCustomConnector(resetConnection.getIsDestinationCustomConnector());
.withIsDestinationCustomConnector(resetConnection.getIsDestinationCustomConnector())
.withWorkspaceId(resetConnection.getWorkspaceId());
} else {
throw new IllegalStateException(
String.format("Unexpected config type %s for job %d. The only supported config types for this activity are (%s)",
Expand Down Expand Up @@ -139,7 +140,8 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) {
.withState(config.getState())
.withResourceRequirements(config.getResourceRequirements())
.withSourceResourceRequirements(config.getSourceResourceRequirements())
.withDestinationResourceRequirements(config.getDestinationResourceRequirements());
.withDestinationResourceRequirements(config.getDestinationResourceRequirements())
.withWorkspaceId(config.getWorkspaceId());

return new GeneratedJobInput(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput);

Expand Down