awslogs: account for UTF-8 normalization in limits
The CloudWatch Logs API defines its limits in terms of bytes, but its
inputs in terms of UTF-8 encoded strings. Byte sequences that are not
valid UTF-8 encodings are normalized to the Unicode replacement
character U+FFFD, which is a 3-byte sequence in UTF-8. This replacement
can cause the input to grow, exceeding the API limit and causing failed
API calls.
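
As a minimal illustration of that inflation (not part of the commit
itself), using Go's own UTF-8 decoding, where invalid bytes decode as
utf8.RuneError:

	package main

	import (
		"fmt"
		"unicode/utf8"
	)

	func main() {
		raw := "\xff\xfe"     // two bytes, neither a valid UTF-8 sequence
		fmt.Println(len(raw)) // 2: raw byte length before normalization
		normalized := 0
		for _, r := range raw { // each invalid byte decodes as utf8.RuneError (U+FFFD)
			normalized += utf8.RuneLen(r)
		}
		fmt.Println(normalized) // 6: each replacement is a 3-byte UTF-8 sequence
	}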

This commit adds logic for counting the effective byte length after
normalization and for splitting input without breaking a valid UTF-8
byte sequence into two invalid sequences; a sketch of the approach
follows.
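
A minimal sketch of that splitting approach (illustrative only: splitAt
is a hypothetical name, the commit's actual helpers are effectiveLen and
findValidSplit in the diff below; assumes the unicode/utf8 import):

	// splitAt returns the longest prefix of s whose post-normalization
	// size fits within maxBytes, never cutting a multi-byte rune in half.
	func splitAt(s string, maxBytes int) (prefix, rest string) {
		size := 0
		for i, r := range s { // i is always a rune boundary
			if size+utf8.RuneLen(r) > maxBytes {
				return s[:i], s[i:]
			}
			size += utf8.RuneLen(r)
		}
		return s, ""
	}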

Fixes moby#37747

Signed-off-by: Samuel Karp <skarp@amazon.com>
samuelkarp committed Oct 10, 2018
1 parent d6a7c22 commit 1e8ef38
Showing 2 changed files with 233 additions and 13 deletions.
64 changes: 51 additions & 13 deletions daemon/logger/awslogs/cloudwatchlogs.go
@@ -11,6 +11,7 @@ import (
"strings"
"sync"
"time"
"unicode/utf8"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
@@ -46,6 +47,10 @@ const (
maximumLogEventsPerPut = 10000

// See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
// Because the events are interpreted as UTF-8 encoded Unicode, invalid UTF-8 byte sequences are replaced with the
// Unicode replacement character (U+FFFD), which is a 3-byte sequence in UTF-8. To compensate for that and to avoid
// splitting valid UTF-8 characters into invalid byte sequences, we calculate the length of each event assuming that
// this replacement happens.
maximumBytesPerEvent = 262144 - perEventBytes

resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
@@ -495,15 +500,16 @@ func (l *logStream) collectBatch(created chan bool) {
}
line := msg.Line
if l.multilinePattern != nil {
-			if l.multilinePattern.Match(line) || len(eventBuffer)+len(line) > maximumBytesPerEvent {
+			lineEffectiveLen := effectiveLen(string(line))
+			if l.multilinePattern.Match(line) || effectiveLen(string(eventBuffer))+lineEffectiveLen > maximumBytesPerEvent {
// This is a new log event or we will exceed max bytes per event
// so flush the current eventBuffer to events and reset timestamp
l.processEvent(batch, eventBuffer, eventBufferTimestamp)
eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
eventBuffer = eventBuffer[:0]
}
-			// Append new line if event is less than max event size
-			if len(line) < maximumBytesPerEvent {
+			// Append newline if event is less than max event size
+			if lineEffectiveLen < maximumBytesPerEvent {
line = append(line, "\n"...)
}
eventBuffer = append(eventBuffer, line...)
@@ -524,16 +530,17 @@
// batch (defined in maximumBytesPerPut). Log messages are split by the maximum
// bytes per event (defined in maximumBytesPerEvent). There is a fixed per-event
// byte overhead (defined in perEventBytes) which is accounted for in split- and
-// batch-calculations.
-func (l *logStream) processEvent(batch *eventBatch, events []byte, timestamp int64) {
-	for len(events) > 0 {
+// batch-calculations. Because the events are interpreted as UTF-8 encoded
+// Unicode, invalid UTF-8 byte sequences are replaced with the Unicode
+// replacement character (U+FFFD), which is a 3-byte sequence in UTF-8. To
+// compensate for that and to avoid splitting valid UTF-8 characters into
+// invalid byte sequences, we calculate the length of each event assuming that
+// this replacement happens.
+func (l *logStream) processEvent(batch *eventBatch, bytes []byte, timestamp int64) {
+	for len(bytes) > 0 {
 		// Split line length so it does not exceed the maximum
-		lineBytes := len(events)
-		if lineBytes > maximumBytesPerEvent {
-			lineBytes = maximumBytesPerEvent
-		}
-		line := events[:lineBytes]
-
+		splitOffset, lineBytes := findValidSplit(string(bytes), maximumBytesPerEvent)
+		line := bytes[:splitOffset]
event := wrappedEvent{
inputLogEvent: &cloudwatchlogs.InputLogEvent{
Message: aws.String(string(line)),
@@ -544,14 +551,45 @@ func (l *logStream) processEvent(batch *eventBatch, events []byte, timestamp int

added := batch.add(event, lineBytes)
if added {
-			events = events[lineBytes:]
+			bytes = bytes[splitOffset:]
} else {
l.publishBatch(batch)
batch.reset()
}
}
}
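
// Illustrative note (not part of this commit): a message whose normalized
// length exceeds maximumBytesPerEvent is split across multiple events. For
// example, maximumBytesPerEvent/4+1 copies of the 4-byte rune "🙃" produce
// two events: one holding maximumBytesPerEvent/4 runes and one holding the
// final rune (see TestProcessEventEmoji below).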

// effectiveLen counts the effective number of bytes in the string, after
// UTF-8 normalization. UTF-8 normalization includes replacing bytes that do
// not constitute valid UTF-8 encoded Unicode codepoints with the Unicode
// replacement codepoint U+FFFD (a 3-byte UTF-8 sequence, represented in Go as
// utf8.RuneError)
func effectiveLen(line string) int {
effectiveBytes := 0
for _, rune := range line {
effectiveBytes += utf8.RuneLen(rune)
}
return effectiveBytes
}
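
// Illustrative examples (not part of this commit): for valid UTF-8 the
// effective length equals len(s); each invalid byte counts as the 3-byte
// replacement character:
//
//	effectiveLen("Hello")       // 5
//	effectiveLen("🙃")          // 4 (a single 4-byte rune)
//	effectiveLen("He\xff\xffo") // 9 (2 + 3 + 3 + 1)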

// findValidSplit finds the byte offset to split a string without breaking valid
// Unicode codepoints given a maximum number of total bytes. findValidSplit
// returns the byte offset for splitting a string or []byte, as well as the
// effective number of bytes if the string were normalized to replace invalid
// UTF-8 encoded bytes with the Unicode replacement character (a 3-byte UTF-8
// sequence, represented in Go as utf8.RuneError)
func findValidSplit(line string, maxBytes int) (splitOffset, effectiveBytes int) {
for offset, rune := range line {
splitOffset = offset
if effectiveBytes+utf8.RuneLen(rune) > maxBytes {
return splitOffset, effectiveBytes
}
effectiveBytes += utf8.RuneLen(rune)
}
splitOffset = len(line)
return
}
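
// Illustrative examples (not part of this commit), mirroring the test cases
// below; the returned offset always lands on a rune boundary:
//
//	findValidSplit("🙃", 3)                      // (0, 0): the 4-byte rune does not fit
//	findValidSplit(string([]byte{'a', 0xFF}), 4) // (2, 4): 'a' is 1 byte, 0xFF normalizes to 3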

// publishBatch calls PutLogEvents for a given set of InputLogEvents,
// accounting for sequencing requirements (each request must reference the
// sequence token returned by the previous request).
182 changes: 182 additions & 0 deletions daemon/logger/awslogs/cloudwatchlogs_test.go
@@ -938,6 +938,62 @@ func TestCollectBatchClose(t *testing.T) {
}
}

func TestEffectiveLen(t *testing.T) {
tests := []struct {
str string
effectiveBytes int
}{
{"Hello", 5},
{string([]byte{1, 2, 3, 4}), 4},
{"🙃", 4},
{string([]byte{0xFF, 0xFF, 0xFF, 0xFF}), 12},
{"He\xff\xffo", 9},
{"", 0},
}
for i, tc := range tests {
t.Run(fmt.Sprintf("%d/%s", i, tc.str), func(t *testing.T) {
assert.Equal(t, tc.effectiveBytes, effectiveLen(tc.str))
})
}
}

func TestFindValidSplit(t *testing.T) {
tests := []struct {
str string
maxEffectiveBytes int
splitOffset int
effectiveBytes int
}{
{"", 10, 0, 0},
{"Hello", 6, 5, 5},
{"Hello", 2, 2, 2},
{"Hello", 0, 0, 0},
{"🙃", 3, 0, 0},
{"🙃", 4, 4, 4},
{string([]byte{'a', 0xFF}), 2, 1, 1},
{string([]byte{'a', 0xFF}), 4, 2, 4},
}
for i, tc := range tests {
t.Run(fmt.Sprintf("%d/%s", i, tc.str), func(t *testing.T) {
splitOffset, effectiveBytes := findValidSplit(tc.str, tc.maxEffectiveBytes)
assert.Equal(t, tc.splitOffset, splitOffset, "splitOffset")
assert.Equal(t, tc.effectiveBytes, effectiveBytes, "effectiveBytes")
t.Log(tc.str[:tc.splitOffset])
t.Log(tc.str[tc.splitOffset:])
})
}
}

func TestProcessEventEmoji(t *testing.T) {
stream := &logStream{}
batch := &eventBatch{}
bytes := []byte(strings.Repeat("🙃", maximumBytesPerEvent/4+1))
stream.processEvent(batch, bytes, 0)
assert.Equal(t, 2, len(batch.batch), "should be two events in the batch")
assert.Equal(t, strings.Repeat("🙃", maximumBytesPerEvent/4), aws.StringValue(batch.batch[0].inputLogEvent.Message))
assert.Equal(t, "🙃", aws.StringValue(batch.batch[1].inputLogEvent.Message))
}

func TestCollectBatchLineSplit(t *testing.T) {
mockClient := newMockClient()
stream := &logStream{
@@ -987,6 +1043,55 @@ func TestCollectBatchLineSplit(t *testing.T) {
}
}

func TestCollectBatchLineSplitWithBinary(t *testing.T) {
mockClient := newMockClient()
stream := &logStream{
client: mockClient,
logGroupName: groupName,
logStreamName: streamName,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
mockClient.putLogEventsResult <- &putLogEventsResult{
successResult: &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken),
},
}
var ticks = make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker {
return &time.Ticker{
C: ticks,
}
}

d := make(chan bool)
close(d)
go stream.collectBatch(d)

longline := strings.Repeat("\xFF", maximumBytesPerEvent/3) // 0xFF is counted as the 3-byte utf8.RuneError
stream.Log(&logger.Message{
Line: []byte(longline + "\xFD"),
Timestamp: time.Time{},
})

// no ticks
stream.Close()

argument := <-mockClient.putLogEventsArgument
if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput")
}
if len(argument.LogEvents) != 2 {
t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
}
if *argument.LogEvents[0].Message != longline {
t.Errorf("Expected message to be %s but was %s", longline, *argument.LogEvents[0].Message)
}
if *argument.LogEvents[1].Message != "\xFD" {
t.Errorf("Expected message to be %s but was %s", "\xFD", *argument.LogEvents[1].Message)
}
}

func TestCollectBatchMaxEvents(t *testing.T) {
mockClient := newMockClientBuffered(1)
stream := &logStream{
@@ -1125,6 +1230,83 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
}
}

func TestCollectBatchMaxTotalBytesWithBinary(t *testing.T) {
expectedPuts := 2
mockClient := newMockClientBuffered(expectedPuts)
stream := &logStream{
client: mockClient,
logGroupName: groupName,
logStreamName: streamName,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
for i := 0; i < expectedPuts; i++ {
mockClient.putLogEventsResult <- &putLogEventsResult{
successResult: &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken),
},
}
}

var ticks = make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker {
return &time.Ticker{
C: ticks,
}
}

d := make(chan bool)
close(d)
go stream.collectBatch(d)

// maxline is the maximum line that could be submitted after
// accounting for its overhead.
maxline := strings.Repeat("\xFF", (maximumBytesPerPut-perEventBytes)/3) // 0xFF is counted as the 3-byte utf8.RuneError
// This will be split and batched up to the `maximumBytesPerPut'
// (+/- `maximumBytesPerEvent'). This /should/ be aligned, but
// should also tolerate an offset within that range.
stream.Log(&logger.Message{
Line: []byte(maxline),
Timestamp: time.Time{},
})
stream.Log(&logger.Message{
Line: []byte("B"),
Timestamp: time.Time{},
})

// no ticks, guarantee batch by size (and chan close)
stream.Close()

argument := <-mockClient.putLogEventsArgument
if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput")
}

// Should total to the maximum allowed bytes.
eventBytes := 0
for _, event := range argument.LogEvents {
eventBytes += effectiveLen(*event.Message)
}
eventsOverhead := len(argument.LogEvents) * perEventBytes
payloadTotal := eventBytes + eventsOverhead
// lowestMaxBatch allows the payload to be offset if the messages
// don't lend themselves to align with the maximum event size.
lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent

if payloadTotal > maximumBytesPerPut {
t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, payloadTotal)
}
if payloadTotal < lowestMaxBatch {
t.Errorf("Batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal)
}

argument = <-mockClient.putLogEventsArgument
message := *argument.LogEvents[len(argument.LogEvents)-1].Message
if message[len(message)-1:] != "B" {
t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
}
}

func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
mockClient := newMockClient()
stream := &logStream{