diff --git a/.changeset/perfect-peaches-work.md b/.changeset/perfect-peaches-work.md new file mode 100644 index 00000000000..9a3b461e1c9 --- /dev/null +++ b/.changeset/perfect-peaches-work.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Fix flaky WS test server #internal diff --git a/core/chains/evm/testutils/client.go b/core/chains/evm/testutils/client.go index 325e24300be..ba455c08a85 100644 --- a/core/chains/evm/testutils/client.go +++ b/core/chains/evm/testutils/client.go @@ -137,7 +137,7 @@ func (ts *testWSServer) newWSHandler(chainID *big.Int, callback JSONRPCHandler) ts.mu.Unlock() for { - _, data, err := conn.ReadMessage() + err := ts.handleNewMsg(chainID, conn, callback) if err != nil { if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseAbnormalClosure) { ts.t.Log("Websocket closing") @@ -146,97 +146,98 @@ func (ts *testWSServer) newWSHandler(chainID *big.Int, callback JSONRPCHandler) ts.t.Logf("Failed to read message: %v", err) return } - ts.t.Log("Received message", string(data)) - - req := gjson.ParseBytes(data) - - if req.IsArray() { // Handle batch request - ts.t.Log("Received batch request") - responses := []string{} - for _, reqElem := range req.Array() { - m := reqElem.Get("method") - if m.Type != gjson.String { - ts.t.Logf("Method must be string: %v", m.Type) - continue - } - - var resp JSONRPCResponse - if chainID != nil && m.String() == "eth_chainId" { - resp.Result = `"0x` + chainID.Text(16) + `"` - } else if m.String() == "eth_syncing" { - resp.Result = "false" - } else { - resp = callback(m.String(), reqElem.Get("params")) - } - id := reqElem.Get("id") - var msg string - if resp.Error.Message != "" { - msg = fmt.Sprintf(`{"jsonrpc":"2.0","id":%s,"error":{"code":%d,"message":"%s"}}`, id, resp.Error.Code, resp.Error.Message) - } else { - msg = fmt.Sprintf(`{"jsonrpc":"2.0","id":%s,"result":%s}`, id, resp.Result) - } - responses = append(responses, msg) - } - responseBatch := fmt.Sprintf("[%s]", strings.Join(responses, ",")) - ts.t.Logf("Sending batch response: %v", responseBatch) - ts.mu.Lock() - err = conn.WriteMessage(websocket.BinaryMessage, []byte(responseBatch)) - ts.mu.Unlock() - if err != nil { - ts.t.Logf("Failed to write message: %v", err) - } - return - } - // Handle single request - if e := req.Get("error"); e.Exists() { - ts.t.Logf("Received jsonrpc error: %v", e) - continue - } + } + } +} - m := req.Get("method") - if m.Type != gjson.String { - ts.t.Logf("Method must be string: %v", m.Type) - return - } +func (ts *testWSServer) handleNewMsg(chainID *big.Int, conn *websocket.Conn, callback JSONRPCHandler) error { + _, data, err := conn.ReadMessage() + if err != nil { + return err + } - var resp JSONRPCResponse - if chainID != nil && m.String() == "eth_chainId" { - resp.Result = `"0x` + chainID.Text(16) + `"` - } else if m.String() == "eth_syncing" { - resp.Result = "false" - } else { - resp = callback(m.String(), req.Get("params")) - } - id := req.Get("id") - var msg string - if resp.Error.Message != "" { - msg = fmt.Sprintf(`{"jsonrpc":"2.0","id":%s,"error":{"code":%d,"message":"%s"}}`, id, resp.Error.Code, resp.Error.Message) - } else { - msg = fmt.Sprintf(`{"jsonrpc":"2.0","id":%s,"result":%s}`, id, resp.Result) - } - ts.t.Logf("Sending message: %v", msg) - ts.mu.Lock() - err = conn.WriteMessage(websocket.BinaryMessage, []byte(msg)) - ts.mu.Unlock() + ts.t.Log("Received message", string(data)) + + req := gjson.ParseBytes(data) + + if req.IsArray() { // Handle batch request + ts.t.Log("Received batch request") + var responses []string + for i, reqElem := range req.Array() { + var response string + response, _, err = ts.handleRequest(chainID, callback, reqElem) if err != nil { - ts.t.Logf("Failed to write message: %v", err) - return + return fmt.Errorf("failed to handle elem %d of batch request: %w", i, err) } + responses = append(responses, response) + } - if resp.Notify != "" { - time.Sleep(100 * time.Millisecond) - msg := fmt.Sprintf(`{"jsonrpc":"2.0","method":"eth_subscription","params":{"subscription":"0x00","result":%s}}`, resp.Notify) - ts.t.Log("Sending message", msg) - ts.mu.Lock() - err = conn.WriteMessage(websocket.BinaryMessage, []byte(msg)) - ts.mu.Unlock() - if err != nil { - ts.t.Logf("Failed to write message: %v", err) - return - } - } + return ts.writeMsg(conn, fmt.Sprintf("[%s]", strings.Join(responses, ","))) + } + // Handle single request + response, asyncResponse, err := ts.handleRequest(chainID, callback, req) + if err != nil { + return fmt.Errorf("failed to handle request: %w", err) + } + + if response != "" { + err = ts.writeMsg(conn, response) + if err != nil { + return err } } + + if asyncResponse != "" { + time.Sleep(100 * time.Millisecond) + return ts.writeMsg(conn, asyncResponse) + } + + return nil +} + +func (ts *testWSServer) handleRequest(chainID *big.Int, callback JSONRPCHandler, req gjson.Result) (response, asyncResponse string, err error) { + if e := req.Get("error"); e.Exists() { + ts.t.Logf("Received jsonrpc error: %v", e) + return + } + + m := req.Get("method") + if m.Type != gjson.String { + err = fmt.Errorf("method must be string: %v", m.Type) + return + } + + var resp JSONRPCResponse + if chainID != nil && m.String() == "eth_chainId" { + resp.Result = `"0x` + chainID.Text(16) + `"` + } else if m.String() == "eth_syncing" { + resp.Result = "false" + } else { + resp = callback(m.String(), req.Get("params")) + } + id := req.Get("id") + if resp.Error.Message != "" { + response = fmt.Sprintf(`{"jsonrpc":"2.0","id":%s,"error":{"code":%d,"message":"%s"}}`, id, resp.Error.Code, resp.Error.Message) + } else { + response = fmt.Sprintf(`{"jsonrpc":"2.0","id":%s,"result":%s}`, id, resp.Result) + } + + if resp.Notify != "" { + asyncResponse = fmt.Sprintf(`{"jsonrpc":"2.0","method":"eth_subscription","params":{"subscription":"0x00","result":%s}}`, resp.Notify) + } + + return +} + +func (ts *testWSServer) writeMsg(conn *websocket.Conn, msg string) error { + ts.t.Logf("Sending message: %v", msg) + ts.mu.Lock() + err := conn.WriteMessage(websocket.BinaryMessage, []byte(msg)) + ts.mu.Unlock() + if err != nil { + return fmt.Errorf("failed to write msg: %w", err) + } + return nil } type MockEth struct {