Skip to content

Commit

Permalink
Added Time Based Iterator Support
Browse files Browse the repository at this point in the history
Added support for time-based iterators. Time-based iterators are only
used if there is no current checkpoint for that shard; otherwise, the
sequence number of the checkpoint is used.
  • Loading branch information
pfifer committed Aug 11, 2016
1 parent 41832de commit aa47fce
Show file tree
Hide file tree
Showing 32 changed files with 1,040 additions and 248 deletions.
2 changes: 1 addition & 1 deletion NOTICE.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
AmazonKinesisClientLibrary
Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.

2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>DynamoDBLocal</artifactId>
<version>1.10.5.1</version>
<version>1.11.0.1</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,5 +31,9 @@ public enum SentinelCheckpoint {
/**
* We've completely processed all records in this shard.
*/
SHARD_END;
SHARD_END,
/**
* Start from the record at or after the specified server-side timestamp.
*/
AT_TIMESTAMP
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -19,14 +19,18 @@
* This is used during initial application bootstrap (when a checkpoint doesn't exist for a shard or its parents).
*/
public enum InitialPositionInStream {

/**
* Start after the most recent data record (fetch new data).
*/
LATEST,

/**
* Start from the oldest available data record.
*/
TRIM_HORIZON;
TRIM_HORIZON,

/**
* Start from the record at or after the specified server-side timestamp.
*/
AT_TIMESTAMP
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import java.util.Date;

/**
 * Class that houses the entities needed to specify the position in the stream from where a new application should
 * start.
 *
 * Instances are created only through the static factory methods. The timestamp is copied when it is stored and
 * again when it is handed out, because {@link Date} is mutable and this object must not change after construction.
 */
class InitialPositionInStreamExtended {

    private final InitialPositionInStream position;
    private final Date timestamp;

    /**
     * This is scoped as private to forbid callers from using it directly and to convey the intent to use the
     * static methods instead.
     *
     * @param position One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The Amazon Kinesis Client Library will start
     *        fetching records from this position when the application starts up if there are no checkpoints.
     *        If there are checkpoints, we will process records from the checkpoint position.
     * @param timestamp The timestamp to use with the AT_TIMESTAMP value for initialPositionInStream; may be null
     *        for the other positions. A copy is stored, so later changes to the argument have no effect.
     */
    private InitialPositionInStreamExtended(final InitialPositionInStream position, final Date timestamp) {
        this.position = position;
        this.timestamp = copyOf(timestamp);
    }

    /**
     * Get the initial position in the stream where the application should start from.
     *
     * @return The initial position in stream.
     */
    protected InitialPositionInStream getInitialPositionInStream() {
        return this.position;
    }

    /**
     * Get the timestamp from where we need to start the application.
     * Valid only for initial position of type AT_TIMESTAMP, returns null for other positions.
     *
     * @return A copy of the timestamp from where we need to start the application, or null if none was given.
     */
    protected Date getTimestamp() {
        // Hand out a copy so callers cannot alter the timestamp held by this object.
        return copyOf(this.timestamp);
    }

    /**
     * Creates an initial position for the positions that do not need a timestamp.
     *
     * @param position Either LATEST or TRIM_HORIZON.
     * @return An initial position carrying the given position and no timestamp.
     * @throws IllegalArgumentException if position is null or is any value other than LATEST or TRIM_HORIZON.
     */
    protected static InitialPositionInStreamExtended newInitialPosition(final InitialPositionInStream position) {
        // switch on a null enum would fail with a message-less NullPointerException; report it like any other
        // unsupported value instead.
        if (position == null) {
            throw new IllegalArgumentException("Invalid InitialPosition: " + position);
        }
        switch (position) {
        case LATEST:
            return new InitialPositionInStreamExtended(InitialPositionInStream.LATEST, null);
        case TRIM_HORIZON:
            return new InitialPositionInStreamExtended(InitialPositionInStream.TRIM_HORIZON, null);
        default:
            throw new IllegalArgumentException("Invalid InitialPosition: " + position);
        }
    }

    /**
     * Creates an initial position of type AT_TIMESTAMP.
     *
     * @param timestamp The timestamp to start from; must not be null.
     * @return An initial position carrying AT_TIMESTAMP and a copy of the given timestamp.
     * @throws IllegalArgumentException if timestamp is null.
     */
    protected static InitialPositionInStreamExtended newInitialPositionAtTimestamp(final Date timestamp) {
        if (timestamp == null) {
            throw new IllegalArgumentException("Timestamp must be specified for InitialPosition AT_TIMESTAMP");
        }
        return new InitialPositionInStreamExtended(InitialPositionInStream.AT_TIMESTAMP, timestamp);
    }

    /**
     * Null-tolerant defensive copy of a Date.
     *
     * @param source The date to copy; may be null.
     * @return A new Date with the same millisecond value, or null if source is null.
     */
    private static Date copyOf(final Date source) {
        return source == null ? null : new Date(source.getTime());
    }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,6 +41,7 @@ class InitializeTask implements ITask {
private final RecordProcessorCheckpointer recordProcessorCheckpointer;
// Back off for this interval if we encounter a problem (exception)
private final long backoffTimeMillis;
private final StreamConfig streamConfig;

/**
* Constructor.
Expand All @@ -50,13 +51,15 @@ class InitializeTask implements ITask {
ICheckpoint checkpoint,
RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisDataFetcher dataFetcher,
long backoffTimeMillis) {
long backoffTimeMillis,
StreamConfig streamConfig) {
this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor;
this.checkpoint = checkpoint;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.dataFetcher = dataFetcher;
this.backoffTimeMillis = backoffTimeMillis;
this.streamConfig = streamConfig;
}

/*
Expand All @@ -74,7 +77,7 @@ public TaskResult call() {
LOG.debug("Initializing ShardId " + shardInfo.getShardId());
ExtendedSequenceNumber initialCheckpoint = checkpoint.getCheckpoint(shardInfo.getShardId());

dataFetcher.initialize(initialCheckpoint.getSequenceNumber());
dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), streamConfig.getInitialPositionInStream());
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(initialCheckpoint);
recordProcessorCheckpointer.setInitialCheckpointValue(initialCheckpoint);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import java.util.Date;
import java.util.Set;

import com.amazonaws.ClientConfiguration;
Expand Down Expand Up @@ -185,6 +186,7 @@ public class KinesisClientLibConfiguration {
private int maxLeasesToStealAtOneTime;
private int initialLeaseTableReadCapacity;
private int initialLeaseTableWriteCapacity;
private InitialPositionInStreamExtended initialPositionInStreamExtended;

/**
* Constructor.
Expand Down Expand Up @@ -263,7 +265,6 @@ public KinesisClientLibConfiguration(String applicationName,
* with a call to Amazon Kinesis before checkpointing for calls to
* {@link RecordProcessorCheckpointer#checkpoint(String)}
* @param regionName The region name for the service
*
*/
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
Expand Down Expand Up @@ -330,6 +331,8 @@ public KinesisClientLibConfiguration(String applicationName,
this.maxLeasesToStealAtOneTime = DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME;
this.initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY;
this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY;
this.initialPositionInStreamExtended =
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
}

// Check if value is positive, otherwise throw an exception
Expand Down Expand Up @@ -580,6 +583,22 @@ public int getInitialLeaseTableWriteCapacity() {
return initialLeaseTableWriteCapacity;
}

/**
 * Returns the extended initial position held by this configuration. The accessor is kept protected so that
 * outside callers do not come to depend on this internal object.
 *
 * @return The initialPositionInStreamExtended object.
 */
protected InitialPositionInStreamExtended getInitialPositionInStreamExtended() {
    return this.initialPositionInStreamExtended;
}

/**
 * Returns the timestamp carried by the extended initial position of this configuration.
 *
 * @return The timestamp from where we need to start the application.
 *         Valid only for initial position of type AT_TIMESTAMP, returns null for other positions.
 */
public Date getTimestampAtInitialPositionInStream() {
    final Date startTimestamp = this.initialPositionInStreamExtended.getTimestamp();
    return startTimestamp;
}

// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES
/**
* @param tableName name of the lease table in DynamoDB
Expand All @@ -600,13 +619,25 @@ public KinesisClientLibConfiguration withKinesisEndpoint(String kinesisEndpoint)
}

/**
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library will start
* fetching records from this position when the application starts up if there are no checkpoints. If there
* are checkpoints, we will process records from the checkpoint position.
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library
* will start fetching records from this position when the application starts up if there are no checkpoints.
* If there are checkpoints, we will process records from the checkpoint position.
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withInitialPositionInStream(InitialPositionInStream initialPositionInStream) {
    // Build the extended position before touching any field: the factory throws IllegalArgumentException for
    // values it does not accept (for example AT_TIMESTAMP, which needs a timestamp). Assigning afterwards
    // guarantees a rejected call leaves both fields unchanged, so they can never disagree.
    final InitialPositionInStreamExtended extendedPosition =
            InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
    this.initialPositionInStream = initialPositionInStream;
    this.initialPositionInStreamExtended = extendedPosition;
    return this;
}

/**
 * Sets the initial position in the stream to AT_TIMESTAMP, using the given timestamp.
 *
 * @param timestamp The timestamp to use with the AT_TIMESTAMP value for initialPositionInStream.
 * @return KinesisClientLibConfiguration
 * @throws IllegalArgumentException if timestamp is null; the configuration is left unchanged in that case.
 */
public KinesisClientLibConfiguration withTimestampAtInitialPositionInStream(Date timestamp) {
    // Let the factory validate the timestamp before any field is modified, so that a rejected (null) timestamp
    // cannot leave initialPositionInStream set to AT_TIMESTAMP without a matching extended position.
    final InitialPositionInStreamExtended extendedPosition =
            InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp);
    this.initialPositionInStream = InitialPositionInStream.AT_TIMESTAMP;
    this.initialPositionInStreamExtended = extendedPosition;
    return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,8 @@
import com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;

import java.util.Date;

/**
* Used to get data from Amazon Kinesis. Tracks iterator state internally.
*/
Expand All @@ -41,8 +43,7 @@ class KinesisDataFetcher {
/**
*
* @param kinesisProxy Kinesis proxy
* @param shardId shardId (we'll fetch data for this shard)
* @param checkpoint used to get current checkpoint from which to start fetching records
* @param shardInfo The shardInfo object.
*/
public KinesisDataFetcher(IKinesisProxy kinesisProxy, ShardInfo shardInfo) {
this.shardId = shardInfo.getShardId();
Expand Down Expand Up @@ -83,32 +84,36 @@ public GetRecordsResult getRecords(int maxRecords) {
/**
* Initializes this KinesisDataFetcher's iterator based on the checkpointed sequence number.
* @param initialCheckpoint Current checkpoint sequence number for this shard.
*
* @param initialPositionInStream The initialPositionInStream.
*/
public void initialize(String initialCheckpoint) {
public void initialize(String initialCheckpoint, InitialPositionInStreamExtended initialPositionInStream) {
LOG.info("Initializing shard " + shardId + " with " + initialCheckpoint);
advanceIteratorTo(initialCheckpoint);
advanceIteratorTo(initialCheckpoint, initialPositionInStream);
isInitialized = true;
}

public void initialize(ExtendedSequenceNumber initialCheckpoint) {

public void initialize(ExtendedSequenceNumber initialCheckpoint,
InitialPositionInStreamExtended initialPositionInStream) {
LOG.info("Initializing shard " + shardId + " with " + initialCheckpoint.getSequenceNumber());
advanceIteratorTo(initialCheckpoint.getSequenceNumber());
advanceIteratorTo(initialCheckpoint.getSequenceNumber(), initialPositionInStream);
isInitialized = true;
}

/**
* Advances this KinesisDataFetcher's internal iterator to be at the passed-in sequence number.
*
* @param sequenceNumber advance the iterator to the record at this sequence number.
* @param initialPositionInStream The initialPositionInStream.
*/
void advanceIteratorTo(String sequenceNumber) {
void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream) {
if (sequenceNumber == null) {
throw new IllegalArgumentException("SequenceNumber should not be null: shardId " + shardId);
} else if (sequenceNumber.equals(SentinelCheckpoint.LATEST.toString())) {
nextIterator = getIterator(ShardIteratorType.LATEST.toString(), null);
nextIterator = getIterator(ShardIteratorType.LATEST.toString());
} else if (sequenceNumber.equals(SentinelCheckpoint.TRIM_HORIZON.toString())) {
nextIterator = getIterator(ShardIteratorType.TRIM_HORIZON.toString(), null);
nextIterator = getIterator(ShardIteratorType.TRIM_HORIZON.toString());
} else if (sequenceNumber.equals(SentinelCheckpoint.AT_TIMESTAMP.toString())) {
nextIterator = getIterator(initialPositionInStream.getTimestamp());
} else if (sequenceNumber.equals(SentinelCheckpoint.SHARD_END.toString())) {
nextIterator = null;
} else {
Expand All @@ -120,8 +125,8 @@ void advanceIteratorTo(String sequenceNumber) {
}

/**
* @param iteratorType
* @param sequenceNumber
* @param iteratorType The iteratorType - either AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER.
* @param sequenceNumber The sequenceNumber.
*
* @return iterator or null if we catch a ResourceNotFound exception
*/
Expand All @@ -139,6 +144,40 @@ private String getIterator(String iteratorType, String sequenceNumber) {
return iterator;
}

/**
 * Asks the Kinesis proxy for an iterator of the given type on this fetcher's shard.
 *
 * @param iteratorType The iteratorType - either TRIM_HORIZON or LATEST.
 * @return iterator or null if we catch a ResourceNotFound exception
 */
private String getIterator(String iteratorType) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Calling getIterator for " + shardId + " and iterator type " + iteratorType);
    }
    try {
        return kinesisProxy.getIterator(shardId, iteratorType);
    } catch (ResourceNotFoundException notFound) {
        // Logged at info level and reported to the caller as a null iterator.
        LOG.info("Caught ResourceNotFoundException when getting an iterator for shard " + shardId, notFound);
        return null;
    }
}

/**
 * Asks the Kinesis proxy for an iterator on this fetcher's shard positioned by the given timestamp.
 *
 * @param timestamp The timestamp.
 * @return iterator or null if we catch a ResourceNotFound exception
 */
private String getIterator(Date timestamp) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Calling getIterator for " + shardId + " and timestamp " + timestamp);
    }
    try {
        return kinesisProxy.getIterator(shardId, timestamp);
    } catch (ResourceNotFoundException notFound) {
        // Logged at info level and reported to the caller as a null iterator.
        LOG.info("Caught ResourceNotFoundException when getting an iterator for shard " + shardId, notFound);
        return null;
    }
}

/**
* @return the shardEndReached
*/
Expand Down
Loading

0 comments on commit aa47fce

Please sign in to comment.