Skip to content

Commit

Permalink
# 0.23.8
Browse files Browse the repository at this point in the history
*synapase-kinesis*
* Bugfix NullCheck for response.failedRecordCount() in KinesisMessageSender
* Bugfix configure skipNextEmptyParts in KinesisMessageLogReader
  • Loading branch information
peterfouquet0001 committed Feb 21, 2022
1 parent 9a87c9c commit 4f77f2a
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# 0.23.8
*synapase-kinesis*
* Bugfix NullCheck for response.failedRecordCount() in KinesisMessageSender
* Bugfix configure skipNextEmptyParts in KinesisMessageLogReader

# 0.23.7
*synapse-compaction-aws-s3*
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ subprojects {

compileJava.dependsOn(processResources)

version = '0.23.8-SNAPSHOT'
version = '0.23.8'
group = 'de.otto.synapse'

repositories {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,19 @@ public class KinesisMessageLogReader {

private static final Logger LOG = getLogger(KinesisMessageLogReader.class);

public static final int DEFAULT_WAITING_TIME_ON_EMPTY_RECORDS = 10000;

private final String channelName;
private final KinesisAsyncClient kinesisClient;
private final ExecutorService executorService;
private final Clock clock;
private final AtomicReference<List<KinesisShardReader>> kinesisShardReaders = new AtomicReference<>();

public static final int SKIP_NEXT_PARTS = 8;
private final int waitingTimeOnEmptyRecords;
public static final int DEFAULT_WAITING_TIME_ON_EMPTY_RECORDS = 10000;
public static final int DEFAULT_WAITING_TIME_ON_SKIP_EMPTY_PARTS = 200; // max 5 calls per second per shard

private final int waitingTimeOnEmptyRecords;
private final int skipNextEmptyParts;
private final int waitingTimeOnSkipEmptyParts;
private final Marker marker;


Expand All @@ -56,12 +58,25 @@ public KinesisMessageLogReader(final String channelName,
final KinesisAsyncClient kinesisClient,
final ExecutorService executorService,
final Clock clock, final int waitingTimeOnEmptyRecords, final Marker marker) {
this(channelName, kinesisClient, executorService, clock, waitingTimeOnEmptyRecords, SKIP_NEXT_PARTS, DEFAULT_WAITING_TIME_ON_SKIP_EMPTY_PARTS, marker);
}

public KinesisMessageLogReader(final String channelName,
final KinesisAsyncClient kinesisClient,
final ExecutorService executorService,
final Clock clock,
final int waitingTimeOnEmptyRecords,
final int skipNextEmptyParts,
final int waitingTimeOnSkipEmptyParts,
final Marker marker) {
this.channelName = channelName;
this.kinesisClient = kinesisClient;
this.executorService = executorService;
this.clock = clock;

this.waitingTimeOnEmptyRecords = waitingTimeOnEmptyRecords;
this.skipNextEmptyParts = skipNextEmptyParts;
this.waitingTimeOnSkipEmptyParts = waitingTimeOnSkipEmptyParts;

this.marker = marker;
}
Expand Down Expand Up @@ -113,7 +128,7 @@ public CompletableFuture<ChannelResponse> read(final KinesisMessageLogIterator i
.map(shardReader -> supplyAsync(
() -> {
final KinesisShardIterator shardIterator = iterator.getShardIterator(shardReader.getShardName());
return fetchNext(shardIterator, SKIP_NEXT_PARTS);
return fetchNext(shardIterator, skipNextEmptyParts);
},
executorService))
.collect(toList());
Expand All @@ -132,6 +147,11 @@ private ShardResponse fetchNext(final KinesisShardIterator shardIterator, int sk
final String id = shardIterator.getId();
final ShardResponse shardResponse = shardIterator.next();
if(shardResponse.getMessages().isEmpty() && !shardIterator.isPoison() && !Objects.equals(shardIterator.getId(), id) && skipNextParts > 0) {
try {
Thread.sleep(waitingTimeOnSkipEmptyParts); // avoid to many request in a short timespan
} catch (InterruptedException e) {
LOG.warn(marker, "Thread got interrupted");
}
return fetchNext(shardIterator, --skipNextParts);
}
return shardResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void before() {
public void shouldRetrieveEmptyListOfShards() {
// given
describeStreamResponse(of());
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock);
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock, 100, 2, 10, null);

// when
List<KinesisShardReader> shards = logReader.getCurrentKinesisShards();
Expand All @@ -94,7 +94,7 @@ public void shouldRetrieveEmptyListOfShards() {
public void shouldRetrieveSingleOpenShard() {
// given
describeStreamResponse(of(someShard("shard1", true)));
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock);
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock, 100, 2, 10, null);

// when
List<KinesisShardReader> shards = logReader.getCurrentKinesisShards();
Expand All @@ -112,7 +112,7 @@ public void shouldGetOpenShards() {
someShard("shard2", false),
someShard("shard3", true)
));
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock);
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock, 100, 2, 10, null);

// when
List<String> shards = logReader.getOpenShards();
Expand All @@ -130,7 +130,7 @@ public void shouldRetrieveOnlyOpenShards() {
someShard("shard1", true),
someShard("shard2", false),
someShard("shard3", true)));
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock);
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock, 100, 2, 10, null);

// when
List<KinesisShardReader> shards = logReader.getCurrentKinesisShards();
Expand All @@ -152,7 +152,7 @@ public void shouldRetrieveShardsOfMultipleResponses() {
someShard("shard3", true),
someShard("shard4", true)),
true);
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock);
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock, 100, 2, 10, null);

// when
List<KinesisShardReader> shards = logReader.getCurrentKinesisShards();
Expand All @@ -176,7 +176,7 @@ public void shouldRetrieveShardsOfMultipleResponsesWithFirstShardsClosed() {
someShard("shard3", true),
someShard("shard4", true)),
true);
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock);
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock,100, 2, 10, null);

// when
List<KinesisShardReader> shards = logReader.getCurrentKinesisShards();
Expand All @@ -195,7 +195,7 @@ public void shouldConsumeAllResponses() throws ExecutionException, InterruptedEx
someShard("shard1", true)));
describeRecordsForShard("shard1", true);

logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock);
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock,100, 2, 10, null);

// when
logReader.consumeUntil(fromHorizon(), shutdown(), responseConsumer).get();
Expand Down Expand Up @@ -230,7 +230,7 @@ public void shouldConsumeAllResponsesUntilEndOfChannel() throws ExecutionExcepti
someShard("shard1", true)));
describeRecordsForShard("shard1", false);

logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock);
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock,100, 2, 10, null);

// when
ChannelPosition position = logReader.consumeUntil(fromHorizon(), endOfChannel(), responseConsumer).get();
Expand Down Expand Up @@ -266,7 +266,7 @@ public void shouldConsumeAllResponsesFromMultipleShardsTilEndOfChannel() throws
describeRecordsForShard("shard1", false);
describeRecordsForShard("shard2", false);

logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock);
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock, 100, 2, 10, null);

// when
ChannelPosition position = logReader.consumeUntil(fromHorizon(), endOfChannel(), responseConsumer).get();
Expand All @@ -290,7 +290,7 @@ public void shouldIterateResponses() throws ExecutionException, InterruptedExcep
someShard("shard1", true)));
describeRecordsForShard("shard1", true);

logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock);
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock, 100, 2, 10, null);

// when
final KinesisMessageLogIterator iterator = logReader.getMessageLogIterator(fromHorizon());
Expand Down Expand Up @@ -321,7 +321,7 @@ public void shouldIterateResponsesWithMultipleShards() throws ExecutionException
describeRecordsForShard("shard1", false);
describeRecordsForShard("shard2", false);

logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock);
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock, 100, 2, 10, null);

// when
final KinesisMessageLogIterator iterator = logReader.getMessageLogIterator(fromHorizon());
Expand Down Expand Up @@ -383,7 +383,7 @@ public void shouldConsumeAllMessagesFromMultipleShards() throws ExecutionExcepti


// when
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock);
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock, 100, 2, 10, null);

final CompletableFuture<ChannelPosition> futurePosition = logReader.consumeUntil(fromHorizon(), shutdown(), responseConsumer);
futurePosition.get();
Expand All @@ -407,7 +407,7 @@ public void shouldShutdownOnStop() throws ExecutionException, InterruptedExcepti
someShard("shard1", true)));
describeRecordsForShard("shard1", false);

logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock, 1000, null);
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock, 100, 2, 10, null);

// when
final CompletableFuture<ChannelPosition> finalChannelPosition = logReader.consumeUntil(fromHorizon(), shutdown(), responseConsumer);
Expand All @@ -428,7 +428,7 @@ public void shouldStopShardsOnStop() throws InterruptedException, ExecutionExcep
someShard("shard1", true)));
describeRecordsForShard("shard1", false);

logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock, 1000, null);
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock, 100, 2, 10, null);


// when
Expand All @@ -450,7 +450,7 @@ public void shouldShutdownOnException() throws ExecutionException, InterruptedEx
);
describeRecordsForShard("shard1", true);
describeRecordsForShard("failing-shard2", true);
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock);
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock, 100, 2, 10, null);

// when
logReader.consumeUntil(fromHorizon(), shutdown(), responseConsumer).get();
Expand All @@ -473,7 +473,7 @@ public void shouldBeAbleToRestartConsumeAfterException() throws ExecutionExcepti
describeRecordsForShard("shard1", true);
describeRecordsForShard("shard2", true);

logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock);
logReader = new KinesisMessageLogReader("channelName", kinesisClient, executorService, clock, 100, 2, 10, null);
try {
logReader.consumeUntil(fromHorizon(), shutdown(), responseConsumer).get();
} catch (ExecutionException e) {
Expand Down

0 comments on commit 4f77f2a

Please sign in to comment.