op-batcher: fix channel duration timeout management (#12916)
* op-batcher: fix channel duration timeout management

Previously, we used L1 data to track channel durations. For example, the batcher might be configured to post data every hour. Whenever a channel closed, we updated a global state variable with the latest L1 origin of that channel, and computed each new channel's deadline as a duration delta starting at that L1 origin's timestamp.
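As a rough illustration, here is a minimal sketch of that deadline computation, assuming a maximum channel duration measured in L1 blocks (as with op-batcher's max channel duration setting). The types and names are simplified stand-ins, not the actual op-batcher code:

```go
package main

import "fmt"

// blockID is a simplified stand-in for eth.BlockID.
type blockID struct{ Number uint64 }

// durationDeadline returns the L1 block height by which the next channel
// must close, measured as a fixed number of L1 blocks past the tracked
// L1 origin of the previous channel.
func durationDeadline(trackedOrigin blockID, maxChannelDuration uint64) uint64 {
	return trackedOrigin.Number + maxChannelDuration
}

func main() {
	origin := blockID{Number: 100}
	// 300 L1 blocks is roughly one hour at 12-second block times.
	fmt.Println(durationDeadline(origin, 300)) // 400
}
```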

Since we changed the way autoDA switching works, a channel can be _closed_ (due to a duration timeout or some other reason), which moves the L1 origin state variable forward and extends the deadline for the next channel. Crucially, with autoDA switching the closed channel is no longer always submitted on chain: it can be discarded and its blocks requeued. If it is discarded, the channel duration timeout has nonetheless already been extended.

The fix is to update the global state variable at channel submission time, not at channel closing time.
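To make the behavioral change concrete, here is a hedged sketch of the fixed update rule; `manager` and `onSubmit` are hypothetical names for illustration, not op-batcher APIs:

```go
package main

import "fmt"

type blockID struct{ Number uint64 }

type manager struct {
	// With the fix, this advances at submission time only.
	l1OriginLastSubmittedChannel blockID
}

// onSubmit mirrors the fixed behavior: advance the tracked origin only
// when a channel's tx data is actually handed off for submission.
func (m *manager) onSubmit(latestL1Origin blockID) {
	if latestL1Origin.Number > m.l1OriginLastSubmittedChannel.Number {
		m.l1OriginLastSubmittedChannel = latestL1Origin
	}
}

func main() {
	m := &manager{l1OriginLastSubmittedChannel: blockID{Number: 100}}

	// A channel closes at L1 origin 150 but is then discarded by autoDA
	// switching and its blocks requeued: the tracked origin is untouched,
	// so the next channel's deadline is still computed from block 100.

	// A channel is actually submitted with latest L1 origin 150:
	m.onSubmit(blockID{Number: 150})
	fmt.Println(m.l1OriginLastSubmittedChannel.Number) // 150
}
```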

* add regression test for channel duration timeouts during requeue
geoknee authored Nov 18, 2024
1 parent 7550853 commit 873b3e0
Showing 2 changed files with 26 additions and 17 deletions.
27 changes: 14 additions & 13 deletions op-batcher/batcher/channel_manager.go
@@ -39,8 +39,9 @@ type channelManager struct {
 
 	// All blocks since the last request for new tx data.
 	blocks queue.Queue[*types.Block]
-	// The latest L1 block from all the L2 blocks in the most recently closed channel
-	l1OriginLastClosedChannel eth.BlockID
+	// The latest L1 block from all the L2 blocks in the most recently submitted channel.
+	// Used to track channel duration timeouts.
+	l1OriginLastSubmittedChannel eth.BlockID
 	// The default ChannelConfig to use for the next channel
 	defaultCfg ChannelConfig
 	// last block hash - for reorg detection
@@ -75,12 +76,12 @@ func (s *channelManager) SetChannelOutFactory(outFactory ChannelOutFactory) {
 
 // Clear clears the entire state of the channel manager.
 // It is intended to be used before launching op-batcher and after an L2 reorg.
-func (s *channelManager) Clear(l1OriginLastClosedChannel eth.BlockID) {
+func (s *channelManager) Clear(l1OriginLastSubmittedChannel eth.BlockID) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	s.log.Trace("clearing channel manager state")
 	s.blocks.Clear()
-	s.l1OriginLastClosedChannel = l1OriginLastClosedChannel
+	s.l1OriginLastSubmittedChannel = l1OriginLastSubmittedChannel
 	s.tip = common.Hash{}
 	s.closed = false
 	s.currentChannel = nil
@@ -160,6 +161,12 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
 		return txData{}, io.EOF // TODO: not enough data error instead
 	}
 	tx := channel.NextTxData()
+
+	// update s.l1OriginLastSubmittedChannel so that the next
+	// channel's duration timeout will trigger properly
+	if channel.LatestL1Origin().Number > s.l1OriginLastSubmittedChannel.Number {
+		s.l1OriginLastSubmittedChannel = channel.LatestL1Origin()
+	}
 	s.txChannels[tx.ID().String()] = channel
 	return tx, nil
 }
@@ -284,15 +291,15 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
 		return fmt.Errorf("creating channel out: %w", err)
 	}
 
-	pc := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number, channelOut)
+	pc := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastSubmittedChannel.Number, channelOut)
 
 	s.currentChannel = pc
 	s.channelQueue = append(s.channelQueue, pc)
 
 	s.log.Info("Created channel",
 		"id", pc.ID(),
 		"l1Head", l1Head,
-		"l1OriginLastClosedChannel", s.l1OriginLastClosedChannel,
+		"l1OriginLastSubmittedChannel", s.l1OriginLastSubmittedChannel,
 		"blocks_pending", s.blocks.Len(),
 		"batch_type", cfg.BatchType,
 		"compression_algo", cfg.CompressorConfig.CompressionAlgo,
@@ -374,11 +381,6 @@ func (s *channelManager) outputFrames() error {
 		return nil
 	}
 
-	lastClosedL1Origin := s.currentChannel.LatestL1Origin()
-	if lastClosedL1Origin.Number > s.l1OriginLastClosedChannel.Number {
-		s.l1OriginLastClosedChannel = lastClosedL1Origin
-	}
-
 	inBytes, outBytes := s.currentChannel.InputBytes(), s.currentChannel.OutputBytes()
 	s.metr.RecordChannelClosed(
 		s.currentChannel.ID(),
@@ -401,12 +403,11 @@
 		"input_bytes", inBytes,
 		"output_bytes", outBytes,
 		"oldest_l1_origin", s.currentChannel.OldestL1Origin(),
-		"l1_origin", lastClosedL1Origin,
+		"l1_origin", s.currentChannel.LatestL1Origin(),
 		"oldest_l2", s.currentChannel.OldestL2(),
 		"latest_l2", s.currentChannel.LatestL2(),
 		"full_reason", s.currentChannel.FullErr(),
 		"compr_ratio", comprRatio,
-		"latest_l1_origin", s.l1OriginLastClosedChannel,
 	)
 	return nil
 }
16 changes: 12 additions & 4 deletions op-batcher/batcher/channel_manager_test.go
@@ -130,7 +130,7 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {
 
 	// Channel Manager state should be empty by default
 	require.Empty(m.blocks)
-	require.Equal(eth.BlockID{}, m.l1OriginLastClosedChannel)
+	require.Equal(eth.BlockID{}, m.l1OriginLastSubmittedChannel)
 	require.Equal(common.Hash{}, m.tip)
 	require.Nil(m.currentChannel)
 	require.Empty(m.channelQueue)
@@ -161,8 +161,8 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {
 	require.NoError(m.outputFrames())
 	_, err := m.nextTxData(m.currentChannel)
 	require.NoError(err)
-	require.NotNil(m.l1OriginLastClosedChannel)
 	require.Len(m.blocks, 0)
+	require.NotNil(m.l1OriginLastSubmittedChannel)
 	require.Equal(newL1Tip, m.tip)
 	require.Len(m.currentChannel.pendingTransactions, 1)
 
Expand All @@ -184,7 +184,7 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {

// Check that the entire channel manager state cleared
require.Empty(m.blocks)
require.Equal(uint64(123), m.l1OriginLastClosedChannel.Number)
require.Equal(uint64(123), m.l1OriginLastSubmittedChannel.Number)
require.Equal(common.Hash{}, m.tip)
require.Nil(m.currentChannel)
require.Empty(m.channelQueue)
@@ -475,7 +475,7 @@ func TestChannelManager_ChannelCreation(t *testing.T) {
 		t.Run(test.name, func(t *testing.T) {
 			m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
 
-			m.l1OriginLastClosedChannel = test.safeL1Block
+			m.l1OriginLastSubmittedChannel = test.safeL1Block
 			require.Nil(t, m.currentChannel)
 
 			require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{}))
@@ -639,13 +639,21 @@ func TestChannelManager_Requeue(t *testing.T) {
 	// Assert that at least one block was processed into the channel
 	require.NotContains(t, m.blocks, blockA)
 
+	l1OriginBeforeRequeue := m.l1OriginLastSubmittedChannel
+
 	// Call the function we are testing
 	m.Requeue(m.defaultCfg)
 
 	// Ensure we got back to the state above
 	require.Equal(t, m.blocks, stateSnapshot)
 	require.Empty(t, m.channelQueue)
 
+	// Ensure the l1OriginLastSubmittedChannel was
+	// not changed. This ensures the next channel
+	// has its duration timeout deadline computed
+	// properly.
+	require.Equal(t, l1OriginBeforeRequeue, m.l1OriginLastSubmittedChannel)
+
 	// Trigger the blocks -> channelQueue data pipelining again
 	require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{}))
 	require.NotEmpty(t, m.channelQueue)
