From f8be45ba1287de4db69608c5e7eda92584d4aa69 Mon Sep 17 00:00:00 2001 From: Nicholas Hulston Date: Wed, 30 Oct 2024 13:03:15 -0400 Subject: [PATCH 1/2] Fix constant_tags accumulation in ThreadStatsWriter.flush to prevent metric loss --- datadog_lambda/thread_stats_writer.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/datadog_lambda/thread_stats_writer.py b/datadog_lambda/thread_stats_writer.py index 367b8b21..422a9a0a 100644 --- a/datadog_lambda/thread_stats_writer.py +++ b/datadog_lambda/thread_stats_writer.py @@ -27,8 +27,11 @@ def flush(self, tags=None): Modified based on `datadog.threadstats.base.ThreadStats.flush()`, to gain better control over exception handling. """ + original_constant_tags = self.thread_stats.constant_tags.copy() if tags: - self.thread_stats.constant_tags = self.thread_stats.constant_tags + tags + # Temporarily add tags for this flush + self.thread_stats.constant_tags = original_constant_tags + tags + _, dists = self.thread_stats._get_aggregate_metrics_and_dists(float("inf")) count_dists = len(dists) if not count_dists: @@ -62,6 +65,9 @@ def flush(self, tags=None): logger.debug( "Flush #%s failed", self.thread_stats.flush_count, exc_info=True ) + finally: + # Reset constant_tags to its original state + self.thread_stats.constant_tags = original_constant_tags def stop(self): self.thread_stats.stop() From d7c43d5da423337498ad155f8b9c880e0e785280 Mon Sep 17 00:00:00 2001 From: Nicholas Hulston Date: Wed, 30 Oct 2024 14:44:49 -0400 Subject: [PATCH 2/2] add tests --- tests/test_metric.py | 68 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 62 insertions(+), 6 deletions(-) diff --git a/tests/test_metric.py b/tests/test_metric.py index 241f563b..345740a4 100644 --- a/tests/test_metric.py +++ b/tests/test_metric.py @@ -119,13 +119,69 @@ def test_retry_on_remote_disconnected(self): def test_flush_stats_with_tags(self): lambda_stats = ThreadStatsWriter(True) + original_constant_tags = lambda_stats.thread_stats.constant_tags.copy() tags = ["tag1:value1", "tag2:value2"] - lambda_stats.flush(tags) - self.mock_threadstats_flush_distributions.assert_called_once_with( - lambda_stats.thread_stats._get_aggregate_metrics_and_dists(float("inf"))[1] - ) - for tag in tags: - self.assertTrue(tag in lambda_stats.thread_stats.constant_tags) + + # Add a metric to be flushed + lambda_stats.distribution("test.metric", 1, tags=["metric:tag"]) + + with patch.object( + lambda_stats.thread_stats.reporter, "flush_distributions" + ) as mock_flush_distributions: + lambda_stats.flush(tags) + mock_flush_distributions.assert_called_once() + # Verify that after flush, constant_tags is reset to original + self.assertEqual( + lambda_stats.thread_stats.constant_tags, original_constant_tags + ) + + def test_flush_temp_constant_tags(self): + lambda_stats = ThreadStatsWriter(flush_in_thread=True) + lambda_stats.thread_stats.constant_tags = ["initial:tag"] + original_constant_tags = lambda_stats.thread_stats.constant_tags.copy() + + lambda_stats.distribution("test.metric", 1, tags=["metric:tag"]) + flush_tags = ["flush:tag1", "flush:tag2"] + + with patch.object( + lambda_stats.thread_stats.reporter, "flush_distributions" + ) as mock_flush_distributions: + lambda_stats.flush(tags=flush_tags) + mock_flush_distributions.assert_called_once() + flushed_dists = mock_flush_distributions.call_args[0][0] + + # Expected tags: original constant_tags + flush_tags + metric tags + expected_tags = original_constant_tags + flush_tags + ["metric:tag"] + + # Verify the tags on the metric + self.assertEqual(len(flushed_dists), 1) + metric = flushed_dists[0] + self.assertEqual(sorted(metric["tags"]), sorted(expected_tags)) + + # Verify that constant_tags is reset after flush + self.assertEqual( + lambda_stats.thread_stats.constant_tags, original_constant_tags + ) + + # Repeat to ensure tags do not accumulate over multiple flushes + new_flush_tags = ["flush:tag3"] + lambda_stats.distribution("test.metric2", 2, tags=["metric2:tag"]) + + with patch.object( + lambda_stats.thread_stats.reporter, "flush_distributions" + ) as mock_flush_distributions: + lambda_stats.flush(tags=new_flush_tags) + mock_flush_distributions.assert_called_once() + flushed_dists = mock_flush_distributions.call_args[0][0] + # Expected tags for the new metric + expected_tags = original_constant_tags + new_flush_tags + ["metric2:tag"] + + self.assertEqual(len(flushed_dists), 1) + metric = flushed_dists[0] + self.assertEqual(sorted(metric["tags"]), sorted(expected_tags)) + self.assertEqual( + lambda_stats.thread_stats.constant_tags, original_constant_tags + ) def test_flush_stats_without_context(self): flush_stats(lambda_context=None)