Improve Server metrics #34

Merged
73 changes: 20 additions & 53 deletions server/app/app.go
@@ -177,7 +177,6 @@ func (a *App) metricsReporterInterceptor(
handler grpc.UnaryHandler,
) (interface{}, error) {
events := []*pb.Event{}
retry := "0"
payloadSize := 0
switch t := req.(type) {
case *pb.Event:
@@ -187,70 +186,38 @@ func (a *App) metricsReporterInterceptor(
case *pb.SendEventsRequest:
request := req.(*pb.SendEventsRequest)
events = append(events, request.Events...)
retry = fmt.Sprintf("%d", request.Retry)
payloadSize = proto.Size(request)
default:
a.log.WithField("route", info.FullMethod).Infof("Unexpected request type %T", t)
}

topic := events[0].Topic
l := a.log.
WithField("route", info.FullMethod).
WithField("topic", topic)

metrics.APIPayloadSize.WithLabelValues(
info.FullMethod,
topic,
).Observe(float64(payloadSize))
topic).Observe(float64(payloadSize))
metrics.APIIncomingEvents.WithLabelValues(
topic).Add(float64(len(events)))

defer func(startTime time.Time) {
elapsedTime := float64(time.Since(startTime).Nanoseconds() / (1000 * 1000))
for _, e := range events {
metrics.APIResponseTime.WithLabelValues(
info.FullMethod,
e.Topic,
retry,
).Observe(elapsedTime)
}
l.WithField("elapsedTime", elapsedTime).Debug("request processed")
}(time.Now())

reportedFailures := false
startTime := time.Now()
res, err := handler(ctx, req)
responseStatus := "OK"
if err != nil {
l.WithError(err).Error("error processing request")
for _, e := range events {
metrics.APIRequestsFailureCounter.WithLabelValues(
info.FullMethod,
e.Topic,
retry,
"error processing request",
).Inc()
}
reportedFailures = true
return res, err
}
failureIndexes := []int64{}
if _, ok := res.(*pb.SendEventsResponse); ok {
failureIndexes = res.(*pb.SendEventsResponse).FailureIndexes
}
fC := 0
for i, e := range events {
if !reportedFailures && len(failureIndexes) > fC && int64(i) == failureIndexes[fC] {
metrics.APIRequestsFailureCounter.WithLabelValues(
info.FullMethod,
e.Topic,
retry,
"couldn't produce event",
).Inc()
fC++
}
metrics.APIRequestsSuccessCounter.WithLabelValues(
responseStatus = "ERROR"
metrics.APIResponseTime.WithLabelValues(
info.FullMethod,
e.Topic,
retry,
).Inc()
responseStatus,
topic,
).Observe(float64(time.Since(startTime).Milliseconds()))
a.log.
WithField("route", info.FullMethod).
WithField("topic", topic).
WithError(err).Error("error processing request")
return res, err
}
metrics.APIResponseTime.WithLabelValues(
info.FullMethod,
responseStatus,
topic,
).Observe(float64(time.Since(startTime).Milliseconds()))
return res, nil
}

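The interceptor now records one response-time observation per request, labelled by route, status, and topic, instead of incrementing per-event success/failure counters with a retry label. A minimal wiring sketch, assuming the interceptor is registered on the gRPC server in the usual way (newGRPCServer is a hypothetical helper; the real server setup is outside this diff):

package app

import (
	"google.golang.org/grpc"
)

// Hypothetical wiring sketch, not part of this PR: a unary interceptor with
// metricsReporterInterceptor's signature is registered when the server is built.
func (a *App) newGRPCServer() *grpc.Server {
	return grpc.NewServer(
		grpc.UnaryInterceptor(a.metricsReporterInterceptor),
	)
}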
9 changes: 4 additions & 5 deletions server/app/server_suite_test.go
@@ -30,13 +30,12 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/spf13/viper"

"strings"
"testing"

"github.com/topfreegames/eventsgateway/v4/server/logger"
"github.com/topfreegames/eventsgateway/v4/server/metrics"
"github.com/topfreegames/eventsgateway/v4/server/mocks"
mockpb "github.com/topfreegames/protos/eventsgateway/grpc/mock"
"strings"
"testing"
)

// GetDefaultConfig returns the configuration at ./config/test.yaml
@@ -77,7 +76,7 @@ var (
var _ = BeforeEach(func() {
log = &logger.NullLogger{}
config, _ = GetDefaultConfig()

metrics.StartServer(config)
mockCtrl = gomock.NewController(GinkgoT())
mockGRPCServer = mockpb.NewMockGRPCForwarderServer(mockCtrl)
mockForwarder = mocks.NewMockForwarder(mockCtrl)
77 changes: 39 additions & 38 deletions server/metrics/metrics.go
@@ -35,58 +35,40 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)

const (
// LabelRoute is the GRPC route the request is reaching
LabelRoute = "route"
// LabelTopic is the Kafka topic the event refers to
LabelTopic = "topic"
// LabelStatus is the status of the request. OK if success or ERROR if fail
LabelStatus = "status"
)

var (
// APIResponseTime summary, observes the API response time as perceived by the server
APIResponseTime *prometheus.HistogramVec

// APIPayloadSize summary, observes the payload size of requests arriving at the server
APIPayloadSize *prometheus.HistogramVec

// APIRequestsSuccessCounter counter
APIRequestsSuccessCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "eventsgateway",
Subsystem: "api",
Name: "requests_success_counter",
Help: "A counter of succeeded api requests",
},
[]string{"route", "topic", "retry"},
)
// KafkaRequestLatency histogram, observes the Kafka request latency per topic and status
KafkaRequestLatency *prometheus.HistogramVec

// APIRequestsFailureCounter counter
APIRequestsFailureCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "eventsgateway",
Subsystem: "api",
Name: "requests_failure_counter",
Help: "A counter of failed api requests",
},
[]string{"route", "topic", "retry", "reason"},
)

APITopicsSubmission = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "eventsgateway",
Subsystem: "api",
Name: "topics_submission_total",
Help: "Topic submissions sent to kafka",
},
[]string{"topic", "success"},
)
// APIIncomingEvents count of all events the API is receiving (unpacking the array of input events)
APIIncomingEvents *prometheus.CounterVec
)

func defaultLatencyBuckets(config *viper.Viper) []float64 {
// in milliseconds
const configKey = "prometheus.buckets.latency"
config.SetDefault(configKey, []float64{3, 5, 10, 50, 100, 300, 500, 1000, 5000})

config.SetDefault(configKey, []float64{10, 30, 50, 100, 500})
return config.Get(configKey).([]float64)
}

func defaultPayloadSizeBuckets(config *viper.Viper) []float64 {
// in bytes
configKey := "prometheus.buckets.payloadSize"
config.SetDefault(configKey, []float64{100, 1000, 5000, 10000, 50000, 100000, 500000, 1000000, 5000000})

config.SetDefault(configKey, []float64{10000, 50000, 100000, 500000, 1000000, 5000000})
return config.Get(configKey).([]float64)
}

@@ -119,7 +101,7 @@ func StartServer(config *viper.Viper) {
Help: "payload size of API routes, in bytes",
Buckets: defaultPayloadSizeBuckets(config),
},
[]string{"route", "topic"},
[]string{LabelTopic},
)

APIResponseTime = prometheus.NewHistogramVec(
@@ -130,15 +112,34 @@
Help: "the response time in ms of api routes",
Buckets: defaultLatencyBuckets(config),
},
[]string{"route", "topic", "retry"},
[]string{LabelRoute, LabelStatus, LabelTopic},
)

APIIncomingEvents = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "eventsgateway",
Subsystem: "api",
Name: "incoming_events",
Help: "A counter of succeeded api requests",
},
[]string{LabelTopic},
)
KafkaRequestLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "eventsgateway",
Subsystem: "kafka",
Name: "response_time_ms",
Help: "the response time in ms of Kafka",
Buckets: defaultLatencyBuckets(config),
},
[]string{LabelStatus, LabelTopic},
)

collectors := []prometheus.Collector{
APIResponseTime,
APIPayloadSize,
APIRequestsFailureCounter,
APIRequestsSuccessCounter,
APITopicsSubmission,
APIIncomingEvents,
KafkaRequestLatency,
}

err := RegisterMetrics(collectors)
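StartServer now builds both histograms from two viper keys, so the bucket boundaries can be tuned through configuration rather than code. A minimal sketch of overriding them before the collectors are registered, assuming StartServer is called once per process; startMetricsWithCustomBuckets and the bucket values are illustrative:

package main

import (
	"github.com/spf13/viper"

	"github.com/topfreegames/eventsgateway/v4/server/metrics"
)

// Sketch: viper.Set takes precedence over the SetDefault calls inside
// defaultLatencyBuckets and defaultPayloadSizeBuckets.
func startMetricsWithCustomBuckets() {
	config := viper.New()
	config.Set("prometheus.buckets.latency", []float64{5, 10, 30, 50, 100, 500, 1000})    // milliseconds
	config.Set("prometheus.buckets.payloadSize", []float64{1000, 10000, 100000, 1000000}) // bytes
	metrics.StartServer(config)
}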
12 changes: 7 additions & 5 deletions server/sender/kafka.go
@@ -67,6 +67,8 @@ func (k *KafkaSender) SendEvent(
ctx context.Context,
event *pb.Event,
) error {
startTime := time.Now()

l := k.logger.WithFields(map[string]interface{}{
"topic": event.GetTopic(),
"event": event,
@@ -103,16 +105,16 @@ }
}

topic := event.GetTopic()

partition, offset, err := k.producer.Produce(ctx, topic, buf.Bytes())

kafkaStatus := "OK"
if err != nil {
l.WithError(err).
Error("error producing event to kafka")
metrics.APITopicsSubmission.WithLabelValues(topic, "false").Inc()
kafkaStatus = "ERROR"
l.WithError(err).Error("error producing event to kafka")
metrics.KafkaRequestLatency.WithLabelValues(kafkaStatus, topic).Observe(float64(time.Since(startTime).Milliseconds()))
return err
}
metrics.APITopicsSubmission.WithLabelValues(topic, "true").Inc()
metrics.KafkaRequestLatency.WithLabelValues(kafkaStatus, topic).Observe(float64(time.Since(startTime).Milliseconds()))
l.WithFields(map[string]interface{}{
"partition": partition,
"offset": offset,
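SendEvent now times the Produce call and observes KafkaRequestLatency with an OK or ERROR status label on both return paths, replacing the old APITopicsSubmission counter. The same measurement can be expressed with a single deferred observation; the sketch below is an illustration only, with produceWithLatency and produceFn as hypothetical stand-ins for k.producer.Produce:

package sender

import (
	"context"
	"time"

	"github.com/topfreegames/eventsgateway/v4/server/metrics"
)

// Sketch, not part of this PR: observe the Kafka request latency exactly once
// on every return path via defer, with the status resolved before returning.
func produceWithLatency(
	ctx context.Context,
	topic string,
	payload []byte,
	produceFn func(context.Context, string, []byte) (int32, int64, error),
) error {
	start := time.Now()
	status := "OK"
	defer func() {
		metrics.KafkaRequestLatency.WithLabelValues(status, topic).
			Observe(float64(time.Since(start).Milliseconds()))
	}()
	if _, _, err := produceFn(ctx, topic, payload); err != nil {
		status = "ERROR"
		return err
	}
	return nil
}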