Skip to content

Commit

Permalink
Revi's minor suggestion, more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-japatel committed Nov 21, 2023
1 parent 3dbf6c1 commit 21697ad
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.internal.streaming.ChannelMigrateOffsetTokenResponseDTO;
import com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode;
Expand Down Expand Up @@ -60,7 +61,7 @@ public class SnowflakeConnectionServiceV1 implements SnowflakeConnectionService
// User agent suffix we want to pass in to ingest service
public static final String USER_AGENT_SUFFIX_FORMAT = "SFKafkaConnector/%s provider/%s";

private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

SnowflakeConnectionServiceV1(
Properties prop,
Expand Down Expand Up @@ -1048,8 +1049,8 @@ public ChannelMigrateOffsetTokenResponseDTO migrateStreamingChannelOffsetToken(
}

ChannelMigrateOffsetTokenResponseDTO channelMigrateOffsetTokenResponseDTO =
OBJECT_MAPPER.readValue(
migrateOffsetTokenResultFromSysFunc, ChannelMigrateOffsetTokenResponseDTO.class);
getChannelMigrateOffsetTokenResponseDTO(migrateOffsetTokenResultFromSysFunc);

LOGGER.info(
"Migrate OffsetToken response for table:{}, sourceChannel:{}, destinationChannel:{}"
+ " is:{}",
Expand Down Expand Up @@ -1077,4 +1078,13 @@ public ChannelMigrateOffsetTokenResponseDTO migrateStreamingChannelOffsetToken(
throw SnowflakeErrors.ERROR_5023.getException(errorMsg, this.telemetry);
}
}

@VisibleForTesting
protected ChannelMigrateOffsetTokenResponseDTO getChannelMigrateOffsetTokenResponseDTO(
String migrateOffsetTokenResultFromSysFunc) throws JsonProcessingException {
ChannelMigrateOffsetTokenResponseDTO channelMigrateOffsetTokenResponseDTO =
OBJECT_MAPPER.readValue(
migrateOffsetTokenResultFromSysFunc, ChannelMigrateOffsetTokenResponseDTO.class);
return channelMigrateOffsetTokenResponseDTO;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.snowflake.kafka.connector.internal;

import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.snowflake.kafka.connector.internal.streaming.ChannelMigrateOffsetTokenResponseDTO;
import org.junit.Test;

public class SnowflakeConnectionServiceV1Test {
@Test
public void testChannelMigrationResponse_validResponse() throws Exception {
SnowflakeConnectionServiceV1 v1MockConnectionService = mock(SnowflakeConnectionServiceV1.class);
final String validMigrationResponse =
"{\"responseCode\": 51, \"responseMessage\": \"Source Channel does not exist for Offset"
+ " Migration\"}";
when(v1MockConnectionService.getChannelMigrateOffsetTokenResponseDTO(anyString()))
.thenCallRealMethod();

ChannelMigrateOffsetTokenResponseDTO migrationDTO =
v1MockConnectionService.getChannelMigrateOffsetTokenResponseDTO(validMigrationResponse);
assert migrationDTO.getResponseCode() == 51;
assert migrationDTO
.getResponseMessage()
.contains("Source Channel does not exist for Offset Migration");
}

@Test(expected = JsonProcessingException.class)
public void testChannelMigrationResponse_InvalidResponse() throws Exception {
SnowflakeConnectionServiceV1 v1MockConnectionService = mock(SnowflakeConnectionServiceV1.class);
final String validMigrationResponse =
"{\"responseCode\": 51, \"responseMessage\": \"Source Channel does not exist for Offset"
+ " Migration\", \"unknown\":62}";
when(v1MockConnectionService.getChannelMigrateOffsetTokenResponseDTO(anyString()))
.thenCallRealMethod();
v1MockConnectionService.getChannelMigrateOffsetTokenResponseDTO(validMigrationResponse);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,15 @@ public void testChannelMigrateOffsetTokenSystemFunction_NonNullOffsetTokenForSou
TestUtils.assertWithRetry(
() -> service.getOffset(new TopicPartition(topic, PARTITION)) == noOfRecords, 5, 5);

// add few more records
records =
TestUtils.createJsonStringSinkRecords(noOfRecords, noOfRecords, testTableName, PARTITION);
records.forEach(service::insert);
TestUtils.assertWithRetry(
() -> service.getOffset(new TopicPartition(topic, PARTITION)) == noOfRecords + noOfRecords,
5,
5);

service.closeAll();
}

Expand Down

0 comments on commit 21697ad

Please sign in to comment.