Skip to content

Commit

Permalink
fix: race condition within evm indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
onikonychev committed Sep 22, 2024
1 parent c20f872 commit f61e012
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 39 deletions.
65 changes: 36 additions & 29 deletions app/server/evm_tx_indexer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server

import (
"context"
"sync/atomic"
"time"

"github.com/cometbft/cometbft/libs/service"
Expand All @@ -13,7 +14,7 @@ import (
)

const (
ServiceName = "EVMTxIndexerService"
EVMTxIndexerServiceName = "EVMTxIndexerService"

NewBlockWaitTimeout = 60 * time.Second
)
Expand All @@ -22,19 +23,16 @@ const (
type EVMTxIndexerService struct {
service.BaseService

txIndexer *indexer.EVMTxIndexer
client rpcclient.Client
cancelFunc context.CancelFunc
evmTxIndexer *indexer.EVMTxIndexer
rpcClient rpcclient.Client
cancelFunc context.CancelFunc
}

// NewEVMIndexerService returns a new service instance.
func NewEVMIndexerService(
txIdxr *indexer.EVMTxIndexer,
client rpcclient.Client,
) *EVMTxIndexerService {
is := &EVMTxIndexerService{txIndexer: txIdxr, client: client}
is.BaseService = *service.NewBaseService(nil, ServiceName, is)
return is
func NewEVMIndexerService(evmTxIndexer *indexer.EVMTxIndexer, rpcClient rpcclient.Client) *EVMTxIndexerService {
indexerService := &EVMTxIndexerService{evmTxIndexer: evmTxIndexer, rpcClient: rpcClient}
indexerService.BaseService = *service.NewBaseService(nil, EVMTxIndexerServiceName, indexerService)
return indexerService
}

// OnStart implements service.Service by subscribing for new blocks
Expand All @@ -43,37 +41,43 @@ func (service *EVMTxIndexerService) OnStart() error {
ctx, cancel := context.WithCancel(context.Background())
service.cancelFunc = cancel

status, err := service.client.Status(ctx)
status, err := service.rpcClient.Status(ctx)
if err != nil {
return err
}
latestBlock := status.SyncInfo.LatestBlockHeight
newBlockSignal := make(chan struct{}, 1)

blockHeadersChan, err := service.client.Subscribe(
// chainHeightStorage is used within goroutine and the indexer loop so, using atomic for read/write
var chainHeightStorage int64
atomic.StoreInt64(&chainHeightStorage, status.SyncInfo.LatestBlockHeight)

newBlockSignal := make(chan struct{}, 1)
blockHeadersChan, err := service.rpcClient.Subscribe(
ctx,
ServiceName,
EVMTxIndexerServiceName,
types.QueryForEvent(types.EventNewBlockHeader).String(),
0,
)
if err != nil {
return err
}

// Goroutine listening for new blocks
go func(ctx context.Context) {
for {
select {
case <-ctx.Done(): // Listen for context cancellation to stop the goroutine
case <-ctx.Done():
service.Logger.Info("Stopping indexer goroutine")
err := service.txIndexer.CloseDBAndExit()
err := service.evmTxIndexer.CloseDBAndExit()
if err != nil {
service.Logger.Error("Error closing indexer DB", "err", err)
}
return
case msg := <-blockHeadersChan:
eventDataHeader := msg.Data.(types.EventDataNewBlockHeader)
if eventDataHeader.Header.Height > latestBlock {
latestBlock = eventDataHeader.Header.Height
currentChainHeight := eventDataHeader.Header.Height
chainHeight := atomic.LoadInt64(&chainHeightStorage)
if currentChainHeight > chainHeight {
atomic.StoreInt64(&chainHeightStorage, currentChainHeight)
// notify
select {
case newBlockSignal <- struct{}{}:
Expand All @@ -84,37 +88,40 @@ func (service *EVMTxIndexerService) OnStart() error {
}
}(ctx)

lastBlock, err := service.txIndexer.LastIndexedBlock()
lastIndexedHeight, err := service.evmTxIndexer.LastIndexedBlock()
if err != nil {
return err
}
if lastBlock == -1 {
lastBlock = latestBlock
if lastIndexedHeight == -1 {
lastIndexedHeight = atomic.LoadInt64(&chainHeightStorage)
}

// Indexer loop
for {
if latestBlock <= lastBlock {
chainHeight := atomic.LoadInt64(&chainHeightStorage)
if chainHeight <= lastIndexedHeight {
// nothing to index. wait for signal of new block
select {
case <-newBlockSignal:
case <-time.After(NewBlockWaitTimeout):
}
continue
}
for i := lastBlock + 1; i <= latestBlock; i++ {
block, err := service.client.Block(ctx, &i)
for i := lastIndexedHeight + 1; i <= chainHeight; i++ {
block, err := service.rpcClient.Block(ctx, &i)
if err != nil {
service.Logger.Error("failed to fetch block", "height", i, "err", err)
break
}
blockResult, err := service.client.BlockResults(ctx, &i)
blockResult, err := service.rpcClient.BlockResults(ctx, &i)
if err != nil {
service.Logger.Error("failed to fetch block result", "height", i, "err", err)
break
}
if err := service.txIndexer.IndexBlock(block.Block, blockResult.TxsResults); err != nil {
if err := service.evmTxIndexer.IndexBlock(block.Block, blockResult.TxsResults); err != nil {
service.Logger.Error("failed to index block", "height", i, "err", err)
}
lastBlock = blockResult.Height
lastIndexedHeight = blockResult.Height
}
}
}
Expand Down
21 changes: 15 additions & 6 deletions app/server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func startInProcess(ctx *sdkserver.Context, clientCtx client.Context, opts Start

// Add the tx service to the gRPC router. We only need to register this
// service if API or gRPC or JSONRPC is enabled, and avoid doing so in the general
// case, because it spawns a new local tendermint RPC client.
// case, because it spawns a new local tendermint RPC rpcClient.
if (conf.API.Enable || conf.GRPC.Enable || conf.JSONRPC.Enable || conf.JSONRPC.EnableIndexer) && tmNode != nil {
clientCtx = clientCtx.WithClient(local.New(tmNode))

Expand All @@ -389,7 +389,11 @@ func startInProcess(ctx *sdkserver.Context, clientCtx client.Context, opts Start
logger.Error("failed to open evm indexer DB", "error", err.Error())
return err
}
evmTxIndexer, _ := OpenEVMIndexer(ctx, idxDB, clientCtx)
evmTxIndexer, _, err := OpenEVMIndexer(ctx, idxDB, clientCtx)
if err != nil {
logger.Error("failed starting evm indexer service", "error", err.Error())
return err
}
evmIdxer = evmTxIndexer
}

Expand Down Expand Up @@ -423,7 +427,7 @@ func startInProcess(ctx *sdkserver.Context, clientCtx client.Context, opts Start

grpcAddress := fmt.Sprintf("127.0.0.1:%s", port)

// If grpc is enabled, configure grpc client for grpc gateway and json-rpc.
// If grpc is enabled, configure grpc rpcClient for grpc gateway and json-rpc.
grpcClient, err := grpc.Dial(
grpcAddress,
grpc.WithTransportCredentials(insecure.NewCredentials()),
Expand All @@ -438,7 +442,7 @@ func startInProcess(ctx *sdkserver.Context, clientCtx client.Context, opts Start
}

clientCtx = clientCtx.WithGRPCClient(grpcClient)
ctx.Logger.Debug("gRPC client assigned to client context", "address", grpcAddress)
ctx.Logger.Debug("gRPC rpcClient assigned to rpcClient context", "address", grpcAddress)
}
}

Expand Down Expand Up @@ -597,7 +601,7 @@ func OpenIndexerDB(rootDir string, backendType dbm.BackendType) (dbm.DB, error)

func OpenEVMIndexer(
ctx *sdkserver.Context, indexerDb dbm.DB, clientCtx client.Context,
) (eth.EVMTxIndexer, *EVMTxIndexerService) {
) (eth.EVMTxIndexer, *EVMTxIndexerService, error) {
idxLogger := ctx.Logger.With("indexer", "evm")
evmIndexer := indexer.NewEVMTxIndexer(indexerDb, idxLogger, clientCtx)

Expand All @@ -610,7 +614,12 @@ func OpenEVMIndexer(
errCh <- err
}
}()
return evmIndexer, evmIndexerService
select {
case err := <-errCh:
return nil, nil, err
case <-time.After(types.ServerStartTime): // assume server started successfully
}
return evmIndexer, evmIndexerService, nil
}

func openTraceWriter(traceWriterFile string) (w io.Writer, err error) {
Expand Down
4 changes: 2 additions & 2 deletions app/server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ func ConnectTmWS(tmRPCAddr, tmEndpoint string, logger tmlog.Logger) *rpcclient.W

if err != nil {
logger.Error(
"Tendermint WS client could not be created",
"Tendermint WS rpcClient could not be created",
"address", tmRPCAddr+tmEndpoint,
"error", err,
)
} else if err := tmWsClient.OnStart(); err != nil {
logger.Error(
"Tendermint WS client could not start",
"Tendermint WS rpcClient could not start",
"address", tmRPCAddr+tmEndpoint,
"error", err,
)
Expand Down
9 changes: 7 additions & 2 deletions x/common/testutil/testnetwork/start_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,17 @@ func startNodeAndServers(cfg Config, val *Validator) error {

val.Logger.Log("Set EVM indexer")

evmTxIndexer, evmTxIndexerService := server.OpenEVMIndexer(val.Ctx, db.NewMemDB(), val.ClientCtx)
evmTxIndexer, evmTxIndexerService, err := server.OpenEVMIndexer(val.Ctx, db.NewMemDB(), val.ClientCtx)
if err != nil {
{
return fmt.Errorf("failed starting evm indexer service: %w", err)

Check warning on line 139 in x/common/testutil/testnetwork/start_node.go

View check run for this annotation

Codecov / codecov/patch

x/common/testutil/testnetwork/start_node.go#L139

Added line #L139 was not covered by tests
}
}
val.EthTxIndexer = evmTxIndexer
val.EthTxIndexerService = evmTxIndexerService

val.jsonrpc, val.jsonrpcDone, err =
server.StartJSONRPC(val.Ctx, val.ClientCtx, tmRPCAddr, tmEndpoint, val.AppConfig, nil)
server.StartJSONRPC(val.Ctx, val.ClientCtx, tmRPCAddr, tmEndpoint, val.AppConfig, val.EthTxIndexer)
if err != nil {
return errors.Wrap(err, "failed to start JSON-RPC server")
}
Expand Down

0 comments on commit f61e012

Please sign in to comment.