Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: lambda-promtail; ensure messages to Kinesis are usable by refactoring parsing of KinesisEvent to match parsing of CWEvents + code cleanup #13098

Merged
3 changes: 1 addition & 2 deletions tools/lambda-promtail/lambda-promtail/json_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ func NewJSONStream(recordChan chan Record) Stream {
func (s Stream) Start(r io.ReadCloser, tokenCountToTarget int) {
defer r.Close()
defer close(s.records)
var decoder *json.Decoder
decoder = json.NewDecoder(r)
decoder := json.NewDecoder(r)

// Skip the provided count of JSON tokens to get to the target array, ex: "{" "Record"
for i := 0; i < tokenCountToTarget; i++ {
Expand Down
78 changes: 57 additions & 21 deletions tools/lambda-promtail/lambda-promtail/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"io"
"log"
"time"

"github.com/aws/aws-lambda-go/events"
Expand All @@ -13,36 +15,34 @@ import (
"github.com/grafana/loki/pkg/logproto"
)

func parseKinesisEvent(ctx context.Context, b batchIf, ev *events.KinesisEvent) error {
func parseKinesisEvent(ctx context.Context, b *batch, ev *events.KinesisEvent) error {
if ev == nil {
return nil
}

for _, record := range ev.Records {
timestamp := time.Unix(record.Kinesis.ApproximateArrivalTimestamp.Unix(), 0)

labels := model.LabelSet{
model.LabelName("__aws_log_type"): model.LabelValue("kinesis"),
model.LabelName("__aws_kinesis_event_source_arn"): model.LabelValue(record.EventSourceArn),
}

labels = applyLabels(labels)
var data []byte
var recordData events.CloudwatchLogsData
var err error

// Check if the data is gzipped by inspecting the 'data' field
for _, record := range ev.Records {
if isGzipped(record.Kinesis.Data) {
uncompressedData, err := ungzipData(record.Kinesis.Data)
data, err = ungzipData(record.Kinesis.Data)
if err != nil {
return err
log.Printf("Error decompressing data: %v", err)
}
b.add(ctx, entry{labels, logproto.Entry{
Line: string(uncompressedData),
Timestamp: timestamp,
}})
} else {
b.add(ctx, entry{labels, logproto.Entry{
Line: string(record.Kinesis.Data),
Timestamp: timestamp,
}})
data = record.Kinesis.Data
}

recordData, err = unmarshalData(data)
if err != nil {
log.Printf("Error unmarshalling data: %v", err)
}

labels := createLabels(record, recordData)

if err := processLogEvents(ctx, b, recordData.LogEvents, labels); err != nil {
return err
}
}

Expand Down Expand Up @@ -79,3 +79,39 @@ func ungzipData(data []byte) ([]byte, error) {

return io.ReadAll(reader)
}

// unmarshalData decodes a JSON payload (the uncompressed Kinesis record
// body) into a CloudwatchLogsData structure. The zero value of
// recordData is returned alongside any decoding error.
func unmarshalData(data []byte) (events.CloudwatchLogsData, error) {
	recordData := events.CloudwatchLogsData{}
	if err := json.Unmarshal(data, &recordData); err != nil {
		return recordData, err
	}
	return recordData, nil
}

func createLabels(record events.KinesisEventRecord, recordData events.CloudwatchLogsData) model.LabelSet {
labels := model.LabelSet{
model.LabelName("__aws_log_type"): model.LabelValue("kinesis"),
model.LabelName("__aws_kinesis_event_source_arn"): model.LabelValue(record.EventSourceArn),
model.LabelName("__aws_cloudwatch_log_group"): model.LabelValue(recordData.LogGroup),
model.LabelName("__aws_cloudwatch_owner"): model.LabelValue(recordData.Owner),
}

if keepStream {
labels[model.LabelName("__aws_cloudwatch_log_stream")] = model.LabelValue(recordData.LogStream)
}
Comment on lines +93 to +99
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should these not be prefixed with __aws_kinesis instead of __aws_cloudwatch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cstyan depending on the pov. The values are coming from cloudwatch, not from the K stream. K is a data stream not the source so I believe you want to have a filter that's a reflection of what your actual cloudwatch looks like, not the tool you used in the middle to stream that data.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that makes sense, thanks for the context 👍


return applyLabels(labels)
}

// processLogEvents appends each CloudWatch log event to the batch under
// the supplied label set, converting the event's millisecond epoch
// timestamp to time.Time. It stops and returns the first error from
// batch.add; otherwise it returns nil after all events are queued.
func processLogEvents(ctx context.Context, b *batch, logEvents []events.CloudwatchLogsLogEvent, labels model.LabelSet) error {
	for i := range logEvents {
		ev := logEvents[i]
		e := entry{labels, logproto.Entry{
			Line:      ev.Message,
			Timestamp: time.UnixMilli(ev.Timestamp),
		}}
		if err := b.add(ctx, e); err != nil {
			return err
		}
	}

	return nil
}
33 changes: 4 additions & 29 deletions tools/lambda-promtail/lambda-promtail/kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,6 @@ import (
"github.com/grafana/loki/pkg/logproto"
)

// MockBatch is a test double for the batch type: it records the label
// set of every added entry in streams and stubs out the remaining
// batch operations with no-op implementations.
type MockBatch struct {
	streams map[string]*logproto.Stream
	size    int
}

// add stores a stream keyed (and labeled) by the entry's label string;
// the entry's line and timestamp are deliberately discarded.
func (b *MockBatch) add(_ context.Context, e entry) error {
	key := e.labels.String()
	b.streams[key] = &logproto.Stream{Labels: key}
	return nil
}

// flushBatch is a no-op stub.
func (b *MockBatch) flushBatch(_ context.Context) error {
	return nil
}

// encode is a no-op stub returning empty results.
func (b *MockBatch) encode() ([]byte, int, error) {
	return nil, 0, nil
}

// createPushRequest is a no-op stub returning an empty request.
func (b *MockBatch) createPushRequest() (*logproto.PushRequest, int) {
	return nil, 0
}

func ReadJSONFromFile(t *testing.T, inputFile string) []byte {
inputJSON, err := os.ReadFile(inputFile)
if err != nil {
Expand All @@ -45,6 +23,9 @@ func ReadJSONFromFile(t *testing.T, inputFile string) []byte {

func TestLambdaPromtail_KinesisParseEvents(t *testing.T) {
inputJson, err := os.ReadFile("../testdata/kinesis-event.json")
mockBatch := &batch{
streams: map[string]*logproto.Stream{},
}

if err != nil {
t.Errorf("could not open test file. details: %v", err)
Expand All @@ -56,13 +37,7 @@ func TestLambdaPromtail_KinesisParseEvents(t *testing.T) {
}

ctx := context.TODO()
b := &MockBatch{
streams: map[string]*logproto.Stream{},
}

err = parseKinesisEvent(ctx, b, &testEvent)
err = parseKinesisEvent(ctx, mockBatch, &testEvent)
require.Nil(t, err)

labels_str := "{__aws_kinesis_event_source_arn=\"arn:aws:kinesis:us-east-1:123456789012:stream/simple-stream\", __aws_log_type=\"kinesis\"}"
require.Contains(t, b.streams, labels_str)
}
4 changes: 2 additions & 2 deletions tools/lambda-promtail/lambda-promtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func checkEventType(ev map[string]interface{}) (interface{}, error) {
reader.Seek(0, 0)
}

return nil, fmt.Errorf("unknown event type!")
return nil, fmt.Errorf("unknown event type")
}

func handler(ctx context.Context, ev map[string]interface{}) error {
Expand All @@ -210,7 +210,7 @@ func handler(ctx context.Context, ev map[string]interface{}) error {

event, err := checkEventType(ev)
if err != nil {
level.Error(*pClient.log).Log("err", fmt.Errorf("invalid event: %s\n", ev))
level.Error(*pClient.log).Log("err", fmt.Errorf("invalid event: %s", ev))
return err
}

Expand Down
11 changes: 2 additions & 9 deletions tools/lambda-promtail/lambda-promtail/promtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,6 @@ type batch struct {
client Client
}

type batchIf interface {
add(ctx context.Context, e entry) error
encode() ([]byte, int, error)
createPushRequest() (*logproto.PushRequest, int)
flushBatch(ctx context.Context) error
}

func newBatch(ctx context.Context, pClient Client, entries ...entry) (*batch, error) {
b := &batch{
streams: map[string]*logproto.Stream{},
Expand Down Expand Up @@ -158,7 +151,7 @@ func (c *promtailClient) sendToPromtail(ctx context.Context, b *batch) error {
if status > 0 && status != 429 && status/100 != 5 {
break
}
level.Error(*c.log).Log("err", fmt.Errorf("error sending batch, will retry, status: %d error: %s\n", status, err))
level.Error(*c.log).Log("err", fmt.Errorf("error sending batch, will retry, status: %d error: %s", status, err))
backoff.Wait()

// Make sure it sends at least once before checking for retry.
Expand All @@ -168,7 +161,7 @@ func (c *promtailClient) sendToPromtail(ctx context.Context, b *batch) error {
}

if err != nil {
level.Error(*c.log).Log("err", fmt.Errorf("Failed to send logs! %s\n", err))
level.Error(*c.log).Log("err", fmt.Errorf("failed to send logs! %s", err))
return err
}

Expand Down
2 changes: 0 additions & 2 deletions tools/lambda-promtail/lambda-promtail/promtail_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"crypto/tls"
"net/http"
"net/url"
"time"

"github.com/go-kit/log"
Expand All @@ -25,7 +24,6 @@ type promtailClient struct {
type promtailClientConfig struct {
backoff *backoff.Config
http *httpClientConfig
url *url.URL
}

type httpClientConfig struct {
Expand Down
10 changes: 5 additions & 5 deletions tools/lambda-promtail/lambda-promtail/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,17 +187,17 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.

var lineCount int
for scanner.Scan() {
log_line := scanner.Text()
logLine := scanner.Text()
lineCount++
if lineCount <= parser.skipHeaderCount {
continue
}
if printLogLine {
fmt.Println(log_line)
fmt.Println(logLine)
}

timestamp := time.Now()
match := parser.timestampRegex.FindStringSubmatch(log_line)
match := parser.timestampRegex.FindStringSubmatch(logLine)
if len(match) > 0 {
if labels["lb_type"] == LB_NLB_TYPE {
// NLB logs don't have .SSSSSSZ suffix. RFC3339 requires a TZ specifier, use UTC
Expand All @@ -222,7 +222,7 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
}

if err := b.add(ctx, entry{ls, logproto.Entry{
Line: log_line,
Line: logLine,
Timestamp: timestamp,
}}); err != nil {
return err
Expand Down Expand Up @@ -281,7 +281,7 @@ func processS3Event(ctx context.Context, ev *events.S3Event, pc Client, log *log
ExpectedBucketOwner: aws.String(labels["bucketOwner"]),
})
if err != nil {
return fmt.Errorf("Failed to get object %s from bucket %s on account %s\n, %s", labels["key"], labels["bucket"], labels["bucketOwner"], err)
return fmt.Errorf("failed to get object %s from bucket %s on account %s, %s", labels["key"], labels["bucket"], labels["bucketOwner"], err)
}
err = parseS3Log(ctx, batch, labels, obj.Body, log)
if err != nil {
Expand Down
59 changes: 24 additions & 35 deletions tools/lambda-promtail/testdata/kinesis-event.json
Original file line number Diff line number Diff line change
@@ -1,36 +1,25 @@
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "s1",
"sequenceNumber": "49568167373333333333333333333333333333333333333333333333",
"data": "SGVsbG8gV29ybGQ=",
"approximateArrivalTimestamp": 1480641523.477
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49568167373333333333333333333333333333333333333333333333",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/LambdaRole",
"awsRegion": "us-east-1",
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/simple-stream"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "s1",
"sequenceNumber": "49568167373333333334444444444444444444444444444444444444",
"data": "SGVsbG8gV29ybGQ=",
"approximateArrivalTimestamp": 1480841523.477
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49568167373333333334444444444444444444444444444444444444",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/LambdaRole",
"awsRegion": "us-east-1",
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/simple-stream"
}
]
}
"messageType": "DATA_MESSAGE",
"owner": "some_owner",
"logGroup": "test-logroup",
"logStream": "test-logstream",
"subscriptionFilters": ["test-subscription"],
"logEvents": [
{
"id": "98237509",
"timestamp": 1719922604969,
"message": "some_message"
},
{
"id": "20396236",
"timestamp": 1719922604969,
"message": "some_message"
},
{
"id": "23485670",
"timestamp": 1719922604969,
"message": "some_message"
}
]
}

Loading