Skip to content

Commit

Permalink
fix: inject oauth params when generating sync input (#22635)
Browse files Browse the repository at this point in the history
* fix: inject oauth params when generating sync inpt

* make it final

* rm oauth injection from job creator factory

* keep it

* keep it 2

* test that we're actually using the oauth values

* fmt
  • Loading branch information
pedroslopez authored Feb 9, 2023
1 parent a70d6e8 commit 44a85a9
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;

import com.fasterxml.jackson.databind.JsonNode;
import datadog.trace.api.Trace;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.generated.AttemptApi;
Expand Down Expand Up @@ -39,6 +40,7 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.factory.OAuthConfigSupplier;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.persistence.job.models.JobRunConfig;
Expand All @@ -63,19 +65,22 @@ public class GenerateInputActivityImpl implements GenerateInputActivity {
private final AttemptApi attemptApi;
private final StateApi stateApi;
private final FeatureFlags featureFlags;
private final OAuthConfigSupplier oAuthConfigSupplier;

private static final Logger LOGGER = LoggerFactory.getLogger(GenerateInputActivity.class);

public GenerateInputActivityImpl(final JobPersistence jobPersistence,
final ConfigRepository configRepository,
final StateApi stateApi,
final AttemptApi attemptApi,
final FeatureFlags featureFlags) {
final FeatureFlags featureFlags,
final OAuthConfigSupplier oAuthConfigSupplier) {
this.jobPersistence = jobPersistence;
this.configRepository = configRepository;
this.stateApi = stateApi;
this.attemptApi = attemptApi;
this.featureFlags = featureFlags;
this.oAuthConfigSupplier = oAuthConfigSupplier;
}

private Optional<State> getCurrentConnectionState(final UUID connectionId) {
Expand Down Expand Up @@ -122,7 +127,11 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) {
if (ConfigType.SYNC.equals(jobConfigType)) {
config = job.getConfig().getSync();
final SourceConnection source = configRepository.getSourceConnection(standardSync.getSourceId());
attemptSyncConfig.setSourceConfiguration(source.getConfiguration());
final JsonNode sourceConfiguration = oAuthConfigSupplier.injectSourceOAuthParameters(
source.getSourceDefinitionId(),
source.getWorkspaceId(),
source.getConfiguration());
attemptSyncConfig.setSourceConfiguration(sourceConfiguration);
} else if (ConfigType.RESET_CONNECTION.equals(jobConfigType)) {
final JobResetConnectionConfig resetConnection = job.getConfig().getResetConnection();
final ResetSourceConfiguration resetSourceConfiguration = resetConnection.getResetSourceConfiguration();
Expand Down Expand Up @@ -156,7 +165,11 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) {
final JobRunConfig jobRunConfig = TemporalWorkflowUtils.createJobRunConfig(jobId, attempt);

final DestinationConnection destination = configRepository.getDestinationConnection(standardSync.getDestinationId());
attemptSyncConfig.setDestinationConfiguration(destination.getConfiguration());
final JsonNode destinationConfiguration = oAuthConfigSupplier.injectDestinationOAuthParameters(
destination.getDestinationDefinitionId(),
destination.getWorkspaceId(),
destination.getConfiguration());
attemptSyncConfig.setDestinationConfiguration(destinationConfiguration);

final StandardSourceDefinition sourceDefinition =
configRepository.getSourceDefinitionFromSource(standardSync.getSourceId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.factory.OAuthConfigSupplier;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.persistence.job.models.JobRunConfig;
Expand All @@ -58,15 +59,21 @@ class GenerateInputActivityTest {
static private JobPersistence jobPersistence;
static private ConfigRepository configRepository;
static private GenerateInputActivityImpl generateInputActivity;
static private OAuthConfigSupplier oAuthConfigSupplier;
static private Job job;

static private final JsonNode SOURCE_CONFIGURATION = Jsons.jsonNode(Map.of("source_key", "source_value"));
static private final JsonNode SOURCE_CONFIG_WITH_OAUTH = Jsons.jsonNode(Map.of("source_key", "source_value", "oauth", "oauth_value"));
static private final JsonNode DESTINATION_CONFIGURATION = Jsons.jsonNode(Map.of("destination_key", "destination_value"));
static private final JsonNode DESTINATION_CONFIG_WITH_OAUTH =
Jsons.jsonNode(Map.of("destination_key", "destination_value", "oauth", "oauth_value"));
static private final State STATE = new State().withState(Jsons.jsonNode(Map.of("state_key", "state_value")));

static private final UUID WORKSPACE_ID = UUID.randomUUID();
static private final long JOB_ID = 1;
static private final int ATTEMPT_ID = 1;
static private final UUID SOURCE_ID = UUID.randomUUID();
static private final UUID DESTINATION_DEFINITION_ID = UUID.randomUUID();
static private final UUID DESTINATION_ID = UUID.randomUUID();
static private final UUID CONNECTION_ID = UUID.randomUUID();

Expand All @@ -75,24 +82,26 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio
final StateApi stateApi = mock(StateApi.class);
final FeatureFlags featureFlags = mock(FeatureFlags.class);

oAuthConfigSupplier = mock(OAuthConfigSupplier.class);
attemptApi = mock(AttemptApi.class);
jobPersistence = mock(JobPersistence.class);
configRepository = mock(ConfigRepository.class);
generateInputActivity = new GenerateInputActivityImpl(jobPersistence, configRepository, stateApi, attemptApi, featureFlags);
generateInputActivity = new GenerateInputActivityImpl(jobPersistence, configRepository, stateApi, attemptApi, featureFlags, oAuthConfigSupplier);

job = mock(Job.class);

when(jobPersistence.getJob(JOB_ID)).thenReturn(job);

final UUID destinationDefinitionId = UUID.randomUUID();

final DestinationConnection destinationConnection = new DestinationConnection()
.withDestinationId(DESTINATION_ID)
.withDestinationDefinitionId(destinationDefinitionId)
.withWorkspaceId(WORKSPACE_ID)
.withDestinationDefinitionId(DESTINATION_DEFINITION_ID)
.withConfiguration(DESTINATION_CONFIGURATION);
when(configRepository.getDestinationConnection(DESTINATION_ID)).thenReturn(destinationConnection);
when(configRepository.getStandardDestinationDefinition(destinationDefinitionId)).thenReturn(mock(StandardDestinationDefinition.class));
when(configRepository.getStandardDestinationDefinition(DESTINATION_DEFINITION_ID)).thenReturn(mock(StandardDestinationDefinition.class));
when(configRepository.getSourceDefinitionFromSource(SOURCE_ID)).thenReturn(mock(StandardSourceDefinition.class));
when(oAuthConfigSupplier.injectDestinationOAuthParameters(DESTINATION_DEFINITION_ID, WORKSPACE_ID, DESTINATION_CONFIGURATION))
.thenReturn(DESTINATION_CONFIG_WITH_OAUTH);

final StandardSync standardSync = new StandardSync()
.withSourceId(SOURCE_ID)
Expand All @@ -109,10 +118,15 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio
void testGetSyncWorkflowInput() throws JsonValidationException, ConfigNotFoundException, IOException, ApiException {
final SyncInput syncInput = new SyncInput(ATTEMPT_ID, JOB_ID);

final UUID sourceDefinitionId = UUID.randomUUID();
final SourceConnection sourceConnection = new SourceConnection()
.withSourceId(SOURCE_ID)
.withSourceDefinitionId(sourceDefinitionId)
.withWorkspaceId(WORKSPACE_ID)
.withConfiguration(SOURCE_CONFIGURATION);
when(configRepository.getSourceConnection(SOURCE_ID)).thenReturn(sourceConnection);
when(oAuthConfigSupplier.injectSourceOAuthParameters(sourceDefinitionId, WORKSPACE_ID, SOURCE_CONFIGURATION))
.thenReturn(SOURCE_CONFIG_WITH_OAUTH);

final JobSyncConfig jobSyncConfig = new JobSyncConfig()
.withWorkspaceId(UUID.randomUUID())
Expand All @@ -131,8 +145,8 @@ void testGetSyncWorkflowInput() throws JsonValidationException, ConfigNotFoundEx
.withWorkspaceId(jobSyncConfig.getWorkspaceId())
.withSourceId(SOURCE_ID)
.withDestinationId(DESTINATION_ID)
.withSourceConfiguration(SOURCE_CONFIGURATION)
.withDestinationConfiguration(DESTINATION_CONFIGURATION)
.withSourceConfiguration(SOURCE_CONFIG_WITH_OAUTH)
.withDestinationConfiguration(DESTINATION_CONFIG_WITH_OAUTH)
.withState(STATE)
.withCatalog(jobSyncConfig.getConfiguredAirbyteCatalog())
.withWorkspaceId(jobSyncConfig.getWorkspaceId());
Expand Down Expand Up @@ -161,10 +175,13 @@ void testGetSyncWorkflowInput() throws JsonValidationException, ConfigNotFoundEx
assertEquals(expectedGeneratedJobInput, generatedJobInput);

final AttemptSyncConfig expectedAttemptSyncConfig = new AttemptSyncConfig()
.withSourceConfiguration(SOURCE_CONFIGURATION)
.withDestinationConfiguration(DESTINATION_CONFIGURATION)
.withSourceConfiguration(SOURCE_CONFIG_WITH_OAUTH)
.withDestinationConfiguration(DESTINATION_CONFIG_WITH_OAUTH)
.withState(STATE);

verify(oAuthConfigSupplier).injectSourceOAuthParameters(sourceDefinitionId, WORKSPACE_ID, SOURCE_CONFIGURATION);
verify(oAuthConfigSupplier).injectDestinationOAuthParameters(DESTINATION_DEFINITION_ID, WORKSPACE_ID, DESTINATION_CONFIGURATION);

verify(attemptApi).saveSyncConfig(new SaveAttemptSyncConfigRequestBody()
.jobId(JOB_ID)
.attemptNumber(ATTEMPT_ID)
Expand Down Expand Up @@ -192,7 +209,7 @@ void testGetResetSyncWorkflowInput() throws IOException, ApiException {
.withSourceId(SOURCE_ID)
.withDestinationId(DESTINATION_ID)
.withSourceConfiguration(Jsons.emptyObject())
.withDestinationConfiguration(DESTINATION_CONFIGURATION)
.withDestinationConfiguration(DESTINATION_CONFIG_WITH_OAUTH)
.withState(STATE)
.withCatalog(jobResetConfig.getConfiguredAirbyteCatalog())
.withWorkspaceId(jobResetConfig.getWorkspaceId());
Expand Down Expand Up @@ -222,9 +239,11 @@ void testGetResetSyncWorkflowInput() throws IOException, ApiException {

final AttemptSyncConfig expectedAttemptSyncConfig = new AttemptSyncConfig()
.withSourceConfiguration(Jsons.emptyObject())
.withDestinationConfiguration(DESTINATION_CONFIGURATION)
.withDestinationConfiguration(DESTINATION_CONFIG_WITH_OAUTH)
.withState(STATE);

verify(oAuthConfigSupplier).injectDestinationOAuthParameters(DESTINATION_DEFINITION_ID, WORKSPACE_ID, DESTINATION_CONFIGURATION);

verify(attemptApi).saveSyncConfig(new SaveAttemptSyncConfigRequestBody()
.jobId(JOB_ID)
.attemptNumber(ATTEMPT_ID)
Expand Down

0 comments on commit 44a85a9

Please sign in to comment.