From e30550a12123c5de31898d6d0c3d1af5afa2e684 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Thu, 12 Oct 2023 12:42:25 +0200 Subject: [PATCH] refactor(sync): use options to init metrics --- sync/metrics.go | 27 ++++++++++++++++++--------- sync/options.go | 9 +++++++++ sync/sync.go | 5 +++++ 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index d90008b0..e83e0820 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -11,31 +11,40 @@ import ( var meter = otel.Meter("header/sync") type metrics struct { - totalSynced int64 + totalSynced atomic.Int64 + totalSyncedGauge metric.Float64ObservableGauge } -func (s *Syncer[H]) InitMetrics() error { - s.metrics = &metrics{} - +func newMetrics() *metrics { totalSynced, err := meter.Float64ObservableGauge( "total_synced_headers", metric.WithDescription("total synced headers"), ) if err != nil { - return err + panic(err) + } + + m := &metrics{ + totalSyncedGauge: totalSynced, } callback := func(ctx context.Context, observer metric.Observer) error { - observer.ObserveFloat64(totalSynced, float64(atomic.LoadInt64(&s.metrics.totalSynced))) + observer.ObserveFloat64(totalSynced, float64(m.totalSynced.Load())) return nil } _, err = meter.RegisterCallback(callback, totalSynced) - return err + if err != nil { + panic(err) + } + + return m } // recordTotalSynced records the total amount of synced headers. func (m *metrics) recordTotalSynced(totalSynced int) { - if m != nil { - atomic.AddInt64(&m.totalSynced, int64(totalSynced)) + if m == nil { + return } + + m.totalSynced.Store(int64(totalSynced)) } diff --git a/sync/options.go b/sync/options.go index 0c7ee822..ebd8402d 100644 --- a/sync/options.go +++ b/sync/options.go @@ -28,6 +28,8 @@ type Parameters struct { // recencyThreshold describes the time period for which a header is // considered "recent". The default is blockTime + 5 seconds. recencyThreshold time.Duration + // metrics is a flag that enables metrics collection. + metrics bool } // DefaultParameters returns the default params to configure the syncer. @@ -44,6 +46,13 @@ func (p *Parameters) Validate() error { return nil } +// WithMetrics enables Metrics on Syncer. +func WithMetrics() Option { + return func(p *Parameters) { + p.metrics = true + } +} + // WithBlockTime is a functional option that configures the // `blockTime` parameter. func WithBlockTime(duration time.Duration) Option { diff --git a/sync/sync.go b/sync/sync.go index 1e1de508..e640dd61 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -69,10 +69,15 @@ func NewSyncer[H header.Header[H]]( return nil, err } + var metrics *metrics + if params.metrics { + metrics = newMetrics() + } return &Syncer[H]{ sub: sub, store: syncStore[H]{Store: store}, getter: syncGetter[H]{Getter: getter}, + metrics: metrics, triggerSync: make(chan struct{}, 1), // should be buffered Params: ¶ms, }, nil