Skip to content

Commit

Permalink
add namespace to synchronizer messages too
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Bertschy <matthias.bertschy@gmail.com>
  • Loading branch information
matthyx committed Nov 15, 2023
1 parent ee94089 commit f19cc66
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 39 deletions.
17 changes: 17 additions & 0 deletions api/asyncapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ components:
$ref: '#/components/schemas/msgID'
name:
$ref: '#/components/schemas/name'
namespace:
$ref: '#/components/schemas/namespace'
objectAdded:
type: object
properties:
Expand All @@ -113,6 +115,8 @@ components:
$ref: '#/components/schemas/msgID'
name:
$ref: '#/components/schemas/name'
namespace:
$ref: '#/components/schemas/namespace'
object:
$ref: '#/components/schemas/object'
objectDeleted:
Expand All @@ -128,6 +132,8 @@ components:
$ref: '#/components/schemas/msgID'
name:
$ref: '#/components/schemas/name'
namespace:
$ref: '#/components/schemas/namespace'
objectModified:
type: object
properties:
Expand All @@ -143,6 +149,8 @@ components:
$ref: '#/components/schemas/msgID'
name:
$ref: '#/components/schemas/name'
namespace:
$ref: '#/components/schemas/namespace'
object:
$ref: '#/components/schemas/object'
getObject:
Expand All @@ -160,6 +168,8 @@ components:
$ref: '#/components/schemas/msgID'
name:
$ref: '#/components/schemas/name'
namespace:
$ref: '#/components/schemas/namespace'
patchObject:
type: object
properties:
Expand All @@ -175,6 +185,8 @@ components:
$ref: '#/components/schemas/msgID'
name:
$ref: '#/components/schemas/name'
namespace:
$ref: '#/components/schemas/namespace'
patch:
$ref: '#/components/schemas/object'
putObject:
Expand All @@ -190,6 +202,8 @@ components:
$ref: '#/components/schemas/msgID'
name:
$ref: '#/components/schemas/name'
namespace:
$ref: '#/components/schemas/namespace'
object:
$ref: '#/components/schemas/object'
depth:
Expand Down Expand Up @@ -222,6 +236,9 @@ components:
name:
type: string
description: name of the object
namespace:
type: string
description: namespace of the object
object:
type: string
description: The object is encoded in JSON
Expand Down
98 changes: 59 additions & 39 deletions core/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error {
continue
}
id := domain.KindName{
Kind: msg.Kind,
Name: msg.Name,
Kind: msg.Kind,
Name: msg.Name,
Namespace: msg.Namespace,
}
err := s.handleSyncGetObject(ctx, id, []byte(msg.BaseObject))
if err != nil {
logger.L().Error("error handling message", helpers.Error(err),
helpers.Interface("event", generic.Event.Value()))
helpers.Interface("event", msg.Event.Value()),
helpers.String("id", id.String()))
continue
}
case domain.EventNewChecksum:
Expand All @@ -176,13 +178,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error {
continue
}
id := domain.KindName{
Kind: msg.Kind,
Name: msg.Name,
Kind: msg.Kind,
Name: msg.Name,
Namespace: msg.Namespace,
}
err := s.handleSyncNewChecksum(ctx, id, msg.Checksum)
if err != nil {
logger.L().Error("error handling message", helpers.Error(err),
helpers.Interface("event", generic.Event.Value()))
helpers.Interface("event", msg.Event.Value()),
helpers.String("id", id.String()))
continue
}
case domain.EventObjectDeleted:
Expand All @@ -194,13 +198,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error {
continue
}
id := domain.KindName{
Kind: msg.Kind,
Name: msg.Name,
Kind: msg.Kind,
Name: msg.Name,
Namespace: msg.Namespace,
}
err := s.handleSyncObjectDeleted(ctx, id)
if err != nil {
logger.L().Error("error handling message", helpers.Error(err),
helpers.Interface("event", generic.Event.Value()))
helpers.Interface("event", msg.Event.Value()),
helpers.String("id", id.String()))
continue
}
case domain.EventPatchObject:
Expand All @@ -212,13 +218,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error {
continue
}
id := domain.KindName{
Kind: msg.Kind,
Name: msg.Name,
Kind: msg.Kind,
Name: msg.Name,
Namespace: msg.Namespace,
}
err := s.handleSyncPatchObject(ctx, id, msg.Checksum, []byte(msg.Patch))
if err != nil {
logger.L().Error("error handling message", helpers.Error(err),
helpers.Interface("event", generic.Event.Value()))
helpers.Interface("event", msg.Event.Value()),
helpers.String("id", id.String()))
continue
}
case domain.EventPutObject:
Expand All @@ -230,13 +238,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error {
continue
}
id := domain.KindName{
Kind: msg.Kind,
Name: msg.Name,
Kind: msg.Kind,
Name: msg.Name,
Namespace: msg.Namespace,
}
err := s.handleSyncPutObject(ctx, id, []byte(msg.Object))
if err != nil {
logger.L().Error("error handling message", helpers.Error(err),
helpers.Interface("event", generic.Event.Value()))
helpers.Interface("event", msg.Event.Value()),
helpers.String("id", id.String()))
continue
}
}
Expand Down Expand Up @@ -294,6 +304,7 @@ func (s *Synchronizer) sendGetObject(ctx context.Context, id domain.KindName, ba
Kind: id.Kind,
MsgId: msgId,
Name: id.Name,
Namespace: id.Namespace,
}
data, err := json.Marshal(msg)
if err != nil {
Expand All @@ -308,6 +319,7 @@ func (s *Synchronizer) sendGetObject(ctx context.Context, id domain.KindName, ba
helpers.String("account", clientId.Account),
helpers.String("cluster", clientId.Cluster),
helpers.String("kind", msg.Kind.String()),
helpers.String("namespace", msg.Namespace),
helpers.String("name", msg.Name),
helpers.Int("base object size", len(msg.BaseObject)))
return nil
Expand All @@ -318,12 +330,13 @@ func (s *Synchronizer) sendNewChecksum(ctx context.Context, id domain.KindName,
depth := ctx.Value(domain.ContextKeyDepth).(int)
msgId := ctx.Value(domain.ContextKeyMsgId).(string)
msg := domain.NewChecksum{
Checksum: checksum,
Depth: depth + 1,
Event: &event,
Kind: id.Kind,
MsgId: msgId,
Name: id.Name,
Checksum: checksum,
Depth: depth + 1,
Event: &event,
Kind: id.Kind,
MsgId: msgId,
Name: id.Name,
Namespace: id.Namespace,
}
data, err := json.Marshal(msg)
if err != nil {
Expand All @@ -341,6 +354,7 @@ func (s *Synchronizer) sendNewChecksum(ctx context.Context, id domain.KindName,
helpers.String("account", clientId.Account),
helpers.String("cluster", clientId.Cluster),
helpers.String("kind", msg.Kind.String()),
helpers.String("namespace", msg.Namespace),
helpers.String("name", msg.Name),
helpers.String("checksum", msg.Checksum))
return nil
Expand All @@ -351,11 +365,12 @@ func (s *Synchronizer) sendObjectDeleted(ctx context.Context, id domain.KindName
depth := ctx.Value(domain.ContextKeyDepth).(int)
msgId := ctx.Value(domain.ContextKeyMsgId).(string)
msg := domain.ObjectDeleted{
Depth: depth + 1,
Event: &event,
Kind: id.Kind,
MsgId: msgId,
Name: id.Name,
Depth: depth + 1,
Event: &event,
Kind: id.Kind,
MsgId: msgId,
Name: id.Name,
Namespace: id.Namespace,
}
data, err := json.Marshal(msg)
if err != nil {
Expand All @@ -370,6 +385,7 @@ func (s *Synchronizer) sendObjectDeleted(ctx context.Context, id domain.KindName
helpers.String("account", clientId.Account),
helpers.String("cluster", clientId.Cluster),
helpers.String("kind", msg.Kind.String()),
helpers.String("namespace", msg.Namespace),
helpers.String("name", msg.Name))
return nil
}
Expand All @@ -380,13 +396,14 @@ func (s *Synchronizer) sendPatchObject(ctx context.Context, id domain.KindName,
msgId := ctx.Value(domain.ContextKeyMsgId).(string)

msg := domain.PatchObject{
Checksum: checksum,
Depth: depth + 1,
Event: &event,
Kind: id.Kind,
MsgId: msgId,
Name: id.Name,
Patch: string(patch),
Checksum: checksum,
Depth: depth + 1,
Event: &event,
Kind: id.Kind,
MsgId: msgId,
Name: id.Name,
Namespace: id.Namespace,
Patch: string(patch),
}
data, err := json.Marshal(msg)
if err != nil {
Expand All @@ -403,6 +420,7 @@ func (s *Synchronizer) sendPatchObject(ctx context.Context, id domain.KindName,
helpers.String("account", clientId.Account),
helpers.String("cluster", clientId.Cluster),
helpers.String("kind", msg.Kind.String()),
helpers.String("namespace", msg.Namespace),
helpers.String("name", msg.Name),
helpers.String("checksum", msg.Checksum),
helpers.Int("patch size", len(msg.Patch)))
Expand All @@ -414,12 +432,13 @@ func (s *Synchronizer) sendPutObject(ctx context.Context, id domain.KindName, ob
depth := ctx.Value(domain.ContextKeyDepth).(int)
msgId := ctx.Value(domain.ContextKeyMsgId).(string)
msg := domain.PutObject{
Depth: depth + 1,
Event: &event,
Kind: id.Kind,
MsgId: msgId,
Name: id.Name,
Object: string(object),
Depth: depth + 1,
Event: &event,
Kind: id.Kind,
MsgId: msgId,
Name: id.Name,
Namespace: id.Namespace,
Object: string(object),
}
data, err := json.Marshal(msg)
if err != nil {
Expand All @@ -435,6 +454,7 @@ func (s *Synchronizer) sendPutObject(ctx context.Context, id domain.KindName, ob
helpers.String("account", clientId.Account),
helpers.String("cluster", clientId.Cluster),
helpers.String("kind", msg.Kind.String()),
helpers.String("namespace", msg.Namespace),
helpers.String("name", msg.Name),
helpers.Int("object size", len(msg.Object)))
return nil
Expand Down
1 change: 1 addition & 0 deletions domain/GetObject.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ type GetObject struct {
Kind *Kind
MsgId string
Name string
Namespace string
AdditionalProperties map[string]interface{}
}
1 change: 1 addition & 0 deletions domain/NewChecksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ type NewChecksum struct {
Kind *Kind
MsgId string
Name string
Namespace string
AdditionalProperties map[string]interface{}
}
1 change: 1 addition & 0 deletions domain/ObjectAdded.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type ObjectAdded struct {
Kind *Kind
MsgId string
Name string
Namespace string
Object string
AdditionalProperties map[string]interface{}
}
1 change: 1 addition & 0 deletions domain/ObjectDeleted.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ type ObjectDeleted struct {
Kind *Kind
MsgId string
Name string
Namespace string
AdditionalProperties map[string]interface{}
}
1 change: 1 addition & 0 deletions domain/ObjectModified.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type ObjectModified struct {
Kind *Kind
MsgId string
Name string
Namespace string
Object string
AdditionalProperties map[string]interface{}
}
1 change: 1 addition & 0 deletions domain/PatchObject.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type PatchObject struct {
Kind *Kind
MsgId string
Name string
Namespace string
Patch string
AdditionalProperties map[string]interface{}
}
1 change: 1 addition & 0 deletions domain/PutObject.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ type PutObject struct {
Kind *Kind
MsgId string
Name string
Namespace string
Object string
AdditionalProperties map[string]interface{}
}

0 comments on commit f19cc66

Please sign in to comment.