diff --git a/.github/workflows/prb.yaml b/.github/workflows/prb.yaml
index 5952fa7..05f5a97 100644
--- a/.github/workflows/prb.yaml
+++ b/.github/workflows/prb.yaml
@@ -7,5 +7,5 @@ jobs:
     - uses: actions/checkout@v2
     - uses: actions/setup-go@v2
       with:
-        go-version: "1.16"
+        go-version: "1.21"
     - run: make check
diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml
index b0d8754..6e5e3c1 100644
--- a/.github/workflows/release.yaml
+++ b/.github/workflows/release.yaml
@@ -8,6 +8,6 @@ jobs:
     - uses: actions/checkout@v2
     - uses: actions/setup-go@v2
       with:
-        go-version: "1.16"
+        go-version: "1.21"
     - run: make check
     - run: make publish
diff --git a/api.go b/api.go
index a3283b6..7d7384b 100644
--- a/api.go
+++ b/api.go
@@ -7,8 +7,6 @@ import (
 
     "go.opentelemetry.io/otel/attribute"
     "go.opentelemetry.io/otel/metric"
-    "go.opentelemetry.io/otel/metric/instrument"
-    "go.opentelemetry.io/otel/metric/unit"
 )
 
 // Config use to configure a controller.
@@ -128,7 +126,7 @@ func (f EventHandlerFunc) Call(ctx context.Context, jobId string) error {
 
 // MeteredEventHandler adds metrics any event reconciler
 func MeteredEventHandler(meter metric.Meter, name string, child EventHandler) (EventHandler, error) {
-    counter, err := meter.SyncInt64().Counter("kreconciler_stream_event_count")
+    counter, err := meter.Int64Counter("kreconciler_stream_event_count")
     if err != nil {
         return nil, err
     }
@@ -143,7 +141,7 @@ func MeteredEventHandler(meter metric.Meter, name string, child EventHandler) (E
             attributes = append(attributes, attribute.Bool("error", false))
         }
 
-        counter.Add(ctx, 1, attributes...)
+        counter.Add(ctx, 1, metric.WithAttributes(attributes...))
     }()
     err = child.Call(ctx, jobId)
     return
@@ -175,17 +173,17 @@ var NoopStream = EventStreamFunc(func(ctx context.Context, handler EventHandler)
 // This is used for rerunning the control-loop for all entities periodically.
 // Having one of these is recommended for any controller.
 func ResyncLoopEventStream(obs Observability, duration time.Duration, listFn func(ctx context.Context) ([]string, error)) (EventStream, error) {
-    count, err := obs.Meter.SyncInt64().Counter("kreconciler_stream_resync_item_count",
-        instrument.WithUnit(unit.Dimensionless),
-        instrument.WithDescription("Increased by the number of items returned by the listFn"),
+    count, err := obs.Meter.Int64Counter("kreconciler_stream_resync_item_count",
+        metric.WithUnit("{count}"),
+        metric.WithDescription("Increased by the number of items returned by the listFn"),
     )
     if err != nil {
         return nil, err
     }
-    recorder, err := obs.Meter.SyncInt64().Histogram("kreconciler_stream_resync_millis",
-        instrument.WithUnit(unit.Milliseconds),
-        instrument.WithDescription("time spent calling the listFn"),
+    recorder, err := obs.Meter.Int64Histogram("kreconciler_stream_resync_millis",
+        metric.WithUnit("ms"),
+        metric.WithDescription("time spent calling the listFn"),
     )
     if err != nil {
         return nil, err
     }
@@ -199,7 +197,7 @@ func ResyncLoopEventStream(obs Observability, duration time.Duration, listFn fun
             // Queue the objects to be handled.
             elts, err := listFn(ctx)
             if err != nil {
-                recorder.Record(ctx, time.Since(start).Milliseconds(), attribute.String("status", "error"))
+                recorder.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attribute.String("status", "error")))
                 obs.Error("Failed resync loop call", "error", err)
                 select {
                 case <-ctx.Done():
@@ -212,7 +210,7 @@ func ResyncLoopEventStream(obs Observability, duration time.Duration, listFn fun
             }
             obs.Info("Adding events", "count", len(elts))
             count.Add(ctx, int64(len(elts)))
-            recorder.Record(ctx, time.Since(start).Milliseconds(), attribute.String("status", "success"))
+            recorder.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attribute.String("status", "success")))
             for _, id := range elts {
                 // Listed objects enqueue as present.
                 err = handler.Call(ctx, id)
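The api.go changes are the synchronous half of the otel-go migration: since the metric API was stabilized, instruments are created directly on the Meter (meter.Int64Counter(...) instead of meter.SyncInt64().Counter(...)), and attributes travel through the metric.WithAttributes measurement option instead of variadic KeyValues. A minimal sketch of the new pattern; the instrument name and attribute here are illustrative, not taken from this repo:

    package main

    import (
        "context"

        "go.opentelemetry.io/otel"
        "go.opentelemetry.io/otel/attribute"
        "go.opentelemetry.io/otel/metric"
    )

    func record(ctx context.Context) error {
        meter := otel.GetMeterProvider().Meter("example")
        // Instruments are built directly on the Meter in the stable API.
        counter, err := meter.Int64Counter("example_event_count",
            metric.WithUnit("{count}"),
            metric.WithDescription("illustrative counter"),
        )
        if err != nil {
            return err
        }
        // Attributes are now wrapped in a measurement option.
        counter.Add(ctx, 1, metric.WithAttributes(attribute.Bool("error", false)))
        return nil
    }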
diff --git a/go.mod b/go.mod
index f8a70b1..eb1fdfd 100644
--- a/go.mod
+++ b/go.mod
@@ -1,22 +1,22 @@
 module github.com/koyeb/kreconciler
 
-go 1.18
+go 1.21
 
 require (
-    github.com/stretchr/testify v1.7.1
-    go.opentelemetry.io/otel v1.7.0
-    go.opentelemetry.io/otel/metric v0.30.0
-    go.opentelemetry.io/otel/sdk v1.7.0
-    go.opentelemetry.io/otel/sdk/metric v0.30.0
-    go.opentelemetry.io/otel/trace v1.7.0
+    github.com/stretchr/testify v1.8.4
+    go.opentelemetry.io/otel v1.19.0
+    go.opentelemetry.io/otel/metric v1.19.0
+    go.opentelemetry.io/otel/sdk v1.19.0
+    go.opentelemetry.io/otel/sdk/metric v1.19.0
+    go.opentelemetry.io/otel/trace v1.19.0
 )
 
 require (
-    github.com/davecgh/go-spew v1.1.0 // indirect
-    github.com/go-logr/logr v1.2.3 // indirect
+    github.com/davecgh/go-spew v1.1.1 // indirect
+    github.com/go-logr/logr v1.3.0 // indirect
     github.com/go-logr/stdr v1.2.2 // indirect
     github.com/pmezard/go-difflib v1.0.0 // indirect
-    github.com/stretchr/objx v0.1.0 // indirect
-    golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect
-    gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
+    github.com/stretchr/objx v0.5.0 // indirect
+    golang.org/x/sys v0.13.0 // indirect
+    gopkg.in/yaml.v3 v3.0.1 // indirect
 )
diff --git a/go.sum b/go.sum
index f58a6e5..6275425 100644
--- a/go.sum
+++ b/go.sum
@@ -1,35 +1,37 @@
-github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
-github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
-github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
-github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
-github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
+github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
 github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
 github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
-github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
-github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
+github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
+github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
-github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
+github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-go.opentelemetry.io/otel v1.7.0 h1:Z2lA3Tdch0iDcrhJXDIlC94XE+bxok1F9B+4Lz/lGsM=
-go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk=
-go.opentelemetry.io/otel/metric v0.30.0 h1:Hs8eQZ8aQgs0U49diZoaS6Uaxw3+bBE3lcMUKBFIk3c=
-go.opentelemetry.io/otel/metric v0.30.0/go.mod h1:/ShZ7+TS4dHzDFmfi1kSXMhMVubNoP0oIaBp70J6UXU=
-go.opentelemetry.io/otel/sdk v1.7.0 h1:4OmStpcKVOfvDOgCt7UriAPtKolwIhxpnSNI/yK+1B0=
-go.opentelemetry.io/otel/sdk v1.7.0/go.mod h1:uTEOTwaqIVuTGiJN7ii13Ibp75wJmYUDe374q6cZwUU=
-go.opentelemetry.io/otel/sdk/metric v0.30.0 h1:XTqQ4y3erR2Oj8xSAOL5ovO5011ch2ELg51z4fVkpME=
-go.opentelemetry.io/otel/sdk/metric v0.30.0/go.mod h1:8AKFRi5HyvTR0RRty3paN1aMC9HMT+NzcEhw/BLkLX8=
-go.opentelemetry.io/otel/trace v1.7.0 h1:O37Iogk1lEkMRXewVtZ1BBTVn5JEp8GrJvP92bJqC6o=
-go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48yyE4TNvoHqU=
-golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 h1:id054HUawV2/6IGm2IV8KZQjqtwAOo2CYlOToYqa0d0=
-golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
+github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
+go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs=
+go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY=
+go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE=
+go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8=
+go.opentelemetry.io/otel/sdk v1.19.0 h1:6USY6zH+L8uMH8L3t1enZPR3WFEmSTADlqldyHtJi3o=
+go.opentelemetry.io/otel/sdk v1.19.0/go.mod h1:NedEbbS4w3C6zElbLdPJKOpJQOrGUJ+GfzpjUvI0v1A=
+go.opentelemetry.io/otel/sdk/metric v1.19.0 h1:EJoTO5qysMsYCa+w4UghwFV/ptQgqSL/8Ni+hx+8i1k=
+go.opentelemetry.io/otel/sdk/metric v1.19.0/go.mod h1:XjG0jQyFJrv2PbMvwND7LwCEhsJzCzV5210euduKcKY=
+go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg=
+go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo=
+golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
+golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/observability.go b/observability.go
index 5344354..8ec77cd 100644
--- a/observability.go
+++ b/observability.go
@@ -5,7 +5,6 @@ import (
 
     "go.opentelemetry.io/otel"
     "go.opentelemetry.io/otel/metric"
-    "go.opentelemetry.io/otel/metric/nonrecording"
     "go.opentelemetry.io/otel/trace"
 )
 
@@ -18,7 +17,7 @@ type Observability struct {
 
 // DefaultObservability uses noopLogger and otel.GetMeter and otel.GetTracer
 func DefaultObservability() Observability {
-    return NewObservability(NoopLogger{}, nonrecording.NewNoopMeterProvider(), otel.GetTracerProvider())
+    return NewObservability(NoopLogger{}, otel.GetMeterProvider(), otel.GetTracerProvider())
 }
 
 // LoggerWithCtx add the tracing context to the logger
diff --git a/observability_test.go b/observability_test.go
index a631d89..ded4129 100644
--- a/observability_test.go
+++ b/observability_test.go
@@ -4,7 +4,7 @@ import (
     "fmt"
     "testing"
 
-    "go.opentelemetry.io/otel/sdk/metric/metrictest"
+    "go.opentelemetry.io/otel/sdk/metric"
     "go.opentelemetry.io/otel/sdk/trace"
     "go.opentelemetry.io/otel/sdk/trace/tracetest"
 )
@@ -19,7 +19,8 @@ func (o obsTest) SpanRecorder() *tracetest.SpanRecorder {
 }
 
 func (o obsTest) Observability() Observability {
-    meterProvider, _ := metrictest.NewTestMeterProvider()
+    reader := metric.NewManualReader()
+    meterProvider := metric.NewMeterProvider(metric.WithReader(reader))
     return Observability{
         Logger: o.log,
         Meter:  meterProvider.Meter("test"),
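The sdk/metric/metrictest package the tests relied on was removed from the SDK; the replacement pattern used above is a real MeterProvider wired to a ManualReader, whose Collect method drains whatever has been recorded. A sketch of how a test could read measurements back, assuming the v1.19 SDK and its metricdata package; the meter and instrument names are illustrative:

    package main

    import (
        "context"
        "fmt"

        sdkmetric "go.opentelemetry.io/otel/sdk/metric"
        "go.opentelemetry.io/otel/sdk/metric/metricdata"
    )

    func main() {
        reader := sdkmetric.NewManualReader()
        provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))

        counter, _ := provider.Meter("test").Int64Counter("demo_count")
        counter.Add(context.Background(), 3)

        // Collect gathers everything recorded since the last collection.
        var rm metricdata.ResourceMetrics
        if err := reader.Collect(context.Background(), &rm); err != nil {
            panic(err)
        }
        for _, sm := range rm.ScopeMetrics {
            for _, m := range sm.Metrics {
                fmt.Println(m.Name) // prints "demo_count"
            }
        }
    }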
diff --git a/worker.go b/worker.go
index 5b06a35..072ce11 100644
--- a/worker.go
+++ b/worker.go
@@ -10,23 +10,19 @@ import (
     "go.opentelemetry.io/otel/attribute"
     "go.opentelemetry.io/otel/codes"
     "go.opentelemetry.io/otel/metric"
-    "go.opentelemetry.io/otel/metric/instrument"
-    "go.opentelemetry.io/otel/metric/instrument/asyncint64"
-    "go.opentelemetry.io/otel/metric/instrument/syncint64"
-    "go.opentelemetry.io/otel/metric/unit"
     "go.opentelemetry.io/otel/trace"
 )
 
 type metrics struct {
-    queueSizeObserver     asyncint64.UpDownCounter
-    dequeue               syncint64.Counter
-    handleResult          syncint64.Counter
-    delay                 syncint64.Histogram
-    handleLatency         syncint64.Histogram
-    enqueue               syncint64.Counter
-    enqueueFull           syncint64.Counter
-    enqueueAlreadyPresent syncint64.Counter
-    queueTime             syncint64.Histogram
+    queueSizeObserver     metric.Int64ObservableUpDownCounter
+    dequeue               metric.Int64Counter
+    handleResult          metric.Int64Counter
+    delay                 metric.Int64Histogram
+    handleLatency         metric.Int64Histogram
+    enqueue               metric.Int64Counter
+    enqueueFull           metric.Int64Counter
+    enqueueAlreadyPresent metric.Int64Counter
+    queueTime             metric.Int64Histogram
 }
 
 type worker struct {
@@ -69,62 +65,64 @@ func attrWorkerId(id int) attribute.KeyValue {
 }
 
 func decorateMeter(w *worker, meter metric.Meter) error {
-    queueSizeObserver, err := meter.AsyncInt64().UpDownCounter("kreconciler_worker_queue_size",
-        instrument.WithUnit(unit.Dimensionless),
-        instrument.WithDescription("The number of outstanding items to reconcile"),
+    queueSizeObserver, err := meter.Int64ObservableUpDownCounter("kreconciler_worker_queue_size",
+        metric.WithUnit("{call}"),
+        metric.WithDescription("The number of outstanding items to reconcile"),
     )
     if err != nil {
         return err
     }
     w.metrics.queueSizeObserver = queueSizeObserver
-    meter.RegisterCallback([]instrument.Asynchronous{w.metrics.queueSizeObserver}, func(ctx context.Context) {
-        w.metrics.queueSizeObserver.Observe(ctx, int64(w.objectLocks.Size()), attrWorkerId(w.id))
-    })
+    meter.RegisterCallback(
+        func(_ context.Context, o metric.Observer) error {
+            o.ObserveInt64(queueSizeObserver, int64(w.objectLocks.Size()), metric.WithAttributes(attrWorkerId(w.id)))
+            return nil
+        }, queueSizeObserver)
 
-    enqueue, err := meter.SyncInt64().Counter("kreconciler_enqueue",
-        instrument.WithUnit(unit.Dimensionless),
-        instrument.WithDescription("The number of times an item was added to the reconcile queue"),
+    enqueue, err := meter.Int64Counter("kreconciler_enqueue",
+        metric.WithUnit("{call}"),
+        metric.WithDescription("The number of times an item was added to the reconcile queue"),
     )
     if err != nil {
         return err
     }
     w.metrics.enqueue = enqueue
 
-    w.metrics.dequeue, err = meter.SyncInt64().Counter("kreconciler_dequeue",
-        instrument.WithUnit(unit.Dimensionless),
-        instrument.WithDescription("The number of times an item was removed from the reconcile queue (to be handled)"),
+    w.metrics.dequeue, err = meter.Int64Counter("kreconciler_dequeue",
+        metric.WithUnit("{call}"),
+        metric.WithDescription("The number of times an item was removed from the reconcile queue (to be handled)"),
     )
     if err != nil {
         return err
     }
 
-    w.metrics.handleResult, err = meter.SyncInt64().Counter("kreconciler_handle_result",
-        instrument.WithUnit(unit.Dimensionless),
-        instrument.WithDescription("The outcome of the call to handle"),
+    w.metrics.handleResult, err = meter.Int64Counter("kreconciler_handle_result",
+        metric.WithUnit("{call}"),
+        metric.WithDescription("The outcome of the call to handle"),
     )
     if err != nil {
         return err
     }
 
-    w.metrics.delay, err = meter.SyncInt64().Histogram("kreconciler_requeue_delay_millis",
-        instrument.WithUnit(unit.Milliseconds),
-        instrument.WithDescription("How long we are reenqueing item for"),
+    w.metrics.delay, err = meter.Int64Histogram("kreconciler_requeue_delay_millis",
+        metric.WithUnit("ms"),
+        metric.WithDescription("How long we are reenqueing item for"),
     )
     if err != nil {
         return err
     }
 
-    w.metrics.handleLatency, err = meter.SyncInt64().Histogram("kreconciler_handle_millis",
-        instrument.WithUnit(unit.Milliseconds),
-        instrument.WithDescription("How long we're taking to process an item"),
+    w.metrics.handleLatency, err = meter.Int64Histogram("kreconciler_handle_millis",
+        metric.WithUnit("ms"),
+        metric.WithDescription("How long we're taking to process an item"),
     )
     if err != nil {
         return err
     }
 
-    w.metrics.queueTime, err = meter.SyncInt64().Histogram("kreconciler_queue_millis",
-        instrument.WithUnit(unit.Milliseconds),
-        instrument.WithDescription("How long we spent in the queue"),
+    w.metrics.queueTime, err = meter.Int64Histogram("kreconciler_queue_millis",
+        metric.WithUnit("ms"),
+        metric.WithDescription("How long we spent in the queue"),
     )
     if err != nil {
         return err
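In the stable API, observable instruments no longer expose an Observe method that can be called anywhere; measurements are reported only inside a callback registered through Meter.RegisterCallback, and the callback must declare every instrument it observes (observations against undeclared instruments are dropped by the SDK), which is why the hunk above passes queueSizeObserver as the trailing argument. A condensed sketch of the registration pattern; the instrument name and the atomic queue-length stand-in are illustrative:

    package main

    import (
        "context"
        "sync/atomic"

        "go.opentelemetry.io/otel"
        "go.opentelemetry.io/otel/metric"
    )

    func register(queueLen *atomic.Int64) (metric.Registration, error) {
        meter := otel.GetMeterProvider().Meter("example")
        size, err := meter.Int64ObservableUpDownCounter("example_queue_size")
        if err != nil {
            return nil, err
        }
        // The observed instrument must be listed in RegisterCallback;
        // the returned Registration can be unregistered on shutdown.
        return meter.RegisterCallback(
            func(_ context.Context, o metric.Observer) error {
                o.ObserveInt64(size, queueLen.Load())
                return nil
            },
            size,
        )
    }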
"already_present"))) parentSpan.SetStatus(codes.Ok, "already_present") parentSpan.End() l.Debug("Item already present in the queue, ignoring enqueue", "object_id", i.id) return nil case errQueueOverflow: - w.metrics.enqueue.Add(i.ctx, 1, attrWorkerId(w.id), attribute.String("status", "queue_full")) + w.metrics.enqueue.Add(i.ctx, 1, metric.WithAttributes(attrWorkerId(w.id), attribute.String("status", "queue_full"))) parentSpan.SetStatus(codes.Error, "queue_full") parentSpan.End() return errQueueAtCapacityError default: - w.metrics.enqueue.Add(i.ctx, 1, attrWorkerId(w.id), attribute.String("status", "ok")) + w.metrics.enqueue.Add(i.ctx, 1, metric.WithAttributes(attrWorkerId(w.id), attribute.String("status", "ok"))) parentSpan.AddEvent("enqueue") w.queue <- i return nil @@ -233,7 +231,7 @@ func (w *worker) Run(ctx context.Context) { parentSpan := trace.SpanFromContext(itm.ctx) parentSpan.AddEvent("dequeue") l := w.Observability.LoggerWithCtx(ctx) - w.metrics.dequeue.Add(ctx, 1, attrWorkerId(w.id)) + w.metrics.dequeue.Add(ctx, 1, metric.WithAttributes(attrWorkerId(w.id))) // process the object. res := w.handle(itm) delay := res.RequeueDelayWithDefault(w.delayQueue.resolution) @@ -243,14 +241,14 @@ func (w *worker) Run(ctx context.Context) { parentSpan.SetStatus(codes.Error, "Max try exceeded") parentSpan.End() l.Error("Max retry exceeded, dropping item", "object_id", itm.id) - w.metrics.handleResult.Add(ctx, 1, attrWorkerId(w.id), attribute.String("result", "drop_max_tries")) + w.metrics.handleResult.Add(ctx, 1, metric.WithAttributes(attrWorkerId(w.id), attribute.String("result", "drop_max_tries"))) } else { if res.Error != nil { - w.metrics.handleResult.Add(ctx, 1, attrWorkerId(w.id), attribute.String("result", "error_requeue")) - w.metrics.delay.Record(ctx, delay.Milliseconds(), attrWorkerId(w.id), attribute.Bool("error", true)) + w.metrics.handleResult.Add(ctx, 1, metric.WithAttributes(attrWorkerId(w.id), attribute.String("result", "error_requeue"))) + w.metrics.delay.Record(ctx, delay.Milliseconds(), metric.WithAttributes(attrWorkerId(w.id), attribute.Bool("error", true))) } else { - w.metrics.handleResult.Add(ctx, 1, attrWorkerId(w.id), attribute.String("result", "delay_requeue")) - w.metrics.delay.Record(ctx, delay.Milliseconds(), attrWorkerId(w.id), attribute.Bool("error", false)) + w.metrics.handleResult.Add(ctx, 1, metric.WithAttributes(attrWorkerId(w.id), attribute.String("result", "delay_requeue"))) + w.metrics.delay.Record(ctx, delay.Milliseconds(), metric.WithAttributes(attrWorkerId(w.id), attribute.Bool("error", false))) } parentSpan.AddEvent("enqueue_with_delay", trace.WithAttributes(attribute.Int64("schedule.millis", delay.Milliseconds()), attribute.Int("try_count", itm.tryCount), attribute.Int("max_try", itm.maxTries))) l.Debug("Delay item retry", "object_id", itm.id) @@ -263,7 +261,7 @@ func (w *worker) Run(ctx context.Context) { } } } else { - w.metrics.handleResult.Add(ctx, 1, attrWorkerId(w.id), attribute.String("result", "ok")) + w.metrics.handleResult.Add(ctx, 1, metric.WithAttributes(attrWorkerId(w.id), attribute.String("result", "ok"))) l.Debug("Done", "object_id", itm.id) parentSpan.SetStatus(codes.Ok, "") parentSpan.End() @@ -282,17 +280,17 @@ func (w *worker) handle(i item) Result { l := w.Observability.LoggerWithCtx(i.ctx) l.Debug("Get event for item", "object_id", i.id) start := time.Now() - w.metrics.queueTime.Record(i.ctx, start.Sub(i.lastEnqueueTime).Milliseconds(), attrWorkerId(w.id)) + w.metrics.queueTime.Record(i.ctx, 
@@ -282,17 +280,17 @@ func (w *worker) handle(i item) Result {
     l := w.Observability.LoggerWithCtx(i.ctx)
     l.Debug("Get event for item", "object_id", i.id)
     start := time.Now()
-    w.metrics.queueTime.Record(i.ctx, start.Sub(i.lastEnqueueTime).Milliseconds(), attrWorkerId(w.id))
+    w.metrics.queueTime.Record(i.ctx, start.Sub(i.lastEnqueueTime).Milliseconds(), metric.WithAttributes(attrWorkerId(w.id)))
     res := w.handler.Apply(handleCtx, i.id)
     // Retry if required based on the result.
     if res.Error != nil {
         span.RecordError(res.Error)
         span.SetStatus(codes.Error, "")
         l.Warn("Failed reconcile loop", "object_id", i.id, "error", res.Error)
-        w.metrics.handleLatency.Record(i.ctx, time.Since(start).Milliseconds(), attrWorkerId(w.id), attribute.Bool("error", true))
+        w.metrics.handleLatency.Record(i.ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attrWorkerId(w.id), attribute.Bool("error", true)))
     } else {
         span.SetStatus(codes.Ok, "")
-        w.metrics.handleLatency.Record(i.ctx, time.Since(start).Milliseconds(), attrWorkerId(w.id), attribute.Bool("error", false))
+        w.metrics.handleLatency.Record(i.ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attrWorkerId(w.id), attribute.Bool("error", false)))
     }
     return res
 }
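One performance nuance of the new recording calls used throughout worker.go: metric.WithAttributes builds a fresh attribute.Set on every Add or Record. For attributes that are fixed for the life of a worker, such as the worker id, the set could presumably be computed once and reused through metric.WithAttributeSet; a sketch of that optional optimization, which is an assumption on my part and not something this change does:

    package main

    import (
        "context"

        "go.opentelemetry.io/otel"
        "go.opentelemetry.io/otel/attribute"
        "go.opentelemetry.io/otel/metric"
    )

    func main() {
        meter := otel.GetMeterProvider().Meter("example")
        dequeue, _ := meter.Int64Counter("example_dequeue")

        // Build the attribute set once instead of on every measurement.
        workerAttrs := metric.WithAttributeSet(attribute.NewSet(attribute.Int("worker_id", 3)))

        for i := 0; i < 100; i++ {
            dequeue.Add(context.Background(), 1, workerAttrs)
        }
    }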