Skip to content

Commit

Permalink
add support for kinesis enhanced fanout consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
jfarr1-godaddy committed Apr 5, 2020
1 parent 177c134 commit 4bc0e51
Show file tree
Hide file tree
Showing 48 changed files with 5,885 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ class BeamModulePlugin implements Plugin<Project> {
aws_java_sdk2_auth : "software.amazon.awssdk:auth:$aws_java_sdk2_version",
aws_java_sdk2_cloudwatch : "software.amazon.awssdk:cloudwatch:$aws_java_sdk2_version",
aws_java_sdk2_dynamodb : "software.amazon.awssdk:dynamodb:$aws_java_sdk2_version",
aws_java_sdk2_kinesis : "software.amazon.awssdk:kinesis:$aws_java_sdk2_version",
aws_java_sdk2_sdk_core : "software.amazon.awssdk:sdk-core:$aws_java_sdk2_version",
aws_java_sdk2_sns : "software.amazon.awssdk:sns:$aws_java_sdk2_version",
bigdataoss_gcsio : "com.google.cloud.bigdataoss:gcsio:$google_cloud_bigdataoss_version",
Expand Down
17 changes: 17 additions & 0 deletions sdks/java/io/amazon-web-services2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import groovy.json.JsonOutput

// Beam's shared module plugin; the helper calls below presumably come from it — confirm in
// BeamModulePlugin.
plugins { id 'org.apache.beam.module' }
// Applies Beam's Java configuration, passing the automatic module name for this artifact.
applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.io.aws2')
// NOTE(review): judging by their names these add integration-test dependencies and enable
// performance-test support for this module — verify against the plugin implementation.
provideIntegrationTestingDependencies()
enableJavaPerformanceTesting()

// Human-readable description and summary of this project.
description = "Apache Beam :: SDKs :: Java :: IO :: Amazon Web Services 2"
ext.summary = "IO library to read and write Amazon Web Services services from Beam."
Expand All @@ -31,15 +33,28 @@ dependencies {
compile library.java.aws_java_sdk2_auth
compile library.java.aws_java_sdk2_cloudwatch
compile library.java.aws_java_sdk2_dynamodb
compile library.java.aws_java_sdk2_kinesis
compile library.java.aws_java_sdk2_sdk_core
compile library.java.aws_java_sdk2_sns
compile library.java.jackson_core
compile library.java.jackson_annotations
compile library.java.jackson_databind
compile library.java.jackson_dataformat_cbor
compile library.java.joda_time
compile library.java.slf4j_api
compile "software.amazon.kinesis:amazon-kinesis-client:2.2.5"
compile "commons-lang:commons-lang:2.6"
testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
testCompile project(path: ":sdks:java:io:common", configuration: "testRuntime")
testCompile project(path: ":sdks:java:io:kinesis", configuration: "testRuntime")
testCompile library.java.mockito_core
testCompile library.java.guava_testlib
testCompile library.java.hamcrest_core
testCompile library.java.junit
testCompile library.java.hamcrest_library
testCompile library.java.powermock
testCompile library.java.powermock_mockito
testCompile "org.assertj:assertj-core:3.11.1"
testCompile 'org.testcontainers:testcontainers:1.11.3'
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(":runners:direct-java")
Expand All @@ -50,4 +65,6 @@ test {
'--region=us-west-2',
'--awsCredentialsProvider={"@type": "StaticCredentialsProvider", "accessKeyId": "key_id_value", "secretAccessKey": "secret_value"}'
])
// Forking every test resolves an issue where KinesisMockReadTest gets stuck forever.
forkEvery 1
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.kinesis;

import java.io.Serializable;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;

/**
 * Provides instances of the AWS clients used by the Kinesis connector: a {@link KinesisClient}, a
 * {@link KinesisAsyncClient} and a {@link CloudWatchClient}.
 *
 * <p>Please note that any instance of {@link AWSClientsProvider} must be {@link Serializable} to
 * ensure it can be sent to worker machines.
 *
 * <p>This interface does not specify whether a method returns a new client on every call or a
 * shared one; that is up to the implementing class.
 */
public interface AWSClientsProvider extends Serializable {
/**
 * Returns a Kinesis client.
 *
 * @return the {@link KinesisClient} supplied by this provider
 */
KinesisClient getKinesisClient();

/**
 * Returns an asynchronous Kinesis client.
 *
 * @return the {@link KinesisAsyncClient} supplied by this provider
 */
KinesisAsyncClient getKinesisAsyncClient();

/**
 * Returns a CloudWatch client.
 *
 * @return the {@link CloudWatchClient} supplied by this provider
 */
CloudWatchClient getCloudWatchClient();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.kinesis;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

import java.net.URI;
import javax.annotation.Nullable;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClientBuilder;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;

/**
 * Basic implementation of {@link AWSClientsProvider} used by default in {@link KinesisIO}.
 *
 * <p>Every client is built on demand from a static access key / secret key pair, a region and an
 * optional endpoint override. The region is stored as its string form and turned back into a
 * {@link Region} each time a client is built. The same endpoint override, when set, is applied to
 * the Kinesis clients and to the CloudWatch client alike.
 */
class BasicKinesisProvider implements AWSClientsProvider {
  private final String accessKey;
  private final String secretKey;
  private final String region;
  @Nullable private final String serviceEndpoint;

  /**
   * Creates a provider from static credentials.
   *
   * @param accessKey AWS access key; must not be null
   * @param secretKey AWS secret key; must not be null
   * @param region AWS region the clients are built for; must not be null
   * @param serviceEndpoint optional endpoint URI overriding the default one, or null for none
   * @throws IllegalArgumentException if {@code accessKey}, {@code secretKey} or {@code region} is
   *     null
   */
  BasicKinesisProvider(
      String accessKey, String secretKey, Region region, @Nullable String serviceEndpoint) {
    checkArgument(accessKey != null, "accessKey can not be null");
    checkArgument(secretKey != null, "secretKey can not be null");
    checkArgument(region != null, "region can not be null");
    this.serviceEndpoint = serviceEndpoint;
    this.region = region.toString();
    this.secretKey = secretKey;
    this.accessKey = accessKey;
  }

  /** Wraps the stored key pair in a static credentials provider. */
  private AwsCredentialsProvider staticCredentials() {
    AwsBasicCredentials keyPair = AwsBasicCredentials.create(accessKey, secretKey);
    return StaticCredentialsProvider.create(keyPair);
  }

  /** Rebuilds the {@link Region} from its stored string form. */
  private Region awsRegion() {
    return Region.of(region);
  }

  /** Builds a {@link KinesisClient} with the stored credentials, region and optional endpoint. */
  @Override
  public KinesisClient getKinesisClient() {
    KinesisClientBuilder builder = KinesisClient.builder();
    builder.credentialsProvider(staticCredentials());
    builder.region(awsRegion());
    if (serviceEndpoint == null) {
      return builder.build();
    }
    return builder.endpointOverride(URI.create(serviceEndpoint)).build();
  }

  /**
   * Builds a {@link KinesisAsyncClient} with the stored credentials, region and optional endpoint.
   */
  @Override
  public KinesisAsyncClient getKinesisAsyncClient() {
    KinesisAsyncClientBuilder builder = KinesisAsyncClient.builder();
    builder.credentialsProvider(staticCredentials());
    builder.region(awsRegion());
    if (serviceEndpoint == null) {
      return builder.build();
    }
    return builder.endpointOverride(URI.create(serviceEndpoint)).build();
  }

  /**
   * Builds a {@link CloudWatchClient} with the stored credentials, region and optional endpoint.
   */
  @Override
  public CloudWatchClient getCloudWatchClient() {
    CloudWatchClientBuilder builder = CloudWatchClient.builder();
    builder.credentialsProvider(staticCredentials());
    builder.region(awsRegion());
    if (serviceEndpoint == null) {
      return builder.build();
    }
    return builder.endpointOverride(URI.create(serviceEndpoint)).build();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.kinesis;

import java.io.Serializable;

/**
 * Used to generate a checkpoint object on demand. How exactly the checkpoint is generated is up to
 * the implementing class.
 *
 * <p>Implementations must be {@link Serializable}, since this interface extends it.
 */
interface CheckpointGenerator extends Serializable {

/**
 * Generates a checkpoint using the given client.
 *
 * @param client Kinesis client wrapper made available to the implementation
 * @return the generated {@link KinesisReaderCheckpoint}
 * @throws TransientKinesisException declared for implementations to signal a Kinesis failure;
 *     presumably a retryable one, judging by the name — confirm against that class
 */
KinesisReaderCheckpoint generate(SimplifiedKinesisClient client) throws TransientKinesisException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.kinesis;

import java.util.NoSuchElementException;
import java.util.Objects;

/**
* Similar to Guava {@code Optional}, but throws {@link NoSuchElementException} for missing element.
*/
abstract class CustomOptional<T> {

@SuppressWarnings("unchecked")
public static <T> CustomOptional<T> absent() {
return (Absent<T>) Absent.INSTANCE;
}

public static <T> CustomOptional<T> of(T v) {
return new Present<>(v);
}

public abstract boolean isPresent();

public abstract T get();

private static class Present<T> extends CustomOptional<T> {

private final T value;

private Present(T value) {
this.value = value;
}

@Override
public boolean isPresent() {
return true;
}

@Override
public T get() {
return value;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof Present)) {
return false;
}

Present<?> present = (Present<?>) o;
return Objects.equals(value, present.value);
}

@Override
public int hashCode() {
return Objects.hash(value);
}
}

private static class Absent<T> extends CustomOptional<T> {

private static final Absent<Object> INSTANCE = new Absent<>();

private Absent() {}

@Override
public boolean isPresent() {
return false;
}

@Override
public T get() {
throw new NoSuchElementException();
}

@Override
public boolean equals(Object o) {
return o instanceof Absent;
}

@Override
public int hashCode() {
return 0;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.kinesis;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;

import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.Shard;

/**
 * Creates {@link KinesisReaderCheckpoint}, which spans over all shards in given stream. List of
 * shards is obtained dynamically on call to {@link #generate(SimplifiedKinesisClient)}.
 *
 * <p>Both constructors reject a null stream name or starting point up front, so a misconfigured
 * generator fails at construction time rather than later inside {@code generate}.
 */
class DynamicCheckpointGenerator implements CheckpointGenerator {

  private static final Logger LOG = LoggerFactory.getLogger(DynamicCheckpointGenerator.class);
  private final String streamName;
  private final String consumerArn;
  private final StartingPoint startingPoint;
  private final StartingPointShardsFinder startingPointShardsFinder;

  /**
   * Creates a generator that uses a default {@link StartingPointShardsFinder}.
   *
   * @param streamName name of the stream to checkpoint; must not be null
   * @param consumerArn consumer ARN handed to every {@link ShardCheckpoint}; not validated here
   *     (presumably null when no enhanced fan-out consumer is used — confirm against callers)
   * @param startingPoint position in the stream to start from; must not be null
   * @throws NullPointerException if {@code streamName} or {@code startingPoint} is null
   */
  public DynamicCheckpointGenerator(
      String streamName, String consumerArn, StartingPoint startingPoint) {
    // Delegate so that both constructors apply the same null checks.
    this(streamName, consumerArn, startingPoint, new StartingPointShardsFinder());
  }

  /**
   * Creates a generator with an explicitly supplied shard finder.
   *
   * @param streamName name of the stream to checkpoint; must not be null
   * @param consumerArn consumer ARN handed to every {@link ShardCheckpoint}; not validated here
   *     (presumably null when no enhanced fan-out consumer is used — confirm against callers)
   * @param startingPoint position in the stream to start from; must not be null
   * @param startingPointShardsFinder finder used to look up the shards at the starting point; must
   *     not be null
   * @throws NullPointerException if {@code streamName}, {@code startingPoint} or {@code
   *     startingPointShardsFinder} is null
   */
  public DynamicCheckpointGenerator(
      String streamName,
      String consumerArn,
      StartingPoint startingPoint,
      StartingPointShardsFinder startingPointShardsFinder) {
    this.streamName = checkNotNull(streamName, "streamName");
    this.consumerArn = consumerArn;
    this.startingPoint = checkNotNull(startingPoint, "startingPoint");
    this.startingPointShardsFinder =
        checkNotNull(startingPointShardsFinder, "startingPointShardsFinder");
  }

  /**
   * Looks up the shards at the configured starting point and builds a checkpoint that contains one
   * {@link ShardCheckpoint} per shard found.
   *
   * @param kinesis client passed to the shard finder
   * @return checkpoint covering every shard returned by the finder
   * @throws TransientKinesisException propagated from the shard lookup
   */
  @Override
  public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis)
      throws TransientKinesisException {
    Set<Shard> shardsAtStartingPoint =
        startingPointShardsFinder.findShardsAtStartingPoint(kinesis, streamName, startingPoint);
    // NOTE(review): only the starting point's timestamp is logged; confirm whether it is set for
    // every kind of starting point.
    LOG.info(
        "Creating a checkpoint with following shards {} at {}",
        shardsAtStartingPoint,
        startingPoint.getTimestamp());
    return new KinesisReaderCheckpoint(
        shardsAtStartingPoint.stream()
            .map(
                shard ->
                    new ShardCheckpoint(streamName, shard.shardId(), consumerArn, startingPoint))
            .collect(Collectors.toList()));
  }

  @Override
  public String toString() {
    return String.format("Checkpoint generator for %s: %s", streamName, startingPoint);
  }
}
Loading

0 comments on commit 4bc0e51

Please sign in to comment.