From 79a0de7e78d270c5c5381094dd93b1112c026d1c Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Thu, 28 Nov 2024 09:27:35 -0800 Subject: [PATCH] Handle resource exhaustion errors in the ingestion engine --- services/ingestion/event_subscriber.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/services/ingestion/event_subscriber.go b/services/ingestion/event_subscriber.go index a3136a073..7403570f4 100644 --- a/services/ingestion/event_subscriber.go +++ b/services/ingestion/event_subscriber.go @@ -5,9 +5,12 @@ import ( "errors" "fmt" "sort" + "time" "github.com/onflow/cadence/common" "github.com/onflow/flow-go/fvm/evm/events" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/onflow/flow-evm-gateway/models" errs "github.com/onflow/flow-evm-gateway/models/errors" @@ -271,11 +274,21 @@ func (r *RPCEventSubscriber) backfillSporkFromHeight(ctx context.Context, fromCa blocks, err := r.client.GetEventsForHeightRange(ctx, blockExecutedEvent, startHeight, endHeight) if err != nil { + // if we are rate limited by the AN, wait a bit and try again + if status.Code(err) == codes.ResourceExhausted { + time.Sleep(100 * time.Millisecond) + continue + } return 0, fmt.Errorf("failed to get block events: %w", err) } transactions, err := r.client.GetEventsForHeightRange(ctx, transactionExecutedEvent, startHeight, endHeight) if err != nil { + // if we are rate limited by the AN, wait a bit and try again + if status.Code(err) == codes.ResourceExhausted { + time.Sleep(100 * time.Millisecond) + continue + } return 0, fmt.Errorf("failed to get block events: %w", err) } @@ -346,6 +359,11 @@ func (r *RPCEventSubscriber) accumulateBlockEvents( currentHeight+maxRangeForGetEvents, ) if err != nil { + // if we are rate limited by the AN, wait a bit and try again + if status.Code(err) == codes.ResourceExhausted { + time.Sleep(100 * time.Millisecond) + continue + } return models.BlockEvents{}, fmt.Errorf("failed to get block events: %w", err) } @@ -356,6 +374,11 @@ func (r *RPCEventSubscriber) accumulateBlockEvents( currentHeight+maxRangeForGetEvents, ) if err != nil { + // if we are rate limited by the AN, wait a bit and try again + if status.Code(err) == codes.ResourceExhausted { + time.Sleep(100 * time.Millisecond) + continue + } return models.BlockEvents{}, fmt.Errorf("failed to get block events: %w", err) }