diff --git a/go.mod b/go.mod index 0e8689ac07..33c6dc2120 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/flyteorg/flytestdlib -go 1.13 +go 1.16 require ( cloud.google.com/go v0.75.0 // indirect diff --git a/promutils/workqueue.go b/promutils/workqueue.go index f7bdd055f0..ba128b201a 100644 --- a/promutils/workqueue.go +++ b/promutils/workqueue.go @@ -17,8 +17,6 @@ limitations under the License. package promutils import ( - "fmt" - "k8s.io/client-go/util/workqueue" "github.com/prometheus/client_golang/prometheus" @@ -28,18 +26,23 @@ import ( // prometheus metrics. To use this package, you just have to import it. func init() { - var provider interface{} //nolint - provider = prometheusMetricsProvider{} - if p, casted := provider.(workqueue.MetricsProvider); casted { - workqueue.SetProvider(p) - } else { - // This case happens in future versions of client-go where the interface has added methods - fmt.Println("Warn: No metricsProvider set for the workqueue") - } + provider := prometheusMetricsProvider{} + workqueue.SetProvider(provider) } type prometheusMetricsProvider struct{} +func (prometheusMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric { + unfinishedWork := prometheus.NewGauge(prometheus.GaugeOpts{ + Subsystem: name, + Name: "longest_running_processor_s", + Help: "How many microseconds longest running processor from workqueue" + name + " takes.", + }) + + prometheus.MustRegister(unfinishedWork) + return unfinishedWork +} + func (prometheusMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric { unfinishedWork := prometheus.NewGauge(prometheus.GaugeOpts{ Subsystem: name, @@ -80,8 +83,8 @@ func (prometheusMetricsProvider) NewAddsMetric(name string) workqueue.CounterMet return adds } -func (prometheusMetricsProvider) NewLatencyMetric(name string) workqueue.SummaryMetric { - latency := prometheus.NewSummary(prometheus.SummaryOpts{ +func (prometheusMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric { + latency := prometheus.NewHistogram(prometheus.HistogramOpts{ Subsystem: name, Name: "queue_latency_us", Help: "How long an item stays in workqueue" + name + " before being requested.", @@ -90,8 +93,8 @@ func (prometheusMetricsProvider) NewLatencyMetric(name string) workqueue.Summary return latency } -func (prometheusMetricsProvider) NewWorkDurationMetric(name string) workqueue.SummaryMetric { - workDuration := prometheus.NewSummary(prometheus.SummaryOpts{ +func (prometheusMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric { + workDuration := prometheus.NewHistogram(prometheus.HistogramOpts{ Subsystem: name, Name: "work_duration_us", Help: "How long processing an item from workqueue" + name + " takes.", diff --git a/promutils/workqueue_test.go b/promutils/workqueue_test.go index 4c5bbcae9e..bacf7a2fa8 100644 --- a/promutils/workqueue_test.go +++ b/promutils/workqueue_test.go @@ -39,4 +39,22 @@ func TestPrometheusMetricsProvider(t *testing.T) { _, ok := c.(prometheus.Summary) assert.True(t, ok) }) + + t.Run("NewLongestRunningProcessorSecondsMetric", func(t *testing.T) { + c := provider.NewLongestRunningProcessorSecondsMetric("x") + _, ok := c.(prometheus.Gauge) + assert.True(t, ok) + }) + + t.Run("NewUnfinishedWorkSecondsMetric", func(t *testing.T) { + c := provider.NewUnfinishedWorkSecondsMetric("x") + _, ok := c.(prometheus.Gauge) + assert.True(t, ok) + }) + + t.Run("NewLongestRunningProcessorMicrosecondsMetric", func(t *testing.T) { + c := provider.NewLongestRunningProcessorMicrosecondsMetric("x") + _, ok := c.(prometheus.Gauge) + assert.True(t, ok) + }) }