
Log number of monitored files #491

Closed

wants to merge 8 commits

Conversation

Contributor

@khanhntd khanhntd commented Jun 15, 2022

Closes #405
Closes #432

Description of the issue

If CloudWatchAgent runs in a log-intensive environment, it will monitor all matching files and can exhaust the file descriptors that the OS allows for a single process. Therefore, CloudWatchAgent needs to show both the number of files it currently monitors and the number of files it is allowed to monitor, so customers can take corresponding action.

Description of changes

  • Create a file-descriptor add-on for each OS, so that different OSes can return different file-descriptor limits (a sketch follows this list).
  • Create a TailerQueue (a better name is welcome) to centralize the tailers: it holds a buffered channel and handles adding and releasing tailers.
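
For illustration, a minimal sketch of how the Unix side of such an add-on could read the limit with syscall.Getrlimit. The helper name getFileDescriptorLimit and the zero fallback are assumptions, not the PR's actual code, and Windows would need a different implementation.

//go:build linux || darwin

package main

import (
	"fmt"
	"syscall"
)

// getFileDescriptorLimit returns the soft limit on open file descriptors
// for the current process, or 0 if the limit cannot be determined.
// (Hypothetical helper; the real per-OS add-on in this PR may differ.)
func getFileDescriptorLimit() uint64 {
	var rlimit syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlimit); err != nil {
		return 0
	}
	return rlimit.Cur
}

func main() {
	fmt.Printf("Allowed monitored log files: %d\n", getFileDescriptorLimit())
}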

License

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Reasons to choose a buffered channel

  • Allocates only a small amount of memory upfront. This is a small trade-off with the second reason, since customers do not want CloudWatchAgent's memory usage to grow.
  • Makes it possible to centralize the tailers (e.g., stop one tailer so another tailer can finish pushing its logs, then start the dropped tailer again once the previous run is done); a sketch of such a channel-backed semaphore follows this list.
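
For context, here is a minimal sketch of the kind of buffered-channel semaphore described above. The type name chanSemaphore is made up; only the general shape (a pre-filled channel of empty structs with Acquire, Release, GetCount, and GetLimit) mirrors what the review comments below discuss. Because the channel element is struct{}, even a large limit costs almost no memory upfront.

package example

import "time"

// chanSemaphore is a counting semaphore backed by a buffered channel of
// empty structs, so the upfront allocation stays small even when the limit
// is the OS file-descriptor limit.
type chanSemaphore struct {
	slots chan struct{}
}

// newChanSemaphore pre-fills the channel with limit tokens; Acquire takes a
// token and Release puts one back.
func newChanSemaphore(limit int) *chanSemaphore {
	s := &chanSemaphore{slots: make(chan struct{}, limit)}
	for i := 0; i < limit; i++ {
		s.slots <- struct{}{}
	}
	return s
}

// Acquire claims a slot. With timeout <= 0 it never blocks: it returns true
// only if a token is immediately available. Otherwise it waits up to timeout
// and returns false if no token frees up in time.
func (s *chanSemaphore) Acquire(timeout time.Duration) bool {
	if timeout <= 0 {
		select {
		case <-s.slots:
			return true
		default:
			return false
		}
	}
	select {
	case <-s.slots:
		return true
	case <-time.After(timeout):
		return false
	}
}

// Release returns a token; extra releases are dropped instead of blocking.
func (s *chanSemaphore) Release() {
	select {
	case s.slots <- struct{}{}:
	default:
	}
}

// GetCount reports how many slots are currently in use.
func (s *chanSemaphore) GetCount() int { return cap(s.slots) - len(s.slots) }

// GetLimit reports the maximum number of slots.
func (s *chanSemaphore) GetLimit() int { return cap(s.slots) }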

Tests

I ran two tests:

  • Generate logs every 3 minutes with the auto_removal feature:
    Sample app
import glob
import os
import logging
from logging.handlers import TimedRotatingFileHandler
import time
import json

# get root logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# rotate our log file every 3 minutes
handler = TimedRotatingFileHandler("logdest/logs.log", when="M", interval=3)
logger.addHandler(handler)

# log a message
logging.info(json.dumps({"Metric": "12345"*10}))
# sleep for 4 minutes so that file will rotate upon next log message
time.sleep(60*4)
# log another message (this one will not appear since byte length of message == byte length of old log file)
logging.info(json.dumps({"Metric": "09876"*10}))
# sleep for another 4 minutes so that file will rotate upon next log message
time.sleep(60*4)
# this message will be partially written
logging.info({"Metric": "1234567890"*10})

Sample CloudWatchAgent JSON configuration:

{
        "agent": {
                "metrics_collection_interval": 60,
                "run_as_user": "root",
                "debug": true
        },
        "logs": {
                "logs_collected": {
                        "files": {
                                "collect_list": [
                                        {
                                                "file_path": "/local/home/khanhntd/Code/CloudWatch/logdest/**",
                                                "auto_removal": true,
                                                "log_group_name": "test-amazon-cloudwatch-agent.log",
                                                "log_stream_name": "{instance_id}-CloudWatchAgent"
                                        },
                                        {
                                                "file_path": "/opt/aws/amazon-cloudwatch-agent/logs/amazon-cloudwatch-agent.log",
                                                "log_group_name": "amazon-cloudwatch-agent.log",
                                                "log_stream_name": "{instance_id}-CloudWatchAgent"
                                        },
                                        {
                                                "file_path": "/home/khanhntd/Code/MetricAgent.log",
                                                "log_group_name": "amazon-cloudwatch-agent.log",
                                                "log_stream_name": "{instance_id}-MetricAgent"
                                        }
                                ]
                        }
                }
        }
}

After running these two tests, confirm the monitored files by comparing the output of lsof -p {{CloudWatchAgent process id}} with the corresponding lines in the agent's own log file. An example of the expected results:

2022-06-15T19:47:31Z I! [logagent] piping log from amazon-cloudwatch-agent.log/i-005113ad6a00082e0-MetricAgent(/home/khanhntd/Code/MetricAgent.log) to cloudwatchlogs with retention -1
2022-06-15T19:47:32Z I! [inputs.logfile] Number of current monitored log files / Allowed monitored log files: 3/65535
2022-06-15T19:47:32Z D! [outputs.cloudwatchlogs] Buffer fullness: 0 / 10000 metrics
2022-06-15T19:47:32Z D! [outputs.cloudwatch] Buffer fullness: 0 / 10000 metrics
2022-06-15T19:47:33Z I! [inputs.logfile] Number of current monitored log files / Allowed monitored log files: 3/65535
  • Generate log rotation by using logrotate
    Sample script
#!/bin/sh
# mode can only be "" (default), create, copy, or copytruncate
# (This script appears to assume it is run from /tmp, so that soakTest/ and
# the *_rotate.conf files land at the /tmp paths referenced below.)
mode=$1
pkill -9 pprofCol


export cwAgentDir="/opt/aws/amazon-cloudwatch-agent"

# start up a new cwagent with pprof enabled
if [ "$(/sbin/init --version 2>/dev/null | grep -c upstart)" = 1 ]; then
        stop amazon-cloudwatch-agent || true
else
    	service amazon-cloudwatch-agent stop || true
fi
sleep 10

(${cwAgentDir}/bin/amazon-cloudwatch-agent -config ${cwAgentDir}/etc/amazon-cloudwatch-agent.toml --pprof-addr :6060 > /dev/null 2>&1 &)

sleep 3

./pprofCol -textInterval=300 -profileInterval=3600 -truncateSize=10 > /dev/null 2>&1 &

mkdir soakTest

cat > minute_rotate.conf << EOF
/tmp/soakTest/minute_*.log {
        size 10k
        rotate 4
        compress
        notifempty
        ${mode}
}
EOF

cat > hour_rotate.conf << EOF
/tmp/soakTest/hour.log {
        size 10k
        rotate 4
        compress
        notifempty
        ${mode}
}
EOF

#set up cron job to run logrotate per minute/hour
sudo service crond start
crontab << EOF
* * * * * sudo logrotate -f /tmp/minute_rotate.conf
3 * * * * sudo logrotate -f /tmp/hour_rotate.conf
EOF

#generate logs continuously
(./generate_log.sh ${mode} > /dev/null 2>&1 &)

Sample Amazon CloudWatch Agent JSON configuration

{"agent":{"metrics_collection_interval":10},"metrics":{"append_dimensions":{"InstanceType":"${aws:InstanceType}"},"metrics_collected":{"cpu":{"measurement":["cpu_usage_idle","cpu_usage_iowait","cpu_usage_user","cpu_usage_system"],"totalcpu":false},"disk":{"measurement":["used_percent","inodes_free"]},"diskio":{"measurement":["io_time"]},"mem":{"measurement":["mem_used_percent"]},"statsd":{},"swap":{"measurement":["swap_used_percent"]},"procstat":[{"measurement":["cpu_usage","memory_rss"],"exe":"amazon-cloudwatch-agent"}]},"namespace":"CWAgent/LinuxSoakTest"},"logs":{"logs_collected":{"files":{"collect_list":[{"file_path":"/opt/aws/amazon-cloudwatch-agent/logs/amazon-cloudwatch-agent.log","log_group_name":"Linux-Soak-CWAgent.log","multi_line_start_pattern":"{timestamp_format}","timestamp_format":"%Y-%m-%dT%H:%M:%SZ","timezone":"UTC"},{"file_path":"/tmp/soakTest/memStats","log_group_name":"Linux-Soak-memStats","multi_line_start_pattern":"{timestamp_format}","timestamp_format":"%d %b %y %H:%M:%S","timezone":"UTC"},{"file_path":"/tmp/soakTest/goroutine","log_group_name":"Linux-Soak-goroutine","multi_line_start_pattern":"{timestamp_format}","timestamp_format":"%d %b %y %H:%M:%S","timezone":"UTC"},{"file_path":"/tmp/soakTest/tmp0.log","log_group_name":"LinuxSoaking0.log","multi_line_start_pattern":"{timestamp_format}","timestamp_format":"%d %b %y %H:%M:%S","timezone":"UTC"},{"file_path":"/tmp/soakTest/tmp1.log","log_group_name":"LinuxSoaking1.log","multi_line_start_pattern":"{timestamp_format}","timestamp_format":"%d %b %y %H:%M:%S","timezone":"UTC"},{"file_path":"/tmp/soakTest/tmp2.log","log_group_name":"LinuxSoaking2.log","multi_line_start_pattern":"{timestamp_format}","timestamp_format":"%d %b %y %H:%M:%S","timezone":"UTC"},{"file_path":"/tmp/soakTest/tmp3.log","log_group_name":"LinuxSoaking3.log","multi_line_start_pattern":"{timestamp_format}","timestamp_format":"%d %b %y %H:%M:%S","timezone":"UTC"},{"file_path":"/tmp/soakTest/tmp4.log","log_group_name":"LinuxSoaking4.log","multi_line_start_pattern":"{timestamp_format}","timestamp_format":"%d %b %y %H:%M:%S","timezone":"UTC"},{"file_path":"/tmp/soakTest/tmp5.log","log_group_name":"LinuxSoaking5.log","multi_line_start_pattern":"{timestamp_format}","timestamp_format":"%d %b %y %H:%M:%S","timezone":"UTC"},{"file_path":"/tmp/soakTest/tmp6.log","log_group_name":"LinuxSoaking6.log","multi_line_start_pattern":"{timestamp_format}","timestamp_format":"%d %b %y %H:%M:%S","timezone":"UTC"},{"file_path":"/tmp/soakTest/tmp7.log","log_group_name":"LinuxSoaking7.log","multi_line_start_pattern":"{timestamp_format}","timestamp_format":"%d %b %y %H:%M:%S","timezone":"UTC"},{"file_path":"/tmp/soakTest/tmp8.log","log_group_name":"LinuxSoaking8.log","multi_line_start_pattern":"{timestamp_format}","timestamp_format":"%d %b %y %H:%M:%S","timezone":"UTC"},{"file_path":"/tmp/soakTest/tmp9.log","log_group_name":"LinuxSoaking9.log","multi_line_start_pattern":"{timestamp_format}","timestamp_format":"%d %b %y %H:%M:%S","timezone":"UTC"},{"file_path":"/tmp/soakTest/structuredLogFile0.json","log_group_name":"linuxStructuredLogSoakTst0","multi_line_start_pattern":"^{"},{"file_path":"/tmp/soakTest/structuredLogFile1.json","log_group_name":"linuxStructuredLogSoakTst1","multi_line_start_pattern":"^{"}]}}},"csm":{"memory_limit_in_mb":20,"port":31000}}

Requirements

Before committing the code, please complete the following steps.

  1. Run make fmt and make fmt-sh
  2. Run make linter

Performance benchmarking

  1. Before (v353) and after adding the tailer semaphore (numUsedFds) while monitoring 15 log files (not sure why my version is lighter)
    [image: benchmark comparison screenshot]

  2. Comparison between semaphore implementations

BenchmarkSemaphore_AcquireReleaseUnderLimitSimple-12          19336089                62.26 ns/op            0 B/op          0 allocs/op
BenchmarkSemaphore_AcquireReleaseUnderLimit-12                    1652306               726.5 ns/op             0 B/op          0 allocs/op
BenchmarkSemaphore_AcquireReleaseOverLimit-12                     32737             37202 ns/op               0 B/op          0 allocs/op
BenchmarkMarusamaSemaphore_AcquireReleaseUnderLimitSimple-12                         6038138               171.7 ns/op            96 B/op          1 allocs/op
BenchmarkMarusamaSemaphore_AcquireReleaseUnderLimit-12                                 479114              2268 ns/op             960 B/op         10 allocs/op
BenchmarkMarusamaSemaphore_AcquireReleaseOverLimit-12                                   50464             24728 ns/op            9600 B/op        100 allocs/op
BenchmarkAbiosoftSemaphore_AcquireReleaseUnderLimitSimple-12                 8135151               145.0 ns/op             0 B/op          0 allocs/op
BenchmarkAbiosoftSemaphore_AcquireReleaseUnderLimit-12                          432472              2731 ns/op               0 B/op          0 allocs/op
BenchmarkAbiosoftSemaphore_AcquireReleaseOverLimit-12                           51364             27126 ns/op               0 B/op          0 allocs/op
BenchmarkPivotalGolangSemaphore_AcquireReleaseUnderLimitSimple-12            3358042               355.3 ns/op           128 B/op          2 allocs/op
BenchmarkPivotalGolangSemaphore_AcquireReleaseUnderLimit-12                      149810              7881 ns/op            1200 B/op         20 allocs/op
BenchmarkPivotalGolangSemaphore_AcquireReleaseOverLimit-12                      14558             83574 ns/op           12000 B/op        200 allocs/op
BenchmarkXSyncSemaphore_AcquireReleaseUnderLimitSimple-12                   40687284                27.22 ns/op            0 B/op          0 allocs/op
BenchmarkXSyncSemaphore_AcquireReleaseUnderLimit-12                             1000000              1060 ns/op               0 B/op          0 allocs/op
BenchmarkXSyncSemaphore_AcquireReleaseOverLimit-12                              17275             69725 ns/op           15994 B/op        299 allocs/op

@khanhntd khanhntd force-pushed the show_files branch 27 times, most recently from a519997 to 8f7286c on June 16, 2022 12:27
@khanhntd khanhntd marked this pull request as ready for review June 16, 2022 12:31
@khanhntd khanhntd requested a review from a team as a code owner June 16, 2022 12:31
@khanhntd khanhntd force-pushed the show_files branch 4 times, most recently from e1ec189 to 620deca on June 20, 2022 20:57
@khanhntd khanhntd requested a review from a team June 20, 2022 21:18
Comment on lines 49 to 56
if (timeout > 0) {
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case <-sem.slots:
return true
case <-timer.C:
}
}
Contributor

We don't need to have a timer here. We can just use time.After() to fall through:

Suggested change
if (timeout > 0) {
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case <-sem.slots:
return true
case <-timer.C:
}
}
if (timeout > 0) {
select {
case <-sem.slots:
return true
case <-time.After(timeout):
}
}

truncate_suffix = "[Truncated...]"
`

const defaultTimeoutToAcquire = 1 * time.Second
Contributor

One second is a long time. Are you sure this can't be like 100ms? I'm very concerned about blocking on a channel for this semaphore because we run the risk of putting the agent in a deadlock.

Contributor Author

Yes. This can be 100ms or even 1ms, since 1 * time.Second is only an arbitrary value, so good point on this one.

plugins/inputs/logfile/tailersrc_test.go (outdated review thread, resolved)
Comment on lines 308 to 309
time.Sleep(500 * time.Millisecond)
assert.Equal(t, 0, resources.numUsedFds.GetCount())
Contributor

Why do we need to assert and test for the fds counter in every test? It's redundant and doesn't increase test coverage.

Contributor Author

To account for the case where the semaphore count should drop by 1 when the file stops being tailed. This is also covered in tail_test, which I have done. However, there is a strong coupling between tail and tailsrc (tailsrc has a tailer attribute), so I introduced this assertion to avoid any regression between the two.

)

type Semaphore interface {
//Acquire a slot in the semaphore with a timeout
Contributor

I think it is worth documenting that if timeout is <=0, then Acquire() will not block. I initially thought timeout=0 would behave as if timeout=infinity.
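
Along those lines, a sketch of how the interface documentation might spell this out. The method set is inferred from the excerpts in this thread; the wording is an assumption, not the PR's actual doc comments.

package example

import "time"

// Semaphore bounds how many log files are tailed at once.
type Semaphore interface {
	// Acquire claims a slot, waiting at most timeout for one to free up.
	// If timeout <= 0, Acquire does not block: it returns true only when a
	// slot is immediately available and false otherwise. A zero timeout is
	// NOT treated as "wait forever".
	Acquire(timeout time.Duration) bool

	// Release frees a previously acquired slot.
	Release()

	// GetCount returns the number of slots currently in use.
	GetCount() int

	// GetLimit returns the maximum number of slots (0 if it could not be
	// determined for this OS).
	GetLimit() int
}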

if ok := t.numUsedFds.Acquire(defaultTimeoutToAcquire); !ok {
t.Log.Debugf("Cannot increase counter of used file descriptors")
}
}
Contributor
@adam-mateen adam-mateen Jun 23, 2022

The semaphore code was really well done!

I don't think it needs to be used here.
It would be simpler and use less memory to have a single counter (integer instead of channel) of the open files (aka files being tailed). Then just log the counter's current value periodically.

It is undesirable to change behavior here and block for up to 1 second.

Contributor

I think the problem with just having a simple global counter is that the logfile plugin itself doesn't know/care when log files stop being monitored. Though I guess, rather than having a semaphore, we could maybe just have an atomic counter that we lock/unlock before incrementing/decrementing?

Contributor Author

For information: other libraries also use a semaphore as a counter (XSync, Marusama). However, after comparing their performance, I would say this version (similar to Dropbox's) is the best option for now when using the file-descriptor limit as an upper bound.

BenchmarkSemaphore_AcquireReleaseUnderLimitSimple-12          19336089                62.26 ns/op            0 B/op          0 allocs/op
BenchmarkSemaphore_AcquireReleaseUnderLimit-12                    1652306               726.5 ns/op             0 B/op          0 allocs/op
BenchmarkSemaphore_AcquireReleaseOverLimit-12                     32737             37202 ns/op               0 B/op          0 allocs/op
BenchmarkMarusamaSemaphore_AcquireReleaseUnderLimitSimple-12                         6038138               171.7 ns/op            96 B/op          1 allocs/op
BenchmarkMarusamaSemaphore_AcquireReleaseUnderLimit-12                                 479114              2268 ns/op             960 B/op         10 allocs/op
BenchmarkMarusamaSemaphore_AcquireReleaseOverLimit-12                                   50464             24728 ns/op            9600 B/op        100 allocs/op
BenchmarkAbiosoftSemaphore_AcquireReleaseUnderLimitSimple-12                 8135151               145.0 ns/op             0 B/op          0 allocs/op
BenchmarkAbiosoftSemaphore_AcquireReleaseUnderLimit-12                          432472              2731 ns/op               0 B/op          0 allocs/op
BenchmarkAbiosoftSemaphore_AcquireReleaseOverLimit-12                           51364             27126 ns/op               0 B/op          0 allocs/op
BenchmarkPivotalGolangSemaphore_AcquireReleaseUnderLimitSimple-12            3358042               355.3 ns/op           128 B/op          2 allocs/op
BenchmarkPivotalGolangSemaphore_AcquireReleaseUnderLimit-12                      149810              7881 ns/op            1200 B/op         20 allocs/op
BenchmarkPivotalGolangSemaphore_AcquireReleaseOverLimit-12                      14558             83574 ns/op           12000 B/op        200 allocs/op
BenchmarkXSyncSemaphore_AcquireReleaseUnderLimitSimple-12                   40687284                27.22 ns/op            0 B/op          0 allocs/op
BenchmarkXSyncSemaphore_AcquireReleaseUnderLimit-12                             1000000              1060 ns/op               0 B/op          0 allocs/op
BenchmarkXSyncSemaphore_AcquireReleaseOverLimit-12                              17275             69725 ns/op           15994 B/op        299 allocs/op

However, the 1 second is only an arbitrary value and could be more or less, as long as it stays within an acceptable range (e.g., do customers care whether the reported number of monitored files is up to 1 second late?). It is a small trade-off between accuracy and speed.

Contributor

We could get by with an atomic counter. I think the code would look a little weird, but it's 100% feasible. Take a look at https://gobyexample.com/atomic-counters. Pretty sure since it considers the input a delta, you could pass in 1 when a log file is discovered, and pass in -1 when the tailer closes.
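
For illustration, a minimal sketch of the delta-based atomic counter being suggested. The helper names onTailerStart and onTailerStop are hypothetical; the real hooks would live wherever tailers are created and closed.

package main

import (
	"fmt"
	"sync/atomic"
)

// openFileCount tracks how many log files are currently being tailed.
// sync/atomic makes the increments and decrements safe for concurrent use
// without an explicit lock.
var openFileCount int64

// onTailerStart would be called when a log file is discovered and a tailer starts.
func onTailerStart() {
	atomic.AddInt64(&openFileCount, 1)
}

// onTailerStop would be called when a tailer closes its file.
func onTailerStop() {
	atomic.AddInt64(&openFileCount, -1)
}

func main() {
	onTailerStart()
	onTailerStart()
	onTailerStop()
	fmt.Printf("Number of current monitored log files: %d\n", atomic.LoadInt64(&openFileCount))
}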

Contributor Author

Maybe I'm wrong, but according to sync/atomic:

Package atomic provides low-level atomic memory primitives useful for implementing synchronization algorithms.
These functions require great care to be used correctly. Except for special, low-level applications, synchronization is better done with channels or the facilities of the sync package. 

So I'm still deciding between the two, but I will compare performance before making a final decision.

Contributor Author
@khanhntd khanhntd Jun 25, 2022

A quick comparison of the semaphore against an atomic counter, with and without a lock:

BenchmarkSemaphore_AcquireReleaseUnderLimit-12              9621            129439 ns/op               0 B/op          0 allocs/op
BenchmarkAtomicCounterWithOutLock-12                       20169             60657 ns/op               0 B/op          0 allocs/op
BenchmarkAtomicCounterWithLock-12                           5130            256611 ns/op              13 B/op          0 allocs/op

So I agree that if we only care about the end result, an atomic counter would be a great idea. However, if a lock is needed, I would stand by using the semaphore instead.

Contributor

Sorry to derail, I wasn't as concerned about the implementation of counter vs semaphore.
Agreed that an atomic counter (no lock needed) is simpler and would suffice here.

More concerned that you are introducing a blocking function call on Acquire().
I initially thought this PR would just add a variable to track the count of open files AND then periodically log it.
Not to check the counter value and change behavior based on the counter value.

Contributor Author

More concerned that you are introducing a blocking function call on Acquire().

Acquire() does not block the call, since we use the timeout value, and if we don't provide a timeout value it returns false immediately. So there is one thing to consider: do we go with a zero timeout or some partial timeout (e.g., 100 milliseconds; I changed it from 1 second to 100 ms), as long as it stays within an acceptable range? I will use a default timeout of 0 if the blocking part is a concern for customers.

Not to check the counter value and change behavior based on the counter value.

if tail.numUsedFds.GetLimit() != 0 {

The reason I used the line above is to handle the case where CloudWatchAgent is not able to get the file-descriptor limit from Linux and falls back to 0 as the default.
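
As a sketch of how that guard and the periodic log line could fit together (illustrative only: the numUsedFds name and the message text come from excerpts earlier in this thread, everything else is assumed):

package example

import "log"

// fdCounter is a minimal view of the semaphore: just the two read methods
// needed for logging.
type fdCounter interface {
	GetCount() int
	GetLimit() int
}

// logMonitoredFiles reports the tailer count. The GetLimit() != 0 guard
// handles the case where the OS limit could not be determined and defaulted
// to 0, in which case the limit is left out of the message.
func logMonitoredFiles(numUsedFds fdCounter) {
	if numUsedFds.GetLimit() != 0 {
		log.Printf("I! [inputs.logfile] Number of current monitored log files / Allowed monitored log files: %d/%d",
			numUsedFds.GetCount(), numUsedFds.GetLimit())
		return
	}
	log.Printf("I! [inputs.logfile] Number of current monitored log files: %d", numUsedFds.GetCount())
}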

Contributor

Yeah, I know it does not block indefinitely, but I consider it "blocking" even if it only blocks for 100ms.

*** I don't think it should block at all. I propose setting defaultTimeoutToAcquire to 0, or just removing the timeout parameter from Acquire().***

Contributor

Agreed. The timeout stops it from blocking indefinitely, but it is still potentially paused for up to 100ms. That's not great.

@khanhntd khanhntd force-pushed the show_files branch 9 times, most recently from d7cd36c to 4c00cfe on June 27, 2022 08:38
@khanhntd khanhntd added this to the 1.247354.0 milestone Jul 1, 2022
@khanhntd khanhntd force-pushed the show_files branch 2 times, most recently from 5335a6a to a933c40 on July 8, 2022 14:08
Comment on lines +54 to +55
case <-time.After(timeout):
}
Contributor

Shouldn't this return false, since it failed to claim a slot before the timeout?
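
In other words, something along these lines (a sketch, not the PR's actual code), where the timeout branch returns false explicitly and a non-positive timeout falls back to a non-blocking attempt:

package example

import "time"

// acquireWithTimeout tries to take a token from the semaphore's buffered
// channel. The timeout branch returns false explicitly instead of falling
// through.
func acquireWithTimeout(slots chan struct{}, timeout time.Duration) bool {
	if timeout > 0 {
		select {
		case <-slots:
			return true
		case <-time.After(timeout):
			return false // failed to claim a slot before the timeout
		}
	}
	// Non-blocking attempt when timeout <= 0.
	select {
	case <-slots:
		return true
	default:
		return false
	}
}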

truncate_suffix = "[Truncated...]"
`

const defaultTimeoutToAcquire = 0 * time.Millisecond
Contributor

Do we need to multiply by millisecond?

@khanhntd
Contributor Author

Closing since #506 has a better solution and better performance.

@khanhntd khanhntd closed this Jul 12, 2022