diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index 0f989af379838..44e35e52b23cd 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -11,6 +11,7 @@ import ( "strings" "sync" "time" + "unicode/utf8" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" @@ -46,6 +47,10 @@ const ( maximumLogEventsPerPut = 10000 // See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html + // Because the events are interpreted as UTF-8 encoded Unicode, invalid UTF-8 byte sequences are replaced with the + // Unicode replacement character (U+FFFD), which is a 3-byte sequence in UTF-8. To compensate for that and to avoid + // splitting valid UTF-8 characters into invalid byte sequences, we calculate the length of each event assuming that + // this replacement happens. maximumBytesPerEvent = 262144 - perEventBytes resourceAlreadyExistsCode = "ResourceAlreadyExistsException" @@ -495,15 +500,16 @@ func (l *logStream) collectBatch(created chan bool) { } line := msg.Line if l.multilinePattern != nil { - if l.multilinePattern.Match(line) || len(eventBuffer)+len(line) > maximumBytesPerEvent { + lineEffectiveLen := effectiveLen(string(line)) + if l.multilinePattern.Match(line) || effectiveLen(string(eventBuffer))+lineEffectiveLen > maximumBytesPerEvent { // This is a new log event or we will exceed max bytes per event // so flush the current eventBuffer to events and reset timestamp l.processEvent(batch, eventBuffer, eventBufferTimestamp) eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond) eventBuffer = eventBuffer[:0] } - // Append new line if event is less than max event size - if len(line) < maximumBytesPerEvent { + // Append newline if event is less than max event size + if lineEffectiveLen < maximumBytesPerEvent { line = append(line, "\n"...) } eventBuffer = append(eventBuffer, line...) @@ -524,16 +530,17 @@ func (l *logStream) collectBatch(created chan bool) { // batch (defined in maximumBytesPerPut). Log messages are split by the maximum // bytes per event (defined in maximumBytesPerEvent). There is a fixed per-event // byte overhead (defined in perEventBytes) which is accounted for in split- and -// batch-calculations. -func (l *logStream) processEvent(batch *eventBatch, events []byte, timestamp int64) { - for len(events) > 0 { +// batch-calculations. Because the events are interpreted as UTF-8 encoded +// Unicode, invalid UTF-8 byte sequences are replaced with the Unicode +// replacement character (U+FFFD), which is a 3-byte sequence in UTF-8. To +// compensate for that and to avoid splitting valid UTF-8 characters into +// invalid byte sequences, we calculate the length of each event assuming that +// this replacement happens. +func (l *logStream) processEvent(batch *eventBatch, bytes []byte, timestamp int64) { + for len(bytes) > 0 { // Split line length so it does not exceed the maximum - lineBytes := len(events) - if lineBytes > maximumBytesPerEvent { - lineBytes = maximumBytesPerEvent - } - line := events[:lineBytes] - + splitOffset, lineBytes := findValidSplit(string(bytes), maximumBytesPerEvent) + line := bytes[:splitOffset] event := wrappedEvent{ inputLogEvent: &cloudwatchlogs.InputLogEvent{ Message: aws.String(string(line)), @@ -544,7 +551,7 @@ func (l *logStream) processEvent(batch *eventBatch, events []byte, timestamp int added := batch.add(event, lineBytes) if added { - events = events[lineBytes:] + bytes = bytes[splitOffset:] } else { l.publishBatch(batch) batch.reset() @@ -552,6 +559,37 @@ func (l *logStream) processEvent(batch *eventBatch, events []byte, timestamp int } } +// effectiveLen counts the effective number of bytes in the string, after +// UTF-8 normalization. UTF-8 normalization includes replacing bytes that do +// not constitute valid UTF-8 encoded Unicode codepoints with the Unicode +// replacement codepoint U+FFFD (a 3-byte UTF-8 sequence, represented in Go as +// utf8.RuneError) +func effectiveLen(line string) int { + effectiveBytes := 0 + for _, rune := range line { + effectiveBytes += utf8.RuneLen(rune) + } + return effectiveBytes +} + +// findValidSplit finds the byte offset to split a string without breaking valid +// Unicode codepoints given a maximum number of total bytes. findValidSplit +// returns the byte offset for splitting a string or []byte, as well as the +// effective number of bytes if the string were normalized to replace invalid +// UTF-8 encoded bytes with the Unicode replacement character (a 3-byte UTF-8 +// sequence, represented in Go as utf8.RuneError) +func findValidSplit(line string, maxBytes int) (splitOffset, effectiveBytes int) { + for offset, rune := range line { + splitOffset = offset + if effectiveBytes+utf8.RuneLen(rune) > maxBytes { + return splitOffset, effectiveBytes + } + effectiveBytes += utf8.RuneLen(rune) + } + splitOffset = len(line) + return +} + // publishBatch calls PutLogEvents for a given set of InputLogEvents, // accounting for sequencing requirements (each request must reference the // sequence token returned by the previous request). diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go index 172be51a307f5..fdae99c76d3af 100644 --- a/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -938,6 +938,62 @@ func TestCollectBatchClose(t *testing.T) { } } +func TestEffectiveLen(t *testing.T) { + tests := []struct { + str string + effectiveBytes int + }{ + {"Hello", 5}, + {string([]byte{1, 2, 3, 4}), 4}, + {"🙃", 4}, + {string([]byte{0xFF, 0xFF, 0xFF, 0xFF}), 12}, + {"He\xff\xffo", 9}, + {"", 0}, + } + for i, tc := range tests { + t.Run(fmt.Sprintf("%d/%s", i, tc.str), func(t *testing.T) { + assert.Equal(t, tc.effectiveBytes, effectiveLen(tc.str)) + }) + } +} + +func TestFindValidSplit(t *testing.T) { + tests := []struct { + str string + maxEffectiveBytes int + splitOffset int + effectiveBytes int + }{ + {"", 10, 0, 0}, + {"Hello", 6, 5, 5}, + {"Hello", 2, 2, 2}, + {"Hello", 0, 0, 0}, + {"🙃", 3, 0, 0}, + {"🙃", 4, 4, 4}, + {string([]byte{'a', 0xFF}), 2, 1, 1}, + {string([]byte{'a', 0xFF}), 4, 2, 4}, + } + for i, tc := range tests { + t.Run(fmt.Sprintf("%d/%s", i, tc.str), func(t *testing.T) { + splitOffset, effectiveBytes := findValidSplit(tc.str, tc.maxEffectiveBytes) + assert.Equal(t, tc.splitOffset, splitOffset, "splitOffset") + assert.Equal(t, tc.effectiveBytes, effectiveBytes, "effectiveBytes") + t.Log(tc.str[:tc.splitOffset]) + t.Log(tc.str[tc.splitOffset:]) + }) + } +} + +func TestProcessEventEmoji(t *testing.T) { + stream := &logStream{} + batch := &eventBatch{} + bytes := []byte(strings.Repeat("🙃", maximumBytesPerEvent/4+1)) + stream.processEvent(batch, bytes, 0) + assert.Equal(t, 2, len(batch.batch), "should be two events in the batch") + assert.Equal(t, strings.Repeat("🙃", maximumBytesPerEvent/4), aws.StringValue(batch.batch[0].inputLogEvent.Message)) + assert.Equal(t, "🙃", aws.StringValue(batch.batch[1].inputLogEvent.Message)) +} + func TestCollectBatchLineSplit(t *testing.T) { mockClient := newMockClient() stream := &logStream{ @@ -987,6 +1043,55 @@ func TestCollectBatchLineSplit(t *testing.T) { } } +func TestCollectBatchLineSplitWithBinary(t *testing.T) { + mockClient := newMockClient() + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + sequenceToken: aws.String(sequenceToken), + messages: make(chan *logger.Message), + } + mockClient.putLogEventsResult <- &putLogEventsResult{ + successResult: &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, + } + var ticks = make(chan time.Time) + newTicker = func(_ time.Duration) *time.Ticker { + return &time.Ticker{ + C: ticks, + } + } + + d := make(chan bool) + close(d) + go stream.collectBatch(d) + + longline := strings.Repeat("\xFF", maximumBytesPerEvent/3) // 0xFF is counted as the 3-byte utf8.RuneError + stream.Log(&logger.Message{ + Line: []byte(longline + "\xFD"), + Timestamp: time.Time{}, + }) + + // no ticks + stream.Close() + + argument := <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + if len(argument.LogEvents) != 2 { + t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents)) + } + if *argument.LogEvents[0].Message != longline { + t.Errorf("Expected message to be %s but was %s", longline, *argument.LogEvents[0].Message) + } + if *argument.LogEvents[1].Message != "\xFD" { + t.Errorf("Expected message to be %s but was %s", "\xFD", *argument.LogEvents[1].Message) + } +} + func TestCollectBatchMaxEvents(t *testing.T) { mockClient := newMockClientBuffered(1) stream := &logStream{ @@ -1125,6 +1230,83 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) { } } +func TestCollectBatchMaxTotalBytesWithBinary(t *testing.T) { + expectedPuts := 2 + mockClient := newMockClientBuffered(expectedPuts) + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + sequenceToken: aws.String(sequenceToken), + messages: make(chan *logger.Message), + } + for i := 0; i < expectedPuts; i++ { + mockClient.putLogEventsResult <- &putLogEventsResult{ + successResult: &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, + } + } + + var ticks = make(chan time.Time) + newTicker = func(_ time.Duration) *time.Ticker { + return &time.Ticker{ + C: ticks, + } + } + + d := make(chan bool) + close(d) + go stream.collectBatch(d) + + // maxline is the maximum line that could be submitted after + // accounting for its overhead. + maxline := strings.Repeat("\xFF", (maximumBytesPerPut-perEventBytes)/3) // 0xFF is counted as the 3-byte utf8.RuneError + // This will be split and batched up to the `maximumBytesPerPut' + // (+/- `maximumBytesPerEvent'). This /should/ be aligned, but + // should also tolerate an offset within that range. + stream.Log(&logger.Message{ + Line: []byte(maxline), + Timestamp: time.Time{}, + }) + stream.Log(&logger.Message{ + Line: []byte("B"), + Timestamp: time.Time{}, + }) + + // no ticks, guarantee batch by size (and chan close) + stream.Close() + + argument := <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + + // Should total to the maximum allowed bytes. + eventBytes := 0 + for _, event := range argument.LogEvents { + eventBytes += effectiveLen(*event.Message) + } + eventsOverhead := len(argument.LogEvents) * perEventBytes + payloadTotal := eventBytes + eventsOverhead + // lowestMaxBatch allows the payload to be offset if the messages + // don't lend themselves to align with the maximum event size. + lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent + + if payloadTotal > maximumBytesPerPut { + t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, payloadTotal) + } + if payloadTotal < lowestMaxBatch { + t.Errorf("Batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal) + } + + argument = <-mockClient.putLogEventsArgument + message := *argument.LogEvents[len(argument.LogEvents)-1].Message + if message[len(message)-1:] != "B" { + t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:]) + } +} + func TestCollectBatchWithDuplicateTimestamps(t *testing.T) { mockClient := newMockClient() stream := &logStream{