Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Message Migration to Destination Connection Checks #17954

Merged
merged 4 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ public CheckConnectionRead checkDestinationConnectionFromDestinationId(final Des
final DestinationConnection destination = configRepository.getDestinationConnection(destinationIdRequestBody.getDestinationId());
final StandardDestinationDefinition destinationDef = configRepository.getStandardDestinationDefinition(destination.getDestinationDefinitionId());
final String imageName = DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag());
return reportConnectionStatus(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, imageName));
final Version protocolVersion = new Version(destinationDef.getProtocolVersion());
return reportConnectionStatus(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, imageName, protocolVersion));
}

public CheckConnectionRead checkDestinationConnectionFromDestinationCreate(final DestinationCoreConfig destinationConfig)
Expand All @@ -195,7 +196,8 @@ public CheckConnectionRead checkDestinationConnectionFromDestinationCreate(final
.withWorkspaceId(destinationConfig.getWorkspaceId());

final String imageName = DockerUtils.getTaggedImageName(destDef.getDockerRepository(), destDef.getDockerImageTag());
return reportConnectionStatus(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, imageName));
final Version protocolVersion = new Version(destDef.getProtocolVersion());
return reportConnectionStatus(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, imageName, protocolVersion));
}

public CheckConnectionRead checkDestinationConnectionFromDestinationIdForUpdate(final DestinationUpdate destinationUpdate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,17 @@ public SynchronousResponse<StandardCheckConnectionOutput> createSourceCheckConne

@Override
public SynchronousResponse<StandardCheckConnectionOutput> createDestinationCheckConnectionJob(final DestinationConnection destination,
final String dockerImage)
final String dockerImage,
final Version protocolVersion)
throws IOException {
final JsonNode destinationConfiguration = oAuthConfigSupplier.injectDestinationOAuthParameters(
destination.getDestinationDefinitionId(),
destination.getWorkspaceId(),
destination.getConfiguration());
final JobCheckConnectionConfig jobCheckConnectionConfig = new JobCheckConnectionConfig()
.withConnectionConfiguration(destinationConfiguration)
.withDockerImage(dockerImage);
.withDockerImage(dockerImage)
.withProtocolVersion(protocolVersion);

final UUID jobId = UUID.randomUUID();
final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ SynchronousResponse<StandardCheckConnectionOutput> createSourceCheckConnectionJo
Version protocolVersion)
throws IOException;

SynchronousResponse<StandardCheckConnectionOutput> createDestinationCheckConnectionJob(DestinationConnection destination, String dockerImage)
SynchronousResponse<StandardCheckConnectionOutput> createDestinationCheckConnectionJob(DestinationConnection destination,
String dockerImage,
Version protocolVersion)
throws IOException;

SynchronousResponse<UUID> createDiscoverSchemaJob(SourceConnection source, String dockerImage, String connectorVersion) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,18 @@ void testCheckDestinationConnectionFromDestinationId() throws IOException, JsonV
.thenReturn(new StandardDestinationDefinition()
.withDockerRepository(DESTINATION_DOCKER_REPO)
.withDockerImageTag(DESTINATION_DOCKER_TAG)
.withProtocolVersion(DESTINATION_PROTOCOL_VERSION)
.withDestinationDefinitionId(destination.getDestinationDefinitionId()));
when(configRepository.getDestinationConnection(destination.getDestinationId())).thenReturn(destination);
when(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE))
.thenReturn((SynchronousResponse<StandardCheckConnectionOutput>) jobResponse);
when(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE,
new Version(DESTINATION_PROTOCOL_VERSION)))
.thenReturn((SynchronousResponse<StandardCheckConnectionOutput>) jobResponse);

schedulerHandler.checkDestinationConnectionFromDestinationId(request);

verify(configRepository).getDestinationConnection(destination.getDestinationId());
verify(synchronousSchedulerClient).createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE);
verify(synchronousSchedulerClient).createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE,
new Version(DESTINATION_PROTOCOL_VERSION));
}

@Test
Expand All @@ -322,16 +325,19 @@ void testCheckDestinationConnectionFromDestinationCreate() throws JsonValidation
.thenReturn(new StandardDestinationDefinition()
.withDockerRepository(DESTINATION_DOCKER_REPO)
.withDockerImageTag(DESTINATION_DOCKER_TAG)
.withProtocolVersion(DESTINATION_PROTOCOL_VERSION)
.withDestinationDefinitionId(destination.getDestinationDefinitionId()));

when(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE))
.thenReturn((SynchronousResponse<StandardCheckConnectionOutput>) jobResponse);
when(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE,
new Version(DESTINATION_PROTOCOL_VERSION)))
.thenReturn((SynchronousResponse<StandardCheckConnectionOutput>) jobResponse);
when(secretsRepositoryWriter.statefulSplitEphemeralSecrets(
eq(destination.getConfiguration()),
any())).thenReturn(destination.getConfiguration());
schedulerHandler.checkDestinationConnectionFromDestinationCreate(destinationCoreConfig);

verify(synchronousSchedulerClient).createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE);
verify(synchronousSchedulerClient).createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE,
new Version(DESTINATION_PROTOCOL_VERSION));
}

@Test
Expand All @@ -344,6 +350,7 @@ void testCheckDestinationConnectionFromUpdate() throws IOException, JsonValidati
final StandardDestinationDefinition destinationDefinition = new StandardDestinationDefinition()
.withDockerRepository(DESTINATION_DOCKER_REPO)
.withDockerImageTag(DESTINATION_DOCKER_TAG)
.withProtocolVersion(DESTINATION_PROTOCOL_VERSION)
.withDestinationDefinitionId(destination.getDestinationDefinitionId())
.withSpec(CONNECTOR_SPECIFICATION);
when(configRepository.getStandardDestinationDefinition(destination.getDestinationDefinitionId()))
Expand All @@ -354,15 +361,17 @@ void testCheckDestinationConnectionFromUpdate() throws IOException, JsonValidati
final DestinationConnection submittedDestination = new DestinationConnection()
.withDestinationDefinitionId(destination.getDestinationDefinitionId())
.withConfiguration(destination.getConfiguration());
when(synchronousSchedulerClient.createDestinationCheckConnectionJob(submittedDestination, DESTINATION_DOCKER_IMAGE))
.thenReturn((SynchronousResponse<StandardCheckConnectionOutput>) jobResponse);
when(synchronousSchedulerClient.createDestinationCheckConnectionJob(submittedDestination, DESTINATION_DOCKER_IMAGE,
new Version(DESTINATION_PROTOCOL_VERSION)))
.thenReturn((SynchronousResponse<StandardCheckConnectionOutput>) jobResponse);
when(secretsRepositoryWriter.statefulSplitEphemeralSecrets(
eq(destination.getConfiguration()),
any())).thenReturn(destination.getConfiguration());
schedulerHandler.checkDestinationConnectionFromDestinationIdForUpdate(destinationUpdate);

verify(jsonSchemaValidator).ensure(CONNECTOR_SPECIFICATION.getConnectionSpecification(), destination.getConfiguration());
verify(synchronousSchedulerClient).createDestinationCheckConnectionJob(submittedDestination, DESTINATION_DOCKER_IMAGE);
verify(synchronousSchedulerClient).createDestinationCheckConnectionJob(submittedDestination, DESTINATION_DOCKER_IMAGE,
new Version(DESTINATION_PROTOCOL_VERSION));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,15 @@ void testCreateSourceCheckConnectionJob() throws IOException {
void testCreateDestinationCheckConnectionJob() throws IOException {
final JobCheckConnectionConfig jobCheckConnectionConfig = new JobCheckConnectionConfig()
.withConnectionConfiguration(DESTINATION_CONNECTION.getConfiguration())
.withDockerImage(DOCKER_IMAGE);
.withDockerImage(DOCKER_IMAGE)
.withProtocolVersion(PROTOCOL_VERSION);

final StandardCheckConnectionOutput mockOutput = mock(StandardCheckConnectionOutput.class);
final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withCheckConnection(mockOutput);
when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(jobCheckConnectionConfig)))
.thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true)));
final SynchronousResponse<StandardCheckConnectionOutput> response =
schedulerClient.createDestinationCheckConnectionJob(DESTINATION_CONNECTION, DOCKER_IMAGE);
schedulerClient.createDestinationCheckConnectionJob(DESTINATION_CONNECTION, DOCKER_IMAGE, PROTOCOL_VERSION);
assertEquals(mockOutput, response.getOutput());
}

Expand Down