Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

op-service, op-batcher, op-proposer: Active sequencer follow mode #8585

Merged
merged 44 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
43e2714
op-service: Add ActiveL2EndpointProvider.
Dec 12, 2023
7ac1fda
Fix bug in initialization, and handle case where no ethUrls are provi…
Dec 12, 2023
a233a87
Split active L2 provider into active rollup and active L2 provider.
Dec 12, 2023
997aae6
Re-duplicate some code until tests are passing.
Dec 12, 2023
6f41133
op-proposer: Add ability to enable active provider.
Dec 12, 2023
3f12c57
op-batcher: Add ability to enable active provider.
Dec 12, 2023
05b06f2
Add an empty test skeleton.
Dec 12, 2023
113a688
Add an empty test skeleton.
Dec 12, 2023
9be2531
Merge branch 'evan/batcher-active-seq' of github.com:ethereum-optimis…
Dec 13, 2023
87a359d
op-service: add, but do not yet use, RollupClientInterface and EthCli…
Dec 13, 2023
3a6e5e1
op-service: update mocks and interfaces for endpoint provider testing.
Dec 13, 2023
5241132
op-service - WIP on Active L2 Providers: unit tests pass, design and …
Dec 13, 2023
aabbe52
op-service: restore design in Active Endpoint Providers that only kee…
Dec 13, 2023
9fb14eb
op-service: when dialing a new sequencer, close() the old connection.
Dec 13, 2023
6ed8d57
op-service: obey coderabbit suggestion around safer handling of p.cur…
Dec 13, 2023
5396c73
op-service, op-batcher, op-proposer: address review comments in PR#8585.
Dec 14, 2023
999b271
op-service: Active L2 Provider - add test case for a sequencer return…
Dec 14, 2023
3ca276d
op-service: Active L2/Rollup Providers: improve unit testing and logg…
Dec 14, 2023
1efffd3
op-service, op-batcher: address review comments in 8585 regarding fir…
Dec 15, 2023
c497bd3
op-service: address review comments through adding more tests, and mo…
Dec 18, 2023
2b2a185
op-service: minor error message change in active endpoint providers.
Dec 18, 2023
00fc6c8
Update op-service/dial/active_l2_provider.go
EvanJRichard Dec 18, 2023
987204a
op-service: obey linter in rabbit-provided error message change.
Dec 18, 2023
15514b2
Update op-service/dial/active_l2_provider.go
EvanJRichard Dec 18, 2023
1514fb9
op-service active L2 provider tests: assertAllExpectations after most…
Dec 18, 2023
3599b84
op-service: more elegantly handle startup in active l2 providers, and…
Dec 18, 2023
a9a6912
Merge branch 'evan/batcher-active-seq' of github.com:ethereum-optimis…
Dec 18, 2023
314c94f
Change remaining longDurationTests to be able to use ept.assertAllExp…
Dec 18, 2023
a01b139
use new error errSeqUnset.
Dec 18, 2023
c635a1b
Add test for scenario where many sequencers are inactive, and only th…
Dec 18, 2023
8177ca1
Readability change: move the on-creation initialization to its own fu…
Dec 18, 2023
ee0191f
Move extra one-time dial to constructor.
Dec 18, 2023
f8efb3c
Update op-service/dial/active_rollup_provider.go
EvanJRichard Dec 18, 2023
73e749e
Add nil check to active l2 provider.
Dec 18, 2023
e54448e
Update op-service/dial/active_rollup_provider.go
EvanJRichard Dec 18, 2023
919e598
Address review comment: change many-inactive tests to many-undialable…
Dec 18, 2023
26d46b7
Add test that reproduces internal state corruption.
Dec 19, 2023
6f76ca8
op-service: Improve active seq provider
sebastianst Dec 18, 2023
aaaf312
Fix some tests.
Dec 19, 2023
85568d9
Move usage of ExpectClose to MaybeClose, we don't want to enforce a p…
Dec 19, 2023
c7bbd5f
add a missing call to assertAllExpectations.
Dec 19, 2023
ea15d07
Merge pull request #8673 from ethereum-optimism/seb/batcher-active-seq-1
EvanJRichard Dec 19, 2023
81befc8
Test even the case where the active providers are managing a list of …
Dec 19, 2023
be8ad6a
Revert experimental hunk in active_l2_provider.
Dec 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ type CLIConfig struct {
// L1EthRpc is the HTTP provider URL for L1.
L1EthRpc string

// L2EthRpc is the HTTP provider URL for the L2 execution engine.
// L2EthRpc is the HTTP provider URL for the L2 execution engine. A comma-separated list enables the active L2 provider. Such a list needs to match the number of RollupRpcs provided.
L2EthRpc string

// RollupRpc is the HTTP provider URL for the L2 rollup node.
// RollupRpc is the HTTP provider URL for the L2 rollup node. A comma-separated list enables the active L2 provider. Such a list needs to match the number of L2EthRpcs provided.
RollupRpc string
EvanJRichard marked this conversation as resolved.
Show resolved Hide resolved

// MaxChannelDuration is the maximum duration (in #L1-blocks) to keep a
Expand Down
12 changes: 10 additions & 2 deletions op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net"
_ "net/http/pprof"
"strconv"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -125,9 +126,16 @@ func (bs *BatcherService) initRPCClients(ctx context.Context, cfg *CLIConfig) er
}
bs.L1Client = l1Client

endpointProvider, err := dial.NewStaticL2EndpointProvider(ctx, bs.Log, cfg.L2EthRpc, cfg.RollupRpc)
var endpointProvider dial.L2EndpointProvider
if strings.Contains(cfg.RollupRpc, ",") || strings.Contains(cfg.L2EthRpc, ",") {
EvanJRichard marked this conversation as resolved.
Show resolved Hide resolved
rollupUrls := strings.Split(cfg.RollupRpc, ",")
ethUrls := strings.Split(cfg.L2EthRpc, ",")
endpointProvider, err = dial.NewActiveL2EndpointProvider(ctx, ethUrls, rollupUrls, dial.DefaultActiveSequencerFollowerCheckDuration, dial.DefaultDialTimeout, bs.Log, dial.DialEthClientInterfaceWithTimeout, dial.DialRollupClientInterfaceWithTimeout)
} else {
endpointProvider, err = dial.NewStaticL2EndpointProvider(ctx, bs.Log, cfg.L2EthRpc, cfg.RollupRpc)
}
if err != nil {
return fmt.Errorf("failed to create L2 endpoint provider: %w", err)
return fmt.Errorf("failed to build L2 endpoint provider: %w", err)
}
bs.EndpointProvider = endpointProvider

Expand Down
4 changes: 2 additions & 2 deletions op-batcher/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ var (
}
L2EthRpcFlag = &cli.StringFlag{
Name: "l2-eth-rpc",
Usage: "HTTP provider URL for L2 execution engine",
Usage: "HTTP provider URL for L2 execution engine. A comma-separated list enables the active L2 endpoint provider. Such a list needs to match the number of rollup-rpcs provided.",
EnvVars: prefixEnvVars("L2_ETH_RPC"),
}
RollupRpcFlag = &cli.StringFlag{
Name: "rollup-rpc",
Usage: "HTTP provider URL for Rollup node",
Usage: "HTTP provider URL for Rollup node. A comma-separated list enables the active L2 endpoint provider. Such a list needs to match the number of l2-eth-rpcs provided.",
EnvVars: prefixEnvVars("ROLLUP_RPC"),
}
// Optional flags
Expand Down
2 changes: 1 addition & 1 deletion op-proposer/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var (
}
RollupRpcFlag = &cli.StringFlag{
Name: "rollup-rpc",
Usage: "HTTP provider URL for the rollup node",
Usage: "HTTP provider URL for the rollup node. A comma-separated list enables the active rollup provider.",
EnvVars: prefixEnvVars("ROLLUP_RPC"),
}
L2OOAddressFlag = &cli.StringFlag{
Expand Down
2 changes: 1 addition & 1 deletion op-proposer/proposer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type CLIConfig struct {
// L1EthRpc is the HTTP provider URL for L1.
L1EthRpc string

// RollupRpc is the HTTP provider URL for the rollup node.
// RollupRpc is the HTTP provider URL for the rollup node. A comma-separated list enables the active rollup provider.
RollupRpc string

// L2OOAddress is the L2OutputOracle contract address.
Expand Down
2 changes: 1 addition & 1 deletion op-proposer/proposer/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (l *L2OutputSubmitter) fetchOutput(ctx context.Context, block *big.Int) (*e
}
output, err := rollupClient.OutputAtBlock(ctx, block.Uint64())
if err != nil {
l.Log.Error("failed to fetch output at block %d: %w", block, err)
l.Log.Error("failed to fetch output at block", "block", block, "err", err)
return nil, false, err
}
if output.Version != supportedL2OutputVersion {
Expand Down
9 changes: 8 additions & 1 deletion op-proposer/proposer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net"
"strconv"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -121,7 +122,13 @@ func (ps *ProposerService) initRPCClients(ctx context.Context, cfg *CLIConfig) e
}
ps.L1Client = l1Client

rollupProvider, err := dial.NewStaticL2RollupProvider(ctx, ps.Log, cfg.RollupRpc)
var rollupProvider dial.RollupProvider
if strings.Contains(cfg.RollupRpc, ",") {
rollupUrls := strings.Split(cfg.RollupRpc, ",")
rollupProvider, err = dial.NewActiveL2RollupProvider(ctx, rollupUrls, dial.DefaultActiveSequencerFollowerCheckDuration, dial.DefaultDialTimeout, ps.Log, dial.DialRollupClientInterfaceWithTimeout)
} else {
rollupProvider, err = dial.NewStaticL2RollupProvider(ctx, ps.Log, cfg.RollupRpc)
}
if err != nil {
return fmt.Errorf("failed to build L2 endpoint provider: %w", err)
}
EvanJRichard marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
131 changes: 131 additions & 0 deletions op-service/dial/active_l2_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package dial

import (
"context"
"errors"
"fmt"
"time"

"github.com/ethereum/go-ethereum/log"
)

const DefaultActiveSequencerFollowerCheckDuration = 2 * DefaultDialTimeout

type ethDialer func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (EthClientInterface, error)
EvanJRichard marked this conversation as resolved.
Show resolved Hide resolved

type ActiveL2EndpointProvider struct {
ActiveL2RollupProvider
currentEthClient EthClientInterface
ethDialer ethDialer
}

func NewActiveL2EndpointProvider(
ctx context.Context,
ethUrls, rollupUrls []string,
checkDuration time.Duration,
networkTimeout time.Duration,
logger log.Logger,
ethDialer ethDialer,
rollupDialer rollupDialer,
EvanJRichard marked this conversation as resolved.
Show resolved Hide resolved
) (*ActiveL2EndpointProvider, error) {
if len(rollupUrls) == 0 {
return nil, errors.New("empty rollup urls list")
}
if len(ethUrls) != len(rollupUrls) {
return nil, errors.New("number of eth urls and rollup urls mismatch")
EvanJRichard marked this conversation as resolved.
Show resolved Hide resolved
}

rollupProvider, err := NewActiveL2RollupProvider(ctx, rollupUrls, checkDuration, networkTimeout, logger, rollupDialer)
if err != nil {
return nil, err
}
cctx, cancel := context.WithTimeout(ctx, networkTimeout)
defer cancel()
ethClient, err := ethDialer(cctx, networkTimeout, logger, ethUrls[0])
if err != nil {
return nil, fmt.Errorf("dialing eth client: %w", err)
}
EvanJRichard marked this conversation as resolved.
Show resolved Hide resolved
return &ActiveL2EndpointProvider{
ActiveL2RollupProvider: *rollupProvider,
currentEthClient: ethClient,
ethDialer: ethDialer,
}, nil
}

func (p *ActiveL2EndpointProvider) EthClient(ctx context.Context) (EthClientInterface, error) {
p.clientLock.Lock()
defer p.clientLock.Unlock()
err := p.ensureActiveEndpoint(ctx)
if err != nil {
return nil, err
}

return p.currentEthClient, nil
}

func (p *ActiveL2EndpointProvider) ensureActiveEndpoint(ctx context.Context) error {
if !p.shouldCheck() {
return nil
}

if err := p.findActiveEndpoints(ctx); err != nil {
return err
}
p.activeTimeout = time.Now().Add(p.checkDuration)
return nil
}

func (p *ActiveL2EndpointProvider) findActiveEndpoints(ctx context.Context) error {
EvanJRichard marked this conversation as resolved.
Show resolved Hide resolved
const maxRetries = 20
totalAttempts := 0

for totalAttempts < maxRetries {
active, err := p.checkCurrentSequencer(ctx)
if err != nil {
p.log.Warn("Error querying active sequencer, closing connection and trying next.", "err", err, "try", totalAttempts)
p.currentRollupClient.Close()
p.currentEthClient.Close()
} else if active {
p.log.Debug("Current sequencer active.", "try", totalAttempts)
return nil
} else {
p.log.Info("Current sequencer inactive, closing connection and trying next.", "try", totalAttempts)
p.currentRollupClient.Close()
p.currentEthClient.Close()
}
if err := p.dialNextSequencer(ctx); err != nil {
return fmt.Errorf("dialing next sequencer: %w", err)
}

totalAttempts++
if p.currentIndex >= p.numEndpoints() {
p.currentIndex = 0
}
}
return fmt.Errorf("failed to find an active sequencer after %d retries", maxRetries)
EvanJRichard marked this conversation as resolved.
Show resolved Hide resolved
}

func (p *ActiveL2EndpointProvider) dialNextSequencer(ctx context.Context) error {
cctx, cancel := context.WithTimeout(ctx, p.networkTimeout)
defer cancel()
p.currentIndex++
ep := p.rollupUrls[p.currentIndex]
p.log.Debug("Dialing next sequencer.", "url", ep)
rollupClient, err := p.rollupDialer(cctx, p.networkTimeout, p.log, ep)
if err != nil {
return fmt.Errorf("dialing rollup client: %w", err)
}
ethClient, err := p.ethDialer(cctx, p.networkTimeout, p.log, ep)
if err != nil {
return fmt.Errorf("dialing eth client: %w", err)
}

p.currentRollupClient = rollupClient
p.currentEthClient = ethClient
return nil
}

func (p *ActiveL2EndpointProvider) Close() {
p.currentEthClient.Close()
p.ActiveL2RollupProvider.Close()
}
104 changes: 104 additions & 0 deletions op-service/dial/active_l2_provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package dial

import (
"context"
"fmt"
"testing"
"time"

"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)

// TestActiveSequencerFailoverBehavior_RollupProvider tests that the ActiveL2RollupProvider
// will failover to the next provider if the current one is not active.
func TestActiveSequencerFailoverBehavior_RollupProviders(t *testing.T) {
EvanJRichard marked this conversation as resolved.
Show resolved Hide resolved
// Create two mock rollup clients, one of which will declare itself inactive after first check.
primarySequencer := testutils.MockRollupClient{}
EvanJRichard marked this conversation as resolved.
Show resolved Hide resolved
primarySequencer.ExpectSequencerActive(true, nil)
primarySequencer.ExpectSequencerActive(false, nil)
primarySequencer.ExpectClose()
secondarySequencer := testutils.MockRollupClient{}
secondarySequencer.ExpectSequencerActive(true, nil)

mockRollupDialer := func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (RollupClientInterface, error) {
if url == "primary" {
return &primarySequencer, nil
} else if url == "secondary" {
return &secondarySequencer, nil
} else {
return nil, fmt.Errorf("unknown test url: %s", url)
}
}

endpointProvider, err := NewActiveL2RollupProvider(
context.Background(),
[]string{"primary", "secondary"},
1*time.Microsecond,
1*time.Minute,
testlog.Logger(t, log.LvlDebug),
mockRollupDialer,
)
require.NoError(t, err)
// Check that the first client is used, then the second once the first declares itself inactive.
firstSequencerUsed, err := endpointProvider.RollupClient(context.Background())
require.NoError(t, err)
require.True(t, &primarySequencer == firstSequencerUsed) // avoids copying the struct (and its mutex, etc.)
EvanJRichard marked this conversation as resolved.
Show resolved Hide resolved
secondSequencerUsed, err := endpointProvider.RollupClient(context.Background())
require.NoError(t, err)
require.True(t, &secondarySequencer == secondSequencerUsed)
EvanJRichard marked this conversation as resolved.
Show resolved Hide resolved
EvanJRichard marked this conversation as resolved.
Show resolved Hide resolved
}

// TestActiveSequencerFailoverBehavior_L2Providers tests that the ActiveL2EndpointProvider
// will failover to the next provider if the current one is not active.
func TestActiveSequencerFailoverBehavior_L2Providers(t *testing.T) {
// as TestActiveSequencerFailoverBehavior_RollupProviders,
// but ensure the added `EthClient()` method also triggers the failover.
primarySequencer := testutils.MockRollupClient{}
primarySequencer.ExpectSequencerActive(true, nil)
primarySequencer.ExpectSequencerActive(false, nil)
primarySequencer.ExpectClose()
secondarySequencer := testutils.MockRollupClient{}
secondarySequencer.ExpectSequencerActive(true, nil)

mockRollupDialer := func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (RollupClientInterface, error) {
if url == "primary" {
return &primarySequencer, nil
} else if url == "secondary" {
return &secondarySequencer, nil
} else {
return nil, fmt.Errorf("unknown test url: %s", url)
}
}
primaryEthClient := testutils.MockEthClient{}
primaryEthClient.ExpectClose()
secondaryEthClient := testutils.MockEthClient{}
mockEthDialer := func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (EthClientInterface, error) {
if url == "primary" {
return &primaryEthClient, nil
} else if url == "secondary" {
return &secondaryEthClient, nil
} else {
return nil, fmt.Errorf("unknown test url: %s", url)
}
}
endpointProvider, err := NewActiveL2EndpointProvider(
context.Background(),
[]string{"primary", "secondary"},
[]string{"primary", "secondary"},
1*time.Microsecond,
1*time.Minute,
testlog.Logger(t, log.LvlDebug),
mockEthDialer,
mockRollupDialer,
)
require.NoError(t, err)
firstClientUsed, err := endpointProvider.EthClient(context.Background())
require.NoError(t, err)
require.True(t, &primaryEthClient == firstClientUsed) // avoids copying the struct (and its mutex, etc.)
secondClientUsed, err := endpointProvider.EthClient(context.Background())
require.NoError(t, err)
require.True(t, &secondaryEthClient == secondClientUsed)
}
Loading