Skip to content

Commit

Permalink
[BEAM-8382] Add polling interval to KinesisIO.Read
Browse files Browse the repository at this point in the history
  • Loading branch information
jfarr1-godaddy committed Feb 10, 2020
1 parent a005fd7 commit 1a1b6bb
Show file tree
Hide file tree
Showing 11 changed files with 219 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kinesis;

import com.amazonaws.AmazonClientException;

/** Thrown when the Kinesis client was throttled due to rate limits. */
class KinesisClientThrottledException extends TransientKinesisException {

public KinesisClientThrottledException(String s, AmazonClientException e) {
super(s, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ public static Read read() {
.setMaxNumRecords(Long.MAX_VALUE)
.setUpToDateThreshold(Duration.ZERO)
.setWatermarkPolicyFactory(WatermarkPolicyFactory.withArrivalTimePolicy())
.setRateLimitPolicyFactory(RateLimitPolicyFactory.withoutLimiter())
.setMaxCapacityPerShard(ShardReadersPool.DEFAULT_CAPACITY_PER_SHARD)
.build();
}
Expand Down Expand Up @@ -273,6 +274,8 @@ public abstract static class Read extends PTransform<PBegin, PCollection<Kinesis

abstract WatermarkPolicyFactory getWatermarkPolicyFactory();

abstract RateLimitPolicyFactory getRateLimitPolicyFactory();

abstract Integer getMaxCapacityPerShard();

abstract Builder toBuilder();
Expand All @@ -296,6 +299,8 @@ abstract static class Builder {

abstract Builder setWatermarkPolicyFactory(WatermarkPolicyFactory watermarkPolicyFactory);

abstract Builder setRateLimitPolicyFactory(RateLimitPolicyFactory rateLimitPolicyFactory);

abstract Builder setMaxCapacityPerShard(Integer maxCapacity);

abstract Read build();
Expand Down Expand Up @@ -425,6 +430,35 @@ public Read withCustomWatermarkPolicy(WatermarkPolicyFactory watermarkPolicyFact
return toBuilder().setWatermarkPolicyFactory(watermarkPolicyFactory).build();
}

/**
* Specifies the rate limit policy as FixedDelayRateLimiter with the default delay of 1 second.
*/
public Read withFixedDelayRateLimitPolicy() {
return toBuilder().setRateLimitPolicyFactory(RateLimitPolicyFactory.withFixedDelay()).build();
}

/**
* Specifies the rate limit policy as FixedDelayRateLimiter with the given delay.
*
* @param delay Denotes the fixed delay duration.
*/
public Read withFixedDelayRateLimitPolicy(Duration delay) {
checkArgument(delay != null, "delay cannot be null");
return toBuilder()
.setRateLimitPolicyFactory(RateLimitPolicyFactory.withFixedDelay(delay))
.build();
}

/**
* Specifies the {@code RateLimitPolicyFactory} for a custom rate limiter.
*
* @param rateLimitPolicyFactory Custom rate limit policy factory.
*/
public Read withCustomRateLimitPolicy(RateLimitPolicyFactory rateLimitPolicyFactory) {
checkArgument(rateLimitPolicyFactory != null, "rateLimitPolicyFactory cannot be null");
return toBuilder().setRateLimitPolicyFactory(rateLimitPolicyFactory).build();
}

/** Specifies the maximum number of messages per one shard. */
public Read withMaxCapacityPerShard(Integer maxCapacity) {
checkArgument(maxCapacity > 0, "maxCapacity must be positive, but was: %s", maxCapacity);
Expand All @@ -441,6 +475,7 @@ public PCollection<KinesisRecord> expand(PBegin input) {
getInitialPosition(),
getUpToDateThreshold(),
getWatermarkPolicyFactory(),
getRateLimitPolicyFactory(),
getRequestRecordsLimit(),
getMaxCapacityPerShard()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
private final KinesisSource source;
private final CheckpointGenerator initialCheckpointGenerator;
private final WatermarkPolicyFactory watermarkPolicyFactory;
private final RateLimitPolicyFactory rateLimitPolicyFactory;
private final Duration upToDateThreshold;
private final Duration backlogBytesCheckThreshold;
private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent();
Expand All @@ -53,13 +54,15 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
CheckpointGenerator initialCheckpointGenerator,
KinesisSource source,
WatermarkPolicyFactory watermarkPolicyFactory,
RateLimitPolicyFactory rateLimitPolicyFactory,
Duration upToDateThreshold,
Integer maxCapacityPerShard) {
this(
kinesis,
initialCheckpointGenerator,
source,
watermarkPolicyFactory,
rateLimitPolicyFactory,
upToDateThreshold,
Duration.standardSeconds(30),
maxCapacityPerShard);
Expand All @@ -70,13 +73,15 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
CheckpointGenerator initialCheckpointGenerator,
KinesisSource source,
WatermarkPolicyFactory watermarkPolicyFactory,
RateLimitPolicyFactory rateLimitPolicyFactory,
Duration upToDateThreshold,
Duration backlogBytesCheckThreshold,
Integer maxCapacityPerShard) {
this.kinesis = checkNotNull(kinesis, "kinesis");
this.initialCheckpointGenerator =
checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator");
this.watermarkPolicyFactory = watermarkPolicyFactory;
this.rateLimitPolicyFactory = rateLimitPolicyFactory;
this.source = source;
this.upToDateThreshold = upToDateThreshold;
this.backlogBytesCheckThreshold = backlogBytesCheckThreshold;
Expand Down Expand Up @@ -185,6 +190,7 @@ ShardReadersPool createShardReadersPool() throws TransientKinesisException {
kinesis,
initialCheckpointGenerator.generate(kinesis),
watermarkPolicyFactory,
rateLimitPolicyFactory,
maxCapacityPerShard);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoi
private final String streamName;
private final Duration upToDateThreshold;
private final WatermarkPolicyFactory watermarkPolicyFactory;
private final RateLimitPolicyFactory rateLimitPolicyFactory;
private CheckpointGenerator initialCheckpointGenerator;
private final Integer limit;
private final Integer maxCapacityPerShard;
Expand All @@ -48,6 +49,7 @@ class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoi
StartingPoint startingPoint,
Duration upToDateThreshold,
WatermarkPolicyFactory watermarkPolicyFactory,
RateLimitPolicyFactory rateLimitPolicyFactory,
Integer limit,
Integer maxCapacityPerShard) {
this(
Expand All @@ -56,6 +58,7 @@ class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoi
streamName,
upToDateThreshold,
watermarkPolicyFactory,
rateLimitPolicyFactory,
limit,
maxCapacityPerShard);
}
Expand All @@ -66,13 +69,15 @@ private KinesisSource(
String streamName,
Duration upToDateThreshold,
WatermarkPolicyFactory watermarkPolicyFactory,
RateLimitPolicyFactory rateLimitPolicyFactory,
Integer limit,
Integer maxCapacityPerShard) {
this.awsClientsProvider = awsClientsProvider;
this.initialCheckpointGenerator = initialCheckpoint;
this.streamName = streamName;
this.upToDateThreshold = upToDateThreshold;
this.watermarkPolicyFactory = watermarkPolicyFactory;
this.rateLimitPolicyFactory = rateLimitPolicyFactory;
this.limit = limit;
this.maxCapacityPerShard = maxCapacityPerShard;
validate();
Expand All @@ -98,6 +103,7 @@ public List<KinesisSource> split(int desiredNumSplits, PipelineOptions options)
streamName,
upToDateThreshold,
watermarkPolicyFactory,
rateLimitPolicyFactory,
limit,
maxCapacityPerShard));
}
Expand Down Expand Up @@ -126,6 +132,7 @@ public UnboundedReader<KinesisRecord> createReader(
checkpointGenerator,
this,
watermarkPolicyFactory,
rateLimitPolicyFactory,
upToDateThreshold,
maxCapacityPerShard);
}
Expand All @@ -139,6 +146,8 @@ public Coder<KinesisReaderCheckpoint> getCheckpointMarkCoder() {
public void validate() {
checkNotNull(awsClientsProvider);
checkNotNull(initialCheckpointGenerator);
checkNotNull(watermarkPolicyFactory);
checkNotNull(rateLimitPolicyFactory);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kinesis;

import java.util.List;

public interface RateLimitPolicy {

/**
* Called after Kinesis records are successfully retrieved.
*
* @param records The list of retrieved records.
*/
default void onSuccess(List<KinesisRecord> records) throws InterruptedException {}

/**
* Called after the Kinesis client is throttled.
*
* @param e The {@code KinesisClientThrottledException} thrown by the client.
*/
default void onThrottle(KinesisClientThrottledException e) throws InterruptedException {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kinesis;

import java.io.Serializable;
import java.util.List;
import org.joda.time.Duration;

/**
* Implement this interface to create a {@code RateLimitPolicy}. Used to create a rate limiter for
* each shard.
*/
public interface RateLimitPolicyFactory extends Serializable {

RateLimitPolicy getRateLimitPolicy();

static RateLimitPolicyFactory withoutLimiter() {
return () -> new RateLimitPolicy() {};
}

static RateLimitPolicyFactory withFixedDelay() {
return FixedDelayRateLimiter::new;
}

static RateLimitPolicyFactory withFixedDelay(Duration delay) {
return () -> new FixedDelayRateLimiter(delay);
}

class FixedDelayRateLimiter implements RateLimitPolicy {

private static final Duration DEFAULT_DELAY = Duration.standardSeconds(1);

private final Duration delay;

public FixedDelayRateLimiter() {
this(DEFAULT_DELAY);
}

public FixedDelayRateLimiter(Duration delay) {
this.delay = delay;
}

@Override
public void onSuccess(List<KinesisRecord> records) throws InterruptedException {
Thread.sleep(delay.getMillis());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class ShardReadersPool {

private final SimplifiedKinesisClient kinesis;
private final WatermarkPolicyFactory watermarkPolicyFactory;
private final RateLimitPolicyFactory rateLimitPolicyFactory;
private final KinesisReaderCheckpoint initialCheckpoint;
private final int queueCapacityPerShard;
private final AtomicBoolean poolOpened = new AtomicBoolean(true);
Expand All @@ -83,10 +84,12 @@ class ShardReadersPool {
SimplifiedKinesisClient kinesis,
KinesisReaderCheckpoint initialCheckpoint,
WatermarkPolicyFactory watermarkPolicyFactory,
RateLimitPolicyFactory rateLimitPolicyFactory,
int queueCapacityPerShard) {
this.kinesis = kinesis;
this.initialCheckpoint = initialCheckpoint;
this.watermarkPolicyFactory = watermarkPolicyFactory;
this.rateLimitPolicyFactory = rateLimitPolicyFactory;
this.queueCapacityPerShard = queueCapacityPerShard;
this.executorService = Executors.newCachedThreadPool();
this.numberOfRecordsInAQueueByShard = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -115,11 +118,12 @@ void start() throws TransientKinesisException {
void startReadingShards(Iterable<ShardRecordsIterator> shardRecordsIterators) {
for (final ShardRecordsIterator recordsIterator : shardRecordsIterators) {
numberOfRecordsInAQueueByShard.put(recordsIterator.getShardId(), new AtomicInteger());
executorService.submit(() -> readLoop(recordsIterator));
executorService.submit(
() -> readLoop(recordsIterator, rateLimitPolicyFactory.getRateLimitPolicy()));
}
}

private void readLoop(ShardRecordsIterator shardRecordsIterator) {
private void readLoop(ShardRecordsIterator shardRecordsIterator, RateLimitPolicy rateLimiter) {
while (poolOpened.get()) {
try {
List<KinesisRecord> kinesisRecords;
Expand All @@ -143,10 +147,20 @@ private void readLoop(ShardRecordsIterator shardRecordsIterator) {
recordsQueue.put(kinesisRecord);
numberOfRecordsInAQueueByShard.get(kinesisRecord.getShardId()).incrementAndGet();
}
rateLimiter.onSuccess(kinesisRecords);
} catch (KinesisClientThrottledException e) {
try {
rateLimiter.onThrottle(e);
} catch (InterruptedException ex) {
LOG.warn("Thread was interrupted, finishing the read loop", ex);
Thread.currentThread().interrupt();
break;
}
} catch (TransientKinesisException e) {
LOG.warn("Transient exception occurred.", e);
} catch (InterruptedException e) {
LOG.warn("Thread was interrupted, finishing the read loop", e);
Thread.currentThread().interrupt();
break;
} catch (Throwable e) {
LOG.error("Unexpected exception occurred", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisExcept
} catch (ExpiredIteratorException e) {
throw e;
} catch (LimitExceededException | ProvisionedThroughputExceededException e) {
throw new TransientKinesisException(
throw new KinesisClientThrottledException(
"Too many requests to Kinesis. Wait some time and retry.", e);
} catch (AmazonServiceException e) {
if (e.getErrorType() == AmazonServiceException.ErrorType.Service) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ private KinesisReader createReader(Duration backlogBytesCheckThreshold) {
generator,
kinesisSource,
WatermarkPolicyFactory.withArrivalTimePolicy(),
RateLimitPolicyFactory.withoutLimiter(),
Duration.ZERO,
backlogBytesCheckThreshold,
ShardReadersPool.DEFAULT_CAPACITY_PER_SHARD) {
Expand Down
Loading

0 comments on commit 1a1b6bb

Please sign in to comment.