Skip to content

Commit

Permalink
chore(server): bulk event to enhance performance (#1326)
Browse files Browse the repository at this point in the history
  • Loading branch information
yk-eukarya authored Nov 26, 2024
1 parent 3a64ae5 commit 6d13978
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 29 deletions.
14 changes: 13 additions & 1 deletion server/internal/infrastructure/memory/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/reearth/reearth-cms/server/pkg/id"
"github.com/reearth/reearthx/rerror"
"github.com/reearth/reearthx/util"
"github.com/samber/lo"
)

type Event struct {
Expand Down Expand Up @@ -36,11 +37,22 @@ func (r *Event) FindByID(_ context.Context, iId id.EventID) (*event.Event[any],
return nil, rerror.ErrNotFound
}

func (r *Event) Save(ctx context.Context, ev *event.Event[any]) error {
func (r *Event) Save(_ context.Context, ev *event.Event[any]) error {
if r.err != nil {
return r.err
}

r.data.Store(ev.ID(), ev)
return nil
}

func (r *Event) SaveAll(_ context.Context, ev event.List) error {
if r.err != nil {
return r.err
}

r.data.StoreAll(lo.SliceToMap(ev, func(e *event.Event[any]) (id.EventID, *event.Event[any]) {
return e.ID(), e
}))
return nil
}
9 changes: 9 additions & 0 deletions server/internal/infrastructure/mongo/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/reearth/reearth-cms/server/pkg/event"
"github.com/reearth/reearth-cms/server/pkg/id"
"github.com/reearth/reearthx/mongox"
"github.com/samber/lo"
"go.mongodb.org/mongo-driver/bson"
)

Expand Down Expand Up @@ -42,6 +43,14 @@ func (r *Event) Save(ctx context.Context, ev *event.Event[any]) error {
return r.client.SaveOne(ctx, eID, doc)
}

func (r *Event) SaveAll(ctx context.Context, ev event.List) error {
doc, eID, err := mongodoc.NewEvents(ev)
if err != nil {
return err
}
return r.client.SaveAll(ctx, eID, lo.ToAnySlice(doc))
}

func (r *Event) findOne(ctx context.Context, filter any) (*event.Event[any], error) {
c := mongodoc.NewEventConsumer()
if err := r.client.FindOne(ctx, filter, c); err != nil {
Expand Down
17 changes: 17 additions & 0 deletions server/internal/infrastructure/mongo/mongodoc/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,23 @@ func NewEvent(e *event.Event[any]) (*EventDocument, string, error) {
}, eId, nil
}

func NewEvents(e event.List) ([]*EventDocument, []string, error) {
res := make([]*EventDocument, 0, len(e))
ids := make([]string, 0, len(e))
for _, d := range e {
if d == nil {
continue
}
r, rid, err := NewEvent(d)
if err != nil {
return nil, nil, err
}
res = append(res, r)
ids = append(ids, rid)
}
return res, ids, nil
}

func (d *EventDocument) Model() (*event.Event[any], error) {
eID, err := event.IDFrom(d.ID)
if err != nil {
Expand Down
61 changes: 40 additions & 21 deletions server/internal/usecase/interactor/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/reearth/reearth-cms/server/pkg/project"
"github.com/reearth/reearth-cms/server/pkg/task"
"github.com/reearth/reearthx/account/accountdomain"
"github.com/reearth/reearthx/account/accountdomain/workspace"
"github.com/reearth/reearthx/account/accountusecase/accountgateway"
"github.com/reearth/reearthx/account/accountusecase/accountinteractor"
"github.com/reearth/reearthx/account/accountusecase/accountrepo"
Expand Down Expand Up @@ -64,55 +65,73 @@ func (e *Event) EventProject() *event.Project {
}

func createEvent(ctx context.Context, r *repo.Container, g *gateway.Container, e Event) (*event.Event[any], error) {
ev, err := event.New[any]().NewID().Object(e.Object).Type(e.Type).Project(e.EventProject()).Timestamp(util.Now()).Operator(e.Operator).Build()
evs, err := createEvents(ctx, r, g, []Event{e})
if err != nil {
return nil, err
}
return evs[0], nil
}

func createEvents(ctx context.Context, r *repo.Container, g *gateway.Container, el []Event) (event.List, error) {
evl := make(event.List, 0, len(el))
for _, e := range el {
ev, err := event.New[any]().NewID().Object(e.Object).Type(e.Type).Project(e.EventProject()).Timestamp(util.Now()).Operator(e.Operator).Build()
if err != nil {
return nil, err
}
evl = append(evl, ev)
}

if err := r.Event.Save(ctx, ev); err != nil {
if err := r.Event.SaveAll(ctx, evl); err != nil {
return nil, err
}

if err := webhook(ctx, r, g, e, ev); err != nil {
if err := webhooks(ctx, r, g, el, evl); err != nil {
return nil, err
}

return ev, nil
return evl, nil
}

func webhook(ctx context.Context, r *repo.Container, g *gateway.Container, e Event, ev *event.Event[any]) error {
return webhooks(ctx, r, g, []Event{e}, event.List{ev})
}

func webhooks(ctx context.Context, r *repo.Container, g *gateway.Container, el []Event, evl event.List) error {
if g == nil || g.TaskRunner == nil {
log.Infof("asset: webhook was not sent because task runner is not configured")
return nil
}

ws, err := r.Workspace.FindByID(ctx, e.Workspace)
// all events are assumed to have the same workspace
wId := el[0].Workspace
ws, err := r.Workspace.FindByID(ctx, wId)
if err != nil {
return err
}
integrationIDs := ws.Members().IntegrationIDs()

ids := make([]id.IntegrationID, len(integrationIDs))
for i, iid := range integrationIDs {
id, err := id.IntegrationIDFrom(iid.String())
if err != nil {
return err
}
ids[i] = id
iIds, err := util.TryMap(ws.Members().IntegrationIDs(), func(iid workspace.IntegrationID) (id.IntegrationID, error) {
return id.IntegrationIDFrom(iid.String())
})
if err != nil {
return err
}

integrations, err := r.Integration.FindByIDs(ctx, ids)
integrations, err := r.Integration.FindByIDs(ctx, iIds)
if err != nil {
return err
}

for _, w := range integrations.ActiveWebhooks(ev.Type()) {
if err := g.TaskRunner.Run(ctx, task.WebhookPayload{
Webhook: w,
Event: ev,
Override: e.WebhookObject,
}.Payload()); err != nil {
return err
for i, ev := range evl {
e := el[i]
for _, w := range integrations.ActiveWebhooks(ev.Type()) {
if err := g.TaskRunner.Run(ctx, task.WebhookPayload{
Webhook: w,
Event: ev,
Override: e.WebhookObject,
}.Payload()); err != nil {
return err
}
}
}

Expand Down
24 changes: 17 additions & 7 deletions server/internal/usecase/interactor/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,12 @@ func (i Item) Import(ctx context.Context, param interfaces.ImportItemsParam, ope
oldFields: oldFields,
action: action,
}

if action == interfaces.ImportStrategyTypeInsert {
res.ItemInserted()
} else {
res.ItemUpdated()
}
}

if err := i.repos.Thread.SaveAll(ctx, threadsToSave); err != nil {
Expand All @@ -580,6 +586,7 @@ func (i Item) Import(ctx context.Context, param interfaces.ImportItemsParam, ope
if err != nil {
return interfaces.ImportItemsResponse{}, err
}
var events []Event

for k, changes := range itemsEvent {
vi := items.Item(k)
Expand All @@ -593,12 +600,10 @@ func (i Item) Import(ctx context.Context, param interfaces.ImportItemsParam, ope
var eType event.Type
if changes.action == interfaces.ImportStrategyTypeInsert {
eType = event.ItemCreate
res.ItemInserted()
} else {
eType = event.ItemUpdate
res.ItemUpdated()
}
if err := i.event(ctx, Event{
events = append(events, Event{
Project: prj,
Workspace: s.Workspace(),
Type: eType,
Expand All @@ -612,9 +617,10 @@ func (i Item) Import(ctx context.Context, param interfaces.ImportItemsParam, ope
Changes: item.CompareFields(it.Fields(), changes.oldFields),
},
Operator: operator.Operator(),
}); err != nil {
return interfaces.ImportItemsResponse{}, err
}
})
}
if err := i.events(ctx, events); err != nil {
return interfaces.ImportItemsResponse{}, err
}

return res.Into(), nil
Expand Down Expand Up @@ -1145,11 +1151,15 @@ func itemFieldsFromParams(fields []interfaces.ItemFieldParam, s *schema.Schema)
}

func (i Item) event(ctx context.Context, e Event) error {
return i.events(ctx, []Event{e})
}

func (i Item) events(ctx context.Context, e []Event) error {
if i.ignoreEvent {
return nil
}

_, err := createEvent(ctx, i.repos, i.gateways, e)
_, err := createEvents(ctx, i.repos, i.gateways, e)
return err
}

Expand Down
1 change: 1 addition & 0 deletions server/internal/usecase/repo/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ import (
type Event interface {
FindByID(context.Context, id.EventID) (*event.Event[any], error)
Save(context.Context, *event.Event[any]) error
SaveAll(context.Context, event.List) error
}
4 changes: 4 additions & 0 deletions server/pkg/event/list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package event

// TODO: use T instead of any when go 1.24 is released
type List = []*Event[any]

0 comments on commit 6d13978

Please sign in to comment.