Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Subscriber performance #578

Merged
merged 23 commits into from
Nov 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
9f22e11
Add subscriber bench test
kevburnsjr Sep 11, 2021
cb28033
Bench update (not done)
kevburnsjr Sep 12, 2021
f0a99f3
Answer to the Ultimate Question of Life, the Universe, and Everything
kevburnsjr Sep 12, 2021
4834d44
Add comments to document environment variables
kevburnsjr Sep 12, 2021
9144f24
Add CLI params
kevburnsjr Sep 12, 2021
be8d8ca
Add new command line parameters
kevburnsjr Sep 12, 2021
c52e451
Implement liveQueue, fix a couple tests.
kevburnsjr Sep 12, 2021
e1df555
Prevent liveQueue from queueing history updates
kevburnsjr Sep 12, 2021
40dcc42
Fix panic in bolt transport test
kevburnsjr Sep 12, 2021
dd168bf
Fix command line arguments for performance test
kevburnsjr Sep 12, 2021
ef9ef7e
Fix regexp ordering in hot path
kevburnsjr Sep 12, 2021
b302b59
Off by one in bench test
kevburnsjr Sep 12, 2021
50c14f8
PR Feedback and test repair
kevburnsjr Sep 13, 2021
4eb6eec
Remove flag -yolo
kevburnsjr Sep 13, 2021
152706a
Parse caddy config
kevburnsjr Sep 13, 2021
e9f7e6c
Improve efficiency of subscriber update fan-out logic
kevburnsjr Sep 26, 2021
cebaee2
Subscriber shutdown correctly removes subscriber from transport
kevburnsjr Sep 27, 2021
2224cb7
Add RemoveSubscriber method to Transport interface
kevburnsjr Sep 27, 2021
f59bc3c
Rename Lru to LRU
kevburnsjr Sep 27, 2021
b1151ad
review
dunglas Oct 25, 2021
597b4d3
Merge pull request #2 from dunglas/review-subscriber-performance
kevburnsjr Oct 30, 2021
cc83fc5
Merge branch 'main' into subscriber-performance
kevburnsjr Nov 1, 2021
ee0047f
fix: lint
dunglas Nov 12, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ linters:
- cyclop
- forcetypeassert
- tagliatelle
- varnamelen

# deprecated
- interfacer
Expand Down
6 changes: 3 additions & 3 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
When you open a Pull Request to the project, you agree to license your code under the [GNU AFFERO GENERAL PUBLIC LICENSE](LICENSE)
and to transfer the copyright on the submitted code to [Kévin Dunglas](https://dunglas.fr).

Be sure to you have the right to do that (if you are a professional, ask your company)!
Be sure to have the right to do that (if you are a professional, ask your company)!

If you include code from another project, please mention it in the Pull Request description and credit the original author.

Expand Down Expand Up @@ -59,7 +59,7 @@ To test the legacy server:

Go to `http://localhost:3000` and enjoy!

When you send a PR, just make sure that:
When you send a PR, make sure that:

* You add valid test cases.
* Tests are green.
Expand All @@ -73,7 +73,7 @@ To debug potential deadlocks:

1. Install `go-deadlock`: `./tests/use-go-deadlock.sh`
2. Run the tests in race mode: `go test -race ./... -v`
3. To stress test the app, run the load test (see `docs/load-testing.md`)
3. To stress-test the app, run the load test (see `docs/load-testing.md`)
4. Be sure to remove `go-deadlock` before committing

## Spec
Expand Down
2 changes: 1 addition & 1 deletion authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func authorize(r *http.Request, jwtConfig *jwtConfig, publishOrigins []string) (
cookie, err := r.Cookie("mercureAuthorization")
if err != nil {
// Anonymous
return nil, nil //nolint:nilerr
return nil, nil //nolint:nilerr,nilnil
}

// CSRF attacks cannot occur when using safe methods
Expand Down
19 changes: 18 additions & 1 deletion bolt_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type BoltTransport struct {
}

// NewBoltTransport create a new boltTransport.
func NewBoltTransport(u *url.URL, l Logger, tss *TopicSelectorStore) (Transport, error) {
func NewBoltTransport(u *url.URL, l Logger, tss *TopicSelectorStore) (Transport, error) { //nolint:ireturn
var err error
q := u.Query()
bucketName := defaultBoltBucketName
Expand Down Expand Up @@ -190,6 +190,23 @@ func (t *BoltTransport) AddSubscriber(s *Subscriber) error {
t.dispatchHistory(s, toSeq)
}

s.Ready()

return nil
}

// RemoveSubscriber removes a new subscriber from the transport.
func (t *BoltTransport) RemoveSubscriber(s *Subscriber) error {
select {
case <-t.closed:
return ErrClosedTransport
default:
}

t.Lock()
delete(t.subscribers, s)
t.Unlock()

return nil
}

Expand Down
45 changes: 17 additions & 28 deletions bolt_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ func TestBoltTransportHistory(t *testing.T) {
})
}

s := NewSubscriber("8", transport.logger, &TopicSelectorStore{})
s.Topics = topics
go s.start()
s := NewSubscriber("8", transport.logger)
s.SetTopics(topics, nil)

require.Nil(t, transport.AddSubscriber(s))

Expand Down Expand Up @@ -66,9 +65,8 @@ func TestBoltTransportRetrieveAllHistory(t *testing.T) {
})
}

s := NewSubscriber(EarliestLastEventID, transport.logger, &TopicSelectorStore{})
s.Topics = topics
go s.start()
s := NewSubscriber(EarliestLastEventID, transport.logger)
s.SetTopics(topics, nil)
require.Nil(t, transport.AddSubscriber(s))

var count int
Expand All @@ -78,9 +76,10 @@ func TestBoltTransportRetrieveAllHistory(t *testing.T) {
count++
assert.Equal(t, strconv.Itoa(count), u.ID)
if count == 10 {
return
break
}
}
assert.Equal(t, 10, count)
}

func TestBoltTransportHistoryAndLive(t *testing.T) {
Expand All @@ -96,9 +95,8 @@ func TestBoltTransportHistoryAndLive(t *testing.T) {
})
}

s := NewSubscriber("8", transport.logger, &TopicSelectorStore{})
s.Topics = topics
go s.start()
s := NewSubscriber("8", transport.logger)
s.SetTopics(topics, nil)
require.Nil(t, transport.AddSubscriber(s))

var wg sync.WaitGroup
Expand Down Expand Up @@ -179,8 +177,7 @@ func TestBoltTransportDoNotDispatchUntilListen(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", transport.logger, &TopicSelectorStore{})
go s.start()
s := NewSubscriber("", transport.logger)
require.Nil(t, transport.AddSubscriber(s))

var wg sync.WaitGroup
Expand All @@ -204,9 +201,8 @@ func TestBoltTransportDispatch(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", transport.logger, &TopicSelectorStore{})
s.Topics = []string{"https://example.com/foo"}
go s.start()
s := NewSubscriber("", transport.logger)
s.SetTopics([]string{"https://example.com/foo"}, nil)

require.Nil(t, transport.AddSubscriber(s))

Expand All @@ -222,9 +218,8 @@ func TestBoltTransportClosed(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", transport.logger, &TopicSelectorStore{})
s.Topics = []string{"https://example.com/foo"}
go s.start()
s := NewSubscriber("", transport.logger)
s.SetTopics([]string{"https://example.com/foo"}, nil)
require.Nil(t, transport.AddSubscriber(s))

require.Nil(t, transport.Close())
Expand All @@ -242,13 +237,10 @@ func TestBoltCleanDisconnectedSubscribers(t *testing.T) {
defer transport.Close()
defer os.Remove("test.db")

tss := &TopicSelectorStore{}
s1 := NewSubscriber("", transport.logger, tss)
go s1.start()
s1 := NewSubscriber("", transport.logger)
require.Nil(t, transport.AddSubscriber(s1))

s2 := NewSubscriber("", transport.logger, tss)
go s2.start()
s2 := NewSubscriber("", transport.logger)
require.Nil(t, transport.AddSubscriber(s2))

assert.Len(t, transport.subscribers, 2)
Expand All @@ -272,13 +264,10 @@ func TestBoltGetSubscribers(t *testing.T) {
defer transport.Close()
defer os.Remove("test.db")

tss := &TopicSelectorStore{}
s1 := NewSubscriber("", transport.logger, tss)
go s1.start()
s1 := NewSubscriber("", transport.logger)
require.Nil(t, transport.AddSubscriber(s1))

s2 := NewSubscriber("", transport.logger, tss)
go s2.start()
s2 := NewSubscriber("", transport.logger)
require.Nil(t, transport.AddSubscriber(s2))

lastEventID, subscribers, err := transport.GetSubscribers()
Expand Down
30 changes: 27 additions & 3 deletions caddy/caddy.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ type Mercure struct {
// Maximum cache cost, defaults to 100MB, set to -1 to disable the cache. See https://github.com/dgraph-io/ristretto for details.
CacheMaxCost *int64 `json:"cache_max_cost,omitempty"`

// Triggers use of LRU topic selector cache and avoidance of select priority queue (recommend 10,000 - 1,000,000)
LRUShardSize *int64 `json:"lru_shard_size,omitempty"`

hub *mercure.Hub
logger *zap.Logger
}
Expand Down Expand Up @@ -136,9 +139,18 @@ func (m *Mercure) Provision(ctx caddy.Context) error { //nolint:funlen
mc = *m.CacheMaxCost
}

tss, err := mercure.NewTopicSelectorStore(nc, mc)
if err != nil {
return err //nolint:wrapcheck
var err error
var tss *mercure.TopicSelectorStore
if m.LRUShardSize == nil {
tss, err = mercure.NewTopicSelectorStore(nc, mc)
if err != nil {
return err //nolint:wrapcheck
}
} else {
tss, err = mercure.NewTopicSelectorStoreLRU(*m.LRUShardSize, mercure.DefaultTopicSelectorStoreLRUShardCount)
if err != nil {
return err //nolint:wrapcheck
}
}

m.logger = ctx.Logger(m)
Expand Down Expand Up @@ -359,6 +371,18 @@ func (m *Mercure) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { //nolint:fu
}

m.CacheMaxCost = &v

case "lru_cache":
if !d.NextArg() {
return d.ArgErr()
}

v, err := strconv.ParseInt(d.Val(), 10, 64)
if err != nil {
return err //nolint:wrapcheck
}

m.LRUShardSize = &v
}
}
}
Expand Down
1 change: 1 addition & 0 deletions caddy/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ replace github.com/dunglas/mercure => ../
require (
github.com/caddyserver/caddy/v2 v2.4.3
github.com/caddyserver/certmagic v0.14.1 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/dunglas/mercure v0.13.0
github.com/google/uuid v1.3.0 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
Expand Down
Loading