Skip to content

Commit

Permalink
op-node: cl-sync using events
Browse files Browse the repository at this point in the history
  • Loading branch information
protolambda committed Jun 18, 2024
1 parent aff6e89 commit 402f4be
Show file tree
Hide file tree
Showing 16 changed files with 478 additions and 237 deletions.
19 changes: 10 additions & 9 deletions op-e2e/actions/l2_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,16 @@ type safeDB interface {
}

func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc derive.L1BlobsFetcher, plasmaSrc driver.PlasmaIface, eng L2API, cfg *rollup.Config, syncCfg *sync.Config, safeHeadListener safeDB) *L2Verifier {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

rootDeriver := &rollup.SynchronousDerivers{}
synchronousEvents := rollup.NewSynchronousEvents(log, ctx, rootDeriver)

metrics := &testutils.TestDerivationMetrics{}
ec := engine.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode)
ec := engine.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode, synchronousEvents)

clSync := clsync.NewCLSync(log, cfg, metrics, ec)
clSync := clsync.NewCLSync(log, cfg, metrics, synchronousEvents)

var finalizer driver.Finalizer
if cfg.PlasmaEnabled() {
Expand All @@ -98,12 +104,6 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri

pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, metrics)

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

rootDeriver := &rollup.SynchronousDerivers{}
synchronousEvents := driver.NewSynchronousEvents(log, ctx, rootDeriver)

syncDeriver := &driver.SyncDeriver{
Derivation: pipeline,
Finalizer: finalizer,
Expand Down Expand Up @@ -146,6 +146,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
syncDeriver,
engDeriv,
rollupNode,
clSync,
}

t.Cleanup(rollupNode.rpc.Stop)
Expand Down Expand Up @@ -328,7 +329,7 @@ func (s *L2Verifier) ActL2PipelineFull(t Testing) {
// ActL2UnsafeGossipReceive creates an action that can receive an unsafe execution payload, like gossipsub
func (s *L2Verifier) ActL2UnsafeGossipReceive(payload *eth.ExecutionPayloadEnvelope) Action {
return func(t Testing) {
s.clSync.AddUnsafePayload(payload)
s.syncDeriver.Emitter.Emit(clsync.ReceivedUnsafePayloadEvent{Envelope: payload})
}
}

Expand Down
2 changes: 1 addition & 1 deletion op-e2e/actions/reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ func ConflictingL2Blocks(gt *testing.T, deltaTimeOffset *hexutil.Uint64) {
// give the unsafe block to the verifier, and see if it reorgs because of any unsafe inputs
head, err := altSeqEngCl.PayloadByLabel(t.Ctx(), eth.Unsafe)
require.NoError(t, err)
verifier.ActL2UnsafeGossipReceive(head)
verifier.ActL2UnsafeGossipReceive(head)(t)

// make sure verifier has processed everything
verifier.ActL2PipelineFull(t)
Expand Down
1 change: 0 additions & 1 deletion op-e2e/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,6 @@ func L1InfoFromState(ctx context.Context, contract *bindings.L1Block, l2Number *
// TestSystemMockP2P sets up a L1 Geth node, a rollup node, and a L2 geth node and then confirms that
// the nodes can sync L2 blocks before they are confirmed on L1.
func TestSystemMockP2P(t *testing.T) {
t.Skip("flaky in CI") // TODO(CLI-3859): Re-enable this test.
InitParallel(t)

cfg := DefaultSystemConfig(t)
Expand Down
3 changes: 2 additions & 1 deletion op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,8 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, envelope *

n.tracer.OnUnsafeL2Payload(ctx, from, envelope)

n.log.Info("Received signed execution payload from p2p", "id", envelope.ExecutionPayload.ID(), "peer", from)
n.log.Info("Received signed execution payload from p2p", "id", envelope.ExecutionPayload.ID(), "peer", from,
"txs", len(envelope.ExecutionPayload.Transactions))

// Pass on the event to the L2 Engine
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
Expand Down
14 changes: 7 additions & 7 deletions op-node/rollup/attributes/attributes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func TestAttributesHandler(t *testing.T) {
t.Run("drop stale attributes", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)
defer eng.AssertExpectations(t)

Expand All @@ -196,7 +196,7 @@ func TestAttributesHandler(t *testing.T) {
t.Run("pending gets reorged", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)
defer eng.AssertExpectations(t)

Expand All @@ -211,7 +211,7 @@ func TestAttributesHandler(t *testing.T) {
t.Run("consolidation fails", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)

ec.SetUnsafeHead(refA1)
Expand Down Expand Up @@ -265,7 +265,7 @@ func TestAttributesHandler(t *testing.T) {
fn := func(t *testing.T, lastInSpan bool) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)

ec.SetUnsafeHead(refA1)
Expand Down Expand Up @@ -324,7 +324,7 @@ func TestAttributesHandler(t *testing.T) {

logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)

ec.SetUnsafeHead(refA0)
Expand Down Expand Up @@ -375,7 +375,7 @@ func TestAttributesHandler(t *testing.T) {

logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)

ec.SetUnsafeHead(refA0)
Expand All @@ -399,7 +399,7 @@ func TestAttributesHandler(t *testing.T) {
t.Run("no attributes", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
eng := &testutils.MockEngine{}
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync)
ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{})
ah := NewAttributesHandler(logger, cfg, ec, eng)
defer eng.AssertExpectations(t)

Expand Down
168 changes: 111 additions & 57 deletions op-node/rollup/clsync/clsync.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package clsync

import (
"context"
"errors"
"io"
"sync"

"github.com/ethereum/go-ethereum/log"

Expand All @@ -20,27 +18,26 @@ type Metrics interface {
RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID)
}

type Engine interface {
engine.EngineState
InsertUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error
}

// CLSync holds on to a queue of received unsafe payloads,
// and tries to apply them to the tip of the chain when requested to.
type CLSync struct {
log log.Logger
cfg *rollup.Config
metrics Metrics
ec Engine
log log.Logger
cfg *rollup.Config
metrics Metrics

emitter rollup.EventEmitter

mu sync.Mutex

unsafePayloads *PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps and duplicates
}

func NewCLSync(log log.Logger, cfg *rollup.Config, metrics Metrics, ec Engine) *CLSync {
func NewCLSync(log log.Logger, cfg *rollup.Config, metrics Metrics, emitter rollup.EventEmitter) *CLSync {
return &CLSync{
log: log,
cfg: cfg,
metrics: metrics,
ec: ec,
emitter: emitter,
unsafePayloads: NewPayloadsQueue(log, maxUnsafePayloadsMemory, payloadMemSize),
}
}
Expand All @@ -58,67 +55,124 @@ func (eq *CLSync) LowestQueuedUnsafeBlock() eth.L2BlockRef {
return ref
}

// AddUnsafePayload schedules an execution payload to be processed, ahead of deriving it from L1.
func (eq *CLSync) AddUnsafePayload(envelope *eth.ExecutionPayloadEnvelope) {
if envelope == nil {
eq.log.Warn("cannot add nil unsafe payload")
return
type ReceivedUnsafePayloadEvent struct {
Envelope *eth.ExecutionPayloadEnvelope
}

func (ev ReceivedUnsafePayloadEvent) String() string {
return "received-unsafe-payload"
}

func (eq *CLSync) OnEvent(ev rollup.Event) {
// Events may be concurrent in the future. Prevent unsafe concurrent modifications to the payloads queue.
eq.mu.Lock()
defer eq.mu.Unlock()

switch x := ev.(type) {
case engine.InvalidPayloadEvent:
eq.onInvalidPayload(x)
case engine.ForkchoiceUpdateEvent:
eq.onForkchoiceUpdate(x)
case ReceivedUnsafePayloadEvent:
eq.onUnsafePayload(x)
}
}

if err := eq.unsafePayloads.Push(envelope); err != nil {
eq.log.Warn("Could not add unsafe payload", "id", envelope.ExecutionPayload.ID(), "timestamp", uint64(envelope.ExecutionPayload.Timestamp), "err", err)
return
// onInvalidPayload checks if the first next-up payload matches the invalid payload.
// If so, the payload is dropped, to give the next payloads a try.
func (eq *CLSync) onInvalidPayload(x engine.InvalidPayloadEvent) {
eq.log.Debug("CL sync received invalid-payload report", x.Envelope.ExecutionPayload.ID())

block := x.Envelope.ExecutionPayload
if peek := eq.unsafePayloads.Peek(); peek != nil &&
block.BlockHash == peek.ExecutionPayload.BlockHash {
eq.log.Warn("Dropping invalid unsafe payload",
"hash", block.BlockHash, "number", uint64(block.BlockNumber),
"timestamp", uint64(block.Timestamp))
eq.unsafePayloads.Pop()
}
p := eq.unsafePayloads.Peek()
eq.metrics.RecordUnsafePayloadsBuffer(uint64(eq.unsafePayloads.Len()), eq.unsafePayloads.MemSize(), p.ExecutionPayload.ID())
eq.log.Trace("Next unsafe payload to process", "next", p.ExecutionPayload.ID(), "timestamp", uint64(p.ExecutionPayload.Timestamp))
}

// Proceed dequeues the next applicable unsafe payload, if any, to apply to the tip of the chain.
// EOF error means we can't process the next unsafe payload. The caller should then try a different form of syncing.
func (eq *CLSync) Proceed(ctx context.Context) error {
// onForkchoiceUpdate peeks at the next applicable unsafe payload, if any,
// to apply on top of the received forkchoice pre-state.
// The payload is held on to until the forkchoice changes (success case) or the payload is reported to be invalid.
func (eq *CLSync) onForkchoiceUpdate(x engine.ForkchoiceUpdateEvent) {
eq.log.Debug("CL sync received forkchoice update",
"unsafe", x.UnsafeL2Head, "safe", x.SafeL2Head, "finalized", x.FinalizedL2Head)

for {
pop, abort := eq.fromQueue(x)
if abort {
return
}
if pop {
eq.unsafePayloads.Pop()
} else {
break
}
}

firstEnvelope := eq.unsafePayloads.Peek()

// We don't pop from the queue. If there is a temporary error then we can retry.
// Upon next forkchoice update or invalid-payload event we can remove it from the queue.
eq.emitter.Emit(engine.ProcessUnsafePayloadEvent{Envelope: firstEnvelope})
}

// fromQueue determines what to do with the tip of the payloads-queue, given the forkchoice pre-state.
// If abort, there is nothing to process (either due to empty queue, or unsuitable tip).
// If pop, the tip should be dropped, and processing can repeat from there.
// If not abort or pop, the tip is ready to process.
func (eq *CLSync) fromQueue(x engine.ForkchoiceUpdateEvent) (pop bool, abort bool) {
if eq.unsafePayloads.Len() == 0 {
return io.EOF
return false, true
}
firstEnvelope := eq.unsafePayloads.Peek()
first := firstEnvelope.ExecutionPayload

if uint64(first.BlockNumber) <= eq.ec.SafeL2Head().Number {
eq.log.Info("skipping unsafe payload, since it is older than safe head", "safe", eq.ec.SafeL2Head().ID(), "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID())
eq.unsafePayloads.Pop()
return nil
if first.BlockHash == x.UnsafeL2Head.Hash {
eq.log.Debug("successfully processed payload, removing it from the payloads queue now")
return true, false
}
if uint64(first.BlockNumber) <= eq.ec.UnsafeL2Head().Number {
eq.log.Info("skipping unsafe payload, since it is older than unsafe head", "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID())
eq.unsafePayloads.Pop()
return nil

if uint64(first.BlockNumber) <= x.SafeL2Head.Number {
eq.log.Info("skipping unsafe payload, since it is older than safe head", "safe", x.SafeL2Head.ID(), "unsafe", x.UnsafeL2Head.ID(), "unsafe_payload", first.ID())
return true, false
}
if uint64(first.BlockNumber) <= x.UnsafeL2Head.Number {
eq.log.Info("skipping unsafe payload, since it is older than unsafe head", "unsafe", x.UnsafeL2Head.ID(), "unsafe_payload", first.ID())
return true, false
}

// Ensure that the unsafe payload builds upon the current unsafe head
if first.ParentHash != eq.ec.UnsafeL2Head().Hash {
if uint64(first.BlockNumber) == eq.ec.UnsafeL2Head().Number+1 {
eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.ec.SafeL2Head().ID(), "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID())
eq.unsafePayloads.Pop()
if first.ParentHash != x.UnsafeL2Head.Hash {
if uint64(first.BlockNumber) == x.UnsafeL2Head.Number+1 {
eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", x.SafeL2Head.ID(), "unsafe", x.UnsafeL2Head.ID(), "unsafe_payload", first.ID())
return true, false
}
return io.EOF // time to go to next stage if we cannot process the first unsafe payload
return false, true // rollup-node should try something different if it cannot process the first unsafe payload
}

ref, err := derive.PayloadToBlockRef(eq.cfg, first)
if err != nil {
eq.log.Error("failed to decode L2 block ref from payload", "err", err)
eq.unsafePayloads.Pop()
return nil
return false, false
}

// AddUnsafePayload schedules an execution payload to be processed, ahead of deriving it from L1.
func (eq *CLSync) onUnsafePayload(x ReceivedUnsafePayloadEvent) {
eq.log.Debug("CL sync received payload", "payload", x.Envelope.ExecutionPayload.ID())
envelope := x.Envelope
if envelope == nil {
eq.log.Warn("cannot add nil unsafe payload")
return
}

if err := eq.ec.InsertUnsafePayload(ctx, firstEnvelope, ref); errors.Is(err, derive.ErrTemporary) {
eq.log.Debug("Temporary error while inserting unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
return err
} else if err != nil {
eq.log.Warn("Dropping invalid unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
eq.unsafePayloads.Pop()
return err
if err := eq.unsafePayloads.Push(envelope); err != nil {
eq.log.Warn("Could not add unsafe payload", "id", envelope.ExecutionPayload.ID(), "timestamp", uint64(envelope.ExecutionPayload.Timestamp), "err", err)
return
}
eq.unsafePayloads.Pop()
eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
return nil
p := eq.unsafePayloads.Peek()
eq.metrics.RecordUnsafePayloadsBuffer(uint64(eq.unsafePayloads.Len()), eq.unsafePayloads.MemSize(), p.ExecutionPayload.ID())
eq.log.Trace("Next unsafe payload to process", "next", p.ExecutionPayload.ID(), "timestamp", uint64(p.ExecutionPayload.Timestamp))

// request forkchoice signal, so we can process the payload maybe
eq.emitter.Emit(engine.ForkchoiceRequestEvent{})
}
Loading

0 comments on commit 402f4be

Please sign in to comment.