Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added IKinesisProxy injector in Worker.Builder #274

Merged
merged 9 commits into from
Jan 15, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import lombok.Getter;

import com.amazonaws.regions.Region;
import com.amazonaws.regions.RegionUtils;
Expand Down Expand Up @@ -1062,6 +1064,7 @@ static class WorkerThreadPoolExecutor extends ThreadPoolExecutor {
/**
* Builder to construct a Worker instance.
*/
@Getter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no use for the builder to have a getter, we should be testing what the builder has created, if that has the correct value.

public static class Builder {

private IRecordProcessorFactory recordProcessorFactory;
Expand All @@ -1073,6 +1076,7 @@ public static class Builder {
private IMetricsFactory metricsFactory;
private ExecutorService execService;
private ShardPrioritization shardPrioritization;
private IKinesisProxy kinesisProxy;

/**
* Default constructor.
Expand Down Expand Up @@ -1192,6 +1196,19 @@ public Builder shardPrioritization(ShardPrioritization shardPrioritization) {
return this;
}

/**
* Set KinesisProxy for the worker.
*
* @param kinesisProxy
* KinesisProxy uses the AmazonKinesis client to get data from Kinesis or DynamoDBStreams
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you fix this comment, it would be confusing for the customers since KCL already has DDB endpoints. Can you remove DynaoDBStreams from the comment.

*
* @return A reference to this updated object so that method calls can be chained together.
*/
public Builder kinesisProxy(IKinesisProxy kinesisProxy) {
this.kinesisProxy = kinesisProxy;
return this;
}

/**
* Build the Worker instance.
*
Expand Down Expand Up @@ -1252,18 +1269,20 @@ public Worker build() {
}
}
if (metricsFactory == null) {
metricsFactory = getMetricsFactory(cloudWatchClient, config);
metricsFactory = Worker.getMetricsFactory(cloudWatchClient, config);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this being changed? The getMetricsFactory method is a static method which is accessible within the same class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was changed because the Lombok Getter had the same name as the static method.

}
if (shardPrioritization == null) {
shardPrioritization = new ParentsFirstShardPrioritization(1);
}

if (kinesisProxy == null) {
kinesisProxy = new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient)
.getProxy(config.getStreamName());
}

return new Worker(config.getApplicationName(),
recordProcessorFactory,
config,
new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(),
kinesisClient).getProxy(config.getStreamName()),
new StreamConfig(kinesisProxy,
config.getMaxRecords(),
config.getIdleTimeBetweenReadsInMillis(),
config.shouldCallProcessRecordsEvenForEmptyRecordList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
Expand All @@ -41,6 +43,7 @@
import java.lang.Thread.State;
import java.lang.reflect.Field;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
Expand Down Expand Up @@ -108,6 +111,13 @@
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.InvalidArgumentException;
import com.amazonaws.services.kinesis.model.PutRecordResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

Expand Down Expand Up @@ -1474,6 +1484,71 @@ public List<ShardInfo> answer(InvocationOnMock invocation) throws Throwable {

}

@Test
public void testBuilderWithDefaultKinesisProxy() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);

Worker.Builder builder = new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(config);
builder.build();
assertNotNull(builder.getKinesisProxy());
assertTrue(builder.getKinesisProxy() instanceof KinesisProxy);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the correct way to test the proxy that is set. We need to make sure that the worker is getting the proxy that we expect it to use.

}

@Test
public void testBuilderWhenKinesisProxyIsSet() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
IKinesisProxy kinesisProxy = new DummyKinesisProxy();
Worker.Builder builder = new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.kinesisProxy(kinesisProxy)
.config(config);
builder.build();
assertNotNull(builder.getKinesisProxy());
assertTrue(builder.getKinesisProxy() instanceof DummyKinesisProxy);
}

// KinesisProxyImplementation to test KinesisProxy injection.
private class DummyKinesisProxy implements IKinesisProxy {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to use KinesisLocalFileProxy instead of creating a new one. Let me get back to you on that one.

@Override public GetRecordsResult get(String shardIterator, int maxRecords)
throws ResourceNotFoundException, InvalidArgumentException, ExpiredIteratorException {
return null;
}

@Override public DescribeStreamResult getStreamInfo(String startShardId) throws ResourceNotFoundException {
return null;
}

@Override public Set<String> getAllShardIds() throws ResourceNotFoundException {
return null;
}

@Override public List<Shard> getShardList() throws ResourceNotFoundException {
return null;
}

@Override public String getIterator(String shardId, String iteratorEnum, String sequenceNumber)
throws ResourceNotFoundException, InvalidArgumentException {
return null;
}

@Override public String getIterator(String shardId, String iteratorEnum)
throws ResourceNotFoundException, InvalidArgumentException {
return null;
}

@Override public String getIterator(String shardId, Date timestamp)
throws ResourceNotFoundException, InvalidArgumentException {
return null;
}

@Override public PutRecordResult put(String sequenceNumberForOrdering, String explicitHashKey,
String partitionKey, ByteBuffer data) throws ResourceNotFoundException, InvalidArgumentException {
return null;
}
}

private abstract class InjectableWorker extends Worker {
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, StreamConfig streamConfig,
Expand Down