diff --git a/internal/atomiccounter/atomiccounter.go b/internal/atomiccounter/atomiccounter.go new file mode 100644 index 0000000000..5939a0e530 --- /dev/null +++ b/internal/atomiccounter/atomiccounter.go @@ -0,0 +1,28 @@ +package atomiccounter + +import ( + "sync/atomic" +) + +type AtomicCounter struct { + val int64 +} + +// NewAtomicCounter returns a new counter with the default value of 0. +func NewAtomicCounter() AtomicCounter { + return AtomicCounter{} +} + +func (ac *AtomicCounter) Increment() { + atomic.AddInt64(&ac.val, 1) +} + +func (ac *AtomicCounter) Decrement() { + atomic.AddInt64(&ac.val, -1) +} + +// Get is not safe to use for synchronizing work between goroutines. +// It is just for logging the current value. +func (ac *AtomicCounter) Get() int64 { + return ac.val +} \ No newline at end of file diff --git a/logs/logs.go b/logs/logs.go index 98853026a1..b356d17b7b 100644 --- a/logs/logs.go +++ b/logs/logs.go @@ -9,6 +9,7 @@ import ( "log" "time" + "github.com/aws/amazon-cloudwatch-agent/plugins/inputs/logfile/tail" "github.com/influxdata/telegraf/config" ) @@ -95,6 +96,7 @@ func (l *LogAgent) Run(ctx context.Context) { for { select { case <-t.C: + log.Printf("D! [logagent] open file count, %v", tail.OpenFileCount.Get()) for _, c := range l.collections { srcs := c.FindLogSrc() for _, src := range srcs { diff --git a/plugins/inputs/logfile/tail/tail.go b/plugins/inputs/logfile/tail/tail.go index 982449d485..0f1d82d924 100644 --- a/plugins/inputs/logfile/tail/tail.go +++ b/plugins/inputs/logfile/tail/tail.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/aws/amazon-cloudwatch-agent/internal/atomiccounter" "github.com/aws/amazon-cloudwatch-agent/plugins/inputs/logfile/tail/watch" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/models" @@ -23,6 +24,7 @@ var ( ErrDeletedNotReOpen = errors.New("File was deleted, tail should now stop") exitOnDeletionCheckDuration = time.Minute exitOnDeletionWaitDuration = 5 * time.Minute + OpenFileCount = atomiccounter.NewAtomicCounter() ) type Line struct { @@ -120,6 +122,7 @@ func TailFile(filename string, config Config) (*Tail, error) { if err != nil { return nil, err } + OpenFileCount.Increment() } if !config.ReOpen { @@ -181,6 +184,7 @@ func (tail *Tail) closeFile() { if tail.file != nil { tail.file.Close() tail.file = nil + OpenFileCount.Decrement() } } @@ -205,6 +209,7 @@ func (tail *Tail) reopen() error { } break } + OpenFileCount.Increment() return nil } diff --git a/plugins/inputs/logfile/tail/tail_test.go b/plugins/inputs/logfile/tail/tail_test.go index ddd74a46bc..79acf75a04 100644 --- a/plugins/inputs/logfile/tail/tail_test.go +++ b/plugins/inputs/logfile/tail/tail_test.go @@ -148,7 +148,8 @@ func setup(t *testing.T) (*os.File, *Tail, *testLogger) { if err != nil { t.Fatalf("failed to tail file %v: %v", tmpfile.Name(), err) } - + // Cannot expect OpenFileCount.Get() to be 1 because the TailFile struct + // was not created with MustExist=true, so file may not yet be opened. return tmpfile, tail, &tl } @@ -163,6 +164,8 @@ func readThreelines(t *testing.T, tail *Tail) { t.Errorf("wrong line from tail found: '%v'", line.Text) } } + // If file was readable, then expect it to exist. + assert.Equal(t, int64(1), OpenFileCount.Get()) } func verifyTailerLogging(t *testing.T, tlog *testLogger, expectedErrorMsg string) { @@ -179,6 +182,7 @@ func verifyTailerLogging(t *testing.T, tlog *testLogger, expectedErrorMsg string func verifyTailerExited(t *testing.T, tail *Tail) { select { case <-tail.Dead(): + assert.Equal(t, int64(0), OpenFileCount.Get()) return default: t.Errorf("Tailer is still alive after file removed and wait period") diff --git a/plugins/inputs/logfile/tailersrc_test.go b/plugins/inputs/logfile/tailersrc_test.go index 4b478776cf..a1a73a3327 100644 --- a/plugins/inputs/logfile/tailersrc_test.go +++ b/plugins/inputs/logfile/tailersrc_test.go @@ -46,7 +46,7 @@ func TestTailerSrc(t *testing.T) { if err != nil { t.Errorf("Failed to create temp file: %v", err) } - + beforeCount := tail.OpenFileCount.Get() tailer, err := tail.TailFile(file.Name(), tail.Config{ ReOpen: false, @@ -63,7 +63,7 @@ func TestTailerSrc(t *testing.T) { t.Errorf("Failed to create tailer src for file %v with error: %v", file, err) return } - + assert.Equal(t, beforeCount + 1, tail.OpenFileCount.Get()) ts := NewTailerSrc( "groupName", "streamName", "destination", @@ -144,11 +144,15 @@ func TestTailerSrc(t *testing.T) { fmt.Fprintln(file, l) } - // Removal of log file should stop tailersrc + // Removal of log file should stop tailerSrc and Tail. if err := os.Remove(file.Name()); err != nil { t.Errorf("failed to remove log file '%v': %v", file.Name(), err) } <-done + // Most test functions do not wait for the Tail to close the file. + // They rely on Tail to detect file deletion and close the file. + // So the count might be nonzero due to previous test cases. + assert.LessOrEqual(t, tail.OpenFileCount.Get(), beforeCount) } func TestOffsetDoneCallBack(t *testing.T) { diff --git a/profiler/profiler.go b/profiler/profiler.go index 6ec14201e3..3bfcab8de4 100644 --- a/profiler/profiler.go +++ b/profiler/profiler.go @@ -27,10 +27,6 @@ func (p *profiler) AddStats(key []string, value float64) { p.Lock() defer p.Unlock() k := strings.Join(key, "_") - - if _, ok := p.stats[k]; !ok { - p.stats[k] = 0 - } p.stats[k] += value }