hotfix(el/cl): allow multi clients to start if at least one node is up #2000

Merged
merged 44 commits into from
Jan 30, 2025
Changes from 31 commits
44 commits
4a7ed06
do not crash if one client fails version check
y0sher Jan 22, 2025
1784907
fix help note on multiple addresses
y0sher Jan 22, 2025
6139748
don't compare genensis values
y0sher Jan 22, 2025
54f982f
remove old assertSameGenesis code
y0sher Jan 22, 2025
7a4248d
beacon/goclient: set up connection hooks, assert genesis
nkryuchkov Jan 22, 2025
1025641
beacon/goclient: remove outdated comment
nkryuchkov Jan 23, 2025
af5c455
eth/executionclient: allow starting with unhealthy client
nkryuchkov Jan 23, 2025
3b9efcf
eth/executionclient: simplify mutex usage
nkryuchkov Jan 23, 2025
c6592ad
eth/executionclient: fix tests for assertSameChainID
nkryuchkov Jan 23, 2025
b21c110
eth/executionclient: improve comment for assertSameGenesis
nkryuchkov Jan 23, 2025
902c99a
handle nil genesis and chain ID responses
nkryuchkov Jan 23, 2025
13edb6a
Merge branch 'stage' into fix/multiclient-oneclient
nkryuchkov Jan 24, 2025
e1cc2e3
refactor(multi_client): replace connectedCount atomic int with bool. …
y0sher Jan 26, 2025
2bdc1ac
clarify comment
nkryuchkov Jan 27, 2025
8db421f
fix double mutex lock
nkryuchkov Jan 27, 2025
0dd1027
fix potential nil pointer dereference
nkryuchkov Jan 27, 2025
8452bf4
check only genesis fork version instead of whole genesis
nkryuchkov Jan 27, 2025
d5fed41
create getClient helper method
nkryuchkov Jan 27, 2025
75e2753
improve logs
nkryuchkov Jan 27, 2025
999608b
fix issue with log fields
nkryuchkov Jan 27, 2025
66d5051
atomic chain ID
nkryuchkov Jan 27, 2025
61874f7
fix comment for client mutexes
nkryuchkov Jan 27, 2025
70594e8
no nil assignment in Close
nkryuchkov Jan 27, 2025
3cc71cd
add panic hook
nkryuchkov Jan 27, 2025
7dd5aa0
attempt to fix panic
nkryuchkov Jan 27, 2025
ab91668
fix logging
nkryuchkov Jan 27, 2025
99405c9
iterate clients forever in StreamLogs
nkryuchkov Jan 27, 2025
768665d
set follow distance to 1
nkryuchkov Jan 27, 2025
6af3463
Revert "set follow distance to 1"
nkryuchkov Jan 27, 2025
e9f5c7b
delete healthy channel
nkryuchkov Jan 27, 2025
f093561
remove obsolete tests
nkryuchkov Jan 28, 2025
7b3a1f5
add successful call log
nkryuchkov Jan 28, 2025
16dc397
fix potential nil ptr dereference
nkryuchkov Jan 28, 2025
95af801
go-eth2-client: use fork with sync distance tolerance
nkryuchkov Jan 28, 2025
411f482
add a comment about using github.com/nkryuchkov/go-eth2-client
nkryuchkov Jan 28, 2025
cbd61f5
code review comments
nkryuchkov Jan 28, 2025
f8188c6
named consensus client logger
nkryuchkov Jan 29, 2025
5b72d6a
remove trace logs
nkryuchkov Jan 29, 2025
77ca3cf
fix a typo
nkryuchkov Jan 29, 2025
0406ad3
add a comment with execution client shutdown scenarios
nkryuchkov Jan 30, 2025
23d393d
improve the comment for the call method
nkryuchkov Jan 30, 2025
85779ec
Merge branch 'stage' into fix/multiclient-oneclient
nkryuchkov Jan 30, 2025
c890973
log client address on submissions
nkryuchkov Jan 30, 2025
4751783
improve logs
nkryuchkov Jan 30, 2025
214 changes: 119 additions & 95 deletions beacon/goclient/goclient.go
@@ -6,6 +6,7 @@
"math"
"strings"
"sync"
"sync/atomic"
"time"

eth2client "github.com/attestantio/go-eth2-client"
@@ -114,6 +115,8 @@
clients []Client
multiClient MultiClient

genesisVersion atomic.Pointer[phase0.Version]

syncDistanceTolerance phase0.Slot
nodeSyncingFn func(ctx context.Context, opts *api.NodeSyncingOpts) (*api.Response[*apiv1.SyncState], error)

@@ -154,82 +157,41 @@
longTimeout = DefaultLongTimeout
}

client := &GoClient{
log: logger,
ctx: opt.Context,
network: opt.Network,
syncDistanceTolerance: phase0.Slot(opt.SyncDistanceTolerance),
operatorDataStore: operatorDataStore,
registrationCache: map[phase0.BLSPubKey]*api.VersionedSignedValidatorRegistration{},
attestationDataCache: ttlcache.New(
// we only fetch attestation data during the slot of the relevant duty (and never later),
// hence caching it for 2 slots is sufficient
ttlcache.WithTTL[phase0.Slot, *phase0.AttestationData](2 * opt.Network.SlotDurationSec()),
),
commonTimeout: commonTimeout,
longTimeout: longTimeout,
}

beaconAddrList := strings.Split(opt.BeaconNodeAddr, ";") // TODO: Decide what symbol to use as a separator. Bootnodes are currently separated by ";". Deployment bot currently uses ",".
if len(beaconAddrList) == 0 {
return nil, fmt.Errorf("no beacon node address provided")
}

var consensusClients []Client
var consensusClientsAsServices []eth2client.Service
for _, beaconAddr := range beaconAddrList {
httpClient, err := setupHTTPClient(opt.Context, logger, beaconAddr, commonTimeout)
if err != nil {
if err := client.addSingleClient(opt.Context, beaconAddr); err != nil {
return nil, err
}

nodeVersionResp, err := httpClient.NodeVersion(opt.Context, &api.NodeVersionOpts{})
if err != nil {
logger.Error(clResponseErrMsg,
zap.String("api", "NodeVersion"),
zap.Error(err),
)
return nil, fmt.Errorf("failed to get node version: %w", err)
}
if nodeVersionResp == nil {
logger.Error(clNilResponseErrMsg,
zap.String("api", "NodeVersion"),
)
return nil, fmt.Errorf("node version response is nil")
}

logger.Info("consensus client connected",
fields.Name(httpClient.Name()),
fields.Address(httpClient.Address()),
zap.String("client", string(ParseNodeClient(nodeVersionResp.Data))),
zap.String("version", nodeVersionResp.Data),
)

consensusClients = append(consensusClients, httpClient)
consensusClientsAsServices = append(consensusClientsAsServices, httpClient)
}

err := assertSameGenesis(opt.Context, consensusClients...)
if err != nil {
return nil, fmt.Errorf("assert same spec: %w", err)
}

multiClient, err := eth2clientmulti.New(
opt.Context,
eth2clientmulti.WithClients(consensusClientsAsServices),
eth2clientmulti.WithLogLevel(zerolog.DebugLevel),
eth2clientmulti.WithTimeout(commonTimeout),
)
err := client.initMultiClient(opt.Context)
if err != nil {
logger.Error("Consensus multi client initialization failed",
zap.String("address", opt.BeaconNodeAddr),
zap.Error(err),
)

return nil, fmt.Errorf("create multi client: %w", err)
}
consensusClient := multiClient.(*eth2clientmulti.Service)

client := &GoClient{
log: logger,
ctx: opt.Context,
network: opt.Network,
clients: consensusClients,
multiClient: consensusClient,
syncDistanceTolerance: phase0.Slot(opt.SyncDistanceTolerance),
operatorDataStore: operatorDataStore,
registrationCache: map[phase0.BLSPubKey]*api.VersionedSignedValidatorRegistration{},
attestationDataCache: ttlcache.New(
// we only fetch attestation data during the slot of the relevant duty (and never later),
// hence caching it for 2 slots is sufficient
ttlcache.WithTTL[phase0.Slot, *phase0.AttestationData](2 * opt.Network.SlotDurationSec()),
),
commonTimeout: commonTimeout,
longTimeout: longTimeout,
return nil, err
}

client.nodeSyncingFn = client.nodeSyncing
@@ -241,68 +203,130 @@
return client, nil
}

func setupHTTPClient(ctx context.Context, logger *zap.Logger, addr string, commonTimeout time.Duration) (*eth2clienthttp.Service, error) {
func (gc *GoClient) initMultiClient(ctx context.Context) error {
var services []eth2client.Service
for _, client := range gc.clients {
services = append(services, client)
}

multiClient, err := eth2clientmulti.New(
ctx,
eth2clientmulti.WithClients(services),
eth2clientmulti.WithLogLevel(zerolog.DebugLevel),
eth2clientmulti.WithTimeout(gc.commonTimeout),
)
if err != nil {
return fmt.Errorf("create multi client: %w", err)
}

gc.multiClient = multiClient.(*eth2clientmulti.Service)
return nil
}

func (gc *GoClient) addSingleClient(ctx context.Context, addr string) error {
httpClient, err := eth2clienthttp.New(
ctx,
// WithAddress supplies the address of the beacon node, in host:port format.
eth2clienthttp.WithAddress(addr),
// LogLevel supplies the level of logging to carry out.
eth2clienthttp.WithLogLevel(zerolog.DebugLevel),
eth2clienthttp.WithTimeout(commonTimeout),
eth2clienthttp.WithTimeout(gc.commonTimeout),
eth2clienthttp.WithReducedMemoryUsage(true),
eth2clienthttp.WithAllowDelayedStart(true),
eth2clienthttp.WithHooks(gc.singleClientHooks()),
)
if err != nil {
logger.Error("Consensus http client initialization failed",
gc.log.Error("Consensus http client initialization failed",

zap.String("address", addr),
zap.Error(err),
)

return nil, fmt.Errorf("create http client: %w", err)
return fmt.Errorf("create http client: %w", err)

Comment on lines -256 to +245
@iurii-ssv (Contributor), Jan 26, 2025:

Not really specific to this PR, but we often (and here as well) do 2 "duplicate" things:

  • log the error
  • return that same error (that's always gonna be logged eventually by the caller, resulting in a roughly duplicate log line)

Maybe it would be simpler to just return the error (formatted with fmt.Errorf to provide the necessary context) in places like this. Bringing it up so we can get on the same page (whether we want to keep an eye on things like this or not).

Contributor Author:

The main difference, and the issue, is that when logging with zap and adding fields it's easy to search them by the label value. If we only return the error, we'll need to squeeze everything into the fmt.Errorf message; it'll still be searchable, but not by label.

@nkryuchkov (Contributor), Jan 27, 2025:

I agree that we could think more about our logging approach, but I think this package currently uses logging in a way similar to other packages. If we decide to improve logging (e.g. use custom error types with fields), we need to do it project-wide.

}

return httpClient.(*eth2clienthttp.Service), nil
}

// assertSameGenesis should receive a non-empty list
func assertSameGenesis(ctx context.Context, services ...Client) error {
firstGenesis, err := services[0].Genesis(ctx, &api.GenesisOpts{})
if err != nil {
return fmt.Errorf("get first genesis: %w", err)
}

for _, service := range services[1:] {
srvGenesis, err := service.Genesis(ctx, &api.GenesisOpts{})
if err != nil {
return fmt.Errorf("get service genesis: %w", err)
}

if err := sameGenesis(firstGenesis.Data, srvGenesis.Data); err != nil {
return fmt.Errorf("different genesis: %w", err)
}
}
gc.clients = append(gc.clients, httpClient.(*eth2clienthttp.Service))

return nil
}

func sameGenesis(a, b *apiv1.Genesis) error {
if a == nil || b == nil { // Input parameters should never be nil, so the check may fail if both are nil
return fmt.Errorf("genesis is nil")
}
func (gc *GoClient) singleClientHooks() *eth2clienthttp.Hooks {
return &eth2clienthttp.Hooks{
OnActive: func(ctx context.Context, s *eth2clienthttp.Service) {
// If err is nil, nodeVersionResp is never nil.
nodeVersionResp, err := s.NodeVersion(ctx, &api.NodeVersionOpts{})
if err != nil {
gc.log.Error(clResponseErrMsg,
zap.String("address", s.Address()),
zap.String("api", "NodeVersion"),
zap.Error(err),
)
return
}

gc.log.Info("consensus client connected",
fields.Name(s.Name()),
fields.Address(s.Address()),
zap.String("client", string(ParseNodeClient(nodeVersionResp.Data))),
zap.String("version", nodeVersionResp.Data),
)

if !a.GenesisTime.Equal(b.GenesisTime) {
return fmt.Errorf("genesis time mismatch, got %v and %v", a.GenesisTime, b.GenesisTime)
genesis, err := s.Genesis(ctx, &api.GenesisOpts{})
if err != nil {
gc.log.Error(clResponseErrMsg,
zap.String("address", s.Address()),
zap.String("api", "Genesis"),
zap.Error(err),
)
return
}

if expected, err := gc.assertSameGenesisVersion(genesis.Data.GenesisForkVersion); err != nil {
gc.log.Fatal("client returned unexpected genesis fork version, make sure all clients use the same Ethereum network",
zap.String("address", s.Address()),
zap.Any("client_genesis", genesis.Data.GenesisForkVersion),
zap.Any("expected_genesis", expected),
zap.Error(err),
)
return // Tests may override Fatal's behavior
}

},
OnInactive: func(ctx context.Context, s *eth2clienthttp.Service) {
gc.log.Warn("consensus client disconnected",
fields.Name(s.Name()),
fields.Address(s.Address()),
)
},

OnSynced: func(ctx context.Context, s *eth2clienthttp.Service) {
gc.log.Info("consensus client synced",
fields.Name(s.Name()),
fields.Address(s.Address()),
)
},
OnDesynced: func(ctx context.Context, s *eth2clienthttp.Service) {
gc.log.Warn("consensus client desynced",
fields.Name(s.Name()),
fields.Address(s.Address()),
)
},

}
}

if a.GenesisValidatorsRoot != b.GenesisValidatorsRoot {
return fmt.Errorf("genesis validators root mismatch, got %v and %v", a.GenesisValidatorsRoot, b.GenesisValidatorsRoot)
// assertSameGenesisVersion checks that the given genesis fork version matches the one seen first.
// Clients may have different values returned by Spec call,
// so we decided that it's best to assert that GenesisForkVersion is the same.
// To add more assertions, we could check the whole apiv1.Genesis (GenesisTime and GenesisValidatorsRoot)
// as they should be the same too.
func (gc *GoClient) assertSameGenesisVersion(genesisVersion phase0.Version) (phase0.Version, error) {
if gc.genesisVersion.CompareAndSwap(nil, &genesisVersion) {
return genesisVersion, nil
}

if a.GenesisForkVersion != b.GenesisForkVersion {
return fmt.Errorf("genesis fork version mismatch, got %v and %v", a.GenesisForkVersion, b.GenesisForkVersion)
expected := *gc.genesisVersion.Load()
if expected != genesisVersion {
return expected, fmt.Errorf("genesis fork version mismatch, expected %v, got %v", expected, genesisVersion)

}

return nil
return expected, nil

}
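
The first-writer-wins check in assertSameGenesisVersion can be sketched in isolation as follows. This is a simplified stand-in, not the PR's code: `version` replaces phase0.Version so the example needs no external dependencies, and `checker` is a hypothetical type. The first caller stores its value with `CompareAndSwap(nil, ...)`; every later caller compares against the stored value.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// version stands in for phase0.Version (a 4-byte array).
type version [4]byte

// checker remembers the first version it sees.
type checker struct {
	expected atomic.Pointer[version]
}

// assertSame stores v if nothing is stored yet; otherwise it compares v
// with the stored value and reports a mismatch.
func (c *checker) assertSame(v version) (version, error) {
	if c.expected.CompareAndSwap(nil, &v) {
		return v, nil
	}
	exp := *c.expected.Load()
	if exp != v {
		return exp, fmt.Errorf("genesis fork version mismatch, expected %v, got %v", exp, v)
	}
	return exp, nil
}

func main() {
	var c checker
	_, err := c.assertSame(version{1, 0, 0, 0}) // first caller sets the expectation
	fmt.Println(err)
	_, err = c.assertSame(version{1, 0, 0, 0}) // same value passes
	fmt.Println(err)
	_, err = c.assertSame(version{2, 0, 0, 0}) // different value is rejected
	fmt.Println(err)
}
```

Because the pointer is only ever swapped from nil, the `Load` after a failed swap always returns a non-nil value, which is why the dereference is safe without a mutex.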

func (gc *GoClient) nodeSyncing(ctx context.Context, opts *api.NodeSyncingOpts) (*api.Response[*apiv1.SyncState], error) {
2 changes: 1 addition & 1 deletion beacon/goclient/goclient_test.go
@@ -88,7 +88,7 @@ func TestTimeouts(t *testing.T) {
return nil
})
_, err := mockClient(ctx, undialableServer.URL, commonTimeout, longTimeout)
require.ErrorContains(t, err, "context deadline exceeded")
require.ErrorContains(t, err, "client is not active")
}

// Too slow to respond to the Validators request.
2 changes: 1 addition & 1 deletion eth/executionclient/config.go
@@ -8,7 +8,7 @@ import (

// ExecutionOptions contains config configurations related to Ethereum execution client.
type ExecutionOptions struct {
Addr string `yaml:"ETH1Addr" env:"ETH_1_ADDR" env-required:"true" env-description:"Execution client WebSocket address. Supports multiple comma-separated addresses"`
Addr string `yaml:"ETH1Addr" env:"ETH_1_ADDR" env-required:"true" env-description:"Execution client WebSocket address. Supports multiple semicolon separated addresses. ex: ws://localhost:8546;ws://localhost:8547"`
ConnectionTimeout time.Duration `yaml:"ETH1ConnectionTimeout" env:"ETH_1_CONNECTION_TIMEOUT" env-default:"10s" env-description:"Execution client connection timeout"`
SyncDistanceTolerance uint64 `yaml:"ETH1SyncDistanceTolerance" env:"ETH_1_SYNC_DISTANCE_TOLERANCE" env-default:"5" env-description:"The number of out-of-sync blocks we can tolerate"`
}
27 changes: 1 addition & 26 deletions eth/executionclient/execution_client.go
@@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"

@@ -49,7 +48,6 @@ var _ Provider = &ExecutionClient{}

var (
ErrClosed = fmt.Errorf("closed")
ErrUnhealthy = fmt.Errorf("unhealthy")
ErrNotConnected = fmt.Errorf("not connected")
ErrBadInput = fmt.Errorf("bad input")
ErrNothingToSync = errors.New("nothing to sync")
@@ -81,8 +79,6 @@ type ExecutionClient struct {
client *ethclient.Client
closed chan struct{}
lastSyncedTime atomic.Int64
healthyChMu sync.Mutex
healthyCh chan struct{}
}

// New creates a new instance of ExecutionClient.
@@ -98,7 +94,6 @@ func New(ctx context.Context, nodeAddr string, contractAddr ethcommon.Address, o
healthInvalidationInterval: DefaultHealthInvalidationInterval,
logBatchSize: DefaultHistoricalLogsBatchSize, // TODO Make batch of logs adaptive depending on "websocket: read limit"
closed: make(chan struct{}),
healthyCh: make(chan struct{}),
}
for _, opt := range opts {
opt(client)
@@ -246,8 +241,6 @@ func (ec *ExecutionClient) StreamLogs(ctx context.Context, fromBlock uint64) <-c
return
case <-ec.closed:
return
case <-ec.healthyCh:
return
default:
lastBlock, err := ec.streamLogsToChan(ctx, logs, fromBlock)
if errors.Is(err, ErrClosed) || errors.Is(err, context.Canceled) {
@@ -294,22 +287,7 @@ func (ec *ExecutionClient) Healthy(ctx context.Context) error {
return nil
}

ec.healthyChMu.Lock()
defer ec.healthyChMu.Unlock()

if err := ec.healthy(ctx); err != nil {
close(ec.healthyCh)
Contributor:

This can trigger closing an already-closed channel, which panics. I'm removing it as it's dangerous to use.
FYI @moshe-blox
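
For context, a small standalone sketch of the failure mode described in this comment: closing a channel that is already closed panics at runtime. The `signal` type is a hypothetical illustration of one common guard (`sync.Once`); the PR itself takes the simpler route of removing the channel, and a `sync.Once` guard would not support the "reset" step the removed code attempted.

```go
package main

import (
	"fmt"
	"sync"
)

// closeTwice closes the same channel twice and reports the recovered panic message.
func closeTwice() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch := make(chan struct{})
	close(ch)
	close(ch) // the second close panics
	return "no panic"
}

// signal is a hypothetical wrapper whose Close is safe to call repeatedly.
type signal struct {
	once sync.Once
	ch   chan struct{}
}

func newSignal() *signal { return &signal{ch: make(chan struct{})} }

// Close closes the underlying channel at most once.
func (s *signal) Close() { s.once.Do(func() { close(s.ch) }) }

func main() {
	fmt.Println(closeTwice())

	s := newSignal()
	s.Close()
	s.Close() // no panic: sync.Once guards the close
	<-s.ch    // returns immediately because the channel is closed
	fmt.Println("closed once")
}
```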

return fmt.Errorf("unhealthy: %w", err)
}

// Reset the healthyCh channel if it was closed.
select {
case <-ec.healthyCh:
default:
ec.healthyCh = make(chan struct{})
}

return nil
return ec.healthy(ctx)
}

func (ec *ExecutionClient) healthy(ctx context.Context) error {
@@ -426,9 +404,6 @@ func (ec *ExecutionClient) streamLogsToChan(ctx context.Context, logs chan<- Blo
case <-ec.closed:
return fromBlock, ErrClosed

case <-ec.healthyCh:
return fromBlock, ErrUnhealthy
Contributor:

This is not working as expected: I saw it returning ErrUnhealthy when the client was healthy. This channel looks tricky and dangerous to work with (see the other comment about closing the channel), so I'm removing it.
FYI @moshe-blox

@iurii-ssv (Contributor), Jan 28, 2025:

+1

Just briefly looking at this channel (and the corresponding mutex), it's not entirely clear how it is supposed to be used (what it's for, etc.),

so if we aren't gonna remove it entirely, we need to at least rethink its purpose/usage.


case err := <-sub.Err():
if err == nil {
return fromBlock, ErrClosed