Skip to content
This repository has been archived by the owner on Mar 28, 2023. It is now read-only.

Commit

Permalink
feat: add latency & count metrics for content routing client
Browse files Browse the repository at this point in the history
  • Loading branch information
guseggert committed Oct 17, 2022
1 parent 0c84bf8 commit 9dfc67d
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 46 deletions.
53 changes: 53 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package client

import (
"context"
"errors"
"time"

"github.com/ipfs/go-cid"
proto "github.com/ipfs/go-delegated-routing/gen/proto"
ipns "github.com/ipfs/go-ipns"
logging "github.com/ipfs/go-log/v2"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
)

var logger = logging.Logger("service/client/delegatedrouting")

type DelegatedRoutingClient interface {
FindProviders(ctx context.Context, key cid.Cid) ([]peer.AddrInfo, error)
FindProvidersAsync(ctx context.Context, key cid.Cid) (<-chan FindProvidersAsyncResult, error)
GetIPNS(ctx context.Context, id []byte) ([]byte, error)
GetIPNSAsync(ctx context.Context, id []byte) (<-chan GetIPNSAsyncResult, error)
PutIPNS(ctx context.Context, id []byte, record []byte) error
PutIPNSAsync(ctx context.Context, id []byte, record []byte) (<-chan PutIPNSAsyncResult, error)
Provide(ctx context.Context, key []cid.Cid, ttl time.Duration) (time.Duration, error)
ProvideAsync(ctx context.Context, key []cid.Cid, ttl time.Duration) (<-chan time.Duration, error)
}

type Client struct {
client proto.DelegatedRouting_Client
validator record.Validator

provider *Provider
identity crypto.PrivKey
}

var _ DelegatedRoutingClient = (*Client)(nil)

// NewClient creates a client.
// The Provider and identity parameters are option. If they are nil, the `Provide` method will not function.
func NewClient(c proto.DelegatedRouting_Client, p *Provider, identity crypto.PrivKey) (*Client, error) {
if p != nil && !p.Peer.ID.MatchesPublicKey(identity.GetPublic()) {
return nil, errors.New("identity does not match provider")
}

return &Client{
client: c,
validator: ipns.Validator{},
provider: p,
identity: identity,
}, nil
}
17 changes: 15 additions & 2 deletions client/contentrouting.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,31 @@ func NewContentRoutingClient(c DelegatedRoutingClient) *ContentRoutingClient {
}

func (c *ContentRoutingClient) Provide(ctx context.Context, key cid.Cid, announce bool) error {
var err error
recordMetrics := startMetrics(ctx, "ContentRoutingClient.Provide")
defer recordMetrics(err)

// If 'true' is
// passed, it also announces it, otherwise it is just kept in the local
// accounting of which objects are being provided.
if !announce {
return nil
}

_, err := c.client.Provide(ctx, []cid.Cid{key}, 24*time.Hour)
_, err = c.client.Provide(ctx, []cid.Cid{key}, 24*time.Hour)
return err
}

func (c *ContentRoutingClient) ProvideMany(ctx context.Context, keys []multihash.Multihash) error {
var err error
recordMetrics := startMetrics(ctx, "ContentRoutingClient.ProvideMany")
defer recordMetrics(err)

keysAsCids := make([]cid.Cid, 0, len(keys))
for _, m := range keys {
keysAsCids = append(keysAsCids, cid.NewCidV1(cid.Raw, m))
}
_, err := c.client.Provide(ctx, keysAsCids, 24*time.Hour)
_, err = c.client.Provide(ctx, keysAsCids, 24*time.Hour)
return err
}

Expand All @@ -51,13 +59,18 @@ func (c *ContentRoutingClient) Ready() bool {
}

func (c *ContentRoutingClient) FindProvidersAsync(ctx context.Context, key cid.Cid, numResults int) <-chan peer.AddrInfo {
var err error
recordMetrics := startMetrics(ctx, "ContentRoutingClient.FindProvidersAsync")

addrInfoCh := make(chan peer.AddrInfo)
resultCh, err := c.client.FindProvidersAsync(ctx, key)
if err != nil {
close(addrInfoCh)
recordMetrics(err)
return addrInfoCh
}
go func() {
defer recordMetrics(nil)
numProcessed := 0
closed := false
for asyncResult := range resultCh {
Expand Down
44 changes: 0 additions & 44 deletions client/findproviders.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,14 @@ package client

import (
"context"
"errors"
"time"

"github.com/ipfs/go-cid"
proto "github.com/ipfs/go-delegated-routing/gen/proto"
ipns "github.com/ipfs/go-ipns"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/edelweiss/values"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)

var logger = logging.Logger("service/client/delegatedrouting")

type DelegatedRoutingClient interface {
FindProviders(ctx context.Context, key cid.Cid) ([]peer.AddrInfo, error)
FindProvidersAsync(ctx context.Context, key cid.Cid) (<-chan FindProvidersAsyncResult, error)
GetIPNS(ctx context.Context, id []byte) ([]byte, error)
GetIPNSAsync(ctx context.Context, id []byte) (<-chan GetIPNSAsyncResult, error)
PutIPNS(ctx context.Context, id []byte, record []byte) error
PutIPNSAsync(ctx context.Context, id []byte, record []byte) (<-chan PutIPNSAsyncResult, error)
Provide(ctx context.Context, key []cid.Cid, ttl time.Duration) (time.Duration, error)
ProvideAsync(ctx context.Context, key []cid.Cid, ttl time.Duration) (<-chan time.Duration, error)
}

type Client struct {
client proto.DelegatedRouting_Client
validator record.Validator

provider *Provider
identity crypto.PrivKey
}

var _ DelegatedRoutingClient = (*Client)(nil)

// NewClient creates a client.
// The Provider and identity parameters are option. If they are nil, the `Provide` method will not function.
func NewClient(c proto.DelegatedRouting_Client, p *Provider, identity crypto.PrivKey) (*Client, error) {
if p != nil && !p.Peer.ID.MatchesPublicKey(identity.GetPublic()) {
return nil, errors.New("identity does not match provider")
}

return &Client{
client: c,
validator: ipns.Validator{},
provider: p,
identity: identity,
}, nil
}

func (fp *Client) FindProviders(ctx context.Context, key cid.Cid) ([]peer.AddrInfo, error) {
resps, err := fp.client.FindProviders(ctx, cidsToFindProvidersRequest(key))
if err != nil {
Expand Down
70 changes: 70 additions & 0 deletions client/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package client

import (
"context"
"errors"
"time"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

var (
defaultDurationDistribution = view.Distribution(0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000)

measureDuration = stats.Float64("delegated_routing/duration", "The time to complete an entire request", stats.UnitMilliseconds)
measureRequests = stats.Float64("delegated_routing/requests", "The number of requests made", stats.UnitDimensionless)

keyName = tag.MustNewKey("name")
keyError = tag.MustNewKey("error")

durationView = &view.View{
Measure: measureDuration,
TagKeys: []tag.Key{keyName, keyError},
Aggregation: defaultDurationDistribution,
}
requestsView = &view.View{
Measure: measureRequests,
TagKeys: []tag.Key{keyName, keyError},
Aggregation: view.Sum(),
}

DefaultViews = []*view.View{
durationView,
requestsView,
}
)

// startMetrics begins recording metrics.
// The returned function flushes the metrics when called, recording metrics about the passed error.
func startMetrics(ctx context.Context, name string) (done func(err error)) {
start := time.Now()

return func(err error) {
latency := time.Since(start)

errStr := "None"
if err != nil {
logger.Warnw("received delegated routing error", "Error", err)
if errors.Is(err, context.Canceled) {
errStr = "Canceled"
} else if errors.Is(err, context.DeadlineExceeded) {
errStr = "DeadlineExceeded"
} else {
errStr = "Unknown"
}
}

stats.RecordWithTags(ctx,
[]tag.Mutator{
tag.Upsert(keyName, name),
tag.Upsert(keyError, errStr),
},
[]stats.Measurement{
measureDuration.M(float64(latency.Milliseconds())),
measureRequests.M(1),
}...,
)
}
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ require (
github.com/ipld/edelweiss v0.2.0
github.com/ipld/go-ipld-prime v0.18.0
github.com/libp2p/go-libp2p v0.23.2
github.com/libp2p/go-libp2p-core v0.20.1
github.com/libp2p/go-libp2p-record v0.2.0
github.com/multiformats/go-multiaddr v0.7.0
github.com/multiformats/go-multicodec v0.6.0
github.com/multiformats/go-multihash v0.2.1
github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e
go.opencensus.io v0.23.0
)

require (
Expand Down
Loading

0 comments on commit 9dfc67d

Please sign in to comment.