Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eventbus: add metrics #2038

Merged
merged 18 commits into from
Feb 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/event/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type Subscription interface {

// Out returns the channel from which to consume events.
Out() <-chan interface{}

// Name returns the name for the subscription
Name() string
}

// Bus is an interface for a type-based event delivery system.
Expand Down
2 changes: 1 addition & 1 deletion p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ type HostOpts struct {

// NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network.
func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
eventBus := eventbus.NewBus()
eventBus := eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer()))
psManager, err := pstoremanager.NewPeerstoreManager(n.Peerstore(), eventBus)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion p2p/host/blank/blank.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewBlankHost(n network.Network, options ...Option) *BlankHost {
mux: mstream.NewMultistreamMuxer[protocol.ID](),
}
if bh.eventbus == nil {
bh.eventbus = eventbus.NewBus()
bh.eventbus = eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer()))
}

// subscribe the connection manager to network notifications (has no effect with NullConnMgr)
Expand Down
139 changes: 102 additions & 37 deletions p2p/host/eventbus/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,34 @@ import (

// basicBus is a type-based event delivery system
type basicBus struct {
lk sync.RWMutex
nodes map[reflect.Type]*node
wildcard *wildcardNode
lk sync.RWMutex
nodes map[reflect.Type]*node
wildcard *wildcardNode
metricsTracer MetricsTracer
}

var _ event.Bus = (*basicBus)(nil)

type emitter struct {
n *node
w *wildcardNode
typ reflect.Type
closed int32
dropper func(reflect.Type)
n *node
w *wildcardNode
typ reflect.Type
closed int32
dropper func(reflect.Type)
metricsTracer MetricsTracer
}

func (e *emitter) Emit(evt interface{}) error {
if atomic.LoadInt32(&e.closed) != 0 {
return fmt.Errorf("emitter is closed")
}

e.n.emit(evt)
e.w.emit(evt)

if e.metricsTracer != nil {
e.metricsTracer.EventEmitted(e.typ)
}
return nil
}

Expand All @@ -50,19 +56,23 @@ func (e *emitter) Close() error {
return nil
}

func NewBus() event.Bus {
return &basicBus{
func NewBus(opts ...Option) event.Bus {
bus := &basicBus{
nodes: map[reflect.Type]*node{},
wildcard: new(wildcardNode),
wildcard: &wildcardNode{},
}
for _, opt := range opts {
opt(bus)
}
return bus
}

func (b *basicBus) withNode(typ reflect.Type, cb func(*node), async func(*node)) {
b.lk.Lock()

n, ok := b.nodes[typ]
if !ok {
n = newNode(typ)
n = newNode(typ, b.metricsTracer)
b.nodes[typ] = n
}

Expand Down Expand Up @@ -102,8 +112,10 @@ func (b *basicBus) tryDropNode(typ reflect.Type) {
}

type wildcardSub struct {
ch chan interface{}
w *wildcardNode
ch chan interface{}
w *wildcardNode
metricsTracer MetricsTracer
name string
}

func (w *wildcardSub) Out() <-chan interface{} {
Expand All @@ -112,13 +124,31 @@ func (w *wildcardSub) Out() <-chan interface{} {

func (w *wildcardSub) Close() error {
w.w.removeSink(w.ch)
if w.metricsTracer != nil {
w.metricsTracer.RemoveSubscriber(reflect.TypeOf(event.WildcardSubscription))
}
return nil
}

func (w *wildcardSub) Name() string {
return w.name
}

type namedSink struct {
name string
ch chan interface{}
}

type sub struct {
ch chan interface{}
nodes []*node
dropper func(reflect.Type)
ch chan interface{}
nodes []*node
dropper func(reflect.Type)
metricsTracer MetricsTracer
name string
}

func (s *sub) Name() string {
return s.name
}

func (s *sub) Out() <-chan interface{} {
Expand All @@ -137,9 +167,13 @@ func (s *sub) Close() error {
n.lk.Lock()

for i := 0; i < len(n.sinks); i++ {
if n.sinks[i] == s.ch {
if n.sinks[i].ch == s.ch {
n.sinks[i], n.sinks[len(n.sinks)-1] = n.sinks[len(n.sinks)-1], nil
n.sinks = n.sinks[:len(n.sinks)-1]

if s.metricsTracer != nil {
s.metricsTracer.RemoveSubscriber(n.typ)
}
break
}
}
Expand All @@ -162,7 +196,7 @@ var _ event.Subscription = (*sub)(nil)
// publishers to get blocked. CancelFunc is guaranteed to return after last send
// to the channel
func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt) (_ event.Subscription, err error) {
settings := subSettingsDefault
settings := newSubSettings()
for _, opt := range opts {
if err := opt(&settings); err != nil {
return nil, err
Expand All @@ -171,10 +205,12 @@ func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt

if evtTypes == event.WildcardSubscription {
out := &wildcardSub{
ch: make(chan interface{}, settings.buffer),
w: b.wildcard,
ch: make(chan interface{}, settings.buffer),
w: b.wildcard,
metricsTracer: b.metricsTracer,
name: settings.name,
}
b.wildcard.addSink(out.ch)
b.wildcard.addSink(&namedSink{ch: out.ch, name: out.name})
return out, nil
}

Expand All @@ -195,7 +231,9 @@ func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt
ch: make(chan interface{}, settings.buffer),
nodes: make([]*node, len(types)),

dropper: b.tryDropNode,
dropper: b.tryDropNode,
metricsTracer: b.metricsTracer,
name: settings.name,
}

for _, etyp := range types {
Expand All @@ -208,8 +246,11 @@ func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt
typ := reflect.TypeOf(etyp)

b.withNode(typ.Elem(), func(n *node) {
n.sinks = append(n.sinks, out.ch)
n.sinks = append(n.sinks, &namedSink{ch: out.ch, name: out.name})
out.nodes[i] = n
if b.metricsTracer != nil {
b.metricsTracer.AddSubscriber(typ.Elem())
}
}, func(n *node) {
if n.keepLast {
l := n.last
Expand Down Expand Up @@ -255,7 +296,7 @@ func (b *basicBus) Emitter(evtType interface{}, opts ...event.EmitterOpt) (e eve
b.withNode(typ, func(n *node) {
atomic.AddInt32(&n.nEmitters, 1)
n.keepLast = n.keepLast || settings.makeStateful
e = &emitter{n: n, typ: typ, dropper: b.tryDropNode, w: b.wildcard}
e = &emitter{n: n, typ: typ, dropper: b.tryDropNode, w: b.wildcard, metricsTracer: b.metricsTracer}
}, nil)
return
}
Expand All @@ -278,22 +319,27 @@ func (b *basicBus) GetAllEventTypes() []reflect.Type {

type wildcardNode struct {
sync.RWMutex
nSinks int32
sinks []chan interface{}
nSinks int32
sinks []*namedSink
metricsTracer MetricsTracer
}

func (n *wildcardNode) addSink(ch chan interface{}) {
func (n *wildcardNode) addSink(sink *namedSink) {
atomic.AddInt32(&n.nSinks, 1) // ok to do outside the lock
n.Lock()
n.sinks = append(n.sinks, ch)
n.sinks = append(n.sinks, sink)
n.Unlock()

if n.metricsTracer != nil {
n.metricsTracer.AddSubscriber(reflect.TypeOf(event.WildcardSubscription))
}
}

func (n *wildcardNode) removeSink(ch chan interface{}) {
atomic.AddInt32(&n.nSinks, -1) // ok to do outside the lock
n.Lock()
for i := 0; i < len(n.sinks); i++ {
if n.sinks[i] == ch {
if n.sinks[i].ch == ch {
n.sinks[i], n.sinks[len(n.sinks)-1] = n.sinks[len(n.sinks)-1], nil
n.sinks = n.sinks[:len(n.sinks)-1]
break
Expand All @@ -308,8 +354,13 @@ func (n *wildcardNode) emit(evt interface{}) {
}

n.RLock()
for _, ch := range n.sinks {
ch <- evt
for _, sink := range n.sinks {

// Sending metrics before sending on channel allows us to
// record channel full events before blocking
sendSubscriberMetrics(n.metricsTracer, sink)

sink.ch <- evt
}
n.RUnlock()
}
Expand All @@ -326,12 +377,14 @@ type node struct {
keepLast bool
last interface{}

sinks []chan interface{}
sinks []*namedSink
metricsTracer MetricsTracer
}

func newNode(typ reflect.Type) *node {
func newNode(typ reflect.Type, metricsTracer MetricsTracer) *node {
return &node{
typ: typ,
typ: typ,
metricsTracer: metricsTracer,
}
}

Expand All @@ -346,8 +399,20 @@ func (n *node) emit(evt interface{}) {
n.last = evt
}

for _, ch := range n.sinks {
ch <- evt
for _, sink := range n.sinks {

// Sending metrics before sending on channel allows us to
// record channel full events before blocking
sendSubscriberMetrics(n.metricsTracer, sink)
sink.ch <- evt
}
n.lk.Unlock()
}

func sendSubscriberMetrics(metricsTracer MetricsTracer, sink *namedSink) {
if metricsTracer != nil {
metricsTracer.SubscriberQueueLength(sink.name, len(sink.ch)+1)
metricsTracer.SubscriberQueueFull(sink.name, len(sink.ch)+1 >= cap(sink.ch))
metricsTracer.SubscriberEventQueued(sink.name)
}
}
Loading