Skip to content

Commit

Permalink
add a separate field for Namespace in messages
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Bertschy <matthias.bertschy@gmail.com>
  • Loading branch information
matthyx committed Nov 13, 2023
1 parent 0dd2751 commit 79a6a47
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 107 deletions.
61 changes: 33 additions & 28 deletions adapters/backend/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,13 @@ func (c *Client) sendDeleteObjectMessage(ctx context.Context, id domain.KindName
cId := domain.ClientIdentifierFromContext(ctx)

msg := messaging.DeleteObjectMessage{
Cluster: cId.Cluster,
Account: cId.Account,
Depth: depth + 1,
Kind: id.Kind.String(),
MsgId: msgId,
Name: id.Name,
Cluster: cId.Cluster,
Account: cId.Account,
Depth: depth + 1,
Kind: id.Kind.String(),
MsgId: msgId,
Name: id.Name,
Namespace: id.Namespace,
}
logger.L().Debug("Sending delete object message to producer",
helpers.String("cluster", msg.Cluster),
Expand Down Expand Up @@ -150,6 +151,7 @@ func (c *Client) sendGetObjectMessage(ctx context.Context, id domain.KindName, b
Kind: id.Kind.String(),
MsgId: msgId,
Name: id.Name,
Namespace: id.Namespace,
}
logger.L().Debug("Sending get object message to producer",
helpers.String("cluster", msg.Cluster),
Expand All @@ -171,14 +173,15 @@ func (c *Client) sendPatchObjectMessage(ctx context.Context, id domain.KindName,
cId := domain.ClientIdentifierFromContext(ctx)

msg := messaging.PatchObjectMessage{
Checksum: checksum,
Cluster: cId.Cluster,
Account: cId.Account,
Depth: depth + 1,
Kind: id.Kind.String(),
MsgId: msgId,
Name: id.Name,
Patch: patch,
Checksum: checksum,
Cluster: cId.Cluster,
Account: cId.Account,
Depth: depth + 1,
Kind: id.Kind.String(),
MsgId: msgId,
Name: id.Name,
Namespace: id.Namespace,
Patch: patch,
}
logger.L().Debug("Sending patch object message to producer",
helpers.String("cluster", msg.Cluster),
Expand All @@ -201,13 +204,14 @@ func (c *Client) sendPutObjectMessage(ctx context.Context, id domain.KindName, o
cId := domain.ClientIdentifierFromContext(ctx)

msg := messaging.PutObjectMessage{
Cluster: cId.Cluster,
Account: cId.Account,
Depth: depth + 1,
Kind: id.Kind.String(),
MsgId: msgId,
Name: id.Name,
Object: object,
Cluster: cId.Cluster,
Account: cId.Account,
Depth: depth + 1,
Kind: id.Kind.String(),
MsgId: msgId,
Name: id.Name,
Namespace: id.Namespace,
Object: object,
}
logger.L().Debug("Sending put object message to producer",
helpers.String("cluster", msg.Cluster),
Expand All @@ -229,13 +233,14 @@ func (c *Client) sendVerifyObjectMessage(ctx context.Context, id domain.KindName
cId := domain.ClientIdentifierFromContext(ctx)

msg := messaging.VerifyObjectMessage{
Checksum: checksum,
Cluster: cId.Cluster,
Account: cId.Account,
Depth: depth + 1,
Kind: id.Kind.String(),
MsgId: msgId,
Name: id.Name,
Checksum: checksum,
Cluster: cId.Cluster,
Account: cId.Account,
Depth: depth + 1,
Kind: id.Kind.String(),
MsgId: msgId,
Name: id.Name,
Namespace: id.Namespace,
}
logger.L().Debug("Sending verify object message to producer",
helpers.String("cluster", msg.Cluster),
Expand Down
25 changes: 15 additions & 10 deletions adapters/backend/v1/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ func (c *PulsarMessageConsumer) handleSingleSynchronizerMessage(ctx context.Cont
return err
}
if err := callbacks.GetObject(ctx, domain.KindName{
Kind: domain.KindFromString(data.Kind),
Name: data.Name,
Kind: domain.KindFromString(data.Kind),
Name: data.Name,
Namespace: data.Namespace,
}, data.BaseObject); err != nil {
return fmt.Errorf("failed to send GetObject message: %w", err)
}
Expand All @@ -130,8 +131,9 @@ func (c *PulsarMessageConsumer) handleSingleSynchronizerMessage(ctx context.Cont
return err
}
if err := callbacks.PatchObject(ctx, domain.KindName{
Kind: domain.KindFromString(data.Kind),
Name: data.Name,
Kind: domain.KindFromString(data.Kind),
Name: data.Name,
Namespace: data.Namespace,
}, data.Checksum, data.Patch); err != nil {
return fmt.Errorf("failed to send PatchObject message: %w", err)
}
Expand All @@ -153,8 +155,9 @@ func (c *PulsarMessageConsumer) handleSingleSynchronizerMessage(ctx context.Cont
return err
}
if err := callbacks.VerifyObject(ctx, domain.KindName{
Kind: domain.KindFromString(data.Kind),
Name: data.Name,
Kind: domain.KindFromString(data.Kind),
Name: data.Name,
Namespace: data.Namespace,
}, data.Checksum); err != nil {
return fmt.Errorf("failed to send VerifyObject message: %w", err)
}
Expand All @@ -176,8 +179,9 @@ func (c *PulsarMessageConsumer) handleSingleSynchronizerMessage(ctx context.Cont
return err
}
if err := callbacks.PutObject(ctx, domain.KindName{
Kind: domain.KindFromString(data.Kind),
Name: data.Name,
Kind: domain.KindFromString(data.Kind),
Name: data.Name,
Namespace: data.Namespace,
}, data.Object); err != nil {
return fmt.Errorf("failed to send PutObject message: %w", err)
}
Expand All @@ -199,8 +203,9 @@ func (c *PulsarMessageConsumer) handleSingleSynchronizerMessage(ctx context.Cont
return err
}
if err := callbacks.DeleteObject(ctx, domain.KindName{
Kind: domain.KindFromString(data.Kind),
Name: data.Name,
Kind: domain.KindFromString(data.Kind),
Name: data.Name,
Namespace: data.Namespace,
}); err != nil {
return fmt.Errorf("failed to send DeleteObject message: %w", err)
}
Expand Down
27 changes: 12 additions & 15 deletions adapters/incluster/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ func (c *Client) Start(ctx context.Context) error {
for _, d := range list.Items {
ctx := utils.ContextFromGeneric(ctx, domain.Generic{})
id := domain.KindName{
Kind: c.kind,
Name: utils.NsNameToKey(d.GetNamespace(), d.GetName()),
Kind: c.kind,
Name: d.GetName(),
Namespace: d.GetNamespace(),
}
obj, err := c.client.Resource(c.res).Namespace(d.GetNamespace()).Get(context.Background(), d.GetName(), metav1.GetOptions{})
if err != nil {
Expand Down Expand Up @@ -104,14 +105,14 @@ func (c *Client) Start(ctx context.Context) error {
if !ok {
continue
}
key := utils.NsNameToKey(d.GetNamespace(), d.GetName())
id := domain.KindName{
Kind: c.kind,
Name: key,
Kind: c.kind,
Name: d.GetName(),
Namespace: d.GetNamespace(),
}
newObject, err := d.MarshalJSON()
if err != nil {
logger.L().Error("cannot marshal object", helpers.Error(err), helpers.String("resource", c.res.Resource), helpers.String("key", key))
logger.L().Error("cannot marshal object", helpers.Error(err), helpers.String("resource", c.res.Resource), helpers.String("key", id.String()))
continue
}
switch {
Expand Down Expand Up @@ -196,15 +197,13 @@ func (c *Client) callVerifyObject(ctx context.Context, id domain.KindName, objec
func (c *Client) DeleteObject(_ context.Context, id domain.KindName) error {
if c.Strategy == domain.PatchStrategy {
// remove from known resources
delete(c.shadowObjects, id.Name)
delete(c.shadowObjects, id.String())
}
ns, name := utils.KeyToNsName(id.Name)
return c.client.Resource(c.res).Namespace(ns).Delete(context.Background(), name, metav1.DeleteOptions{})
return c.client.Resource(c.res).Namespace(id.Namespace).Delete(context.Background(), id.Name, metav1.DeleteOptions{})
}

func (c *Client) GetObject(ctx context.Context, id domain.KindName, baseObject []byte) error {
ns, name := utils.KeyToNsName(id.Name)
obj, err := c.client.Resource(c.res).Namespace(ns).Get(context.Background(), name, metav1.GetOptions{})
obj, err := c.client.Resource(c.res).Namespace(id.Namespace).Get(context.Background(), id.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("get resource: %w", err)
}
Expand All @@ -228,8 +227,7 @@ func (c *Client) patchObject(ctx context.Context, id domain.KindName, checksum s
if c.Strategy != domain.PatchStrategy {
return nil, fmt.Errorf("patch strategy not enabled for resource %s", id.Kind.String())
}
ns, name := utils.KeyToNsName(id.Name)
obj, err := c.client.Resource(c.res).Namespace(ns).Get(context.Background(), name, metav1.GetOptions{})
obj, err := c.client.Resource(c.res).Namespace(id.Namespace).Get(context.Background(), id.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("get resource: %w", err)
}
Expand Down Expand Up @@ -287,8 +285,7 @@ func (c *Client) VerifyObject(ctx context.Context, id domain.KindName, newChecks
}

func (c *Client) verifyObject(id domain.KindName, newChecksum string) ([]byte, error) {
ns, name := utils.KeyToNsName(id.Name)
obj, err := c.client.Resource(c.res).Namespace(ns).Get(context.Background(), name, metav1.GetOptions{})
obj, err := c.client.Resource(c.res).Namespace(id.Namespace).Get(context.Background(), id.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("get resource: %w", err)
}
Expand Down
5 changes: 3 additions & 2 deletions domain/identifiers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package domain
import "strings"

type KindName struct {
Kind *Kind
Name string
Kind *Kind
Name string
Namespace string
}

func (c KindName) String() string {
Expand Down
91 changes: 49 additions & 42 deletions messaging/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ const (
)

type DeleteObjectMessage struct {
Cluster string `json:"cluster"`
Account string `json:"account"`
Depth int `json:"depth"`
Kind string `json:"kind"`
MsgId string `json:"msgId"`
Name string `json:"name"`
Cluster string `json:"cluster"`
Account string `json:"account"`
Depth int `json:"depth"`
Kind string `json:"kind"`
MsgId string `json:"msgId"`
Name string `json:"name"`
Namespace string `json:"namespace"`
}

type GetObjectMessage struct {
Expand All @@ -33,57 +34,63 @@ type GetObjectMessage struct {
Kind string `json:"kind"`
MsgId string `json:"msgId"`
Name string `json:"name"`
Namespace string `json:"namespace"`
}

type NewChecksumMessage struct {
Checksum string `json:"checksum"`
Cluster string `json:"cluster"`
Account string `json:"account"`
Depth int `json:"depth"`
Kind string `json:"kind"`
MsgId string `json:"msgId"`
Name string `json:"name"`
Checksum string `json:"checksum"`
Cluster string `json:"cluster"`
Account string `json:"account"`
Depth int `json:"depth"`
Kind string `json:"kind"`
MsgId string `json:"msgId"`
Name string `json:"name"`
Namespace string `json:"namespace"`
}

type NewObjectMessage struct {
Cluster string `json:"cluster"`
Account string `json:"account"`
Depth int `json:"depth"`
Kind string `json:"kind"`
MsgId string `json:"msgId"`
Name string `json:"name"`
Object []byte `json:"patch"`
Cluster string `json:"cluster"`
Account string `json:"account"`
Depth int `json:"depth"`
Kind string `json:"kind"`
MsgId string `json:"msgId"`
Name string `json:"name"`
Namespace string `json:"namespace"`
Object []byte `json:"patch"`
}

type PatchObjectMessage struct {
Checksum string `json:"checksum"`
Cluster string `json:"cluster"`
Account string `json:"account"`
Depth int `json:"depth"`
Kind string `json:"kind"`
MsgId string `json:"msgId"`
Name string `json:"name"`
Patch []byte `json:"patch"`
Checksum string `json:"checksum"`
Cluster string `json:"cluster"`
Account string `json:"account"`
Depth int `json:"depth"`
Kind string `json:"kind"`
MsgId string `json:"msgId"`
Name string `json:"name"`
Namespace string `json:"namespace"`
Patch []byte `json:"patch"`
}

type PutObjectMessage struct {
Cluster string `json:"cluster"`
Account string `json:"account"`
Depth int `json:"depth"`
Kind string `json:"kind"`
MsgId string `json:"msgId"`
Name string `json:"name"`
Object []byte `json:"patch"`
Cluster string `json:"cluster"`
Account string `json:"account"`
Depth int `json:"depth"`
Kind string `json:"kind"`
MsgId string `json:"msgId"`
Name string `json:"name"`
Namespace string `json:"namespace"`
Object []byte `json:"patch"`
}

type VerifyObjectMessage struct {
Checksum string `json:"checksum"`
Cluster string `json:"cluster"`
Account string `json:"account"`
Depth int `json:"depth"`
Kind string `json:"kind"`
MsgId string `json:"msgId"`
Name string `json:"name"`
Checksum string `json:"checksum"`
Cluster string `json:"cluster"`
Account string `json:"account"`
Depth int `json:"depth"`
Kind string `json:"kind"`
MsgId string `json:"msgId"`
Name string `json:"name"`
Namespace string `json:"namespace"`
}

type ServerConnectedMessage struct {
Expand Down
10 changes: 0 additions & 10 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"path/filepath"
"reflect"
"strconv"
"strings"
"time"

"github.com/SergJa/jsonhash"
Expand Down Expand Up @@ -55,15 +54,6 @@ func ContextFromIdentifiers(parent context.Context, id domain.ClientIdentifier)
})
}

func KeyToNsName(key string) (string, string) {
split := strings.Split(key, "/")
return split[0], split[1]
}

func NsNameToKey(ns, name string) string {
return strings.Join([]string{ns, name}, "/")
}

//goland:noinspection GoUnusedExportedFunction
func CompareJson(a, b []byte) bool {
var aData interface{}
Expand Down

0 comments on commit 79a6a47

Please sign in to comment.