Skip to content

Commit

Permalink
Upgrade to latest Kinesis Client version (#151)
Browse files Browse the repository at this point in the history
* Implement AWS metacodec handler with tests

Closes #70 #27

* Rename AWSMetaCodec to AWSCodec

* Add AWS transport selection handler

* Remove erroneous comment

* Add transport selection test for AWSTransport

* Fix log message that referred to the codec instead of transport

* More log cleanup of log entries

* Fix log entries

- Change info > debug
- Use consistent starting/stopping wording.
- These log entries may be removed later. They are helpful to verify that the AWSTransport is selecting the correct transport based on the AWSMessageType enum.

* Finalize variables

* Finalize variables

* Update version to 3.1.0-beta.2-SNAPSHOT

* Update version to 3.1.0-beta.2-SNAPSHOT

* Rename codec constant AWSMetaCodec -> AWSCodec

Co-Authored-By: Bernd Ahlers <bernd@users.noreply.github.com>

* Move integrations tests to actual folder path

Moved from test/java/org.graylog.integrations to test/java/org/graylog/integrations

* First cut of migrating the existing Kinesis client

Migrate Kinesis client v1.10 from the existing AWS integration to the new.

* Short-circuit usage of multi-AWSAuthProvider

Now, the AWS credentials are directly provided to the KinesisTransport. This will likely be improved in the future. See #139

* Add missing name in codec

* Add processor for Kinesis transport

This is responsible for handling the kinesis payload (decompress if from CloudWatch, or convert bytes to string if not) and converting it into an a list of raw messages.

* Use KinesisTransportProcessor in the KinesisTransport

* Add code comments

* Adjustments to get KinesisTransport running

The main change is to migrate the AWS_MESSAGE_TYPE config prop to the codec, since the codec can only access config properties it owns (due to per-message instantiation and configs being encoded with each raw message). The config prop is still accessible from the transport.

* Improve comments

* Code and comments cleanup

* Add KinesisPayloadDecoder tests

* Add message timestamp coverage to KinesisPayloadDecoder tests

* Update Kinesis Client version

* Add Kinesis client library v2 Consumer sample

* Migrate KinesisConsumer to Kinesis Client Library v2

* Remove unused imports and fix formatting

* Finish migration of Kinesis client to new version

* Complete upgrade to Kinesis Client Library v2

* Bump KCL version

* Migrate shard processor to its own class

* Bump version to 3.1.0-beta.3-SNAPSHOT

* Add batch size limit

* Code cleanup

* Remove unneeded throttle time limit reached shutdown

* Remove kinesis_max_throttled_wait_ms save input field

This field is no longer needed, since the new Kinesis Consumer appears to correctly handle longer throttling and pausing in `processRecords` without making the consumer unhealthy.

* Temporarily remove Assume Role Arn auth

This will be added back later. See #29

* Adjust logging levels

* Remove sample class

* Ignore unmapped properties due to removed max_throttled_wait field

See #156
  • Loading branch information
Dan Torrey authored and ceruleancee committed Jul 31, 2019
1 parent edb80a9 commit 6ea58a9
Show file tree
Hide file tree
Showing 10 changed files with 324 additions and 369 deletions.
23 changes: 12 additions & 11 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
<parent>
<groupId>org.graylog.plugins</groupId>
<artifactId>graylog-plugin-web-parent</artifactId>
<version>3.1.0-beta.3-SNAPSHOT</version>
<version>3.1.0-beta.4-SNAPSHOT</version>
<relativePath>../graylog2-server/graylog-plugin-parent/graylog-plugin-web-parent</relativePath>
</parent>

<artifactId>graylog-plugin-integrations</artifactId>
<version>3.1.0-beta.3-SNAPSHOT</version>
<version>3.1.0-beta.4-SNAPSHOT</version>
<packaging>jar</packaging>

<name>${project.artifactId}</name>
Expand All @@ -31,9 +31,9 @@
<maven.install.skip>true</maven.install.skip>
<maven.deploy.skip>true</maven.deploy.skip>
<maven.site.skip>true</maven.site.skip>
<graylog.version>3.1.0-beta.3-SNAPSHOT</graylog.version>
<graylog.version>3.1.0-beta.4-SNAPSHOT</graylog.version>
<aws-java-sdk-2.version>2.5.33</aws-java-sdk-2.version>
<aws-kinesis-client.version>1.10.0</aws-kinesis-client.version>
<aws-kinesis-client.version>2.2.1</aws-kinesis-client.version>
</properties>

<dependencyManagement>
Expand All @@ -56,15 +56,16 @@
</dependency>
<!-- AWS maintains a separate client for Kinesis. -->
<dependency>
<groupId>com.amazonaws</groupId>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>${aws-kinesis-client.version}</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>2.2.1</version>
</dependency>
<!-- Include the latest version of the Apache Http Client as required by the AWS SDK v2 -->
<!-- See https://github.com/aws/aws-sdk-java-v2/issues/652 -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,6 @@ public ConfigurationRequest combinedRequestedConfiguration() {
ConfigurationField.Optional.OPTIONAL,
TextField.Attribute.IS_PASSWORD));

request.addField(new NumberField(
KinesisTransport.CK_KINESIS_MAX_THROTTLED_WAIT_MS,
"Throttled wait milliseconds",
KinesisTransport.DEFAULT_THROTTLED_WAIT_MS,
"The maximum time that the Kinesis input will pause for when in a throttled state. If this time is exceeded, then the Kinesis consumer will shut down until the throttled state is cleared. Recommended default: 60,000 ms",
ConfigurationField.Optional.OPTIONAL,
NumberField.Attribute.ONLY_POSITIVE));

request.addField(new TextField(
KinesisTransport.CK_KINESIS_STREAM_NAME,
"Kinesis Stream name",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.auto.value.AutoValue;
import org.graylog.autovalue.WithBeanGetter;
Expand All @@ -29,6 +30,8 @@
@JsonAutoDetect
@AutoValue
@WithBeanGetter
// TODO: Remove this ignore annotation once https://github.com/Graylog2/graylog-plugin-integrations/issues/156 is fixed.
@JsonIgnoreProperties(ignoreUnknown = true)
public abstract class AWSInputCreateRequest implements AWSRequest {

private static final String NAME = "name";
Expand All @@ -39,7 +42,6 @@ public abstract class AWSInputCreateRequest implements AWSRequest {
private static final String ASSUME_ROLE_ARN = "assume_role_arn";
private static final String GLOBAL = "global";
private static final String THROTTLING_ALLOWED = "enable_throttling";
private static final String KINESIS_MAX_THROTTLED_WAIT_MS = "kinesis_max_throttled_wait_ms";

@JsonProperty(NAME)
public abstract String name();
Expand Down Expand Up @@ -74,9 +76,6 @@ public abstract class AWSInputCreateRequest implements AWSRequest {
@JsonProperty(THROTTLING_ALLOWED)
public abstract boolean throttlingAllowed();

@JsonProperty(KINESIS_MAX_THROTTLED_WAIT_MS)
public abstract int kinesisMaxThrottledWaitMs();

@JsonCreator
public static AWSInputCreateRequest create(@JsonProperty(NAME) String name,
@JsonProperty(DESCRIPTION) String description,
Expand All @@ -88,10 +87,9 @@ public static AWSInputCreateRequest create(@JsonProperty(NAME) String name,
@JsonProperty(BATCH_SIZE) int batchSize,
@JsonProperty(ASSUME_ROLE_ARN) String assumeRoleArn,
@JsonProperty(GLOBAL) boolean global,
@JsonProperty(THROTTLING_ALLOWED) boolean enableThrottling,
@JsonProperty(KINESIS_MAX_THROTTLED_WAIT_MS) int kinesisMaxThrottledWaitMs) {
@JsonProperty(THROTTLING_ALLOWED) boolean enableThrottling ) {
return new AutoValue_AWSInputCreateRequest(name, description, awsMessageType, awsAccessKey, awsSecretKey,
streamName, assumeRoleArn, region, batchSize, global,
enableThrottling, kinesisMaxThrottledWaitMs);
enableThrottling);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public static Map<String, String> buildRegionChoices() {
*
* @return A credential provider
*/
static StaticCredentialsProvider buildCredentialProvider(String accessKeyId, String secretAccessKey) {
public static StaticCredentialsProvider buildCredentialProvider(String accessKeyId, String secretAccessKey) {
Preconditions.checkArgument(StringUtils.isNotBlank(accessKeyId), "An AWS access key is required.");
Preconditions.checkArgument(StringUtils.isNotBlank(secretAccessKey), "An AWS secret key is required.");

Expand Down Expand Up @@ -218,7 +218,6 @@ public Input saveInput(AWSInputCreateRequest request, User user) throws Exceptio
if (inputType.isKinesis()) {
configuration.put(KinesisTransport.CK_KINESIS_STREAM_NAME, request.streamName());
configuration.put(KinesisTransport.CK_KINESIS_RECORD_BATCH_SIZE, request.batchSize());
configuration.put(KinesisTransport.CK_KINESIS_MAX_THROTTLED_WAIT_MS, request.kinesisMaxThrottledWaitMs());
} else {
throw new Exception("The specified input type is not supported.");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.graylog.integrations.aws.transports;

import com.amazonaws.regions.Regions;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
Expand All @@ -9,6 +8,7 @@
import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -63,17 +63,17 @@ public static Builder builder() {
}

@JsonIgnore
public List<Regions> getLookupRegions() {
public List<Region> getLookupRegions() {
if (lookupRegions() == null || lookupRegions().isEmpty()) {
return Collections.emptyList();
}

ImmutableList.Builder<Regions> builder = ImmutableList.<Regions>builder();
ImmutableList.Builder<Region> builder = ImmutableList.<Region>builder();

String[] regions = lookupRegions().split(",");
for (String regionName : regions) {
try {
builder.add(Regions.fromName(regionName.trim()));
builder.add(Region.of(regionName.trim()));
} catch (IllegalArgumentException e) {
LOG.info("Cannot translate [{}] into AWS region. Make sure it is a correct region code like for example 'us-west-1'.", regionName);
}
Expand Down
Loading

0 comments on commit 6ea58a9

Please sign in to comment.