Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Count number of messages received by sender peer ID #19

Merged
merged 1 commit into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/prometheus"
Expand All @@ -35,6 +36,7 @@ type metrics struct {

receiverErrorCounter instrument.Int64Counter
receiverConnectionUpDownCounter instrument.Int64UpDownCounter
receiverMessageCounter instrument.Int64Counter
}

func newMetrics(c *Cassette) (*metrics, error) {
Expand Down Expand Up @@ -143,6 +145,13 @@ func (m *metrics) Start(ctx context.Context) error {
); err != nil {
return err
}
if m.receiverMessageCounter, err = meter.Int64Counter(
"ipni/cassette/receiver_message_count",
instrument.WithUnit("1"),
instrument.WithDescription("The number of messages observed by BitSwap receiver."),
); err != nil {
return err
}

m.server.Handler = m.serveMux()
go func() { _ = m.server.ListenAndServe() }()
Expand Down Expand Up @@ -187,9 +196,9 @@ func (m *metrics) notifyBroadcastFailed(ctx context.Context, batchSize int64, er
m.broadcastInFlightUpDownCounter.Add(ctx, -batchSize)
}

func (m *metrics) notifyBroadcastSucceeded(ctx context.Context, batchSize int64, wantHaves bool, inFlightTime time.Duration) {
func (m *metrics) notifyBroadcastSucceeded(ctx context.Context, batchSize int64, wantHave bool, inFlightTime time.Duration) {
m.broadcastInFlightTimeHistogram.Record(ctx, inFlightTime.Milliseconds(), attribute.String("status", "succeeded"))
m.broadcastBatchSizeHistogram.Record(ctx, batchSize)
m.broadcastBatchSizeHistogram.Record(ctx, batchSize, attribute.Bool("want-have", wantHave))
m.broadcastInFlightUpDownCounter.Add(ctx, -batchSize)
}

Expand All @@ -216,6 +225,13 @@ func (m *metrics) notifyReceiverDisconnected(ctx context.Context) {
m.receiverConnectionUpDownCounter.Add(ctx, -1)
}

func (m *metrics) notifyReceiverMessageReceived(ctx context.Context, sender peer.ID) {
// TODO: It is OK to use prometheus for counting messages received per sender.
// Because, there is a handful of recipients. Once the number of senders increase beyond
// 10s we should expose a dedicated HTTP api for querying sender rankings.
m.receiverMessageCounter.Add(ctx, 1, attribute.String("sender", sender.String()))
}

func (m *metrics) notifyLookupRequested(ctx context.Context) {
m.lookupRequestCounter.Add(ctx, 1)
}
Expand Down
20 changes: 10 additions & 10 deletions receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ type (
nextID atomic.Int64
}
receivedMessageEvent struct {
k string
id peer.ID
k string
from peer.ID
}
registerHook struct {
id int64
Expand Down Expand Up @@ -57,7 +57,7 @@ func newReceiver(c *Cassette) (*receiver, error) {
}
switch ee := e.(type) {
case receivedMessageEvent:

c.metrics.notifyReceiverMessageReceived(r.ctx, ee.from)
hooks, ok := registry[ee.k]
if ok && hooks != nil {
for _, hook := range hooks {
Expand All @@ -66,7 +66,7 @@ func newReceiver(c *Cassette) (*receiver, error) {
return
default:
if hook != nil {
hook.hook(ee.id)
hook.hook(ee.from)
}
}
}
Expand Down Expand Up @@ -120,8 +120,8 @@ func (r *receiver) ReceiveMessage(ctx context.Context, sender peer.ID, in messag
case <-ctx.Done():
return
case r.mailbox <- receivedMessageEvent{
k: r.keyFromCid(c),
id: sender,
k: r.keyFromCid(c),
from: sender,
}:
}
}
Expand All @@ -133,8 +133,8 @@ func (r *receiver) ReceiveMessage(ctx context.Context, sender peer.ID, in messag
case <-ctx.Done():
return
case r.mailbox <- receivedMessageEvent{
k: r.keyFromCid(c.Cid),
id: sender,
k: r.keyFromCid(c.Cid),
from: sender,
}:
}
}
Expand All @@ -146,8 +146,8 @@ func (r *receiver) ReceiveMessage(ctx context.Context, sender peer.ID, in messag
case <-ctx.Done():
return
case r.mailbox <- receivedMessageEvent{
k: r.keyFromCid(c.Cid()),
id: sender,
k: r.keyFromCid(c.Cid()),
from: sender,
}:
}
}
Expand Down