
Feature request: Data Class for Cloudwatch Logs passed through Kinesis Stream #1488

Closed
blewinter opened this issue Aug 31, 2022 · 21 comments · Fixed by #1710
Labels
event_sources Event Source Data Class utility feature-request feature request

Comments

@blewinter

Use case

We are currently passing Cloudwatch logs to a Kinesis stream to be subsequently processed by a Lambda. We'd like to use the cloud_watch_logs_event data class but it only works when the source is Cloudwatch directly, not Kinesis. It seems when Cloudwatch targets Lambda, it wraps the payload in the following superstructure:
'awslogs': {'data': {<PAYLOAD>}}

You can see that the existing CloudWatchLogsEvent class' raw_logs_data property expects this structure to unpack the data.

This superstructure doesn't show up when the logs are passed through Kinesis.
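To illustrate the difference, here is a hedged, dependency-free sketch of the two shapes (the payload values are made up; CloudWatch Logs delivers the data gzip-compressed and Base64-encoded):

```python
import base64
import gzip
import json

# A made-up CloudWatch Logs payload, compressed the way CloudWatch delivers it
log_payload = {"messageType": "DATA_MESSAGE", "logEvents": [{"message": "hello"}]}
blob = base64.b64encode(gzip.compress(json.dumps(log_payload).encode())).decode()

# Shape 1: CloudWatch Logs -> Lambda, wrapped in the "awslogs" superstructure
direct_event = {"awslogs": {"data": blob}}

# Shape 2: CloudWatch Logs -> Kinesis -> Lambda; the same blob sits in each
# Kinesis record's "data" field, with no "awslogs" wrapper
kinesis_event = {"Records": [{"kinesis": {"data": blob}}]}

def unpack(data: str) -> dict:
    """Reverse the Base64 + gzip encoding applied to the log payload."""
    return json.loads(gzip.decompress(base64.b64decode(data)))
```

Both shapes decode with the same unpack step; only the path to the `data` field differs.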

Solution/User Experience

I think a new method/property within the CloudWatchLogsEvent class (perhaps named parse_logs_kinesis()) that can be used to unpack this type of payload could work.

However, that may be less than ideal, since accidentally using the existing raw_logs_data property would then cause issues, so it's possible that an entirely new class (like CloudWatchLogsKinesisEvent, for example) would be preferable.

A third option would be to rewrite the existing raw_logs_data property, such that it checks for the presence of the Cloudwatch->Lambda superstructure:
return self['awslogs']['data'] if self.get('awslogs') else self[0]
(safe retrieval from nested dicts can get a bit clunky)
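A hedged sketch of that third option (plain dict/list access stands in for the DictWrapper `self`, and the list fallback mirrors the `self[0]` in the snippet above; this is an illustration, not the library's code):

```python
from typing import Any

def raw_logs_data(event: Any) -> Any:
    # Hypothetical defensive variant: prefer the CloudWatch -> Lambda wrapper,
    # fall back to the first element for the Kinesis-delivered shape
    if isinstance(event, dict) and "awslogs" in event:
        return event["awslogs"]["data"]
    return event[0]
```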

Note: Originally posted in discussions forum here.

Alternative solutions

No response

Acknowledgment

@blewinter blewinter added feature-request feature request triage Pending triage from maintainers labels Aug 31, 2022
@boring-cyborg

boring-cyborg bot commented Aug 31, 2022

Thanks for opening your first issue here! We'll come back to you as soon as we can.

@blewinter blewinter changed the title DataFeature request: Data Class for Cloudwatch Logs passed through Kinesis Stream Feature request: Data Class for Cloudwatch Logs passed through Kinesis Stream Aug 31, 2022
@leandrodamascena
Contributor

Hey @blewinter thanks for following up with a feature request on this - would you have the bandwidth to contribute it? We can offer guidance sync or async on our Discord channel.

If you don't, we can ping you to review a PR once we clear other activities.

🚀

@blewinter
Author

blewinter commented Sep 7, 2022

Hi! Apologies, I wasn't receiving notifications on this thread. I'd love to contribute! Will just need a bit of time to get set up. Do you have thoughts on what the best approach might be, or should I post on Discord?

@leandrodamascena leandrodamascena added area/event_sources and removed triage Pending triage from maintainers labels Sep 7, 2022
@leandrodamascena
Contributor

However that may be less than ideal since accidentally using the existing raw_logs_data property would then cause issues, so it's possible that an entirely new class (like CloudWatchLogsKinesisEvent, ex) would be preferable.

Hi @blewinter! We think this is the best and safest way to implement this. A new class to handle events coming from Kinesis is a great idea.

We look forward to your contribution. If you have any questions or want help testing something, please let me know and I'll be available for that. 🤖

@leandrodamascena
Contributor

Hi @blewinter! I hope things are going well there.

We are planning to include this change in the next release, which I think will be in 2 or 3 weeks. But it's okay to move this change to another release if you need more time to implement it, or we can even implement it ourselves and ping you to review it before the merge. We'd love to hear which option works best for you 👂.

Thank you

@blewinter
Author

Hey, I had to put it aside for a bit but I might be able to have it by the end of next week! I will keep you posted

@blewinter
Author

Hi,

So I had played around with the idea of a separate class for this feature, but I think a simple conversion function might be preferable? Of course, it can be wrapped in a class. There are lots of ways to implement this. What do you think of this approach?

https://github.com/blewinter/aws-lambda-powertools-python/blob/feat/cloudwatch-kinesis-log/aws_lambda_powertools/utilities/data_classes/cloud_watch_kinesis_log.py

@leandrodamascena
Contributor

Hi @blewinter! Tomorrow I'll check it out. At first glance it looks good; I'll just check in more detail that nothing is missing.

Thank you

@leandrodamascena
Contributor

leandrodamascena commented Oct 6, 2022

So I had played around with the idea of a separate class for this feature, but I think a simple conversion function might be preferable? Of course, it can be wrapped in a class. There are lots of ways to implement this. What do you think of this approach?

blewinter/aws-lambda-powertools-python@feat/cloudwatch-kinesis-log/aws_lambda_powertools/utilities/data_classes/cloud_watch_kinesis_log.py

Hi @blewinter, I really like the idea of reusing existing code and just wrapping the payload, it's very clever. I don't think we need to use KinesisStreamRecordPayload, though. Look at this code that I created and please let me know what you think about it.

cloud_watch_kinesis_log.py

import base64
import json
import zlib
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsEvent, CloudWatchLogsDecodedData
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper


class CloudWatchKinesisLogEvents(DictWrapper):

    def __init__(self, data):
        super().__init__(data)  # keep DictWrapper behaviour for the original event
        payload = json.dumps(data, indent=2).encode("utf-8")
        payload = base64.b64encode(zlib.compress(payload))
        self.wrapped = {"awslogs": {"data": payload}}

    @property
    def raw_logs_data(self) -> dict:
        """The wrapped payload; its `data` field is Base64-encoded, zlib-compressed bytes."""
        return self.wrapped

    def parse_logs_data(self) -> CloudWatchLogsDecodedData:
        return CloudWatchLogsEvent(self.wrapped).parse_logs_data()

lambda_handler.py

from aws_lambda_powertools.utilities.data_classes import event_source, CloudWatchKinesisLogEvents
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsDecodedData

@event_source(data_class=CloudWatchKinesisLogEvents)
def lambda_handler(event: CloudWatchKinesisLogEvents, context):
    print(event.raw_logs_data)

    decompressed_log: CloudWatchLogsDecodedData = event.parse_logs_data()
    print(decompressed_log.log_events)

    for log_event in decompressed_log.log_events:
        print(log_event.timestamp, log_event.message, log_event.extracted_fields, log_event.raw_event)

Output: (screenshot omitted)

@heitorlessa or @rubenfonseca can you take a look here and give another opinion if it's worth it? Please 🙏

@blewinter
Author

Yeah I considered the tradeoff of adding the KinesisEvent dependency. In this case I thought the clarity provided by specifying it as the input class was worth it, but totally see the other side of it. This looks great! Thank you!

@leandrodamascena
Contributor

Yeah I considered the tradeoff of adding the KinesisEvent dependency. In this case I thought the clarity provided by specifying it as the input class was worth it, but totally see the other side of it. This looks great! Thank you!

That's amazing to hear. Do you have the bandwidth to send this PR using this approach? If you have any kind of questions, you can ping me on Discord or even check out this very similar PR #1485.

Thank you!

@blewinter
Author

I don't have bandwidth for a full PR, but it would essentially look like this:

from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsEvent, CloudWatchLogsDecodedData
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecordPayload
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper


class CloudWatchKinesisLogEvents(DictWrapper):

    def __init__(self, kinesis_payload: KinesisStreamRecordPayload):
        # Use the still Base64-encoded `data` string from the record;
        # data_as_bytes() would decode it prematurely and break
        # CloudWatchLogsEvent's own Base64 decoding downstream.
        payload = kinesis_payload.data
        self.wrapped = {"awslogs": {"data": payload}}

    @property
    def raw_logs_data(self) -> dict:
        """The wrapped payload; its `data` field is a Base64-encoded, gzip-compressed string."""
        return self.wrapped

    def parse_logs_data(self) -> CloudWatchLogsDecodedData:
        return CloudWatchLogsEvent(self.wrapped).parse_logs_data()

So the usage would be essentially identical, except a KinesisStreamRecordPayload is passed when initialized. I think that explicitness is useful in this case, but it does also add a dependency, so I think it's just a preference which approach is better. (Btw, I'd think in real-world usage someone would write a handler for a KinesisStreamEvent, unpack the records, and pass each payload to create this object.)
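That real-world flow might look like the following dependency-free sketch (plain dict access stands in for KinesisStreamEvent and the class above; the event shape matches the sample payload later in the thread, and the function name is made up for illustration):

```python
import base64
import gzip
import json
from typing import List

def extract_logs_from_kinesis_event(event: dict) -> List[dict]:
    """Unpack each Kinesis record's data field into a decoded CloudWatch Logs payload."""
    logs = []
    for record in event["Records"]:
        raw = base64.b64decode(record["kinesis"]["data"])  # still gzip-compressed
        logs.append(json.loads(gzip.decompress(raw)))
    return logs
```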

@leandrodamascena
Contributor

Hi @blewinter! Let me open a PR for this feature and if you have bandwidth you can do a review with me.

Thank you very much for all the interaction so far and we'll be happy to mention you in the credits for this new feature. 🥇

@heitorlessa
Contributor

@leandrodamascena when you can, could you share what's the latest so I can take over for this iteration cycle?

@heitorlessa heitorlessa added event_sources Event Source Data Class utility and removed area/event_sources labels Nov 9, 2022
@heitorlessa
Contributor

apologies for the delay @blewinter - Catching up on all messages and the original discussion too. @leandrodamascena is away until mid January so I'm taking over.

@heitorlessa
Contributor

heitorlessa commented Nov 10, 2022

Alright, I'm caught up. This discussion is missing a sample event, so I'll create the infra, create tests, and ping here when a PR is available.


Payload

Lambda -> CloudWatch Logs -> Kinesis Data Stream -> Lambda

payload = {
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "da10bf66b1f54bff5d96eae99149ad1f",
                "sequenceNumber": "49635052289529725553291405521504870233219489715332317186",
                "data": "H4sIAAAAAAAAAK2Sa2vbMBSG/4ox+xg3Oror39IlvaztVmJv7WjCUGwl8+ZLZstts5L/vuOsZYUyWGEgJHiP9J7nvOghLF3b2rVLthsXjsLJOBl/uZjG8fh4Gg7C+q5yDcqUAWcSONHEoFzU6+Om7jZYGdq7dljYcpnZ4cZHwLWOJl1Zbs/r9cR6e9RVqc/rKlpXV9eXt+fy27vt8W+L2DfOlr07oXQIMAQyvHlzPk6mcbKgciktF5lQfMU5dZZqzrShLF2uFC60aLtlmzb5prc/ygvvmjYc3YRPFG+LusuurE+/Ikqb1Gd55dq8jV+8isT6+317Rk42J5PTcLFnm966yvd2D2GeISJTYIwCJSQ1BE9OtWZCABWaKMIJAMdDMyU5MYZLhmkxBhQxfY4Re1tiWiAlBsgIVQTE4Cl6tI+T8SwJZu5Hh1dPs1FApOMSDI9WVKmIC+4irTMWQZYpx7QkztrgE06MU4yCx9DmVbgbvABmQJTGtkYAB0NwEwyYQUBpqEFuSbkGrThTRKi/AlP+HHj6fvJa3P9Ap/+Rbja9/PD6POd+0jXW7xM1B8CDsp37w7woXBb8qQDZ6xeurJttEOc/HWpUBxeHKNr74LHwsXXYlsm9flrl/rmFIQeS7m3m1fVs/DlIGpu6nhMiyWQGXNKIMbcCIgkhElKbaZnZpYJUz33s1iV+z/6+StMlR3yphHNcCyxiNEXf2zed6xuEu8XuF2wb6krnAwAA",
                "approximateArrivalTimestamp": 1668093033.744,
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49635052289529725553291405521504870233219489715332317186",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::231436140809:role/pt-1488-CloudWatchKinesisLogsFunctionRole-1M4G2TIWIE49",
            "awsRegion": "eu-west-1",
            "eventSourceARN": "arn:aws:kinesis:eu-west-1:231436140809:stream/pt-1488-KinesisStreamCloudWatchLogs-D8tHs0im0aJG",
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "cf4c4c2c9a49bdfaf58d7dbbc2b06081",
                "sequenceNumber": "49635052289529725553291405520881064510298312199003701250",
                "data": "H4sIAAAAAAAAAK2SW2/TQBCF/4pl8ViTvc7u5i0laVraQhUbWtREaG1PgsGXYK/bhqr/nXVoBRIgUYnXc2bPfHO092GFXWc3mOy2GI7D6SSZfDyfxfFkPgsPwua2xtbLjFPBgQqiifFy2WzmbdNvvTOyt92otFWa29HWRVRoHU37qtqdNZupdfaorzNXNHW0qS+vLm7O4PPr3fxHROxatNWQThgbUTqiZHT94mySzOJkBUqYLOWY8ZQLbaTRkEvDciUYzWzKfETXp13WFtsh/qgoHbZdOL4OnyhelU2fX1qXffIoXdKcFjV2RRf/9iqSmy933Sk53h5PT8LVnm12g7Ub4u7DIveIXFFjFNGUKUlAaMY0EUJKLjkQbxhKGCWeknMKoAGUkYoJ7TFd4St2tvJtDRYxDAg3VB08Ve/j42SySIIFfu396Ek+DkS+xkwAiYhM00isgUV6jXmEMrM5EmMsh+C9v9hfMQ4eS1vW4cPBH4CZVpoTJkEIAp5RUMo8vGFae3JNCCdUccMVgPw7sP4VePZm+lzc/0AH/0i3mF28fX6fSzftW+v2jZKXRgVVt3SHRVliHvx06F4+x6ppd0FcfEMvMR2cH3rR3gWPxrsO/Vau9vqyvlpMPgRJazMcYGgEHHLKBhLGJaBA0JLxNc0JppoS9Cwxbir/B4d5QDBAQSnfFFGp8aa/vxw2uLbHYUH4sHr4Dj5RJxfMAwAA",
                "approximateArrivalTimestamp": 1668092612.992,
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49635052289529725553291405520881064510298312199003701250",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::231436140809:role/pt-1488-CloudWatchKinesisLogsFunctionRole-1M4G2TIWIE49",
            "awsRegion": "eu-west-1",
            "eventSourceARN": "arn:aws:kinesis:eu-west-1:231436140809:stream/pt-1488-KinesisStreamCloudWatchLogs-D8tHs0im0aJG",
        },
    ]
}

Infra

AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: >
  pt-1488

  Sample SAM Template for pt-1488

Globals:
  Function:
    Timeout: 3
    Tracing: Active

Resources:
  DummyLogDataFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: hello_world/
      Handler: app.lambda_handler
      Runtime: python3.9
      Architectures:
        - x86_64
      Environment:
        Variables:
          POWERTOOLS_SERVICE_NAME: log-generator
      Events:
        HelloWorld:
          Type: Api
          Properties:
            Path: /hello
            Method: get

  CloudWatchKinesisLogsFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: kinesis/
      Handler: app.lambda_handler
      Runtime: python3.9
      Layers:
        - !Sub arn:aws:lambda:${AWS::Region}:017000801446:layer:AWSLambdaPowertoolsPythonV2:13
      Architectures:
        - x86_64
      Environment:
        Variables:
          POWERTOOLS_SERVICE_NAME: kinesis-log-consumer
      Events:
        Logs:
          Type: Kinesis
          Properties:
            Stream: !GetAtt KinesisStreamCloudWatchLogs.Arn
            BatchSize: 5
            StartingPosition: LATEST
            MaximumRetryAttempts: 2
            FunctionResponseTypes:
              - ReportBatchItemFailures

  KinesisStreamCloudWatchLogs:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1

  # CloudWatch Logs -> Kinesis integration
  CloudWatchLogsToKinesisSubscriptionFilter:
    Type: AWS::Logs::SubscriptionFilter
    Properties:
      DestinationArn: !GetAtt KinesisStreamCloudWatchLogs.Arn
      FilterPattern: " " # match everything
      LogGroupName: !Sub "/aws/lambda/${DummyLogDataFunction}"
      RoleArn: !GetAtt CloudwatchSubscriptionFiltersRole.Arn

  CloudwatchSubscriptionFiltersRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: !Sub logs.${AWS::Region}.amazonaws.com
            Action: "sts:AssumeRole"
      Policies:
        - PolicyName: cloudwatch_log_groups
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:PutRecord
                  - kinesis:PutRecords
                Resource: !GetAtt KinesisStreamCloudWatchLogs.Arn

  # Log retention

  CloudWatchKinesisLogsFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub "/aws/lambda/${CloudWatchKinesisLogsFunction}"
      RetentionInDays: 1

  DummyLogDataFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub "/aws/lambda/${DummyLogDataFunction}"
      RetentionInDays: 1

Outputs:
  HelloWorldApi:
    Description: "API Gateway endpoint URL for Prod stage for Hello World function"
    Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/hello/"
  DummyLogDataFunction:
    Description: "Hello World Lambda Function ARN"
    Value: !GetAtt DummyLogDataFunction.Arn
  DummyLogDataFunctionIamRole:
    Description: "Implicit IAM Role created for Hello World function"
    Value: !GetAtt DummyLogDataFunctionRole.Arn

  KinesisStreamArn:
    Description: Kinesis Data Stream to ingest CloudWatch Logs
    Value: !GetAtt KinesisStreamCloudWatchLogs.Arn

Lambda

Dummy Log generator

import json


def lambda_handler(event, context):
    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "hello world",
        }),
    }

Kinesis shard consumer

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import (
    BatchProcessor,
    EventType,
    batch_processor,
)
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
    KinesisStreamRecord,
)
from aws_lambda_powertools.utilities.typing import LambdaContext


processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: KinesisStreamRecord):
    logger.info(record.raw_event)


@logger.inject_lambda_context(log_event=True)
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()

@heitorlessa
Contributor

heitorlessa commented Nov 10, 2022

@blewinter (and @leandrodamascena for when you're back) - I've tested both classes and a standalone function, and I settled on the latter to prevent data pollution and the coupling of a class instance method that returns a completely different entity.

Once I have your confirmation that this solves your use case, I'll create a PR and make sure it's included in the next release. Alternatively, I'll go ahead with this implementation mid next week so it makes the next release; I'm pretty sure you're busy, and we thank you for your patience.

Here's what I've changed:

  • New method named data_zlib_compressed_as_json in KinesisStreamRecordPayload. This can also be useful for compressed JSON strings sent to Kinesis (telemetry, AdTech, etc.).
  • New standalone functions to parse, extract and transform into CloudWatchLogsDecodedData
    • extract_cloudwatch_logs_from_record(record) -> CloudWatchLogsDecodedData
    • extract_cloudwatch_logs_from_event(event) -> List[CloudWatchLogsDecodedData]
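For reference, a hedged, standalone sketch of what a `data_zlib_compressed_as_json`-style helper does (the real method lives on KinesisStreamRecordPayload; this free function and its name are an illustration only): Base64-decode, decompress, then parse JSON. The `wbits` value of `zlib.MAX_WBITS | 32` lets zlib auto-detect both zlib and gzip streams, which matters because CloudWatch Logs ships gzip.

```python
import base64
import json
import zlib

def data_zlib_compressed_as_json(data: str) -> dict:
    """Decode a Base64-encoded, zlib- or gzip-compressed JSON string."""
    decoded = base64.b64decode(data)
    decompressed = zlib.decompress(decoded, zlib.MAX_WBITS | 32)  # auto-detect zlib/gzip
    return json.loads(decompressed)
```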

I've created two samples to best demonstrate this.

Processing batch of logs

This example uses Batch processing utility along with extract_cloudwatch_logs_from_record to parse a given log.

It provides a clear separation by handling one log at a time, with partial failure support built in, allowing you to trace, add log correlation data, and potentially make the operation idempotent depending on how critical it is.

from aws_lambda_powertools.utilities.batch import (BatchProcessor, EventType,
                                                   batch_processor)
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
    KinesisStreamRecord, extract_cloudwatch_logs_from_record)

processor = BatchProcessor(event_type=EventType.KinesisDataStreams)


def record_handler(record: KinesisStreamRecord):
    log = extract_cloudwatch_logs_from_record(record)
    return log.message_type == "DATA_MESSAGE"


@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context):
    return processor.response()

Simplest log processing

This example uses Event Source Data Class event_source decorator along with extract_cloudwatch_logs_from_event.

I'm less inclined toward this one, as it puts the onus on you to loop and handle partial failures, since you're dealing with a stream of records; I'm adding it for the sake of completeness.

If you also don't agree, we can keep extract_cloudwatch_logs_from_record function only to keep the code base tidy.

from typing import List

from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import (
    CloudWatchLogsDecodedData,
)
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
    KinesisStreamEvent, extract_cloudwatch_logs_from_event)


@event_source(data_class=KinesisStreamEvent)
def simple_handler(event: KinesisStreamEvent, context):
    logs: List[CloudWatchLogsDecodedData] = extract_cloudwatch_logs_from_event(event)
    for log in logs:
        if log.message_type == "DATA_MESSAGE":
            return "success"
    return "success"

@heitorlessa
Contributor

Implementation, tests, and documentation ready: #1710

I'll merge this on Wed to ensure it's available in the next release in case I don't hear back from OP

@heitorlessa heitorlessa linked a pull request Nov 14, 2022 that will close this issue
@blewinter
Author

Hey, this looks great, thank you! Looking forward to using it!

@github-actions
Contributor

⚠️COMMENT VISIBILITY WARNING⚠️

This issue is now closed. Please be mindful that future comments are hard for our team to see.

If you need more assistance, please either tag a team member or open a new issue that references this one.

If you wish to keep having a conversation with other community members under this issue feel free to do so.

@github-actions github-actions bot added the pending-release Fix or implementation already in dev waiting to be released label Nov 16, 2022
@github-actions
Contributor

This is now released under 2.3.0 version!

@github-actions github-actions bot removed the pending-release Fix or implementation already in dev waiting to be released label Nov 17, 2022