diff --git a/pbsmetrics/config/metrics.go b/pbsmetrics/config/metrics.go index 6a36f9e71c0..903b17bfe6c 100644 --- a/pbsmetrics/config/metrics.go +++ b/pbsmetrics/config/metrics.go @@ -104,6 +104,20 @@ func (me *MultiMetricsEngine) RecordRequestTime(labels pbsmetrics.Labels, length } } +// RecordStoredDataFetchTime across all engines +func (me *MultiMetricsEngine) RecordStoredDataFetchTime(labels pbsmetrics.StoredDataLabels, length time.Duration) { + for _, thisME := range *me { + thisME.RecordStoredDataFetchTime(labels, length) + } +} + +// RecordStoredDataError across all engines +func (me *MultiMetricsEngine) RecordStoredDataError(labels pbsmetrics.StoredDataLabels) { + for _, thisME := range *me { + thisME.RecordStoredDataError(labels) + } +} + // RecordAdapterPanic across all engines func (me *MultiMetricsEngine) RecordAdapterPanic(labels pbsmetrics.AdapterLabels) { for _, thisME := range *me { @@ -244,6 +258,14 @@ func (me *DummyMetricsEngine) RecordLegacyImps(labels pbsmetrics.Labels, numImps func (me *DummyMetricsEngine) RecordRequestTime(labels pbsmetrics.Labels, length time.Duration) { } +// RecordStoredDataFetchTime as a noop +func (me *DummyMetricsEngine) RecordStoredDataFetchTime(labels pbsmetrics.StoredDataLabels, length time.Duration) { +} + +// RecordStoredDataError as a noop +func (me *DummyMetricsEngine) RecordStoredDataError(labels pbsmetrics.StoredDataLabels) { +} + // RecordAdapterPanic as a noop func (me *DummyMetricsEngine) RecordAdapterPanic(labels pbsmetrics.AdapterLabels) { } diff --git a/pbsmetrics/go_metrics.go b/pbsmetrics/go_metrics.go index 26f6ce07b29..3842bfd246a 100644 --- a/pbsmetrics/go_metrics.go +++ b/pbsmetrics/go_metrics.go @@ -27,6 +27,8 @@ type Metrics struct { RequestsQueueTimer map[RequestType]map[bool]metrics.Timer PrebidCacheRequestTimerSuccess metrics.Timer PrebidCacheRequestTimerError metrics.Timer + StoredDataFetchTimer map[StoredDataType]map[StoredDataFetchType]metrics.Timer + StoredDataErrorMeter 
map[StoredDataType]map[StoredDataError]metrics.Meter StoredReqCacheMeter map[CacheResult]metrics.Meter StoredImpCacheMeter map[CacheResult]metrics.Meter DNSLookupTimer metrics.Timer @@ -131,6 +133,8 @@ func NewBlankMetrics(registry metrics.Registry, exchanges []openrtb_ext.BidderNa RequestsQueueTimer: make(map[RequestType]map[bool]metrics.Timer), PrebidCacheRequestTimerSuccess: blankTimer, PrebidCacheRequestTimerError: blankTimer, + StoredDataFetchTimer: make(map[StoredDataType]map[StoredDataFetchType]metrics.Timer), + StoredDataErrorMeter: make(map[StoredDataType]map[StoredDataError]metrics.Meter), StoredReqCacheMeter: make(map[CacheResult]metrics.Meter), StoredImpCacheMeter: make(map[CacheResult]metrics.Meter), AmpNoCookieMeter: blankMeter, @@ -183,6 +187,17 @@ func NewBlankMetrics(registry metrics.Registry, exchanges []openrtb_ext.BidderNa newMetrics.PrivacyTCFRequestVersion[v] = blankMeter } + for _, dt := range StoredDataTypes() { + newMetrics.StoredDataFetchTimer[dt] = make(map[StoredDataFetchType]metrics.Timer) + newMetrics.StoredDataErrorMeter[dt] = make(map[StoredDataError]metrics.Meter) + for _, ft := range StoredDataFetchTypes() { + newMetrics.StoredDataFetchTimer[dt][ft] = blankTimer + } + for _, e := range StoredDataErrors() { + newMetrics.StoredDataErrorMeter[dt][e] = blankMeter + } + } + //to minimize memory usage, queuedTimeout metric is now supported for video endpoint only //boolean value represents 2 general request statuses: accepted and rejected newMetrics.RequestsQueueTimer["video"] = make(map[bool]metrics.Timer) @@ -218,6 +233,17 @@ func NewMetrics(registry metrics.Registry, exchanges []openrtb_ext.BidderName, d newMetrics.PrebidCacheRequestTimerSuccess = metrics.GetOrRegisterTimer("prebid_cache_request_time.ok", registry) newMetrics.PrebidCacheRequestTimerError = metrics.GetOrRegisterTimer("prebid_cache_request_time.err", registry) + for _, dt := range StoredDataTypes() { + for _, ft := range StoredDataFetchTypes() { + timerName := 
fmt.Sprintf("stored_%s_fetch_time.%s", string(dt), string(ft)) + newMetrics.StoredDataFetchTimer[dt][ft] = metrics.GetOrRegisterTimer(timerName, registry) + } + for _, e := range StoredDataErrors() { + meterName := fmt.Sprintf("stored_%s_error.%s", string(dt), string(e)) + newMetrics.StoredDataErrorMeter[dt][e] = metrics.GetOrRegisterMeter(meterName, registry) + } + } + newMetrics.AmpNoCookieMeter = metrics.GetOrRegisterMeter("amp_no_cookie_requests", registry) newMetrics.CookieSyncMeter = metrics.GetOrRegisterMeter("cookie_sync_requests", registry) newMetrics.userSyncBadRequest = metrics.GetOrRegisterMeter("usersync.bad_requests", registry) @@ -444,6 +470,16 @@ func (me *Metrics) RecordRequestTime(labels Labels, length time.Duration) { } } +// RecordStoredDataFetchTime implements a part of the MetricsEngine interface +func (me *Metrics) RecordStoredDataFetchTime(labels StoredDataLabels, length time.Duration) { + me.StoredDataFetchTimer[labels.DataType][labels.DataFetchType].Update(length) +} + +// RecordStoredDataError implements a part of the MetricsEngine interface +func (me *Metrics) RecordStoredDataError(labels StoredDataLabels) { + me.StoredDataErrorMeter[labels.DataType][labels.Error].Mark(1) +} + // RecordAdapterPanic implements a part of the MetricsEngine interface func (me *Metrics) RecordAdapterPanic(labels AdapterLabels) { am, ok := me.AdapterMetrics[labels.Adapter] diff --git a/pbsmetrics/go_metrics_test.go b/pbsmetrics/go_metrics_test.go index f676991649d..0a763750d68 100644 --- a/pbsmetrics/go_metrics_test.go +++ b/pbsmetrics/go_metrics_test.go @@ -345,6 +345,151 @@ func TestRecordPrebidCacheRequestTimeWithNotSuccess(t *testing.T) { assert.Equal(t, m.PrebidCacheRequestTimerError.Count(), int64(1)) } +func TestRecordStoredDataFetchTime(t *testing.T) { + tests := []struct { + description string + dataType StoredDataType + fetchType StoredDataFetchType + }{ + { + description: "Update stored_account_fetch_time.all timer", + dataType: AccountDataType, + 
fetchType: FetchAll, + }, + { + description: "Update stored_amp_fetch_time.all timer", + dataType: AMPDataType, + fetchType: FetchAll, + }, + { + description: "Update stored_category_fetch_time.all timer", + dataType: CategoryDataType, + fetchType: FetchAll, + }, + { + description: "Update stored_request_fetch_time.all timer", + dataType: RequestDataType, + fetchType: FetchAll, + }, + { + description: "Update stored_video_fetch_time.all timer", + dataType: VideoDataType, + fetchType: FetchAll, + }, + { + description: "Update stored_account_fetch_time.delta timer", + dataType: AccountDataType, + fetchType: FetchDelta, + }, + { + description: "Update stored_amp_fetch_time.delta timer", + dataType: AMPDataType, + fetchType: FetchDelta, + }, + { + description: "Update stored_category_fetch_time.delta timer", + dataType: CategoryDataType, + fetchType: FetchDelta, + }, + { + description: "Update stored_request_fetch_time.delta timer", + dataType: RequestDataType, + fetchType: FetchDelta, + }, + { + description: "Update stored_video_fetch_time.delta timer", + dataType: VideoDataType, + fetchType: FetchDelta, + }, + } + + for _, tt := range tests { + registry := metrics.NewRegistry() + m := NewMetrics(registry, []openrtb_ext.BidderName{openrtb_ext.BidderAppnexus, openrtb_ext.BidderRubicon}, config.DisabledMetrics{AccountAdapterDetails: true}) + m.RecordStoredDataFetchTime(StoredDataLabels{ + DataType: tt.dataType, + DataFetchType: tt.fetchType, + }, time.Duration(500)) + + actualCount := m.StoredDataFetchTimer[tt.dataType][tt.fetchType].Count() + assert.Equal(t, int64(1), actualCount, tt.description) + + actualDuration := m.StoredDataFetchTimer[tt.dataType][tt.fetchType].Sum() + assert.Equal(t, int64(500), actualDuration, tt.description) + } +} + +func TestRecordStoredDataError(t *testing.T) { + tests := []struct { + description string + dataType StoredDataType + errorType StoredDataError + }{ + { + description: "Increment stored_account_error.network meter", + dataType: 
AccountDataType, + errorType: StoredDataErrorNetwork, + }, + { + description: "Increment stored_amp_error.network meter", + dataType: AMPDataType, + errorType: StoredDataErrorNetwork, + }, + { + description: "Increment stored_category_error.network meter", + dataType: CategoryDataType, + errorType: StoredDataErrorNetwork, + }, + { + description: "Increment stored_request_error.network meter", + dataType: RequestDataType, + errorType: StoredDataErrorNetwork, + }, + { + description: "Increment stored_video_error.network meter", + dataType: VideoDataType, + errorType: StoredDataErrorNetwork, + }, + { + description: "Increment stored_account_error.undefined meter", + dataType: AccountDataType, + errorType: StoredDataErrorUndefined, + }, + { + description: "Increment stored_amp_error.undefined meter", + dataType: AMPDataType, + errorType: StoredDataErrorUndefined, + }, + { + description: "Increment stored_category_error.undefined meter", + dataType: CategoryDataType, + errorType: StoredDataErrorUndefined, + }, + { + description: "Increment stored_request_error.undefined meter", + dataType: RequestDataType, + errorType: StoredDataErrorUndefined, + }, + { + description: "Increment stored_video_error.undefined meter", + dataType: VideoDataType, + errorType: StoredDataErrorUndefined, + }, + } + + for _, tt := range tests { + registry := metrics.NewRegistry() + m := NewMetrics(registry, []openrtb_ext.BidderName{openrtb_ext.BidderAppnexus, openrtb_ext.BidderRubicon}, config.DisabledMetrics{AccountAdapterDetails: true}) + m.RecordStoredDataError(StoredDataLabels{ + DataType: tt.dataType, + Error: tt.errorType, + }) + + actualCount := m.StoredDataErrorMeter[tt.dataType][tt.errorType].Count() + assert.Equal(t, int64(1), actualCount, tt.description) + } +} + func TestRecordRequestPrivacy(t *testing.T) { registry := metrics.NewRegistry() m := NewMetrics(registry, []openrtb_ext.BidderName{openrtb_ext.BidderAppnexus, openrtb_ext.BidderRubicon}, 
config.DisabledMetrics{AccountAdapterDetails: true}) diff --git a/pbsmetrics/metrics.go b/pbsmetrics/metrics.go index 8133bc739a0..7f9d66b55ad 100644 --- a/pbsmetrics/metrics.go +++ b/pbsmetrics/metrics.go @@ -51,6 +51,60 @@ type PrivacyLabels struct { LMTEnforced bool } +type StoredDataType string + +const ( + AccountDataType StoredDataType = "account" + AMPDataType StoredDataType = "amp" + CategoryDataType StoredDataType = "category" + RequestDataType StoredDataType = "request" + VideoDataType StoredDataType = "video" +) + +func StoredDataTypes() []StoredDataType { + return []StoredDataType{ + AccountDataType, + AMPDataType, + CategoryDataType, + RequestDataType, + VideoDataType, + } +} + +type StoredDataFetchType string + +const ( + FetchAll StoredDataFetchType = "all" + FetchDelta StoredDataFetchType = "delta" +) + +func StoredDataFetchTypes() []StoredDataFetchType { + return []StoredDataFetchType{ + FetchAll, + FetchDelta, + } +} + +type StoredDataLabels struct { + DataType StoredDataType + DataFetchType StoredDataFetchType + Error StoredDataError +} + +type StoredDataError string + +const ( + StoredDataErrorNetwork StoredDataError = "network" + StoredDataErrorUndefined StoredDataError = "undefined" +) + +func StoredDataErrors() []StoredDataError { + return []StoredDataError{ + StoredDataErrorNetwork, + StoredDataErrorUndefined, + } +} + // Label typecasting. 
Se below the type definitions for possible values // DemandSource : Demand source enumeration @@ -314,6 +368,8 @@ type MetricsEngine interface { RecordUserIDSet(userLabels UserLabels) // Function should verify bidder values RecordStoredReqCacheResult(cacheResult CacheResult, inc int) RecordStoredImpCacheResult(cacheResult CacheResult, inc int) + RecordStoredDataFetchTime(labels StoredDataLabels, length time.Duration) + RecordStoredDataError(labels StoredDataLabels) RecordPrebidCacheRequestTime(success bool, length time.Duration) RecordRequestQueueTime(success bool, requestType RequestType, length time.Duration) RecordTimeoutNotice(sucess bool) diff --git a/pbsmetrics/metrics_mock.go b/pbsmetrics/metrics_mock.go index 42a2d1b4c8f..63b05196be0 100644 --- a/pbsmetrics/metrics_mock.go +++ b/pbsmetrics/metrics_mock.go @@ -42,6 +42,16 @@ func (me *MetricsEngineMock) RecordRequestTime(labels Labels, length time.Durati me.Called(labels, length) } +// RecordStoredDataFetchTime mock +func (me *MetricsEngineMock) RecordStoredDataFetchTime(labels StoredDataLabels, length time.Duration) { + me.Called(labels, length) +} + +// RecordStoredDataError mock +func (me *MetricsEngineMock) RecordStoredDataError(labels StoredDataLabels) { + me.Called(labels) +} + // RecordAdapterPanic mock func (me *MetricsEngineMock) RecordAdapterPanic(labels AdapterLabels) { me.Called(labels) diff --git a/pbsmetrics/prometheus/preload.go b/pbsmetrics/prometheus/preload.go index 4f62a18aae9..52620bc657c 100644 --- a/pbsmetrics/prometheus/preload.go +++ b/pbsmetrics/prometheus/preload.go @@ -7,17 +7,19 @@ import ( func preloadLabelValues(m *Metrics) { var ( - actionValues = actionsAsString() - adapterErrorValues = adapterErrorsAsString() - adapterValues = adaptersAsString() - bidTypeValues = []string{markupDeliveryAdm, markupDeliveryNurl} - boolValues = boolValuesAsString() - cacheResultValues = cacheResultsAsString() - connectionErrorValues = []string{connectionAcceptError, connectionCloseError} - 
cookieValues = cookieTypesAsString() - requestStatusValues = requestStatusesAsString() - requestTypeValues = requestTypesAsString() - sourceValues = []string{sourceRequest} + actionValues = actionsAsString() + adapterErrorValues = adapterErrorsAsString() + adapterValues = adaptersAsString() + bidTypeValues = []string{markupDeliveryAdm, markupDeliveryNurl} + boolValues = boolValuesAsString() + cacheResultValues = cacheResultsAsString() + connectionErrorValues = []string{connectionAcceptError, connectionCloseError} + cookieValues = cookieTypesAsString() + requestStatusValues = requestStatusesAsString() + requestTypeValues = requestTypesAsString() + storedDataFetchTypeValues = storedDataFetchTypesAsString() + storedDataErrorValues = storedDataErrorsAsString() + sourceValues = []string{sourceRequest} ) preloadLabelValuesForCounter(m.connectionsError, map[string][]string{ @@ -44,6 +46,46 @@ func preloadLabelValues(m *Metrics) { requestTypeLabel: requestTypeValues, }) + preloadLabelValuesForHistogram(m.storedAccountFetchTimer, map[string][]string{ + storedDataFetchTypeLabel: storedDataFetchTypeValues, + }) + + preloadLabelValuesForHistogram(m.storedAMPFetchTimer, map[string][]string{ + storedDataFetchTypeLabel: storedDataFetchTypeValues, + }) + + preloadLabelValuesForHistogram(m.storedCategoryFetchTimer, map[string][]string{ + storedDataFetchTypeLabel: storedDataFetchTypeValues, + }) + + preloadLabelValuesForHistogram(m.storedRequestFetchTimer, map[string][]string{ + storedDataFetchTypeLabel: storedDataFetchTypeValues, + }) + + preloadLabelValuesForHistogram(m.storedVideoFetchTimer, map[string][]string{ + storedDataFetchTypeLabel: storedDataFetchTypeValues, + }) + + preloadLabelValuesForCounter(m.storedAccountErrors, map[string][]string{ + storedDataErrorLabel: storedDataErrorValues, + }) + + preloadLabelValuesForCounter(m.storedAMPErrors, map[string][]string{ + storedDataErrorLabel: storedDataErrorValues, + }) + + preloadLabelValuesForCounter(m.storedCategoryErrors, 
map[string][]string{ + storedDataErrorLabel: storedDataErrorValues, + }) + + preloadLabelValuesForCounter(m.storedRequestErrors, map[string][]string{ + storedDataErrorLabel: storedDataErrorValues, + }) + + preloadLabelValuesForCounter(m.storedVideoErrors, map[string][]string{ + storedDataErrorLabel: storedDataErrorValues, + }) + preloadLabelValuesForCounter(m.requestsWithoutCookie, map[string][]string{ requestTypeLabel: requestTypeValues, }) diff --git a/pbsmetrics/prometheus/prometheus.go b/pbsmetrics/prometheus/prometheus.go index b42399b2a62..046d0769115 100644 --- a/pbsmetrics/prometheus/prometheus.go +++ b/pbsmetrics/prometheus/prometheus.go @@ -28,6 +28,16 @@ type Metrics struct { requestsWithoutCookie *prometheus.CounterVec storedImpressionsCacheResult *prometheus.CounterVec storedRequestCacheResult *prometheus.CounterVec + storedAccountFetchTimer *prometheus.HistogramVec + storedAccountErrors *prometheus.CounterVec + storedAMPFetchTimer *prometheus.HistogramVec + storedAMPErrors *prometheus.CounterVec + storedCategoryFetchTimer *prometheus.HistogramVec + storedCategoryErrors *prometheus.CounterVec + storedRequestFetchTimer *prometheus.HistogramVec + storedRequestErrors *prometheus.CounterVec + storedVideoFetchTimer *prometheus.HistogramVec + storedVideoErrors *prometheus.CounterVec timeoutNotifications *prometheus.CounterVec dnsLookupTimer prometheus.Histogram privacyCCPA *prometheus.CounterVec @@ -102,6 +112,11 @@ const ( sourceRequest = "request" ) +const ( + storedDataFetchTypeLabel = "stored_data_fetch_type" + storedDataErrorLabel = "stored_data_error" +) + // NewMetrics initializes a new Prometheus metrics instance with preloaded label values. 
func NewMetrics(cfg config.PrometheusMetrics, disabledMetrics config.DisabledMetrics) *Metrics { standardTimeBuckets := []float64{0.05, 0.1, 0.15, 0.20, 0.25, 0.3, 0.4, 0.5, 0.75, 1} @@ -171,6 +186,61 @@ func NewMetrics(cfg config.PrometheusMetrics, disabledMetrics config.DisabledMet "Count of stored request cache requests attempts by hits or miss.", []string{cacheResultLabel}) + metrics.storedAccountFetchTimer = newHistogramVec(cfg, metrics.Registry, + "stored_account_fetch_time_seconds", + "Seconds to fetch stored accounts labeled by fetch type", + []string{storedDataFetchTypeLabel}, + standardTimeBuckets) + + metrics.storedAccountErrors = newCounter(cfg, metrics.Registry, + "stored_account_errors", + "Count of stored account errors by error type", + []string{storedDataErrorLabel}) + + metrics.storedAMPFetchTimer = newHistogramVec(cfg, metrics.Registry, + "stored_amp_fetch_time_seconds", + "Seconds to fetch stored AMP requests labeled by fetch type", + []string{storedDataFetchTypeLabel}, + standardTimeBuckets) + + metrics.storedAMPErrors = newCounter(cfg, metrics.Registry, + "stored_amp_errors", + "Count of stored AMP errors by error type", + []string{storedDataErrorLabel}) + + metrics.storedCategoryFetchTimer = newHistogramVec(cfg, metrics.Registry, + "stored_category_fetch_time_seconds", + "Seconds to fetch stored categories labeled by fetch type", + []string{storedDataFetchTypeLabel}, + standardTimeBuckets) + + metrics.storedCategoryErrors = newCounter(cfg, metrics.Registry, + "stored_category_errors", + "Count of stored category errors by error type", + []string{storedDataErrorLabel}) + + metrics.storedRequestFetchTimer = newHistogramVec(cfg, metrics.Registry, + "stored_request_fetch_time_seconds", + "Seconds to fetch stored requests labeled by fetch type", + []string{storedDataFetchTypeLabel}, + standardTimeBuckets) + + metrics.storedRequestErrors = newCounter(cfg, metrics.Registry, + "stored_request_errors", + "Count of stored request errors by error type", 
+ []string{storedDataErrorLabel}) + + metrics.storedVideoFetchTimer = newHistogramVec(cfg, metrics.Registry, + "stored_video_fetch_time_seconds", + "Seconds to fetch stored video labeled by fetch type", + []string{storedDataFetchTypeLabel}, + standardTimeBuckets) + + metrics.storedVideoErrors = newCounter(cfg, metrics.Registry, + "stored_video_errors", + "Count of stored video errors by error type", + []string{storedDataErrorLabel}) + metrics.timeoutNotifications = newCounter(cfg, metrics.Registry, "timeout_notification", "Count of timeout notifications triggered, and if they were successfully sent.", @@ -387,6 +457,56 @@ func (m *Metrics) RecordRequestTime(labels pbsmetrics.Labels, length time.Durati } } +func (m *Metrics) RecordStoredDataFetchTime(labels pbsmetrics.StoredDataLabels, length time.Duration) { + switch labels.DataType { + case pbsmetrics.AccountDataType: + m.storedAccountFetchTimer.With(prometheus.Labels{ + storedDataFetchTypeLabel: string(labels.DataFetchType), + }).Observe(length.Seconds()) + case pbsmetrics.AMPDataType: + m.storedAMPFetchTimer.With(prometheus.Labels{ + storedDataFetchTypeLabel: string(labels.DataFetchType), + }).Observe(length.Seconds()) + case pbsmetrics.CategoryDataType: + m.storedCategoryFetchTimer.With(prometheus.Labels{ + storedDataFetchTypeLabel: string(labels.DataFetchType), + }).Observe(length.Seconds()) + case pbsmetrics.RequestDataType: + m.storedRequestFetchTimer.With(prometheus.Labels{ + storedDataFetchTypeLabel: string(labels.DataFetchType), + }).Observe(length.Seconds()) + case pbsmetrics.VideoDataType: + m.storedVideoFetchTimer.With(prometheus.Labels{ + storedDataFetchTypeLabel: string(labels.DataFetchType), + }).Observe(length.Seconds()) + } +} + +func (m *Metrics) RecordStoredDataError(labels pbsmetrics.StoredDataLabels) { + switch labels.DataType { + case pbsmetrics.AccountDataType: + m.storedAccountErrors.With(prometheus.Labels{ + storedDataErrorLabel: string(labels.Error), + }).Inc() + case 
pbsmetrics.AMPDataType: + m.storedAMPErrors.With(prometheus.Labels{ + storedDataErrorLabel: string(labels.Error), + }).Inc() + case pbsmetrics.CategoryDataType: + m.storedCategoryErrors.With(prometheus.Labels{ + storedDataErrorLabel: string(labels.Error), + }).Inc() + case pbsmetrics.RequestDataType: + m.storedRequestErrors.With(prometheus.Labels{ + storedDataErrorLabel: string(labels.Error), + }).Inc() + case pbsmetrics.VideoDataType: + m.storedVideoErrors.With(prometheus.Labels{ + storedDataErrorLabel: string(labels.Error), + }).Inc() + } +} + func (m *Metrics) RecordAdapterRequest(labels pbsmetrics.AdapterLabels) { m.adapterRequests.With(prometheus.Labels{ adapterLabel: string(labels.Adapter), diff --git a/pbsmetrics/prometheus/prometheus_test.go b/pbsmetrics/prometheus/prometheus_test.go index b6153b16278..46be7005439 100644 --- a/pbsmetrics/prometheus/prometheus_test.go +++ b/pbsmetrics/prometheus/prometheus_test.go @@ -407,6 +407,193 @@ func TestRequestTimeMetric(t *testing.T) { } } +func TestRecordStoredDataFetchTime(t *testing.T) { + tests := []struct { + description string + dataType pbsmetrics.StoredDataType + fetchType pbsmetrics.StoredDataFetchType + }{ + { + description: "Update stored account histogram with all label", + dataType: pbsmetrics.AccountDataType, + fetchType: pbsmetrics.FetchAll, + }, + { + description: "Update stored AMP histogram with all label", + dataType: pbsmetrics.AMPDataType, + fetchType: pbsmetrics.FetchAll, + }, + { + description: "Update stored category histogram with all label", + dataType: pbsmetrics.CategoryDataType, + fetchType: pbsmetrics.FetchAll, + }, + { + description: "Update stored request histogram with all label", + dataType: pbsmetrics.RequestDataType, + fetchType: pbsmetrics.FetchAll, + }, + { + description: "Update stored video histogram with all label", + dataType: pbsmetrics.VideoDataType, + fetchType: pbsmetrics.FetchAll, + }, + { + description: "Update stored account histogram with delta label", + dataType: 
pbsmetrics.AccountDataType, + fetchType: pbsmetrics.FetchDelta, + }, + { + description: "Update stored AMP histogram with delta label", + dataType: pbsmetrics.AMPDataType, + fetchType: pbsmetrics.FetchDelta, + }, + { + description: "Update stored category histogram with delta label", + dataType: pbsmetrics.CategoryDataType, + fetchType: pbsmetrics.FetchDelta, + }, + { + description: "Update stored request histogram with delta label", + dataType: pbsmetrics.RequestDataType, + fetchType: pbsmetrics.FetchDelta, + }, + { + description: "Update stored video histogram with delta label", + dataType: pbsmetrics.VideoDataType, + fetchType: pbsmetrics.FetchDelta, + }, + } + + for _, tt := range tests { + m := createMetricsForTesting() + + fetchTime := time.Duration(0.5 * float64(time.Second)) + m.RecordStoredDataFetchTime(pbsmetrics.StoredDataLabels{ + DataType: tt.dataType, + DataFetchType: tt.fetchType, + }, fetchTime) + + var metricsTimer *prometheus.HistogramVec + switch tt.dataType { + case pbsmetrics.AccountDataType: + metricsTimer = m.storedAccountFetchTimer + case pbsmetrics.AMPDataType: + metricsTimer = m.storedAMPFetchTimer + case pbsmetrics.CategoryDataType: + metricsTimer = m.storedCategoryFetchTimer + case pbsmetrics.RequestDataType: + metricsTimer = m.storedRequestFetchTimer + case pbsmetrics.VideoDataType: + metricsTimer = m.storedVideoFetchTimer + } + + result := getHistogramFromHistogramVec( + metricsTimer, + storedDataFetchTypeLabel, + string(tt.fetchType)) + assertHistogram(t, tt.description, result, 1, 0.5) + } +} + +func TestRecordStoredDataError(t *testing.T) { + tests := []struct { + description string + dataType pbsmetrics.StoredDataType + errorType pbsmetrics.StoredDataError + metricName string + }{ + { + description: "Update stored_account_errors counter with network label", + dataType: pbsmetrics.AccountDataType, + errorType: pbsmetrics.StoredDataErrorNetwork, + metricName: "stored_account_errors", + }, + { + description: "Update stored_amp_errors 
counter with network label", + dataType: pbsmetrics.AMPDataType, + errorType: pbsmetrics.StoredDataErrorNetwork, + metricName: "stored_amp_errors", + }, + { + description: "Update stored_category_errors counter with network label", + dataType: pbsmetrics.CategoryDataType, + errorType: pbsmetrics.StoredDataErrorNetwork, + metricName: "stored_category_errors", + }, + { + description: "Update stored_request_errors counter with network label", + dataType: pbsmetrics.RequestDataType, + errorType: pbsmetrics.StoredDataErrorNetwork, + metricName: "stored_request_errors", + }, + { + description: "Update stored_video_errors counter with network label", + dataType: pbsmetrics.VideoDataType, + errorType: pbsmetrics.StoredDataErrorNetwork, + metricName: "stored_video_errors", + }, + { + description: "Update stored_account_errors counter with undefined label", + dataType: pbsmetrics.AccountDataType, + errorType: pbsmetrics.StoredDataErrorUndefined, + metricName: "stored_account_errors", + }, + { + description: "Update stored_amp_errors counter with undefined label", + dataType: pbsmetrics.AMPDataType, + errorType: pbsmetrics.StoredDataErrorUndefined, + metricName: "stored_amp_errors", + }, + { + description: "Update stored_category_errors counter with undefined label", + dataType: pbsmetrics.CategoryDataType, + errorType: pbsmetrics.StoredDataErrorUndefined, + metricName: "stored_category_errors", + }, + { + description: "Update stored_request_errors counter with undefined label", + dataType: pbsmetrics.RequestDataType, + errorType: pbsmetrics.StoredDataErrorUndefined, + metricName: "stored_request_errors", + }, + { + description: "Update stored_video_errors counter with undefined label", + dataType: pbsmetrics.VideoDataType, + errorType: pbsmetrics.StoredDataErrorUndefined, + metricName: "stored_video_errors", + }, + } + + for _, tt := range tests { + m := createMetricsForTesting() + m.RecordStoredDataError(pbsmetrics.StoredDataLabels{ + DataType: tt.dataType, + Error: 
tt.errorType, + }) + + var metricsCounter *prometheus.CounterVec + switch tt.dataType { + case pbsmetrics.AccountDataType: + metricsCounter = m.storedAccountErrors + case pbsmetrics.AMPDataType: + metricsCounter = m.storedAMPErrors + case pbsmetrics.CategoryDataType: + metricsCounter = m.storedCategoryErrors + case pbsmetrics.RequestDataType: + metricsCounter = m.storedRequestErrors + case pbsmetrics.VideoDataType: + metricsCounter = m.storedVideoErrors + } + + assertCounterVecValue(t, tt.description, tt.metricName, metricsCounter, + 1, + prometheus.Labels{ + storedDataErrorLabel: string(tt.errorType), + }) + } +} + func TestAdapterBidReceivedMetric(t *testing.T) { adapterName := "anyName" performTest := func(m *Metrics, hasAdm bool) { @@ -1240,6 +1427,8 @@ func getHistogramFromHistogramVecByTwoKeys(histogram *prometheus.HistogramVec, l valInd := ind if ind == 1 { valInd = 0 + } else { + valInd = 1 } if m.Label[valInd].GetName() == label2Key && m.Label[valInd].GetValue() == label2Value { result = *m.GetHistogram() diff --git a/pbsmetrics/prometheus/type_conversion.go b/pbsmetrics/prometheus/type_conversion.go index 55a7092ed6d..75efb3770fa 100644 --- a/pbsmetrics/prometheus/type_conversion.go +++ b/pbsmetrics/prometheus/type_conversion.go @@ -77,6 +77,33 @@ func requestTypesAsString() []string { return valuesAsString } +func storedDataTypesAsString() []string { + values := pbsmetrics.StoredDataTypes() + valuesAsString := make([]string, len(values)) + for i, v := range values { + valuesAsString[i] = string(v) + } + return valuesAsString +} + +func storedDataFetchTypesAsString() []string { + values := pbsmetrics.StoredDataFetchTypes() + valuesAsString := make([]string, len(values)) + for i, v := range values { + valuesAsString[i] = string(v) + } + return valuesAsString +} + +func storedDataErrorsAsString() []string { + values := pbsmetrics.StoredDataErrors() + valuesAsString := make([]string, len(values)) + for i, v := range values { + valuesAsString[i] = string(v) + 
} + return valuesAsString +} + func tcfVersionsAsString() []string { values := pbsmetrics.TCFVersions() valuesAsString := make([]string, len(values)) diff --git a/stored_requests/config/config.go b/stored_requests/config/config.go index d32773a7a1d..1036cf0af96 100644 --- a/stored_requests/config/config.go +++ b/stored_requests/config/config.go @@ -65,7 +65,7 @@ func CreateStoredRequests(cfg *config.StoredRequests, metricsEngine pbsmetrics.M } } - eventProducers := newEventProducers(cfg, client, dbc.db, router) + eventProducers := newEventProducers(cfg, client, dbc.db, metricsEngine, router) fetcher = newFetcher(cfg, client, dbc.db) var shutdown1 func() @@ -190,7 +190,7 @@ func newCache(cfg *config.StoredRequests) stored_requests.Cache { return cache } -func newEventProducers(cfg *config.StoredRequests, client *http.Client, db *sql.DB, router *httprouter.Router) (eventProducers []events.EventProducer) { +func newEventProducers(cfg *config.StoredRequests, client *http.Client, db *sql.DB, metricsEngine pbsmetrics.MetricsEngine, router *httprouter.Router) (eventProducers []events.EventProducer) { if cfg.CacheEvents.Enabled { eventProducers = append(eventProducers, newEventsAPI(router, cfg.CacheEvents.Endpoint)) } @@ -205,6 +205,7 @@ func newEventProducers(cfg *config.StoredRequests, client *http.Client, db *sql. 
CacheInitTimeout: time.Duration(cfg.Postgres.CacheInitialization.Timeout) * time.Millisecond, CacheUpdateQuery: cfg.Postgres.PollUpdates.Query, CacheUpdateTimeout: time.Duration(cfg.Postgres.PollUpdates.Timeout) * time.Millisecond, + MetricsEngine: metricsEngine, } pgEventProducer := postgresEvents.NewPostgresEventProducer(pgEventCfg) fetchInterval := time.Duration(cfg.Postgres.PollUpdates.RefreshRate) * time.Second diff --git a/stored_requests/config/config_test.go b/stored_requests/config/config_test.go index f225f74bad0..712fef32db4 100644 --- a/stored_requests/config/config_test.go +++ b/stored_requests/config/config_test.go @@ -14,11 +14,13 @@ import ( sqlmock "github.com/DATA-DOG/go-sqlmock" "github.com/julienschmidt/httprouter" "github.com/prebid/prebid-server/config" + "github.com/prebid/prebid-server/pbsmetrics" "github.com/prebid/prebid-server/stored_requests" "github.com/prebid/prebid-server/stored_requests/backends/empty_fetcher" "github.com/prebid/prebid-server/stored_requests/backends/http_fetcher" "github.com/prebid/prebid-server/stored_requests/events" httpEvents "github.com/prebid/prebid-server/stored_requests/events/http" + "github.com/stretchr/testify/mock" ) func typedConfig(dataType config.DataType, sr *config.StoredRequests) *config.StoredRequests { @@ -76,7 +78,10 @@ func TestNewHTTPEvents(t *testing.T) { Timeout: 1000, }, } - evProducers := newEventProducers(cfg, server1.Client(), nil, nil) + + metricsMock := &pbsmetrics.MetricsEngineMock{} + + evProducers := newEventProducers(cfg, server1.Client(), nil, metricsMock, nil) assertSliceLength(t, evProducers, 1) assertHttpWithURL(t, evProducers[0], server1.URL) } @@ -114,6 +119,10 @@ func TestNewInMemoryAccountCache(t *testing.T) { } func TestNewPostgresEventProducers(t *testing.T) { + metricsMock := &pbsmetrics.MetricsEngineMock{} + metricsMock.Mock.On("RecordStoredDataFetchTime", mock.Anything, mock.Anything).Return() + metricsMock.Mock.On("RecordStoredDataError", mock.Anything).Return() + cfg 
:= &config.StoredRequests{ Postgres: config.PostgresConfig{ CacheInitialization: config.PostgresCacheInitializer{ @@ -134,10 +143,11 @@ func TestNewPostgresEventProducers(t *testing.T) { } mock.ExpectQuery("^" + regexp.QuoteMeta(cfg.Postgres.CacheInitialization.Query) + "$").WillReturnError(errors.New("Query failed")) - evProducers := newEventProducers(cfg, client, db, nil) + evProducers := newEventProducers(cfg, client, db, metricsMock, nil) assertProducerLength(t, evProducers, 1) assertExpectationsMet(t, mock) + metricsMock.AssertExpectations(t) } func TestNewEventsAPI(t *testing.T) { diff --git a/stored_requests/events/postgres/database.go b/stored_requests/events/postgres/database.go index cec9b5048bd..8e1269c6904 100644 --- a/stored_requests/events/postgres/database.go +++ b/stored_requests/events/postgres/database.go @@ -5,10 +5,12 @@ import ( "context" "database/sql" "encoding/json" + "net" "time" "github.com/golang/glog" "github.com/prebid/prebid-server/config" + "github.com/prebid/prebid-server/pbsmetrics" "github.com/prebid/prebid-server/stored_requests/events" "github.com/prebid/prebid-server/util/timeutil" ) @@ -17,6 +19,14 @@ func bytesNull() []byte { return []byte{'n', 'u', 'l', 'l'} } +var storedDataTypeMetricMap = map[config.DataType]pbsmetrics.StoredDataType{ + config.RequestDataType: pbsmetrics.RequestDataType, + config.CategoryDataType: pbsmetrics.CategoryDataType, + config.VideoDataType: pbsmetrics.VideoDataType, + config.AMPRequestDataType: pbsmetrics.AMPDataType, + config.AccountDataType: pbsmetrics.AccountDataType, +} + type PostgresEventProducerConfig struct { DB *sql.DB RequestType config.DataType @@ -24,6 +34,7 @@ type PostgresEventProducerConfig struct { CacheInitTimeout time.Duration CacheUpdateQuery string CacheUpdateTimeout time.Duration + MetricsEngine pbsmetrics.MetricsEngine } type PostgresEventProducer struct { @@ -51,9 +62,9 @@ func NewPostgresEventProducer(cfg PostgresEventProducerConfig) (eventProducer *P func (e 
*PostgresEventProducer) Run() error { if e.lastUpdate.IsZero() { return e.fetchAll() - } else { - return e.fetchDelta() } + + return e.fetchDelta() } func (e *PostgresEventProducer) Saves() <-chan events.Save { @@ -64,60 +75,96 @@ func (e *PostgresEventProducer) Invalidations() <-chan events.Invalidation { return e.invalidations } -func (e *PostgresEventProducer) fetchAll() error { - thisTimeInUTC := e.time.Now().UTC() - +func (e *PostgresEventProducer) fetchAll() (fetchErr error) { timeout := e.cfg.CacheInitTimeout * time.Millisecond ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() + startTime := e.time.Now().UTC() rows, err := e.cfg.DB.QueryContext(ctx, e.cfg.CacheInitQuery) + elapsedTime := time.Since(startTime) + e.recordFetchTime(elapsedTime, pbsmetrics.FetchAll) if err != nil { glog.Warningf("Failed to fetch all Stored %s data from the DB: %v", e.cfg.RequestType, err) + if _, ok := err.(net.Error); ok { + e.recordError(pbsmetrics.StoredDataErrorNetwork) + } else { + e.recordError(pbsmetrics.StoredDataErrorUndefined) + } return err } + defer func() { if err := rows.Close(); err != nil { glog.Warningf("Failed to close the Stored %s DB connection: %v", e.cfg.RequestType, err) + e.recordError(pbsmetrics.StoredDataErrorUndefined) + fetchErr = err } }() if err := e.sendEvents(rows); err != nil { glog.Warningf("Failed to load all Stored %s data from the DB: %v", e.cfg.RequestType, err) + e.recordError(pbsmetrics.StoredDataErrorUndefined) return err - } else { - e.lastUpdate = thisTimeInUTC } + + e.lastUpdate = startTime return nil } -func (e *PostgresEventProducer) fetchDelta() error { - thisTimeInUTC := e.time.Now().UTC() - +func (e *PostgresEventProducer) fetchDelta() (fetchErr error) { timeout := e.cfg.CacheUpdateTimeout * time.Millisecond ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() + startTime := e.time.Now().UTC() rows, err := e.cfg.DB.QueryContext(ctx, e.cfg.CacheUpdateQuery, e.lastUpdate) + 
elapsedTime := time.Since(startTime) + e.recordFetchTime(elapsedTime, pbsmetrics.FetchDelta) if err != nil { glog.Warningf("Failed to fetch updated Stored %s data from the DB: %v", e.cfg.RequestType, err) + if _, ok := err.(net.Error); ok { + e.recordError(pbsmetrics.StoredDataErrorNetwork) + } else { + e.recordError(pbsmetrics.StoredDataErrorUndefined) + } return err } + defer func() { if err := rows.Close(); err != nil { glog.Warningf("Failed to close the Stored %s DB connection: %v", e.cfg.RequestType, err) + e.recordError(pbsmetrics.StoredDataErrorUndefined) + fetchErr = err } }() if err := e.sendEvents(rows); err != nil { glog.Warningf("Failed to load updated Stored %s data from the DB: %v", e.cfg.RequestType, err) + e.recordError(pbsmetrics.StoredDataErrorUndefined) return err - } else { - e.lastUpdate = thisTimeInUTC } + + e.lastUpdate = startTime return nil } +func (e *PostgresEventProducer) recordFetchTime(elapsedTime time.Duration, fetchType pbsmetrics.StoredDataFetchType) { + e.cfg.MetricsEngine.RecordStoredDataFetchTime( + pbsmetrics.StoredDataLabels{ + DataType: storedDataTypeMetricMap[e.cfg.RequestType], + DataFetchType: fetchType, + }, elapsedTime) +} + +func (e *PostgresEventProducer) recordError(errorType pbsmetrics.StoredDataError) { + e.cfg.MetricsEngine.RecordStoredDataError( + pbsmetrics.StoredDataLabels{ + DataType: storedDataTypeMetricMap[e.cfg.RequestType], + Error: errorType, + }) +} + // sendEvents reads the rows and sends notifications into the channel for any updates. // If it returns an error, then callers can be certain that no events were sent to the channels. 
func (e *PostgresEventProducer) sendEvents(rows *sql.Rows) (err error) { diff --git a/stored_requests/events/postgres/database_test.go b/stored_requests/events/postgres/database_test.go index da2e0868035..4471ad41638 100644 --- a/stored_requests/events/postgres/database_test.go +++ b/stored_requests/events/postgres/database_test.go @@ -7,8 +7,11 @@ import ( "testing" "time" + "github.com/prebid/prebid-server/config" + "github.com/prebid/prebid-server/pbsmetrics" "github.com/prebid/prebid-server/stored_requests/events" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" sqlmock "github.com/DATA-DOG/go-sqlmock" ) @@ -88,13 +91,21 @@ func TestFetchAllSuccess(t *testing.T) { } for _, tt := range tests { - db, mock, _ := sqlmock.New() - mock.ExpectQuery(fakeQueryRegex()).WillReturnRows(tt.giveMockRows) + db, dbMock, _ := sqlmock.New() + dbMock.ExpectQuery(fakeQueryRegex()).WillReturnRows(tt.giveMockRows) + + metricsMock := &pbsmetrics.MetricsEngineMock{} + metricsMock.Mock.On("RecordStoredDataFetchTime", pbsmetrics.StoredDataLabels{ + DataType: pbsmetrics.RequestDataType, + DataFetchType: pbsmetrics.FetchAll, + }, mock.Anything).Return() eventProducer := NewPostgresEventProducer(PostgresEventProducerConfig{ DB: db, + RequestType: config.RequestDataType, CacheInitTimeout: 100 * time.Millisecond, CacheInitQuery: fakeQuery, + MetricsEngine: metricsMock, }) eventProducer.time = &FakeTime{time: tt.giveFakeTime} err := eventProducer.Run() @@ -119,74 +130,76 @@ func TestFetchAllSuccess(t *testing.T) { assert.Equal(t, tt.wantSavedImps, saves.Imps, tt.description) assert.Equal(t, tt.wantInvalidatedReqs, invalidations.Requests, tt.description) assert.Equal(t, tt.wantInvalidatedImps, invalidations.Imps, tt.description) + + metricsMock.AssertExpectations(t) } } func TestFetchAllErrors(t *testing.T) { tests := []struct { - description string - giveFakeTime time.Time - giveMockRows *sqlmock.Rows - wantReturnedError bool - wantLastUpdate time.Time - wantSavedReqs 
map[string]json.RawMessage - wantSavedImps map[string]json.RawMessage - wantInvalidatedReqs []string - wantInvalidatedImps []string + description string + giveFakeTime time.Time + giveTimeoutMS int + giveMockRows *sqlmock.Rows + wantRecordedError pbsmetrics.StoredDataError + wantLastUpdate time.Time }{ + { + description: "fetch all timeout", + giveFakeTime: time.Date(2020, time.July, 1, 12, 30, 0, 0, time.UTC), + giveMockRows: nil, + wantRecordedError: pbsmetrics.StoredDataErrorNetwork, + wantLastUpdate: time.Time{}, + }, { description: "fetch all query error", giveFakeTime: time.Date(2020, time.July, 1, 12, 30, 0, 0, time.UTC), + giveTimeoutMS: 100, giveMockRows: nil, - wantReturnedError: true, + wantRecordedError: pbsmetrics.StoredDataErrorUndefined, wantLastUpdate: time.Time{}, }, { - description: "fetch all row error", - giveFakeTime: time.Date(2020, time.July, 1, 12, 30, 0, 0, time.UTC), + description: "fetch all row error", + giveFakeTime: time.Date(2020, time.July, 1, 12, 30, 0, 0, time.UTC), + giveTimeoutMS: 100, giveMockRows: sqlmock.NewRows([]string{"id", "data", "dataType"}). AddRow("stored-req-id", "true", "request"). RowError(0, errors.New("Some row error.")), - wantReturnedError: true, + wantRecordedError: pbsmetrics.StoredDataErrorUndefined, wantLastUpdate: time.Time{}, }, - { - description: "fetch all close error", - giveFakeTime: time.Date(2020, time.July, 1, 12, 30, 0, 0, time.UTC), - giveMockRows: sqlmock.NewRows([]string{"id", "data", "dataType"}). - AddRow("req-1", "true", "request"). - AddRow("imp-1", "true", "imp"). - AddRow("req-2", "", "request"). - AddRow("imp-2", "", "imp"). 
- CloseError(errors.New("Some close error.")), - wantReturnedError: false, - wantLastUpdate: time.Date(2020, time.July, 1, 12, 30, 0, 0, time.UTC), - wantSavedReqs: map[string]json.RawMessage{"req-1": json.RawMessage(`true`)}, - wantSavedImps: map[string]json.RawMessage{"imp-1": json.RawMessage(`true`)}, - }, } for _, tt := range tests { - db, mock, _ := sqlmock.New() + db, dbMock, _ := sqlmock.New() if tt.giveMockRows == nil { - mock.ExpectQuery(fakeQueryRegex()).WillReturnError(errors.New("Query failed.")) + dbMock.ExpectQuery(fakeQueryRegex()).WillReturnError(errors.New("Query failed.")) } else { - mock.ExpectQuery(fakeQueryRegex()).WillReturnRows(tt.giveMockRows) + dbMock.ExpectQuery(fakeQueryRegex()).WillReturnRows(tt.giveMockRows) } + metricsMock := &pbsmetrics.MetricsEngineMock{} + metricsMock.Mock.On("RecordStoredDataFetchTime", pbsmetrics.StoredDataLabels{ + DataType: pbsmetrics.RequestDataType, + DataFetchType: pbsmetrics.FetchAll, + }, mock.Anything).Return() + metricsMock.Mock.On("RecordStoredDataError", pbsmetrics.StoredDataLabels{ + DataType: pbsmetrics.RequestDataType, + Error: tt.wantRecordedError, + }).Return() + eventProducer := NewPostgresEventProducer(PostgresEventProducerConfig{ DB: db, - CacheInitTimeout: 100 * time.Millisecond, + RequestType: config.RequestDataType, + CacheInitTimeout: time.Duration(tt.giveTimeoutMS) * time.Millisecond, CacheInitQuery: fakeQuery, + MetricsEngine: metricsMock, }) eventProducer.time = &FakeTime{time: tt.giveFakeTime} err := eventProducer.Run() - if tt.wantReturnedError { - assert.NotNil(t, err, tt.description) - } else { - assert.Nil(t, err, tt.description) - } + assert.NotNil(t, err, tt.description) assert.Equal(t, tt.wantLastUpdate, eventProducer.lastUpdate, tt.description) var saves events.Save @@ -202,10 +215,12 @@ func TestFetchAllErrors(t *testing.T) { case <-time.After(10 * time.Millisecond): } - assert.Equal(t, tt.wantSavedReqs, saves.Requests, tt.description) - assert.Equal(t, tt.wantSavedImps, 
saves.Imps, tt.description) - assert.Equal(t, tt.wantInvalidatedReqs, invalidations.Requests, tt.description) - assert.Equal(t, tt.wantInvalidatedImps, invalidations.Imps, tt.description) + assert.Nil(t, saves.Requests, tt.description) + assert.Nil(t, saves.Imps, tt.description) + assert.Nil(t, invalidations.Requests, tt.description) + assert.Nil(t, invalidations.Imps, tt.description) + + metricsMock.AssertExpectations(t) } } @@ -289,13 +304,21 @@ func TestFetchDeltaSuccess(t *testing.T) { } for _, tt := range tests { - db, mock, _ := sqlmock.New() - mock.ExpectQuery(fakeQueryRegex()).WillReturnRows(tt.giveMockRows) + db, dbMock, _ := sqlmock.New() + dbMock.ExpectQuery(fakeQueryRegex()).WillReturnRows(tt.giveMockRows) + + metricsMock := &pbsmetrics.MetricsEngineMock{} + metricsMock.Mock.On("RecordStoredDataFetchTime", pbsmetrics.StoredDataLabels{ + DataType: pbsmetrics.RequestDataType, + DataFetchType: pbsmetrics.FetchDelta, + }, mock.Anything).Return() eventProducer := NewPostgresEventProducer(PostgresEventProducerConfig{ DB: db, + RequestType: config.RequestDataType, CacheUpdateTimeout: 100 * time.Millisecond, CacheUpdateQuery: fakeQuery, + MetricsEngine: metricsMock, }) eventProducer.lastUpdate = time.Date(2020, time.June, 30, 6, 0, 0, 0, time.UTC) eventProducer.time = &FakeTime{time: tt.giveFakeTime} @@ -321,81 +344,81 @@ func TestFetchDeltaSuccess(t *testing.T) { assert.Equal(t, tt.wantSavedImps, saves.Imps, tt.description) assert.Equal(t, tt.wantInvalidatedReqs, invalidations.Requests, tt.description) assert.Equal(t, tt.wantInvalidatedImps, invalidations.Imps, tt.description) + + metricsMock.AssertExpectations(t) } } func TestFetchDeltaErrors(t *testing.T) { tests := []struct { - description string - giveFakeTime time.Time - giveLastUpdate time.Time - giveMockRows *sqlmock.Rows - wantReturnedError bool - wantLastUpdate time.Time - wantSavedReqs map[string]json.RawMessage - wantSavedImps map[string]json.RawMessage - wantInvalidatedReqs []string - 
wantInvalidatedImps []string + description string + giveFakeTime time.Time + giveTimeoutMS int + giveLastUpdate time.Time + giveMockRows *sqlmock.Rows + wantRecordedError pbsmetrics.StoredDataError + wantLastUpdate time.Time }{ + { + description: "fetch delta timeout", + giveFakeTime: time.Date(2020, time.July, 1, 12, 30, 0, 0, time.UTC), + giveLastUpdate: time.Date(2020, time.June, 30, 6, 0, 0, 0, time.UTC), + giveMockRows: nil, + wantRecordedError: pbsmetrics.StoredDataErrorNetwork, + wantLastUpdate: time.Date(2020, time.June, 30, 6, 0, 0, 0, time.UTC), + }, { description: "fetch delta query error", giveFakeTime: time.Date(2020, time.July, 1, 12, 30, 0, 0, time.UTC), + giveTimeoutMS: 100, giveLastUpdate: time.Date(2020, time.June, 30, 6, 0, 0, 0, time.UTC), giveMockRows: nil, - wantReturnedError: true, + wantRecordedError: pbsmetrics.StoredDataErrorUndefined, wantLastUpdate: time.Date(2020, time.June, 30, 6, 0, 0, 0, time.UTC), }, { - description: "fetch all row error", + description: "fetch delta row error", giveFakeTime: time.Date(2020, time.July, 1, 12, 30, 0, 0, time.UTC), + giveTimeoutMS: 100, giveLastUpdate: time.Date(2020, time.June, 30, 6, 0, 0, 0, time.UTC), giveMockRows: sqlmock.NewRows([]string{"id", "data", "dataType"}). AddRow("stored-req-id", "true", "request"). RowError(0, errors.New("Some row error.")), - wantReturnedError: true, + wantRecordedError: pbsmetrics.StoredDataErrorUndefined, wantLastUpdate: time.Date(2020, time.June, 30, 6, 0, 0, 0, time.UTC), }, - { - description: "fetch all close error", - giveFakeTime: time.Date(2020, time.July, 1, 12, 30, 0, 0, time.UTC), - giveLastUpdate: time.Date(2020, time.June, 30, 6, 0, 0, 0, time.UTC), - giveMockRows: sqlmock.NewRows([]string{"id", "data", "dataType"}). - AddRow("req-1", "true", "request"). - AddRow("imp-1", "true", "imp"). - AddRow("req-2", "", "request"). - AddRow("imp-2", "", "imp"). 
- CloseError(errors.New("Some close error.")), - wantReturnedError: false, - wantLastUpdate: time.Date(2020, time.July, 1, 12, 30, 0, 0, time.UTC), - wantSavedReqs: map[string]json.RawMessage{"req-1": json.RawMessage(`true`)}, - wantSavedImps: map[string]json.RawMessage{"imp-1": json.RawMessage(`true`)}, - wantInvalidatedReqs: []string{"req-2"}, - wantInvalidatedImps: []string{"imp-2"}, - }, } for _, tt := range tests { - db, mock, _ := sqlmock.New() + db, dbMock, _ := sqlmock.New() if tt.giveMockRows == nil { - mock.ExpectQuery(fakeQueryRegex()).WillReturnError(errors.New("Query failed.")) + dbMock.ExpectQuery(fakeQueryRegex()).WillReturnError(errors.New("Query failed.")) } else { - mock.ExpectQuery(fakeQueryRegex()).WillReturnRows(tt.giveMockRows) + dbMock.ExpectQuery(fakeQueryRegex()).WillReturnRows(tt.giveMockRows) } + metricsMock := &pbsmetrics.MetricsEngineMock{} + metricsMock.Mock.On("RecordStoredDataFetchTime", pbsmetrics.StoredDataLabels{ + DataType: pbsmetrics.RequestDataType, + DataFetchType: pbsmetrics.FetchDelta, + }, mock.Anything).Return() + metricsMock.Mock.On("RecordStoredDataError", pbsmetrics.StoredDataLabels{ + DataType: pbsmetrics.RequestDataType, + Error: tt.wantRecordedError, + }).Return() + eventProducer := NewPostgresEventProducer(PostgresEventProducerConfig{ DB: db, - CacheUpdateTimeout: 100 * time.Millisecond, + RequestType: config.RequestDataType, + CacheUpdateTimeout: time.Duration(tt.giveTimeoutMS) * time.Millisecond, CacheUpdateQuery: fakeQuery, + MetricsEngine: metricsMock, }) eventProducer.lastUpdate = tt.giveLastUpdate eventProducer.time = &FakeTime{time: tt.giveFakeTime} err := eventProducer.Run() - if tt.wantReturnedError { - assert.NotNil(t, err, tt.description) - } else { - assert.Nil(t, err, tt.description) - } + assert.NotNil(t, err, tt.description) assert.Equal(t, tt.wantLastUpdate, eventProducer.lastUpdate, tt.description) var saves events.Save @@ -411,9 +434,11 @@ func TestFetchDeltaErrors(t *testing.T) { case 
<-time.After(10 * time.Millisecond): } - assert.Equal(t, tt.wantSavedReqs, saves.Requests, tt.description) - assert.Equal(t, tt.wantSavedImps, saves.Imps, tt.description) - assert.Equal(t, tt.wantInvalidatedReqs, invalidations.Requests, tt.description) - assert.Equal(t, tt.wantInvalidatedImps, invalidations.Imps, tt.description) + assert.Nil(t, saves.Requests, tt.description) + assert.Nil(t, saves.Imps, tt.description) + assert.Nil(t, invalidations.Requests, tt.description) + assert.Nil(t, invalidations.Imps, tt.description) + + metricsMock.AssertExpectations(t) } }