Skip to content

Commit

Permalink
Port of existing KinesisTransport (#140)
Browse files Browse the repository at this point in the history
* Implement AWS metacodec handler with tests

Closes #70 #27

* Rename AWSMetaCodec to AWSCodec

* Add AWS transport selection handler

* Remove erroneous comment

* Add transport selection test for AWSTransport

* Fix log message that referred to the codec instead of transport

* More cleanup of log entries

* Fix log entries

- Change info > debug
- Use consistent starting/stopping wording.
- These log entries may be removed later. They are helpful to verify that the AWSTransport is selecting the correct transport based on the AWSMessageType enum.

* Finalize variables

* Finalize variables

* Update version to 3.1.0-beta.2-SNAPSHOT

* Update version to 3.1.0-beta.2-SNAPSHOT

* Rename codec constant AWSMetaCodec -> AWSCodec

Co-Authored-By: Bernd Ahlers <bernd@users.noreply.github.com>

* Move integrations tests to actual folder path

Moved from test/java/org.graylog.integrations to test/java/org/graylog/integrations

* First cut of migrating the existing Kinesis client

Migrate Kinesis client v1.10 from the existing AWS integration to the new.

* Short-circuit usage of multi-AWSAuthProvider

Now, the AWS credentials are directly provided to the KinesisTransport. This will likely be improved in the future. See #139

* Add missing name in codec

* Add processor for Kinesis transport

This is responsible for handling the Kinesis payload (decompress if from CloudWatch, or convert bytes to string if not) and converting it into a list of raw messages.

* Use KinesisTransportProcessor in the KinesisTransport

* Add code comments

* Adjustments to get KinesisTransport running

The main change is to migrate the AWS_MESSAGE_TYPE config prop to the codec, since the codec can only access config properties it owns (due to per-message instantiation and configs being encoded with each raw message). The config prop is still accessible from the transport.

* Improve comments

* Code and comments cleanup

* Add KinesisPayloadDecoder tests

* Add message timestamp coverage to KinesisPayloadDecoder tests

* Remove unused imports and fix formatting

* Update version to 3.1.0-beta.3-SNAPSHOT

* Merge branch 'aws' into aws-transport-kinesis and resolve conflicts

# Conflicts:
#	src/main/java/org/graylog/integrations/aws/service/KinesisService.java
  • Loading branch information
Dan Torrey authored and ceruleancee committed Jul 31, 2019
1 parent 23493b8 commit edb80a9
Show file tree
Hide file tree
Showing 20 changed files with 937 additions and 145 deletions.
10 changes: 8 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<maven.site.skip>true</maven.site.skip>
<graylog.version>3.1.0-beta.3-SNAPSHOT</graylog.version>
<aws-java-sdk-2.version>2.5.33</aws-java-sdk-2.version>
<aws-kinesis-client.version>2.2.0</aws-kinesis-client.version>
<aws-kinesis-client.version>1.10.0</aws-kinesis-client.version>
</properties>

<dependencyManagement>
Expand All @@ -56,9 +56,15 @@
</dependency>
<!-- AWS maintains a separate client for Kinesis. -->
<dependency>
<groupId>software.amazon.kinesis</groupId>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>${aws-kinesis-client.version}</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Include the latest version of the Apache Http Client as required by the AWS SDK v2 -->
<!-- See https://github.com/aws/aws-sdk-java-v2/issues/652 -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public AWSMessageType detectLogMessageType() {
*
* @return true if message is a flow log.
*/
private boolean isFlowLog() {
public boolean isFlowLog() {

// Though unlikely, the message could be null.
if (logMessage == null) {
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/org/graylog/integrations/aws/AWSMessageType.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
import org.graylog2.plugin.inputs.codecs.Codec;
import org.graylog2.plugin.inputs.transports.Transport;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
* Identifies the type of input for a particular log source (eg. Cloud Watch or Kinesis) and
* log format.
Expand Down Expand Up @@ -82,4 +86,12 @@ public boolean isKinesis() {
/**
 * The log source a message type is read from. {@code KINESIS} is the only source declared.
 */
public enum Source {
KINESIS
}

/**
 * Lists the message types that represent a real, identified kind of message.
 *
 * @return every {@code AWSMessageType} value, with the UNKNOWN placeholder filtered out.
 */
public static List<AWSMessageType> getMessageTypes() {
    // Enum constants are singletons, so an identity comparison is enough to drop UNKNOWN.
    return Arrays.asList(values())
                 .stream()
                 .filter(type -> type != UNKNOWN)
                 .collect(Collectors.toList());
}
}
31 changes: 28 additions & 3 deletions src/main/java/org/graylog/integrations/aws/codecs/AWSCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,37 @@

import com.google.inject.assistedinject.Assisted;
import org.graylog.integrations.aws.AWSMessageType;
import org.graylog.integrations.aws.inputs.AWSInput;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.ConfigurationField;
import org.graylog2.plugin.configuration.fields.DropdownField;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.codecs.AbstractCodec;
import org.graylog2.plugin.inputs.codecs.Codec;
import org.graylog2.plugin.journal.RawMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.inject.Inject;
import java.util.Map;
import java.util.stream.Collectors;

public class AWSCodec extends AbstractCodec {

public static final String NAME = "AWSCodec";
private static final Logger LOG = LoggerFactory.getLogger(AWSCodec.class);

/**
* Specifies one of the {@code AWSMessageType} choices, which indicates which codec and transport
* should be used.
*/
public static final String CK_AWS_MESSAGE_TYPE = "aws_message_type";

private final Map<String, Codec.Factory<? extends Codec>> availableCodecs;

@Inject
Expand All @@ -38,7 +47,7 @@ public AWSCodec(@Assisted Configuration configuration,
public Message decode(@Nonnull RawMessage rawMessage) {

// Load the codec by message type.
final AWSMessageType awsMessageType = AWSMessageType.valueOf(configuration.getString(AWSInput.CK_AWS_MESSAGE_TYPE));
final AWSMessageType awsMessageType = AWSMessageType.valueOf(configuration.getString(CK_AWS_MESSAGE_TYPE));
final Codec.Factory<? extends Codec> codecFactory = this.availableCodecs.get(awsMessageType.getCodecName());
if (codecFactory == null) {
LOG.error("A codec with name [{}] could not be found.", awsMessageType.getCodecName());
Expand All @@ -57,6 +66,11 @@ public Message decode(@Nonnull RawMessage rawMessage) {
return message;
}

/**
 * @return the value of the {@link #NAME} constant.
 */
@Override
public String getName() {
    return AWSCodec.NAME;
}

@FactoryClass
public interface Factory extends Codec.Factory<AWSCodec> {
@Override
Expand All @@ -70,7 +84,18 @@ public interface Factory extends Codec.Factory<AWSCodec> {
public static class Config extends AbstractCodec.Config {
/**
 * Builds the configuration fields requested by this codec.
 * <p>
 * The only field is the required "AWS Message Type" dropdown, stored under
 * {@link #CK_AWS_MESSAGE_TYPE}; it is the value that {@code decode} reads to pick a codec.
 *
 * @return a request containing the message type dropdown.
 */
@Override
public ConfigurationRequest getRequestedConfiguration() {
    final ConfigurationRequest request = new ConfigurationRequest();

    // Choices are keyed by the enum constant name and displayed with its label; UNKNOWN is excluded.
    // NOTE(review): the default below is a region id, not a message type key, so it does not
    // match any of the choices. Confirm the intended default message type and replace it.
    request.addField(new DropdownField(
            CK_AWS_MESSAGE_TYPE,
            "AWS Message Type",
            Region.US_EAST_1.id(),
            AWSMessageType.getMessageTypes().stream()
                          .collect(Collectors.toMap(AWSMessageType::toString, AWSMessageType::getLabel)),
            "The type of AWS message that this input will receive.",
            ConfigurationField.Optional.NOT_OPTIONAL));

    return request;
}

@Override
Expand Down
46 changes: 26 additions & 20 deletions src/main/java/org/graylog/integrations/aws/inputs/AWSInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@
import org.graylog.integrations.aws.codecs.AWSCodec;
import org.graylog.integrations.aws.service.AWSService;
import org.graylog.integrations.aws.transports.AWSTransport;
import org.graylog.integrations.aws.transports.KinesisTransport;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.buffers.InputBuffer;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.ConfigurationField;
import org.graylog2.plugin.configuration.fields.DropdownField;
import org.graylog2.plugin.configuration.fields.NumberField;
import org.graylog2.plugin.configuration.fields.TextField;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.MisfireException;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;

import javax.inject.Inject;
Expand All @@ -47,13 +47,6 @@ public class AWSInput extends MessageInput {
public static final String NAME = "AWS";
public static final String TYPE = "org.graylog.integrations.aws.inputs.AWSInput";

private static final Logger LOG = LoggerFactory.getLogger(AWSInput.class);

/**
* Specifies one of the {@code AWSInputType} choices, which indicates which codec and transport
* should be used.
*/
public static final String CK_AWS_MESSAGE_TYPE = "aws_message_type";
public static final String CK_TITLE = "title";
public static final String CK_DESCRIPTION = "description";
public static final String CK_GLOBAL = "global";
Expand Down Expand Up @@ -122,39 +115,52 @@ public ConfigurationRequest combinedRequestedConfiguration() {
ConfigurationRequest request = super.combinedRequestedConfiguration();

// These config values will be shared amongst many AWS codecs and transports.

request.addField(new DropdownField(
CK_AWS_REGION,
"AWS Region",
Region.US_EAST_1.id(),
AWSService.buildRegionChoices(),
"The AWS region the Kinesis stream is running in.",
ConfigurationField.Optional.NOT_OPTIONAL
));
ConfigurationField.Optional.NOT_OPTIONAL));

request.addField(new TextField(
CK_ACCESS_KEY,
"AWS access key",
"",
"Access key of an AWS user with sufficient permissions. (See documentation)",
ConfigurationField.Optional.OPTIONAL
));
ConfigurationField.Optional.OPTIONAL));

request.addField(new TextField(
CK_SECRET_KEY,
"AWS secret key",
"",
"Secret key of an AWS user with sufficient permissions. (See documentation)",
ConfigurationField.Optional.OPTIONAL,
TextField.Attribute.IS_PASSWORD
));
TextField.Attribute.IS_PASSWORD));

request.addField(new NumberField(
KinesisTransport.CK_KINESIS_MAX_THROTTLED_WAIT_MS,
"Throttled wait milliseconds",
KinesisTransport.DEFAULT_THROTTLED_WAIT_MS,
"The maximum time that the Kinesis input will pause for when in a throttled state. If this time is exceeded, then the Kinesis consumer will shut down until the throttled state is cleared. Recommended default: 60,000 ms",
ConfigurationField.Optional.OPTIONAL,
NumberField.Attribute.ONLY_POSITIVE));

request.addField(new TextField(
CK_ASSUME_ROLE_ARN,
"AWS assume role ARN",
KinesisTransport.CK_KINESIS_STREAM_NAME,
"Kinesis Stream name",
"",
"Role ARN with required permissions (cross account access)",
ConfigurationField.Optional.OPTIONAL
));
"The name of the Kinesis stream that receives your messages. See README for instructions on how to connect messages to a Kinesis Stream.",
ConfigurationField.Optional.NOT_OPTIONAL));

request.addField(new NumberField(
KinesisTransport.CK_KINESIS_RECORD_BATCH_SIZE,
"Kinesis Record batch size.",
KinesisTransport.DEFAULT_BATCH_SIZE,
"The number of Kinesis records to fetch at a time. Each record may be up to 1MB in size. The AWS default is 10,000. Enter a smaller value to process smaller chunks at a time.",
ConfigurationField.Optional.OPTIONAL,
NumberField.Attribute.ONLY_POSITIVE));

return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.graylog.integrations.aws.AWSMessageType;
import org.graylog.integrations.aws.AWSPolicy;
import org.graylog.integrations.aws.AWSPolicyStatement;
import org.graylog.integrations.aws.codecs.AWSCodec;
import org.graylog.integrations.aws.inputs.AWSInput;
import org.graylog.integrations.aws.resources.requests.AWSInputCreateRequest;
import org.graylog.integrations.aws.resources.responses.AWSRegion;
Expand Down Expand Up @@ -54,6 +55,7 @@ public class AWSService {

/**
* The only version supported is 2012-10-17
*
* @see <a href="https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_version.html">IAM JSON Policy Elements: Version</a>
*/
private static final String AWS_POLICY_VERSION = "2012-10-17";
Expand Down Expand Up @@ -202,7 +204,7 @@ public Input saveInput(AWSInputCreateRequest request, User user) throws Exceptio

// Transpose the SaveAWSInputRequest to the needed InputCreateRequest
final HashMap<String, Object> configuration = new HashMap<>();
configuration.put(AWSInput.CK_AWS_MESSAGE_TYPE, request.awsMessageType());
configuration.put(AWSCodec.CK_AWS_MESSAGE_TYPE, request.awsMessageType());
configuration.put(AWSInput.CK_TITLE, request.name()); // TODO: Should name and title be the same?
configuration.put(AWSInput.CK_DESCRIPTION, request.description());
configuration.put(AWSInput.CK_GLOBAL, request.global());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,25 +195,6 @@ public StreamsResponse getKinesisStreamNames(String regionName, String accessKey
return StreamsResponse.create(streamNames, streamNames.size());
}

/**
 * Tells whether a byte array begins with the two-byte GZip magic number.
 *
 * @param bytes the payload to inspect; may be null.
 * @return true only when the array has at least two bytes and both match the GZip magic number,
 * false otherwise.
 */
public boolean isCompressed(byte[] bytes) {
    // Fewer than two bytes cannot hold the GZip magic number.
    if (bytes == null || bytes.length < 2) {
        return false;
    }

    // The first byte must equal the magic number truncated to a byte; the second must equal
    // the magic number shifted right by EIGHT_BITS and truncated the same way.
    final byte expectedFirstByte = (byte) (GZIPInputStream.GZIP_MAGIC);
    final byte expectedSecondByte = (byte) (GZIPInputStream.GZIP_MAGIC >> EIGHT_BITS);
    return bytes[0] == expectedFirstByte && bytes[1] == expectedSecondByte;
}

/**
* CloudWatch Kinesis subscription payloads are always compressed. Detecting a compressed payload is currently
* how the Health Check identifies that the payload has been sent from CloudWatch.
Expand Down Expand Up @@ -387,6 +368,25 @@ Record selectRandomRecord(List<Record> recordsList) {
return recordsList.get(new Random().nextInt(recordsList.size()));
}

/**
 * Tells whether a byte array begins with the two-byte GZip magic number.
 *
 * @param bytes the payload to inspect; may be null.
 * @return true only when the array has at least two bytes and both match the GZip magic number,
 * false otherwise.
 */
public static boolean isCompressed(byte[] bytes) {
    // Fewer than two bytes cannot hold the GZip magic number.
    if (bytes == null || bytes.length < 2) {
        return false;
    }

    // The first byte must equal the magic number truncated to a byte; the second must equal
    // the magic number shifted right by EIGHT_BITS and truncated the same way.
    final byte expectedFirstByte = (byte) (GZIPInputStream.GZIP_MAGIC);
    final byte expectedSecondByte = (byte) (GZIPInputStream.GZIP_MAGIC >> EIGHT_BITS);
    return bytes[0] == expectedFirstByte && bytes[1] == expectedSecondByte;
}

/**
* Creates a new Kinesis stream.
*
Expand Down
Loading

0 comments on commit edb80a9

Please sign in to comment.