diff --git a/op-batcher/batcher/config.go b/op-batcher/batcher/config.go index 927446f4137b..65805af9bb97 100644 --- a/op-batcher/batcher/config.go +++ b/op-batcher/batcher/config.go @@ -3,6 +3,7 @@ package batcher import ( "errors" "fmt" + "strings" "time" "github.com/urfave/cli/v2" @@ -20,10 +21,10 @@ type CLIConfig struct { // L1EthRpc is the HTTP provider URL for L1. L1EthRpc string - // L2EthRpc is the HTTP provider URL for the L2 execution engine. + // L2EthRpc is the HTTP provider URL for the L2 execution engine. A comma-separated list enables the active L2 provider. Such a list needs to match the number of RollupRpcs provided. L2EthRpc string - // RollupRpc is the HTTP provider URL for the L2 rollup node. + // RollupRpc is the HTTP provider URL for the L2 rollup node. A comma-separated list enables the active L2 provider. Such a list needs to match the number of L2EthRpcs provided. RollupRpc string // MaxChannelDuration is the maximum duration (in #L1-blocks) to keep a @@ -74,6 +75,9 @@ func (c *CLIConfig) Check() error { if c.RollupRpc == "" { return errors.New("empty rollup RPC URL") } + if strings.Count(c.RollupRpc, ",") != strings.Count(c.L2EthRpc, ",") { + return errors.New("number of rollup and eth URLs must match") + } if c.PollInterval == 0 { return errors.New("must set PollInterval") } diff --git a/op-batcher/batcher/service.go b/op-batcher/batcher/service.go index 9a55464e621d..897315ecc473 100644 --- a/op-batcher/batcher/service.go +++ b/op-batcher/batcher/service.go @@ -8,6 +8,7 @@ import ( "net" _ "net/http/pprof" "strconv" + "strings" "sync/atomic" "time" @@ -125,9 +126,16 @@ func (bs *BatcherService) initRPCClients(ctx context.Context, cfg *CLIConfig) er } bs.L1Client = l1Client - endpointProvider, err := dial.NewStaticL2EndpointProvider(ctx, bs.Log, cfg.L2EthRpc, cfg.RollupRpc) + var endpointProvider dial.L2EndpointProvider + if strings.Contains(cfg.RollupRpc, ",") && strings.Contains(cfg.L2EthRpc, ",") { + rollupUrls := strings.Split(cfg.RollupRpc, ",") + ethUrls := strings.Split(cfg.L2EthRpc, ",") + endpointProvider, err = dial.NewActiveL2EndpointProvider(ctx, ethUrls, rollupUrls, dial.DefaultActiveSequencerFollowerCheckDuration, dial.DefaultDialTimeout, bs.Log) + } else { + endpointProvider, err = dial.NewStaticL2EndpointProvider(ctx, bs.Log, cfg.L2EthRpc, cfg.RollupRpc) + } if err != nil { - return fmt.Errorf("failed to create L2 endpoint provider: %w", err) + return fmt.Errorf("failed to build L2 endpoint provider: %w", err) } bs.EndpointProvider = endpointProvider diff --git a/op-batcher/flags/flags.go b/op-batcher/flags/flags.go index 5e752d9d2748..72a1bb1567f0 100644 --- a/op-batcher/flags/flags.go +++ b/op-batcher/flags/flags.go @@ -30,12 +30,12 @@ var ( } L2EthRpcFlag = &cli.StringFlag{ Name: "l2-eth-rpc", - Usage: "HTTP provider URL for L2 execution engine", + Usage: "HTTP provider URL for L2 execution engine. A comma-separated list enables the active L2 endpoint provider. Such a list needs to match the number of rollup-rpcs provided.", EnvVars: prefixEnvVars("L2_ETH_RPC"), } RollupRpcFlag = &cli.StringFlag{ Name: "rollup-rpc", - Usage: "HTTP provider URL for Rollup node", + Usage: "HTTP provider URL for Rollup node. A comma-separated list enables the active L2 endpoint provider. Such a list needs to match the number of l2-eth-rpcs provided.", EnvVars: prefixEnvVars("ROLLUP_RPC"), } // Optional flags diff --git a/op-proposer/flags/flags.go b/op-proposer/flags/flags.go index d9acde95dcd4..2ac346d708ba 100644 --- a/op-proposer/flags/flags.go +++ b/op-proposer/flags/flags.go @@ -29,7 +29,7 @@ var ( } RollupRpcFlag = &cli.StringFlag{ Name: "rollup-rpc", - Usage: "HTTP provider URL for the rollup node", + Usage: "HTTP provider URL for the rollup node. A comma-separated list enables the active rollup provider.", EnvVars: prefixEnvVars("ROLLUP_RPC"), } L2OOAddressFlag = &cli.StringFlag{ diff --git a/op-proposer/proposer/config.go b/op-proposer/proposer/config.go index c4a53a8aab76..dcc493f4e7f5 100644 --- a/op-proposer/proposer/config.go +++ b/op-proposer/proposer/config.go @@ -22,7 +22,7 @@ type CLIConfig struct { // L1EthRpc is the HTTP provider URL for L1. L1EthRpc string - // RollupRpc is the HTTP provider URL for the rollup node. + // RollupRpc is the HTTP provider URL for the rollup node. A comma-separated list enables the active rollup provider. RollupRpc string // L2OOAddress is the L2OutputOracle contract address. diff --git a/op-proposer/proposer/driver.go b/op-proposer/proposer/driver.go index ac8eb2aba167..6deb7fa90ea1 100644 --- a/op-proposer/proposer/driver.go +++ b/op-proposer/proposer/driver.go @@ -206,7 +206,7 @@ func (l *L2OutputSubmitter) fetchOutput(ctx context.Context, block *big.Int) (*e } output, err := rollupClient.OutputAtBlock(ctx, block.Uint64()) if err != nil { - l.Log.Error("failed to fetch output at block %d: %w", block, err) + l.Log.Error("failed to fetch output at block", "block", block, "err", err) return nil, false, err } if output.Version != supportedL2OutputVersion { diff --git a/op-proposer/proposer/service.go b/op-proposer/proposer/service.go index c60f0bb07d54..20cd186af734 100644 --- a/op-proposer/proposer/service.go +++ b/op-proposer/proposer/service.go @@ -7,6 +7,7 @@ import ( "io" "net" "strconv" + "strings" "sync/atomic" "time" @@ -121,7 +122,13 @@ func (ps *ProposerService) initRPCClients(ctx context.Context, cfg *CLIConfig) e } ps.L1Client = l1Client - rollupProvider, err := dial.NewStaticL2RollupProvider(ctx, ps.Log, cfg.RollupRpc) + var rollupProvider dial.RollupProvider + if strings.Contains(cfg.RollupRpc, ",") { + rollupUrls := strings.Split(cfg.RollupRpc, ",") + rollupProvider, err = dial.NewActiveL2RollupProvider(ctx, rollupUrls, dial.DefaultActiveSequencerFollowerCheckDuration, dial.DefaultDialTimeout, ps.Log) + } else { + rollupProvider, err = dial.NewStaticL2RollupProvider(ctx, ps.Log, cfg.RollupRpc) + } if err != nil { return fmt.Errorf("failed to build L2 endpoint provider: %w", err) } diff --git a/op-service/dial/active_l2_provider.go b/op-service/dial/active_l2_provider.go new file mode 100644 index 000000000000..5eb3fadbc050 --- /dev/null +++ b/op-service/dial/active_l2_provider.go @@ -0,0 +1,114 @@ +package dial + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/log" +) + +const DefaultActiveSequencerFollowerCheckDuration = 2 * DefaultDialTimeout + +type ethDialer func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (EthClientInterface, error) + +// ActiveL2EndpointProvider is an interface for providing a RollupClient and l2 eth client +// It manages the lifecycle of the RollupClient and eth client for callers +// It does this by failing over down the list of rollupUrls if the current one is inactive or broken +type ActiveL2EndpointProvider struct { + ActiveL2RollupProvider + currentEthClient EthClientInterface + ethClientIndex int + ethDialer ethDialer + ethUrls []string +} + +// NewActiveL2EndpointProvider creates a new ActiveL2EndpointProvider +// the checkDuration is the duration between checks to see if the current rollup client is active +// provide a checkDuration of 0 to check every time +func NewActiveL2EndpointProvider(ctx context.Context, + ethUrls, rollupUrls []string, + checkDuration time.Duration, + networkTimeout time.Duration, + logger log.Logger, +) (*ActiveL2EndpointProvider, error) { + ethDialer := func(ctx context.Context, timeout time.Duration, + log log.Logger, url string, + ) (EthClientInterface, error) { + return DialEthClientWithTimeout(ctx, timeout, log, url) + } + rollupDialer := func(ctx context.Context, timeout time.Duration, + log log.Logger, url string, + ) (RollupClientInterface, error) { + return DialRollupClientWithTimeout(ctx, timeout, log, url) + } + return newActiveL2EndpointProvider(ctx, ethUrls, rollupUrls, checkDuration, networkTimeout, logger, ethDialer, rollupDialer) +} + +func newActiveL2EndpointProvider( + ctx context.Context, + ethUrls, rollupUrls []string, + checkDuration time.Duration, + networkTimeout time.Duration, + logger log.Logger, + ethDialer ethDialer, + rollupDialer rollupDialer, +) (*ActiveL2EndpointProvider, error) { + if len(rollupUrls) == 0 { + return nil, errors.New("empty rollup urls list, expected at least one URL") + } + if len(ethUrls) != len(rollupUrls) { + return nil, fmt.Errorf("number of eth urls (%d) and rollup urls (%d) mismatch", len(ethUrls), len(rollupUrls)) + } + + rollupProvider, err := newActiveL2RollupProvider(ctx, rollupUrls, checkDuration, networkTimeout, logger, rollupDialer) + if err != nil { + return nil, err + } + p := &ActiveL2EndpointProvider{ + ActiveL2RollupProvider: *rollupProvider, + ethDialer: ethDialer, + ethUrls: ethUrls, + } + cctx, cancel := context.WithTimeout(ctx, networkTimeout) + defer cancel() + if _, err = p.EthClient(cctx); err != nil { + return nil, fmt.Errorf("setting provider eth client: %w", err) + } + return p, nil +} + +func (p *ActiveL2EndpointProvider) EthClient(ctx context.Context) (EthClientInterface, error) { + p.clientLock.Lock() + defer p.clientLock.Unlock() + err := p.ensureActiveEndpoint(ctx) + if err != nil { + return nil, err + } + if p.ethClientIndex != p.rollupIndex || p.currentEthClient == nil { + // we changed sequencers, dial a new EthClient + cctx, cancel := context.WithTimeout(ctx, p.networkTimeout) + defer cancel() + idx := p.rollupIndex + ep := p.ethUrls[idx] + log.Info("sequencer changed (or ethClient was nil due to startup), dialing new eth client", "new_index", idx, "new_url", ep) + ethClient, err := p.ethDialer(cctx, p.networkTimeout, p.log, ep) + if err != nil { + return nil, fmt.Errorf("dialing eth client: %w", err) + } + if p.currentEthClient != nil { + p.currentEthClient.Close() + } + p.ethClientIndex = idx + p.currentEthClient = ethClient + } + return p.currentEthClient, nil +} + +func (p *ActiveL2EndpointProvider) Close() { + if p.currentEthClient != nil { + p.currentEthClient.Close() + } + p.ActiveL2RollupProvider.Close() +} diff --git a/op-service/dial/active_l2_provider_test.go b/op-service/dial/active_l2_provider_test.go new file mode 100644 index 000000000000..5790c3dbd71c --- /dev/null +++ b/op-service/dial/active_l2_provider_test.go @@ -0,0 +1,794 @@ +package dial + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/ethereum-optimism/optimism/op-service/testlog" + "github.com/ethereum-optimism/optimism/op-service/testutils" + "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/require" +) + +// endpointProviderTest is a test harness for setting up endpoint provider tests. +type endpointProviderTest struct { + t *testing.T + rollupClients []*testutils.MockRollupClient + ethClients []*testutils.MockEthClient + rollupDialOutcomes map[int]bool // true for success, false for failure + ethDialOutcomes map[int]bool // true for success, false for failure +} + +// setupEndpointProviderTest sets up the basic structure of the endpoint provider tests. +func setupEndpointProviderTest(t *testing.T, numSequencers int) *endpointProviderTest { + ept := &endpointProviderTest{ + t: t, + rollupClients: make([]*testutils.MockRollupClient, numSequencers), + ethClients: make([]*testutils.MockEthClient, numSequencers), + rollupDialOutcomes: make(map[int]bool), + ethDialOutcomes: make(map[int]bool), + } + + for i := 0; i < numSequencers; i++ { + ept.rollupClients[i] = new(testutils.MockRollupClient) + ept.ethClients[i] = new(testutils.MockEthClient) + ept.rollupDialOutcomes[i] = true // by default, all dials succeed + ept.ethDialOutcomes[i] = true // by default, all dials succeed + } + + return ept +} + +// newActiveL2EndpointProvider constructs a new ActiveL2RollupProvider using the test harness setup. +func (et *endpointProviderTest) newActiveL2RollupProvider(checkDuration time.Duration) (*ActiveL2RollupProvider, error) { + mockRollupDialer := func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (RollupClientInterface, error) { + for i, client := range et.rollupClients { + if url == fmt.Sprintf("rollup%d", i) { + if !et.rollupDialOutcomes[i] { + return nil, fmt.Errorf("simulated dial failure for rollup %d", i) + } + return client, nil + } + } + return nil, fmt.Errorf("unknown test url: %s", url) + } + + // make the "URLs" + rollupUrls := make([]string, len(et.rollupClients)) + for i := range et.rollupClients { + rollupUrl := fmt.Sprintf("rollup%d", i) + rollupUrls[i] = rollupUrl + } + + return newActiveL2RollupProvider( + context.Background(), + rollupUrls, + checkDuration, + 1*time.Minute, + testlog.Logger(et.t, log.LvlDebug), + mockRollupDialer, + ) +} + +// newActiveL2EndpointProvider constructs a new ActiveL2EndpointProvider using the test harness setup. +func (et *endpointProviderTest) newActiveL2EndpointProvider(checkDuration time.Duration) (*ActiveL2EndpointProvider, error) { + mockRollupDialer := func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (RollupClientInterface, error) { + for i, client := range et.rollupClients { + if url == fmt.Sprintf("rollup%d", i) { + if !et.rollupDialOutcomes[i] { + return nil, fmt.Errorf("simulated dial failure for rollup %d", i) + } + return client, nil + } + } + return nil, fmt.Errorf("unknown test url: %s", url) + } + + mockEthDialer := func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (EthClientInterface, error) { + for i, client := range et.ethClients { + if url == fmt.Sprintf("eth%d", i) { + if !et.ethDialOutcomes[i] { + return nil, fmt.Errorf("simulated dial failure for eth %d", i) + } + return client, nil + } + } + return nil, fmt.Errorf("unknown test url: %s", url) + } + + // make the "URLs" + rollupUrls := make([]string, len(et.rollupClients)) + for i := range et.rollupClients { + rollupUrl := fmt.Sprintf("rollup%d", i) + rollupUrls[i] = rollupUrl + } + ethUrls := make([]string, len(et.ethClients)) + for i := range et.ethClients { + ethUrl := fmt.Sprintf("eth%d", i) + ethUrls[i] = ethUrl + } + + return newActiveL2EndpointProvider( + context.Background(), + ethUrls, + rollupUrls, + checkDuration, + 1*time.Minute, + testlog.Logger(et.t, log.LvlDebug), + mockEthDialer, + mockRollupDialer, + ) +} + +func (et *endpointProviderTest) assertAllExpectations(t *testing.T) { + for _, sequencer := range et.rollupClients { + sequencer.AssertExpectations(t) + } + for _, ethClient := range et.ethClients { + ethClient.AssertExpectations(t) + } +} + +func (et *endpointProviderTest) setRollupDialOutcome(index int, success bool) { + et.rollupDialOutcomes[index] = success +} + +// TestRollupProvider_FailoverOnInactiveSequencer verifies that the ActiveL2RollupProvider +// will switch to the next provider if the current one becomes inactive. +func TestRollupProvider_FailoverOnInactiveSequencer(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + primarySequencer, secondarySequencer := ept.rollupClients[0], ept.rollupClients[1] + + primarySequencer.ExpectSequencerActive(true, nil) // respond true once on creation + primarySequencer.ExpectSequencerActive(true, nil) // respond true again when the test calls `RollupClient()` the first time + + rollupProvider, err := ept.newActiveL2RollupProvider(0) + require.NoError(t, err) + + firstSequencerUsed, err := rollupProvider.RollupClient(context.Background()) + require.NoError(t, err) + require.Same(t, primarySequencer, firstSequencerUsed) + + primarySequencer.ExpectSequencerActive(false, nil) // become inactive after that + primarySequencer.MaybeClose() + secondarySequencer.ExpectSequencerActive(true, nil) + secondSequencerUsed, err := rollupProvider.RollupClient(context.Background()) + require.NoError(t, err) + require.Same(t, secondarySequencer, secondSequencerUsed) + ept.assertAllExpectations(t) +} + +// TestEndpointProvider_FailoverOnInactiveSequencer verifies that the ActiveL2EndpointProvider +// will switch to the next provider if the current one becomes inactive. +func TestEndpointProvider_FailoverOnInactiveSequencer(t *testing.T) { + // as TestActiveSequencerFailoverBehavior_RollupProviders, + // but ensure the added `EthClient()` method also triggers the failover. + ept := setupEndpointProviderTest(t, 2) + primarySequencer, secondarySequencer := ept.rollupClients[0], ept.rollupClients[1] + primarySequencer.ExpectSequencerActive(true, nil) // primary sequencer gets hit once on creation: embedded call of `RollupClient()` + primarySequencer.ExpectSequencerActive(true, nil) // primary sequencer gets hit twice on creation: implicit call of `EthClient()` + primarySequencer.ExpectSequencerActive(true, nil) // respond true again when the test calls `EthClient()` the first time + + activeProvider, err := ept.newActiveL2EndpointProvider(0) + require.NoError(t, err) + + firstSequencerUsed, err := activeProvider.EthClient(context.Background()) + require.NoError(t, err) + require.Same(t, ept.ethClients[0], firstSequencerUsed) + + primarySequencer.ExpectSequencerActive(false, nil) // become inactive after that + secondarySequencer.ExpectSequencerActive(true, nil) + primarySequencer.MaybeClose() + ept.ethClients[0].MaybeClose() // we close the ethclient when we switch over to the next sequencer + secondSequencerUsed, err := activeProvider.EthClient(context.Background()) + require.NoError(t, err) + require.Same(t, ept.ethClients[1], secondSequencerUsed) + ept.assertAllExpectations(t) +} + +// TestRollupProvider_FailoverOnErroredSequencer verifies that the ActiveL2RollupProvider +// will switch to the next provider if the current one returns an error. +func TestRollupProvider_FailoverOnErroredSequencer(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + primarySequencer, secondarySequencer := ept.rollupClients[0], ept.rollupClients[1] + + primarySequencer.ExpectSequencerActive(true, nil) // respond true once on creation + primarySequencer.ExpectSequencerActive(true, nil) // respond true again when the test calls `RollupClient()` the first time + + rollupProvider, err := ept.newActiveL2RollupProvider(0) + require.NoError(t, err) + + firstSequencerUsed, err := rollupProvider.RollupClient(context.Background()) + require.NoError(t, err) + require.Same(t, primarySequencer, firstSequencerUsed) + + primarySequencer.ExpectSequencerActive(true, fmt.Errorf("a test error")) // error-out after that + primarySequencer.MaybeClose() + secondarySequencer.ExpectSequencerActive(true, nil) + secondSequencerUsed, err := rollupProvider.RollupClient(context.Background()) + require.NoError(t, err) + require.Same(t, secondarySequencer, secondSequencerUsed) + ept.assertAllExpectations(t) +} + +// TestEndpointProvider_FailoverOnErroredSequencer verifies that the ActiveL2EndpointProvider +// will switch to the next provider if the current one returns an error. +func TestEndpointProvider_FailoverOnErroredSequencer(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + primarySequencer, secondarySequencer := ept.rollupClients[0], ept.rollupClients[1] + primaryEthClient, secondaryEthClient := ept.ethClients[0], ept.ethClients[1] + + primarySequencer.ExpectSequencerActive(true, nil) // primary sequencer gets hit once on creation: embedded call of `RollupClient()` + primarySequencer.ExpectSequencerActive(true, nil) // primary sequencer gets hit twice on creation: implicit call of `EthClient()` + + activeProvider, err := ept.newActiveL2EndpointProvider(0) + require.NoError(t, err) + + primarySequencer.ExpectSequencerActive(true, nil) // respond true again when the test calls `EthClient()` the first time + firstSequencerUsed, err := activeProvider.EthClient(context.Background()) + require.NoError(t, err) + require.Same(t, primaryEthClient, firstSequencerUsed) + + primarySequencer.ExpectSequencerActive(true, fmt.Errorf("a test error")) // error out after that + primarySequencer.MaybeClose() + primaryEthClient.MaybeClose() + secondarySequencer.ExpectSequencerActive(true, nil) + + secondSequencerUsed, err := activeProvider.EthClient(context.Background()) + require.NoError(t, err) + require.Same(t, secondaryEthClient, secondSequencerUsed) + ept.assertAllExpectations(t) +} + +// TestRollupProvider_NoExtraCheckOnActiveSequencer verifies that the ActiveL2RollupProvider +// does not change if the current sequencer is active. +func TestRollupProvider_NoExtraCheckOnActiveSequencer(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + primarySequencer := ept.rollupClients[0] + + primarySequencer.ExpectSequencerActive(true, nil) // default test provider, which always checks, checks Active on creation + + rollupProvider, err := ept.newActiveL2RollupProvider(0) + require.NoError(t, err) + require.Same(t, primarySequencer, rollupProvider.currentRollupClient) + + primarySequencer.ExpectSequencerActive(true, nil) // default test provider, which always checks, checks again on RollupClient() + + firstSequencerUsed, err := rollupProvider.RollupClient(context.Background()) + require.NoError(t, err) + require.Same(t, primarySequencer, firstSequencerUsed) + ept.assertAllExpectations(t) +} + +// TestEndpointProvider_NoExtraCheckOnActiveSequencer verifies that the ActiveL2EndpointProvider +// does not change if the current sequencer is active. +func TestEndpointProvider_NoExtraCheckOnActiveSequencer(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + primarySequencer := ept.rollupClients[0] + + primarySequencer.ExpectSequencerActive(true, nil) // default test provider, which always checks, checks Active twice on creation (once for internal RollupClient() call) + primarySequencer.ExpectSequencerActive(true, nil) // default test provider, which always checks, checks Active twice on creation (once for internal EthClient() call) + + endpointProvider, err := ept.newActiveL2EndpointProvider(0) + require.NoError(t, err) + require.Same(t, ept.ethClients[0], endpointProvider.currentEthClient) + + primarySequencer.ExpectSequencerActive(true, nil) // default test provider, which always checks, checks again on EthClient() + + firstEthClientUsed, err := endpointProvider.EthClient(context.Background()) + require.NoError(t, err) + require.Same(t, ept.ethClients[0], firstEthClientUsed) + ept.assertAllExpectations(t) +} + +// TestRollupProvider_FailoverAndReturn verifies the ActiveL2RollupProvider's ability to +// failover and then return to the primary sequencer once it becomes active again. +func TestRollupProvider_FailoverAndReturn(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + primarySequencer, secondarySequencer := ept.rollupClients[0], ept.rollupClients[1] + + // Primary initially active + primarySequencer.ExpectSequencerActive(true, nil) + rollupProvider, err := ept.newActiveL2RollupProvider(0) + require.NoError(t, err) + + // Primary becomes inactive, secondary active + primarySequencer.ExpectSequencerActive(false, nil) + primarySequencer.MaybeClose() + secondarySequencer.ExpectSequencerActive(true, nil) + + // Fails over to secondary + secondSequencerUsed, err := rollupProvider.RollupClient(context.Background()) + require.NoError(t, err) + require.Same(t, secondarySequencer, secondSequencerUsed) + + // Primary becomes active again, secondary becomes inactive + primarySequencer.ExpectSequencerActive(true, nil) + secondarySequencer.ExpectSequencerActive(false, nil) + secondarySequencer.MaybeClose() + + // Should return to primary + thirdSequencerUsed, err := rollupProvider.RollupClient(context.Background()) + require.NoError(t, err) + require.Same(t, primarySequencer, thirdSequencerUsed) + ept.assertAllExpectations(t) +} + +// TestEndpointProvider_FailoverAndReturn verifies the ActiveL2EndpointProvider's ability to +// failover and then return to the primary sequencer once it becomes active again. +func TestEndpointProvider_FailoverAndReturn(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + primarySequencer, secondarySequencer := ept.rollupClients[0], ept.rollupClients[1] + + // Primary initially active + primarySequencer.ExpectSequencerActive(true, nil) + primarySequencer.ExpectSequencerActive(true, nil) // see comment in other tests about why we expect this twice + endpointProvider, err := ept.newActiveL2EndpointProvider(0) + require.NoError(t, err) + + // Primary becomes inactive, secondary active + primarySequencer.ExpectSequencerActive(false, nil) + primarySequencer.MaybeClose() + ept.ethClients[0].MaybeClose() + secondarySequencer.ExpectSequencerActive(true, nil) + + // Fails over to secondary + secondEthClient, err := endpointProvider.EthClient(context.Background()) + require.NoError(t, err) + require.Same(t, ept.ethClients[1], secondEthClient) + + // Primary becomes active again, secondary becomes inactive + primarySequencer.ExpectSequencerActive(true, nil) + secondarySequencer.ExpectSequencerActive(false, nil) + secondarySequencer.MaybeClose() + ept.ethClients[1].MaybeClose() + + // // Should return to primary + thirdSequencerUsed, err := endpointProvider.EthClient(context.Background()) + require.NoError(t, err) + require.Same(t, ept.ethClients[0], thirdSequencerUsed) + ept.assertAllExpectations(t) +} + +// TestRollupProvider_InitialActiveSequencerSelection verifies that the ActiveL2RollupProvider +// selects the active sequencer correctly at the time of creation. +func TestRollupProvider_InitialActiveSequencerSelection(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + primarySequencer := ept.rollupClients[0] + + // Primary active at creation + primarySequencer.ExpectSequencerActive(true, nil) + + rollupProvider, err := ept.newActiveL2RollupProvider(0) + require.NoError(t, err) + + // Check immediately after creation without additional Active check + require.Same(t, primarySequencer, rollupProvider.currentRollupClient) + ept.assertAllExpectations(t) +} + +// TestEndpointProvider_InitialActiveSequencerSelection verifies that the ActiveL2EndpointProvider +// selects the active sequencer correctly at the time of creation. +func TestEndpointProvider_InitialActiveSequencerSelection(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + primarySequencer := ept.rollupClients[0] + + // Primary active at creation + primarySequencer.ExpectSequencerActive(true, nil) + primarySequencer.ExpectSequencerActive(true, nil) // see comment in other tests about why we expect this twice + + rollupProvider, err := ept.newActiveL2EndpointProvider(0) + require.NoError(t, err) + + // Check immediately after creation without additional Active check + require.Same(t, primarySequencer, rollupProvider.currentRollupClient) + ept.assertAllExpectations(t) +} + +// TestRollupProvider_SelectSecondSequencerIfFirstInactiveAtCreation verifies that if the first sequencer +// is inactive at the time of ActiveL2RollupProvider creation, the second active sequencer is chosen. +func TestRollupProvider_SelectSecondSequencerIfFirstInactiveAtCreation(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + + // First sequencer is inactive, second sequencer is active + ept.rollupClients[0].ExpectSequencerActive(false, nil) + ept.rollupClients[0].MaybeClose() + ept.rollupClients[1].ExpectSequencerActive(true, nil) + + rollupProvider, err := ept.newActiveL2RollupProvider(0) + require.NoError(t, err) + + require.Same(t, ept.rollupClients[1], rollupProvider.currentRollupClient) + ept.assertAllExpectations(t) +} + +// TestRollupProvider_SelectLastSequencerIfManyOfflineAtCreation verifies that if all but the last sequencer +// are offline at the time of ActiveL2RollupProvider creation, the last active sequencer is chosen. +func TestRollupProvider_SelectLastSequencerIfManyOfflineAtCreation(t *testing.T) { + ept := setupEndpointProviderTest(t, 5) + + // First four sequencers are dead, last sequencer is active + for i := 0; i < 4; i++ { + ept.setRollupDialOutcome(i, false) + } + ept.rollupClients[4].ExpectSequencerActive(true, nil) + + rollupProvider, err := ept.newActiveL2RollupProvider(0) + require.NoError(t, err) + + require.Same(t, ept.rollupClients[4], rollupProvider.currentRollupClient) + ept.assertAllExpectations(t) +} + +// TestEndpointProvider_SelectSecondSequencerIfFirstOfflineAtCreation verifies that if the first sequencer +// is inactive at the time of ActiveL2EndpointProvider creation, the second active sequencer is chosen. +func TestEndpointProvider_SelectSecondSequencerIfFirstOfflineAtCreation(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + + // First sequencer is inactive, second sequencer is active + ept.rollupClients[0].ExpectSequencerActive(false, nil) + ept.rollupClients[0].MaybeClose() + ept.rollupClients[1].ExpectSequencerActive(true, nil) + ept.rollupClients[1].ExpectSequencerActive(true, nil) // see comment in other tests about why we expect this twice + + endpointProvider, err := ept.newActiveL2EndpointProvider(0) + require.NoError(t, err) + + require.Same(t, ept.ethClients[1], endpointProvider.currentEthClient) + ept.assertAllExpectations(t) +} + +// TestEndpointProvider_SelectLastSequencerIfManyInactiveAtCreation verifies that if all but the last sequencer +// are inactive at the time of ActiveL2EndpointProvider creation, the last active sequencer is chosen. +func TestEndpointProvider_SelectLastSequencerIfManyInactiveAtCreation(t *testing.T) { + ept := setupEndpointProviderTest(t, 5) + + // First four sequencers are dead, last sequencer is active + for i := 0; i < 4; i++ { + ept.setRollupDialOutcome(i, false) + } + ept.rollupClients[4].ExpectSequencerActive(true, nil) + ept.rollupClients[4].ExpectSequencerActive(true, nil) // Double check due to embedded call of `EthClient()` + + endpointProvider, err := ept.newActiveL2EndpointProvider(0) + require.NoError(t, err) + + require.Same(t, ept.ethClients[4], endpointProvider.currentEthClient) + ept.assertAllExpectations(t) +} + +// TestRollupProvider_ConstructorErrorOnFirstSequencerOffline verifies that the ActiveL2RollupProvider +// constructor handles the case where the first sequencer (index 0) is offline at startup. +func TestRollupProvider_ConstructorErrorOnFirstSequencerOffline(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + + // First sequencer is dead, second sequencer is active + ept.rollupClients[0].ExpectSequencerActive(false, fmt.Errorf("I am offline")) + ept.rollupClients[0].MaybeClose() + ept.rollupClients[1].ExpectSequencerActive(true, nil) + + rollupProvider, err := ept.newActiveL2RollupProvider(0) + require.NoError(t, err) + + require.Same(t, ept.rollupClients[1], rollupProvider.currentRollupClient) + ept.assertAllExpectations(t) +} + +// TestEndpointProvider_ConstructorErrorOnFirstSequencerOffline verifies that the ActiveL2EndpointProvider +// constructor handles the case where the first sequencer (index 0) is offline at startup. +func TestEndpointProvider_ConstructorErrorOnFirstSequencerOffline(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + + // First sequencer is dead, second sequencer is active + ept.rollupClients[0].ExpectSequencerActive(false, fmt.Errorf("I am offline")) + ept.rollupClients[0].MaybeClose() + ept.rollupClients[1].ExpectSequencerActive(true, nil) + ept.rollupClients[1].ExpectSequencerActive(true, nil) // see comment in other tests about why we expect this twice + + endpointProvider, err := ept.newActiveL2EndpointProvider(0) + require.NoError(t, err) + + require.Same(t, ept.ethClients[1], endpointProvider.currentEthClient) + ept.assertAllExpectations(t) +} + +// TestRollupProvider_FailOnAllInactiveSequencers verifies that the ActiveL2RollupProvider +// fails to be created when all sequencers are inactive. +func TestRollupProvider_FailOnAllInactiveSequencers(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + + // All sequencers are inactive + for _, sequencer := range ept.rollupClients { + sequencer.ExpectSequencerActive(false, nil) + sequencer.MaybeClose() + } + + _, err := ept.newActiveL2RollupProvider(0) + require.Error(t, err) // Expect an error as all sequencers are inactive + ept.assertAllExpectations(t) +} + +// TestEndpointProvider_FailOnAllInactiveSequencers verifies that the ActiveL2EndpointProvider +// fails to be created when all sequencers are inactive. +func TestEndpointProvider_FailOnAllInactiveSequencers(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + + // All sequencers are inactive + for _, sequencer := range ept.rollupClients { + sequencer.ExpectSequencerActive(false, nil) + sequencer.MaybeClose() + } + + _, err := ept.newActiveL2EndpointProvider(0) + require.Error(t, err) // Expect an error as all sequencers are inactive + ept.assertAllExpectations(t) +} + +// TestRollupProvider_FailOnAllErroredSequencers verifies that the ActiveL2RollupProvider +// fails to create when all sequencers return an error. +func TestRollupProvider_FailOnAllErroredSequencers(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + + // All sequencers are inactive + for _, sequencer := range ept.rollupClients { + sequencer.ExpectSequencerActive(true, fmt.Errorf("a test error")) + sequencer.MaybeClose() + } + + _, err := ept.newActiveL2RollupProvider(0) + require.Error(t, err) // Expect an error as all sequencers are inactive + ept.assertAllExpectations(t) +} + +// TestEndpointProvider_FailOnAllErroredSequencers verifies that the ActiveL2EndpointProvider +// fails to create when all sequencers return an error. +func TestEndpointProvider_FailOnAllErroredSequencers(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + + // All sequencers are inactive + for _, sequencer := range ept.rollupClients { + sequencer.ExpectSequencerActive(true, fmt.Errorf("a test error")) + sequencer.MaybeClose() + } + + _, err := ept.newActiveL2EndpointProvider(0) + require.Error(t, err) // Expect an error as all sequencers are inactive + ept.assertAllExpectations(t) +} + +// TestRollupProvider_LongCheckDuration verifies the behavior of ActiveL2RollupProvider with a long check duration. +func TestRollupProvider_LongCheckDuration(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + primarySequencer := ept.rollupClients[0] + + longCheckDuration := 1 * time.Hour + primarySequencer.ExpectSequencerActive(true, nil) // Active check on creation + + rollupProvider, err := ept.newActiveL2RollupProvider(longCheckDuration) + require.NoError(t, err) + + // Should return the same client without extra checks + firstSequencerUsed, err := rollupProvider.RollupClient(context.Background()) + require.NoError(t, err) + require.Same(t, primarySequencer, firstSequencerUsed) + + secondSequencerUsed, err := rollupProvider.RollupClient(context.Background()) + require.NoError(t, err) + require.Same(t, primarySequencer, secondSequencerUsed) + ept.assertAllExpectations(t) +} + +// TestEndpointProvider_LongCheckDuration verifies the behavior of ActiveL2EndpointProvider with a long check duration. +func TestEndpointProvider_LongCheckDuration(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + primarySequencer := ept.rollupClients[0] + + longCheckDuration := 1 * time.Hour + primarySequencer.ExpectSequencerActive(true, nil) // Active check on creation + + endpointProvider, err := ept.newActiveL2EndpointProvider(longCheckDuration) + require.NoError(t, err) + + // Should return the same client without extra checks + firstEthClient, err := endpointProvider.EthClient(context.Background()) + require.NoError(t, err) + require.Same(t, ept.ethClients[0], firstEthClient) + + secondEthClient, err := endpointProvider.EthClient(context.Background()) + require.NoError(t, err) + require.Same(t, ept.ethClients[0], secondEthClient) + ept.assertAllExpectations(t) +} + +// TestRollupProvider_ErrorWhenAllSequencersInactive verifies that RollupClient() returns an error +// if all sequencers become inactive after the provider is successfully created. +func TestRollupProvider_ErrorWhenAllSequencersInactive(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + ept.rollupClients[0].ExpectSequencerActive(true, nil) // Main sequencer initially active + + rollupProvider, err := ept.newActiveL2RollupProvider(0) + require.NoError(t, err) + + // All sequencers become inactive + for _, sequencer := range ept.rollupClients { + sequencer.ExpectSequencerActive(false, nil) + sequencer.MaybeClose() + } + + _, err = rollupProvider.RollupClient(context.Background()) + require.Error(t, err) // Expect an error as all sequencers are inactive + ept.assertAllExpectations(t) +} + +// TestEndpointProvider_ErrorWhenAllSequencersInactive verifies that EthClient() returns an error +// if all sequencers become inactive after the provider is successfully created. +func TestEndpointProvider_ErrorWhenAllSequencersInactive(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + ept.rollupClients[0].ExpectSequencerActive(true, nil) // Main sequencer initially active + ept.rollupClients[0].ExpectSequencerActive(true, nil) // Main sequencer initially active (double check due to embedded call of `EthClient()`) + + endpointProvider, err := ept.newActiveL2EndpointProvider(0) + require.NoError(t, err) + + // All sequencers become inactive + for _, sequencer := range ept.rollupClients { + sequencer.ExpectSequencerActive(false, nil) + sequencer.MaybeClose() + } + + _, err = endpointProvider.EthClient(context.Background()) + require.Error(t, err) // Expect an error as all sequencers are inactive + ept.assertAllExpectations(t) +} + +// TestRollupProvider_ReturnsSameSequencerOnInactiveWithLongCheckDuration verifies that the ActiveL2RollupProvider +// still returns the same sequencer across calls even if it becomes inactive, due to a long check duration. +func TestRollupProvider_ReturnsSameSequencerOnInactiveWithLongCheckDuration(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + primarySequencer := ept.rollupClients[0] + + longCheckDuration := 1 * time.Hour + primarySequencer.ExpectSequencerActive(true, nil) // Active on creation + + rollupProvider, err := ept.newActiveL2RollupProvider(longCheckDuration) + require.NoError(t, err) + + // Primary sequencer becomes inactive, but the provider won't check immediately due to longCheckDuration + primarySequencer.ExpectSequencerActive(false, nil) + firstSequencerUsed, err := rollupProvider.RollupClient(context.Background()) + require.NoError(t, err) + require.Same(t, primarySequencer, firstSequencerUsed) + + active, err := primarySequencer.SequencerActive(context.Background()) + require.NoError(t, err) + require.False(t, active) + + secondSequencerUsed, err := rollupProvider.RollupClient(context.Background()) + require.NoError(t, err) + require.Same(t, primarySequencer, secondSequencerUsed) + ept.assertAllExpectations(t) +} + +// TestEndpointProvider_ReturnsSameSequencerOnInactiveWithLongCheckDuration verifies that the ActiveL2EndpointProvider +// still returns the same sequencer across calls even if it becomes inactive, due to a long check duration. +func TestEndpointProvider_ReturnsSameSequencerOnInactiveWithLongCheckDuration(t *testing.T) { + ept := setupEndpointProviderTest(t, 2) + primarySequencer := ept.rollupClients[0] + + longCheckDuration := 1 * time.Hour + primarySequencer.ExpectSequencerActive(true, nil) // Active on creation + + endpointProvider, err := ept.newActiveL2EndpointProvider(longCheckDuration) + require.NoError(t, err) + + // Primary sequencer becomes inactive, but the provider won't check immediately due to longCheckDuration + primarySequencer.ExpectSequencerActive(false, nil) + firstEthClientUsed, err := endpointProvider.EthClient(context.Background()) + require.NoError(t, err) + require.Same(t, ept.ethClients[0], firstEthClientUsed) + + active, err := primarySequencer.SequencerActive(context.Background()) + require.NoError(t, err) + require.False(t, active) + + secondEthClientUsed, err := endpointProvider.EthClient(context.Background()) + require.NoError(t, err) + require.Same(t, ept.ethClients[0], secondEthClientUsed) + ept.assertAllExpectations(t) +} + +// TestRollupProvider_HandlesManyIndexClientMismatch verifies that the ActiveL2RollupProvider avoids +// the case where the index of the current sequencer does not match the index of the current rollup client. +func TestRollupProvider_HandlesManyIndexClientMismatch(t *testing.T) { + ept := setupEndpointProviderTest(t, 3) + seq0, seq1, seq2 := ept.rollupClients[0], ept.rollupClients[1], ept.rollupClients[2] + + // "start happy": primarySequencer is active on creation + seq0.ExpectSequencerActive(true, nil) // active on creation + rollupProvider, err := ept.newActiveL2RollupProvider(0) + require.NoError(t, err) + + // primarySequencer goes down + seq0.ExpectSequencerActive(false, fmt.Errorf("I'm offline now")) + seq0.MaybeClose() + ept.setRollupDialOutcome(0, false) // primarySequencer fails to dial + // secondarySequencer is inactive, but online + seq1.ExpectSequencerActive(false, nil) + seq1.MaybeClose() + // tertiarySequencer can't even be dialed + ept.setRollupDialOutcome(2, false) + // In a prior buggy implementation, this scenario lead to an internal inconsistent state + // where the current client didn't match the index. On a subsequent try, this led to the + // active sequencer at 0 to be skipped entirely, while the sequencer at index 1 + // was checked twice. + rollupClient, err := rollupProvider.RollupClient(context.Background()) + require.Error(t, err) + require.Nil(t, rollupClient) + // internal state would now be inconsistent in a buggy impl. + + // now seq0 is dialable and active + ept.setRollupDialOutcome(0, true) + seq0.ExpectSequencerActive(true, nil) + seq0.MaybeClose() + // now seq1 and seq2 are dialable, but inactive + ept.setRollupDialOutcome(1, true) + seq1.ExpectSequencerActive(false, nil) + seq1.MaybeClose() + ept.setRollupDialOutcome(2, true) + seq2.ExpectSequencerActive(false, nil) + seq2.MaybeClose() + // this would trigger the prior bug: request the rollup client. + rollupClient, err = rollupProvider.RollupClient(context.Background()) + require.NoError(t, err) + require.Same(t, seq0, rollupClient) + ept.assertAllExpectations(t) +} + +// TestRollupProvider_HandlesSingleSequencer verifies that the ActiveL2RollupProvider +// can handle being passed a single sequencer endpoint without issue. +func TestRollupProvider_HandlesSingleSequencer(t *testing.T) { + ept := setupEndpointProviderTest(t, 1) + onlySequencer := ept.rollupClients[0] + onlySequencer.ExpectSequencerActive(true, nil) // respond true once on creation + + rollupProvider, err := ept.newActiveL2RollupProvider(0) + require.NoError(t, err) + + onlySequencer.ExpectSequencerActive(true, nil) // respond true again when the test calls `RollupClient()` the first time + firstSequencerUsed, err := rollupProvider.RollupClient(context.Background()) + require.NoError(t, err) + require.Same(t, onlySequencer, firstSequencerUsed) + + onlySequencer.ExpectSequencerActive(false, nil) // become inactive after that + onlySequencer.MaybeClose() + secondSequencerUsed, err := rollupProvider.RollupClient(context.Background()) + require.Error(t, err) + require.Nil(t, secondSequencerUsed) + ept.assertAllExpectations(t) +} + +// TestEndpointProvider_HandlesSingleSequencer verifies that the ActiveL2EndpointProvider +// can handle being passed a single sequencer endpoint without issue. +func TestEndpointProvider_HandlesSingleSequencer(t *testing.T) { + ept := setupEndpointProviderTest(t, 1) + onlySequencer := ept.rollupClients[0] + onlySequencer.ExpectSequencerActive(true, nil) // respond true once on creation + onlySequencer.ExpectSequencerActive(true, nil) // respond true again when the constructor calls `RollupClient()` + + endpointProvider, err := ept.newActiveL2EndpointProvider(0) + require.NoError(t, err) + + onlySequencer.ExpectSequencerActive(true, nil) // respond true a once more on fall-through check in `EthClient()` + firstEthClientUsed, err := endpointProvider.EthClient(context.Background()) + require.NoError(t, err) + require.Same(t, ept.ethClients[0], firstEthClientUsed) + + onlySequencer.ExpectSequencerActive(false, nil) // become inactive after that + onlySequencer.MaybeClose() + secondEthClientUsed, err := endpointProvider.EthClient(context.Background()) + require.Error(t, err) + require.Nil(t, secondEthClientUsed) + ept.assertAllExpectations(t) +} diff --git a/op-service/dial/active_rollup_provider.go b/op-service/dial/active_rollup_provider.go new file mode 100644 index 000000000000..76f30b3da99a --- /dev/null +++ b/op-service/dial/active_rollup_provider.go @@ -0,0 +1,169 @@ +package dial + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/ethereum/go-ethereum/log" +) + +type rollupDialer func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (RollupClientInterface, error) + +// ActiveL2EndpointProvider is an interface for providing a RollupClient +// It manages the lifecycle of the RollupClient for callers +// It does this by failing over down the list of rollupUrls if the current one is inactive or broken +type ActiveL2RollupProvider struct { + checkDuration time.Duration + networkTimeout time.Duration + log log.Logger + + activeTimeout time.Time + + rollupUrls []string + rollupDialer rollupDialer + currentRollupClient RollupClientInterface + rollupIndex int + clientLock *sync.Mutex +} + +// NewActiveL2RollupProvider creates a new ActiveL2RollupProvider +// the checkDuration is the duration between checks to see if the current rollup client is active +// provide a checkDuration of 0 to check every time +func NewActiveL2RollupProvider( + ctx context.Context, + rollupUrls []string, + checkDuration time.Duration, + networkTimeout time.Duration, + logger log.Logger, +) (*ActiveL2RollupProvider, error) { + rollupDialer := func(ctx context.Context, timeout time.Duration, + log log.Logger, url string, + ) (RollupClientInterface, error) { + return DialRollupClientWithTimeout(ctx, timeout, log, url) + } + return newActiveL2RollupProvider(ctx, rollupUrls, checkDuration, networkTimeout, logger, rollupDialer) +} + +func newActiveL2RollupProvider( + ctx context.Context, + rollupUrls []string, + checkDuration time.Duration, + networkTimeout time.Duration, + logger log.Logger, + dialer rollupDialer, +) (*ActiveL2RollupProvider, error) { + if len(rollupUrls) == 0 { + return nil, errors.New("empty rollup urls list") + } + p := &ActiveL2RollupProvider{ + checkDuration: checkDuration, + networkTimeout: networkTimeout, + log: logger, + rollupUrls: rollupUrls, + rollupDialer: dialer, + clientLock: &sync.Mutex{}, + } + cctx, cancel := context.WithTimeout(ctx, networkTimeout) + defer cancel() + + if _, err := p.RollupClient(cctx); err != nil { + return nil, fmt.Errorf("setting provider rollup client: %w", err) + } + return p, nil +} + +func (p *ActiveL2RollupProvider) RollupClient(ctx context.Context) (RollupClientInterface, error) { + p.clientLock.Lock() + defer p.clientLock.Unlock() + err := p.ensureActiveEndpoint(ctx) + if err != nil { + return nil, err + } + return p.currentRollupClient, nil +} + +func (p *ActiveL2RollupProvider) ensureActiveEndpoint(ctx context.Context) error { + if !p.shouldCheck() { + return nil + } + if err := p.findActiveEndpoints(ctx); err != nil { + return err + } + p.activeTimeout = time.Now().Add(p.checkDuration) + return nil +} + +func (p *ActiveL2RollupProvider) shouldCheck() bool { + return time.Now().After(p.activeTimeout) +} + +func (p *ActiveL2RollupProvider) findActiveEndpoints(ctx context.Context) error { + startIdx := p.rollupIndex + var errs error + for offset := range p.rollupUrls { + idx := (startIdx + offset) % p.numEndpoints() + if offset != 0 || p.currentRollupClient == nil { + if err := p.dialSequencer(ctx, idx); err != nil { + errs = errors.Join(errs, err) + p.log.Warn("Error dialing next sequencer.", "err", err, "index", p.rollupIndex) + continue + } + } + + ep := p.rollupUrls[idx] + if active, err := p.checkCurrentSequencer(ctx); err != nil { + errs = errors.Join(errs, err) + p.log.Warn("Error querying active sequencer, trying next.", "err", err, "index", idx, "url", ep) + } else if active { + if offset == 0 { + p.log.Debug("Current sequencer active.", "index", idx, "url", ep) + } else { + p.log.Info("Found new active sequencer.", "index", idx, "url", ep) + } + return nil + } else { + p.log.Info("Sequencer inactive, trying next.", "index", idx, "url", ep) + } + } + return fmt.Errorf("failed to find an active sequencer, tried following urls: %v; errs: %w", p.rollupUrls, errs) +} + +func (p *ActiveL2RollupProvider) checkCurrentSequencer(ctx context.Context) (bool, error) { + cctx, cancel := context.WithTimeout(ctx, p.networkTimeout) + defer cancel() + return p.currentRollupClient.SequencerActive(cctx) +} + +func (p *ActiveL2RollupProvider) numEndpoints() int { + return len(p.rollupUrls) +} + +// dialSequencer dials the sequencer for the url at the given index. +// If successful, the currentRollupClient and rollupIndex are updated and the +// old rollup client is closed. +func (p *ActiveL2RollupProvider) dialSequencer(ctx context.Context, idx int) error { + cctx, cancel := context.WithTimeout(ctx, p.networkTimeout) + defer cancel() + + ep := p.rollupUrls[idx] + p.log.Info("Dialing next sequencer.", "index", idx, "url", ep) + rollupClient, err := p.rollupDialer(cctx, p.networkTimeout, p.log, ep) + if err != nil { + return fmt.Errorf("dialing rollup client: %w", err) + } + if p.currentRollupClient != nil { + p.currentRollupClient.Close() + } + p.rollupIndex = idx + p.currentRollupClient = rollupClient + return nil +} + +func (p *ActiveL2RollupProvider) Close() { + if p.currentRollupClient != nil { + p.currentRollupClient.Close() + } +} diff --git a/op-service/dial/ethclient_interface.go b/op-service/dial/ethclient_interface.go new file mode 100644 index 000000000000..58a2070974ee --- /dev/null +++ b/op-service/dial/ethclient_interface.go @@ -0,0 +1,16 @@ +package dial + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/core/types" +) + +// EthClientInterface is an interface for providing an ethclient.Client +// It does not describe all of the functions an ethclient.Client has, only the ones used by callers of the L2 Providers +type EthClientInterface interface { + BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) + + Close() +} diff --git a/op-service/dial/rollupclient_interface.go b/op-service/dial/rollupclient_interface.go new file mode 100644 index 000000000000..46e1afdc7264 --- /dev/null +++ b/op-service/dial/rollupclient_interface.go @@ -0,0 +1,20 @@ +package dial + +import ( + "context" + + "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum/go-ethereum/common" +) + +// RollupClientInterface is an interface for providing a RollupClient +// It does not describe all of the functions a RollupClient has, only the ones used by the L2 Providers and their callers +type RollupClientInterface interface { + OutputAtBlock(ctx context.Context, blockNum uint64) (*eth.OutputResponse, error) + SyncStatus(ctx context.Context) (*eth.SyncStatus, error) + RollupConfig(ctx context.Context) (*rollup.Config, error) + StartSequencer(ctx context.Context, unsafeHead common.Hash) error + SequencerActive(ctx context.Context) (bool, error) + Close() +} diff --git a/op-service/dial/static_l2_provider.go b/op-service/dial/static_l2_provider.go index 1f557c03ea60..8d51f062b9af 100644 --- a/op-service/dial/static_l2_provider.go +++ b/op-service/dial/static_l2_provider.go @@ -13,7 +13,7 @@ import ( type L2EndpointProvider interface { RollupProvider // EthClient(ctx) returns the underlying ethclient pointing to the L2 execution node - EthClient(ctx context.Context) (*ethclient.Client, error) + EthClient(ctx context.Context) (EthClientInterface, error) } // StaticL2EndpointProvider is a L2EndpointProvider that always returns the same static RollupClient and eth client @@ -38,7 +38,7 @@ func NewStaticL2EndpointProvider(ctx context.Context, log log.Logger, ethClientU }, nil } -func (p *StaticL2EndpointProvider) EthClient(context.Context) (*ethclient.Client, error) { +func (p *StaticL2EndpointProvider) EthClient(context.Context) (EthClientInterface, error) { return p.ethClient, nil } diff --git a/op-service/dial/static_rollup_provider.go b/op-service/dial/static_rollup_provider.go index 44852626d547..a40bb667290a 100644 --- a/op-service/dial/static_rollup_provider.go +++ b/op-service/dial/static_rollup_provider.go @@ -11,7 +11,7 @@ import ( // It manages the lifecycle of the RollupClient for callers type RollupProvider interface { // RollupClient(ctx) returns the underlying sources.RollupClient pointing to the L2 rollup consensus node - RollupClient(ctx context.Context) (*sources.RollupClient, error) + RollupClient(ctx context.Context) (RollupClientInterface, error) // Close() closes the underlying client or clients Close() } @@ -39,7 +39,7 @@ func NewStaticL2RollupProviderFromExistingRollup(rollupCl *sources.RollupClient) }, nil } -func (p *StaticL2RollupProvider) RollupClient(context.Context) (*sources.RollupClient, error) { +func (p *StaticL2RollupProvider) RollupClient(context.Context) (RollupClientInterface, error) { return p.rollupClient, nil } diff --git a/op-service/testutils/mock_eth_client.go b/op-service/testutils/mock_eth_client.go index aa96da3ddeca..1ca1c953a497 100644 --- a/op-service/testutils/mock_eth_client.go +++ b/op-service/testutils/mock_eth_client.go @@ -2,6 +2,7 @@ package testutils import ( "context" + "math/big" "github.com/stretchr/testify/mock" @@ -128,3 +129,24 @@ func (m *MockEthClient) ReadStorageAt(ctx context.Context, address common.Addres func (m *MockEthClient) ExpectReadStorageAt(ctx context.Context, address common.Address, storageSlot common.Hash, blockHash common.Hash, result common.Hash, err error) { m.Mock.On("ReadStorageAt", address, storageSlot, blockHash).Once().Return(result, &err) } + +func (m *MockEthClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + out := m.Mock.Called(number) + return out.Get(0).(*types.Block), out.Error(1) +} + +func (m *MockEthClient) ExpectBlockByNumber(number *big.Int, block *types.Block, err error) { + m.Mock.On("BlockByNumber", number).Once().Return(block, err) +} + +func (m *MockEthClient) ExpectClose() { + m.Mock.On("Close").Once() +} + +func (m *MockEthClient) MaybeClose() { + m.Mock.On("Close").Maybe() +} + +func (m *MockEthClient) Close() { + m.Mock.Called() +} diff --git a/op-service/testutils/mock_rollup_client.go b/op-service/testutils/mock_rollup_client.go new file mode 100644 index 000000000000..a7a42e878481 --- /dev/null +++ b/op-service/testutils/mock_rollup_client.go @@ -0,0 +1,71 @@ +package testutils + +import ( + "context" + + "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/mock" +) + +type MockRollupClient struct { + mock.Mock +} + +func (m *MockRollupClient) OutputAtBlock(ctx context.Context, blockNum uint64) (*eth.OutputResponse, error) { + out := m.Mock.Called(blockNum) + return out.Get(0).(*eth.OutputResponse), out.Error(1) +} + +func (m *MockRollupClient) ExpectOutputAtBlock(blockNum uint64, response *eth.OutputResponse, err error) { + m.Mock.On("OutputAtBlock", blockNum).Once().Return(response, err) +} + +func (m *MockRollupClient) SyncStatus(ctx context.Context) (*eth.SyncStatus, error) { + out := m.Mock.Called() + return out.Get(0).(*eth.SyncStatus), out.Error(1) +} + +func (m *MockRollupClient) ExpectSyncStatus(status *eth.SyncStatus, err error) { + m.Mock.On("SyncStatus").Once().Return(status, err) +} + +func (m *MockRollupClient) RollupConfig(ctx context.Context) (*rollup.Config, error) { + out := m.Mock.Called() + return out.Get(0).(*rollup.Config), out.Error(1) +} + +func (m *MockRollupClient) ExpectRollupConfig(config *rollup.Config, err error) { + m.Mock.On("RollupConfig").Once().Return(config, err) +} + +func (m *MockRollupClient) StartSequencer(ctx context.Context, unsafeHead common.Hash) error { + out := m.Mock.Called(unsafeHead) + return out.Error(0) +} + +func (m *MockRollupClient) ExpectStartSequencer(unsafeHead common.Hash, err error) { + m.Mock.On("StartSequencer", unsafeHead).Once().Return(err) +} + +func (m *MockRollupClient) SequencerActive(ctx context.Context) (bool, error) { + out := m.Mock.Called() + return out.Bool(0), out.Error(1) +} + +func (m *MockRollupClient) ExpectSequencerActive(active bool, err error) { + m.Mock.On("SequencerActive").Once().Return(active, err) +} + +func (m *MockRollupClient) ExpectClose() { + m.Mock.On("Close").Once() +} + +func (m *MockRollupClient) MaybeClose() { + m.Mock.On("Close").Maybe() +} + +func (m *MockRollupClient) Close() { + m.Mock.Called() +}