Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restart from #492

Merged
merged 13 commits into from
Feb 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import com.google.common.annotations.VisibleForTesting;

import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
Expand All @@ -44,7 +39,6 @@
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;

/**
* Responsible for consuming data records of a (specified) shard.
Expand All @@ -60,7 +54,6 @@ public class ShardConsumer {
public static final int MAX_TIME_BETWEEN_REQUEST_RESPONSE = 35000;
private final RecordsPublisher recordsPublisher;
private final ExecutorService executorService;
private final Scheduler scheduler;
private final ShardInfo shardInfo;
private final ShardConsumerArgument shardConsumerArgument;
@NonNull
Expand All @@ -72,9 +65,6 @@ public class ShardConsumer {
private ConsumerTask currentTask;
private TaskOutcome taskOutcome;

private final AtomicReference<Throwable> processFailure = new AtomicReference<>(null);
private final AtomicReference<Throwable> dispatchFailure = new AtomicReference<>(null);

private CompletableFuture<Boolean> stateChangeFuture;
private boolean needsInitialization = true;

Expand All @@ -94,7 +84,7 @@ public class ShardConsumer {
private volatile ShutdownReason shutdownReason;
private volatile ShutdownNotification shutdownNotification;

private final InternalSubscriber subscriber;
private final ShardConsumerSubscriber subscriber;

public ShardConsumer(RecordsPublisher recordsPublisher, ExecutorService executorService, ShardInfo shardInfo,
Optional<Long> logWarningForTaskAfterMillis, ShardConsumerArgument shardConsumerArgument,
Expand All @@ -119,73 +109,16 @@ public ShardConsumer(RecordsPublisher recordsPublisher, ExecutorService executor
this.taskExecutionListener = taskExecutionListener;
this.currentState = initialState;
this.taskMetricsDecorator = taskMetricsDecorator;
scheduler = Schedulers.from(executorService);
subscriber = new InternalSubscriber();
subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, this);
this.bufferSize = bufferSize;

if (this.shardInfo.isCompleted()) {
markForShutdown(ShutdownReason.SHARD_END);
}
}

private void startSubscriptions() {
Flowable.fromPublisher(recordsPublisher).subscribeOn(scheduler).observeOn(scheduler, true, bufferSize)
.subscribe(subscriber);
}

private final Object lockObject = new Object();
private Instant lastRequestTime = null;

private class InternalSubscriber implements Subscriber<ProcessRecordsInput> {

private Subscription subscription;
private volatile Instant lastDataArrival;

@Override
public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(1);
}

@Override
public void onNext(ProcessRecordsInput input) {
try {
synchronized (lockObject) {
lastRequestTime = null;
}
lastDataArrival = Instant.now();
handleInput(input.toBuilder().cacheExitTime(Instant.now()).build(), subscription);
} catch (Throwable t) {
log.warn("{}: Caught exception from handleInput", shardInfo.shardId(), t);
dispatchFailure.set(t);
} finally {
subscription.request(1);
synchronized (lockObject) {
lastRequestTime = Instant.now();
}
}
}

@Override
public void onError(Throwable t) {
log.warn("{}: onError(). Cancelling subscription, and marking self as failed.", shardInfo.shardId(), t);
subscription.cancel();
processFailure.set(t);
}

@Override
public void onComplete() {
log.debug("{}: onComplete(): Received onComplete. Activity should be triggered externally", shardInfo.shardId());
}

public void cancel() {
if (subscription != null) {
subscription.cancel();
}
}
}

private synchronized void handleInput(ProcessRecordsInput input, Subscription subscription) {
synchronized void handleInput(ProcessRecordsInput input, Subscription subscription) {
if (isShutdownRequested()) {
subscription.cancel();
return;
Expand Down Expand Up @@ -240,50 +173,15 @@ public void executeLifecycle() {
Throwable healthCheck() {
logNoDataRetrievedAfterTime();
logLongRunningTask();
Throwable failure = processFailure.get();
if (!processFailure.compareAndSet(failure, null) && failure != null) {
log.error("{}: processFailure was updated while resetting, this shouldn't happen. " +
"Will retry on next health check", shardInfo.shardId());
return null;
}
Throwable failure = subscriber.healthCheck(MAX_TIME_BETWEEN_REQUEST_RESPONSE);

if (failure != null) {
String logMessage = String.format("%s: Failure occurred in retrieval. Restarting data requests", shardInfo.shardId());
if (failure instanceof RetryableRetrievalException) {
log.debug(logMessage, failure.getCause());
} else {
log.warn(logMessage, failure);
}
startSubscriptions();
return failure;
}
Throwable expectedDispatchFailure = dispatchFailure.get();
if (expectedDispatchFailure != null) {
if (!dispatchFailure.compareAndSet(expectedDispatchFailure, null)) {
log.info("{}: Unable to reset the dispatch failure, this can happen if the record processor is failing aggressively.", shardInfo.shardId());
return null;
}
log.warn("Exception occurred while dispatching incoming data. The incoming data has been skipped", expectedDispatchFailure);
return expectedDispatchFailure;
}
synchronized (lockObject) {
if (lastRequestTime != null) {
Instant now = Instant.now();
Duration timeSinceLastResponse = Duration.between(lastRequestTime, now);
if (timeSinceLastResponse.toMillis() > MAX_TIME_BETWEEN_REQUEST_RESPONSE) {
log.error(
"{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting.",
shardInfo.shardId(), lastRequestTime, now, timeSinceLastResponse);
if (subscriber != null) {
subscriber.cancel();
}
//
// Set the last request time to now, we specifically don't null it out since we want it to trigger a
// restart if the subscription still doesn't start producing.
//
lastRequestTime = Instant.now();
startSubscriptions();
}
}
Throwable dispatchFailure = subscriber.getAndResetDispatchFailure();
if (dispatchFailure != null) {
log.warn("Exception occurred while dispatching incoming data. The incoming data has been skipped", dispatchFailure);
return dispatchFailure;
}

return null;
Expand All @@ -306,10 +204,10 @@ String longRunningTaskMessage(Duration taken) {

private void logNoDataRetrievedAfterTime() {
logWarningForTaskAfterMillis.ifPresent(value -> {
Instant lastDataArrival = subscriber.lastDataArrival;
Instant lastDataArrival = subscriber.lastDataArrival();
if (lastDataArrival != null) {
Instant now = Instant.now();
Duration timeSince = Duration.between(subscriber.lastDataArrival, now);
Duration timeSince = Duration.between(subscriber.lastDataArrival(), now);
if (timeSince.toMillis() > value) {
log.warn("Last time data arrived: {} ({})", lastDataArrival, timeSince);
}
Expand All @@ -335,7 +233,7 @@ private void logLongRunningTask() {

@VisibleForTesting
void subscribe() {
startSubscriptions();
subscriber.startSubscriptions();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.lifecycle;

import com.google.common.annotations.VisibleForTesting;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutorService;

/**
 * Reactive Streams {@link Subscriber} that feeds records from a {@link RecordsPublisher} into a
 * {@link ShardConsumer}, and tracks enough state for the consumer's health check to detect and
 * recover from a failed or stalled subscription.
 *
 * <p>{@code lastRequestTime} and {@code lastAccepted} are only read and written while holding
 * {@link #lockObject}. Writes to the failure fields are also made under that lock; the fields are
 * additionally {@code volatile} so the generated getters can read them without locking.
 */
@Slf4j
@Accessors(fluent = true)
class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {

    private final RecordsPublisher recordsPublisher;
    private final Scheduler scheduler;
    private final int bufferSize;
    private final ShardConsumer shardConsumer;

    /** Guards {@code lastRequestTime}, {@code lastAccepted} and updates to the failure fields. */
    @VisibleForTesting
    final Object lockObject = new Object();
    /**
     * Time of the most recent outstanding {@code request(1)} issued from {@link #onNext}, or
     * {@code null} while an input is being handled.
     * NOTE(review): the initial request made in {@link #onSubscribe} does not set this field, so
     * the request timer is not armed until the first {@code onNext} completes -- confirm intended.
     */
    private Instant lastRequestTime = null;
    /** The last input delivered to {@link #onNext}; passed to {@code restartFrom} on resubscribe. */
    private RecordsRetrieved lastAccepted = null;

    private Subscription subscription;
    /** Set to the current time at the start of every {@link #onNext} call. */
    @Getter
    private volatile Instant lastDataArrival;
    /** Last exception thrown while handing input to the shard consumer, until reset. */
    @Getter
    private volatile Throwable dispatchFailure;
    /** Last error reported through {@link #onError}, until consumed by a health check. */
    @Getter(AccessLevel.PACKAGE)
    private volatile Throwable retrievalFailure;

    /**
     * @param recordsPublisher publisher to subscribe to
     * @param executorService  executor wrapped into the RxJava scheduler used for both
     *                         {@code subscribeOn} and {@code observeOn}
     * @param bufferSize       buffer size passed to {@code observeOn}
     * @param shardConsumer    consumer that receives each input via {@code handleInput}
     */
    ShardConsumerSubscriber(RecordsPublisher recordsPublisher, ExecutorService executorService, int bufferSize,
            ShardConsumer shardConsumer) {
        this.recordsPublisher = recordsPublisher;
        this.scheduler = Schedulers.from(executorService);
        this.bufferSize = bufferSize;
        this.shardConsumer = shardConsumer;
    }

    /**
     * Subscribes this instance to the publisher. If an input has previously been accepted, the
     * publisher is first asked to restart from it.
     */
    void startSubscriptions() {
        synchronized (lockObject) {
            if (lastAccepted != null) {
                recordsPublisher.restartFrom(lastAccepted);
            }
            Flowable.fromPublisher(recordsPublisher).subscribeOn(scheduler).observeOn(scheduler, true, bufferSize)
                    .subscribe(this);
        }
    }

    /**
     * Restarts the subscription if a retrieval failure was recorded; otherwise restarts it if the
     * outstanding request has been waiting longer than {@code maxTimeBetweenRequests}.
     *
     * @param maxTimeBetweenRequests maximum time, in milliseconds, to wait for a response to an
     *                               outstanding request before resubscribing
     * @return the retrieval failure that triggered a restart, or {@code null} if there was none
     */
    Throwable healthCheck(long maxTimeBetweenRequests) {
        Throwable result = restartIfFailed();
        if (result == null) {
            restartIfRequestTimerExpired(maxTimeBetweenRequests);
        }
        return result;
    }

    /**
     * Atomically returns and clears the recorded dispatch failure.
     *
     * @return the dispatch failure, or {@code null} if none was recorded
     */
    Throwable getAndResetDispatchFailure() {
        synchronized (lockObject) {
            Throwable failure = dispatchFailure;
            dispatchFailure = null;
            return failure;
        }
    }

    /**
     * Consumes a pending retrieval failure, if any, and resubscribes.
     *
     * @return the consumed failure, or {@code null} if nothing was pending
     */
    private Throwable restartIfFailed() {
        Throwable oldFailure;
        synchronized (lockObject) {
            // Read, test and clear under the lock so that a concurrent caller cannot clear the
            // failure between the check and the reset, which would otherwise cause a restart
            // with no failure to report.
            oldFailure = retrievalFailure;
            if (oldFailure == null) {
                return null;
            }
            String logMessage = String.format("%s: Failure occurred in retrieval. Restarting data requests",
                    shardConsumer.shardInfo().shardId());
            if (oldFailure instanceof RetryableRetrievalException) {
                log.debug(logMessage, oldFailure.getCause());
            } else {
                log.warn(logMessage, oldFailure);
            }
            retrievalFailure = null;
        }
        startSubscriptions();
        return oldFailure;
    }

    /**
     * Cancels and restarts the subscription when a request has been outstanding for more than
     * {@code maxTimeBetweenRequests} milliseconds.
     */
    private void restartIfRequestTimerExpired(long maxTimeBetweenRequests) {
        synchronized (lockObject) {
            if (lastRequestTime != null) {
                Instant now = Instant.now();
                Duration timeSinceLastResponse = Duration.between(lastRequestTime, now);
                if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) {
                    log.error(
                            "{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting.",
                            shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse);
                    cancel();
                    //
                    // Set the last request time to now. It is specifically not nulled out, since it
                    // should trigger another restart if the subscription still doesn't start
                    // producing.
                    //
                    lastRequestTime = Instant.now();
                    startSubscriptions();
                }
            }
        }
    }

    /** Stores the subscription and requests the first item. */
    @Override
    public void onSubscribe(Subscription s) {
        subscription = s;
        subscription.request(1);
    }

    /**
     * Hands the input to the shard consumer, then requests the next item. An exception from the
     * consumer is recorded as the dispatch failure rather than propagated; the input is recorded
     * as accepted and the next item is requested either way.
     */
    @Override
    public void onNext(RecordsRetrieved input) {
        try {
            synchronized (lockObject) {
                // A response arrived, so there is no outstanding request to time.
                lastRequestTime = null;
            }
            lastDataArrival = Instant.now();
            shardConsumer.handleInput(input.processRecordsInput().toBuilder().cacheExitTime(Instant.now()).build(),
                    subscription);

        } catch (Throwable t) {
            log.warn("{}: Caught exception from handleInput", shardConsumer.shardInfo().shardId(), t);
            synchronized (lockObject) {
                dispatchFailure = t;
            }
        } finally {
            subscription.request(1);
            synchronized (lockObject) {
                lastAccepted = input;
                lastRequestTime = Instant.now();
            }
        }
    }

    /** Cancels the current subscription and records the error for the next health check. */
    @Override
    public void onError(Throwable t) {
        synchronized (lockObject) {
            log.warn("{}: onError(). Cancelling subscription, and marking self as failed.",
                    shardConsumer.shardInfo().shardId(), t);
            // Use the null-safe cancel so that a missing subscription cannot throw before the
            // failure is recorded below.
            cancel();
            retrievalFailure = t;
        }
    }

    /** Logs completion only; no further action is taken here. */
    @Override
    public void onComplete() {
        log.debug("{}: onComplete(): Received onComplete. Activity should be triggered externally",
                shardConsumer.shardInfo().shardId());
    }

    /** Cancels the current subscription, if one has been established. */
    public void cancel() {
        if (subscription != null) {
            subscription.cancel();
        }
    }
}
Loading