Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Json codec changes with specific json input codec config #5054

Merged
merged 2 commits into from
Oct 15, 2024

Conversation

sb2k16
Copy link
Member

@sb2k16 sb2k16 commented Oct 11, 2024

Description

This PR is to add support for additional configuration entries in the JsonInputCodec so that CloudWatch logs from subscription filters could be decoded in Data Prepper. CloudWatch Subscription Filter

For the input message from CloudWatch

{
    "owner": "111111111111",
    "logGroup": "CloudTrail/logs",
    "logStream": "111111111111_CloudTrail/logs_us-east-1",
    "subscriptionFilters": [
        "Destination"
    ],
    "messageType": "DATA_MESSAGE",
    "logEvents": [
        {
            "id": "31953106606966983378809025079804211143289615424298221568",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
        },
        {
            "id": "31953106606966983378809025079804211143289615424298221569",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
        },
        {
            "id": "31953106606966983378809025079804211143289615424298221570",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
        }
    ]
}

and with the following configuration:

      codec:
        json:
          key_name: "logEvents"
          include_keys: ['owner', 'logGroup', 'logStream' ]
          include_keys_metadata: ['owner', 'logGroup', 'logStream' ]

the output event should be split into 3 separate events with additional data and metadata as specified in the config

{
  "id": "31953106606966983378809025079804211143289615424298221568",
  "timestamp": 1432826855000,
  "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}",
  "owner": "111111111111",
  "logGroup": "CloudTrail/logs",
  "logStream": "111111111111_CloudTrail/logs_us-east-1",
}

{
  "id": "31953106606966983378809025079804211143289615424298221569",
  "timestamp": 1432826855000,
  "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}",
  "owner": "111111111111",
  "logGroup": "CloudTrail/logs",
  "logStream": "111111111111_CloudTrail/logs_us-east-1",
}

{
  "id": "31953106606966983378809025079804211143289615424298221570",
  "timestamp": 1432826855000,
  "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}",
  "owner": "111111111111",
  "logGroup": "CloudTrail/logs",
  "logStream": "111111111111_CloudTrail/logs_us-east-1",
}

Issues Resolved

Resolves #5045

Check List

  • New functionality includes testing.
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Souvik Bose <souvbose@amazon.com>
private List<String> includeKeys;
private List<String> includeKeysMetadata;

public JsonDecoder(String keyName, List<String> includeKeys, List<String> includeKeysMetadata) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public JsonDecoder(String keyName, List<String> includeKeys, List<String> includeKeysMetadata) {
public JsonDecoder(String keyName, Collection<String> includeKeys, Collection<String> includeKeysMetadata) {

import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

public class JsonDecoder implements ByteDecoder {
private final ObjectMapper objectMapper = new ObjectMapper();
private final JsonFactory jsonFactory = new JsonFactory();
private String keyName;
private List<String> includeKeys;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private List<String> includeKeys;
private Collection<String> includeKeys;

import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

public class JsonDecoder implements ByteDecoder {
private final ObjectMapper objectMapper = new ObjectMapper();
private final JsonFactory jsonFactory = new JsonFactory();
private String keyName;
private List<String> includeKeys;
private List<String> includeKeysMetadata;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private List<String> includeKeysMetadata;
private Collection<String> includeKeysMetadata;

@Test
public void testJsonInputCodecConfig() {
JsonInputCodecConfig jsonInputCodecConfig = createObjectUnderTest();
assertTrue(jsonInputCodecConfig.getKeyName().equals(JsonInputCodecConfig.DEFAULT_KEY_NAME));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assertTrue(jsonInputCodecConfig.getKeyName().equals(JsonInputCodecConfig.DEFAULT_KEY_NAME));
assertNulljsonInputCodecConfig.getKeyName());

Is this not the default behavior?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dlvenable. This is the default behavior. I will make the change.

@@ -215,6 +220,38 @@ void parse_with_InputStream_calls_Consumer_with_Event(final int numberOfObjects)
}
}

@ParameterizedTest
@ValueSource(ints = {1, 2, 10, 100})
void parse_with_InputStream_calls_Consumer_with_EventConfig(final int numberOfObjects) throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ideally, each of these conditions is tested independently of each other. I see three things: 1. Using a key_name, 2. Keys as data, 3. Keys as metadata.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dlvenable. I have added tests to cover the scenarios for 3 configs separately.

@Nested
class JsonDecoderWithInputConfig {
private ObjectMapper objectMapper;
final List<String> include_keys = new ArrayList<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to includeKeys.

Also, set this list in the @BeforeEach. Otherwise, tests may interfere with each other.

receivedTime = ((Event)record.getData()).getEventHandle().getInternalOriginationTime();
});

records.forEach(record -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assert the size of records first or this will miss an empty list.

Map<String, Object> dataMap = record.getData().toMap();
Map<String, Object> metadataMap = record.getData().getMetadata().getAttributes();

for (String include_key: include_keys) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assert the size of include_keys first or you may miss an empty list. Also, rename to includeKeys.

private void parseRecordsArray(final JsonParser jsonParser,
final Instant timeReceived,
final Consumer<Record<Event>> eventConsumer,
Map<String, Object> includeKeysMap,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Map<String, Object> includeKeysMap,
final Map<String, Object> includeKeysMap,

final Instant timeReceived,
final Consumer<Record<Event>> eventConsumer,
Map<String, Object> includeKeysMap,
Map<String, Object> includeMetadataKeysMap
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Map<String, Object> includeMetadataKeysMap
final Map<String, Object> includeMetadataKeysMap

Signed-off-by: Souvik Bose <souvbose@amazon.com>
if (jsonParser.getCurrentToken() == JsonToken.START_ARRAY) {
parseRecordsArray(jsonParser, timeReceived, eventConsumer);
if (keyName != null && !nodeName.equals(keyName)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only allows for first level of fields? Should we make it generic to allow nested fields, like

{
   "key1" : "value1",
   "key2" : {
             "logEvents" : [
             ]
    }
}

@kkondaka kkondaka merged commit 89bc937 into opensearch-project:main Oct 15, 2024
46 of 47 checks passed
opensearch-trigger-bot bot pushed a commit that referenced this pull request Oct 15, 2024
* Json codec changes with specific json input codec config

Signed-off-by: Souvik Bose <souvbose@amazon.com>

* Modify the tests and address comments

Signed-off-by: Souvik Bose <souvbose@amazon.com>

---------

Signed-off-by: Souvik Bose <souvbose@amazon.com>
Co-authored-by: Souvik Bose <souvbose@amazon.com>
(cherry picked from commit 89bc937)
dlvenable pushed a commit that referenced this pull request Oct 15, 2024
* Json codec changes with specific json input codec config

Signed-off-by: Souvik Bose <souvbose@amazon.com>

* Modify the tests and address comments

Signed-off-by: Souvik Bose <souvbose@amazon.com>

---------

Signed-off-by: Souvik Bose <souvbose@amazon.com>
Co-authored-by: Souvik Bose <souvbose@amazon.com>
(cherry picked from commit 89bc937)

Co-authored-by: Souvik Bose <souvik04in@gmail.com>
san81 pushed a commit to san81/data-prepper that referenced this pull request Oct 17, 2024
…project#5054)

* Json codec changes with specific json input codec config

Signed-off-by: Souvik Bose <souvbose@amazon.com>

* Modify the tests and address comments

Signed-off-by: Souvik Bose <souvbose@amazon.com>

---------

Signed-off-by: Souvik Bose <souvbose@amazon.com>
Co-authored-by: Souvik Bose <souvbose@amazon.com>
@oeyh
Copy link
Collaborator

oeyh commented Oct 28, 2024

@sb2k16 Can we update the documentation website with this change?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support for reading CloudWatch Logs JSON using json codec
5 participants