Skip to content

Commit

Permalink
p2p: hybrid node net identity for connection deduplication (#6035)
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy authored Jul 19, 2024
1 parent 1fa0ef7 commit 9c93670
Show file tree
Hide file tree
Showing 30 changed files with 994 additions and 261 deletions.
20 changes: 18 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"os/user"
"path/filepath"
"strings"

"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util/codecs"
Expand Down Expand Up @@ -147,7 +148,17 @@ func mergeConfigFromFile(configpath string, source Local) (Local, error) {
defer f.Close()

err = loadConfig(f, &source)
if err != nil {
return source, err
}
source, err = enrichNetworkingConfig(source)
return source, err
}

// enrichNetworkingConfig makes the following tweaks to the config:
// - If NetAddress is set, enable the ledger and block services
// - If EnableP2PHybridMode is set, require PublicAddress to be set
func enrichNetworkingConfig(source Local) (Local, error) {
// If the PublicAddress in config file has the PlaceholderPublicAddress, treat it as if it were empty
if source.PublicAddress == PlaceholderPublicAddress {
source.PublicAddress = ""
Expand All @@ -163,8 +174,13 @@ func mergeConfigFromFile(configpath string, source Local) (Local, error) {
source.GossipFanout = defaultRelayGossipFanout
}
}

return source, err
// In hybrid mode we want to prevent connections from the same node over both P2P and WS.
// The only way it is supported at the moment is to use net identity challenge that is based on PublicAddress.
if (source.NetAddress != "" || source.P2PNetAddress != "") && source.EnableP2PHybridMode && source.PublicAddress == "" {
return source, errors.New("PublicAddress must be specified when EnableP2PHybridMode is set")
}
source.PublicAddress = strings.ToLower(source.PublicAddress)
return source, nil
}

func loadConfig(reader io.Reader, config *Local) error {
Expand Down
185 changes: 157 additions & 28 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,62 @@ func TestLocal_MergeConfig(t *testing.T) {
require.Equal(t, c1.GossipFanout, c2.GossipFanout)
}

func TestLocal_EnrichNetworkingConfig(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

c1 := Local{
NetAddress: "test1",
GossipFanout: defaultLocal.GossipFanout,
}
c2, err := enrichNetworkingConfig(c1)
require.NoError(t, err)
require.NotEqual(t, c1, c2)
require.False(t, c1.EnableLedgerService)
require.False(t, c1.EnableBlockService)
require.Equal(t, c1.GossipFanout, defaultLocal.GossipFanout)
require.True(t, c2.EnableLedgerService)
require.True(t, c2.EnableBlockService)
require.Equal(t, c2.GossipFanout, defaultRelayGossipFanout)

c1 = Local{
EnableP2PHybridMode: true,
}
c2, err = enrichNetworkingConfig(c1)
require.NoError(t, err)

c1 = Local{
NetAddress: "test1",
EnableP2PHybridMode: true,
}
c2, err = enrichNetworkingConfig(c1)
require.ErrorContains(t, err, "PublicAddress must be specified when EnableP2PHybridMode is set")

c1 = Local{
P2PNetAddress: "test1",
EnableP2PHybridMode: true,
}
c2, err = enrichNetworkingConfig(c1)
require.ErrorContains(t, err, "PublicAddress must be specified when EnableP2PHybridMode is set")

c1 = Local{
EnableP2PHybridMode: true,
PublicAddress: "test2",
}
c2, err = enrichNetworkingConfig(c1)
require.NoError(t, err)
require.Equal(t, c1, c2)
require.True(t, c2.EnableP2PHybridMode)
require.NotEmpty(t, c2.PublicAddress)

c1 = Local{
PublicAddress: "R1.test3.my-domain.tld",
}
c2, err = enrichNetworkingConfig(c1)
require.NoError(t, err)
require.Equal(t, "r1.test3.my-domain.tld", c2.PublicAddress)
}

func saveFullPhonebook(phonebook phonebookBlackWhiteList, saveToDir string) error {
filename := filepath.Join(saveToDir, PhonebookFilename)
f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
Expand Down Expand Up @@ -559,55 +615,128 @@ func TestLocal_IsGossipServer(t *testing.T) {

cfg := GetDefaultLocal()
require.False(t, cfg.IsGossipServer())
require.False(t, cfg.IsWsGossipServer())
require.False(t, cfg.IsP2PGossipServer())

cfg.NetAddress = ":4160"
require.True(t, cfg.IsGossipServer())
require.True(t, cfg.IsWsGossipServer())
require.False(t, cfg.IsP2PGossipServer())

cfg.EnableGossipService = false
// EnableGossipService does not matter
require.True(t, cfg.IsGossipServer())
require.True(t, cfg.IsWsGossipServer())
require.False(t, cfg.IsP2PGossipServer())

cfg.EnableP2P = true
cfg.NetAddress = ":4160"
require.True(t, cfg.IsGossipServer())
require.False(t, cfg.IsWsGossipServer())
require.True(t, cfg.IsP2PGossipServer())

cfg.EnableP2P = false

cfg.EnableP2PHybridMode = true
// with net address set it is ws net gossip server
require.True(t, cfg.IsGossipServer())
require.True(t, cfg.IsWsGossipServer())
require.False(t, cfg.IsP2PGossipServer())

cfg.EnableP2PHybridMode = true
cfg.NetAddress = ""
require.False(t, cfg.IsGossipServer())
require.False(t, cfg.IsWsGossipServer())
require.False(t, cfg.IsP2PGossipServer())

cfg.EnableP2PHybridMode = true
cfg.P2PNetAddress = ":4190"
require.True(t, cfg.IsGossipServer())
require.False(t, cfg.IsWsGossipServer())
require.True(t, cfg.IsP2PGossipServer())

cfg.EnableP2PHybridMode = true
cfg.NetAddress = ":4160"
cfg.P2PNetAddress = ":4190"
require.True(t, cfg.IsGossipServer())
require.True(t, cfg.IsWsGossipServer())
require.True(t, cfg.IsP2PGossipServer())

cfg.EnableP2PHybridMode = true
cfg.EnableP2P = true
cfg.NetAddress = ":4160"
cfg.P2PNetAddress = ":4190"
require.True(t, cfg.IsGossipServer())
require.True(t, cfg.IsWsGossipServer())
require.True(t, cfg.IsP2PGossipServer())

cfg.EnableP2PHybridMode = true
cfg.EnableP2P = true
cfg.NetAddress = ":4160"
cfg.P2PNetAddress = ""
require.True(t, cfg.IsGossipServer())
require.True(t, cfg.IsWsGossipServer())
require.False(t, cfg.IsP2PGossipServer())
}

func TestLocal_RecalculateConnectionLimits(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

var tests = []struct {
maxFDs uint64
reservedIn uint64
restSoftIn uint64
restHardIn uint64
incomingIn int

updated bool
restSoftExp uint64
restHardExp uint64
incomingExp int
maxFDs uint64
reservedIn uint64
restSoftIn uint64
restHardIn uint64
incomingIn int
p2pIncomingIn int

updated bool
restSoftExp uint64
restHardExp uint64
incomingExp int
p2pIncomingExp int
}{
{100, 10, 20, 40, 50, false, 20, 40, 50}, // no change
{100, 10, 20, 50, 50, true, 20, 40, 50}, // borrow from rest
{100, 10, 25, 50, 50, true, 25, 40, 50}, // borrow from rest
{100, 10, 50, 50, 50, true, 40, 40, 50}, // borrow from rest, update soft
{100, 10, 9, 19, 81, true, 9, 10, 80}, // borrow from both rest and incoming
{100, 10, 10, 20, 80, true, 10, 10, 80}, // borrow from both rest and incoming
{100, 50, 10, 30, 40, true, 10, 10, 40}, // borrow from both rest and incoming
{100, 90, 10, 30, 40, true, 10, 10, 0}, // borrow from both rest and incoming, clear incoming
{4096, 256, 1024, 2048, 2400, true, 1024, 1440, 2400}, // real numbers
{5000, 256, 1024, 2048, 2400, false, 1024, 2048, 2400}, // real numbers
{100, 10, 20, 40, 50, 0, false, 20, 40, 50, 0}, // no change
{100, 10, 20, 50, 50, 0, true, 20, 40, 50, 0}, // borrow from rest
{100, 10, 25, 50, 50, 0, true, 25, 40, 50, 0}, // borrow from rest
{100, 10, 25, 50, 50, 50, true, 10, 10, 40, 40}, // borrow from rest for incoming and p2p incoming
{100, 10, 50, 50, 50, 0, true, 40, 40, 50, 0}, // borrow from rest, update soft
{100, 10, 50, 50, 40, 10, true, 40, 40, 40, 10}, // borrow from rest, update soft for incoming and p2p incoming
{100, 10, 9, 19, 81, 0, true, 9, 10, 80, 0}, // borrow from both rest and incoming
{100, 10, 9, 19, 41, 41, true, 9, 10, 40, 40}, // borrow from both rest and incoming for incoming and p2p incoming
{100, 90, 10, 30, 40, 0, true, 10, 10, 0, 0}, // borrow from both rest and incoming, clear incoming
{100, 90, 10, 30, 40, 40, true, 10, 10, 0, 0}, // borrow from both rest and incoming, clear incoming
{100, 90, 10, 30, 50, 40, true, 10, 10, 0, 0}, // borrow from both rest and incoming, clear incoming
{4096, 256, 1024, 2048, 2400, 0, true, 1024, 1440, 2400, 0}, // real numbers
{5000, 256, 1024, 2048, 2400, 0, false, 1024, 2048, 2400, 0}, // real numbers
{4096, 256, 1024, 2048, 2400, 1200, true, 240, 240, 2400, 1200}, // real numbers
{6000, 256, 1024, 2048, 2400, 1200, false, 1024, 2048, 2400, 1200}, // real numbers
}

for i, test := range tests {
test := test
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
t.Run(fmt.Sprintf("test=%d", i), func(t *testing.T) {
t.Parallel()

c := Local{
RestConnectionsSoftLimit: test.restSoftIn,
RestConnectionsHardLimit: test.restHardIn,
IncomingConnectionsLimit: test.incomingIn,
NetAddress: ":4160",
RestConnectionsSoftLimit: test.restSoftIn,
RestConnectionsHardLimit: test.restHardIn,
IncomingConnectionsLimit: test.incomingIn,
P2PIncomingConnectionsLimit: test.p2pIncomingIn,
}
if test.p2pIncomingIn > 0 {
c.EnableP2PHybridMode = true
c.P2PNetAddress = ":4190"
}
requireFDs := test.reservedIn + test.restHardIn + uint64(test.incomingIn)
requireFDs := test.reservedIn + test.restHardIn + uint64(test.incomingIn) + uint64(test.p2pIncomingIn)
res := c.AdjustConnectionLimits(requireFDs, test.maxFDs)
require.Equal(t, test.updated, res)
require.Equal(t, test.restSoftExp, c.RestConnectionsSoftLimit)
require.Equal(t, test.restHardExp, c.RestConnectionsHardLimit)
require.Equal(t, test.incomingExp, c.IncomingConnectionsLimit)
require.Equal(t, int(test.restSoftExp), int(c.RestConnectionsSoftLimit))
require.Equal(t, int(test.restHardExp), int(c.RestConnectionsHardLimit))
require.Equal(t, int(test.incomingExp), int(c.IncomingConnectionsLimit))
require.Equal(t, int(test.p2pIncomingExp), int(c.P2PIncomingConnectionsLimit))
})
}
}
Expand Down
42 changes: 35 additions & 7 deletions config/localTemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ type Local struct {
// Estimating 1.5MB per incoming connection, 1.5MB*2400 = 3.6GB
IncomingConnectionsLimit int `version[0]:"-1" version[1]:"10000" version[17]:"800" version[27]:"2400"`

P2PIncomingConnectionsLimit int `version[34]:"1200"`

// BroadcastConnectionsLimit specifies the number of connections that
// will receive broadcast (gossip) messages from this node. If the
// node has more connections than this number, it will send broadcasts
Expand Down Expand Up @@ -602,6 +604,7 @@ type Local struct {
EnableP2P bool `version[31]:"false"`

// EnableP2PHybridMode turns on both websockets and P2P networking.
// Enabling this setting also requires PublicAddress to be set.
EnableP2PHybridMode bool `version[34]:"false"`

// P2PNetAddress sets the listen address used for P2P networking, if hybrid mode is set.
Expand Down Expand Up @@ -734,10 +737,21 @@ func (cfg Local) TxFilterCanonicalEnabled() bool {
return cfg.TxIncomingFilteringFlags&txFilterCanonical != 0
}

// IsGossipServer returns true if NetAddress is set and this node supposed
// to start websocket server
// IsGossipServer returns true if this node supposed to start websocket or p2p server
func (cfg Local) IsGossipServer() bool {
return cfg.NetAddress != ""
return cfg.IsWsGossipServer() || cfg.IsP2PGossipServer()
}

// IsWsGossipServer returns true if a node configured to run a listening ws net
func (cfg Local) IsWsGossipServer() bool {
// 1. NetAddress is set and EnableP2P is not set
// 2. NetAddress is set and EnableP2PHybridMode is set then EnableP2P is overridden by EnableP2PHybridMode
return cfg.NetAddress != "" && (!cfg.EnableP2P || cfg.EnableP2PHybridMode)
}

// IsP2PGossipServer returns true if a node configured to run a listening p2p net
func (cfg Local) IsP2PGossipServer() bool {
return (cfg.EnableP2P && !cfg.EnableP2PHybridMode && cfg.NetAddress != "") || (cfg.EnableP2PHybridMode && cfg.P2PNetAddress != "")
}

// ensureAbsGenesisDir will convert a path to absolute, and will attempt to make a genesis directory there
Expand Down Expand Up @@ -935,10 +949,24 @@ func (cfg *Local) AdjustConnectionLimits(requiredFDs, maxFDs uint64) bool {
if cfg.RestConnectionsHardLimit <= diff+reservedRESTConns {
restDelta := diff + reservedRESTConns - cfg.RestConnectionsHardLimit
cfg.RestConnectionsHardLimit = reservedRESTConns
if cfg.IncomingConnectionsLimit > int(restDelta) {
cfg.IncomingConnectionsLimit -= int(restDelta)
} else {
cfg.IncomingConnectionsLimit = 0
splitRatio := 1
if cfg.IsWsGossipServer() && cfg.IsP2PGossipServer() {
// split the rest of the delta between ws and p2p evenly
splitRatio = 2
}
if cfg.IsWsGossipServer() {
if cfg.IncomingConnectionsLimit > int(restDelta) {
cfg.IncomingConnectionsLimit -= int(restDelta) / splitRatio
} else {
cfg.IncomingConnectionsLimit = 0
}
}
if cfg.IsP2PGossipServer() {
if cfg.P2PIncomingConnectionsLimit > int(restDelta) {
cfg.P2PIncomingConnectionsLimit -= int(restDelta) / splitRatio
} else {
cfg.P2PIncomingConnectionsLimit = 0
}
}
} else {
cfg.RestConnectionsHardLimit -= diff
Expand Down
1 change: 1 addition & 0 deletions config/local_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ var defaultLocal = Local{
OptimizeAccountsDatabaseOnStartup: false,
OutgoingMessageFilterBucketCount: 3,
OutgoingMessageFilterBucketSize: 128,
P2PIncomingConnectionsLimit: 1200,
P2PNetAddress: "",
P2PPersistPeerID: false,
P2PPrivateKeyLocation: "",
Expand Down
24 changes: 20 additions & 4 deletions daemon/algod/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,21 @@ func (s *Server) Initialize(cfg config.Local, phonebookAddresses []string, genes

if cfg.IsGossipServer() {
var ot basics.OverflowTracker
fdRequired = ot.Add(fdRequired, uint64(cfg.IncomingConnectionsLimit)+network.ReservedHealthServiceConnections)
fdRequired = ot.Add(fdRequired, network.ReservedHealthServiceConnections)
if ot.Overflowed {
return errors.New("Initialize() overflowed when adding up IncomingConnectionsLimit to the existing RLIMIT_NOFILE value; decrease RestConnectionsHardLimit or IncomingConnectionsLimit")
return errors.New("Initialize() overflowed when adding up ReservedHealthServiceConnections to the existing RLIMIT_NOFILE value; decrease RestConnectionsHardLimit")
}
if cfg.IsWsGossipServer() {
fdRequired = ot.Add(fdRequired, uint64(cfg.IncomingConnectionsLimit))
if ot.Overflowed {
return errors.New("Initialize() overflowed when adding up IncomingConnectionsLimit to the existing RLIMIT_NOFILE value; decrease IncomingConnectionsLimit")
}
}
if cfg.IsP2PGossipServer() {
fdRequired = ot.Add(fdRequired, uint64(cfg.P2PIncomingConnectionsLimit))
if ot.Overflowed {
return errors.New("Initialize() overflowed when adding up P2PIncomingConnectionsLimit to the existing RLIMIT_NOFILE value; decrease P2PIncomingConnectionsLimit")
}
}
_, hard, fdErr := util.GetFdLimits()
if fdErr != nil {
Expand All @@ -164,14 +176,18 @@ func (s *Server) Initialize(cfg config.Local, phonebookAddresses []string, genes
// but try to keep cfg.ReservedFDs untouched by decreasing other limits
if cfg.AdjustConnectionLimits(fdRequired, hard) {
s.log.Warnf(
"Updated connection limits: RestConnectionsSoftLimit=%d, RestConnectionsHardLimit=%d, IncomingConnectionsLimit=%d",
"Updated connection limits: RestConnectionsSoftLimit=%d, RestConnectionsHardLimit=%d, IncomingConnectionsLimit=%d, P2PIncomingConnectionsLimit=%d",
cfg.RestConnectionsSoftLimit,
cfg.RestConnectionsHardLimit,
cfg.IncomingConnectionsLimit,
cfg.P2PIncomingConnectionsLimit,
)
if cfg.IncomingConnectionsLimit == 0 {
if cfg.IsWsGossipServer() && cfg.IncomingConnectionsLimit == 0 {
return errors.New("Initialize() failed to adjust connection limits")
}
if cfg.IsP2PGossipServer() && cfg.P2PIncomingConnectionsLimit == 0 {
return errors.New("Initialize() failed to adjust p2p connection limits")
}
}
}
fdErr = util.SetFdSoftLimit(maxFDs)
Expand Down
1 change: 1 addition & 0 deletions installer/config.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
"OptimizeAccountsDatabaseOnStartup": false,
"OutgoingMessageFilterBucketCount": 3,
"OutgoingMessageFilterBucketSize": 128,
"P2PIncomingConnectionsLimit": 1200,
"P2PNetAddress": "",
"P2PPersistPeerID": false,
"P2PPrivateKeyLocation": "",
Expand Down
Loading

0 comments on commit 9c93670

Please sign in to comment.