Skip to content

Commit

Permalink
feat: deprecate NewTopicSelectorStoreRistretto (dunglas#583)
Browse files Browse the repository at this point in the history
  • Loading branch information
dunglas authored Nov 14, 2021
1 parent df815c9 commit 6fb1621
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 97 deletions.
65 changes: 9 additions & 56 deletions caddy/caddy.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,6 @@ type Mercure struct {
// Transport to use.
TransportURL string `json:"transport_url,omitempty"`

// Number of cache counters, defaults to 6e7, set to -1 to disable the cache. See https://github.com/dgraph-io/ristretto for details.
CacheNumCounters *int64 `json:"cache_max_counters,omitempty"`

// Maximum cache cost, defaults to 100MB, set to -1 to disable the cache. See https://github.com/dgraph-io/ristretto for details.
CacheMaxCost *int64 `json:"cache_max_cost,omitempty"`

// Triggers use of LRU topic selector cache and avoidance of select priority queue (recommend 10,000 - 1,000,000)
LRUShardSize *int64 `json:"lru_shard_size,omitempty"`

Expand Down Expand Up @@ -123,34 +117,14 @@ func (m *Mercure) Provision(ctx caddy.Context) error { //nolint:funlen
m.TransportURL = "bolt://mercure.db"
}

var (
nc int64
mc int64
)
if m.CacheNumCounters == nil {
nc = mercure.TopicSelectorStoreDefaultCacheNumCounters
} else {
nc = *m.CacheNumCounters
}

if m.CacheMaxCost == nil {
mc = mercure.TopicSelectorStoreCacheMaxCost
} else {
mc = *m.CacheMaxCost
maxEntriesPerShard := mercure.DefaultTopicSelectorStoreLRUMaxEntriesPerShard
if m.LRUShardSize != nil {
maxEntriesPerShard = *m.LRUShardSize
}

var err error
var tss *mercure.TopicSelectorStore
if m.LRUShardSize == nil {
tss, err = mercure.NewTopicSelectorStore(nc, mc)
if err != nil {
return err //nolint:wrapcheck
}
} else {
tss, err = mercure.NewTopicSelectorStoreLRU(*m.LRUShardSize, mercure.DefaultTopicSelectorStoreLRUShardCount)
if err != nil {
return err //nolint:wrapcheck
}
tss, err := mercure.NewTopicSelectorStoreLRU(maxEntriesPerShard, mercure.DefaultTopicSelectorStoreLRUShardCount)
if err != nil {
return err //nolint:wrapcheck
}

m.logger = ctx.Logger(m)
Expand All @@ -161,7 +135,9 @@ func (m *Mercure) Provision(ctx caddy.Context) error { //nolint:funlen
}

if m.WriteTimeout != nil {
u.Query().Set("write_timeout", time.Duration(*m.WriteTimeout).String())
query := u.Query()
query.Set("write_timeout", time.Duration(*m.WriteTimeout).String())
u.RawQuery = query.Encode()
}

transport, err := mercure.NewTransport(u, m.logger, tss)
Expand Down Expand Up @@ -349,29 +325,6 @@ func (m *Mercure) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { //nolint:fu

m.TransportURL = d.Val()

case "cache":
if !d.NextArg() {
return d.ArgErr()
}

v, err := strconv.ParseInt(d.Val(), 10, 64)
if err != nil {
return err //nolint:wrapcheck
}

m.CacheNumCounters = &v

if !d.NextArg() {
return d.ArgErr()
}

v, err = strconv.ParseInt(d.Val(), 10, 64)
if err != nil {
return err //nolint:wrapcheck
}

m.CacheMaxCost = &v

case "lru_cache":
if !d.NextArg() {
return d.ArgErr()
Expand Down
17 changes: 7 additions & 10 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,13 @@ func NewHubFromViper(v *viper.Viper) (*Hub, error) { //nolint:funlen,gocognit
}

var tss *TopicSelectorStore
if v.GetInt64("tcsz") > 0 {
tss, err = NewTopicSelectorStoreLRU(v.GetInt64("tcsz"), DefaultTopicSelectorStoreLRUShardCount)
if err != nil {
return nil, err
}
} else {
tss, err = NewTopicSelectorStore(TopicSelectorStoreDefaultCacheNumCounters, TopicSelectorStoreCacheMaxCost)
if err != nil {
return nil, err
}
tcsz := v.GetInt64("tcsz")
if tcsz == 0 {
tcsz = DefaultTopicSelectorStoreLRUMaxEntriesPerShard
}
tss, err = NewTopicSelectorStoreLRU(tcsz, DefaultTopicSelectorStoreLRUShardCount)
if err != nil {
return nil, err
}

if t := v.GetString("transport_url"); t != "" {
Expand Down
2 changes: 1 addition & 1 deletion hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func NewHub(options ...Option) (*Hub, error) {
}

if opt.topicSelectorStore == nil {
tss, err := NewTopicSelectorStore(TopicSelectorStoreDefaultCacheNumCounters, TopicSelectorStoreCacheMaxCost)
tss, err := NewTopicSelectorStoreRistretto(TopicSelectorStoreRistrettoDefaultCacheNumCounters, TopicSelectorStoreRistrettoCacheMaxCost)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestStop(t *testing.T) {
}

func createDummy(options ...Option) *Hub {
tss, _ := NewTopicSelectorStore(0, 0)
tss, _ := NewTopicSelectorStoreLRU(0, 0)
options = append(
[]Option{
WithPublisherJWT([]byte("publisher"), jwt.SigningMethodHS256.Name),
Expand Down
27 changes: 0 additions & 27 deletions topic_selector.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,12 @@
package mercure

import (
"fmt"
"regexp"
"strings"

"github.com/dgraph-io/ristretto"
uritemplate "github.com/yosida95/uritemplate/v3"
)

// Gather stats to find the best default values.
const (
TopicSelectorStoreDefaultCacheNumCounters = int64(6e7)
TopicSelectorStoreCacheMaxCost = int64(1e8) // 100 MB
)

type TopicSelectorStoreCache interface {
Get(interface{}) (interface{}, bool)
Set(interface{}, interface{}, int64) bool
Expand All @@ -26,25 +18,6 @@ type TopicSelectorStore struct {
skipSelect bool
}

// NewTopicSelectorStore creates a TopicSelectorStore instance with a ristretto cache.
// See https://github.com/dgraph-io/ristretto, set values to 0 to disable.
func NewTopicSelectorStore(cacheNumCounters, cacheMaxCost int64) (*TopicSelectorStore, error) {
if cacheNumCounters == 0 {
return &TopicSelectorStore{}, nil
}

cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: cacheNumCounters,
MaxCost: cacheMaxCost,
BufferItems: 64,
})
if err != nil {
return nil, fmt.Errorf("unable to create cache: %w", err)
}

return &TopicSelectorStore{cache: cache}, nil
}

func (tss *TopicSelectorStore) match(topic, topicSelector string) bool {
// Always do an exact matching comparison first
// Also check if the topic selector is the reserved keyword *
Expand Down
34 changes: 34 additions & 0 deletions topic_selector_ristretto.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package mercure

import (
"fmt"

"github.com/dgraph-io/ristretto"
)

// Gather stats to find the best default values.
const (
TopicSelectorStoreRistrettoDefaultCacheNumCounters = int64(6e7)
TopicSelectorStoreRistrettoCacheMaxCost = int64(1e8) // 100 MB
)

// NewTopicSelectorStoreRistretto creates a TopicSelectorStore instance with a ristretto cache.
// See https://github.com/dgraph-io/ristretto, set values to 0 to disable.
//
// Deprecated: use NewTopicSelectorStoreLRU instead.
func NewTopicSelectorStoreRistretto(cacheNumCounters, cacheMaxCost int64) (*TopicSelectorStore, error) {
if cacheNumCounters == 0 {
return &TopicSelectorStore{}, nil
}

cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: cacheNumCounters,
MaxCost: cacheMaxCost,
BufferItems: 64,
})
if err != nil {
return nil, fmt.Errorf("unable to create cache: %w", err)
}

return &TopicSelectorStore{cache: cache}, nil
}
4 changes: 2 additions & 2 deletions topic_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (

func TestMatch(t *testing.T) {
cache, _ := ristretto.NewCache(&ristretto.Config{
NumCounters: TopicSelectorStoreDefaultCacheNumCounters,
MaxCost: TopicSelectorStoreCacheMaxCost,
NumCounters: TopicSelectorStoreRistrettoDefaultCacheNumCounters,
MaxCost: TopicSelectorStoreRistrettoCacheMaxCost,
BufferItems: 64,
})
tss := &TopicSelectorStore{cache, false}
Expand Down

0 comments on commit 6fb1621

Please sign in to comment.