Skip to content

Commit

Permalink
Adding support for timeout for multilang protocol, related to issue 1…
Browse files Browse the repository at this point in the history
…85 (#195)

* Adding timeout to waitForStatusMessage future call. Introducing new config properties timeoutEnabled and timeoutInSeconds. Halting the JVM if timeout is reached.

* Adding test cases for halt jvm code. Made the configuration objects for timeout optional.

* Addressing code review comments and making appropriate changes.
  • Loading branch information
sahilpalvia authored Aug 1, 2017
1 parent 7d56c4a commit e8f9ad3
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import java.util.Date;
import java.util.Optional;
import java.util.Set;

import org.apache.commons.lang.Validate;
Expand Down Expand Up @@ -213,6 +214,9 @@ public class KinesisClientLibConfiguration {
private boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
private ShardPrioritization shardPrioritization;

@Getter
private Optional<Integer> timeoutInSeconds = Optional.empty();

@Getter
private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS;

Expand Down Expand Up @@ -1092,7 +1096,7 @@ public KinesisClientLibConfiguration withShardPrioritizationStrategy(ShardPriori
* Sets the size of the thread pool that will be used to renew leases.
*
* Setting this to low may starve the lease renewal process, and cause the worker to lose leases at a higher rate.
*
*
* @param maxLeaseRenewalThreads
* the maximum size of the lease renewal thread pool
* @throws IllegalArgumentException
Expand All @@ -1106,4 +1110,12 @@ public KinesisClientLibConfiguration withMaxLeaseRenewalThreads(int maxLeaseRene

return this;
}

/**
* @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for
*/
public void withTimeoutInSeconds(final int timeoutInSeconds) {
this.timeoutInSeconds = Optional.of(timeoutInSeconds);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public MultiLangDaemonConfig(String propertiesFile,

kinesisClientLibConfig = configurator.getConfiguration(properties);
executorService = buildExecutorService(properties);
recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService);
recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService, kinesisClientLibConfig);

LOG.info("Running " + kinesisClientLibConfig.getApplicationName() + " to process stream "
+ kinesisClientLibConfig.getStreamName() + " with executable " + executableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@
*/
package com.amazonaws.services.kinesis.multilang;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
Expand All @@ -29,9 +27,14 @@
import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage;
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;

import lombok.extern.apachecommons.CommonsLog;

import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* An implementation of the multi language protocol.
*/
Expand All @@ -41,10 +44,11 @@ class MultiLangProtocol {
private MessageReader messageReader;
private MessageWriter messageWriter;
private final InitializationInput initializationInput;
private KinesisClientLibConfiguration configuration;

/**
* Constructor.
*
*
* @param messageReader
* A message reader.
* @param messageWriter
Expand All @@ -53,16 +57,17 @@ class MultiLangProtocol {
* information about the shard this processor is starting to process
*/
MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter,
InitializationInput initializationInput) {
InitializationInput initializationInput, KinesisClientLibConfiguration configuration) {
this.messageReader = messageReader;
this.messageWriter = messageWriter;
this.initializationInput = initializationInput;
this.configuration = configuration;
}

/**
* Writes an {@link InitializeMessage} to the child process's STDIN and waits for the child process to respond with
* a {@link StatusMessage} on its STDOUT.
*
*
* @return Whether or not this operation succeeded.
*/
boolean initialize() {
Expand All @@ -77,7 +82,7 @@ boolean initialize() {
/**
* Writes a {@link ProcessRecordsMessage} to the child process's STDIN and waits for the child process to respond
* with a {@link StatusMessage} on its STDOUT.
*
*
* @param processRecordsInput
* The records, and associated metadata, to process.
* @return Whether or not this operation succeeded.
Expand All @@ -90,7 +95,7 @@ boolean processRecords(ProcessRecordsInput processRecordsInput) {
/**
* Writes a {@link ShutdownMessage} to the child process's STDIN and waits for the child process to respond with a
* {@link StatusMessage} on its STDOUT.
*
*
* @param checkpointer A checkpointer.
* @param reason Why this processor is being shutdown.
* @return Whether or not this operation succeeded.
Expand Down Expand Up @@ -119,7 +124,7 @@ boolean shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
* all communications with the child process regarding checkpointing were successful. Note that whether or not the
* checkpointing itself was successful is not the concern of this method. This method simply cares whether it was
* able to successfully communicate the results of its attempts to checkpoint.
*
*
* @param action
* What action is being waited on.
* @param checkpointer
Expand Down Expand Up @@ -150,44 +155,75 @@ private boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer

/**
* Waits for status message and verifies it against the expectation
*
*
* @param action
* What action is being waited on.
* @param checkpointer
* the original process records request
* @return Whether or not this operation succeeded.
*/
private boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) {
StatusMessage statusMessage = null;
while (statusMessage == null) {
boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) {
Optional<StatusMessage> statusMessage = Optional.empty();
while (!statusMessage.isPresent()) {
Future<Message> future = this.messageReader.getNextMessageFromSTDOUT();
try {
Message message = future.get();
// Note that instanceof doubles as a check against a value being null
if (message instanceof CheckpointMessage) {
boolean checkpointWriteSucceeded = checkpoint((CheckpointMessage) message, checkpointer).get();
if (!checkpointWriteSucceeded) {
return false;
}
} else if (message instanceof StatusMessage) {
statusMessage = (StatusMessage) message;
}
} catch (InterruptedException e) {
log.error(String.format("Interrupted while waiting for %s message for shard %s", action,
initializationInput.getShardId()));
Optional<Message> message = configuration.getTimeoutInSeconds()
.map(second -> futureMethod(() -> future.get(second, TimeUnit.SECONDS), action))
.orElse(futureMethod(future::get, action));

if (!message.isPresent()) {
return false;
} catch (ExecutionException e) {
log.error(String.format("Failed to get status message for %s action for shard %s", action,
initializationInput.getShardId()), e);
}

Optional<Boolean> checkpointFailed = message.filter(m -> m instanceof CheckpointMessage )
.map(m -> (CheckpointMessage) m)
.flatMap(m -> futureMethod(() -> checkpoint(m, checkpointer).get(), "Checkpoint"))
.map(checkpointSuccess -> !checkpointSuccess);

if (checkpointFailed.orElse(false)) {
return false;
}

statusMessage = message.filter(m -> m instanceof StatusMessage).map(m -> (StatusMessage) m );
}
return this.validateStatusMessage(statusMessage.get(), action);
}

private interface FutureMethod<T> {
T get() throws InterruptedException, TimeoutException, ExecutionException;
}

private <T> Optional<T> futureMethod(FutureMethod<T> fm, String action) {
try {
return Optional.of(fm.get());
} catch (InterruptedException e) {
log.error(String.format("Interrupted while waiting for %s message for shard %s", action,
initializationInput.getShardId()), e);
} catch (ExecutionException e) {
log.error(String.format("Failed to get status message for %s action for shard %s", action,
initializationInput.getShardId()), e);
} catch (TimeoutException e) {
log.error(String.format("Timedout to get status message for %s action for shard %s. Terminating...",
action,
initializationInput.getShardId()),
e);
haltJvm(1);
}
return this.validateStatusMessage(statusMessage, action);
return Optional.empty();
}

/**
* This method is used to halt the JVM. Use this method with utmost caution, since this method will kill the JVM
* without calling the Shutdown hooks.
*
* @param exitStatus The exit status with which the JVM is to be halted.
*/
protected void haltJvm(int exitStatus) {
Runtime.getRuntime().halt(exitStatus);
}

/**
* Utility for confirming that the status message is for the provided action.
*
*
* @param statusMessage The status of the child process.
* @param action The action that was being waited on.
* @return Whether or not this operation succeeded.
Expand All @@ -205,7 +241,7 @@ private boolean validateStatusMessage(StatusMessage statusMessage, String action
* provided {@link CheckpointMessage}. If no sequence number is provided, i.e. the sequence number is null, then
* this method will call {@link IRecordProcessorCheckpointer#checkpoint()}. The method returns a future representing
* the attempt to write the result of this checkpoint attempt to the child process.
*
*
* @param checkpointMessage A checkpoint message.
* @param checkpointer A checkpointer.
* @return Whether or not this operation succeeded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

Expand Down Expand Up @@ -65,6 +66,8 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti

private MultiLangProtocol protocol;

private KinesisClientLibConfiguration configuration;

@Override
public void initialize(InitializationInput initializationInput) {
try {
Expand All @@ -87,7 +90,7 @@ public void initialize(InitializationInput initializationInput) {
// Submit the error reader for execution
stderrReadTask = executorService.submit(readSTDERRTask);

protocol = new MultiLangProtocol(messageReader, messageWriter, initializationInput);
protocol = new MultiLangProtocol(messageReader, messageWriter, initializationInput, configuration);
if (!protocol.initialize()) {
throw new RuntimeException("Failed to initialize child process");
}
Expand Down Expand Up @@ -173,9 +176,9 @@ private enum ProcessState {
* An obejct mapper.
*/
MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService,
ObjectMapper objectMapper) {
ObjectMapper objectMapper, KinesisClientLibConfiguration configuration) {
this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(),
new DrainChildSTDERRTask());
new DrainChildSTDERRTask(), configuration);
}

/**
Expand All @@ -195,13 +198,16 @@ private enum ProcessState {
* Error reader to read from child process's stderr
*/
MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, ObjectMapper objectMapper,
MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask readSTDERRTask) {
MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask readSTDERRTask,
KinesisClientLibConfiguration configuration) {
this.executorService = executorService;
this.processBuilder = processBuilder;
this.objectMapper = objectMapper;
this.messageWriter = messageWriter;
this.messageReader = messageReader;
this.readSTDERRTask = readSTDERRTask;
this.configuration = configuration;


this.state = ProcessState.ACTIVE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.util.concurrent.ExecutorService;

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

Expand All @@ -39,24 +40,29 @@ public class MultiLangRecordProcessorFactory implements IRecordProcessorFactory

private final ExecutorService executorService;

private final KinesisClientLibConfiguration configuration;

/**
* @param command The command that will do processing for this factory's record processors.
* @param executorService An executor service to use while processing inputs and outputs of the child process.
*/
public MultiLangRecordProcessorFactory(String command, ExecutorService executorService) {
this(command, executorService, new ObjectMapper());
public MultiLangRecordProcessorFactory(String command, ExecutorService executorService,
KinesisClientLibConfiguration configuration) {
this(command, executorService, new ObjectMapper(), configuration);
}

/**
* @param command The command that will do processing for this factory's record processors.
* @param executorService An executor service to use while processing inputs and outputs of the child process.
* @param objectMapper An object mapper used to convert messages to json to be written to the child process
*/
public MultiLangRecordProcessorFactory(String command, ExecutorService executorService, ObjectMapper objectMapper) {
public MultiLangRecordProcessorFactory(String command, ExecutorService executorService, ObjectMapper objectMapper,
KinesisClientLibConfiguration configuration) {
this.command = command;
this.commandArray = command.split(COMMAND_DELIMETER_REGEX);
this.executorService = executorService;
this.objectMapper = objectMapper;
this.configuration = configuration;
}

@Override
Expand All @@ -65,7 +71,8 @@ public IRecordProcessor createProcessor() {
/*
* Giving ProcessBuilder the command as an array of Strings allows users to specify command line arguments.
*/
return new MultiLangRecordProcessor(new ProcessBuilder(commandArray), executorService, this.objectMapper);
return new MultiLangRecordProcessor(new ProcessBuilder(commandArray), executorService, this.objectMapper,
this.configuration);
}

String[] getCommandArray() {
Expand Down
Loading

0 comments on commit e8f9ad3

Please sign in to comment.