Skip to content

Commit

Permalink
feat(event): handle subscribe event message
Browse files Browse the repository at this point in the history
  • Loading branch information
amimart committed Nov 22, 2022
1 parent 8df900c commit 6698596
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 7 deletions.
28 changes: 22 additions & 6 deletions app/actor/event/actor.go β†’ app/actor/event/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,31 @@ import (
"github.com/rs/zerolog/log"
)

type Actor struct {
type EventStoreActor struct {
mongoURI string
dbName string
store *event.Store
}

func NewPublisherActor(mongoURI, dbName string) *Actor {
return &Actor{
func NewEventStoreActor(mongoURI, dbName string) *EventStoreActor {
return &EventStoreActor{
mongoURI: mongoURI,
dbName: dbName,
}
}

func (a *Actor) Receive(ctx actor.Context) {
func (a *EventStoreActor) Receive(ctx actor.Context) {
switch msg := ctx.Message().(type) {
case *actor.Started:
a.handleStart()
case *message.PublishEventMessage:
a.handlePublishEvent(msg)
case *message.SubscribeEventMessage:
a.handleSubscribeEvent(ctx, msg)
}
}

func (a *Actor) handleStart() {
func (a *EventStoreActor) handleStart() {
ctx, cancelFn := context.WithTimeout(context.Background(), time.Second)
defer cancelFn()

Expand All @@ -45,9 +47,23 @@ func (a *Actor) handleStart() {
a.store = store
}

func (a *Actor) handlePublishEvent(msg *message.PublishEventMessage) {
func (a *EventStoreActor) handlePublishEvent(msg *message.PublishEventMessage) {
if err := a.store.Store(context.Background(), msg.Event); err != nil {
log.Fatal().Err(err).Str("type", msg.Event.Type()).Msg("❌ Couldn't publish event")
}
log.Info().Str("type", msg.Event.Type()).Msg("πŸ’Œ Event published")
}

func (a *EventStoreActor) handleSubscribeEvent(ctx actor.Context, msg *message.SubscribeEventMessage) {
stream, err := a.store.StreamFrom(context.Background(), msg.From)
if err != nil {
log.Fatal().Err(err).Msg("❌ Couldn't create stream")
}

streamProps := actor.PropsFromProducer(func() actor.Actor {
return NewStreamHandlerActor(stream, ctx.Sender())
})

ctx.Spawn(streamProps)
log.Info().Msg("Create stream handler for subscriber")
}
16 changes: 15 additions & 1 deletion app/message/message.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,21 @@
package message

import "okp4/nemeton-leaderboard/app/event"
import (
"okp4/nemeton-leaderboard/app/event"

"go.mongodb.org/mongo-driver/bson/primitive"
)

type PublishEventMessage struct {
Event event.Event
}

type SubscribeEventMessage struct {
From *primitive.ObjectID
}

type NewEventMessage struct {
Event event.Event
}

type BrokenStreamMessage struct{}

0 comments on commit 6698596

Please sign in to comment.