Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

op-supervisor: Create clients and monitor chain heads for each L2 chain #11009

Merged
merged 3 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions op-service/sources/l1_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ type L1ClientConfig struct {
func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProviderKind) *L1ClientConfig {
// Cache 3/2 worth of sequencing window of receipts and txs
span := int(config.SeqWindowSize) * 3 / 2
fullSpan := span
return L1ClientSimpleConfig(trustRPC, kind, span)
}

func L1ClientSimpleConfig(trustRPC bool, kind RPCProviderKind, cacheSize int) *L1ClientConfig {
span := cacheSize
if span > 1000 { // sanity cap. If a large sequencing window is configured, do not make the cache too large
span = 1000
}
Expand All @@ -44,7 +48,7 @@ func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProvide
MethodResetDuration: time.Minute,
},
// Not bounded by span, to cover find-sync-start range fully for speedy recovery after errors.
L1BlockRefsCacheSize: fullSpan,
L1BlockRefsCacheSize: cacheSize,
}
}

Expand Down
55 changes: 55 additions & 0 deletions op-supervisor/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package metrics

import (
"math/big"

"github.com/prometheus/client_golang/prometheus"

opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
Expand All @@ -14,6 +16,9 @@ type Metricer interface {

opmetrics.RPCMetricer

CacheAdd(chainID *big.Int, label string, cacheSize int, evicted bool)
CacheGet(chainID *big.Int, label string, hit bool)

Document() []opmetrics.DocumentedMetric
}

Expand All @@ -24,6 +29,10 @@ type Metrics struct {

opmetrics.RPCMetrics

SizeVec *prometheus.GaugeVec
GetVec *prometheus.CounterVec
AddVec *prometheus.CounterVec

info prometheus.GaugeVec
up prometheus.Gauge
}
Expand Down Expand Up @@ -61,6 +70,33 @@ func NewMetrics(procName string) *Metrics {
Name: "up",
Help: "1 if the op-supervisor has finished starting up",
}),

SizeVec: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "source_rpc_cache_size",
Help: "source rpc cache cache size",
}, []string{
"chain",
"type",
}),
GetVec: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Name: "source_rpc_cache_get",
Help: "source rpc cache lookups, hitting or not",
}, []string{
"chain",
"type",
"hit",
}),
AddVec: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Name: "source_rpc_cache_add",
Help: "source rpc cache additions, evicting previous values or not",
}, []string{
"chain",
"type",
"evicted",
}),
}
}

Expand All @@ -82,3 +118,22 @@ func (m *Metrics) RecordUp() {
prometheus.MustRegister()
m.up.Set(1)
}

func (m *Metrics) CacheAdd(chainID *big.Int, label string, cacheSize int, evicted bool) {
chain := chainID.String()
m.SizeVec.WithLabelValues(chain, label).Set(float64(cacheSize))
if evicted {
m.AddVec.WithLabelValues(chain, label, "true").Inc()
} else {
m.AddVec.WithLabelValues(chain, label, "false").Inc()
}
}

func (m *Metrics) CacheGet(chainID *big.Int, label string, hit bool) {
chain := chainID.String()
if hit {
m.GetVec.WithLabelValues(chain, label, "true").Inc()
} else {
m.GetVec.WithLabelValues(chain, label, "false").Inc()
}
}
5 changes: 5 additions & 0 deletions op-supervisor/metrics/noop.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package metrics

import (
"math/big"

opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
)

Expand All @@ -14,3 +16,6 @@ func (*noopMetrics) Document() []opmetrics.DocumentedMetric { return nil }

func (*noopMetrics) RecordInfo(version string) {}
func (*noopMetrics) RecordUp() {}

func (m *noopMetrics) CacheAdd(_ *big.Int, _ string, _ int, _ bool) {}
func (m *noopMetrics) CacheGet(_ *big.Int, _ string, _ bool) {}
39 changes: 36 additions & 3 deletions op-supervisor/supervisor/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,29 @@ package backend
import (
"context"
"errors"
"fmt"
"io"
"sync/atomic"

"github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/source"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)

type Metrics interface {
source.Metrics
}

type SupervisorBackend struct {
started atomic.Bool
logger log.Logger

chainMonitors []*source.ChainMonitor

// TODO(protocol-quest#287): collection of logdbs per chain
// TODO(protocol-quest#288): collection of logdb updating services per chain
Expand All @@ -24,15 +35,31 @@ var _ frontend.Backend = (*SupervisorBackend)(nil)

var _ io.Closer = (*SupervisorBackend)(nil)

func NewSupervisorBackend() *SupervisorBackend {
return &SupervisorBackend{}
func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
chainMonitors := make([]*source.ChainMonitor, len(cfg.L2RPCs))
for i, rpc := range cfg.L2RPCs {
monitor, err := source.NewChainMonitor(ctx, logger, m, rpc)
if err != nil {
return nil, fmt.Errorf("failed to create monitor for rpc %v: %w", rpc, err)
}
chainMonitors[i] = monitor
}
return &SupervisorBackend{
logger: logger,
chainMonitors: chainMonitors,
}, nil
}

func (su *SupervisorBackend) Start(ctx context.Context) error {
if !su.started.CompareAndSwap(false, true) {
return errors.New("already started")
}
// TODO(protocol-quest#288): start logdb updating services of all chains
for _, monitor := range su.chainMonitors {
if err := monitor.Start(); err != nil {
return fmt.Errorf("failed to start chain monitor: %w", err)
}
}
return nil
}

Expand All @@ -41,7 +68,13 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error {
return errors.New("already stopped")
}
// TODO(protocol-quest#288): stop logdb updating services of all chains
return nil
var errs error
for _, monitor := range su.chainMonitors {
if err := monitor.Stop(); err != nil {
errs = errors.Join(errs, fmt.Errorf("failed to stop chain monitor: %w", err))
}
}
return errs
}

func (su *SupervisorBackend) Close() error {
Expand Down
94 changes: 94 additions & 0 deletions op-supervisor/supervisor/backend/source/chain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package source

import (
"context"
"fmt"
"math/big"
"time"

"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/sources/caching"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)

// TODO(optimism#11032) Make these configurable and a sensible default
const epochPollInterval = 30 * time.Second
const pollInterval = 2 * time.Second
const trustRpc = false
const rpcKind = sources.RPCKindStandard

type Metrics interface {
CacheAdd(chainID *big.Int, label string, cacheSize int, evicted bool)
CacheGet(chainID *big.Int, label string, hit bool)
}

// ChainMonitor monitors a source L2 chain, retrieving the data required to populate the database and perform
// interop consolidation. It detects and notifies when reorgs occur.
type ChainMonitor struct {
headMonitor *HeadMonitor
}

func NewChainMonitor(ctx context.Context, logger log.Logger, genericMetrics Metrics, rpc string) (*ChainMonitor, error) {
// First dial a simple client and get the chain ID so we have a simple identifier for the chain.
ethClient, err := dial.DialEthClientWithTimeout(ctx, 10*time.Second, logger, rpc)
if err != nil {
return nil, fmt.Errorf("failed to connect to rpc %v: %w", rpc, err)
}
chainID, err := ethClient.ChainID(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load chain id for rpc %v: %w", rpc, err)
}
logger = logger.New("chainID", chainID)
m := newChainMetrics(chainID, genericMetrics)
cl, err := newClient(ctx, logger, m, rpc, ethClient.Client(), pollInterval, trustRpc, rpcKind)
if err != nil {
return nil, err
}
logger.Info("Monitoring chain")
headMonitor := NewHeadMonitor(logger, epochPollInterval, cl, &loggingCallback{logger})
return &ChainMonitor{
headMonitor: headMonitor,
}, nil
}

func (c *ChainMonitor) Start() error {
return c.headMonitor.Start()
}

func (c *ChainMonitor) Stop() error {
return c.headMonitor.Stop()
}

// loggingCallback is a temporary implementation of the head monitor callback that just logs the events.
type loggingCallback struct {
log log.Logger
}

func (n *loggingCallback) OnNewUnsafeHead(_ context.Context, block eth.L1BlockRef) {
n.log.Info("New unsafe head", "block", block)
}

func (n *loggingCallback) OnNewSafeHead(_ context.Context, block eth.L1BlockRef) {
n.log.Info("New safe head", "block", block)
}

func (n *loggingCallback) OnNewFinalizedHead(_ context.Context, block eth.L1BlockRef) {
n.log.Info("New finalized head", "block", block)
}

func newClient(ctx context.Context, logger log.Logger, m caching.Metrics, rpc string, rpcClient *rpc.Client, pollRate time.Duration, trustRPC bool, kind sources.RPCProviderKind) (*sources.L1Client, error) {
c, err := client.NewRPCWithClient(ctx, logger, rpc, client.NewBaseRPCClient(rpcClient), pollRate)
if err != nil {
return nil, fmt.Errorf("failed to create new RPC client: %w", err)
}

l1Client, err := sources.NewL1Client(c, logger, m, sources.L1ClientSimpleConfig(trustRPC, kind, 100))
if err != nil {
return nil, fmt.Errorf("failed to connect client: %w", err)
}
return l1Client, nil
}
31 changes: 31 additions & 0 deletions op-supervisor/supervisor/backend/source/chain_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package source

import (
"math/big"

"github.com/ethereum-optimism/optimism/op-service/sources/caching"
)

// chainMetrics is an adapter between the metrics API expected by clients that assume there's only a single chain
// and the actual metrics implementation which requires a chain ID to identify the source chain.
type chainMetrics struct {
chainID *big.Int
delegate Metrics
}

func newChainMetrics(chainID *big.Int, delegate Metrics) *chainMetrics {
return &chainMetrics{
chainID: chainID,
delegate: delegate,
}
}

func (c *chainMetrics) CacheAdd(label string, cacheSize int, evicted bool) {
c.delegate.CacheAdd(c.chainID, label, cacheSize, evicted)
}

func (c *chainMetrics) CacheGet(label string, hit bool) {
c.delegate.CacheGet(c.chainID, label, hit)
}

var _ caching.Metrics = (*chainMetrics)(nil)
Loading