Skip to content

Commit

Permalink
ethstats: avoid concurrent write on websocket, fixes #21403
Browse files Browse the repository at this point in the history
  • Loading branch information
holiman committed Aug 3, 2020
1 parent 9c2ac6f commit 6d65dd7
Showing 1 changed file with 18 additions and 0 deletions.
18 changes: 18 additions & 0 deletions ethstats/ethstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -80,6 +81,8 @@ type Service struct {

pongCh chan struct{} // Pong notifications are fed into this channel
histCh chan []uint64 // History request block numbers are fed into this channel

connMu sync.Mutex // Mutex to prevent concurrent write on the websocket connection
}

// New returns a monitoring service ready for stats reporting.
Expand Down Expand Up @@ -306,10 +309,13 @@ func (s *Service) readLoop(conn *websocket.Conn) {
// If the network packet is a system ping, respond to it directly
var ping string
if err := json.Unmarshal(blob, &ping); err == nil && strings.HasPrefix(ping, "primus::ping::") {
s.connMu.Lock()
if err := conn.WriteJSON(strings.Replace(ping, "ping", "pong", -1)); err != nil {
s.connMu.Unlock()
log.Warn("Failed to respond to system ping message", "err", err)
return
}
s.connMu.Unlock()
continue
}
// Not a system ping, try to decode an actual state message
Expand Down Expand Up @@ -432,6 +438,8 @@ func (s *Service) login(conn *websocket.Conn) error {
login := map[string][]interface{}{
"emit": {"hello", auth},
}
s.connMu.Lock()
defer s.connMu.Unlock()
if err := conn.WriteJSON(login); err != nil {
return err
}
Expand Down Expand Up @@ -474,6 +482,8 @@ func (s *Service) reportLatency(conn *websocket.Conn) error {
"clientTime": start.String(),
}},
}
s.connMu.Lock()
defer s.connMu.Unlock()
if err := conn.WriteJSON(ping); err != nil {
return err
}
Expand Down Expand Up @@ -547,6 +557,8 @@ func (s *Service) reportBlock(conn *websocket.Conn, block *types.Block) error {
report := map[string][]interface{}{
"emit": {"block", stats},
}
s.connMu.Lock()
defer s.connMu.Unlock()
return conn.WriteJSON(report)
}

Expand Down Expand Up @@ -661,6 +673,8 @@ func (s *Service) reportHistory(conn *websocket.Conn, list []uint64) error {
report := map[string][]interface{}{
"emit": {"history", stats},
}
s.connMu.Unlock()
defer s.connMu.Unlock()
return conn.WriteJSON(report)
}

Expand Down Expand Up @@ -691,6 +705,8 @@ func (s *Service) reportPending(conn *websocket.Conn) error {
report := map[string][]interface{}{
"emit": {"pending", stats},
}
s.connMu.Lock()
defer s.connMu.Unlock()
return conn.WriteJSON(report)
}

Expand Down Expand Up @@ -746,5 +762,7 @@ func (s *Service) reportStats(conn *websocket.Conn) error {
report := map[string][]interface{}{
"emit": {"stats", stats},
}
s.connMu.Lock()
defer s.connMu.Unlock()
return conn.WriteJSON(report)
}

0 comments on commit 6d65dd7

Please sign in to comment.