Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow configuring heartbeat interval #261

Merged
merged 5 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func startIngestion(
Uint64("missed-heights", blk.Height-latestCadenceHeight).
Msg("indexing cadence height information")

subscriber := ingestion.NewRPCSubscriber(client, cfg.FlowNetworkID, logger)
subscriber := ingestion.NewRPCSubscriber(client, cfg.HeartbeatInterval, cfg.FlowNetworkID, logger)
engine := ingestion.NewEventIngestionEngine(
subscriber,
blocks,
Expand Down
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type Config struct {
FilterExpiry time.Duration
// ForceStartCadenceHeight will force set the starting Cadence height, this should be only used for testing or locally.
ForceStartCadenceHeight uint64
// HeartbeatInterval sets custom heartbeat interval for events
HeartbeatInterval uint64
}

func FromFlags() (*Config, error) {
Expand All @@ -95,6 +97,7 @@ func FromFlags() (*Config, error) {
flag.BoolVar(&cfg.CreateCOAResource, "coa-resource-create", false, "Auto-create the COA resource in the Flow COA account provided if one doesn't exist")
flag.StringVar(&logLevel, "log-level", "debug", "Define verbosity of the log output ('debug', 'info', 'error')")
flag.Float64Var(&cfg.StreamLimit, "stream-limit", 10, "Rate-limits the events sent to the client within one second")
flag.Uint64Var(&cfg.HeartbeatInterval, "heartbeat-interval", 100, "Heartbeat interval for AN event subscription")
flag.IntVar(&streamTimeout, "stream-timeout", 3, "Defines the timeout in seconds the server waits for the event to be sent to the client")
flag.Uint64Var(&forceStartHeight, "force-start-height", 0, "Force set starting Cadence height. This should only be used locally or for testing, never in production.")
flag.StringVar(&filterExpiry, "filter-expiry", "5m", "Filter defines the time it takes for an idle filter to expire")
Expand Down
23 changes: 17 additions & 6 deletions services/ingestion/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,25 @@ type EventSubscriber interface {
var _ EventSubscriber = &RPCSubscriber{}

type RPCSubscriber struct {
client *requester.CrossSporkClient
chain flowGo.ChainID
logger zerolog.Logger
client *requester.CrossSporkClient
chain flowGo.ChainID
heartbeatInterval uint64
logger zerolog.Logger
}

func NewRPCSubscriber(client *requester.CrossSporkClient, chainID flowGo.ChainID, logger zerolog.Logger) *RPCSubscriber {
func NewRPCSubscriber(
client *requester.CrossSporkClient,
heartbeatInterval uint64,
chainID flowGo.ChainID,
logger zerolog.Logger,
) *RPCSubscriber {
logger = logger.With().Str("component", "subscriber").Logger()
return &RPCSubscriber{client: client, chain: chainID, logger: logger}
return &RPCSubscriber{
client: client,
heartbeatInterval: heartbeatInterval,
chain: chainID,
logger: logger,
}
}

// Subscribe will retrieve all the events from the provided height. If the height is from previous
Expand Down Expand Up @@ -81,7 +92,7 @@ func (r *RPCSubscriber) Subscribe(ctx context.Context, height uint64) <-chan mod
Msg("backfilling done, subscribe for live data")

// subscribe in the current spork, handling of context cancellation is done by the producer
for ev := range r.subscribe(ctx, height) {
for ev := range r.subscribe(ctx, height, access.WithHeartbeatInterval(r.heartbeatInterval)) {
events <- ev
}

Expand Down
2 changes: 1 addition & 1 deletion services/ingestion/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func Test_Subscribing(t *testing.T) {
client, err := requester.NewCrossSporkClient(currentClient, sporkClients, zerolog.Nop())
require.NoError(t, err)

subscriber := NewRPCSubscriber(client, flowGo.Emulator, zerolog.Nop())
subscriber := NewRPCSubscriber(client, 100, flowGo.Emulator, zerolog.Nop())

events := subscriber.Subscribe(context.Background(), 1)

Expand Down
Loading