diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 3b10c03e3..1a2c371ec 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -15,6 +15,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.Date; +import java.util.Optional; import java.util.Set; import org.apache.commons.lang.Validate; @@ -213,6 +214,9 @@ public class KinesisClientLibConfiguration { private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private ShardPrioritization shardPrioritization; + @Getter + private Optional timeoutInSeconds = Optional.empty(); + @Getter private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS; @@ -1092,7 +1096,7 @@ public KinesisClientLibConfiguration withShardPrioritizationStrategy(ShardPriori * Sets the size of the thread pool that will be used to renew leases. * * Setting this to low may starve the lease renewal process, and cause the worker to lose leases at a higher rate. - * + * * @param maxLeaseRenewalThreads * the maximum size of the lease renewal thread pool * @throws IllegalArgumentException @@ -1106,4 +1110,12 @@ public KinesisClientLibConfiguration withMaxLeaseRenewalThreads(int maxLeaseRene return this; } + + /** + * @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for + */ + public void withTimeoutInSeconds(final int timeoutInSeconds) { + this.timeoutInSeconds = Optional.of(timeoutInSeconds); + } + } diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java index f191eedc4..01b3a27ff 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java @@ -100,7 +100,7 @@ public MultiLangDaemonConfig(String propertiesFile, kinesisClientLibConfig = configurator.getConfiguration(properties); executorService = buildExecutorService(properties); - recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService); + recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService, kinesisClientLibConfig); LOG.info("Running " + kinesisClientLibConfig.getApplicationName() + " to process stream " + kinesisClientLibConfig.getStreamName() + " with executable " + executableName); diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java index 67a97770b..0376242ae 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java @@ -14,11 +14,9 @@ */ package com.amazonaws.services.kinesis.multilang; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; @@ -29,9 +27,14 @@ import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; - import lombok.extern.apachecommons.CommonsLog; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + /** * An implementation of the multi language protocol. */ @@ -41,10 +44,11 @@ class MultiLangProtocol { private MessageReader messageReader; private MessageWriter messageWriter; private final InitializationInput initializationInput; + private KinesisClientLibConfiguration configuration; /** * Constructor. - * + * * @param messageReader * A message reader. * @param messageWriter @@ -53,16 +57,17 @@ class MultiLangProtocol { * information about the shard this processor is starting to process */ MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter, - InitializationInput initializationInput) { + InitializationInput initializationInput, KinesisClientLibConfiguration configuration) { this.messageReader = messageReader; this.messageWriter = messageWriter; this.initializationInput = initializationInput; + this.configuration = configuration; } /** * Writes an {@link InitializeMessage} to the child process's STDIN and waits for the child process to respond with * a {@link StatusMessage} on its STDOUT. - * + * * @return Whether or not this operation succeeded. */ boolean initialize() { @@ -77,7 +82,7 @@ boolean initialize() { /** * Writes a {@link ProcessRecordsMessage} to the child process's STDIN and waits for the child process to respond * with a {@link StatusMessage} on its STDOUT. - * + * * @param processRecordsInput * The records, and associated metadata, to process. * @return Whether or not this operation succeeded. @@ -90,7 +95,7 @@ boolean processRecords(ProcessRecordsInput processRecordsInput) { /** * Writes a {@link ShutdownMessage} to the child process's STDIN and waits for the child process to respond with a * {@link StatusMessage} on its STDOUT. - * + * * @param checkpointer A checkpointer. * @param reason Why this processor is being shutdown. * @return Whether or not this operation succeeded. @@ -119,7 +124,7 @@ boolean shutdownRequested(IRecordProcessorCheckpointer checkpointer) { * all communications with the child process regarding checkpointing were successful. Note that whether or not the * checkpointing itself was successful is not the concern of this method. This method simply cares whether it was * able to successfully communicate the results of its attempts to checkpoint. - * + * * @param action * What action is being waited on. * @param checkpointer @@ -150,44 +155,75 @@ private boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer /** * Waits for status message and verifies it against the expectation - * + * * @param action * What action is being waited on. * @param checkpointer * the original process records request * @return Whether or not this operation succeeded. */ - private boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) { - StatusMessage statusMessage = null; - while (statusMessage == null) { + boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) { + Optional statusMessage = Optional.empty(); + while (!statusMessage.isPresent()) { Future future = this.messageReader.getNextMessageFromSTDOUT(); - try { - Message message = future.get(); - // Note that instanceof doubles as a check against a value being null - if (message instanceof CheckpointMessage) { - boolean checkpointWriteSucceeded = checkpoint((CheckpointMessage) message, checkpointer).get(); - if (!checkpointWriteSucceeded) { - return false; - } - } else if (message instanceof StatusMessage) { - statusMessage = (StatusMessage) message; - } - } catch (InterruptedException e) { - log.error(String.format("Interrupted while waiting for %s message for shard %s", action, - initializationInput.getShardId())); + Optional message = configuration.getTimeoutInSeconds() + .map(second -> futureMethod(() -> future.get(second, TimeUnit.SECONDS), action)) + .orElse(futureMethod(future::get, action)); + + if (!message.isPresent()) { return false; - } catch (ExecutionException e) { - log.error(String.format("Failed to get status message for %s action for shard %s", action, - initializationInput.getShardId()), e); + } + + Optional checkpointFailed = message.filter(m -> m instanceof CheckpointMessage ) + .map(m -> (CheckpointMessage) m) + .flatMap(m -> futureMethod(() -> checkpoint(m, checkpointer).get(), "Checkpoint")) + .map(checkpointSuccess -> !checkpointSuccess); + + if (checkpointFailed.orElse(false)) { return false; } + + statusMessage = message.filter(m -> m instanceof StatusMessage).map(m -> (StatusMessage) m ); + } + return this.validateStatusMessage(statusMessage.get(), action); + } + + private interface FutureMethod { + T get() throws InterruptedException, TimeoutException, ExecutionException; + } + + private Optional futureMethod(FutureMethod fm, String action) { + try { + return Optional.of(fm.get()); + } catch (InterruptedException e) { + log.error(String.format("Interrupted while waiting for %s message for shard %s", action, + initializationInput.getShardId()), e); + } catch (ExecutionException e) { + log.error(String.format("Failed to get status message for %s action for shard %s", action, + initializationInput.getShardId()), e); + } catch (TimeoutException e) { + log.error(String.format("Timedout to get status message for %s action for shard %s. Terminating...", + action, + initializationInput.getShardId()), + e); + haltJvm(1); } - return this.validateStatusMessage(statusMessage, action); + return Optional.empty(); + } + + /** + * This method is used to halt the JVM. Use this method with utmost caution, since this method will kill the JVM + * without calling the Shutdown hooks. + * + * @param exitStatus The exit status with which the JVM is to be halted. + */ + protected void haltJvm(int exitStatus) { + Runtime.getRuntime().halt(exitStatus); } /** * Utility for confirming that the status message is for the provided action. - * + * * @param statusMessage The status of the child process. * @param action The action that was being waited on. * @return Whether or not this operation succeeded. @@ -205,7 +241,7 @@ private boolean validateStatusMessage(StatusMessage statusMessage, String action * provided {@link CheckpointMessage}. If no sequence number is provided, i.e. the sequence number is null, then * this method will call {@link IRecordProcessorCheckpointer#checkpoint()}. The method returns a future representing * the attempt to write the result of this checkpoint attempt to the child process. - * + * * @param checkpointMessage A checkpoint message. * @param checkpointer A checkpointer. * @return Whether or not this operation succeeded. diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java index 2532771bc..bbcf957bc 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java @@ -24,6 +24,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -65,6 +66,8 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti private MultiLangProtocol protocol; + private KinesisClientLibConfiguration configuration; + @Override public void initialize(InitializationInput initializationInput) { try { @@ -87,7 +90,7 @@ public void initialize(InitializationInput initializationInput) { // Submit the error reader for execution stderrReadTask = executorService.submit(readSTDERRTask); - protocol = new MultiLangProtocol(messageReader, messageWriter, initializationInput); + protocol = new MultiLangProtocol(messageReader, messageWriter, initializationInput, configuration); if (!protocol.initialize()) { throw new RuntimeException("Failed to initialize child process"); } @@ -173,9 +176,9 @@ private enum ProcessState { * An obejct mapper. */ MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, - ObjectMapper objectMapper) { + ObjectMapper objectMapper, KinesisClientLibConfiguration configuration) { this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(), - new DrainChildSTDERRTask()); + new DrainChildSTDERRTask(), configuration); } /** @@ -195,13 +198,16 @@ private enum ProcessState { * Error reader to read from child process's stderr */ MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, ObjectMapper objectMapper, - MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask readSTDERRTask) { + MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask readSTDERRTask, + KinesisClientLibConfiguration configuration) { this.executorService = executorService; this.processBuilder = processBuilder; this.objectMapper = objectMapper; this.messageWriter = messageWriter; this.messageReader = messageReader; this.readSTDERRTask = readSTDERRTask; + this.configuration = configuration; + this.state = ProcessState.ACTIVE; } diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java index e55217a6f..e596abf2c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java @@ -16,6 +16,7 @@ import java.util.concurrent.ExecutorService; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,12 +40,15 @@ public class MultiLangRecordProcessorFactory implements IRecordProcessorFactory private final ExecutorService executorService; + private final KinesisClientLibConfiguration configuration; + /** * @param command The command that will do processing for this factory's record processors. * @param executorService An executor service to use while processing inputs and outputs of the child process. */ - public MultiLangRecordProcessorFactory(String command, ExecutorService executorService) { - this(command, executorService, new ObjectMapper()); + public MultiLangRecordProcessorFactory(String command, ExecutorService executorService, + KinesisClientLibConfiguration configuration) { + this(command, executorService, new ObjectMapper(), configuration); } /** @@ -52,11 +56,13 @@ public MultiLangRecordProcessorFactory(String command, ExecutorService executorS * @param executorService An executor service to use while processing inputs and outputs of the child process. * @param objectMapper An object mapper used to convert messages to json to be written to the child process */ - public MultiLangRecordProcessorFactory(String command, ExecutorService executorService, ObjectMapper objectMapper) { + public MultiLangRecordProcessorFactory(String command, ExecutorService executorService, ObjectMapper objectMapper, + KinesisClientLibConfiguration configuration) { this.command = command; this.commandArray = command.split(COMMAND_DELIMETER_REGEX); this.executorService = executorService; this.objectMapper = objectMapper; + this.configuration = configuration; } @Override @@ -65,7 +71,8 @@ public IRecordProcessor createProcessor() { /* * Giving ProcessBuilder the command as an array of Strings allows users to specify command line arguments. */ - return new MultiLangRecordProcessor(new ProcessBuilder(commandArray), executorService, this.objectMapper); + return new MultiLangRecordProcessor(new ProcessBuilder(commandArray), executorService, this.objectMapper, + this.configuration); } String[] getCommandArray() { diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java index e1827799b..3f35b8fa2 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java @@ -14,64 +14,78 @@ */ package com.amazonaws.services.kinesis.multilang; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.argThat; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; - +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; - import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; import com.google.common.util.concurrent.SettableFuture; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) public class MultiLangProtocolTest { private static final List EMPTY_RECORD_LIST = Collections.emptyList(); + @Mock private MultiLangProtocol protocol; + @Mock private MessageWriter messageWriter; + @Mock private MessageReader messageReader; private String shardId; + @Mock private IRecordProcessorCheckpointer checkpointer; + @Mock + private KinesisClientLibConfiguration configuration; @Before public void setup() { this.shardId = "shard-id-123"; - messageWriter = Mockito.mock(MessageWriter.class); - messageReader = Mockito.mock(MessageReader.class); - protocol = new MultiLangProtocol(messageReader, messageWriter, new InitializationInput().withShardId(shardId)); - checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class); + protocol = new MultiLangProtocolForTesting(messageReader, messageWriter, + new InitializationInput().withShardId(shardId), configuration); + + when(configuration.getTimeoutInSeconds()).thenReturn(Optional.empty()); } private Future buildFuture(T value) { @@ -165,7 +179,10 @@ public void processRecordsWithCheckpointsTest() throws InterruptedException, Exe this.add(new StatusMessage("processRecords")); } })); - assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(true)); + + boolean result = protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)); + + assertThat(result, equalTo(true)); verify(checkpointer, timeout(1)).checkpoint(); verify(checkpointer, timeout(1)).checkpoint("123", 0L); @@ -183,4 +200,50 @@ public void processRecordsWithABadCheckpointTest() throws InterruptedException, })); assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(false)); } + + @Test(expected = NullPointerException.class) + public void waitForStatusMessageTimeoutTest() throws InterruptedException, TimeoutException, ExecutionException { + when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true)); + Future future = Mockito.mock(Future.class); + when(messageReader.getNextMessageFromSTDOUT()).thenReturn(future); + when(configuration.getTimeoutInSeconds()).thenReturn(Optional.of(5)); + when(future.get(anyInt(), eq(TimeUnit.SECONDS))).thenThrow(TimeoutException.class); + protocol = new MultiLangProtocolForTesting(messageReader, + messageWriter, + new InitializationInput().withShardId(shardId), + configuration); + + protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST)); + } + + @Test + public void waitForStatusMessageSuccessTest() { + when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true)); + when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("processRecords"), Message.class)); + when(configuration.getTimeoutInSeconds()).thenReturn(Optional.of(5)); + + assertTrue(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST))); + } + + private class MultiLangProtocolForTesting extends MultiLangProtocol { + /** + * Constructor. + * + * @param messageReader A message reader. + * @param messageWriter A message writer. + * @param initializationInput + * @param configuration + */ + MultiLangProtocolForTesting(final MessageReader messageReader, + final MessageWriter messageWriter, + final InitializationInput initializationInput, + final KinesisClientLibConfiguration configuration) { + super(messageReader, messageWriter, initializationInput, configuration); + } + + @Override + protected void haltJvm(final int exitStatus) { + throw new NullPointerException(); + } + } } diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java index a8f5885bb..aa6aceeaa 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java @@ -14,16 +14,24 @@ */ package com.amazonaws.services.kinesis.multilang; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import org.junit.Assert; import org.junit.Test; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +@RunWith(MockitoJUnitRunner.class) public class StreamingRecordProcessorFactoryTest { + @Mock + private KinesisClientLibConfiguration configuration; + @Test public void createProcessorTest() { - MultiLangRecordProcessorFactory factory = new MultiLangRecordProcessorFactory("somecommand", null); + MultiLangRecordProcessorFactory factory = new MultiLangRecordProcessorFactory("somecommand", null, configuration); IRecordProcessor processor = factory.createProcessor(); Assert.assertEquals("Should have constructed a StreamingRecordProcessor", MultiLangRecordProcessor.class, diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java index 2c02b5e9a..d27b94808 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java @@ -14,41 +14,13 @@ */ package com.amazonaws.services.kinesis.multilang; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.argThat; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.runners.MockitoJUnitRunner; -import org.mockito.stubbing.Answer; - import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; @@ -59,6 +31,35 @@ import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class StreamingRecordProcessorTest { @@ -108,6 +109,9 @@ public void checkpoint(String sequenceNumber, long subSequenceNumber) private MultiLangRecordProcessor recordProcessor; + @Mock + private KinesisClientLibConfiguration configuration; + @Before public void prepare() throws IOException, InterruptedException, ExecutionException { // Fake command @@ -121,10 +125,11 @@ public void prepare() throws IOException, InterruptedException, ExecutionExcepti messageWriter = Mockito.mock(MessageWriter.class); messageReader = Mockito.mock(MessageReader.class); errorReader = Mockito.mock(DrainChildSTDERRTask.class); + when(configuration.getTimeoutInSeconds()).thenReturn(Optional.empty()); recordProcessor = new MultiLangRecordProcessor(new ProcessBuilder(), executor, new ObjectMapper(), messageWriter, - messageReader, errorReader) { + messageReader, errorReader, configuration) { // Just don't do anything when we exit. void exit() {