From f90a3eefc8e43fd3277c3d523a6e1711c341afd6 Mon Sep 17 00:00:00 2001 From: revi cheng Date: Mon, 16 Oct 2023 16:14:32 -0700 Subject: [PATCH] autoformatting --- src/main/java/com/snowflake/kafka/connector/Utils.java | 3 +-- .../kafka/connector/internal/SnowpipeBufferThreshold.java | 7 +++---- .../internal/streaming/SnowflakeSinkServiceV2.java | 2 +- .../connector/internal/SnowpipeBufferThresholdTest.java | 3 +-- 4 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/Utils.java b/src/main/java/com/snowflake/kafka/connector/Utils.java index a229aa1b0..bc1cb5384 100644 --- a/src/main/java/com/snowflake/kafka/connector/Utils.java +++ b/src/main/java/com/snowflake/kafka/connector/Utils.java @@ -423,8 +423,7 @@ static String validateConfig(Map config) { || config .get(INGESTION_METHOD_OPT) .equalsIgnoreCase(IngestionMethodConfig.SNOWPIPE.toString())) { - invalidConfigParams.putAll( - SnowpipeBufferThreshold.validateBufferThreshold(config)); + invalidConfigParams.putAll(SnowpipeBufferThreshold.validateBufferThreshold(config)); if (config.containsKey(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG) && Boolean.parseBoolean( diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowpipeBufferThreshold.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowpipeBufferThreshold.java index 03b80634b..6d9b0f4c3 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowpipeBufferThreshold.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowpipeBufferThreshold.java @@ -6,7 +6,6 @@ import com.google.common.collect.ImmutableMap; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; import com.snowflake.kafka.connector.Utils; -import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -111,7 +110,8 @@ public long getFlushTimeThresholdSeconds() { } /** - * Check if provided snowflake kafka connector buffer properties are within permissible values for Snowpipe. Streaming does not use these buffer properties. + * Check if provided snowflake kafka connector buffer properties are within permissible values for + * Snowpipe. Streaming does not use these buffer properties. * *

This method invokes three verifiers - Time based threshold, buffer size and buffer count * threshold. @@ -122,8 +122,7 @@ public long getFlushTimeThresholdSeconds() { public static ImmutableMap validateBufferThreshold( Map providedSFConnectorConfig) { Map invalidConfigParams = new HashMap<>(); - invalidConfigParams.putAll( - verifyBufferFlushTimeThreshold(providedSFConnectorConfig)); + invalidConfigParams.putAll(verifyBufferFlushTimeThreshold(providedSFConnectorConfig)); invalidConfigParams.putAll(verifyBufferCountThreshold(providedSFConnectorConfig)); invalidConfigParams.putAll(verifyBufferBytesThreshold(providedSFConnectorConfig)); return ImmutableMap.copyOf(invalidConfigParams); diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java index 647c6b2da..557dc00c2 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java @@ -1,6 +1,5 @@ package com.snowflake.kafka.connector.internal.streaming; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_DEFAULT; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ROLE; import static com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE; @@ -517,6 +516,7 @@ public void setFileSize(long size) { public void setFlushTime(long time) { LOGGER.info("Ignore requested flush time {} as streaming does not use the KC buffer", time); } + @Override public long getRecordNumber() { return -1; diff --git a/src/test/java/com/snowflake/kafka/connector/internal/SnowpipeBufferThresholdTest.java b/src/test/java/com/snowflake/kafka/connector/internal/SnowpipeBufferThresholdTest.java index 5fb0fbf9d..48665a8dc 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/SnowpipeBufferThresholdTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/SnowpipeBufferThresholdTest.java @@ -12,8 +12,7 @@ public void testFlushBufferedBytesBased() { SnowpipeBufferThreshold snowpipeBufferThreshold = new SnowpipeBufferThreshold(10, bytesThresholdForBuffer, 100) {}; - Assert.assertTrue( - snowpipeBufferThreshold.shouldFlushOnBufferByteSize(bytesThresholdForBuffer)); + Assert.assertTrue(snowpipeBufferThreshold.shouldFlushOnBufferByteSize(bytesThresholdForBuffer)); Assert.assertTrue( snowpipeBufferThreshold.shouldFlushOnBufferByteSize(bytesThresholdForBuffer + 1));