From a84885db79291ca8c221eb44dc25a36383a0ed17 Mon Sep 17 00:00:00 2001 From: Marcin Ciszak Date: Tue, 29 May 2018 17:52:08 +0100 Subject: [PATCH] =?UTF-8?q?Enables=20property=20for=20setting=20AT=5FTIMES?= =?UTF-8?q?TAMP=20shard=20iterator=20initial=20time=E2=80=A6=20(#342)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Allows setting the timestamp for InitialPositiinInStream.AT_TIMESTAMP from a properties file. --- .../config/DatePropertyValueDecorder.java | 53 +++++++++++++++++ .../config/KinesisClientLibConfigurator.java | 1 + .../config/DatePropertyValueDecoderTest.java | 53 +++++++++++++++++ .../KinesisClientLibConfiguratorTest.java | 58 +++++++++++++++++++ 4 files changed, 165 insertions(+) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/DatePropertyValueDecorder.java create mode 100644 src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/DatePropertyValueDecoderTest.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/DatePropertyValueDecorder.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/DatePropertyValueDecorder.java new file mode 100644 index 000000000..4f6a588c5 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/DatePropertyValueDecorder.java @@ -0,0 +1,53 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.config; + +import java.util.Arrays; +import java.util.Date; +import java.util.List; + +/** + * Provide Date property. + */ +class DatePropertyValueDecoder implements IPropertyValueDecoder { + + /** + * Constructor. + */ + DatePropertyValueDecoder() { + } + + /** + * @param value property value as String + * @return corresponding variable in correct type + */ + @Override + public Date decodeValue(String value) { + try { + return new Date(Long.parseLong(value) * 1000L); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Date property value must be numeric."); + } + } + + /** + * @return list of supported types + */ + @Override + public List> getSupportedTypes() { + return Arrays.asList(Date.class); + } + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfigurator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfigurator.java index e239f967a..8059d6af7 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfigurator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfigurator.java @@ -66,6 +66,7 @@ public KinesisClientLibConfigurator() { Arrays.asList(new IntegerPropertyValueDecoder(), new LongPropertyValueDecoder(), new BooleanPropertyValueDecoder(), + new DatePropertyValueDecoder(), new AWSCredentialsProviderPropertyValueDecoder(), new StringPropertyValueDecoder(), new InitialPositionInStreamPropertyValueDecoder(), diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/DatePropertyValueDecoderTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/DatePropertyValueDecoderTest.java new file mode 100644 index 000000000..df405978d --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/DatePropertyValueDecoderTest.java @@ -0,0 +1,53 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.config; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.Date; + +import org.junit.Test; + +import com.amazonaws.services.kinesis.clientlibrary.config.DatePropertyValueDecoder; + +public class DatePropertyValueDecoderTest { + + private DatePropertyValueDecoder decoder = new DatePropertyValueDecoder(); + + private static final String TEST_VALUE = "1527267472"; + + @Test + public void testNumericValue() { + Date timestamp = decoder.decodeValue(TEST_VALUE); + assertEquals(timestamp.getClass(), Date.class); + assertEquals(timestamp, new Date(Long.parseLong(TEST_VALUE) * 1000L)); + } + + @Test(expected = IllegalArgumentException.class) + public void testEmptyValue() { + Date timestamp = decoder.decodeValue(""); + } + + @Test(expected = IllegalArgumentException.class) + public void testNullValue() { + Date timestamp = decoder.decodeValue(null); + } + + @Test(expected = IllegalArgumentException.class) + public void testNonNumericValue() { + Date timestamp = decoder.decodeValue("123abc"); + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java index 72f171fcb..08a2598a7 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java @@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream; import java.io.InputStream; +import java.util.Date; import java.util.Optional; import java.util.Set; @@ -143,6 +144,20 @@ public void testWithBooleanVariables() { assertTrue(config.shouldValidateSequenceNumberBeforeCheckpointing()); } + @Test + public void testWithDateVariables() { + KinesisClientLibConfiguration config = + getConfiguration(StringUtils.join(new String[] { + "streamName = a", + "applicationName = b", + "AWSCredentialsProvider = ABCD, " + credentialName1, + "timestampAtInitialPositionInStream = 1527267472" + }, '\n')); + + assertEquals(config.getTimestampAtInitialPositionInStream(), + new Date(1527267472 * 1000L)); + } + @Test public void testWithStringVariables() { KinesisClientLibConfiguration config = @@ -189,6 +204,49 @@ public void testWithInitialPositionInStreamVariables() { }, '\n')); assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON); + } + + @Test + public void testWithTimestampAtInitialPositionInStreamVariables() { + KinesisClientLibConfiguration config = + getConfiguration(StringUtils.join(new String[] { + "streamName = a", + "applicationName = b", + "AWSCredentialsProvider = ABCD," + credentialName1, + "timestampAtInitialPositionInStream = 1527267472" + }, '\n')); + + assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.AT_TIMESTAMP); + assertEquals(config.getTimestampAtInitialPositionInStream(), + new Date(1527267472 * 1000L)); + } + + @Test + public void testWithEmptyTimestampAtInitialPositionInStreamVariables() { + KinesisClientLibConfiguration config = + getConfiguration(StringUtils.join(new String[] { + "streamName = a", + "applicationName = b", + "AWSCredentialsProvider = ABCD," + credentialName1, + "timestampAtInitialPositionInStream = " + }, '\n')); + + assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.LATEST); + assertEquals(config.getTimestampAtInitialPositionInStream(), null); + } + + @Test + public void testWithNonNumericTimestampAtInitialPositionInStreamVariables() { + KinesisClientLibConfiguration config = + getConfiguration(StringUtils.join(new String[] { + "streamName = a", + "applicationName = b", + "AWSCredentialsProvider = ABCD," + credentialName1, + "timestampAtInitialPositionInStream = 123abc" + }, '\n')); + + assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.LATEST); + assertEquals(config.getTimestampAtInitialPositionInStream(), null); } @Test