Skip to content

Commit

Permalink
merge changes from KinesisIO v1
Browse files Browse the repository at this point in the history
  • Loading branch information
jfarr1-godaddy committed Jul 26, 2020
1 parent 33832a4 commit 2aff7de
Show file tree
Hide file tree
Showing 13 changed files with 155 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

import java.net.URI;
import javax.annotation.Nullable;
import org.checkerframework.checker.nullness.qual.Nullable;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
Expand All @@ -35,7 +35,7 @@ class BasicKinesisProvider implements AWSClientsProvider {
private final String accessKey;
private final String secretKey;
private final String region;
@Nullable private final String serviceEndpoint;
private final @Nullable String serviceEndpoint;

BasicKinesisProvider(
String accessKey, String secretKey, Region region, @Nullable String serviceEndpoint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.NoSuchElementException;
import java.util.Objects;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Similar to Guava {@code Optional}, but throws {@link NoSuchElementException} for missing element.
Expand Down Expand Up @@ -57,7 +58,7 @@ public T get() {
}

@Override
public boolean equals(Object o) {
public boolean equals(@Nullable Object o) {
if (!(o instanceof Present)) {
return false;
}
Expand Down Expand Up @@ -89,7 +90,7 @@ public T get() {
}

@Override
public boolean equals(Object o) {
public boolean equals(@Nullable Object o) {
return o instanceof Absent;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@

import com.google.auto.value.AutoValue;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.io.Read.Unbounded;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand Down Expand Up @@ -240,24 +240,19 @@ public static Read read() {
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<KinesisRecord>> {

@Nullable
abstract String getStreamName();
abstract @Nullable String getStreamName();

@Nullable
abstract StartingPoint getInitialPosition();
abstract @Nullable StartingPoint getInitialPosition();

@Nullable
abstract AWSClientsProvider getAWSClientsProvider();
abstract @Nullable AWSClientsProvider getAWSClientsProvider();

abstract long getMaxNumRecords();

@Nullable
abstract Duration getMaxReadTime();
abstract @Nullable Duration getMaxReadTime();

abstract Duration getUpToDateThreshold();

@Nullable
abstract Integer getRequestRecordsLimit();
abstract @Nullable Integer getRequestRecordsLimit();

abstract WatermarkPolicyFactory getWatermarkPolicyFactory();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,35 +152,58 @@ public UnboundedSource.CheckpointMark getCheckpointMark() {
}

/**
* Returns total size of all records that remain in Kinesis stream after current watermark. If the
* watermark was not already set then it returns {@link
* UnboundedSource.UnboundedReader#BACKLOG_UNKNOWN}. When currently processed record is not
* further behind than {@link #upToDateThreshold} then this method returns 0.
* Returns total size of all records that remain in Kinesis stream. The size is estimated taking
* into account size of the records that were added to the stream after timestamp of the most
* recent record returned by the reader. If no records have yet been retrieved from the reader
* {@link UnboundedSource.UnboundedReader#BACKLOG_UNKNOWN} is returned. When currently processed
* record is not further behind than {@link #upToDateThreshold} then this method returns 0.
*
* <p>The method can over-estimate size of the records for the split as it reports the backlog
* across all shards. This can lead to unnecessary decisions to scale up the number of workers but
* will never fail to scale up when this is necessary due to backlog size.
*
* @see <a href="https://issues.apache.org/jira/browse/BEAM-9439">BEAM-9439</a>
*/
@Override
public long getTotalBacklogBytes() {
Instant watermark = getWatermark();
public long getSplitBacklogBytes() {
Instant latestRecordTimestamp = shardReadersPool.getLatestRecordTimestamp();

if (watermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
if (latestRecordTimestamp.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
LOG.debug("Split backlog bytes for stream {} unknown", source.getStreamName());
return UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN;
}

if (watermark.plus(upToDateThreshold).isAfterNow()) {
if (latestRecordTimestamp.plus(upToDateThreshold).isAfterNow()) {
LOG.debug(
"Split backlog bytes for stream {} with latest record timestamp {}: 0 (latest record timestamp is up-to-date with threshold of {})",
source.getStreamName(),
latestRecordTimestamp,
upToDateThreshold);
return 0L;
}

if (backlogBytesLastCheckTime.plus(backlogBytesCheckThreshold).isAfterNow()) {
LOG.debug(
"Split backlog bytes for {} stream with latest record timestamp {}: {} (cached value)",
source.getStreamName(),
latestRecordTimestamp,
lastBacklogBytes);
return lastBacklogBytes;
}

try {
lastBacklogBytes = kinesis.getBacklogBytes(source.getStreamName(), watermark);
lastBacklogBytes = kinesis.getBacklogBytes(source.getStreamName(), latestRecordTimestamp);
backlogBytesLastCheckTime = Instant.now();
} catch (TransientKinesisException e) {
LOG.warn("Transient exception occurred.", e);
LOG.warn(
"Transient exception occurred during backlog estimation for stream {}.",
source.getStreamName(),
e);
}
LOG.info(
"Total backlog bytes for {} stream with {} watermark: {}",
"Split backlog bytes for {} stream with {} latest record timestamp: {}",
source.getStreamName(),
watermark,
latestRecordTimestamp,
lastBacklogBytes);
return lastBacklogBytes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.nio.charset.StandardCharsets;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
Expand Down Expand Up @@ -109,7 +110,7 @@ public byte[] getDataAsBytes() {
}

@Override
public boolean equals(Object obj) {
public boolean equals(@Nullable Object obj) {
return EqualsBuilder.reflectionEquals(this, obj);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -228,8 +229,16 @@ private void awaitTermination() {
}

Instant getWatermark() {
return getMinTimestamp(ShardRecordsIterator::getShardWatermark);
}

Instant getLatestRecordTimestamp() {
return getMinTimestamp(ShardRecordsIterator::getLatestRecordTimestamp);
}

private Instant getMinTimestamp(Function<ShardRecordsIterator, Instant> timestampExtractor) {
return shardIteratorsMap.get().values().stream()
.map(ShardRecordsIterator::getShardWatermark)
.map(timestampExtractor)
.min(Comparator.naturalOrder())
.orElse(BoundedWindow.TIMESTAMP_MAX_VALUE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand All @@ -43,22 +42,23 @@ class ShardRecordsIterator {
private final RecordFilter filter;
private final String streamName;
private final String shardId;
private AtomicReference<ShardCheckpoint> checkpoint;
private final AtomicReference<ShardCheckpoint> checkpoint;
private final WatermarkPolicy watermarkPolicy;
private final WatermarkPolicyFactory watermarkPolicyFactory;
private final WatermarkPolicy latestRecordTimestampPolicy =
WatermarkPolicyFactory.withArrivalTimePolicy().createWatermarkPolicy();
private String shardIterator;
private AtomicLong millisBehindLatest = new AtomicLong(Long.MAX_VALUE);
private AtomicReference<WatermarkPolicy> watermarkPolicy;
private WatermarkPolicyFactory watermarkPolicyFactory;

ShardRecordsIterator(
final ShardCheckpoint initialCheckpoint,
ShardCheckpoint initialCheckpoint,
SimplifiedKinesisClient simplifiedKinesisClient,
WatermarkPolicyFactory watermarkPolicyFactory)
throws TransientKinesisException {
this(initialCheckpoint, simplifiedKinesisClient, watermarkPolicyFactory, new RecordFilter());
}

ShardRecordsIterator(
final ShardCheckpoint initialCheckpoint,
ShardCheckpoint initialCheckpoint,
SimplifiedKinesisClient simplifiedKinesisClient,
WatermarkPolicyFactory watermarkPolicyFactory,
RecordFilter filter)
Expand All @@ -69,7 +69,7 @@ class ShardRecordsIterator {
this.streamName = initialCheckpoint.getStreamName();
this.shardId = initialCheckpoint.getShardId();
this.shardIterator = initialCheckpoint.getShardIterator(kinesis);
this.watermarkPolicy = new AtomicReference<>(watermarkPolicyFactory.createWatermarkPolicy());
this.watermarkPolicy = watermarkPolicyFactory.createWatermarkPolicy();
this.watermarkPolicyFactory = watermarkPolicyFactory;
}

Expand All @@ -82,10 +82,13 @@ List<KinesisRecord> readNextBatch()
streamName, shardId));
}
GetKinesisRecordsResult response = fetchRecords();
LOG.debug("Fetched {} new records", response.getRecords().size());
LOG.debug(
"Fetched {} new records from shard: streamName={}, shardId={}",
response.getRecords().size(),
streamName,
shardId);

List<KinesisRecord> filteredRecords = filter.apply(response.getRecords(), checkpoint.get());
millisBehindLatest.set(response.getMillisBehindLatest());
return filteredRecords;
}

Expand All @@ -95,7 +98,11 @@ private GetKinesisRecordsResult fetchRecords() throws TransientKinesisException
shardIterator = response.getNextShardIterator();
return response;
} catch (ExpiredIteratorException e) {
LOG.info("Refreshing expired iterator", e);
LOG.info(
"Refreshing expired iterator for shard: streamName={}, shardId={}",
streamName,
shardId,
e);
shardIterator = checkpoint.get().getShardIterator(kinesis);
return fetchRecords();
}
Expand All @@ -107,11 +114,16 @@ ShardCheckpoint getCheckpoint() {

void ackRecord(KinesisRecord record) {
checkpoint.set(checkpoint.get().moveAfter(record));
watermarkPolicy.get().update(record);
watermarkPolicy.update(record);
latestRecordTimestampPolicy.update(record);
}

Instant getShardWatermark() {
return watermarkPolicy.get().getWatermark();
return watermarkPolicy.getWatermark();
}

Instant getLatestRecordTimestamp() {
return latestRecordTimestampPolicy.getWatermark();
}

String getShardId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.Serializable;
import java.util.Objects;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.kinesis.common.InitialPositionInStream;
Expand Down Expand Up @@ -58,7 +59,7 @@ public Instant getTimestamp() {
}

@Override
public boolean equals(Object o) {
public boolean equals(@Nullable Object o) {
if (this == o) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.stream.Collectors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.mockito.Mockito;
import software.amazon.awssdk.core.SdkBytes;
Expand Down Expand Up @@ -124,7 +125,7 @@ public Record convertToRecord() {
}

@Override
public boolean equals(Object obj) {
public boolean equals(@Nullable Object obj) {
return EqualsBuilder.reflectionEquals(this, obj);
}

Expand Down
Loading

0 comments on commit 2aff7de

Please sign in to comment.