From 6d65dd7ffe211c32a5c28f6620baf75464284289 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Mon, 3 Aug 2020 11:46:00 +0200 Subject: [PATCH 1/3] ethstats: avoid concurrent write on websocket, fixes #21403 --- ethstats/ethstats.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index b60ac56eabce..f5916e0f2b14 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -28,6 +28,7 @@ import ( "runtime" "strconv" "strings" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -80,6 +81,8 @@ type Service struct { pongCh chan struct{} // Pong notifications are fed into this channel histCh chan []uint64 // History request block numbers are fed into this channel + + connMu sync.Mutex // Mutex to prevent concurrent write on the websocket connection } // New returns a monitoring service ready for stats reporting. @@ -306,10 +309,13 @@ func (s *Service) readLoop(conn *websocket.Conn) { // If the network packet is a system ping, respond to it directly var ping string if err := json.Unmarshal(blob, &ping); err == nil && strings.HasPrefix(ping, "primus::ping::") { + s.connMu.Lock() if err := conn.WriteJSON(strings.Replace(ping, "ping", "pong", -1)); err != nil { + s.connMu.Unlock() log.Warn("Failed to respond to system ping message", "err", err) return } + s.connMu.Unlock() continue } // Not a system ping, try to decode an actual state message @@ -432,6 +438,8 @@ func (s *Service) login(conn *websocket.Conn) error { login := map[string][]interface{}{ "emit": {"hello", auth}, } + s.connMu.Lock() + defer s.connMu.Unlock() if err := conn.WriteJSON(login); err != nil { return err } @@ -474,6 +482,8 @@ func (s *Service) reportLatency(conn *websocket.Conn) error { "clientTime": start.String(), }}, } + s.connMu.Lock() + defer s.connMu.Unlock() if err := conn.WriteJSON(ping); err != nil { return err } @@ -547,6 +557,8 @@ func (s *Service) reportBlock(conn *websocket.Conn, block *types.Block) error { 
report := map[string][]interface{}{ "emit": {"block", stats}, } + s.connMu.Lock() + defer s.connMu.Unlock() return conn.WriteJSON(report) } @@ -661,6 +673,8 @@ func (s *Service) reportHistory(conn *websocket.Conn, list []uint64) error { report := map[string][]interface{}{ "emit": {"history", stats}, } + s.connMu.Unlock() + defer s.connMu.Unlock() return conn.WriteJSON(report) } @@ -691,6 +705,8 @@ func (s *Service) reportPending(conn *websocket.Conn) error { report := map[string][]interface{}{ "emit": {"pending", stats}, } + s.connMu.Lock() + defer s.connMu.Unlock() return conn.WriteJSON(report) } @@ -746,5 +762,7 @@ func (s *Service) reportStats(conn *websocket.Conn) error { report := map[string][]interface{}{ "emit": {"stats", stats}, } + s.connMu.Lock() + defer s.connMu.Unlock() return conn.WriteJSON(report) } From acd88cdf05caa389f42f3d16acaa3c51429e3d70 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Mon, 3 Aug 2020 12:10:44 +0200 Subject: [PATCH 2/3] ethstats: prevent concurrent reads as well --- ethstats/ethstats.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index f5916e0f2b14..2e620a821ee1 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -82,7 +82,17 @@ type Service struct { pongCh chan struct{} // Pong notifications are fed into this channel histCh chan []uint64 // History request block numbers are fed into this channel - connMu sync.Mutex // Mutex to prevent concurrent write on the websocket connection + // Gorilla websocket docs: + // Connections support one concurrent reader and one concurrent writer.
+ // Applications are responsible for ensuring that no more than one goroutine calls the write methods + // - NextWriter, SetWriteDeadline, WriteMessage, WriteJSON, EnableWriteCompression, SetCompressionLevel + // concurrently and that no more than one goroutine calls the read methods + // - NextReader, SetReadDeadline, ReadMessage, ReadJSON, SetPongHandler, SetPingHandler + // concurrently. + // The Close and WriteControl methods can be called concurrently with all other methods. + // + // In our case, we use a single mutex for both reading and writing. + connMu sync.Mutex // Mutex to prevent concurrent write/read on the websocket connection } // New returns a monitoring service ready for stats reporting. @@ -302,22 +312,24 @@ func (s *Service) readLoop(conn *websocket.Conn) { for { // Retrieve the next generic network packet and bail out on error var blob json.RawMessage + s.connMu.Lock() if err := conn.ReadJSON(&blob); err != nil { log.Warn("Failed to retrieve stats server message", "err", err) + s.connMu.Unlock() return } // If the network packet is a system ping, respond to it directly var ping string if err := json.Unmarshal(blob, &ping); err == nil && strings.HasPrefix(ping, "primus::ping::") { - s.connMu.Lock() if err := conn.WriteJSON(strings.Replace(ping, "ping", "pong", -1)); err != nil { - s.connMu.Unlock() log.Warn("Failed to respond to system ping message", "err", err) + s.connMu.Unlock() return } s.connMu.Unlock() continue } + s.connMu.Unlock() // Not a system ping, try to decode an actual state message var msg map[string][]interface{} if err := json.Unmarshal(blob, &msg); err != nil { From 293a30b49365f69c7bd69d3fc6ec9271d596d7d3 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Tue, 4 Aug 2020 09:47:51 +0200 Subject: [PATCH 3/3] ethstats: use connection wrapper instead of locks all over --- ethstats/ethstats.go | 94 ++++++++++++++++++++++++++------------------ 1 file changed, 55 insertions(+), 39 deletions(-) diff --git 
a/ethstats/ethstats.go b/ethstats/ethstats.go index 2e620a821ee1..d73c43f9de47 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -82,17 +82,48 @@ type Service struct { pongCh chan struct{} // Pong notifications are fed into this channel histCh chan []uint64 // History request block numbers are fed into this channel - // Gorilla websocket docs: - // Connections support one concurrent reader and one concurrent writer. - // Applications are responsible for ensuring that no more than one goroutine calls the write methods - // - NextWriter, SetWriteDeadline, WriteMessage, WriteJSON, EnableWriteCompression, SetCompressionLevel - // concurrently and that no more than one goroutine calls the read methods - // - NextReader, SetReadDeadline, ReadMessage, ReadJSON, SetPongHandler, SetPingHandler - // concurrently. - // The Close and WriteControl methods can be called concurrently with all other methods. - // - // In our case, we use a single mutex for both reading and writing. - connMu sync.Mutex // Mutex to prevent concurrent write/read on the websocket connection +} + +// connWrapper is a wrapper to prevent concurrent-write or concurrent-read on the +// websocket. +// From Gorilla websocket docs: +// Connections support one concurrent reader and one concurrent writer. +// Applications are responsible for ensuring that no more than one goroutine calls the write methods +// - NextWriter, SetWriteDeadline, WriteMessage, WriteJSON, EnableWriteCompression, SetCompressionLevel +// concurrently and that no more than one goroutine calls the read methods +// - NextReader, SetReadDeadline, ReadMessage, ReadJSON, SetPongHandler, SetPingHandler +// concurrently. +// The Close and WriteControl methods can be called concurrently with all other methods. +// +// The connWrapper uses a single mutex for both reading and writing. 
+type connWrapper struct { + conn *websocket.Conn + mu sync.Mutex +} + +func newConnectionWrapper(conn *websocket.Conn) *connWrapper { + return &connWrapper{conn: conn} +} + +// WriteJSON wraps corresponding method on the websocket but is safe for concurrent calling +func (w *connWrapper) WriteJSON(v interface{}) error { + w.mu.Lock() + defer w.mu.Unlock() + return w.conn.WriteJSON(v) +} + +// ReadJSON wraps corresponding method on the websocket but is safe for concurrent calling +func (w *connWrapper) ReadJSON(v interface{}) error { + w.mu.Lock() + defer w.mu.Unlock() + return w.conn.ReadJSON(v) +} + +// Close wraps corresponding method on the websocket but is safe for concurrent calling +func (w *connWrapper) Close() error { + // The Close and WriteControl methods can be called concurrently with all other methods, + // so the mutex is not used here + return w.conn.Close() } // New returns a monitoring service ready for stats reporting. @@ -227,17 +258,19 @@ func (s *Service) loop() { case <-errTimer.C: // Establish a websocket connection to the server on any supported URL var ( - conn *websocket.Conn + conn *connWrapper err error ) dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second} header := make(http.Header) header.Set("origin", "http://localhost") for _, url := range urls { - conn, _, err = dialer.Dial(url, header) - if err == nil { + c, _, e := dialer.Dial(url, header) + if e == nil { + conn = newConnectionWrapper(c) break } + err = e } if err != nil { log.Warn("Stats server unreachable", "err", err) @@ -305,17 +338,15 @@ func (s *Service) loop() { // from the network socket. If any of them match an active request, it forwards // it, if they themselves are requests it initiates a reply, and lastly it drops // unknown packets. 
-func (s *Service) readLoop(conn *websocket.Conn) { +func (s *Service) readLoop(conn *connWrapper) { // If the read loop exists, close the connection defer conn.Close() for { // Retrieve the next generic network packet and bail out on error var blob json.RawMessage - s.connMu.Lock() if err := conn.ReadJSON(&blob); err != nil { log.Warn("Failed to retrieve stats server message", "err", err) - s.connMu.Unlock() return } // If the network packet is a system ping, respond to it directly @@ -323,13 +354,10 @@ func (s *Service) readLoop(conn *websocket.Conn) { if err := json.Unmarshal(blob, &ping); err == nil && strings.HasPrefix(ping, "primus::ping::") { if err := conn.WriteJSON(strings.Replace(ping, "ping", "pong", -1)); err != nil { log.Warn("Failed to respond to system ping message", "err", err) - s.connMu.Unlock() return } - s.connMu.Unlock() continue } - s.connMu.Unlock() // Not a system ping, try to decode an actual state message var msg map[string][]interface{} if err := json.Unmarshal(blob, &msg); err != nil { @@ -419,7 +447,7 @@ type authMsg struct { } // login tries to authorize the client at the remote server. -func (s *Service) login(conn *websocket.Conn) error { +func (s *Service) login(conn *connWrapper) error { // Construct and send the login authentication infos := s.server.NodeInfo() @@ -450,8 +478,6 @@ func (s *Service) login(conn *websocket.Conn) error { login := map[string][]interface{}{ "emit": {"hello", auth}, } - s.connMu.Lock() - defer s.connMu.Unlock() if err := conn.WriteJSON(login); err != nil { return err } @@ -466,7 +492,7 @@ func (s *Service) login(conn *websocket.Conn) error { // report collects all possible data to report and send it to the stats server. // This should only be used on reconnects or rarely to avoid overloading the // server. Use the individual methods for reporting subscribed events. 
-func (s *Service) report(conn *websocket.Conn) error { +func (s *Service) report(conn *connWrapper) error { if err := s.reportLatency(conn); err != nil { return err } @@ -484,7 +510,7 @@ func (s *Service) report(conn *websocket.Conn) error { // reportLatency sends a ping request to the server, measures the RTT time and // finally sends a latency update. -func (s *Service) reportLatency(conn *websocket.Conn) error { +func (s *Service) reportLatency(conn *connWrapper) error { // Send the current time to the ethstats server start := time.Now() @@ -494,8 +520,6 @@ func (s *Service) reportLatency(conn *websocket.Conn) error { "clientTime": start.String(), }}, } - s.connMu.Lock() - defer s.connMu.Unlock() if err := conn.WriteJSON(ping); err != nil { return err } @@ -555,7 +579,7 @@ func (s uncleStats) MarshalJSON() ([]byte, error) { } // reportBlock retrieves the current chain head and reports it to the stats server. -func (s *Service) reportBlock(conn *websocket.Conn, block *types.Block) error { +func (s *Service) reportBlock(conn *connWrapper, block *types.Block) error { // Gather the block details from the header or block chain details := s.assembleBlockStats(block) @@ -569,8 +593,6 @@ func (s *Service) reportBlock(conn *websocket.Conn, block *types.Block) error { report := map[string][]interface{}{ "emit": {"block", stats}, } - s.connMu.Lock() - defer s.connMu.Unlock() return conn.WriteJSON(report) } @@ -629,7 +651,7 @@ func (s *Service) assembleBlockStats(block *types.Block) *blockStats { // reportHistory retrieves the most recent batch of blocks and reports it to the // stats server. 
-func (s *Service) reportHistory(conn *websocket.Conn, list []uint64) error { +func (s *Service) reportHistory(conn *connWrapper, list []uint64) error { // Figure out the indexes that need reporting indexes := make([]uint64, 0, historyUpdateRange) if len(list) > 0 { @@ -685,8 +707,6 @@ func (s *Service) reportHistory(conn *websocket.Conn, list []uint64) error { report := map[string][]interface{}{ "emit": {"history", stats}, } - s.connMu.Unlock() - defer s.connMu.Unlock() return conn.WriteJSON(report) } @@ -697,7 +717,7 @@ type pendStats struct { // reportPending retrieves the current number of pending transactions and reports // it to the stats server. -func (s *Service) reportPending(conn *websocket.Conn) error { +func (s *Service) reportPending(conn *connWrapper) error { // Retrieve the pending count from the local blockchain var pending int if s.eth != nil { @@ -717,8 +737,6 @@ func (s *Service) reportPending(conn *websocket.Conn) error { report := map[string][]interface{}{ "emit": {"pending", stats}, } - s.connMu.Lock() - defer s.connMu.Unlock() return conn.WriteJSON(report) } @@ -735,7 +753,7 @@ type nodeStats struct { // reportPending retrieves various stats about the node at the networking and // mining layer and reports it to the stats server. -func (s *Service) reportStats(conn *websocket.Conn) error { +func (s *Service) reportStats(conn *connWrapper) error { // Gather the syncing and mining infos from the local miner instance var ( mining bool @@ -774,7 +792,5 @@ func (s *Service) reportStats(conn *websocket.Conn) error { report := map[string][]interface{}{ "emit": {"stats", stats}, } - s.connMu.Lock() - defer s.connMu.Unlock() return conn.WriteJSON(report) }