Skip to content

Commit

Permalink
Add Admin RPC for Adding New Chain Monitors (ethereum-optimism#11792)
Browse files Browse the repository at this point in the history
* Add Admin RPC for Adding New Chain Monitors

* Update op-supervisor/supervisor/backend/db/db.go

---------

Co-authored-by: protolambda <proto@protolambda.com>
  • Loading branch information
2 people authored and samlaf committed Nov 10, 2024
1 parent eb2fb27 commit c4c8594
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 36 deletions.
117 changes: 81 additions & 36 deletions op-supervisor/supervisor/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,65 +25,87 @@ import (
)

// SupervisorBackend holds one chain monitor per L2 chain together with the
// chains database, and tracks whether it is currently started.
type SupervisorBackend struct {
	// ctx is not assigned by the constructor visible in this file.
	// NOTE(review): confirm it is set before anything reads it.
	ctx context.Context

	started atomic.Bool // flipped by Start/Stop via CompareAndSwap
	logger  log.Logger
	m       Metrics
	dataDir string

	// chainMonitors holds one monitor per chain, keyed by chain ID.
	chainMonitors map[types.ChainID]*source.ChainMonitor

	db *db.ChainsDB

	// maintenanceCancel is assigned by Start and invoked by Stop.
	maintenanceCancel context.CancelFunc
}

// Compile-time assertion that *SupervisorBackend satisfies frontend.Backend.
var _ frontend.Backend = (*SupervisorBackend)(nil)

// Compile-time assertion that *SupervisorBackend satisfies io.Closer.
var _ io.Closer = (*SupervisorBackend)(nil)

func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
// attempt to prepare the data directory
if err := prepDataDir(cfg.Datadir); err != nil {
return nil, err
}

// create the head tracker
headTracker, err := heads.NewHeadTracker(filepath.Join(cfg.Datadir, "heads.json"))
if err != nil {
return nil, fmt.Errorf("failed to load existing heads: %w", err)
}
logDBs := make(map[types.ChainID]db.LogStorage)
chainRPCs := make(map[types.ChainID]string)
chainClients := make(map[types.ChainID]client.RPC)

// create the chains db
db := db.NewChainsDB(map[types.ChainID]db.LogStorage{}, headTracker)

// create an empty map of chain monitors
chainMonitors := make(map[types.ChainID]*source.ChainMonitor, len(cfg.L2RPCs))

// create the supervisor backend
super := &SupervisorBackend{
logger: logger,
m: m,
dataDir: cfg.Datadir,
chainMonitors: chainMonitors,
db: db,
}

// from the RPC strings, have the supervisor backend create a chain monitor
for _, rpc := range cfg.L2RPCs {
rpcClient, chainID, err := createRpcClient(ctx, logger, rpc)
err := super.addFromRPC(ctx, logger, rpc)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to add chain monitor for rpc %v: %w", rpc, err)
}
cm := newChainMetrics(chainID, m)
path, err := prepLogDBPath(chainID, cfg.Datadir)
if err != nil {
return nil, fmt.Errorf("failed to create datadir for chain %v: %w", chainID, err)
}
logDB, err := logs.NewFromFile(logger, cm, path)
if err != nil {
return nil, fmt.Errorf("failed to create logdb for chain %v at %v: %w", chainID, path, err)
}
logDBs[chainID] = logDB
chainRPCs[chainID] = rpc
chainClients[chainID] = rpcClient
}
chainsDB := db.NewChainsDB(logDBs, headTracker)
if err := chainsDB.Resume(); err != nil {
return nil, fmt.Errorf("failed to resume chains db: %w", err)
}
return super, nil
}

chainMonitors := make([]*source.ChainMonitor, 0, len(cfg.L2RPCs))
for chainID, rpc := range chainRPCs {
cm := newChainMetrics(chainID, m)
monitor, err := source.NewChainMonitor(ctx, logger, cm, chainID, rpc, chainClients[chainID], chainsDB)
if err != nil {
return nil, fmt.Errorf("failed to create monitor for rpc %v: %w", rpc, err)
}
chainMonitors = append(chainMonitors, monitor)
// addFromRPC adds a chain monitor to the supervisor backend from an rpc endpoint.
// It does not expect to be called after the backend has been started.
//
// The chain ID is discovered by dialing rpc. A log DB is then opened under the
// backend's data directory, a monitor is created for the chain, and both are
// registered with the backend. It returns an error if the endpoint cannot be
// dialed, a monitor for the same chain ID is already registered, or the log DB
// or monitor cannot be created.
func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger, rpc string) error {
	// create the rpc client, which yields the chain id;
	// use the caller's ctx rather than su.ctx, which may never have been set
	rpcClient, chainID, err := createRpcClient(ctx, logger, rpc)
	if err != nil {
		return err
	}
	// reject duplicates before opening any on-disk state for the chain
	// NOTE(review): rpcClient is not released on the error paths below — confirm
	// whether client.RPC needs an explicit close.
	if su.chainMonitors[chainID] != nil {
		return fmt.Errorf("chain monitor for chain %v already exists", chainID)
	}
	// create metrics and a logdb for the chain
	cm := newChainMetrics(chainID, su.m)
	path, err := prepLogDBPath(chainID, su.dataDir)
	if err != nil {
		return fmt.Errorf("failed to create datadir for chain %v: %w", chainID, err)
	}
	logDB, err := logs.NewFromFile(logger, cm, path)
	if err != nil {
		return fmt.Errorf("failed to create logdb for chain %v at %v: %w", chainID, path, err)
	}
	monitor, err := source.NewChainMonitor(ctx, logger, cm, chainID, rpc, rpcClient, su.db)
	if err != nil {
		return fmt.Errorf("failed to create monitor for rpc %v: %w", rpc, err)
	}
	// register the monitor and its log storage together
	su.chainMonitors[chainID] = monitor
	su.db.AddLogDB(chainID, logDB)
	return nil
}

func createRpcClient(ctx context.Context, logger log.Logger, rpc string) (client.RPC, types.ChainID, error) {
Expand All @@ -99,30 +121,41 @@ func createRpcClient(ctx context.Context, logger log.Logger, rpc string) (client
}

// Start resumes the chains db, starts every chain monitor, and then launches
// the cross-head maintenance loop under a cancellable context whose cancel
// function is kept for Stop.
//
// It returns an error if the backend is already started. If resuming or
// starting a monitor fails, monitors started so far are stopped and the
// backend is marked as not started again, so a later Stop reports
// "already stopped" instead of invoking an unset maintenanceCancel.
func (su *SupervisorBackend) Start(ctx context.Context) error {
	// ensure we only start once
	if !su.started.CompareAndSwap(false, true) {
		return errors.New("already started")
	}
	// initiate "Resume" on the chains db, which rewinds the database to the last block that is guaranteed to have been fully recorded
	if err := su.db.Resume(); err != nil {
		su.started.Store(false)
		return fmt.Errorf("failed to resume chains db: %w", err)
	}
	// start chain monitors, remembering which ones are running for rollback
	running := make([]*source.ChainMonitor, 0, len(su.chainMonitors))
	for _, monitor := range su.chainMonitors {
		if err := monitor.Start(); err != nil {
			errs := fmt.Errorf("failed to start chain monitor: %w", err)
			for _, r := range running {
				if stopErr := r.Stop(); stopErr != nil {
					errs = errors.Join(errs, fmt.Errorf("failed to stop chain monitor: %w", stopErr))
				}
			}
			su.started.Store(false)
			return errs
		}
		running = append(running, monitor)
	}
	// start db maintenance loop exactly once, under a context Stop can cancel
	maintenanceCtx, cancel := context.WithCancel(ctx)
	su.db.StartCrossHeadMaintenance(maintenanceCtx)
	su.maintenanceCancel = cancel
	return nil
}

func (su *SupervisorBackend) Stop(ctx context.Context) error {
if !su.started.CompareAndSwap(true, false) {
return errors.New("already stopped")
}
// signal the maintenance loop to stop
su.maintenanceCancel()
// collect errors from stopping chain monitors
var errs error
for _, monitor := range su.chainMonitors {
if err := monitor.Stop(); err != nil {
errs = errors.Join(errs, fmt.Errorf("failed to stop chain monitor: %w", err))
}
}
// close the database
if err := su.db.Close(); err != nil {
errs = errors.Join(errs, fmt.Errorf("failed to close database: %w", err))
}
Expand All @@ -134,6 +167,18 @@ func (su *SupervisorBackend) Close() error {
return nil
}

// AddL2RPC registers a new L2 chain, identified by its RPC endpoint, with the
// supervisor backend. The backend is stopped, the chain is added, and the
// backend is started again.
//
// If stopping or adding fails, the wrapped error is returned and Start is not
// called. Otherwise the result of Start is returned as-is.
func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string) error {
	stopErr := su.Stop(ctx)
	if stopErr != nil {
		return fmt.Errorf("failed to stop backend: %w", stopErr)
	}

	addErr := su.addFromRPC(ctx, su.logger, rpc)
	if addErr != nil {
		return fmt.Errorf("failed to add chain monitor: %w", addErr)
	}

	return su.Start(ctx)
}

func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) {
chainID := identifier.ChainID
blockNum := identifier.BlockNumber
Expand Down
8 changes: 8 additions & 0 deletions op-supervisor/supervisor/backend/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,17 @@ func NewChainsDB(logDBs map[types.ChainID]LogStorage, heads HeadsStorage) *Chain
}
}

// AddLogDB registers logDB as the log storage for chain. If a non-nil storage
// is already registered for that chain, a warning is logged and the entry is
// replaced.
func (db *ChainsDB) AddLogDB(chain types.ChainID, logDB LogStorage) {
	existing := db.logDBs[chain]
	if existing != nil {
		log.Warn("overwriting existing logDB for chain", "chain", chain)
	}
	db.logDBs[chain] = logDB
}

// Resume prepares the chains db to resume recording events after a restart.
// It rewinds the database to the last block that is guaranteed to have been fully recorded to the database
// to ensure it can resume recording from the first log of the next block.
// TODO(#11793): we can rename this to something more descriptive like "PrepareWithRollback"
func (db *ChainsDB) Resume() error {
for chain, logStore := range db.logDBs {
if err := Resume(logStore); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions op-supervisor/supervisor/backend/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func (m *MockBackend) Stop(ctx context.Context) error {
return nil
}

// AddL2RPC is a no-op on the mock backend; it ignores its arguments and
// always returns nil.
func (m *MockBackend) AddL2RPC(ctx context.Context, rpc string) error {
return nil
}

// CheckMessage ignores its arguments and always reports types.CrossUnsafe
// with a nil error.
func (m *MockBackend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) {
return types.CrossUnsafe, nil
}
Expand Down
6 changes: 6 additions & 0 deletions op-supervisor/supervisor/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
// AdminBackend lists the administrative operations a backend must provide:
// starting, stopping, and adding an L2 chain by RPC endpoint.
type AdminBackend interface {
Start(ctx context.Context) error
Stop(ctx context.Context) error
// AddL2RPC adds a new L2 chain, identified by its RPC endpoint string.
AddL2RPC(ctx context.Context, rpc string) error
}

type QueryBackend interface {
Expand Down Expand Up @@ -61,3 +62,8 @@ func (a *AdminFrontend) Start(ctx context.Context) error {
// Stop forwards to a.Supervisor.Stop and returns its result unchanged.
func (a *AdminFrontend) Stop(ctx context.Context) error {
return a.Supervisor.Stop(ctx)
}

// AddL2RPC adds a new L2 chain to the supervisor backend.
// It forwards rpc to a.Supervisor.AddL2RPC and returns its result unchanged.
func (a *AdminFrontend) AddL2RPC(ctx context.Context, rpc string) error {
return a.Supervisor.AddL2RPC(ctx, rpc)
}

0 comments on commit c4c8594

Please sign in to comment.