WIP: Cross-Head Maintenance
axelKingsley committed Aug 13, 2024
1 parent 5c8ebef commit 7800128
Showing 7 changed files with 297 additions and 17 deletions.
68 changes: 68 additions & 0 deletions op-supervisor/supervisor/backend/db/db.go
@@ -6,6 +6,9 @@ import (
"io"

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
@@ -20,9 +23,13 @@ type LogStorage interface {
Rewind(newHeadBlockNum uint64) error
LatestBlockNum() uint64
ClosestBlockInfo(blockNum uint64) (uint64, backendTypes.TruncatedHash, error)
Contains(blockNum uint64, logIdx uint32, loghash backendTypes.TruncatedHash) (bool, entrydb.EntryIdx, error)
LastCheckpointBehind(entrydb.EntryIdx) (*logs.Iterator, error)
}

type HeadsStorage interface {
Current() *heads.Heads
Apply(op heads.Operation) error
}

type ChainsDB struct {
@@ -49,6 +56,67 @@ func (db *ChainsDB) Resume() error {
return nil
}

// UpdateCrossSafeHeads updates the cross-safe heads of all chains.
// It is an example of how to use the SafetyChecker to update the cross-heads.
func (db *ChainsDB) UpdateCrossSafeHeads() error {
checker := NewSafetyChecker(Safe, *db)
return db.UpdateCrossHeads(checker)
}

// UpdateCrossHeads updates the cross-heads of all chains
// based on the provided SafetyChecker. The SafetyChecker is used to determine
// the safety of each log entry in the database, and to advance the cross-head associated with it.
// TODO: rather than make this monolithic across all chains, this should be broken up
// allowing each chain to update on its own routine
func (db *ChainsDB) UpdateCrossHeads(checker SafetyChecker) error {
currentHeads := db.heads.Current()
for chainID := range currentHeads.Chains {
// start with the xsafe head of the chain
xHead := checker.CrossHeadForChain(chainID)
// rewind the index to the last checkpoint and get the iterator
i, err := db.logDBs[chainID].LastCheckpointBehind(xHead)
if err != nil {
return fmt.Errorf("failed to rewind cross-safe head for chain %v: %w", chainID, err)
}
// play forward from this checkpoint, advancing the cross-safe head as far as possible
for {
_, _, _, err := i.NextLog()
if err == io.EOF {
break
} else if err != nil {
return fmt.Errorf("failed to read next log for chain %v: %w", chainID, err)
}
// if we've advanced past the local safety threshold, stop
if i.Index() > checker.LocalHeadForChain(chainID) {
break
}
// all non-executing messages are safe to advance
// executing messages are safe to advance once checked
em, err := i.ExecMessage()
if err != nil {
return fmt.Errorf("failed to get executing message for chain %v: %w", chainID, err)
} else if em != (backendTypes.ExecutingMessage{}) {
// if there is an executing message, check it
chainID := types.ChainIDFromUInt64(uint64(em.Chain))
safe := checker.Check(chainID, em.BlockNum, em.LogIdx, em.Hash)
if !safe {
break
}
}
// record the current index, as it is safe to advance to this point
xHead = i.Index()
}
// have the checker create an update to the x-head in question, and apply that update
err = db.heads.Apply(checker.Update(chainID, xHead))
if err != nil {
return fmt.Errorf("failed to update cross-head for chain %v: %w", chainID, err)
}
}
return nil
}

// LatestBlockNum returns the latest block number that has been recorded to the logs db
// for the given chain. It does not contain safety guarantees.
func (db *ChainsDB) LatestBlockNum(chain types.ChainID) uint64 {
logDB, ok := db.logDBs[chain]
if !ok {
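For context on how the new update path might be driven, here is a rough usage sketch (not part of this commit): a maintenance goroutine that simply re-runs UpdateCrossSafeHeads on a fixed interval until it is cancelled. The function name, the 2-second interval, and the logger are illustrative assumptions; the TODO above suggests this would eventually be split into one routine per chain.

```go
package db // sketch only: assumes it sits alongside ChainsDB in backend/db

import (
	"context"
	"time"

	"github.com/ethereum/go-ethereum/log"
)

// runCrossHeadMaintenance is a hypothetical driver, not part of this commit.
// It re-runs the cross-safe head update on a fixed interval until ctx is done.
func runCrossHeadMaintenance(ctx context.Context, chainsDB *ChainsDB, logger log.Logger) {
	ticker := time.NewTicker(2 * time.Second) // interval is an arbitrary choice
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := chainsDB.UpdateCrossSafeHeads(); err != nil {
				// a failed pass is retried on the next tick rather than aborting
				logger.Warn("cross-safe head update failed", "err", err)
			}
		}
	}
}
```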
19 changes: 19 additions & 0 deletions op-supervisor/supervisor/backend/db/db_test.go
@@ -4,6 +4,9 @@ import (
"testing"

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/stretchr/testify/require"
@@ -49,11 +52,23 @@ func TestChainsDB_Rewind(t *testing.T) {

type stubHeadStorage struct{}

func (s *stubHeadStorage) Apply(heads.Operation) error {
panic("not implemented")
}

func (s *stubHeadStorage) Current() *heads.Heads {
panic("not implemented")
}

type stubLogDB struct {
addLogCalls int
headBlockNum uint64
}

func (s *stubLogDB) LastCheckpointBehind(entrydb.EntryIdx) (*logs.Iterator, error) {
panic("not implemented")
}

func (s *stubLogDB) ClosestBlockInfo(_ uint64) (uint64, backendTypes.TruncatedHash, error) {
panic("not implemented")
}
@@ -63,6 +78,10 @@ func (s *stubLogDB) AddLog(logHash backendTypes.TruncatedHash, block eth.BlockID
return nil
}

func (s *stubLogDB) Contains(blockNum uint64, logIdx uint32, loghash backendTypes.TruncatedHash) (bool, entrydb.EntryIdx, error) {
panic("not implemented")
}

func (s *stubLogDB) Rewind(newHeadBlockNum uint64) error {
s.headBlockNum = newHeadBlockNum
return nil
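The stubs above satisfy the widened LogStorage and HeadsStorage interfaces by panicking, which is enough for the existing tests. A test that exercises UpdateCrossHeads would need a heads storage that returns real state and records what was applied; a minimal sketch of such a test double (hypothetical, not part of this commit):

```go
// recordingHeadStorage is a hypothetical test double, not part of this commit.
// Unlike stubHeadStorage it hands back a real heads.Heads value and records every
// operation applied to it, so a future UpdateCrossHeads test could assert on both.
type recordingHeadStorage struct {
	current *heads.Heads
	applied []heads.Operation
}

func (s *recordingHeadStorage) Current() *heads.Heads {
	return s.current
}

func (s *recordingHeadStorage) Apply(op heads.Operation) error {
	// only record the operation; whether to also apply it to s.current is up to the test
	s.applied = append(s.applied, op)
	return nil
}
```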
10 changes: 10 additions & 0 deletions op-supervisor/supervisor/backend/db/init_test.go
@@ -6,6 +6,8 @@ import (
"testing"

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/stretchr/testify/require"
)
@@ -48,6 +50,14 @@ type stubLogStore struct {
rewoundTo uint64
}

func (s *stubLogStore) Contains(blockNum uint64, logIdx uint32, loghash types.TruncatedHash) (bool, entrydb.EntryIdx, error) {
panic("not supported")
}

func (s *stubLogStore) LastCheckpointBehind(entrydb.EntryIdx) (*logs.Iterator, error) {
panic("not supported")
}

func (s *stubLogStore) ClosestBlockInfo(blockNum uint64) (uint64, types.TruncatedHash, error) {
if s.closestBlockErr != nil {
return 0, types.TruncatedHash{}, s.closestBlockErr
46 changes: 36 additions & 10 deletions op-supervisor/supervisor/backend/db/logs/db.go
@@ -202,29 +202,42 @@ func (db *DB) ClosestBlockInfo(blockNum uint64) (uint64, types.TruncatedHash, er
return checkpoint.blockNum, entry.hash, nil
}

// Get returns the truncated hash of the log at the specified blockNum and logIdx,
// or an error if the log is not found.
func (db *DB) Get(blockNum uint64, logiIdx uint32) (types.TruncatedHash, error) {
db.rwLock.RLock()
defer db.rwLock.RUnlock()
hash, _, err := db.findLogInfo(blockNum, logiIdx)
return hash, err
}

// Contains returns true iff the specified logHash is recorded in the specified blockNum and logIdx.
// logIdx is the index of the log in the array of all logs the block.
// If the log is found, the entry index of the log is returned, too.
// logIdx is the index of the log in the array of all logs in the block.
// This can be used to check the validity of cross-chain interop events.
func (db *DB) Contains(blockNum uint64, logIdx uint32, logHash types.TruncatedHash) (bool, error) {
func (db *DB) Contains(blockNum uint64, logIdx uint32, logHash types.TruncatedHash) (bool, entrydb.EntryIdx, error) {
db.rwLock.RLock()
defer db.rwLock.RUnlock()
db.log.Trace("Checking for log", "blockNum", blockNum, "logIdx", logIdx, "hash", logHash)

evtHash, _, err := db.findLogInfo(blockNum, logIdx)
evtHash, iter, err := db.findLogInfo(blockNum, logIdx)
if errors.Is(err, ErrNotFound) {
// Did not find a log at blockNum and logIdx
return false, nil
return false, 0, nil
} else if err != nil {
return false, err
return false, 0, err
}
db.log.Trace("Found initiatingEvent", "blockNum", blockNum, "logIdx", logIdx, "hash", evtHash)
// Found the requested block and log index, check if the hash matches
return evtHash == logHash, nil
if evtHash == logHash {
return true, iter.Index(), nil
}
return false, 0, nil
}

// Executes checks if the log identified by the specific block number and log index has an ExecutingMessage associated
// with it that needs to be checked as part of interop validation.
// logIdx is the index of the log in the array of all logs the block.
// logIdx is the index of the log in the array of all logs in the block.
// Returns the ExecutingMessage if it exists, or ExecutingMessage{} if the log is found but has no ExecutingMessage.
// Returns ErrNotFound if the specified log does not exist in the database.
func (db *DB) Executes(blockNum uint64, logIdx uint32) (types.ExecutingMessage, error) {
@@ -241,7 +254,7 @@ func (db *DB) Executes(blockNum uint64, logIdx uint32) (types.ExecutingMessage,
return execMsg, nil
}

func (db *DB) findLogInfo(blockNum uint64, logIdx uint32) (types.TruncatedHash, *iterator, error) {
func (db *DB) findLogInfo(blockNum uint64, logIdx uint32) (types.TruncatedHash, *Iterator, error) {
entryIdx, err := db.searchCheckpoint(blockNum, logIdx)
if errors.Is(err, io.EOF) {
// Did not find a checkpoint to start reading from so the log cannot be present.
@@ -277,7 +290,7 @@ func (db *DB) findLogInfo(blockNum uint64, logIdx uint32) (types.TruncatedHash,
}
}

func (db *DB) newIterator(startCheckpointEntry entrydb.EntryIdx) (*iterator, error) {
func (db *DB) newIterator(startCheckpointEntry entrydb.EntryIdx) (*Iterator, error) {
checkpoint, err := db.readSearchCheckpoint(startCheckpointEntry)
if err != nil {
return nil, fmt.Errorf("failed to read search checkpoint entry %v: %w", startCheckpointEntry, err)
@@ -315,7 +328,7 @@ func (db *DB) newIterator(startCheckpointEntry entrydb.EntryIdx) (*iterator, err
}
startLogCtx = initEvt.preContext(startLogCtx)
}
i := &iterator{
i := &Iterator{
db: db,
// +2 to skip the initial search checkpoint and the canonical hash event after it
nextEntryIdx: startIdx,
@@ -477,6 +490,19 @@ func (db *DB) Rewind(headBlockNum uint64) error {
return nil
}

func (db *DB) LastCheckpointBehind(entryIdx entrydb.EntryIdx) (*Iterator, error) {
for attempts := 0; attempts < searchCheckpointFrequency; attempts++ {
// attempt to read the index entry as a search checkpoint
_, err := db.readSearchCheckpoint(entryIdx)
if err == nil {
return db.newIterator(entryIdx)
}
// step back one entry if we haven't found a checkpoint yet
entryIdx -= 1
}
return nil, fmt.Errorf("failed to find a search checkpoint in the last %v entries", searchCheckpointFrequency)
}

func (db *DB) readSearchCheckpoint(entryIdx entrydb.EntryIdx) (searchCheckpoint, error) {
data, err := db.store.Read(entryIdx)
if err != nil {
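Contains now also reports the entry index of the found log, and LastCheckpointBehind walks backwards from any entry index to the nearest search checkpoint (at most searchCheckpointFrequency entries away) and returns an iterator seeded there. A hedged sketch of how the two compose, assuming it lives in the logs package; the helper name and error messages are illustrative, not code from this commit:

```go
package logs // sketch only: placed in the logs package next to DB and Iterator

import (
	"fmt"

	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
)

// verifyAndReplay is an illustrative helper, not part of this commit. It checks
// that the given log exists with the expected hash, then replays the database
// from the nearest search checkpoint behind that log's entry index up to the log.
func verifyAndReplay(db *DB, blockNum uint64, logIdx uint32, want types.TruncatedHash) error {
	found, idx, err := db.Contains(blockNum, logIdx, want)
	if err != nil {
		return err
	}
	if !found {
		return fmt.Errorf("log %v in block %v does not match the expected hash", logIdx, blockNum)
	}
	// checkpoints are written every searchCheckpointFrequency entries, so the
	// backwards scan inside LastCheckpointBehind is bounded
	iter, err := db.LastCheckpointBehind(idx)
	if err != nil {
		return fmt.Errorf("no checkpoint behind entry %v: %w", idx, err)
	}
	for {
		_, _, _, err := iter.NextLog()
		if err != nil {
			// io.EOF here would mean the verified log was never reached
			return err
		}
		if iter.Index() >= idx {
			return nil // replayed up to the verified log
		}
	}
}
```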
6 changes: 3 additions & 3 deletions op-supervisor/supervisor/backend/db/logs/db_test.go
@@ -548,7 +548,7 @@ func requireContains(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHa
require.LessOrEqual(t, len(execMsg), 1, "cannot have multiple executing messages for a single log")
m, ok := db.m.(*stubMetrics)
require.True(t, ok, "Did not get the expected metrics type")
result, err := db.Contains(blockNum, logIdx, types.TruncateHash(logHash))
result, _, err := db.Contains(blockNum, logIdx, types.TruncateHash(logHash))
require.NoErrorf(t, err, "Error searching for log %v in block %v", logIdx, blockNum)
require.Truef(t, result, "Did not find log %v in block %v with hash %v", logIdx, blockNum, logHash)
require.LessOrEqual(t, m.entriesReadForSearch, int64(searchCheckpointFrequency), "Should not need to read more than between two checkpoints")
@@ -564,7 +564,7 @@ func requireNotContains(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHa
func requireNotContains(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHash common.Hash) {
m, ok := db.m.(*stubMetrics)
require.True(t, ok, "Did not get the expected metrics type")
result, err := db.Contains(blockNum, logIdx, types.TruncateHash(logHash))
result, _, err := db.Contains(blockNum, logIdx, types.TruncateHash(logHash))
require.NoErrorf(t, err, "Error searching for log %v in block %v", logIdx, blockNum)
require.Falsef(t, result, "Found unexpected log %v in block %v with hash %v", logIdx, blockNum, logHash)
require.LessOrEqual(t, m.entriesReadForSearch, int64(searchCheckpointFrequency), "Should not need to read more than between two checkpoints")
@@ -587,7 +587,7 @@ func requireWrongHash(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHash common.Hash, execMsg types.ExecutingMessage) {
func requireWrongHash(t *testing.T, db *DB, blockNum uint64, logIdx uint32, logHash common.Hash, execMsg types.ExecutingMessage) {
m, ok := db.m.(*stubMetrics)
require.True(t, ok, "Did not get the expected metrics type")
result, err := db.Contains(blockNum, logIdx, types.TruncateHash(logHash))
result, _, err := db.Contains(blockNum, logIdx, types.TruncateHash(logHash))
require.NoErrorf(t, err, "Error searching for log %v in block %v", logIdx, blockNum)
require.Falsef(t, result, "Found unexpected log %v in block %v with hash %v", logIdx, blockNum, logHash)

12 changes: 8 additions & 4 deletions op-supervisor/supervisor/backend/db/logs/iterator.go
@@ -9,7 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
)

type iterator struct {
type Iterator struct {
db *DB
nextEntryIdx entrydb.EntryIdx

@@ -19,7 +19,7 @@ type iterator struct {
entriesRead int64
}

func (i *iterator) NextLog() (blockNum uint64, logIdx uint32, evtHash types.TruncatedHash, outErr error) {
func (i *Iterator) NextLog() (blockNum uint64, logIdx uint32, evtHash types.TruncatedHash, outErr error) {
for i.nextEntryIdx <= i.db.lastEntryIdx() {
entryIdx := i.nextEntryIdx
entry, err := i.db.store.Read(entryIdx)
@@ -63,7 +63,11 @@ func (i *iterator) NextLog() (blockNum uint64, logIdx uint32, evtHash types.Trun
return
}

func (i *iterator) ExecMessage() (types.ExecutingMessage, error) {
func (i *Iterator) Index() entrydb.EntryIdx {
return i.nextEntryIdx - 1
}

func (i *Iterator) ExecMessage() (types.ExecutingMessage, error) {
if !i.hasExecMsg {
return types.ExecutingMessage{}, nil
}
@@ -76,7 +80,7 @@ func (i *iterator) readExecMessage(initEntryIdx entrydb.EntryIdx) (types.Executi
return execMsg, nil
}

func (i *iterator) readExecMessage(initEntryIdx entrydb.EntryIdx) (types.ExecutingMessage, error) {
func (i *Iterator) readExecMessage(initEntryIdx entrydb.EntryIdx) (types.ExecutingMessage, error) {
linkIdx := initEntryIdx + 1
if linkIdx%searchCheckpointFrequency == 0 {
linkIdx += 2 // skip the search checkpoint and canonical hash entries
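With the iterator type exported, packages outside logs can drive the same replay loop that UpdateCrossHeads uses: read logs until io.EOF, note Index() after each successful read (it reports the entry just consumed, i.e. nextEntryIdx - 1), and stop at the first executing message that fails a check. A hedged sketch from a consumer's point of view; the helper and the check callback are assumptions, not part of this commit:

```go
package consumer // hypothetical consumer of the exported Iterator, not part of this commit

import (
	"io"

	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
)

// collectSafeIndices walks the iterator and returns the entry index of every log
// read, stopping at the first log whose executing message fails the supplied check.
// This mirrors the advancement loop in ChainsDB.UpdateCrossHeads.
func collectSafeIndices(iter *logs.Iterator, check func(types.ExecutingMessage) bool) ([]entrydb.EntryIdx, error) {
	var out []entrydb.EntryIdx
	for {
		_, _, _, err := iter.NextLog()
		if err == io.EOF {
			return out, nil // reached the end of the database
		}
		if err != nil {
			return nil, err
		}
		em, err := iter.ExecMessage()
		if err != nil {
			return nil, err
		}
		// a zero-value ExecutingMessage means the log has no executing message to check
		if em != (types.ExecutingMessage{}) && !check(em) {
			return out, nil
		}
		out = append(out, iter.Index())
	}
}
```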