From c6e393c13ec348f77b8b08082ba56823776ee48a Mon Sep 17 00:00:00 2001 From: "Gosalia, Manan" Date: Wed, 23 Sep 2015 13:54:08 -0700 Subject: [PATCH] Version 1.6.1 of the Amazon Kinesis Client Library --- META-INF/MANIFEST.MF | 14 +++++++------- README.md | 3 +++ pom.xml | 4 ++-- .../lib/worker/KinesisClientLibConfiguration.java | 2 +- .../clientlibrary/lib/worker/ProcessTask.java | 9 ++++++--- .../kinesis/clientlibrary/types/UserRecord.java | 9 ++++++++- 6 files changed, 27 insertions(+), 14 deletions(-) diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF index 6546c23e3..008db351e 100644 --- a/META-INF/MANIFEST.MF +++ b/META-INF/MANIFEST.MF @@ -2,17 +2,17 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-Name: Amazon Kinesis Client Library for Java Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true -Bundle-Version: 1.6.0 +Bundle-Version: 1.6.1 Bundle-Vendor: Amazon Technologies, Inc Bundle-RequiredExecutionEnvironment: JavaSE-1.7 Require-Bundle: org.apache.commons.codec;bundle-version="1.6", org.apache.commons.logging;bundle-version="1.1.3";visibility:=reexport, - com.fasterxml.jackson.core.jackson-databind;bundle-version="2.3.2", - com.fasterxml.jackson.core.jackson-core;bundle-version="2.3.2", - com.fasterxml.jackson.core.jackson-annotations;bundle-version="2.3.0", - org.apache.httpcomponents.httpcore;bundle-version="4.3.2", - org.apache.httpcomponents.httpclient;bundle-version="4.3.4" - com.amazonaws.sdk;bundle-version="1.10.8", + com.fasterxml.jackson.core.jackson-databind;bundle-version="2.5.3", + com.fasterxml.jackson.core.jackson-core;bundle-version="2.5.3", + com.fasterxml.jackson.core.jackson-annotations;bundle-version="2.5.0", + org.apache.httpcomponents.httpcore;bundle-version="4.3.3", + org.apache.httpcomponents.httpclient;bundle-version="4.3.6" + com.amazonaws.sdk;bundle-version="1.10.20", Export-Package: com.amazonaws.services.kinesis, com.amazonaws.services.kinesis.clientlibrary, com.amazonaws.services.kinesis.clientlibrary.config, diff --git a/README.md b/README.md index d80ce3b50..0d2617fa7 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,9 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages. ## Release Notes +### Release 1.6.1 (September 23, 2015) +* Expose [approximateArrivalTimestamp](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) for Records in processRecords API call. + ### Release 1.6.0 (July 31, 2015) * Restores compatibility with [dynamodb-streams-kinesis-adapter](https://github.com/awslabs/dynamodb-streams-kinesis-adapter) (which was broken in 1.4.0). diff --git a/pom.xml b/pom.xml index 87057ed43..18f0b2c69 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.6.0 + 1.6.1 The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. https://aws.amazon.com/kinesis @@ -23,7 +23,7 @@ - 1.10.8 + 1.10.20 diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 6af25fcc3..9829c13e9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -119,7 +119,7 @@ public class KinesisClientLibConfiguration { /** * User agent set when Amazon Kinesis Client Library makes AWS requests. */ - public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.6.0"; + public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.6.1"; /** * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index 7a8b2528f..47ee7a5d6 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -153,7 +153,8 @@ public TaskResult call() { recordProcessorCheckpointer.setLargestPermittedCheckpointValue( filterAndGetMaxExtendedSequenceNumber(scope, records, - recordProcessorCheckpointer.getLastCheckpointValue())); + recordProcessorCheckpointer.getLastCheckpointValue(), + recordProcessorCheckpointer.getLargestPermittedCheckpointValue())); if ((!records.isEmpty()) || streamConfig.shouldCallProcessRecordsEvenForEmptyRecordList()) { LOG.debug("Calling application processRecords() with " + records.size() @@ -204,11 +205,13 @@ public TaskType getTaskType() { * @param scope metrics scope to emit metrics into * @param records list of records to scan and change in-place as needed * @param lastCheckpointValue the most recent checkpoint value + * @param lastLargestPermittedCheckpointValue previous largest permitted checkpoint value * @return the largest extended sequence number among the retained records */ private ExtendedSequenceNumber filterAndGetMaxExtendedSequenceNumber(IMetricsScope scope, List records, - final ExtendedSequenceNumber lastCheckpointValue) { - ExtendedSequenceNumber largestExtendedSequenceNumber = lastCheckpointValue; + final ExtendedSequenceNumber lastCheckpointValue, + final ExtendedSequenceNumber lastLargestPermittedCheckpointValue) { + ExtendedSequenceNumber largestExtendedSequenceNumber = lastLargestPermittedCheckpointValue; ListIterator recordIterator = records.listIterator(); while (recordIterator.hasNext()) { Record record = recordIterator.next(); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/UserRecord.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/UserRecord.java index def49cc17..2f60671af 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/UserRecord.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/UserRecord.java @@ -20,6 +20,7 @@ import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Date; import java.util.List; import org.apache.commons.lang.StringUtils; @@ -75,6 +76,7 @@ protected UserRecord(boolean aggregated, Record record, long subSequenceNumber, this.setSequenceNumber(record.getSequenceNumber()); this.setData(record.getData()); this.setPartitionKey(record.getPartitionKey()); + this.setApproximateArrivalTimestamp(record.getApproximateArrivalTimestamp()); } /** @@ -195,6 +197,7 @@ public static List deaggregate(List records) { * partition keys fall within the range of the startingHashKey and * the endingHashKey. */ + // CHECKSTYLE:OFF NPathComplexity public static List deaggregate(List records, BigInteger startingHashKey, BigInteger endingHashKey) { List result = new ArrayList<>(); @@ -232,6 +235,8 @@ public static List deaggregate(List records, BigInteger star Messages.AggregatedRecord ar = Messages.AggregatedRecord.parseFrom(messageData); List pks = ar.getPartitionKeyTableList(); List ehks = ar.getExplicitHashKeyTableList(); + long aat = r.getApproximateArrivalTimestamp() == null + ? -1 : r.getApproximateArrivalTimestamp().getTime(); try { int recordsInCurrRecord = 0; for (Messages.Record mr : ar.getRecordsList()) { @@ -257,7 +262,8 @@ public static List deaggregate(List records, BigInteger star Record record = new Record() .withData(ByteBuffer.wrap(mr.getData().toByteArray())) .withPartitionKey(partitionKey) - .withSequenceNumber(r.getSequenceNumber()); + .withSequenceNumber(r.getSequenceNumber()) + .withApproximateArrivalTimestamp(aat < 0 ? null : new Date(aat)); result.add(new UserRecord(true, record, subSeqNum++, explicitHashKey)); } } catch (Exception e) { @@ -295,4 +301,5 @@ public static List deaggregate(List records, BigInteger star } return result; } + // CHECKSTYLE:ON NPathComplexity }