Skip to content

Commit

Permalink
Add subscribeEvents and subcriptionEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
IronGauntlets committed Nov 6, 2024
1 parent 3088325 commit ce0ca1b
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 0 deletions.
203 changes: 203 additions & 0 deletions rpc/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"

"github.com/NethermindEth/juno/blockchain"
"github.com/NethermindEth/juno/core"
"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/jsonrpc"
)
Expand Down Expand Up @@ -44,6 +45,16 @@ type EventsChunk struct {
ContinuationToken string `json:"continuation_token,omitempty"`
}

type EventSubscription struct {
From *felt.Felt `json:"from_address"`
Keys [][]felt.Felt `json:"keys"`
FromBlock *BlockID `json:"block"`
}

type SubscriptionID struct {
ID uint64 `json:"subscription_id"`
}

/****************************************************
Events Handlers
*****************************************************/
Expand Down Expand Up @@ -112,6 +123,198 @@ func (h *Handler) Unsubscribe(ctx context.Context, id uint64) (bool, *jsonrpc.Er
return true, nil
}

const subscribeEventsChunkSize = 1024

func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys [][]felt.Felt,
blockID *BlockID,
) (*SubscriptionID, *jsonrpc.Error) {
w, ok := jsonrpc.ConnFromContext(ctx)
if !ok {
return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil)
}

lenKeys := len(keys)
for _, k := range keys {
lenKeys += len(k)
}
if lenKeys > maxEventFilterKeys {
return nil, ErrTooManyKeysInFilter
}

var requestedHeader *core.Header
headHeader, err := h.bcReader.HeadsHeader()
if err != nil {
return nil, ErrInternal.CloneWithData(err.Error())
}

if blockID == nil {
requestedHeader = headHeader
} else {
requestedHeader, rpcErr := h.blockHeaderByID(blockID)
if rpcErr != nil {
return nil, rpcErr
}

// Todo: should the pending block be included in the head count?
if requestedHeader.Number >= maxBlocksBack && requestedHeader.Number <= headHeader.Number-maxBlocksBack {
return nil, ErrTooManyBlocksBack
}
}

id := h.idgen()
subscriptionCtx, subscriptionCtxCancel := context.WithCancel(ctx)
sub := &subscription{
cancel: subscriptionCtxCancel,
conn: w,
}
h.mu.Lock()
h.subscriptions[id] = sub
h.mu.Unlock()

headerSub := h.newHeads.Subscribe()
sub.wg.Go(func() {
defer func() {
h.unsubscribe(sub, id)
headerSub.Unsubscribe()
}()

// The specification doesn't enforce ordering of events therefore events from new blocks can be sent before
// old blocks.
// Todo: DRY
sub.wg.Go(func() {
for {
select {
case <-subscriptionCtx.Done():
return
case header := <-headerSub.Recv():
filter, err := h.bcReader.EventFilter(fromAddr, keys)
if err != nil {
h.log.Warnw("Error creating event filter", "err", err)
return
}
defer h.callAndLogErr(filter.Close, "Error closing event filter in events subscription")

Check failure on line 195 in rpc/events.go

View workflow job for this annotation

GitHub Actions / lint

deferInLoop: Possible resource leak, 'defer' is called in the 'for' loop (gocritic)

if err = setEventFilterRange(filter, &BlockID{Number: header.Number},
&BlockID{Number: header.Number}, header.Number); err != nil {
h.log.Warnw("Error setting event filter range", "err", err)
return
}

var cToken *blockchain.ContinuationToken
filteredEvents, cToken, err := filter.Events(cToken, subscribeEventsChunkSize)
if err != nil {
h.log.Warnw("Error filtering events", "err", err)
return
}

err = sendEvents(subscriptionCtx, w, filteredEvents, id)
if err != nil {
h.log.Warnw("Error sending events", "err", err)
return
}

for cToken != nil {
filteredEvents, cToken, err = filter.Events(cToken, subscribeEventsChunkSize)
if err != nil {
h.log.Warnw("Error filtering events", "err", err)
return
}

err = sendEvents(subscriptionCtx, w, filteredEvents, id)
if err != nil {
h.log.Warnw("Error sending events", "err", err)
return
}
}
}
}
})

filter, err := h.bcReader.EventFilter(fromAddr, keys)
if err != nil {
h.log.Warnw("Error creating event filter", "err", err)
return
}
defer h.callAndLogErr(filter.Close, "Error closing event filter in events subscription")

if err = setEventFilterRange(filter, &BlockID{Number: requestedHeader.Number},
&BlockID{Number: headHeader.Number}, headHeader.Number); err != nil {
h.log.Warnw("Error setting event filter range", "err", err)
return
}

var cToken *blockchain.ContinuationToken
filteredEvents, cToken, err := filter.Events(cToken, subscribeEventsChunkSize)
if err != nil {
h.log.Warnw("Error filtering events", "err", err)
return
}

err = sendEvents(subscriptionCtx, w, filteredEvents, id)
if err != nil {
h.log.Warnw("Error sending events", "err", err)
return
}

for cToken != nil {
filteredEvents, cToken, err = filter.Events(cToken, subscribeEventsChunkSize)
if err != nil {
h.log.Warnw("Error filtering events", "err", err)
return
}

err = sendEvents(subscriptionCtx, w, filteredEvents, id)
if err != nil {
h.log.Warnw("Error sending events", "err", err)
return
}
}
})

return &SubscriptionID{ID: id}, nil
}

func sendEvents(ctx context.Context, w jsonrpc.Conn, events []*blockchain.FilteredEvent, id uint64) error {
for _, event := range events {
select {
case <-ctx.Done():
return ctx.Err()
default:
// Pending block doesn't have a number
var blockNumber *uint64
if event.BlockHash != nil {
blockNumber = &(event.BlockNumber)
}
emittedEvent := &EmittedEvent{
BlockNumber: blockNumber,
BlockHash: event.BlockHash,
TransactionHash: event.TransactionHash,
Event: &Event{
From: event.From,
Keys: event.Keys,
Data: event.Data,
},
}

resp, err := json.Marshal(jsonrpc.Request{
Version: "2.0",
Method: "starknet_subscriptionEvents",
Params: map[string]any{
"subscription_id": id,
"result": emittedEvent,
},
})
if err != nil {
return err
}

_, err = w.Write(resp)
return err
}
}
return nil
}

// Events gets the events matching a filter
//
// It follows the specification defined here:
Expand Down
44 changes: 44 additions & 0 deletions rpc/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,50 @@ func (fc *fakeConn) Equal(other jsonrpc.Conn) bool {
return fc.w == fc2.w
}

func TestSubscribeEventsAndUnsubscribe(t *testing.T) {

Check failure on line 232 in rpc/events_test.go

View workflow job for this annotation

GitHub Actions / lint

TestSubscribeEventsAndUnsubscribe's subtests should call t.Parallel (tparallel)
t.Parallel()
log := utils.NewNopZapLogger()
n := utils.Ptr(utils.Mainnet)
client := feeder.NewTestClient(t, n)
gw := adaptfeeder.New(client)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
chain := blockchain.New(pebble.NewMemTest(t), n)
syncer := sync.New(chain, gw, log, 0, false)
handler := rpc.New(chain, syncer, nil, "", log)

go func() {
require.NoError(t, handler.Run(ctx))
}()
// Technically, there's a race between goroutine above and the SubscribeNewHeads call down below.
// Sleep for a moment just in case.
time.Sleep(50 * time.Millisecond)

serverConn, clientConn := net.Pipe()
t.Cleanup(func() {
require.NoError(t, serverConn.Close())
require.NoError(t, clientConn.Close())
})

t.Run("Too many keys in filter", func(t *testing.T) {
keys := make([][]felt.Felt, 1024+1)
fromAddr := new(felt.Felt).SetBytes([]byte("from_address"))
id, rpcErr := handler.SubscribeEvents(ctx, fromAddr, keys, nil)
assert.Zero(t, id)
assert.Equal(t, rpc.ErrTooManyKeysInFilter, rpcErr)
})

// Todo: use mocks to fix the tests
t.Run("Too many blocks back", func(t *testing.T) {
keys := make([][]felt.Felt, 1)
fromAddr := new(felt.Felt).SetBytes([]byte("from_address"))
blockID := &rpc.BlockID{Number: 0}
id, rpcErr := handler.SubscribeEvents(ctx, fromAddr, keys, blockID)
assert.Zero(t, id)
assert.Equal(t, rpc.ErrTooManyBlocksBack, rpcErr)
})
}

func TestSubscribeNewHeadsAndUnsubscribe(t *testing.T) {
t.Parallel()
log := utils.NewNopZapLogger()
Expand Down
7 changes: 7 additions & 0 deletions rpc/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,14 @@ var (
ErrUnsupportedTxVersion = &jsonrpc.Error{Code: 61, Message: "the transaction version is not supported"}
ErrUnsupportedContractClassVersion = &jsonrpc.Error{Code: 62, Message: "the contract class version is not supported"}
ErrUnexpectedError = &jsonrpc.Error{Code: 63, Message: "An unexpected error occurred"}
ErrTooManyBlocksBack = &jsonrpc.Error{Code: 68, Message: "Cannot go back more than 1024 blocks"}

// These errors can be only be returned by Juno-specific methods.
ErrSubscriptionNotFound = &jsonrpc.Error{Code: 100, Message: "Subscription not found"}
)

const (
maxBlocksBack = 1024
maxEventChunkSize = 10240
maxEventFilterKeys = 1024
traceCacheSize = 128
Expand Down Expand Up @@ -311,6 +313,11 @@ func (h *Handler) Methods() ([]jsonrpc.Method, string) { //nolint: funlen
Name: "starknet_specVersion",
Handler: h.SpecVersion,
},
{
Name: "starknet_subscribeEvents",
Params: []jsonrpc.Parameter{{Name: "from_address"}, {Name: "keys"}, {Name: "block", Optional: true}},
Handler: h.SubscribeEvents,
},
{
Name: "juno_subscribeNewHeads",
Handler: h.SubscribeNewHeads,
Expand Down

0 comments on commit ce0ca1b

Please sign in to comment.