From 79a6a47e34d2ba876fdd2fa92836c046cfa7417f Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Mon, 13 Nov 2023 16:39:15 +0100 Subject: [PATCH] add a separate field for Namespace in messages Signed-off-by: Matthias Bertschy --- adapters/backend/v1/client.go | 61 ++++++++++++---------- adapters/backend/v1/pulsar.go | 25 +++++---- adapters/incluster/v1/client.go | 27 +++++----- domain/identifiers.go | 5 +- messaging/messages.go | 91 ++++++++++++++++++--------------- utils/utils.go | 10 ---- 6 files changed, 112 insertions(+), 107 deletions(-) diff --git a/adapters/backend/v1/client.go b/adapters/backend/v1/client.go index 15996ab..85829fd 100644 --- a/adapters/backend/v1/client.go +++ b/adapters/backend/v1/client.go @@ -117,12 +117,13 @@ func (c *Client) sendDeleteObjectMessage(ctx context.Context, id domain.KindName cId := domain.ClientIdentifierFromContext(ctx) msg := messaging.DeleteObjectMessage{ - Cluster: cId.Cluster, - Account: cId.Account, - Depth: depth + 1, - Kind: id.Kind.String(), - MsgId: msgId, - Name: id.Name, + Cluster: cId.Cluster, + Account: cId.Account, + Depth: depth + 1, + Kind: id.Kind.String(), + MsgId: msgId, + Name: id.Name, + Namespace: id.Namespace, } logger.L().Debug("Sending delete object message to producer", helpers.String("cluster", msg.Cluster), @@ -150,6 +151,7 @@ func (c *Client) sendGetObjectMessage(ctx context.Context, id domain.KindName, b Kind: id.Kind.String(), MsgId: msgId, Name: id.Name, + Namespace: id.Namespace, } logger.L().Debug("Sending get object message to producer", helpers.String("cluster", msg.Cluster), @@ -171,14 +173,15 @@ func (c *Client) sendPatchObjectMessage(ctx context.Context, id domain.KindName, cId := domain.ClientIdentifierFromContext(ctx) msg := messaging.PatchObjectMessage{ - Checksum: checksum, - Cluster: cId.Cluster, - Account: cId.Account, - Depth: depth + 1, - Kind: id.Kind.String(), - MsgId: msgId, - Name: id.Name, - Patch: patch, + Checksum: checksum, + Cluster: cId.Cluster, + Account: cId.Account, + Depth: depth + 1, + Kind: id.Kind.String(), + MsgId: msgId, + Name: id.Name, + Namespace: id.Namespace, + Patch: patch, } logger.L().Debug("Sending patch object message to producer", helpers.String("cluster", msg.Cluster), @@ -201,13 +204,14 @@ func (c *Client) sendPutObjectMessage(ctx context.Context, id domain.KindName, o cId := domain.ClientIdentifierFromContext(ctx) msg := messaging.PutObjectMessage{ - Cluster: cId.Cluster, - Account: cId.Account, - Depth: depth + 1, - Kind: id.Kind.String(), - MsgId: msgId, - Name: id.Name, - Object: object, + Cluster: cId.Cluster, + Account: cId.Account, + Depth: depth + 1, + Kind: id.Kind.String(), + MsgId: msgId, + Name: id.Name, + Namespace: id.Namespace, + Object: object, } logger.L().Debug("Sending put object message to producer", helpers.String("cluster", msg.Cluster), @@ -229,13 +233,14 @@ func (c *Client) sendVerifyObjectMessage(ctx context.Context, id domain.KindName cId := domain.ClientIdentifierFromContext(ctx) msg := messaging.VerifyObjectMessage{ - Checksum: checksum, - Cluster: cId.Cluster, - Account: cId.Account, - Depth: depth + 1, - Kind: id.Kind.String(), - MsgId: msgId, - Name: id.Name, + Checksum: checksum, + Cluster: cId.Cluster, + Account: cId.Account, + Depth: depth + 1, + Kind: id.Kind.String(), + MsgId: msgId, + Name: id.Name, + Namespace: id.Namespace, } logger.L().Debug("Sending verify object message to producer", helpers.String("cluster", msg.Cluster), diff --git a/adapters/backend/v1/pulsar.go b/adapters/backend/v1/pulsar.go index 81f44ed..8884e3a 100644 --- a/adapters/backend/v1/pulsar.go +++ b/adapters/backend/v1/pulsar.go @@ -107,8 +107,9 @@ func (c *PulsarMessageConsumer) handleSingleSynchronizerMessage(ctx context.Cont return err } if err := callbacks.GetObject(ctx, domain.KindName{ - Kind: domain.KindFromString(data.Kind), - Name: data.Name, + Kind: domain.KindFromString(data.Kind), + Name: data.Name, + Namespace: data.Namespace, }, data.BaseObject); err != nil { return fmt.Errorf("failed to send GetObject message: %w", err) } @@ -130,8 +131,9 @@ func (c *PulsarMessageConsumer) handleSingleSynchronizerMessage(ctx context.Cont return err } if err := callbacks.PatchObject(ctx, domain.KindName{ - Kind: domain.KindFromString(data.Kind), - Name: data.Name, + Kind: domain.KindFromString(data.Kind), + Name: data.Name, + Namespace: data.Namespace, }, data.Checksum, data.Patch); err != nil { return fmt.Errorf("failed to send PatchObject message: %w", err) } @@ -153,8 +155,9 @@ func (c *PulsarMessageConsumer) handleSingleSynchronizerMessage(ctx context.Cont return err } if err := callbacks.VerifyObject(ctx, domain.KindName{ - Kind: domain.KindFromString(data.Kind), - Name: data.Name, + Kind: domain.KindFromString(data.Kind), + Name: data.Name, + Namespace: data.Namespace, }, data.Checksum); err != nil { return fmt.Errorf("failed to send VerifyObject message: %w", err) } @@ -176,8 +179,9 @@ func (c *PulsarMessageConsumer) handleSingleSynchronizerMessage(ctx context.Cont return err } if err := callbacks.PutObject(ctx, domain.KindName{ - Kind: domain.KindFromString(data.Kind), - Name: data.Name, + Kind: domain.KindFromString(data.Kind), + Name: data.Name, + Namespace: data.Namespace, }, data.Object); err != nil { return fmt.Errorf("failed to send PutObject message: %w", err) } @@ -199,8 +203,9 @@ func (c *PulsarMessageConsumer) handleSingleSynchronizerMessage(ctx context.Cont return err } if err := callbacks.DeleteObject(ctx, domain.KindName{ - Kind: domain.KindFromString(data.Kind), - Name: data.Name, + Kind: domain.KindFromString(data.Kind), + Name: data.Name, + Namespace: data.Namespace, }); err != nil { return fmt.Errorf("failed to send DeleteObject message: %w", err) } diff --git a/adapters/incluster/v1/client.go b/adapters/incluster/v1/client.go index 3ef8fd2..3b160a1 100644 --- a/adapters/incluster/v1/client.go +++ b/adapters/incluster/v1/client.go @@ -61,8 +61,9 @@ func (c *Client) Start(ctx context.Context) error { for _, d := range list.Items { ctx := utils.ContextFromGeneric(ctx, domain.Generic{}) id := domain.KindName{ - Kind: c.kind, - Name: utils.NsNameToKey(d.GetNamespace(), d.GetName()), + Kind: c.kind, + Name: d.GetName(), + Namespace: d.GetNamespace(), } obj, err := c.client.Resource(c.res).Namespace(d.GetNamespace()).Get(context.Background(), d.GetName(), metav1.GetOptions{}) if err != nil { @@ -104,14 +105,14 @@ func (c *Client) Start(ctx context.Context) error { if !ok { continue } - key := utils.NsNameToKey(d.GetNamespace(), d.GetName()) id := domain.KindName{ - Kind: c.kind, - Name: key, + Kind: c.kind, + Name: d.GetName(), + Namespace: d.GetNamespace(), } newObject, err := d.MarshalJSON() if err != nil { - logger.L().Error("cannot marshal object", helpers.Error(err), helpers.String("resource", c.res.Resource), helpers.String("key", key)) + logger.L().Error("cannot marshal object", helpers.Error(err), helpers.String("resource", c.res.Resource), helpers.String("key", id.String())) continue } switch { @@ -196,15 +197,13 @@ func (c *Client) callVerifyObject(ctx context.Context, id domain.KindName, objec func (c *Client) DeleteObject(_ context.Context, id domain.KindName) error { if c.Strategy == domain.PatchStrategy { // remove from known resources - delete(c.shadowObjects, id.Name) + delete(c.shadowObjects, id.String()) } - ns, name := utils.KeyToNsName(id.Name) - return c.client.Resource(c.res).Namespace(ns).Delete(context.Background(), name, metav1.DeleteOptions{}) + return c.client.Resource(c.res).Namespace(id.Namespace).Delete(context.Background(), id.Name, metav1.DeleteOptions{}) } func (c *Client) GetObject(ctx context.Context, id domain.KindName, baseObject []byte) error { - ns, name := utils.KeyToNsName(id.Name) - obj, err := c.client.Resource(c.res).Namespace(ns).Get(context.Background(), name, metav1.GetOptions{}) + obj, err := c.client.Resource(c.res).Namespace(id.Namespace).Get(context.Background(), id.Name, metav1.GetOptions{}) if err != nil { return fmt.Errorf("get resource: %w", err) } @@ -228,8 +227,7 @@ func (c *Client) patchObject(ctx context.Context, id domain.KindName, checksum s if c.Strategy != domain.PatchStrategy { return nil, fmt.Errorf("patch strategy not enabled for resource %s", id.Kind.String()) } - ns, name := utils.KeyToNsName(id.Name) - obj, err := c.client.Resource(c.res).Namespace(ns).Get(context.Background(), name, metav1.GetOptions{}) + obj, err := c.client.Resource(c.res).Namespace(id.Namespace).Get(context.Background(), id.Name, metav1.GetOptions{}) if err != nil { return nil, fmt.Errorf("get resource: %w", err) } @@ -287,8 +285,7 @@ func (c *Client) VerifyObject(ctx context.Context, id domain.KindName, newChecks } func (c *Client) verifyObject(id domain.KindName, newChecksum string) ([]byte, error) { - ns, name := utils.KeyToNsName(id.Name) - obj, err := c.client.Resource(c.res).Namespace(ns).Get(context.Background(), name, metav1.GetOptions{}) + obj, err := c.client.Resource(c.res).Namespace(id.Namespace).Get(context.Background(), id.Name, metav1.GetOptions{}) if err != nil { return nil, fmt.Errorf("get resource: %w", err) } diff --git a/domain/identifiers.go b/domain/identifiers.go index 2a30f98..7fc0052 100644 --- a/domain/identifiers.go +++ b/domain/identifiers.go @@ -3,8 +3,9 @@ package domain import "strings" type KindName struct { - Kind *Kind - Name string + Kind *Kind + Name string + Namespace string } func (c KindName) String() string { diff --git a/messaging/messages.go b/messaging/messages.go index 33097ea..88fc287 100644 --- a/messaging/messages.go +++ b/messaging/messages.go @@ -17,12 +17,13 @@ const ( ) type DeleteObjectMessage struct { - Cluster string `json:"cluster"` - Account string `json:"account"` - Depth int `json:"depth"` - Kind string `json:"kind"` - MsgId string `json:"msgId"` - Name string `json:"name"` + Cluster string `json:"cluster"` + Account string `json:"account"` + Depth int `json:"depth"` + Kind string `json:"kind"` + MsgId string `json:"msgId"` + Name string `json:"name"` + Namespace string `json:"namespace"` } type GetObjectMessage struct { @@ -33,57 +34,63 @@ type GetObjectMessage struct { Kind string `json:"kind"` MsgId string `json:"msgId"` Name string `json:"name"` + Namespace string `json:"namespace"` } type NewChecksumMessage struct { - Checksum string `json:"checksum"` - Cluster string `json:"cluster"` - Account string `json:"account"` - Depth int `json:"depth"` - Kind string `json:"kind"` - MsgId string `json:"msgId"` - Name string `json:"name"` + Checksum string `json:"checksum"` + Cluster string `json:"cluster"` + Account string `json:"account"` + Depth int `json:"depth"` + Kind string `json:"kind"` + MsgId string `json:"msgId"` + Name string `json:"name"` + Namespace string `json:"namespace"` } type NewObjectMessage struct { - Cluster string `json:"cluster"` - Account string `json:"account"` - Depth int `json:"depth"` - Kind string `json:"kind"` - MsgId string `json:"msgId"` - Name string `json:"name"` - Object []byte `json:"patch"` + Cluster string `json:"cluster"` + Account string `json:"account"` + Depth int `json:"depth"` + Kind string `json:"kind"` + MsgId string `json:"msgId"` + Name string `json:"name"` + Namespace string `json:"namespace"` + Object []byte `json:"patch"` } type PatchObjectMessage struct { - Checksum string `json:"checksum"` - Cluster string `json:"cluster"` - Account string `json:"account"` - Depth int `json:"depth"` - Kind string `json:"kind"` - MsgId string `json:"msgId"` - Name string `json:"name"` - Patch []byte `json:"patch"` + Checksum string `json:"checksum"` + Cluster string `json:"cluster"` + Account string `json:"account"` + Depth int `json:"depth"` + Kind string `json:"kind"` + MsgId string `json:"msgId"` + Name string `json:"name"` + Namespace string `json:"namespace"` + Patch []byte `json:"patch"` } type PutObjectMessage struct { - Cluster string `json:"cluster"` - Account string `json:"account"` - Depth int `json:"depth"` - Kind string `json:"kind"` - MsgId string `json:"msgId"` - Name string `json:"name"` - Object []byte `json:"patch"` + Cluster string `json:"cluster"` + Account string `json:"account"` + Depth int `json:"depth"` + Kind string `json:"kind"` + MsgId string `json:"msgId"` + Name string `json:"name"` + Namespace string `json:"namespace"` + Object []byte `json:"patch"` } type VerifyObjectMessage struct { - Checksum string `json:"checksum"` - Cluster string `json:"cluster"` - Account string `json:"account"` - Depth int `json:"depth"` - Kind string `json:"kind"` - MsgId string `json:"msgId"` - Name string `json:"name"` + Checksum string `json:"checksum"` + Cluster string `json:"cluster"` + Account string `json:"account"` + Depth int `json:"depth"` + Kind string `json:"kind"` + MsgId string `json:"msgId"` + Name string `json:"name"` + Namespace string `json:"namespace"` } type ServerConnectedMessage struct { diff --git a/utils/utils.go b/utils/utils.go index 5cd5df0..ef7580d 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -10,7 +10,6 @@ import ( "path/filepath" "reflect" "strconv" - "strings" "time" "github.com/SergJa/jsonhash" @@ -55,15 +54,6 @@ func ContextFromIdentifiers(parent context.Context, id domain.ClientIdentifier) }) } -func KeyToNsName(key string) (string, string) { - split := strings.Split(key, "/") - return split[0], split[1] -} - -func NsNameToKey(ns, name string) string { - return strings.Join([]string{ns, name}, "/") -} - //goland:noinspection GoUnusedExportedFunction func CompareJson(a, b []byte) bool { var aData interface{}