diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index b60ac56eabce..f5916e0f2b14 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -28,6 +28,7 @@ import ( "runtime" "strconv" "strings" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -80,6 +81,8 @@ type Service struct { pongCh chan struct{} // Pong notifications are fed into this channel histCh chan []uint64 // History request block numbers are fed into this channel + + connMu sync.Mutex // Mutex to prevent concurrent write on the websocket connection } // New returns a monitoring service ready for stats reporting. @@ -306,10 +309,13 @@ func (s *Service) readLoop(conn *websocket.Conn) { // If the network packet is a system ping, respond to it directly var ping string if err := json.Unmarshal(blob, &ping); err == nil && strings.HasPrefix(ping, "primus::ping::") { + s.connMu.Lock() if err := conn.WriteJSON(strings.Replace(ping, "ping", "pong", -1)); err != nil { + s.connMu.Unlock() log.Warn("Failed to respond to system ping message", "err", err) return } + s.connMu.Unlock() continue } // Not a system ping, try to decode an actual state message @@ -432,6 +438,8 @@ func (s *Service) login(conn *websocket.Conn) error { login := map[string][]interface{}{ "emit": {"hello", auth}, } + s.connMu.Lock() + defer s.connMu.Unlock() if err := conn.WriteJSON(login); err != nil { return err } @@ -474,6 +482,8 @@ func (s *Service) reportLatency(conn *websocket.Conn) error { "clientTime": start.String(), }}, } + s.connMu.Lock() + defer s.connMu.Unlock() if err := conn.WriteJSON(ping); err != nil { return err } @@ -547,6 +557,8 @@ func (s *Service) reportBlock(conn *websocket.Conn, block *types.Block) error { report := map[string][]interface{}{ "emit": {"block", stats}, } + s.connMu.Lock() + defer s.connMu.Unlock() return conn.WriteJSON(report) } @@ -661,6 +673,8 @@ func (s *Service) reportHistory(conn *websocket.Conn, list []uint64) error { report := map[string][]interface{}{ "emit": {"history", stats}, } + s.connMu.Unlock() + defer s.connMu.Unlock() return conn.WriteJSON(report) } @@ -691,6 +705,8 @@ func (s *Service) reportPending(conn *websocket.Conn) error { report := map[string][]interface{}{ "emit": {"pending", stats}, } + s.connMu.Lock() + defer s.connMu.Unlock() return conn.WriteJSON(report) } @@ -746,5 +762,7 @@ func (s *Service) reportStats(conn *websocket.Conn) error { report := map[string][]interface{}{ "emit": {"stats", stats}, } + s.connMu.Lock() + defer s.connMu.Unlock() return conn.WriteJSON(report) }