From 4a487b8920daa9dc4b496d691d5f283f9bb659b1 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Wed, 22 May 2024 20:10:49 -0700 Subject: [PATCH] feat: op-conductor strongly consistent reads (#10619) * op-conductor: add more logs for raft debugging * Add barrier * LatestUnsafePayload reads in a strongly consistent fashion * Atomic swap OpConductor.healthy * Fix conductor/service_test * Add test for when LatestUnsafePayload returns an error * Update some method comments --------- Co-authored-by: Francis Li --- op-conductor/conductor/service.go | 41 ++++++++++++-------- op-conductor/conductor/service_test.go | 46 ++++++++++++++++++++--- op-conductor/consensus/iface.go | 6 +-- op-conductor/consensus/mocks/Consensus.go | 20 +++++++--- op-conductor/consensus/raft.go | 21 +++++++---- op-conductor/consensus/raft_fsm.go | 8 ++++ op-conductor/consensus/raft_fsm_test.go | 27 +++++++++---- op-conductor/consensus/raft_test.go | 3 +- op-conductor/rpc/api.go | 2 +- 9 files changed, 128 insertions(+), 46 deletions(-) diff --git a/op-conductor/conductor/service.go b/op-conductor/conductor/service.go index 9686d376b86d..d7e9073b04b0 100644 --- a/op-conductor/conductor/service.go +++ b/op-conductor/conductor/service.go @@ -30,10 +30,10 @@ import ( ) var ( - ErrResumeTimeout = errors.New("timeout to resume conductor") - ErrPauseTimeout = errors.New("timeout to pause conductor") - ErrUnsafeHeadMismarch = errors.New("unsafe head mismatch") - ErrUnableToRetrieveUnsafeHeadFromConsensus = errors.New("unable to retrieve unsafe head from consensus") + ErrResumeTimeout = errors.New("timeout to resume conductor") + ErrPauseTimeout = errors.New("timeout to pause conductor") + ErrUnsafeHeadMismatch = errors.New("unsafe head mismatch") + ErrNoUnsafeHead = errors.New("no unsafe head") ) // New creates a new OpConductor instance. @@ -441,7 +441,7 @@ func (oc *OpConductor) TransferLeaderToServer(_ context.Context, id string, addr return oc.cons.TransferLeaderTo(id, addr) } -// CommitUnsafePayload commits a unsafe payload (latest head) to the cluster FSM. +// CommitUnsafePayload commits an unsafe payload (latest head) to the cluster FSM ensuring strong consistency by leveraging Raft consensus mechanisms. func (oc *OpConductor) CommitUnsafePayload(_ context.Context, payload *eth.ExecutionPayloadEnvelope) error { return oc.cons.CommitUnsafePayload(payload) } @@ -456,8 +456,8 @@ func (oc *OpConductor) ClusterMembership(_ context.Context) ([]*consensus.Server return oc.cons.ClusterMembership() } -// LatestUnsafePayload returns the latest unsafe payload envelope from FSM. -func (oc *OpConductor) LatestUnsafePayload(_ context.Context) *eth.ExecutionPayloadEnvelope { +// LatestUnsafePayload returns the latest unsafe payload envelope from FSM in a strongly consistent fashion. +func (oc *OpConductor) LatestUnsafePayload(_ context.Context) (*eth.ExecutionPayloadEnvelope, error) { return oc.cons.LatestUnsafePayload() } @@ -522,12 +522,11 @@ func (oc *OpConductor) handleHealthUpdate(hcerr error) { oc.queueAction() } - if healthy != oc.healthy.Load() { + if oc.healthy.Swap(healthy) != healthy { // queue an action if health status changed. oc.queueAction() } - oc.healthy.Store(healthy) oc.hcerr = hcerr } @@ -668,8 +667,15 @@ func (oc *OpConductor) startSequencer() error { unsafeInCons, unsafeInNode, err := oc.compareUnsafeHead(ctx) // if there's a mismatch, try to post the unsafe head to op-node if err != nil { - if errors.Is(err, ErrUnsafeHeadMismarch) && uint64(unsafeInCons.ExecutionPayload.BlockNumber)-unsafeInNode.NumberU64() == 1 { + if errors.Is(err, ErrUnsafeHeadMismatch) && uint64(unsafeInCons.ExecutionPayload.BlockNumber)-unsafeInNode.NumberU64() == 1 { // tries to post the unsafe head to op-node when head is only 1 block behind (most likely due to gossip delay) + oc.log.Debug( + "posting unsafe head to op-node", + "consensus_num", uint64(unsafeInCons.ExecutionPayload.BlockNumber), + "consensus_hash", unsafeInCons.ExecutionPayload.BlockHash.Hex(), + "node_num", unsafeInNode.NumberU64(), + "node_hash", unsafeInNode.Hash().Hex(), + ) if innerErr := oc.ctrl.PostUnsafePayload(ctx, unsafeInCons); innerErr != nil { oc.log.Error("failed to post unsafe head payload envelope to op-node", "err", innerErr) } @@ -692,9 +698,12 @@ func (oc *OpConductor) startSequencer() error { } func (oc *OpConductor) compareUnsafeHead(ctx context.Context) (*eth.ExecutionPayloadEnvelope, eth.BlockInfo, error) { - unsafeInCons := oc.cons.LatestUnsafePayload() + unsafeInCons, err := oc.cons.LatestUnsafePayload() + if err != nil { + return nil, nil, errors.Wrap(err, "unable to retrieve unsafe head from consensus") + } if unsafeInCons == nil { - return nil, nil, ErrUnableToRetrieveUnsafeHeadFromConsensus + return nil, nil, ErrNoUnsafeHead } unsafeInNode, err := oc.ctrl.LatestUnsafeBlock(ctx) @@ -702,17 +711,17 @@ func (oc *OpConductor) compareUnsafeHead(ctx context.Context) (*eth.ExecutionPay return unsafeInCons, nil, errors.Wrap(err, "failed to get latest unsafe block from EL during compareUnsafeHead phase") } - oc.log.Debug("comparing unsafe head", "consensus", unsafeInCons.ExecutionPayload.BlockNumber, "node", unsafeInNode.NumberU64()) + oc.log.Debug("comparing unsafe head", "consensus", uint64(unsafeInCons.ExecutionPayload.BlockNumber), "node", unsafeInNode.NumberU64()) if unsafeInCons.ExecutionPayload.BlockHash != unsafeInNode.Hash() { oc.log.Warn( "latest unsafe block in consensus is not the same as the one in op-node", "consensus_hash", unsafeInCons.ExecutionPayload.BlockHash, - "consensus_block_num", unsafeInCons.ExecutionPayload.BlockNumber, + "consensus_num", uint64(unsafeInCons.ExecutionPayload.BlockNumber), "node_hash", unsafeInNode.Hash(), - "node_block_num", unsafeInNode.NumberU64(), + "node_num", unsafeInNode.NumberU64(), ) - return unsafeInCons, unsafeInNode, ErrUnsafeHeadMismarch + return unsafeInCons, unsafeInNode, ErrUnsafeHeadMismatch } return unsafeInCons, unsafeInNode, nil diff --git a/op-conductor/conductor/service_test.go b/op-conductor/conductor/service_test.go index 9f61ed90ba18..7feb3826646b 100644 --- a/op-conductor/conductor/service_test.go +++ b/op-conductor/conductor/service_test.go @@ -298,7 +298,7 @@ func (s *OpConductorTestSuite) TestScenario1() { InfoHash: [32]byte{1, 2, 3}, } s.cons.EXPECT().TransferLeader().Return(nil) - s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload).Times(1) + s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload, nil).Times(1) s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(1) // become leader @@ -317,6 +317,42 @@ func (s *OpConductorTestSuite) TestScenario1() { s.cons.AssertNumberOfCalls(s.T(), "TransferLeader", 1) } +// In this test, we have a follower that is not healthy and not sequencing, it becomes leader through election. +// But since it fails to compare the unsafe head to the value stored in consensus, we expect it to transfer leadership to another node. +// [follower, not healthy, not sequencing] -- become leader --> [leader, not healthy, not sequencing] -- transfer leadership --> [follower, not healthy, not sequencing] +func (s *OpConductorTestSuite) TestScenario1Err() { + s.enableSynchronization() + + // set initial state + s.conductor.leader.Store(false) + s.conductor.healthy.Store(false) + s.conductor.seqActive.Store(false) + s.conductor.hcerr = health.ErrSequencerNotHealthy + s.conductor.prevState = &state{ + leader: false, + healthy: false, + active: false, + } + + s.cons.EXPECT().LatestUnsafePayload().Return(nil, errors.New("fake connection error")).Times(1) + s.cons.EXPECT().TransferLeader().Return(nil) + + // become leader + s.updateLeaderStatusAndExecuteAction(true) + + // expect to transfer leadership, go back to [follower, not healthy, not sequencing] + s.False(s.conductor.leader.Load()) + s.False(s.conductor.healthy.Load()) + s.False(s.conductor.seqActive.Load()) + s.Equal(health.ErrSequencerNotHealthy, s.conductor.hcerr) + s.Equal(&state{ + leader: true, + healthy: false, + active: false, + }, s.conductor.prevState) + s.cons.AssertNumberOfCalls(s.T(), "TransferLeader", 1) +} + // In this test, we have a follower that is not healthy and not sequencing. it becomes healthy and we expect it to stay as follower and not start sequencing. // [follower, not healthy, not sequencing] -- become healthy --> [follower, healthy, not sequencing] func (s *OpConductorTestSuite) TestScenario2() { @@ -353,7 +389,7 @@ func (s *OpConductorTestSuite) TestScenario3() { InfoNum: 1, InfoHash: [32]byte{1, 2, 3}, } - s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload).Times(1) + s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload, nil).Times(1) s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(1) s.ctrl.EXPECT().StartSequencer(mock.Anything, mock.Anything).Return(nil).Times(1) @@ -392,7 +428,7 @@ func (s *OpConductorTestSuite) TestScenario4() { InfoNum: 1, InfoHash: [32]byte{2, 3, 4}, } - s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload).Times(1) + s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload, nil).Times(1) s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(1) s.ctrl.EXPECT().PostUnsafePayload(mock.Anything, mock.Anything).Return(nil).Times(1) @@ -410,7 +446,7 @@ func (s *OpConductorTestSuite) TestScenario4() { // unsafe caught up, we try to start sequencer at specified block and succeeds mockBlockInfo.InfoNum = 2 mockBlockInfo.InfoHash = [32]byte{1, 2, 3} - s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload).Times(1) + s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload, nil).Times(1) s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(1) s.ctrl.EXPECT().StartSequencer(mock.Anything, mockBlockInfo.InfoHash).Return(nil).Times(1) @@ -664,7 +700,7 @@ func (s *OpConductorTestSuite) TestFailureAndRetry3() { InfoNum: 1, InfoHash: [32]byte{1, 2, 3}, } - s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload).Times(2) + s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload, nil).Times(2) s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(2) s.ctrl.EXPECT().StartSequencer(mock.Anything, mockBlockInfo.InfoHash).Return(nil).Times(1) diff --git a/op-conductor/consensus/iface.go b/op-conductor/consensus/iface.go index 7134eef5a273..15096c2e8ae1 100644 --- a/op-conductor/consensus/iface.go +++ b/op-conductor/consensus/iface.go @@ -59,10 +59,10 @@ type Consensus interface { // ClusterMembership returns the current cluster membership configuration. ClusterMembership() ([]*ServerInfo, error) - // CommitPayload commits latest unsafe payload to the FSM. + // CommitPayload commits latest unsafe payload to the FSM in a strongly consistent fashion. CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelope) error - // LatestUnsafeBlock returns the latest unsafe payload from FSM. - LatestUnsafePayload() *eth.ExecutionPayloadEnvelope + // LatestUnsafeBlock returns the latest unsafe payload from FSM in a strongly consistent fashion. + LatestUnsafePayload() (*eth.ExecutionPayloadEnvelope, error) // Shutdown shuts down the consensus protocol client. Shutdown() error diff --git a/op-conductor/consensus/mocks/Consensus.go b/op-conductor/consensus/mocks/Consensus.go index e85ad0410013..02d65869c06a 100644 --- a/op-conductor/consensus/mocks/Consensus.go +++ b/op-conductor/consensus/mocks/Consensus.go @@ -266,7 +266,7 @@ func (_c *Consensus_DemoteVoter_Call) RunAndReturn(run func(string) error) *Cons } // LatestUnsafePayload provides a mock function with given fields: -func (_m *Consensus) LatestUnsafePayload() *eth.ExecutionPayloadEnvelope { +func (_m *Consensus) LatestUnsafePayload() (*eth.ExecutionPayloadEnvelope, error) { ret := _m.Called() if len(ret) == 0 { @@ -274,6 +274,10 @@ func (_m *Consensus) LatestUnsafePayload() *eth.ExecutionPayloadEnvelope { } var r0 *eth.ExecutionPayloadEnvelope + var r1 error + if rf, ok := ret.Get(0).(func() (*eth.ExecutionPayloadEnvelope, error)); ok { + return rf() + } if rf, ok := ret.Get(0).(func() *eth.ExecutionPayloadEnvelope); ok { r0 = rf() } else { @@ -282,7 +286,13 @@ func (_m *Consensus) LatestUnsafePayload() *eth.ExecutionPayloadEnvelope { } } - return r0 + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 } // Consensus_LatestUnsafePayload_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LatestUnsafePayload' @@ -302,12 +312,12 @@ func (_c *Consensus_LatestUnsafePayload_Call) Run(run func()) *Consensus_LatestU return _c } -func (_c *Consensus_LatestUnsafePayload_Call) Return(_a0 *eth.ExecutionPayloadEnvelope) *Consensus_LatestUnsafePayload_Call { - _c.Call.Return(_a0) +func (_c *Consensus_LatestUnsafePayload_Call) Return(_a0 *eth.ExecutionPayloadEnvelope, _a1 error) *Consensus_LatestUnsafePayload_Call { + _c.Call.Return(_a0, _a1) return _c } -func (_c *Consensus_LatestUnsafePayload_Call) RunAndReturn(run func() *eth.ExecutionPayloadEnvelope) *Consensus_LatestUnsafePayload_Call { +func (_c *Consensus_LatestUnsafePayload_Call) RunAndReturn(run func() (*eth.ExecutionPayloadEnvelope, error)) *Consensus_LatestUnsafePayload_Call { _c.Call.Return(run) return _c } diff --git a/op-conductor/consensus/raft.go b/op-conductor/consensus/raft.go index 37d9ac69d70a..2c3f79fe2946 100644 --- a/op-conductor/consensus/raft.go +++ b/op-conductor/consensus/raft.go @@ -75,7 +75,7 @@ func NewRaftConsensus(log log.Logger, serverID, serverAddr, storageDir string, b return nil, errors.Wrap(err, "failed to create raft tcp transport") } - fsm := &unsafeHeadTracker{} + fsm := NewUnsafeHeadTracker(log) r, err := raft.NewRaft(rc, fsm, logStore, stableStore, snapshotStore, transport) if err != nil { @@ -140,8 +140,7 @@ func (rc *RaftConsensus) DemoteVoter(id string) error { // Leader implements Consensus, it returns true if it is the leader of the cluster. func (rc *RaftConsensus) Leader() bool { - _, id := rc.r.LeaderWithID() - return id == rc.serverID + return rc.r.State() == raft.Leader } // LeaderWithID implements Consensus, it returns the leader's server ID and address. @@ -205,8 +204,10 @@ func (rc *RaftConsensus) Shutdown() error { return nil } -// CommitUnsafePayload implements Consensus, it commits latest unsafe payload to the cluster FSM. +// CommitUnsafePayload implements Consensus, it commits latest unsafe payload to the cluster FSM in a strongly consistent fashion. func (rc *RaftConsensus) CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelope) error { + rc.log.Debug("committing unsafe payload", "number", uint64(payload.ExecutionPayload.BlockNumber), "hash", payload.ExecutionPayload.BlockHash.Hex()) + var buf bytes.Buffer if _, err := payload.MarshalSSZ(&buf); err != nil { return errors.Wrap(err, "failed to marshal payload envelope") @@ -216,14 +217,18 @@ func (rc *RaftConsensus) CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelo if err := f.Error(); err != nil { return errors.Wrap(err, "failed to apply payload envelope") } + rc.log.Debug("unsafe payload committed", "number", uint64(payload.ExecutionPayload.BlockNumber), "hash", payload.ExecutionPayload.BlockHash.Hex()) return nil } -// LatestUnsafePayload implements Consensus, it returns the latest unsafe payload from FSM. -func (rc *RaftConsensus) LatestUnsafePayload() *eth.ExecutionPayloadEnvelope { - payload := rc.unsafeTracker.UnsafeHead() - return payload +// LatestUnsafePayload implements Consensus, it returns the latest unsafe payload from FSM in a strongly consistent fashion. +func (rc *RaftConsensus) LatestUnsafePayload() (*eth.ExecutionPayloadEnvelope, error) { + if err := rc.r.Barrier(defaultTimeout).Error(); err != nil { + return nil, errors.Wrap(err, "failed to apply barrier") + } + + return rc.unsafeTracker.UnsafeHead(), nil } // ClusterMembership implements Consensus, it returns the current cluster membership configuration. diff --git a/op-conductor/consensus/raft_fsm.go b/op-conductor/consensus/raft_fsm.go index 7333fb8f2a44..31631d2c91e6 100644 --- a/op-conductor/consensus/raft_fsm.go +++ b/op-conductor/consensus/raft_fsm.go @@ -16,10 +16,17 @@ var _ raft.FSM = (*unsafeHeadTracker)(nil) // unsafeHeadTracker implements raft.FSM for storing unsafe head payload into raft consensus layer. type unsafeHeadTracker struct { + log log.Logger mtx sync.RWMutex unsafeHead *eth.ExecutionPayloadEnvelope } +func NewUnsafeHeadTracker(log log.Logger) *unsafeHeadTracker { + return &unsafeHeadTracker{ + log: log, + } +} + // Apply implements raft.FSM, it applies the latest change (latest unsafe head payload) to FSM. func (t *unsafeHeadTracker) Apply(l *raft.Log) interface{} { if l.Data == nil || len(l.Data) == 0 { @@ -33,6 +40,7 @@ func (t *unsafeHeadTracker) Apply(l *raft.Log) interface{} { t.mtx.Lock() defer t.mtx.Unlock() + t.log.Debug("applying new unsafe head", "number", uint64(data.ExecutionPayload.BlockNumber), "hash", data.ExecutionPayload.BlockHash.Hex()) if t.unsafeHead == nil || t.unsafeHead.ExecutionPayload.BlockNumber < data.ExecutionPayload.BlockNumber { t.unsafeHead = data } diff --git a/op-conductor/consensus/raft_fsm_test.go b/op-conductor/consensus/raft_fsm_test.go index 4f390d17e555..3b154ceded28 100644 --- a/op-conductor/consensus/raft_fsm_test.go +++ b/op-conductor/consensus/raft_fsm_test.go @@ -8,22 +8,24 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" "github.com/hashicorp/raft" "github.com/pkg/errors" "github.com/stretchr/testify/require" "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/testlog" ) type Bytes32 [32]byte -func createPayloadEnvelope() *eth.ExecutionPayloadEnvelope { +func createPayloadEnvelope(blockNum uint64) *eth.ExecutionPayloadEnvelope { hash := common.HexToHash("0x12345") one := hexutil.Uint64(1) return ð.ExecutionPayloadEnvelope{ ParentBeaconBlockRoot: &hash, ExecutionPayload: ð.ExecutionPayload{ - BlockNumber: eth.Uint64Quantity(222), + BlockNumber: eth.Uint64Quantity(blockNum), BlockHash: common.HexToHash("0x888"), Withdrawals: &types.Withdrawals{{Index: 1, Validator: 2, Address: common.HexToAddress("0x123"), Amount: 3}}, ExcessBlobGas: &one, @@ -32,11 +34,12 @@ func createPayloadEnvelope() *eth.ExecutionPayloadEnvelope { } func TestUnsafeHeadTracker(t *testing.T) { tracker := &unsafeHeadTracker{ - unsafeHead: createPayloadEnvelope(), + log: testlog.Logger(t, log.LevelDebug), + unsafeHead: createPayloadEnvelope(222), } t.Run("Apply", func(t *testing.T) { - data := createPayloadEnvelope() + data := createPayloadEnvelope(333) var buf bytes.Buffer _, err := data.MarshalSSZ(&buf) @@ -44,17 +47,27 @@ func TestUnsafeHeadTracker(t *testing.T) { l := raft.Log{Data: buf.Bytes()} require.Nil(t, tracker.Apply(&l)) - require.Equal(t, hexutil.Uint64(222), tracker.unsafeHead.ExecutionPayload.BlockNumber) + require.Equal(t, hexutil.Uint64(333), tracker.unsafeHead.ExecutionPayload.BlockNumber) + }) + + t.Run("Snapshot", func(t *testing.T) { + snapshot, err := tracker.Snapshot() + require.NoError(t, err) + + sink := new(raft.DiscardSnapshotSink) + + err = snapshot.Persist(sink) + require.NoError(t, err) }) t.Run("Restore", func(t *testing.T) { - data := createPayloadEnvelope() + data := createPayloadEnvelope(333) mrc, err := NewMockReadCloser(data) require.NoError(t, err) err = tracker.Restore(mrc) require.NoError(t, err) - require.Equal(t, hexutil.Uint64(222), tracker.unsafeHead.ExecutionPayload.BlockNumber) + require.Equal(t, hexutil.Uint64(333), tracker.unsafeHead.ExecutionPayload.BlockNumber) }) } diff --git a/op-conductor/consensus/raft_test.go b/op-conductor/consensus/raft_test.go index 775e0a77847f..332bbd203e7e 100644 --- a/op-conductor/consensus/raft_test.go +++ b/op-conductor/consensus/raft_test.go @@ -70,6 +70,7 @@ func TestCommitAndRead(t *testing.T) { // ExecutionPayloadEnvelope is expected to succeed when unmarshalling a blockV3 require.NoError(t, err) - unsafeHead := cons.LatestUnsafePayload() + unsafeHead, err := cons.LatestUnsafePayload() + require.NoError(t, err) require.Equal(t, payload, unsafeHead) } diff --git a/op-conductor/rpc/api.go b/op-conductor/rpc/api.go index 9fbee544d489..b5d954f5ceb0 100644 --- a/op-conductor/rpc/api.go +++ b/op-conductor/rpc/api.go @@ -47,7 +47,7 @@ type API interface { // APIs called by op-node // Active returns true if op-conductor is active (not paused or stopped). Active(ctx context.Context) (bool, error) - // CommitUnsafePayload commits a unsafe payload (latest head) to the consensus layer. + // CommitUnsafePayload commits an unsafe payload (latest head) to the consensus layer. CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error }