Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: handle txns in pubsub validator #6070

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions network/p2p/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func makePubSub(ctx context.Context, cfg config.Local, host host.Host) (*pubsub.
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
// pubsub.WithValidateThrottle(cfg.TxBacklogSize),
pubsub.WithValidateWorkers(incomingThreads),
pubsub.WithRawTracer(pubsubTracer{}),
}

return pubsub.NewGossipSub(ctx, host, options...)
Expand Down
86 changes: 86 additions & 0 deletions network/p2p/pubsubTracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (C) 2019-2024 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package p2p

import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"

"github.com/algorand/go-algorand/util/metrics"
)

var _ = pubsub.RawTracer(pubsubTracer{})
jasonpaulos marked this conversation as resolved.
Show resolved Hide resolved

var transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog)
var transactionMessagesDupRawMsg = metrics.MakeCounter(metrics.TransactionMessagesDupRawMsg)
cce marked this conversation as resolved.
Show resolved Hide resolved

// pubsubTracer is a tracer for pubsub events used to track metrics.
type pubsubTracer struct{}

// AddPeer is invoked when a new peer is added.
func (t pubsubTracer) AddPeer(p peer.ID, proto protocol.ID) {}

Check warning on line 36 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L36

Added line #L36 was not covered by tests

// RemovePeer is invoked when a peer is removed.
func (t pubsubTracer) RemovePeer(p peer.ID) {}

Check warning on line 39 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L39

Added line #L39 was not covered by tests

// Join is invoked when a new topic is joined
func (t pubsubTracer) Join(topic string) {}

Check warning on line 42 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L42

Added line #L42 was not covered by tests

// Leave is invoked when a topic is abandoned
func (t pubsubTracer) Leave(topic string) {}

Check warning on line 45 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L45

Added line #L45 was not covered by tests

// Graft is invoked when a new peer is grafted on the mesh (gossipsub)
func (t pubsubTracer) Graft(p peer.ID, topic string) {}

Check warning on line 48 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L48

Added line #L48 was not covered by tests

// Prune is invoked when a peer is pruned from the message (gossipsub)
func (t pubsubTracer) Prune(p peer.ID, topic string) {}

Check warning on line 51 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L51

Added line #L51 was not covered by tests

// ValidateMessage is invoked when a message first enters the validation pipeline.
func (t pubsubTracer) ValidateMessage(msg *pubsub.Message) {}

Check warning on line 54 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L54

Added line #L54 was not covered by tests

// DeliverMessage is invoked when a message is delivered
func (t pubsubTracer) DeliverMessage(msg *pubsub.Message) {}

Check warning on line 57 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L57

Added line #L57 was not covered by tests

// RejectMessage is invoked when a message is Rejected or Ignored.
// The reason argument can be one of the named strings Reject*.
func (t pubsubTracer) RejectMessage(msg *pubsub.Message, reason string) {
if reason == pubsub.RejectValidationThrottled || reason == pubsub.RejectValidationQueueFull {
transactionMessagesDroppedFromBacklog.Inc(nil)

Check warning on line 63 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L61-L63

Added lines #L61 - L63 were not covered by tests
cce marked this conversation as resolved.
Show resolved Hide resolved
}
}

// DuplicateMessage is invoked when a duplicate message is dropped.
func (t pubsubTracer) DuplicateMessage(msg *pubsub.Message) {
transactionMessagesDupRawMsg.Inc(nil)

Check warning on line 69 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L68-L69

Added lines #L68 - L69 were not covered by tests
}

// ThrottlePeer is invoked when a peer is throttled by the peer gater.
func (t pubsubTracer) ThrottlePeer(p peer.ID) {}

Check warning on line 73 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L73

Added line #L73 was not covered by tests

// RecvRPC is invoked when an incoming RPC is received.
func (t pubsubTracer) RecvRPC(rpc *pubsub.RPC) {}

Check warning on line 76 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L76

Added line #L76 was not covered by tests

// SendRPC is invoked when a RPC is sent.
func (t pubsubTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {}

Check warning on line 79 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L79

Added line #L79 was not covered by tests

// DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full.
func (t pubsubTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {}

Check warning on line 82 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L82

Added line #L82 was not covered by tests

// UndeliverableMessage is invoked when the consumer of Subscribe is not reading messages fast enough and
// the pressure release mechanism trigger, dropping messages.
func (t pubsubTracer) UndeliverableMessage(msg *pubsub.Message) {}

Check warning on line 86 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L86

Added line #L86 was not covered by tests
2 changes: 1 addition & 1 deletion network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
var networkHandleMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_handle_micros_total", Description: "microseconds spent by protocol handlers in the receive thread"})

var networkBroadcasts = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcasts_total", Description: "number of broadcast operations"})
var networkBroadcastQueueFull = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_full_total", Description: "number of messages that were drops due to full broadcast queue"})
var networkBroadcastQueueMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_micros_total", Description: "microseconds broadcast requests sit on queue"})
var networkBroadcastSendMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_send_micros_total", Description: "microseconds spent broadcasting"})
var networkBroadcastsDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_broadcasts_dropped_total", Description: "number of broadcast messages not sent to any peer"})
Expand All @@ -135,7 +136,6 @@

var networkSlowPeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_slow_drops_total", Description: "number of peers dropped for being slow to send to"})
var networkIdlePeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_idle_drops_total", Description: "number of peers dropped due to idle connection"})
var networkBroadcastQueueFull = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_full_total", Description: "number of messages that were drops due to full broadcast queue"})

var minPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_min_ping_seconds", Description: "Network round trip time to fastest peer in seconds."})
var meanPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_mean_ping_seconds", Description: "Network round trip time to average peer in seconds."})
Expand Down Expand Up @@ -865,7 +865,7 @@
}

// RegisterValidatorHandlers registers the set of given message handlers.
func (wn *WebsocketNetwork) RegisterValidatorHandlers(dispatch []TaggedMessageValidatorHandler) {

Check warning on line 868 in network/wsNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/wsNetwork.go#L868

Added line #L868 was not covered by tests
}

// ClearProcessors deregisters all the existing message handlers.
Expand Down
Loading