From f19cc6619a0164f3ef693e91354059c07fa0918f Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Wed, 15 Nov 2023 13:09:02 +0100 Subject: [PATCH] add namespace to synchronizer messages too Signed-off-by: Matthias Bertschy --- api/asyncapi.yaml | 17 +++++++ core/synchronizer.go | 98 ++++++++++++++++++++++++---------------- domain/GetObject.go | 1 + domain/NewChecksum.go | 1 + domain/ObjectAdded.go | 1 + domain/ObjectDeleted.go | 1 + domain/ObjectModified.go | 1 + domain/PatchObject.go | 1 + domain/PutObject.go | 1 + 9 files changed, 83 insertions(+), 39 deletions(-) diff --git a/api/asyncapi.yaml b/api/asyncapi.yaml index 40a188d..5cc50da 100644 --- a/api/asyncapi.yaml +++ b/api/asyncapi.yaml @@ -98,6 +98,8 @@ components: $ref: '#/components/schemas/msgID' name: $ref: '#/components/schemas/name' + namespace: + $ref: '#/components/schemas/namespace' objectAdded: type: object properties: @@ -113,6 +115,8 @@ components: $ref: '#/components/schemas/msgID' name: $ref: '#/components/schemas/name' + namespace: + $ref: '#/components/schemas/namespace' object: $ref: '#/components/schemas/object' objectDeleted: @@ -128,6 +132,8 @@ components: $ref: '#/components/schemas/msgID' name: $ref: '#/components/schemas/name' + namespace: + $ref: '#/components/schemas/namespace' objectModified: type: object properties: @@ -143,6 +149,8 @@ components: $ref: '#/components/schemas/msgID' name: $ref: '#/components/schemas/name' + namespace: + $ref: '#/components/schemas/namespace' object: $ref: '#/components/schemas/object' getObject: @@ -160,6 +168,8 @@ components: $ref: '#/components/schemas/msgID' name: $ref: '#/components/schemas/name' + namespace: + $ref: '#/components/schemas/namespace' patchObject: type: object properties: @@ -175,6 +185,8 @@ components: $ref: '#/components/schemas/msgID' name: $ref: '#/components/schemas/name' + namespace: + $ref: '#/components/schemas/namespace' patch: $ref: '#/components/schemas/object' putObject: @@ -190,6 +202,8 @@ components: $ref: '#/components/schemas/msgID' name: $ref: '#/components/schemas/name' + namespace: + $ref: '#/components/schemas/namespace' object: $ref: '#/components/schemas/object' depth: @@ -222,6 +236,9 @@ components: name: type: string description: name of the object + namespace: + type: string + description: namespace of the object object: type: string description: The object is encoded in JSON diff --git a/core/synchronizer.go b/core/synchronizer.go index 2d4c4fb..5bee011 100644 --- a/core/synchronizer.go +++ b/core/synchronizer.go @@ -158,13 +158,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error { continue } id := domain.KindName{ - Kind: msg.Kind, - Name: msg.Name, + Kind: msg.Kind, + Name: msg.Name, + Namespace: msg.Namespace, } err := s.handleSyncGetObject(ctx, id, []byte(msg.BaseObject)) if err != nil { logger.L().Error("error handling message", helpers.Error(err), - helpers.Interface("event", generic.Event.Value())) + helpers.Interface("event", msg.Event.Value()), + helpers.String("id", id.String())) continue } case domain.EventNewChecksum: @@ -176,13 +178,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error { continue } id := domain.KindName{ - Kind: msg.Kind, - Name: msg.Name, + Kind: msg.Kind, + Name: msg.Name, + Namespace: msg.Namespace, } err := s.handleSyncNewChecksum(ctx, id, msg.Checksum) if err != nil { logger.L().Error("error handling message", helpers.Error(err), - helpers.Interface("event", generic.Event.Value())) + helpers.Interface("event", msg.Event.Value()), + helpers.String("id", id.String())) continue } case domain.EventObjectDeleted: @@ -194,13 +198,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error { continue } id := domain.KindName{ - Kind: msg.Kind, - Name: msg.Name, + Kind: msg.Kind, + Name: msg.Name, + Namespace: msg.Namespace, } err := s.handleSyncObjectDeleted(ctx, id) if err != nil { logger.L().Error("error handling message", helpers.Error(err), - helpers.Interface("event", generic.Event.Value())) + helpers.Interface("event", msg.Event.Value()), + helpers.String("id", id.String())) continue } case domain.EventPatchObject: @@ -212,13 +218,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error { continue } id := domain.KindName{ - Kind: msg.Kind, - Name: msg.Name, + Kind: msg.Kind, + Name: msg.Name, + Namespace: msg.Namespace, } err := s.handleSyncPatchObject(ctx, id, msg.Checksum, []byte(msg.Patch)) if err != nil { logger.L().Error("error handling message", helpers.Error(err), - helpers.Interface("event", generic.Event.Value())) + helpers.Interface("event", msg.Event.Value()), + helpers.String("id", id.String())) continue } case domain.EventPutObject: @@ -230,13 +238,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error { continue } id := domain.KindName{ - Kind: msg.Kind, - Name: msg.Name, + Kind: msg.Kind, + Name: msg.Name, + Namespace: msg.Namespace, } err := s.handleSyncPutObject(ctx, id, []byte(msg.Object)) if err != nil { logger.L().Error("error handling message", helpers.Error(err), - helpers.Interface("event", generic.Event.Value())) + helpers.Interface("event", msg.Event.Value()), + helpers.String("id", id.String())) continue } } @@ -294,6 +304,7 @@ func (s *Synchronizer) sendGetObject(ctx context.Context, id domain.KindName, ba Kind: id.Kind, MsgId: msgId, Name: id.Name, + Namespace: id.Namespace, } data, err := json.Marshal(msg) if err != nil { @@ -308,6 +319,7 @@ func (s *Synchronizer) sendGetObject(ctx context.Context, id domain.KindName, ba helpers.String("account", clientId.Account), helpers.String("cluster", clientId.Cluster), helpers.String("kind", msg.Kind.String()), + helpers.String("namespace", msg.Namespace), helpers.String("name", msg.Name), helpers.Int("base object size", len(msg.BaseObject))) return nil @@ -318,12 +330,13 @@ func (s *Synchronizer) sendNewChecksum(ctx context.Context, id domain.KindName, depth := ctx.Value(domain.ContextKeyDepth).(int) msgId := ctx.Value(domain.ContextKeyMsgId).(string) msg := domain.NewChecksum{ - Checksum: checksum, - Depth: depth + 1, - Event: &event, - Kind: id.Kind, - MsgId: msgId, - Name: id.Name, + Checksum: checksum, + Depth: depth + 1, + Event: &event, + Kind: id.Kind, + MsgId: msgId, + Name: id.Name, + Namespace: id.Namespace, } data, err := json.Marshal(msg) if err != nil { @@ -341,6 +354,7 @@ func (s *Synchronizer) sendNewChecksum(ctx context.Context, id domain.KindName, helpers.String("account", clientId.Account), helpers.String("cluster", clientId.Cluster), helpers.String("kind", msg.Kind.String()), + helpers.String("namespace", msg.Namespace), helpers.String("name", msg.Name), helpers.String("checksum", msg.Checksum)) return nil @@ -351,11 +365,12 @@ func (s *Synchronizer) sendObjectDeleted(ctx context.Context, id domain.KindName depth := ctx.Value(domain.ContextKeyDepth).(int) msgId := ctx.Value(domain.ContextKeyMsgId).(string) msg := domain.ObjectDeleted{ - Depth: depth + 1, - Event: &event, - Kind: id.Kind, - MsgId: msgId, - Name: id.Name, + Depth: depth + 1, + Event: &event, + Kind: id.Kind, + MsgId: msgId, + Name: id.Name, + Namespace: id.Namespace, } data, err := json.Marshal(msg) if err != nil { @@ -370,6 +385,7 @@ func (s *Synchronizer) sendObjectDeleted(ctx context.Context, id domain.KindName helpers.String("account", clientId.Account), helpers.String("cluster", clientId.Cluster), helpers.String("kind", msg.Kind.String()), + helpers.String("namespace", msg.Namespace), helpers.String("name", msg.Name)) return nil } @@ -380,13 +396,14 @@ func (s *Synchronizer) sendPatchObject(ctx context.Context, id domain.KindName, msgId := ctx.Value(domain.ContextKeyMsgId).(string) msg := domain.PatchObject{ - Checksum: checksum, - Depth: depth + 1, - Event: &event, - Kind: id.Kind, - MsgId: msgId, - Name: id.Name, - Patch: string(patch), + Checksum: checksum, + Depth: depth + 1, + Event: &event, + Kind: id.Kind, + MsgId: msgId, + Name: id.Name, + Namespace: id.Namespace, + Patch: string(patch), } data, err := json.Marshal(msg) if err != nil { @@ -403,6 +420,7 @@ func (s *Synchronizer) sendPatchObject(ctx context.Context, id domain.KindName, helpers.String("account", clientId.Account), helpers.String("cluster", clientId.Cluster), helpers.String("kind", msg.Kind.String()), + helpers.String("namespace", msg.Namespace), helpers.String("name", msg.Name), helpers.String("checksum", msg.Checksum), helpers.Int("patch size", len(msg.Patch))) @@ -414,12 +432,13 @@ func (s *Synchronizer) sendPutObject(ctx context.Context, id domain.KindName, ob depth := ctx.Value(domain.ContextKeyDepth).(int) msgId := ctx.Value(domain.ContextKeyMsgId).(string) msg := domain.PutObject{ - Depth: depth + 1, - Event: &event, - Kind: id.Kind, - MsgId: msgId, - Name: id.Name, - Object: string(object), + Depth: depth + 1, + Event: &event, + Kind: id.Kind, + MsgId: msgId, + Name: id.Name, + Namespace: id.Namespace, + Object: string(object), } data, err := json.Marshal(msg) if err != nil { @@ -435,6 +454,7 @@ func (s *Synchronizer) sendPutObject(ctx context.Context, id domain.KindName, ob helpers.String("account", clientId.Account), helpers.String("cluster", clientId.Cluster), helpers.String("kind", msg.Kind.String()), + helpers.String("namespace", msg.Namespace), helpers.String("name", msg.Name), helpers.Int("object size", len(msg.Object))) return nil diff --git a/domain/GetObject.go b/domain/GetObject.go index 62abf93..ed28ce4 100644 --- a/domain/GetObject.go +++ b/domain/GetObject.go @@ -8,5 +8,6 @@ type GetObject struct { Kind *Kind MsgId string Name string + Namespace string AdditionalProperties map[string]interface{} } diff --git a/domain/NewChecksum.go b/domain/NewChecksum.go index baadf16..1727d93 100644 --- a/domain/NewChecksum.go +++ b/domain/NewChecksum.go @@ -8,5 +8,6 @@ type NewChecksum struct { Kind *Kind MsgId string Name string + Namespace string AdditionalProperties map[string]interface{} } diff --git a/domain/ObjectAdded.go b/domain/ObjectAdded.go index 1e0b840..8ec176d 100644 --- a/domain/ObjectAdded.go +++ b/domain/ObjectAdded.go @@ -8,6 +8,7 @@ type ObjectAdded struct { Kind *Kind MsgId string Name string + Namespace string Object string AdditionalProperties map[string]interface{} } diff --git a/domain/ObjectDeleted.go b/domain/ObjectDeleted.go index aee9dae..d110b54 100644 --- a/domain/ObjectDeleted.go +++ b/domain/ObjectDeleted.go @@ -7,5 +7,6 @@ type ObjectDeleted struct { Kind *Kind MsgId string Name string + Namespace string AdditionalProperties map[string]interface{} } diff --git a/domain/ObjectModified.go b/domain/ObjectModified.go index 5abdd99..e8f3ce2 100644 --- a/domain/ObjectModified.go +++ b/domain/ObjectModified.go @@ -8,6 +8,7 @@ type ObjectModified struct { Kind *Kind MsgId string Name string + Namespace string Object string AdditionalProperties map[string]interface{} } diff --git a/domain/PatchObject.go b/domain/PatchObject.go index e323096..671f2c4 100644 --- a/domain/PatchObject.go +++ b/domain/PatchObject.go @@ -8,6 +8,7 @@ type PatchObject struct { Kind *Kind MsgId string Name string + Namespace string Patch string AdditionalProperties map[string]interface{} } diff --git a/domain/PutObject.go b/domain/PutObject.go index caf05c1..3fbf96e 100644 --- a/domain/PutObject.go +++ b/domain/PutObject.go @@ -7,6 +7,7 @@ type PutObject struct { Kind *Kind MsgId string Name string + Namespace string Object string AdditionalProperties map[string]interface{} }