Skip to content

Commit

Permalink
p2p: more CR fixes: ERL and DHT err logging (#6040)
Browse files Browse the repository at this point in the history
Co-authored-by: chris erway <chris.erway@algorand.com>
  • Loading branch information
algorandskiy and cce authored Jun 26, 2024
1 parent 66a1d35 commit a6248b2
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 50 deletions.
70 changes: 25 additions & 45 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,24 +595,30 @@ func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.Signed
return &d, false
}

// incomingMsgDupErlCheck runs the duplicate and rate limiting checks on a raw incoming messages.
// incomingMsgDupCheck runs the duplicate check on a raw incoming message.
// Returns:
// - the key used for insertion if the message was not found in the cache
// - the capacity guard returned by the elastic rate limiter
// - a boolean indicating if the message was a duplicate or the sender is rate limited
func (handler *TxHandler) incomingMsgDupErlCheck(data []byte, sender network.DeadlineSettableConn) (*crypto.Digest, *util.ErlCapacityGuard, bool) {
// - a boolean indicating if the message was a duplicate
func (handler *TxHandler) incomingMsgDupCheck(data []byte) (*crypto.Digest, bool) {
var msgKey *crypto.Digest
var capguard *util.ErlCapacityGuard
var isDup bool
if handler.msgCache != nil {
// check for duplicate messages
// this helps against relaying duplicates
if msgKey, isDup = handler.msgCache.CheckAndPut(data); isDup {
transactionMessagesDupRawMsg.Inc(nil)
return msgKey, capguard, true
return msgKey, true
}
}
return msgKey, false
}

// incomingMsgErlCheck runs the rate limiting check on a sender.
// Returns:
// - the capacity guard returned by the elastic rate limiter
// - a boolean indicating if the sender is rate limited
func (handler *TxHandler) incomingMsgErlCheck(sender network.DeadlineSettableConn) (*util.ErlCapacityGuard, bool) {
var capguard *util.ErlCapacityGuard
var err error
if handler.erl != nil {
congestedERL := float64(cap(handler.backlogQueue))*handler.backlogCongestionThreshold < float64(len(handler.backlogQueue))
Expand All @@ -625,14 +631,14 @@ func (handler *TxHandler) incomingMsgDupErlCheck(data []byte, sender network.Dea
handler.erl.EnableCongestionControl()
// if there is no capacity, it is the same as if we failed to put the item onto the backlog, so report such
transactionMessagesDroppedFromBacklog.Inc(nil)
return msgKey, capguard, true
return capguard, true

Check warning on line 634 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L634

Added line #L634 was not covered by tests
}
// if the backlog Queue has 50% of its buffer back, turn congestion control off
if !congestedERL {
handler.erl.DisableCongestionControl()
}
}
return msgKey, capguard, false
return capguard, false
}

// decodeMsg decodes TX message buffer into transactions.SignedTxn,
Expand Down Expand Up @@ -711,8 +717,12 @@ func (handler *TxHandler) incomingTxGroupDupRateLimit(unverifiedTxGroup []transa
// - message are checked for duplicates
// - transactions are checked for duplicates
func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) network.OutgoingMessage {
msgKey, capguard, shouldDrop := handler.incomingMsgDupErlCheck(rawmsg.Data, rawmsg.Sender)
msgKey, shouldDrop := handler.incomingMsgDupCheck(rawmsg.Data)
if shouldDrop {
return network.OutgoingMessage{Action: network.Ignore}
}

capguard, shouldDrop := handler.incomingMsgErlCheck(rawmsg.Sender)
accepted := false
defer func() {
// if we failed to put the item onto the backlog, we should release the capacity if any
Expand All @@ -724,7 +734,7 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
}()

if shouldDrop {
// this TX message was found in the duplicate cache, or ERL rate-limited it
// this TX message was rate-limited by ERL
return network.OutgoingMessage{Action: network.Ignore}

Check warning on line 738 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L738

Added line #L738 was not covered by tests
}

Expand Down Expand Up @@ -756,12 +766,7 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
transactionMessagesDroppedFromBacklog.Inc(nil)

// additionally, remove the txn from duplicate caches to ensure it can be re-submitted
if handler.txCanonicalCache != nil && canonicalKey != nil {
handler.txCanonicalCache.Delete(canonicalKey)
}
if handler.msgCache != nil && msgKey != nil {
handler.msgCache.DeleteByKey(msgKey)
}
handler.deleteFromCaches(msgKey, canonicalKey)
}

return network.OutgoingMessage{Action: network.Ignore}
Expand All @@ -772,25 +777,12 @@ type validatedIncomingTxMessage struct {
unverifiedTxGroup []transactions.SignedTxn
msgKey *crypto.Digest
canonicalKey *crypto.Digest
capguard *util.ErlCapacityGuard
}

// validateIncomingTxMessage is the validator for the MessageProcessor implementation used by P2PNetwork.
func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessage) network.ValidatedMessage {
msgKey, capguard, shouldDrop := handler.incomingMsgDupErlCheck(rawmsg.Data, rawmsg.Sender)

accepted := false
defer func() {
// if we failed to put the item onto the backlog, we should release the capacity if any
if !accepted && capguard != nil {
if capErr := capguard.Release(); capErr != nil {
logging.Base().Warnf("validateIncomingTxMessage: failed to release capacity to ElasticRateLimiter: %v", capErr)
}
}
}()

if shouldDrop {
// this TX message was found in the duplicate cache, or ERL rate-limited it
msgKey, isDup := handler.incomingMsgDupCheck(rawmsg.Data)
if isDup {
return network.ValidatedMessage{Action: network.Ignore, ValidatedMessage: nil}

Check warning on line 786 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L783-L786

Added lines #L783 - L786 were not covered by tests
}

Expand All @@ -807,7 +799,6 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa
return network.ValidatedMessage{Action: network.Ignore, ValidatedMessage: nil}

Check warning on line 799 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L799

Added line #L799 was not covered by tests
}

accepted = true
return network.ValidatedMessage{
Action: network.Accept,
Tag: rawmsg.Tag,
Expand All @@ -816,7 +807,6 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa
unverifiedTxGroup: unverifiedTxGroup,
msgKey: msgKey,
canonicalKey: canonicalKey,
capguard: capguard,
},

Check warning on line 810 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L802-L810

Added lines #L802 - L810 were not covered by tests
}
}
Expand All @@ -830,25 +820,15 @@ func (handler *TxHandler) processIncomingTxMessage(validatedMessage network.Vali
unverifiedTxGroup: msg.unverifiedTxGroup,
rawmsgDataHash: msg.msgKey,
unverifiedTxGroupHash: msg.canonicalKey,
capguard: msg.capguard,
capguard: nil,
}:
default:

Check warning on line 825 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L824-L825

Added lines #L824 - L825 were not covered by tests
// if we failed here we want to increase the corresponding metric. It might suggest that we
// want to increase the queue size.
transactionMessagesDroppedFromBacklog.Inc(nil)

Check warning on line 828 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L828

Added line #L828 was not covered by tests

// additionally, remove the txn from duplicate caches to ensure it can be re-submitted
if handler.txCanonicalCache != nil && msg.canonicalKey != nil {
handler.txCanonicalCache.Delete(msg.canonicalKey)
}
if handler.msgCache != nil && msg.msgKey != nil {
handler.msgCache.DeleteByKey(msg.msgKey)
}
if msg.capguard != nil {
if capErr := msg.capguard.Release(); capErr != nil {
logging.Base().Warnf("processIncomingTxMessage: failed to release capacity to ElasticRateLimiter: %v", capErr)
}
}
handler.deleteFromCaches(msg.msgKey, msg.canonicalKey)

Check warning on line 831 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L831

Added line #L831 was not covered by tests
}
return network.OutgoingMessage{Action: network.Ignore}
}
Expand Down
5 changes: 3 additions & 2 deletions network/p2p/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ func (c *CapabilitiesDiscovery) FindPeers(ctx context.Context, ns string, opts .
}

// Close should be called when fully shutting down the node
func (c *CapabilitiesDiscovery) Close() {
_ = c.dht.Close()
func (c *CapabilitiesDiscovery) Close() error {
err := c.dht.Close()
c.wg.Wait()
return err
}

// Host exposes the underlying libp2p host.Host object
Expand Down
6 changes: 4 additions & 2 deletions network/p2p/capabilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,8 @@ func TestCapabilities_Varying(t *testing.T) {
wg.Wait()

for _, disc := range capsDisc[3:] {
disc.Close()
err := disc.Close()
require.NoError(t, err)
// Make sure it actually closes
disc.wg.Wait()
}
Expand Down Expand Up @@ -347,6 +348,7 @@ func TestCapabilities_ExcludesSelf(t *testing.T) {
"Found self when searching for capability",
)

disc[0].Close()
err := disc[0].Close()
require.NoError(t, err)
disc[0].wg.Wait()
}
5 changes: 4 additions & 1 deletion network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,10 @@ func (n *P2PNetwork) Start() error {
// Stop closes sockets and stop threads.
func (n *P2PNetwork) Stop() {
if n.capabilitiesDiscovery != nil {
n.capabilitiesDiscovery.Close()
err := n.capabilitiesDiscovery.Close()
if err != nil {
n.log.Warnf("Error closing capabilities discovery: %v", err)

Check warning on line 370 in network/p2pNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/p2pNetwork.go#L370

Added line #L370 was not covered by tests
}
}

n.handler.ClearHandlers([]Tag{})
Expand Down

0 comments on commit a6248b2

Please sign in to comment.