Skip to content

Commit

Permalink
Merge pull request #59 from kubescape/msgid
Browse files Browse the repository at this point in the history
fix msgId not unique between messages
  • Loading branch information
David Wertenteil authored Feb 19, 2024
2 parents 6c4a7cc + 47e3562 commit 6e35084
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 96 deletions.
31 changes: 8 additions & 23 deletions adapters/backend/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (c *Client) Start(ctx context.Context) error {
return nil
}

func (c *Client) Stop(ctx context.Context) error {
func (c *Client) Stop(_ context.Context) error {
return nil
}

Expand All @@ -45,17 +45,12 @@ func (c *Client) IsRelated(ctx context.Context, id domain.ClientIdentifier) bool
}

func (c *Client) sendServerConnectedMessage(ctx context.Context) error {
ctx = utils.ContextFromGeneric(ctx, domain.Generic{})

depth := ctx.Value(domain.ContextKeyDepth).(int)
msgId := ctx.Value(domain.ContextKeyMsgId).(string)
id := utils.ClientIdentifierFromContext(ctx)

msg := messaging.ServerConnectedMessage{
Cluster: id.Cluster,
Account: id.Account,
Depth: depth + 1,
MsgId: msgId,
MsgId: utils.NewMsgId(),
}
logger.L().Debug("sending server connected message to producer",
helpers.String("account", msg.Account),
Expand Down Expand Up @@ -174,8 +169,7 @@ func (c *Client) VerifyObject(ctx context.Context, id domain.KindName, checksum
}

func (c *Client) sendDeleteObjectMessage(ctx context.Context, id domain.KindName) error {
depth := ctx.Value(domain.ContextKeyDepth).(int)
msgId := ctx.Value(domain.ContextKeyMsgId).(string)
depth, msgId := utils.DeptMsgIdFromContext(ctx)
cId := utils.ClientIdentifierFromContext(ctx)

msg := messaging.DeleteObjectMessage{
Expand Down Expand Up @@ -203,8 +197,7 @@ func (c *Client) sendDeleteObjectMessage(ctx context.Context, id domain.KindName
}

func (c *Client) sendGetObjectMessage(ctx context.Context, id domain.KindName, baseObject []byte) error {
depth := ctx.Value(domain.ContextKeyDepth).(int)
msgId := ctx.Value(domain.ContextKeyMsgId).(string)
depth, msgId := utils.DeptMsgIdFromContext(ctx)
cId := utils.ClientIdentifierFromContext(ctx)

msg := messaging.GetObjectMessage{
Expand Down Expand Up @@ -234,8 +227,7 @@ func (c *Client) sendGetObjectMessage(ctx context.Context, id domain.KindName, b
}

func (c *Client) sendPatchObjectMessage(ctx context.Context, id domain.KindName, checksum string, patch []byte) error {
depth := ctx.Value(domain.ContextKeyDepth).(int)
msgId := ctx.Value(domain.ContextKeyMsgId).(string)
depth, msgId := utils.DeptMsgIdFromContext(ctx)
cId := utils.ClientIdentifierFromContext(ctx)

msg := messaging.PatchObjectMessage{
Expand Down Expand Up @@ -267,8 +259,7 @@ func (c *Client) sendPatchObjectMessage(ctx context.Context, id domain.KindName,
}

func (c *Client) sendPutObjectMessage(ctx context.Context, id domain.KindName, object []byte) error {
depth := ctx.Value(domain.ContextKeyDepth).(int)
msgId := ctx.Value(domain.ContextKeyMsgId).(string)
depth, msgId := utils.DeptMsgIdFromContext(ctx)
cId := utils.ClientIdentifierFromContext(ctx)

msg := messaging.PutObjectMessage{
Expand Down Expand Up @@ -298,8 +289,7 @@ func (c *Client) sendPutObjectMessage(ctx context.Context, id domain.KindName, o
}

func (c *Client) sendVerifyObjectMessage(ctx context.Context, id domain.KindName, checksum string) error {
depth := ctx.Value(domain.ContextKeyDepth).(int)
msgId := ctx.Value(domain.ContextKeyMsgId).(string)
depth, msgId := utils.DeptMsgIdFromContext(ctx)
cId := utils.ClientIdentifierFromContext(ctx)

msg := messaging.VerifyObjectMessage{
Expand Down Expand Up @@ -329,17 +319,12 @@ func (c *Client) sendVerifyObjectMessage(ctx context.Context, id domain.KindName
}

func (c *Client) SendReconciliationRequestMessage(ctx context.Context) error {
ctx = utils.ContextFromGeneric(ctx, domain.Generic{})

depth := ctx.Value(domain.ContextKeyDepth).(int)
msgId := ctx.Value(domain.ContextKeyMsgId).(string)
id := utils.ClientIdentifierFromContext(ctx)

msg := messaging.ReconciliationRequestMessage{
Cluster: id.Cluster,
Account: id.Account,
Depth: depth + 1,
MsgId: msgId,
MsgId: utils.NewMsgId(),
ServerInitiated: true,
}
logger.L().Debug("sending reconciliation request message to producer",
Expand Down
58 changes: 9 additions & 49 deletions adapters/backend/v1/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,17 @@ func (c *PulsarMessageReader) handleSingleSynchronizerMessage(ctx context.Contex
logger.L().Debug("received message from pulsar",
helpers.String("account", msgProperties[messaging.MsgPropAccount]),
helpers.String("cluster", msgProperties[messaging.MsgPropCluster]),
helpers.Interface("event", msgProperties[messaging.MsgPropEvent]),
helpers.String("msgId", msgID))

// store in context
ctx = utils.ContextFromIdentifiers(ctx, clientIdentifier)
// get callbacks
callbacks, err := adapter.Callbacks(ctx)
if err != nil {
return fmt.Errorf("failed to get callbacks: %w", err)
}
// handle message
switch msgProperties[messaging.MsgPropEvent] {
case messaging.MsgPropEventValueReconciliationRequestMessage:
var data messaging.ReconciliationRequestMessage
Expand All @@ -153,15 +162,6 @@ func (c *PulsarMessageReader) handleSingleSynchronizerMessage(ctx context.Contex
Depth: data.Depth,
MsgId: data.MsgId,
})
ctx = utils.ContextFromIdentifiers(ctx, domain.ClientIdentifier{
Account: data.Account,
Cluster: data.Cluster,
})

callbacks, err := adapter.Callbacks(ctx)
if err != nil {
return err
}

// unwrap the reconciliation request and send batches of NewChecksum for each kind
event := domain.EventNewChecksum
Expand Down Expand Up @@ -201,14 +201,6 @@ func (c *PulsarMessageReader) handleSingleSynchronizerMessage(ctx context.Contex
Depth: data.Depth,
MsgId: data.MsgId,
})
ctx = utils.ContextFromIdentifiers(ctx, domain.ClientIdentifier{
Account: data.Account,
Cluster: data.Cluster,
})
callbacks, err := adapter.Callbacks(ctx)
if err != nil {
return err
}
if err := callbacks.GetObject(ctx, domain.KindName{
Kind: domain.KindFromString(ctx, data.Kind),
Name: data.Name,
Expand All @@ -226,14 +218,6 @@ func (c *PulsarMessageReader) handleSingleSynchronizerMessage(ctx context.Contex
Depth: data.Depth,
MsgId: data.MsgId,
})
ctx = utils.ContextFromIdentifiers(ctx, domain.ClientIdentifier{
Account: data.Account,
Cluster: data.Cluster,
})
callbacks, err := adapter.Callbacks(ctx)
if err != nil {
return err
}
if err := callbacks.PatchObject(ctx, domain.KindName{
Kind: domain.KindFromString(ctx, data.Kind),
Name: data.Name,
Expand All @@ -251,14 +235,6 @@ func (c *PulsarMessageReader) handleSingleSynchronizerMessage(ctx context.Contex
Depth: data.Depth,
MsgId: data.MsgId,
})
ctx = utils.ContextFromIdentifiers(ctx, domain.ClientIdentifier{
Account: data.Account,
Cluster: data.Cluster,
})
callbacks, err := adapter.Callbacks(ctx)
if err != nil {
return err
}
if err := callbacks.VerifyObject(ctx, domain.KindName{
Kind: domain.KindFromString(ctx, data.Kind),
Name: data.Name,
Expand All @@ -276,14 +252,6 @@ func (c *PulsarMessageReader) handleSingleSynchronizerMessage(ctx context.Contex
Depth: data.Depth,
MsgId: data.MsgId,
})
ctx = utils.ContextFromIdentifiers(ctx, domain.ClientIdentifier{
Account: data.Account,
Cluster: data.Cluster,
})
callbacks, err := adapter.Callbacks(ctx)
if err != nil {
return err
}
if err := callbacks.PutObject(ctx, domain.KindName{
Kind: domain.KindFromString(ctx, data.Kind),
Name: data.Name,
Expand All @@ -301,14 +269,6 @@ func (c *PulsarMessageReader) handleSingleSynchronizerMessage(ctx context.Contex
Depth: data.Depth,
MsgId: data.MsgId,
})
ctx = utils.ContextFromIdentifiers(ctx, domain.ClientIdentifier{
Account: data.Account,
Cluster: data.Cluster,
})
callbacks, err := adapter.Callbacks(ctx)
if err != nil {
return err
}
if err := callbacks.DeleteObject(ctx, domain.KindName{
Kind: domain.KindFromString(ctx, data.Kind),
Name: data.Name,
Expand Down
1 change: 0 additions & 1 deletion adapters/incluster/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func NewClient(client dynamic.Interface, account, cluster string, r config.Resou
var _ adapters.Client = (*Client)(nil)

func (c *Client) Start(ctx context.Context) error {
ctx = utils.ContextFromGeneric(ctx, domain.Generic{})
logger.L().Info("starting incluster client", helpers.String("resource", c.res.Resource))
watchOpts := metav1.ListOptions{}
// for our storage, we need to list all resources and get them one by one
Expand Down
33 changes: 14 additions & 19 deletions core/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,8 +507,7 @@ func (s *Synchronizer) handleSyncPutObject(ctx context.Context, id domain.KindNa

func (s *Synchronizer) sendGetObject(ctx context.Context, id domain.KindName, baseObject []byte) error {
event := domain.EventGetObject
depth := ctx.Value(domain.ContextKeyDepth).(int)
msgId := ctx.Value(domain.ContextKeyMsgId).(string)
depth, msgId := utils.DeptMsgIdFromContext(ctx)
msg := domain.GetObject{
BaseObject: string(baseObject),
Depth: depth + 1,
Expand Down Expand Up @@ -540,8 +539,7 @@ func (s *Synchronizer) sendGetObject(ctx context.Context, id domain.KindName, ba

func (s *Synchronizer) sendNewChecksum(ctx context.Context, id domain.KindName, checksum string) error {
event := domain.EventNewChecksum
depth := ctx.Value(domain.ContextKeyDepth).(int)
msgId := ctx.Value(domain.ContextKeyMsgId).(string)
depth, msgId := utils.DeptMsgIdFromContext(ctx)
msg := domain.NewChecksum{
Checksum: checksum,
Depth: depth + 1,
Expand Down Expand Up @@ -576,8 +574,7 @@ func (s *Synchronizer) sendNewChecksum(ctx context.Context, id domain.KindName,

func (s *Synchronizer) sendObjectDeleted(ctx context.Context, id domain.KindName) error {
event := domain.EventObjectDeleted
depth := ctx.Value(domain.ContextKeyDepth).(int)
msgId := ctx.Value(domain.ContextKeyMsgId).(string)
depth, msgId := utils.DeptMsgIdFromContext(ctx)
msg := domain.ObjectDeleted{
Depth: depth + 1,
Event: &event,
Expand Down Expand Up @@ -607,8 +604,7 @@ func (s *Synchronizer) sendObjectDeleted(ctx context.Context, id domain.KindName

func (s *Synchronizer) sendPatchObject(ctx context.Context, id domain.KindName, checksum string, patch []byte) error {
event := domain.EventPatchObject
depth := ctx.Value(domain.ContextKeyDepth).(int)
msgId := ctx.Value(domain.ContextKeyMsgId).(string)
depth, msgId := utils.DeptMsgIdFromContext(ctx)

msg := domain.PatchObject{
Checksum: checksum,
Expand Down Expand Up @@ -645,14 +641,15 @@ func (s *Synchronizer) sendPatchObject(ctx context.Context, id domain.KindName,

func (s *Synchronizer) sendPing(ctx context.Context) {
event := domain.EventPing
msg := domain.Generic{
Event: &event,
}
data, err := json.Marshal(msg)
if err != nil {
logger.L().Fatal("marshal ping message", helpers.Error(err))
}
for {
msg := domain.Generic{
Event: &event,
MsgId: utils.NewMsgId(),
}
data, err := json.Marshal(msg)
if err != nil {
logger.L().Fatal("marshal ping message", helpers.Error(err))
}
err = s.outPool.Invoke(data)
if err != nil {
logger.L().Ctx(ctx).Error("invoke outPool on ping message", helpers.Error(err))
Expand All @@ -663,8 +660,7 @@ func (s *Synchronizer) sendPing(ctx context.Context) {

func (s *Synchronizer) sendBatch(ctx context.Context, kind domain.Kind, batchType domain.BatchType, items domain.BatchItems) error {
event := domain.EventBatch
depth := ctx.Value(domain.ContextKeyDepth).(int)
msgId := ctx.Value(domain.ContextKeyMsgId).(string)
depth, msgId := utils.DeptMsgIdFromContext(ctx)
msg := domain.Batch{
Depth: depth + 1,
Event: &event,
Expand Down Expand Up @@ -694,8 +690,7 @@ func (s *Synchronizer) sendBatch(ctx context.Context, kind domain.Kind, batchTyp

func (s *Synchronizer) sendPutObject(ctx context.Context, id domain.KindName, object []byte) error {
event := domain.EventPutObject
depth := ctx.Value(domain.ContextKeyDepth).(int)
msgId := ctx.Value(domain.ContextKeyMsgId).(string)
depth, msgId := utils.DeptMsgIdFromContext(ctx)
msg := domain.PutObject{
Depth: depth + 1,
Event: &event,
Expand Down
20 changes: 16 additions & 4 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ func CanonicalHash(in []byte) (string, error) {
return hex.EncodeToString(hash[:]), nil
}

func ContextFromGeneric(parent context.Context, generic domain.Generic) context.Context {
if generic.MsgId == "" {
generic.MsgId = uuid.NewString()
}
func NewMsgId() string {
return uuid.NewString()
}

func ContextFromGeneric(parent context.Context, generic domain.Generic) context.Context {
ctx := context.WithValue(parent, domain.ContextKeyDepth, generic.Depth)
ctx = context.WithValue(ctx, domain.ContextKeyMsgId, generic.MsgId)
return ctx
Expand All @@ -62,6 +62,18 @@ func ClientIdentifierFromContext(ctx context.Context) domain.ClientIdentifier {
return ctx.Value(domain.ContextKeyClientIdentifier).(domain.ClientIdentifier)
}

func DeptMsgIdFromContext(ctx context.Context) (int, string) {
depth := ctx.Value(domain.ContextKeyDepth)
if depth == nil {
depth = 0
}
msgId := ctx.Value(domain.ContextKeyMsgId)
if msgId == nil {
msgId = NewMsgId()
}
return depth.(int), msgId.(string)
}

//goland:noinspection GoUnusedExportedFunction
func CompareJson(a, b []byte) bool {
var aData interface{}
Expand Down

0 comments on commit 6e35084

Please sign in to comment.