From 51da97feb4d6273aec3e30684675156f5a4f33ef Mon Sep 17 00:00:00 2001 From: Gaston Ponti Date: Wed, 7 Apr 2021 18:02:36 -0300 Subject: [PATCH] Celostats reliability (#1487) Description Improve reliability from the ethstats module The connectionWrapper was cherrypicked from go-ethereum upstream: https://github.com/ethereum/go-ethereum/commit/82a9e11058623e73ace50445b41a06ccf8fd888f (https://github.com/ethereum/go-ethereum/pull/21404) Other changes Validator injecting its version to the proxy stats chunk Tested Manually in a local testnet Related issues Fixes #1395 Fixes #1397 Backwards compatibility Yes --- ethstats/ethstats.go | 400 ++++++++++++++++++++++++++++--------------- 1 file changed, 259 insertions(+), 141 deletions(-) diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index e2b30e6c2a..044bfc160e 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -28,6 +28,7 @@ import ( "runtime" "strconv" "strings" + "sync" "time" "github.com/celo-org/celo-blockchain/accounts" @@ -126,6 +127,51 @@ type Service struct { histCh chan []uint64 // History request block numbers are fed into this channel } +// connWrapper is a wrapper to prevent concurrent-write or concurrent-read on the +// websocket. +// +// From Gorilla websocket docs: +// Connections support one concurrent reader and one concurrent writer. +// Applications are responsible for ensuring that no more than one goroutine calls the write methods +// - NextWriter, SetWriteDeadline, WriteMessage, WriteJSON, EnableWriteCompression, SetCompressionLevel +// concurrently and that no more than one goroutine calls the read methods +// - NextReader, SetReadDeadline, ReadMessage, ReadJSON, SetPongHandler, SetPingHandler +// concurrently. +// The Close and WriteControl methods can be called concurrently with all other methods. +type connWrapper struct { + conn *websocket.Conn + + rlock sync.Mutex + wlock sync.Mutex +} + +func newConnectionWrapper(conn *websocket.Conn) *connWrapper { + return &connWrapper{conn: conn} +} + +// WriteJSON wraps corresponding method on the websocket but is safe for concurrent calling +func (w *connWrapper) WriteJSON(v interface{}) error { + w.wlock.Lock() + defer w.wlock.Unlock() + + return w.conn.WriteJSON(v) +} + +// ReadJSON wraps corresponding method on the websocket but is safe for concurrent calling +func (w *connWrapper) ReadJSON(v interface{}) error { + w.rlock.Lock() + defer w.rlock.Unlock() + + return w.conn.ReadJSON(v) +} + +// Close wraps corresponding method on the websocket but is safe for concurrent calling +func (w *connWrapper) Close() error { + // The Close and WriteControl methods can be called concurrently with all other methods, + // so the mutex is not used here + return w.conn.Close() +} + func parseStatsConnectionURL(url string, name *string, host *string) error { re := regexp.MustCompile("([^:@]*)?@(.+)") parts := re.FindStringSubmatch(url) @@ -240,6 +286,7 @@ func (s *Service) loop(ctx context.Context) { for { select { case delegateSignMsg := <-signCh: + s.fillWithValidatorInfo(&delegateSignMsg.Payload) if err := s.handleDelegateSign(&delegateSignMsg.Payload, delegateSignMsg.PeerID); err != nil { log.Warn("Delegate sign failed", "err", err) } @@ -259,81 +306,104 @@ func (s *Service) loop(ctx context.Context) { urls = []string{"wss://" + path, "ws://" + path} } - // Establish a websocket connection to the server on any supported URL - var ( - conn *websocket.Conn - err error - ) - + errTimer := time.NewTimer(0) + defer errTimer.Stop() // Loop reporting until termination for { - dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second} - header := make(http.Header) - for _, url := range urls { - conn, _, err = dialer.Dial(url, header) - if err == nil { - break + select { + case <-ctxGroup.Done(): + return ctxGroup.Err() + case <-errTimer.C: + var ( + conn *connWrapper + err error + ) + // Establish a websocket connection to the server on any supported URL + dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second} + header := make(http.Header) + header.Set("origin", "http://localhost") + for _, url := range urls { + c, _, e := dialer.Dial(url, header) + err = e + if err == nil { + conn = newConnectionWrapper(c) + break + } } - } - - if err != nil { - log.Warn("Stats server unreachable", "err", err) - time.Sleep(connectionTimeout * time.Second) - continue - } - // Authenticate the client with the server - if err = s.login(conn, sendCh); err != nil { - log.Warn("Stats login failed", "err", err) - conn.Close() - time.Sleep(connectionTimeout * time.Second) - continue - } - - // This go routine will close when the connection gets closed/lost - go s.readLoop(conn) + if err != nil { + log.Warn("Stats server unreachable", "err", err) + errTimer.Reset(connectionTimeout * time.Second) + continue + } - // Send the initial stats so our node looks decent from the get go - if err = s.report(conn, sendCh); err != nil { - log.Warn("Initial stats report failed", "err", err) - conn.Close() - continue - } + // Authenticate the client with the server + if err = s.login(conn, sendCh); err != nil { + log.Warn("Stats login failed", "err", err) + conn.Close() + errTimer.Reset(connectionTimeout * time.Second) + continue + } - // Keep sending status updates until the connection breaks - fullReport := time.NewTicker(statusUpdateInterval * time.Second) + // This go routine will close when the connection gets closed/lost + go s.readLoop(conn) - for err == nil { - select { - case <-ctxGroup.Done(): + // Send the initial stats so our node looks decent from the get go + if err = s.report(conn, sendCh); err != nil { + log.Warn("Initial stats report failed", "err", err) conn.Close() - return ctxGroup.Err() + continue + } - case <-fullReport.C: - if err = s.report(conn, sendCh); err != nil { - log.Warn("Full stats report failed", "err", err) - } - case list := <-s.histCh: - if err = s.reportHistory(conn, list); err != nil { - log.Warn("Requested history report failed", "err", err) - } - case head := <-headCh: - if err = s.reportBlock(conn, head); err != nil { - log.Warn("Block stats report failed", "err", err) - } - case <-txCh: - if err = s.reportPending(conn); err != nil { - log.Warn("Transaction stats report failed", "err", err) - } - case signedMessage := <-sendCh: - if err = s.handleDelegateSend(conn, signedMessage); err != nil { - log.Warn("Delegate send failed", "err", err) + // Keep sending status updates until the connection breaks + fullReport := time.NewTicker(statusUpdateInterval * time.Second) + + for err == nil { + select { + case <-ctxGroup.Done(): + fullReport.Stop() + conn.Close() + return ctxGroup.Err() + + case <-fullReport.C: + if err = s.report(conn, sendCh); err != nil { + log.Warn("Full stats report failed", "err", err) + } + case list := <-s.histCh: + if err = s.reportHistory(conn, list); err != nil { + log.Warn("Requested history report failed", "err", err) + } + case head := <-headCh: + if err = s.reportBlock(conn, head); err != nil { + log.Warn("Block stats report failed", "err", err) + } + if err = s.reportPending(conn); err != nil { + log.Warn("Post-block transaction stats report failed", "err", err) + } + case <-txCh: + if err = s.reportPending(conn); err != nil { + log.Warn("Transaction stats report failed", "err", err) + } + case signedMessage := <-sendCh: + // if it is a ping or hello message, it shouldn't be handled here + if signedMessage.Action != actionNodePing && signedMessage.Action != actionHello { + if err = s.handleDelegateSend(conn, signedMessage); err != nil { + log.Warn("Delegate send failed", "err", err) + } + } else { + errMessage := fmt.Sprintf("Signed message with action %s discarded", signedMessage.Action) + log.Warn("Signed message discarded", "Action", signedMessage.Action) + err = errors.New(errMessage) + } } } + fullReport.Stop() + + // Make sure the connection is closed + conn.Close() + errTimer.Reset(0) + // Continues with the for, and tries to login again until the ctx was cancelled } - // Make sure the connection is closed - conn.Close() - // Continues with the for, and tries to login again until the ctx was cancelled } }) } @@ -342,7 +412,7 @@ func (s *Service) loop(ctx context.Context) { } // login tries to authorize the client at the remote server. -func (s *Service) login(conn *websocket.Conn, sendCh chan *StatsPayload) error { +func (s *Service) login(conn *connWrapper, sendCh chan *StatsPayload) error { // Construct and send the login authentication infos := s.server.NodeInfo() @@ -390,8 +460,8 @@ func (s *Service) login(conn *websocket.Conn, sendCh chan *StatsPayload) error { } if s.backend.IsProxy() { - // Proxy needs a delegate send here to get ACK - if err := s.waitAndDelegateMessageWithTimeout(conn, sendCh); err != nil { + // Proxy needs a delegate send of a hello action here to get ACK + if err := s.waitAndDelegateMessageWithTimeout(conn, sendCh, actionHello); err != nil { return err } } @@ -428,12 +498,53 @@ func (s *Service) login(conn *websocket.Conn, sendCh chan *StatsPayload) error { return nil } -func (s *Service) waitAndDelegateMessageWithTimeout(conn *websocket.Conn, sendCh chan *StatsPayload) error { - select { - case signedMessage := <-sendCh: - return s.handleDelegateSend(conn, signedMessage) - case <-time.After(delegateSignTimeout * time.Second): - return errors.New("delegation sign timeout") +func (s *Service) waitAndDelegateMessageWithTimeout(conn *connWrapper, sendCh chan *StatsPayload, action string) error { + for { + select { + case signedMessage := <-sendCh: + err := s.handleDelegateSend(conn, signedMessage) + // The wait and delegate message, basically requires that some message was already returned. + // It is possible to receive an old message signed that was queue before. + // With this, we only continue if that message was present, otherwise we delegate that message + // and continue + if signedMessage.Action == action { + return err + } else { + if err != nil { + log.Warn("Delegate send failed", "err", err) + } + } + case <-time.After(delegateSignTimeout * time.Second): + return errors.New("delegation sign timeout") + } + } +} + +var nodeNameRegex = regexp.MustCompile(`(.*)/(.*)/(.*)/(.*)`) + +func (s *Service) fillWithValidatorInfo(message *StatsPayload) { + if message.Action == actionHello { + msg, ok := message.Stats.(map[string]interface{}) + if ok { + proxyInfo, ok := msg["info"].(map[string]interface{}) + if ok { + infos := s.server.NodeInfo() + proxyNode := proxyInfo["node"].(string) + proxyNodeParts := nodeNameRegex.FindStringSubmatch(proxyNode) + validatorNodeParts := nodeNameRegex.FindStringSubmatch(infos.Name) + // if one of the regex failed, maintain the proxy node name + if proxyNodeParts != nil && validatorNodeParts != nil { + proxyInfo["node"] = fmt.Sprintf( + "%s/%s(val:%s)/%s/%s", + proxyNodeParts[1], + proxyNodeParts[2], + validatorNodeParts[2], + proxyNodeParts[3], + proxyNodeParts[4], + ) + } + } + } } } @@ -454,7 +565,7 @@ func (s *Service) handleDelegateSign(messageToSign *StatsPayload, peerID enode.I return s.backend.SendDelegateSignMsgToProxy(msg, peerID) } -func (s *Service) handleDelegateSend(conn *websocket.Conn, signedMessage *StatsPayload) error { +func (s *Service) handleDelegateSend(conn *connWrapper, signedMessage *StatsPayload) error { report := map[string][]interface{}{ "emit": {signedMessage.Action, signedMessage.Stats}, } @@ -550,85 +661,94 @@ func (s *Service) handleChainHeadEvents(ctx context.Context, headCh chan *types. // from the network socket. If any of them match an active request, it forwards // it, if they themselves are requests it initiates a reply, and lastly it drops // unknown packets. -func (s *Service) readLoop(conn *websocket.Conn) { +func (s *Service) readLoop(conn *connWrapper) { // If the read loop exists, close the connection defer conn.Close() for { // Retrieve the next generic network packet and bail out on error - var msg interface{} - if err := conn.ReadJSON(&msg); err != nil { + var blob json.RawMessage + if err := conn.ReadJSON(&blob); err != nil { + // A closed connection from the server is also catched here + log.Warn("Failed to retrieve stats server message", "err", err) + return + } + // If the network packet is a system ping, respond to it directly + var ping string + if err := json.Unmarshal(blob, &ping); err == nil && strings.HasPrefix(ping, "primus::ping::") { + if err := conn.WriteJSON(strings.Replace(ping, "ping", "pong", -1)); err != nil { + log.Warn("Failed to respond to system ping message", "err", err) + return + } + continue + } + + // Not a system ping, try to decode an actual state message + var msg map[string]interface{} + if err := json.Unmarshal(blob, &msg); err != nil { log.Warn("Failed to decode stats server message", "err", err) return } + msgEmit, _ := msg["emit"].([]interface{}) - switch packet := msg.(type) { - case map[string]interface{}: - msgEmit, _ := packet["emit"].([]interface{}) + log.Trace("Received message from stats server", "msgEmit", msgEmit) + if len(msgEmit) == 0 { + log.Warn("Stats server sent non-broadcast", "msgEmit", msgEmit) + return + } - log.Trace("Received message from stats server", "msgEmit", msgEmit) - if len(msgEmit) == 0 { - log.Warn("Stats server sent non-broadcast", "msgEmit", msgEmit) + command, ok := msgEmit[0].(string) + if !ok { + log.Warn("Invalid stats server message type", "type", msgEmit[0]) + return + } + // If the message is a ping reply, deliver (someone must be listening!) + if len(msgEmit) == 2 && command == actionNodePong { + select { + case s.pongCh <- struct{}{}: + // Pong delivered, continue listening + continue + default: + // Ping routine dead, abort + log.Warn("Stats server pinger seems to have died") return } - - command, ok := msgEmit[0].(string) + } + // If the message is a history request, forward to the event processor + if len(msgEmit) == 2 && command == actionHistory { + // Make sure the request is valid and doesn't crash us + request, ok := msgEmit[1].(map[string]interface{}) if !ok { - log.Warn("Invalid stats server message type", "type", msgEmit[0]) - return - } - // If the message is a ping reply, deliver (someone must be listening!) - if len(msgEmit) == 2 && command == actionNodePong { + log.Warn("Invalid stats history request", "msg", msgEmit[1]) select { - case s.pongCh <- struct{}{}: - // Pong delivered, continue listening - continue + case s.histCh <- nil: // Treat it as an no indexes request default: - // Ping routine dead, abort - log.Warn("Stats server pinger seems to have died") - return } + continue // Ethstats sometimes sends invalid history requests, ignore those } - // If the message is a history request, forward to the event processor - if len(msgEmit) == 2 && command == actionHistory { - // Make sure the request is valid and doesn't crash us - request, ok := msgEmit[1].(map[string]interface{}) - if !ok { - log.Warn("Invalid stats history request", "msg", msgEmit[1]) - s.histCh <- nil - continue // Ethstats sometime sends invalid history requests, ignore those - } - list, ok := request["list"].([]interface{}) + list, ok := request["list"].([]interface{}) + if !ok { + log.Warn("Invalid stats history block list", "list", request["list"]) + return + } + // Convert the block number list to an integer list + numbers := make([]uint64, len(list)) + for i, num := range list { + n, ok := num.(float64) if !ok { - log.Warn("Invalid stats history block list", "list", request["list"]) + log.Warn("Invalid stats history block number", "number", num) return } - // Convert the block number list to an integer list - numbers := make([]uint64, len(list)) - for i, num := range list { - n, ok := num.(float64) - if !ok { - log.Warn("Invalid stats history block number", "number", num) - return - } - numbers[i] = uint64(n) - } - select { - case s.histCh <- numbers: - continue - default: - } + numbers[i] = uint64(n) } - default: - // Report anything else and continue - log.Info("Ping from websocket server", "msg", msg) - // Primus server might want to have a pong or it closes the connection - var serverTime = fmt.Sprintf("primus::pong::%d", time.Now().UnixNano()/int64(time.Millisecond)) - err := conn.WriteMessage(websocket.TextMessage, []byte(serverTime)) - if err != nil { - log.Error("Sending pong failed!", "err", err) + select { + case s.histCh <- numbers: + continue + default: } } + // Report anything else and continue + log.Info("Unknown stats message", "msg", msg) } } @@ -656,7 +776,7 @@ type authMsg struct { // report collects all possible data to report and send it to the stats server. // This should only be used on reconnects or rarely to avoid overloading the // server. Use the individual methods for reporting subscribed events. -func (s *Service) report(conn *websocket.Conn, sendCh chan *StatsPayload) error { +func (s *Service) report(conn *connWrapper, sendCh chan *StatsPayload) error { if err := s.reportLatency(conn, sendCh); err != nil { log.Warn("Latency failed to report", "err", err) return err @@ -675,7 +795,7 @@ func (s *Service) report(conn *websocket.Conn, sendCh chan *StatsPayload) error // reportLatency sends a ping request to the server, measures the RTT time and // finally sends a latency update. -func (s *Service) reportLatency(conn *websocket.Conn, sendCh chan *StatsPayload) error { +func (s *Service) reportLatency(conn *connWrapper, sendCh chan *StatsPayload) error { // Send the current time to the ethstats server start := time.Now() @@ -686,9 +806,9 @@ func (s *Service) reportLatency(conn *websocket.Conn, sendCh chan *StatsPayload) if err := s.sendStats(conn, actionNodePing, ping); err != nil { return err } - // Proxy needs a delegate send here to get ACK + // Proxy needs a delegate send of a node-ping action here to get ACK if s.backend.IsProxy() { - if err := s.waitAndDelegateMessageWithTimeout(conn, sendCh); err != nil { + if err := s.waitAndDelegateMessageWithTimeout(conn, sendCh, actionNodePing); err != nil { return err } } @@ -801,7 +921,7 @@ func (s *Service) signStats(stats interface{}) (map[string]interface{}, error) { return signedStats, nil } -func (s *Service) sendStats(conn *websocket.Conn, action string, stats interface{}) error { +func (s *Service) sendStats(conn *connWrapper, action string, stats interface{}) error { if s.backend.IsProxy() { statsWithAction := map[string]interface{}{ "stats": stats, @@ -832,7 +952,7 @@ func (s *Service) sendStats(conn *websocket.Conn, action string, stats interface } // reportBlock retrieves the current chain head and reports it to the stats server. -func (s *Service) reportBlock(conn *websocket.Conn, block *types.Block) error { +func (s *Service) reportBlock(conn *connWrapper, block *types.Block) error { // Gather the block details from the header or block chain details := s.assembleBlockStats(block) @@ -929,7 +1049,6 @@ type validatorInfo struct { func (s *Service) assembleValidatorSet(block *types.Block, state vm.StateDB) validatorSet { var ( - err error valSet validatorSet valsRegistered []validatorInfo valsElected []common.Address @@ -939,8 +1058,7 @@ func (s *Service) assembleValidatorSet(block *types.Block, state vm.StateDB) val valsRegisteredMap, _ := validators.RetrieveRegisteredValidators(s.eth.BlockChain().CurrentHeader(), state) valsRegistered = make([]validatorInfo, 0, len(valsRegisteredMap)) for _, address := range valsRegisteredMap { - var valData validators.ValidatorContractData - valData, err = validators.GetValidator(s.eth.BlockChain().CurrentHeader(), state, address) + valData, err := validators.GetValidator(s.eth.BlockChain().CurrentHeader(), state, address) if err != nil { log.Warn("Validator data not found", "address", address.Hex(), "err", err) @@ -974,7 +1092,7 @@ func (s *Service) assembleValidatorSet(block *types.Block, state vm.StateDB) val // reportHistory retrieves the most recent batch of blocks and reports it to the // stats server. -func (s *Service) reportHistory(conn *websocket.Conn, list []uint64) error { +func (s *Service) reportHistory(conn *connWrapper, list []uint64) error { // Figure out the indexes that need reporting indexes := make([]uint64, 0, historyUpdateRange) if len(list) > 0 { @@ -1037,7 +1155,7 @@ type pendStats struct { // reportPending retrieves the current number of pending transactions and reports // it to the stats server. -func (s *Service) reportPending(conn *websocket.Conn) error { +func (s *Service) reportPending(conn *connWrapper) error { // Retrieve the pending count from the local blockchain var pending int if s.eth != nil { @@ -1072,7 +1190,7 @@ type nodeStats struct { // reportPending retrieves various stats about the node at the networking and // mining layer and reports it to the stats server. -func (s *Service) reportStats(conn *websocket.Conn) error { +func (s *Service) reportStats(conn *connWrapper) error { // Gather the syncing and mining infos from the local miner instance var ( validatorAddress common.Address