From 6d13978827ef4c32cd6c60e33c763bb95167f7a1 Mon Sep 17 00:00:00 2001 From: yk-eukarya <81808708+yk-eukarya@users.noreply.github.com> Date: Tue, 26 Nov 2024 11:58:30 +0100 Subject: [PATCH] chore(server): bulk event to enhance performance (#1326) --- .../internal/infrastructure/memory/event.go | 14 ++++- server/internal/infrastructure/mongo/event.go | 9 +++ .../infrastructure/mongo/mongodoc/event.go | 17 ++++++ server/internal/usecase/interactor/common.go | 61 ++++++++++++------- server/internal/usecase/interactor/item.go | 24 +++++--- server/internal/usecase/repo/event.go | 1 + server/pkg/event/list.go | 4 ++ 7 files changed, 101 insertions(+), 29 deletions(-) create mode 100644 server/pkg/event/list.go diff --git a/server/internal/infrastructure/memory/event.go b/server/internal/infrastructure/memory/event.go index 263343c47d..cdf8d63a1c 100644 --- a/server/internal/infrastructure/memory/event.go +++ b/server/internal/infrastructure/memory/event.go @@ -8,6 +8,7 @@ import ( "github.com/reearth/reearth-cms/server/pkg/id" "github.com/reearth/reearthx/rerror" "github.com/reearth/reearthx/util" + "github.com/samber/lo" ) type Event struct { @@ -36,7 +37,7 @@ func (r *Event) FindByID(_ context.Context, iId id.EventID) (*event.Event[any], return nil, rerror.ErrNotFound } -func (r *Event) Save(ctx context.Context, ev *event.Event[any]) error { +func (r *Event) Save(_ context.Context, ev *event.Event[any]) error { if r.err != nil { return r.err } @@ -44,3 +45,14 @@ func (r *Event) Save(ctx context.Context, ev *event.Event[any]) error { r.data.Store(ev.ID(), ev) return nil } + +func (r *Event) SaveAll(_ context.Context, ev event.List) error { + if r.err != nil { + return r.err + } + + r.data.StoreAll(lo.SliceToMap(ev, func(e *event.Event[any]) (id.EventID, *event.Event[any]) { + return e.ID(), e + })) + return nil +} diff --git a/server/internal/infrastructure/mongo/event.go b/server/internal/infrastructure/mongo/event.go index 4471f03a28..1119faaaf1 100644 --- a/server/internal/infrastructure/mongo/event.go +++ b/server/internal/infrastructure/mongo/event.go @@ -8,6 +8,7 @@ import ( "github.com/reearth/reearth-cms/server/pkg/event" "github.com/reearth/reearth-cms/server/pkg/id" "github.com/reearth/reearthx/mongox" + "github.com/samber/lo" "go.mongodb.org/mongo-driver/bson" ) @@ -42,6 +43,14 @@ func (r *Event) Save(ctx context.Context, ev *event.Event[any]) error { return r.client.SaveOne(ctx, eID, doc) } +func (r *Event) SaveAll(ctx context.Context, ev event.List) error { + doc, eID, err := mongodoc.NewEvents(ev) + if err != nil { + return err + } + return r.client.SaveAll(ctx, eID, lo.ToAnySlice(doc)) +} + func (r *Event) findOne(ctx context.Context, filter any) (*event.Event[any], error) { c := mongodoc.NewEventConsumer() if err := r.client.FindOne(ctx, filter, c); err != nil { diff --git a/server/internal/infrastructure/mongo/mongodoc/event.go b/server/internal/infrastructure/mongo/mongodoc/event.go index 9ca60e68fe..c8655d9ad1 100644 --- a/server/internal/infrastructure/mongo/mongodoc/event.go +++ b/server/internal/infrastructure/mongo/mongodoc/event.go @@ -37,6 +37,23 @@ func NewEvent(e *event.Event[any]) (*EventDocument, string, error) { }, eId, nil } +func NewEvents(e event.List) ([]*EventDocument, []string, error) { + res := make([]*EventDocument, 0, len(e)) + ids := make([]string, 0, len(e)) + for _, d := range e { + if d == nil { + continue + } + r, rid, err := NewEvent(d) + if err != nil { + return nil, nil, err + } + res = append(res, r) + ids = append(ids, rid) + } + return res, ids, nil +} + func (d *EventDocument) Model() (*event.Event[any], error) { eID, err := event.IDFrom(d.ID) if err != nil { diff --git a/server/internal/usecase/interactor/common.go b/server/internal/usecase/interactor/common.go index 0513f96881..91306414de 100644 --- a/server/internal/usecase/interactor/common.go +++ b/server/internal/usecase/interactor/common.go @@ -12,6 +12,7 @@ import ( "github.com/reearth/reearth-cms/server/pkg/project" "github.com/reearth/reearth-cms/server/pkg/task" "github.com/reearth/reearthx/account/accountdomain" + "github.com/reearth/reearthx/account/accountdomain/workspace" "github.com/reearth/reearthx/account/accountusecase/accountgateway" "github.com/reearth/reearthx/account/accountusecase/accountinteractor" "github.com/reearth/reearthx/account/accountusecase/accountrepo" @@ -64,55 +65,73 @@ func (e *Event) EventProject() *event.Project { } func createEvent(ctx context.Context, r *repo.Container, g *gateway.Container, e Event) (*event.Event[any], error) { - ev, err := event.New[any]().NewID().Object(e.Object).Type(e.Type).Project(e.EventProject()).Timestamp(util.Now()).Operator(e.Operator).Build() + evs, err := createEvents(ctx, r, g, []Event{e}) if err != nil { return nil, err } + return evs[0], nil +} + +func createEvents(ctx context.Context, r *repo.Container, g *gateway.Container, el []Event) (event.List, error) { + evl := make(event.List, 0, len(el)) + for _, e := range el { + ev, err := event.New[any]().NewID().Object(e.Object).Type(e.Type).Project(e.EventProject()).Timestamp(util.Now()).Operator(e.Operator).Build() + if err != nil { + return nil, err + } + evl = append(evl, ev) + } - if err := r.Event.Save(ctx, ev); err != nil { + if err := r.Event.SaveAll(ctx, evl); err != nil { return nil, err } - if err := webhook(ctx, r, g, e, ev); err != nil { + if err := webhooks(ctx, r, g, el, evl); err != nil { return nil, err } - return ev, nil + return evl, nil } func webhook(ctx context.Context, r *repo.Container, g *gateway.Container, e Event, ev *event.Event[any]) error { + return webhooks(ctx, r, g, []Event{e}, event.List{ev}) +} + +func webhooks(ctx context.Context, r *repo.Container, g *gateway.Container, el []Event, evl event.List) error { if g == nil || g.TaskRunner == nil { log.Infof("asset: webhook was not sent because task runner is not configured") return nil } - ws, err := r.Workspace.FindByID(ctx, e.Workspace) + // all events are assumed to have the same workspace + wId := el[0].Workspace + ws, err := r.Workspace.FindByID(ctx, wId) if err != nil { return err } - integrationIDs := ws.Members().IntegrationIDs() - ids := make([]id.IntegrationID, len(integrationIDs)) - for i, iid := range integrationIDs { - id, err := id.IntegrationIDFrom(iid.String()) - if err != nil { - return err - } - ids[i] = id + iIds, err := util.TryMap(ws.Members().IntegrationIDs(), func(iid workspace.IntegrationID) (id.IntegrationID, error) { + return id.IntegrationIDFrom(iid.String()) + }) + if err != nil { + return err } - integrations, err := r.Integration.FindByIDs(ctx, ids) + integrations, err := r.Integration.FindByIDs(ctx, iIds) if err != nil { return err } - for _, w := range integrations.ActiveWebhooks(ev.Type()) { - if err := g.TaskRunner.Run(ctx, task.WebhookPayload{ - Webhook: w, - Event: ev, - Override: e.WebhookObject, - }.Payload()); err != nil { - return err + for i, ev := range evl { + e := el[i] + for _, w := range integrations.ActiveWebhooks(ev.Type()) { + if err := g.TaskRunner.Run(ctx, task.WebhookPayload{ + Webhook: w, + Event: ev, + Override: e.WebhookObject, + }.Payload()); err != nil { + return err + } } } diff --git a/server/internal/usecase/interactor/item.go b/server/internal/usecase/interactor/item.go index a1f7f1a992..e66a157a49 100644 --- a/server/internal/usecase/interactor/item.go +++ b/server/internal/usecase/interactor/item.go @@ -565,6 +565,12 @@ func (i Item) Import(ctx context.Context, param interfaces.ImportItemsParam, ope oldFields: oldFields, action: action, } + + if action == interfaces.ImportStrategyTypeInsert { + res.ItemInserted() + } else { + res.ItemUpdated() + } } if err := i.repos.Thread.SaveAll(ctx, threadsToSave); err != nil { @@ -580,6 +586,7 @@ func (i Item) Import(ctx context.Context, param interfaces.ImportItemsParam, ope if err != nil { return interfaces.ImportItemsResponse{}, err } + var events []Event for k, changes := range itemsEvent { vi := items.Item(k) @@ -593,12 +600,10 @@ func (i Item) Import(ctx context.Context, param interfaces.ImportItemsParam, ope var eType event.Type if changes.action == interfaces.ImportStrategyTypeInsert { eType = event.ItemCreate - res.ItemInserted() } else { eType = event.ItemUpdate - res.ItemUpdated() } - if err := i.event(ctx, Event{ + events = append(events, Event{ Project: prj, Workspace: s.Workspace(), Type: eType, @@ -612,9 +617,10 @@ func (i Item) Import(ctx context.Context, param interfaces.ImportItemsParam, ope Changes: item.CompareFields(it.Fields(), changes.oldFields), }, Operator: operator.Operator(), - }); err != nil { - return interfaces.ImportItemsResponse{}, err - } + }) + } + if err := i.events(ctx, events); err != nil { + return interfaces.ImportItemsResponse{}, err } return res.Into(), nil @@ -1145,11 +1151,15 @@ func itemFieldsFromParams(fields []interfaces.ItemFieldParam, s *schema.Schema) } func (i Item) event(ctx context.Context, e Event) error { + return i.events(ctx, []Event{e}) +} + +func (i Item) events(ctx context.Context, e []Event) error { if i.ignoreEvent { return nil } - _, err := createEvent(ctx, i.repos, i.gateways, e) + _, err := createEvents(ctx, i.repos, i.gateways, e) return err } diff --git a/server/internal/usecase/repo/event.go b/server/internal/usecase/repo/event.go index 404f81350d..3ad1f43619 100644 --- a/server/internal/usecase/repo/event.go +++ b/server/internal/usecase/repo/event.go @@ -10,4 +10,5 @@ import ( type Event interface { FindByID(context.Context, id.EventID) (*event.Event[any], error) Save(context.Context, *event.Event[any]) error + SaveAll(context.Context, event.List) error } diff --git a/server/pkg/event/list.go b/server/pkg/event/list.go new file mode 100644 index 0000000000..b4b6d429c7 --- /dev/null +++ b/server/pkg/event/list.go @@ -0,0 +1,4 @@ +package event + +// TODO: use T instead of any when go 1.24 is released +type List = []*Event[any]