Skip to content

Commit

Permalink
feat(dash/quorum): lookup node id in address book
Browse files Browse the repository at this point in the history
  • Loading branch information
lklimek committed Dec 9, 2021
1 parent f57c403 commit 3e5a950
Show file tree
Hide file tree
Showing 11 changed files with 126 additions and 69 deletions.
8 changes: 7 additions & 1 deletion dash/quorum/mock/mock_switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Switch struct {
PersistentPeers map[string]bool
History []SwitchHistoryEvent
HistoryChan chan SwitchHistoryEvent
AddressBook p2p.AddrBook
}

// NewMockSwitch creates a new mock Switch
Expand All @@ -40,10 +41,15 @@ func NewMockSwitch() *Switch {
PersistentPeers: map[string]bool{},
History: []SwitchHistoryEvent{},
HistoryChan: make(chan SwitchHistoryEvent, 1000),
AddressBook: &p2p.AddrBookMock{},
}
return isw
}

func (sw *Switch) AddrBook() p2p.AddrBook {
return sw.AddressBook
}

// Peers implements Switch by returning sw.PeerSet
func (sw *Switch) Peers() p2p.IPeerSet {
return sw.PeerSet
Expand Down Expand Up @@ -80,7 +86,7 @@ func (sw *Switch) DialPeersAsync(addrs []string) error {
return err
}

peer.On("ID").Return(parsed.NodeID())
peer.On("ID").Return(parsed.NodeID)
peer.On("String").Return(addr)
if err := sw.PeerSet.Add(peer); err != nil {
return err
Expand Down
40 changes: 35 additions & 5 deletions dash/quorum/validator_conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/dash/quorum/selectpeers"
dashtypes "github.com/tendermint/tendermint/dash/types"
tmbytes "github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
Expand Down Expand Up @@ -35,6 +36,8 @@ type Switch interface {
DialPeersAsync(addrs []string) error
IsDialingOrExistingAddress(*p2p.NetAddress) bool
StopPeerGracefully(p2p.Peer)

AddrBook() p2p.AddrBook
}

// ValidatorConnExecutor retrieves validator update events and establishes new validator connections
Expand All @@ -59,6 +62,9 @@ type ValidatorConnExecutor struct {
// quorumHash contains current quorum hash
quorumHash tmbytes.HexBytes

// nodeIDResolvers can be used to determine a node ID for a validator
nodeIDResolvers []dashtypes.NodeIDResolver

// mux is a mutex to ensure only one goroutine is processing connections
mux sync.Mutex

Expand All @@ -83,6 +89,10 @@ func NewValidatorConnExecutor(
validatorSetMembers: validatorMap{},
connectedValidators: validatorMap{},
quorumHash: make(tmbytes.HexBytes, crypto.QuorumHashSize),
nodeIDResolvers: []dashtypes.NodeIDResolver{
dashtypes.NewAddrbookNodeIDResolver(sw.AddrBook()),
dashtypes.NewTCPNodeIDResolver(),
},
}

baseService := service.NewBaseService(logger, validatorConnExecutorName, vc)
Expand Down Expand Up @@ -203,6 +213,22 @@ func (vc *ValidatorConnExecutor) me() (validator *types.Validator, ok bool) {
return &v, ok
}

// resolveNodeID adds node ID to the validator address if it's not set
func (vc *ValidatorConnExecutor) resolveNodeID(va *dashtypes.ValidatorAddress) error {
if va.NodeID != "" {
return nil
}
for _, resolver := range vc.nodeIDResolvers {
nid, err := resolver.Resolve(*va)
if err == nil && nid != "" {
va.NodeID = nid
return nil
}
vc.Logger.Debug("node id not found, trying another method", "url", va.String(), "error", err)
}
return dashtypes.ErrNoNodeID
}

// selectValidators selects `count` validators from current ValidatorSet.
// It uses algorithm described in DIP-6.
// Returns map indexed by validator address.
Expand All @@ -220,10 +246,10 @@ func (vc *ValidatorConnExecutor) selectValidators() (validatorMap, error) {
}
// fetch node IDs
for _, validator := range selectedValidators {
_, err := validator.NodeAddress.NodeID()
err := vc.resolveNodeID(&validator.NodeAddress)
if err != nil {
vc.Logger.Debug("cannot determine node id for validator", "url", validator.String(), "error", err)
return nil, err
// no return, as it's not critical
}
}

Expand All @@ -239,10 +265,12 @@ func (vc *ValidatorConnExecutor) disconnectValidator(validator types.Validator)
return err
}

id, err := validator.NodeAddress.NodeID()
err = vc.resolveNodeID(&validator.NodeAddress)
if err != nil {
return err
}
id := validator.NodeAddress.NodeID

peer := vc.p2pSwitch.Peers().Get(id)
if peer == nil {
return fmt.Errorf("cannot stop peer %s: not found", id)
Expand Down Expand Up @@ -296,9 +324,11 @@ func (vc *ValidatorConnExecutor) updateConnections() error {
if err := vc.disconnectValidators(newValidators); err != nil {
return fmt.Errorf("cannot disconnect unused validators: %w", err)
}
vc.Logger.Debug("filtering validators", "validators", newValidators)
// ensure that we can connect to all validators
newValidators = vc.filterAddresses(newValidators)
// Connect to new validators
vc.Logger.Debug("dialing validators", "validators", newValidators)
if err := vc.dial(newValidators); err != nil {
return fmt.Errorf("cannot dial validators: %w", err)
}
Expand All @@ -311,7 +341,7 @@ func (vc *ValidatorConnExecutor) filterAddresses(validators validatorMap) valida
filtered := make(validatorMap, len(validators))
for id, validator := range validators {
if vc.connectedValidators.contains(validator) {
vc.Logger.P2PDebug("validator already connected", "id", id)
vc.Logger.Debug("validator already connected", "id", id)
continue
}
address, err := validator.NodeAddress.NetAddress()
Expand All @@ -320,7 +350,7 @@ func (vc *ValidatorConnExecutor) filterAddresses(validators validatorMap) valida
continue
}
if vc.p2pSwitch.IsDialingOrExistingAddress(address) {
vc.Logger.P2PDebug("already dialing this validator", "id", id, "address", address.String())
vc.Logger.Debug("already dialing this validator", "id", id, "address", address.String())
continue
}
filtered[id] = validator
Expand Down
7 changes: 5 additions & 2 deletions dash/quorum/validator_conn_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package quorum

import (
"context"
"encoding/hex"
"fmt"
"testing"
"time"
Expand All @@ -16,6 +17,7 @@ import (
tmbytes "github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/libs/log"
mmock "github.com/tendermint/tendermint/mempool/mock"
"github.com/tendermint/tendermint/p2p"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
Expand Down Expand Up @@ -61,9 +63,10 @@ func TestValidatorConnExecutor_NotValidator(t *testing.T) {
// TestValidatorConnExecutor_WrongAddress checks behavior in case of several issues in the address.
// Expected behavior: invalid address is dialed. Previous addresses are disconnected.
func TestValidatorConnExecutor_WrongAddress(t *testing.T) {

me := mock.NewValidator(65535)
addr1, err := dashtypes.ParseValidatorAddress("http://john@www.google.com:80")
zeroBytes := make([]byte, p2p.IDByteLength)
nodeID := hex.EncodeToString(zeroBytes)
addr1, err := dashtypes.ParseValidatorAddress("http://" + nodeID + "@www.domain-that-does-not-exist.com:80")
require.NoError(t, err)

val1 := mock.NewValidator(100)
Expand Down
42 changes: 36 additions & 6 deletions dash/types/nodeid_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@ type NodeIDResolver interface {
Resolve(ValidatorAddress) (p2p.ID, error)
}

type nodeIDResolver struct {
type tcpNodeIDResolver struct {
DialerTimeout time.Duration
ConnectionTimeout time.Duration
// other dependencies
}

func NewNodeIDResolver() NodeIDResolver {
return &nodeIDResolver{
func NewTCPNodeIDResolver() NodeIDResolver {
return &tcpNodeIDResolver{
DialerTimeout: DefaultDialTimeout,
ConnectionTimeout: DefaultConnectionTimeout,
}
}

// connect establishes a TCP connection to remote host.
// When err == nil, caller is responsible for closing of the connection
func (resolver nodeIDResolver) connect(host string, port uint16) (net.Conn, error) {
func (resolver tcpNodeIDResolver) connect(host string, port uint16) (net.Conn, error) {
dialer := net.Dialer{
Timeout: resolver.DialerTimeout,
}
Expand All @@ -56,8 +56,8 @@ func (resolver nodeIDResolver) connect(host string, port uint16) (net.Conn, erro
// Resolve retrieves a node ID from remote node.
// Note that it is quite expensive, as it establishes secure connection to the other node
// which is dropped afterwards.
func (resolver nodeIDResolver) Resolve(va ValidatorAddress) (p2p.ID, error) {
connection, err := resolver.connect(va.Hostname(), va.Port())
func (resolver tcpNodeIDResolver) Resolve(va ValidatorAddress) (p2p.ID, error) {
connection, err := resolver.connect(va.Hostname, va.Port)
if err != nil {
return "", err
}
Expand All @@ -69,3 +69,33 @@ func (resolver nodeIDResolver) Resolve(va ValidatorAddress) (p2p.ID, error) {
}
return p2p.PubKeyToID(sc.RemotePubKey()), nil
}

type addrbookNodeIDResolver struct {
addrBook p2p.AddrBook
}

// NewAddrbookNodeIDResolver creates new node ID resolver.
// It looks up fora node ID based on IP address, using the p2p addressbook.
func NewAddrbookNodeIDResolver(addrBook p2p.AddrBook) NodeIDResolver {
return addrbookNodeIDResolver{addrBook: addrBook}
}

// Resolve implements NodeIDResolver
// Resolve retrieves a node ID from address book.
func (resolver addrbookNodeIDResolver) Resolve(va ValidatorAddress) (p2p.ID, error) {
ip := net.ParseIP(va.Hostname)
if ip == nil {
ips, err := net.LookupIP(va.Hostname)
if err != nil {
return "", p2p.ErrNetAddressLookup{Addr: va.Hostname, Err: err}
}
ip = ips[0]
}

id := resolver.addrBook.FindIP(ip, va.Port)
if id == "" {
return "", ErrNoNodeID
}

return id, nil
}
39 changes: 2 additions & 37 deletions dash/types/validator_address.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
// ValidatorAddress is a NodeAddress that does not require node ID to be set
type ValidatorAddress struct {
p2p.NodeAddress
resolver NodeIDResolver
}

var (
Expand All @@ -32,7 +31,6 @@ func ParseValidatorAddress(address string) (ValidatorAddress, error) {
}
va := ValidatorAddress{
NodeAddress: addr,
resolver: NewNodeIDResolver(),
}
return va, va.Validate()
}
Expand All @@ -58,47 +56,14 @@ func (va ValidatorAddress) Validate() error {
return nil
}

// Hostname returns host name of this address
func (va ValidatorAddress) Hostname() string {
return va.NodeAddress.Hostname
}

// Port returns port number of this address
func (va ValidatorAddress) Port() uint16 {
return va.NodeAddress.Port
}

// Protocol returns protocol name of this address, like "tcp"
func (va ValidatorAddress) Protocol() string {
return va.NodeAddress.Protocol
}

// NetAddress returns this ValidatorAddress as a *p2p.NetAddress that can be used to establish connection
func (va ValidatorAddress) NetAddress() (*p2p.NetAddress, error) {
if _, err := va.NodeID(); err != nil {
return nil, fmt.Errorf("cannot determine node id for address %s: %w", va.String(), err)
if va.NodeID == "" {
return nil, fmt.Errorf("cannot determine node id for address %s", va.String())
}
return va.NodeAddress.NetAddress()
}

// NodeID() returns node ID. If it is not set, it will connect to remote node, retrieve its public key
// and calculate Node ID based on it. Noe this connection can be expensive.
func (va *ValidatorAddress) NodeID() (p2p.ID, error) {
if va.NodeAddress.NodeID == "" {
var err error

if va.resolver == nil {
return "", ErrNoResolver
}

va.NodeAddress.NodeID, err = va.resolver.Resolve(*va)
if err != nil {
return "", err
}
}
return va.NodeAddress.NodeID, nil
}

// RandValidatorAddress generates a random validator address. Used in tests.
func RandValidatorAddress() ValidatorAddress {
nodeID := tmrand.Bytes(20)
Expand Down
14 changes: 7 additions & 7 deletions dash/types/validator_address_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ func TestValidatorAddress_NodeID_fail(t *testing.T) {
t.Run(tt.uri, func(t *testing.T) {
va, err := ParseValidatorAddress(tt.uri)
assert.NoError(t, err)
got, err := va.NodeID()
assert.Equal(t, err != nil, tt.wantErr, "wantErr=%t, but err = %s", tt.wantErr, err)
// todo lookup for an address
got := va.NodeID
// assert.Equal(t, err != nil, tt.wantErr, "wantErr=%t, but err = %s", tt.wantErr, err)
assert.EqualValues(t, tt.want, got)
})
}
Expand Down Expand Up @@ -133,13 +134,12 @@ func TestValidatorAddress_HostPortProto(t *testing.T) {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.EqualValues(t, tt.wantHost, va.Hostname())
assert.EqualValues(t, tt.wantPort, va.Port())
assert.EqualValues(t, tt.wantProto, va.Protocol())
assert.EqualValues(t, tt.wantHost, va.Hostname)
assert.EqualValues(t, tt.wantPort, va.Port)
assert.EqualValues(t, tt.wantProto, va.Protocol)

if tt.wantNodeID != "" {
nodeID, err := va.NodeID()
assert.NoError(t, err)
nodeID := va.NodeID
assert.EqualValues(t, tt.wantNodeID, nodeID)
}
err = va.Validate()
Expand Down
4 changes: 2 additions & 2 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func TestNodeStartStop(t *testing.T) {
// check if we can read node ID of this node
va, err := dashtypes.ParseValidatorAddress(config.P2P.ListenAddress)
assert.NoError(t, err)
id, err := va.NodeID()
assert.NoError(t, err)
id, err := dashtypes.NewTCPNodeIDResolver().Resolve(va)
assert.Equal(t, n.nodeInfo.ID(), id)
assert.NoError(t, err)

// stop the node
go func() {
Expand Down
13 changes: 13 additions & 0 deletions p2p/pex/addrbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ type AddrBook interface {
// Check if the address is in the book
HasAddress(*p2p.NetAddress) bool

// Find by IP address
FindIP(net.IP, uint16) p2p.ID

// Do we need more peers?
NeedMoreAddrs() bool
// Is Address Book Empty? Answer should not depend on being in your own
Expand Down Expand Up @@ -695,6 +698,16 @@ func (a *addrBook) addAddress(addr, src *p2p.NetAddress) error {
return a.addToNewBucket(ka, bucket)
}

func (a *addrBook) FindIP(ip net.IP, port uint16) p2p.ID {
for nodeID, item := range a.addrLookup {
if item.Addr != nil && item.Addr.IP.Equal(ip) && item.Addr.Port == port {
return nodeID
}
}

return ""
}

func (a *addrBook) randomPickAddresses(bucketType byte, num int) []*p2p.NetAddress {
var buckets []map[string]*knownAddress
switch bucketType {
Expand Down
Loading

0 comments on commit 3e5a950

Please sign in to comment.