Skip to content

Commit

Permalink
Add Message Migration to Destination Connection Checks (#17954)
Browse files Browse the repository at this point in the history
* Add Message Migration to Destination Connection Checks

* Fix test setup
  • Loading branch information
gosusnp authored Oct 17, 2022
1 parent ef3e84c commit 5a80c76
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ public CheckConnectionRead checkDestinationConnectionFromDestinationId(final Des
final DestinationConnection destination = configRepository.getDestinationConnection(destinationIdRequestBody.getDestinationId());
final StandardDestinationDefinition destinationDef = configRepository.getStandardDestinationDefinition(destination.getDestinationDefinitionId());
final String imageName = DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag());
return reportConnectionStatus(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, imageName));
final Version protocolVersion = new Version(destinationDef.getProtocolVersion());
return reportConnectionStatus(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, imageName, protocolVersion));
}

public CheckConnectionRead checkDestinationConnectionFromDestinationCreate(final DestinationCoreConfig destinationConfig)
Expand All @@ -195,7 +196,8 @@ public CheckConnectionRead checkDestinationConnectionFromDestinationCreate(final
.withWorkspaceId(destinationConfig.getWorkspaceId());

final String imageName = DockerUtils.getTaggedImageName(destDef.getDockerRepository(), destDef.getDockerImageTag());
return reportConnectionStatus(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, imageName));
final Version protocolVersion = new Version(destDef.getProtocolVersion());
return reportConnectionStatus(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, imageName, protocolVersion));
}

public CheckConnectionRead checkDestinationConnectionFromDestinationIdForUpdate(final DestinationUpdate destinationUpdate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,17 @@ public SynchronousResponse<StandardCheckConnectionOutput> createSourceCheckConne

@Override
public SynchronousResponse<StandardCheckConnectionOutput> createDestinationCheckConnectionJob(final DestinationConnection destination,
final String dockerImage)
final String dockerImage,
final Version protocolVersion)
throws IOException {
final JsonNode destinationConfiguration = oAuthConfigSupplier.injectDestinationOAuthParameters(
destination.getDestinationDefinitionId(),
destination.getWorkspaceId(),
destination.getConfiguration());
final JobCheckConnectionConfig jobCheckConnectionConfig = new JobCheckConnectionConfig()
.withConnectionConfiguration(destinationConfiguration)
.withDockerImage(dockerImage);
.withDockerImage(dockerImage)
.withProtocolVersion(protocolVersion);

final UUID jobId = UUID.randomUUID();
final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ SynchronousResponse<StandardCheckConnectionOutput> createSourceCheckConnectionJo
Version protocolVersion)
throws IOException;

SynchronousResponse<StandardCheckConnectionOutput> createDestinationCheckConnectionJob(DestinationConnection destination, String dockerImage)
SynchronousResponse<StandardCheckConnectionOutput> createDestinationCheckConnectionJob(DestinationConnection destination,
String dockerImage,
Version protocolVersion)
throws IOException;

SynchronousResponse<UUID> createDiscoverSchemaJob(SourceConnection source, String dockerImage, String connectorVersion) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,18 @@ void testCheckDestinationConnectionFromDestinationId() throws IOException, JsonV
.thenReturn(new StandardDestinationDefinition()
.withDockerRepository(DESTINATION_DOCKER_REPO)
.withDockerImageTag(DESTINATION_DOCKER_TAG)
.withProtocolVersion(DESTINATION_PROTOCOL_VERSION)
.withDestinationDefinitionId(destination.getDestinationDefinitionId()));
when(configRepository.getDestinationConnection(destination.getDestinationId())).thenReturn(destination);
when(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE))
.thenReturn((SynchronousResponse<StandardCheckConnectionOutput>) jobResponse);
when(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE,
new Version(DESTINATION_PROTOCOL_VERSION)))
.thenReturn((SynchronousResponse<StandardCheckConnectionOutput>) jobResponse);

schedulerHandler.checkDestinationConnectionFromDestinationId(request);

verify(configRepository).getDestinationConnection(destination.getDestinationId());
verify(synchronousSchedulerClient).createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE);
verify(synchronousSchedulerClient).createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE,
new Version(DESTINATION_PROTOCOL_VERSION));
}

@Test
Expand All @@ -322,16 +325,19 @@ void testCheckDestinationConnectionFromDestinationCreate() throws JsonValidation
.thenReturn(new StandardDestinationDefinition()
.withDockerRepository(DESTINATION_DOCKER_REPO)
.withDockerImageTag(DESTINATION_DOCKER_TAG)
.withProtocolVersion(DESTINATION_PROTOCOL_VERSION)
.withDestinationDefinitionId(destination.getDestinationDefinitionId()));

when(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE))
.thenReturn((SynchronousResponse<StandardCheckConnectionOutput>) jobResponse);
when(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE,
new Version(DESTINATION_PROTOCOL_VERSION)))
.thenReturn((SynchronousResponse<StandardCheckConnectionOutput>) jobResponse);
when(secretsRepositoryWriter.statefulSplitEphemeralSecrets(
eq(destination.getConfiguration()),
any())).thenReturn(destination.getConfiguration());
schedulerHandler.checkDestinationConnectionFromDestinationCreate(destinationCoreConfig);

verify(synchronousSchedulerClient).createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE);
verify(synchronousSchedulerClient).createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE,
new Version(DESTINATION_PROTOCOL_VERSION));
}

@Test
Expand All @@ -344,6 +350,7 @@ void testCheckDestinationConnectionFromUpdate() throws IOException, JsonValidati
final StandardDestinationDefinition destinationDefinition = new StandardDestinationDefinition()
.withDockerRepository(DESTINATION_DOCKER_REPO)
.withDockerImageTag(DESTINATION_DOCKER_TAG)
.withProtocolVersion(DESTINATION_PROTOCOL_VERSION)
.withDestinationDefinitionId(destination.getDestinationDefinitionId())
.withSpec(CONNECTOR_SPECIFICATION);
when(configRepository.getStandardDestinationDefinition(destination.getDestinationDefinitionId()))
Expand All @@ -354,15 +361,17 @@ void testCheckDestinationConnectionFromUpdate() throws IOException, JsonValidati
final DestinationConnection submittedDestination = new DestinationConnection()
.withDestinationDefinitionId(destination.getDestinationDefinitionId())
.withConfiguration(destination.getConfiguration());
when(synchronousSchedulerClient.createDestinationCheckConnectionJob(submittedDestination, DESTINATION_DOCKER_IMAGE))
.thenReturn((SynchronousResponse<StandardCheckConnectionOutput>) jobResponse);
when(synchronousSchedulerClient.createDestinationCheckConnectionJob(submittedDestination, DESTINATION_DOCKER_IMAGE,
new Version(DESTINATION_PROTOCOL_VERSION)))
.thenReturn((SynchronousResponse<StandardCheckConnectionOutput>) jobResponse);
when(secretsRepositoryWriter.statefulSplitEphemeralSecrets(
eq(destination.getConfiguration()),
any())).thenReturn(destination.getConfiguration());
schedulerHandler.checkDestinationConnectionFromDestinationIdForUpdate(destinationUpdate);

verify(jsonSchemaValidator).ensure(CONNECTOR_SPECIFICATION.getConnectionSpecification(), destination.getConfiguration());
verify(synchronousSchedulerClient).createDestinationCheckConnectionJob(submittedDestination, DESTINATION_DOCKER_IMAGE);
verify(synchronousSchedulerClient).createDestinationCheckConnectionJob(submittedDestination, DESTINATION_DOCKER_IMAGE,
new Version(DESTINATION_PROTOCOL_VERSION));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,15 @@ void testCreateSourceCheckConnectionJob() throws IOException {
void testCreateDestinationCheckConnectionJob() throws IOException {
final JobCheckConnectionConfig jobCheckConnectionConfig = new JobCheckConnectionConfig()
.withConnectionConfiguration(DESTINATION_CONNECTION.getConfiguration())
.withDockerImage(DOCKER_IMAGE);
.withDockerImage(DOCKER_IMAGE)
.withProtocolVersion(PROTOCOL_VERSION);

final StandardCheckConnectionOutput mockOutput = mock(StandardCheckConnectionOutput.class);
final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withCheckConnection(mockOutput);
when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(jobCheckConnectionConfig)))
.thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true)));
final SynchronousResponse<StandardCheckConnectionOutput> response =
schedulerClient.createDestinationCheckConnectionJob(DESTINATION_CONNECTION, DOCKER_IMAGE);
schedulerClient.createDestinationCheckConnectionJob(DESTINATION_CONNECTION, DOCKER_IMAGE, PROTOCOL_VERSION);
assertEquals(mockOutput, response.getOutput());
}

Expand Down

0 comments on commit 5a80c76

Please sign in to comment.