Fjord: Add Brotli channel compression support (#10358)
* wip

* wip

* fix

* fix

* fix

* fix

* address some of the bot's comments

* use version bit of 1

* fix lint

* adding compression type

* update batch reader

* abstract span channel compressor

* test and singular batch compressor

* fix

* lint

* move channel compressor as interface

* add base class

* fix go mod

* test fixes

* address comments

* fix

* fix

* revert channel builder test

* revert ratio compressor test

* add checks to accept brotli only post fjord

* remove unnecessary in test

* fix forge-std

* gofmt

* address comments

* remove methods in compressor

* fix error msg

* add compression algo flag to optional flags

* add Clone() function

---------

Co-authored-by: Roberto Bayardo <roberto.bayardo@coinbase.com>
cody-wang-cb and Roberto Bayardo authored May 13, 2024

1 parent ea52388 commit 4b8f6f4
Showing 32 changed files with 714 additions and 177 deletions.
3 changes: 2 additions & 1 deletion go.mod
@@ -4,6 +4,8 @@ go 1.21

require (
github.com/BurntSushi/toml v1.3.2
github.com/DataDog/zstd v1.5.2
github.com/andybalholm/brotli v1.1.0
github.com/btcsuite/btcd v0.24.0
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0
github.com/cockroachdb/pebble v0.0.0-20231018212520-f6cde3fc2fa4
@@ -55,7 +57,6 @@ require (
)

require (
github.com/DataDog/zstd v1.5.2 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/VictoriaMetrics/fastcache v1.12.1 // indirect
github.com/allegro/bigcache v1.2.1 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -30,6 +30,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/allegro/bigcache v1.2.1 h1:hg1sY1raCwic3Vnsvje6TT7/pnZba83LeFck5NrFKSc=
github.com/allegro/bigcache v1.2.1/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
github.com/armon/go-metrics v0.3.8/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
2 changes: 1 addition & 1 deletion op-batcher/batcher/channel_builder.go
@@ -86,7 +86,7 @@ func NewChannelBuilder(cfg ChannelConfig, rollupCfg rollup.Config, latestL1Origi
}
var co derive.ChannelOut
if cfg.BatchType == derive.SpanBatchType {
co, err = derive.NewSpanChannelOut(rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID, cfg.CompressorConfig.TargetOutputSize)
co, err = derive.NewSpanChannelOut(rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID, cfg.CompressorConfig.TargetOutputSize, cfg.CompressorConfig.CompressionAlgo)
} else {
co, err = derive.NewSingularChannelOut(c)
}
38 changes: 29 additions & 9 deletions op-batcher/batcher/channel_builder_test.go
@@ -297,7 +297,6 @@ func TestChannelBuilderBatchType(t *testing.T) {
{"ChannelBuilder_PendingFrames_TotalFrames", ChannelBuilder_PendingFrames_TotalFrames},
{"ChannelBuilder_InputBytes", ChannelBuilder_InputBytes},
{"ChannelBuilder_OutputBytes", ChannelBuilder_OutputBytes},
{"ChannelBuilder_OutputWrongFramePanic", ChannelBuilder_OutputWrongFramePanic},
}
for _, test := range tests {
test := test
@@ -413,7 +412,8 @@ func TestChannelBuilder_OutputFrames(t *testing.T) {

// Check how many ready bytes
require.Greater(t, uint64(cb.co.ReadyBytes()+derive.FrameV0OverHeadSize), channelConfig.MaxFrameSize)
require.Equal(t, 0, cb.PendingFrames())

require.Equal(t, 0, cb.PendingFrames()) // always 0 because the None compressor is used

// The channel should not be full
// but we want to output the frames for testing anyways
@@ -430,11 +430,27 @@ func TestChannelBuilder_OutputFrames(t *testing.T) {
}

func TestChannelBuilder_OutputFrames_SpanBatch(t *testing.T) {
for _, algo := range derive.CompressionAlgoTypes {
t.Run("ChannelBuilder_OutputFrames_SpanBatch_"+algo.String(), func(t *testing.T) {
ChannelBuilder_OutputFrames_SpanBatch(t, algo)
})
}
}

func ChannelBuilder_OutputFrames_SpanBatch(t *testing.T, algo derive.CompressionAlgo) {
channelConfig := defaultTestChannelConfig()
channelConfig.MaxFrameSize = 20 + derive.FrameV0OverHeadSize
channelConfig.TargetNumFrames = 5
if algo.IsBrotli() {
channelConfig.TargetNumFrames = 3 // fewer frames so the channel fills faster for brotli
} else {
channelConfig.TargetNumFrames = 5
}
channelConfig.BatchType = derive.SpanBatchType
channelConfig.InitRatioCompressor(1)
channelConfig.InitRatioCompressor(1, algo)

// Construct the channel builder
cb, err := NewChannelBuilder(channelConfig, defaultTestRollupConfig, latestL1BlockOrigin)
@@ -453,6 +469,10 @@ func TestChannelBuilder_OutputFrames_SpanBatch(t *testing.T) {
for {
err = addMiniBlock(cb)
if err == nil {
if cb.IsFull() {
// this happens when the data exactly fills the channel
break
}
require.False(t, cb.IsFull())
// There should be no ready bytes until the channel is full
require.Equal(t, cb.co.ReadyBytes(), 0)
@@ -504,7 +524,7 @@ func ChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T, batchType uint) {
channelConfig := defaultTestChannelConfig()
channelConfig.MaxFrameSize = derive.FrameV0OverHeadSize + 1
channelConfig.TargetNumFrames = math.MaxUint16 + 1
channelConfig.InitRatioCompressor(.1)
channelConfig.InitRatioCompressor(.1, derive.Zlib)
channelConfig.BatchType = batchType

rng := rand.New(rand.NewSource(123))
@@ -546,8 +566,8 @@ func TestChannelBuilder_FullShadowCompressor(t *testing.T) {
TargetNumFrames: 1,
BatchType: derive.SpanBatchType,
}
cfg.InitShadowCompressor()

cfg.InitShadowCompressor(derive.Zlib)
cb, err := NewChannelBuilder(cfg, defaultTestRollupConfig, latestL1BlockOrigin)
require.NoError(err)

@@ -577,7 +597,7 @@ func ChannelBuilder_AddBlock(t *testing.T, batchType uint) {
channelConfig.MaxFrameSize = 20 + derive.FrameV0OverHeadSize
channelConfig.TargetNumFrames = 2
// Configure the Input Threshold params so we observe a full channel
channelConfig.InitRatioCompressor(1)
channelConfig.InitRatioCompressor(1, derive.Zlib)

// Construct the channel builder
cb, err := NewChannelBuilder(channelConfig, defaultTestRollupConfig, latestL1BlockOrigin)
@@ -700,7 +720,7 @@ func ChannelBuilder_PendingFrames_TotalFrames(t *testing.T, batchType uint) {
cfg.MaxFrameSize = 1000
cfg.TargetNumFrames = tnf
cfg.BatchType = batchType
cfg.InitShadowCompressor()
cfg.InitShadowCompressor(derive.Zlib)
cb, err := NewChannelBuilder(cfg, defaultTestRollupConfig, latestL1BlockOrigin)
require.NoError(err)

@@ -782,7 +802,7 @@ func ChannelBuilder_OutputBytes(t *testing.T, batchType uint) {
cfg.MaxFrameSize = 1000
cfg.TargetNumFrames = 16
cfg.BatchType = batchType
cfg.InitRatioCompressor(1.0)
cfg.InitRatioCompressor(1.0, derive.Zlib)
cb, err := NewChannelBuilder(cfg, defaultTestRollupConfig, latestL1BlockOrigin)
require.NoError(err, "NewChannelBuilder")

13 changes: 7 additions & 6 deletions op-batcher/batcher/channel_config.go
@@ -53,25 +53,26 @@ type ChannelConfig struct {
// value consistent with cc.TargetNumFrames and cc.MaxFrameSize.
// comprKind can be the empty string, in which case the default compressor will
// be used.
func (cc *ChannelConfig) InitCompressorConfig(approxComprRatio float64, comprKind string) {
func (cc *ChannelConfig) InitCompressorConfig(approxComprRatio float64, comprKind string, compressionAlgo derive.CompressionAlgo) {
cc.CompressorConfig = compressor.Config{
// Compressor output size needs to account for frame encoding overhead
TargetOutputSize: MaxDataSize(cc.TargetNumFrames, cc.MaxFrameSize),
ApproxComprRatio: approxComprRatio,
Kind: comprKind,
CompressionAlgo: compressionAlgo,
}
}

func (cc *ChannelConfig) InitRatioCompressor(approxComprRatio float64) {
cc.InitCompressorConfig(approxComprRatio, compressor.RatioKind)
func (cc *ChannelConfig) InitRatioCompressor(approxComprRatio float64, compressionAlgo derive.CompressionAlgo) {
cc.InitCompressorConfig(approxComprRatio, compressor.RatioKind, compressionAlgo)
}

func (cc *ChannelConfig) InitShadowCompressor() {
cc.InitCompressorConfig(0, compressor.ShadowKind)
func (cc *ChannelConfig) InitShadowCompressor(compressionAlgo derive.CompressionAlgo) {
cc.InitCompressorConfig(0, compressor.ShadowKind, compressionAlgo)
}

func (cc *ChannelConfig) InitNoneCompressor() {
cc.InitCompressorConfig(0, compressor.NoneKind)
cc.InitCompressorConfig(0, compressor.NoneKind, derive.Zlib)
}

func (cc *ChannelConfig) MaxFramesPerTx() int {
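To illustrate the new parameter, a minimal sketch (not part of the commit) of wiring a brotli-capable channel config; it assumes it lives in package batcher next to ChannelConfig, and the sizes are made-up values:

func exampleBrotliChannelConfig() ChannelConfig {
	cfg := ChannelConfig{
		MaxFrameSize:    120_000,
		TargetNumFrames: 1,
		BatchType:       derive.SpanBatchType,
	}
	// Assumed ~0.4 compression ratio; brotli algos are only accepted by the service once Fjord is active.
	cfg.InitRatioCompressor(0.4, derive.Brotli10)
	return cfg
}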
2 changes: 1 addition & 1 deletion op-batcher/batcher/channel_config_test.go
@@ -20,7 +20,7 @@ func defaultTestChannelConfig() ChannelConfig {
TargetNumFrames: 1,
BatchType: derive.SingularBatchType,
}
c.InitRatioCompressor(0.4)
c.InitRatioCompressor(0.4, derive.Zlib)
return c
}

4 changes: 2 additions & 2 deletions op-batcher/batcher/channel_manager_test.go
@@ -25,7 +25,7 @@ func channelManagerTestConfig(maxFrameSize uint64, batchType uint) ChannelConfig
TargetNumFrames: 1,
BatchType: batchType,
}
cfg.InitRatioCompressor(1)
cfg.InitRatioCompressor(1, derive.Zlib)
return cfg
}

@@ -123,7 +123,7 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {
// channels on confirmation. This would result in [TxConfirmed]
// clearing confirmed transactions, and resetting the pendingChannels map
cfg.ChannelTimeout = 10
cfg.InitRatioCompressor(1)
cfg.InitRatioCompressor(1, derive.Zlib)
m := NewChannelManager(log, metrics.NoopMetrics, cfg, &defaultTestRollupConfig)

// Channel Manager state should be empty by default
21 changes: 19 additions & 2 deletions op-batcher/batcher/channel_test.go
@@ -4,6 +4,7 @@ import (
"io"
"testing"

"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
@@ -29,6 +30,9 @@ func TestChannelTimeout(t *testing.T) {
log := testlog.Logger(t, log.LevelCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{
ChannelTimeout: 100,
CompressorConfig: compressor.Config{
CompressionAlgo: derive.Zlib,
},
}, &rollup.Config{})
m.Clear(eth.BlockID{})

@@ -71,7 +75,9 @@ func TestChannelTimeout(t *testing.T) {
// TestChannelManager_NextTxData tests the nextTxData function.
func TestChannelManager_NextTxData(t *testing.T) {
log := testlog.Logger(t, log.LevelCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{})
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{CompressorConfig: compressor.Config{
CompressionAlgo: derive.Zlib,
}}, &rollup.Config{})
m.Clear(eth.BlockID{})

// Nil pending channel should return EOF
@@ -118,6 +124,9 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) {
ch, err := newChannel(lgr, metrics.NoopMetrics, ChannelConfig{
MultiFrameTxs: false,
TargetNumFrames: n,
CompressorConfig: compressor.Config{
CompressionAlgo: derive.Zlib,
},
}, &rollup.Config{}, latestL1BlockOrigin)
require.NoError(err)
chID := ch.ID()
@@ -156,6 +165,9 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) {
ch, err := newChannel(lgr, metrics.NoopMetrics, ChannelConfig{
MultiFrameTxs: true,
TargetNumFrames: n,
CompressorConfig: compressor.Config{
CompressionAlgo: derive.Zlib,
},
}, &rollup.Config{}, latestL1BlockOrigin)
require.NoError(err)
chID := ch.ID()
@@ -202,6 +214,9 @@ func TestChannelTxConfirmed(t *testing.T) {
// channels on confirmation. This would result in [TxConfirmed]
// clearing confirmed transactions, and resetting the pendingChannels map
ChannelTimeout: 10,
CompressorConfig: compressor.Config{
CompressionAlgo: derive.Zlib,
},
}, &rollup.Config{})
m.Clear(eth.BlockID{})

@@ -251,7 +266,9 @@ func TestChannelTxConfirmed(t *testing.T) {
func TestChannelTxFailed(t *testing.T) {
// Create a channel manager
log := testlog.Logger(t, log.LevelCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{})
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{CompressorConfig: compressor.Config{
CompressionAlgo: derive.Zlib,
}}, &rollup.Config{})
m.Clear(eth.BlockID{})

// Let's add a valid pending transaction to the channel
8 changes: 8 additions & 0 deletions op-batcher/batcher/config.go
@@ -10,6 +10,7 @@ import (

"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
@@ -67,6 +68,9 @@ type CLIConfig struct {
// Type of compressor to use. Must be one of [compressor.KindKeys].
Compressor string

// Type of compression algorithm to use. Must be one of [zlib, brotli-9, brotli-10, brotli-11]
CompressionAlgo derive.CompressionAlgo

// If Stopped is true, the batcher starts stopped and won't start batching right away.
// Batching needs to be started via an admin RPC.
Stopped bool
@@ -124,6 +128,9 @@ func (c *CLIConfig) Check() error {
if c.Compressor == compressor.RatioKind && (c.ApproxComprRatio <= 0 || c.ApproxComprRatio > 1) {
return fmt.Errorf("invalid ApproxComprRatio %v for ratio compressor", c.ApproxComprRatio)
}
if !derive.ValidCompressionAlgoType(c.CompressionAlgo) {
return fmt.Errorf("invalid compression algo %v", c.CompressionAlgo)
}
if c.BatchType > 1 {
return fmt.Errorf("unknown batch type: %v", c.BatchType)
}
@@ -168,6 +175,7 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
TargetNumFrames: ctx.Int(flags.TargetNumFramesFlag.Name),
ApproxComprRatio: ctx.Float64(flags.ApproxComprRatioFlag.Name),
Compressor: ctx.String(flags.CompressorFlag.Name),
CompressionAlgo: derive.CompressionAlgo(ctx.String(flags.CompressionAlgoFlag.Name)),
Stopped: ctx.Bool(flags.StoppedFlag.Name),
WaitNodeSync: ctx.Bool(flags.WaitNodeSyncFlag.Name),
CheckRecentTxsDepth: ctx.Int(flags.CheckRecentTxsDepthFlag.Name),
4 changes: 3 additions & 1 deletion op-batcher/batcher/config_test.go
@@ -7,6 +7,7 @@ import (
"github.com/ethereum-optimism/optimism/op-batcher/batcher"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/oppprof"
@@ -35,7 +36,8 @@ func validBatcherConfig() batcher.CLIConfig {
MetricsConfig: metrics.DefaultCLIConfig(),
PprofConfig: oppprof.DefaultCLIConfig(),
// The compressor config is not checked in config.Check()
RPC: rpc.DefaultCLIConfig(),
RPC: rpc.DefaultCLIConfig(),
CompressionAlgo: derive.Zlib,
}
}

7 changes: 6 additions & 1 deletion op-batcher/batcher/service.go
@@ -219,7 +219,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
return fmt.Errorf("max frame size %d exceeds plasma max input size %d", cc.MaxFrameSize, plasma.MaxInputSize)
}

cc.InitCompressorConfig(cfg.ApproxComprRatio, cfg.Compressor)
cc.InitCompressorConfig(cfg.ApproxComprRatio, cfg.Compressor, cfg.CompressionAlgo)

if bs.UseBlobs && !bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
bs.Log.Error("Cannot use Blob data before Ecotone!") // log only, the batcher may not be actively running.
@@ -228,6 +228,11 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
bs.Log.Warn("Ecotone upgrade is active, but batcher is not configured to use Blobs!")
}

// Brotli compression is only accepted post-Fjord
if bs.ChannelConfig.CompressorConfig.CompressionAlgo.IsBrotli() && !bs.RollupConfig.IsFjord(uint64(time.Now().Unix())) {
return fmt.Errorf("cannot use brotli compression before Fjord")
}

if err := cc.Check(); err != nil {
return fmt.Errorf("invalid channel configuration: %w", err)
}
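The brotli gate mirrors the Ecotone blob check above it: fork activation is evaluated against wall-clock time. A hedged restatement of the predicate in isolation (canUseBrotli is a hypothetical helper; IsFjord and IsBrotli are the real methods):

// Sketch only: assumes imports of time, op-node/rollup, and op-node/rollup/derive.
func canUseBrotli(rollupCfg *rollup.Config, algo derive.CompressionAlgo) bool {
	return !algo.IsBrotli() || rollupCfg.IsFjord(uint64(time.Now().Unix()))
}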
3 changes: 3 additions & 0 deletions op-batcher/compressor/config.go
@@ -16,6 +16,9 @@ type Config struct {
// Kind of compressor to use. Must be one of KindKeys. If unset, NewCompressor
// will default to RatioKind.
Kind string

// Type of compression algorithm to use. Must be one of [zlib, brotli-9, brotli-10, brotli-11]
CompressionAlgo derive.CompressionAlgo
}

func (c Config) NewCompressor() (derive.Compressor, error) {
23 changes: 9 additions & 14 deletions op-batcher/compressor/ratio_compressor.go
@@ -1,18 +1,14 @@
package compressor

import (
"bytes"
"compress/zlib"

"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)

type RatioCompressor struct {
config Config

inputBytes int
buf bytes.Buffer
compress *zlib.Writer
compressor derive.ChannelCompressor
}

// NewRatioCompressor creates a new derive.Compressor implementation that uses the target
@@ -25,11 +21,11 @@ func NewRatioCompressor(config Config) (derive.Compressor, error) {
config: config,
}

compress, err := zlib.NewWriterLevel(&c.buf, zlib.BestCompression)
compressor, err := derive.NewChannelCompressor(config.CompressionAlgo)
if err != nil {
return nil, err
}
c.compress = compress
c.compressor = compressor

return c, nil
}
@@ -39,29 +35,28 @@ func (t *RatioCompressor) Write(p []byte) (int, error) {
return 0, err
}
t.inputBytes += len(p)
return t.compress.Write(p)
return t.compressor.Write(p)
}

func (t *RatioCompressor) Close() error {
return t.compress.Close()
return t.compressor.Close()
}

func (t *RatioCompressor) Read(p []byte) (int, error) {
return t.buf.Read(p)
return t.compressor.Read(p)
}

func (t *RatioCompressor) Reset() {
t.buf.Reset()
t.compress.Reset(&t.buf)
t.compressor.Reset()
t.inputBytes = 0
}

func (t *RatioCompressor) Len() int {
return t.buf.Len()
return t.compressor.Len()
}

func (t *RatioCompressor) Flush() error {
return t.compress.Flush()
return t.compressor.Flush()
}

func (t *RatioCompressor) FullErr() error {
2 changes: 2 additions & 0 deletions op-batcher/compressor/ratio_compressor_test.go
@@ -6,6 +6,7 @@ import (
"testing"

"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/stretchr/testify/require"
)

@@ -62,6 +63,7 @@ func TestChannelConfig_InputThreshold(t *testing.T) {
comp, err := compressor.NewRatioCompressor(compressor.Config{
TargetOutputSize: tt.targetOutputSize,
ApproxComprRatio: tt.approxComprRatio,
CompressionAlgo: derive.Zlib,
})
require.NoError(t, err)
got := comp.(*compressor.RatioCompressor).InputThreshold()
36 changes: 14 additions & 22 deletions op-batcher/compressor/shadow_compressor.go
@@ -1,9 +1,6 @@
package compressor

import (
"bytes"
"compress/zlib"

"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)

@@ -21,11 +18,8 @@ const (
type ShadowCompressor struct {
config Config

buf bytes.Buffer
compress *zlib.Writer

shadowBuf bytes.Buffer
shadowCompress *zlib.Writer
compressor derive.ChannelCompressor
shadowCompressor derive.ChannelCompressor

fullErr error

@@ -45,11 +39,11 @@ func NewShadowCompressor(config Config) (derive.Compressor, error) {
}

var err error
c.compress, err = zlib.NewWriterLevel(&c.buf, zlib.BestCompression)
c.compressor, err = derive.NewChannelCompressor(config.CompressionAlgo)
if err != nil {
return nil, err
}
c.shadowCompress, err = zlib.NewWriterLevel(&c.shadowBuf, zlib.BestCompression)
c.shadowCompressor, err = derive.NewChannelCompressor(config.CompressionAlgo)
if err != nil {
return nil, err
}
@@ -62,7 +56,7 @@ func (t *ShadowCompressor) Write(p []byte) (int, error) {
if t.fullErr != nil {
return 0, t.fullErr
}
_, err := t.shadowCompress.Write(p)
_, err := t.shadowCompressor.Write(p)
if err != nil {
return 0, err
}
@@ -71,10 +65,10 @@ func (t *ShadowCompressor) Write(p []byte) (int, error) {
// Do not flush the buffer unless there's some chance we will be over the size limit.
// This reduces CPU but more importantly it makes the shadow compression ratio more
// closely reflect the ultimate compression ratio.
if err = t.shadowCompress.Flush(); err != nil {
if err = t.shadowCompressor.Flush(); err != nil {
return 0, err
}
newBound = uint64(t.shadowBuf.Len()) + CloseOverheadZlib
newBound = uint64(t.shadowCompressor.Len()) + CloseOverheadZlib
if newBound > t.config.TargetOutputSize {
t.fullErr = derive.ErrCompressorFull
if t.Len() > 0 {
@@ -85,32 +79,30 @@ func (t *ShadowCompressor) Write(p []byte) (int, error) {
}
}
t.bound = newBound
return t.compress.Write(p)
return t.compressor.Write(p)
}

func (t *ShadowCompressor) Close() error {
return t.compress.Close()
return t.compressor.Close()
}

func (t *ShadowCompressor) Read(p []byte) (int, error) {
return t.buf.Read(p)
return t.compressor.Read(p)
}

func (t *ShadowCompressor) Reset() {
t.buf.Reset()
t.compress.Reset(&t.buf)
t.shadowBuf.Reset()
t.shadowCompress.Reset(&t.shadowBuf)
t.compressor.Reset()
t.shadowCompressor.Reset()
t.fullErr = nil
t.bound = safeCompressionOverhead
}

func (t *ShadowCompressor) Len() int {
return t.buf.Len()
return t.compressor.Len()
}

func (t *ShadowCompressor) Flush() error {
return t.compress.Flush()
return t.compressor.Flush()
}

func (t *ShadowCompressor) FullErr() error {
2 changes: 2 additions & 0 deletions op-batcher/compressor/shadow_compressor_test.go
@@ -63,6 +63,7 @@ func TestShadowCompressor(t *testing.T) {

sc, err := NewShadowCompressor(Config{
TargetOutputSize: test.targetOutputSize,
CompressionAlgo: derive.Zlib,
})
require.NoError(t, err)

@@ -115,6 +116,7 @@ func TestBoundInaccurateForLargeRandomData(t *testing.T) {

sc, err := NewShadowCompressor(Config{
TargetOutputSize: sizeLimit + 100,
CompressionAlgo: derive.Zlib,
})
require.NoError(t, err)

11 changes: 11 additions & 0 deletions op-batcher/flags/flags.go
@@ -9,6 +9,7 @@ import (
"github.com/urfave/cli/v2"

"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
opservice "github.com/ethereum-optimism/optimism/op-service"
openum "github.com/ethereum-optimism/optimism/op-service/enum"
@@ -99,6 +100,15 @@ var (
return nil
},
}
CompressionAlgoFlag = &cli.GenericFlag{
Name: "compression-algo",
Usage: "The compression algorithm to use. Valid options: " + openum.EnumString(derive.CompressionAlgoTypes),
EnvVars: prefixEnvVars("COMPRESSION_ALGO"),
Value: func() *derive.CompressionAlgo {
out := derive.Zlib
return &out
}(),
}
StoppedFlag = &cli.BoolFlag{
Name: "stopped",
Usage: "Initialize the batcher in a stopped state. The batcher can be started using the admin_startBatcher RPC",
@@ -167,6 +177,7 @@ var optionalFlags = []cli.Flag{
BatchTypeFlag,
DataAvailabilityTypeFlag,
ActiveSequencerCheckDurationFlag,
CompressionAlgoFlag,
}

func init() {
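For context, a minimal sketch of how *derive.CompressionAlgo plugs into urfave/cli's GenericFlag through its Set and String methods; the demo app below is illustrative, not from this commit:

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
	"github.com/urfave/cli/v2"
)

func main() {
	algo := derive.Zlib // mirrors the flag's zlib default
	app := &cli.App{
		Name: "compression-demo", // hypothetical demo app
		Flags: []cli.Flag{
			&cli.GenericFlag{
				Name:  "compression-algo",
				Usage: "zlib, brotli-9, brotli-10, or brotli-11",
				Value: &algo, // *CompressionAlgo implements cli.Generic (Set/String)
			},
		},
		Action: func(ctx *cli.Context) error {
			// Set() has already rejected values outside derive.CompressionAlgoTypes.
			fmt.Println("algo:", algo, "isBrotli:", algo.IsBrotli())
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}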
3 changes: 2 additions & 1 deletion op-e2e/actions/l2_batcher.go
@@ -192,6 +192,7 @@ func (s *L2Batcher) Buffer(t Testing) error {
target := batcher.MaxDataSize(1, s.l2BatcherCfg.MaxL1TxSize)
c, e := compressor.NewShadowCompressor(compressor.Config{
TargetOutputSize: target,
CompressionAlgo: derive.Zlib,
})
require.NoError(t, e, "failed to create compressor")

@@ -200,7 +201,7 @@ func (s *L2Batcher) Buffer(t Testing) error {
} else {
// use span batch if we're forcing it or if we're at/beyond delta
if s.l2BatcherCfg.ForceSubmitSpanBatch || s.rollupCfg.IsDelta(block.Time()) {
ch, err = derive.NewSpanChannelOut(s.rollupCfg.Genesis.L2Time, s.rollupCfg.L2ChainID, target)
ch, err = derive.NewSpanChannelOut(s.rollupCfg.Genesis.L2Time, s.rollupCfg.L2ChainID, target, derive.Zlib)
// use singular batches in all other cases
} else {
ch, err = derive.NewSingularChannelOut(c)
2 changes: 1 addition & 1 deletion op-e2e/actions/sync_test.go
@@ -26,7 +26,7 @@ import (
)

func newSpanChannelOut(t StatefulTesting, e e2eutils.SetupData) derive.ChannelOut {
channelOut, err := derive.NewSpanChannelOut(e.RollupCfg.Genesis.L2Time, e.RollupCfg.L2ChainID, 128_000)
channelOut, err := derive.NewSpanChannelOut(e.RollupCfg.Genesis.L2Time, e.RollupCfg.L2ChainID, 128_000, derive.Zlib)
require.NoError(t, err)
return channelOut
}
1 change: 1 addition & 0 deletions op-e2e/sequencer_failover_setup.go
@@ -296,6 +296,7 @@ func setupBatcher(t *testing.T, sys *System, conductors map[string]*conductor) {
BatchType: derive.SpanBatchType,
DataAvailabilityType: batcherFlags.CalldataType,
ActiveSequencerCheckDuration: 0,
CompressionAlgo: derive.Zlib,
}

batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"])
1 change: 1 addition & 0 deletions op-e2e/setup.go
@@ -845,6 +845,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
Stopped: sys.Cfg.DisableBatcher, // Batch submitter may be enabled later
BatchType: batchType,
DataAvailabilityType: sys.Cfg.DataAvailabilityType,
CompressionAlgo: derive.Zlib,
}
// Batch Submitter
batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"])
153 changes: 80 additions & 73 deletions op-node/benchmarks/batchbuilding_test.go
@@ -58,13 +58,13 @@ type compressorAndTarget struct {
}

// channelOutByType returns a channel out of the given type as a helper for the benchmarks
func channelOutByType(batchType uint, compKey string) (derive.ChannelOut, error) {
func channelOutByType(batchType uint, compKey string, algo derive.CompressionAlgo) (derive.ChannelOut, error) {
chainID := big.NewInt(333)
if batchType == derive.SingularBatchType {
return derive.NewSingularChannelOut(compressors[compKey].compressor)
}
if batchType == derive.SpanBatchType {
return derive.NewSpanChannelOut(0, chainID, compressors[compKey].targetOutput)
return derive.NewSpanChannelOut(0, chainID, compressors[compKey].targetOutput, algo)
}
return nil, fmt.Errorf("unsupported batch type: %d", batchType)
}
@@ -129,25 +129,28 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) {
// to leverage optimizations in the Batch Linked List
batches[i].Timestamp = uint64(t.Add(time.Duration(i) * time.Second).Unix())
}
b.Run(tc.String(), func(b *testing.B) {
// reset the compressor used in the test case
for bn := 0; bn < b.N; bn++ {
// don't measure the setup time
b.StopTimer()
compressors[tc.compKey].compressor.Reset()
cout, _ := channelOutByType(tc.BatchType, tc.compKey)
// add all but the final batch to the channel out
for i := 0; i < tc.BatchCount-1; i++ {
err := cout.AddSingularBatch(batches[i], 0)
for _, algo := range derive.CompressionAlgoTypes {
b.Run(tc.String()+"_"+algo.String(), func(b *testing.B) {
// reset the compressor used in the test case
for bn := 0; bn < b.N; bn++ {
// don't measure the setup time
b.StopTimer()
compressors[tc.compKey].compressor.Reset()
cout, _ := channelOutByType(tc.BatchType, tc.compKey, algo)
// add all but the final batch to the channel out
for i := 0; i < tc.BatchCount-1; i++ {
err := cout.AddSingularBatch(batches[i], 0)
require.NoError(b, err)
}
// measure the time to add the final batch
b.StartTimer()
// add the final batch to the channel out
err := cout.AddSingularBatch(batches[tc.BatchCount-1], 0)
require.NoError(b, err)
}
// measure the time to add the final batch
b.StartTimer()
// add the final batch to the channel out
err := cout.AddSingularBatch(batches[tc.BatchCount-1], 0)
require.NoError(b, err)
}
})
})
}

}
}

@@ -165,35 +168,37 @@ func BenchmarkIncremental(b *testing.B) {
{derive.SpanBatchType, 5, 1, "RealBlindCompressor"},
//{derive.SingularBatchType, 100, 1, "RealShadowCompressor"},
}
for _, tc := range tcs {
cout, err := channelOutByType(tc.BatchType, tc.compKey)
if err != nil {
b.Fatal(err)
}
done := false
for base := 0; !done; base += tc.BatchCount {
rangeName := fmt.Sprintf("Incremental %s: %d-%d", tc.String(), base, base+tc.BatchCount)
b.Run(rangeName, func(b *testing.B) {
b.StopTimer()
// prepare the batches
t := time.Now()
batches := make([]*derive.SingularBatch, tc.BatchCount)
for i := 0; i < tc.BatchCount; i++ {
t := t.Add(time.Second)
batches[i] = derive.RandomSingularBatch(rng, tc.txPerBatch, chainID)
// set the timestamp to increase with each batch
// to leverage optimizations in the Batch Linked List
batches[i].Timestamp = uint64(t.Unix())
}
b.StartTimer()
for i := 0; i < tc.BatchCount; i++ {
err := cout.AddSingularBatch(batches[i], 0)
if err != nil {
done = true
return
for _, algo := range derive.CompressionAlgoTypes {
for _, tc := range tcs {
cout, err := channelOutByType(tc.BatchType, tc.compKey, algo)
if err != nil {
b.Fatal(err)
}
done := false
for base := 0; !done; base += tc.BatchCount {
rangeName := fmt.Sprintf("Incremental %s-%s: %d-%d", algo, tc.String(), base, base+tc.BatchCount)
b.Run(rangeName, func(b *testing.B) {
b.StopTimer()
// prepare the batches
t := time.Now()
batches := make([]*derive.SingularBatch, tc.BatchCount)
for i := 0; i < tc.BatchCount; i++ {
t := t.Add(time.Second)
batches[i] = derive.RandomSingularBatch(rng, tc.txPerBatch, chainID)
// set the timestamp to increase with each batch
// to leverage optimizations in the Batch Linked List
batches[i].Timestamp = uint64(t.Unix())
}
b.StartTimer()
for i := 0; i < tc.BatchCount; i++ {
err := cout.AddSingularBatch(batches[i], 0)
if err != nil {
done = true
return
}
}
})
}
}
}
}
@@ -226,33 +231,35 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) {
}
}

for _, tc := range tests {
chainID := big.NewInt(333)
rng := rand.New(rand.NewSource(0x543331))
// pre-generate batches to keep the benchmark from including the random generation
batches := make([]*derive.SingularBatch, tc.BatchCount)
t := time.Now()
for i := 0; i < tc.BatchCount; i++ {
batches[i] = derive.RandomSingularBatch(rng, tc.txPerBatch, chainID)
// set the timestamp to increase with each batch
// to leverage optimizations in the Batch Linked List
batches[i].Timestamp = uint64(t.Add(time.Duration(i) * time.Second).Unix())
}
b.Run(tc.String(), func(b *testing.B) {
// reset the compressor used in the test case
for bn := 0; bn < b.N; bn++ {
// don't measure the setup time
b.StopTimer()
compressors[tc.compKey].compressor.Reset()
cout, _ := channelOutByType(tc.BatchType, tc.compKey)
b.StartTimer()
// add all batches to the channel out
for i := 0; i < tc.BatchCount; i++ {
err := cout.AddSingularBatch(batches[i], 0)
require.NoError(b, err)
}
for _, algo := range derive.CompressionAlgoTypes {
for _, tc := range tests {
chainID := big.NewInt(333)
rng := rand.New(rand.NewSource(0x543331))
// pre-generate batches to keep the benchmark from including the random generation
batches := make([]*derive.SingularBatch, tc.BatchCount)
t := time.Now()
for i := 0; i < tc.BatchCount; i++ {
batches[i] = derive.RandomSingularBatch(rng, tc.txPerBatch, chainID)
// set the timestamp to increase with each batch
// to leverage optimizations in the Batch Linked List
batches[i].Timestamp = uint64(t.Add(time.Duration(i) * time.Second).Unix())
}
})
b.Run(tc.String()+"_"+algo.String(), func(b *testing.B) {
// reset the compressor used in the test case
for bn := 0; bn < b.N; bn++ {
// don't measure the setup time
b.StopTimer()
compressors[tc.compKey].compressor.Reset()
cout, _ := channelOutByType(tc.BatchType, tc.compKey, algo)
b.StartTimer()
// add all batches to the channel out
for i := 0; i < tc.BatchCount; i++ {
err := cout.AddSingularBatch(batches[i], 0)
require.NoError(b, err)
}
}
})
}
}
}

2 changes: 1 addition & 1 deletion op-node/cmd/batch_decoder/reassemble/reassemble.go
@@ -111,7 +111,7 @@ func processFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, fr
var batchTypes []int
invalidBatches := false
if ch.IsReady() {
br, err := derive.BatchReader(ch.Reader(), spec.MaxRLPBytesPerChannel(ch.HighestBlock().Time))
br, err := derive.BatchReader(ch.Reader(), spec.MaxRLPBytesPerChannel(ch.HighestBlock().Time), rollupCfg.IsFjord(ch.HighestBlock().Time))
if err == nil {
for batchData, err := br(); err != io.EOF; batchData, err = br() {
if err != nil {
42 changes: 38 additions & 4 deletions op-node/rollup/derive/channel.go
@@ -1,15 +1,22 @@
package derive

import (
"bufio"
"bytes"
"compress/zlib"
"fmt"
"io"

"github.com/andybalholm/brotli"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/rlp"
)

const (
ZlibCM8 = 8
ZlibCM15 = 15
)

// A Channel is a set of batches that are split into at least one, but possibly multiple frames.
// Frames are allowed to be ingested out of order.
// Each frame is ingested one by one. Once a frame with `closed` is added to the channel, the
@@ -151,17 +158,44 @@ func (ch *Channel) Reader() io.Reader {
// The L1Inclusion block is also provided at creation time.
// Warning: the batch reader can read every batch-type.
// The caller of the batch-reader should filter the results.
func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64) (func() (*BatchData, error), error) {
// Setup decompressor stage + RLP reader
zr, err := zlib.NewReader(r)
func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func() (*BatchData, error), error) {
// use a buffered reader so we can peek at the first byte
bufReader := bufio.NewReader(r)
compressionType, err := bufReader.Peek(1)
if err != nil {
return nil, err
}

var zr io.Reader
// For zlib, the low 4 bits of the first byte are the compression method (CM): 8 means deflate, 15 is reserved
if compressionType[0]&0x0F == ZlibCM8 || compressionType[0]&0x0F == ZlibCM15 {
var err error
zr, err = zlib.NewReader(bufReader)
if err != nil {
return nil, err
}
// If the byte equals ChannelVersionBrotli (0x01), it is a brotli channel
} else if compressionType[0] == ChannelVersionBrotli {
// Brotli-compressed batches cannot be accepted before Fjord
if !isFjord {
return nil, fmt.Errorf("cannot accept brotli compressed batch before Fjord")
}
// discard the first byte
_, err := bufReader.Discard(1)
if err != nil {
return nil, err
}
zr = brotli.NewReader(bufReader)
} else {
return nil, fmt.Errorf("cannot distinguish the compression algo used given type byte %v", compressionType[0])
}

// Setup decompressor stage + RLP reader
rlpReader := rlp.NewStream(zr, maxRLPBytesPerChannel)
// Read each batch iteratively
return func() (*BatchData, error) {
var batchData BatchData
if err = rlpReader.Decode(&batchData); err != nil {
if err := rlpReader.Decode(&batchData); err != nil {
return nil, err
}
return &batchData, nil
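The dispatch above keys off a single peeked byte: a zlib stream's first byte (CMF) carries the compression method in its low nibble (8 for deflate, 15 reserved), while Fjord prefixes brotli channels with a 0x01 version byte. A standalone sketch of the same detection, outside the derivation pipeline (detectReader is a hypothetical helper):

package main

import (
	"bytes"
	"compress/zlib"
	"fmt"
	"io"

	"github.com/andybalholm/brotli"
)

const channelVersionBrotli byte = 0x01

// detectReader mirrors the BatchReader dispatch: peek one byte, pick a decompressor.
func detectReader(data []byte, isFjord bool) (io.Reader, error) {
	if len(data) == 0 {
		return nil, fmt.Errorf("empty channel")
	}
	switch {
	case data[0]&0x0F == 8 || data[0]&0x0F == 15: // zlib CM field
		return zlib.NewReader(bytes.NewReader(data))
	case data[0] == channelVersionBrotli:
		if !isFjord {
			return nil, fmt.Errorf("cannot accept brotli compressed batch before Fjord")
		}
		return brotli.NewReader(bytes.NewReader(data[1:])), nil // skip the version byte
	default:
		return nil, fmt.Errorf("cannot distinguish the compression algo used given type byte %v", data[0])
	}
}

func main() {
	var buf bytes.Buffer
	buf.WriteByte(channelVersionBrotli)
	w := brotli.NewWriterLevel(&buf, 10)
	_, _ = w.Write([]byte("hello fjord"))
	_ = w.Close()

	r, err := detectReader(buf.Bytes(), true)
	if err != nil {
		panic(err)
	}
	out, _ := io.ReadAll(r)
	fmt.Printf("%s\n", out) // hello fjord
}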
94 changes: 94 additions & 0 deletions op-node/rollup/derive/channel_compressor.go
@@ -0,0 +1,94 @@
package derive

import (
"bytes"
"compress/zlib"
"fmt"
"io"

"github.com/andybalholm/brotli"
)

const (
ChannelVersionBrotli byte = 0x01
)

type ChannelCompressor interface {
Write([]byte) (int, error)
Flush() error
Close() error
Reset()
Len() int
Read([]byte) (int, error)
GetCompressed() *bytes.Buffer
}

type CompressorWriter interface {
Write([]byte) (int, error)
Flush() error
Close() error
Reset(io.Writer)
}

type BaseChannelCompressor struct {
compressed *bytes.Buffer
CompressorWriter
}

func (bcc *BaseChannelCompressor) Len() int {
return bcc.compressed.Len()
}

func (bcc *BaseChannelCompressor) Read(p []byte) (int, error) {
return bcc.compressed.Read(p)
}

func (bcc *BaseChannelCompressor) GetCompressed() *bytes.Buffer {
return bcc.compressed
}

type ZlibCompressor struct {
BaseChannelCompressor
}

func (zc *ZlibCompressor) Reset() {
zc.compressed.Reset()
zc.CompressorWriter.Reset(zc.compressed)
}

type BrotliCompressor struct {
BaseChannelCompressor
}

func (bc *BrotliCompressor) Reset() {
bc.compressed.Reset()
bc.compressed.WriteByte(ChannelVersionBrotli)
bc.CompressorWriter.Reset(bc.compressed)
}

func NewChannelCompressor(algo CompressionAlgo) (ChannelCompressor, error) {
compressed := &bytes.Buffer{}
if algo == Zlib {
writer, err := zlib.NewWriterLevel(compressed, zlib.BestCompression)
if err != nil {
return nil, err
}
return &ZlibCompressor{
BaseChannelCompressor{
CompressorWriter: writer,
compressed: compressed,
},
}, nil
} else if algo.IsBrotli() {
compressed.WriteByte(ChannelVersionBrotli)
writer := brotli.NewWriterLevel(compressed, GetBrotliLevel(algo))
return &BrotliCompressor{
BaseChannelCompressor{
CompressorWriter: writer,
compressed: compressed,
},
}, nil
} else {
return nil, fmt.Errorf("unsupported compression algorithm: %s", algo)
}
}
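A hedged usage sketch of the new interface (assuming the op-node derive import path): a brotli compressor reports Len() == 1 immediately after construction or Reset because the 0x01 channel-version byte is written before any compressed data.

package main

import (
	"fmt"
	"io"

	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)

func main() {
	cc, err := derive.NewChannelCompressor(derive.Brotli10)
	if err != nil {
		panic(err)
	}
	fmt.Println(cc.Len()) // 1: only the ChannelVersionBrotli byte so far

	if _, err := cc.Write([]byte("some batch RLP bytes")); err != nil {
		panic(err)
	}
	if err := cc.Close(); err != nil { // flush the remaining compressed data
		panic(err)
	}

	compressed, err := io.ReadAll(cc) // ChannelCompressor is also a reader
	if err != nil {
		panic(err)
	}
	fmt.Println(compressed[0] == derive.ChannelVersionBrotli) // true

	cc.Reset() // the buffer shrinks back to the lone version byte
	fmt.Println(cc.Len()) // 1
}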
67 changes: 67 additions & 0 deletions op-node/rollup/derive/channel_compressor_test.go
@@ -0,0 +1,67 @@
package derive

import (
"math/rand"
"testing"

"github.com/stretchr/testify/require"
)

var r = rand.New(rand.NewSource(99))

func randomBytes(length int) []byte {
b := make([]byte, length)
_, err := r.Read(b)
// Rand.Read always returns nil error
if err != nil {
panic(err)
}
return b
}

func TestChannelCompressor_NewReset(t *testing.T) {
testCases := []struct {
name string
algo CompressionAlgo
expectedResetSize int
expectErr bool
}{
{
name: "zlib",
algo: Zlib,
expectedResetSize: 0,
},
{
name: "brotli10",
algo: Brotli10,
expectedResetSize: 1,
},
{
name: "zstd",
algo: CompressionAlgo("zstd"),
expectedResetSize: 0,
expectErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
scc, err := NewChannelCompressor(tc.algo)
if tc.expectErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.expectedResetSize, scc.Len())

_, err = scc.Write(randomBytes(10))
require.NoError(t, err)
err = scc.Flush()
require.NoError(t, err)
require.Greater(t, scc.Len(), tc.expectedResetSize)

scc.Reset()
require.Equal(t, tc.expectedResetSize, scc.Len())
})
}
}
2 changes: 1 addition & 1 deletion op-node/rollup/derive/channel_in_reader.go
@@ -44,7 +44,7 @@ func (cr *ChannelInReader) Origin() eth.L1BlockRef {

// TODO: Take full channel for better logging
func (cr *ChannelInReader) WriteChannel(data []byte) error {
if f, err := BatchReader(bytes.NewBuffer(data), cr.spec.MaxRLPBytesPerChannel(cr.prev.Origin().Time)); err == nil {
if f, err := BatchReader(bytes.NewBuffer(data), cr.spec.MaxRLPBytesPerChannel(cr.prev.Origin().Time), cr.cfg.IsFjord(cr.prev.Origin().Time)); err == nil {
cr.nextBatchFn = f
cr.metrics.RecordChannelInputBytes(len(data))
return nil
58 changes: 43 additions & 15 deletions op-node/rollup/derive/channel_out_test.go
@@ -52,7 +52,7 @@ var channelTypes = []struct {
{
Name: "Span",
ChannelOut: func(t *testing.T) ChannelOut {
cout, err := NewSpanChannelOut(0, big.NewInt(0), 128_000)
cout, err := NewSpanChannelOut(0, big.NewInt(0), 128_000, Zlib)
require.NoError(t, err)
return cout
},
@@ -113,7 +113,7 @@ func TestOutputFrameNoEmptyLastFrame(t *testing.T) {

// depending on the channel type, determine the size of the written data
if span, ok := cout.(*SpanChannelOut); ok {
written = uint64(span.compressed.Len())
written = uint64(span.compressor.Len())
} else if singular, ok := cout.(*SingularChannelOut); ok {
written = uint64(singular.compress.Len())
}
@@ -220,12 +220,12 @@ func TestBlockToBatchValidity(t *testing.T) {
require.ErrorContains(t, err, "has no transactions")
}

func SpanChannelAndBatches(t *testing.T, target uint64, len int) (*SpanChannelOut, []*SingularBatch) {
func SpanChannelAndBatches(t *testing.T, target uint64, len int, algo CompressionAlgo) (*SpanChannelOut, []*SingularBatch) {
// target is larger than one batch, but smaller than two batches
rng := rand.New(rand.NewSource(0x543331))
chainID := big.NewInt(rng.Int63n(1000))
txCount := 1
cout, err := NewSpanChannelOut(0, chainID, target)
cout, err := NewSpanChannelOut(0, chainID, target, algo)
require.NoError(t, err)
batches := make([]*SingularBatch, len)
// adding the first batch should not cause an error
@@ -237,14 +237,33 @@ func SpanChannelAndBatches(t *testing.T, target uint64, len int) (*SpanChannelOu
return cout, batches
}

func TestSpanChannelOut(t *testing.T) {
tests := []struct {
name string
f func(t *testing.T, algo CompressionAlgo)
}{
{"SpanChannelOutCompressionOnlyOneBatch", SpanChannelOutCompressionOnlyOneBatch},
{"SpanChannelOutCompressionUndo", SpanChannelOutCompressionUndo},
{"SpanChannelOutClose", SpanChannelOutClose},
}
for _, test := range tests {
test := test
for _, algo := range CompressionAlgoTypes {
t.Run(test.name+"_"+algo.String(), func(t *testing.T) {
test.f(t, algo)
})
}
}
}

// TestSpanChannelOutCompressionOnlyOneBatch tests that the SpanChannelOut compression works as expected when there is only one batch
// and it is larger than the target size. The single batch should be compressed, and the channel should now be full
func TestSpanChannelOutCompressionOnlyOneBatch(t *testing.T) {
cout, singularBatches := SpanChannelAndBatches(t, 300, 2)
func SpanChannelOutCompressionOnlyOneBatch(t *testing.T, algo CompressionAlgo) {
cout, singularBatches := SpanChannelAndBatches(t, 300, 2, algo)

err := cout.AddSingularBatch(singularBatches[0], 0)
// confirm compression was not skipped
require.Greater(t, cout.compressed.Len(), 0)
require.Greater(t, cout.compressor.Len(), 0)
require.NoError(t, err)

// confirm the channel is full
@@ -256,36 +275,45 @@ func TestSpanChannelOutCompressionOnlyOneBatch(t *testing.T) {
}

// TestSpanChannelOutCompressionUndo tests that the SpanChannelOut compression rejects a batch that would cause the channel to be overfull
func TestSpanChannelOutCompressionUndo(t *testing.T) {
func SpanChannelOutCompressionUndo(t *testing.T, algo CompressionAlgo) {
// target is larger than one batch, but smaller than two batches
cout, singularBatches := SpanChannelAndBatches(t, 750, 2)
cout, singularBatches := SpanChannelAndBatches(t, 750, 2, algo)

err := cout.AddSingularBatch(singularBatches[0], 0)
require.NoError(t, err)
// confirm that the first compression was skipped
require.Equal(t, 0, cout.compressed.Len())
if algo == Zlib {
require.Equal(t, 0, cout.compressor.Len())
} else {
require.Equal(t, 1, cout.compressor.Len()) // 1 because of brotli channel version
}
// record the RLP length to confirm it doesn't change when adding a rejected batch
rlp1 := cout.activeRLP().Len()

err = cout.AddSingularBatch(singularBatches[1], 0)
require.ErrorIs(t, err, ErrCompressorFull)
// confirm that the second compression was not skipped
require.Greater(t, cout.compressed.Len(), 0)
require.Greater(t, cout.compressor.Len(), 0)

// confirm that the second rlp is the same size as the first (because the second batch was not added)
require.Equal(t, rlp1, cout.activeRLP().Len())
}

// TestSpanChannelOutClose tests that the SpanChannelOut compression works as expected when the channel is closed.
// it should compress the batch even if it is smaller than the target size because the channel is closing
func TestSpanChannelOutClose(t *testing.T) {
func SpanChannelOutClose(t *testing.T, algo CompressionAlgo) {
target := uint64(600)
cout, singularBatches := SpanChannelAndBatches(t, target, 1)
cout, singularBatches := SpanChannelAndBatches(t, target, 1, algo)

err := cout.AddSingularBatch(singularBatches[0], 0)
require.NoError(t, err)
// confirm no compression has happened yet
require.Equal(t, 0, cout.compressed.Len())

if algo == Zlib {
require.Equal(t, 0, cout.compressor.Len())
} else {
require.Equal(t, 1, cout.compressor.Len()) // 1 because of brotli channel version
}

// confirm the RLP length is less than the target
rlpLen := cout.activeRLP().Len()
@@ -295,6 +323,6 @@ func TestSpanChannelOutClose(t *testing.T) {
require.NoError(t, cout.Close())

// confirm that the only batch was compressed, and that the RLP did not change
require.Greater(t, cout.compressed.Len(), 0)
require.Greater(t, cout.compressor.Len(), 0)
require.Equal(t, rlpLen, cout.activeRLP().Len())
}
120 changes: 120 additions & 0 deletions op-node/rollup/derive/channel_test.go
@@ -1,8 +1,14 @@
package derive

import (
"bytes"
"compress/zlib"
"math/big"
"math/rand"
"testing"

"github.com/DataDog/zstd"
"github.com/andybalholm/brotli"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/stretchr/testify/require"
)
@@ -99,3 +105,117 @@ func TestFrameValidity(t *testing.T) {
t.Run(tc.name, tc.Run)
}
}

func TestBatchReader(t *testing.T) {
// Get batch data
rng := rand.New(rand.NewSource(0x543331))
singularBatch := RandomSingularBatch(rng, 20, big.NewInt(333))
batchDataInput := NewBatchData(singularBatch)

encodedBatch := &bytes.Buffer{}
err := batchDataInput.EncodeRLP(encodedBatch)
require.NoError(t, err)

var testCases = []struct {
name string
algo func(buf *bytes.Buffer, t *testing.T)
isFjord bool
expectErr bool
}{
{
name: "zlib-post-fjord",
algo: func(buf *bytes.Buffer, t *testing.T) {
writer := zlib.NewWriter(buf)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
writer.Close()
},
isFjord: true,
},
{
name: "zlib-pre-fjord",
algo: func(buf *bytes.Buffer, t *testing.T) {
writer := zlib.NewWriter(buf)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
writer.Close()
},
isFjord: false,
},
{
name: "brotli9-post-fjord",
algo: func(buf *bytes.Buffer, t *testing.T) {
buf.WriteByte(ChannelVersionBrotli)
writer := brotli.NewWriterLevel(buf, 9)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
writer.Close()
},
isFjord: true,
},
{
name: "brotli9-pre-fjord",
algo: func(buf *bytes.Buffer, t *testing.T) {
buf.WriteByte(ChannelVersionBrotli)
writer := brotli.NewWriterLevel(buf, 9)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
writer.Close()
},
isFjord: false,
expectErr: true, // expect an error because brotli is not supported before Fjord
},
{
name: "brotli10-post-fjord",
algo: func(buf *bytes.Buffer, t *testing.T) {
buf.WriteByte(ChannelVersionBrotli)
writer := brotli.NewWriterLevel(buf, 10)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
writer.Close()
},
isFjord: true,
},
{
name: "brotli11-post-fjord",
algo: func(buf *bytes.Buffer, t *testing.T) {
buf.WriteByte(ChannelVersionBrotli)
writer := brotli.NewWriterLevel(buf, 11)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
writer.Close()
},
isFjord: true,
},
{
name: "zstd-post-fjord",
algo: func(buf *bytes.Buffer, t *testing.T) {
writer := zstd.NewWriter(buf)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
writer.Close()
},
expectErr: true,
isFjord: true,
}}

for _, tc := range testCases {
compressed := new(bytes.Buffer)
tc := tc
t.Run(tc.name, func(t *testing.T) {
tc.algo(compressed, t)
reader, err := BatchReader(bytes.NewReader(compressed.Bytes()), 120000, tc.isFjord)
if tc.expectErr {
require.Error(t, err)
return
}
require.NoError(t, err)

// read the batch data
batchData, err := reader()
require.NoError(t, err)
require.NotNil(t, batchData)
require.Equal(t, batchDataInput, batchData)
})
}
}
39 changes: 18 additions & 21 deletions op-node/rollup/derive/span_channel_out.go
@@ -2,7 +2,7 @@ package derive

import (
"bytes"
"compress/zlib"

"crypto/rand"
"fmt"
"io"
@@ -26,10 +26,8 @@ type SpanChannelOut struct {
// lastCompressedRLPSize tracks the *uncompressed* size of the last RLP buffer that was compressed
// it is used to measure the growth of the RLP buffer when adding a new batch to optimize compression
lastCompressedRLPSize int
// compressed contains compressed data for making output frames
compressed *bytes.Buffer
// compress is the zlib writer for the channel
compressor *zlib.Writer
// the compressor for the channel
compressor ChannelCompressor
// target is the target size of the compressed data
target uint64
// closed indicates if the channel is closed
@@ -49,22 +47,23 @@ func (co *SpanChannelOut) setRandomID() error {
return err
}

func NewSpanChannelOut(genesisTimestamp uint64, chainID *big.Int, targetOutputSize uint64) (*SpanChannelOut, error) {
func NewSpanChannelOut(genesisTimestamp uint64, chainID *big.Int, targetOutputSize uint64, compressionAlgo CompressionAlgo) (*SpanChannelOut, error) {
c := &SpanChannelOut{
id: ChannelID{},
frame: 0,
spanBatch: NewSpanBatch(genesisTimestamp, chainID),
rlp: [2]*bytes.Buffer{{}, {}},
compressed: &bytes.Buffer{},
target: targetOutputSize,
id: ChannelID{},
frame: 0,
spanBatch: NewSpanBatch(genesisTimestamp, chainID),
rlp: [2]*bytes.Buffer{{}, {}},
target: targetOutputSize,
}
var err error
if err = c.setRandomID(); err != nil {
return nil, err
}
if c.compressor, err = zlib.NewWriterLevel(c.compressed, zlib.BestCompression); err != nil {

if c.compressor, err = NewChannelCompressor(compressionAlgo); err != nil {
return nil, err
}

return c, nil
}

@@ -75,8 +74,7 @@ func (co *SpanChannelOut) Reset() error {
co.rlp[0].Reset()
co.rlp[1].Reset()
co.lastCompressedRLPSize = 0
co.compressed.Reset()
co.compressor.Reset(co.compressed)
co.compressor.Reset()
co.spanBatch = NewSpanBatch(co.spanBatch.GenesisTimestamp, co.spanBatch.ChainID)
// setting the new randomID is the only part of the reset that can fail
return co.setRandomID()
@@ -153,7 +151,7 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64)
// if the compressed data *plus* the new rlp data is under the target size, return early
// this optimizes out cases where the compressor will obviously come in under the target size
rlpGrowth := co.activeRLP().Len() - co.lastCompressedRLPSize
if uint64(co.compressed.Len()+rlpGrowth) < co.target {
if uint64(co.compressor.Len()+rlpGrowth) < co.target {
return nil
}

@@ -186,8 +184,7 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64)
// compress compresses the active RLP buffer and checks if the compressed data is over the target size.
// it resets all the compression buffers because Span Batches aren't meant to be compressed incrementally.
func (co *SpanChannelOut) compress() error {
co.compressed.Reset()
co.compressor.Reset(co.compressed)
co.compressor.Reset()
if _, err := co.compressor.Write(co.activeRLP().Bytes()); err != nil {
return err
}
@@ -207,7 +204,7 @@ func (co *SpanChannelOut) InputBytes() int {
// Span Channel Out does not provide early output, so this will always be 0 until the channel is closed or full
func (co *SpanChannelOut) ReadyBytes() int {
if co.closed || co.FullErr() != nil {
return co.compressed.Len()
return co.compressor.Len()
}
return 0
}
@@ -225,7 +222,7 @@ func (co *SpanChannelOut) checkFull() {
if co.full != nil {
return
}
if uint64(co.compressed.Len()) >= co.target {
if uint64(co.compressor.Len()) >= co.target {
co.full = ErrCompressorFull
}
}
@@ -264,7 +261,7 @@ func (co *SpanChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16,

f := createEmptyFrame(co.id, co.frame, co.ReadyBytes(), co.closed, maxSize)

if _, err := io.ReadFull(co.compressed, f.Data); err != nil {
if _, err := io.ReadFull(co.compressor.GetCompressed(), f.Data); err != nil {
return 0, err
}
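End to end, a sketch of producing a brotli-compressed span channel with the new constructor (chain parameters are placeholders; AddSingularBatch calls are elided):

package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"math/big"

	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)

func main() {
	// Placeholder params: genesis L2 time 0, chain ID 10, ~128 KB target output.
	cout, err := derive.NewSpanChannelOut(0, big.NewInt(10), 128_000, derive.Brotli10)
	if err != nil {
		panic(err)
	}
	// ... cout.AddSingularBatch(batch, seqNum) calls would go here ...
	if err := cout.Close(); err != nil {
		panic(err)
	}
	var frame bytes.Buffer
	// The final frame is signaled with io.EOF; its data begins with the 0x01 version byte.
	if _, err := cout.OutputFrame(&frame, 100_000); err != nil && !errors.Is(err, io.EOF) {
		panic(err)
	}
	fmt.Println("frame bytes:", frame.Len())
}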

68 changes: 68 additions & 0 deletions op-node/rollup/derive/types.go
@@ -0,0 +1,68 @@
package derive

import (
"fmt"
"regexp"
)

type CompressionAlgo string

const (
// compression algo types
Zlib CompressionAlgo = "zlib"
Brotli9 CompressionAlgo = "brotli-9"
Brotli10 CompressionAlgo = "brotli-10"
Brotli11 CompressionAlgo = "brotli-11"
)

var CompressionAlgoTypes = []CompressionAlgo{
Zlib,
Brotli9,
Brotli10,
Brotli11,
}

var brotliRegexp = regexp.MustCompile(`^brotli-(9|10|11)$`)

func (algo CompressionAlgo) String() string {
return string(algo)
}

func (algo *CompressionAlgo) Set(value string) error {
if !ValidCompressionAlgoType(CompressionAlgo(value)) {
return fmt.Errorf("unknown compression algo type: %q", value)
}
*algo = CompressionAlgo(value)
return nil
}

func (algo *CompressionAlgo) Clone() any {
cpy := *algo
return &cpy
}

func (algo *CompressionAlgo) IsBrotli() bool {
return brotliRegexp.MatchString(algo.String())
}

func GetBrotliLevel(algo CompressionAlgo) int {
switch algo {
case Brotli9:
return 9
case Brotli10:
return 10
case Brotli11:
return 11
default:
panic("Unsupported brotli level")
}
}

func ValidCompressionAlgoType(value CompressionAlgo) bool {
for _, k := range CompressionAlgoTypes {
if k == value {
return true
}
}
return false
}
58 changes: 58 additions & 0 deletions op-node/rollup/derive/types_test.go
@@ -0,0 +1,58 @@
package derive

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestCompressionAlgo(t *testing.T) {
testCases := []struct {
name string
algo CompressionAlgo
isBrotli bool
isValidCompressionAlgoType bool
}{
{
name: "zlib",
algo: Zlib,
isBrotli: false,
isValidCompressionAlgoType: true,
},
{
name: "brotli-9",
algo: Brotli9,
isBrotli: true,
isValidCompressionAlgoType: true,
},
{
name: "brotli-10",
algo: Brotli10,
isBrotli: true,
isValidCompressionAlgoType: true,
},
{
name: "brotli-11",
algo: Brotli11,
isBrotli: true,
isValidCompressionAlgoType: true,
},
{
name: "invalid",
algo: CompressionAlgo("invalid"),
isBrotli: false,
isValidCompressionAlgoType: false,
}}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.isBrotli, tc.algo.IsBrotli())
if tc.isBrotli {
require.NotPanics(t, func() { GetBrotliLevel(tc.algo) })
} else {
require.Panics(t, func() { GetBrotliLevel(tc.algo) })
}
require.Equal(t, tc.isValidCompressionAlgoType, ValidCompressionAlgoType(tc.algo))
})
}
}
