op-batcher: extract state pruning, block fetching and progress checking into a single pure function #13060

Merged Dec 6, 2024

Commits (56)
cee17f7
remove lastStoredBlock and lastL1Tip from BatchSubmitter state
geoknee Nov 19, 2024
f86b9f9
change log line wording
geoknee Nov 19, 2024
a8fc922
fix typo
geoknee Nov 19, 2024
f81b505
remove unnecessary method
geoknee Nov 21, 2024
434e559
WIP first pass at computeSyncActions
geoknee Nov 22, 2024
de4615c
computeSyncAction takes a ChannelStatuser interface
geoknee Nov 22, 2024
1e312d7
add happy path test case
geoknee Nov 22, 2024
24bdd1a
Merge branch 'gk/new-batcher-fix' into gk/compute-sync-actions
geoknee Nov 22, 2024
2a8081e
clearState is a pointer
geoknee Nov 22, 2024
8fab0c3
add more test cases
geoknee Nov 22, 2024
6859ff3
add another test case
geoknee Nov 22, 2024
8c7d76f
computeSyncActions only takes prevCurrentL1, not prevSyncStatus
geoknee Nov 22, 2024
e762352
add batcher restart case
geoknee Nov 22, 2024
7792f4a
safe chain reorg case
geoknee Nov 22, 2024
c705cdd
failed to make progress case
geoknee Nov 22, 2024
e3c0500
simplify log messages, print entire struct
geoknee Nov 22, 2024
0858079
add godoc
geoknee Nov 22, 2024
a5c5f6a
wire up computeSyncActions
geoknee Nov 22, 2024
a11d636
cache prevCurrentL1 on BatchSubmitter
geoknee Nov 22, 2024
1ff2a4d
document stages
geoknee Nov 22, 2024
9e9da9c
fix loadBlocksIntoState range interpretation
geoknee Nov 22, 2024
1615217
pass syncStatus, not pointer to syncStatus and add test case for no p…
geoknee Nov 22, 2024
8296dfe
check unsafe status before trying to get more blocks
geoknee Nov 22, 2024
ba378da
do not panic on invalid block ranges
geoknee Nov 25, 2024
09fc152
Merge remote-tracking branch 'origin/develop' into gk/compute-sync-ac…
geoknee Dec 2, 2024
b8a43b5
test: add assertions and mock data about blockID passed to clearState
geoknee Dec 2, 2024
ba62afe
add readme section on max channel duration
geoknee Dec 2, 2024
6d41fab
add back unit tests for pruning methods
geoknee Dec 2, 2024
d5afeff
fix pruneBlocks behaviour when blockCursor pointed at block which is …
geoknee Dec 2, 2024
4627f94
rename waitForNodeSync to sequencerOutOfSync
geoknee Dec 2, 2024
47578b2
Introduce SeqOutOfSyncError
geoknee Dec 2, 2024
0b22890
move SyncActions code to a separate file
geoknee Dec 4, 2024
c891505
ChannelStatuser -> channelStatuser
geoknee Dec 4, 2024
1ed9fff
SeqOutOfSyncError -> ErrSeqOutOfSync
geoknee Dec 4, 2024
e5dee34
move ctx to first position in fn signature
geoknee Dec 4, 2024
eca021c
do not update cached prevCurrentL1 value if there is an ErrSeqOutOfSync
geoknee Dec 4, 2024
b3337fb
Always warn log when computeSyncActions returns an error
geoknee Dec 4, 2024
4449b07
move sync actions test to separate file
geoknee Dec 4, 2024
8a0824c
computeSyncActions returns a bool, not an error
geoknee Dec 4, 2024
337a4ec
SyncActions -> syncActions
geoknee Dec 4, 2024
4a2cf69
define local variables to aid readability
geoknee Dec 4, 2024
8892aa5
organise computeSyncActions and introduce startAfresh syncAction
geoknee Dec 4, 2024
fa5fd37
undo changes to submodule
geoknee Dec 4, 2024
336f94f
move test utils to sync_actions_test.go file
geoknee Dec 4, 2024
a413728
ensure pruneChannels clears currentChannel when appropriate
geoknee Dec 4, 2024
9aefad7
fix submodule"
geoknee Dec 4, 2024
28fc914
don't try to get number of block if none exists
geoknee Dec 4, 2024
f4f766b
improve log
geoknee Dec 4, 2024
27c4fb2
Update op-batcher/batcher/driver.go
geoknee Dec 5, 2024
f588a00
use struct for block range, not array
geoknee Dec 5, 2024
c534f92
use startAfresh in one more place
geoknee Dec 5, 2024
4df01e1
add test case for multiple channels
geoknee Dec 5, 2024
b1f5fe7
print value of *struct in Stringer
geoknee Dec 5, 2024
95ebb10
add test case when there are no blocks in state
geoknee Dec 5, 2024
4766dcf
Update op-batcher/batcher/sync_actions.go
geoknee Dec 5, 2024
b069105
tighten up log messages and test descriptions
geoknee Dec 5, 2024
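
Taken together, these commits replace the ad-hoc pruning logic (see the diffs below) with a single pure decision function. The following sketch is inferred from the commit messages and diffs, not copied from the PR: the type inclusiveBlockRange, the syncActions field names, the channelStatuser method set, and the exact computeSyncActions signature are assumptions and may differ from the merged code.

package batcher

import (
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/log"

    "github.com/ethereum-optimism/optimism/op-service/eth"
    "github.com/ethereum-optimism/optimism/op-service/queue"
)

// inclusiveBlockRange is hypothetical; the commits only say that a struct
// replaced an array for representing block ranges.
type inclusiveBlockRange struct{ start, end uint64 }

// channelStatuser is the read-only view of a channel that the decision
// function needs. The commits name this interface; the method set shown
// here is an assumption (LatestL2 and MaxInclusionBlock appear in the diffs).
type channelStatuser interface {
    LatestL2() eth.BlockID
    MaxInclusionBlock() uint64
}

// syncActions describes, as plain data, the work the driver should perform.
// Field names are guesses based on the commit messages.
type syncActions struct {
    clearState      *eth.BlockID         // non-nil: reset state to this L1 origin
    blocksToPrune   int                  // dequeue this many blocks from state
    channelsToPrune int                  // dequeue this many channels from state
    blocksToLoad    *inclusiveBlockRange // L2 blocks to fetch next; nil if none
}

// computeSyncActions does no I/O and mutates nothing, so every branch
// (L1 reorg, batcher restart, lagging sequencer) can be covered by
// table-driven unit tests. The returned bool reports that the sequencer is
// out of sync and the caller should wait and retry (cf. ErrSeqOutOfSync,
// which a later commit replaced with a plain bool).
func computeSyncActions(
    newSyncStatus eth.SyncStatus,
    prevCurrentL1 eth.L1BlockRef,
    blocks queue.Queue[*types.Block],
    channels []channelStatuser,
    l log.Logger,
) (syncActions, bool) {
    // Illustrative check only: the sequencer's view of L1 went backwards
    // relative to the batcher's cached value, so it is still syncing.
    if newSyncStatus.CurrentL1.Number < prevCurrentL1.Number {
        l.Warn("sequencer is out of sync", "syncStatus", newSyncStatus)
        return syncActions{}, true
    }
    // ... reorg detection and the pruning/loading arithmetic go here ...
    return syncActions{}, false
}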
4 changes: 4 additions & 0 deletions op-batcher/batcher/channel.go
@@ -217,3 +217,7 @@ func (c *channel) OldestL2() eth.BlockID {
 func (c *channel) Close() {
     c.channelBuilder.Close()
 }
+
+func (c *channel) MaxInclusionBlock() uint64 {
+    return c.maxInclusionBlock
+}
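
The new getter exposes the channel's maximum L1 inclusion block through the channelStatuser interface sketched above. Continuing that sketch (expiredChannel is hypothetical, not part of the PR), a caller could use it to notice that L1 has moved past the window in which the channel's frames could land:

// expiredChannel reports whether L1 has advanced beyond the last block in
// which this channel could still be included on chain.
func expiredChannel(ch channelStatuser, currentL1Number uint64) bool {
    return currentL1Number > ch.MaxInclusionBlock()
}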
80 changes: 16 additions & 64 deletions op-batcher/batcher/channel_manager.go
@@ -464,78 +464,30 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info *derive.L1BlockInfo
 
 var ErrPendingAfterClose = errors.New("pending channels remain after closing channel-manager")
 
-// pruneSafeBlocks dequeues blocks from the internal blocks queue
-// if they have now become safe.
-func (s *channelManager) pruneSafeBlocks(newSafeHead eth.L2BlockRef) {
-    oldestBlock, ok := s.blocks.Peek()
+// pruneSafeBlocks dequeues the provided number of blocks from the internal blocks queue
+func (s *channelManager) pruneSafeBlocks(num int) {
+    _, ok := s.blocks.DequeueN(int(num))
     if !ok {
-        // no blocks to prune
-        return
+        panic("tried to prune more blocks than available")
     }
-
-    if newSafeHead.Number+1 == oldestBlock.NumberU64() {
-        // no blocks to prune
-        return
-    }
-
-    if newSafeHead.Number+1 < oldestBlock.NumberU64() {
-        // This could happen if there was an L1 reorg.
-        // Or if the sequencer restarted.
-        s.log.Warn("safe head reversed, clearing channel manager state",
-            "oldestBlock", eth.ToBlockID(oldestBlock),
-            "newSafeBlock", newSafeHead)
-        // We should restart work from the new safe head,
-        // and therefore prune all the blocks.
-        s.Clear(newSafeHead.L1Origin)
-        return
-    }
-
-    numBlocksToDequeue := newSafeHead.Number + 1 - oldestBlock.NumberU64()
-
-    if numBlocksToDequeue > uint64(s.blocks.Len()) {
-        // This could happen if the batcher restarted.
-        // The sequencer may have derived the safe chain
-        // from channels sent by a previous batcher instance.
-        s.log.Warn("safe head above unsafe head, clearing channel manager state",
-            "unsafeBlock", eth.ToBlockID(s.blocks[s.blocks.Len()-1]),
-            "newSafeBlock", newSafeHead)
-        // We should restart work from the new safe head,
-        // and therefore prune all the blocks.
-        s.Clear(newSafeHead.L1Origin)
-        return
-    }
-
-    if s.blocks[numBlocksToDequeue-1].Hash() != newSafeHead.Hash {
-        s.log.Warn("safe chain reorg, clearing channel manager state",
-            "existingBlock", eth.ToBlockID(s.blocks[numBlocksToDequeue-1]),
-            "newSafeBlock", newSafeHead)
-        // We should restart work from the new safe head,
-        // and therefore prune all the blocks.
-        s.Clear(newSafeHead.L1Origin)
-        return
-    }
-
-    // This shouldn't return an error because
-    // We already checked numBlocksToDequeue <= s.blocks.Len()
-    _, _ = s.blocks.DequeueN(int(numBlocksToDequeue))
-    s.blockCursor -= int(numBlocksToDequeue)
-
+    s.blockCursor -= int(num)
     if s.blockCursor < 0 {
-        panic("negative blockCursor")
+        s.blockCursor = 0
     }
 }
 
-// pruneChannels dequeues channels from the internal channels queue
-// if they were built using blocks which are now safe
-func (s *channelManager) pruneChannels(newSafeHead eth.L2BlockRef) {
-    i := 0
-    for _, ch := range s.channelQueue {
-        if ch.LatestL2().Number > newSafeHead.Number {
-            break
+// pruneChannels dequeues the provided number of channels from the internal channels queue
+func (s *channelManager) pruneChannels(num int) {
+    clearCurrentChannel := false
+    for i := 0; i < num; i++ {
+        if s.channelQueue[i] == s.currentChannel {
+            clearCurrentChannel = true
         }
-        i++
     }
-    s.channelQueue = s.channelQueue[i:]
+    s.channelQueue = s.channelQueue[num:]
+    if clearCurrentChannel {
+        s.currentChannel = nil
+    }
 }
 
 // PendingDABytes returns the current number of bytes pending to be written to the DA layer (from blocks fetched from L2
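
The pruners are now deliberately simple: they dequeue exactly the number they are told and panic only if that is impossible, while the decision of how many to prune lives in the pure function. A hypothetical helper (continuing the sketch above; Clear taking an eth.BlockID matches the removed code) shows how a driver might apply a computed syncActions value:

// applySyncActions executes a pure decision against the stateful channel
// manager. This helper is illustrative, not code from the PR.
func applySyncActions(m *channelManager, a syncActions) {
    if a.clearState != nil {
        // e.g. after an L1 reorg or a batcher restart: start afresh from
        // the given L1 origin, as the removed code did via s.Clear.
        m.Clear(*a.clearState)
        return
    }
    m.pruneSafeBlocks(a.blocksToPrune)
    m.pruneChannels(a.channelsToPrune)
}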
259 changes: 141 additions & 118 deletions op-batcher/batcher/channel_manager_test.go
@@ -463,14 +463,12 @@ func TestChannelManager_handleChannelInvalidated(t *testing.T) {
 }
 
 func TestChannelManager_PruneBlocks(t *testing.T) {
-    l := testlog.Logger(t, log.LevelDebug)
     cfg := channelManagerTestConfig(100, derive.SingularBatchType)
-    m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
-
+    cfg.InitNoneCompressor()
     a := types.NewBlock(&types.Header{
         Number: big.NewInt(0),
     }, nil, nil, nil)
-    b := types.NewBlock(&types.Header{ // This will shortly become the safe head
+    b := types.NewBlock(&types.Header{
         Number:     big.NewInt(1),
         ParentHash: a.Hash(),
     }, nil, nil, nil)
@@ -479,132 +477,157 @@
         ParentHash: b.Hash(),
     }, nil, nil, nil)
 
-    require.NoError(t, m.AddL2Block(a))
-    m.blockCursor += 1
-    require.NoError(t, m.AddL2Block(b))
-    m.blockCursor += 1
-    require.NoError(t, m.AddL2Block(c))
-    m.blockCursor += 1
-
-    // Normal path
-    m.pruneSafeBlocks(eth.L2BlockRef{
-        Hash:   b.Hash(),
-        Number: b.NumberU64(),
-    })
-    require.Equal(t, queue.Queue[*types.Block]{c}, m.blocks)
-
-    // Safe chain didn't move, nothing to prune
-    m.pruneSafeBlocks(eth.L2BlockRef{
-        Hash:   b.Hash(),
-        Number: b.NumberU64(),
-    })
-    require.Equal(t, queue.Queue[*types.Block]{c}, m.blocks)
-
-    // Safe chain moved beyond the blocks we had
-    // state should be cleared
-    m.pruneSafeBlocks(eth.L2BlockRef{
-        Hash:   c.Hash(),
-        Number: uint64(99),
-    })
-    require.Equal(t, queue.Queue[*types.Block]{}, m.blocks)
-
-    // No blocks to prune, NOOP
-    m.pruneSafeBlocks(eth.L2BlockRef{
-        Hash:   c.Hash(),
-        Number: c.NumberU64(),
-    })
-    require.Equal(t, queue.Queue[*types.Block]{}, m.blocks)
-
-    // Put another block in
-    d := types.NewBlock(&types.Header{
-        Number:     big.NewInt(3),
-        ParentHash: c.Hash(),
-    }, nil, nil, nil)
-    require.NoError(t, m.AddL2Block(d))
-    m.blockCursor += 1
-
-    // Safe chain reorg
-    // state should be cleared
-    m.pruneSafeBlocks(eth.L2BlockRef{
-        Hash:   a.Hash(),
-        Number: uint64(3),
-    })
-    require.Equal(t, queue.Queue[*types.Block]{}, m.blocks)
-
-    // Put another block in
-    require.NoError(t, m.AddL2Block(d))
-    m.blockCursor += 1
+    type testCase struct {
+        name                string
+        initialQ            queue.Queue[*types.Block]
+        initialBlockCursor  int
+        numChannelsToPrune  int
+        expectedQ           queue.Queue[*types.Block]
+        expectedBlockCursor int
+    }
 
-    // Safe chain reversed
-    // state should be cleared
-    m.pruneSafeBlocks(eth.L2BlockRef{
-        Hash:   a.Hash(), // unused
-        Number: uint64(1),
-    })
-    require.Equal(t, queue.Queue[*types.Block]{}, m.blocks)
+    for _, tc := range []testCase{
+        {
+            name:                "[A,B,C]*+1->[B,C]*", // * denotes the cursor
+            initialQ:            queue.Queue[*types.Block]{a, b, c},
+            initialBlockCursor:  3,
+            numChannelsToPrune:  1,
+            expectedQ:           queue.Queue[*types.Block]{b, c},
+            expectedBlockCursor: 2,
+        },
+        {
+            name:                "[A,B,C*]+1->[B,C*]",
+            initialQ:            queue.Queue[*types.Block]{a, b, c},
+            initialBlockCursor:  2,
+            numChannelsToPrune:  1,
+            expectedQ:           queue.Queue[*types.Block]{b, c},
+            expectedBlockCursor: 1,
+        },
+        {
+            name:                "[A,B,C]*+2->[C]*",
+            initialQ:            queue.Queue[*types.Block]{a, b, c},
+            initialBlockCursor:  3,
+            numChannelsToPrune:  2,
+            expectedQ:           queue.Queue[*types.Block]{c},
+            expectedBlockCursor: 1,
+        },
+        {
+            name:                "[A,B,C*]+2->[C*]",
+            initialQ:            queue.Queue[*types.Block]{a, b, c},
+            initialBlockCursor:  2,
+            numChannelsToPrune:  2,
+            expectedQ:           queue.Queue[*types.Block]{c},
+            expectedBlockCursor: 0,
+        },
+        {
+            name:                "[A*,B,C]+1->[B*,C]",
+            initialQ:            queue.Queue[*types.Block]{a, b, c},
+            initialBlockCursor:  0,
+            numChannelsToPrune:  1,
+            expectedQ:           queue.Queue[*types.Block]{b, c},
+            expectedBlockCursor: 0,
+        },
+        {
+            name:                "[A,B,C]+3->[]",
+            initialQ:            queue.Queue[*types.Block]{a, b, c},
+            initialBlockCursor:  3,
+            numChannelsToPrune:  3,
+            expectedQ:           queue.Queue[*types.Block]{},
+            expectedBlockCursor: 0,
+        },
+        {
+            name:                "[A,B,C]*+4->panic",
+            initialQ:            queue.Queue[*types.Block]{a, b, c},
+            initialBlockCursor:  3,
+            numChannelsToPrune:  4,
+            expectedQ:           nil, // declare that the prune method should panic
+            expectedBlockCursor: 0,
+        },
+    } {
+        t.Run(tc.name, func(t *testing.T) {
+            l := testlog.Logger(t, log.LevelCrit)
+            m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
+            m.blocks = tc.initialQ
+            m.blockCursor = tc.initialBlockCursor
+            if tc.expectedQ != nil {
+                m.pruneSafeBlocks(tc.numChannelsToPrune)
+                require.Equal(t, tc.expectedQ, m.blocks)
+            } else {
+                require.Panics(t, func() { m.pruneSafeBlocks(tc.numChannelsToPrune) })
+            }
+        })
+    }
 
 }

 func TestChannelManager_PruneChannels(t *testing.T) {
-    l := testlog.Logger(t, log.LevelCrit)
     cfg := channelManagerTestConfig(100, derive.SingularBatchType)
     cfg.InitNoneCompressor()
-    m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
 
-    A, err := newChannelWithChannelOut(l, metrics.NoopMetrics, cfg, m.rollupCfg, 0)
-    require.NoError(t, err)
-    B, err := newChannelWithChannelOut(l, metrics.NoopMetrics, cfg, m.rollupCfg, 0)
-    require.NoError(t, err)
-    C, err := newChannelWithChannelOut(l, metrics.NoopMetrics, cfg, m.rollupCfg, 0)
-    require.NoError(t, err)
-
-    m.channelQueue = []*channel{A, B, C}
-
-    numTx := 1
-    rng := rand.New(rand.NewSource(123))
-    a0 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
-    a0 = a0.WithSeal(&types.Header{Number: big.NewInt(0)})
-    a1 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
-    a1 = a1.WithSeal(&types.Header{Number: big.NewInt(1)})
-    b2 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
-    b2 = b2.WithSeal(&types.Header{Number: big.NewInt(2)})
-    b3 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
-    b3 = b3.WithSeal(&types.Header{Number: big.NewInt(3)})
-    c4 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
-    c4 = c4.WithSeal(&types.Header{Number: big.NewInt(4)})
-
-    _, err = A.AddBlock(a0)
-    require.NoError(t, err)
-    _, err = A.AddBlock(a1)
-    require.NoError(t, err)
-
-    _, err = B.AddBlock(b2)
+    A, err := newChannelWithChannelOut(nil, metrics.NoopMetrics, cfg, defaultTestRollupConfig, 0)
     require.NoError(t, err)
-    _, err = B.AddBlock(b3)
+    B, err := newChannelWithChannelOut(nil, metrics.NoopMetrics, cfg, defaultTestRollupConfig, 0)
     require.NoError(t, err)
 
-    _, err = C.AddBlock(c4)
+    C, err := newChannelWithChannelOut(nil, metrics.NoopMetrics, cfg, defaultTestRollupConfig, 0)
     require.NoError(t, err)
 
-    m.pruneChannels(eth.L2BlockRef{
-        Number: uint64(3),
-    })
-
-    require.Equal(t, []*channel{C}, m.channelQueue)
-
-    m.pruneChannels(eth.L2BlockRef{
-        Number: uint64(4),
-    })
-
-    require.Equal(t, []*channel{}, m.channelQueue)
-
-    m.pruneChannels(eth.L2BlockRef{
-        Number: uint64(4),
-    })
-
-    require.Equal(t, []*channel{}, m.channelQueue)
+    type testCase struct {
+        name                   string
+        initialQ               []*channel
+        initialCurrentChannel  *channel
+        numChannelsToPrune     int
+        expectedQ              []*channel
+        expectedCurrentChannel *channel
+    }
 
+    for _, tc := range []testCase{
+        {
+            name:               "[A,B,C]+1->[B,C]",
+            initialQ:           []*channel{A, B, C},
+            numChannelsToPrune: 1,
+            expectedQ:          []*channel{B, C},
+        },
+        {
+            name:                   "[A,B,C]+3->[] + currentChannel=C",
+            initialQ:               []*channel{A, B, C},
+            initialCurrentChannel:  C,
+            numChannelsToPrune:     3,
+            expectedQ:              []*channel{},
+            expectedCurrentChannel: nil,
+        },
+        {
+            name:               "[A,B,C]+2->[C]",
+            initialQ:           []*channel{A, B, C},
+            numChannelsToPrune: 2,
+            expectedQ:          []*channel{C},
+        },
+        {
+            name:               "[A,B,C]+3->[]",
+            initialQ:           []*channel{A, B, C},
+            numChannelsToPrune: 3,
+            expectedQ:          []*channel{},
+        },
+        {
+            name:               "[A,B,C]+4->panic",
+            initialQ:           []*channel{A, B, C},
+            numChannelsToPrune: 4,
+            expectedQ:          nil, // declare that the prune method should panic
+        },
+    } {
+        t.Run(tc.name, func(t *testing.T) {
+            l := testlog.Logger(t, log.LevelCrit)
+            m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
+            m.channelQueue = tc.initialQ
+            m.currentChannel = tc.initialCurrentChannel
+            if tc.expectedQ != nil {
+                m.pruneChannels(tc.numChannelsToPrune)
+                require.Equal(t, tc.expectedQ, m.channelQueue)
+                require.Equal(t, tc.expectedCurrentChannel, m.currentChannel)
+            } else {
+                require.Panics(t, func() { m.pruneChannels(tc.numChannelsToPrune) })
+            }
+        })
+    }
 }

 func TestChannelManager_ChannelOutFactory(t *testing.T) {
     type ChannelOutWrapper struct {
         derive.ChannelOut