Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interop: XSafe Head Maintainer #11458

Merged
merged 7 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions op-supervisor/supervisor/backend/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"io"

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
Expand All @@ -20,11 +23,18 @@ type LogStorage interface {
Rewind(newHeadBlockNum uint64) error
LatestBlockNum() uint64
ClosestBlockInfo(blockNum uint64) (uint64, backendTypes.TruncatedHash, error)
Contains(blockNum uint64, logIdx uint32, loghash backendTypes.TruncatedHash) (bool, entrydb.EntryIdx, error)
LastCheckpointBehind(entrydb.EntryIdx) (logs.Iterator, error)
NextExecutingMessage(logs.Iterator) (backendTypes.ExecutingMessage, error)
}

type HeadsStorage interface {
Current() *heads.Heads
Apply(op heads.Operation) error
}

// ChainsDB is a database that stores logs and heads for multiple chains.
// it implements the ChainsStorage interface.
type ChainsDB struct {
logDBs map[types.ChainID]LogStorage
heads HeadsStorage
Expand All @@ -49,6 +59,79 @@ func (db *ChainsDB) Resume() error {
return nil
}

// UpdateCrossSafeHeads updates the cross-heads of all chains
// this is an example of how to use the SafetyChecker to update the cross-heads
func (db *ChainsDB) UpdateCrossSafeHeads() error {
checker := NewSafetyChecker(Safe, *db)
return db.UpdateCrossHeads(checker)
}

// UpdateCrossHeadsForChain updates the cross-head for a single chain.
// the provided checker controls which heads are considered.
// TODO: we should invert control and have the underlying logDB call their own update
// for now, monolithic control is fine. There may be a stronger reason to refactor if the API needs it.
func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker SafetyChecker) error {
axelKingsley marked this conversation as resolved.
Show resolved Hide resolved
// start with the xsafe head of the chain
xHead := checker.CrossHeadForChain(chainID)
// advance as far as the local head
localHead := checker.LocalHeadForChain(chainID)
// get an iterator for the last checkpoint behind the x-head
i, err := db.logDBs[chainID].LastCheckpointBehind(xHead)
if err != nil {
return fmt.Errorf("failed to rewind cross-safe head for chain %v: %w", chainID, err)
}
// advance the logDB through all executing messages we can
// this loop will break:
// - when we reach the local head
// - when we reach a message that is not safe
// - if an error occurs
for {
exec, err := db.logDBs[chainID].NextExecutingMessage(i)
if err == io.EOF {
break
} else if err != nil {
return fmt.Errorf("failed to read next executing message for chain %v: %w", chainID, err)
}
// if we are now beyond the local head, stop
if i.Index() > localHead {
break
}
// use the checker to determine if this message is safe
safe := checker.Check(
types.ChainIDFromUInt64(uint64(exec.Chain)),
exec.BlockNum,
exec.LogIdx,
exec.Hash)
if !safe {
break
}
// if all is well, prepare the x-head update to this point
xHead = i.Index()
}

// have the checker create an update to the x-head in question, and apply that update
err = db.heads.Apply(checker.Update(chainID, xHead))
if err != nil {
return fmt.Errorf("failed to update cross-head for chain %v: %w", chainID, err)
}
return nil
}

// UpdateCrossSafeHeads updates the cross-heads of all chains
// based on the provided SafetyChecker. The SafetyChecker is used to determine
// the safety of each log entry in the database, and the cross-head associated with it.
func (db *ChainsDB) UpdateCrossHeads(checker SafetyChecker) error {
currentHeads := db.heads.Current()
for chainID := range currentHeads.Chains {
if err := db.UpdateCrossHeadsForChain(chainID, checker); err != nil {
return err
}
}
return nil
}

// LatestBlockNum returns the latest block number that has been recorded to the logs db
// for the given chain. It does not contain safety guarantees.
func (db *ChainsDB) LatestBlockNum(chain types.ChainID) uint64 {
logDB, ok := db.logDBs[chain]
if !ok {
Expand Down
256 changes: 253 additions & 3 deletions op-supervisor/supervisor/backend/db/db_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package db

import (
"fmt"
"io"
"testing"

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -47,11 +52,244 @@ func TestChainsDB_Rewind(t *testing.T) {
})
}

type stubHeadStorage struct{}
func TestChainsDB_UpdateCrossHeads(t *testing.T) {
// using a chainID of 1 for simplicity
chainID := types.ChainIDFromUInt64(1)
// get default stubbed components
logDB, checker, h := setupStubbedForUpdateHeads(chainID)

// The ChainsDB is real, but uses only stubbed components
db := NewChainsDB(
map[types.ChainID]LogStorage{
chainID: logDB},
&stubHeadStorage{h})

// Update cross-heads is expected to:
// 1. get a last checkpoint iterator from the logDB (stubbed to be at 15)
// 2. progress the iterator to the next log (16) because the first safety check will pass
// 3. fail the second safety check
// 4. update the cross-heads to the last successful safety check (16)
err := db.UpdateCrossHeads(checker)
require.NoError(t, err)
require.Equal(t, entrydb.EntryIdx(16), checker.updated)
}

func TestChainsDB_UpdateCrossHeadsBeyondLocal(t *testing.T) {
// using a chainID of 1 for simplicity
chainID := types.ChainIDFromUInt64(1)
// get default stubbed components
logDB, checker, h := setupStubbedForUpdateHeads(chainID)
// set the safety checker to pass 99 times, effeciively allowing all messages to be safe
checker.numSafe = 99

// The ChainsDB is real, but uses only stubbed components
db := NewChainsDB(
map[types.ChainID]LogStorage{
chainID: logDB},
&stubHeadStorage{h})

// Update cross-heads is expected to:
// 1. get a last checkpoint iterator from the logDB (stubbed to be at 15)
// 2. progress the iterator to repeatedly, as the safety check will pass 99 times.
// 3. exceed the local head, and update the cross-head to the local head (40)
err := db.UpdateCrossHeads(checker)
require.NoError(t, err)
require.Equal(t, entrydb.EntryIdx(40), checker.updated)
}

func TestChainsDB_UpdateCrossHeadsEOF(t *testing.T) {
// using a chainID of 1 for simplicity
chainID := types.ChainIDFromUInt64(1)
// get default stubbed components
logDB, checker, h := setupStubbedForUpdateHeads(chainID)
// set the log DB to return an EOF error when trying to get the next executing message
// after processing 10 messages as safe (with more messages available to be safe)
logDB.errOverload = io.EOF
logDB.errAfter = 10
checker.numSafe = 99

// The ChainsDB is real, but uses only stubbed components
db := NewChainsDB(
map[types.ChainID]LogStorage{
chainID: logDB},
&stubHeadStorage{h})

// Update cross-heads is expected to:
// 1. get a last checkpoint iterator from the logDB (stubbed to be at 15)
// 2. after processing 10 messages as safe, fail to find any executing messages (EOF)
// 3. update to the last successful safety check (25) without returning an error
err := db.UpdateCrossHeads(checker)
require.NoError(t, err)
require.Equal(t, entrydb.EntryIdx(25), checker.updated)
}

func TestChainsDB_UpdateCrossHeadsError(t *testing.T) {
// using a chainID of 1 for simplicity
chainID := types.ChainIDFromUInt64(1)
// get default stubbed components
logDB, checker, h := setupStubbedForUpdateHeads(chainID)
// set the log DB to return an error when trying to get the next executing message
// after processing 3 messages as safe (with more messages available to be safe)
logDB.errOverload = fmt.Errorf("some error")
logDB.errAfter = 3
checker.numSafe = 99

// The ChainsDB is real, but uses only stubbed components
db := NewChainsDB(
map[types.ChainID]LogStorage{
chainID: logDB},
&stubHeadStorage{h})

// Update cross-heads is expected to:
// 1. get a last checkpoint iterator from the logDB (stubbed to be at 10)
// 2. fail during execution, even after processing 3 messages as safe
// 3. exit without updating, returning the error
err := db.UpdateCrossHeads(checker)
require.Error(t, err)
// the update was never set (aka 0-value)
require.Equal(t, entrydb.EntryIdx(0), checker.updated)
}

// setupStubbedForUpdateHeads sets up stubbed components for testing the UpdateCrossHeads method
// it returns stubbed structs which are suitable for their interfaces, and can be modified before testing
// TODO: the variables at the top of this function should be configurable by the caller.
// this isn't an issue for now, as all tests can modify the stubbed components directly after calling this function.
// but readability and maintainability would be improved by making this function more configurable.
func setupStubbedForUpdateHeads(chainID types.ChainID) (*stubLogDB, *stubChecker, *heads.Heads) {
// the checkpoint starts somewhere behind the last known cross-safe head
checkpoint := entrydb.EntryIdx(15)
// the last known cross-safe head is at 20
cross := entrydb.EntryIdx(20)
// the local head (the limit of the update) is at 40
local := entrydb.EntryIdx(40)
// the number of executing messages to make available (this should be more than the number of safety checks performed)
numExecutingMessages := 30
// number of safety checks that will pass before returning false
numSafe := 1
// number of calls to nextExecutingMessage before potentially returning an error
errAfter := 4

// set up stubbed logDB
logDB := &stubLogDB{}
// the log DB will start the iterator at the checkpoint index
logDB.lastCheckpointBehind = &stubIterator{checkpoint}
// rig the log DB to return an error after a certain number of calls to NextExecutingMessage
logDB.errAfter = errAfter
// set up stubbed executing messages that the ChainsDB can pass to the checker
logDB.executingMessages = []*backendTypes.ExecutingMessage{}
for i := 0; i < numExecutingMessages; i++ {
// executing messages are packed in groups of 3, with block numbers increasing by 1
logDB.executingMessages = append(logDB.executingMessages, &backendTypes.ExecutingMessage{
BlockNum: uint64(100 + int(i/3)),
LogIdx: uint32(i),
Hash: backendTypes.TruncatedHash{},
})
}

// set up stubbed checker
checker := &stubChecker{
localHeadForChain: local,
crossHeadForChain: cross,
// the first safety check will return true, the second false
numSafe: numSafe,
}

// set up stubbed heads with sample values
h := heads.NewHeads()
h.Chains[chainID] = heads.ChainHeads{}

return logDB, checker, h
}

type stubChecker struct {
localHeadForChain entrydb.EntryIdx
crossHeadForChain entrydb.EntryIdx
numSafe int
checkCalls int
updated entrydb.EntryIdx
}

func (s *stubChecker) LocalHeadForChain(chainID types.ChainID) entrydb.EntryIdx {
return s.localHeadForChain
}

func (s *stubChecker) CrossHeadForChain(chainID types.ChainID) entrydb.EntryIdx {
return s.crossHeadForChain
}

// stubbed Check returns true for the first numSafe calls, and false thereafter
func (s *stubChecker) Check(chain types.ChainID, blockNum uint64, logIdx uint32, logHash backendTypes.TruncatedHash) bool {
if s.checkCalls >= s.numSafe {
return false
}
s.checkCalls++
return true
}

func (s *stubChecker) Update(chain types.ChainID, index entrydb.EntryIdx) heads.OperationFn {
s.updated = index
return func(heads *heads.Heads) error {
return nil
}
}

type stubHeadStorage struct {
heads *heads.Heads
}

func (s *stubHeadStorage) Apply(heads.Operation) error {
return nil
}

func (s *stubHeadStorage) Current() *heads.Heads {
if s.heads == nil {
s.heads = heads.NewHeads()
}
return s.heads.Copy()
}

type stubIterator struct {
index entrydb.EntryIdx
}

func (s *stubIterator) NextLog() (uint64, uint32, backendTypes.TruncatedHash, error) {
panic("not implemented")
}
func (s *stubIterator) Index() entrydb.EntryIdx {
return s.index
}
func (s *stubIterator) ExecMessage() (backendTypes.ExecutingMessage, error) {
panic("not implemented")
}

type stubLogDB struct {
addLogCalls int
headBlockNum uint64
addLogCalls int
headBlockNum uint64
emIndex int
executingMessages []*backendTypes.ExecutingMessage
lastCheckpointBehind *stubIterator
errOverload error
errAfter int
containsResponse containsResponse
}

// stubbed LastCheckpointBehind returns a stubbed iterator which was passed in to the struct
func (s *stubLogDB) LastCheckpointBehind(entrydb.EntryIdx) (logs.Iterator, error) {
return s.lastCheckpointBehind, nil
}

func (s *stubLogDB) NextExecutingMessage(i logs.Iterator) (backendTypes.ExecutingMessage, error) {
// if error overload is set, return it to simulate a failure condition
if s.errOverload != nil && s.emIndex >= s.errAfter {
return backendTypes.ExecutingMessage{}, s.errOverload
}
// increment the iterator to mark advancement
i.(*stubIterator).index += 1
// return the next executing message
m := *s.executingMessages[s.emIndex]
// and increment to the next message for the next call
s.emIndex++
return m, nil
}

func (s *stubLogDB) ClosestBlockInfo(_ uint64) (uint64, backendTypes.TruncatedHash, error) {
Expand All @@ -63,6 +301,18 @@ func (s *stubLogDB) AddLog(logHash backendTypes.TruncatedHash, block eth.BlockID
return nil
}

type containsResponse struct {
contains bool
index entrydb.EntryIdx
err error
}

// stubbed Contains records the arguments passed to it
// it returns the response set in the struct, or an empty response
func (s *stubLogDB) Contains(blockNum uint64, logIdx uint32, loghash backendTypes.TruncatedHash) (bool, entrydb.EntryIdx, error) {
return s.containsResponse.contains, s.containsResponse.index, s.containsResponse.err
}

func (s *stubLogDB) Rewind(newHeadBlockNum uint64) error {
s.headBlockNum = newHeadBlockNum
return nil
Expand Down
Loading