Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: deprecate NewTopicSelectorStoreRistretto #583

Merged
merged 1 commit into from
Nov 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 9 additions & 56 deletions caddy/caddy.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,6 @@ type Mercure struct {
// Transport to use.
TransportURL string `json:"transport_url,omitempty"`

// Number of cache counters, defaults to 6e7, set to -1 to disable the cache. See https://github.com/dgraph-io/ristretto for details.
CacheNumCounters *int64 `json:"cache_max_counters,omitempty"`

// Maximum cache cost, defaults to 100MB, set to -1 to disable the cache. See https://github.com/dgraph-io/ristretto for details.
CacheMaxCost *int64 `json:"cache_max_cost,omitempty"`

// Triggers use of LRU topic selector cache and avoidance of select priority queue (recommend 10,000 - 1,000,000)
LRUShardSize *int64 `json:"lru_shard_size,omitempty"`

Expand Down Expand Up @@ -123,34 +117,14 @@ func (m *Mercure) Provision(ctx caddy.Context) error { //nolint:funlen
m.TransportURL = "bolt://mercure.db"
}

var (
nc int64
mc int64
)
if m.CacheNumCounters == nil {
nc = mercure.TopicSelectorStoreDefaultCacheNumCounters
} else {
nc = *m.CacheNumCounters
}

if m.CacheMaxCost == nil {
mc = mercure.TopicSelectorStoreCacheMaxCost
} else {
mc = *m.CacheMaxCost
maxEntriesPerShard := mercure.DefaultTopicSelectorStoreLRUMaxEntriesPerShard
if m.LRUShardSize != nil {
maxEntriesPerShard = *m.LRUShardSize
}

var err error
var tss *mercure.TopicSelectorStore
if m.LRUShardSize == nil {
tss, err = mercure.NewTopicSelectorStore(nc, mc)
if err != nil {
return err //nolint:wrapcheck
}
} else {
tss, err = mercure.NewTopicSelectorStoreLRU(*m.LRUShardSize, mercure.DefaultTopicSelectorStoreLRUShardCount)
if err != nil {
return err //nolint:wrapcheck
}
tss, err := mercure.NewTopicSelectorStoreLRU(maxEntriesPerShard, mercure.DefaultTopicSelectorStoreLRUShardCount)
if err != nil {
return err //nolint:wrapcheck
}

m.logger = ctx.Logger(m)
Expand All @@ -161,7 +135,9 @@ func (m *Mercure) Provision(ctx caddy.Context) error { //nolint:funlen
}

if m.WriteTimeout != nil {
u.Query().Set("write_timeout", time.Duration(*m.WriteTimeout).String())
query := u.Query()
query.Set("write_timeout", time.Duration(*m.WriteTimeout).String())
u.RawQuery = query.Encode()
}

transport, err := mercure.NewTransport(u, m.logger, tss)
Expand Down Expand Up @@ -349,29 +325,6 @@ func (m *Mercure) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { //nolint:fu

m.TransportURL = d.Val()

case "cache":
if !d.NextArg() {
return d.ArgErr()
}

v, err := strconv.ParseInt(d.Val(), 10, 64)
if err != nil {
return err //nolint:wrapcheck
}

m.CacheNumCounters = &v

if !d.NextArg() {
return d.ArgErr()
}

v, err = strconv.ParseInt(d.Val(), 10, 64)
if err != nil {
return err //nolint:wrapcheck
}

m.CacheMaxCost = &v

case "lru_cache":
if !d.NextArg() {
return d.ArgErr()
Expand Down
17 changes: 7 additions & 10 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,13 @@ func NewHubFromViper(v *viper.Viper) (*Hub, error) { //nolint:funlen,gocognit
}

var tss *TopicSelectorStore
if v.GetInt64("tcsz") > 0 {
tss, err = NewTopicSelectorStoreLRU(v.GetInt64("tcsz"), DefaultTopicSelectorStoreLRUShardCount)
if err != nil {
return nil, err
}
} else {
tss, err = NewTopicSelectorStore(TopicSelectorStoreDefaultCacheNumCounters, TopicSelectorStoreCacheMaxCost)
if err != nil {
return nil, err
}
tcsz := v.GetInt64("tcsz")
if tcsz == 0 {
tcsz = DefaultTopicSelectorStoreLRUMaxEntriesPerShard
}
tss, err = NewTopicSelectorStoreLRU(tcsz, DefaultTopicSelectorStoreLRUShardCount)
if err != nil {
return nil, err
}

if t := v.GetString("transport_url"); t != "" {
Expand Down
2 changes: 1 addition & 1 deletion hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func NewHub(options ...Option) (*Hub, error) {
}

if opt.topicSelectorStore == nil {
tss, err := NewTopicSelectorStore(TopicSelectorStoreDefaultCacheNumCounters, TopicSelectorStoreCacheMaxCost)
tss, err := NewTopicSelectorStoreRistretto(TopicSelectorStoreRistrettoDefaultCacheNumCounters, TopicSelectorStoreRistrettoCacheMaxCost)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestStop(t *testing.T) {
}

func createDummy(options ...Option) *Hub {
tss, _ := NewTopicSelectorStore(0, 0)
tss, _ := NewTopicSelectorStoreLRU(0, 0)
options = append(
[]Option{
WithPublisherJWT([]byte("publisher"), jwt.SigningMethodHS256.Name),
Expand Down
27 changes: 0 additions & 27 deletions topic_selector.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,12 @@
package mercure

import (
"fmt"
"regexp"
"strings"

"github.com/dgraph-io/ristretto"
uritemplate "github.com/yosida95/uritemplate/v3"
)

// Gather stats to find the best default values.
const (
TopicSelectorStoreDefaultCacheNumCounters = int64(6e7)
TopicSelectorStoreCacheMaxCost = int64(1e8) // 100 MB
)

type TopicSelectorStoreCache interface {
Get(interface{}) (interface{}, bool)
Set(interface{}, interface{}, int64) bool
Expand All @@ -26,25 +18,6 @@ type TopicSelectorStore struct {
skipSelect bool
}

// NewTopicSelectorStore creates a TopicSelectorStore instance with a ristretto cache.
// See https://github.com/dgraph-io/ristretto, set values to 0 to disable.
func NewTopicSelectorStore(cacheNumCounters, cacheMaxCost int64) (*TopicSelectorStore, error) {
if cacheNumCounters == 0 {
return &TopicSelectorStore{}, nil
}

cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: cacheNumCounters,
MaxCost: cacheMaxCost,
BufferItems: 64,
})
if err != nil {
return nil, fmt.Errorf("unable to create cache: %w", err)
}

return &TopicSelectorStore{cache: cache}, nil
}

func (tss *TopicSelectorStore) match(topic, topicSelector string) bool {
// Always do an exact matching comparison first
// Also check if the topic selector is the reserved keyword *
Expand Down
34 changes: 34 additions & 0 deletions topic_selector_ristretto.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package mercure

import (
"fmt"

"github.com/dgraph-io/ristretto"
)

// Gather stats to find the best default values.
const (
TopicSelectorStoreRistrettoDefaultCacheNumCounters = int64(6e7)
TopicSelectorStoreRistrettoCacheMaxCost = int64(1e8) // 100 MB
)

// NewTopicSelectorStoreRistretto creates a TopicSelectorStore instance with a ristretto cache.
// See https://github.com/dgraph-io/ristretto, set values to 0 to disable.
//
// Deprecated: use NewTopicSelectorStoreLRU instead.
func NewTopicSelectorStoreRistretto(cacheNumCounters, cacheMaxCost int64) (*TopicSelectorStore, error) {
if cacheNumCounters == 0 {
return &TopicSelectorStore{}, nil
}

cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: cacheNumCounters,
MaxCost: cacheMaxCost,
BufferItems: 64,
})
if err != nil {
return nil, fmt.Errorf("unable to create cache: %w", err)
}

return &TopicSelectorStore{cache: cache}, nil
}
4 changes: 2 additions & 2 deletions topic_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (

func TestMatch(t *testing.T) {
cache, _ := ristretto.NewCache(&ristretto.Config{
NumCounters: TopicSelectorStoreDefaultCacheNumCounters,
MaxCost: TopicSelectorStoreCacheMaxCost,
NumCounters: TopicSelectorStoreRistrettoDefaultCacheNumCounters,
MaxCost: TopicSelectorStoreRistrettoCacheMaxCost,
BufferItems: 64,
})
tss := &TopicSelectorStore{cache, false}
Expand Down