From 6c4e0133efd5b510f0b48c7392061fc258d3c20b Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Wed, 26 Jun 2024 13:14:59 +1000 Subject: [PATCH 1/3] op-supervisor: Create clients and monitor chain heads for each L2 chain --- op-service/sources/l1_client.go | 8 +- op-supervisor/metrics/metrics.go | 55 ++++ op-supervisor/metrics/noop.go | 5 + op-supervisor/supervisor/backend/backend.go | 39 ++- .../supervisor/backend/source/chain.go | 95 +++++++ .../backend/source/chain_metrics.go | 31 +++ .../supervisor/backend/source/heads.go | 97 +++++++ .../supervisor/backend/source/heads_test.go | 243 ++++++++++++++++++ op-supervisor/supervisor/service.go | 19 +- 9 files changed, 583 insertions(+), 9 deletions(-) create mode 100644 op-supervisor/supervisor/backend/source/chain.go create mode 100644 op-supervisor/supervisor/backend/source/chain_metrics.go create mode 100644 op-supervisor/supervisor/backend/source/heads.go create mode 100644 op-supervisor/supervisor/backend/source/heads_test.go diff --git a/op-service/sources/l1_client.go b/op-service/sources/l1_client.go index 3cda3a73304e..d67ce1c87abd 100644 --- a/op-service/sources/l1_client.go +++ b/op-service/sources/l1_client.go @@ -25,7 +25,11 @@ type L1ClientConfig struct { func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProviderKind) *L1ClientConfig { // Cache 3/2 worth of sequencing window of receipts and txs span := int(config.SeqWindowSize) * 3 / 2 - fullSpan := span + return L1ClientSimpleConfig(trustRPC, kind, span) +} + +func L1ClientSimpleConfig(trustRPC bool, kind RPCProviderKind, cacheSize int) *L1ClientConfig { + span := cacheSize if span > 1000 { // sanity cap. If a large sequencing window is configured, do not make the cache too large span = 1000 } @@ -44,7 +48,7 @@ func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProvide MethodResetDuration: time.Minute, }, // Not bounded by span, to cover find-sync-start range fully for speedy recovery after errors. - L1BlockRefsCacheSize: fullSpan, + L1BlockRefsCacheSize: cacheSize, } } diff --git a/op-supervisor/metrics/metrics.go b/op-supervisor/metrics/metrics.go index b659e71b95fa..cc7bb5381a32 100644 --- a/op-supervisor/metrics/metrics.go +++ b/op-supervisor/metrics/metrics.go @@ -1,6 +1,8 @@ package metrics import ( + "math/big" + "github.com/prometheus/client_golang/prometheus" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" @@ -14,6 +16,9 @@ type Metricer interface { opmetrics.RPCMetricer + CacheAdd(chainID *big.Int, label string, cacheSize int, evicted bool) + CacheGet(chainID *big.Int, label string, hit bool) + Document() []opmetrics.DocumentedMetric } @@ -24,6 +29,10 @@ type Metrics struct { opmetrics.RPCMetrics + SizeVec *prometheus.GaugeVec + GetVec *prometheus.CounterVec + AddVec *prometheus.CounterVec + info prometheus.GaugeVec up prometheus.Gauge } @@ -61,6 +70,33 @@ func NewMetrics(procName string) *Metrics { Name: "up", Help: "1 if the op-supervisor has finished starting up", }), + + SizeVec: factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, + Name: "source_rpc_cache_size", + Help: "source rpc cache cache size", + }, []string{ + "chain", + "type", + }), + GetVec: factory.NewCounterVec(prometheus.CounterOpts{ + Namespace: ns, + Name: "source_rpc_cache_get", + Help: "source rpc cache lookups, hitting or not", + }, []string{ + "chain", + "type", + "hit", + }), + AddVec: factory.NewCounterVec(prometheus.CounterOpts{ + Namespace: ns, + Name: "source_rpc_cache_add", + Help: "source rpc cache additions, evicting previous values or not", + }, []string{ + "chain", + "type", + "evicted", + }), } } @@ -82,3 +118,22 @@ func (m *Metrics) RecordUp() { prometheus.MustRegister() m.up.Set(1) } + +func (m *Metrics) CacheAdd(chainID *big.Int, label string, cacheSize int, evicted bool) { + chain := chainID.String() + m.SizeVec.WithLabelValues(chain, label).Set(float64(cacheSize)) + if evicted { + m.AddVec.WithLabelValues(chain, label, "true").Inc() + } else { + m.AddVec.WithLabelValues(chain, label, "false").Inc() + } +} + +func (m *Metrics) CacheGet(chainID *big.Int, label string, hit bool) { + chain := chainID.String() + if hit { + m.GetVec.WithLabelValues(chain, label, "true").Inc() + } else { + m.GetVec.WithLabelValues(chain, label, "false").Inc() + } +} diff --git a/op-supervisor/metrics/noop.go b/op-supervisor/metrics/noop.go index f87e75a1116c..515387e76085 100644 --- a/op-supervisor/metrics/noop.go +++ b/op-supervisor/metrics/noop.go @@ -1,6 +1,8 @@ package metrics import ( + "math/big" + opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" ) @@ -14,3 +16,6 @@ func (*noopMetrics) Document() []opmetrics.DocumentedMetric { return nil } func (*noopMetrics) RecordInfo(version string) {} func (*noopMetrics) RecordUp() {} + +func (m *noopMetrics) CacheAdd(_ *big.Int, _ string, _ int, _ bool) {} +func (m *noopMetrics) CacheGet(_ *big.Int, _ string, _ bool) {} diff --git a/op-supervisor/supervisor/backend/backend.go b/op-supervisor/supervisor/backend/backend.go index 074d498f4e52..eb6b6e3ab235 100644 --- a/op-supervisor/supervisor/backend/backend.go +++ b/op-supervisor/supervisor/backend/backend.go @@ -3,18 +3,29 @@ package backend import ( "context" "errors" + "fmt" "io" "sync/atomic" + "github.com/ethereum-optimism/optimism/op-supervisor/config" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/source" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" ) +type Metrics interface { + source.Metrics +} + type SupervisorBackend struct { started atomic.Bool + logger log.Logger + + chainMonitors []*source.ChainMonitor // TODO(protocol-quest#287): collection of logdbs per chain // TODO(protocol-quest#288): collection of logdb updating services per chain @@ -24,8 +35,19 @@ var _ frontend.Backend = (*SupervisorBackend)(nil) var _ io.Closer = (*SupervisorBackend)(nil) -func NewSupervisorBackend() *SupervisorBackend { - return &SupervisorBackend{} +func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) { + chainMonitors := make([]*source.ChainMonitor, len(cfg.L2RPCs)) + for i, rpc := range cfg.L2RPCs { + monitor, err := source.NewChainMonitor(ctx, logger, m, rpc) + if err != nil { + return nil, fmt.Errorf("failed to create monitor for rpc %v: %w", rpc, err) + } + chainMonitors[i] = monitor + } + return &SupervisorBackend{ + logger: logger, + chainMonitors: chainMonitors, + }, nil } func (su *SupervisorBackend) Start(ctx context.Context) error { @@ -33,6 +55,11 @@ func (su *SupervisorBackend) Start(ctx context.Context) error { return errors.New("already started") } // TODO(protocol-quest#288): start logdb updating services of all chains + for _, monitor := range su.chainMonitors { + if err := monitor.Start(); err != nil { + return fmt.Errorf("failed to start chain monitor: %w", err) + } + } return nil } @@ -41,7 +68,13 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error { return errors.New("already stopped") } // TODO(protocol-quest#288): stop logdb updating services of all chains - return nil + var errs error + for _, monitor := range su.chainMonitors { + if err := monitor.Stop(); err != nil { + errs = errors.Join(errs, fmt.Errorf("failed to stop chain monitor: %w", err)) + } + } + return errs } func (su *SupervisorBackend) Close() error { diff --git a/op-supervisor/supervisor/backend/source/chain.go b/op-supervisor/supervisor/backend/source/chain.go new file mode 100644 index 000000000000..2d025960ee7a --- /dev/null +++ b/op-supervisor/supervisor/backend/source/chain.go @@ -0,0 +1,95 @@ +package source + +import ( + "context" + "fmt" + "math/big" + "time" + + "github.com/ethereum-optimism/optimism/op-service/client" + "github.com/ethereum-optimism/optimism/op-service/dial" + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/sources" + "github.com/ethereum-optimism/optimism/op-service/sources/caching" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rpc" +) + +// TODO(optimism#10999) Make these configurable and a sensible default +const epochPollInterval = 30 * time.Second +const pollInterval = 2 * time.Second +const trustRpc = false +const rpcKind = sources.RPCKindStandard + +type Metrics interface { + CacheAdd(chainID *big.Int, label string, cacheSize int, evicted bool) + CacheGet(chainID *big.Int, label string, hit bool) +} + +// ChainMonitor monitors a source L2 chain, retrieving the data required to populate the database and perform +// interop consolidation. It detects and notifies when reorgs occur. +type ChainMonitor struct { + headMonitor *HeadMonitor +} + +func NewChainMonitor(ctx context.Context, logger log.Logger, genericMetrics Metrics, rpc string) (*ChainMonitor, error) { + // First dial a simple client and get the chain ID so we have a simple identifier for the chain. + ethClient, err := dial.DialEthClientWithTimeout(ctx, 10*time.Second, logger, rpc) + if err != nil { + return nil, fmt.Errorf("failed to connect to rpc %v: %w", rpc, err) + } + chainID, err := ethClient.ChainID(ctx) + if err != nil { + return nil, fmt.Errorf("failed to load chain id for rpc %v: %w", rpc, err) + } + logger = logger.New("chainID", chainID) + m := newChainMetrics(chainID, genericMetrics) + cl, err := newClient(ctx, logger, m, rpc, ethClient.Client(), pollInterval, trustRpc, rpcKind) + if err != nil { + return nil, err + } + logger.Info("Monitoring chain", "rpc", rpc) + headMonitor := NewHeadMonitor(logger, epochPollInterval, cl, &loggingCallback{logger}) + return &ChainMonitor{ + headMonitor: headMonitor, + }, nil +} + +func (c *ChainMonitor) Start() error { + return c.headMonitor.Start() +} + +func (c *ChainMonitor) Stop() error { + return c.headMonitor.Stop() +} + +// loggingCallback is a temporary implementation of the head monitor callback that just logs the events. +// TODO(optimism#10999): Replace this with something that actually detects reorgs, fetches logs, and does consolidation +type loggingCallback struct { + log log.Logger +} + +func (n *loggingCallback) OnNewUnsafeHead(_ context.Context, block eth.L1BlockRef) { + n.log.Info("New unsafe head", "block", block) +} + +func (n *loggingCallback) OnNewSafeHead(_ context.Context, block eth.L1BlockRef) { + n.log.Info("New safe head", "block", block) +} + +func (n *loggingCallback) OnNewFinalizedHead(_ context.Context, block eth.L1BlockRef) { + n.log.Info("New finalized head", "block", block) +} + +func newClient(ctx context.Context, logger log.Logger, m caching.Metrics, rpc string, rpcClient *rpc.Client, pollRate time.Duration, trustRPC bool, kind sources.RPCProviderKind) (*sources.L1Client, error) { + c, err := client.NewRPCWithClient(ctx, logger, rpc, client.NewBaseRPCClient(rpcClient), pollRate) + if err != nil { + return nil, fmt.Errorf("failed to create new RPC client: %w", err) + } + + l1Client, err := sources.NewL1Client(c, logger, m, sources.L1ClientSimpleConfig(trustRPC, kind, 100)) + if err != nil { + return nil, fmt.Errorf("failed to connect client: %w", err) + } + return l1Client, nil +} diff --git a/op-supervisor/supervisor/backend/source/chain_metrics.go b/op-supervisor/supervisor/backend/source/chain_metrics.go new file mode 100644 index 000000000000..96221d367e8d --- /dev/null +++ b/op-supervisor/supervisor/backend/source/chain_metrics.go @@ -0,0 +1,31 @@ +package source + +import ( + "math/big" + + "github.com/ethereum-optimism/optimism/op-service/sources/caching" +) + +// chainMetrics is an adapter between the metrics API expected by clients that assume there's only a single chain +// and the actual metrics implementation which requires a chain ID to identify the source chain. +type chainMetrics struct { + chainID *big.Int + delegate Metrics +} + +func newChainMetrics(chainID *big.Int, delegate Metrics) *chainMetrics { + return &chainMetrics{ + chainID: chainID, + delegate: delegate, + } +} + +func (c *chainMetrics) CacheAdd(label string, cacheSize int, evicted bool) { + c.delegate.CacheAdd(c.chainID, label, cacheSize, evicted) +} + +func (c *chainMetrics) CacheGet(label string, hit bool) { + c.delegate.CacheGet(c.chainID, label, hit) +} + +var _ caching.Metrics = (*chainMetrics)(nil) diff --git a/op-supervisor/supervisor/backend/source/heads.go b/op-supervisor/supervisor/backend/source/heads.go new file mode 100644 index 000000000000..f5c8896693fd --- /dev/null +++ b/op-supervisor/supervisor/backend/source/heads.go @@ -0,0 +1,97 @@ +package source + +import ( + "context" + "errors" + "sync/atomic" + "time" + + "github.com/ethereum-optimism/optimism/op-service/eth" + ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" +) + +type HeadMonitorClient interface { + eth.NewHeadSource + eth.L1BlockRefsSource +} + +type HeadChangeCallback interface { + OnNewUnsafeHead(ctx context.Context, block eth.L1BlockRef) + OnNewSafeHead(ctx context.Context, block eth.L1BlockRef) + OnNewFinalizedHead(ctx context.Context, block eth.L1BlockRef) +} + +// HeadMonitor monitors an L2 chain and sends notifications when the unsafe, safe or finalized head changes. +// Head updates may be coalesced, allowing the head block to skip forward multiple blocks. +// Reorgs are not identified. +type HeadMonitor struct { + log log.Logger + epochPollInterval time.Duration + rpc HeadMonitorClient + callback HeadChangeCallback + + started atomic.Bool + headsSub event.Subscription + safeSub ethereum.Subscription + finalizedSub ethereum.Subscription +} + +func NewHeadMonitor(logger log.Logger, epochPollInterval time.Duration, rpc HeadMonitorClient, callback HeadChangeCallback) *HeadMonitor { + return &HeadMonitor{ + log: logger, + epochPollInterval: epochPollInterval, + rpc: rpc, + callback: callback, + } +} + +func (h *HeadMonitor) Start() error { + if !h.started.CompareAndSwap(false, true) { + return errors.New("already started") + } + + // Keep subscribed to the unsafe head, which changes frequently. + h.headsSub = event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) { + if err != nil { + h.log.Warn("Resubscribing after failed heads subscription", "err", err) + } + return eth.WatchHeadChanges(ctx, h.rpc, h.callback.OnNewUnsafeHead) + }) + go func() { + err, ok := <-h.headsSub.Err() + if !ok { + return + } + h.log.Error("Heads subscription error", "err", err) + }() + + // Poll for the safe block and finalized block, which only change once per epoch at most and may be delayed. + h.safeSub = eth.PollBlockChanges(h.log, h.rpc, h.callback.OnNewSafeHead, eth.Safe, + h.epochPollInterval, time.Second*10) + h.finalizedSub = eth.PollBlockChanges(h.log, h.rpc, h.callback.OnNewFinalizedHead, eth.Finalized, + h.epochPollInterval, time.Second*10) + h.log.Info("Chain head monitoring started") + return nil +} + +func (h *HeadMonitor) Stop() error { + if !h.started.CompareAndSwap(true, false) { + return errors.New("already stopped") + } + + // stop heads feed + if h.headsSub != nil { + h.headsSub.Unsubscribe() + } + // stop polling for safe-head changes + if h.safeSub != nil { + h.safeSub.Unsubscribe() + } + // stop polling for finalized-head changes + if h.finalizedSub != nil { + h.finalizedSub.Unsubscribe() + } + return nil +} diff --git a/op-supervisor/supervisor/backend/source/heads_test.go b/op-supervisor/supervisor/backend/source/heads_test.go new file mode 100644 index 000000000000..d13dff48d851 --- /dev/null +++ b/op-supervisor/supervisor/backend/source/heads_test.go @@ -0,0 +1,243 @@ +package source + +import ( + "context" + "errors" + "fmt" + "math/rand" + "sync" + "testing" + "time" + + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/testlog" + "github.com/ethereum-optimism/optimism/op-service/testutils" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/require" +) + +const waitDuration = 10 * time.Second +const checkInterval = 10 * time.Millisecond + +func TestUnsafeHeadUpdates(t *testing.T) { + rng := rand.New(rand.NewSource(0x1337)) + header1 := testutils.RandomHeader(rng) + header2 := testutils.RandomHeader(rng) + + t.Run("NotifyOfNewHeads", func(t *testing.T) { + rpc, callback := startHeadMonitor(t) + + rpc.NewUnsafeHead(t, header1) + callback.RequireUnsafeHeaders(t, header1) + + rpc.NewUnsafeHead(t, header2) + callback.RequireUnsafeHeaders(t, header1, header2) + }) + + t.Run("ResubscribeOnError", func(t *testing.T) { + rpc, callback := startHeadMonitor(t) + + rpc.SubscriptionError(t) + + rpc.NewUnsafeHead(t, header1) + callback.RequireUnsafeHeaders(t, header1) + }) +} + +func TestSafeHeadUpdates(t *testing.T) { + rpc, callback := startHeadMonitor(t) + + head1 := eth.L1BlockRef{ + Hash: common.Hash{0xaa}, + Number: 1, + } + head2 := eth.L1BlockRef{ + Hash: common.Hash{0xbb}, + Number: 2, + } + + rpc.SetSafeHead(head1) + callback.RequireSafeHeaders(t, head1) + rpc.SetSafeHead(head2) + callback.RequireSafeHeaders(t, head1, head2) +} + +func TestFinalizedHeadUpdates(t *testing.T) { + rpc, callback := startHeadMonitor(t) + + head1 := eth.L1BlockRef{ + Hash: common.Hash{0xaa}, + Number: 1, + } + head2 := eth.L1BlockRef{ + Hash: common.Hash{0xbb}, + Number: 2, + } + + rpc.SetFinalizedHead(head1) + callback.RequireFinalizedHeaders(t, head1) + rpc.SetFinalizedHead(head2) + callback.RequireFinalizedHeaders(t, head1, head2) +} + +func startHeadMonitor(t *testing.T) (*stubRPC, *stubCallback) { + logger := testlog.Logger(t, log.LvlInfo) + rpc := &stubRPC{} + callback := &stubCallback{} + monitor := NewHeadMonitor(logger, 50*time.Millisecond, rpc, callback) + require.NoError(t, monitor.Start()) + t.Cleanup(func() { + require.NoError(t, monitor.Stop()) + }) + return rpc, callback +} + +type stubCallback struct { + sync.Mutex + unsafe []eth.L1BlockRef + safe []eth.L1BlockRef + finalized []eth.L1BlockRef +} + +func (s *stubCallback) RequireUnsafeHeaders(t *testing.T, heads ...*types.Header) { + expected := make([]eth.L1BlockRef, len(heads)) + for i, head := range heads { + expected[i] = eth.InfoToL1BlockRef(eth.HeaderBlockInfo(head)) + } + s.requireHeaders(t, func(s *stubCallback) []eth.L1BlockRef { return s.unsafe }, expected) +} + +func (s *stubCallback) RequireSafeHeaders(t *testing.T, expected ...eth.L1BlockRef) { + s.requireHeaders(t, func(s *stubCallback) []eth.L1BlockRef { return s.safe }, expected) +} + +func (s *stubCallback) RequireFinalizedHeaders(t *testing.T, expected ...eth.L1BlockRef) { + s.requireHeaders(t, func(s *stubCallback) []eth.L1BlockRef { return s.finalized }, expected) +} + +func (s *stubCallback) requireHeaders(t *testing.T, getter func(*stubCallback) []eth.L1BlockRef, expected []eth.L1BlockRef) { + require.Eventually(t, func() bool { + s.Lock() + defer s.Unlock() + return len(getter(s)) >= len(expected) + }, waitDuration, checkInterval) + s.Lock() + defer s.Unlock() + require.Equal(t, expected, getter(s)) +} + +func (s *stubCallback) OnNewUnsafeHead(ctx context.Context, block eth.L1BlockRef) { + s.Lock() + defer s.Unlock() + s.unsafe = append(s.unsafe, block) +} + +func (s *stubCallback) OnNewSafeHead(ctx context.Context, block eth.L1BlockRef) { + s.Lock() + defer s.Unlock() + s.safe = append(s.safe, block) +} + +func (s *stubCallback) OnNewFinalizedHead(ctx context.Context, block eth.L1BlockRef) { + s.Lock() + defer s.Unlock() + s.finalized = append(s.finalized, block) +} + +var _ HeadChangeCallback = (*stubCallback)(nil) + +type stubRPC struct { + sync.Mutex + sub *mockSubscription + + safeHead eth.L1BlockRef + finalizedHead eth.L1BlockRef +} + +func (s *stubRPC) SubscribeNewHead(_ context.Context, unsafeCh chan<- *types.Header) (ethereum.Subscription, error) { + s.Lock() + defer s.Unlock() + if s.sub != nil { + return nil, errors.New("already subscribed to unsafe heads") + } + errChan := make(chan error) + s.sub = &mockSubscription{errChan, unsafeCh, s} + return s.sub, nil +} + +func (s *stubRPC) SetSafeHead(head eth.L1BlockRef) { + s.Lock() + defer s.Unlock() + s.safeHead = head +} + +func (s *stubRPC) SetFinalizedHead(head eth.L1BlockRef) { + s.Lock() + defer s.Unlock() + s.finalizedHead = head +} + +func (s *stubRPC) L1BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L1BlockRef, error) { + s.Lock() + defer s.Unlock() + switch label { + case eth.Safe: + if s.safeHead == (eth.L1BlockRef{}) { + return eth.L1BlockRef{}, errors.New("no unsafe head") + } + return s.safeHead, nil + case eth.Finalized: + if s.finalizedHead == (eth.L1BlockRef{}) { + return eth.L1BlockRef{}, errors.New("no finalized head") + } + return s.finalizedHead, nil + default: + return eth.L1BlockRef{}, fmt.Errorf("unknown label: %v", label) + } +} + +func (s *stubRPC) NewUnsafeHead(t *testing.T, header *types.Header) { + s.WaitForSub(t) + s.Lock() + defer s.Unlock() + require.NotNil(t, s.sub, "Attempting to publish a header with no subscription") + s.sub.headers <- header +} + +func (s *stubRPC) SubscriptionError(t *testing.T) { + s.WaitForSub(t) + s.Lock() + defer s.Unlock() + s.sub.errChan <- errors.New("subscription error") + s.sub = nil +} + +func (s *stubRPC) WaitForSub(t *testing.T) { + require.Eventually(t, func() bool { + s.Lock() + defer s.Unlock() + return s.sub != nil + }, waitDuration, checkInterval, "Head monitor did not subscribe to unsafe head") +} + +var _ HeadMonitorClient = (*stubRPC)(nil) + +type mockSubscription struct { + errChan chan error + headers chan<- *types.Header + rpc *stubRPC +} + +func (m *mockSubscription) Unsubscribe() { + fmt.Println("Unsubscribed") + m.rpc.Lock() + defer m.rpc.Unlock() + m.rpc.sub = nil +} + +func (m *mockSubscription) Err() <-chan error { + return m.errChan +} diff --git a/op-supervisor/supervisor/service.go b/op-supervisor/supervisor/service.go index 5b8b0fa14c1b..aecd90ad2fba 100644 --- a/op-supervisor/supervisor/service.go +++ b/op-supervisor/supervisor/service.go @@ -60,19 +60,26 @@ func (su *SupervisorService) initFromCLIConfig(ctx context.Context, cfg *config. if err := su.initMetricsServer(cfg); err != nil { return fmt.Errorf("failed to start Metrics server: %w", err) } - su.initBackend(cfg) + if err := su.initBackend(ctx, cfg); err != nil { + return fmt.Errorf("failed to start backend: %w", err) + } if err := su.initRPCServer(cfg); err != nil { return fmt.Errorf("failed to start RPC server: %w", err) } return nil } -func (su *SupervisorService) initBackend(cfg *config.Config) { +func (su *SupervisorService) initBackend(ctx context.Context, cfg *config.Config) error { if cfg.MockRun { su.backend = backend.NewMockBackend() - } else { - su.backend = backend.NewSupervisorBackend() + return nil + } + be, err := backend.NewSupervisorBackend(ctx, su.log, su.metrics, cfg) + if err != nil { + return fmt.Errorf("failed to create supervisor backend: %w", err) } + su.backend = be + return nil } func (su *SupervisorService) initMetrics(cfg *config.Config) { @@ -152,6 +159,10 @@ func (su *SupervisorService) Start(ctx context.Context) error { return fmt.Errorf("unable to start RPC server: %w", err) } + if err := su.backend.Start(ctx); err != nil { + return fmt.Errorf("unable to start backend: %w", err) + } + su.metrics.RecordUp() return nil } From 71006ea05a7b80307dcec2ae410295d704ab6eb9 Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Wed, 26 Jun 2024 15:29:07 +1000 Subject: [PATCH 2/3] op-supervisor: Remove rpc url from log message --- op-supervisor/supervisor/backend/source/chain.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/op-supervisor/supervisor/backend/source/chain.go b/op-supervisor/supervisor/backend/source/chain.go index 2d025960ee7a..31872d45995f 100644 --- a/op-supervisor/supervisor/backend/source/chain.go +++ b/op-supervisor/supervisor/backend/source/chain.go @@ -48,7 +48,7 @@ func NewChainMonitor(ctx context.Context, logger log.Logger, genericMetrics Metr if err != nil { return nil, err } - logger.Info("Monitoring chain", "rpc", rpc) + logger.Info("Monitoring chain") headMonitor := NewHeadMonitor(logger, epochPollInterval, cl, &loggingCallback{logger}) return &ChainMonitor{ headMonitor: headMonitor, From 7193465783c2b5f96c92e4e376ee1100e9a67cfc Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Thu, 27 Jun 2024 10:52:27 +1000 Subject: [PATCH 3/3] op-supervisor: Update tickets in TODOs --- op-supervisor/supervisor/backend/source/chain.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/op-supervisor/supervisor/backend/source/chain.go b/op-supervisor/supervisor/backend/source/chain.go index 31872d45995f..d3f44d63e1f7 100644 --- a/op-supervisor/supervisor/backend/source/chain.go +++ b/op-supervisor/supervisor/backend/source/chain.go @@ -15,7 +15,7 @@ import ( "github.com/ethereum/go-ethereum/rpc" ) -// TODO(optimism#10999) Make these configurable and a sensible default +// TODO(optimism#11032) Make these configurable and a sensible default const epochPollInterval = 30 * time.Second const pollInterval = 2 * time.Second const trustRpc = false @@ -64,7 +64,6 @@ func (c *ChainMonitor) Stop() error { } // loggingCallback is a temporary implementation of the head monitor callback that just logs the events. -// TODO(optimism#10999): Replace this with something that actually detects reorgs, fetches logs, and does consolidation type loggingCallback struct { log log.Logger }