diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java index 61cac4740..4c8ec3ed7 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.internal.streaming.ChannelMigrateOffsetTokenResponseDTO; import com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode; @@ -60,7 +61,7 @@ public class SnowflakeConnectionServiceV1 implements SnowflakeConnectionService // User agent suffix we want to pass in to ingest service public static final String USER_AGENT_SUFFIX_FORMAT = "SFKafkaConnector/%s provider/%s"; - private final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); SnowflakeConnectionServiceV1( Properties prop, @@ -1048,8 +1049,8 @@ public ChannelMigrateOffsetTokenResponseDTO migrateStreamingChannelOffsetToken( } ChannelMigrateOffsetTokenResponseDTO channelMigrateOffsetTokenResponseDTO = - OBJECT_MAPPER.readValue( - migrateOffsetTokenResultFromSysFunc, ChannelMigrateOffsetTokenResponseDTO.class); + getChannelMigrateOffsetTokenResponseDTO(migrateOffsetTokenResultFromSysFunc); + LOGGER.info( "Migrate OffsetToken response for table:{}, sourceChannel:{}, destinationChannel:{}" + " is:{}", @@ -1077,4 +1078,13 @@ public ChannelMigrateOffsetTokenResponseDTO migrateStreamingChannelOffsetToken( throw SnowflakeErrors.ERROR_5023.getException(errorMsg, this.telemetry); } } + + @VisibleForTesting + protected ChannelMigrateOffsetTokenResponseDTO getChannelMigrateOffsetTokenResponseDTO( + String migrateOffsetTokenResultFromSysFunc) throws JsonProcessingException { + ChannelMigrateOffsetTokenResponseDTO channelMigrateOffsetTokenResponseDTO = + OBJECT_MAPPER.readValue( + migrateOffsetTokenResultFromSysFunc, ChannelMigrateOffsetTokenResponseDTO.class); + return channelMigrateOffsetTokenResponseDTO; + } } diff --git a/src/test/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1Test.java b/src/test/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1Test.java new file mode 100644 index 000000000..88369d527 --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1Test.java @@ -0,0 +1,39 @@ +package com.snowflake.kafka.connector.internal; + +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.snowflake.kafka.connector.internal.streaming.ChannelMigrateOffsetTokenResponseDTO; +import org.junit.Test; + +public class SnowflakeConnectionServiceV1Test { + @Test + public void testChannelMigrationResponse_validResponse() throws Exception { + SnowflakeConnectionServiceV1 v1MockConnectionService = mock(SnowflakeConnectionServiceV1.class); + final String validMigrationResponse = + "{\"responseCode\": 51, \"responseMessage\": \"Source Channel does not exist for Offset" + + " Migration\"}"; + when(v1MockConnectionService.getChannelMigrateOffsetTokenResponseDTO(anyString())) + .thenCallRealMethod(); + + ChannelMigrateOffsetTokenResponseDTO migrationDTO = + v1MockConnectionService.getChannelMigrateOffsetTokenResponseDTO(validMigrationResponse); + assert migrationDTO.getResponseCode() == 51; + assert migrationDTO + .getResponseMessage() + .contains("Source Channel does not exist for Offset Migration"); + } + + @Test(expected = JsonProcessingException.class) + public void testChannelMigrationResponse_InvalidResponse() throws Exception { + SnowflakeConnectionServiceV1 v1MockConnectionService = mock(SnowflakeConnectionServiceV1.class); + final String validMigrationResponse = + "{\"responseCode\": 51, \"responseMessage\": \"Source Channel does not exist for Offset" + + " Migration\", \"unknown\":62}"; + when(v1MockConnectionService.getChannelMigrateOffsetTokenResponseDTO(anyString())) + .thenCallRealMethod(); + v1MockConnectionService.getChannelMigrateOffsetTokenResponseDTO(validMigrationResponse); + } +} diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java index abc84aec7..75d9de64a 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java @@ -631,6 +631,15 @@ public void testChannelMigrateOffsetTokenSystemFunction_NonNullOffsetTokenForSou TestUtils.assertWithRetry( () -> service.getOffset(new TopicPartition(topic, PARTITION)) == noOfRecords, 5, 5); + // add few more records + records = + TestUtils.createJsonStringSinkRecords(noOfRecords, noOfRecords, testTableName, PARTITION); + records.forEach(service::insert); + TestUtils.assertWithRetry( + () -> service.getOffset(new TopicPartition(topic, PARTITION)) == noOfRecords + noOfRecords, + 5, + 5); + service.closeAll(); }