diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/AmazonKinesisMock.java --- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java 2019-10-26 20:23:24.000000000 -0700 +++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/AmazonKinesisMock.java 2019-10-27 12:35:16.000000000 -0700 @@ -15,83 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static java.lang.Integer.parseInt; import static java.lang.Math.min; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.transform; import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode; -import com.amazonaws.AmazonWebServiceRequest; -import com.amazonaws.ResponseMetadata; -import com.amazonaws.http.HttpResponse; -import com.amazonaws.http.SdkHttpMetadata; -import com.amazonaws.regions.Region; -import com.amazonaws.services.cloudwatch.AmazonCloudWatch; -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.model.AddTagsToStreamRequest; -import com.amazonaws.services.kinesis.model.AddTagsToStreamResult; -import com.amazonaws.services.kinesis.model.CreateStreamRequest; -import com.amazonaws.services.kinesis.model.CreateStreamResult; -import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodRequest; -import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodResult; -import com.amazonaws.services.kinesis.model.DeleteStreamRequest; -import com.amazonaws.services.kinesis.model.DeleteStreamResult; -import com.amazonaws.services.kinesis.model.DeregisterStreamConsumerRequest; -import com.amazonaws.services.kinesis.model.DeregisterStreamConsumerResult; -import com.amazonaws.services.kinesis.model.DescribeLimitsRequest; -import com.amazonaws.services.kinesis.model.DescribeLimitsResult; -import com.amazonaws.services.kinesis.model.DescribeStreamConsumerRequest; -import com.amazonaws.services.kinesis.model.DescribeStreamConsumerResult; -import com.amazonaws.services.kinesis.model.DescribeStreamRequest; -import com.amazonaws.services.kinesis.model.DescribeStreamResult; -import com.amazonaws.services.kinesis.model.DescribeStreamSummaryRequest; -import com.amazonaws.services.kinesis.model.DescribeStreamSummaryResult; -import com.amazonaws.services.kinesis.model.DisableEnhancedMonitoringRequest; -import com.amazonaws.services.kinesis.model.DisableEnhancedMonitoringResult; -import com.amazonaws.services.kinesis.model.EnableEnhancedMonitoringRequest; -import com.amazonaws.services.kinesis.model.EnableEnhancedMonitoringResult; -import com.amazonaws.services.kinesis.model.GetRecordsRequest; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; -import com.amazonaws.services.kinesis.model.GetShardIteratorResult; -import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodRequest; -import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodResult; -import com.amazonaws.services.kinesis.model.ListShardsRequest; -import com.amazonaws.services.kinesis.model.ListShardsResult; -import com.amazonaws.services.kinesis.model.ListStreamConsumersRequest; -import com.amazonaws.services.kinesis.model.ListStreamConsumersResult; -import com.amazonaws.services.kinesis.model.ListStreamsRequest; -import com.amazonaws.services.kinesis.model.ListStreamsResult; -import com.amazonaws.services.kinesis.model.ListTagsForStreamRequest; -import com.amazonaws.services.kinesis.model.ListTagsForStreamResult; -import com.amazonaws.services.kinesis.model.MergeShardsRequest; -import com.amazonaws.services.kinesis.model.MergeShardsResult; -import com.amazonaws.services.kinesis.model.PutRecordRequest; -import com.amazonaws.services.kinesis.model.PutRecordResult; -import com.amazonaws.services.kinesis.model.PutRecordsRequest; -import com.amazonaws.services.kinesis.model.PutRecordsResult; -import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.model.RegisterStreamConsumerRequest; -import com.amazonaws.services.kinesis.model.RegisterStreamConsumerResult; -import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamRequest; -import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamResult; -import com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.services.kinesis.model.ShardIteratorType; -import com.amazonaws.services.kinesis.model.SplitShardRequest; -import com.amazonaws.services.kinesis.model.SplitShardResult; -import com.amazonaws.services.kinesis.model.StartStreamEncryptionRequest; -import com.amazonaws.services.kinesis.model.StartStreamEncryptionResult; -import com.amazonaws.services.kinesis.model.StopStreamEncryptionRequest; -import com.amazonaws.services.kinesis.model.StopStreamEncryptionResult; -import com.amazonaws.services.kinesis.model.StreamDescription; -import com.amazonaws.services.kinesis.model.UpdateShardCountRequest; -import com.amazonaws.services.kinesis.model.UpdateShardCountResult; -import com.amazonaws.services.kinesis.producer.IKinesisProducer; -import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; -import com.amazonaws.services.kinesis.waiters.AmazonKinesisWaiters; import java.io.Serializable; -import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; @@ -100,9 +31,69 @@ import org.apache.commons.lang.builder.EqualsBuilder; import org.joda.time.Instant; import org.mockito.Mockito; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.http.SdkHttpResponse; +import software.amazon.awssdk.services.cloudwatch.CloudWatchClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.AddTagsToStreamRequest; +import software.amazon.awssdk.services.kinesis.model.AddTagsToStreamResponse; +import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; +import software.amazon.awssdk.services.kinesis.model.CreateStreamResponse; +import software.amazon.awssdk.services.kinesis.model.DecreaseStreamRetentionPeriodRequest; +import software.amazon.awssdk.services.kinesis.model.DecreaseStreamRetentionPeriodResponse; +import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DeleteStreamResponse; +import software.amazon.awssdk.services.kinesis.model.DescribeLimitsRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeLimitsResponse; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse; +import software.amazon.awssdk.services.kinesis.model.DisableEnhancedMonitoringRequest; +import software.amazon.awssdk.services.kinesis.model.DisableEnhancedMonitoringResponse; +import software.amazon.awssdk.services.kinesis.model.EnableEnhancedMonitoringRequest; +import software.amazon.awssdk.services.kinesis.model.EnableEnhancedMonitoringResponse; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; +import software.amazon.awssdk.services.kinesis.model.IncreaseStreamRetentionPeriodRequest; +import software.amazon.awssdk.services.kinesis.model.IncreaseStreamRetentionPeriodResponse; +import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; +import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; +import software.amazon.awssdk.services.kinesis.model.ListStreamConsumersRequest; +import software.amazon.awssdk.services.kinesis.model.ListStreamConsumersResponse; +import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest; +import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse; +import software.amazon.awssdk.services.kinesis.model.ListTagsForStreamRequest; +import software.amazon.awssdk.services.kinesis.model.ListTagsForStreamResponse; +import software.amazon.awssdk.services.kinesis.model.MergeShardsRequest; +import software.amazon.awssdk.services.kinesis.model.MergeShardsResponse; +import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest; +import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse; +import software.amazon.awssdk.services.kinesis.model.RemoveTagsFromStreamRequest; +import software.amazon.awssdk.services.kinesis.model.RemoveTagsFromStreamResponse; +import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +import software.amazon.awssdk.services.kinesis.model.SplitShardRequest; +import software.amazon.awssdk.services.kinesis.model.SplitShardResponse; +import software.amazon.awssdk.services.kinesis.model.StartStreamEncryptionRequest; +import software.amazon.awssdk.services.kinesis.model.StartStreamEncryptionResponse; +import software.amazon.awssdk.services.kinesis.model.StopStreamEncryptionRequest; +import software.amazon.awssdk.services.kinesis.model.StopStreamEncryptionResponse; +import software.amazon.awssdk.services.kinesis.model.UpdateShardCountRequest; +import software.amazon.awssdk.services.kinesis.model.UpdateShardCountResponse; -/** Mock implemenation of {@link AmazonKinesis} for testing. */ -class AmazonKinesisMock implements AmazonKinesis { +/** Mock implementation of {@link KinesisClient} for testing. */ +class AmazonKinesisMock implements KinesisClient { static class TestData implements Serializable { @@ -124,11 +115,12 @@ } public Record convertToRecord() { - return new Record() - .withApproximateArrivalTimestamp(arrivalTimestamp.toDate()) - .withData(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8))) - .withSequenceNumber(sequenceNumber) - .withPartitionKey(""); + return Record.builder() + .approximateArrivalTimestamp(TimeUtil.toJava(arrivalTimestamp)) + .data(SdkBytes.fromByteArray(data.getBytes(StandardCharsets.UTF_8))) + .sequenceNumber(sequenceNumber) + .partitionKey("") + .build(); } @Override @@ -167,7 +159,7 @@ } @Override - public AmazonKinesis getKinesisClient() { + public KinesisClient getKinesisClient() { return new AmazonKinesisMock( shardedData.stream() .map(testDatas -> transform(testDatas, TestData::convertToRecord)) @@ -176,13 +168,13 @@ } @Override - public AmazonCloudWatch getCloudWatchClient() { - return Mockito.mock(AmazonCloudWatch.class); + public KinesisAsyncClient getKinesisAsyncClient() { + return Mockito.mock(KinesisAsyncClient.class); } @Override - public IKinesisProducer createKinesisProducer(KinesisProducerConfiguration config) { - throw new RuntimeException("Not implemented"); + public CloudWatchClient getCloudWatchClient() { + return Mockito.mock(CloudWatchClient.class); } } @@ -195,286 +187,199 @@ } @Override - public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) { + public String serviceName() { + return null; + } + + @Override + public void close() {} + + @Override + public GetRecordsResponse getRecords(GetRecordsRequest getRecordsRequest) { List shardIteratorParts = - Splitter.on(':').splitToList(getRecordsRequest.getShardIterator()); + Splitter.on(':').splitToList(getRecordsRequest.shardIterator()); int shardId = parseInt(shardIteratorParts.get(0)); int startingRecord = parseInt(shardIteratorParts.get(1)); List shardData = shardedData.get(shardId); int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size()); int fromIndex = min(startingRecord, toIndex); - return new GetRecordsResult() - .withRecords(shardData.subList(fromIndex, toIndex)) - .withNextShardIterator(String.format("%s:%s", shardId, toIndex)) - .withMillisBehindLatest(0L); + return GetRecordsResponse.builder() + .records(shardData.subList(fromIndex, toIndex)) + .nextShardIterator(String.format("%s:%s", shardId, toIndex)) + .millisBehindLatest(0L) + .build(); } @Override - public GetShardIteratorResult getShardIterator(GetShardIteratorRequest getShardIteratorRequest) { - ShardIteratorType shardIteratorType = - ShardIteratorType.fromValue(getShardIteratorRequest.getShardIteratorType()); + public GetShardIteratorResponse getShardIterator( + GetShardIteratorRequest getShardIteratorRequest) { + ShardIteratorType shardIteratorType = getShardIteratorRequest.shardIteratorType(); String shardIterator; if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) { - shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0); + shardIterator = String.format("%s:%s", getShardIteratorRequest.shardId(), 0); } else { throw new RuntimeException("Not implemented"); } - return new GetShardIteratorResult().withShardIterator(shardIterator); + return GetShardIteratorResponse.builder().shardIterator(shardIterator).build(); } @Override - public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) { + public DescribeStreamResponse describeStream(DescribeStreamRequest describeStreamRequest) { int nextShardId = 0; - if (exclusiveStartShardId != null) { - nextShardId = parseInt(exclusiveStartShardId) + 1; + if (describeStreamRequest.exclusiveStartShardId() != null) { + nextShardId = parseInt(describeStreamRequest.exclusiveStartShardId()) + 1; } boolean hasMoreShards = nextShardId + 1 < shardedData.size(); List shards = new ArrayList<>(); if (nextShardId < shardedData.size()) { - shards.add(new Shard().withShardId(Integer.toString(nextShardId))); + shards.add(Shard.builder().shardId(Integer.toString(nextShardId)).build()); } - HttpResponse response = new HttpResponse(null, null); - response.setStatusCode(200); - DescribeStreamResult result = new DescribeStreamResult(); - result.setSdkHttpMetadata(SdkHttpMetadata.from(response)); - result.withStreamDescription( - new StreamDescription() - .withHasMoreShards(hasMoreShards) - .withShards(shards) - .withStreamName(streamName)); - return result; + DescribeStreamResponse.Builder builder = + DescribeStreamResponse.builder() + .streamDescription( + s -> + s.hasMoreShards(hasMoreShards) + .shards(shards) + .streamName(describeStreamRequest.streamName())); + builder.sdkHttpResponse(SdkHttpResponse.builder().statusCode(200).build()); + return builder.build(); } @Override - public void setEndpoint(String endpoint) {} - - @Override - public void setRegion(Region region) {} - - @Override - public AddTagsToStreamResult addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) { + public AddTagsToStreamResponse addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) { throw new RuntimeException("Not implemented"); } @Override - public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) { + public CreateStreamResponse createStream(CreateStreamRequest createStreamRequest) { throw new RuntimeException("Not implemented"); } @Override - public CreateStreamResult createStream(String streamName, Integer shardCount) { - throw new RuntimeException("Not implemented"); - } - - @Override - public DecreaseStreamRetentionPeriodResult decreaseStreamRetentionPeriod( + public DecreaseStreamRetentionPeriodResponse decreaseStreamRetentionPeriod( DecreaseStreamRetentionPeriodRequest decreaseStreamRetentionPeriodRequest) { throw new RuntimeException("Not implemented"); } @Override - public DeleteStreamResult deleteStream(DeleteStreamRequest deleteStreamRequest) { - throw new RuntimeException("Not implemented"); - } - - @Override - public DeleteStreamResult deleteStream(String streamName) { - throw new RuntimeException("Not implemented"); - } - - @Override - public DeregisterStreamConsumerResult deregisterStreamConsumer( - DeregisterStreamConsumerRequest deregisterStreamConsumerRequest) { - throw new RuntimeException("Not implemented"); - } - - @Override - public DescribeLimitsResult describeLimits(DescribeLimitsRequest describeLimitsRequest) { + public DeleteStreamResponse deleteStream(DeleteStreamRequest deleteStreamRequest) { throw new RuntimeException("Not implemented"); } @Override - public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) { + public DescribeLimitsResponse describeLimits(DescribeLimitsRequest describeLimitsRequest) { throw new RuntimeException("Not implemented"); } @Override - public DescribeStreamResult describeStream(String streamName) { - return describeStream(streamName, null); - } - - @Override - public DescribeStreamResult describeStream( - String streamName, Integer limit, String exclusiveStartShardId) { - throw new RuntimeException("Not implemented"); - } - - @Override - public DescribeStreamConsumerResult describeStreamConsumer( + public DescribeStreamConsumerResponse describeStreamConsumer( DescribeStreamConsumerRequest describeStreamConsumerRequest) { throw new RuntimeException("Not implemented"); } @Override - public DescribeStreamSummaryResult describeStreamSummary( + public DescribeStreamSummaryResponse describeStreamSummary( DescribeStreamSummaryRequest describeStreamSummaryRequest) { throw new RuntimeException("Not implemented"); } @Override - public DisableEnhancedMonitoringResult disableEnhancedMonitoring( + public DisableEnhancedMonitoringResponse disableEnhancedMonitoring( DisableEnhancedMonitoringRequest disableEnhancedMonitoringRequest) { throw new RuntimeException("Not implemented"); } @Override - public EnableEnhancedMonitoringResult enableEnhancedMonitoring( + public EnableEnhancedMonitoringResponse enableEnhancedMonitoring( EnableEnhancedMonitoringRequest enableEnhancedMonitoringRequest) { throw new RuntimeException("Not implemented"); } @Override - public GetShardIteratorResult getShardIterator( - String streamName, String shardId, String shardIteratorType) { - throw new RuntimeException("Not implemented"); - } - - @Override - public GetShardIteratorResult getShardIterator( - String streamName, String shardId, String shardIteratorType, String startingSequenceNumber) { - throw new RuntimeException("Not implemented"); - } - - @Override - public IncreaseStreamRetentionPeriodResult increaseStreamRetentionPeriod( + public IncreaseStreamRetentionPeriodResponse increaseStreamRetentionPeriod( IncreaseStreamRetentionPeriodRequest increaseStreamRetentionPeriodRequest) { throw new RuntimeException("Not implemented"); } @Override - public ListShardsResult listShards(ListShardsRequest listShardsRequest) { + public ListShardsResponse listShards(ListShardsRequest listShardsRequest) { throw new RuntimeException("Not implemented"); } @Override - public ListStreamConsumersResult listStreamConsumers( + public ListStreamConsumersResponse listStreamConsumers( ListStreamConsumersRequest listStreamConsumersRequest) { throw new RuntimeException("Not implemented"); } @Override - public ListStreamsResult listStreams(ListStreamsRequest listStreamsRequest) { - throw new RuntimeException("Not implemented"); - } - - @Override - public ListStreamsResult listStreams() { + public ListStreamsResponse listStreams(ListStreamsRequest listStreamsRequest) { throw new RuntimeException("Not implemented"); } @Override - public ListStreamsResult listStreams(String exclusiveStartStreamName) { + public ListStreamsResponse listStreams() { throw new RuntimeException("Not implemented"); } @Override - public ListStreamsResult listStreams(Integer limit, String exclusiveStartStreamName) { - throw new RuntimeException("Not implemented"); - } - - @Override - public ListTagsForStreamResult listTagsForStream( + public ListTagsForStreamResponse listTagsForStream( ListTagsForStreamRequest listTagsForStreamRequest) { throw new RuntimeException("Not implemented"); } @Override - public MergeShardsResult mergeShards(MergeShardsRequest mergeShardsRequest) { - throw new RuntimeException("Not implemented"); - } - - @Override - public MergeShardsResult mergeShards( - String streamName, String shardToMerge, String adjacentShardToMerge) { - throw new RuntimeException("Not implemented"); - } - - @Override - public PutRecordResult putRecord(PutRecordRequest putRecordRequest) { + public MergeShardsResponse mergeShards(MergeShardsRequest mergeShardsRequest) { throw new RuntimeException("Not implemented"); } @Override - public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) { + public PutRecordResponse putRecord(PutRecordRequest putRecordRequest) { throw new RuntimeException("Not implemented"); } @Override - public PutRecordResult putRecord( - String streamName, ByteBuffer data, String partitionKey, String sequenceNumberForOrdering) { + public PutRecordsResponse putRecords(PutRecordsRequest putRecordsRequest) { throw new RuntimeException("Not implemented"); } @Override - public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) { - throw new RuntimeException("Not implemented"); - } - - @Override - public RegisterStreamConsumerResult registerStreamConsumer( + public RegisterStreamConsumerResponse registerStreamConsumer( RegisterStreamConsumerRequest registerStreamConsumerRequest) { throw new RuntimeException("Not implemented"); } @Override - public RemoveTagsFromStreamResult removeTagsFromStream( + public RemoveTagsFromStreamResponse removeTagsFromStream( RemoveTagsFromStreamRequest removeTagsFromStreamRequest) { throw new RuntimeException("Not implemented"); } @Override - public SplitShardResult splitShard(SplitShardRequest splitShardRequest) { + public SplitShardResponse splitShard(SplitShardRequest splitShardRequest) { throw new RuntimeException("Not implemented"); } @Override - public SplitShardResult splitShard( - String streamName, String shardToSplit, String newStartingHashKey) { - throw new RuntimeException("Not implemented"); - } - - @Override - public StartStreamEncryptionResult startStreamEncryption( + public StartStreamEncryptionResponse startStreamEncryption( StartStreamEncryptionRequest startStreamEncryptionRequest) { throw new RuntimeException("Not implemented"); } @Override - public StopStreamEncryptionResult stopStreamEncryption( + public StopStreamEncryptionResponse stopStreamEncryption( StopStreamEncryptionRequest stopStreamEncryptionRequest) { throw new RuntimeException("Not implemented"); } @Override - public UpdateShardCountResult updateShardCount(UpdateShardCountRequest updateShardCountRequest) { - throw new RuntimeException("Not implemented"); - } - - @Override - public void shutdown() {} - - @Override - public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) { - throw new RuntimeException("Not implemented"); - } - - @Override - public AmazonKinesisWaiters waiters() { + public UpdateShardCountResponse updateShardCount( + UpdateShardCountRequest updateShardCountRequest) { throw new RuntimeException("Not implemented"); } } diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/CustomOptionalTest.java --- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/CustomOptionalTest.java 2019-10-25 12:08:13.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import com.google.common.testing.EqualsTester; import java.util.NoSuchElementException; diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/DynamicCheckpointGeneratorTest.java --- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java 2019-10-24 21:25:10.000000000 -0700 +++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/DynamicCheckpointGeneratorTest.java 2019-10-27 12:35:16.000000000 -0700 @@ -15,22 +15,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.when; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; -import com.amazonaws.services.kinesis.model.Shard; import java.util.Set; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.kinesis.common.InitialPositionInStream; /** * */ -@RunWith(MockitoJUnitRunner.class) +@RunWith(PowerMockRunner.class) +@PrepareForTest(Shard.class) public class DynamicCheckpointGeneratorTest { @Mock private SimplifiedKinesisClient kinesisClient; @@ -39,16 +41,16 @@ @Test public void shouldMapAllShardsToCheckpoints() throws Exception { - when(shard1.getShardId()).thenReturn("shard-01"); - when(shard2.getShardId()).thenReturn("shard-02"); - when(shard3.getShardId()).thenReturn("shard-03"); + when(shard1.shardId()).thenReturn("shard-01"); + when(shard2.shardId()).thenReturn("shard-02"); + when(shard3.shardId()).thenReturn("shard-03"); Set shards = Sets.newHashSet(shard1, shard2, shard3); StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST); when(startingPointShardsFinder.findShardsAtStartingPoint( kinesisClient, "stream", startingPoint)) .thenReturn(shards); DynamicCheckpointGenerator underTest = - new DynamicCheckpointGenerator("stream", startingPoint, startingPointShardsFinder); + new DynamicCheckpointGenerator("stream", null, startingPoint, startingPointShardsFinder); KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient); @@ -57,9 +59,9 @@ @Test public void shouldMapAllValidShardsToCheckpoints() throws Exception { - when(shard1.getShardId()).thenReturn("shard-01"); - when(shard2.getShardId()).thenReturn("shard-02"); - when(shard3.getShardId()).thenReturn("shard-03"); + when(shard1.shardId()).thenReturn("shard-01"); + when(shard2.shardId()).thenReturn("shard-02"); + when(shard3.shardId()).thenReturn("shard-03"); String streamName = "stream"; Set shards = Sets.newHashSet(shard1, shard2); StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST); @@ -68,11 +70,11 @@ .thenReturn(shards); DynamicCheckpointGenerator underTest = - new DynamicCheckpointGenerator(streamName, startingPoint, startingPointShardsFinder); + new DynamicCheckpointGenerator(streamName, null, startingPoint, startingPointShardsFinder); KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient); assertThat(checkpoint) .hasSize(2) - .doesNotContain(new ShardCheckpoint(streamName, shard3.getShardId(), startingPoint)); + .doesNotContain(new ShardCheckpoint(streamName, shard3.shardId(), null, startingPoint)); } } diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisIOIT.java --- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisIOIT.java 2019-10-27 12:35:16.000000000 -0700 @@ -15,16 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import com.amazonaws.regions.Regions; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.Random; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.common.HashingFn; import org.apache.beam.sdk.io.common.TestRow; +import org.apache.beam.sdk.io.kinesis.KinesisPartitioner; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -40,6 +40,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import software.amazon.awssdk.regions.Region; +import software.amazon.kinesis.common.InitialPositionInStream; /** * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link @@ -79,7 +81,7 @@ .apply("Prepare Kinesis input records", ParDo.of(new ConvertToBytes())) .apply( "Write to Kinesis", - KinesisIO.write() + org.apache.beam.sdk.io.kinesis.KinesisIO.write() .withStreamName(options.getAwsKinesisStream()) .withPartitioner(new RandomPartitioner()) .withAWSClientsProvider( @@ -99,7 +101,7 @@ .withAWSClientsProvider( options.getAwsAccessKey(), options.getAwsSecretKey(), - Regions.fromName(options.getAwsKinesisRegion())) + Region.of(options.getAwsKinesisRegion())) .withMaxNumRecords(numberOfRows) // to prevent endless running in case of error .withMaxReadTime(Duration.standardMinutes(10)) diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisMockReadTest.java --- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisMockReadTest.java 2019-10-27 12:35:16.000000000 -0700 @@ -15,11 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.newArrayList; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import java.util.List; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -32,6 +31,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import software.amazon.kinesis.common.InitialPositionInStream; /** Tests {@link AmazonKinesisMock}. */ @RunWith(JUnit4.class) Only in sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/: KinesisMockWriteTest.java Only in sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/: KinesisProducerMock.java diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisReaderCheckpointTest.java --- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisReaderCheckpointTest.java 2019-10-26 15:27:02.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisReaderTest.java --- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java 2019-10-22 13:43:07.000000000 -0700 +++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisReaderTest.java 2019-10-26 15:28:03.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisRecordCoderTest.java --- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisRecordCoderTest.java 2019-10-27 12:35:16.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisServiceMock.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisServiceMock.java --- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisServiceMock.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisServiceMock.java 2019-10-25 12:08:13.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.newArrayList; diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisTestOptions.java --- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/KinesisTestOptions.java 2019-10-25 12:08:13.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/RecordFilterTest.java --- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/RecordFilterTest.java 2019-10-26 15:29:02.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static org.mockito.Mockito.when; diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/ShardCheckpointTest.java --- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/ShardCheckpointTest.java 2019-10-27 12:35:17.000000000 -0700 @@ -15,22 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream.LATEST; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream.TRIM_HORIZON; -import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER; -import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER; -import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER; +import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER; +import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP; +import static software.amazon.kinesis.common.InitialPositionInStream.LATEST; +import static software.amazon.kinesis.common.InitialPositionInStream.TRIM_HORIZON; -import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; -import com.amazonaws.services.kinesis.model.ShardIteratorType; import java.io.IOException; import org.joda.time.DateTime; import org.joda.time.Instant; @@ -39,6 +37,8 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** */ @RunWith(MockitoJUnitRunner.class) @@ -83,12 +83,12 @@ @Test public void testComparisonWithExtendedSequenceNumber() { assertThat( - new ShardCheckpoint("", "", new StartingPoint(LATEST)) + new ShardCheckpoint("", "", null, new StartingPoint(LATEST)) .isBeforeOrAt(recordWith(new ExtendedSequenceNumber("100", 0L)))) .isTrue(); assertThat( - new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON)) + new ShardCheckpoint("", "", null, new StartingPoint(TRIM_HORIZON)) .isBeforeOrAt(recordWith(new ExtendedSequenceNumber("100", 0L)))) .isTrue(); @@ -147,7 +147,7 @@ private ShardCheckpoint checkpoint( ShardIteratorType iteratorType, String sequenceNumber, Long subSequenceNumber) { return new ShardCheckpoint( - STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber, subSequenceNumber); + STREAM_NAME, SHARD_ID, null, iteratorType, sequenceNumber, subSequenceNumber); } private KinesisRecord recordWith(Instant approximateArrivalTimestamp) { @@ -157,6 +157,6 @@ } private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) { - return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp); + return new ShardCheckpoint(STREAM_NAME, SHARD_ID, null, iteratorType, timestamp); } } diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/ShardReadersPoolTest.java --- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java 2019-10-22 13:43:07.000000000 -0700 +++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/ShardReadersPoolTest.java 2019-10-27 12:35:17.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static java.util.Collections.singletonList; import static org.assertj.core.api.Assertions.assertThat; diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/ShardRecordsIteratorTest.java --- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/ShardRecordsIteratorTest.java 2019-10-27 12:35:16.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; @@ -24,7 +24,6 @@ import static org.mockito.Matchers.anyListOf; import static org.mockito.Mockito.when; -import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import java.io.IOException; import java.util.Collections; import org.joda.time.Duration; @@ -36,6 +35,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; /** Tests {@link ShardRecordsIterator}. */ @RunWith(MockitoJUnitRunner.Silent.class) diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/SimplifiedKinesisClientTest.java --- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/SimplifiedKinesisClientTest.java 2019-10-27 12:35:17.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; @@ -25,25 +25,6 @@ import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import com.amazonaws.AmazonServiceException; -import com.amazonaws.AmazonServiceException.ErrorType; -import com.amazonaws.services.cloudwatch.AmazonCloudWatch; -import com.amazonaws.services.cloudwatch.model.Datapoint; -import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsRequest; -import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsResult; -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.model.DescribeStreamResult; -import com.amazonaws.services.kinesis.model.ExpiredIteratorException; -import com.amazonaws.services.kinesis.model.GetRecordsRequest; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; -import com.amazonaws.services.kinesis.model.GetShardIteratorResult; -import com.amazonaws.services.kinesis.model.LimitExceededException; -import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; -import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.services.kinesis.model.ShardIteratorType; -import com.amazonaws.services.kinesis.model.StreamDescription; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -56,6 +37,27 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.core.exception.SdkServiceException; +import software.amazon.awssdk.services.cloudwatch.CloudWatchClient; +import software.amazon.awssdk.services.cloudwatch.model.Datapoint; +import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsRequest; +import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsResponse; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; +import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; +import software.amazon.awssdk.services.kinesis.model.LimitExceededException; +import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; /** * */ @RunWith(MockitoJUnitRunner.class) @@ -68,19 +70,21 @@ private static final String SHARD_ITERATOR = "iterator"; private static final String SEQUENCE_NUMBER = "abc123"; - @Mock private AmazonKinesis kinesis; - @Mock private AmazonCloudWatch cloudWatch; + @Mock private KinesisClient kinesis; + @Mock private KinesisAsyncClient kinesisAsync; + @Mock private CloudWatchClient cloudWatch; @InjectMocks private SimplifiedKinesisClient underTest; @Test public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception { when(kinesis.getShardIterator( - new GetShardIteratorRequest() - .withStreamName(STREAM) - .withShardId(SHARD_1) - .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) - .withStartingSequenceNumber(SEQUENCE_NUMBER))) - .thenReturn(new GetShardIteratorResult().withShardIterator(SHARD_ITERATOR)); + GetShardIteratorRequest.builder() + .streamName(STREAM) + .shardId(SHARD_1) + .shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) + .startingSequenceNumber(SEQUENCE_NUMBER) + .build())) + .thenReturn(GetShardIteratorResponse.builder().shardIterator(SHARD_ITERATOR).build()); String stream = underTest.getShardIterator( @@ -93,12 +97,13 @@ public void shouldReturnIteratorStartingWithTimestamp() throws Exception { Instant timestamp = Instant.now(); when(kinesis.getShardIterator( - new GetShardIteratorRequest() - .withStreamName(STREAM) - .withShardId(SHARD_1) - .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) - .withTimestamp(timestamp.toDate()))) - .thenReturn(new GetShardIteratorResult().withShardIterator(SHARD_ITERATOR)); + GetShardIteratorRequest.builder() + .streamName(STREAM) + .shardId(SHARD_1) + .shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) + .timestamp(TimeUtil.toJava(timestamp)) + .build())) + .thenReturn(GetShardIteratorResponse.builder().shardIterator(SHARD_ITERATOR).build()); String stream = underTest.getShardIterator( @@ -110,31 +115,30 @@ @Test public void shouldHandleExpiredIterationExceptionForGetShardIterator() { shouldHandleGetShardIteratorError( - new ExpiredIteratorException(""), ExpiredIteratorException.class); + ExpiredIteratorException.builder().build(), ExpiredIteratorException.class); } @Test public void shouldHandleLimitExceededExceptionForGetShardIterator() { shouldHandleGetShardIteratorError( - new LimitExceededException(""), TransientKinesisException.class); + LimitExceededException.builder().build(), TransientKinesisException.class); } @Test public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() { shouldHandleGetShardIteratorError( - new ProvisionedThroughputExceededException(""), TransientKinesisException.class); + ProvisionedThroughputExceededException.builder().build(), TransientKinesisException.class); } @Test public void shouldHandleServiceErrorForGetShardIterator() { shouldHandleGetShardIteratorError( - newAmazonServiceException(ErrorType.Service), TransientKinesisException.class); + SdkServiceException.builder().build(), TransientKinesisException.class); } @Test public void shouldHandleClientErrorForGetShardIterator() { - shouldHandleGetShardIteratorError( - newAmazonServiceException(ErrorType.Client), RuntimeException.class); + shouldHandleGetShardIteratorError(SdkClientException.builder().build(), RuntimeException.class); } @Test @@ -145,10 +149,11 @@ private void shouldHandleGetShardIteratorError( Exception thrownException, Class expectedExceptionClass) { GetShardIteratorRequest request = - new GetShardIteratorRequest() - .withStreamName(STREAM) - .withShardId(SHARD_1) - .withShardIteratorType(ShardIteratorType.LATEST); + GetShardIteratorRequest.builder() + .streamName(STREAM) + .shardId(SHARD_1) + .shardIteratorType(ShardIteratorType.LATEST) + .build(); when(kinesis.getShardIterator(request)).thenThrow(thrownException); @@ -164,19 +169,24 @@ @Test public void shouldListAllShards() throws Exception { - Shard shard1 = new Shard().withShardId(SHARD_1); - Shard shard2 = new Shard().withShardId(SHARD_2); - Shard shard3 = new Shard().withShardId(SHARD_3); - when(kinesis.describeStream(STREAM, null)) + Shard shard1 = Shard.builder().shardId(SHARD_1).build(); + Shard shard2 = Shard.builder().shardId(SHARD_2).build(); + Shard shard3 = Shard.builder().shardId(SHARD_3).build(); + when(kinesis.describeStream( + DescribeStreamRequest.builder().streamName(STREAM).exclusiveStartShardId(null).build())) .thenReturn( - new DescribeStreamResult() - .withStreamDescription( - new StreamDescription().withShards(shard1, shard2).withHasMoreShards(true))); - when(kinesis.describeStream(STREAM, SHARD_2)) + DescribeStreamResponse.builder() + .streamDescription(s -> s.shards(shard1, shard2).hasMoreShards(true)) + .build()); + when(kinesis.describeStream( + DescribeStreamRequest.builder() + .streamName(STREAM) + .exclusiveStartShardId(SHARD_2) + .build())) .thenReturn( - new DescribeStreamResult() - .withStreamDescription( - new StreamDescription().withShards(shard3).withHasMoreShards(false))); + DescribeStreamResponse.builder() + .streamDescription(s -> s.shards(shard3).hasMoreShards(false)) + .build()); List shards = underTest.listShards(STREAM); @@ -185,30 +195,31 @@ @Test public void shouldHandleExpiredIterationExceptionForShardListing() { - shouldHandleShardListingError(new ExpiredIteratorException(""), ExpiredIteratorException.class); + shouldHandleShardListingError( + ExpiredIteratorException.builder().build(), ExpiredIteratorException.class); } @Test public void shouldHandleLimitExceededExceptionForShardListing() { - shouldHandleShardListingError(new LimitExceededException(""), TransientKinesisException.class); + shouldHandleShardListingError( + LimitExceededException.builder().build(), TransientKinesisException.class); } @Test public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() { shouldHandleShardListingError( - new ProvisionedThroughputExceededException(""), TransientKinesisException.class); + ProvisionedThroughputExceededException.builder().build(), TransientKinesisException.class); } @Test public void shouldHandleServiceErrorForShardListing() { shouldHandleShardListingError( - newAmazonServiceException(ErrorType.Service), TransientKinesisException.class); + SdkServiceException.builder().build(), TransientKinesisException.class); } @Test public void shouldHandleClientErrorForShardListing() { - shouldHandleShardListingError( - newAmazonServiceException(ErrorType.Client), RuntimeException.class); + shouldHandleShardListingError(SdkClientException.builder().build(), RuntimeException.class); } @Test @@ -218,7 +229,7 @@ private void shouldHandleShardListingError( Exception thrownException, Class expectedExceptionClass) { - when(kinesis.describeStream(STREAM, null)).thenThrow(thrownException); + when(kinesis.describeStream(any(DescribeStreamRequest.class))).thenThrow(thrownException); try { underTest.listShards(STREAM); failBecauseExceptionWasNotThrown(expectedExceptionClass); @@ -236,8 +247,10 @@ Minutes periodTime = Minutes.minutesBetween(countSince, countTo); GetMetricStatisticsRequest metricStatisticsRequest = underTest.createMetricStatisticsRequest(STREAM, countSince, countTo, periodTime); - GetMetricStatisticsResult result = - new GetMetricStatisticsResult().withDatapoints(new Datapoint().withSum(1.0)); + GetMetricStatisticsResponse result = + GetMetricStatisticsResponse.builder() + .datapoints(Datapoint.builder().sum(1.0).build()) + .build(); when(cloudWatch.getMetricStatistics(metricStatisticsRequest)).thenReturn(result); @@ -253,12 +266,13 @@ Minutes periodTime = Minutes.minutesBetween(countSince, countTo); GetMetricStatisticsRequest metricStatisticsRequest = underTest.createMetricStatisticsRequest(STREAM, countSince, countTo, periodTime); - GetMetricStatisticsResult result = - new GetMetricStatisticsResult() - .withDatapoints( - new Datapoint().withSum(1.0), - new Datapoint().withSum(3.0), - new Datapoint().withSum(2.0)); + GetMetricStatisticsResponse result = + GetMetricStatisticsResponse.builder() + .datapoints( + Datapoint.builder().sum(1.0).build(), + Datapoint.builder().sum(3.0).build(), + Datapoint.builder().sum(2.0).build()) + .build(); when(cloudWatch.getMetricStatistics(metricStatisticsRequest)).thenReturn(result); @@ -281,25 +295,24 @@ @Test public void shouldHandleLimitExceededExceptionForGetBacklogBytes() { shouldHandleGetBacklogBytesError( - new LimitExceededException(""), TransientKinesisException.class); + LimitExceededException.builder().build(), TransientKinesisException.class); } @Test public void shouldHandleProvisionedThroughputExceededExceptionForGetBacklogBytes() { shouldHandleGetBacklogBytesError( - new ProvisionedThroughputExceededException(""), TransientKinesisException.class); + ProvisionedThroughputExceededException.builder().build(), TransientKinesisException.class); } @Test public void shouldHandleServiceErrorForGetBacklogBytes() { shouldHandleGetBacklogBytesError( - newAmazonServiceException(ErrorType.Service), TransientKinesisException.class); + SdkServiceException.builder().build(), TransientKinesisException.class); } @Test public void shouldHandleClientErrorForGetBacklogBytes() { - shouldHandleGetBacklogBytesError( - newAmazonServiceException(ErrorType.Client), RuntimeException.class); + shouldHandleGetBacklogBytesError(SdkClientException.builder().build(), RuntimeException.class); } @Test @@ -326,22 +339,19 @@ } } - private AmazonServiceException newAmazonServiceException(ErrorType errorType) { - AmazonServiceException exception = new AmazonServiceException(""); - exception.setErrorType(errorType); - return exception; - } - @Test public void shouldReturnLimitedNumberOfRecords() throws Exception { final Integer limit = 100; doAnswer( - (Answer) + (Answer) invocation -> { GetRecordsRequest request = (GetRecordsRequest) invocation.getArguments()[0]; - List records = generateRecords(request.getLimit()); - return new GetRecordsResult().withRecords(records).withMillisBehindLatest(1000L); + List records = generateRecords(request.limit()); + return GetRecordsResponse.builder() + .records(records) + .millisBehindLatest(1000L) + .build(); }) .when(kinesis) .getRecords(any(GetRecordsRequest.class)); @@ -356,10 +366,11 @@ byte[] value = new byte[1024]; Arrays.fill(value, (byte) i); records.add( - new Record() - .withSequenceNumber(String.valueOf(i)) - .withPartitionKey("key") - .withData(ByteBuffer.wrap(value))); + Record.builder() + .sequenceNumber(String.valueOf(i)) + .partitionKey("key") + .data(SdkBytes.fromByteBuffer(ByteBuffer.wrap(value))) + .build()); } return records; } diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/StartingPointShardsFinderTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/StartingPointShardsFinderTest.java --- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/StartingPointShardsFinderTest.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/StartingPointShardsFinderTest.java 2019-10-27 12:34:56.000000000 -0700 @@ -15,16 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; -import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; -import com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.services.kinesis.model.ShardIteratorType; import java.util.Collections; import java.util.List; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; @@ -32,6 +28,10 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.retrieval.KinesisClientRecord; /** Tests StartingPointShardsFinder. */ @RunWith(JUnit4.class) @@ -61,18 +61,30 @@ */ private final Shard shard00 = createClosedShard("0000"); private final Shard shard01 = createClosedShard("0001"); - private final Shard shard02 = createClosedShard("0002").withParentShardId("0000"); - private final Shard shard03 = createClosedShard("0003").withParentShardId("0000"); - private final Shard shard04 = createClosedShard("0004").withParentShardId("0001"); - private final Shard shard05 = createClosedShard("0005").withParentShardId("0001"); + private final Shard shard02 = createClosedShard("0002").toBuilder().parentShardId("0000").build(); + private final Shard shard03 = createClosedShard("0003").toBuilder().parentShardId("0000").build(); + private final Shard shard04 = createClosedShard("0004").toBuilder().parentShardId("0001").build(); + private final Shard shard05 = createClosedShard("0005").toBuilder().parentShardId("0001").build(); private final Shard shard06 = - createClosedShard("0006").withParentShardId("0003").withAdjacentParentShardId("0004"); - private final Shard shard07 = createClosedShard("0007").withParentShardId("0006"); - private final Shard shard08 = createClosedShard("0008").withParentShardId("0006"); + createClosedShard("0006") + .toBuilder() + .parentShardId("0003") + .adjacentParentShardId("0004") + .build(); + private final Shard shard07 = createClosedShard("0007").toBuilder().parentShardId("0006").build(); + private final Shard shard08 = createClosedShard("0008").toBuilder().parentShardId("0006").build(); private final Shard shard09 = - createOpenShard("0009").withParentShardId("0002").withAdjacentParentShardId("0007"); + createOpenShard("0009") + .toBuilder() + .parentShardId("0002") + .adjacentParentShardId("0007") + .build(); private final Shard shard10 = - createOpenShard("0010").withParentShardId("0008").withAdjacentParentShardId("0005"); + createOpenShard("0010") + .toBuilder() + .parentShardId("0008") + .adjacentParentShardId("0005") + .build(); private final List allShards = ImmutableList.of( @@ -191,7 +203,11 @@ // given StartingPoint latestStartingPoint = new StartingPoint(InitialPositionInStream.LATEST); Shard closedShard10 = - createClosedShard("0010").withParentShardId("0008").withAdjacentParentShardId("0005"); + createClosedShard("0010") + .toBuilder() + .parentShardId("0008") + .adjacentParentShardId("0005") + .build(); List shards = ImmutableList.of( shard00, @@ -213,14 +229,14 @@ } private Shard createClosedShard(String shardId) { - Shard shard = new Shard().withShardId(shardId); + Shard shard = Shard.builder().shardId(shardId).build(); activeAtPoint(shard, ShardIteratorType.TRIM_HORIZON); expiredAtPoint(shard, ShardIteratorType.LATEST); return shard; } private Shard createOpenShard(String shardId) { - Shard shard = new Shard().withShardId(shardId); + Shard shard = Shard.builder().shardId(shardId).build(); activeAtPoint(shard, ShardIteratorType.TRIM_HORIZON); activeAtPoint(shard, ShardIteratorType.LATEST); return shard; @@ -237,13 +253,13 @@ private void activeAtTimestamp(Shard shard, Instant startTimestamp) { prepareShard( shard, - "timestampIterator-" + shard.getShardId(), + "timestampIterator-" + shard.shardId(), ShardIteratorType.AT_TIMESTAMP, startTimestamp); } private void activeAtPoint(Shard shard, ShardIteratorType shardIteratorType) { - prepareShard(shard, shardIteratorType.toString() + shard.getShardId(), shardIteratorType, null); + prepareShard(shard, shardIteratorType.toString() + shard.shardId(), shardIteratorType, null); } private void prepareShard( @@ -252,28 +268,23 @@ ShardIteratorType shardIteratorType, Instant startTimestamp) { try { - String shardIterator = shardIteratorType + shard.getShardId() + "-current"; + String shardIterator = shardIteratorType + shard.shardId() + "-current"; if (shardIteratorType == ShardIteratorType.AT_TIMESTAMP) { when(kinesis.getShardIterator( - STREAM_NAME, - shard.getShardId(), - ShardIteratorType.AT_TIMESTAMP, - null, - startTimestamp)) + STREAM_NAME, shard.shardId(), ShardIteratorType.AT_TIMESTAMP, null, startTimestamp)) .thenReturn(shardIterator); } else { - when(kinesis.getShardIterator( - STREAM_NAME, shard.getShardId(), shardIteratorType, null, null)) + when(kinesis.getShardIterator(STREAM_NAME, shard.shardId(), shardIteratorType, null, null)) .thenReturn(shardIterator); } GetKinesisRecordsResult result = new GetKinesisRecordsResult( - Collections.emptyList(), + Collections.emptyList(), nextIterator, 0, STREAM_NAME, - shard.getShardId()); - when(kinesis.getRecords(shardIterator, STREAM_NAME, shard.getShardId())).thenReturn(result); + shard.shardId()); + when(kinesis.getRecords(shardIterator, STREAM_NAME, shard.shardId())).thenReturn(result); } catch (TransientKinesisException e) { throw new RuntimeException(e); } diff -u sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/WatermarkPolicyTest.java --- sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/test/java/org/apache/beam/sdk/io/kinesis2/WatermarkPolicyTest.java 2019-10-25 12:08:13.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock;