From 1e8ef386279e2e28aff199047e798fad660efbdd Mon Sep 17 00:00:00 2001 From: Samuel Karp Date: Fri, 5 Oct 2018 16:30:41 -0700 Subject: [PATCH] awslogs: account for UTF-8 normalization in limits The CloudWatch Logs API defines its limits in terms of bytes, but its inputs in terms of UTF-8 encoded strings. Byte-sequences which are not valid UTF-8 encodings are normalized to the Unicode replacement character U+FFFD, which is a 3-byte sequence in UTF-8. This replacement can cause the input to grow, exceeding the API limit and causing failed API calls. This commit adds logic for counting the effective byte length after normalization and splitting input without splitting valid UTF-8 byte-sequences into two invalid byte-sequences. Fixes https://github.com/moby/moby/issues/37747 Signed-off-by: Samuel Karp --- daemon/logger/awslogs/cloudwatchlogs.go | 64 +++++-- daemon/logger/awslogs/cloudwatchlogs_test.go | 182 +++++++++++++++++++ 2 files changed, 233 insertions(+), 13 deletions(-) diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index 0f989af379838..44e35e52b23cd 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -11,6 +11,7 @@ import ( "strings" "sync" "time" + "unicode/utf8" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" @@ -46,6 +47,10 @@ const ( maximumLogEventsPerPut = 10000 // See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html + // Because the events are interpreted as UTF-8 encoded Unicode, invalid UTF-8 byte sequences are replaced with the + // Unicode replacement character (U+FFFD), which is a 3-byte sequence in UTF-8. To compensate for that and to avoid + // splitting valid UTF-8 characters into invalid byte sequences, we calculate the length of each event assuming that + // this replacement happens. maximumBytesPerEvent = 262144 - perEventBytes resourceAlreadyExistsCode = "ResourceAlreadyExistsException" @@ -495,15 +500,16 @@ func (l *logStream) collectBatch(created chan bool) { } line := msg.Line if l.multilinePattern != nil { - if l.multilinePattern.Match(line) || len(eventBuffer)+len(line) > maximumBytesPerEvent { + lineEffectiveLen := effectiveLen(string(line)) + if l.multilinePattern.Match(line) || effectiveLen(string(eventBuffer))+lineEffectiveLen > maximumBytesPerEvent { // This is a new log event or we will exceed max bytes per event // so flush the current eventBuffer to events and reset timestamp l.processEvent(batch, eventBuffer, eventBufferTimestamp) eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond) eventBuffer = eventBuffer[:0] } - // Append new line if event is less than max event size - if len(line) < maximumBytesPerEvent { + // Append newline if event is less than max event size + if lineEffectiveLen < maximumBytesPerEvent { line = append(line, "\n"...) } eventBuffer = append(eventBuffer, line...) @@ -524,16 +530,17 @@ func (l *logStream) collectBatch(created chan bool) { // batch (defined in maximumBytesPerPut). Log messages are split by the maximum // bytes per event (defined in maximumBytesPerEvent). There is a fixed per-event // byte overhead (defined in perEventBytes) which is accounted for in split- and -// batch-calculations. -func (l *logStream) processEvent(batch *eventBatch, events []byte, timestamp int64) { - for len(events) > 0 { +// batch-calculations. Because the events are interpreted as UTF-8 encoded +// Unicode, invalid UTF-8 byte sequences are replaced with the Unicode +// replacement character (U+FFFD), which is a 3-byte sequence in UTF-8. To +// compensate for that and to avoid splitting valid UTF-8 characters into +// invalid byte sequences, we calculate the length of each event assuming that +// this replacement happens. +func (l *logStream) processEvent(batch *eventBatch, bytes []byte, timestamp int64) { + for len(bytes) > 0 { // Split line length so it does not exceed the maximum - lineBytes := len(events) - if lineBytes > maximumBytesPerEvent { - lineBytes = maximumBytesPerEvent - } - line := events[:lineBytes] - + splitOffset, lineBytes := findValidSplit(string(bytes), maximumBytesPerEvent) + line := bytes[:splitOffset] event := wrappedEvent{ inputLogEvent: &cloudwatchlogs.InputLogEvent{ Message: aws.String(string(line)), @@ -544,7 +551,7 @@ func (l *logStream) processEvent(batch *eventBatch, events []byte, timestamp int added := batch.add(event, lineBytes) if added { - events = events[lineBytes:] + bytes = bytes[splitOffset:] } else { l.publishBatch(batch) batch.reset() @@ -552,6 +559,37 @@ func (l *logStream) processEvent(batch *eventBatch, events []byte, timestamp int } } +// effectiveLen counts the effective number of bytes in the string, after +// UTF-8 normalization. UTF-8 normalization includes replacing bytes that do +// not constitute valid UTF-8 encoded Unicode codepoints with the Unicode +// replacement codepoint U+FFFD (a 3-byte UTF-8 sequence, represented in Go as +// utf8.RuneError) +func effectiveLen(line string) int { + effectiveBytes := 0 + for _, rune := range line { + effectiveBytes += utf8.RuneLen(rune) + } + return effectiveBytes +} + +// findValidSplit finds the byte offset to split a string without breaking valid +// Unicode codepoints given a maximum number of total bytes. findValidSplit +// returns the byte offset for splitting a string or []byte, as well as the +// effective number of bytes if the string were normalized to replace invalid +// UTF-8 encoded bytes with the Unicode replacement character (a 3-byte UTF-8 +// sequence, represented in Go as utf8.RuneError) +func findValidSplit(line string, maxBytes int) (splitOffset, effectiveBytes int) { + for offset, rune := range line { + splitOffset = offset + if effectiveBytes+utf8.RuneLen(rune) > maxBytes { + return splitOffset, effectiveBytes + } + effectiveBytes += utf8.RuneLen(rune) + } + splitOffset = len(line) + return +} + // publishBatch calls PutLogEvents for a given set of InputLogEvents, // accounting for sequencing requirements (each request must reference the // sequence token returned by the previous request). diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go index 172be51a307f5..fdae99c76d3af 100644 --- a/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -938,6 +938,62 @@ func TestCollectBatchClose(t *testing.T) { } } +func TestEffectiveLen(t *testing.T) { + tests := []struct { + str string + effectiveBytes int + }{ + {"Hello", 5}, + {string([]byte{1, 2, 3, 4}), 4}, + {"🙃", 4}, + {string([]byte{0xFF, 0xFF, 0xFF, 0xFF}), 12}, + {"He\xff\xffo", 9}, + {"", 0}, + } + for i, tc := range tests { + t.Run(fmt.Sprintf("%d/%s", i, tc.str), func(t *testing.T) { + assert.Equal(t, tc.effectiveBytes, effectiveLen(tc.str)) + }) + } +} + +func TestFindValidSplit(t *testing.T) { + tests := []struct { + str string + maxEffectiveBytes int + splitOffset int + effectiveBytes int + }{ + {"", 10, 0, 0}, + {"Hello", 6, 5, 5}, + {"Hello", 2, 2, 2}, + {"Hello", 0, 0, 0}, + {"🙃", 3, 0, 0}, + {"🙃", 4, 4, 4}, + {string([]byte{'a', 0xFF}), 2, 1, 1}, + {string([]byte{'a', 0xFF}), 4, 2, 4}, + } + for i, tc := range tests { + t.Run(fmt.Sprintf("%d/%s", i, tc.str), func(t *testing.T) { + splitOffset, effectiveBytes := findValidSplit(tc.str, tc.maxEffectiveBytes) + assert.Equal(t, tc.splitOffset, splitOffset, "splitOffset") + assert.Equal(t, tc.effectiveBytes, effectiveBytes, "effectiveBytes") + t.Log(tc.str[:tc.splitOffset]) + t.Log(tc.str[tc.splitOffset:]) + }) + } +} + +func TestProcessEventEmoji(t *testing.T) { + stream := &logStream{} + batch := &eventBatch{} + bytes := []byte(strings.Repeat("🙃", maximumBytesPerEvent/4+1)) + stream.processEvent(batch, bytes, 0) + assert.Equal(t, 2, len(batch.batch), "should be two events in the batch") + assert.Equal(t, strings.Repeat("🙃", maximumBytesPerEvent/4), aws.StringValue(batch.batch[0].inputLogEvent.Message)) + assert.Equal(t, "🙃", aws.StringValue(batch.batch[1].inputLogEvent.Message)) +} + func TestCollectBatchLineSplit(t *testing.T) { mockClient := newMockClient() stream := &logStream{ @@ -987,6 +1043,55 @@ func TestCollectBatchLineSplit(t *testing.T) { } } +func TestCollectBatchLineSplitWithBinary(t *testing.T) { + mockClient := newMockClient() + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + sequenceToken: aws.String(sequenceToken), + messages: make(chan *logger.Message), + } + mockClient.putLogEventsResult <- &putLogEventsResult{ + successResult: &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, + } + var ticks = make(chan time.Time) + newTicker = func(_ time.Duration) *time.Ticker { + return &time.Ticker{ + C: ticks, + } + } + + d := make(chan bool) + close(d) + go stream.collectBatch(d) + + longline := strings.Repeat("\xFF", maximumBytesPerEvent/3) // 0xFF is counted as the 3-byte utf8.RuneError + stream.Log(&logger.Message{ + Line: []byte(longline + "\xFD"), + Timestamp: time.Time{}, + }) + + // no ticks + stream.Close() + + argument := <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + if len(argument.LogEvents) != 2 { + t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents)) + } + if *argument.LogEvents[0].Message != longline { + t.Errorf("Expected message to be %s but was %s", longline, *argument.LogEvents[0].Message) + } + if *argument.LogEvents[1].Message != "\xFD" { + t.Errorf("Expected message to be %s but was %s", "\xFD", *argument.LogEvents[1].Message) + } +} + func TestCollectBatchMaxEvents(t *testing.T) { mockClient := newMockClientBuffered(1) stream := &logStream{ @@ -1125,6 +1230,83 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) { } } +func TestCollectBatchMaxTotalBytesWithBinary(t *testing.T) { + expectedPuts := 2 + mockClient := newMockClientBuffered(expectedPuts) + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + sequenceToken: aws.String(sequenceToken), + messages: make(chan *logger.Message), + } + for i := 0; i < expectedPuts; i++ { + mockClient.putLogEventsResult <- &putLogEventsResult{ + successResult: &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, + } + } + + var ticks = make(chan time.Time) + newTicker = func(_ time.Duration) *time.Ticker { + return &time.Ticker{ + C: ticks, + } + } + + d := make(chan bool) + close(d) + go stream.collectBatch(d) + + // maxline is the maximum line that could be submitted after + // accounting for its overhead. + maxline := strings.Repeat("\xFF", (maximumBytesPerPut-perEventBytes)/3) // 0xFF is counted as the 3-byte utf8.RuneError + // This will be split and batched up to the `maximumBytesPerPut' + // (+/- `maximumBytesPerEvent'). This /should/ be aligned, but + // should also tolerate an offset within that range. + stream.Log(&logger.Message{ + Line: []byte(maxline), + Timestamp: time.Time{}, + }) + stream.Log(&logger.Message{ + Line: []byte("B"), + Timestamp: time.Time{}, + }) + + // no ticks, guarantee batch by size (and chan close) + stream.Close() + + argument := <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + + // Should total to the maximum allowed bytes. + eventBytes := 0 + for _, event := range argument.LogEvents { + eventBytes += effectiveLen(*event.Message) + } + eventsOverhead := len(argument.LogEvents) * perEventBytes + payloadTotal := eventBytes + eventsOverhead + // lowestMaxBatch allows the payload to be offset if the messages + // don't lend themselves to align with the maximum event size. + lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent + + if payloadTotal > maximumBytesPerPut { + t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, payloadTotal) + } + if payloadTotal < lowestMaxBatch { + t.Errorf("Batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal) + } + + argument = <-mockClient.putLogEventsArgument + message := *argument.LogEvents[len(argument.LogEvents)-1].Message + if message[len(message)-1:] != "B" { + t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:]) + } +} + func TestCollectBatchWithDuplicateTimestamps(t *testing.T) { mockClient := newMockClient() stream := &logStream{