Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
7fcca85
Add project metadata tracking with telemetry (#3929)
Leo10Gama Jun 7, 2022
91a0905
Fix retrieval process for directory name (#3962)
Leo10Gama Jun 13, 2022
e23b69a
Implement event data structure and relevant unit tests (#3955)
Leo10Gama Jun 13, 2022
43a9793
Implement tracker methods (#3970)
Leo10Gama Jun 15, 2022
ae7de58
Send Event metrics via Telemetry (#3979)
Leo10Gama Jun 22, 2022
5b131ee
Add BuildRuntime event and trackers (#3994)
Leo10Gama Jun 24, 2022
e62a655
Add UsedFeature event and trackers (#4003)
Leo10Gama Jun 28, 2022
c2790c9
Add thread lock to EventTracker (#4019)
Leo10Gama Jun 30, 2022
6edf835
Refactor Event data structure (#4027)
Leo10Gama Jul 6, 2022
4cbca01
Update tests to verify EventTracker thread lock is working (#4035)
Leo10Gama Jul 6, 2022
33ca6c5
Add send_events method to EventTracker (#4039)
Leo10Gama Jul 15, 2022
f88493f
Upgrade encryption algorithm from SHA-1 to SHA-256 (#4061)
Leo10Gama Jul 18, 2022
fda06ae
Add long event tracker (#4067)
Leo10Gama Jul 20, 2022
a5c3060
Send events via thread when at capacity (#4072)
Leo10Gama Jul 22, 2022
9a597d7
Add `sam sync` Events (#4076)
Leo10Gama Jul 28, 2022
6d81fa4
Merge with current develop branch
Leo10Gama Jul 28, 2022
b0964e8
Merge pull request #4086 from Leo10Gama/customer-journey
hawflau Jul 29, 2022
1b02888
Add workflow events (#4096)
Leo10Gama Aug 8, 2022
790e078
Fix send_events logic (#4104)
Leo10Gama Aug 10, 2022
e46d19a
Make minor tweaks before launch (#4111)
Leo10Gama Aug 15, 2022
c739c06
Merge branch 'customer-journey' into develop
Leo10Gama Aug 18, 2022
6f27629
Merge branch 'develop' into develop
Leo10Gama Aug 19, 2022
009d20e
Merge branch 'develop' into develop
Leo10Gama Aug 22, 2022
e17431c
Merge branch 'develop' of github.com:aws/aws-sam-cli into develop
May 16, 2023
b61e7c8
Fix thread ID bug in EventTracker
May 16, 2023
677b1ab
Reformat files to standard
May 16, 2023
a84a933
Clean up unnecessary files
May 16, 2023
ba4597d
Merge branch 'develop' into thread-id-bug
Leo10Gama May 16, 2023
438adb8
Add thread ID tracking to infra sync executor
May 16, 2023
c9a323e
Merge changes
May 16, 2023
5500e58
Merge branch 'develop' into thread-id-bug
Leo10Gama May 16, 2023
df99af4
Merge branch 'develop' into thread-id-bug
Leo10Gama May 16, 2023
dd0e54d
Add missing thread ID to tracked event
May 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions samcli/lib/sync/infra_sync_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Dict, Optional, Set
from uuid import uuid4

from boto3 import Session
from botocore.exceptions import ClientError
Expand Down Expand Up @@ -174,8 +175,9 @@ def execute_infra_sync(self, first_sync: bool = False) -> InfraSyncResult:
days_since_last_infra_sync = (current_time - last_infra_sync_time).days

# Will not combine the comparisons in order to save operation cost
thread_id = uuid4()
if self._sync_context.skip_deploy_sync and first_sync and (days_since_last_infra_sync <= AUTO_INFRA_SYNC_DAYS):
EventTracker.track_event("SyncFlowStart", "SkipInfraSyncExecute")
EventTracker.track_event("SyncFlowStart", "SkipInfraSyncExecute", thread_id=thread_id)
try:
if self._auto_skip_infra_sync(
self._package_context.output_template_file,
Expand All @@ -186,7 +188,7 @@ def execute_infra_sync(self, first_sync: bool = False) -> InfraSyncResult:
# If higher than the threshold, we perform infra sync to improve performance
if len(self.code_sync_resources) < SYNC_FLOW_THRESHOLD:
LOG.info("Template haven't been changed since last deployment, skipping infra sync...")
EventTracker.track_event("SyncFlowEnd", "SkipInfraSyncExecute")
EventTracker.track_event("SyncFlowEnd", "SkipInfraSyncExecute", thread_id=thread_id)
return InfraSyncResult(False, self.code_sync_resources)
else:
LOG.info(
Expand All @@ -199,15 +201,15 @@ def execute_infra_sync(self, first_sync: bool = False) -> InfraSyncResult:
"Could not skip infra sync by comparing to a previously deployed template, starting infra sync"
)

EventTracker.track_event("SyncFlowStart", "InfraSyncExecute")
EventTracker.track_event("SyncFlowStart", "InfraSyncExecute", thread_id=thread_id)
if days_since_last_infra_sync > AUTO_INFRA_SYNC_DAYS:
LOG.info(
"Infrastructure Sync hasn't been run in the last %s days, sam sync will be queuing up the stack"
" deployment to minimize the drift in CloudFormation.",
AUTO_INFRA_SYNC_DAYS,
)
self._deploy_context.run()
EventTracker.track_event("SyncFlowEnd", "InfraSyncExecute")
EventTracker.track_event("SyncFlowEnd", "InfraSyncExecute", thread_id=thread_id)

# Update latest infra sync time in sync state
self._sync_context.update_infra_sync_time()
Expand Down
6 changes: 4 additions & 2 deletions samcli/lib/sync/sync_flow_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from queue import Queue
from threading import RLock
from typing import Callable, List, Optional, Set
from uuid import uuid4

from botocore.exceptions import ClientError

Expand Down Expand Up @@ -336,11 +337,12 @@ def _sync_flow_execute_wrapper(sync_flow: SyncFlow) -> SyncFlowResult:
dependent_sync_flows = []
sync_types = EventType.get_accepted_values(EventName.SYNC_FLOW_START)
sync_type: Optional[str] = type(sync_flow).__name__
thread_id = uuid4()
if sync_type not in sync_types:
sync_type = None
try:
if sync_type:
EventTracker.track_event("SyncFlowStart", sync_type)
EventTracker.track_event("SyncFlowStart", sync_type, thread_id=thread_id)
dependent_sync_flows = sync_flow.execute()
except ClientError as e:
if e.response.get("Error", dict()).get("Code", "") == "ResourceNotFoundException":
Expand All @@ -350,5 +352,5 @@ def _sync_flow_execute_wrapper(sync_flow: SyncFlow) -> SyncFlowResult:
raise SyncFlowException(sync_flow, e) from e
finally:
if sync_type:
EventTracker.track_event("SyncFlowEnd", sync_type)
EventTracker.track_event("SyncFlowEnd", sync_type, thread_id=thread_id)
return SyncFlowResult(sync_flow=sync_flow, dependent_sync_flows=dependent_sync_flows)
46 changes: 37 additions & 9 deletions samcli/lib/telemetry/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from datetime import datetime
from enum import Enum
from typing import List, Optional
from uuid import UUID, uuid4

from samcli.cli.context import Context
from samcli.lib.build.workflows import ALL_CONFIGS
Expand Down Expand Up @@ -83,14 +84,19 @@ class Event:

event_name: EventName
event_value: str # Validated by EventType.get_accepted_values to never be an arbitrary string
thread_id = threading.get_ident() # The thread ID; used to group Events from the same command run
thread_id: Optional[UUID] # The thread ID; used to group Events from the same command run
time_stamp: str
exception_name: Optional[str]

def __init__(self, event_name: str, event_value: str, exception_name: Optional[str] = None):
def __init__(
self, event_name: str, event_value: str, thread_id: Optional[UUID] = None, exception_name: Optional[str] = None
):
Event._verify_event(event_name, event_value)
self.event_name = EventName(event_name)
self.event_value = event_value
if not thread_id:
thread_id = uuid4()
Comment on lines +97 to +98
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we still use threading.get_ident() for default value or other fields will same for that purpose?
@hawflau what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think uuid4() is good. From what I've seen empirically, session_id and threading.get_ident() are producing the same grouping effects.

self.thread_id = thread_id
self.time_stamp = str(datetime.utcnow())[:-3] # format microseconds from 6 -> 3 figures to allow SQL casting
self.exception_name = exception_name

Expand All @@ -105,7 +111,7 @@ def __repr__(self):
return (
f"Event(event_name={self.event_name.value}, "
f"event_value={self.event_value}, "
f"thread_id={self.thread_id}, "
f"thread_id={self.thread_id.hex}, "
f"time_stamp={self.time_stamp})",
f"exception_name={self.exception_name})",
)
Expand All @@ -114,7 +120,7 @@ def to_json(self):
return {
"event_name": self.event_name.value,
"event_value": self.event_value,
"thread_id": self.thread_id,
"thread_id": self.thread_id.hex,
"time_stamp": self.time_stamp,
"exception_name": self.exception_name,
}
Expand Down Expand Up @@ -144,7 +150,11 @@ class EventTracker:

@staticmethod
def track_event(
event_name: str, event_value: str, session_id: Optional[str] = None, exception_name: Optional[str] = None
event_name: str,
event_value: str,
session_id: Optional[str] = None,
thread_id: Optional[UUID] = None,
exception_name: Optional[str] = None,
):
"""Method to track an event where and when it occurs.

Expand All @@ -162,6 +172,8 @@ def track_event(
passed event_name, or an EventCreationError will be thrown.
session_id: Optional[str]
The session ID to set to link back to the original command run
thread_id: Optional[UUID]
The thread ID of the Event to track, as a UUID.
exception_name: Optional[str]
The name of the exception that this event encountered when tracking a feature

Expand All @@ -182,8 +194,13 @@ def track_event(

try:
should_send: bool = False
# Validate the thread ID
if not thread_id: # Passed value is not a UUID or None
thread_id = uuid4()
with EventTracker._event_lock:
EventTracker._events.append(Event(event_name, event_value, exception_name=exception_name))
EventTracker._events.append(
Event(event_name, event_value, thread_id=thread_id, exception_name=exception_name)
)

# Get the session ID (needed for multithreading sending)
EventTracker._set_session_id()
Expand Down Expand Up @@ -248,7 +265,13 @@ def _send_events_in_thread():
telemetry.emit(metric)


def track_long_event(start_event_name: str, start_event_value: str, end_event_name: str, end_event_value: str):
def track_long_event(
start_event_name: str,
start_event_value: str,
end_event_name: str,
end_event_value: str,
thread_id: Optional[UUID] = None,
):
"""Decorator for tracking events that occur at start and end of a function.

The decorator tracks two Events total, where the first Event occurs
Expand Down Expand Up @@ -281,6 +304,8 @@ def track_long_event(start_event_name: str, start_event_value: str, end_event_na
decorated function's execution. Must be a valid EventType
value for the passed `end_event_name` or the decorator
will not run.
thread_id: Optional[UUID]
The thread ID of the Events to track, as a UUID.

Examples
--------
Expand All @@ -297,6 +322,9 @@ def func2(...):
# Check that passed values are valid Events
Event(start_event_name, start_event_value)
Event(end_event_name, end_event_value)
# Validate the thread ID
if not thread_id: # Passed value is not a UUID or None
thread_id = uuid4()
except EventCreationError as e:
LOG.debug("Error occurred while trying to track an event: %s\nDecorator not run.", e)
should_track = False
Expand All @@ -307,7 +335,7 @@ def decorator_for_events(func):
def wrapped(*args, **kwargs):
# Track starting event
if should_track:
EventTracker.track_event(start_event_name, start_event_value)
EventTracker.track_event(start_event_name, start_event_value, thread_id=thread_id)
exception = None
# Run the function
try:
Expand All @@ -316,7 +344,7 @@ def wrapped(*args, **kwargs):
exception = e
# Track ending event
if should_track:
EventTracker.track_event(end_event_name, end_event_value)
EventTracker.track_event(end_event_name, end_event_value, thread_id=thread_id)
EventTracker.send_events() # Ensure Events are sent at the end of execution
if exception:
raise exception
Expand Down
41 changes: 32 additions & 9 deletions tests/unit/lib/telemetry/test_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from unittest import TestCase
from unittest.mock import ANY, Mock, patch
from samcli.cli.context import Context
from uuid import UUID, uuid4

from samcli.lib.telemetry.event import Event, EventCreationError, EventTracker, track_long_event

Expand All @@ -26,13 +27,32 @@ def test_create_event_exists(self, name_mock, type_mock, verify_mock):
name_mock.return_value = Mock(value="TestOne")
type_mock.get_accepted_values.return_value = ["value1", "value2"]
verify_mock.return_value = None
thread_id = uuid4()

test_event = Event("TestOne", "value1", thread_id)

name_mock.assert_called_once()
self.assertEqual(test_event.event_name.value, "TestOne")
self.assertEqual(test_event.event_value, "value1")
self.assertEqual(test_event.thread_id, thread_id) # Should be on the same thread

@patch("samcli.lib.telemetry.event.uuid4")
@patch("samcli.lib.telemetry.event.Event._verify_event")
@patch("samcli.lib.telemetry.event.EventType")
@patch("samcli.lib.telemetry.event.EventName")
def test_create_event_exists_no_thread_id(self, name_mock, type_mock, verify_mock, uuid_mock):
name_mock.return_value = Mock(value="TestOne")
type_mock.get_accepted_values.return_value = ["value1", "value2"]
verify_mock.return_value = None
thread_id = uuid4()
uuid_mock.return_value = thread_id

test_event = Event("TestOne", "value1")

name_mock.assert_called_once()
self.assertEqual(test_event.event_name.value, "TestOne")
self.assertEqual(test_event.event_value, "value1")
self.assertEqual(test_event.thread_id, threading.get_ident()) # Should be on the same thread
self.assertEqual(test_event.thread_id, thread_id)

@patch("samcli.lib.telemetry.event.EventType")
@patch("samcli.lib.telemetry.event.EventName")
Expand Down Expand Up @@ -60,15 +80,16 @@ def test_event_to_json(self, name_mock, type_mock, verify_mock):
name_mock.return_value = Mock(value="Testing")
type_mock.get_accepted_values.return_value = ["value1"]
verify_mock.return_value = None
thread_id = uuid4()

test_event = Event("Testing", "value1")
test_event = Event("Testing", "value1", thread_id)

self.assertEqual(
test_event.to_json(),
{
"event_name": "Testing",
"event_value": "value1",
"thread_id": threading.get_ident(),
"thread_id": thread_id.hex,
"time_stamp": ANY,
"exception_name": None,
},
Expand All @@ -86,7 +107,7 @@ def test_track_event(self, event_mock, lock_mock):
lock_mock.__exit__ = Mock()

# Test that an event can be tracked
dummy_event = Mock(event_name="Test", event_value="SomeValue", thread_id=threading.get_ident(), timestamp=ANY)
dummy_event = Mock(event_name="Test", event_value="SomeValue", thread_id=uuid4(), timestamp=ANY)
event_mock.return_value = dummy_event

EventTracker.track_event("Test", "SomeValue")
Expand Down Expand Up @@ -114,6 +135,7 @@ def test_events_get_sent(self, telemetry_mock):
dummy_telemetry.emit.return_value = None
dummy_telemetry.emit.side_effect = mock_emit
telemetry_mock.return_value = dummy_telemetry
thread_id = uuid4()

# Verify that no events are sent if tracker is empty
# Note we are using the in-line version of the method, as the regular send_events will
Expand All @@ -124,9 +146,7 @@ def test_events_get_sent(self, telemetry_mock):
dummy_telemetry.emit.assert_not_called() # Nothing should have been sent (empty list)

# Verify that events get sent when they exist in tracker
dummy_event = Mock(
event_name=Mock(value="Test"), event_value="SomeValue", thread_id=threading.get_ident(), time_stamp=ANY
)
dummy_event = Mock(event_name=Mock(value="Test"), event_value="SomeValue", thread_id=thread_id, time_stamp=ANY)
dummy_event.to_json.return_value = Event.to_json(dummy_event)
EventTracker._events.append(dummy_event)

Expand All @@ -148,7 +168,7 @@ def test_events_get_sent(self, telemetry_mock):
{
"event_name": "Test",
"event_value": "SomeValue",
"thread_id": ANY,
"thread_id": thread_id.hex,
"time_stamp": ANY,
"exception_name": ANY,
}
Expand Down Expand Up @@ -205,9 +225,11 @@ def test_long_event_is_tracked(self, event_mock, track_mock, send_mock):
mock_tracker = {}
mock_tracker["tracked_events"]: List[Tuple[str, str]] = [] # Tuple to bypass Event verification
mock_tracker["emitted_events"]: List[Tuple[str, str]] = []
tracked_threads: List[UUID] = []

def mock_track(name, value):
def mock_track(name, value, thread_id):
mock_tracker["tracked_events"].append((name, value))
tracked_threads.append(thread_id)

def mock_send():
mock_tracker["emitted_events"] = mock_tracker["tracked_events"]
Expand All @@ -227,6 +249,7 @@ def func():
self.assertEqual(len(mock_tracker["emitted_events"]), 2, "Unexpected number of emitted events.")
self.assertIn(("StartEvent", "StartValue"), mock_tracker["emitted_events"], "Starting event not tracked.")
self.assertIn(("EndEvent", "EndValue"), mock_tracker["emitted_events"], "Ending event not tracked.")
self.assertEqual(tracked_threads[0], tracked_threads[1], "Thread ID for start and end events differ.")

@patch("samcli.lib.telemetry.event.EventTracker.send_events")
@patch("samcli.lib.telemetry.event.EventTracker.track_event")
Expand Down