diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index 50d6d310b..4cc596d1e 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -156,7 +156,7 @@ func startIngestion( Uint64("missed-heights", blk.Height-latestCadenceHeight). Msg("indexing cadence height information") - subscriber := ingestion.NewRPCSubscriber(client, cfg.FlowNetworkID, logger) + subscriber := ingestion.NewRPCSubscriber(client, cfg.HeartbeatInterval, cfg.FlowNetworkID, logger) engine := ingestion.NewEventIngestionEngine( subscriber, blocks, diff --git a/config/config.go b/config/config.go index 27e17ed29..dec30e5d1 100644 --- a/config/config.go +++ b/config/config.go @@ -70,6 +70,8 @@ type Config struct { FilterExpiry time.Duration // ForceStartCadenceHeight will force set the starting Cadence height, this should be only used for testing or locally. ForceStartCadenceHeight uint64 + // HeartbeatInterval sets custom heartbeat interval for events + HeartbeatInterval uint64 } func FromFlags() (*Config, error) { @@ -95,6 +97,7 @@ func FromFlags() (*Config, error) { flag.BoolVar(&cfg.CreateCOAResource, "coa-resource-create", false, "Auto-create the COA resource in the Flow COA account provided if one doesn't exist") flag.StringVar(&logLevel, "log-level", "debug", "Define verbosity of the log output ('debug', 'info', 'error')") flag.Float64Var(&cfg.StreamLimit, "stream-limit", 10, "Rate-limits the events sent to the client within one second") + flag.Uint64Var(&cfg.HeartbeatInterval, "heartbeat-interval", 100, "Heartbeat interval for AN event subscription") flag.IntVar(&streamTimeout, "stream-timeout", 3, "Defines the timeout in seconds the server waits for the event to be sent to the client") flag.Uint64Var(&forceStartHeight, "force-start-height", 0, "Force set starting Cadence height. This should only be used locally or for testing, never in production.") flag.StringVar(&filterExpiry, "filter-expiry", "5m", "Filter defines the time it takes for an idle filter to expire") diff --git a/services/ingestion/subscriber.go b/services/ingestion/subscriber.go index 48fb2d325..f0c1a2187 100644 --- a/services/ingestion/subscriber.go +++ b/services/ingestion/subscriber.go @@ -29,14 +29,25 @@ type EventSubscriber interface { var _ EventSubscriber = &RPCSubscriber{} type RPCSubscriber struct { - client *requester.CrossSporkClient - chain flowGo.ChainID - logger zerolog.Logger + client *requester.CrossSporkClient + chain flowGo.ChainID + heartbeatInterval uint64 + logger zerolog.Logger } -func NewRPCSubscriber(client *requester.CrossSporkClient, chainID flowGo.ChainID, logger zerolog.Logger) *RPCSubscriber { +func NewRPCSubscriber( + client *requester.CrossSporkClient, + heartbeatInterval uint64, + chainID flowGo.ChainID, + logger zerolog.Logger, +) *RPCSubscriber { logger = logger.With().Str("component", "subscriber").Logger() - return &RPCSubscriber{client: client, chain: chainID, logger: logger} + return &RPCSubscriber{ + client: client, + heartbeatInterval: heartbeatInterval, + chain: chainID, + logger: logger, + } } // Subscribe will retrieve all the events from the provided height. If the height is from previous @@ -81,7 +92,7 @@ func (r *RPCSubscriber) Subscribe(ctx context.Context, height uint64) <-chan mod Msg("backfilling done, subscribe for live data") // subscribe in the current spork, handling of context cancellation is done by the producer - for ev := range r.subscribe(ctx, height) { + for ev := range r.subscribe(ctx, height, access.WithHeartbeatInterval(r.heartbeatInterval)) { events <- ev } diff --git a/services/ingestion/subscriber_test.go b/services/ingestion/subscriber_test.go index d944bcea3..4be9d0dd5 100644 --- a/services/ingestion/subscriber_test.go +++ b/services/ingestion/subscriber_test.go @@ -31,7 +31,7 @@ func Test_Subscribing(t *testing.T) { client, err := requester.NewCrossSporkClient(currentClient, sporkClients, zerolog.Nop()) require.NoError(t, err) - subscriber := NewRPCSubscriber(client, flowGo.Emulator, zerolog.Nop()) + subscriber := NewRPCSubscriber(client, 100, flowGo.Emulator, zerolog.Nop()) events := subscriber.Subscribe(context.Background(), 1)