diff --git a/samcli/lib/sync/infra_sync_executor.py b/samcli/lib/sync/infra_sync_executor.py index 7b4a6ecde4..257194a70a 100644 --- a/samcli/lib/sync/infra_sync_executor.py +++ b/samcli/lib/sync/infra_sync_executor.py @@ -7,6 +7,7 @@ from datetime import datetime from pathlib import Path from typing import TYPE_CHECKING, Dict, Optional, Set +from uuid import uuid4 from boto3 import Session from botocore.exceptions import ClientError @@ -174,8 +175,9 @@ def execute_infra_sync(self, first_sync: bool = False) -> InfraSyncResult: days_since_last_infra_sync = (current_time - last_infra_sync_time).days # Will not combine the comparisons in order to save operation cost + thread_id = uuid4() if self._sync_context.skip_deploy_sync and first_sync and (days_since_last_infra_sync <= AUTO_INFRA_SYNC_DAYS): - EventTracker.track_event("SyncFlowStart", "SkipInfraSyncExecute") + EventTracker.track_event("SyncFlowStart", "SkipInfraSyncExecute", thread_id=thread_id) try: if self._auto_skip_infra_sync( self._package_context.output_template_file, @@ -186,7 +188,7 @@ def execute_infra_sync(self, first_sync: bool = False) -> InfraSyncResult: # If higher than the threshold, we perform infra sync to improve performance if len(self.code_sync_resources) < SYNC_FLOW_THRESHOLD: LOG.info("Template haven't been changed since last deployment, skipping infra sync...") - EventTracker.track_event("SyncFlowEnd", "SkipInfraSyncExecute") + EventTracker.track_event("SyncFlowEnd", "SkipInfraSyncExecute", thread_id=thread_id) return InfraSyncResult(False, self.code_sync_resources) else: LOG.info( @@ -199,7 +201,7 @@ def execute_infra_sync(self, first_sync: bool = False) -> InfraSyncResult: "Could not skip infra sync by comparing to a previously deployed template, starting infra sync" ) - EventTracker.track_event("SyncFlowStart", "InfraSyncExecute") + EventTracker.track_event("SyncFlowStart", "InfraSyncExecute", thread_id=thread_id) if days_since_last_infra_sync > AUTO_INFRA_SYNC_DAYS: LOG.info( "Infrastructure Sync hasn't been run in the last %s days, sam sync will be queuing up the stack" @@ -207,7 +209,7 @@ def execute_infra_sync(self, first_sync: bool = False) -> InfraSyncResult: AUTO_INFRA_SYNC_DAYS, ) self._deploy_context.run() - EventTracker.track_event("SyncFlowEnd", "InfraSyncExecute") + EventTracker.track_event("SyncFlowEnd", "InfraSyncExecute", thread_id=thread_id) # Update latest infra sync time in sync state self._sync_context.update_infra_sync_time() diff --git a/samcli/lib/sync/sync_flow_executor.py b/samcli/lib/sync/sync_flow_executor.py index 8812c19943..d6f712cfea 100644 --- a/samcli/lib/sync/sync_flow_executor.py +++ b/samcli/lib/sync/sync_flow_executor.py @@ -6,6 +6,7 @@ from queue import Queue from threading import RLock from typing import Callable, List, Optional, Set +from uuid import uuid4 from botocore.exceptions import ClientError @@ -336,11 +337,12 @@ def _sync_flow_execute_wrapper(sync_flow: SyncFlow) -> SyncFlowResult: dependent_sync_flows = [] sync_types = EventType.get_accepted_values(EventName.SYNC_FLOW_START) sync_type: Optional[str] = type(sync_flow).__name__ + thread_id = uuid4() if sync_type not in sync_types: sync_type = None try: if sync_type: - EventTracker.track_event("SyncFlowStart", sync_type) + EventTracker.track_event("SyncFlowStart", sync_type, thread_id=thread_id) dependent_sync_flows = sync_flow.execute() except ClientError as e: if e.response.get("Error", dict()).get("Code", "") == "ResourceNotFoundException": @@ -350,5 +352,5 @@ def _sync_flow_execute_wrapper(sync_flow: SyncFlow) -> SyncFlowResult: raise SyncFlowException(sync_flow, e) from e finally: if sync_type: - EventTracker.track_event("SyncFlowEnd", sync_type) + EventTracker.track_event("SyncFlowEnd", sync_type, thread_id=thread_id) return SyncFlowResult(sync_flow=sync_flow, dependent_sync_flows=dependent_sync_flows) diff --git a/samcli/lib/telemetry/event.py b/samcli/lib/telemetry/event.py index ef21a00d1e..2d819e37bf 100644 --- a/samcli/lib/telemetry/event.py +++ b/samcli/lib/telemetry/event.py @@ -7,6 +7,7 @@ from datetime import datetime from enum import Enum from typing import List, Optional +from uuid import UUID, uuid4 from samcli.cli.context import Context from samcli.lib.build.workflows import ALL_CONFIGS @@ -83,14 +84,19 @@ class Event: event_name: EventName event_value: str # Validated by EventType.get_accepted_values to never be an arbitrary string - thread_id = threading.get_ident() # The thread ID; used to group Events from the same command run + thread_id: Optional[UUID] # The thread ID; used to group Events from the same command run time_stamp: str exception_name: Optional[str] - def __init__(self, event_name: str, event_value: str, exception_name: Optional[str] = None): + def __init__( + self, event_name: str, event_value: str, thread_id: Optional[UUID] = None, exception_name: Optional[str] = None + ): Event._verify_event(event_name, event_value) self.event_name = EventName(event_name) self.event_value = event_value + if not thread_id: + thread_id = uuid4() + self.thread_id = thread_id self.time_stamp = str(datetime.utcnow())[:-3] # format microseconds from 6 -> 3 figures to allow SQL casting self.exception_name = exception_name @@ -105,7 +111,7 @@ def __repr__(self): return ( f"Event(event_name={self.event_name.value}, " f"event_value={self.event_value}, " - f"thread_id={self.thread_id}, " + f"thread_id={self.thread_id.hex}, " f"time_stamp={self.time_stamp})", f"exception_name={self.exception_name})", ) @@ -114,7 +120,7 @@ def to_json(self): return { "event_name": self.event_name.value, "event_value": self.event_value, - "thread_id": self.thread_id, + "thread_id": self.thread_id.hex, "time_stamp": self.time_stamp, "exception_name": self.exception_name, } @@ -144,7 +150,11 @@ class EventTracker: @staticmethod def track_event( - event_name: str, event_value: str, session_id: Optional[str] = None, exception_name: Optional[str] = None + event_name: str, + event_value: str, + session_id: Optional[str] = None, + thread_id: Optional[UUID] = None, + exception_name: Optional[str] = None, ): """Method to track an event where and when it occurs. @@ -162,6 +172,8 @@ def track_event( passed event_name, or an EventCreationError will be thrown. session_id: Optional[str] The session ID to set to link back to the original command run + thread_id: Optional[UUID] + The thread ID of the Event to track, as a UUID. exception_name: Optional[str] The name of the exception that this event encountered when tracking a feature @@ -182,8 +194,13 @@ def track_event( try: should_send: bool = False + # Validate the thread ID + if not thread_id: # Passed value is not a UUID or None + thread_id = uuid4() with EventTracker._event_lock: - EventTracker._events.append(Event(event_name, event_value, exception_name=exception_name)) + EventTracker._events.append( + Event(event_name, event_value, thread_id=thread_id, exception_name=exception_name) + ) # Get the session ID (needed for multithreading sending) EventTracker._set_session_id() @@ -248,7 +265,13 @@ def _send_events_in_thread(): telemetry.emit(metric) -def track_long_event(start_event_name: str, start_event_value: str, end_event_name: str, end_event_value: str): +def track_long_event( + start_event_name: str, + start_event_value: str, + end_event_name: str, + end_event_value: str, + thread_id: Optional[UUID] = None, +): """Decorator for tracking events that occur at start and end of a function. The decorator tracks two Events total, where the first Event occurs @@ -281,6 +304,8 @@ def track_long_event(start_event_name: str, start_event_value: str, end_event_na decorated function's execution. Must be a valid EventType value for the passed `end_event_name` or the decorator will not run. + thread_id: Optional[UUID] + The thread ID of the Events to track, as a UUID. Examples -------- @@ -297,6 +322,9 @@ def func2(...): # Check that passed values are valid Events Event(start_event_name, start_event_value) Event(end_event_name, end_event_value) + # Validate the thread ID + if not thread_id: # Passed value is not a UUID or None + thread_id = uuid4() except EventCreationError as e: LOG.debug("Error occurred while trying to track an event: %s\nDecorator not run.", e) should_track = False @@ -307,7 +335,7 @@ def decorator_for_events(func): def wrapped(*args, **kwargs): # Track starting event if should_track: - EventTracker.track_event(start_event_name, start_event_value) + EventTracker.track_event(start_event_name, start_event_value, thread_id=thread_id) exception = None # Run the function try: @@ -316,7 +344,7 @@ def wrapped(*args, **kwargs): exception = e # Track ending event if should_track: - EventTracker.track_event(end_event_name, end_event_value) + EventTracker.track_event(end_event_name, end_event_value, thread_id=thread_id) EventTracker.send_events() # Ensure Events are sent at the end of execution if exception: raise exception diff --git a/tests/unit/lib/telemetry/test_event.py b/tests/unit/lib/telemetry/test_event.py index 5738743788..b8dc01fd34 100644 --- a/tests/unit/lib/telemetry/test_event.py +++ b/tests/unit/lib/telemetry/test_event.py @@ -8,6 +8,7 @@ from unittest import TestCase from unittest.mock import ANY, Mock, patch from samcli.cli.context import Context +from uuid import UUID, uuid4 from samcli.lib.telemetry.event import Event, EventCreationError, EventTracker, track_long_event @@ -26,13 +27,32 @@ def test_create_event_exists(self, name_mock, type_mock, verify_mock): name_mock.return_value = Mock(value="TestOne") type_mock.get_accepted_values.return_value = ["value1", "value2"] verify_mock.return_value = None + thread_id = uuid4() + + test_event = Event("TestOne", "value1", thread_id) + + name_mock.assert_called_once() + self.assertEqual(test_event.event_name.value, "TestOne") + self.assertEqual(test_event.event_value, "value1") + self.assertEqual(test_event.thread_id, thread_id) # Should be on the same thread + + @patch("samcli.lib.telemetry.event.uuid4") + @patch("samcli.lib.telemetry.event.Event._verify_event") + @patch("samcli.lib.telemetry.event.EventType") + @patch("samcli.lib.telemetry.event.EventName") + def test_create_event_exists_no_thread_id(self, name_mock, type_mock, verify_mock, uuid_mock): + name_mock.return_value = Mock(value="TestOne") + type_mock.get_accepted_values.return_value = ["value1", "value2"] + verify_mock.return_value = None + thread_id = uuid4() + uuid_mock.return_value = thread_id test_event = Event("TestOne", "value1") name_mock.assert_called_once() self.assertEqual(test_event.event_name.value, "TestOne") self.assertEqual(test_event.event_value, "value1") - self.assertEqual(test_event.thread_id, threading.get_ident()) # Should be on the same thread + self.assertEqual(test_event.thread_id, thread_id) @patch("samcli.lib.telemetry.event.EventType") @patch("samcli.lib.telemetry.event.EventName") @@ -60,15 +80,16 @@ def test_event_to_json(self, name_mock, type_mock, verify_mock): name_mock.return_value = Mock(value="Testing") type_mock.get_accepted_values.return_value = ["value1"] verify_mock.return_value = None + thread_id = uuid4() - test_event = Event("Testing", "value1") + test_event = Event("Testing", "value1", thread_id) self.assertEqual( test_event.to_json(), { "event_name": "Testing", "event_value": "value1", - "thread_id": threading.get_ident(), + "thread_id": thread_id.hex, "time_stamp": ANY, "exception_name": None, }, @@ -86,7 +107,7 @@ def test_track_event(self, event_mock, lock_mock): lock_mock.__exit__ = Mock() # Test that an event can be tracked - dummy_event = Mock(event_name="Test", event_value="SomeValue", thread_id=threading.get_ident(), timestamp=ANY) + dummy_event = Mock(event_name="Test", event_value="SomeValue", thread_id=uuid4(), timestamp=ANY) event_mock.return_value = dummy_event EventTracker.track_event("Test", "SomeValue") @@ -114,6 +135,7 @@ def test_events_get_sent(self, telemetry_mock): dummy_telemetry.emit.return_value = None dummy_telemetry.emit.side_effect = mock_emit telemetry_mock.return_value = dummy_telemetry + thread_id = uuid4() # Verify that no events are sent if tracker is empty # Note we are using the in-line version of the method, as the regular send_events will @@ -124,9 +146,7 @@ def test_events_get_sent(self, telemetry_mock): dummy_telemetry.emit.assert_not_called() # Nothing should have been sent (empty list) # Verify that events get sent when they exist in tracker - dummy_event = Mock( - event_name=Mock(value="Test"), event_value="SomeValue", thread_id=threading.get_ident(), time_stamp=ANY - ) + dummy_event = Mock(event_name=Mock(value="Test"), event_value="SomeValue", thread_id=thread_id, time_stamp=ANY) dummy_event.to_json.return_value = Event.to_json(dummy_event) EventTracker._events.append(dummy_event) @@ -148,7 +168,7 @@ def test_events_get_sent(self, telemetry_mock): { "event_name": "Test", "event_value": "SomeValue", - "thread_id": ANY, + "thread_id": thread_id.hex, "time_stamp": ANY, "exception_name": ANY, } @@ -205,9 +225,11 @@ def test_long_event_is_tracked(self, event_mock, track_mock, send_mock): mock_tracker = {} mock_tracker["tracked_events"]: List[Tuple[str, str]] = [] # Tuple to bypass Event verification mock_tracker["emitted_events"]: List[Tuple[str, str]] = [] + tracked_threads: List[UUID] = [] - def mock_track(name, value): + def mock_track(name, value, thread_id): mock_tracker["tracked_events"].append((name, value)) + tracked_threads.append(thread_id) def mock_send(): mock_tracker["emitted_events"] = mock_tracker["tracked_events"] @@ -227,6 +249,7 @@ def func(): self.assertEqual(len(mock_tracker["emitted_events"]), 2, "Unexpected number of emitted events.") self.assertIn(("StartEvent", "StartValue"), mock_tracker["emitted_events"], "Starting event not tracked.") self.assertIn(("EndEvent", "EndValue"), mock_tracker["emitted_events"], "Ending event not tracked.") + self.assertEqual(tracked_threads[0], tracked_threads[1], "Thread ID for start and end events differ.") @patch("samcli.lib.telemetry.event.EventTracker.send_events") @patch("samcli.lib.telemetry.event.EventTracker.track_event")