Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement configurable provider record caching #38

Merged
merged 1 commit into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package cassette

import (
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/peer"
)

type (
cache struct {
c *Cassette
cache *lru.TwoQueueCache[string, cacheRecord]
}
cacheRecord struct {
insertedAt time.Time
providers []peer.AddrInfo
//TODO: optimize cache memory consumption by deduplication of addrinfos.
}
)

func newCache(c *Cassette) (*cache, error) {
twoq, err := lru.New2Q[string, cacheRecord](c.cacheSize)
if err != nil {
return nil, err
}
return &cache{
c: c,
cache: twoq,
}, nil
}

func (l *cache) getProviders(c cid.Cid) ([]peer.AddrInfo, bool) {
value, ok := l.cache.Get(string(c.Hash()))
switch {
case !ok:
return nil, false
case time.Since(value.insertedAt) > l.c.cacheExpiry:
l.expire(c)
return nil, false
masih marked this conversation as resolved.
Show resolved Hide resolved
default:
return value.providers, true
}
}

func (l *cache) putProviders(c cid.Cid, providers []peer.AddrInfo) {
l.cache.Add(string(c.Hash()), cacheRecord{
insertedAt: time.Now(),
providers: providers,
})
}

func (l *cache) expire(c cid.Cid) {
l.cache.Remove(string(c.Hash()))
}

func (l *cache) len() int {
return l.cache.Len()
}
96 changes: 69 additions & 27 deletions cassette.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Cassette struct {
discoverer *peerDiscoverer

metrics *metrics
cache *cache
}

func New(o ...Option) (*Cassette, error) {
Expand All @@ -44,8 +45,10 @@ func New(o ...Option) (*Cassette, error) {
Handler: c.serveMux(),
}
c.ctx, c.cancel = context.WithCancel(context.Background())
c.metrics, err = newMetrics(&c)
if err != nil {
if c.metrics, err = newMetrics(&c); err != nil {
return nil, err
}
if c.cache, err = newCache(&c); err != nil {
return nil, err
}
return &c, nil
Expand Down Expand Up @@ -85,49 +88,88 @@ func (c *Cassette) Find(ctx context.Context, k cid.Cid) chan peer.AddrInfo {
close(rch)
}()

var resultCount int64
var timeToFirstProvider time.Duration
var cacheHit bool
defer func() {
c.metrics.notifyLookupResponded(context.Background(), resultCount, timeToFirstProvider, time.Since(start), cacheHit)
}()

// Attempt to get provider IDs from cache
if providers, found := c.cache.getProviders(k); found {
cacheHit = true
for _, provider := range providers {
select {
case <-ctx.Done():
return
case rch <- provider:
resultCount++
if resultCount == 1 {
timeToFirstProvider = time.Since(start)
}
}
}
// Note that 404s are also cached; meaning, if there is a cache record for a given CID
// we return it even if it has no providers.
return
}

providers, unregister, err := c.r.registerFoundHook(ctx, k)
if err != nil {
return
}
defer unregister()

var resultCount int64
var timeToFirstProvider time.Duration
defer func() {
c.metrics.notifyLookupResponded(context.Background(), resultCount, timeToFirstProvider, time.Since(start))
}()

targets := c.toFindTargets(k)
if err := c.broadcaster.broadcastWant(ctx, targets); err != nil {
return
}

providersSoFar := make(map[peer.ID]struct{})
for {
returnIfUnseen := func(provider peer.ID) *peer.AddrInfo {
select {
case <-ctx.Done():
return
case id, ok := <-providers:
if !ok {
return
return nil
default:
if _, seen := providersSoFar[provider]; seen {
return nil
}
if _, seen := providersSoFar[id]; seen {
continue
}
providersSoFar[id] = struct{}{}
addrs := c.h.Peerstore().Addrs(id)
providersSoFar[provider] = struct{}{}
addrs := c.h.Peerstore().Addrs(provider)
if !c.addrFilterDisabled {
addrs = multiaddr.FilterAddrs(addrs, IsPubliclyDialableAddr)
}
if len(addrs) > 0 {
select {
case <-ctx.Done():
return
case rch <- peer.AddrInfo{ID: id, Addrs: addrs}:
resultCount++
if resultCount == 1 {
timeToFirstProvider = time.Since(start)
}
if len(addrs) == 0 {
return nil
}
result := peer.AddrInfo{ID: provider, Addrs: addrs}
select {
case <-ctx.Done():
return nil
case rch <- result:
resultCount++
if resultCount == 1 {
timeToFirstProvider = time.Since(start)
}
return &result
}
}
}

var returnedProviders []peer.AddrInfo
defer func() {
c.cache.putProviders(k, returnedProviders)
}()
for {
select {
case <-ctx.Done():
return
case provider, ok := <-providers:
if !ok {
return
}
if returned := returnIfUnseen(provider); returned != nil {
returnedProviders = append(returnedProviders, *returned)
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions cmd/cassette/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ type Config struct {
RecipientSendTimeout *time.Duration `yaml:"recipientSendTimeout"`
BroadcastCancelAfter *time.Duration `yaml:"broadcastCancelAfter"`
} `yaml:"bitswap"`
Cache *struct {
Size *int `yaml:"size"`
Expiry *time.Duration `yaml:"expiry"`
} `yaml:"cache"`
}

func NewConfig(path string) (*Config, error) {
Expand Down Expand Up @@ -273,5 +277,13 @@ func (c *Config) ToOptions() ([]cassette.Option, error) {
opts = append(opts, cassette.WithRecipientSendTimeout(*c.Bitswap.RecipientSendTimeout))
}
}
if c.Cache != nil {
if c.Cache.Expiry != nil {
opts = append(opts, cassette.WithCacheExpiry(*c.Cache.Expiry))
}
if c.Cache.Size != nil {
opts = append(opts, cassette.WithCacheSize(*c.Cache.Size))
}
}
return opts, nil
}
5 changes: 4 additions & 1 deletion examples/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,7 @@ bitswap:
recipientCBOpenTimeoutBackOff:
exponential: { }
recipientSendTimeout: 5s
broadcastCancelAfter: 5s
broadcastCancelAfter: 5s
cache:
expiry: 1h
size: 1000
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
github.com/cenkalti/backoff/v3 v3.1.1
github.com/hashicorp/golang-lru/v2 v2.0.3
github.com/ipfs/boxo v0.8.1
github.com/ipfs/go-cid v0.4.0
github.com/ipfs/go-log/v2 v2.5.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru/v2 v2.0.3 h1:kmRrRLlInXvng0SmLxmQpQkpbYAvcXm7NPDrgxJa9mE=
github.com/hashicorp/golang-lru/v2 v2.0.3/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc=
github.com/huin/goupnp v1.0.3 h1:N8No57ls+MnjlB+JPiCVSOyy/ot7MJTqlo7rn+NYSqQ=
github.com/huin/goupnp v1.0.3/go.mod h1:ZxNlw5WqJj6wSsRK5+YfflQGXYfccj5VgQsMNixHM7Y=
Expand Down
24 changes: 20 additions & 4 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type metrics struct {
lookupResponseResultCountHistogram instrument.Int64Histogram
lookupResponseLatencyHistogram instrument.Int64Histogram

cacheSize instrument.Int64ObservableGauge

broadcastInFlightTimeHistogram instrument.Int64Histogram
broadcastBatchSizeHistogram instrument.Int64Histogram
broadcastCidCounter instrument.Int64Counter
Expand Down Expand Up @@ -97,6 +99,14 @@ func (m *metrics) Start(_ context.Context) error {
); err != nil {
return err
}
if m.cacheSize, err = meter.Int64ObservableGauge(
"ipni/cassette/cache_size",
instrument.WithUnit("1"),
instrument.WithDescription("The number cached records."),
instrument.WithInt64Callback(m.reportCacheSize),
); err != nil {
return err
}
if m.broadcastInFlightTimeHistogram, err = meter.Int64Histogram(
"ipni/cassette/broadcast_in_flight_time",
instrument.WithUnit("ms"),
Expand Down Expand Up @@ -253,12 +263,18 @@ func (m *metrics) notifyLookupRequested(ctx context.Context) {
m.lookupRequestCounter.Add(ctx, 1)
}

func (m *metrics) notifyLookupResponded(ctx context.Context, resultCount int64, timeToFirstResult time.Duration, latency time.Duration) {
func (m *metrics) notifyLookupResponded(ctx context.Context, resultCount int64, timeToFirstResult time.Duration, latency time.Duration, cacheHit bool) {
cacheAttr := attribute.Bool("cached", cacheHit)
if resultCount > 0 {
m.lookupResponseTTFPHistogram.Record(ctx, timeToFirstResult.Milliseconds())
m.lookupResponseTTFPHistogram.Record(ctx, timeToFirstResult.Milliseconds(), cacheAttr)
}
m.lookupResponseResultCountHistogram.Record(ctx, resultCount)
m.lookupResponseLatencyHistogram.Record(ctx, latency.Milliseconds())
m.lookupResponseResultCountHistogram.Record(ctx, resultCount, cacheAttr)
m.lookupResponseLatencyHistogram.Record(ctx, latency.Milliseconds(), cacheAttr)
}

func (m *metrics) reportCacheSize(_ context.Context, observer instrument.Int64Observer) error {
observer.Observe(int64(m.c.cache.len()))
return nil
}

func errKindAttribute(err error) attribute.KeyValue {
Expand Down
19 changes: 19 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type (
peerDiscoveryHost host.Host
peerDiscoveryInterval time.Duration
peerDiscoveryAddrTTL time.Duration

cacheExpiry time.Duration
cacheSize int
}
)

Expand Down Expand Up @@ -94,6 +97,8 @@ func newOptions(o ...Option) (*options, error) {
recipientCBOpenTimeoutBackOff: circuitbreaker.DefaultOpenBackOff(),
recipientCBOpenTimeout: 5 * time.Second,
recipientSendTimeout: 5 * time.Second,
cacheExpiry: time.Hour,
cacheSize: 1_000,
}
for _, apply := range o {
if err := apply(&opts); err != nil {
Expand Down Expand Up @@ -365,3 +370,17 @@ func WithRecipientSendTimeout(t time.Duration) Option {
return nil
}
}

func WithCacheExpiry(t time.Duration) Option {
return func(o *options) error {
o.cacheExpiry = t
return nil
}
}

func WithCacheSize(s int) Option {
return func(o *options) error {
o.cacheSize = s
return nil
}
}