Skip to content
This repository has been archived by the owner on Oct 13, 2023. It is now read-only.

[18.09 backport] awslogs: account for UTF-8 normalization in limits #112

Merged
merged 1 commit into from
Nov 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 51 additions & 13 deletions daemon/logger/awslogs/cloudwatchlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"sync"
"time"
"unicode/utf8"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
Expand Down Expand Up @@ -46,6 +47,10 @@ const (
maximumLogEventsPerPut = 10000

// See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
// Because the events are interpreted as UTF-8 encoded Unicode, invalid UTF-8 byte sequences are replaced with the
// Unicode replacement character (U+FFFD), which is a 3-byte sequence in UTF-8. To compensate for that and to avoid
// splitting valid UTF-8 characters into invalid byte sequences, we calculate the length of each event assuming that
// this replacement happens.
maximumBytesPerEvent = 262144 - perEventBytes

resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
Expand Down Expand Up @@ -495,15 +500,16 @@ func (l *logStream) collectBatch(created chan bool) {
}
line := msg.Line
if l.multilinePattern != nil {
if l.multilinePattern.Match(line) || len(eventBuffer)+len(line) > maximumBytesPerEvent {
lineEffectiveLen := effectiveLen(string(line))
if l.multilinePattern.Match(line) || effectiveLen(string(eventBuffer))+lineEffectiveLen > maximumBytesPerEvent {
// This is a new log event or we will exceed max bytes per event
// so flush the current eventBuffer to events and reset timestamp
l.processEvent(batch, eventBuffer, eventBufferTimestamp)
eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
eventBuffer = eventBuffer[:0]
}
// Append new line if event is less than max event size
if len(line) < maximumBytesPerEvent {
// Append newline if event is less than max event size
if lineEffectiveLen < maximumBytesPerEvent {
line = append(line, "\n"...)
}
eventBuffer = append(eventBuffer, line...)
Expand All @@ -524,16 +530,17 @@ func (l *logStream) collectBatch(created chan bool) {
// batch (defined in maximumBytesPerPut). Log messages are split by the maximum
// bytes per event (defined in maximumBytesPerEvent). There is a fixed per-event
// byte overhead (defined in perEventBytes) which is accounted for in split- and
// batch-calculations.
func (l *logStream) processEvent(batch *eventBatch, events []byte, timestamp int64) {
for len(events) > 0 {
// batch-calculations. Because the events are interpreted as UTF-8 encoded
// Unicode, invalid UTF-8 byte sequences are replaced with the Unicode
// replacement character (U+FFFD), which is a 3-byte sequence in UTF-8. To
// compensate for that and to avoid splitting valid UTF-8 characters into
// invalid byte sequences, we calculate the length of each event assuming that
// this replacement happens.
func (l *logStream) processEvent(batch *eventBatch, bytes []byte, timestamp int64) {
for len(bytes) > 0 {
// Split line length so it does not exceed the maximum
lineBytes := len(events)
if lineBytes > maximumBytesPerEvent {
lineBytes = maximumBytesPerEvent
}
line := events[:lineBytes]

splitOffset, lineBytes := findValidSplit(string(bytes), maximumBytesPerEvent)
line := bytes[:splitOffset]
event := wrappedEvent{
inputLogEvent: &cloudwatchlogs.InputLogEvent{
Message: aws.String(string(line)),
Expand All @@ -544,14 +551,45 @@ func (l *logStream) processEvent(batch *eventBatch, events []byte, timestamp int

added := batch.add(event, lineBytes)
if added {
events = events[lineBytes:]
bytes = bytes[splitOffset:]
} else {
l.publishBatch(batch)
batch.reset()
}
}
}

// effectiveLen reports how many bytes line would occupy once it has been
// normalized to valid UTF-8. Each byte that does not belong to a valid UTF-8
// encoding is counted as if it had been replaced by the Unicode replacement
// character U+FFFD (utf8.RuneError in Go), whose UTF-8 encoding is 3 bytes
// long. Valid characters contribute their normal encoded length.
func effectiveLen(line string) int {
	total := 0
	// Ranging over a string decodes it rune by rune and yields utf8.RuneError
	// for every invalid byte, so RuneLen gives the post-normalization width.
	for _, r := range line {
		total += utf8.RuneLen(r)
	}
	return total
}

// findValidSplit locates the largest prefix of line whose effective size does
// not exceed maxBytes, without cutting through a valid UTF-8 encoded
// codepoint.
//
// It returns two values:
//   - splitOffset: the byte offset into line (usable on the equivalent []byte)
//     at which to split; it equals len(line) when the whole string fits.
//   - effectiveBytes: the size of that prefix after normalization, where every
//     invalid UTF-8 byte is counted as the 3-byte Unicode replacement
//     character (utf8.RuneError in Go).
func findValidSplit(line string, maxBytes int) (splitOffset, effectiveBytes int) {
	for pos, r := range line {
		width := utf8.RuneLen(r)
		if effectiveBytes+width > maxBytes {
			// Including this rune would overflow the budget; split before it.
			return pos, effectiveBytes
		}
		effectiveBytes += width
	}
	// Every rune fit within the budget.
	return len(line), effectiveBytes
}

// publishBatch calls PutLogEvents for a given set of InputLogEvents,
// accounting for sequencing requirements (each request must reference the
// sequence token returned by the previous request).
Expand Down
182 changes: 182 additions & 0 deletions daemon/logger/awslogs/cloudwatchlogs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,62 @@ func TestCollectBatchClose(t *testing.T) {
}
}

func TestEffectiveLen(t *testing.T) {
tests := []struct {
str string
effectiveBytes int
}{
{"Hello", 5},
{string([]byte{1, 2, 3, 4}), 4},
{"🙃", 4},
{string([]byte{0xFF, 0xFF, 0xFF, 0xFF}), 12},
{"He\xff\xffo", 9},
{"", 0},
}
for i, tc := range tests {
t.Run(fmt.Sprintf("%d/%s", i, tc.str), func(t *testing.T) {
assert.Equal(t, tc.effectiveBytes, effectiveLen(tc.str))
})
}
}

// TestFindValidSplit checks both values returned by findValidSplit: the byte
// offset at which to split and the effective byte count of the prefix. The
// table covers the empty string, ASCII text with various limits, an emoji
// that does or does not fit, and an invalid byte counted as 3 bytes.
func TestFindValidSplit(t *testing.T) {
	cases := []struct {
		input      string
		limit      int
		wantOffset int
		wantBytes  int
	}{
		{"", 10, 0, 0},
		{"Hello", 6, 5, 5},
		{"Hello", 2, 2, 2},
		{"Hello", 0, 0, 0},
		{"🙃", 3, 0, 0},
		{"🙃", 4, 4, 4},
		{string([]byte{'a', 0xFF}), 2, 1, 1},
		{string([]byte{'a', 0xFF}), 4, 2, 4},
	}
	for idx, c := range cases {
		name := fmt.Sprintf("%d/%s", idx, c.input)
		t.Run(name, func(t *testing.T) {
			gotOffset, gotBytes := findValidSplit(c.input, c.limit)
			assert.Equal(t, c.wantOffset, gotOffset, "splitOffset")
			assert.Equal(t, c.wantBytes, gotBytes, "effectiveBytes")
			// Log both halves of the expected split to ease debugging.
			head := c.input[:c.wantOffset]
			tail := c.input[c.wantOffset:]
			t.Log(head)
			t.Log(tail)
		})
	}
}

func TestProcessEventEmoji(t *testing.T) {
stream := &logStream{}
batch := &eventBatch{}
bytes := []byte(strings.Repeat("🙃", maximumBytesPerEvent/4+1))
stream.processEvent(batch, bytes, 0)
assert.Equal(t, 2, len(batch.batch), "should be two events in the batch")
assert.Equal(t, strings.Repeat("🙃", maximumBytesPerEvent/4), aws.StringValue(batch.batch[0].inputLogEvent.Message))
assert.Equal(t, "🙃", aws.StringValue(batch.batch[1].inputLogEvent.Message))
}

func TestCollectBatchLineSplit(t *testing.T) {
mockClient := newMockClient()
stream := &logStream{
Expand Down Expand Up @@ -987,6 +1043,55 @@ func TestCollectBatchLineSplit(t *testing.T) {
}
}

// TestCollectBatchLineSplitWithBinary logs a single line made of invalid
// UTF-8 bytes whose effective (normalized) size is just over what fits in one
// event, and expects the resulting PutLogEvents input to contain two events:
// the run of 0xFF bytes, followed by the trailing 0xFD byte on its own.
func TestCollectBatchLineSplitWithBinary(t *testing.T) {
mockClient := newMockClient()
stream := &logStream{
client: mockClient,
logGroupName: groupName,
logStreamName: streamName,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
// Queue the single successful result the mock hands back for the put.
mockClient.putLogEventsResult <- &putLogEventsResult{
successResult: &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken),
},
}
// Replace the package-level ticker factory with one whose channel is never
// written to in this test, so no time-based flush is triggered from here.
var ticks = make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker {
return &time.Ticker{
C: ticks,
}
}

// d is already closed when collectBatch starts.
// NOTE(review): presumably this signals that the log stream has been
// created — confirm against collectBatch.
d := make(chan bool)
close(d)
go stream.collectBatch(d)

// maximumBytesPerEvent/3 invalid bytes, each counted as 3 effective bytes,
// total at most maximumBytesPerEvent; the test expects the extra "\xFD"
// appended below to end up in a second event.
longline := strings.Repeat("\xFF", maximumBytesPerEvent/3) // 0xFF is counted as the 3-byte utf8.RuneError
stream.Log(&logger.Message{
Line: []byte(longline + "\xFD"),
Timestamp: time.Time{},
})

// no ticks
stream.Close()

argument := <-mockClient.putLogEventsArgument
if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput")
}
// NOTE(review): t.Errorf does not stop the test, so the index accesses
// below will panic if fewer than two events were received.
if len(argument.LogEvents) != 2 {
t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
}
if *argument.LogEvents[0].Message != longline {
t.Errorf("Expected message to be %s but was %s", longline, *argument.LogEvents[0].Message)
}
if *argument.LogEvents[1].Message != "\xFD" {
t.Errorf("Expected message to be %s but was %s", "\xFD", *argument.LogEvents[1].Message)
}
}

func TestCollectBatchMaxEvents(t *testing.T) {
mockClient := newMockClientBuffered(1)
stream := &logStream{
Expand Down Expand Up @@ -1125,6 +1230,83 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
}
}

// TestCollectBatchMaxTotalBytesWithBinary logs one very long line of invalid
// UTF-8 bytes followed by a short "B" line and expects two PutLogEvents
// calls. The first call's payload, measured with effectiveLen plus the
// per-event overhead, must lie between maximumBytesPerPut-maximumBytesPerEvent
// and maximumBytesPerPut; the last message of the second call must end in "B".
func TestCollectBatchMaxTotalBytesWithBinary(t *testing.T) {
expectedPuts := 2
mockClient := newMockClientBuffered(expectedPuts)
stream := &logStream{
client: mockClient,
logGroupName: groupName,
logStreamName: streamName,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
// Queue one successful result per expected put.
for i := 0; i < expectedPuts; i++ {
mockClient.putLogEventsResult <- &putLogEventsResult{
successResult: &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken),
},
}
}

// Replace the package-level ticker factory with one whose channel is never
// written to in this test, so no time-based flush is triggered from here.
var ticks = make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker {
return &time.Ticker{
C: ticks,
}
}

// d is already closed when collectBatch starts.
// NOTE(review): presumably this signals that the log stream has been
// created — confirm against collectBatch.
d := make(chan bool)
close(d)
go stream.collectBatch(d)

// maxline is the maximum line that could be submitted after
// accounting for its overhead.
maxline := strings.Repeat("\xFF", (maximumBytesPerPut-perEventBytes)/3) // 0xFF is counted as the 3-byte utf8.RuneError
// This will be split and batched up to the `maximumBytesPerPut'
// (+/- `maximumBytesPerEvent'). This /should/ be aligned, but
// should also tolerate an offset within that range.
stream.Log(&logger.Message{
Line: []byte(maxline),
Timestamp: time.Time{},
})
stream.Log(&logger.Message{
Line: []byte("B"),
Timestamp: time.Time{},
})

// no ticks, guarantee batch by size (and chan close)
stream.Close()

// First put: the size-limited batch.
argument := <-mockClient.putLogEventsArgument
if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput")
}

// Should total to the maximum allowed bytes.
// Sizes are measured with effectiveLen, so each invalid byte counts as 3.
eventBytes := 0
for _, event := range argument.LogEvents {
eventBytes += effectiveLen(*event.Message)
}
eventsOverhead := len(argument.LogEvents) * perEventBytes
payloadTotal := eventBytes + eventsOverhead
// lowestMaxBatch allows the payload to be offset if the messages
// don't lend themselves to align with the maximum event size.
lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent

if payloadTotal > maximumBytesPerPut {
t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, payloadTotal)
}
if payloadTotal < lowestMaxBatch {
t.Errorf("Batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal)
}

// Second put: only the final byte of its last message is checked.
argument = <-mockClient.putLogEventsArgument
message := *argument.LogEvents[len(argument.LogEvents)-1].Message
if message[len(message)-1:] != "B" {
t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
}
}

func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
mockClient := newMockClient()
stream := &logStream{
Expand Down