Refactor replay providers implementation
- replace the old buffer implementation with a circular queue
- use the same circular queue implementation for both providers (a minimal sketch follows below)
- fix replay bugs in the finite provider: incorrect replaying when the circular queue was in a wrapped state (head > tail), and a superfluous client Flush call when no events were replayed
- refactor the automatic ID implementation
- use constant-time event lookup for both replay providers when auto IDs are used
- automatic ID generation now doesn't overwrite the message ID if it exists and panics instead
- use ahead-of-time initialization through constructors and configuration validation for both providers
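A minimal sketch of the circular-queue idea behind the new buffer; the names (`queue`, `push`, `forEach`) and layout are hypothetical, not the library's actual internals:

```go
// Hypothetical fixed-capacity circular queue, as described above.
type queue[T any] struct {
	buf   []T
	head  int // index of the oldest element
	count int
}

func newQueue[T any](capacity int) *queue[T] {
	return &queue[T]{buf: make([]T, capacity)}
}

// push appends v, overwriting the oldest element once the buffer is full,
// so the queue never outgrows its configured size.
func (q *queue[T]) push(v T) {
	tail := (q.head + q.count) % len(q.buf)
	q.buf[tail] = v
	if q.count == len(q.buf) {
		q.head = (q.head + 1) % len(q.buf)
	} else {
		q.count++
	}
}

// forEach visits elements from oldest to newest. The modulo arithmetic also
// covers the wrapped state (head > tail) that the old implementation
// replayed incorrectly.
func (q *queue[T]) forEach(fn func(T)) {
	for i := 0; i < q.count; i++ {
		fn(q.buf[(q.head+i)%len(q.buf)])
	}
}
```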
tmaxmax committed Dec 25, 2024
1 parent eda5b23 · commit 97dc0ab
Showing 8 changed files with 329 additions and 339 deletions.
CHANGELOG.md: 6 additions & 0 deletions
@@ -7,18 +7,24 @@ This file tracks changes to this project. It follows the [Keep a Changelog format
### Removed

- `FiniteReplayProvider.{Count, AutoIDs}` – use the constructor instead
- `ValidReplayProvider.{TTL, AutoIDs}` – use the constructor instead

### Changed

- Due to a change in the internal implementation, the `FiniteReplayProvider` is now able to replay events only if the event with the LastEventID provided by the client is still buffered. Previously, if the LastEventID was that of the latest removed event, events would still be replayed. This detail added complexity to the implementation without an apparent significant win, so it was dropped.
- `ValidReplayProvider.GCInterval` should now be set to `0` in order to disable GC (see the sketch after this list).
- Automatic ID generation for both providers does not overwrite already existing message IDs and errors instead. Ensure that your events do not have IDs when using providers configured to generate IDs.
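A sketch of the constructor-based flow these entries describe, using the `NewValidReplayProvider` signature visible in the diffs below; the error handling is illustrative, not prescriptive:

```go
// Configuration is now validated ahead of time, at construction.
rp, err := sse.NewValidReplayProvider(time.Minute*5, false)
if err != nil {
	// An invalid TTL is reported here instead of surfacing later during replay.
	log.Fatal(err)
}
rp.GCInterval = 0 // per the entry above, 0 now disables GC

joe := &sse.Joe{ReplayProvider: rp}
```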

### Added

- `NewFiniteReplayProvider` constructor
- `NewValidReplayProvider` constructor
- `Connection.Buffer`

### Fixed

- `FiniteReplayProvider` no longer leaks memory and respects the stored messages count it was given. Previously, when a new message was put after the messages count was reached and some other messages were removed, the total messages count would grow unexpectedly and `FiniteReplayProvider` would store and replay more events than it was configured to.
- `ValidReplayProvider` was susceptible to a similar memory leak, which is also fixed now.

## [0.8.0] - 2024-01-30

README.md: 7 additions & 2 deletions
````diff
@@ -116,9 +116,14 @@ type ReplayProvider interface {
 `go-sse` provides two replay providers by default, which both hold the events in-memory: the `ValidReplayProvider` and `FiniteReplayProvider`. The first replays events that are valid, not expired, the second replays a finite number of the most recent events. For example:
 
 ```go
-joe = &sse.Joe{
-	ReplayProvider: &sse.ValidReplayProvider{TTL: time.Minute * 5}, // let's have events expire after 5 minutes
+// Let's have events expire after 5 minutes. For this example we don't enable automatic ID generation.
+rp, err := sse.NewValidReplayProvider(time.Minute * 5, false)
+if err != nil {
+	// TTL was 0 or negative.
+	// Useful to have this error if the value comes from a config which happens to be faulty.
 }
+
+joe = &sse.Joe{ReplayProvider: rp}
 ```
 
 will tell Joe to replay all valid events! Replay providers can do so much more (for example, add IDs to events automatically): read the [docs][3] on how to use the existing ones and how to implement yours.
````
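For instance, a sketch of enabling automatic ID generation with the same constructor (the boolean argument shown in the diff above); the caveat about pre-set IDs comes from this release's changelog:

```go
// Automatic ID generation enabled via the second argument.
rp, err := sse.NewValidReplayProvider(time.Minute*5, true)
if err != nil {
	// TTL was 0 or negative.
}
// With auto IDs on, events must not carry their own IDs: per the changelog,
// an existing ID is now rejected instead of being silently overwritten.
joe = &sse.Joe{ReplayProvider: rp}
```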
cmd/complex/main.go: 31 additions & 30 deletions
```diff
@@ -21,35 +21,34 @@ const (
 	topicMetrics = "metrics"
 )
 
-var sseHandler = &sse.Server{
-	Provider: &sse.Joe{
-		ReplayProvider: &sse.ValidReplayProvider{
-			TTL:        time.Minute * 5,
-			GCInterval: time.Minute,
-			AutoIDs:    true,
-		},
-	},
-	Logger: logger{log.New(os.Stderr, "", 0)},
-	OnSession: func(s *sse.Session) (sse.Subscription, bool) {
-		topics := s.Req.URL.Query()["topic"]
-		for _, topic := range topics {
-			if topic != topicRandomNumbers && topic != topicMetrics {
-				fmt.Fprintf(s.Res, "invalid topic %q; supported are %q, %q", topic, topicRandomNumbers, topicMetrics)
-				s.Res.WriteHeader(http.StatusBadRequest)
-				return sse.Subscription{}, false
-			}
-		}
-		if len(topics) == 0 {
-			// Provide default topics, if none are given.
-			topics = []string{topicRandomNumbers, topicMetrics}
-		}
-
-		return sse.Subscription{
-			Client:      s,
-			LastEventID: s.LastEventID,
-			Topics:      append(topics, sse.DefaultTopic), // the shutdown message is sent on the default topic
-		}, true
-	},
-}
+func newSSE() *sse.Server {
+	rp, _ := sse.NewValidReplayProvider(time.Minute*5, true)
+	rp.GCInterval = time.Minute
+
+	return &sse.Server{
+		Provider: &sse.Joe{ReplayProvider: rp},
+		Logger:   logger{log.New(os.Stderr, "", 0)},
+		OnSession: func(s *sse.Session) (sse.Subscription, bool) {
+			topics := s.Req.URL.Query()["topic"]
+			for _, topic := range topics {
+				if topic != topicRandomNumbers && topic != topicMetrics {
+					fmt.Fprintf(s.Res, "invalid topic %q; supported are %q, %q", topic, topicRandomNumbers, topicMetrics)
+					s.Res.WriteHeader(http.StatusBadRequest)
+					return sse.Subscription{}, false
+				}
+			}
+			if len(topics) == 0 {
+				// Provide default topics, if none are given.
+				topics = []string{topicRandomNumbers, topicMetrics}
+			}
+
+			return sse.Subscription{
+				Client:      s,
+				LastEventID: s.LastEventID,
+				Topics:      append(topics, sse.DefaultTopic), // the shutdown message is sent on the default topic
+			}, true
+		},
+	}
+}
 
 func cors(h http.Handler) http.Handler {
```
```diff
@@ -63,6 +62,8 @@ func main() {
 	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
 	defer cancel()
 
+	sseHandler := newSSE()
+
 	mux := http.NewServeMux()
 	mux.HandleFunc("/stop", func(w http.ResponseWriter, _ *http.Request) {
 		cancel()
```
```diff
@@ -94,8 +95,8 @@ func main() {
 		_ = sseHandler.Shutdown(ctx)
 	})
 
-	go recordMetric(ctx, "ops", time.Second*2)
-	go recordMetric(ctx, "cycles", time.Millisecond*500)
+	go recordMetric(ctx, sseHandler, "ops", time.Second*2)
+	go recordMetric(ctx, sseHandler, "cycles", time.Millisecond*500)
 
 	go func() {
 		duration := func() time.Duration {
```
```diff
@@ -122,7 +123,7 @@
 	}
 }
 
-func recordMetric(ctx context.Context, metric string, frequency time.Duration) {
+func recordMetric(ctx context.Context, sseHandler *sse.Server, metric string, frequency time.Duration) {
 	ticker := time.NewTicker(frequency)
 	defer ticker.Stop()
 
```
joe_test.go: 3 additions & 2 deletions
```diff
@@ -259,7 +259,7 @@ func TestJoe_errors(t *testing.T) {
 
 	_ = j.Publish(msg(t, "world", "2"), []string{sse.DefaultTopic})
 
-	tests.Expect(t, called == 1, "callback was called after subscribe returned")
+	tests.Equal(t, called, 1, "callback was called after subscribe returned")
 
 	called = 0
 	ctx, cancel := newMockContext(t)
```
```diff
@@ -277,7 +277,8 @@ func TestJoe_errors(t *testing.T) {
 
 	err = j.Subscribe(ctx, sse.Subscription{Client: client, Topics: []string{sse.DefaultTopic}})
 	tests.Equal(t, err, callErr, "error not received from send")
-	tests.Equal(t, called, 0, "callback was called after subscribe returned")
+	// Only the first event should be attempted as nothing is replayed.
+	tests.Equal(t, called, 1, "callback was called after subscribe returned")
 
 	<-done
 }
```
replay_buffer.go: 0 additions & 155 deletions

This file was deleted.

replay_noop.go: 0 additions & 10 deletions

This file was deleted.
