Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add generics to subscriber and publisher and fix potential deadlock #602

Merged
merged 10 commits into from
Oct 10, 2024
62 changes: 23 additions & 39 deletions api/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/onflow/go-ethereum/eth/filters"
"github.com/onflow/go-ethereum/rpc"
"github.com/rs/zerolog"
"github.com/sethvargo/go-limiter"

"github.com/onflow/flow-evm-gateway/config"
"github.com/onflow/flow-evm-gateway/models"
Expand All @@ -25,10 +24,9 @@ type StreamAPI struct {
blocks storage.BlockIndexer
transactions storage.TransactionIndexer
receipts storage.ReceiptIndexer
blocksPublisher *models.Publisher
transactionsPublisher *models.Publisher
logsPublisher *models.Publisher
ratelimiter limiter.Store
blocksPublisher *models.Publisher[*models.Block]
transactionsPublisher *models.Publisher[*gethTypes.Transaction]
logsPublisher *models.Publisher[[]*gethTypes.Log]
}

func NewStreamAPI(
Expand All @@ -37,10 +35,9 @@ func NewStreamAPI(
blocks storage.BlockIndexer,
transactions storage.TransactionIndexer,
receipts storage.ReceiptIndexer,
blocksPublisher *models.Publisher,
transactionsPublisher *models.Publisher,
logsPublisher *models.Publisher,
ratelimiter limiter.Store,
blocksPublisher *models.Publisher[*models.Block],
transactionsPublisher *models.Publisher[*gethTypes.Transaction],
logsPublisher *models.Publisher[[]*gethTypes.Log],
) *StreamAPI {
return &StreamAPI{
logger: logger,
Expand All @@ -51,22 +48,17 @@ func NewStreamAPI(
blocksPublisher: blocksPublisher,
transactionsPublisher: transactionsPublisher,
logsPublisher: logsPublisher,
ratelimiter: ratelimiter,
}
}

// NewHeads send a notification each time a new block is appended to the chain.
func (s *StreamAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
return s.newSubscription(
return newSubscription(
ctx,
s.logger,
s.blocksPublisher,
func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error {
return func(data any) error {
block, ok := data.(*models.Block)
if !ok {
return fmt.Errorf("invalid data sent to block subscription: %s", sub.ID)
}

func(notifier *rpc.Notifier, sub *rpc.Subscription) func(block *models.Block) error {
return func(block *models.Block) error {
h, err := block.Hash()
if err != nil {
return err
Expand All @@ -93,16 +85,12 @@ func (s *StreamAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
// transaction enters the transaction pool. If fullTx is true the full tx is
// sent to the client, otherwise the hash is sent.
func (s *StreamAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) {
return s.newSubscription(
return newSubscription(
ctx,
s.logger,
s.transactionsPublisher,
func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error {
return func(data any) error {
tx, ok := data.(*gethTypes.Transaction)
if !ok {
return fmt.Errorf("invalid data sent to pending transaction subscription: %s", sub.ID)
}

func(notifier *rpc.Notifier, sub *rpc.Subscription) func(*gethTypes.Transaction) error {
return func(tx *gethTypes.Transaction) error {
if fullTx != nil && *fullTx {
return notifier.Notify(sub.ID, tx)
}
Expand All @@ -120,16 +108,12 @@ func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) (
return nil, fmt.Errorf("failed to create log subscription filter: %w", err)
}

return s.newSubscription(
return newSubscription(
ctx,
s.logger,
s.logsPublisher,
func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error {
return func(data any) error {
allLogs, ok := data.([]*gethTypes.Log)
if !ok {
return fmt.Errorf("invalid data sent to log subscription: %s", sub.ID)
}

func(notifier *rpc.Notifier, sub *rpc.Subscription) func([]*gethTypes.Log) error {
return func(allLogs []*gethTypes.Log) error {
for _, log := range allLogs {
// todo we could optimize this matching for cases where we have multiple subscriptions
// using the same filter criteria, we could only filter once and stream to all subscribers
Expand All @@ -148,10 +132,11 @@ func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) (
)
}

func (s *StreamAPI) newSubscription(
func newSubscription[T any](
ctx context.Context,
publisher *models.Publisher,
callback func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error,
logger zerolog.Logger,
publisher *models.Publisher[T],
callback func(notifier *rpc.Notifier, sub *rpc.Subscription) func(T) error,
) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
Expand All @@ -162,8 +147,7 @@ func (s *StreamAPI) newSubscription(

subs := models.NewSubscription(callback(notifier, rpcSub))

rpcSub.ID = rpc.ID(subs.ID().String())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit hesitant about removing this assignment here. If I remember correctly, this was added by Gregor, due to some issue with the event streaming API. But on the other hand, the CI is passing, so maybe it's not needed after all 🤷‍♂️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the heads up, I'll take a look at the history, maybe there were some clues what specific problem this was solving.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't find anything, we can just merge it. It doesn't seem to break anything, and the E2E tests do exercise this part.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@janezpodhostnik I did investigate this. What I found is:

rpcSub.ID

has the format: 0x8b841deff1dbca0881c995ad77574d17.
I think Gregor changed it to uuid, to match the format of the event streaming API,
e.g.: 69a20431-f601-43da-99f1-aa4bfd4d1bac

But we're using entirely the subscription functionality from Geth, so there's no need to go with the uuid format.
I think we're all good here. It would make sense though, to log the proper subscription-id, e.g.

l := logger.With().Str("subscription-id", string(rpcSub.ID)).Logger()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed in 47ecbea. Please check if that is what you had in mind.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, good idea adding both 👍

l := s.logger.With().Str("subscription-id", subs.ID().String()).Logger()
l := logger.With().Str("subscription-id", fmt.Sprintf("%p", subs)).Logger()
janezpodhostnik marked this conversation as resolved.
Show resolved Hide resolved

publisher.Subscribe(subs)

Expand Down
20 changes: 12 additions & 8 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/onflow/flow-go-sdk/access"
"github.com/onflow/flow-go-sdk/access/grpc"
"github.com/onflow/flow-go-sdk/crypto"
gethTypes "github.com/onflow/go-ethereum/core/types"
"github.com/rs/zerolog"
"github.com/sethvargo/go-limiter/memorystore"
grpcOpts "google.golang.org/grpc"
Expand All @@ -36,9 +37,9 @@ type Storages struct {
}

type Publishers struct {
Block *models.Publisher
Transaction *models.Publisher
Logs *models.Publisher
Block *models.Publisher[*models.Block]
Transaction *models.Publisher[*gethTypes.Transaction]
Logs *models.Publisher[[]*gethTypes.Log]
}

type Bootstrap struct {
Expand Down Expand Up @@ -72,9 +73,9 @@ func New(config *config.Config) (*Bootstrap, error) {

return &Bootstrap{
publishers: &Publishers{
Block: models.NewPublisher(),
Transaction: models.NewPublisher(),
Logs: models.NewPublisher(),
Block: models.NewPublisher[*models.Block](),
Transaction: models.NewPublisher[*gethTypes.Transaction](),
Logs: models.NewPublisher[[]*gethTypes.Log](),
},
storages: storages,
logger: logger,
Expand Down Expand Up @@ -209,7 +210,11 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error {
}

// create transaction pool
txPool := requester.NewTxPool(b.client, b.publishers.Transaction, b.logger)
txPool := requester.NewTxPool(
b.client,
b.publishers.Transaction,
b.logger,
)

evm, err := requester.NewEVM(
b.client,
Expand Down Expand Up @@ -260,7 +265,6 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error {
b.publishers.Block,
b.publishers.Transaction,
b.publishers.Logs,
ratelimiter,
)

pullAPI := api.NewPullAPI(
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
cloud.google.com/go/storage v1.36.0
github.com/cockroachdb/pebble v1.1.1
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/onflow/atree v0.8.0-rc.6
github.com/onflow/cadence v1.0.0
Expand Down Expand Up @@ -82,6 +81,7 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
Expand Down
56 changes: 27 additions & 29 deletions models/stream.go
Original file line number Diff line number Diff line change
@@ -1,76 +1,74 @@
package models

import (
"fmt"
"sync"

"github.com/google/uuid"
)

type Publisher struct {
type Publisher[T any] struct {
mux sync.RWMutex
subscribers map[uuid.UUID]Subscriber
subscribers map[Subscriber[T]]struct{}
}

func NewPublisher() *Publisher {
return &Publisher{
func NewPublisher[T any]() *Publisher[T] {
return &Publisher[T]{
mux: sync.RWMutex{},
subscribers: make(map[uuid.UUID]Subscriber),
subscribers: make(map[Subscriber[T]]struct{}),
}
}

func (p *Publisher) Publish(data any) {
func (p *Publisher[T]) Publish(data T) {
p.mux.RLock()
defer p.mux.RUnlock()

for _, s := range p.subscribers {
for s := range p.subscribers {
s.Notify(data)
}
}
janezpodhostnik marked this conversation as resolved.
Show resolved Hide resolved

func (p *Publisher) Subscribe(s Subscriber) {
func (p *Publisher[T]) Subscribe(s Subscriber[T]) {
p.mux.Lock()
defer p.mux.Unlock()

p.subscribers[s.ID()] = s
p.subscribers[s] = struct{}{}
}

func (p *Publisher) Unsubscribe(s Subscriber) {
func (p *Publisher[T]) Unsubscribe(s Subscriber[T]) {
p.mux.Lock()
defer p.mux.Unlock()

delete(p.subscribers, s.ID())
delete(p.subscribers, s)
}

type Subscriber interface {
ID() uuid.UUID
Notify(data any)
type Subscriber[T any] interface {
Notify(data T)
Error() <-chan error
}

type Subscription struct {
type Subscription[T any] struct {
err chan error
callback func(data any) error
uuid uuid.UUID
callback func(data T) error
}

func NewSubscription(callback func(any) error) *Subscription {
return &Subscription{
func NewSubscription[T any](callback func(T) error) *Subscription[T] {
return &Subscription[T]{
callback: callback,
uuid: uuid.New(),
err: make(chan error),
}
}

func (b *Subscription) Notify(data any) {
func (b *Subscription[T]) Notify(data T) {
err := b.callback(data)
if err != nil {
b.err <- err
select {
case b.err <- err:
Copy link
Member

@zhangchiqing zhangchiqing Oct 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the subscription cares about callback returning error or not.

I think the subscription's responsibility is to deliver the data to the callback. If there is error, it's the callback's job to handle it, log it, or even crash.

The callback has all the context of why this would error, so it's better being handled there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed!
Right now I just added a band-aid, but I will definitely fix this further with a following PR.

default:
// TODO: handle this better!
panic(fmt.Sprintf("failed to send error to subscription %v", err))
}
}
}

func (b *Subscription) ID() uuid.UUID {
return b.uuid
}

func (b *Subscription) Error() <-chan error {
func (b *Subscription[T]) Error() <-chan error {
return b.err
}
Loading
Loading