From 8fb33250131544d613846c71ca5d68c63abcd916 Mon Sep 17 00:00:00 2001 From: Subbarao Vakati Date: Wed, 11 Sep 2024 15:44:39 -0500 Subject: [PATCH 1/2] fix: Replace counter with gauge cron tracker alert counter resetting on restarts was causing false alerts --- warehouse/router/tracker.go | 4 ++-- warehouse/router/tracker_test.go | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/warehouse/router/tracker.go b/warehouse/router/tracker.go index 629a58ae8e..bc037eba80 100644 --- a/warehouse/router/tracker.go +++ b/warehouse/router/tracker.go @@ -23,13 +23,13 @@ import ( // CronTracker Track the status of the staging file whether it has reached the terminal state or not for every warehouse // we pick the staging file which is oldest within the range NOW() - 2 * syncFrequency and NOW() - 3 * syncFrequency func (r *Router) CronTracker(ctx context.Context) error { - tick := r.statsFactory.NewTaggedStat("warehouse_cron_tracker_tick", stats.CountType, stats.Tags{ + cronTrackerExecTimestamp := r.statsFactory.NewTaggedStat("warehouse_cron_tracker_timestamp_seconds", stats.GaugeType, stats.Tags{ "module": moduleName, "destType": r.destType, }) for { - tick.Count(1) + cronTrackerExecTimestamp.Gauge(time.Now().Unix()) r.configSubscriberLock.RLock() warehouses := append([]model.Warehouse{}, r.warehouses...) diff --git a/warehouse/router/tracker_test.go b/warehouse/router/tracker_test.go index 716a67d062..5de2942076 100644 --- a/warehouse/router/tracker_test.go +++ b/warehouse/router/tracker_test.go @@ -215,16 +215,17 @@ func TestRouter_CronTracker(t *testing.T) { mockLogger.EXPECT().Infon("context is cancelled, stopped running tracking").Times(1) + executionTime := time.Now().Unix() err = r.CronTracker(ctx) require.NoError(t, err) - m := statsStore.GetByName("warehouse_cron_tracker_tick") + m := statsStore.GetByName("warehouse_cron_tracker_timestamp_seconds") require.Equal(t, len(m), 1) - require.Equal(t, m[0].Name, "warehouse_cron_tracker_tick") + require.Equal(t, m[0].Name, "warehouse_cron_tracker_timestamp_seconds") require.Equal(t, m[0].Tags, stats.Tags{ "module": moduleName, "destType": warehouseutils.POSTGRES, }) - require.GreaterOrEqual(t, m[0].Value, 1.0) + require.GreaterOrEqual(t, m[0].Value, float64(executionTime)) }) } From 5f7d753b520e46f4b96733956a2bdc173c8f0475 Mon Sep 17 00:00:00 2001 From: Subbarao Vakati Date: Thu, 12 Sep 2024 14:09:17 -0500 Subject: [PATCH 2/2] Disregard cron runtime in scheduling the next run Once in a while, cron runtime may be delayed due to various reasons. This commit disregards the cron runtime in scheduling the next run. --- warehouse/router/tracker.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/warehouse/router/tracker.go b/warehouse/router/tracker.go index bc037eba80..d66bc0ffe3 100644 --- a/warehouse/router/tracker.go +++ b/warehouse/router/tracker.go @@ -29,7 +29,8 @@ func (r *Router) CronTracker(ctx context.Context) error { }) for { - cronTrackerExecTimestamp.Gauge(time.Now().Unix()) + execTime := time.Now() + cronTrackerExecTimestamp.Gauge(execTime.Unix()) r.configSubscriberLock.RLock() warehouses := append([]model.Warehouse{}, r.warehouses...) @@ -51,11 +52,12 @@ func (r *Router) CronTracker(ctx context.Context) error { } } + nextExecTime := execTime.Add(r.config.uploadStatusTrackFrequency) select { case <-ctx.Done(): r.logger.Infon("context is cancelled, stopped running tracking") return nil - case <-time.After(r.config.uploadStatusTrackFrequency): + case <-time.After(time.Until(nextExecTime)): } } }