op-batcher: extract state pruning, block fetching and progress checking into a single pure function (#13060)

* remove lastStoredBlock and lastL1Tip from BatchSubmitter state

We can use the channelManager's state to infer lastStoredBlock. And lastL1Tip is actually unused.
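
A minimal sketch of the inference, assuming a small accessor on channelManager (the method name and return shape are illustrative, not necessarily the PR's API):

func (s *channelManager) lastStoredBlock() (eth.BlockID, bool) {
	// The newest block in the queue is, by construction, the last block
	// the batcher loaded from the sequencer.
	if s.blocks.Len() == 0 {
		return eth.BlockID{}, false
	}
	return eth.ToBlockID(s.blocks[s.blocks.Len()-1]), true
}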

* change log line wording

* fix typo

* remove unnecessary method

* WIP first pass at computeSyncActions

* computeSyncActions takes a ChannelStatuser interface

also report fully inclusive range of blocks to load

* add happy path test case

* clearState is a pointer

we can use a nil value to signal that no state clearing should be performed
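
A sketch of the nil-as-signal pattern, assuming a syncActions struct roughly like the one this PR introduces (field names are assumptions):

type syncActions struct {
	clearState      *eth.BlockID // nil means: do not clear channel manager state
	blocksToPrune   int
	channelsToPrune int
}

func applySyncActions(m *channelManager, actions syncActions) {
	if actions.clearState != nil {
		// The non-nil pointer doubles as the L1 origin to clear to.
		m.Clear(*actions.clearState)
		return
	}
	m.pruneSafeBlocks(actions.blocksToPrune)
	m.pruneChannels(actions.channelsToPrune)
}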

* add more test cases

* add another test case

* computeSyncActions only takes prevCurrentL1, not prevSyncStatus

* add batcher restart case

* safe chain reorg case

* failed to make progress case

* simplify log messages, print entire struct

* add godoc

* wire up computeSyncActions

* cache prevCurrentL1 on BatchSubmitter

* document stages

* fix loadBlocksIntoState range interpretation

* pass syncStatus, not a pointer to syncStatus, and add a test case for no progress

* check unsafe status before trying to get more blocks

* do not panic on invalid block ranges

return an error instead. This error is ultimately swallowed, matching existing behaviour.
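
A sketch of the change in spirit (the function and message are illustrative):

func checkBlockRange(start, end uint64) error {
	if start > end {
		// Previously a panic; now an error the caller may swallow,
		// matching existing behaviour.
		return fmt.Errorf("invalid block range: start %d > end %d", start, end)
	}
	return nil
}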

* test: add assertions and mock data about blockID passed to clearState

* add readme section on max channel duration

* add back unit tests for pruning methods

* fix pruneBlocks behaviour when blockCursor pointed at block which is now pruned

* rename waitForNodeSync to sequencerOutOfSync

* Introduce SeqOutOfSyncError

* move SyncActions code to a separate file

* ChannelStatuser -> channelStatuser

* SeqOutOfSyncError -> ErrSeqOutOfSync

* move ctx to first position in fn signature

* do not update cached prevCurrentL1 value if there is an ErrSeqOutOfSync

* Always warn log when computeSyncActions returns an error

* move sync actions test to separate file

* computeSyncActions returns a bool, not an error

There is only ever one kind of error returned

* SyncActions -> syncActions

* define local variables to aid readability

* organise computeSyncActions and introduce startAfresh syncAction

Add comments explaining logical flow: the checks get increasingly deep and we return early where possible.
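
The flow might look roughly like this, reusing the syncActions sketch above (a sketch only: the real signature, checks, and fields live in op-batcher/batcher/sync_actions.go, and the parameters below are assumptions):

func computeSyncActions(prevCurrentL1, newCurrentL1 uint64, safeHeadMatchesState bool) (syncActions, bool) {
	// Shallowest check first: no L1 progress means the sequencer is still
	// syncing, so report out-of-sync (the returned bool) and do nothing.
	if newCurrentL1 <= prevCurrentL1 {
		return syncActions{}, true
	}
	// Deeper check: a safe-chain reorg or batcher restart invalidates the
	// buffered state, so start afresh by clearing everything.
	if !safeHeadMatchesState {
		startAfresh := syncActions{clearState: &eth.BlockID{}} // placeholder block ID
		return startAfresh, false
	}
	// Happy path: compute how much to prune and which blocks to load.
	return syncActions{blocksToPrune: 0, channelsToPrune: 0}, false
}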

* undo changes to submodule

* move test utils to sync_actions_test.go file

* ensure pruneChannels clears currentChannel when appropriate

* fix submodule"

* don't try to get the number of a block if none exists

* improve log

* Update op-batcher/batcher/driver.go

Co-authored-by: Sebastian Stammler <seb@oplabs.co>

* use struct for block range, not array
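
For example (field names assumed), a named struct makes the ordering and inclusivity explicit where a bare array did not:

type inclusiveBlockRange struct {
	start, end uint64 // both endpoints are loaded, matching the "fully inclusive range" noted above
}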

* use startAfresh in one more place

* add test case for multiple channels

also set HeadL1 to more realistic values (generally ahead of currentL1 due to nonzero confirmation depth)

* print value of *struct in Stringer
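
A sketch of the pattern, reusing the syncActions sketch above (the real Stringer lives in sync_actions.go):

func (a syncActions) String() string {
	clearState := "<nil>"
	if a.clearState != nil {
		// Dereference so logs show the block ID rather than a pointer address.
		clearState = fmt.Sprintf("%v", *a.clearState)
	}
	return fmt.Sprintf("syncActions{clearState: %s, blocksToPrune: %d, channelsToPrune: %d}",
		clearState, a.blocksToPrune, a.channelsToPrune)
}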

* add test case when there are no blocks in state

* Update op-batcher/batcher/sync_actions.go

Co-authored-by: Sebastian Stammler <seb@oplabs.co>

* tighten up log messages and test descriptions

---------

Co-authored-by: Sebastian Stammler <seb@oplabs.co>
2 people authored and sigma committed Dec 19, 2024
1 parent b9b1aa6 commit be033cd
Showing 7 changed files with 591 additions and 242 deletions.
4 changes: 4 additions & 0 deletions op-batcher/batcher/channel.go
@@ -217,3 +217,7 @@ func (c *channel) OldestL2() eth.BlockID {
 func (c *channel) Close() {
 	c.channelBuilder.Close()
 }
+
+func (c *channel) MaxInclusionBlock() uint64 {
+	return c.maxInclusionBlock
+}
80 changes: 16 additions & 64 deletions op-batcher/batcher/channel_manager.go
@@ -464,78 +464,30 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info *derive.L1BlockInfo
 
 var ErrPendingAfterClose = errors.New("pending channels remain after closing channel-manager")
 
-// pruneSafeBlocks dequeues blocks from the internal blocks queue
-// if they have now become safe.
-func (s *channelManager) pruneSafeBlocks(newSafeHead eth.L2BlockRef) {
-	oldestBlock, ok := s.blocks.Peek()
+// pruneSafeBlocks dequeues the provided number of blocks from the internal blocks queue
+func (s *channelManager) pruneSafeBlocks(num int) {
+	_, ok := s.blocks.DequeueN(int(num))
 	if !ok {
-		// no blocks to prune
-		return
-	}
-
-	if newSafeHead.Number+1 == oldestBlock.NumberU64() {
-		// no blocks to prune
-		return
-	}
-
-	if newSafeHead.Number+1 < oldestBlock.NumberU64() {
-		// This could happen if there was an L1 reorg.
-		// Or if the sequencer restarted.
-		s.log.Warn("safe head reversed, clearing channel manager state",
-			"oldestBlock", eth.ToBlockID(oldestBlock),
-			"newSafeBlock", newSafeHead)
-		// We should restart work from the new safe head,
-		// and therefore prune all the blocks.
-		s.Clear(newSafeHead.L1Origin)
-		return
-	}
-
-	numBlocksToDequeue := newSafeHead.Number + 1 - oldestBlock.NumberU64()
-
-	if numBlocksToDequeue > uint64(s.blocks.Len()) {
-		// This could happen if the batcher restarted.
-		// The sequencer may have derived the safe chain
-		// from channels sent by a previous batcher instance.
-		s.log.Warn("safe head above unsafe head, clearing channel manager state",
-			"unsafeBlock", eth.ToBlockID(s.blocks[s.blocks.Len()-1]),
-			"newSafeBlock", newSafeHead)
-		// We should restart work from the new safe head,
-		// and therefore prune all the blocks.
-		s.Clear(newSafeHead.L1Origin)
-		return
-	}
-
-	if s.blocks[numBlocksToDequeue-1].Hash() != newSafeHead.Hash {
-		s.log.Warn("safe chain reorg, clearing channel manager state",
-			"existingBlock", eth.ToBlockID(s.blocks[numBlocksToDequeue-1]),
-			"newSafeBlock", newSafeHead)
-		// We should restart work from the new safe head,
-		// and therefore prune all the blocks.
-		s.Clear(newSafeHead.L1Origin)
-		return
+		panic("tried to prune more blocks than available")
 	}
 
-	// This shouldn't return an error because
-	// We already checked numBlocksToDequeue <= s.blocks.Len()
-	_, _ = s.blocks.DequeueN(int(numBlocksToDequeue))
-	s.blockCursor -= int(numBlocksToDequeue)
-
+	s.blockCursor -= int(num)
 	if s.blockCursor < 0 {
-		panic("negative blockCursor")
+		s.blockCursor = 0
 	}
 }
 
-// pruneChannels dequeues channels from the internal channels queue
-// if they were built using blocks which are now safe
-func (s *channelManager) pruneChannels(newSafeHead eth.L2BlockRef) {
-	i := 0
-	for _, ch := range s.channelQueue {
-		if ch.LatestL2().Number > newSafeHead.Number {
-			break
-		}
-		i++
+// pruneChannels dequeues the provided number of channels from the internal channels queue
+func (s *channelManager) pruneChannels(num int) {
+	clearCurrentChannel := false
+	for i := 0; i < num; i++ {
+		if s.channelQueue[i] == s.currentChannel {
+			clearCurrentChannel = true
+		}
 	}
-	s.channelQueue = s.channelQueue[i:]
+	s.channelQueue = s.channelQueue[num:]
+	if clearCurrentChannel {
+		s.currentChannel = nil
+	}
 }
 
 // PendingDABytes returns the current number of bytes pending to be written to the DA layer (from blocks fetched from L2
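
With pruning reduced to "dequeue N", the arithmetic that used to live inside pruneSafeBlocks moves to computeSyncActions. A hedged sketch of that derivation (names are assumptions):

func numBlocksToPrune(newSafeHead eth.L2BlockRef, oldestBlockInState *types.Block) int {
	// Blocks up to and including the new safe head are now derivable from
	// L1, so they can be dequeued from the batcher's state.
	return int(newSafeHead.Number + 1 - oldestBlockInState.NumberU64())
}
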
259 changes: 141 additions & 118 deletions op-batcher/batcher/channel_manager_test.go
@@ -463,14 +463,12 @@ func TestChannelManager_handleChannelInvalidated(t *testing.T) {
 }
 
 func TestChannelManager_PruneBlocks(t *testing.T) {
-	l := testlog.Logger(t, log.LevelDebug)
 	cfg := channelManagerTestConfig(100, derive.SingularBatchType)
-	m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
 	cfg.InitNoneCompressor()
 	a := types.NewBlock(&types.Header{
 		Number: big.NewInt(0),
 	}, nil, nil, nil)
-	b := types.NewBlock(&types.Header{ // This will shortly become the safe head
+	b := types.NewBlock(&types.Header{
 		Number:     big.NewInt(1),
 		ParentHash: a.Hash(),
 	}, nil, nil, nil)
@@ -479,132 +477,157 @@
 		ParentHash: b.Hash(),
 	}, nil, nil, nil)
 
-	require.NoError(t, m.AddL2Block(a))
-	m.blockCursor += 1
-	require.NoError(t, m.AddL2Block(b))
-	m.blockCursor += 1
-	require.NoError(t, m.AddL2Block(c))
-	m.blockCursor += 1
-
-	// Normal path
-	m.pruneSafeBlocks(eth.L2BlockRef{
-		Hash:   b.Hash(),
-		Number: b.NumberU64(),
-	})
-	require.Equal(t, queue.Queue[*types.Block]{c}, m.blocks)
-
-	// Safe chain didn't move, nothing to prune
-	m.pruneSafeBlocks(eth.L2BlockRef{
-		Hash:   b.Hash(),
-		Number: b.NumberU64(),
-	})
-	require.Equal(t, queue.Queue[*types.Block]{c}, m.blocks)
-
-	// Safe chain moved beyond the blocks we had
-	// state should be cleared
-	m.pruneSafeBlocks(eth.L2BlockRef{
-		Hash:   c.Hash(),
-		Number: uint64(99),
-	})
-	require.Equal(t, queue.Queue[*types.Block]{}, m.blocks)
-
-	// No blocks to prune, NOOP
-	m.pruneSafeBlocks(eth.L2BlockRef{
-		Hash:   c.Hash(),
-		Number: c.NumberU64(),
-	})
-	require.Equal(t, queue.Queue[*types.Block]{}, m.blocks)
-
-	// Put another block in
-	d := types.NewBlock(&types.Header{
-		Number:     big.NewInt(3),
-		ParentHash: c.Hash(),
-	}, nil, nil, nil)
-	require.NoError(t, m.AddL2Block(d))
-	m.blockCursor += 1
-
-	// Safe chain reorg
-	// state should be cleared
-	m.pruneSafeBlocks(eth.L2BlockRef{
-		Hash:   a.Hash(),
-		Number: uint64(3),
-	})
-	require.Equal(t, queue.Queue[*types.Block]{}, m.blocks)
-
-	// Put another block in
-	require.NoError(t, m.AddL2Block(d))
-	m.blockCursor += 1
-
-	// Safe chain reversed
-	// state should be cleared
-	m.pruneSafeBlocks(eth.L2BlockRef{
-		Hash:   a.Hash(), // unused
-		Number: uint64(1),
-	})
-	require.Equal(t, queue.Queue[*types.Block]{}, m.blocks)
+	type testCase struct {
+		name                string
+		initialQ            queue.Queue[*types.Block]
+		initialBlockCursor  int
+		numChannelsToPrune  int
+		expectedQ           queue.Queue[*types.Block]
+		expectedBlockCursor int
+	}
+
+	for _, tc := range []testCase{
+		{
+			name:                "[A,B,C]*+1->[B,C]*", // * denotes the cursor
+			initialQ:            queue.Queue[*types.Block]{a, b, c},
+			initialBlockCursor:  3,
+			numChannelsToPrune:  1,
+			expectedQ:           queue.Queue[*types.Block]{b, c},
+			expectedBlockCursor: 2,
+		},
+		{
+			name:                "[A,B,C*]+1->[B,C*]",
+			initialQ:            queue.Queue[*types.Block]{a, b, c},
+			initialBlockCursor:  2,
+			numChannelsToPrune:  1,
+			expectedQ:           queue.Queue[*types.Block]{b, c},
+			expectedBlockCursor: 1,
+		},
+		{
+			name:                "[A,B,C]*+2->[C]*",
+			initialQ:            queue.Queue[*types.Block]{a, b, c},
+			initialBlockCursor:  3,
+			numChannelsToPrune:  2,
+			expectedQ:           queue.Queue[*types.Block]{c},
+			expectedBlockCursor: 1,
+		},
+		{
+			name:                "[A,B,C*]+2->[C*]",
+			initialQ:            queue.Queue[*types.Block]{a, b, c},
+			initialBlockCursor:  2,
+			numChannelsToPrune:  2,
+			expectedQ:           queue.Queue[*types.Block]{c},
+			expectedBlockCursor: 0,
+		},
+		{
+			name:                "[A*,B,C]+1->[B*,C]",
+			initialQ:            queue.Queue[*types.Block]{a, b, c},
+			initialBlockCursor:  0,
+			numChannelsToPrune:  1,
+			expectedQ:           queue.Queue[*types.Block]{b, c},
+			expectedBlockCursor: 0,
+		},
+		{
+			name:                "[A,B,C]+3->[]",
+			initialQ:            queue.Queue[*types.Block]{a, b, c},
+			initialBlockCursor:  3,
+			numChannelsToPrune:  3,
+			expectedQ:           queue.Queue[*types.Block]{},
+			expectedBlockCursor: 0,
+		},
+		{
+			name:                "[A,B,C]*+4->panic",
+			initialQ:            queue.Queue[*types.Block]{a, b, c},
+			initialBlockCursor:  3,
+			numChannelsToPrune:  4,
+			expectedQ:           nil, // declare that the prune method should panic
+			expectedBlockCursor: 0,
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			l := testlog.Logger(t, log.LevelCrit)
+			m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
+			m.blocks = tc.initialQ
+			m.blockCursor = tc.initialBlockCursor
+			if tc.expectedQ != nil {
+				m.pruneSafeBlocks(tc.numChannelsToPrune)
+				require.Equal(t, tc.expectedQ, m.blocks)
+			} else {
+				require.Panics(t, func() { m.pruneSafeBlocks(tc.numChannelsToPrune) })
+			}
+		})
+	}
 }
 
 func TestChannelManager_PruneChannels(t *testing.T) {
-	l := testlog.Logger(t, log.LevelCrit)
 	cfg := channelManagerTestConfig(100, derive.SingularBatchType)
 	cfg.InitNoneCompressor()
-	m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
 
-	A, err := newChannelWithChannelOut(l, metrics.NoopMetrics, cfg, m.rollupCfg, 0)
+	A, err := newChannelWithChannelOut(nil, metrics.NoopMetrics, cfg, defaultTestRollupConfig, 0)
 	require.NoError(t, err)
-	B, err := newChannelWithChannelOut(l, metrics.NoopMetrics, cfg, m.rollupCfg, 0)
+	B, err := newChannelWithChannelOut(nil, metrics.NoopMetrics, cfg, defaultTestRollupConfig, 0)
 	require.NoError(t, err)
-	C, err := newChannelWithChannelOut(l, metrics.NoopMetrics, cfg, m.rollupCfg, 0)
+	C, err := newChannelWithChannelOut(nil, metrics.NoopMetrics, cfg, defaultTestRollupConfig, 0)
 	require.NoError(t, err)
 
-	m.channelQueue = []*channel{A, B, C}
-
-	numTx := 1
-	rng := rand.New(rand.NewSource(123))
-	a0 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
-	a0 = a0.WithSeal(&types.Header{Number: big.NewInt(0)})
-	a1 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
-	a1 = a1.WithSeal(&types.Header{Number: big.NewInt(1)})
-	b2 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
-	b2 = b2.WithSeal(&types.Header{Number: big.NewInt(2)})
-	b3 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
-	b3 = b3.WithSeal(&types.Header{Number: big.NewInt(3)})
-	c4 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
-	c4 = c4.WithSeal(&types.Header{Number: big.NewInt(4)})
-
-	_, err = A.AddBlock(a0)
-	require.NoError(t, err)
-	_, err = A.AddBlock(a1)
-	require.NoError(t, err)
-
-	_, err = B.AddBlock(b2)
-	require.NoError(t, err)
-	_, err = B.AddBlock(b3)
-	require.NoError(t, err)
-
-	_, err = C.AddBlock(c4)
-	require.NoError(t, err)
-
-	m.pruneChannels(eth.L2BlockRef{
-		Number: uint64(3),
-	})
-
-	require.Equal(t, []*channel{C}, m.channelQueue)
-
-	m.pruneChannels(eth.L2BlockRef{
-		Number: uint64(4),
-	})
-
-	require.Equal(t, []*channel{}, m.channelQueue)
-
-	m.pruneChannels(eth.L2BlockRef{
-		Number: uint64(4),
-	})
-
-	require.Equal(t, []*channel{}, m.channelQueue)
+	type testCase struct {
+		name                   string
+		initialQ               []*channel
+		initialCurrentChannel  *channel
+		numChannelsToPrune     int
+		expectedQ              []*channel
+		expectedCurrentChannel *channel
+	}
+
+	for _, tc := range []testCase{
+		{
+			name:               "[A,B,C]+1->[B,C]",
+			initialQ:           []*channel{A, B, C},
+			numChannelsToPrune: 1,
+			expectedQ:          []*channel{B, C},
+		},
+		{
+			name:                   "[A,B,C]+3->[] + currentChannel=C",
+			initialQ:               []*channel{A, B, C},
+			initialCurrentChannel:  C,
+			numChannelsToPrune:     3,
+			expectedQ:              []*channel{},
+			expectedCurrentChannel: nil,
+		},
+		{
+			name:               "[A,B,C]+2->[C]",
+			initialQ:           []*channel{A, B, C},
+			numChannelsToPrune: 2,
+			expectedQ:          []*channel{C},
+		},
+		{
+			name:               "[A,B,C]+3->[]",
+			initialQ:           []*channel{A, B, C},
+			numChannelsToPrune: 3,
+			expectedQ:          []*channel{},
+		},
+		{
+			name:               "[A,B,C]+4->panic",
+			initialQ:           []*channel{A, B, C},
+			numChannelsToPrune: 4,
+			expectedQ:          nil, // declare that the prune method should panic
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			l := testlog.Logger(t, log.LevelCrit)
+			m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
+			m.channelQueue = tc.initialQ
+			m.currentChannel = tc.initialCurrentChannel
+			if tc.expectedQ != nil {
+				m.pruneChannels(tc.numChannelsToPrune)
+				require.Equal(t, tc.expectedQ, m.channelQueue)
+				require.Equal(t, tc.expectedCurrentChannel, m.currentChannel)
+			} else {
+				require.Panics(t, func() { m.pruneChannels(tc.numChannelsToPrune) })
+			}
+		})
+	}
 }
 
 func TestChannelManager_ChannelOutFactory(t *testing.T) {
 	type ChannelOutWrapper struct {
 		derive.ChannelOut