Skip to content

Commit

Permalink
Version 1.6.1 of the Amazon Kinesis Client Library
Browse files Browse the repository at this point in the history
  • Loading branch information
Gosalia, Manan committed Sep 23, 2015
1 parent 97e606f commit c6e393c
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 14 deletions.
14 changes: 7 additions & 7 deletions META-INF/MANIFEST.MF
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Amazon Kinesis Client Library for Java
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
Bundle-Version: 1.6.0
Bundle-Version: 1.6.1
Bundle-Vendor: Amazon Technologies, Inc
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
org.apache.commons.logging;bundle-version="1.1.3";visibility:=reexport,
com.fasterxml.jackson.core.jackson-databind;bundle-version="2.3.2",
com.fasterxml.jackson.core.jackson-core;bundle-version="2.3.2",
com.fasterxml.jackson.core.jackson-annotations;bundle-version="2.3.0",
org.apache.httpcomponents.httpcore;bundle-version="4.3.2",
org.apache.httpcomponents.httpclient;bundle-version="4.3.4"
com.amazonaws.sdk;bundle-version="1.10.8",
com.fasterxml.jackson.core.jackson-databind;bundle-version="2.5.3",
com.fasterxml.jackson.core.jackson-core;bundle-version="2.5.3",
com.fasterxml.jackson.core.jackson-annotations;bundle-version="2.5.0",
org.apache.httpcomponents.httpcore;bundle-version="4.3.3",
org.apache.httpcomponents.httpclient;bundle-version="4.3.6"
com.amazonaws.sdk;bundle-version="1.10.20",
Export-Package: com.amazonaws.services.kinesis,
com.amazonaws.services.kinesis.clientlibrary,
com.amazonaws.services.kinesis.clientlibrary.config,
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.

## Release Notes
### Release 1.6.1 (September 23, 2015)
* Expose [approximateArrivalTimestamp](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) for Records in processRecords API call.

### Release 1.6.0 (July 31, 2015)
* Restores compatibility with [dynamodb-streams-kinesis-adapter](https://github.com/awslabs/dynamodb-streams-kinesis-adapter) (which was broken in 1.4.0).

Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<artifactId>amazon-kinesis-client</artifactId>
<packaging>jar</packaging>
<name>Amazon Kinesis Client Library for Java</name>
<version>1.6.0</version>
<version>1.6.1</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.</description>
<url>https://aws.amazon.com/kinesis</url>

Expand All @@ -23,7 +23,7 @@
</licenses>

<properties>
<aws-java-sdk.version>1.10.8</aws-java-sdk.version>
<aws-java-sdk.version>1.10.20</aws-java-sdk.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public class KinesisClientLibConfiguration {
/**
* User agent set when Amazon Kinesis Client Library makes AWS requests.
*/
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.6.0";
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.6.1";

/**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public TaskResult call() {

recordProcessorCheckpointer.setLargestPermittedCheckpointValue(
filterAndGetMaxExtendedSequenceNumber(scope, records,
recordProcessorCheckpointer.getLastCheckpointValue()));
recordProcessorCheckpointer.getLastCheckpointValue(),
recordProcessorCheckpointer.getLargestPermittedCheckpointValue()));

if ((!records.isEmpty()) || streamConfig.shouldCallProcessRecordsEvenForEmptyRecordList()) {
LOG.debug("Calling application processRecords() with " + records.size()
Expand Down Expand Up @@ -204,11 +205,13 @@ public TaskType getTaskType() {
* @param scope metrics scope to emit metrics into
* @param records list of records to scan and change in-place as needed
* @param lastCheckpointValue the most recent checkpoint value
* @param lastLargestPermittedCheckpointValue previous largest permitted checkpoint value
* @return the largest extended sequence number among the retained records
*/
private ExtendedSequenceNumber filterAndGetMaxExtendedSequenceNumber(IMetricsScope scope, List<Record> records,
final ExtendedSequenceNumber lastCheckpointValue) {
ExtendedSequenceNumber largestExtendedSequenceNumber = lastCheckpointValue;
final ExtendedSequenceNumber lastCheckpointValue,
final ExtendedSequenceNumber lastLargestPermittedCheckpointValue) {
ExtendedSequenceNumber largestExtendedSequenceNumber = lastLargestPermittedCheckpointValue;
ListIterator<Record> recordIterator = records.listIterator();
while (recordIterator.hasNext()) {
Record record = recordIterator.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -75,6 +76,7 @@ protected UserRecord(boolean aggregated, Record record, long subSequenceNumber,
this.setSequenceNumber(record.getSequenceNumber());
this.setData(record.getData());
this.setPartitionKey(record.getPartitionKey());
this.setApproximateArrivalTimestamp(record.getApproximateArrivalTimestamp());
}

/**
Expand Down Expand Up @@ -195,6 +197,7 @@ public static List<UserRecord> deaggregate(List<Record> records) {
* partition keys fall within the range of the startingHashKey and
* the endingHashKey.
*/
// CHECKSTYLE:OFF NPathComplexity
public static List<UserRecord> deaggregate(List<Record> records, BigInteger startingHashKey,
BigInteger endingHashKey) {
List<UserRecord> result = new ArrayList<>();
Expand Down Expand Up @@ -232,6 +235,8 @@ public static List<UserRecord> deaggregate(List<Record> records, BigInteger star
Messages.AggregatedRecord ar = Messages.AggregatedRecord.parseFrom(messageData);
List<String> pks = ar.getPartitionKeyTableList();
List<String> ehks = ar.getExplicitHashKeyTableList();
long aat = r.getApproximateArrivalTimestamp() == null
? -1 : r.getApproximateArrivalTimestamp().getTime();
try {
int recordsInCurrRecord = 0;
for (Messages.Record mr : ar.getRecordsList()) {
Expand All @@ -257,7 +262,8 @@ public static List<UserRecord> deaggregate(List<Record> records, BigInteger star
Record record = new Record()
.withData(ByteBuffer.wrap(mr.getData().toByteArray()))
.withPartitionKey(partitionKey)
.withSequenceNumber(r.getSequenceNumber());
.withSequenceNumber(r.getSequenceNumber())
.withApproximateArrivalTimestamp(aat < 0 ? null : new Date(aat));
result.add(new UserRecord(true, record, subSeqNum++, explicitHashKey));
}
} catch (Exception e) {
Expand Down Expand Up @@ -295,4 +301,5 @@ public static List<UserRecord> deaggregate(List<Record> records, BigInteger star
}
return result;
}
// CHECKSTYLE:ON NPathComplexity
}

0 comments on commit c6e393c

Please sign in to comment.