package conductor

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/rpc"
	"github.com/hashicorp/go-multierror"
	"github.com/pkg/errors"

	"github.com/ethereum-optimism/optimism/op-conductor/client"
	"github.com/ethereum-optimism/optimism/op-conductor/consensus"
	"github.com/ethereum-optimism/optimism/op-conductor/health"
	opp2p "github.com/ethereum-optimism/optimism/op-node/p2p"
	"github.com/ethereum-optimism/optimism/op-service/cliapp"
	opclient "github.com/ethereum-optimism/optimism/op-service/client"
	"github.com/ethereum-optimism/optimism/op-service/sources"
)

var (
	ErrResumeTimeout      = errors.New("timeout to resume conductor")
	ErrPauseTimeout       = errors.New("timeout to pause conductor")
	ErrUnsafeHeadMismatch = errors.New("unsafe head mismatch")
)

// New creates a new OpConductor instance.
func New(ctx context.Context, cfg *Config, log log.Logger, version string) (*OpConductor, error) {
return NewOpConductor(ctx, cfg, log, version, nil, nil, nil)
}
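// The sketch below is not part of the original file; it illustrates how a caller
// might wire a conductor into a service lifecycle with New, Start and Stop. The
// runConductor name, version string and error handling are illustrative assumptions.
//
//	func runConductor(ctx context.Context, cfg *Config, logger log.Logger) error {
//		oc, err := New(ctx, cfg, logger, "v0.1.0")
//		if err != nil {
//			return err
//		}
//		if err := oc.Start(ctx); err != nil {
//			return err
//		}
//		defer func() { _ = oc.Stop(ctx) }()
//		<-ctx.Done() // block until the caller cancels the context
//		return nil
//	}
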
// NewOpConductor creates a new OpConductor instance.
func NewOpConductor(
ctx context.Context,
cfg *Config,
log log.Logger,
version string,
ctrl client.SequencerControl,
cons consensus.Consensus,
hmon health.HealthMonitor,
) (*OpConductor, error) {
if err := cfg.Check(); err != nil {
return nil, errors.Wrap(err, "invalid config")
}
oc := &OpConductor{
log: log,
version: version,
cfg: cfg,
pauseCh: make(chan struct{}),
pauseDoneCh: make(chan struct{}),
resumeCh: make(chan struct{}),
resumeDoneCh: make(chan struct{}),
actionCh: make(chan struct{}, 1),
ctrl: ctrl,
cons: cons,
hmon: hmon,
}
oc.actionFn = oc.action
	// explicitly set all atomic.Bool values
	oc.leader.Store(false)    // Upon start it is not the leader unless raft bootstrap says otherwise; in that case it receives a leadership update from consensus.
	oc.healthy.Store(true)    // Default to healthy unless reported otherwise by the health monitor.
	oc.seqActive.Store(false) // Default to false; the real value is read during sequencer control initialization.
oc.paused.Store(cfg.Paused)
oc.stopped.Store(false)
	err := oc.init(ctx)
	if err != nil {
		log.Error("failed to initialize OpConductor", "err", err)
		// ensure we always close the resources if we fail to initialize the conductor.
		if closeErr := oc.Stop(ctx); closeErr != nil {
			return nil, multierror.Append(err, closeErr)
		}
		// do not return a partially initialized conductor alongside a nil error.
		return nil, err
	}
return oc, nil
}
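// init initializes the conductor's dependencies in order: sequencer control, consensus
// and health monitor. Each step is a no-op if the dependency was injected via NewOpConductor.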
func (c *OpConductor) init(ctx context.Context) error {
c.log.Info("initializing OpConductor", "version", c.version)
if err := c.initSequencerControl(ctx); err != nil {
return errors.Wrap(err, "failed to initialize sequencer control")
}
if err := c.initConsensus(ctx); err != nil {
return errors.Wrap(err, "failed to initialize consensus")
}
if err := c.initHealthMonitor(ctx); err != nil {
return errors.Wrap(err, "failed to initialize health monitor")
}
return nil
}
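// initSequencerControl builds the sequencer control client from the execution and node
// RPC endpoints (unless one was injected) and records the current sequencer active status.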
func (c *OpConductor) initSequencerControl(ctx context.Context) error {
if c.ctrl != nil {
return nil
}
ec, err := opclient.NewRPC(ctx, c.log, c.cfg.ExecutionRPC)
if err != nil {
return errors.Wrap(err, "failed to create geth rpc client")
}
execCfg := sources.L2ClientDefaultConfig(&c.cfg.RollupCfg, true)
	// TODO: Add metrics tracer here. Tracked by https://github.com/ethereum-optimism/protocol-quest/issues/45
exec, err := sources.NewEthClient(ec, c.log, nil, &execCfg.EthClientConfig)
if err != nil {
return errors.Wrap(err, "failed to create geth client")
}
nc, err := opclient.NewRPC(ctx, c.log, c.cfg.NodeRPC)
if err != nil {
return errors.Wrap(err, "failed to create node rpc client")
}
node := sources.NewRollupClient(nc)
c.ctrl = client.NewSequencerControl(exec, node)
active, err := c.ctrl.SequencerActive(ctx)
if err != nil {
return errors.Wrap(err, "failed to get sequencer active status")
}
c.seqActive.Store(active)
return nil
}
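// initConsensus creates the raft consensus instance used for leader election, unless one was injected.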
func (c *OpConductor) initConsensus(ctx context.Context) error {
if c.cons != nil {
return nil
}
serverAddr := fmt.Sprintf("%s:%d", c.cfg.ConsensusAddr, c.cfg.ConsensusPort)
cons, err := consensus.NewRaftConsensus(c.log, c.cfg.RaftServerID, serverAddr, c.cfg.RaftStorageDir, c.cfg.RaftBootstrap, &c.cfg.RollupCfg)
if err != nil {
return errors.Wrap(err, "failed to create raft consensus")
}
c.cons = cons
return nil
}
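// initHealthMonitor wires rollup node and p2p RPC clients into a sequencer health monitor, unless one was injected.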
func (c *OpConductor) initHealthMonitor(ctx context.Context) error {
if c.hmon != nil {
return nil
}
nc, err := opclient.NewRPC(ctx, c.log, c.cfg.NodeRPC)
if err != nil {
return errors.Wrap(err, "failed to create node rpc client")
}
node := sources.NewRollupClient(nc)
pc, err := rpc.DialContext(ctx, c.cfg.NodeRPC)
if err != nil {
return errors.Wrap(err, "failed to create p2p rpc client")
}
p2p := opp2p.NewClient(pc)
c.hmon = health.NewSequencerHealthMonitor(
c.log,
c.cfg.HealthCheck.Interval,
c.cfg.HealthCheck.SafeInterval,
c.cfg.HealthCheck.MinPeerCount,
&c.cfg.RollupCfg,
node,
p2p,
)
return nil
}
// OpConductor represents a full conductor instance and its resources. It:
//  1. performs health checks on the sequencer,
//  2. participates in the consensus protocol for leader election,
//  3. controls the sequencer state based on leadership, sequencer health and sequencer active status.
//
// OpConductor has three states:
//  1. running: the conductor executes the control loop and participates in leader election.
//  2. paused: the control loop (sequencer start/stop) is paused, but the conductor still participates in leader election and receives health updates.
//  3. stopped: the conductor no longer participates in leader election or runs the control loop, and cannot be started again.
type OpConductor struct {
	log     log.Logger
	version string
	cfg     *Config

	ctrl client.SequencerControl // client used to query and start/stop the sequencer
	cons consensus.Consensus     // consensus layer used for leader election (raft, see initConsensus)
	hmon health.HealthMonitor    // monitor reporting sequencer health updates

	leader    atomic.Bool // whether this conductor is currently the leader
	healthy   atomic.Bool // last health status reported by the health monitor
	seqActive atomic.Bool // whether the sequencer is actively sequencing

	actionFn func() // actionFn defines the action to be executed to bring the sequencer to the desired state.

	wg           sync.WaitGroup
	pauseCh      chan struct{} // requests to pause the control loop
	pauseDoneCh  chan struct{} // acknowledges that the control loop is paused
	resumeCh     chan struct{} // requests to resume the control loop
	resumeDoneCh chan struct{} // acknowledges that the control loop is resumed
	actionCh     chan struct{} // buffered (size 1) queue of pending control actions
	paused       atomic.Bool
	stopped      atomic.Bool

	shutdownCtx    context.Context
	shutdownCancel context.CancelFunc
}
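// Compile-time assertion that OpConductor implements the cliapp.Lifecycle interface.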
var _ cliapp.Lifecycle = (*OpConductor)(nil)
// Start implements cliapp.Lifecycle.
func (oc *OpConductor) Start(ctx context.Context) error {
oc.log.Info("starting OpConductor")
if err := oc.hmon.Start(); err != nil {
return errors.Wrap(err, "failed to start health monitor")
}
oc.shutdownCtx, oc.shutdownCancel = context.WithCancel(ctx)
oc.wg.Add(1)
go oc.loop()
oc.log.Info("OpConductor started")
return nil
}
// Stop implements cliapp.Lifecycle. It is also used to clean up a partially
// initialized conductor (see NewOpConductor), so each resource is only released
// if it was actually created.
func (oc *OpConductor) Stop(ctx context.Context) error {
	oc.log.Info("stopping OpConductor")
	var result *multierror.Error

	// close control loop
	if oc.shutdownCancel != nil {
		oc.shutdownCancel()
		oc.wg.Wait()
	}

	// stop health check
	if oc.hmon != nil {
		if err := oc.hmon.Stop(); err != nil {
			result = multierror.Append(result, errors.Wrap(err, "failed to stop health monitor"))
		}
	}

	// shutdown consensus
	if oc.cons != nil {
		if err := oc.cons.Shutdown(); err != nil {
			result = multierror.Append(result, errors.Wrap(err, "failed to shutdown consensus"))
		}
	}

	if result.ErrorOrNil() != nil {
		oc.log.Error("failed to stop OpConductor", "err", result.ErrorOrNil())
		return result.ErrorOrNil()
	}

	oc.stopped.Store(true)
	oc.log.Info("OpConductor stopped")
	return nil
}
// Stopped implements cliapp.Lifecycle.
func (oc *OpConductor) Stopped() bool {
return oc.stopped.Load()
}
// Pause pauses the control loop of OpConductor, but still allows it to participate in leader election.
func (oc *OpConductor) Pause(ctx context.Context) error {
select {
case oc.pauseCh <- struct{}{}:
<-oc.pauseDoneCh
return nil
case <-ctx.Done():
return ErrPauseTimeout
}
}
// Resume resumes the control loop of OpConductor.
func (oc *OpConductor) Resume(ctx context.Context) error {
select {
case oc.resumeCh <- struct{}{}:
<-oc.resumeDoneCh
return nil
case <-ctx.Done():
return ErrResumeTimeout
}
}
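// Pause and Resume only return the timeout errors above once the caller's context
// is cancelled or hits its deadline, so callers should bound them; a minimal sketch
// (assumed caller code, not part of this file):
//
//	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//	defer cancel()
//	if err := oc.Pause(ctx); err != nil {
//		logger.Error("failed to pause conductor", "err", err)
//	}
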
// Paused returns true if OpConductor is paused.
func (oc *OpConductor) Paused() bool {
return oc.paused.Load()
}
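// loop is the main control loop: it reacts to health and leadership updates,
// pause/resume requests, queued control actions, and shutdown.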
func (oc *OpConductor) loop() {
defer oc.wg.Done()
healthUpdate := oc.hmon.Subscribe()
leaderUpdate := oc.cons.LeaderCh()
for {
select {
		// Process status updates (health, leadership) first regardless of the paused state,
		// so that the sequencer can be brought to the desired state once the conductor resumes.
case healthy := <-healthUpdate:
oc.handleHealthUpdate(healthy)
case leader := <-leaderUpdate:
oc.handleLeaderUpdate(leader)
case <-oc.pauseCh:
oc.paused.Store(true)
oc.pauseDoneCh <- struct{}{}
case <-oc.resumeCh:
oc.paused.Store(false)
oc.resumeDoneCh <- struct{}{}
// queue an action to make sure sequencer is in the desired state after resume.
oc.queueAction()
case <-oc.shutdownCtx.Done():
return
		// Handle the control action last so that it executes with the latest status and brings the sequencer to the desired state.
case <-oc.actionCh:
oc.actionFn()
}
}
}
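// queueAction queues a control action for the control loop; it is a no-op if an action is already pending.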
func (oc *OpConductor) queueAction() {
select {
case oc.actionCh <- struct{}{}:
default:
		// Do nothing if an action is already queued. This is fine because whenever the action executes,
		// it reads the latest status and brings the sequencer to the desired state.
}
}
// handleLeaderUpdate handles leadership update from consensus.
func (oc *OpConductor) handleLeaderUpdate(leader bool) {
oc.log.Info("Leadership status changed", "server", oc.cons.ServerID(), "leader", leader)
oc.leader.Store(leader)
oc.queueAction()
}
// handleHealthUpdate handles health update from health monitor.
func (oc *OpConductor) handleHealthUpdate(healthy bool) {
if !healthy {
oc.log.Error("Sequencer is unhealthy", "server", oc.cons.ServerID())
}
if healthy != oc.healthy.Load() {
oc.healthy.Store(healthy)
oc.queueAction()
}
}
// action tries to bring the sequencer to the desired state; a retry is queued if the action fails.
func (oc *OpConductor) action() {
	if oc.Paused() {
		return
	}

	// TODO: (https://github.com/ethereum-optimism/protocol-quest/issues/47) implement
}