diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/AWSClientsProvider.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/AWSClientsProvider.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/AWSClientsProvider.java 2019-10-23 19:05:10.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/AWSClientsProvider.java 2019-10-27 12:35:16.000000000 -0700 @@ -15,13 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; -import com.amazonaws.services.cloudwatch.AmazonCloudWatch; -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.producer.IKinesisProducer; -import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; import java.io.Serializable; +import software.amazon.awssdk.services.cloudwatch.CloudWatchClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisClient; /** * Provides instances of AWS clients. @@ -30,9 +29,9 @@ * ensure it can be sent to worker machines. */ public interface AWSClientsProvider extends Serializable { - AmazonKinesis getKinesisClient(); + KinesisClient getKinesisClient(); - AmazonCloudWatch getCloudWatchClient(); + KinesisAsyncClient getKinesisAsyncClient(); - IKinesisProducer createKinesisProducer(KinesisProducerConfiguration config); + CloudWatchClient getCloudWatchClient(); } diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/BasicKinesisProvider.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/BasicKinesisProvider.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/BasicKinesisProvider.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/BasicKinesisProvider.java 2019-10-27 12:35:16.000000000 -0700 @@ -15,76 +15,78 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.regions.Regions; -import com.amazonaws.services.cloudwatch.AmazonCloudWatch; -import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder; -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; -import com.amazonaws.services.kinesis.producer.IKinesisProducer; -import com.amazonaws.services.kinesis.producer.KinesisProducer; -import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; +import java.net.URI; import javax.annotation.Nullable; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.cloudwatch.CloudWatchClient; +import software.amazon.awssdk.services.cloudwatch.CloudWatchClientBuilder; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; /** Basic implementation of {@link AWSClientsProvider} used by default in {@link KinesisIO}. */ class BasicKinesisProvider implements AWSClientsProvider { private final String accessKey; private final String secretKey; - private final Regions region; + private final String region; @Nullable private final String serviceEndpoint; BasicKinesisProvider( - String accessKey, String secretKey, Regions region, @Nullable String serviceEndpoint) { + String accessKey, String secretKey, Region region, @Nullable String serviceEndpoint) { checkArgument(accessKey != null, "accessKey can not be null"); checkArgument(secretKey != null, "secretKey can not be null"); checkArgument(region != null, "region can not be null"); this.accessKey = accessKey; this.secretKey = secretKey; - this.region = region; + this.region = region.toString(); this.serviceEndpoint = serviceEndpoint; } - private AWSCredentialsProvider getCredentialsProvider() { - return new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)); + private AwsCredentialsProvider getCredentialsProvider() { + return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)); } @Override - public AmazonKinesis getKinesisClient() { - AmazonKinesisClientBuilder clientBuilder = - AmazonKinesisClientBuilder.standard().withCredentials(getCredentialsProvider()); - if (serviceEndpoint == null) { - clientBuilder.withRegion(region); - } else { - clientBuilder.withEndpointConfiguration( - new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, region.getName())); + public KinesisClient getKinesisClient() { + KinesisClientBuilder clientBuilder = + KinesisClient.builder() + .credentialsProvider(getCredentialsProvider()) + .region(Region.of(region)); + if (serviceEndpoint != null) { + clientBuilder.endpointOverride(URI.create(serviceEndpoint)); } return clientBuilder.build(); } @Override - public 
AmazonCloudWatch getCloudWatchClient() { - AmazonCloudWatchClientBuilder clientBuilder = - AmazonCloudWatchClientBuilder.standard().withCredentials(getCredentialsProvider()); - if (serviceEndpoint == null) { - clientBuilder.withRegion(region); - } else { - clientBuilder.withEndpointConfiguration( - new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, region.getName())); + public KinesisAsyncClient getKinesisAsyncClient() { + KinesisAsyncClientBuilder clientBuilder = + KinesisAsyncClient.builder() + .credentialsProvider(getCredentialsProvider()) + .region(Region.of(region)); + if (serviceEndpoint != null) { + clientBuilder.endpointOverride(URI.create(serviceEndpoint)); } return clientBuilder.build(); } @Override - public IKinesisProducer createKinesisProducer(KinesisProducerConfiguration config) { - config.setRegion(region.getName()); - config.setCredentialsProvider(getCredentialsProvider()); - return new KinesisProducer(config); + public CloudWatchClient getCloudWatchClient() { + CloudWatchClientBuilder clientBuilder = + CloudWatchClient.builder() + .credentialsProvider(getCredentialsProvider()) + .region(Region.of(region)); + if (serviceEndpoint != null) { + clientBuilder.endpointOverride(URI.create(serviceEndpoint)); + } + return clientBuilder.build(); } } diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/CheckpointGenerator.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/CheckpointGenerator.java 2019-10-25 12:08:13.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import java.io.Serializable; diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/CustomOptional.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/CustomOptional.java 2019-10-25 12:08:13.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import java.util.NoSuchElementException; import java.util.Objects; diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/DynamicCheckpointGenerator.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/DynamicCheckpointGenerator.java 2019-10-27 12:34:56.000000000 -0700 @@ -15,15 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; -import com.amazonaws.services.kinesis.model.Shard; import java.util.Set; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.model.Shard; /** * Creates {@link KinesisReaderCheckpoint}, which spans over all shards in given stream. List of @@ -33,20 +33,25 @@ private static final Logger LOG = LoggerFactory.getLogger(DynamicCheckpointGenerator.class); private final String streamName; + private final String consumerArn; private final StartingPoint startingPoint; private final StartingPointShardsFinder startingPointShardsFinder; - public DynamicCheckpointGenerator(String streamName, StartingPoint startingPoint) { + public DynamicCheckpointGenerator( + String streamName, String consumerArn, StartingPoint startingPoint) { this.streamName = streamName; + this.consumerArn = consumerArn; this.startingPoint = startingPoint; this.startingPointShardsFinder = new StartingPointShardsFinder(); } public DynamicCheckpointGenerator( String streamName, + String consumerArn, StartingPoint startingPoint, StartingPointShardsFinder startingPointShardsFinder) { this.streamName = checkNotNull(streamName, "streamName"); + this.consumerArn = consumerArn; this.startingPoint = checkNotNull(startingPoint, "startingPoint"); this.startingPointShardsFinder = checkNotNull(startingPointShardsFinder, "startingPointShardsFinder"); @@ -63,7 +68,9 @@ startingPoint.getTimestamp()); return new KinesisReaderCheckpoint( shardsAtStartingPoint.stream() - .map(shard -> new ShardCheckpoint(streamName, shard.getShardId(), startingPoint)) + .map( + shard -> + new ShardCheckpoint(streamName, shard.shardId(), consumerArn, startingPoint)) .collect(Collectors.toList())); } diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/GetKinesisRecordsResult.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/GetKinesisRecordsResult.java 2019-10-27 12:34:56.000000000 -0700 @@ -15,11 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; -import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import java.util.List; import java.util.stream.Collectors; +import software.amazon.kinesis.retrieval.KinesisClientRecord; /** Represents the output of 'get' operation on Kinesis stream. 
*/ class GetKinesisRecordsResult { @@ -29,7 +29,7 @@ private final long millisBehindLatest; public GetKinesisRecordsResult( - List records, + List records, String nextShardIterator, long millisBehindLatest, final String streamName, diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/KinesisIO.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java 2019-10-26 16:56:44.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/KinesisIO.java 2019-10-27 12:35:16.000000000 -0700 @@ -15,45 +15,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; -import com.amazonaws.regions.Regions; -import com.amazonaws.services.cloudwatch.AmazonCloudWatch; -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; -import com.amazonaws.services.kinesis.producer.Attempt; -import com.amazonaws.services.kinesis.producer.IKinesisProducer; -import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; -import com.amazonaws.services.kinesis.producer.UserRecordFailedException; -import com.amazonaws.services.kinesis.producer.UserRecordResult; import com.google.auto.value.AutoValue; -import com.google.common.util.concurrent.ListenableFuture; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingDeque; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.io.Read.Unbounded; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.cloudwatch.CloudWatchClient; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.kinesis.common.InitialPositionInStream; /** * {@link PTransform}s for reading from and writing to {@link InitialPositionInStream#TRIM_HORIZON} - reading will begin at the very * beginning of the stream * - *
- *   <li>data used to initialize {@link AmazonKinesis} and {@link AmazonCloudWatch} clients:
+ *   <li>data used to initialize {@link KinesisClient} and {@link CloudWatchClient} clients:
  *       <ul>
  *         <li>credentials (aws key, aws secret)
  *         <li>region where the stream is located
  *       </ul>
  * </ul>
  *
- * <p>In case when you want to set up {@link AmazonKinesis} or {@link AmazonCloudWatch} client by
+ * <p>In case when you want to set up {@link KinesisClient} or {@link CloudWatchClient} client by
  * your own (for example if you're using more sophisticated authorization methods like Amazon STS,
  * etc.) you can do it by implementing {@link AWSClientsProvider} class:
  *
  * <pre>{@code
      * public class MyCustomKinesisClientProvider implements AWSClientsProvider {
      *   {@literal @}Override
    - *   public AmazonKinesis getKinesisClient() {
    + *   public KinesisClient getKinesisClient() {
      *     // set up your client here
      *   }
      *
    - *   public AmazonCloudWatch getCloudWatchClient() {
    + *   public CloudWatchClient getCloudWatchClient() {
      *     // set up your client here
      *   }
      *
    @@ -172,59 +152,6 @@
      *    .withInitialPositionInStream(InitialPositionInStream.LATEST)
      *    .withCustomWatermarkPolicy(new MyCustomPolicyFactory())
      * }
- *
- * <h3>Writing to Kinesis</h3>
- *
- * <p>Example usage:
- *
- * <pre>{@code
- * PCollection<byte[]> data = ...;
- *
- * data.apply(KinesisIO.write()
- *     .withStreamName("streamName")
- *     .withPartitionKey("partitionKey")
- *     .withAWSClientsProvider(AWS_KEY, AWS_SECRET, STREAM_REGION));
- * }</pre>
- *
- * <p>As a client, you need to provide at least 3 things:
- *
- * <ul>
- *   <li>name of the stream where you're going to write
- *   <li>partition key (or implementation of {@link KinesisPartitioner}) that defines which
- *       partition will be used for writing
- *   <li>data used to initialize {@link AmazonKinesis} and {@link AmazonCloudWatch} clients:
- *       <ul>
- *         <li>credentials (aws key, aws secret)
- *         <li>region where the stream is located
- *       </ul>
- * </ul>
- *
- * <p>In case if you need to define more complicated logic for key partitioning then you can create
- * your own implementation of {@link KinesisPartitioner} and set it by {@link
- * KinesisIO.Write#withPartitioner(KinesisPartitioner)}
- *
- * <p>Internally, {@link KinesisIO.Write} relies on Amazon Kinesis Producer Library (KPL). This
- * library can be configured with a set of {@link Properties} if needed.
- *
- * <p>Example usage of KPL configuration:
- *
- * <pre>{@code
- * Properties properties = new Properties();
- * properties.setProperty("KinesisEndpoint", "localhost");
- * properties.setProperty("KinesisPort", "4567");
- *
- * PCollection<byte[]> data = ...;
- *
- * data.apply(KinesisIO.write()
- *     .withStreamName("streamName")
- *     .withPartitionKey("partitionKey")
- *     .withAWSClientsProvider(AWS_KEY, AWS_SECRET, STREAM_REGION)
- *     .withProducerProperties(properties));
- * }</pre>
- *
- * <p>
    For more information about configuratiom parameters, see the sample - * of configuration file. */ @Experimental(Experimental.Kind.SOURCE_SINK) public final class KinesisIO { @@ -242,11 +169,6 @@ .build(); } - /** A {@link PTransform} writing data to Kinesis. */ - public static Write write() { - return new AutoValue_KinesisIO_Write.Builder().setRetries(DEFAULT_NUM_RETRIES).build(); - } - /** Implementation of {@link #read}. */ @AutoValue public abstract static class Read extends PTransform> { @@ -255,6 +177,9 @@ abstract String getStreamName(); @Nullable + abstract String getConsumerArn(); + + @Nullable abstract StartingPoint getInitialPosition(); @Nullable @@ -279,6 +204,8 @@ abstract Builder setStreamName(String streamName); + abstract Builder setConsumerArn(String consumerArn); + abstract Builder setInitialPosition(StartingPoint startingPoint); abstract Builder setAWSClientsProvider(AWSClientsProvider clientProvider); @@ -301,6 +228,11 @@ return toBuilder().setStreamName(streamName).build(); } + /** Specify reading from an enhanced fan-out consumer. */ + public Read withConsumerArn(String consumerArn) { + return toBuilder().setConsumerArn(consumerArn).build(); + } + /** Specify reading from some initial position in stream. */ public Read withInitialPositionInStream(InitialPositionInStream initialPosition) { return toBuilder().setInitialPosition(new StartingPoint(initialPosition)).build(); @@ -315,10 +247,10 @@ } /** - * Allows to specify custom {@link AWSClientsProvider}. {@link AWSClientsProvider} provides - * {@link AmazonKinesis} and {@link AmazonCloudWatch} instances which are later used for + * Allows to specify custom {@link AWSClientsProvider}. {@link AWSClientsProvider provides + * {@link KinesisClient} and {@link CloudWatchClient} instances which are later used for * communication with Kinesis. You should use this method if {@link - * Read#withAWSClientsProvider(String, String, Regions)} does not suit your needs. + * Read#withAWSClientsProvider(String, String, Region)} does not suit your needs. */ public Read withAWSClientsProvider(AWSClientsProvider awsClientsProvider) { return toBuilder().setAWSClientsProvider(awsClientsProvider).build(); @@ -329,7 +261,7 @@ * sophisticated credential protocol, then you should look at {@link * Read#withAWSClientsProvider(AWSClientsProvider)}. */ - public Read withAWSClientsProvider(String awsAccessKey, String awsSecretKey, Regions region) { + public Read withAWSClientsProvider(String awsAccessKey, String awsSecretKey, Region region) { return withAWSClientsProvider(awsAccessKey, awsSecretKey, region, null); } @@ -342,7 +274,7 @@ * the tests with a kinesis service emulator. */ public Read withAWSClientsProvider( - String awsAccessKey, String awsSecretKey, Regions region, String serviceEndpoint) { + String awsAccessKey, String awsSecretKey, Region region, String serviceEndpoint) { return withAWSClientsProvider( new BasicKinesisProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint)); } @@ -427,6 +359,7 @@ new KinesisSource( getAWSClientsProvider(), getStreamName(), + getConsumerArn(), getInitialPosition(), getUpToDateThreshold(), getWatermarkPolicyFactory(), @@ -442,342 +375,4 @@ return input.apply(transform); } } - - /** Implementation of {@link #write}. 
*/ - @AutoValue - public abstract static class Write extends PTransform, PDone> { - @Nullable - abstract String getStreamName(); - - @Nullable - abstract String getPartitionKey(); - - @Nullable - abstract KinesisPartitioner getPartitioner(); - - @Nullable - abstract Properties getProducerProperties(); - - @Nullable - abstract AWSClientsProvider getAWSClientsProvider(); - - abstract int getRetries(); - - abstract Builder builder(); - - @AutoValue.Builder - abstract static class Builder { - abstract Builder setStreamName(String streamName); - - abstract Builder setPartitionKey(String partitionKey); - - abstract Builder setPartitioner(KinesisPartitioner partitioner); - - abstract Builder setProducerProperties(Properties properties); - - abstract Builder setAWSClientsProvider(AWSClientsProvider clientProvider); - - abstract Builder setRetries(int retries); - - abstract Write build(); - } - - /** Specify Kinesis stream name which will be used for writing, this name is required. */ - public Write withStreamName(String streamName) { - return builder().setStreamName(streamName).build(); - } - - /** - * Specify default partition key. - * - *

- * <p>In case if you need to define more complicated logic for key partitioning then you can
- *     create your own implementation of {@link KinesisPartitioner} and specify it by {@link
- *     KinesisIO.Write#withPartitioner(KinesisPartitioner)}
- *
- * <p>
    Using one of the methods {@link KinesisIO.Write#withPartitioner(KinesisPartitioner)} or - * {@link KinesisIO.Write#withPartitionKey(String)} is required but not both in the same time. - */ - public Write withPartitionKey(String partitionKey) { - return builder().setPartitionKey(partitionKey).build(); - } - - /** - * Allows to specify custom implementation of {@link KinesisPartitioner}. - * - *

- * <p>This method should be used to balance a distribution of new written records among all
- *     stream shards.
- *
- * <p>
    Using one of the methods {@link KinesisIO.Write#withPartitioner(KinesisPartitioner)} or - * {@link KinesisIO.Write#withPartitionKey(String)} is required but not both in the same time. - */ - public Write withPartitioner(KinesisPartitioner partitioner) { - return builder().setPartitioner(partitioner).build(); - } - - /** - * Specify the configuration properties for Kinesis Producer Library (KPL). - * - *

- * <p>Example of creating new KPL configuration:
- *
- * <pre>
    {@code Properties properties = new Properties(); - * properties.setProperty("CollectionMaxCount", "1000"); - * properties.setProperty("ConnectTimeout", "10000");} - */ - public Write withProducerProperties(Properties properties) { - return builder().setProducerProperties(properties).build(); - } - - /** - * Allows to specify custom {@link AWSClientsProvider}. {@link AWSClientsProvider} creates new - * {@link IKinesisProducer} which is later used for writing to Kinesis. - * - *

    This method should be used if {@link Write#withAWSClientsProvider(String, String, - * Regions)} does not suit well. - */ - public Write withAWSClientsProvider(AWSClientsProvider awsClientsProvider) { - return builder().setAWSClientsProvider(awsClientsProvider).build(); - } - - /** - * Specify credential details and region to be used to write to Kinesis. If you need more - * sophisticated credential protocol, then you should look at {@link - * Write#withAWSClientsProvider(AWSClientsProvider)}. - */ - public Write withAWSClientsProvider(String awsAccessKey, String awsSecretKey, Regions region) { - return withAWSClientsProvider(awsAccessKey, awsSecretKey, region, null); - } - - /** - * Specify credential details and region to be used to write to Kinesis. If you need more - * sophisticated credential protocol, then you should look at {@link - * Write#withAWSClientsProvider(AWSClientsProvider)}. - * - *

    The {@code serviceEndpoint} sets an alternative service host. This is useful to execute - * the tests with Kinesis service emulator. - */ - public Write withAWSClientsProvider( - String awsAccessKey, String awsSecretKey, Regions region, String serviceEndpoint) { - return withAWSClientsProvider( - new BasicKinesisProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint)); - } - - /** - * Specify the number of retries that will be used to flush the outstanding records in case if - * they were not flushed from the first time. Default number of retries is {@code - * DEFAULT_NUM_RETRIES = 10}. - * - *

    This is used for testing. - */ - @VisibleForTesting - Write withRetries(int retries) { - return builder().setRetries(retries).build(); - } - - @Override - public PDone expand(PCollection input) { - checkArgument(getStreamName() != null, "withStreamName() is required"); - checkArgument( - (getPartitionKey() != null) || (getPartitioner() != null), - "withPartitionKey() or withPartitioner() is required"); - checkArgument( - getPartitionKey() == null || (getPartitioner() == null), - "only one of either withPartitionKey() or withPartitioner() is possible"); - checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider() is required"); - - input.apply(ParDo.of(new KinesisWriterFn(this))); - return PDone.in(input.getPipeline()); - } - - private static class KinesisWriterFn extends DoFn { - - private static final int MAX_NUM_FAILURES = 10; - - private final KinesisIO.Write spec; - private static transient IKinesisProducer producer; - private transient KinesisPartitioner partitioner; - private transient LinkedBlockingDeque failures; - private transient List> putFutures; - - KinesisWriterFn(KinesisIO.Write spec) { - this.spec = spec; - initKinesisProducer(); - } - - @Setup - public void setup() { - // Use custom partitioner if it exists - if (spec.getPartitioner() != null) { - partitioner = spec.getPartitioner(); - } - } - - @StartBundle - public void startBundle() { - putFutures = Collections.synchronizedList(new ArrayList<>()); - /** Keep only the first {@link MAX_NUM_FAILURES} occurred exceptions */ - failures = new LinkedBlockingDeque<>(MAX_NUM_FAILURES); - initKinesisProducer(); - } - - private synchronized void initKinesisProducer() { - // Init producer config - Properties props = spec.getProducerProperties(); - if (props == null) { - props = new Properties(); - } - KinesisProducerConfiguration config = KinesisProducerConfiguration.fromProperties(props); - // Fix to avoid the following message "WARNING: Exception during updateCredentials" during - // producer.destroy() call. More details can be found in this thread: - // https://github.com/awslabs/amazon-kinesis-producer/issues/10 - config.setCredentialsRefreshDelay(100); - - // Init Kinesis producer - if (producer == null) { - producer = spec.getAWSClientsProvider().createKinesisProducer(config); - } - } - - private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException { - is.defaultReadObject(); - initKinesisProducer(); - } - - /** - * It adds a record asynchronously which then should be delivered by Kinesis producer in - * background (Kinesis producer forks native processes to do this job). - * - *

- * <p>The records can be batched and then they will be sent in one HTTP request. Amazon KPL
- *     supports two types of batching - aggregation and collection - and they can be configured by
- *     producer properties.
- *
- * <p>
    More details can be found here: KPL Key - * Concepts and Configuring - * the KPL - */ - @ProcessElement - public void processElement(ProcessContext c) { - ByteBuffer data = ByteBuffer.wrap(c.element()); - String partitionKey = spec.getPartitionKey(); - String explicitHashKey = null; - - // Use custom partitioner - if (partitioner != null) { - partitionKey = partitioner.getPartitionKey(c.element()); - explicitHashKey = partitioner.getExplicitHashKey(c.element()); - } - - ListenableFuture f = - producer.addUserRecord(spec.getStreamName(), partitionKey, explicitHashKey, data); - putFutures.add(f); - } - - @FinishBundle - public void finishBundle() throws Exception { - flushBundle(); - } - - /** - * Flush outstanding records until the total number of failed records will be less than 0 or - * the number of retries will be exhausted. The retry timeout starts from 1 second and it - * doubles on every iteration. - */ - private void flushBundle() throws InterruptedException, ExecutionException, IOException { - int retries = spec.getRetries(); - int numFailedRecords; - int retryTimeout = 1000; // initial timeout, 1 sec - String message = ""; - - do { - numFailedRecords = 0; - producer.flush(); - - // Wait for puts to finish and check the results - for (Future f : putFutures) { - UserRecordResult result = f.get(); // this does block - if (!result.isSuccessful()) { - numFailedRecords++; - } - } - - // wait until outstanding records will be flushed - Thread.sleep(retryTimeout); - retryTimeout *= 2; // exponential backoff - } while (numFailedRecords > 0 && retries-- > 0); - - if (numFailedRecords > 0) { - for (Future f : putFutures) { - UserRecordResult result = f.get(); - if (!result.isSuccessful()) { - failures.offer( - new KinesisWriteException( - "Put record was not successful.", new UserRecordFailedException(result))); - } - } - - message = - String.format( - "After [%d] retries, number of failed records [%d] is still greater than 0", - spec.getRetries(), numFailedRecords); - LOG.error(message); - } - - checkForFailures(message); - } - - /** If any write has asynchronously failed, fail the bundle with a useful error. */ - private void checkForFailures(String message) throws IOException { - if (failures.isEmpty()) { - return; - } - - StringBuilder logEntry = new StringBuilder(); - logEntry.append(message).append(System.lineSeparator()); - - int i = 0; - while (!failures.isEmpty()) { - i++; - KinesisWriteException exc = failures.remove(); - - logEntry.append(System.lineSeparator()).append(exc.getMessage()); - Throwable cause = exc.getCause(); - if (cause != null) { - logEntry.append(": ").append(cause.getMessage()); - - if (cause instanceof UserRecordFailedException) { - List attempts = - ((UserRecordFailedException) cause).getResult().getAttempts(); - for (Attempt attempt : attempts) { - if (attempt.getErrorMessage() != null) { - logEntry.append(System.lineSeparator()).append(attempt.getErrorMessage()); - } - } - } - } - } - - String errorMessage = - String.format( - "Some errors occurred writing to Kinesis. First %d errors: %s", - i, logEntry.toString()); - throw new IOException(errorMessage); - } - - @Teardown - public void teardown() throws Exception { - if (producer != null && producer.getOutstandingRecordsCount() > 0) { - producer.flushSync(); - } - producer = null; - } - } - } - - /** An exception that puts information about the failed record. 
*/ - static class KinesisWriteException extends IOException { - KinesisWriteException(String message, Throwable cause) { - super(message, cause); - } - } } Only in sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/: KinesisPartitioner.java diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/KinesisReader.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java 2019-10-22 13:43:07.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/KinesisReader.java 2019-10-25 12:08:13.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/KinesisReaderCheckpoint.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/KinesisReaderCheckpoint.java 2019-10-25 12:08:13.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.newArrayList; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.partition; diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/KinesisRecord.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/KinesisRecord.java 2019-10-27 12:35:16.000000000 -0700 @@ -15,18 +15,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode; -import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; -import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import org.apache.commons.lang.builder.EqualsBuilder; +import org.apache.commons.lang.builder.ToStringBuilder; import org.joda.time.Instant; +import software.amazon.kinesis.retrieval.KinesisClientRecord; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -/** {@link UserRecord} enhanced with utility methods. */ +/** {@link KinesisClientRecord} enhanced with utility methods. 
*/ public class KinesisRecord { private Instant readTime; @@ -38,13 +39,13 @@ private ByteBuffer data; private String partitionKey; - public KinesisRecord(UserRecord record, String streamName, String shardId) { + public KinesisRecord(KinesisClientRecord record, String streamName, String shardId) { this( - record.getData(), - record.getSequenceNumber(), - record.getSubSequenceNumber(), - record.getPartitionKey(), - new Instant(record.getApproximateArrivalTimestamp()), + record.data(), + record.sequenceNumber(), + record.subSequenceNumber(), + record.partitionKey(), + TimeUtil.toJoda(record.approximateArrivalTimestamp()), Instant.now(), streamName, shardId); @@ -59,7 +60,7 @@ Instant readTime, String streamName, String shardId) { - this.data = data; + this.data = copyData(data); this.sequenceNumber = sequenceNumber; this.subSequenceNumber = subSequenceNumber; this.partitionKey = partitionKey; @@ -69,6 +70,13 @@ this.shardId = shardId; } + private ByteBuffer copyData(ByteBuffer data) { + data.rewind(); + byte[] bytes = new byte[data.remaining()]; + data.get(bytes); + return ByteBuffer.wrap(bytes); + } + public ExtendedSequenceNumber getExtendedSequenceNumber() { return new ExtendedSequenceNumber(getSequenceNumber(), getSubSequenceNumber()); } @@ -104,6 +112,11 @@ return reflectionHashCode(this); } + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this); + } + public long getSubSequenceNumber() { return subSequenceNumber; } diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/KinesisRecordCoder.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/KinesisRecordCoder.java 2019-10-27 12:35:16.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import java.io.IOException; import java.io.InputStream; @@ -43,7 +43,7 @@ @Override public void encode(KinesisRecord value, OutputStream outStream) throws IOException { - BYTE_ARRAY_CODER.encode(value.getData().array(), outStream); + BYTE_ARRAY_CODER.encode(value.getDataAsBytes(), outStream); STRING_CODER.encode(value.getSequenceNumber(), outStream); STRING_CODER.encode(value.getPartitionKey(), outStream); INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream); diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisShardClosedException.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/KinesisShardClosedException.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisShardClosedException.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/KinesisShardClosedException.java 2019-10-25 12:08:13.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; /** Internal exception thrown when shard end is encountered during iteration. 
*/ class KinesisShardClosedException extends Exception { diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/KinesisSource.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java 2019-10-23 19:59:51.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/KinesisSource.java 2019-10-26 15:28:53.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.newArrayList; @@ -36,6 +36,7 @@ private final AWSClientsProvider awsClientsProvider; private final String streamName; + private final String consumerArn; private final Duration upToDateThreshold; private final WatermarkPolicyFactory watermarkPolicyFactory; private CheckpointGenerator initialCheckpointGenerator; @@ -44,14 +45,16 @@ KinesisSource( AWSClientsProvider awsClientsProvider, String streamName, + String consumerArn, StartingPoint startingPoint, Duration upToDateThreshold, WatermarkPolicyFactory watermarkPolicyFactory, Integer limit) { this( awsClientsProvider, - new DynamicCheckpointGenerator(streamName, startingPoint), + new DynamicCheckpointGenerator(streamName, consumerArn, startingPoint), streamName, + consumerArn, upToDateThreshold, watermarkPolicyFactory, limit); @@ -61,12 +64,14 @@ AWSClientsProvider awsClientsProvider, CheckpointGenerator initialCheckpoint, String streamName, + String consumerArn, Duration upToDateThreshold, WatermarkPolicyFactory watermarkPolicyFactory, Integer limit) { this.awsClientsProvider = awsClientsProvider; this.initialCheckpointGenerator = initialCheckpoint; this.streamName = streamName; + this.consumerArn = consumerArn; this.upToDateThreshold = upToDateThreshold; this.watermarkPolicyFactory = watermarkPolicyFactory; this.limit = limit; @@ -91,6 +96,7 @@ awsClientsProvider, new StaticCheckpointGenerator(partition), streamName, + consumerArn, upToDateThreshold, watermarkPolicyFactory, limit)); diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/RecordFilter.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/RecordFilter.java 2019-10-25 12:08:13.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.newArrayList; diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/ShardCheckpoint.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/ShardCheckpoint.java 2019-10-27 12:34:56.000000000 -0700 @@ -15,19 +15,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; -import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER; -import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER; -import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; +import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER; +import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER; +import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP; +import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.LATEST; -import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; -import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.model.ShardIteratorType; import java.io.Serializable; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import org.joda.time.Instant; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** * Checkpoint mark for single shard in the stream. 
Current position in the shard is determined by @@ -46,36 +50,52 @@ private final String streamName; private final String shardId; + private final String consumerArn; private final String sequenceNumber; private final ShardIteratorType shardIteratorType; private final Long subSequenceNumber; private final Instant timestamp; - public ShardCheckpoint(String streamName, String shardId, StartingPoint startingPoint) { + public ShardCheckpoint( + String streamName, String shardId, String consumerArn, StartingPoint startingPoint) { this( streamName, shardId, + consumerArn, ShardIteratorType.fromValue(startingPoint.getPositionName()), startingPoint.getTimestamp()); } public ShardCheckpoint( - String streamName, String shardId, ShardIteratorType shardIteratorType, Instant timestamp) { - this(streamName, shardId, shardIteratorType, null, null, timestamp); + String streamName, + String shardId, + String consumerArn, + ShardIteratorType shardIteratorType, + Instant timestamp) { + this(streamName, shardId, consumerArn, shardIteratorType, null, null, timestamp); } public ShardCheckpoint( String streamName, String shardId, + String consumerArn, ShardIteratorType shardIteratorType, String sequenceNumber, Long subSequenceNumber) { - this(streamName, shardId, shardIteratorType, sequenceNumber, subSequenceNumber, null); + this( + streamName, + shardId, + consumerArn, + shardIteratorType, + sequenceNumber, + subSequenceNumber, + null); } private ShardCheckpoint( String streamName, String shardId, + String consumerArn, ShardIteratorType shardIteratorType, String sequenceNumber, Long subSequenceNumber, @@ -83,6 +103,7 @@ this.shardIteratorType = checkNotNull(shardIteratorType, "shardIteratorType"); this.streamName = checkNotNull(streamName, "streamName"); this.shardId = checkNotNull(shardId, "shardId"); + this.consumerArn = consumerArn; if (shardIteratorType == AT_SEQUENCE_NUMBER || shardIteratorType == AFTER_SEQUENCE_NUMBER) { checkNotNull( sequenceNumber, @@ -138,6 +159,24 @@ shardIteratorType, streamName, shardId, sequenceNumber); } + public CompletableFuture subscribeToShard( + boolean resubscribe, + SimplifiedKinesisClient kinesisClient, + final SubscribeToShardResponseHandler.Visitor visitor, + final Consumer onError) + throws TransientKinesisException { + if (resubscribe) { + return kinesisClient.subscribeToShard( + consumerArn, shardId, LATEST, null, null, visitor, onError); + } + if (checkpointIsInTheMiddleOfAUserRecord()) { + return kinesisClient.subscribeToShard( + consumerArn, shardId, AT_SEQUENCE_NUMBER, sequenceNumber, null, visitor, onError); + } + return kinesisClient.subscribeToShard( + consumerArn, shardId, shardIteratorType, sequenceNumber, timestamp, visitor, onError); + } + public String getShardIterator(SimplifiedKinesisClient kinesisClient) throws TransientKinesisException { if (checkpointIsInTheMiddleOfAUserRecord()) { @@ -162,6 +201,7 @@ return new ShardCheckpoint( streamName, shardId, + consumerArn, AFTER_SEQUENCE_NUMBER, record.getSequenceNumber(), record.getSubSequenceNumber()); @@ -174,4 +214,8 @@ public String getShardId() { return shardId; } + + public String getConsumerArn() { + return consumerArn; + } } diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/ShardReadersPool.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java 2019-10-22 13:43:07.000000000 -0700 +++ 
sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/ShardReadersPool.java 2019-10-27 12:35:16.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; @@ -121,7 +121,38 @@ void startReadingShards(Iterable shardRecordsIterators) { for (final ShardRecordsIterator recordsIterator : shardRecordsIterators) { numberOfRecordsInAQueueByShard.put(recordsIterator.getShardId(), new AtomicInteger()); - executorService.submit(() -> readLoop(recordsIterator)); + if (recordsIterator.hasConsumer()) { + LOG.info("Subscribing to shard {}", recordsIterator.getShardId()); + executorService.submit(() -> subscribeLoop(recordsIterator)); + } else { + LOG.info("Reading shard {}", recordsIterator.getShardId()); + executorService.submit(() -> readLoop(recordsIterator)); + } + } + } + + private void subscribeLoop(ShardRecordsIterator shardRecordsIterator) { + while (poolOpened.get()) { + try { + shardRecordsIterator.subscribeToShard(this::putRecord); + } catch (TransientKinesisException e) { + LOG.warn("Transient exception occurred.", e); + } catch (Throwable e) { + LOG.error("Unexpected exception occurred", e); + } + } + LOG.info("Kinesis Shard subscribe loop has finished"); + } + + private void putRecord(KinesisRecord kinesisRecord) { + try { + LOG.debug("Received record: {}", kinesisRecord); + recordsQueue.put(kinesisRecord); + numberOfRecordsInAQueueByShard.get(kinesisRecord.getShardId()).incrementAndGet(); + } catch (InterruptedException e) { + LOG.warn("Thread was interrupted, finishing the read loop", e); + Thread.currentThread().interrupt(); + poolOpened.set(false); } } @@ -145,15 +176,9 @@ readFromSuccessiveShards(shardRecordsIterator); break; } - for (KinesisRecord kinesisRecord : kinesisRecords) { - recordsQueue.put(kinesisRecord); - numberOfRecordsInAQueueByShard.get(kinesisRecord.getShardId()).incrementAndGet(); - } + kinesisRecords.forEach(this::putRecord); } catch (TransientKinesisException e) { LOG.warn("Transient exception occurred.", e); - } catch (InterruptedException e) { - LOG.warn("Thread was interrupted, finishing the read loop", e); - break; } catch (Throwable e) { LOG.error("Unexpected exception occurred", e); } diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/ShardRecordsIterator.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/ShardRecordsIterator.java 2019-10-27 12:35:16.000000000 -0700 @@ -15,20 +15,25 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; -import com.amazonaws.services.kinesis.model.ExpiredIteratorException; -import com.amazonaws.services.kinesis.model.Shard; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.stream.Collectors; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; +import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; +import software.amazon.kinesis.common.InitialPositionInStream; /** * Iterates over records in a single shard. Records are retrieved in batches via calls to {@link @@ -43,7 +48,9 @@ private final RecordFilter filter; private final String streamName; private final String shardId; + private final Optional consumerArn; private AtomicReference checkpoint; + private boolean resubscribe; private String shardIterator; private AtomicLong millisBehindLatest = new AtomicLong(Long.MAX_VALUE); private AtomicReference watermarkPolicy; @@ -68,11 +75,41 @@ this.kinesis = checkNotNull(simplifiedKinesisClient, "simplifiedKinesisClient"); this.streamName = initialCheckpoint.getStreamName(); this.shardId = initialCheckpoint.getShardId(); + this.consumerArn = Optional.ofNullable(initialCheckpoint.getConsumerArn()); this.shardIterator = initialCheckpoint.getShardIterator(kinesis); this.watermarkPolicy = new AtomicReference<>(watermarkPolicyFactory.createWatermarkPolicy()); this.watermarkPolicyFactory = watermarkPolicyFactory; } + boolean hasConsumer() { + return consumerArn.isPresent(); + } + + void subscribeToShard(Consumer consumer) throws TransientKinesisException { + SubscribeToShardResponseHandler.Visitor visitor = + new SubscribeToShardResponseHandler.Visitor() { + @Override + public void visit(SubscribeToShardEvent event) { + LOG.debug("Received subscribe to shard event " + event); + if (!event.records().isEmpty()) { + List kinesisRecords = + SimplifiedKinesisClient.deaggregate(event.records()).stream() + .map(r -> new KinesisRecord(r, streamName, shardId)) + .collect(Collectors.toList()); + filter.apply(kinesisRecords, checkpoint.get()).forEach(consumer); + } + } + }; + checkpoint + .get() + .subscribeToShard( + resubscribe, + kinesis, + visitor, + e -> LOG.error("Error during stream - " + e.getMessage())) + .join(); + } + List readNextBatch() throws TransientKinesisException, KinesisShardClosedException { if (shardIterator == null) { @@ -108,6 +145,7 @@ void ackRecord(KinesisRecord record) { checkpoint.set(checkpoint.get().moveAfter(record)); watermarkPolicy.get().update(record); + resubscribe = true; } Instant getShardWatermark() { @@ -122,11 +160,12 @@ List shards = kinesis.listShards(streamName); List successiveShardRecordIterators = new ArrayList<>(); for (Shard shard : shards) { - if (shardId.equals(shard.getParentShardId())) { + if (shardId.equals(shard.parentShardId())) { ShardCheckpoint shardCheckpoint = new ShardCheckpoint( streamName, - shard.getShardId(), + 
shard.shardId(), + consumerArn.orElse(null), new StartingPoint(InitialPositionInStream.TRIM_HORIZON)); successiveShardRecordIterators.add( new ShardRecordsIterator(shardCheckpoint, kinesis, watermarkPolicyFactory)); diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/SimplifiedKinesisClient.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/SimplifiedKinesisClient.java 2019-10-27 12:35:16.000000000 -0700 @@ -15,58 +15,99 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; -import com.amazonaws.AmazonClientException; -import com.amazonaws.AmazonServiceException; -import com.amazonaws.services.cloudwatch.AmazonCloudWatch; -import com.amazonaws.services.cloudwatch.model.Datapoint; -import com.amazonaws.services.cloudwatch.model.Dimension; -import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsRequest; -import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsResult; -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; -import com.amazonaws.services.kinesis.model.ExpiredIteratorException; -import com.amazonaws.services.kinesis.model.GetRecordsRequest; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; -import com.amazonaws.services.kinesis.model.LimitExceededException; -import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; -import com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.services.kinesis.model.ShardIteratorType; -import com.amazonaws.services.kinesis.model.StreamDescription; -import java.util.Collections; -import java.util.Date; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.joda.time.Instant; import org.joda.time.Minutes; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.core.exception.SdkServiceException; +import software.amazon.awssdk.services.cloudwatch.CloudWatchClient; +import software.amazon.awssdk.services.cloudwatch.model.Datapoint; +import software.amazon.awssdk.services.cloudwatch.model.Dimension; +import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsRequest; +import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsResponse; +import software.amazon.awssdk.services.cloudwatch.model.Statistic; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; 
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.LimitExceededException; +import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +import software.amazon.awssdk.services.kinesis.model.StreamDescription; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; +import software.amazon.kinesis.retrieval.AggregatorUtil; +import software.amazon.kinesis.retrieval.KinesisClientRecord; -/** Wraps {@link AmazonKinesis} class providing much simpler interface and proper error handling. */ +/** Wraps {@link KinesisClient} class providing much simpler interface and proper error handling. */ class SimplifiedKinesisClient { private static final String KINESIS_NAMESPACE = "AWS/Kinesis"; private static final String INCOMING_RECORDS_METRIC = "IncomingBytes"; private static final int PERIOD_GRANULARITY_IN_SECONDS = 60; - private static final String SUM_STATISTIC = "Sum"; private static final String STREAM_NAME_DIMENSION = "StreamName"; - private final AmazonKinesis kinesis; - private final AmazonCloudWatch cloudWatch; + + private final KinesisClient kinesis; + private final KinesisAsyncClient kinesisAsync; + private final CloudWatchClient cloudWatch; private final Integer limit; public SimplifiedKinesisClient( - AmazonKinesis kinesis, AmazonCloudWatch cloudWatch, Integer limit) { + KinesisClient kinesis, + KinesisAsyncClient kinesisAsync, + CloudWatchClient cloudWatch, + Integer limit) { this.kinesis = checkNotNull(kinesis, "kinesis"); + this.kinesisAsync = checkNotNull(kinesisAsync, "kinesisAsync"); this.cloudWatch = checkNotNull(cloudWatch, "cloudWatch"); this.limit = limit; } public static SimplifiedKinesisClient from(AWSClientsProvider provider, Integer limit) { return new SimplifiedKinesisClient( - provider.getKinesisClient(), provider.getCloudWatchClient(), limit); + provider.getKinesisClient(), + provider.getKinesisAsyncClient(), + provider.getCloudWatchClient(), + limit); + } + + public CompletableFuture<Void> subscribeToShard( + final String consumerArn, + final String shardId, + final ShardIteratorType shardIteratorType, + final String startingSequenceNumber, + final Instant timestamp, + final SubscribeToShardResponseHandler.Visitor visitor, + final Consumer<Throwable> onError) + throws TransientKinesisException { + SubscribeToShardRequest request = + SubscribeToShardRequest.builder() + .consumerARN(consumerArn) + .shardId(shardId) + .startingPosition( + s -> + s.type(shardIteratorType) + .sequenceNumber(startingSequenceNumber) + .timestamp(TimeUtil.toJava(timestamp))) + .build(); + SubscribeToShardResponseHandler responseHandler = + SubscribeToShardResponseHandler.builder().subscriber(visitor).onError(onError).build(); + return wrapExceptions(() -> kinesisAsync.subscribeToShard(request, responseHandler)); } public String getShardIterator( @@ -76,18 +117,18 @@ final String startingSequenceNumber, final Instant timestamp) throws TransientKinesisException { - final Date date = timestamp != null ?
timestamp.toDate() : null; return wrapExceptions( () -> kinesis .getShardIterator( - new GetShardIteratorRequest() - .withStreamName(streamName) - .withShardId(shardId) - .withShardIteratorType(shardIteratorType) - .withStartingSequenceNumber(startingSequenceNumber) - .withTimestamp(date)) - .getShardIterator()); + GetShardIteratorRequest.builder() + .streamName(streamName) + .shardId(shardId) + .shardIteratorType(shardIteratorType) + .startingSequenceNumber(startingSequenceNumber) + .timestamp(TimeUtil.toJava(timestamp)) + .build()) + .shardIterator()); } public List<Shard> listShards(final String streamName) throws TransientKinesisException { @@ -98,11 +139,18 @@ StreamDescription description; do { - description = kinesis.describeStream(streamName, lastShardId).getStreamDescription(); - - shards.addAll(description.getShards()); - lastShardId = shards.get(shards.size() - 1).getShardId(); - } while (description.getHasMoreShards()); + description = + kinesis + .describeStream( + DescribeStreamRequest.builder() + .streamName(streamName) + .exclusiveStartShardId(lastShardId) + .build()) + .streamDescription(); + + shards.addAll(description.shards()); + lastShardId = shards.get(shards.size() - 1).shardId(); + } while (description.hasMoreShards()); return shards; }); @@ -133,18 +181,27 @@ throws TransientKinesisException { return wrapExceptions( () -> { - GetRecordsResult response = + GetRecordsResponse response = kinesis.getRecords( - new GetRecordsRequest().withShardIterator(shardIterator).withLimit(limit)); + GetRecordsRequest.builder().shardIterator(shardIterator).limit(limit).build()); + List<Record> records = response.records(); return new GetKinesisRecordsResult( - UserRecord.deaggregate(response.getRecords()), - response.getNextShardIterator(), - response.getMillisBehindLatest(), + deaggregate(records), + response.nextShardIterator(), + response.millisBehindLatest(), streamName, shardId); }); } + public static List<KinesisClientRecord> deaggregate(List<Record> records) { + return records.isEmpty() + ? ImmutableList.of() + : new AggregatorUtil() + .deaggregate( + records.stream().map(KinesisClientRecord::fromRecord).collect(Collectors.toList())); + } + /** * Gets total size in bytes of all events that remain in Kinesis stream after specified instant.
* @@ -175,9 +232,9 @@ createMetricStatisticsRequest(streamName, countSince, countTo, period); long totalSizeInBytes = 0; - GetMetricStatisticsResult result = cloudWatch.getMetricStatistics(request); - for (Datapoint point : result.getDatapoints()) { - totalSizeInBytes += point.getSum().longValue(); + GetMetricStatisticsResponse response = cloudWatch.getMetricStatistics(request); + for (Datapoint point : response.datapoints()) { + totalSizeInBytes += point.sum().longValue(); } return totalSizeInBytes; }); @@ -185,16 +242,15 @@ GetMetricStatisticsRequest createMetricStatisticsRequest( String streamName, Instant countSince, Instant countTo, Minutes period) { - return new GetMetricStatisticsRequest() - .withNamespace(KINESIS_NAMESPACE) - .withMetricName(INCOMING_RECORDS_METRIC) - .withPeriod(period.getMinutes() * PERIOD_GRANULARITY_IN_SECONDS) - .withStartTime(countSince.toDate()) - .withEndTime(countTo.toDate()) - .withStatistics(Collections.singletonList(SUM_STATISTIC)) - .withDimensions( - Collections.singletonList( - new Dimension().withName(STREAM_NAME_DIMENSION).withValue(streamName))); + return GetMetricStatisticsRequest.builder() + .namespace(KINESIS_NAMESPACE) + .metricName(INCOMING_RECORDS_METRIC) + .period(period.getMinutes() * PERIOD_GRANULARITY_IN_SECONDS) + .startTime(TimeUtil.toJava(countSince)) + .endTime(TimeUtil.toJava(countTo)) + .statistics(Statistic.SUM) + .dimensions(Dimension.builder().name(STREAM_NAME_DIMENSION).value(streamName).build()) + .build(); } /** @@ -213,13 +269,10 @@ } catch (LimitExceededException | ProvisionedThroughputExceededException e) { throw new TransientKinesisException( "Too many requests to Kinesis. Wait some time and retry.", e); - } catch (AmazonServiceException e) { - if (e.getErrorType() == AmazonServiceException.ErrorType.Service) { - throw new TransientKinesisException("Kinesis backend failed. Wait some time and retry.", e); - } - throw new RuntimeException("Kinesis client side failure", e); - } catch (AmazonClientException e) { - if (e.isRetryable()) { + } catch (SdkServiceException e) { + throw new TransientKinesisException("Kinesis backend failed. Wait some time and retry.", e); + } catch (SdkClientException e) { + if (e.retryable()) { throw new TransientKinesisException("Retryable client failure", e); } throw new RuntimeException("Not retryable client failure", e); diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/StartingPoint.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/StartingPoint.java 2019-10-27 12:34:56.000000000 -0700 @@ -15,15 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; -import com.amazonaws.services.kinesis.model.ShardIteratorType; import java.io.Serializable; import java.util.Objects; import org.joda.time.Instant; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +import software.amazon.kinesis.common.InitialPositionInStream; /** * Denotes a point at which the reader should start reading from a Kinesis stream. It can be diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPointShardsFinder.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/StartingPointShardsFinder.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPointShardsFinder.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/StartingPointShardsFinder.java 2019-10-27 12:34:56.000000000 -0700 @@ -15,10 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; -import com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.services.kinesis.model.ShardIteratorType; import java.io.Serializable; import java.util.HashSet; import java.util.List; @@ -27,6 +25,8 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; /** * This class is responsible for establishing the initial set of shards that existed at the given @@ -68,11 +68,11 @@ *

  <li>0001 is already closed at T2 timestamp so its successors 0003 and 0004 are next. * 0003 is valid but 0004 is already closed at T3 timestamp. It has one successor 0007 * which is the result of merging 0004 and 0005 shards. 0007 has two parent shards - * then stored in {@link Shard#parentShardId} and {@link Shard#adjacentParentShardId} - * fields. Only one of them should follow the relation to its successor so it is - * always the shard stored in parentShardId field. Let's assume that it was 0004 shard - * and it's the one that considers 0007 its successor. 0007 is valid at T3 timestamp - * and it's added to the result set. + * then stored in {@link Shard#parentShardId()} and {@link + * Shard#adjacentParentShardId()} fields. Only one of them should follow the relation + * to its successor so it is always the shard stored in parentShardId field. Let's + * assume that it was 0004 shard and it's the one that considers 0007 its successor. + * 0007 is valid at T3 timestamp and it's added to the result set. *
<li>0002 is closed at T3 timestamp so its successors 0005 and 0006 are next. 0005 is * also closed because it was merged with 0004 shard. Their successor is 0007 and it * was already considered by 0004 shard so no action here is needed. Shard 0006 is @@ -132,10 +132,10 @@ for (Shard expiredShard : expiredShards) { boolean successorFound = false; for (Shard shard : allShards) { - if (Objects.equals(expiredShard.getShardId(), shard.getParentShardId())) { + if (Objects.equals(expiredShard.shardId(), shard.parentShardId())) { nextShards.add(shard); successorFound = true; - } else if (Objects.equals(expiredShard.getShardId(), shard.getAdjacentParentShardId())) { + } else if (Objects.equals(expiredShard.shardId(), shard.adjacentParentShardId())) { successorFound = true; } } @@ -156,12 +156,12 @@ private Set<Shard> findInitialShardsWithoutParents(String streamName, List<Shard> allShards) { Set<String> shardIds = new HashSet<>(); for (Shard shard : allShards) { - shardIds.add(shard.getShardId()); + shardIds.add(shard.shardId()); } LOGGER.info("Stream {} has following shards: {}", streamName, shardIds); Set<Shard> shardsWithoutParents = new HashSet<>(); for (Shard shard : allShards) { - if (!shardIds.contains(shard.getParentShardId())) { + if (!shardIds.contains(shard.parentShardId())) { shardsWithoutParents.add(shard); } } @@ -186,13 +186,9 @@ for (Shard shard : rootShards) { String shardIterator = kinesis.getShardIterator( - streamName, - shard.getShardId(), - shardIteratorType, - null, - startingPoint.getTimestamp()); + streamName, shard.shardId(), shardIteratorType, null, startingPoint.getTimestamp()); GetKinesisRecordsResult records = - kinesis.getRecords(shardIterator, streamName, shard.getShardId()); + kinesis.getRecords(shardIterator, streamName, shard.shardId()); if (records.getNextShardIterator() != null || !records.getRecords().isEmpty()) { validShards.add(shard); } diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/StaticCheckpointGenerator.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/StaticCheckpointGenerator.java 2019-10-25 12:08:13.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; Only in sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/: TimeUtil.java diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/TransientKinesisException.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/TransientKinesisException.java 2019-10-27 12:34:56.000000000 -0700 @@ -15,14 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; - -import com.amazonaws.AmazonClientException; +package org.apache.beam.sdk.io.kinesis2; /** A transient exception thrown by Kinesis.
*/ class TransientKinesisException extends Exception { - public TransientKinesisException(String s, AmazonClientException e) { + public TransientKinesisException(String s, Throwable e) { super(s, e); } } diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkParameters.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/WatermarkParameters.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkParameters.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/WatermarkParameters.java 2019-10-25 12:08:13.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicy.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/WatermarkPolicy.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicy.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/WatermarkPolicy.java 2019-10-25 12:08:13.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import java.io.Serializable; import org.joda.time.Instant; diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyFactory.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/WatermarkPolicyFactory.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyFactory.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/WatermarkPolicyFactory.java 2019-10-25 12:08:13.000000000 -0700 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2; import java.io.Serializable; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering; diff -u sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/package-info.java sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/package-info.java --- sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/package-info.java 2019-10-09 22:46:48.000000000 -0700 +++ sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/package-info.java 2019-10-25 12:08:13.000000000 -0700 @@ -17,4 +17,4 @@ */ /** Transforms for reading and writing from Amazon Kinesis. */ -package org.apache.beam.sdk.io.kinesis; +package org.apache.beam.sdk.io.kinesis2;
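
For illustration only, not part of the patch above: a minimal sketch of how a reader loop might drive the new ShardRecordsIterator methods added in this change, preferring the push-based SubscribeToShard path when a consumer ARN is configured on the checkpoint and falling back to GetRecords polling otherwise. The ShardReadSketch class, its readOnce method, and the downstream consumer are hypothetical names introduced here; in the real connector the equivalent branching would live in the shard reader pool.

package org.apache.beam.sdk.io.kinesis2;

import java.util.function.Consumer;

// Hypothetical helper, not part of the patch: drives one read cycle for a single shard.
final class ShardReadSketch {
  private ShardReadSketch() {}

  static void readOnce(ShardRecordsIterator shard, Consumer<KinesisRecord> downstream)
      throws TransientKinesisException, KinesisShardClosedException {
    if (shard.hasConsumer()) {
      // Enhanced fan-out: subscribeToShard blocks until the subscription completes while the
      // visitor pushes filtered, deaggregated records to the downstream consumer.
      shard.subscribeToShard(downstream);
    } else {
      // Classic polling path via GetRecords.
      shard.readNextBatch().forEach(downstream);
    }
  }
}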
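The listing also notes a TimeUtil.java that exists only in the kinesis2 package and is not included in this diff, even though the new SDK v2 builders call TimeUtil.toJava(...) on Joda-Time instants. A minimal sketch of such a helper, assuming its only job is bridging Joda-Time to java.time and tolerating the null timestamps that the old code guarded against, might look like:

package org.apache.beam.sdk.io.kinesis2;

// Hypothetical reconstruction of the missing TimeUtil helper; the real file is not shown in
// this diff, so the method name and behavior here are assumptions.
final class TimeUtil {
  private TimeUtil() {}

  /** Converts a Joda-Time instant to java.time, passing null through for optional timestamps. */
  static java.time.Instant toJava(org.joda.time.Instant instant) {
    return instant == null ? null : java.time.Instant.ofEpochMilli(instant.getMillis());
  }
}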