Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add generics to subscriber and publisher and fix potential deadlock #602

Merged
merged 10 commits into from
Oct 10, 2024
62 changes: 23 additions & 39 deletions api/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/onflow/go-ethereum/eth/filters"
"github.com/onflow/go-ethereum/rpc"
"github.com/rs/zerolog"
"github.com/sethvargo/go-limiter"

"github.com/onflow/flow-evm-gateway/config"
"github.com/onflow/flow-evm-gateway/models"
Expand All @@ -25,10 +24,9 @@ type StreamAPI struct {
blocks storage.BlockIndexer
transactions storage.TransactionIndexer
receipts storage.ReceiptIndexer
blocksPublisher *models.Publisher
transactionsPublisher *models.Publisher
logsPublisher *models.Publisher
ratelimiter limiter.Store
blocksPublisher *models.Publisher[*models.Block]
transactionsPublisher *models.Publisher[*gethTypes.Transaction]
logsPublisher *models.Publisher[[]*gethTypes.Log]
}

func NewStreamAPI(
Expand All @@ -37,10 +35,9 @@ func NewStreamAPI(
blocks storage.BlockIndexer,
transactions storage.TransactionIndexer,
receipts storage.ReceiptIndexer,
blocksPublisher *models.Publisher,
transactionsPublisher *models.Publisher,
logsPublisher *models.Publisher,
ratelimiter limiter.Store,
blocksPublisher *models.Publisher[*models.Block],
transactionsPublisher *models.Publisher[*gethTypes.Transaction],
logsPublisher *models.Publisher[[]*gethTypes.Log],
) *StreamAPI {
return &StreamAPI{
logger: logger,
Expand All @@ -51,22 +48,17 @@ func NewStreamAPI(
blocksPublisher: blocksPublisher,
transactionsPublisher: transactionsPublisher,
logsPublisher: logsPublisher,
ratelimiter: ratelimiter,
}
}

// NewHeads send a notification each time a new block is appended to the chain.
func (s *StreamAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
return s.newSubscription(
return newSubscription(
ctx,
s.logger,
s.blocksPublisher,
func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error {
return func(data any) error {
block, ok := data.(*models.Block)
if !ok {
return fmt.Errorf("invalid data sent to block subscription: %s", sub.ID)
}

func(notifier *rpc.Notifier, sub *rpc.Subscription) func(block *models.Block) error {
return func(block *models.Block) error {
h, err := block.Hash()
if err != nil {
return err
Expand All @@ -93,16 +85,12 @@ func (s *StreamAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
// transaction enters the transaction pool. If fullTx is true the full tx is
// sent to the client, otherwise the hash is sent.
func (s *StreamAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) {
return s.newSubscription(
return newSubscription(
ctx,
s.logger,
s.transactionsPublisher,
func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error {
return func(data any) error {
tx, ok := data.(*gethTypes.Transaction)
if !ok {
return fmt.Errorf("invalid data sent to pending transaction subscription: %s", sub.ID)
}

func(notifier *rpc.Notifier, sub *rpc.Subscription) func(*gethTypes.Transaction) error {
return func(tx *gethTypes.Transaction) error {
if fullTx != nil && *fullTx {
return notifier.Notify(sub.ID, tx)
}
Expand All @@ -120,16 +108,12 @@ func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) (
return nil, fmt.Errorf("failed to create log subscription filter: %w", err)
}

return s.newSubscription(
return newSubscription(
ctx,
s.logger,
s.logsPublisher,
func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error {
return func(data any) error {
allLogs, ok := data.([]*gethTypes.Log)
if !ok {
return fmt.Errorf("invalid data sent to log subscription: %s", sub.ID)
}

func(notifier *rpc.Notifier, sub *rpc.Subscription) func([]*gethTypes.Log) error {
return func(allLogs []*gethTypes.Log) error {
for _, log := range allLogs {
// todo we could optimize this matching for cases where we have multiple subscriptions
// using the same filter criteria, we could only filter once and stream to all subscribers
Expand All @@ -148,10 +132,11 @@ func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) (
)
}

func (s *StreamAPI) newSubscription(
func newSubscription[T any](
ctx context.Context,
publisher *models.Publisher,
callback func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error,
logger zerolog.Logger,
publisher *models.Publisher[T],
callback func(notifier *rpc.Notifier, sub *rpc.Subscription) func(T) error,
) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
Expand All @@ -162,8 +147,7 @@ func (s *StreamAPI) newSubscription(

subs := models.NewSubscription(callback(notifier, rpcSub))

rpcSub.ID = rpc.ID(subs.ID().String())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit hesitant about removing this assignment here. If I remember correctly, this was added by Gregor, due to some issue with the event streaming API. But on the other hand, the CI is passing, so maybe it's not needed after all 🤷‍♂️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the heads up, I'll take a look at the history, maybe there were some clues what specific problem this was solving.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't find anything, we can just merge it. It doesn't seem to break anything, and the E2E tests do exercise this part.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@janezpodhostnik I did investigate this. What I found is:

rpcSub.ID

has the format: 0x8b841deff1dbca0881c995ad77574d17.
I think Gregor changed it to uuid, to match the format of the event streaming API,
e.g.: 69a20431-f601-43da-99f1-aa4bfd4d1bac

But we're using entirely the subscription functionality from Geth, so there's no need to go with the uuid format.
I think we're all good here. It would make sense though, to log the proper subscription-id, e.g.

l := logger.With().Str("subscription-id", string(rpcSub.ID)).Logger()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed in 47ecbea. Please check if that is what you had in mind.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, good idea adding both 👍

l := s.logger.With().Str("subscription-id", subs.ID().String()).Logger()
l := logger.With().Str("subscription-id", fmt.Sprintf("%p", subs)).Logger()
janezpodhostnik marked this conversation as resolved.
Show resolved Hide resolved

publisher.Subscribe(subs)

Expand Down
20 changes: 12 additions & 8 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/onflow/flow-go-sdk/access"
"github.com/onflow/flow-go-sdk/access/grpc"
"github.com/onflow/flow-go-sdk/crypto"
gethTypes "github.com/onflow/go-ethereum/core/types"
"github.com/rs/zerolog"
"github.com/sethvargo/go-limiter/memorystore"
grpcOpts "google.golang.org/grpc"
Expand All @@ -36,9 +37,9 @@ type Storages struct {
}

type Publishers struct {
Block *models.Publisher
Transaction *models.Publisher
Logs *models.Publisher
Block *models.Publisher[*models.Block]
Transaction *models.Publisher[*gethTypes.Transaction]
Logs *models.Publisher[[]*gethTypes.Log]
}

type Bootstrap struct {
Expand Down Expand Up @@ -71,9 +72,9 @@ func New(config *config.Config) (*Bootstrap, error) {

return &Bootstrap{
publishers: &Publishers{
Block: models.NewPublisher(),
Transaction: models.NewPublisher(),
Logs: models.NewPublisher(),
Block: models.NewPublisher[*models.Block](),
Transaction: models.NewPublisher[*gethTypes.Transaction](),
Logs: models.NewPublisher[[]*gethTypes.Log](),
},
storages: storages,
logger: logger,
Expand Down Expand Up @@ -208,7 +209,11 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error {
}

// create transaction pool
txPool := requester.NewTxPool(b.client, b.publishers.Transaction, b.logger)
txPool := requester.NewTxPool(
b.client,
b.publishers.Transaction,
b.logger,
)

evm, err := requester.NewEVM(
b.client,
Expand Down Expand Up @@ -259,7 +264,6 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error {
b.publishers.Block,
b.publishers.Transaction,
b.publishers.Logs,
ratelimiter,
)

pullAPI := api.NewPullAPI(
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
cloud.google.com/go/storage v1.36.0
github.com/cockroachdb/pebble v1.1.1
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/onflow/atree v0.8.0-rc.6
github.com/onflow/cadence v1.0.0-preview.52
Expand Down Expand Up @@ -82,6 +81,7 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
Expand Down
47 changes: 19 additions & 28 deletions models/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,75 +2,66 @@ package models

import (
"sync"

"github.com/google/uuid"
)

type Publisher struct {
type Publisher[T any] struct {
mux sync.RWMutex
subscribers map[uuid.UUID]Subscriber
subscribers map[Subscriber[T]]struct{}
}

func NewPublisher() *Publisher {
return &Publisher{
func NewPublisher[T any]() *Publisher[T] {
return &Publisher[T]{
mux: sync.RWMutex{},
subscribers: make(map[uuid.UUID]Subscriber),
subscribers: make(map[Subscriber[T]]struct{}),
}
}

func (p *Publisher) Publish(data any) {
func (p *Publisher[T]) Publish(data T) {
p.mux.RLock()
defer p.mux.RUnlock()

for _, s := range p.subscribers {
for s := range p.subscribers {
s.Notify(data)
}
}
janezpodhostnik marked this conversation as resolved.
Show resolved Hide resolved

func (p *Publisher) Subscribe(s Subscriber) {
func (p *Publisher[T]) Subscribe(s Subscriber[T]) {
p.mux.Lock()
defer p.mux.Unlock()

p.subscribers[s.ID()] = s
p.subscribers[s] = struct{}{}
}

func (p *Publisher) Unsubscribe(s Subscriber) {
func (p *Publisher[T]) Unsubscribe(s Subscriber[T]) {
p.mux.Lock()
defer p.mux.Unlock()

delete(p.subscribers, s.ID())
delete(p.subscribers, s)
}

type Subscriber interface {
ID() uuid.UUID
Notify(data any)
type Subscriber[T any] interface {
Notify(data T)
Error() <-chan error
}

type Subscription struct {
type Subscription[T any] struct {
err chan error
callback func(data any) error
uuid uuid.UUID
callback func(data T) error
}

func NewSubscription(callback func(any) error) *Subscription {
return &Subscription{
func NewSubscription[T any](callback func(T) error) *Subscription[T] {
return &Subscription[T]{
callback: callback,
uuid: uuid.New(),
}
janezpodhostnik marked this conversation as resolved.
Show resolved Hide resolved
}

func (b *Subscription) Notify(data any) {
func (b *Subscription[T]) Notify(data T) {
err := b.callback(data)
if err != nil {
b.err <- err
}
}

func (b *Subscription) ID() uuid.UUID {
return b.uuid
}

func (b *Subscription) Error() <-chan error {
func (b *Subscription[T]) Error() <-chan error {
return b.err
}
9 changes: 5 additions & 4 deletions services/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

pebbleDB "github.com/cockroachdb/pebble"
"github.com/onflow/flow-go-sdk"
gethTypes "github.com/onflow/go-ethereum/core/types"
"github.com/rs/zerolog"

"github.com/onflow/flow-evm-gateway/metrics"
Expand Down Expand Up @@ -41,8 +42,8 @@ type Engine struct {
accounts storage.AccountIndexer
log zerolog.Logger
evmLastHeight *models.SequentialHeight
blocksPublisher *models.Publisher
logsPublisher *models.Publisher
blocksPublisher *models.Publisher[*models.Block]
logsPublisher *models.Publisher[[]*gethTypes.Log]
collector metrics.Collector
}

Expand All @@ -53,8 +54,8 @@ func NewEventIngestionEngine(
receipts storage.ReceiptIndexer,
transactions storage.TransactionIndexer,
accounts storage.AccountIndexer,
blocksPublisher *models.Publisher,
logsPublisher *models.Publisher,
blocksPublisher *models.Publisher[*models.Block],
logsPublisher *models.Publisher[[]*gethTypes.Log],
log zerolog.Logger,
collector metrics.Collector,
) *Engine {
Expand Down
Loading
Loading