From b2eb2e51755f3da8d30c16230b85104b0e6c3c44 Mon Sep 17 00:00:00 2001 From: sideninja <75445744+sideninja@users.noreply.github.com> Date: Wed, 22 May 2024 20:00:44 +0200 Subject: [PATCH 1/3] add heartbeat to config --- config/config.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/config/config.go b/config/config.go index 27e17ed2..77808713 100644 --- a/config/config.go +++ b/config/config.go @@ -70,6 +70,8 @@ type Config struct { FilterExpiry time.Duration // ForceStartCadenceHeight will force set the starting Cadence height, this should be only used for testing or locally. ForceStartCadenceHeight uint64 + // HeartbeatInterval sets custom heartbeat interval for events + HeartbeatInterval int } func FromFlags() (*Config, error) { @@ -95,6 +97,7 @@ func FromFlags() (*Config, error) { flag.BoolVar(&cfg.CreateCOAResource, "coa-resource-create", false, "Auto-create the COA resource in the Flow COA account provided if one doesn't exist") flag.StringVar(&logLevel, "log-level", "debug", "Define verbosity of the log output ('debug', 'info', 'error')") flag.Float64Var(&cfg.StreamLimit, "stream-limit", 10, "Rate-limits the events sent to the client within one second") + flag.IntVar(&cfg.HeartbeatInterval, "heartbeat-interval", 100, "Heartbeat interval for AN event subscription") flag.IntVar(&streamTimeout, "stream-timeout", 3, "Defines the timeout in seconds the server waits for the event to be sent to the client") flag.Uint64Var(&forceStartHeight, "force-start-height", 0, "Force set starting Cadence height. This should only be used locally or for testing, never in production.") flag.StringVar(&filterExpiry, "filter-expiry", "5m", "Filter defines the time it takes for an idle filter to expire") From 3452eb653448c53ea41fe6db380860150de0880a Mon Sep 17 00:00:00 2001 From: sideninja <75445744+sideninja@users.noreply.github.com> Date: Wed, 22 May 2024 20:03:54 +0200 Subject: [PATCH 2/3] configuring heartbeat interval --- bootstrap/bootstrap.go | 2 +- config/config.go | 4 ++-- services/ingestion/subscriber.go | 23 +++++++++++++++++------ 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index c81b4e7e..8e184809 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -152,7 +152,7 @@ func startIngestion( Uint64("missed-heights", blk.Height-latestCadenceHeight). Msg("indexing cadence height information") - subscriber := ingestion.NewRPCSubscriber(client, cfg.FlowNetworkID, logger) + subscriber := ingestion.NewRPCSubscriber(client, cfg.HeartbeatInterval, cfg.FlowNetworkID, logger) engine := ingestion.NewEventIngestionEngine( subscriber, blocks, diff --git a/config/config.go b/config/config.go index 77808713..dec30e5d 100644 --- a/config/config.go +++ b/config/config.go @@ -71,7 +71,7 @@ type Config struct { // ForceStartCadenceHeight will force set the starting Cadence height, this should be only used for testing or locally. ForceStartCadenceHeight uint64 // HeartbeatInterval sets custom heartbeat interval for events - HeartbeatInterval int + HeartbeatInterval uint64 } func FromFlags() (*Config, error) { @@ -97,7 +97,7 @@ func FromFlags() (*Config, error) { flag.BoolVar(&cfg.CreateCOAResource, "coa-resource-create", false, "Auto-create the COA resource in the Flow COA account provided if one doesn't exist") flag.StringVar(&logLevel, "log-level", "debug", "Define verbosity of the log output ('debug', 'info', 'error')") flag.Float64Var(&cfg.StreamLimit, "stream-limit", 10, "Rate-limits the events sent to the client within one second") - flag.IntVar(&cfg.HeartbeatInterval, "heartbeat-interval", 100, "Heartbeat interval for AN event subscription") + flag.Uint64Var(&cfg.HeartbeatInterval, "heartbeat-interval", 100, "Heartbeat interval for AN event subscription") flag.IntVar(&streamTimeout, "stream-timeout", 3, "Defines the timeout in seconds the server waits for the event to be sent to the client") flag.Uint64Var(&forceStartHeight, "force-start-height", 0, "Force set starting Cadence height. This should only be used locally or for testing, never in production.") flag.StringVar(&filterExpiry, "filter-expiry", "5m", "Filter defines the time it takes for an idle filter to expire") diff --git a/services/ingestion/subscriber.go b/services/ingestion/subscriber.go index 48fb2d32..f0c1a218 100644 --- a/services/ingestion/subscriber.go +++ b/services/ingestion/subscriber.go @@ -29,14 +29,25 @@ type EventSubscriber interface { var _ EventSubscriber = &RPCSubscriber{} type RPCSubscriber struct { - client *requester.CrossSporkClient - chain flowGo.ChainID - logger zerolog.Logger + client *requester.CrossSporkClient + chain flowGo.ChainID + heartbeatInterval uint64 + logger zerolog.Logger } -func NewRPCSubscriber(client *requester.CrossSporkClient, chainID flowGo.ChainID, logger zerolog.Logger) *RPCSubscriber { +func NewRPCSubscriber( + client *requester.CrossSporkClient, + heartbeatInterval uint64, + chainID flowGo.ChainID, + logger zerolog.Logger, +) *RPCSubscriber { logger = logger.With().Str("component", "subscriber").Logger() - return &RPCSubscriber{client: client, chain: chainID, logger: logger} + return &RPCSubscriber{ + client: client, + heartbeatInterval: heartbeatInterval, + chain: chainID, + logger: logger, + } } // Subscribe will retrieve all the events from the provided height. If the height is from previous @@ -81,7 +92,7 @@ func (r *RPCSubscriber) Subscribe(ctx context.Context, height uint64) <-chan mod Msg("backfilling done, subscribe for live data") // subscribe in the current spork, handling of context cancellation is done by the producer - for ev := range r.subscribe(ctx, height) { + for ev := range r.subscribe(ctx, height, access.WithHeartbeatInterval(r.heartbeatInterval)) { events <- ev } From 89b84cb3df2235505a4ae6ae559d7a0c276a9d86 Mon Sep 17 00:00:00 2001 From: sideninja <75445744+sideninja@users.noreply.github.com> Date: Fri, 24 May 2024 14:12:40 +0200 Subject: [PATCH 3/3] fix missing interval set --- services/ingestion/subscriber_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/ingestion/subscriber_test.go b/services/ingestion/subscriber_test.go index c5516374..7da43b51 100644 --- a/services/ingestion/subscriber_test.go +++ b/services/ingestion/subscriber_test.go @@ -97,7 +97,7 @@ func Test_Subscribing(t *testing.T) { client, err := requester.NewCrossSporkClient(currentClient, sporkClients, zerolog.Nop()) require.NoError(t, err) - subscriber := NewRPCSubscriber(client, flowGo.Emulator, zerolog.Nop()) + subscriber := NewRPCSubscriber(client, 100, flowGo.Emulator, zerolog.Nop()) events := subscriber.Subscribe(context.Background(), 1)