-
Notifications
You must be signed in to change notification settings - Fork 109
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
hotfix(el/cl): allow multi clients to start if at least one node is up #2000
Changes from 13 commits
4a7ed06
1784907
6139748
54f982f
7a4248d
1025641
af5c455
3b9efcf
c6592ad
b21c110
902c99a
13edb6a
e1cc2e3
2bdc1ac
8db421f
0dd1027
8452bf4
d5fed41
75e2753
999608b
66d5051
61874f7
70594e8
3cc71cd
7dd5aa0
ab91668
99405c9
768665d
6af3463
e9f5c7b
f093561
7b3a1f5
16dc397
95af801
411f482
cbd61f5
f8188c6
5b72d6a
77ca3cf
0406ad3
23d393d
85779ec
c890973
4751783
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -114,6 +114,9 @@ | |
clients []Client | ||
multiClient MultiClient | ||
|
||
genesisMu sync.Mutex | ||
genesis *apiv1.Genesis | ||
|
||
syncDistanceTolerance phase0.Slot | ||
nodeSyncingFn func(ctx context.Context, opts *api.NodeSyncingOpts) (*api.Response[*apiv1.SyncState], error) | ||
|
||
|
@@ -154,82 +157,41 @@ | |
longTimeout = DefaultLongTimeout | ||
} | ||
|
||
client := &GoClient{ | ||
log: logger, | ||
ctx: opt.Context, | ||
network: opt.Network, | ||
syncDistanceTolerance: phase0.Slot(opt.SyncDistanceTolerance), | ||
operatorDataStore: operatorDataStore, | ||
registrationCache: map[phase0.BLSPubKey]*api.VersionedSignedValidatorRegistration{}, | ||
attestationDataCache: ttlcache.New( | ||
// we only fetch attestation data during the slot of the relevant duty (and never later), | ||
// hence caching it for 2 slots is sufficient | ||
ttlcache.WithTTL[phase0.Slot, *phase0.AttestationData](2 * opt.Network.SlotDurationSec()), | ||
), | ||
commonTimeout: commonTimeout, | ||
longTimeout: longTimeout, | ||
} | ||
|
||
beaconAddrList := strings.Split(opt.BeaconNodeAddr, ";") // TODO: Decide what symbol to use as a separator. Bootnodes are currently separated by ";". Deployment bot currently uses ",". | ||
if len(beaconAddrList) == 0 { | ||
return nil, fmt.Errorf("no beacon node address provided") | ||
} | ||
|
||
var consensusClients []Client | ||
var consensusClientsAsServices []eth2client.Service | ||
for _, beaconAddr := range beaconAddrList { | ||
httpClient, err := setupHTTPClient(opt.Context, logger, beaconAddr, commonTimeout) | ||
if err != nil { | ||
if err := client.addSingleClient(opt.Context, beaconAddr); err != nil { | ||
return nil, err | ||
} | ||
|
||
nodeVersionResp, err := httpClient.NodeVersion(opt.Context, &api.NodeVersionOpts{}) | ||
if err != nil { | ||
logger.Error(clResponseErrMsg, | ||
zap.String("api", "NodeVersion"), | ||
zap.Error(err), | ||
) | ||
return nil, fmt.Errorf("failed to get node version: %w", err) | ||
} | ||
if nodeVersionResp == nil { | ||
logger.Error(clNilResponseErrMsg, | ||
zap.String("api", "NodeVersion"), | ||
) | ||
return nil, fmt.Errorf("node version response is nil") | ||
} | ||
|
||
logger.Info("consensus client connected", | ||
fields.Name(httpClient.Name()), | ||
fields.Address(httpClient.Address()), | ||
zap.String("client", string(ParseNodeClient(nodeVersionResp.Data))), | ||
zap.String("version", nodeVersionResp.Data), | ||
) | ||
|
||
consensusClients = append(consensusClients, httpClient) | ||
consensusClientsAsServices = append(consensusClientsAsServices, httpClient) | ||
} | ||
|
||
err := assertSameGenesis(opt.Context, consensusClients...) | ||
if err != nil { | ||
return nil, fmt.Errorf("assert same spec: %w", err) | ||
} | ||
|
||
multiClient, err := eth2clientmulti.New( | ||
opt.Context, | ||
eth2clientmulti.WithClients(consensusClientsAsServices), | ||
eth2clientmulti.WithLogLevel(zerolog.DebugLevel), | ||
eth2clientmulti.WithTimeout(commonTimeout), | ||
) | ||
err := client.initMultiClient(opt.Context) | ||
if err != nil { | ||
logger.Error("Consensus multi client initialization failed", | ||
zap.String("address", opt.BeaconNodeAddr), | ||
zap.Error(err), | ||
) | ||
|
||
return nil, fmt.Errorf("create multi client: %w", err) | ||
} | ||
consensusClient := multiClient.(*eth2clientmulti.Service) | ||
|
||
client := &GoClient{ | ||
log: logger, | ||
ctx: opt.Context, | ||
network: opt.Network, | ||
clients: consensusClients, | ||
multiClient: consensusClient, | ||
syncDistanceTolerance: phase0.Slot(opt.SyncDistanceTolerance), | ||
operatorDataStore: operatorDataStore, | ||
registrationCache: map[phase0.BLSPubKey]*api.VersionedSignedValidatorRegistration{}, | ||
attestationDataCache: ttlcache.New( | ||
// we only fetch attestation data during the slot of the relevant duty (and never later), | ||
// hence caching it for 2 slots is sufficient | ||
ttlcache.WithTTL[phase0.Slot, *phase0.AttestationData](2 * opt.Network.SlotDurationSec()), | ||
), | ||
commonTimeout: commonTimeout, | ||
longTimeout: longTimeout, | ||
return nil, err | ||
} | ||
|
||
client.nodeSyncingFn = client.nodeSyncing | ||
|
@@ -241,55 +203,143 @@ | |
return client, nil | ||
} | ||
|
||
func setupHTTPClient(ctx context.Context, logger *zap.Logger, addr string, commonTimeout time.Duration) (*eth2clienthttp.Service, error) { | ||
func (gc *GoClient) initMultiClient(ctx context.Context) error { | ||
var services []eth2client.Service | ||
for _, client := range gc.clients { | ||
services = append(services, client) | ||
} | ||
|
||
multiClient, err := eth2clientmulti.New( | ||
ctx, | ||
eth2clientmulti.WithClients(services), | ||
eth2clientmulti.WithLogLevel(zerolog.DebugLevel), | ||
eth2clientmulti.WithTimeout(gc.commonTimeout), | ||
) | ||
if err != nil { | ||
return fmt.Errorf("create multi client: %w", err) | ||
} | ||
|
||
gc.multiClient = multiClient.(*eth2clientmulti.Service) | ||
return nil | ||
} | ||
|
||
func (gc *GoClient) addSingleClient(ctx context.Context, addr string) error { | ||
httpClient, err := eth2clienthttp.New( | ||
ctx, | ||
// WithAddress supplies the address of the beacon node, in host:port format. | ||
eth2clienthttp.WithAddress(addr), | ||
// LogLevel supplies the level of logging to carry out. | ||
eth2clienthttp.WithLogLevel(zerolog.DebugLevel), | ||
eth2clienthttp.WithTimeout(commonTimeout), | ||
eth2clienthttp.WithTimeout(gc.commonTimeout), | ||
eth2clienthttp.WithReducedMemoryUsage(true), | ||
eth2clienthttp.WithAllowDelayedStart(true), | ||
eth2clienthttp.WithHooks(gc.singleClientHooks()), | ||
) | ||
if err != nil { | ||
logger.Error("Consensus http client initialization failed", | ||
gc.log.Error("Consensus http client initialization failed", | ||
zap.String("address", addr), | ||
zap.Error(err), | ||
) | ||
|
||
return nil, fmt.Errorf("create http client: %w", err) | ||
return fmt.Errorf("create http client: %w", err) | ||
} | ||
|
||
return httpClient.(*eth2clienthttp.Service), nil | ||
gc.clients = append(gc.clients, httpClient.(*eth2clienthttp.Service)) | ||
|
||
return nil | ||
} | ||
|
||
// assertSameGenesis should receive a non-empty list | ||
func assertSameGenesis(ctx context.Context, services ...Client) error { | ||
firstGenesis, err := services[0].Genesis(ctx, &api.GenesisOpts{}) | ||
if err != nil { | ||
return fmt.Errorf("get first genesis: %w", err) | ||
func (gc *GoClient) singleClientHooks() *eth2clienthttp.Hooks { | ||
return ð2clienthttp.Hooks{ | ||
OnActive: func(ctx context.Context, s *eth2clienthttp.Service) { | ||
// If err is nil, nodeVersionResp is never nil. | ||
iurii-ssv marked this conversation as resolved.
Show resolved
Hide resolved
|
||
nodeVersionResp, err := s.NodeVersion(ctx, &api.NodeVersionOpts{}) | ||
if err != nil { | ||
gc.log.Error(clResponseErrMsg, | ||
zap.String("address", s.Address()), | ||
zap.String("api", "NodeVersion"), | ||
zap.Error(err), | ||
) | ||
return | ||
} | ||
|
||
gc.log.Info("consensus client connected", | ||
fields.Name(s.Name()), | ||
fields.Address(s.Address()), | ||
zap.String("client", string(ParseNodeClient(nodeVersionResp.Data))), | ||
zap.String("version", nodeVersionResp.Data), | ||
) | ||
|
||
genesis, err := s.Genesis(ctx, &api.GenesisOpts{}) | ||
if err != nil { | ||
gc.log.Error(clResponseErrMsg, | ||
zap.String("address", s.Address()), | ||
zap.String("api", "Genesis"), | ||
zap.Error(err), | ||
) | ||
return | ||
} | ||
|
||
if err := gc.assertSameGenesis(genesis.Data); err != nil { | ||
gc.genesisMu.Lock() | ||
defer gc.genesisMu.Unlock() | ||
|
||
gc.log.Fatal("client returned unexpected genesis", | ||
zap.String("address", s.Address()), | ||
zap.Any("client_genesis", genesis.Data), | ||
zap.Any("expected_genesis", gc.genesis), | ||
zap.Error(err), | ||
) | ||
return // Tests may override Fatal's behavior | ||
iurii-ssv marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this might crash the program after a while that it's running if the second client won't be actuveat start but become active later. We should instead just stop using this client. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Clients should never have the same geneses because it would mean they use different ETH networks, so IMO we shouldn't continue in this case. If it were just a log, the multi-client would just silently log it and switch to the next client. |
||
}, | ||
OnInactive: func(ctx context.Context, s *eth2clienthttp.Service) { | ||
gc.log.Warn("consensus client disconnected", | ||
fields.Name(s.Name()), | ||
fields.Address(s.Address()), | ||
) | ||
}, | ||
OnSynced: func(ctx context.Context, s *eth2clienthttp.Service) { | ||
gc.log.Info("consensus client synced", | ||
fields.Name(s.Name()), | ||
fields.Address(s.Address()), | ||
) | ||
}, | ||
OnDesynced: func(ctx context.Context, s *eth2clienthttp.Service) { | ||
gc.log.Warn("consensus client desynced", | ||
fields.Name(s.Name()), | ||
fields.Address(s.Address()), | ||
) | ||
}, | ||
} | ||
} | ||
|
||
for _, service := range services[1:] { | ||
srvGenesis, err := service.Genesis(ctx, &api.GenesisOpts{}) | ||
if err != nil { | ||
return fmt.Errorf("get service genesis: %w", err) | ||
} | ||
// assertSameGenesis checks if genesis is same. | ||
// Clients may have different values returned by Spec call, | ||
// so we decided that it's best to assert that GenesisForkVersion is the same. | ||
// To add more assertions, we check the whole apiv1.Genesis (GenesisTime and GenesisValidatorsRoot) | ||
// as they should be same too. | ||
func (gc *GoClient) assertSameGenesis(genesis *apiv1.Genesis) error { | ||
gc.genesisMu.Lock() | ||
defer gc.genesisMu.Unlock() | ||
|
||
if err := sameGenesis(firstGenesis.Data, srvGenesis.Data); err != nil { | ||
return fmt.Errorf("different genesis: %w", err) | ||
} | ||
if genesis == nil { | ||
return fmt.Errorf("genesis is nil") | ||
} | ||
|
||
if gc.genesis == nil { | ||
gc.genesis = genesis | ||
return nil | ||
} | ||
|
||
if err := sameGenesis(gc.genesis, genesis); err != nil { | ||
return fmt.Errorf("different genesis: %w", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func sameGenesis(a, b *apiv1.Genesis) error { | ||
if a == nil || b == nil { // Input parameters should never be nil, so the check may fail if both are nil | ||
return fmt.Errorf("genesis is nil") | ||
} | ||
|
||
if !a.GenesisTime.Equal(b.GenesisTime) { | ||
return fmt.Errorf("genesis time mismatch, got %v and %v", a.GenesisTime, b.GenesisTime) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not really specific to this PR but we often (and here as well) do 2 "duplicate" things
maybe would be simpler to just return
error
(with formatted withfmt.Errorf
to provide the necessary context) in places like this, bringing it up so we can get on the same page (whether we want to keep an eye on things like this or not)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main difference and issue is that when logging with zap and adding fields its easy to search them by the label value. we'll need to squeeze everything to the
fmt.Errorf
, it'll still be searchable, but not by label.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that we could think more about our logging approach, but I think this package currently uses logging in a way similar to other packages. If we decide to improve logging (e.g. use custom error types with fields), we need to do it project-wide