Skip to content

Commit

Permalink
improve monitor performance (#10368)
Browse files Browse the repository at this point in the history
* remove flush for each write to http response in the agent monitor endpoint

* fix race condition when we stop and start monitor multiple times, the doneCh is closed and never recover.

* start log reading goroutine before adding the sink to avoid filling the log channel before getting a chance of reading from it

* flush every 500ms to optimize log writing in the http server side.

* add changelog file

* add issue url to changelog

* fix changelog url

* Update changelog

Co-authored-by: Daniel Nephin <dnephin@hashicorp.com>

* use ticker to flush and avoid race condition when flushing in a different goroutine

* stop the ticker when done

Co-authored-by: Daniel Nephin <dnephin@hashicorp.com>

* Revert "fix race condition when we stop and start monitor multiple times, the doneCh is closed and never recover."

This reverts commit 1eeddf7

* wait for log consumer loop to start before registering the sink

Co-authored-by: Daniel Nephin <dnephin@hashicorp.com>
  • Loading branch information
dhiaayachi and dnephin authored Jun 15, 2021
1 parent 2f1f980 commit c8ba2d4
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 5 deletions.
3 changes: 3 additions & 0 deletions .changelog/10368.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
monitoring: optimize the monitoring endpoint to avoid losing logs when under high load.
```
8 changes: 8 additions & 0 deletions agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"
"strconv"
"strings"
"time"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
Expand Down Expand Up @@ -1204,6 +1205,9 @@ func (s *HTTPHandlers) AgentMonitor(resp http.ResponseWriter, req *http.Request)
// a gzip stream it will go ahead and write out the HTTP response header
resp.Write([]byte(""))
flusher.Flush()
const flushDelay = 200 * time.Millisecond
flushTicker := time.NewTicker(flushDelay)
defer flushTicker.Stop()

// Stream logs until the connection is closed.
for {
Expand All @@ -1213,9 +1217,13 @@ func (s *HTTPHandlers) AgentMonitor(resp http.ResponseWriter, req *http.Request)
if droppedCount > 0 {
s.agent.logger.Warn("Dropped logs during monitor request", "dropped_count", droppedCount)
}
flusher.Flush()
return nil, nil

case log := <-logsCh:
fmt.Fprint(resp, string(log))

case <-flushTicker.C:
flusher.Flush()
}
}
Expand Down
15 changes: 10 additions & 5 deletions logging/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package monitor

import (
"errors"
"sync"

log "github.com/hashicorp/go-hclog"
)
Expand Down Expand Up @@ -80,28 +81,32 @@ func (d *monitor) Stop() int {
// received log messages over the returned channel.
func (d *monitor) Start() <-chan []byte {
// register our sink with the logger
d.logger.RegisterSink(d.sink)
streamCh := make(chan []byte, d.bufSize)

// run a go routine that listens for streamed
// log messages and sends them to streamCh

wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
defer close(streamCh)

wg.Done()
for {
select {
case log := <-d.logCh:
case logLine := <-d.logCh:
select {
case <-d.doneCh:
return
case streamCh <- log:
case streamCh <- logLine:
}
case <-d.doneCh:
return
}
}
}()

//wait for the consumer loop to start before registering the sink to avoid filling the log channel
wg.Wait()
d.logger.RegisterSink(d.sink)
return streamCh
}

Expand Down

0 comments on commit c8ba2d4

Please sign in to comment.