diff --git a/rpc/json.go b/rpc/json.go index 8a3b162cabbc..051f22076d9a 100644 --- a/rpc/json.go +++ b/rpc/json.go @@ -46,6 +46,17 @@ type subscriptionResult struct { Result json.RawMessage `json:"result,omitempty"` } +type subscriptionResultEnc struct { + ID string `json:"subscription"` + Result any `json:"result"` +} + +type jsonrpcSubscriptionNotification struct { + Version string `json:"jsonrpc"` + Method string `json:"method"` + Params subscriptionResultEnc `json:"params"` +} + // A value of this type can a JSON-RPC request, notification, successful response or // error response. Which one it is depends on the fields. type jsonrpcMessage struct { diff --git a/rpc/subscription.go b/rpc/subscription.go index 3231c2ceec9b..9cb07275479e 100644 --- a/rpc/subscription.go +++ b/rpc/subscription.go @@ -105,7 +105,7 @@ type Notifier struct { mu sync.Mutex sub *Subscription - buffer []json.RawMessage + buffer []any callReturned bool activated bool } @@ -129,12 +129,7 @@ func (n *Notifier) CreateSubscription() *Subscription { // Notify sends a notification to the client with the given data as payload. // If an error occurs the RPC connection is closed and the error is returned. -func (n *Notifier) Notify(id ID, data interface{}) error { - enc, err := json.Marshal(data) - if err != nil { - return err - } - +func (n *Notifier) Notify(id ID, data any) error { n.mu.Lock() defer n.mu.Unlock() @@ -144,9 +139,9 @@ func (n *Notifier) Notify(id ID, data interface{}) error { panic("Notify with wrong ID") } if n.activated { - return n.send(n.sub, enc) + return n.send(n.sub, data) } - n.buffer = append(n.buffer, enc) + n.buffer = append(n.buffer, data) return nil } @@ -181,16 +176,16 @@ func (n *Notifier) activate() error { return nil } -func (n *Notifier) send(sub *Subscription, data json.RawMessage) error { - params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data}) - ctx := context.Background() - - msg := &jsonrpcMessage{ +func (n *Notifier) send(sub *Subscription, data any) error { + msg := jsonrpcSubscriptionNotification{ Version: vsn, Method: n.namespace + notificationMethodSuffix, - Params: params, + Params: subscriptionResultEnc{ + ID: string(sub.ID), + Result: data, + }, } - return n.h.conn.writeJSON(ctx, msg, false) + return n.h.conn.writeJSON(context.Background(), &msg, false) } // A Subscription is created by a notifier and tied to that notifier. The client can use diff --git a/rpc/subscription_test.go b/rpc/subscription_test.go index b2704578291e..3a131c8e6bd2 100644 --- a/rpc/subscription_test.go +++ b/rpc/subscription_test.go @@ -17,12 +17,19 @@ package rpc import ( + "bytes" + "context" "encoding/json" "fmt" + "io" + "math/big" "net" "strings" "testing" "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" ) func TestNewID(t *testing.T) { @@ -218,3 +225,56 @@ func readAndValidateMessage(in *json.Decoder) (*subConfirmation, *subscriptionRe return nil, nil, fmt.Errorf("unrecognized message: %v", msg) } } + +type mockConn struct { + enc *json.Encoder +} + +// writeJSON writes a message to the connection. +func (c *mockConn) writeJSON(ctx context.Context, msg interface{}, isError bool) error { + return c.enc.Encode(msg) +} + +// Closed returns a channel which is closed when the connection is closed. +func (c *mockConn) closed() <-chan interface{} { return nil } + +// RemoteAddr returns the peer address of the connection. +func (c *mockConn) remoteAddr() string { return "" } + +// BenchmarkNotify benchmarks the performance of notifying a subscription. +func BenchmarkNotify(b *testing.B) { + id := ID("test") + notifier := &Notifier{ + h: &handler{conn: &mockConn{json.NewEncoder(io.Discard)}}, + sub: &Subscription{ID: id}, + activated: true, + } + msg := &types.Header{ + ParentHash: common.HexToHash("0x01"), + Number: big.NewInt(100), + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + notifier.Notify(id, msg) + } +} + +func TestNotify(t *testing.T) { + out := new(bytes.Buffer) + id := ID("test") + notifier := &Notifier{ + h: &handler{conn: &mockConn{json.NewEncoder(out)}}, + sub: &Subscription{ID: id}, + activated: true, + } + msg := &types.Header{ + ParentHash: common.HexToHash("0x01"), + Number: big.NewInt(100), + } + notifier.Notify(id, msg) + have := strings.TrimSpace(out.String()) + want := `{"jsonrpc":"2.0","method":"_subscription","params":{"subscription":"test","result":{"parentHash":"0x0000000000000000000000000000000000000000000000000000000000000001","sha3Uncles":"0x0000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","stateRoot":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionsRoot":"0x0000000000000000000000000000000000000000000000000000000000000000","receiptsRoot":"0x0000000000000000000000000000000000000000000000000000000000000000","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","difficulty":null,"number":"0x64","gasLimit":"0x0","gasUsed":"0x0","timestamp":"0x0","extraData":"0x","mixHash":"0x0000000000000000000000000000000000000000000000000000000000000000","nonce":"0x0000000000000000","baseFeePerGas":null,"withdrawalsRoot":null,"blobGasUsed":null,"excessBlobGas":null,"parentBeaconBlockRoot":null,"hash":"0xe5fb877dde471b45b9742bb4bb4b3d74a761e2fb7cb849a3d2b687eed90fb604"}}}` + if have != want { + t.Errorf("have:\n%v\nwant:\n%v\n", have, want) + } +}