Skip to content

Commit

Permalink
autoformatting
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-rcheng committed Oct 16, 2023
1 parent 85fac99 commit f90a3ee
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 9 deletions.
3 changes: 1 addition & 2 deletions src/main/java/com/snowflake/kafka/connector/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,7 @@ static String validateConfig(Map<String, String> config) {
|| config
.get(INGESTION_METHOD_OPT)
.equalsIgnoreCase(IngestionMethodConfig.SNOWPIPE.toString())) {
invalidConfigParams.putAll(
SnowpipeBufferThreshold.validateBufferThreshold(config));
invalidConfigParams.putAll(SnowpipeBufferThreshold.validateBufferThreshold(config));

if (config.containsKey(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG)
&& Boolean.parseBoolean(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.google.common.collect.ImmutableMap;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -111,7 +110,8 @@ public long getFlushTimeThresholdSeconds() {
}

/**
* Check if provided snowflake kafka connector buffer properties are within permissible values for Snowpipe. Streaming does not use these buffer properties.
* Check if provided snowflake kafka connector buffer properties are within permissible values for
* Snowpipe. Streaming does not use these buffer properties.
*
* <p>This method invokes three verifiers - Time based threshold, buffer size and buffer count
* threshold.
Expand All @@ -122,8 +122,7 @@ public long getFlushTimeThresholdSeconds() {
public static ImmutableMap<String, String> validateBufferThreshold(
Map<String, String> providedSFConnectorConfig) {
Map<String, String> invalidConfigParams = new HashMap<>();
invalidConfigParams.putAll(
verifyBufferFlushTimeThreshold(providedSFConnectorConfig));
invalidConfigParams.putAll(verifyBufferFlushTimeThreshold(providedSFConnectorConfig));
invalidConfigParams.putAll(verifyBufferCountThreshold(providedSFConnectorConfig));
invalidConfigParams.putAll(verifyBufferBytesThreshold(providedSFConnectorConfig));
return ImmutableMap.copyOf(invalidConfigParams);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.snowflake.kafka.connector.internal.streaming;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_DEFAULT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ROLE;
import static com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;

Expand Down Expand Up @@ -517,6 +516,7 @@ public void setFileSize(long size) {
public void setFlushTime(long time) {
LOGGER.info("Ignore requested flush time {} as streaming does not use the KC buffer", time);
}

@Override
public long getRecordNumber() {
return -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ public void testFlushBufferedBytesBased() {
SnowpipeBufferThreshold snowpipeBufferThreshold =
new SnowpipeBufferThreshold(10, bytesThresholdForBuffer, 100) {};

Assert.assertTrue(
snowpipeBufferThreshold.shouldFlushOnBufferByteSize(bytesThresholdForBuffer));
Assert.assertTrue(snowpipeBufferThreshold.shouldFlushOnBufferByteSize(bytesThresholdForBuffer));

Assert.assertTrue(
snowpipeBufferThreshold.shouldFlushOnBufferByteSize(bytesThresholdForBuffer + 1));
Expand Down

0 comments on commit f90a3ee

Please sign in to comment.