Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip(interop) block dependency graph #10044

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 5 additions & 21 deletions op-superchain/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)

type Backend interface {
Expand Down Expand Up @@ -81,38 +80,23 @@ func (b *backend) MessageSafety(ctx context.Context, id MessageIdentifier, paylo
return MessageUnknown, fmt.Errorf("peer with chain id %d is not configured", id.ChainId)
}

blockNum := rpc.BlockNumber(id.BlockNumber.Int64())
block, logs, err := logsProvider.FetchLogs(ctx, rpc.BlockNumberOrHash{BlockNumber: &blockNum})
safety, err := MessageValidity(ctx, id, payload, logsProvider)
if err != nil {
return MessageUnknown, fmt.Errorf("unable to fetch logs: %w", err)
return safety, err
}

// validity with the block
if id.Timestamp != block.Time() {
return MessageInvalid, fmt.Errorf("message id and header timestamp mismatch")
}
if id.LogIndex >= uint64(len(logs)) {
return MessageInvalid, fmt.Errorf("invalid log index")
}

// Check message validity against the remote log
log := logs[id.LogIndex]
if err := CheckMessageLog(id, payload, &log); err != nil {
return MessageInvalid, fmt.Errorf("failed log check: %w", err)
}

// Message Safety
var finalizedL2Timestamp uint64
b.mu.RLock()
finalizedL2Timestamp = b.l2FinalizedBlockRef.Time
b.mu.RUnlock()

// Dependency verification
if id.Timestamp <= finalizedL2Timestamp {
return MessageFinalized, nil
}

// TODO: support for the other safety labels
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please create a GitHub ticket for this TODO.

Ignore this finding from todos_require_linear.


// Cant determine validity
return MessageUnknown, nil
// Cant determine higher levels of safety beyond validity
return safety, nil
}
192 changes: 192 additions & 0 deletions op-superchain/block_dependencies.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package superchain

import (
"context"
"fmt"
"math/big"
"sync"

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)

type BlockSafetyLabel int

const (
BlockUnsafe BlockSafetyLabel = iota - 1
BlockCrossUnsafe
BlockSafe
BlockFinalized
)

type blockDependency struct {
chainId *big.Int
blockNumber uint64
}

type blockDependent struct {
chainId *big.Int
blockRef eth.L2BlockRef
}

type BlockDependencies struct {
log log.Logger

chains map[string]*sources.L2Client

// We only track the heads and not any parent blocks previously added. The l2
// client source implements caching, simplfying memory management here.
heads map[string]common.Hash

// block -> unverified messages
unverifiedExecutingMessages map[common.Hash][]Message

// chain -> block -> dependencies
// (1) Link between blocks with executing message to the blocks that should contain the initiating message
// (2) The parent block is by default a dependency for a derived block
dependencies map[string]map[common.Hash][]blockDependency

// chain -> block number -> dependents
// (1) Link between blocks with an initiating message that's been executed.
// (2) Any derived block is by default a dependent for the parent.
//
// The block number is used here since the block containing the initiating
// message may not have yet been observed when processing the executing message.
dependents map[string]map[uint64][]blockDependent

// Start with a global lock on the graph and avoid the optimization if it's not contentious
mu sync.Mutex
}

func (deps *BlockDependencies) BlockSafety(ctx context.Context, chainId *big.Int, blockRef eth.L2BlockRef) (BlockSafetyLabel, error) {
deps.mu.Lock()
defer deps.mu.Unlock()

if len(deps.unverifiedExecutingMessages[blockRef.Hash]) > 0 {
return BlockUnsafe, nil
}

for _, blockDependency := range deps.dependencies[chainId.String()][blockRef.Hash] {
// Since there are no unverified messages in this block, we can safely fetch the
// right block using the block number as the initiating message in the remote
// block was validated.
//
// We also know the remote block specified by number hasn't been reorg'd,
// otherwise the invalidation would have cascaded to this block
chain := deps.chains[blockDependency.chainId.String()]
block, err := chain.L2BlockRefByNumber(ctx, blockDependency.blockNumber)
if err != nil {
return BlockUnsafe, err
}

dependencyBlockSafety, err := deps.BlockSafety(ctx, blockDependency.chainId, block)
if err != nil {
return BlockUnsafe, err
}

if dependencyBlockSafety == BlockUnsafe {
return BlockUnsafe, nil
}
}

// TODO: we need references to the safe head for every chain
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please create a GitHub ticket for this TODO.

Ignore this finding from todos_require_linear.

return BlockCrossUnsafe, nil
}

func (deps *BlockDependencies) AddBlock(chainId *big.Int, blockRef eth.L2BlockRef) error {
deps.mu.Lock()
defer deps.mu.Unlock()

deps.log.Debug("adding block", "chain_id", chainId, "hash", blockRef.Hash)

chainIdStr := chainId.String()
chain, ok := deps.chains[chainIdStr]
if !ok {
return fmt.Errorf("chain %d not present in configuration", chainId)
}

head := deps.heads[chainIdStr]
if blockRef.ParentHash != head {
return fmt.Errorf("block %s does not build on head %s", blockRef.Hash, head)
}

_, txs, err := chain.InfoAndTxsByHash(context.TODO(), blockRef.Hash)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider to use well-defined context

Ignore this finding from context-todo.

if err != nil {
return fmt.Errorf("unable to query txs: %w", err)
}

// default edge with the parent block
deps.dependents[chainIdStr][blockRef.Number-1] = append(deps.dependents[chainIdStr][blockRef.Number-1], blockDependent{chainId, blockRef})
deps.dependencies[chainIdStr][blockRef.Hash] = append(deps.dependencies[chainIdStr][blockRef.Hash], blockDependency{chainId, blockRef.Number - 1})

// add edges for present executing messages
deps.heads[chainIdStr] = blockRef.Hash
for _, tx := range txs {
if IsInboxExecutingMessageTx(tx) {
_, id, payload, err := ParseInboxExecuteMessageTxData(tx.Data())
if err != nil {
// TODO: revisit bad txs to the inbox address
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please create a GitHub ticket for this TODO.

Ignore this finding from todos_require_linear.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO in error handling code

Ignore this finding from err-todo.

log.Warn("skipping inbox tx with bad tx data", "err", err)
continue
}

dependent := blockDependent{id.ChainId, blockRef}
dependency := blockDependency{id.ChainId, id.BlockNumber.Uint64()}

// todo: de-dup edges
deps.unverifiedExecutingMessages[blockRef.Hash] = append(deps.unverifiedExecutingMessages[blockRef.Hash], Message{id, payload})
deps.dependents[id.ChainId.String()][id.BlockNumber.Uint64()] = append(deps.dependents[id.ChainId.String()][id.BlockNumber.Uint64()], dependent)
deps.dependencies[chainIdStr][blockRef.Hash] = append(deps.dependencies[chainIdStr][blockRef.Hash], dependency)
}
}

// attempt resolution for this block & any existing dependents
deps.resolveUnverifiedExecutingMessages(chainId, blockRef)
for _, dependentBlock := range deps.dependents[chainIdStr][blockRef.Number] {
deps.resolveUnverifiedExecutingMessages(dependentBlock.chainId, dependentBlock.blockRef)
}

return nil
}

func (deps *BlockDependencies) resolveUnverifiedExecutingMessages(chainId *big.Int, blockRef eth.L2BlockRef) {
deps.log.Debug("resolving unverified messages", "chain_id", chainId, "hash", blockRef.Hash)

unverifiedMessages := deps.unverifiedExecutingMessages[blockRef.Hash]
remainingUnverifiedMessages := make([]Message, 0, len(unverifiedMessages))
for _, msg := range unverifiedMessages {
safety, err := MessageValidity(context.TODO(), msg.Id, msg.Payload, nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider to use well-defined context

Ignore this finding from context-todo.

if err != nil {
if safety == MessageInvalid {
deps.log.Error("invalidated msg", "id", msg.Id)
deps.handleInvalidation(chainId, blockRef)
}
if safety == MessageUnknown {
remainingUnverifiedMessages = append(remainingUnverifiedMessages, msg)
}
}
// msg is valid (unsafe) and can be dropped.
}

deps.unverifiedExecutingMessages[blockRef.Hash] = remainingUnverifiedMessages
}

func (deps *BlockDependencies) handleInvalidation(chainId *big.Int, blockRef eth.L2BlockRef) {
deps.log.Debug("block invalidation", "chain_id", chainId, "hash", blockRef.Hash)

// new head is the parent
chainIdStr := chainId.String()
deps.heads[chainIdStr] = blockRef.ParentHash

// first invalidate dependents (includes derived blocks)
for _, dependentBlock := range deps.dependents[chainIdStr][blockRef.Number] {
deps.handleInvalidation(dependentBlock.chainId, dependentBlock.blockRef)
}

// remove all edges from this block
delete(deps.dependents[chainIdStr], blockRef.Number)
delete(deps.dependencies[chainIdStr], blockRef.Hash)
}
33 changes: 33 additions & 0 deletions op-superchain/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package superchain

import (
"bytes"
"context"
"errors"
"fmt"
"math/big"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rpc"
)

var (
Expand Down Expand Up @@ -41,6 +43,11 @@ type MessageIdentifier struct {
ChainId *big.Int
}

type Message struct {
Id MessageIdentifier
Payload hexutil.Bytes
}

func MessagePayloadBytes(log *types.Log) []byte {
msg := []byte{}
for _, topic := range log.Topics {
Expand All @@ -59,6 +66,32 @@ func IsInboxExecutingMessageTx(tx *types.Transaction) bool {
return len(txData) >= 4 && bytes.Equal(txData[:4], inboxExecuteMessageBytes4)
}

// Check the validity of a message against the fetched log. If valid, the
// returned label is `MessageUnsafe` as dependencies still need to be verified
func MessageValidity(ctx context.Context, id MessageIdentifier, payload hexutil.Bytes, p LogsProvider) (MessageSafetyLabel, error) {
blockNum := rpc.BlockNumber(id.BlockNumber.Int64())
block, logs, err := p.FetchLogs(ctx, rpc.BlockNumberOrHash{BlockNumber: &blockNum})
if err != nil {
return MessageUnknown, fmt.Errorf("unable to fetch logs: %w", err)
}

// block validity
if id.Timestamp != block.Time() {
return MessageInvalid, nil
}
if id.LogIndex >= uint64(len(logs)) {
return MessageInvalid, fmt.Errorf("invalid log index")
}

// log validity
log := logs[id.LogIndex]
if err := CheckMessageLog(id, payload, &log); err != nil {
return MessageInvalid, fmt.Errorf("failed message id & log check: %w", err)
}

return MessageUnsafe, nil
}

// Check the message id and payload against the fields of the log.
func CheckMessageLog(id MessageIdentifier, payload hexutil.Bytes, log *types.Log) error {
if id.LogIndex != uint64(log.Index) {
Expand Down