diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java
index 700fd4ff8..87d744436 100644
--- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java
+++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java
@@ -15,29 +15,7 @@
 package io.confluent.connect.s3;
 
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
-import com.amazonaws.regions.RegionUtils;
-import com.amazonaws.regions.Regions;
-import com.amazonaws.services.s3.model.CannedAccessControlList;
-import com.amazonaws.services.s3.model.SSEAlgorithm;
-import io.confluent.connect.storage.common.util.StringUtils;
-import org.apache.kafka.common.Configurable;
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigDef.Importance;
-import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigDef.Validator;
-import org.apache.kafka.common.config.ConfigDef.Width;
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.config.types.Password;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.json.DecimalFormat;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -56,6 +34,31 @@
 import java.util.stream.IntStream;
 import java.util.zip.Deflater;
 
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Validator;
+import org.apache.kafka.common.config.ConfigDef.Width;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.json.DecimalFormat;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.RegionUtils;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.SSEAlgorithm;
+
 import io.confluent.connect.s3.format.avro.AvroFormat;
 import io.confluent.connect.s3.format.bytearray.ByteArrayFormat;
 import io.confluent.connect.s3.format.json.JsonFormat;
@@ -67,6 +70,7 @@
 import io.confluent.connect.storage.common.GenericRecommender;
 import io.confluent.connect.storage.common.ParentValueRecommender;
 import io.confluent.connect.storage.common.StorageCommonConfig;
+import io.confluent.connect.storage.common.util.StringUtils;
 import io.confluent.connect.storage.format.Format;
 import io.confluent.connect.storage.partitioner.DailyPartitioner;
 import io.confluent.connect.storage.partitioner.DefaultPartitioner;
@@ -75,8 +79,6 @@
 import io.confluent.connect.storage.partitioner.PartitionerConfig;
 import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
 
-import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
-
 public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
 
   // S3 Group
@@ -1123,14 +1125,14 @@ public void ensureValid(String name, Object region) {
         throw new ConfigException(
             name,
             region,
-            "Value must be one of: " + Utils.join(RegionUtils.getRegions(), ", ")
+            "Value must be one of: " + RegionUtils.getRegions().stream().map(Region::toString).collect(Collectors.joining(", "))
         );
       }
     }
 
     @Override
     public String toString() {
-      return "[" + Utils.join(RegionUtils.getRegions(), ", ") + "]";
+      return "[" + RegionUtils.getRegions().stream().map(Region::toString).collect(Collectors.joining(", ")) + "]";
     }
   }
 
@@ -1144,7 +1146,7 @@ private static class CompressionTypeValidator implements ConfigDef.Validator {
       TYPES_BY_NAME.put(compressionType.name, compressionType);
       names.add(compressionType.name);
     }
-    ALLOWED_VALUES = Utils.join(names, ", ");
+    ALLOWED_VALUES = String.join(", ", names);
   }
 
   @Override
@@ -1220,7 +1222,7 @@ public void ensureValid(String name, Object compressionCodecName) {
 
     @Override
     public String toString() {
-      return "[" + Utils.join(ALLOWED_VALUES, ", ") + "]";
+      return "[" + String.join(", ", ALLOWED_VALUES) + "]";
     }
   }
 
@@ -1234,7 +1236,7 @@ private static class CannedAclValidator implements ConfigDef.Validator {
      ACLS_BY_HEADER_VALUE.put(acl.toString(), acl);
      aclHeaderValues.add(acl.toString());
    }
-    ALLOWED_VALUES = Utils.join(aclHeaderValues, ", ");
+    ALLOWED_VALUES = String.join(", ", aclHeaderValues);
   }
 
   @Override
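
The substantive change in the hunks above swaps Kafka's internal Utils.join helper for standard JDK joining: String.join where the elements are already strings, and a stream with Collectors.joining where they first need converting (as with Region). The sketch below is a minimal standalone illustration of that pattern, not code from this patch; the JoinMigrationSketch class and its Element placeholder (standing in for a non-String type such as the SDK's Region) are hypothetical, and it assumes java.util.stream.Collectors is available on the classpath, as the connector class must also arrange.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class JoinMigrationSketch {

  // Placeholder for a non-String element type; only toString() matters here.
  static class Element {
    private final String name;

    Element(String name) {
      this.name = name;
    }

    @Override
    public String toString() {
      return name;
    }
  }

  public static void main(String[] args) {
    // Elements that are already strings: String.join is a drop-in for Utils.join.
    List<String> names = Arrays.asList("none", "gzip", "snappy", "lz4");
    String allowedValues = String.join(", ", names);

    // Elements that need conversion first: map to String, then Collectors.joining.
    List<Element> elements = Arrays.asList(new Element("us-east-1"), new Element("eu-west-1"));
    String joined = elements.stream()
        .map(Element::toString)
        .collect(Collectors.joining(", "));

    System.out.println("[" + allowedValues + "]");
    System.out.println("[" + joined + "]");
  }
}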