Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

op-node: cl-sync using events #10903

Merged
merged 1 commit into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions op-e2e/actions/l2_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,16 @@ type safeDB interface {
}

func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc derive.L1BlobsFetcher, plasmaSrc driver.PlasmaIface, eng L2API, cfg *rollup.Config, syncCfg *sync.Config, safeHeadListener safeDB) *L2Verifier {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

rootDeriver := &rollup.SynchronousDerivers{}
synchronousEvents := rollup.NewSynchronousEvents(log, ctx, rootDeriver)

metrics := &testutils.TestDerivationMetrics{}
ec := engine.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode)
ec := engine.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode, synchronousEvents)

clSync := clsync.NewCLSync(log, cfg, metrics, ec)
clSync := clsync.NewCLSync(log, cfg, metrics, synchronousEvents)

var finalizer driver.Finalizer
if cfg.PlasmaEnabled() {
Expand All @@ -98,12 +104,6 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri

pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, metrics)

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

rootDeriver := &rollup.SynchronousDerivers{}
synchronousEvents := driver.NewSynchronousEvents(log, ctx, rootDeriver)

syncDeriver := &driver.SyncDeriver{
Derivation: pipeline,
Finalizer: finalizer,
Expand Down Expand Up @@ -146,6 +146,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
syncDeriver,
engDeriv,
rollupNode,
clSync,
}

t.Cleanup(rollupNode.rpc.Stop)
Expand Down Expand Up @@ -328,7 +329,7 @@ func (s *L2Verifier) ActL2PipelineFull(t Testing) {
// ActL2UnsafeGossipReceive creates an action that can receive an unsafe execution payload, like gossipsub
func (s *L2Verifier) ActL2UnsafeGossipReceive(payload *eth.ExecutionPayloadEnvelope) Action {
return func(t Testing) {
s.clSync.AddUnsafePayload(payload)
s.syncDeriver.Emitter.Emit(clsync.ReceivedUnsafePayloadEvent{Envelope: payload})
}
}

Expand Down
2 changes: 1 addition & 1 deletion op-e2e/actions/reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ func ConflictingL2Blocks(gt *testing.T, deltaTimeOffset *hexutil.Uint64) {
// give the unsafe block to the verifier, and see if it reorgs because of any unsafe inputs
head, err := altSeqEngCl.PayloadByLabel(t.Ctx(), eth.Unsafe)
require.NoError(t, err)
verifier.ActL2UnsafeGossipReceive(head)
verifier.ActL2UnsafeGossipReceive(head)(t)

// make sure verifier has processed everything
verifier.ActL2PipelineFull(t)
Expand Down
1 change: 0 additions & 1 deletion op-e2e/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,6 @@ func L1InfoFromState(ctx context.Context, contract *bindings.L1Block, l2Number *
// TestSystemMockP2P sets up a L1 Geth node, a rollup node, and a L2 geth node and then confirms that
// the nodes can sync L2 blocks before they are confirmed on L1.
func TestSystemMockP2P(t *testing.T) {
t.Skip("flaky in CI") // TODO(CLI-3859): Re-enable this test.
protolambda marked this conversation as resolved.
Show resolved Hide resolved
InitParallel(t)

cfg := DefaultSystemConfig(t)
Expand Down
3 changes: 2 additions & 1 deletion op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,8 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, envelope *

n.tracer.OnUnsafeL2Payload(ctx, from, envelope)

n.log.Info("Received signed execution payload from p2p", "id", envelope.ExecutionPayload.ID(), "peer", from)
n.log.Info("Received signed execution payload from p2p", "id", envelope.ExecutionPayload.ID(), "peer", from,
"txs", len(envelope.ExecutionPayload.Transactions))

// Pass on the event to the L2 Engine
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
Expand Down
14 changes: 7 additions & 7 deletions op-node/rollup/attributes/attributes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func TestAttributesHandler(t *testing.T) {
t.Run("drop stale attributes", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)
defer eng.AssertExpectations(t)

Expand All @@ -196,7 +196,7 @@ func TestAttributesHandler(t *testing.T) {
t.Run("pending gets reorged", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)
defer eng.AssertExpectations(t)

Expand All @@ -211,7 +211,7 @@ func TestAttributesHandler(t *testing.T) {
t.Run("consolidation fails", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)

ec.SetUnsafeHead(refA1)
Expand Down Expand Up @@ -265,7 +265,7 @@ func TestAttributesHandler(t *testing.T) {
fn := func(t *testing.T, lastInSpan bool) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)

ec.SetUnsafeHead(refA1)
Expand Down Expand Up @@ -324,7 +324,7 @@ func TestAttributesHandler(t *testing.T) {

logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)

ec.SetUnsafeHead(refA0)
Expand Down Expand Up @@ -375,7 +375,7 @@ func TestAttributesHandler(t *testing.T) {

logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)

ec.SetUnsafeHead(refA0)
Expand All @@ -399,7 +399,7 @@ func TestAttributesHandler(t *testing.T) {
t.Run("no attributes", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)
defer eng.AssertExpectations(t)

Expand Down
168 changes: 111 additions & 57 deletions op-node/rollup/clsync/clsync.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package clsync

import (
"context"
"errors"
"io"
"sync"

"github.com/ethereum/go-ethereum/log"

Expand All @@ -20,27 +18,26 @@ type Metrics interface {
RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID)
}

type Engine interface {
engine.EngineState
InsertUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error
}

// CLSync holds on to a queue of received unsafe payloads,
// and tries to apply them to the tip of the chain when requested to.
type CLSync struct {
log log.Logger
cfg *rollup.Config
metrics Metrics
ec Engine
log log.Logger
cfg *rollup.Config
metrics Metrics

emitter rollup.EventEmitter

mu sync.Mutex

unsafePayloads *PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps and duplicates
}

func NewCLSync(log log.Logger, cfg *rollup.Config, metrics Metrics, ec Engine) *CLSync {
func NewCLSync(log log.Logger, cfg *rollup.Config, metrics Metrics, emitter rollup.EventEmitter) *CLSync {
return &CLSync{
log: log,
cfg: cfg,
metrics: metrics,
ec: ec,
emitter: emitter,
unsafePayloads: NewPayloadsQueue(log, maxUnsafePayloadsMemory, payloadMemSize),
}
}
Expand All @@ -58,67 +55,124 @@ func (eq *CLSync) LowestQueuedUnsafeBlock() eth.L2BlockRef {
return ref
}

// AddUnsafePayload schedules an execution payload to be processed, ahead of deriving it from L1.
func (eq *CLSync) AddUnsafePayload(envelope *eth.ExecutionPayloadEnvelope) {
if envelope == nil {
eq.log.Warn("cannot add nil unsafe payload")
return
type ReceivedUnsafePayloadEvent struct {
Envelope *eth.ExecutionPayloadEnvelope
}

func (ev ReceivedUnsafePayloadEvent) String() string {
return "received-unsafe-payload"
}

func (eq *CLSync) OnEvent(ev rollup.Event) {
// Events may be concurrent in the future. Prevent unsafe concurrent modifications to the payloads queue.
eq.mu.Lock()
defer eq.mu.Unlock()

switch x := ev.(type) {
case engine.InvalidPayloadEvent:
eq.onInvalidPayload(x)
case engine.ForkchoiceUpdateEvent:
eq.onForkchoiceUpdate(x)
case ReceivedUnsafePayloadEvent:
eq.onUnsafePayload(x)
}
}

if err := eq.unsafePayloads.Push(envelope); err != nil {
eq.log.Warn("Could not add unsafe payload", "id", envelope.ExecutionPayload.ID(), "timestamp", uint64(envelope.ExecutionPayload.Timestamp), "err", err)
return
// onInvalidPayload checks if the first next-up payload matches the invalid payload.
// If so, the payload is dropped, to give the next payloads a try.
func (eq *CLSync) onInvalidPayload(x engine.InvalidPayloadEvent) {
eq.log.Debug("CL sync received invalid-payload report", x.Envelope.ExecutionPayload.ID())

block := x.Envelope.ExecutionPayload
if peek := eq.unsafePayloads.Peek(); peek != nil &&
block.BlockHash == peek.ExecutionPayload.BlockHash {
eq.log.Warn("Dropping invalid unsafe payload",
"hash", block.BlockHash, "number", uint64(block.BlockNumber),
"timestamp", uint64(block.Timestamp))
eq.unsafePayloads.Pop()
}
p := eq.unsafePayloads.Peek()
eq.metrics.RecordUnsafePayloadsBuffer(uint64(eq.unsafePayloads.Len()), eq.unsafePayloads.MemSize(), p.ExecutionPayload.ID())
eq.log.Trace("Next unsafe payload to process", "next", p.ExecutionPayload.ID(), "timestamp", uint64(p.ExecutionPayload.Timestamp))
}

// Proceed dequeues the next applicable unsafe payload, if any, to apply to the tip of the chain.
// EOF error means we can't process the next unsafe payload. The caller should then try a different form of syncing.
func (eq *CLSync) Proceed(ctx context.Context) error {
// onForkchoiceUpdate peeks at the next applicable unsafe payload, if any,
// to apply on top of the received forkchoice pre-state.
// The payload is held on to until the forkchoice changes (success case) or the payload is reported to be invalid.
func (eq *CLSync) onForkchoiceUpdate(x engine.ForkchoiceUpdateEvent) {
protolambda marked this conversation as resolved.
Show resolved Hide resolved
eq.log.Debug("CL sync received forkchoice update",
"unsafe", x.UnsafeL2Head, "safe", x.SafeL2Head, "finalized", x.FinalizedL2Head)

for {
pop, abort := eq.fromQueue(x)
if abort {
return
}
if pop {
eq.unsafePayloads.Pop()
} else {
break
}
}

firstEnvelope := eq.unsafePayloads.Peek()

// We don't pop from the queue. If there is a temporary error then we can retry.
// Upon next forkchoice update or invalid-payload event we can remove it from the queue.
eq.emitter.Emit(engine.ProcessUnsafePayloadEvent{Envelope: firstEnvelope})
}

// fromQueue determines what to do with the tip of the payloads-queue, given the forkchoice pre-state.
// If abort, there is nothing to process (either due to empty queue, or unsuitable tip).
// If pop, the tip should be dropped, and processing can repeat from there.
// If not abort or pop, the tip is ready to process.
func (eq *CLSync) fromQueue(x engine.ForkchoiceUpdateEvent) (pop bool, abort bool) {
if eq.unsafePayloads.Len() == 0 {
return io.EOF
return false, true
}
firstEnvelope := eq.unsafePayloads.Peek()
first := firstEnvelope.ExecutionPayload

if uint64(first.BlockNumber) <= eq.ec.SafeL2Head().Number {
eq.log.Info("skipping unsafe payload, since it is older than safe head", "safe", eq.ec.SafeL2Head().ID(), "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID())
eq.unsafePayloads.Pop()
return nil
if first.BlockHash == x.UnsafeL2Head.Hash {
eq.log.Debug("successfully processed payload, removing it from the payloads queue now")
return true, false
}
if uint64(first.BlockNumber) <= eq.ec.UnsafeL2Head().Number {
eq.log.Info("skipping unsafe payload, since it is older than unsafe head", "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID())
eq.unsafePayloads.Pop()
return nil

if uint64(first.BlockNumber) <= x.SafeL2Head.Number {
eq.log.Info("skipping unsafe payload, since it is older than safe head", "safe", x.SafeL2Head.ID(), "unsafe", x.UnsafeL2Head.ID(), "unsafe_payload", first.ID())
return true, false
}
if uint64(first.BlockNumber) <= x.UnsafeL2Head.Number {
eq.log.Info("skipping unsafe payload, since it is older than unsafe head", "unsafe", x.UnsafeL2Head.ID(), "unsafe_payload", first.ID())
return true, false
}

// Ensure that the unsafe payload builds upon the current unsafe head
if first.ParentHash != eq.ec.UnsafeL2Head().Hash {
if uint64(first.BlockNumber) == eq.ec.UnsafeL2Head().Number+1 {
eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.ec.SafeL2Head().ID(), "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID())
eq.unsafePayloads.Pop()
if first.ParentHash != x.UnsafeL2Head.Hash {
if uint64(first.BlockNumber) == x.UnsafeL2Head.Number+1 {
eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", x.SafeL2Head.ID(), "unsafe", x.UnsafeL2Head.ID(), "unsafe_payload", first.ID())
return true, false
}
return io.EOF // time to go to next stage if we cannot process the first unsafe payload
return false, true // rollup-node should try something different if it cannot process the first unsafe payload
}

ref, err := derive.PayloadToBlockRef(eq.cfg, first)
if err != nil {
eq.log.Error("failed to decode L2 block ref from payload", "err", err)
eq.unsafePayloads.Pop()
return nil
return false, false
}

// AddUnsafePayload schedules an execution payload to be processed, ahead of deriving it from L1.
func (eq *CLSync) onUnsafePayload(x ReceivedUnsafePayloadEvent) {
eq.log.Debug("CL sync received payload", "payload", x.Envelope.ExecutionPayload.ID())
envelope := x.Envelope
if envelope == nil {
eq.log.Warn("cannot add nil unsafe payload")
return
}

if err := eq.ec.InsertUnsafePayload(ctx, firstEnvelope, ref); errors.Is(err, derive.ErrTemporary) {
eq.log.Debug("Temporary error while inserting unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
return err
} else if err != nil {
eq.log.Warn("Dropping invalid unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
eq.unsafePayloads.Pop()
return err
if err := eq.unsafePayloads.Push(envelope); err != nil {
eq.log.Warn("Could not add unsafe payload", "id", envelope.ExecutionPayload.ID(), "timestamp", uint64(envelope.ExecutionPayload.Timestamp), "err", err)
return
}
eq.unsafePayloads.Pop()
eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
return nil
p := eq.unsafePayloads.Peek()
eq.metrics.RecordUnsafePayloadsBuffer(uint64(eq.unsafePayloads.Len()), eq.unsafePayloads.MemSize(), p.ExecutionPayload.ID())
eq.log.Trace("Next unsafe payload to process", "next", p.ExecutionPayload.ID(), "timestamp", uint64(p.ExecutionPayload.Timestamp))

// request forkchoice signal, so we can process the payload maybe
eq.emitter.Emit(engine.ForkchoiceRequestEvent{})
}
Loading
Loading