diff --git a/core/event/bus.go b/core/event/bus.go index 0cd8d2ff7c..13e18e5356 100644 --- a/core/event/bus.go +++ b/core/event/bus.go @@ -39,6 +39,9 @@ type Subscription interface { // Out returns the channel from which to consume events. Out() <-chan interface{} + + // Name returns the name for the subscription + Name() string } // Bus is an interface for a type-based event delivery system. diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 2a500151fc..58918ce3a6 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -155,7 +155,7 @@ type HostOpts struct { // NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network. func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { - eventBus := eventbus.NewBus() + eventBus := eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer())) psManager, err := pstoremanager.NewPeerstoreManager(n.Peerstore(), eventBus) if err != nil { return nil, err diff --git a/p2p/host/blank/blank.go b/p2p/host/blank/blank.go index 08f1332643..e264e5bb16 100644 --- a/p2p/host/blank/blank.go +++ b/p2p/host/blank/blank.go @@ -68,7 +68,7 @@ func NewBlankHost(n network.Network, options ...Option) *BlankHost { mux: mstream.NewMultistreamMuxer[protocol.ID](), } if bh.eventbus == nil { - bh.eventbus = eventbus.NewBus() + bh.eventbus = eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer())) } // subscribe the connection manager to network notifications (has no effect with NullConnMgr) diff --git a/p2p/host/eventbus/basic.go b/p2p/host/eventbus/basic.go index 6ab6c410ae..5e11fad875 100644 --- a/p2p/host/eventbus/basic.go +++ b/p2p/host/eventbus/basic.go @@ -15,28 +15,34 @@ import ( // basicBus is a type-based event delivery system type basicBus struct { - lk sync.RWMutex - nodes map[reflect.Type]*node - wildcard *wildcardNode + lk sync.RWMutex + nodes map[reflect.Type]*node + wildcard *wildcardNode + metricsTracer MetricsTracer } var _ event.Bus = (*basicBus)(nil) type emitter struct { - n *node - w *wildcardNode - typ reflect.Type - closed int32 - dropper func(reflect.Type) + n *node + w *wildcardNode + typ reflect.Type + closed int32 + dropper func(reflect.Type) + metricsTracer MetricsTracer } func (e *emitter) Emit(evt interface{}) error { if atomic.LoadInt32(&e.closed) != 0 { return fmt.Errorf("emitter is closed") } + e.n.emit(evt) e.w.emit(evt) + if e.metricsTracer != nil { + e.metricsTracer.EventEmitted(e.typ) + } return nil } @@ -50,11 +56,15 @@ func (e *emitter) Close() error { return nil } -func NewBus() event.Bus { - return &basicBus{ +func NewBus(opts ...Option) event.Bus { + bus := &basicBus{ nodes: map[reflect.Type]*node{}, - wildcard: new(wildcardNode), + wildcard: &wildcardNode{}, + } + for _, opt := range opts { + opt(bus) } + return bus } func (b *basicBus) withNode(typ reflect.Type, cb func(*node), async func(*node)) { @@ -62,7 +72,7 @@ func (b *basicBus) withNode(typ reflect.Type, cb func(*node), async func(*node)) n, ok := b.nodes[typ] if !ok { - n = newNode(typ) + n = newNode(typ, b.metricsTracer) b.nodes[typ] = n } @@ -102,8 +112,10 @@ func (b *basicBus) tryDropNode(typ reflect.Type) { } type wildcardSub struct { - ch chan interface{} - w *wildcardNode + ch chan interface{} + w *wildcardNode + metricsTracer MetricsTracer + name string } func (w *wildcardSub) Out() <-chan interface{} { @@ -112,13 +124,31 @@ func (w *wildcardSub) Out() <-chan interface{} { func (w *wildcardSub) Close() error { w.w.removeSink(w.ch) + if w.metricsTracer != nil { + w.metricsTracer.RemoveSubscriber(reflect.TypeOf(event.WildcardSubscription)) + } return nil } +func (w *wildcardSub) Name() string { + return w.name +} + +type namedSink struct { + name string + ch chan interface{} +} + type sub struct { - ch chan interface{} - nodes []*node - dropper func(reflect.Type) + ch chan interface{} + nodes []*node + dropper func(reflect.Type) + metricsTracer MetricsTracer + name string +} + +func (s *sub) Name() string { + return s.name } func (s *sub) Out() <-chan interface{} { @@ -137,9 +167,13 @@ func (s *sub) Close() error { n.lk.Lock() for i := 0; i < len(n.sinks); i++ { - if n.sinks[i] == s.ch { + if n.sinks[i].ch == s.ch { n.sinks[i], n.sinks[len(n.sinks)-1] = n.sinks[len(n.sinks)-1], nil n.sinks = n.sinks[:len(n.sinks)-1] + + if s.metricsTracer != nil { + s.metricsTracer.RemoveSubscriber(n.typ) + } break } } @@ -162,7 +196,7 @@ var _ event.Subscription = (*sub)(nil) // publishers to get blocked. CancelFunc is guaranteed to return after last send // to the channel func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt) (_ event.Subscription, err error) { - settings := subSettingsDefault + settings := newSubSettings() for _, opt := range opts { if err := opt(&settings); err != nil { return nil, err @@ -171,10 +205,12 @@ func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt if evtTypes == event.WildcardSubscription { out := &wildcardSub{ - ch: make(chan interface{}, settings.buffer), - w: b.wildcard, + ch: make(chan interface{}, settings.buffer), + w: b.wildcard, + metricsTracer: b.metricsTracer, + name: settings.name, } - b.wildcard.addSink(out.ch) + b.wildcard.addSink(&namedSink{ch: out.ch, name: out.name}) return out, nil } @@ -195,7 +231,9 @@ func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt ch: make(chan interface{}, settings.buffer), nodes: make([]*node, len(types)), - dropper: b.tryDropNode, + dropper: b.tryDropNode, + metricsTracer: b.metricsTracer, + name: settings.name, } for _, etyp := range types { @@ -208,8 +246,11 @@ func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt typ := reflect.TypeOf(etyp) b.withNode(typ.Elem(), func(n *node) { - n.sinks = append(n.sinks, out.ch) + n.sinks = append(n.sinks, &namedSink{ch: out.ch, name: out.name}) out.nodes[i] = n + if b.metricsTracer != nil { + b.metricsTracer.AddSubscriber(typ.Elem()) + } }, func(n *node) { if n.keepLast { l := n.last @@ -255,7 +296,7 @@ func (b *basicBus) Emitter(evtType interface{}, opts ...event.EmitterOpt) (e eve b.withNode(typ, func(n *node) { atomic.AddInt32(&n.nEmitters, 1) n.keepLast = n.keepLast || settings.makeStateful - e = &emitter{n: n, typ: typ, dropper: b.tryDropNode, w: b.wildcard} + e = &emitter{n: n, typ: typ, dropper: b.tryDropNode, w: b.wildcard, metricsTracer: b.metricsTracer} }, nil) return } @@ -278,22 +319,27 @@ func (b *basicBus) GetAllEventTypes() []reflect.Type { type wildcardNode struct { sync.RWMutex - nSinks int32 - sinks []chan interface{} + nSinks int32 + sinks []*namedSink + metricsTracer MetricsTracer } -func (n *wildcardNode) addSink(ch chan interface{}) { +func (n *wildcardNode) addSink(sink *namedSink) { atomic.AddInt32(&n.nSinks, 1) // ok to do outside the lock n.Lock() - n.sinks = append(n.sinks, ch) + n.sinks = append(n.sinks, sink) n.Unlock() + + if n.metricsTracer != nil { + n.metricsTracer.AddSubscriber(reflect.TypeOf(event.WildcardSubscription)) + } } func (n *wildcardNode) removeSink(ch chan interface{}) { atomic.AddInt32(&n.nSinks, -1) // ok to do outside the lock n.Lock() for i := 0; i < len(n.sinks); i++ { - if n.sinks[i] == ch { + if n.sinks[i].ch == ch { n.sinks[i], n.sinks[len(n.sinks)-1] = n.sinks[len(n.sinks)-1], nil n.sinks = n.sinks[:len(n.sinks)-1] break @@ -308,8 +354,13 @@ func (n *wildcardNode) emit(evt interface{}) { } n.RLock() - for _, ch := range n.sinks { - ch <- evt + for _, sink := range n.sinks { + + // Sending metrics before sending on channel allows us to + // record channel full events before blocking + sendSubscriberMetrics(n.metricsTracer, sink) + + sink.ch <- evt } n.RUnlock() } @@ -326,12 +377,14 @@ type node struct { keepLast bool last interface{} - sinks []chan interface{} + sinks []*namedSink + metricsTracer MetricsTracer } -func newNode(typ reflect.Type) *node { +func newNode(typ reflect.Type, metricsTracer MetricsTracer) *node { return &node{ - typ: typ, + typ: typ, + metricsTracer: metricsTracer, } } @@ -346,8 +399,20 @@ func (n *node) emit(evt interface{}) { n.last = evt } - for _, ch := range n.sinks { - ch <- evt + for _, sink := range n.sinks { + + // Sending metrics before sending on channel allows us to + // record channel full events before blocking + sendSubscriberMetrics(n.metricsTracer, sink) + sink.ch <- evt } n.lk.Unlock() } + +func sendSubscriberMetrics(metricsTracer MetricsTracer, sink *namedSink) { + if metricsTracer != nil { + metricsTracer.SubscriberQueueLength(sink.name, len(sink.ch)+1) + metricsTracer.SubscriberQueueFull(sink.name, len(sink.ch)+1 >= cap(sink.ch)) + metricsTracer.SubscriberEventQueued(sink.name) + } +} diff --git a/p2p/host/eventbus/basic_metrics.go b/p2p/host/eventbus/basic_metrics.go new file mode 100644 index 0000000000..0a6edf8a8c --- /dev/null +++ b/p2p/host/eventbus/basic_metrics.go @@ -0,0 +1,136 @@ +package eventbus + +import ( + "reflect" + "strings" + "sync" + + "github.com/prometheus/client_golang/prometheus" +) + +const metricNamespace = "libp2p_eventbus" + +var ( + eventsEmitted = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metricNamespace, + Name: "events_emitted_total", + Help: "Events Emitted", + }, + []string{"event"}, + ) + totalSubscribers = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: metricNamespace, + Name: "subscribers_total", + Help: "Number of subscribers for an event type", + }, + []string{"event"}, + ) + subscriberQueueLength = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: metricNamespace, + Name: "subscriber_queue_length", + Help: "Subscriber queue length", + }, + []string{"subscriber_name"}, + ) + subscriberQueueFull = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: metricNamespace, + Name: "subscriber_queue_full", + Help: "Subscriber Queue completely full", + }, + []string{"subscriber_name"}, + ) + subscriberEventQueued = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metricNamespace, + Name: "subscriber_event_queued", + Help: "Event Queued for subscriber", + }, + []string{"subscriber_name"}, + ) +) + +// MetricsTracer tracks metrics for the eventbus subsystem +type MetricsTracer interface { + + // EventEmitted counts the total number of events grouped by event type + EventEmitted(typ reflect.Type) + + // AddSubscriber adds a subscriber for the event type + AddSubscriber(typ reflect.Type) + + // RemoveSubscriber removes a subscriber for the event type + RemoveSubscriber(typ reflect.Type) + + // SubscriberQueueLength is the length of the subscribers channel + SubscriberQueueLength(name string, n int) + + // SubscriberQueueFull tracks whether a subscribers channel if full + SubscriberQueueFull(name string, isFull bool) + + // SubscriberEventQueued counts the total number of events grouped by subscriber + SubscriberEventQueued(name string) +} + +type metricsTracer struct{} + +var _ MetricsTracer = &metricsTracer{} + +type MetricsTracerOption = func(*metricsTracerSetting) + +type metricsTracerSetting struct { + reg prometheus.Registerer +} + +var initMetricsOnce sync.Once + +func initMetrics(reg prometheus.Registerer) { + reg.MustRegister(eventsEmitted, totalSubscribers, subscriberQueueLength, subscriberQueueFull, subscriberEventQueued) +} + +func MustRegisterWith(reg prometheus.Registerer) MetricsTracerOption { + return func(s *metricsTracerSetting) { + s.reg = reg + } +} + +func NewMetricsTracer(opts ...MetricsTracerOption) MetricsTracer { + settings := &metricsTracerSetting{reg: prometheus.DefaultRegisterer} + for _, opt := range opts { + opt(settings) + } + initMetricsOnce.Do(func() { initMetrics(settings.reg) }) + return &metricsTracer{} +} + +func (m *metricsTracer) EventEmitted(typ reflect.Type) { + eventsEmitted.WithLabelValues(strings.TrimPrefix(typ.String(), "event.")).Inc() +} + +func (m *metricsTracer) AddSubscriber(typ reflect.Type) { + totalSubscribers.WithLabelValues(strings.TrimPrefix(typ.String(), "event.")).Inc() +} + +func (m *metricsTracer) RemoveSubscriber(typ reflect.Type) { + totalSubscribers.WithLabelValues(strings.TrimPrefix(typ.String(), "event.")).Dec() +} + +func (m *metricsTracer) SubscriberQueueLength(name string, n int) { + subscriberQueueLength.WithLabelValues(name).Set(float64(n)) +} + +func (m *metricsTracer) SubscriberQueueFull(name string, isFull bool) { + observer := subscriberQueueFull.WithLabelValues(name) + if isFull { + observer.Set(1) + } else { + observer.Set(0) + } +} + +func (m *metricsTracer) SubscriberEventQueued(name string) { + subscriberEventQueued.WithLabelValues(name).Inc() +} diff --git a/p2p/host/eventbus/basic_metrics_test.go b/p2p/host/eventbus/basic_metrics_test.go new file mode 100644 index 0000000000..8322f73e97 --- /dev/null +++ b/p2p/host/eventbus/basic_metrics_test.go @@ -0,0 +1,27 @@ +package eventbus + +import ( + "reflect" + "testing" + + "github.com/libp2p/go-libp2p/core/event" +) + +func BenchmarkEventEmitted(b *testing.B) { + b.ReportAllocs() + types := []reflect.Type{reflect.TypeOf(new(event.EvtLocalAddressesUpdated)), reflect.TypeOf(new(event.EvtNATDeviceTypeChanged)), + reflect.TypeOf(new(event.EvtLocalProtocolsUpdated))} + mt := NewMetricsTracer() + for i := 0; i < b.N; i++ { + mt.EventEmitted(types[i%len(types)]) + } +} + +func BenchmarkSubscriberQueueLength(b *testing.B) { + b.ReportAllocs() + names := []string{"s1", "s2", "s3", "s4"} + mt := NewMetricsTracer() + for i := 0; i < b.N; i++ { + mt.SubscriberQueueLength(names[i%len(names)], i) + } +} diff --git a/p2p/host/eventbus/grafana-dashboards/eventbus.json b/p2p/host/eventbus/grafana-dashboards/eventbus.json new file mode 100644 index 0000000000..17de2a8a72 --- /dev/null +++ b/p2p/host/eventbus/grafana-dashboards/eventbus.json @@ -0,0 +1,529 @@ +{ + "__inputs": [ + { + "name": "DS_PROMETHEUS", + "label": "Prometheus", + "description": "", + "type": "datasource", + "pluginId": "prometheus", + "pluginName": "Prometheus" + } + ], + "__elements": {}, + "__requires": [ + { + "type": "panel", + "id": "gauge", + "name": "Gauge", + "version": "" + }, + { + "type": "grafana", + "id": "grafana", + "name": "Grafana", + "version": "9.3.6" + }, + { + "type": "datasource", + "id": "prometheus", + "name": "Prometheus", + "version": "1.0.0" + }, + { + "type": "panel", + "id": "state-timeline", + "name": "State timeline", + "version": "" + }, + { + "type": "panel", + "id": "timeseries", + "name": "Time series", + "version": "" + } + ], + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "target": { + "limit": 100, + "matchAny": false, + "tags": [], + "type": "dashboard" + }, + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": null, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 2, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "exemplar": false, + "expr": "rate(libp2p_eventbus_events_emitted_total[$__rate_interval])", + "format": "time_series", + "instant": false, + "interval": "", + "legendFormat": "{{event}}", + "range": true, + "refId": "A" + } + ], + "title": "Events Types emitted", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 24, + "x": 0, + "y": 9 + }, + "id": 6, + "options": { + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "text": { + "titleSize": 14 + } + }, + "pluginVersion": "9.3.6", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "builder", + "expr": "libp2p_eventbus_subscribers_total", + "legendFormat": "{{event}}", + "range": true, + "refId": "A" + } + ], + "title": "Event Subscribers", + "type": "gauge" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 15 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "builder", + "expr": "rate(libp2p_eventbus_subscriber_event_queued[$__rate_interval])", + "legendFormat": "{{subscriber_name}}", + "range": true, + "refId": "A" + } + ], + "title": "Events emitted By Subscriber", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "description": "Event Subscribers need to consume events quickly enough, otherwise they risk stalling the libp2p process.\nSubscribers use a buffered channel to catch temporary bursts. A queue length that doesn't return to 0 might be indicative of a problem.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 15 + }, + "id": 8, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "builder", + "expr": "libp2p_eventbus_subscriber_queue_length", + "hide": false, + "legendFormat": "{{subscriber_name}}", + "range": true, + "refId": "A" + } + ], + "title": "Subscriber Queue Length", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "description": "When the subscriber event queue fills up, it blocks the libp2p process. This can be mitigated by\n1. consuming events quickly enough on the subscriber side\n2. using a large enough buffer to absorb bursts", + "fieldConfig": { + "defaults": { + "color": { + "mode": "continuous-GrYlRd" + }, + "custom": { + "fillOpacity": 75, + "lineWidth": 0, + "spanNulls": 60000 + }, + "mappings": [ + { + "options": { + "0": { + "color": "green", + "index": 0, + "text": "OK" + }, + "1": { + "color": "red", + "index": 1, + "text": "FULL" + } + }, + "type": "value" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "#73BF69", + "value": null + } + ] + }, + "unit": "string" + }, + "overrides": [] + }, + "gridPos": { + "h": 11, + "w": 23, + "x": 0, + "y": 23 + }, + "id": 10, + "options": { + "alignValue": "center", + "legend": { + "displayMode": "list", + "placement": "right", + "showLegend": false + }, + "mergeValues": true, + "rowHeight": 0.94, + "showValue": "always", + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "builder", + "exemplar": false, + "expr": "libp2p_eventbus_subscriber_queue_full", + "format": "time_series", + "hide": false, + "instant": false, + "legendFormat": "{{subscriber_name}}", + "range": true, + "refId": "A" + } + ], + "title": "Subscriber Queue Full", + "type": "state-timeline" + } + ], + "refresh": false, + "schemaVersion": 37, + "style": "dark", + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "now-15m", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "libp2p EventBus", + "uid": "ZFbI6NAVn", + "version": 4, + "weekStart": "" +} diff --git a/p2p/host/eventbus/opts.go b/p2p/host/eventbus/opts.go index a8eae6f2fa..3922be0a98 100644 --- a/p2p/host/eventbus/opts.go +++ b/p2p/host/eventbus/opts.go @@ -1,13 +1,44 @@ package eventbus +import ( + "fmt" + "runtime" + "strings" + "sync/atomic" +) + type subSettings struct { buffer int + name string } +var subCnt int64 + var subSettingsDefault = subSettings{ buffer: 16, } +// newSubSettings returns the settings for a new subscriber +// The default naming strategy is sub--L +func newSubSettings() subSettings { + settings := subSettingsDefault + _, file, line, ok := runtime.Caller(2) // skip=1 is eventbus.Subscriber + if ok { + file = strings.TrimPrefix(file, "github.com/") + // remove the version number from the path, for example + // go-libp2p-package@v0.x.y-some-hash-123/file.go will be shortened go go-libp2p-package/file.go + if idx1 := strings.Index(file, "@"); idx1 != -1 { + if idx2 := strings.Index(file[idx1:], "/"); idx2 != -1 { + file = file[:idx1] + file[idx1+idx2:] + } + } + settings.name = fmt.Sprintf("%s-L%d", file, line) + } else { + settings.name = fmt.Sprintf("subscriber-%d", atomic.AddInt64(&subCnt, 1)) + } + return settings +} + func BufSize(n int) func(interface{}) error { return func(s interface{}) error { s.(*subSettings).buffer = n @@ -15,6 +46,13 @@ func BufSize(n int) func(interface{}) error { } } +func Name(name string) func(interface{}) error { + return func(s interface{}) error { + s.(*subSettings).name = name + return nil + } +} + type emitterSettings struct { makeStateful bool } @@ -30,3 +68,12 @@ func Stateful(s interface{}) error { s.(*emitterSettings).makeStateful = true return nil } + +type Option func(*basicBus) + +func WithMetricsTracer(metricsTracer MetricsTracer) Option { + return func(bus *basicBus) { + bus.metricsTracer = metricsTracer + bus.wildcard.metricsTracer = metricsTracer + } +}