Skip to content

Commit

Permalink
Measure time spent on validating messages tagged by result
Browse files Browse the repository at this point in the history
Measure the time spent on validating messages and tag the metric by
validation result.
  • Loading branch information
masih committed Jul 30, 2024
1 parent fc03ec1 commit d6b21b9
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 22 deletions.
4 changes: 2 additions & 2 deletions f3.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (m *F3) Start(startCtx context.Context) (_err error) {
for m.runningCtx.Err() == nil {
select {
case update := <-m.manifestProvider.ManifestUpdates():
metrics.manifests_received.Add(m.runningCtx, 1)
metrics.manifestsReceived.Add(m.runningCtx, 1)
if pendingManifest != nil && !manifestChangeTimer.Stop() {
<-manifestChangeTimer.C
}
Expand Down Expand Up @@ -249,7 +249,7 @@ func (m *F3) reconfigure(ctx context.Context, manif *manifest.Manifest) (_err er
log.Info("starting f3 reconfiguration")
m.mu.Lock()
defer m.mu.Unlock()
metrics.manifests_received.Add(m.runningCtx, 1)
metrics.manifestsReceived.Add(m.runningCtx, 1)

if err := m.stopInternal(ctx); err != nil {
// Log but don't abort.
Expand Down
29 changes: 15 additions & 14 deletions host.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,33 +268,34 @@ func (h *gpbftRunner) BroadcastMessage(msg *gpbft.GMessage) error {

// Compile-time check that validatePubsubMessage can be used as a
// pubsub.ValidatorEx.
var _ pubsub.ValidatorEx = (*gpbftRunner)(nil).validatePubsubMessage

// validatePubsubMessage decodes msg.Data as a CBOR-encoded gpbft.GMessage,
// validates it via h.participant.ValidateMessage and maps the outcome to a
// pubsub.ValidationResult:
//
//   - decode failure or gpbft.ErrValidationInvalid -> pubsub.ValidationReject
//   - gpbft.ErrValidationTooOld, gpbft.ErrValidationNoCommittee or any other
//     non-nil error -> pubsub.ValidationIgnore
//   - no error -> the validated message is stored in msg.ValidatorData and
//     pubsub.ValidationAccept is returned
//
// In the revision that declares the named result _result, a deferred call to
// recordValidationTime records how long validation took together with the
// final result. The peer ID argument is not used by that revision.
//
// NOTE(review): this listing is a rendered diff; the lines of the previous
// if-chain implementation and of the new switch-based implementation appear
// interleaved below.
func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, pID peer.ID,
msg *pubsub.Message) pubsub.ValidationResult {
func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, _ peer.ID, msg *pubsub.Message) (_result pubsub.ValidationResult) {
start := time.Now()
// The closure reads the named result when the function returns, so the
// recorded duration is tagged with the value actually returned.
defer func() {
recordValidationTime(ctx, start, _result)
}()

var gmsg gpbft.GMessage
err := gmsg.UnmarshalCBOR(bytes.NewReader(msg.Data))
if err != nil {
// A payload that does not decode as a GMessage is rejected.
if err := gmsg.UnmarshalCBOR(bytes.NewReader(msg.Data)); err != nil {
return pubsub.ValidationReject
}

validatedMessage, err := h.participant.ValidateMessage(&gmsg)
if errors.Is(err, gpbft.ErrValidationInvalid) {
// Tagless switch with an init statement: the cases are evaluated in order
// against the error returned by ValidateMessage.
switch validatedMessage, err := h.participant.ValidateMessage(&gmsg); {
case errors.Is(err, gpbft.ErrValidationInvalid):
log.Debugf("validation error during validation: %+v", err)
return pubsub.ValidationReject
}
if errors.Is(err, gpbft.ErrValidationTooOld) {
case errors.Is(err, gpbft.ErrValidationTooOld):
// We got the message too late; ignore it rather than reject it.
return pubsub.ValidationIgnore
}
if errors.Is(err, gpbft.ErrValidationNoCommittee) {
case errors.Is(err, gpbft.ErrValidationNoCommittee):
log.Debugf("commitee error during validation: %+v", err)
return pubsub.ValidationIgnore
}
if err != nil {
// Any other error is logged at info level and the message is ignored.
case err != nil:

Check warning on line 292 in host.go

View check run for this annotation

Codecov / codecov/patch

host.go#L292

Added line #L292 was not covered by tests
log.Infof("unknown error during validation: %+v", err)
return pubsub.ValidationIgnore
default:
// No error: hand the validated message on through msg.ValidatorData.
msg.ValidatorData = validatedMessage
return pubsub.ValidationAccept
}
msg.ValidatorData = validatedMessage
return pubsub.ValidationAccept
}

func (h *gpbftRunner) setupPubsub() error {
Expand Down
40 changes: 34 additions & 6 deletions metrics.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,47 @@
package f3

import (
"context"
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

// meter is the OpenTelemetry meter, named "f3", from which the instruments
// below are created.
var meter = otel.Meter("f3")

// metrics groups the package's metric instruments. Each instrument is created
// once, when this package-level variable is initialised, and every constructor
// result is passed through must.
//
// NOTE(review): must's body is not visible here; presumably it panics on a
// non-nil error — confirm.
//
// NOTE(review): this listing is a rendered diff; the previous field
// manifests_received and its replacement manifestsReceived (together with the
// re-aligned neighbouring lines) both appear below.
var metrics = struct {
headDiverged metric.Int64Counter
reconfigured metric.Int64Counter
manifests_received metric.Int64Counter
headDiverged metric.Int64Counter
reconfigured metric.Int64Counter
manifestsReceived metric.Int64Counter
// validationTime is recorded in milliseconds by recordValidationTime.
validationTime metric.Int64Histogram
}{
headDiverged: must(meter.Int64Counter("f3_head_diverged", metric.WithDescription("Number of times we encountered the head has diverged from base scenario."))),
reconfigured: must(meter.Int64Counter("f3_reconfigured", metric.WithDescription("Number of times we reconfigured due to new manifest being delivered."))),
manifests_received: must(meter.Int64Counter("f3_manifests_received", metric.WithDescription("Number of manifests we have received"))),
headDiverged: must(meter.Int64Counter("f3_head_diverged", metric.WithDescription("Number of times we encountered the head has diverged from base scenario."))),
reconfigured: must(meter.Int64Counter("f3_reconfigured", metric.WithDescription("Number of times we reconfigured due to new manifest being delivered."))),
manifestsReceived: must(meter.Int64Counter("f3_manifests_received", metric.WithDescription("Number of manifests we have received"))),
// Explicit bucket boundaries run from 5 to 1000; the instrument name and the
// recording site both indicate the unit is milliseconds.
validationTime: must(meter.Int64Histogram("f3_validation_time_ms",
metric.WithDescription("Histogram of time spent validating broadcasted in milliseconds"),
metric.WithExplicitBucketBoundaries(5.0, 10.0, 20.0, 30.0, 40.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 1000.0),
)),
}

func recordValidationTime(ctx context.Context, start time.Time, result pubsub.ValidationResult) {
var v string
switch result {
case pubsub.ValidationAccept:
v = "accepted"
case pubsub.ValidationReject:
v = "rejected"
case pubsub.ValidationIgnore:
v = "ignored"
default:
v = "unknown"

Check warning on line 39 in metrics.go

View check run for this annotation

Codecov / codecov/patch

metrics.go#L38-L39

Added lines #L38 - L39 were not covered by tests
}
metrics.validationTime.Record(
ctx,
time.Since(start).Milliseconds(),
metric.WithAttributes(attribute.KeyValue{Key: "result", Value: attribute.StringValue(v)}))
}

func must[V any](v V, err error) V {
Expand Down

0 comments on commit d6b21b9

Please sign in to comment.