Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added IKinesisProxy injector in Worker.Builder #274

Merged
merged 9 commits into from
Jan 15, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

Expand Down Expand Up @@ -996,6 +997,11 @@ public Worker(
metricsFactory, execService);
}

@VisibleForTesting
StreamConfig getStreamConfig() {
return streamConfig;
}

/**
* Given configuration, returns appropriate metrics factory.
*
Expand Down Expand Up @@ -1073,6 +1079,7 @@ public static class Builder {
private IMetricsFactory metricsFactory;
private ExecutorService execService;
private ShardPrioritization shardPrioritization;
private IKinesisProxy kinesisProxy;

/**
* Default constructor.
Expand Down Expand Up @@ -1192,6 +1199,19 @@ public Builder shardPrioritization(ShardPrioritization shardPrioritization) {
return this;
}

/**
* Set KinesisProxy for the worker.
*
* @param kinesisProxy
* Sets an implementation of IKinesisProxy.
*
* @return A reference to this updated object so that method calls can be chained together.
*/
public Builder kinesisProxy(IKinesisProxy kinesisProxy) {
this.kinesisProxy = kinesisProxy;
return this;
}

/**
* Build the Worker instance.
*
Expand Down Expand Up @@ -1257,13 +1277,15 @@ public Worker build() {
if (shardPrioritization == null) {
shardPrioritization = new ParentsFirstShardPrioritization(1);
}

if (kinesisProxy == null) {
kinesisProxy = new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient)
.getProxy(config.getStreamName());
}

return new Worker(config.getApplicationName(),
recordProcessorFactory,
config,
new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(),
kinesisClient).getProxy(config.getStreamName()),
new StreamConfig(kinesisProxy,
config.getMaxRecords(),
config.getIdleTimeBetweenReadsInMillis(),
config.shouldCallProcessRecordsEvenForEmptyRecordList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerCWMetricsFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerThreadPoolExecutor;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
Expand Down Expand Up @@ -1474,6 +1475,31 @@ public List<ShardInfo> answer(InvocationOnMock invocation) throws Throwable {

}

@Test
public void testBuilderWithDefaultKinesisProxy() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
Worker worker = new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(config)
.build();
Assert.assertNotNull(worker.getStreamConfig().getStreamProxy());
Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisProxy);
}

@Test
public void testBuilderWhenKinesisProxyIsSet() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
// Create an instance of KinesisLocalFileProxy for injection and validation
IKinesisProxy kinesisProxy = mock(KinesisLocalFileProxy.class);
Worker worker = new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(config)
.kinesisProxy(kinesisProxy)
.build();
Assert.assertNotNull(worker.getStreamConfig().getStreamProxy());
Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisLocalFileProxy);
}

private abstract class InjectableWorker extends Worker {
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, StreamConfig streamConfig,
Expand Down