diff --git a/op-e2e/actions/l2_verifier.go b/op-e2e/actions/l2_verifier.go index 42d0813f30caf..5672db1286f0f 100644 --- a/op-e2e/actions/l2_verifier.go +++ b/op-e2e/actions/l2_verifier.go @@ -82,10 +82,16 @@ type safeDB interface { } func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc derive.L1BlobsFetcher, plasmaSrc driver.PlasmaIface, eng L2API, cfg *rollup.Config, syncCfg *sync.Config, safeHeadListener safeDB) *L2Verifier { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + rootDeriver := &rollup.SynchronousDerivers{} + synchronousEvents := rollup.NewSynchronousEvents(log, ctx, rootDeriver) + metrics := &testutils.TestDerivationMetrics{} - ec := engine.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode) + ec := engine.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode, synchronousEvents) - clSync := clsync.NewCLSync(log, cfg, metrics, ec) + clSync := clsync.NewCLSync(log, cfg, metrics, synchronousEvents) var finalizer driver.Finalizer if cfg.PlasmaEnabled() { @@ -98,12 +104,6 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, metrics) - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - rootDeriver := &rollup.SynchronousDerivers{} - synchronousEvents := driver.NewSynchronousEvents(log, ctx, rootDeriver) - syncDeriver := &driver.SyncDeriver{ Derivation: pipeline, Finalizer: finalizer, @@ -146,6 +146,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri syncDeriver, engDeriv, rollupNode, + clSync, } t.Cleanup(rollupNode.rpc.Stop) @@ -328,7 +329,7 @@ func (s *L2Verifier) ActL2PipelineFull(t Testing) { // ActL2UnsafeGossipReceive creates an action that can receive an unsafe execution payload, like gossipsub func (s *L2Verifier) ActL2UnsafeGossipReceive(payload *eth.ExecutionPayloadEnvelope) Action { return func(t Testing) { - s.clSync.AddUnsafePayload(payload) + s.syncDeriver.Emitter.Emit(clsync.ReceivedUnsafePayloadEvent{Envelope: payload}) } } diff --git a/op-e2e/actions/reorg_test.go b/op-e2e/actions/reorg_test.go index 1b4edbaf9cfee..27cce2aa1086e 100644 --- a/op-e2e/actions/reorg_test.go +++ b/op-e2e/actions/reorg_test.go @@ -767,7 +767,7 @@ func ConflictingL2Blocks(gt *testing.T, deltaTimeOffset *hexutil.Uint64) { // give the unsafe block to the verifier, and see if it reorgs because of any unsafe inputs head, err := altSeqEngCl.PayloadByLabel(t.Ctx(), eth.Unsafe) require.NoError(t, err) - verifier.ActL2UnsafeGossipReceive(head) + verifier.ActL2UnsafeGossipReceive(head)(t) // make sure verifier has processed everything verifier.ActL2PipelineFull(t) diff --git a/op-e2e/system_test.go b/op-e2e/system_test.go index 30f15f7e20603..d5775ff50aa26 100644 --- a/op-e2e/system_test.go +++ b/op-e2e/system_test.go @@ -617,7 +617,6 @@ func L1InfoFromState(ctx context.Context, contract *bindings.L1Block, l2Number * // TestSystemMockP2P sets up a L1 Geth node, a rollup node, and a L2 geth node and then confirms that // the nodes can sync L2 blocks before they are confirmed on L1. func TestSystemMockP2P(t *testing.T) { - t.Skip("flaky in CI") // TODO(CLI-3859): Re-enable this test. InitParallel(t) cfg := DefaultSystemConfig(t) diff --git a/op-node/node/node.go b/op-node/node/node.go index f2a62cf58da0d..fb5f981d8f14a 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -582,7 +582,8 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, envelope * n.tracer.OnUnsafeL2Payload(ctx, from, envelope) - n.log.Info("Received signed execution payload from p2p", "id", envelope.ExecutionPayload.ID(), "peer", from) + n.log.Info("Received signed execution payload from p2p", "id", envelope.ExecutionPayload.ID(), "peer", from, + "txs", len(envelope.ExecutionPayload.Transactions)) // Pass on the event to the L2 Engine ctx, cancel := context.WithTimeout(ctx, time.Second*30) diff --git a/op-node/rollup/attributes/attributes_test.go b/op-node/rollup/attributes/attributes_test.go index 62931af8c3786..49e6247e14177 100644 --- a/op-node/rollup/attributes/attributes_test.go +++ b/op-node/rollup/attributes/attributes_test.go @@ -182,7 +182,7 @@ func TestAttributesHandler(t *testing.T) { t.Run("drop stale attributes", func(t *testing.T) { logger := testlog.Logger(t, log.LevelInfo) eng := &testutils.MockEngine{} - ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync) + ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{}) ah := NewAttributesHandler(logger, cfg, ec, eng) defer eng.AssertExpectations(t) @@ -196,7 +196,7 @@ func TestAttributesHandler(t *testing.T) { t.Run("pending gets reorged", func(t *testing.T) { logger := testlog.Logger(t, log.LevelInfo) eng := &testutils.MockEngine{} - ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync) + ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{}) ah := NewAttributesHandler(logger, cfg, ec, eng) defer eng.AssertExpectations(t) @@ -211,7 +211,7 @@ func TestAttributesHandler(t *testing.T) { t.Run("consolidation fails", func(t *testing.T) { logger := testlog.Logger(t, log.LevelInfo) eng := &testutils.MockEngine{} - ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync) + ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{}) ah := NewAttributesHandler(logger, cfg, ec, eng) ec.SetUnsafeHead(refA1) @@ -265,7 +265,7 @@ func TestAttributesHandler(t *testing.T) { fn := func(t *testing.T, lastInSpan bool) { logger := testlog.Logger(t, log.LevelInfo) eng := &testutils.MockEngine{} - ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync) + ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{}) ah := NewAttributesHandler(logger, cfg, ec, eng) ec.SetUnsafeHead(refA1) @@ -324,7 +324,7 @@ func TestAttributesHandler(t *testing.T) { logger := testlog.Logger(t, log.LevelInfo) eng := &testutils.MockEngine{} - ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync) + ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{}) ah := NewAttributesHandler(logger, cfg, ec, eng) ec.SetUnsafeHead(refA0) @@ -375,7 +375,7 @@ func TestAttributesHandler(t *testing.T) { logger := testlog.Logger(t, log.LevelInfo) eng := &testutils.MockEngine{} - ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync) + ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{}) ah := NewAttributesHandler(logger, cfg, ec, eng) ec.SetUnsafeHead(refA0) @@ -399,7 +399,7 @@ func TestAttributesHandler(t *testing.T) { t.Run("no attributes", func(t *testing.T) { logger := testlog.Logger(t, log.LevelInfo) eng := &testutils.MockEngine{} - ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync) + ec := engine.NewEngineController(eng, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{}) ah := NewAttributesHandler(logger, cfg, ec, eng) defer eng.AssertExpectations(t) diff --git a/op-node/rollup/clsync/clsync.go b/op-node/rollup/clsync/clsync.go index 989f1c7c98b60..faa4586105e30 100644 --- a/op-node/rollup/clsync/clsync.go +++ b/op-node/rollup/clsync/clsync.go @@ -1,9 +1,7 @@ package clsync import ( - "context" - "errors" - "io" + "sync" "github.com/ethereum/go-ethereum/log" @@ -20,27 +18,26 @@ type Metrics interface { RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID) } -type Engine interface { - engine.EngineState - InsertUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error -} - // CLSync holds on to a queue of received unsafe payloads, // and tries to apply them to the tip of the chain when requested to. type CLSync struct { - log log.Logger - cfg *rollup.Config - metrics Metrics - ec Engine + log log.Logger + cfg *rollup.Config + metrics Metrics + + emitter rollup.EventEmitter + + mu sync.Mutex + unsafePayloads *PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps and duplicates } -func NewCLSync(log log.Logger, cfg *rollup.Config, metrics Metrics, ec Engine) *CLSync { +func NewCLSync(log log.Logger, cfg *rollup.Config, metrics Metrics, emitter rollup.EventEmitter) *CLSync { return &CLSync{ log: log, cfg: cfg, metrics: metrics, - ec: ec, + emitter: emitter, unsafePayloads: NewPayloadsQueue(log, maxUnsafePayloadsMemory, payloadMemSize), } } @@ -58,67 +55,124 @@ func (eq *CLSync) LowestQueuedUnsafeBlock() eth.L2BlockRef { return ref } -// AddUnsafePayload schedules an execution payload to be processed, ahead of deriving it from L1. -func (eq *CLSync) AddUnsafePayload(envelope *eth.ExecutionPayloadEnvelope) { - if envelope == nil { - eq.log.Warn("cannot add nil unsafe payload") - return +type ReceivedUnsafePayloadEvent struct { + Envelope *eth.ExecutionPayloadEnvelope +} + +func (ev ReceivedUnsafePayloadEvent) String() string { + return "received-unsafe-payload" +} + +func (eq *CLSync) OnEvent(ev rollup.Event) { + // Events may be concurrent in the future. Prevent unsafe concurrent modifications to the payloads queue. + eq.mu.Lock() + defer eq.mu.Unlock() + + switch x := ev.(type) { + case engine.InvalidPayloadEvent: + eq.onInvalidPayload(x) + case engine.ForkchoiceUpdateEvent: + eq.onForkchoiceUpdate(x) + case ReceivedUnsafePayloadEvent: + eq.onUnsafePayload(x) } +} - if err := eq.unsafePayloads.Push(envelope); err != nil { - eq.log.Warn("Could not add unsafe payload", "id", envelope.ExecutionPayload.ID(), "timestamp", uint64(envelope.ExecutionPayload.Timestamp), "err", err) - return +// onInvalidPayload checks if the first next-up payload matches the invalid payload. +// If so, the payload is dropped, to give the next payloads a try. +func (eq *CLSync) onInvalidPayload(x engine.InvalidPayloadEvent) { + eq.log.Debug("CL sync received invalid-payload report", x.Envelope.ExecutionPayload.ID()) + + block := x.Envelope.ExecutionPayload + if peek := eq.unsafePayloads.Peek(); peek != nil && + block.BlockHash == peek.ExecutionPayload.BlockHash { + eq.log.Warn("Dropping invalid unsafe payload", + "hash", block.BlockHash, "number", uint64(block.BlockNumber), + "timestamp", uint64(block.Timestamp)) + eq.unsafePayloads.Pop() } - p := eq.unsafePayloads.Peek() - eq.metrics.RecordUnsafePayloadsBuffer(uint64(eq.unsafePayloads.Len()), eq.unsafePayloads.MemSize(), p.ExecutionPayload.ID()) - eq.log.Trace("Next unsafe payload to process", "next", p.ExecutionPayload.ID(), "timestamp", uint64(p.ExecutionPayload.Timestamp)) } -// Proceed dequeues the next applicable unsafe payload, if any, to apply to the tip of the chain. -// EOF error means we can't process the next unsafe payload. The caller should then try a different form of syncing. -func (eq *CLSync) Proceed(ctx context.Context) error { +// onForkchoiceUpdate peeks at the next applicable unsafe payload, if any, +// to apply on top of the received forkchoice pre-state. +// The payload is held on to until the forkchoice changes (success case) or the payload is reported to be invalid. +func (eq *CLSync) onForkchoiceUpdate(x engine.ForkchoiceUpdateEvent) { + eq.log.Debug("CL sync received forkchoice update", + "unsafe", x.UnsafeL2Head, "safe", x.SafeL2Head, "finalized", x.FinalizedL2Head) + + for { + pop, abort := eq.fromQueue(x) + if abort { + return + } + if pop { + eq.unsafePayloads.Pop() + } else { + break + } + } + + firstEnvelope := eq.unsafePayloads.Peek() + + // We don't pop from the queue. If there is a temporary error then we can retry. + // Upon next forkchoice update or invalid-payload event we can remove it from the queue. + eq.emitter.Emit(engine.ProcessUnsafePayloadEvent{Envelope: firstEnvelope}) +} + +// fromQueue determines what to do with the tip of the payloads-queue, given the forkchoice pre-state. +// If abort, there is nothing to process (either due to empty queue, or unsuitable tip). +// If pop, the tip should be dropped, and processing can repeat from there. +// If not abort or pop, the tip is ready to process. +func (eq *CLSync) fromQueue(x engine.ForkchoiceUpdateEvent) (pop bool, abort bool) { if eq.unsafePayloads.Len() == 0 { - return io.EOF + return false, true } firstEnvelope := eq.unsafePayloads.Peek() first := firstEnvelope.ExecutionPayload - if uint64(first.BlockNumber) <= eq.ec.SafeL2Head().Number { - eq.log.Info("skipping unsafe payload, since it is older than safe head", "safe", eq.ec.SafeL2Head().ID(), "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID()) - eq.unsafePayloads.Pop() - return nil + if first.BlockHash == x.UnsafeL2Head.Hash { + eq.log.Debug("successfully processed payload, removing it from the payloads queue now") + return true, false } - if uint64(first.BlockNumber) <= eq.ec.UnsafeL2Head().Number { - eq.log.Info("skipping unsafe payload, since it is older than unsafe head", "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID()) - eq.unsafePayloads.Pop() - return nil + + if uint64(first.BlockNumber) <= x.SafeL2Head.Number { + eq.log.Info("skipping unsafe payload, since it is older than safe head", "safe", x.SafeL2Head.ID(), "unsafe", x.UnsafeL2Head.ID(), "unsafe_payload", first.ID()) + return true, false + } + if uint64(first.BlockNumber) <= x.UnsafeL2Head.Number { + eq.log.Info("skipping unsafe payload, since it is older than unsafe head", "unsafe", x.UnsafeL2Head.ID(), "unsafe_payload", first.ID()) + return true, false } // Ensure that the unsafe payload builds upon the current unsafe head - if first.ParentHash != eq.ec.UnsafeL2Head().Hash { - if uint64(first.BlockNumber) == eq.ec.UnsafeL2Head().Number+1 { - eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.ec.SafeL2Head().ID(), "unsafe", eq.ec.UnsafeL2Head().ID(), "unsafe_payload", first.ID()) - eq.unsafePayloads.Pop() + if first.ParentHash != x.UnsafeL2Head.Hash { + if uint64(first.BlockNumber) == x.UnsafeL2Head.Number+1 { + eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", x.SafeL2Head.ID(), "unsafe", x.UnsafeL2Head.ID(), "unsafe_payload", first.ID()) + return true, false } - return io.EOF // time to go to next stage if we cannot process the first unsafe payload + return false, true // rollup-node should try something different if it cannot process the first unsafe payload } - ref, err := derive.PayloadToBlockRef(eq.cfg, first) - if err != nil { - eq.log.Error("failed to decode L2 block ref from payload", "err", err) - eq.unsafePayloads.Pop() - return nil + return false, false +} + +// AddUnsafePayload schedules an execution payload to be processed, ahead of deriving it from L1. +func (eq *CLSync) onUnsafePayload(x ReceivedUnsafePayloadEvent) { + eq.log.Debug("CL sync received payload", "payload", x.Envelope.ExecutionPayload.ID()) + envelope := x.Envelope + if envelope == nil { + eq.log.Warn("cannot add nil unsafe payload") + return } - if err := eq.ec.InsertUnsafePayload(ctx, firstEnvelope, ref); errors.Is(err, derive.ErrTemporary) { - eq.log.Debug("Temporary error while inserting unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin) - return err - } else if err != nil { - eq.log.Warn("Dropping invalid unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin) - eq.unsafePayloads.Pop() - return err + if err := eq.unsafePayloads.Push(envelope); err != nil { + eq.log.Warn("Could not add unsafe payload", "id", envelope.ExecutionPayload.ID(), "timestamp", uint64(envelope.ExecutionPayload.Timestamp), "err", err) + return } - eq.unsafePayloads.Pop() - eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin) - return nil + p := eq.unsafePayloads.Peek() + eq.metrics.RecordUnsafePayloadsBuffer(uint64(eq.unsafePayloads.Len()), eq.unsafePayloads.MemSize(), p.ExecutionPayload.ID()) + eq.log.Trace("Next unsafe payload to process", "next", p.ExecutionPayload.ID(), "timestamp", uint64(p.ExecutionPayload.Timestamp)) + + // request forkchoice signal, so we can process the payload maybe + eq.emitter.Emit(engine.ForkchoiceRequestEvent{}) } diff --git a/op-node/rollup/clsync/clsync_test.go b/op-node/rollup/clsync/clsync_test.go index 67bcc25f82eaa..f42c67f9220e5 100644 --- a/op-node/rollup/clsync/clsync_test.go +++ b/op-node/rollup/clsync/clsync_test.go @@ -1,9 +1,6 @@ package clsync import ( - "context" - "errors" - "io" "math/big" "math/rand" // nosemgrep "testing" @@ -17,39 +14,12 @@ import ( "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + "github.com/ethereum-optimism/optimism/op-node/rollup/engine" "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testutils" ) -type fakeEngine struct { - unsafe, safe, finalized eth.L2BlockRef - - err error -} - -func (f *fakeEngine) Finalized() eth.L2BlockRef { - return f.finalized -} - -func (f *fakeEngine) UnsafeL2Head() eth.L2BlockRef { - return f.unsafe -} - -func (f *fakeEngine) SafeL2Head() eth.L2BlockRef { - return f.safe -} - -func (f *fakeEngine) InsertUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error { - if f.err != nil { - return f.err - } - f.unsafe = ref - return nil -} - -var _ Engine = (*fakeEngine)(nil) - func TestCLSync(t *testing.T) { rng := rand.New(rand.NewSource(1234)) @@ -155,157 +125,252 @@ func TestCLSync(t *testing.T) { // When a previously received unsafe block is older than the tip of the chain, we want to drop it. t.Run("drop old", func(t *testing.T) { logger := testlog.Logger(t, log.LevelError) - eng := &fakeEngine{ - unsafe: refA2, - safe: refA0, - finalized: refA0, - } - cl := NewCLSync(logger, cfg, metrics, eng) - cl.AddUnsafePayload(payloadA1) - require.NoError(t, cl.Proceed(context.Background())) + emitter := &testutils.MockEmitter{} + cl := NewCLSync(logger, cfg, metrics, emitter) + + emitter.ExpectOnce(engine.ForkchoiceRequestEvent{}) + cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA1}) + emitter.AssertExpectations(t) + + cl.OnEvent(engine.ForkchoiceUpdateEvent{ + UnsafeL2Head: refA2, + SafeL2Head: refA0, + FinalizedL2Head: refA0, + }) + emitter.AssertExpectations(t) // no new events expected to be emitted require.Nil(t, cl.unsafePayloads.Peek(), "pop because too old") - require.Equal(t, refA2, eng.unsafe, "keep unsafe head") }) // When we already have the exact payload as tip, then no need to process it t.Run("drop equal", func(t *testing.T) { logger := testlog.Logger(t, log.LevelError) - eng := &fakeEngine{ - unsafe: refA1, - safe: refA0, - finalized: refA0, - } - cl := NewCLSync(logger, cfg, metrics, eng) - cl.AddUnsafePayload(payloadA1) - require.NoError(t, cl.Proceed(context.Background())) + emitter := &testutils.MockEmitter{} + cl := NewCLSync(logger, cfg, metrics, emitter) + + emitter.ExpectOnce(engine.ForkchoiceRequestEvent{}) + cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA1}) + emitter.AssertExpectations(t) + + cl.OnEvent(engine.ForkchoiceUpdateEvent{ + UnsafeL2Head: refA1, + SafeL2Head: refA0, + FinalizedL2Head: refA0, + }) + emitter.AssertExpectations(t) // no new events expected to be emitted require.Nil(t, cl.unsafePayloads.Peek(), "pop because seen") - require.Equal(t, refA1, eng.unsafe, "keep unsafe head") }) // When we have a different payload, at the same height, then we want to keep it. // The unsafe chain consensus preserves the first-seen payload. t.Run("ignore conflict", func(t *testing.T) { logger := testlog.Logger(t, log.LevelError) - eng := &fakeEngine{ - unsafe: altRefA1, - safe: refA0, - finalized: refA0, - } - cl := NewCLSync(logger, cfg, metrics, eng) - cl.AddUnsafePayload(payloadA1) - require.NoError(t, cl.Proceed(context.Background())) + emitter := &testutils.MockEmitter{} + cl := NewCLSync(logger, cfg, metrics, emitter) + + emitter.ExpectOnce(engine.ForkchoiceRequestEvent{}) + cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA1}) + emitter.AssertExpectations(t) + + cl.OnEvent(engine.ForkchoiceUpdateEvent{ + UnsafeL2Head: altRefA1, + SafeL2Head: refA0, + FinalizedL2Head: refA0, + }) + emitter.AssertExpectations(t) // no new events expected to be emitted require.Nil(t, cl.unsafePayloads.Peek(), "pop because alternative") - require.Equal(t, altRefA1, eng.unsafe, "keep unsafe head") }) t.Run("ignore unsafe reorg", func(t *testing.T) { logger := testlog.Logger(t, log.LevelError) - eng := &fakeEngine{ - unsafe: altRefA1, - safe: refA0, - finalized: refA0, - } - cl := NewCLSync(logger, cfg, metrics, eng) - cl.AddUnsafePayload(payloadA2) - require.ErrorIs(t, cl.Proceed(context.Background()), io.EOF, "payload2 does not fit onto alt1, thus retrieve next input from L1") + emitter := &testutils.MockEmitter{} + cl := NewCLSync(logger, cfg, metrics, emitter) + + emitter.ExpectOnce(engine.ForkchoiceRequestEvent{}) + cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA2}) + emitter.AssertExpectations(t) + + cl.OnEvent(engine.ForkchoiceUpdateEvent{ + UnsafeL2Head: altRefA1, + SafeL2Head: refA0, + FinalizedL2Head: refA0, + }) + emitter.AssertExpectations(t) // no new events expected, since A2 does not fit onto altA1 require.Nil(t, cl.unsafePayloads.Peek(), "pop because not applicable") - require.Equal(t, altRefA1, eng.unsafe, "keep unsafe head") }) t.Run("success", func(t *testing.T) { logger := testlog.Logger(t, log.LevelError) - eng := &fakeEngine{ - unsafe: refA0, - safe: refA0, - finalized: refA0, - } - cl := NewCLSync(logger, cfg, metrics, eng) - - require.ErrorIs(t, cl.Proceed(context.Background()), io.EOF, "nothing to process yet") + + emitter := &testutils.MockEmitter{} + cl := NewCLSync(logger, cfg, metrics, emitter) + emitter.AssertExpectations(t) // nothing to process yet + require.Nil(t, cl.unsafePayloads.Peek(), "no payloads yet") - cl.AddUnsafePayload(payloadA1) + emitter.ExpectOnce(engine.ForkchoiceRequestEvent{}) + cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA1}) + emitter.AssertExpectations(t) + lowest := cl.LowestQueuedUnsafeBlock() require.Equal(t, refA1, lowest, "expecting A1 next") - require.NoError(t, cl.Proceed(context.Background())) + + // payload A1 should be possible to process on top of A0 + emitter.ExpectOnce(engine.ProcessUnsafePayloadEvent{Envelope: payloadA1}) + cl.OnEvent(engine.ForkchoiceUpdateEvent{ + UnsafeL2Head: refA0, + SafeL2Head: refA0, + FinalizedL2Head: refA0, + }) + emitter.AssertExpectations(t) + + // now pretend the payload was processed: we can drop A1 now + cl.OnEvent(engine.ForkchoiceUpdateEvent{ + UnsafeL2Head: refA1, + SafeL2Head: refA0, + FinalizedL2Head: refA0, + }) require.Nil(t, cl.unsafePayloads.Peek(), "pop because applied") - require.Equal(t, refA1, eng.unsafe, "new unsafe head") - cl.AddUnsafePayload(payloadA2) + // repeat for A2 + emitter.ExpectOnce(engine.ForkchoiceRequestEvent{}) + cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA2}) + emitter.AssertExpectations(t) + lowest = cl.LowestQueuedUnsafeBlock() require.Equal(t, refA2, lowest, "expecting A2 next") - require.NoError(t, cl.Proceed(context.Background())) + + emitter.ExpectOnce(engine.ProcessUnsafePayloadEvent{Envelope: payloadA2}) + cl.OnEvent(engine.ForkchoiceUpdateEvent{ + UnsafeL2Head: refA1, + SafeL2Head: refA0, + FinalizedL2Head: refA0, + }) + emitter.AssertExpectations(t) + + // now pretend the payload was processed: we can drop A2 now + cl.OnEvent(engine.ForkchoiceUpdateEvent{ + UnsafeL2Head: refA2, + SafeL2Head: refA0, + FinalizedL2Head: refA0, + }) require.Nil(t, cl.unsafePayloads.Peek(), "pop because applied") - require.Equal(t, refA2, eng.unsafe, "new unsafe head") }) t.Run("double buffer", func(t *testing.T) { logger := testlog.Logger(t, log.LevelError) - eng := &fakeEngine{ - unsafe: refA0, - safe: refA0, - finalized: refA0, - } - cl := NewCLSync(logger, cfg, metrics, eng) - cl.AddUnsafePayload(payloadA1) - cl.AddUnsafePayload(payloadA2) + emitter := &testutils.MockEmitter{} + cl := NewCLSync(logger, cfg, metrics, emitter) + + emitter.ExpectOnce(engine.ForkchoiceRequestEvent{}) + cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA1}) + emitter.AssertExpectations(t) + emitter.ExpectOnce(engine.ForkchoiceRequestEvent{}) + cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA2}) + emitter.AssertExpectations(t) lowest := cl.LowestQueuedUnsafeBlock() require.Equal(t, refA1, lowest, "expecting A1 next") - require.NoError(t, cl.Proceed(context.Background())) - require.NotNil(t, cl.unsafePayloads.Peek(), "next is ready") - require.Equal(t, refA1, eng.unsafe, "new unsafe head") - require.NoError(t, cl.Proceed(context.Background())) + emitter.ExpectOnce(engine.ProcessUnsafePayloadEvent{Envelope: payloadA1}) + cl.OnEvent(engine.ForkchoiceUpdateEvent{ + UnsafeL2Head: refA0, + SafeL2Head: refA0, + FinalizedL2Head: refA0, + }) + emitter.AssertExpectations(t) + require.Equal(t, 2, cl.unsafePayloads.Len(), "still holding on to A1, and queued A2") + + // Now pretend the payload was processed: we can drop A1 now. + // The CL-sync will try to immediately continue with A2. + emitter.ExpectOnce(engine.ProcessUnsafePayloadEvent{Envelope: payloadA2}) + cl.OnEvent(engine.ForkchoiceUpdateEvent{ + UnsafeL2Head: refA1, + SafeL2Head: refA0, + FinalizedL2Head: refA0, + }) + emitter.AssertExpectations(t) + + // now pretend the payload was processed: we can drop A2 now + cl.OnEvent(engine.ForkchoiceUpdateEvent{ + UnsafeL2Head: refA2, + SafeL2Head: refA0, + FinalizedL2Head: refA0, + }) require.Nil(t, cl.unsafePayloads.Peek(), "done") - require.Equal(t, refA2, eng.unsafe, "new unsafe head") }) t.Run("temporary error", func(t *testing.T) { logger := testlog.Logger(t, log.LevelError) - eng := &fakeEngine{ - unsafe: refA0, - safe: refA0, - finalized: refA0, - } - cl := NewCLSync(logger, cfg, metrics, eng) - - testErr := derive.NewTemporaryError(errors.New("test error")) - eng.err = testErr - cl.AddUnsafePayload(payloadA1) - require.ErrorIs(t, cl.Proceed(context.Background()), testErr) - require.Equal(t, refA0, eng.unsafe, "old unsafe head after error") + + emitter := &testutils.MockEmitter{} + cl := NewCLSync(logger, cfg, metrics, emitter) + + emitter.ExpectOnce(engine.ForkchoiceRequestEvent{}) + cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA1}) + emitter.AssertExpectations(t) + + emitter.ExpectOnce(engine.ProcessUnsafePayloadEvent{Envelope: payloadA1}) + cl.OnEvent(engine.ForkchoiceUpdateEvent{ + UnsafeL2Head: refA0, + SafeL2Head: refA0, + FinalizedL2Head: refA0, + }) + emitter.AssertExpectations(t) + + // On temporary errors we don't need any feedback from the engine. + // We just hold on to what payloads there are in the queue. require.NotNil(t, cl.unsafePayloads.Peek(), "no pop because temporary error") - eng.err = nil - require.NoError(t, cl.Proceed(context.Background())) - require.Equal(t, refA1, eng.unsafe, "new unsafe head after resolved error") + // Pretend we are still stuck on the same forkchoice. The CL-sync will retry sneding the payload. + emitter.ExpectOnce(engine.ProcessUnsafePayloadEvent{Envelope: payloadA1}) + cl.OnEvent(engine.ForkchoiceUpdateEvent{ + UnsafeL2Head: refA0, + SafeL2Head: refA0, + FinalizedL2Head: refA0, + }) + emitter.AssertExpectations(t) + require.NotNil(t, cl.unsafePayloads.Peek(), "no pop because retry still unconfirmed") + + // Now confirm we got the payload this time + cl.OnEvent(engine.ForkchoiceUpdateEvent{ + UnsafeL2Head: refA1, + SafeL2Head: refA0, + FinalizedL2Head: refA0, + }) require.Nil(t, cl.unsafePayloads.Peek(), "pop because valid") }) t.Run("invalid payload error", func(t *testing.T) { logger := testlog.Logger(t, log.LevelError) - eng := &fakeEngine{ - unsafe: refA0, - safe: refA0, - finalized: refA0, - } - cl := NewCLSync(logger, cfg, metrics, eng) - - testErr := errors.New("test error") - eng.err = testErr - cl.AddUnsafePayload(payloadA1) - require.ErrorIs(t, cl.Proceed(context.Background()), testErr) - require.Equal(t, refA0, eng.unsafe, "old unsafe head after error") + emitter := &testutils.MockEmitter{} + cl := NewCLSync(logger, cfg, metrics, emitter) + + // CLSync gets payload and requests engine state, to later determine if payload should be forwarded + emitter.ExpectOnce(engine.ForkchoiceRequestEvent{}) + cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA1}) + emitter.AssertExpectations(t) + + // Engine signals, CLSync sends the payload + emitter.ExpectOnce(engine.ProcessUnsafePayloadEvent{Envelope: payloadA1}) + cl.OnEvent(engine.ForkchoiceUpdateEvent{ + UnsafeL2Head: refA0, + SafeL2Head: refA0, + FinalizedL2Head: refA0, + }) + emitter.AssertExpectations(t) + + // Pretend the payload is bad. It should not be retried after this. + cl.OnEvent(engine.InvalidPayloadEvent{Envelope: payloadA1}) + emitter.AssertExpectations(t) require.Nil(t, cl.unsafePayloads.Peek(), "pop because invalid") }) } diff --git a/op-node/rollup/driver/driver.go b/op-node/rollup/driver/driver.go index 0865685c84145..1701387c18c17 100644 --- a/op-node/rollup/driver/driver.go +++ b/op-node/rollup/driver/driver.go @@ -77,8 +77,6 @@ type EngineController interface { type CLSync interface { LowestQueuedUnsafeBlock() eth.L2BlockRef - AddUnsafePayload(payload *eth.ExecutionPayloadEnvelope) - Proceed(ctx context.Context) error } type AttributesHandler interface { @@ -173,13 +171,17 @@ func NewDriver( sequencerConductor conductor.SequencerConductor, plasma PlasmaIface, ) *Driver { + driverCtx, driverCancel := context.WithCancel(context.Background()) + rootDeriver := &rollup.SynchronousDerivers{} + synchronousEvents := rollup.NewSynchronousEvents(log, driverCtx, rootDeriver) + l1 = NewMeteredL1Fetcher(l1, metrics) l1State := NewL1State(log, metrics) sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1) findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth) verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1) - ec := engine.NewEngineController(l2, log, metrics, cfg, syncCfg.SyncMode) - clSync := clsync.NewCLSync(log, cfg, metrics, ec) + ec := engine.NewEngineController(l2, log, metrics, cfg, syncCfg.SyncMode, synchronousEvents) + clSync := clsync.NewCLSync(log, cfg, metrics, synchronousEvents) var finalizer Finalizer if cfg.PlasmaEnabled() { @@ -193,12 +195,8 @@ func NewDriver( attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2) meteredEngine := NewMeteredEngine(cfg, ec, metrics, log) // Only use the metered engine in the sequencer b/c it records sequencing metrics. sequencer := NewSequencer(log, cfg, meteredEngine, attrBuilder, findL1Origin, metrics) - driverCtx, driverCancel := context.WithCancel(context.Background()) asyncGossiper := async.NewAsyncGossiper(driverCtx, network, log, metrics) - rootDeriver := &rollup.SynchronousDerivers{} - synchronousEvents := NewSynchronousEvents(log, driverCtx, rootDeriver) - syncDeriver := &SyncDeriver{ Derivation: derivationPipeline, Finalizer: finalizer, @@ -251,6 +249,7 @@ func NewDriver( engDeriv, schedDeriv, driver, + clSync, } return driver diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go index b933096abc2ea..93b78f875f41c 100644 --- a/op-node/rollup/driver/state.go +++ b/op-node/rollup/driver/state.go @@ -15,6 +15,7 @@ import ( "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/async" + "github.com/ethereum-optimism/optimism/op-node/rollup/clsync" "github.com/ethereum-optimism/optimism/op-node/rollup/conductor" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/engine" @@ -40,7 +41,7 @@ type Driver struct { sched *StepSchedulingDeriver - synchronousEvents *SynchronousEvents + synchronousEvents *rollup.SynchronousEvents // Requests to block the event loop for synchronous execution to avoid reading an inconsistent state stateReq chan chan struct{} @@ -287,7 +288,7 @@ func (s *Driver) eventLoop() { // If we are doing CL sync or done with engine syncing, fallback to the unsafe payload queue & CL P2P sync. if s.SyncCfg.SyncMode == sync.CLSync || !s.Engine.IsEngineSyncing() { s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", envelope.ExecutionPayload.ID()) - s.CLSync.AddUnsafePayload(envelope) + s.Emitter.Emit(clsync.ReceivedUnsafePayloadEvent{Envelope: envelope}) s.metrics.RecordReceivedUnsafePayload(envelope) reqStep() } else if s.SyncCfg.SyncMode == sync.ELSync { @@ -509,12 +510,8 @@ func (s *SyncDeriver) SyncStep(ctx context.Context) error { return derive.EngineELSyncing } - // Trying unsafe payload should be done before safe attributes - // It allows the unsafe head to move forward while the long-range consolidation is in progress. - if err := s.CLSync.Proceed(ctx); err != io.EOF { - // EOF error means we can't process the next unsafe payload. Then we should process next safe attributes. - return err - } + // Any now processed forkchoice updates will trigger CL-sync payload processing, if any payload is queued up. + // Try safe attributes now. if err := s.AttributesHandler.Proceed(ctx); err != io.EOF { // EOF error means we can't process the next attributes. Then we should derive the next attributes. diff --git a/op-node/rollup/engine/engine_controller.go b/op-node/rollup/engine/engine_controller.go index 6e382cc367f2a..c24b75f6c3fe3 100644 --- a/op-node/rollup/engine/engine_controller.go +++ b/op-node/rollup/engine/engine_controller.go @@ -54,6 +54,8 @@ type EngineController struct { elStart time.Time clock clock.Clock + emitter rollup.EventEmitter + // Block Head State unsafeHead eth.L2BlockRef pendingSafeHead eth.L2BlockRef // L2 block processed from the middle of a span batch, but not marked as the safe block yet. @@ -75,7 +77,8 @@ type EngineController struct { safeAttrs *derive.AttributesWithParent } -func NewEngineController(engine ExecEngine, log log.Logger, metrics derive.Metrics, rollupCfg *rollup.Config, syncMode sync.Mode) *EngineController { +func NewEngineController(engine ExecEngine, log log.Logger, metrics derive.Metrics, + rollupCfg *rollup.Config, syncMode sync.Mode, emitter rollup.EventEmitter) *EngineController { syncStatus := syncStatusCL if syncMode == sync.ELSync { syncStatus = syncStatusWillStartEL @@ -90,6 +93,7 @@ func NewEngineController(engine ExecEngine, log log.Logger, metrics derive.Metri syncMode: syncMode, syncStatus: syncStatus, clock: clock.SystemClock, + emitter: emitter, } } @@ -224,6 +228,11 @@ func (e *EngineController) StartPayload(ctx context.Context, parent eth.L2BlockR if err != nil { return errTyp, err } + e.emitter.Emit(ForkchoiceUpdateEvent{ + UnsafeL2Head: parent, + SafeL2Head: e.safeHead, + FinalizedL2Head: e.finalizedHead, + }) e.buildingInfo = eth.PayloadInfo{ID: id, Timestamp: uint64(attrs.Attributes.Timestamp)} e.buildingSafe = updateSafe @@ -257,6 +266,8 @@ func (e *EngineController) ConfirmPayload(ctx context.Context, agossip async.Asy updateSafe := e.buildingSafe && e.safeAttrs != nil && e.safeAttrs.IsLastInSpan envelope, errTyp, err := confirmPayload(ctx, e.log, e.engine, fc, e.buildingInfo, updateSafe, agossip, sequencerConductor) if err != nil { + // TODO would be nice to return proper payload error here + e.emitter.Emit(InvalidPayloadEvent{}) return nil, errTyp, fmt.Errorf("failed to complete building on top of L2 chain %s, id: %s, error (%d): %w", e.buildingOnto, e.buildingInfo.ID, errTyp, err) } ref, err := derive.PayloadToBlockRef(e.rollupCfg, envelope.ExecutionPayload) @@ -280,6 +291,11 @@ func (e *EngineController) ConfirmPayload(ctx context.Context, agossip async.Asy e.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false) } } + e.emitter.Emit(ForkchoiceUpdateEvent{ + UnsafeL2Head: e.unsafeHead, + SafeL2Head: e.safeHead, + FinalizedL2Head: e.finalizedHead, + }) e.resetBuildingState() return envelope, BlockInsertOK, nil @@ -353,7 +369,7 @@ func (e *EngineController) TryUpdateEngine(ctx context.Context) error { } logFn := e.logSyncProgressMaybe() defer logFn() - _, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil) + fcRes, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil) if err != nil { var inputErr eth.InputError if errors.As(err, &inputErr) { @@ -367,6 +383,13 @@ func (e *EngineController) TryUpdateEngine(ctx context.Context) error { return derive.NewTemporaryError(fmt.Errorf("failed to sync forkchoice with engine: %w", err)) } } + if fcRes.PayloadStatus.Status == eth.ExecutionValid { + e.emitter.Emit(ForkchoiceUpdateEvent{ + UnsafeL2Head: e.unsafeHead, + SafeL2Head: e.safeHead, + FinalizedL2Head: e.finalizedHead, + }) + } e.needFCUCall = false return nil } @@ -393,6 +416,9 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et if err != nil { return derive.NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err)) } + if status.Status == eth.ExecutionInvalid { + e.emitter.Emit(InvalidPayloadEvent{Envelope: envelope}) + } if !e.checkNewPayloadStatus(status.Status) { payload := envelope.ExecutionPayload return derive.NewTemporaryError(fmt.Errorf("cannot process unsafe payload: new - %v; parent: %v; err: %w", @@ -440,6 +466,14 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et e.syncStatus = syncStatusFinishedEL } + if fcRes.PayloadStatus.Status == eth.ExecutionValid { + e.emitter.Emit(ForkchoiceUpdateEvent{ + UnsafeL2Head: e.unsafeHead, + SafeL2Head: e.safeHead, + FinalizedL2Head: e.finalizedHead, + }) + } + return nil } @@ -501,6 +535,11 @@ func (e *EngineController) TryBackupUnsafeReorg(ctx context.Context) (bool, erro } } if fcRes.PayloadStatus.Status == eth.ExecutionValid { + e.emitter.Emit(ForkchoiceUpdateEvent{ + UnsafeL2Head: e.backupUnsafeHead, + SafeL2Head: e.safeHead, + FinalizedL2Head: e.finalizedHead, + }) // Execution engine accepted the reorg. e.log.Info("successfully reorged unsafe head using backupUnsafe", "unsafe", e.backupUnsafeHead.ID()) e.SetUnsafeHead(e.BackupUnsafeL2Head()) diff --git a/op-node/rollup/engine/events.go b/op-node/rollup/engine/events.go index 7a26abcae8b26..02dcbc40e332a 100644 --- a/op-node/rollup/engine/events.go +++ b/op-node/rollup/engine/events.go @@ -9,8 +9,44 @@ import ( "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + "github.com/ethereum-optimism/optimism/op-service/eth" ) +type InvalidPayloadEvent struct { + Envelope *eth.ExecutionPayloadEnvelope +} + +func (ev InvalidPayloadEvent) String() string { + return "invalid-payload" +} + +// ForkchoiceRequestEvent signals to the engine that it should emit an artificial +// forkchoice-update event, to signal the latest forkchoice to other derivers. +// This helps decouple derivers from the actual engine state, +// while also not making the derivers wait for a forkchoice update at random. +type ForkchoiceRequestEvent struct { +} + +func (ev ForkchoiceRequestEvent) String() string { + return "forkchoice-request" +} + +type ForkchoiceUpdateEvent struct { + UnsafeL2Head, SafeL2Head, FinalizedL2Head eth.L2BlockRef +} + +func (ev ForkchoiceUpdateEvent) String() string { + return "forkchoice-update" +} + +type ProcessUnsafePayloadEvent struct { + Envelope *eth.ExecutionPayloadEnvelope +} + +func (ev ProcessUnsafePayloadEvent) String() string { + return "process-unsafe-payload" +} + type TryBackupUnsafeReorgEvent struct { } @@ -47,7 +83,7 @@ func NewEngDeriver(log log.Logger, ctx context.Context, cfg *rollup.Config, } func (d *EngDeriver) OnEvent(ev rollup.Event) { - switch ev.(type) { + switch x := ev.(type) { case TryBackupUnsafeReorgEvent: // If we don't need to call FCU to restore unsafeHead using backupUnsafe, keep going b/c // this was a no-op(except correcting invalid state when backupUnsafe is empty but TryBackupUnsafeReorg called). @@ -81,5 +117,34 @@ func (d *EngDeriver) OnEvent(ev rollup.Event) { d.emitter.Emit(rollup.CriticalErrorEvent{Err: fmt.Errorf("unexpected TryUpdateEngine error type: %w", err)}) } } + case ProcessUnsafePayloadEvent: + ref, err := derive.PayloadToBlockRef(d.cfg, x.Envelope.ExecutionPayload) + if err != nil { + d.log.Error("failed to decode L2 block ref from payload", "err", err) + return + } + if err := d.ec.InsertUnsafePayload(d.ctx, x.Envelope, ref); err != nil { + d.log.Info("failed to insert payload", "ref", ref, + "txs", len(x.Envelope.ExecutionPayload.Transactions), "err", err) + // yes, duplicate error-handling. After all derivers are interacting with the engine + // through events, we can drop the engine-controller interface: + // unify the events handler with the engine-controller, + // remove a lot of code, and not do this error translation. + if errors.Is(err, derive.ErrReset) { + d.emitter.Emit(rollup.ResetEvent{Err: err}) + } else if errors.Is(err, derive.ErrTemporary) { + d.emitter.Emit(rollup.EngineTemporaryErrorEvent{Err: err}) + } else { + d.emitter.Emit(rollup.CriticalErrorEvent{Err: fmt.Errorf("unexpected InsertUnsafePayload error type: %w", err)}) + } + } else { + d.log.Info("successfully processed payload", "ref", ref, "txs", len(x.Envelope.ExecutionPayload.Transactions)) + } + case ForkchoiceRequestEvent: + d.emitter.Emit(ForkchoiceUpdateEvent{ + UnsafeL2Head: d.ec.UnsafeL2Head(), + SafeL2Head: d.ec.SafeL2Head(), + FinalizedL2Head: d.ec.Finalized(), + }) } } diff --git a/op-node/rollup/events.go b/op-node/rollup/events.go index d973afdd59ef0..037c68dbacc86 100644 --- a/op-node/rollup/events.go +++ b/op-node/rollup/events.go @@ -80,3 +80,7 @@ type DeriverFunc func(ev Event) func (fn DeriverFunc) OnEvent(ev Event) { fn(ev) } + +type NoopEmitter struct{} + +func (e NoopEmitter) Emit(ev Event) {} diff --git a/op-node/rollup/driver/synchronous.go b/op-node/rollup/synchronous.go similarity index 86% rename from op-node/rollup/driver/synchronous.go rename to op-node/rollup/synchronous.go index fa752cab55102..2fd85a4178631 100644 --- a/op-node/rollup/driver/synchronous.go +++ b/op-node/rollup/synchronous.go @@ -1,12 +1,10 @@ -package driver +package rollup import ( "context" "sync" "github.com/ethereum/go-ethereum/log" - - "github.com/ethereum-optimism/optimism/op-node/rollup" ) // Don't queue up an endless number of events. @@ -23,16 +21,16 @@ type SynchronousEvents struct { // if this util is used in a concurrent context. evLock sync.Mutex - events []rollup.Event + events []Event log log.Logger ctx context.Context - root rollup.Deriver + root Deriver } -func NewSynchronousEvents(log log.Logger, ctx context.Context, root rollup.Deriver) *SynchronousEvents { +func NewSynchronousEvents(log log.Logger, ctx context.Context, root Deriver) *SynchronousEvents { return &SynchronousEvents{ log: log, ctx: ctx, @@ -40,7 +38,7 @@ func NewSynchronousEvents(log log.Logger, ctx context.Context, root rollup.Deriv } } -func (s *SynchronousEvents) Emit(event rollup.Event) { +func (s *SynchronousEvents) Emit(event Event) { s.evLock.Lock() defer s.evLock.Unlock() @@ -75,4 +73,4 @@ func (s *SynchronousEvents) Drain() error { } } -var _ rollup.EventEmitter = (*SynchronousEvents)(nil) +var _ EventEmitter = (*SynchronousEvents)(nil) diff --git a/op-node/rollup/driver/synchronous_test.go b/op-node/rollup/synchronous_test.go similarity index 86% rename from op-node/rollup/driver/synchronous_test.go rename to op-node/rollup/synchronous_test.go index bc8732fe9eb8a..191b4f4246a20 100644 --- a/op-node/rollup/driver/synchronous_test.go +++ b/op-node/rollup/synchronous_test.go @@ -1,4 +1,4 @@ -package driver +package rollup import ( "context" @@ -8,21 +8,14 @@ import ( "github.com/ethereum/go-ethereum/log" - "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-service/testlog" ) -type TestEvent struct{} - -func (ev TestEvent) String() string { - return "X" -} - func TestSynchronousEvents(t *testing.T) { logger := testlog.Logger(t, log.LevelError) ctx, cancel := context.WithCancel(context.Background()) count := 0 - deriver := rollup.DeriverFunc(func(ev rollup.Event) { + deriver := DeriverFunc(func(ev Event) { count += 1 }) syncEv := NewSynchronousEvents(logger, ctx, deriver) @@ -48,7 +41,7 @@ func TestSynchronousEvents(t *testing.T) { func TestSynchronousEventsSanityLimit(t *testing.T) { logger := testlog.Logger(t, log.LevelError) count := 0 - deriver := rollup.DeriverFunc(func(ev rollup.Event) { + deriver := DeriverFunc(func(ev Event) { count += 1 }) syncEv := NewSynchronousEvents(logger, context.Background(), deriver) @@ -74,9 +67,9 @@ func (ev CyclicEvent) String() string { func TestSynchronousCyclic(t *testing.T) { logger := testlog.Logger(t, log.LevelError) - var emitter rollup.EventEmitter + var emitter EventEmitter result := false - deriver := rollup.DeriverFunc(func(ev rollup.Event) { + deriver := DeriverFunc(func(ev Event) { logger.Info("received event", "event", ev) switch x := ev.(type) { case CyclicEvent: diff --git a/op-program/client/driver/driver.go b/op-program/client/driver/driver.go index f533865e96dc5..7d2235c3a45e0 100644 --- a/op-program/client/driver/driver.go +++ b/op-program/client/driver/driver.go @@ -101,7 +101,7 @@ type Driver struct { } func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l1BlobsSource derive.L1BlobsFetcher, l2Source L2Source, targetBlockNum uint64) *Driver { - engine := engine.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync) + engine := engine.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync, rollup.NoopEmitter{}) attributesHandler := attributes.NewAttributesHandler(logger, cfg, engine, l2Source) syncCfg := &sync.Config{SyncMode: sync.CLSync} pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, plasma.Disabled, l2Source, metrics.NoopMetrics) diff --git a/op-service/testutils/mock_emitter.go b/op-service/testutils/mock_emitter.go new file mode 100644 index 0000000000000..e808c651053e8 --- /dev/null +++ b/op-service/testutils/mock_emitter.go @@ -0,0 +1,25 @@ +package testutils + +import ( + "github.com/stretchr/testify/mock" + + "github.com/ethereum-optimism/optimism/op-node/rollup" +) + +type MockEmitter struct { + mock.Mock +} + +func (m *MockEmitter) Emit(ev rollup.Event) { + m.Mock.MethodCalled("Emit", ev) +} + +func (m *MockEmitter) ExpectOnce(expected rollup.Event) { + m.Mock.On("Emit", expected).Once() +} + +func (m *MockEmitter) AssertExpectations(t mock.TestingT) { + m.Mock.AssertExpectations(t) +} + +var _ rollup.EventEmitter = (*MockEmitter)(nil)