CloudWatch Logs Fluent Bit Plugin initial implementation #1
Conversation
Remove once that PR is merged!
return aws.Int64Value(stream.logEvents[i].Timestamp) < aws.Int64Value(stream.logEvents[j].Timestamp)
})
response, err := output.client.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
	LogEvents: stream.logEvents,
Does output.client.PutLogEvents() automatically mark the memory allocated to stream.logEvents as garbage-collectable? If not: all the streams are held forever in a global data structure, OutputPlugin, and we keep appending (https://github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/pull/1/files#diff-c88e64246a5b33fe05db284d0bec2b45R181) events to the logEvents slice that belongs to an individual stream, without ever releasing it to be GCed. Could that lead to a memory leak?
I later realized that this line should prevent a memory leak. However, since that function is defined completely independently of this one, it might be safer to reset stream.logEvents in a defer statement here, instead of waiting for the next call to existingLogStream():
defer func() {
	stream.logEvents = make([]*cloudwatchlogs.InputLogEvent, 0, maximumLogEventsPerPut)
}()
By doing so, we guarantee putLogEvents() will always release the underlying memory for GC.
In addition, the following two lines seem to contradict each other:
https://github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/pull/1/files#diff-c88e64246a5b33fe05db284d0bec2b45R378
The first preserves the underlying memory so we don't waste time allocating again, but the second allocates a new slice every time. Which one do we want?
Hmm, let me explain how the code works; there is no contradiction here.
There are multiple logStream structs, each with its own buffer, since the plugin may be writing to multiple streams.
In this line, the existing buffer is cleared so that its memory can be reused: https://github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/pull/1/files#diff-c88e64246a5b33fe05db284d0bec2b45R378
In this line, we're creating a new logStream object, so we allocate memory for a new buffer: https://github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/pull/1/files#diff-c88e64246a5b33fe05db284d0bec2b45R229
Unless I've made a mistake, there shouldn't be a memory leak in this code...
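To make the two paths concrete, here is a minimal sketch of the distinction being described (struct and function names are simplified stand-ins, not the plugin's actual code):

```go
package main

import "fmt"

type event struct{ msg string }

// logStream is a simplified stand-in for the plugin's logStream struct;
// maximumLogEventsPerPut mirrors the capacity constant used in the PR.
type logStream struct {
	logEvents []*event
}

const maximumLogEventsPerPut = 10000

// clearBuffer mimics the "existing stream" path: truncating to length 0
// keeps the backing array, so the next batch appends without allocating.
func clearBuffer(s *logStream) {
	s.logEvents = s.logEvents[:0]
}

// newStream mimics the "new stream" path: no buffer exists yet, so one
// must be allocated.
func newStream() *logStream {
	return &logStream{logEvents: make([]*event, 0, maximumLogEventsPerPut)}
}

func main() {
	existing := newStream()
	existing.logEvents = append(existing.logEvents, &event{"a"})
	before := cap(existing.logEvents)
	clearBuffer(existing)
	// Same backing array: length resets to 0, capacity is preserved.
	fmt.Println(len(existing.logEvents), cap(existing.logEvents) == before)
}
```

One Go subtlety worth noting: truncating with `[:0]` keeps the old event pointers reachable through the backing array until later appends overwrite them, whereas a fresh `make` releases them to the GC immediately.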
Discussed offline: there is a memory leak here. If we keep creating log streams forever, and each goes unused after a short period, then over a long plugin lifetime a lot of unused memory will build up.
plugins/plugins.go
Outdated
// Timeout is a simple timeout for single-threaded programming
// (Goroutines are expensive in Cgo)
type Timeout struct {
Seems like there's no unit test explicitly for Timeout. Is it necessary to have some?
cloudwatch/cloudwatch.go
Outdated
}
func (output *OutputPlugin) existingLogStream(tag string, nextToken *string) (*logStream, error) {
	output.timer.Check()
I'm a little confused about how the timer should be used. It seems like we first check whether the timer has expired, then start the timer in the case of a non-nil err?
Yeah, exactly. Since everything is single-threaded, we have to continually check the timer to see if it has timed out.
…build up of memory used by the plugin
	return fmt.Errorf(errorStr, "log_group_name")
}
if config.LogStreamName == "" && config.LogStreamPrefix == "" {
	return fmt.Errorf("log_stream_name or log_stream_prefix is required")
If only a prefix is provided, does that mean all streams that match receive the logs?
Nope. The way it works is that a user defines in Fluent Bit which tags they will match and send to CloudWatch. Each log has a tag, which is unrelated to the prefix here.
If you specify a log stream prefix, this plugin sends every log it gets to a log stream named by concatenating the prefix and the tag. So if it gets 3 log messages with 3 different tags, they get sent to 3 different log streams.
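The naming rule described above can be sketched in a few lines (the function name and example prefix are illustrative, not taken from the plugin):

```go
package main

import "fmt"

// logStreamName applies the rule described in the comment: with a prefix
// configured, each Fluent Bit tag maps to its own stream, prefix + tag.
func logStreamName(prefix, tag string) string {
	return prefix + tag
}

func main() {
	// Three tags => three distinct log streams.
	for _, tag := range []string{"app.web", "app.worker", "app.cron"} {
		fmt.Println(logStreamName("from-fluent-bit-", tag))
	}
}
```

So the prefix never selects existing streams; it only determines the name of the stream each tagged record is written to.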
…it => github.com/aws/amazon-cloudwatch-logs-for-fluent-bit
NOTE 1: This PR duplicates code from aws/amazon-kinesis-firehose-for-fluent-bit#1
Once that PR is merged, this plugin will import the shared code.
Test Plan