Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

batcher: keep blocks, channels and frames in strict order & simplify reorg handling #12390

Merged
merged 71 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
75928f8
use a queue.Queue for channelBuilder.frames
geoknee Oct 3, 2024
40d5deb
remove pop and push terminology
geoknee Oct 3, 2024
9309689
proliferate queue.Queue type
geoknee Oct 3, 2024
762057b
simplify requeue method
geoknee Oct 3, 2024
74e53f6
undo changes to submodule
geoknee Oct 3, 2024
7a020d6
sketch out new arch
geoknee Oct 3, 2024
06b52bd
add TODO
geoknee Oct 3, 2024
c949c2a
add channelManager.pruneSafeBlocks method and integrate into main loop
geoknee Oct 4, 2024
d0e98f9
fix frameCursor semantics
geoknee Oct 4, 2024
431e681
fixup tests
geoknee Oct 4, 2024
fdd675f
avoid Rewind() in tests
geoknee Oct 4, 2024
0f4cc5c
only rewind cursor in rewind (never move it forward)
geoknee Oct 4, 2024
ea88715
fix assertions
geoknee Oct 4, 2024
5ed15db
prune channels whose blocks are now safe
geoknee Oct 4, 2024
2ea2dc8
handle case when rewinding a channel with no blocks
geoknee Oct 4, 2024
a33d34e
add clarification
geoknee Oct 4, 2024
aafd290
implement channelManager.pendinBlocks() method
geoknee Oct 7, 2024
22351ae
fix pruning logic
geoknee Oct 7, 2024
6717664
simplify pruneChannels
geoknee Oct 8, 2024
c463d53
simplify pruneSafeBlocks
geoknee Oct 8, 2024
c8121b3
add unit tests for pruneSafeBlocks
geoknee Oct 8, 2024
737a229
fix pruneSafeBlocks to avoid underflow
geoknee Oct 8, 2024
71901a2
improve test
geoknee Oct 8, 2024
cd6d19c
add unit tests for pruneChannels
geoknee Oct 8, 2024
867335c
introduce handleChannelTimeout
geoknee Oct 9, 2024
358c6a8
factor out channelManager.rewindToBlockWithHash
geoknee Oct 9, 2024
78d1f30
change test expectation
geoknee Oct 9, 2024
26f0040
do more pruning in test
geoknee Oct 9, 2024
b5c95e2
Replace "clean shutdown" behaviour with waitNodeSync()
geoknee Oct 9, 2024
a0b0e37
Add readme and architecture diagram
geoknee Oct 9, 2024
a09229e
don't panic when there is a safe chain reorg
geoknee Oct 9, 2024
fdc69c2
fix test
geoknee Oct 9, 2024
6655519
readability improvements
geoknee Oct 10, 2024
99a40de
only clear state after waiting for node to sync
geoknee Oct 10, 2024
d3c1d3b
resize image
geoknee Oct 10, 2024
a831127
tweak readme
geoknee Oct 10, 2024
7cd9c82
typo
geoknee Oct 25, 2024
a3678e2
rewindToBlockWithHash never moves cursor forward
geoknee Oct 30, 2024
7278337
use s.pendingBlocks()
geoknee Oct 30, 2024
a2fea59
add log line
geoknee Oct 30, 2024
73b3dcd
check there are blocks when handling timeout
geoknee Oct 30, 2024
4688f2d
rename HasFrame() to HasPendingFrame()
geoknee Oct 30, 2024
0a13a7c
Merge remote-tracking branch 'origin/develop' into gk/batcher-cursors…
geoknee Oct 30, 2024
ed49954
fixup test
geoknee Oct 30, 2024
616c96a
improve readme
geoknee Nov 5, 2024
0e356f2
link to open issues by tag
geoknee Nov 5, 2024
bb3c776
Merge remote-tracking branch 'origin/develop' into gk/batcher-cursors…
geoknee Nov 11, 2024
e2d6811
add log when main loop returns
geoknee Nov 11, 2024
c9635d7
pass blockID to rewindToBlock
geoknee Nov 11, 2024
eda390d
don't remove all channels when a channel times out
geoknee Nov 11, 2024
e1e8beb
use newSafeHead.L1Origin in Clear() when pruning blocks
geoknee Nov 12, 2024
b7e87d3
clarify comment
geoknee Nov 12, 2024
a60ffe2
use warn log level on safe chain reorg pruning, and unify handling fo…
geoknee Nov 12, 2024
da89ebc
update panic message
geoknee Nov 12, 2024
71065c6
extend test coverage and fix bug
geoknee Nov 12, 2024
fe7eb67
rename test blocks
geoknee Nov 12, 2024
81aa115
simplify HasPendingFrame() method
geoknee Nov 12, 2024
0ba74df
simplify implementation of RewindFrameCursor
geoknee Nov 12, 2024
23da037
activate dormant test
geoknee Nov 12, 2024
4cd021a
ensure pending_blocks_bytes_current metric is tracked properly
geoknee Nov 12, 2024
7a45ecd
cover metrics behaviour in test
geoknee Nov 12, 2024
f642cda
extend test coverage to channelManager.handleChannelTimeout
geoknee Nov 12, 2024
753ad8e
add comment to TxFailed
geoknee Nov 12, 2024
42727dc
rename test fn
geoknee Nov 14, 2024
869b745
point to e2e tests in readme.
geoknee Nov 14, 2024
9ceff07
readme: performance -> throughput
geoknee Nov 14, 2024
2d025e8
improve channel_manager_test to assert old channels are not affected …
geoknee Nov 14, 2024
74ed8b8
fix handleChannelTimeout behaviour
geoknee Nov 14, 2024
7b17b1c
tighten up requirements for invalidating a channel
geoknee Nov 14, 2024
62bd303
replace requeue with handleChannelInvalidated
geoknee Nov 14, 2024
941669e
Merge remote-tracking branch 'origin/develop' into gk/batcher-cursors…
geoknee Nov 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added op-batcher/architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
31 changes: 14 additions & 17 deletions op-batcher/batcher/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (c *channel) TxFailed(id string) {
// Note: when the batcher is changed to send multiple frames per tx,
// this needs to be changed to iterate over all frames of the tx data
// and re-queue them.
c.channelBuilder.PushFrames(data.Frames()...)
c.channelBuilder.RewindFrameCursor(data.Frames()[0])
geoknee marked this conversation as resolved.
Show resolved Hide resolved
sebastianst marked this conversation as resolved.
Show resolved Hide resolved
delete(c.pendingTransactions, id)
} else {
c.log.Warn("unknown transaction marked as failed", "id", id)
Expand All @@ -61,18 +61,16 @@ func (c *channel) TxFailed(id string) {
c.metr.RecordBatchTxFailed()
}

// TxConfirmed marks a transaction as confirmed on L1. Unfortunately even if all frames in
// a channel have been marked as confirmed on L1 the channel may be invalid & need to be
// resubmitted.
// This function may reset the pending channel if the pending channel has timed out.
func (c *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) (bool, []*types.Block) {
// TxConfirmed marks a transaction as confirmed on L1. Returns a bool indicating
// whether the channel timed out on chain.
func (c *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) bool {
c.metr.RecordBatchTxSubmitted()
c.log.Debug("marked transaction as confirmed", "id", id, "block", inclusionBlock)
if _, ok := c.pendingTransactions[id]; !ok {
c.log.Warn("unknown transaction marked as confirmed", "id", id, "block", inclusionBlock)
// TODO: This can occur if we clear the channel while there are still pending transactions
// We need to keep track of stale transactions instead
return false, nil
return false
}
delete(c.pendingTransactions, id)
c.confirmedTransactions[id] = inclusionBlock
Expand All @@ -82,21 +80,20 @@ func (c *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) (bool, []*t
c.minInclusionBlock = min(c.minInclusionBlock, inclusionBlock.Number)
c.maxInclusionBlock = max(c.maxInclusionBlock, inclusionBlock.Number)

if c.isFullySubmitted() {
c.metr.RecordChannelFullySubmitted(c.ID())
c.log.Info("Channel is fully submitted", "id", c.ID(), "min_inclusion_block", c.minInclusionBlock, "max_inclusion_block", c.maxInclusionBlock)
}

// If this channel timed out, put the pending blocks back into the local saved blocks
// and then reset this state so it can try to build a new channel.
if c.isTimedOut() {
c.metr.RecordChannelTimedOut(c.ID())
c.log.Warn("Channel timed out", "id", c.ID(), "min_inclusion_block", c.minInclusionBlock, "max_inclusion_block", c.maxInclusionBlock)
return true, c.channelBuilder.Blocks()
}
// If we are done with this channel, record that.
if c.isFullySubmitted() {
c.metr.RecordChannelFullySubmitted(c.ID())
c.log.Info("Channel is fully submitted", "id", c.ID(), "min_inclusion_block", c.minInclusionBlock, "max_inclusion_block", c.maxInclusionBlock)
return true, nil
return true
}

return false, nil
return false
}

// Timeout returns the channel timeout L1 block number. If there is no timeout set, it returns 0.
Expand Down Expand Up @@ -136,7 +133,7 @@ func (c *channel) ID() derive.ChannelID {
func (c *channel) NextTxData() txData {
nf := c.cfg.MaxFramesPerTx()
txdata := txData{frames: make([]frameData, 0, nf), asBlob: c.cfg.UseBlobs}
for i := 0; i < nf && c.channelBuilder.HasFrame(); i++ {
for i := 0; i < nf && c.channelBuilder.HasPendingFrame(); i++ {
frame := c.channelBuilder.NextFrame()
txdata.frames = append(txdata.frames, frame)
}
Expand All @@ -151,7 +148,7 @@ func (c *channel) NextTxData() txData {
func (c *channel) HasTxData() bool {
if c.IsFull() || // If the channel is full, we should start to submit it
!c.cfg.UseBlobs { // If using calldata, we only send one frame per tx
return c.channelBuilder.HasFrame()
return c.channelBuilder.HasPendingFrame()
}
// Collect enough frames if channel is not full yet
return c.channelBuilder.PendingFrames() >= int(c.cfg.MaxFramesPerTx())
Expand Down
61 changes: 36 additions & 25 deletions op-batcher/batcher/channel_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/queue"
"github.com/ethereum/go-ethereum/core/types"
)

Expand Down Expand Up @@ -65,7 +66,7 @@ type ChannelBuilder struct {
// current channel
co derive.ChannelOut
// list of blocks in the channel. Saved in case the channel must be rebuilt
blocks []*types.Block
blocks queue.Queue[*types.Block]
// latestL1Origin is the latest L1 origin of all the L2 blocks that have been added to the channel
latestL1Origin eth.BlockID
// oldestL1Origin is the oldest L1 origin of all the L2 blocks that have been added to the channel
Expand All @@ -75,7 +76,12 @@ type ChannelBuilder struct {
// oldestL2 is the oldest L2 block of all the L2 blocks that have been added to the channel
oldestL2 eth.BlockID
// frames data queue, to be send as txs
frames []frameData
frames queue.Queue[frameData]
// frameCursor tracks which frames in the queue were submitted
// frames[frameCursor] is the next unsubmitted (pending) frame
// frameCursor = len(frames) is reserved for when
// there are no pending (next unsubmitted) frames
frameCursor int
sebastianst marked this conversation as resolved.
Show resolved Hide resolved
// total frames counter
numFrames int
// total amount of output data of all frames created yet
Expand Down Expand Up @@ -190,7 +196,7 @@ func (c *ChannelBuilder) AddBlock(block *types.Block) (*derive.L1BlockInfo, erro
return l1info, fmt.Errorf("adding block to channel out: %w", err)
}

c.blocks = append(c.blocks, block)
c.blocks.Enqueue(block)
vdamle marked this conversation as resolved.
Show resolved Hide resolved
c.updateSwTimeout(l1info.Number)

if l1info.Number > c.latestL1Origin.Number {
Expand Down Expand Up @@ -312,11 +318,11 @@ func (c *ChannelBuilder) setFullErr(err error) {
}

// OutputFrames creates new frames with the channel out. It should be called
// after AddBlock and before iterating over available frames with HasFrame and
// after AddBlock and before iterating over pending frames with HasFrame and
// NextFrame.
//
// If the channel isn't full yet, it will conservatively only
// pull readily available frames from the compression output.
// pull pending frames from the compression output.
// If it is full, the channel is closed and all remaining
// frames will be created, possibly with a small leftover frame.
func (c *ChannelBuilder) OutputFrames() error {
Expand Down Expand Up @@ -387,7 +393,7 @@ func (c *ChannelBuilder) outputFrame() error {
id: frameID{chID: c.co.ID(), frameNumber: fn},
data: buf.Bytes(),
}
c.frames = append(c.frames, frame)
c.frames.Enqueue(frame)
c.numFrames++
c.outputBytes += len(frame.data)
return err // possibly io.EOF (last frame)
Expand All @@ -402,46 +408,51 @@ func (c *ChannelBuilder) Close() {
}

// TotalFrames returns the total number of frames that were created in this channel so far.
// It does not decrease when the frames queue is being emptied.
func (c *ChannelBuilder) TotalFrames() int {
return c.numFrames
}

// HasFrame returns whether there's any available frame. If true, it can be
// popped using NextFrame().
// HasPendingFrame returns whether there's any pending frame. If true, it can be
// dequeued using NextFrame().
//
// Call OutputFrames before to create new frames from the channel out
// compression pipeline.
func (c *ChannelBuilder) HasFrame() bool {
return len(c.frames) > 0
func (c *ChannelBuilder) HasPendingFrame() bool {
return c.frames.Len() > 0 && c.frameCursor < c.frames.Len()
geoknee marked this conversation as resolved.
Show resolved Hide resolved
}

// PendingFrames returns the number of pending frames in the frames queue.
// It is larger zero iff HasFrame() returns true.
// It is larger than zero iff HasFrame() returns true.
func (c *ChannelBuilder) PendingFrames() int {
return len(c.frames)
return c.frames.Len() - c.frameCursor
}

// NextFrame dequeues the next available frame.
// HasFrame must be called prior to check if there's a next frame available.
// NextFrame returns the next pending frame and increments the frameCursor
// HasFrame must be called prior to check if there a next pending frame exists.
// Panics if called when there's no next frame.
func (c *ChannelBuilder) NextFrame() frameData {
if len(c.frames) == 0 {
if len(c.frames) <= c.frameCursor {
panic("no next frame")
vdamle marked this conversation as resolved.
Show resolved Hide resolved
}

f := c.frames[0]
c.frames = c.frames[1:]
f := c.frames[c.frameCursor]
c.frameCursor++
return f
}

// PushFrames adds the frames back to the internal frames queue. Panics if not of
// the same channel.
func (c *ChannelBuilder) PushFrames(frames ...frameData) {
for _, f := range frames {
if f.id.chID != c.ID() {
// RewindFrameCursor moves the frameCursor to point at the supplied frame
// only if it is ahead of it.
// Panics if the frame is not in this channel.
func (c *ChannelBuilder) RewindFrameCursor(frame frameData) {
sebastianst marked this conversation as resolved.
Show resolved Hide resolved
for i, f := range c.frames {
if f.id.chID != frame.id.chID {
panic("wrong channel")
}
c.frames = append(c.frames, f)
if f.id.frameNumber == frame.id.frameNumber {
if c.frameCursor > i {
c.frameCursor = i
}
return
}
}
panic("frame not found")
}
16 changes: 8 additions & 8 deletions op-batcher/batcher/channel_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func TestChannelBuilder_NextFrame(t *testing.T) {
},
data: expectedBytes,
}
cb.PushFrames(frameData)
cb.frames = append(cb.frames, frameData)

// There should only be 1 frame in the channel builder
require.Equal(t, 1, cb.PendingFrames())
Expand Down Expand Up @@ -385,7 +385,7 @@ func ChannelBuilder_OutputWrongFramePanic(t *testing.T, batchType uint) {
},
data: buf.Bytes(),
}
cb.PushFrames(frame)
cb.RewindFrameCursor(frame)
})
}

Expand Down Expand Up @@ -625,11 +625,11 @@ func TestChannelBuilder_FullShadowCompressor(t *testing.T) {

require.NoError(cb.OutputFrames())

require.True(cb.HasFrame())
require.True(cb.HasPendingFrame())
f := cb.NextFrame()
require.Less(len(f.data), int(cfg.MaxFrameSize)) // would fail without fix, full frame

require.False(cb.HasFrame(), "no leftover frame expected") // would fail without fix
require.False(cb.HasPendingFrame(), "no leftover frame expected") // would fail without fix
}

func ChannelBuilder_AddBlock(t *testing.T, batchType uint) {
Expand All @@ -656,8 +656,8 @@ func ChannelBuilder_AddBlock(t *testing.T, batchType uint) {
expectedInputBytes = 47
}
require.Equal(t, expectedInputBytes, cb.co.InputBytes())
require.Equal(t, 1, len(cb.blocks))
require.Equal(t, 0, len(cb.frames))
require.Equal(t, 1, cb.blocks.Len())
require.Equal(t, 0, cb.frames.Len())
require.True(t, cb.IsFull())

// Since the channel output is full, the next call to AddBlock
Expand Down Expand Up @@ -858,7 +858,7 @@ func ChannelBuilder_PendingFrames_TotalFrames(t *testing.T, batchType uint) {

// empty queue
for pf := nf - 1; pf >= 0; pf-- {
require.True(cb.HasFrame())
require.True(cb.HasPendingFrame())
_ = cb.NextFrame()
require.Equal(cb.PendingFrames(), pf)
require.Equal(cb.TotalFrames(), nf)
Expand Down Expand Up @@ -932,7 +932,7 @@ func ChannelBuilder_OutputBytes(t *testing.T, batchType uint) {
require.Greater(cb.PendingFrames(), 1)

var flen int
for cb.HasFrame() {
for cb.HasPendingFrame() {
f := cb.NextFrame()
flen += len(f.data)
}
Expand Down
Loading