diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index cc88681305dd..3cfa7983d45b 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -177,7 +177,8 @@ public CheckConnectionRead checkDestinationConnectionFromDestinationId(final Des final DestinationConnection destination = configRepository.getDestinationConnection(destinationIdRequestBody.getDestinationId()); final StandardDestinationDefinition destinationDef = configRepository.getStandardDestinationDefinition(destination.getDestinationDefinitionId()); final String imageName = DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag()); - return reportConnectionStatus(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, imageName)); + final Version protocolVersion = new Version(destinationDef.getProtocolVersion()); + return reportConnectionStatus(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, imageName, protocolVersion)); } public CheckConnectionRead checkDestinationConnectionFromDestinationCreate(final DestinationCoreConfig destinationConfig) @@ -195,7 +196,8 @@ public CheckConnectionRead checkDestinationConnectionFromDestinationCreate(final .withWorkspaceId(destinationConfig.getWorkspaceId()); final String imageName = DockerUtils.getTaggedImageName(destDef.getDockerRepository(), destDef.getDockerImageTag()); - return reportConnectionStatus(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, imageName)); + final Version protocolVersion = new Version(destDef.getProtocolVersion()); + return reportConnectionStatus(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, imageName, protocolVersion)); } public CheckConnectionRead checkDestinationConnectionFromDestinationIdForUpdate(final DestinationUpdate destinationUpdate) diff --git a/airbyte-server/src/main/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClient.java b/airbyte-server/src/main/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClient.java index 493fea4dc120..d49a3efdd29b 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClient.java +++ b/airbyte-server/src/main/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClient.java @@ -87,7 +87,8 @@ public SynchronousResponse createSourceCheckConne @Override public SynchronousResponse createDestinationCheckConnectionJob(final DestinationConnection destination, - final String dockerImage) + final String dockerImage, + final Version protocolVersion) throws IOException { final JsonNode destinationConfiguration = oAuthConfigSupplier.injectDestinationOAuthParameters( destination.getDestinationDefinitionId(), @@ -95,7 +96,8 @@ public SynchronousResponse createDestinationCheck destination.getConfiguration()); final JobCheckConnectionConfig jobCheckConnectionConfig = new JobCheckConnectionConfig() .withConnectionConfiguration(destinationConfiguration) - .withDockerImage(dockerImage); + .withDockerImage(dockerImage) + .withProtocolVersion(protocolVersion); final UUID jobId = UUID.randomUUID(); final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage); diff --git a/airbyte-server/src/main/java/io/airbyte/server/scheduler/SynchronousSchedulerClient.java b/airbyte-server/src/main/java/io/airbyte/server/scheduler/SynchronousSchedulerClient.java index 719a6b196729..02f9a3d7995f 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/scheduler/SynchronousSchedulerClient.java +++ b/airbyte-server/src/main/java/io/airbyte/server/scheduler/SynchronousSchedulerClient.java @@ -23,7 +23,9 @@ SynchronousResponse createSourceCheckConnectionJo Version protocolVersion) throws IOException; - SynchronousResponse createDestinationCheckConnectionJob(DestinationConnection destination, String dockerImage) + SynchronousResponse createDestinationCheckConnectionJob(DestinationConnection destination, + String dockerImage, + Version protocolVersion) throws IOException; SynchronousResponse createDiscoverSchemaJob(SourceConnection source, String dockerImage, String connectorVersion) throws IOException; diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index 98bdebd68d3b..d7997987a902 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -297,15 +297,18 @@ void testCheckDestinationConnectionFromDestinationId() throws IOException, JsonV .thenReturn(new StandardDestinationDefinition() .withDockerRepository(DESTINATION_DOCKER_REPO) .withDockerImageTag(DESTINATION_DOCKER_TAG) + .withProtocolVersion(DESTINATION_PROTOCOL_VERSION) .withDestinationDefinitionId(destination.getDestinationDefinitionId())); when(configRepository.getDestinationConnection(destination.getDestinationId())).thenReturn(destination); - when(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE)) - .thenReturn((SynchronousResponse) jobResponse); + when(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE, + new Version(DESTINATION_PROTOCOL_VERSION))) + .thenReturn((SynchronousResponse) jobResponse); schedulerHandler.checkDestinationConnectionFromDestinationId(request); verify(configRepository).getDestinationConnection(destination.getDestinationId()); - verify(synchronousSchedulerClient).createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE); + verify(synchronousSchedulerClient).createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE, + new Version(DESTINATION_PROTOCOL_VERSION)); } @Test @@ -322,16 +325,19 @@ void testCheckDestinationConnectionFromDestinationCreate() throws JsonValidation .thenReturn(new StandardDestinationDefinition() .withDockerRepository(DESTINATION_DOCKER_REPO) .withDockerImageTag(DESTINATION_DOCKER_TAG) + .withProtocolVersion(DESTINATION_PROTOCOL_VERSION) .withDestinationDefinitionId(destination.getDestinationDefinitionId())); - when(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE)) - .thenReturn((SynchronousResponse) jobResponse); + when(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE, + new Version(DESTINATION_PROTOCOL_VERSION))) + .thenReturn((SynchronousResponse) jobResponse); when(secretsRepositoryWriter.statefulSplitEphemeralSecrets( eq(destination.getConfiguration()), any())).thenReturn(destination.getConfiguration()); schedulerHandler.checkDestinationConnectionFromDestinationCreate(destinationCoreConfig); - verify(synchronousSchedulerClient).createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE); + verify(synchronousSchedulerClient).createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE, + new Version(DESTINATION_PROTOCOL_VERSION)); } @Test @@ -344,6 +350,7 @@ void testCheckDestinationConnectionFromUpdate() throws IOException, JsonValidati final StandardDestinationDefinition destinationDefinition = new StandardDestinationDefinition() .withDockerRepository(DESTINATION_DOCKER_REPO) .withDockerImageTag(DESTINATION_DOCKER_TAG) + .withProtocolVersion(DESTINATION_PROTOCOL_VERSION) .withDestinationDefinitionId(destination.getDestinationDefinitionId()) .withSpec(CONNECTOR_SPECIFICATION); when(configRepository.getStandardDestinationDefinition(destination.getDestinationDefinitionId())) @@ -354,15 +361,17 @@ void testCheckDestinationConnectionFromUpdate() throws IOException, JsonValidati final DestinationConnection submittedDestination = new DestinationConnection() .withDestinationDefinitionId(destination.getDestinationDefinitionId()) .withConfiguration(destination.getConfiguration()); - when(synchronousSchedulerClient.createDestinationCheckConnectionJob(submittedDestination, DESTINATION_DOCKER_IMAGE)) - .thenReturn((SynchronousResponse) jobResponse); + when(synchronousSchedulerClient.createDestinationCheckConnectionJob(submittedDestination, DESTINATION_DOCKER_IMAGE, + new Version(DESTINATION_PROTOCOL_VERSION))) + .thenReturn((SynchronousResponse) jobResponse); when(secretsRepositoryWriter.statefulSplitEphemeralSecrets( eq(destination.getConfiguration()), any())).thenReturn(destination.getConfiguration()); schedulerHandler.checkDestinationConnectionFromDestinationIdForUpdate(destinationUpdate); verify(jsonSchemaValidator).ensure(CONNECTOR_SPECIFICATION.getConnectionSpecification(), destination.getConfiguration()); - verify(synchronousSchedulerClient).createDestinationCheckConnectionJob(submittedDestination, DESTINATION_DOCKER_IMAGE); + verify(synchronousSchedulerClient).createDestinationCheckConnectionJob(submittedDestination, DESTINATION_DOCKER_IMAGE, + new Version(DESTINATION_PROTOCOL_VERSION)); } @Test diff --git a/airbyte-server/src/test/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClientTest.java b/airbyte-server/src/test/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClientTest.java index 02c203a53318..85415dfa2065 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClientTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/scheduler/DefaultSynchronousSchedulerClientTest.java @@ -219,14 +219,15 @@ void testCreateSourceCheckConnectionJob() throws IOException { void testCreateDestinationCheckConnectionJob() throws IOException { final JobCheckConnectionConfig jobCheckConnectionConfig = new JobCheckConnectionConfig() .withConnectionConfiguration(DESTINATION_CONNECTION.getConfiguration()) - .withDockerImage(DOCKER_IMAGE); + .withDockerImage(DOCKER_IMAGE) + .withProtocolVersion(PROTOCOL_VERSION); final StandardCheckConnectionOutput mockOutput = mock(StandardCheckConnectionOutput.class); final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withCheckConnection(mockOutput); when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(jobCheckConnectionConfig))) .thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true))); final SynchronousResponse response = - schedulerClient.createDestinationCheckConnectionJob(DESTINATION_CONNECTION, DOCKER_IMAGE); + schedulerClient.createDestinationCheckConnectionJob(DESTINATION_CONNECTION, DOCKER_IMAGE, PROTOCOL_VERSION); assertEquals(mockOutput, response.getOutput()); }