Skip to content

Commit

Permalink
feat(event): implement event store stream
Browse files Browse the repository at this point in the history
  • Loading branch information
amimart committed Nov 21, 2022
1 parent 190df7c commit 2c57d8c
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 0 deletions.
5 changes: 5 additions & 0 deletions app/event/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"okp4/nemeton-leaderboard/app/util"

"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
)

Expand All @@ -29,3 +30,7 @@ func (s *Store) Store(ctx context.Context, evt Event) error {
_, err := s.db.Collection(collectionName).InsertOne(ctx, evt)
return err
}

func (s *Store) StreamFrom(ctx context.Context, from *primitive.ObjectID) (*Stream, error) {
return openStream(ctx, s.db.Collection(collectionName), from)
}
166 changes: 166 additions & 0 deletions app/event/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package event

import (
"context"
"sync"
"sync/atomic"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

type Stream struct {
closed *atomic.Bool
evtCh chan Event
errCh chan error
wg *sync.WaitGroup
}

func (s *Stream) Next() (*Event, error) {
select {
case evt := <-s.evtCh:
return &evt, nil
case err := <-s.errCh:
return nil, err
}
}

func (s *Stream) Close() {
s.closed.Store(true)
close(s.evtCh)
close(s.errCh)
s.wg.Wait()
}

func openStream(ctx context.Context, col *mongo.Collection, from *primitive.ObjectID) (*Stream, error) {
watch, catchup, err := fetch(ctx, col, from)
if err != nil {
return nil, err
}

stream := &Stream{
closed: &atomic.Bool{},
evtCh: make(chan Event, 100),
errCh: make(chan error, 1),
wg: &sync.WaitGroup{},
}

go stream.start(ctx, watch, catchup)
return stream, nil
}

func (s *Stream) start(ctx context.Context, watch *mongo.ChangeStream, catchUp *mongo.Cursor) {
s.wg.Add(1)
defer func() {
_ = watch.Close(ctx)
s.wg.Done()
}()

caughtUpIDs, err := s.catchUp(ctx, catchUp)
if err != nil {
s.errCh <- err
return
}

for {
if s.closed.Load() {
return
}

if err := s.readWatch(ctx, watch, caughtUpIDs); err != nil {
s.errCh <- err
return
}
}
}

func (s *Stream) readWatch(ctx context.Context, watch *mongo.ChangeStream, idsToIgnore map[primitive.ObjectID]interface{}) error {
nextCTX, cancelFn := context.WithTimeout(ctx, 50*time.Millisecond)
defer cancelFn()

if !watch.Next(nextCTX) {
err := watch.Err()
if err == nextCTX.Err() {
return nil
}
return err
}

var res struct {
OperationType string `bson:"operationType"`
FullDocument Event `bson:"fullDocument"`
}
if err := watch.Decode(&res); err != nil {
return err
}

if res.OperationType != "insert" {
return nil
}

evt := res.FullDocument
if _, ok := idsToIgnore[evt.id]; ok {
delete(idsToIgnore, evt.id)
return nil
}

s.evtCh <- evt
return nil
}

func (s *Stream) catchUp(ctx context.Context, c *mongo.Cursor) (map[primitive.ObjectID]interface{}, error) {
defer func() {
_ = c.Close(ctx)
}()

ids := make(map[primitive.ObjectID]interface{})
for c.Next(ctx) {
var evt Event
if err := c.Decode(&evt); err != nil {
return ids, err
}

ids[evt.id] = nil
s.evtCh <- evt

if s.closed.Load() {
return ids, nil
}
}
return ids, nil
}

func fetch(ctx context.Context, col *mongo.Collection, from *primitive.ObjectID) (*mongo.ChangeStream, *mongo.Cursor, error) {
watch, err := col.Watch(ctx, nil, options.ChangeStream())
if err != nil {
return nil, nil, err
}

var filter bson.M
if from != nil {
filter = bson.M{
"_id": bson.M{
"$gt": from,
},
}
}
catchUp, err := col.
Find(
ctx,
filter,
&options.FindOptions{
Sort: bson.M{
"_id": 1,
},
},
)
if err != nil {
_ = watch.Close(ctx)
return nil, nil, err
}

return watch, catchUp, nil
}

0 comments on commit 2c57d8c

Please sign in to comment.