From 6c39a9825580e3e4375acafed557d73146b1443f Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Tue, 8 Jun 2021 22:12:28 -0400 Subject: [PATCH 01/12] remove flush for each write to http response in the agent monitor endpoint --- agent/agent_endpoint.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index babb906f0976..5623a13a9586 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -1211,10 +1211,10 @@ func (s *HTTPHandlers) AgentMonitor(resp http.ResponseWriter, req *http.Request) if droppedCount > 0 { s.agent.logger.Warn("Dropped logs during monitor request", "dropped_count", droppedCount) } + flusher.Flush() return nil, nil case log := <-logsCh: fmt.Fprint(resp, string(log)) - flusher.Flush() } } } From 1eeddf7ad801cb8f55cd088c275e1a59bd126209 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Tue, 8 Jun 2021 22:15:10 -0400 Subject: [PATCH 02/12] fix race condition when we stop and start monitor multiple times, the doneCh is closed and never recover. --- logging/monitor/monitor.go | 10 +++++++--- logging/monitor/monitor_test.go | 2 +- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/logging/monitor/monitor.go b/logging/monitor/monitor.go index a8e4e523247d..e523a287f699 100644 --- a/logging/monitor/monitor.go +++ b/logging/monitor/monitor.go @@ -35,7 +35,7 @@ type monitor struct { droppedCount int // doneCh coordinates the shutdown of logCh - doneCh chan struct{} + doneCh chan bool // Defaults to 512. bufSize int @@ -58,7 +58,7 @@ func New(cfg Config) Monitor { sw := &monitor{ logger: cfg.Logger, logCh: make(chan []byte, bufSize), - doneCh: make(chan struct{}, 1), + doneCh: make(chan bool, 1), bufSize: bufSize, } @@ -72,7 +72,11 @@ func New(cfg Config) Monitor { // Stop deregisters the sink and stops the monitoring process func (d *monitor) Stop() int { d.logger.DeregisterSink(d.sink) - close(d.doneCh) + select { + case d.doneCh <- true: + default: + } + return d.droppedCount } diff --git a/logging/monitor/monitor_test.go b/logging/monitor/monitor_test.go index c12518f223d0..545f291af8fd 100644 --- a/logging/monitor/monitor_test.go +++ b/logging/monitor/monitor_test.go @@ -178,7 +178,7 @@ func TestMonitor_WriteStopped(t *testing.T) { mwriter := &monitor{ logger: logger, - doneCh: make(chan struct{}, 1), + doneCh: make(chan bool, 1), } mwriter.Stop() From 3c9d34d3a3f9b5e7e0aa653d269a60656ea94672 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Tue, 8 Jun 2021 22:15:59 -0400 Subject: [PATCH 03/12] start log reading goroutine before adding the sink to avoid filling the log channel before getting a chance of reading from it --- logging/monitor/monitor.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/logging/monitor/monitor.go b/logging/monitor/monitor.go index e523a287f699..fd72a5ed6cfc 100644 --- a/logging/monitor/monitor.go +++ b/logging/monitor/monitor.go @@ -84,9 +84,7 @@ func (d *monitor) Stop() int { // received log messages over the returned channel. func (d *monitor) Start() <-chan []byte { // register our sink with the logger - d.logger.RegisterSink(d.sink) streamCh := make(chan []byte, d.bufSize) - // run a go routine that listens for streamed // log messages and sends them to streamCh go func() { @@ -105,7 +103,7 @@ func (d *monitor) Start() <-chan []byte { } } }() - + d.logger.RegisterSink(d.sink) return streamCh } From 8c04856ec01b75e71d585ff44047e2c956e45c4b Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Wed, 9 Jun 2021 19:05:43 -0400 Subject: [PATCH 04/12] flush every 500ms to optimize log writing in the http server side. --- agent/agent_endpoint.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 5623a13a9586..02252241d1ca 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -5,6 +5,7 @@ import ( "net/http" "strconv" "strings" + "time" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" @@ -1202,7 +1203,8 @@ func (s *HTTPHandlers) AgentMonitor(resp http.ResponseWriter, req *http.Request) // a gzip stream it will go ahead and write out the HTTP response header resp.Write([]byte("")) flusher.Flush() - + const flushDelay = 200 * time.Millisecond + t := time.AfterFunc(flushDelay, flusher.Flush) // Stream logs until the connection is closed. for { select { @@ -1215,6 +1217,11 @@ func (s *HTTPHandlers) AgentMonitor(resp http.ResponseWriter, req *http.Request) return nil, nil case log := <-logsCh: fmt.Fprint(resp, string(log)) + select { + case <-t.C: + t = time.AfterFunc(flushDelay, flusher.Flush) + default: + } } } } From f04b37164df40a413e7efc9905143849220dc2a0 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Wed, 9 Jun 2021 23:03:57 -0400 Subject: [PATCH 05/12] add changelog file --- .changelog/10368.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/10368.txt diff --git a/.changelog/10368.txt b/.changelog/10368.txt new file mode 100644 index 000000000000..8aab92c3db9a --- /dev/null +++ b/.changelog/10368.txt @@ -0,0 +1,3 @@ +```release-note:improvement +monitoring: optimize the monitoring endpoint to avoid losing logs when under high load +``` From ae3f8589a1525142e4476295a6d0550fc03d143d Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Thu, 10 Jun 2021 09:34:49 -0400 Subject: [PATCH 06/12] add issue url to changelog --- .changelog/10368.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changelog/10368.txt b/.changelog/10368.txt index 8aab92c3db9a..6d0ddb9ae76a 100644 --- a/.changelog/10368.txt +++ b/.changelog/10368.txt @@ -1,3 +1,3 @@ ```release-note:improvement -monitoring: optimize the monitoring endpoint to avoid losing logs when under high load +monitoring: optimize the monitoring endpoint to avoid losing logs when under high load. [GH-10368](https://api.github.com/repos/hashicorp/consul/issues/10368) ``` From a524e54d1febeae8e657447a8e7ec885756de5a5 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Thu, 10 Jun 2021 09:40:33 -0400 Subject: [PATCH 07/12] fix changelog url --- .changelog/10368.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changelog/10368.txt b/.changelog/10368.txt index 6d0ddb9ae76a..37257396e429 100644 --- a/.changelog/10368.txt +++ b/.changelog/10368.txt @@ -1,3 +1,3 @@ ```release-note:improvement -monitoring: optimize the monitoring endpoint to avoid losing logs when under high load. [GH-10368](https://api.github.com/repos/hashicorp/consul/issues/10368) +monitoring: optimize the monitoring endpoint to avoid losing logs when under high load. [GH-10368](https://github.com/hashicorp/consul/pull/10368) ``` From 762eda1db6677fad03f46b2b36e8c4c0a9a42919 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Thu, 10 Jun 2021 15:44:22 -0400 Subject: [PATCH 08/12] Update changelog Co-authored-by: Daniel Nephin --- .changelog/10368.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changelog/10368.txt b/.changelog/10368.txt index 37257396e429..7df442df7636 100644 --- a/.changelog/10368.txt +++ b/.changelog/10368.txt @@ -1,3 +1,3 @@ ```release-note:improvement -monitoring: optimize the monitoring endpoint to avoid losing logs when under high load. [GH-10368](https://github.com/hashicorp/consul/pull/10368) +monitoring: optimize the monitoring endpoint to avoid losing logs when under high load. ``` From 7428230efd4be7a03a0a0f8e62b27222adb7f722 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Fri, 11 Jun 2021 11:58:36 -0400 Subject: [PATCH 09/12] use ticker to flush and avoid race condition when flushing in a different goroutine --- agent/agent_endpoint.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 02252241d1ca..e80cb1c8489b 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -1204,7 +1204,8 @@ func (s *HTTPHandlers) AgentMonitor(resp http.ResponseWriter, req *http.Request) resp.Write([]byte("")) flusher.Flush() const flushDelay = 200 * time.Millisecond - t := time.AfterFunc(flushDelay, flusher.Flush) + flushTicker := time.NewTicker(flushDelay) + // Stream logs until the connection is closed. for { select { @@ -1215,13 +1216,12 @@ func (s *HTTPHandlers) AgentMonitor(resp http.ResponseWriter, req *http.Request) } flusher.Flush() return nil, nil + case log := <-logsCh: fmt.Fprint(resp, string(log)) - select { - case <-t.C: - t = time.AfterFunc(flushDelay, flusher.Flush) - default: - } + + case <-flushTicker.C: + flusher.Flush() } } } From d80cd84c4b5c76350383dbc2050095dbdd94852e Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Fri, 11 Jun 2021 13:49:05 -0400 Subject: [PATCH 10/12] stop the ticker when done Co-authored-by: Daniel Nephin --- agent/agent_endpoint.go | 1 + 1 file changed, 1 insertion(+) diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index e80cb1c8489b..40533c81174a 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -1205,6 +1205,7 @@ func (s *HTTPHandlers) AgentMonitor(resp http.ResponseWriter, req *http.Request) flusher.Flush() const flushDelay = 200 * time.Millisecond flushTicker := time.NewTicker(flushDelay) + defer flushTicker.Stop() // Stream logs until the connection is closed. for { From 74419c77b06497565debbcf1859be9b311dfb849 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Mon, 14 Jun 2021 09:56:11 -0400 Subject: [PATCH 11/12] Revert "fix race condition when we stop and start monitor multiple times, the doneCh is closed and never recover." This reverts commit 1eeddf7a --- logging/monitor/monitor.go | 10 +++------- logging/monitor/monitor_test.go | 2 +- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/logging/monitor/monitor.go b/logging/monitor/monitor.go index fd72a5ed6cfc..b68027701d27 100644 --- a/logging/monitor/monitor.go +++ b/logging/monitor/monitor.go @@ -35,7 +35,7 @@ type monitor struct { droppedCount int // doneCh coordinates the shutdown of logCh - doneCh chan bool + doneCh chan struct{} // Defaults to 512. bufSize int @@ -58,7 +58,7 @@ func New(cfg Config) Monitor { sw := &monitor{ logger: cfg.Logger, logCh: make(chan []byte, bufSize), - doneCh: make(chan bool, 1), + doneCh: make(chan struct{}, 1), bufSize: bufSize, } @@ -72,11 +72,7 @@ func New(cfg Config) Monitor { // Stop deregisters the sink and stops the monitoring process func (d *monitor) Stop() int { d.logger.DeregisterSink(d.sink) - select { - case d.doneCh <- true: - default: - } - + close(d.doneCh) return d.droppedCount } diff --git a/logging/monitor/monitor_test.go b/logging/monitor/monitor_test.go index 545f291af8fd..c12518f223d0 100644 --- a/logging/monitor/monitor_test.go +++ b/logging/monitor/monitor_test.go @@ -178,7 +178,7 @@ func TestMonitor_WriteStopped(t *testing.T) { mwriter := &monitor{ logger: logger, - doneCh: make(chan bool, 1), + doneCh: make(chan struct{}, 1), } mwriter.Stop() From 85d00cb41b99cbd73db3922f24c8248ab88f6b13 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Mon, 14 Jun 2021 20:47:07 -0400 Subject: [PATCH 12/12] wait for log consumer loop to start before registering the sink --- logging/monitor/monitor.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/logging/monitor/monitor.go b/logging/monitor/monitor.go index b68027701d27..e286bf76c77c 100644 --- a/logging/monitor/monitor.go +++ b/logging/monitor/monitor.go @@ -2,6 +2,7 @@ package monitor import ( "errors" + "sync" log "github.com/hashicorp/go-hclog" ) @@ -83,22 +84,28 @@ func (d *monitor) Start() <-chan []byte { streamCh := make(chan []byte, d.bufSize) // run a go routine that listens for streamed // log messages and sends them to streamCh + + wg := new(sync.WaitGroup) + wg.Add(1) go func() { defer close(streamCh) - + wg.Done() for { select { - case log := <-d.logCh: + case logLine := <-d.logCh: select { case <-d.doneCh: return - case streamCh <- log: + case streamCh <- logLine: } case <-d.doneCh: return } } }() + + //wait for the consumer loop to start before registering the sink to avoid filling the log channel + wg.Wait() d.logger.RegisterSink(d.sink) return streamCh }