diff --git a/p2p/transport/quicreuse/connmgr.go b/p2p/transport/quicreuse/connmgr.go index 60c0daa1a3..8f2bc24c1e 100644 --- a/p2p/transport/quicreuse/connmgr.go +++ b/p2p/transport/quicreuse/connmgr.go @@ -26,7 +26,6 @@ type ConnManager struct { quicListeners map[string]quicListenerEntry srk quic.StatelessResetKey - mt *metricsTracer } type quicListenerEntry struct { @@ -48,26 +47,20 @@ func NewConnManager(statelessResetKey quic.StatelessResetKey, opts ...Option) (* quicConf := quicConfig.Clone() - if cm.enableMetrics { - cm.mt = newMetricsTracer() - } quicConf.Tracer = func(ctx context.Context, p quiclogging.Perspective, ci quic.ConnectionID) quiclogging.ConnectionTracer { - tracers := make([]quiclogging.ConnectionTracer, 0, 2) + var tracer quiclogging.ConnectionTracer if qlogTracerDir != "" { - tracers = append(tracers, qloggerForDir(qlogTracerDir, p, ci)) - } - if cm.mt != nil { - tracers = append(tracers, cm.mt.TracerForConnection(ctx, p, ci)) + tracer = qloggerForDir(qlogTracerDir, p, ci) } - return quiclogging.NewMultiplexedConnectionTracer(tracers...) + return tracer } serverConfig := quicConf.Clone() cm.clientConfig = quicConf cm.serverConfig = serverConfig if cm.enableReuseport { - cm.reuseUDP4 = newReuse(&statelessResetKey, cm.mt) - cm.reuseUDP6 = newReuse(&statelessResetKey, cm.mt) + cm.reuseUDP4 = newReuse(&statelessResetKey) + cm.reuseUDP6 = newReuse(&statelessResetKey) } return cm, nil } @@ -149,11 +142,7 @@ func (c *ConnManager) transportForListen(network string, laddr *net.UDPAddr) (re if err != nil { return nil, err } - tr := &singleOwnerTransport{Transport: quic.Transport{Conn: conn, StatelessResetKey: &c.srk}, packetConn: conn} - if c.mt != nil { - tr.Transport.Tracer = c.mt - } - return tr, nil + return &singleOwnerTransport{Transport: quic.Transport{Conn: conn, StatelessResetKey: &c.srk}, packetConn: conn}, nil } func (c *ConnManager) DialQUIC(ctx context.Context, raddr ma.Multiaddr, tlsConf *tls.Config, allowWindowIncrease func(conn quic.Connection, delta uint64) bool) (quic.Connection, error) { @@ -208,12 +197,7 @@ func (c *ConnManager) TransportForDial(network string, raddr *net.UDPAddr) (refC if err != nil { return nil, err } - tr := &singleOwnerTransport{Transport: quic.Transport{Conn: conn, StatelessResetKey: &c.srk}, packetConn: conn} - if c.mt != nil { - tr.Transport.Tracer = c.mt - } - - return tr, nil + return &singleOwnerTransport{Transport: quic.Transport{Conn: conn, StatelessResetKey: &c.srk}, packetConn: conn}, nil } func (c *ConnManager) Protocols() []int { diff --git a/p2p/transport/quicreuse/reuse.go b/p2p/transport/quicreuse/reuse.go index 2940e0d349..9e41534235 100644 --- a/p2p/transport/quicreuse/reuse.go +++ b/p2p/transport/quicreuse/reuse.go @@ -123,10 +123,9 @@ type reuse struct { globalDialers map[int]*refcountedTransport statelessResetKey *quic.StatelessResetKey - metricsTracer *metricsTracer } -func newReuse(srk *quic.StatelessResetKey, mt *metricsTracer) *reuse { +func newReuse(srk *quic.StatelessResetKey) *reuse { r := &reuse{ unicast: make(map[string]map[int]*refcountedTransport), globalListeners: make(map[int]*refcountedTransport), @@ -134,7 +133,6 @@ func newReuse(srk *quic.StatelessResetKey, mt *metricsTracer) *reuse { closeChan: make(chan struct{}), gcStopChan: make(chan struct{}), statelessResetKey: srk, - metricsTracer: mt, } go r.gc() return r @@ -271,9 +269,6 @@ func (r *reuse) transportForDialLocked(network string, source *net.IP) (*refcoun Conn: conn, StatelessResetKey: r.statelessResetKey, }, packetConn: conn} - if r.metricsTracer != nil { - tr.Transport.Tracer = r.metricsTracer - } r.globalDialers[conn.LocalAddr().(*net.UDPAddr).Port] = tr return tr, nil } @@ -317,14 +312,13 @@ func (r *reuse) TransportForListen(network string, laddr *net.UDPAddr) (*refcoun return nil, err } localAddr := conn.LocalAddr().(*net.UDPAddr) - tr := &refcountedTransport{Transport: quic.Transport{ - Conn: conn, - StatelessResetKey: r.statelessResetKey, - }, packetConn: conn} - if r.metricsTracer != nil { - tr.Transport.Tracer = r.metricsTracer + tr := &refcountedTransport{ + Transport: quic.Transport{ + Conn: conn, + StatelessResetKey: r.statelessResetKey, + }, + packetConn: conn, } - tr.IncreaseCount() // Deal with listen on a global address diff --git a/p2p/transport/quicreuse/reuse_test.go b/p2p/transport/quicreuse/reuse_test.go index b373f31fe4..c56356e5f2 100644 --- a/p2p/transport/quicreuse/reuse_test.go +++ b/p2p/transport/quicreuse/reuse_test.go @@ -61,7 +61,7 @@ func cleanup(t *testing.T, reuse *reuse) { } func TestReuseListenOnAllIPv4(t *testing.T) { - reuse := newReuse(nil, nil) + reuse := newReuse(nil) require.Eventually(t, isGarbageCollectorRunning, 500*time.Millisecond, 50*time.Millisecond, "expected garbage collector to be running") cleanup(t, reuse) @@ -73,7 +73,7 @@ func TestReuseListenOnAllIPv4(t *testing.T) { } func TestReuseListenOnAllIPv6(t *testing.T) { - reuse := newReuse(nil, nil) + reuse := newReuse(nil) require.Eventually(t, isGarbageCollectorRunning, 500*time.Millisecond, 50*time.Millisecond, "expected garbage collector to be running") cleanup(t, reuse) @@ -86,7 +86,7 @@ func TestReuseListenOnAllIPv6(t *testing.T) { } func TestReuseCreateNewGlobalConnOnDial(t *testing.T) { - reuse := newReuse(nil, nil) + reuse := newReuse(nil) cleanup(t, reuse) addr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234") @@ -100,7 +100,7 @@ func TestReuseCreateNewGlobalConnOnDial(t *testing.T) { } func TestReuseConnectionWhenDialing(t *testing.T) { - reuse := newReuse(nil, nil) + reuse := newReuse(nil) cleanup(t, reuse) addr, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0") @@ -117,7 +117,7 @@ func TestReuseConnectionWhenDialing(t *testing.T) { } func TestReuseConnectionWhenListening(t *testing.T) { - reuse := newReuse(nil, nil) + reuse := newReuse(nil) cleanup(t, reuse) raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234") @@ -132,7 +132,7 @@ func TestReuseConnectionWhenListening(t *testing.T) { } func TestReuseConnectionWhenDialBeforeListen(t *testing.T) { - reuse := newReuse(nil, nil) + reuse := newReuse(nil) cleanup(t, reuse) // dial any address @@ -166,7 +166,7 @@ func TestReuseListenOnSpecificInterface(t *testing.T) { if platformHasRoutingTables() { t.Skip("this test only works on platforms that support routing tables") } - reuse := newReuse(nil, nil) + reuse := newReuse(nil) cleanup(t, reuse) router, err := netroute.New() @@ -203,7 +203,7 @@ func TestReuseGarbageCollect(t *testing.T) { maxUnusedDuration = 10 * maxUnusedDuration } - reuse := newReuse(nil, nil) + reuse := newReuse(nil) cleanup(t, reuse) numGlobals := func() int { diff --git a/p2p/transport/quicreuse/tracer_metrics.go b/p2p/transport/quicreuse/tracer_metrics.go deleted file mode 100644 index 03e73fd25f..0000000000 --- a/p2p/transport/quicreuse/tracer_metrics.go +++ /dev/null @@ -1,372 +0,0 @@ -package quicreuse - -import ( - "context" - "errors" - "fmt" - "net" - "sync" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/quic-go/quic-go" - "github.com/quic-go/quic-go/logging" -) - -var ( - bytesTransferred *prometheus.CounterVec - newConns *prometheus.CounterVec - closedConns *prometheus.CounterVec - sentPackets *prometheus.CounterVec - rcvdPackets *prometheus.CounterVec - bufferedPackets *prometheus.CounterVec - droppedPackets *prometheus.CounterVec - lostPackets *prometheus.CounterVec - connErrors *prometheus.CounterVec -) - -type aggregatingCollector struct { - mutex sync.Mutex - - conns map[string] /* conn ID */ *metricsConnTracer - rtts prometheus.Histogram - connDurations prometheus.Histogram -} - -func newAggregatingCollector() *aggregatingCollector { - return &aggregatingCollector{ - conns: make(map[string]*metricsConnTracer), - rtts: prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "quic_smoothed_rtt", - Help: "Smoothed RTT", - Buckets: prometheus.ExponentialBuckets(0.001, 1.25, 40), // 1ms to ~6000ms - }), - connDurations: prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "quic_connection_duration", - Help: "Connection Duration", - Buckets: prometheus.ExponentialBuckets(1, 1.5, 40), // 1s to ~12 weeks - }), - } -} - -var _ prometheus.Collector = &aggregatingCollector{} - -func (c *aggregatingCollector) Describe(descs chan<- *prometheus.Desc) { - descs <- c.rtts.Desc() - descs <- c.connDurations.Desc() -} - -func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) { - now := time.Now() - c.mutex.Lock() - for _, conn := range c.conns { - if rtt, valid := conn.getSmoothedRTT(); valid { - c.rtts.Observe(rtt.Seconds()) - } - c.connDurations.Observe(now.Sub(conn.startTime).Seconds()) - } - c.mutex.Unlock() - metrics <- c.rtts - metrics <- c.connDurations -} - -func (c *aggregatingCollector) AddConn(id string, t *metricsConnTracer) { - c.mutex.Lock() - c.conns[id] = t - c.mutex.Unlock() -} - -func (c *aggregatingCollector) RemoveConn(id string) { - c.mutex.Lock() - delete(c.conns, id) - c.mutex.Unlock() -} - -var collector *aggregatingCollector - -var initMetricsOnce sync.Once - -func initMetrics() { - const ( - direction = "direction" - encLevel = "encryption_level" - ) - - closedConns = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "quic_connections_closed_total", - Help: "closed QUIC connection", - }, - []string{direction}, - ) - prometheus.MustRegister(closedConns) - newConns = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "quic_connections_new_total", - Help: "new QUIC connection", - }, - []string{direction, "handshake_successful"}, - ) - prometheus.MustRegister(newConns) - bytesTransferred = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "quic_transferred_bytes", - Help: "QUIC bytes transferred", - }, - []string{direction}, // TODO: this is confusing. Other times, we use direction for the perspective - ) - prometheus.MustRegister(bytesTransferred) - sentPackets = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "quic_packets_sent_total", - Help: "QUIC packets sent", - }, - []string{encLevel}, - ) - prometheus.MustRegister(sentPackets) - rcvdPackets = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "quic_packets_rcvd_total", - Help: "QUIC packets received", - }, - []string{encLevel}, - ) - prometheus.MustRegister(rcvdPackets) - bufferedPackets = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "quic_packets_buffered_total", - Help: "Buffered packets", - }, - []string{"packet_type"}, - ) - prometheus.MustRegister(bufferedPackets) - droppedPackets = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "quic_packets_dropped_total", - Help: "Dropped packets", - }, - []string{"packet_type", "reason"}, - ) - prometheus.MustRegister(droppedPackets) - connErrors = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "quic_connection_errors_total", - Help: "QUIC connection errors", - }, - []string{"side", "error_code"}, - ) - prometheus.MustRegister(connErrors) - lostPackets = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "quic_packets_lost_total", - Help: "QUIC lost received", - }, - []string{encLevel, "reason"}, - ) - prometheus.MustRegister(lostPackets) - collector = newAggregatingCollector() - prometheus.MustRegister(collector) -} - -type metricsTracer struct { - logging.NullTracer -} - -var _ logging.Tracer = &metricsTracer{} - -func newMetricsTracer() *metricsTracer { - initMetricsOnce.Do(func() { initMetrics() }) - return &metricsTracer{} -} - -func (m *metricsTracer) TracerForConnection(_ context.Context, p logging.Perspective, connID logging.ConnectionID) logging.ConnectionTracer { - return &metricsConnTracer{perspective: p, connID: connID} -} - -func (m *metricsTracer) SentPacket(_ net.Addr, _ *logging.Header, size logging.ByteCount, _ []logging.Frame) { - bytesTransferred.WithLabelValues("sent").Add(float64(size)) -} - -type metricsConnTracer struct { - logging.NullConnectionTracer - - perspective logging.Perspective - startTime time.Time - connID logging.ConnectionID - handshakeComplete bool - - mutex sync.Mutex - numRTTMeasurements int - rtt time.Duration -} - -var _ logging.ConnectionTracer = &metricsConnTracer{} - -func (m *metricsConnTracer) getDirection() string { - if m.perspective == logging.PerspectiveClient { - return "outgoing" - } - return "incoming" -} - -func (m *metricsConnTracer) getEncLevel(packetType logging.PacketType) string { - switch packetType { - case logging.PacketType0RTT: - return "0-RTT" - case logging.PacketTypeInitial: - return "Initial" - case logging.PacketTypeHandshake: - return "Handshake" - case logging.PacketTypeRetry: - return "Retry" - case logging.PacketType1RTT: - return "1-RTT" - default: - return "unknown" - } -} - -func (m *metricsConnTracer) StartedConnection(net.Addr, net.Addr, logging.ConnectionID, logging.ConnectionID) { - m.startTime = time.Now() - collector.AddConn(m.connID.String(), m) -} - -func (m *metricsConnTracer) ClosedConnection(e error) { - var ( - applicationErr *quic.ApplicationError - transportErr *quic.TransportError - statelessResetErr *quic.StatelessResetError - vnErr *quic.VersionNegotiationError - idleTimeoutErr *quic.IdleTimeoutError - handshakeTimeoutErr *quic.HandshakeTimeoutError - remote bool - desc string - ) - - switch { - case errors.As(e, &applicationErr): - return - case errors.As(e, &transportErr): - remote = transportErr.Remote - desc = transportErr.ErrorCode.String() - case errors.As(e, &statelessResetErr): - remote = true - desc = "stateless_reset" - case errors.As(e, &vnErr): - desc = "version_negotiation" - case errors.As(e, &idleTimeoutErr): - desc = "idle_timeout" - case errors.As(e, &handshakeTimeoutErr): - desc = "handshake_timeout" - default: - desc = fmt.Sprintf("unknown error: %v", e) - } - - side := "local" - if remote { - side = "remote" - } - connErrors.WithLabelValues(side, desc).Inc() -} -func (m *metricsConnTracer) SentPacket(hdr *logging.ExtendedHeader, size logging.ByteCount, _ *logging.AckFrame, _ []logging.Frame) { - bytesTransferred.WithLabelValues("sent").Add(float64(size)) - sentPackets.WithLabelValues(m.getEncLevel(logging.PacketTypeFromHeader(&hdr.Header))).Inc() -} - -func (m *metricsConnTracer) ReceivedVersionNegotiationPacket(dst, src logging.ArbitraryLenConnectionID, v []logging.VersionNumber) { - bytesTransferred.WithLabelValues("rcvd").Add(1 /* header form byte */ + 4 /* version number */ + 2 /* src and dest conn id length fields */ + float64(dst.Len()+src.Len()) + float64(4*len(v))) - rcvdPackets.WithLabelValues("Version Negotiation").Inc() -} - -func (m *metricsConnTracer) ReceivedRetry(*logging.Header) { - rcvdPackets.WithLabelValues("Retry").Inc() -} - -func (m *metricsConnTracer) ReceivedPacket(hdr *logging.ExtendedHeader, size logging.ByteCount, _ []logging.Frame) { - bytesTransferred.WithLabelValues("rcvd").Add(float64(size)) - rcvdPackets.WithLabelValues(m.getEncLevel(logging.PacketTypeFromHeader(&hdr.Header))).Inc() -} - -func (m *metricsConnTracer) BufferedPacket(packetType logging.PacketType, _ logging.ByteCount) { - bufferedPackets.WithLabelValues(m.getEncLevel(packetType)).Inc() -} - -func (m *metricsConnTracer) DroppedPacket(packetType logging.PacketType, size logging.ByteCount, r logging.PacketDropReason) { - bytesTransferred.WithLabelValues("rcvd").Add(float64(size)) - var reason string - switch r { - case logging.PacketDropKeyUnavailable: - reason = "key_unavailable" - case logging.PacketDropUnknownConnectionID: - reason = "unknown_connection_id" - case logging.PacketDropHeaderParseError: - reason = "header_parse_error" - case logging.PacketDropPayloadDecryptError: - reason = "payload_decrypt_error" - case logging.PacketDropProtocolViolation: - reason = "protocol_violation" - case logging.PacketDropDOSPrevention: - reason = "dos_prevention" - case logging.PacketDropUnsupportedVersion: - reason = "unsupported_version" - case logging.PacketDropUnexpectedPacket: - reason = "unexpected_packet" - case logging.PacketDropUnexpectedSourceConnectionID: - reason = "unexpected_source_connection_id" - case logging.PacketDropUnexpectedVersion: - reason = "unexpected_version" - case logging.PacketDropDuplicate: - reason = "duplicate" - default: - reason = "unknown" - } - droppedPackets.WithLabelValues(m.getEncLevel(packetType), reason).Inc() -} - -func (m *metricsConnTracer) UpdatedMetrics(rttStats *logging.RTTStats, cwnd, bytesInFlight logging.ByteCount, packetsInFlight int) { - m.mutex.Lock() - m.rtt = rttStats.SmoothedRTT() - m.numRTTMeasurements++ - m.mutex.Unlock() -} - -func (m *metricsConnTracer) LostPacket(level logging.EncryptionLevel, _ logging.PacketNumber, r logging.PacketLossReason) { - var reason string - switch r { - case logging.PacketLossReorderingThreshold: - reason = "reordering_threshold" - case logging.PacketLossTimeThreshold: - reason = "time_threshold" - default: - reason = "unknown" - } - lostPackets.WithLabelValues(level.String(), reason).Inc() -} - -func (m *metricsConnTracer) DroppedEncryptionLevel(level logging.EncryptionLevel) { - if level == logging.EncryptionHandshake { - m.handleHandshakeComplete() - } -} - -func (m *metricsConnTracer) Close() { - if m.handshakeComplete { - closedConns.WithLabelValues(m.getDirection()).Inc() - } else { - newConns.WithLabelValues(m.getDirection(), "false").Inc() - } - collector.RemoveConn(m.connID.String()) -} - -func (m *metricsConnTracer) handleHandshakeComplete() { - m.handshakeComplete = true - newConns.WithLabelValues(m.getDirection(), "true").Inc() -} - -func (m *metricsConnTracer) getSmoothedRTT() (rtt time.Duration, valid bool) { - m.mutex.Lock() - rtt = m.rtt - valid = m.numRTTMeasurements > 10 - m.mutex.Unlock() - return -}