Skip to content

Commit

Permalink
Merge pull request #374 from onflow/gregor/pending-tx
Browse files Browse the repository at this point in the history
Pending transaction event from pool
  • Loading branch information
sideninja authored Jul 23, 2024
2 parents eff039c + 7961ee9 commit 3bf0418
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 52 deletions.
13 changes: 3 additions & 10 deletions api/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,23 +99,16 @@ func (s *StreamAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*
s.transactionsPublisher,
func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error {
return func(data any) error {
tx, ok := data.(models.Transaction)
tx, ok := data.(*gethTypes.Transaction)
if !ok {
return fmt.Errorf("invalid data sent to pending transaction subscription")
}

var res any
if fullTx != nil && *fullTx {
if r, err := NewTransaction(tx, s.config.EVMNetworkID); err != nil {
return err
} else {
res = r
}
} else {
res = tx.Hash()
return notifier.Notify(sub.ID, tx)
}

return notifier.Notify(sub.ID, res)
return notifier.Notify(sub.ID, tx.Hash())
}
},
)
Expand Down
5 changes: 4 additions & 1 deletion bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ func startIngestion(
transactions,
accounts,
blocksPublisher,
transactionsPublisher,
logsPublisher,
logger,
)
Expand Down Expand Up @@ -286,12 +285,16 @@ func startServer(
return fmt.Errorf("failed to create a COA signer: %w", err)
}

// create transaction pool
txPool := requester.NewTxPool(client, transactionsPublisher, logger)

evm, err := requester.NewEVM(
client,
cfg,
signer,
logger,
blocks,
txPool,
)
if err != nil {
return fmt.Errorf("failed to create EVM requester: %w", err)
Expand Down
49 changes: 22 additions & 27 deletions services/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,17 @@ import (
var _ models.Engine = &Engine{}

type Engine struct {
subscriber EventSubscriber
store *pebble.Storage
blocks storage.BlockIndexer
receipts storage.ReceiptIndexer
transactions storage.TransactionIndexer
accounts storage.AccountIndexer
log zerolog.Logger
evmLastHeight *models.SequentialHeight
status *models.EngineStatus
blocksPublisher *models.Publisher
transactionsPublisher *models.Publisher
logsPublisher *models.Publisher
subscriber EventSubscriber
store *pebble.Storage
blocks storage.BlockIndexer
receipts storage.ReceiptIndexer
transactions storage.TransactionIndexer
accounts storage.AccountIndexer
log zerolog.Logger
evmLastHeight *models.SequentialHeight
status *models.EngineStatus
blocksPublisher *models.Publisher
logsPublisher *models.Publisher
}

func NewEventIngestionEngine(
Expand All @@ -39,24 +38,22 @@ func NewEventIngestionEngine(
transactions storage.TransactionIndexer,
accounts storage.AccountIndexer,
blocksPublisher *models.Publisher,
transactionsPublisher *models.Publisher,
logsPublisher *models.Publisher,
log zerolog.Logger,
) *Engine {
log = log.With().Str("component", "ingestion").Logger()

return &Engine{
subscriber: subscriber,
store: store,
blocks: blocks,
receipts: receipts,
transactions: transactions,
accounts: accounts,
log: log,
status: models.NewEngineStatus(),
blocksPublisher: blocksPublisher,
transactionsPublisher: transactionsPublisher,
logsPublisher: logsPublisher,
subscriber: subscriber,
store: store,
blocks: blocks,
receipts: receipts,
transactions: transactions,
accounts: accounts,
log: log,
status: models.NewEngineStatus(),
blocksPublisher: blocksPublisher,
logsPublisher: logsPublisher,
}
}

Expand Down Expand Up @@ -181,9 +178,7 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
e.blocksPublisher.Publish(b)
}

for i, r := range receipts {
e.transactionsPublisher.Publish(txs[i])

for _, r := range receipts {
if len(r.Logs) > 0 {
e.logsPublisher.Publish(r.Logs)
}
Expand Down
5 changes: 0 additions & 5 deletions services/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func TestSerialBlockIngestion(t *testing.T) {
accounts,
models.NewPublisher(),
models.NewPublisher(),
models.NewPublisher(),
zerolog.Nop(),
)

Expand Down Expand Up @@ -148,7 +147,6 @@ func TestSerialBlockIngestion(t *testing.T) {
accounts,
models.NewPublisher(),
models.NewPublisher(),
models.NewPublisher(),
zerolog.Nop(),
)

Expand Down Expand Up @@ -258,7 +256,6 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
accounts,
models.NewPublisher(),
models.NewPublisher(),
models.NewPublisher(),
zerolog.Nop(),
)

Expand Down Expand Up @@ -357,7 +354,6 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
accounts,
models.NewPublisher(),
models.NewPublisher(),
models.NewPublisher(),
zerolog.Nop(),
)

Expand Down Expand Up @@ -452,7 +448,6 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
accounts,
models.NewPublisher(),
models.NewPublisher(),
models.NewPublisher(),
zerolog.Nop(),
)

Expand Down
23 changes: 15 additions & 8 deletions services/requester/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,23 @@ const (
// and after submitted based on different strategies.

type TxPool struct {
logger zerolog.Logger
client *CrossSporkClient
pool *sync.Map
// todo add a broadcaster for pending transaction streaming
logger zerolog.Logger
client *CrossSporkClient
pool *sync.Map
txPublisher *models.Publisher
// todo add methods to inspect transaction pool state
}

func NewTxPool(client *CrossSporkClient, logger zerolog.Logger) *TxPool {
func NewTxPool(
client *CrossSporkClient,
transactionsPublisher *models.Publisher,
logger zerolog.Logger,
) *TxPool {
return &TxPool{
logger: logger.With().Str("component", "tx-pool").Logger(),
client: client,
pool: &sync.Map{},
logger: logger.With().Str("component", "tx-pool").Logger(),
client: client,
txPublisher: transactionsPublisher,
pool: &sync.Map{},
}
}

Expand All @@ -49,6 +54,8 @@ func (t *TxPool) Send(
flowTx *flow.Transaction,
evmTx *gethTypes.Transaction,
) error {
t.txPublisher.Publish(evmTx) // publish pending transaction event

if err := t.client.SendTransaction(ctx, *flowTx); err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion services/requester/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func NewEVM(
signer crypto.Signer,
logger zerolog.Logger,
blocks storage.BlockIndexer,
txPool *TxPool,
) (*EVM, error) {
logger = logger.With().Str("component", "requester").Logger()
// check that the address stores already created COA resource in the "evm" storage path.
Expand Down Expand Up @@ -164,7 +165,7 @@ func NewEVM(
signer: signer,
logger: logger,
blocks: blocks,
txPool: NewTxPool(client, logger),
txPool: txPool,
head: head,
evmSigner: evmSigner,
validationOptions: validationOptions,
Expand Down

0 comments on commit 3bf0418

Please sign in to comment.