Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validator smarter subscribe #5334

Merged
merged 10 commits into from
Apr 7, 2020
9 changes: 9 additions & 0 deletions shared/bytesutil/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,15 @@ func ToBytes48(x []byte) [48]byte {
return y
}

// ToBytes64 is a convenience method for converting a byte slice to a fix
// sized 64 byte array. This method will truncate the input if it is larger
// than 64 bytes.
func ToBytes64(x []byte) [64]byte {
var y [64]byte
copy(y[:], x)
return y
}

// ToBool is a convenience method for converting a byte to a bool.
// This method will use the first bit of the 0 byte to generate the returned value.
func ToBool(x byte) bool {
Expand Down
44 changes: 37 additions & 7 deletions validator/client/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error {
subscribeSlots := make([]uint64, 0, len(validatingKeys))
subscribeCommitteeIDs := make([]uint64, 0, len(validatingKeys))
subscribeIsAggregator := make([]bool, 0, len(validatingKeys))
alreadySubscribed := make(map[[64]byte]bool)
// Only log the full assignments output on epoch start to be less verbose.
// Also log out on first launch so the user doesn't have to wait a whole epoch to see their assignments.
if slot%params.BeaconConfig().SlotsPerEpoch == 0 || firstDutiesReceived {
Expand All @@ -310,17 +311,28 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error {
}

if duty.Status == ethpb.ValidatorStatus_ACTIVE {
attesterSlot := duty.AttesterSlot
committeeIndex := duty.CommitteeIndex

if duty.ProposerSlot > 0 {
lFields["proposerSlot"] = duty.ProposerSlot
}
lFields["attesterSlot"] = duty.AttesterSlot
lFields["attesterSlot"] = attesterSlot

alreadySubscribedKey := validatorSubscribeKey(attesterSlot, committeeIndex)
if _, ok := alreadySubscribed[alreadySubscribedKey]; ok {
continue
}

aggregator, err := v.isAggregator(ctx, duty.Committee, duty.AttesterSlot, bytesutil.ToBytes48(duty.PublicKey))
aggregator, err := v.isAggregator(ctx, duty.Committee, attesterSlot, bytesutil.ToBytes48(duty.PublicKey))
if err != nil {
return errors.Wrap(err, "could not check if a validator is an aggregator")
}
subscribeSlots = append(subscribeSlots, duty.AttesterSlot)
subscribeCommitteeIDs = append(subscribeCommitteeIDs, duty.CommitteeIndex)
if aggregator {
alreadySubscribed[alreadySubscribedKey] = true
}
subscribeSlots = append(subscribeSlots, attesterSlot)
subscribeCommitteeIDs = append(subscribeCommitteeIDs, committeeIndex)
subscribeIsAggregator = append(subscribeIsAggregator, aggregator)
}

Expand All @@ -338,12 +350,24 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error {
if slot%params.BeaconConfig().SlotsPerEpoch == 0 || firstDutiesReceived {
for _, duty := range dutiesNextEpoch.Duties {
if duty.Status == ethpb.ValidatorStatus_ACTIVE {
aggregator, err := v.isAggregator(ctx, duty.Committee, duty.AttesterSlot, bytesutil.ToBytes48(duty.PublicKey))
attesterSlot := duty.AttesterSlot
committeeIndex := duty.CommitteeIndex

alreadySubscribedKey := validatorSubscribeKey(attesterSlot, committeeIndex)
if _, ok := alreadySubscribed[alreadySubscribedKey]; ok {
continue
}

aggregator, err := v.isAggregator(ctx, duty.Committee, attesterSlot, bytesutil.ToBytes48(duty.PublicKey))
if err != nil {
return errors.Wrap(err, "could not check if a validator is an aggregator")
}
subscribeSlots = append(subscribeSlots, duty.AttesterSlot)
subscribeCommitteeIDs = append(subscribeCommitteeIDs, duty.CommitteeIndex)
if aggregator {
alreadySubscribed[alreadySubscribedKey] = true
}

subscribeSlots = append(subscribeSlots, attesterSlot)
subscribeCommitteeIDs = append(subscribeCommitteeIDs, committeeIndex)
subscribeIsAggregator = append(subscribeIsAggregator, aggregator)
}
}
Expand Down Expand Up @@ -461,3 +485,9 @@ func (v *validator) domainData(ctx context.Context, epoch uint64, domain []byte)

return res, nil
}

// This constructs a validator subscribed key, it's used to track
// which subnet has already been pending requested.
func validatorSubscribeKey(slot uint64, committeeID uint64) [64]byte {
return bytesutil.ToBytes64(append(bytesutil.Bytes32(slot), bytesutil.Bytes32(committeeID)...))
}