Skip to content

Commit

Permalink
Revert "Code split"
Browse files Browse the repository at this point in the history
This reverts commit 0604b6d3fc155177a2bb295e6635ed21b20dd947.
  • Loading branch information
AeonSw4n committed Jan 29, 2024
1 parent 59af74d commit 95bde91
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 3 deletions.
43 changes: 43 additions & 0 deletions integration_testing/connection_controller_routines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,49 @@ func TestConnectionControllerValidatorInboundDeduplication(t *testing.T) {
t.Logf("Test #1 passed | Successfully run non-validator node1; validators node2, node3 with duplicate public key")
}

func TestConnectionControllerNonValidatorConnector(t *testing.T) {
require := require.New(t)

// Spawn 6 non-validators node1, node2, node3, node4, node5, node6. Set node1's targetOutboundPeers to 3. Then make
// node1 create persistent outbound connections to node2, node3, and node4, as well as non-validator connections to
// node5 and node6.
node1 := spawnNonValidatorNodeProtocol2(t, 18000, "node1")
node1.Config.TargetOutboundPeers = 0
node2 := spawnNonValidatorNodeProtocol2(t, 18001, "node2")
node3 := spawnNonValidatorNodeProtocol2(t, 18002, "node3")
node4 := spawnNonValidatorNodeProtocol2(t, 18003, "node4")
node5 := spawnNonValidatorNodeProtocol2(t, 18004, "node5")
node6 := spawnNonValidatorNodeProtocol2(t, 18005, "node6")

node2 = startNode(t, node2)
defer node2.Stop()
node3 = startNode(t, node3)
defer node3.Stop()
node4 = startNode(t, node4)
defer node4.Stop()
node5 = startNode(t, node5)
defer node5.Stop()
node6 = startNode(t, node6)
defer node6.Stop()

node1.Config.ConnectIPs = []string{
node2.Listeners[0].Addr().String(),
node3.Listeners[0].Addr().String(),
node4.Listeners[0].Addr().String(),
}
node1 = startNode(t, node1)
defer node1.Stop()

cc := node1.Server.GetConnectionController()
require.NoError(cc.CreateNonValidatorOutboundConnection(node5.Listeners[0].Addr().String()))
require.NoError(cc.CreateNonValidatorOutboundConnection(node6.Listeners[0].Addr().String()))

waitForCountRemoteNodeIndexer(t, node1, 3, 0, 3, 0)
waitForNonValidatorOutboundConnection(t, node1, node2)
waitForNonValidatorOutboundConnection(t, node1, node3)
waitForNonValidatorOutboundConnection(t, node1, node4)
}

func TestConnectionControllerNonValidatorCircularConnectIps(t *testing.T) {
node1 := spawnNonValidatorNodeProtocol2(t, 18000, "node1")
node2 := spawnNonValidatorNodeProtocol2(t, 18001, "node2")
Expand Down
2 changes: 1 addition & 1 deletion lib/block_view_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1331,7 +1331,7 @@ func (op *UtxoOperation) RawEncodeWithoutMetadata(blockHeight uint64, skipMetada
data = append(data, EncodeToBytes(blockHeight, op.PrevLockupYieldCurvePoint, skipMetadata...)...)
data = append(data, byte(op.PrevLockupTransferRestriction))

// PrevSenderLockedBalanceEntry, PrevReceiverLockedBalanceEntry
// PrevSenderLockedBalanceEntry, PrevReceiverL*ockedBalanceEntry
data = append(data, EncodeToBytes(blockHeight, op.PrevSenderLockedBalanceEntry, skipMetadata...)...)
data = append(data, EncodeToBytes(blockHeight, op.PrevReceiverLockedBalanceEntry, skipMetadata...)...)

Expand Down
150 changes: 148 additions & 2 deletions lib/connection_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,13 @@ func NewConnectionController(params *DeSoParams, cmgr *ConnectionManager, handsh
}

func (cc *ConnectionController) Start() {
cc.startGroup.Add(2)
cc.startGroup.Add(3)
go cc.startPersistentConnector()
go cc.startValidatorConnector()
go cc.startNonValidatorConnector()

cc.startGroup.Wait()
cc.exitGroup.Add(2)
cc.exitGroup.Add(3)
}

func (cc *ConnectionController) Stop() {
Expand Down Expand Up @@ -137,6 +138,26 @@ func (cc *ConnectionController) startValidatorConnector() {
}
}

// startNonValidatorConnector is responsible for ensuring that the node is connected to the target number of outbound
// and inbound remote nodes. To do this, it periodically checks the number of outbound and inbound remote nodes, and
// if the number is above the target number, it disconnects the excess remote nodes. If the number is below the target
// number, it attempts to connect to new remote nodes.
func (cc *ConnectionController) startNonValidatorConnector() {
cc.startGroup.Done()

for {
select {
case <-cc.exitChan:
cc.exitGroup.Done()
return
case <-time.After(1 * time.Second):
cc.refreshNonValidatorOutboundIndex()
cc.refreshNonValidatorInboundIndex()
cc.connectNonValidators()
}
}
}

// ###########################
// ## Handlers (Peer, DeSoMessage)
// ###########################
Expand Down Expand Up @@ -330,6 +351,131 @@ func (cc *ConnectionController) connectValidators(activeValidatorsMap *collectio
}
}

// ###########################
// ## NonValidator Connections
// ###########################

// refreshNonValidatorOutboundIndex is called periodically by the peer connector. It is responsible for disconnecting excess
// outbound remote nodes.
func (cc *ConnectionController) refreshNonValidatorOutboundIndex() {
// There are three categories of outbound remote nodes: attempted, connected, and persistent. All of these
// remote nodes are stored in the same non-validator outbound index. We want to disconnect excess remote nodes that
// are not persistent, starting with the attempted nodes first.

// First let's run a quick check to see if the number of our non-validator remote nodes exceeds our target. Note that
// this number will include the persistent nodes.
numOutboundRemoteNodes := uint32(cc.rnManager.GetNonValidatorOutboundIndex().Count())
if numOutboundRemoteNodes <= cc.targetNonValidatorOutboundRemoteNodes {
return
}

// If we get here, it means that we should potentially disconnect some remote nodes. Let's first separate the
// attempted and connected remote nodes, ignoring the persistent ones.
allOutboundRemoteNodes := cc.rnManager.GetNonValidatorOutboundIndex().GetAll()
var attemptedOutboundRemoteNodes, connectedOutboundRemoteNodes []*RemoteNode
for _, rn := range allOutboundRemoteNodes {
if rn.IsPersistent() {
// We do nothing for persistent remote nodes.
continue
} else if rn.IsHandshakeCompleted() {
connectedOutboundRemoteNodes = append(connectedOutboundRemoteNodes, rn)
} else {
attemptedOutboundRemoteNodes = append(attemptedOutboundRemoteNodes, rn)
}
}

// Having separated the attempted and connected remote nodes, we can now find the actual number of attempted and
// connected remote nodes. We can then find out how many remote nodes we need to disconnect.
numOutboundRemoteNodes = uint32(len(attemptedOutboundRemoteNodes) + len(connectedOutboundRemoteNodes))
excessiveOutboundRemoteNodes := uint32(0)
if numOutboundRemoteNodes > cc.targetNonValidatorOutboundRemoteNodes {
excessiveOutboundRemoteNodes = numOutboundRemoteNodes - cc.targetNonValidatorOutboundRemoteNodes
}

// First disconnect the attempted remote nodes.
for _, rn := range attemptedOutboundRemoteNodes {
if excessiveOutboundRemoteNodes == 0 {
break
}
cc.rnManager.Disconnect(rn)
excessiveOutboundRemoteNodes--
}
// Now disconnect the connected remote nodes, if we still have too many remote nodes.
for _, rn := range connectedOutboundRemoteNodes {
if excessiveOutboundRemoteNodes == 0 {
break
}
cc.rnManager.Disconnect(rn)
excessiveOutboundRemoteNodes--
}
}

// refreshNonValidatorInboundIndex is called periodically by the non-validator connector. It is responsible for
// disconnecting excess inbound remote nodes.
func (cc *ConnectionController) refreshNonValidatorInboundIndex() {
// First let's check if we have an excess number of inbound remote nodes. If we do, we'll disconnect some of them.
numConnectedInboundRemoteNodes := uint32(cc.rnManager.GetNonValidatorInboundIndex().Count())
excessiveInboundRemoteNodes := uint32(0)
if numConnectedInboundRemoteNodes > cc.targetNonValidatorInboundRemoteNodes {
excessiveInboundRemoteNodes = numConnectedInboundRemoteNodes - cc.targetNonValidatorInboundRemoteNodes
}
// Disconnect random inbound non-validators if we have too many of them.
inboundRemoteNodes := cc.rnManager.GetNonValidatorInboundIndex().GetAll()
for _, rn := range inboundRemoteNodes {
if excessiveInboundRemoteNodes == 0 {
break
}
cc.rnManager.Disconnect(rn)
excessiveInboundRemoteNodes--
}
}

func (cc *ConnectionController) connectNonValidators() {
numOutboundPeers := uint32(cc.rnManager.GetNonValidatorOutboundIndex().Count())

remainingOutboundPeers := uint32(0)
if numOutboundPeers < cc.targetNonValidatorOutboundRemoteNodes {
remainingOutboundPeers = cc.targetNonValidatorOutboundRemoteNodes - numOutboundPeers
}
for ii := uint32(0); ii < remainingOutboundPeers; ii++ {
addr := cc.getRandomUnconnectedAddress()
if addr == nil {
break
}
cc.AddrMgr.Attempt(addr)
if err := cc.rnManager.CreateNonValidatorOutboundConnection(addr); err != nil {
glog.V(2).Infof("ConnectionController.connectNonValidators: Problem connecting to addr %v: %v", addr, err)
}
}
}

func (cc *ConnectionController) getRandomUnconnectedAddress() *wire.NetAddress {
for tries := 0; tries < 100; tries++ {
addr := cc.AddrMgr.GetAddress()
if addr == nil {
break
}

if cc.cmgr.IsConnectedOutboundIpAddress(addr.NetAddress()) {
continue
}

if cc.cmgr.IsAttemptedOutboundIpAddress(addr.NetAddress()) {
continue
}

// We can only have one outbound address per /16. This is similar to
// Bitcoin and we do it to prevent Sybil attacks.
if cc.cmgr.IsFromRedundantOutboundIPAddress(addr.NetAddress()) {
continue
}

return addr.NetAddress()
}

return nil
}

func (cc *ConnectionController) CreateValidatorConnection(ipStr string, publicKey *bls.PublicKey) error {
netAddr, err := cc.ConvertIPStringToNetAddress(ipStr)
if err != nil {
Expand Down

0 comments on commit 95bde91

Please sign in to comment.