Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fjord: Add Brotli channel compression support #10358

Merged
merged 37 commits into from
May 13, 2024
Merged
Changes from 15 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
a727046
wip
cody-wang-cb Apr 26, 2024
9e1673b
wip
cody-wang-cb Apr 30, 2024
e732b21
fix
cody-wang-cb May 1, 2024
6566283
fix
cody-wang-cb May 1, 2024
4eb83ea
fix
cody-wang-cb May 1, 2024
934ed5e
fix
cody-wang-cb May 1, 2024
ec984a3
address some of the bots comments
cody-wang-cb May 1, 2024
a5ba98e
use version bit of 1
cody-wang-cb May 2, 2024
940f2bc
fix lint
cody-wang-cb May 2, 2024
642933e
adding compression type
cody-wang-cb May 2, 2024
8bd2eb6
update batch reader
cody-wang-cb May 2, 2024
314e99d
abstract span channel compressor
cody-wang-cb May 3, 2024
fee3e44
test and singular batch compressor
cody-wang-cb May 5, 2024
633aae2
fix
cody-wang-cb May 5, 2024
bf3675a
lint
cody-wang-cb May 5, 2024
1efd338
move channel compressor as interface
cody-wang-cb May 7, 2024
42ade28
add base class
cody-wang-cb May 7, 2024
6e09caf
fix go mod
cody-wang-cb May 8, 2024
e5f7e6c
test fixes
May 8, 2024
1730bfc
Merge pull request #4 from roberto-bayardo/tmp
cody-wang-cb May 8, 2024
c5f28f3
address comments
cody-wang-cb May 9, 2024
aade67d
fix
cody-wang-cb May 9, 2024
c69be77
merge from develop
cody-wang-cb May 9, 2024
ca47d0f
fix
cody-wang-cb May 10, 2024
729616f
revert channel builder test
cody-wang-cb May 10, 2024
7aaa4ed
revert ratio compressor test
cody-wang-cb May 10, 2024
da699a9
add checks to accept brotli only post fjord
cody-wang-cb May 10, 2024
abdea7d
revemo unnecessary in test
cody-wang-cb May 10, 2024
3ad55be
Merge branch 'develop' into cody/brotli-impl
cody-wang-cb May 10, 2024
5f1b198
fix forge-std
cody-wang-cb May 10, 2024
7298161
gofmt
cody-wang-cb May 10, 2024
aa0823b
address comments
cody-wang-cb May 11, 2024
010ed14
remove methods in compressor
cody-wang-cb May 11, 2024
d508412
fix error msg
cody-wang-cb May 11, 2024
7ecd688
Merge branch 'develop' into cody/brotli-impl
cody-wang-cb May 13, 2024
be6629e
add compression algo flag to optional flags
cody-wang-cb May 13, 2024
eadc24e
add Clone() function
cody-wang-cb May 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ go 1.21

require (
github.com/BurntSushi/toml v1.3.2
github.com/andybalholm/brotli v1.1.0
github.com/btcsuite/btcd v0.24.0
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0
github.com/cockroachdb/pebble v0.0.0-20231018212520-f6cde3fc2fa4
@@ -64,7 +65,6 @@ require (
github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06 // indirect
github.com/VictoriaMetrics/fastcache v1.12.1 // indirect
github.com/allegro/bigcache v1.2.1 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/aymerick/douceur v0.2.0 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
2 changes: 1 addition & 1 deletion op-batcher/batcher/channel_builder.go
Original file line number Diff line number Diff line change
@@ -86,7 +86,7 @@ func NewChannelBuilder(cfg ChannelConfig, rollupCfg rollup.Config, latestL1Origi
}
var co derive.ChannelOut
if cfg.BatchType == derive.SpanBatchType {
co, err = derive.NewSpanChannelOut(rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID, cfg.CompressorConfig.TargetOutputSize)
co, err = derive.NewSpanChannelOut(rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID, cfg.CompressorConfig.TargetOutputSize, cfg.CompressorConfig.CompressionAlgo)
} else {
co, err = derive.NewSingularChannelOut(c)
}
215 changes: 143 additions & 72 deletions op-batcher/batcher/channel_builder_test.go
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved

Large diffs are not rendered by default.

15 changes: 8 additions & 7 deletions op-batcher/batcher/channel_config.go
Original file line number Diff line number Diff line change
@@ -53,25 +53,26 @@ type ChannelConfig struct {
// value consistent with cc.TargetNumFrames and cc.MaxFrameSize.
// comprKind can be the empty string, in which case the default compressor will
// be used.
func (cc *ChannelConfig) InitCompressorConfig(approxComprRatio float64, comprKind string) {
func (cc *ChannelConfig) InitCompressorConfig(approxComprRatio float64, comprKind string, compressionAlgo derive.CompressionAlgo) {
cc.CompressorConfig = compressor.Config{
// Compressor output size needs to account for frame encoding overhead
TargetOutputSize: MaxDataSize(cc.TargetNumFrames, cc.MaxFrameSize),
ApproxComprRatio: approxComprRatio,
Kind: comprKind,
CompressionAlgo: compressionAlgo,
}
}

func (cc *ChannelConfig) InitRatioCompressor(approxComprRatio float64) {
cc.InitCompressorConfig(approxComprRatio, compressor.RatioKind)
func (cc *ChannelConfig) InitRatioCompressor(approxComprRatio float64, compressionAlgo derive.CompressionAlgo) {
cc.InitCompressorConfig(approxComprRatio, compressor.RatioKind, compressionAlgo)
}

func (cc *ChannelConfig) InitShadowCompressor() {
cc.InitCompressorConfig(0, compressor.ShadowKind)
func (cc *ChannelConfig) InitShadowCompressor(compressionAlgo derive.CompressionAlgo) {
cc.InitCompressorConfig(0, compressor.ShadowKind, compressionAlgo)
}

func (cc *ChannelConfig) InitNoneCompressor() {
cc.InitCompressorConfig(0, compressor.NoneKind)
func (cc *ChannelConfig) InitNoneCompressor(compressionAlgo derive.CompressionAlgo) {
cc.InitCompressorConfig(0, compressor.NoneKind, compressionAlgo)
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
}

func (cc *ChannelConfig) MaxFramesPerTx() int {
35 changes: 22 additions & 13 deletions op-batcher/batcher/channel_config_test.go
Original file line number Diff line number Diff line change
@@ -5,12 +5,13 @@ import (
"math"
"testing"

"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"

"github.com/stretchr/testify/require"
)

func defaultTestChannelConfig() ChannelConfig {
func defaultTestChannelConfig(algo derive.CompressionAlgo) ChannelConfig {
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
c := ChannelConfig{
SeqWindowSize: 15,
ChannelTimeout: 40,
@@ -19,27 +20,30 @@ func defaultTestChannelConfig() ChannelConfig {
MaxFrameSize: 120_000,
TargetNumFrames: 1,
BatchType: derive.SingularBatchType,
CompressorConfig: compressor.Config{
CompressionAlgo: algo,
},
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
}
c.InitRatioCompressor(0.4)
c.InitRatioCompressor(0.4, algo)
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
return c
}

func TestChannelConfig_Check(t *testing.T) {
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
type test struct {
input func() ChannelConfig
input func(algo derive.CompressionAlgo) ChannelConfig
assertion func(error)
}

tests := []test{
{
input: defaultTestChannelConfig,
input: func(algo derive.CompressionAlgo) ChannelConfig { return defaultTestChannelConfig(algo) },
assertion: func(output error) {
require.NoError(t, output)
},
},
{
input: func() ChannelConfig {
cfg := defaultTestChannelConfig()
input: func(algo derive.CompressionAlgo) ChannelConfig {
cfg := defaultTestChannelConfig(algo)
cfg.ChannelTimeout = 0
cfg.SubSafetyMargin = 1
return cfg
@@ -53,8 +57,8 @@ func TestChannelConfig_Check(t *testing.T) {
expectedErr := fmt.Sprintf("max frame size %d is less than the minimum 23", i)
i := i // need to udpate Go version...
tests = append(tests, test{
input: func() ChannelConfig {
cfg := defaultTestChannelConfig()
input: func(algo derive.CompressionAlgo) ChannelConfig {
cfg := defaultTestChannelConfig(algo)
cfg.MaxFrameSize = uint64(i)
return cfg
},
@@ -66,8 +70,10 @@ func TestChannelConfig_Check(t *testing.T) {

// Run the table tests
for _, test := range tests {
cfg := test.input()
test.assertion(cfg.Check())
for _, algo := range derive.CompressionAlgoTypes {
cfg := test.input(algo)
test.assertion(cfg.Check())
}
}
}

@@ -76,9 +82,12 @@ func TestChannelConfig_Check(t *testing.T) {
// the ChannelTimeout is less than the SubSafetyMargin.
func FuzzChannelConfig_CheckTimeout(f *testing.F) {
for i := range [10]int{} {
f.Add(uint64(i+1), uint64(i))
for _, algo := range derive.CompressionAlgoTypes {
f.Add(uint64(i+1), uint64(i), algo.String())
}

}
f.Fuzz(func(t *testing.T, channelTimeout uint64, subSafetyMargin uint64) {
f.Fuzz(func(t *testing.T, channelTimeout uint64, subSafetyMargin uint64, algo string) {
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
// We only test where [ChannelTimeout] is less than the [SubSafetyMargin]
// So we cannot have [ChannelTimeout] be [math.MaxUint64]
if channelTimeout == math.MaxUint64 {
@@ -88,7 +97,7 @@ func FuzzChannelConfig_CheckTimeout(f *testing.F) {
subSafetyMargin = channelTimeout + 1
}

channelConfig := defaultTestChannelConfig()
channelConfig := defaultTestChannelConfig(derive.CompressionAlgo(algo))
channelConfig.ChannelTimeout = channelTimeout
channelConfig.SubSafetyMargin = subSafetyMargin
require.ErrorIs(t, channelConfig.Check(), ErrInvalidChannelTimeout)
87 changes: 57 additions & 30 deletions op-batcher/batcher/channel_manager_test.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
@@ -19,20 +20,23 @@ import (
"github.com/stretchr/testify/require"
)

func channelManagerTestConfig(maxFrameSize uint64, batchType uint) ChannelConfig {
func channelManagerTestConfig(maxFrameSize uint64, batchType uint, algo derive.CompressionAlgo) ChannelConfig {
cfg := ChannelConfig{
MaxFrameSize: maxFrameSize,
TargetNumFrames: 1,
BatchType: batchType,
CompressorConfig: compressor.Config{
CompressionAlgo: algo,
},
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
}
cfg.InitRatioCompressor(1)
cfg.InitRatioCompressor(1, algo)
return cfg
}

func TestChannelManagerBatchType(t *testing.T) {
tests := []struct {
name string
f func(t *testing.T, batchType uint)
f func(t *testing.T, batchType uint, algo derive.CompressionAlgo)
}{
{"ChannelManagerReturnsErrReorg", ChannelManagerReturnsErrReorg},
{"ChannelManagerReturnsErrReorgWhenDrained", ChannelManagerReturnsErrReorgWhenDrained},
@@ -45,24 +49,28 @@ func TestChannelManagerBatchType(t *testing.T) {
}
for _, test := range tests {
test := test
t.Run(test.name+"_SingularBatch", func(t *testing.T) {
test.f(t, derive.SingularBatchType)
})
for _, algo := range derive.CompressionAlgoTypes {
t.Run(test.name+"_SingularBatch_"+algo.String(), func(t *testing.T) {
test.f(t, derive.SingularBatchType, algo)
})
}
}

for _, test := range tests {
test := test
t.Run(test.name+"_SpanBatch", func(t *testing.T) {
test.f(t, derive.SpanBatchType)
})
for _, algo := range derive.CompressionAlgoTypes {
t.Run(test.name+"_SpanBatch"+algo.String(), func(t *testing.T) {
test.f(t, derive.SpanBatchType, algo)
})
}
}
}

// ChannelManagerReturnsErrReorg ensures that the channel manager
// detects a reorg when it has cached L1 blocks.
func ChannelManagerReturnsErrReorg(t *testing.T, batchType uint) {
func ChannelManagerReturnsErrReorg(t *testing.T, batchType uint, algo derive.CompressionAlgo) {
log := testlog.Logger(t, log.LevelCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{BatchType: batchType}, &rollup.Config{})
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{BatchType: batchType, CompressorConfig: compressor.Config{CompressionAlgo: algo}}, &rollup.Config{})
m.Clear(eth.BlockID{})

a := types.NewBlock(&types.Header{
@@ -91,9 +99,9 @@ func ChannelManagerReturnsErrReorg(t *testing.T, batchType uint) {

// ChannelManagerReturnsErrReorgWhenDrained ensures that the channel manager
// detects a reorg even if it does not have any blocks inside it.
func ChannelManagerReturnsErrReorgWhenDrained(t *testing.T, batchType uint) {
func ChannelManagerReturnsErrReorgWhenDrained(t *testing.T, batchType uint, algo derive.CompressionAlgo) {
log := testlog.Logger(t, log.LevelCrit)
cfg := channelManagerTestConfig(120_000, batchType)
cfg := channelManagerTestConfig(120_000, batchType, algo)
cfg.CompressorConfig.TargetOutputSize = 1 // full on first block
m := NewChannelManager(log, metrics.NoopMetrics, cfg, &rollup.Config{})
m.Clear(eth.BlockID{})
@@ -112,18 +120,18 @@ func ChannelManagerReturnsErrReorgWhenDrained(t *testing.T, batchType uint) {
}

// ChannelManager_Clear tests clearing the channel manager.
func ChannelManager_Clear(t *testing.T, batchType uint) {
func ChannelManager_Clear(t *testing.T, batchType uint, algo derive.CompressionAlgo) {
require := require.New(t)

// Create a channel manager
log := testlog.Logger(t, log.LevelCrit)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
cfg := channelManagerTestConfig(derive.FrameV0OverHeadSize+1, batchType)
cfg := channelManagerTestConfig(derive.FrameV0OverHeadSize+1, batchType, algo)
// Need to set the channel timeout here so we don't clear pending
// channels on confirmation. This would result in [TxConfirmed]
// clearing confirmed transactions, and resetting the pendingChannels map
cfg.ChannelTimeout = 10
cfg.InitRatioCompressor(1)
cfg.InitRatioCompressor(1, algo)
m := NewChannelManager(log, metrics.NoopMetrics, cfg, &defaultTestRollupConfig)

// Channel Manager state should be empty by default
@@ -189,11 +197,11 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {
require.Empty(m.txChannels)
}

func ChannelManager_TxResend(t *testing.T, batchType uint) {
func ChannelManager_TxResend(t *testing.T, batchType uint, algo derive.CompressionAlgo) {
require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LevelError)
cfg := channelManagerTestConfig(120_000, batchType)
cfg := channelManagerTestConfig(120_000, batchType, algo)
cfg.CompressorConfig.TargetOutputSize = 1 // full on first block
m := NewChannelManager(log, metrics.NoopMetrics, cfg, &defaultTestRollupConfig)
m.Clear(eth.BlockID{})
@@ -228,12 +236,12 @@ func ChannelManager_TxResend(t *testing.T, batchType uint) {

// ChannelManagerCloseBeforeFirstUse ensures that the channel manager
// will not produce any frames if closed immediately.
func ChannelManagerCloseBeforeFirstUse(t *testing.T, batchType uint) {
func ChannelManagerCloseBeforeFirstUse(t *testing.T, batchType uint, algo derive.CompressionAlgo) {
require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LevelCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
channelManagerTestConfig(10000, batchType),
channelManagerTestConfig(10000, batchType, algo),
&defaultTestRollupConfig,
)
m.Clear(eth.BlockID{})
@@ -252,10 +260,10 @@ func ChannelManagerCloseBeforeFirstUse(t *testing.T, batchType uint) {
// ChannelManagerCloseNoPendingChannel ensures that the channel manager
// can gracefully close with no pending channels, and will not emit any new
// channel frames.
func ChannelManagerCloseNoPendingChannel(t *testing.T, batchType uint) {
func ChannelManagerCloseNoPendingChannel(t *testing.T, batchType uint, algo derive.CompressionAlgo) {
require := require.New(t)
log := testlog.Logger(t, log.LevelCrit)
cfg := channelManagerTestConfig(10000, batchType)
cfg := channelManagerTestConfig(10000, batchType, algo)
cfg.CompressorConfig.TargetOutputSize = 1 // full on first block
cfg.ChannelTimeout = 1000
m := NewChannelManager(log, metrics.NoopMetrics, cfg, &defaultTestRollupConfig)
@@ -286,13 +294,13 @@ func ChannelManagerCloseNoPendingChannel(t *testing.T, batchType uint) {
// ChannelManagerCloseNoPendingChannel ensures that the channel manager
// can gracefully close with a pending channel, and will not produce any
// new channel frames after this point.
func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) {
func ChannelManagerClosePendingChannel(t *testing.T, batchType uint, algo derive.CompressionAlgo) {
require := require.New(t)
// The number of batch txs depends on compression of the random data, hence the static test RNG seed.
// Example of different RNG seed that creates less than 2 frames: 1698700588902821588
rng := rand.New(rand.NewSource(123))
log := testlog.Logger(t, log.LevelError)
cfg := channelManagerTestConfig(10_000, batchType)
cfg := channelManagerTestConfig(10_000, batchType, algo)
cfg.ChannelTimeout = 1000
m := NewChannelManager(log, metrics.NoopMetrics, cfg, &defaultTestRollupConfig)
m.Clear(eth.BlockID{})
@@ -335,6 +343,14 @@ func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) {
// Couldn't get the test to work even with modifying NonCompressor
// to flush half-way through writing to the compressor...
func TestChannelManager_Close_PartiallyPendingChannel(t *testing.T) {
for _, algo := range derive.CompressionAlgoTypes {
t.Run("ChannelManager_Close_PartiallyPendingChannel"+algo.String(), func(t *testing.T) {
ChannelManager_Close_PartiallyPendingChannel(t, algo)
})
}
}

func ChannelManager_Close_PartiallyPendingChannel(t *testing.T, algo derive.CompressionAlgo) {
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
require := require.New(t)
// The number of batch txs depends on compression of the random data, hence the static test RNG seed.
// Example of different RNG seed that creates less than 2 frames: 1698700588902821588
@@ -344,8 +360,11 @@ func TestChannelManager_Close_PartiallyPendingChannel(t *testing.T) {
MaxFrameSize: 2200,
ChannelTimeout: 1000,
TargetNumFrames: 100,
CompressorConfig: compressor.Config{
CompressionAlgo: algo,
},
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
}
cfg.InitNoneCompressor()
cfg.InitNoneCompressor(algo)
m := NewChannelManager(log, metrics.NoopMetrics, cfg, &defaultTestRollupConfig)
m.Clear(eth.BlockID{})

@@ -391,13 +410,13 @@ func TestChannelManager_Close_PartiallyPendingChannel(t *testing.T) {
// ChannelManagerCloseAllTxsFailed ensures that the channel manager
// can gracefully close after producing transaction frames if none of these
// have successfully landed on chain.
func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) {
func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint, algo derive.CompressionAlgo) {
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
require := require.New(t)
rng := rand.New(rand.NewSource(1357))
log := testlog.Logger(t, log.LevelCrit)
cfg := channelManagerTestConfig(100, batchType)
cfg := channelManagerTestConfig(100, batchType, algo)
cfg.TargetNumFrames = 1000
cfg.InitNoneCompressor()
cfg.InitNoneCompressor(algo)
m := NewChannelManager(log, metrics.NoopMetrics, cfg, &defaultTestRollupConfig)
m.Clear(eth.BlockID{})

@@ -441,11 +460,19 @@ func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) {
}

func TestChannelManager_ChannelCreation(t *testing.T) {
for _, algo := range derive.CompressionAlgoTypes {
t.Run("ChannelManager_ChannelCreation"+algo.String(), func(t *testing.T) {
ChannelManager_ChannelCreation(t, algo)
})
}
}

func ChannelManager_ChannelCreation(t *testing.T, algo derive.CompressionAlgo) {
cody-wang-cb marked this conversation as resolved.
Show resolved Hide resolved
l := testlog.Logger(t, log.LevelCrit)
const maxChannelDuration = 15
cfg := channelManagerTestConfig(1000, derive.SpanBatchType)
cfg := channelManagerTestConfig(1000, derive.SpanBatchType, algo)
cfg.MaxChannelDuration = maxChannelDuration
cfg.InitNoneCompressor()
cfg.InitNoneCompressor(algo)

for _, tt := range []struct {
name string
Loading