Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(telem)_: track raw message by type on send #6176

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions protocol/common/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ import (
v1protocol "github.com/status-im/status-go/protocol/v1"
)

type TelemetryService interface {
PushRawMessageByType(ctx context.Context, msg struct {
MessageType string
Size uint32
})
}

// Whisper message properties.
const (
whisperTTL = 15
Expand Down Expand Up @@ -88,6 +95,8 @@ type MessageSender struct {

// handleSharedSecrets is a callback that is called every time a new shared secret is negotiated
handleSharedSecrets func([]*sharedsecret.Secret) error

telemetryClient TelemetryService
}

func NewMessageSender(
Expand All @@ -113,6 +122,10 @@ func NewMessageSender(
return p, nil
}

func (s *MessageSender) WithTelemetryClient(client TelemetryService) {
s.telemetryClient = client
}

func (s *MessageSender) Stop() {
s.messageEventsSubscriptionsMutex.Lock()
defer s.messageEventsSubscriptionsMutex.Unlock()
Expand Down Expand Up @@ -432,6 +445,9 @@ func (s *MessageSender) sendCommunity(
zap.String("messageType", "community"),
zap.Any("contentType", rawMessage.MessageType),
zap.Strings("hashes", types.EncodeHexes(hashes)))
if s.telemetryClient != nil {
s.sendBandwidthMetric(ctx, rawMessage)
}
s.transport.Track(messageID, hashes, newMessages)

return messageID, nil
Expand Down Expand Up @@ -550,6 +566,10 @@ func (s *MessageSender) sendPrivate(
s.transport.Track(messageID, hashes, newMessages)
}

if s.telemetryClient != nil {
s.sendBandwidthMetric(ctx, rawMessage)
}

return messageID, nil
}

Expand Down Expand Up @@ -578,6 +598,9 @@ func (s *MessageSender) SendPairInstallation(
return nil, errors.Wrap(err, "failed to send a message spec")
}

if s.telemetryClient != nil {
s.sendBandwidthMetric(ctx, &rawMessage)
}
s.transport.Track(messageID, hashes, newMessages)

return messageID, nil
Expand Down Expand Up @@ -808,6 +831,9 @@ func (s *MessageSender) SendPublic(
zap.Any("contentType", rawMessage.MessageType),
zap.String("messageType", "public"),
zap.Strings("hashes", types.EncodeHexes(hashes)))
if s.telemetryClient != nil {
s.sendBandwidthMetric(ctx, &rawMessage)
}
s.transport.Track(messageID, hashes, newMessages)

return messageID, nil
Expand Down Expand Up @@ -1381,3 +1407,13 @@ func (s *MessageSender) CleanupHashRatchetEncryptedMessages() error {

return nil
}

func (s *MessageSender) sendBandwidthMetric(ctx context.Context, rawMessage *RawMessage) {
s.telemetryClient.PushRawMessageByType(ctx, struct {
MessageType string
Size uint32
}{
MessageType: rawMessage.MessageType.String(),
Size: uint32(len(rawMessage.Payload)),
})
}
12 changes: 12 additions & 0 deletions protocol/common/message_sender_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common

import (
"context"
"math"
"testing"

Expand Down Expand Up @@ -40,6 +41,14 @@ type MessageSenderSuite struct {
logger *zap.Logger
}

type mockTelemetryService struct{}

func (m *mockTelemetryService) PushRawMessageByType(ctx context.Context, msg struct {
MessageType string
Size uint32
}) {
}

func (s *MessageSenderSuite) SetupTest() {
s.testMessage = protobuf.ChatMessage{
Text: "abc123",
Expand Down Expand Up @@ -95,6 +104,9 @@ func (s *MessageSenderSuite) SetupTest() {
Datasync: true,
},
)

mockTelemetry := &mockTelemetryService{}
s.sender.WithTelemetryClient(mockTelemetry)
s.Require().NoError(err)
}

Expand Down
2 changes: 2 additions & 0 deletions protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,8 @@ func NewMessenger(
c.wakuService.SetStatusTelemetryClient(telemetryClient)
}
telemetryClient.Start(ctx)

sender.WithTelemetryClient(telemetryClient)
}

messenger = &Messenger{
Expand Down
12 changes: 12 additions & 0 deletions protocol/messenger_builder_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package protocol

import (
"context"
"crypto/ecdsa"

"github.com/google/uuid"
Expand Down Expand Up @@ -52,6 +53,14 @@ func (tmc *testMessengerConfig) complete() error {
return nil
}

type mockTelemetryService struct{}

func (m *mockTelemetryService) PushRawMessageByType(ctx context.Context, msg struct {
Copy link
Contributor

@plopezlpz plopezlpz Jan 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if we are recording telemetry when sending other individual messages, if we do then just including message type and size to those would be enought, right?(maybe not since it might be in another part of the code without this info). Can we include pubsub topic and content topic here

MessageType string
Size uint32
}) {
}

func newTestMessenger(waku types.Waku, config testMessengerConfig) (*Messenger, error) {
err := config.complete()
if err != nil {
Expand Down Expand Up @@ -96,6 +105,9 @@ func newTestMessenger(waku types.Waku, config testMessengerConfig) (*Messenger,
"testVersion",
options...,
)

mockTelemetry := &mockTelemetryService{}
m.sender.WithTelemetryClient(mockTelemetry)
if err != nil {
return nil, err
}
Expand Down
9 changes: 9 additions & 0 deletions protocol/messenger_peersyncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,15 @@ func (m *Messenger) sendDataSync(receiver state.PeerID, payload *datasyncproto.P
}

m.logger.Debug("sent private messages", zap.Any("messageIDs", hexMessageIDs), zap.Strings("hashes", types.EncodeHexes(hashes)))
if m.telemetryClient != nil {
m.telemetryClient.PushRawMessageByType(ctx, struct {
MessageType string
Size uint32
}{
MessageType: "DATASYNC",
Size: uint32(len(marshalledPayload)),
})
}
m.transport.TrackMany(messageIDs, hashes, newMessages)

return nil
Expand Down
29 changes: 28 additions & 1 deletion telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ const (
MessageDeliveryConfirmedMetric TelemetryType = "MessageDeliveryConfirmed"
// Total number and size of Waku messages sent by this node
SentMessageTotalMetric TelemetryType = "SentMessageTotal"
// Size and type of raw message successfully returned by dispatchMessage
RawMessageByTypeMetric TelemetryType = "RawMessageByType"
)

const MaxRetryCache = 5000
Expand Down Expand Up @@ -151,6 +153,13 @@ func (c *Client) PushSentMessageTotal(ctx context.Context, messageSize uint32) {
c.processAndPushTelemetry(ctx, SentMessageTotal{Size: messageSize})
}

func (c *Client) PushRawMessageByType(ctx context.Context, msg struct {
MessageType string
Size uint32
}) {
c.processAndPushTelemetry(ctx, RawMessageByType{MessageType: msg.MessageType, Size: msg.Size})
}

type ReceivedMessages struct {
Filter transport.Filter
SSHMessage *types.Message
Expand Down Expand Up @@ -206,6 +215,11 @@ type SentMessageTotal struct {
Size uint32
}

type RawMessageByType struct {
MessageType string
Size uint32
}

type Client struct {
serverURL string
httpClient *http.Client
Expand Down Expand Up @@ -287,6 +301,7 @@ func (c *Client) Start(ctx context.Context) {
}
}
}()

go func() {
defer common.LogOnPanic()
sendPeriod := c.sendPeriod
Expand Down Expand Up @@ -317,7 +332,6 @@ func (c *Client) Start(ctx context.Context) {
return
}
}

}()
}

Expand Down Expand Up @@ -408,6 +422,12 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
TelemetryType: SentMessageTotalMetric,
TelemetryData: c.ProcessSentMessageTotal(v),
}
case RawMessageByType:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: RawMessageByTypeMetric,
TelemetryData: c.ProcessRawMessageByType(v),
}
default:
c.logger.Error("Unknown telemetry data type")
return
Expand Down Expand Up @@ -589,6 +609,13 @@ func (c *Client) ProcessSentMessageTotal(sentMessageTotal SentMessageTotal) *jso
return c.marshalPostBody(postBody)
}

func (c *Client) ProcessRawMessageByType(rawMessageByType RawMessageByType) *json.RawMessage {
postBody := c.commonPostBody()
postBody["messageType"] = rawMessageByType.MessageType
postBody["size"] = rawMessageByType.Size
return c.marshalPostBody(postBody)
}

// Helper function to marshal post body and handle errors
func (c *Client) marshalPostBody(postBody map[string]interface{}) *json.RawMessage {
body, err := json.Marshal(postBody)
Expand Down
Loading
Loading