Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Fork Digest For Gossip Topics #5191

Merged
merged 15 commits into from
Mar 26, 2020
12 changes: 8 additions & 4 deletions beacon-chain/p2p/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub top
func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
ctx, span := trace.StartSpan(ctx, "p2p.Broadcast")
defer span.End()
forkDigest, err := s.ForkDigest()
if err != nil {
return err
}

var topic string
switch msg.(type) {
case *eth.Attestation:
topic = attestationToTopic(msg.(*eth.Attestation))
topic = attestationToTopic(msg.(*eth.Attestation), forkDigest)
default:
var ok bool
topic, ok = GossipTypeMapping[reflect.TypeOf(msg)]
Expand Down Expand Up @@ -59,11 +63,11 @@ func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
return nil
}

const attestationSubnetTopicFormat = "/eth2/committee_index%d_beacon_attestation"
const attestationSubnetTopicFormat = "/eth2/%x/committee_index%d_beacon_attestation"

func attestationToTopic(att *eth.Attestation) string {
func attestationToTopic(att *eth.Attestation, forkDigest [4]byte) string {
if att == nil || att.Data == nil {
return ""
}
return fmt.Sprintf(attestationSubnetTopicFormat, att.Data.CommitteeIndex)
return fmt.Sprintf(attestationSubnetTopicFormat, forkDigest, att.Data.CommitteeIndex)
}
8 changes: 4 additions & 4 deletions beacon-chain/p2p/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,23 +99,23 @@ func TestService_Attestation_Subnet(t *testing.T) {
CommitteeIndex: 0,
},
},
topic: "/eth2/committee_index0_beacon_attestation",
topic: "/eth2/00000000/committee_index0_beacon_attestation",
},
{
att: &eth.Attestation{
Data: &eth.AttestationData{
CommitteeIndex: 11,
},
},
topic: "/eth2/committee_index11_beacon_attestation",
topic: "/eth2/00000000/committee_index11_beacon_attestation",
},
{
att: &eth.Attestation{
Data: &eth.AttestationData{
CommitteeIndex: 55,
},
},
topic: "/eth2/committee_index55_beacon_attestation",
topic: "/eth2/00000000/committee_index55_beacon_attestation",
},
{
att: &eth.Attestation{},
Expand All @@ -126,7 +126,7 @@ func TestService_Attestation_Subnet(t *testing.T) {
},
}
for _, tt := range tests {
if res := attestationToTopic(tt.att); res != tt.topic {
if res := attestationToTopic(tt.att, [4]byte{} /* fork digest */); res != tt.topic {
t.Errorf("Wrong topic, got %s wanted %s", res, tt.topic)
}
}
Expand Down
36 changes: 28 additions & 8 deletions beacon-chain/p2p/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ import (
// ENR key used for eth2-related fork data.
const eth2ENRKey = "eth2"

// ForkDigest returns the current fork digest of
// the node.
func (s *Service) ForkDigest() ([4]byte, error) {
return createForkDigest(s.genesisTime, s.genesisValidatorsRoot)
}

// Compares fork ENRs between an incoming peer's record and our node's
// local record values for current and next fork version/epoch.
func (s *Service) compareForkENR(record *enr.Record) error {
Expand Down Expand Up @@ -66,16 +72,13 @@ func (s *Service) compareForkENR(record *enr.Record) error {
return nil
}

// Adds a fork entry as an ENR record under the eth2EnrKey for
// the local node. The fork entry is an ssz-encoded enrForkID type
// which takes into account the current fork version from the current
// epoch to create a fork digest, the next fork version,
// and the next fork epoch.
func addForkEntry(
node *enode.LocalNode,
// Creates a fork digest from a genesis time and genesis
// validators root, utilizing the current slot to determine
// the active fork version in the node.
func createForkDigest(
genesisTime time.Time,
genesisValidatorsRoot []byte,
) (*enode.LocalNode, error) {
) ([4]byte, error) {
currentSlot := helpers.SlotsSince(genesisTime)
currentEpoch := helpers.SlotToEpoch(currentSlot)

Expand All @@ -92,6 +95,23 @@ func addForkEntry(
}

digest, err := helpers.ComputeForkDigest(currentForkVersion, genesisValidatorsRoot)
if err != nil {
return [4]byte{}, err
}
return digest, nil
}

// Adds a fork entry as an ENR record under the eth2EnrKey for
// the local node. The fork entry is an ssz-encoded enrForkID type
// which takes into account the current fork version from the current
// epoch to create a fork digest, the next fork version,
// and the next fork epoch.
func addForkEntry(
node *enode.LocalNode,
genesisTime time.Time,
genesisValidatorsRoot []byte,
) (*enode.LocalNode, error) {
digest, err := createForkDigest(genesisTime, genesisValidatorsRoot)
if err != nil {
return nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions beacon-chain/p2p/gossip_topic_mappings.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (
// GossipTopicMappings represent the protocol ID to protobuf message type map for easy
// lookup.
var GossipTopicMappings = map[string]proto.Message{
"/eth2/beacon_block": &pb.SignedBeaconBlock{},
"/eth2/committee_index%d_beacon_attestation": &pb.Attestation{},
"/eth2/voluntary_exit": &pb.SignedVoluntaryExit{},
"/eth2/proposer_slashing": &pb.ProposerSlashing{},
"/eth2/attester_slashing": &pb.AttesterSlashing{},
"/eth2/beacon_aggregate_and_proof": &pb.SignedAggregateAttestationAndProof{},
"/eth2/%x/beacon_block": &pb.SignedBeaconBlock{},
"/eth2/%x/committee_index%d_beacon_attestation": &pb.Attestation{},
"/eth2/%x/voluntary_exit": &pb.SignedVoluntaryExit{},
"/eth2/%x/proposer_slashing": &pb.ProposerSlashing{},
"/eth2/%x/attester_slashing": &pb.AttesterSlashing{},
"/eth2/%x/beacon_aggregate_and_proof": &pb.SignedAggregateAttestationAndProof{},
}

// GossipTypeMapping is the inverse of GossipTopicMappings so that an arbitrary protobuf message
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/p2p/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type ConnectionHandler interface {
// EncodingProvider provides p2p network encoding.
type EncodingProvider interface {
Encoding() encoder.NetworkEncoding
ForkDigest() ([4]byte, error)
}

// PubSubProvider provides the p2p pubsub protocol.
Expand Down
13 changes: 12 additions & 1 deletion beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,14 @@ func (p *TestP2P) ReceivePubSub(topic string, msg proto.Message) {
if _, err := p.Encoding().Encode(buf, msg); err != nil {
p.t.Fatalf("Failed to encode message: %v", err)
}
digest, err := p.ForkDigest()
if err != nil {
p.t.Fatal(err)
}
topic = fmt.Sprintf(topic, digest)
topic = topic + p.Encoding().ProtocolSuffix()

if err := ps.Publish(topic+p.Encoding().ProtocolSuffix(), buf.Bytes()); err != nil {
if err := ps.Publish(topic, buf.Bytes()); err != nil {
p.t.Fatalf("Failed to publish message; %v", err)
}
}
Expand Down Expand Up @@ -237,3 +243,8 @@ func (p *TestP2P) FindPeersWithSubnet(index uint64) (bool, error) {
func (p *TestP2P) RefreshENR(epoch uint64) {
return
}

// ForkDigest mocks the p2p func.
func (p *TestP2P) ForkDigest() ([4]byte, error) {
return [4]byte{}, nil
}
8 changes: 8 additions & 0 deletions beacon-chain/sync/decode_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func (r *Service) decodePubsubMessage(msg *pubsub.Message) (proto.Message, error
}
topic := msg.TopicIDs[0]
topic = strings.TrimSuffix(topic, r.p2p.Encoding().ProtocolSuffix())
topic = r.replaceForkDigest(topic)
base, ok := p2p.GossipTopicMappings[topic]
if !ok {
return nil, fmt.Errorf("no message mapped for topic %s", topic)
Expand All @@ -26,3 +27,10 @@ func (r *Service) decodePubsubMessage(msg *pubsub.Message) (proto.Message, error
}
return m, nil
}

// Replaces our fork digest with the formatter.
func (r *Service) replaceForkDigest(topic string) string {
subStrings := strings.Split(topic, "/")
subStrings[2] = "%x"
return strings.Join(subStrings, "/")
}
2 changes: 1 addition & 1 deletion beacon-chain/sync/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
const genericError = "internal service error"
const rateLimitedError = "rate limited"

var errWrongForkVersion = errors.New("wrong fork version")
var errWrongForkDigestVersion = errors.New("wrong fork digest version")
var errInvalidEpoch = errors.New("invalid epoch")

var responseCodeSuccess = byte(0x00)
Expand Down
10 changes: 5 additions & 5 deletions beacon-chain/sync/initial-sync-old/round_robin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,11 @@ func connectPeers(t *testing.T, host *p2pt.TestP2P, data []*peerData, peerStatus
peerStatus.Add(peer.PeerID(), nil, network.DirOutbound, []uint64{})
peerStatus.SetConnectionState(peer.PeerID(), peers.PeerConnected)
peerStatus.SetChainState(peer.PeerID(), &p2ppb.Status{
HeadForkVersion: params.BeaconConfig().GenesisForkVersion,
FinalizedRoot: []byte(fmt.Sprintf("finalized_root %d", datum.finalizedEpoch)),
FinalizedEpoch: datum.finalizedEpoch,
HeadRoot: []byte("head_root"),
HeadSlot: datum.headSlot,
ForkDigest: params.BeaconConfig().GenesisForkVersion,
FinalizedRoot: []byte(fmt.Sprintf("finalized_root %d", datum.finalizedEpoch)),
FinalizedEpoch: datum.finalizedEpoch,
HeadRoot: []byte("head_root"),
HeadSlot: datum.headSlot,
})
}
}
Expand Down
10 changes: 5 additions & 5 deletions beacon-chain/sync/initial-sync/round_robin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,11 @@ func connectPeers(t *testing.T, host *p2pt.TestP2P, data []*peerData, peerStatus
peerStatus.Add(peer.PeerID(), nil, network.DirOutbound, []uint64{})
peerStatus.SetConnectionState(peer.PeerID(), peers.PeerConnected)
peerStatus.SetChainState(peer.PeerID(), &p2ppb.Status{
HeadForkVersion: params.BeaconConfig().GenesisForkVersion,
FinalizedRoot: []byte(fmt.Sprintf("finalized_root %d", datum.finalizedEpoch)),
FinalizedEpoch: datum.finalizedEpoch,
HeadRoot: []byte("head_root"),
HeadSlot: datum.headSlot,
ForkDigest: params.BeaconConfig().GenesisForkVersion,
FinalizedRoot: []byte(fmt.Sprintf("finalized_root %d", datum.finalizedEpoch)),
FinalizedEpoch: datum.finalizedEpoch,
HeadRoot: []byte("head_root"),
HeadSlot: datum.headSlot,
})
}
}
Expand Down
6 changes: 3 additions & 3 deletions beacon-chain/sync/pending_blocks_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks2(t *testing.T) {
if code == 0 {
t.Error("Expected a non-zero code")
}
if errMsg != errWrongForkVersion.Error() {
t.Logf("Received error string len %d, wanted error string len %d", len(errMsg), len(errWrongForkVersion.Error()))
t.Errorf("Received unexpected message response in the stream: %s. Wanted %s.", errMsg, errWrongForkVersion.Error())
if errMsg != errWrongForkDigestVersion.Error() {
t.Logf("Received error string len %d, wanted error string len %d", len(errMsg), len(errWrongForkDigestVersion.Error()))
t.Errorf("Received unexpected message response in the stream: %s. Wanted %s.", errMsg, errWrongForkDigestVersion.Error())
}
})

Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/sync/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (r *Service) registerRPC(topic string, base interface{}, handle rpcHandler)
}
if err := handle(ctx, msg.Interface(), stream); err != nil {
messageFailedProcessingCounter.WithLabelValues(topic).Inc()
if err != errWrongForkVersion {
if err != errWrongForkDigestVersion {
log.WithError(err).Warn("Failed to handle p2p RPC")
}
traceutil.AnnotateError(span, err)
Expand All @@ -101,7 +101,7 @@ func (r *Service) registerRPC(topic string, base interface{}, handle rpcHandler)
}
if err := handle(ctx, msg.Elem().Interface(), stream); err != nil {
messageFailedProcessingCounter.WithLabelValues(topic).Inc()
if err != errWrongForkVersion {
if err != errWrongForkDigestVersion {
log.WithError(err).Warn("Failed to handle p2p RPC")
}
traceutil.AnnotateError(span, err)
Expand Down
36 changes: 24 additions & 12 deletions beacon-chain/sync/rpc_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,16 @@ func (r *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
return err
}

forkDigest, err := r.p2p.ForkDigest()
if err != nil {
return err
}
resp := &pb.Status{
HeadForkVersion: r.chain.CurrentFork().CurrentVersion,
FinalizedRoot: r.chain.FinalizedCheckpt().Root,
FinalizedEpoch: r.chain.FinalizedCheckpt().Epoch,
HeadRoot: headRoot,
HeadSlot: r.chain.HeadSlot(),
ForkDigest: forkDigest[:],
FinalizedRoot: r.chain.FinalizedCheckpt().Root,
FinalizedEpoch: r.chain.FinalizedCheckpt().Epoch,
HeadRoot: headRoot,
HeadSlot: r.chain.HeadSlot(),
}
stream, err := r.p2p.Send(ctx, resp, id)
if err != nil {
Expand Down Expand Up @@ -155,12 +159,16 @@ func (r *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream
return err
}

forkDigest, err := r.p2p.ForkDigest()
if err != nil {
return err
}
resp := &pb.Status{
HeadForkVersion: r.chain.CurrentFork().CurrentVersion,
FinalizedRoot: r.chain.FinalizedCheckpt().Root,
FinalizedEpoch: r.chain.FinalizedCheckpt().Epoch,
HeadRoot: headRoot,
HeadSlot: r.chain.HeadSlot(),
ForkDigest: forkDigest[:],
FinalizedRoot: r.chain.FinalizedCheckpt().Root,
FinalizedEpoch: r.chain.FinalizedCheckpt().Epoch,
HeadRoot: headRoot,
HeadSlot: r.chain.HeadSlot(),
}

if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
Expand All @@ -172,8 +180,12 @@ func (r *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream
}

func (r *Service) validateStatusMessage(msg *pb.Status, stream network.Stream) error {
if !bytes.Equal(params.BeaconConfig().GenesisForkVersion, msg.HeadForkVersion) {
return errWrongForkVersion
forkDigest, err := r.p2p.ForkDigest()
if err != nil {
return err
}
if !bytes.Equal(forkDigest[:], msg.ForkDigest) {
return errWrongForkDigestVersion
}
genesis := r.chain.GenesisTime()
maxEpoch := slotutil.EpochsSinceGenesis(genesis)
Expand Down
Loading