Skip to content

Commit 5ed0a6c

Browse files
feature(profiling): add support for direct profiling of threading.Semaphore objects
1 parent 39fba2f commit 5ed0a6c

File tree

2 files changed

+243
-3
lines changed

2 files changed

+243
-3
lines changed

ddtrace/profiling/collector/threading.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ class _ProfiledThreadingRLock(_lock._ProfiledLock):
1818
pass
1919

2020

21+
class _ProfiledThreadingSemaphore(_lock._ProfiledLock):
22+
pass
23+
24+
2125
class ThreadingLockCollector(_lock.LockCollector):
2226
"""Record threading.Lock usage."""
2327

@@ -48,6 +52,21 @@ def _set_patch_target(
4852
threading.RLock = value
4953

5054

55+
class ThreadingSemaphoreCollector(_lock.LockCollector):
56+
"""Record threading.Semaphore usage."""
57+
58+
PROFILED_LOCK_CLASS = _ProfiledThreadingSemaphore
59+
60+
def _get_patch_target(self) -> typing.Type[threading.Semaphore]:
61+
return threading.Semaphore
62+
63+
def _set_patch_target(
64+
self,
65+
value: typing.Any,
66+
) -> None:
67+
threading.Semaphore = value
68+
69+
5170
# Also patch threading.Thread so echion can track thread lifetimes
5271
def init_stack_v2() -> None:
5372
if config.stack.enabled and stack_v2.is_available:

tests/profiling_v2/collector/test_threading.py

Lines changed: 224 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from ddtrace.internal.datadog.profiling import ddup
2222
from ddtrace.profiling.collector.threading import ThreadingLockCollector
2323
from ddtrace.profiling.collector.threading import ThreadingRLockCollector
24+
from ddtrace.profiling.collector.threading import ThreadingSemaphoreCollector
2425
from tests.profiling.collector import pprof_utils
2526
from tests.profiling.collector import test_collector
2627
from tests.profiling.collector.lock_utils import LineNo
@@ -30,11 +31,12 @@
3031

3132

3233
# Type aliases for supported classes
33-
LockClassType = Union[Type[threading.Lock], Type[threading.RLock]]
34-
CollectorClassType = Union[Type[ThreadingLockCollector], Type[ThreadingRLockCollector]]
34+
LockClassType = Union[Type[threading.Lock], Type[threading.RLock], Type[threading.Semaphore]]
35+
CollectorClassType = Union[Type[ThreadingLockCollector], Type[ThreadingRLockCollector], Type[ThreadingSemaphoreCollector]]
3536
# threading.Lock and threading.RLock are factory functions that return _thread types.
3637
# We reference the underlying _thread types directly to avoid creating instances at import time.
37-
LockClassInst = Union[_thread.LockType, _thread.RLock]
38+
# threading.Semaphore is a Python class, not a factory function.
39+
LockClassInst = Union[_thread.LockType, _thread.RLock, threading.Semaphore]
3840

3941
# Module-level globals for testing global lock profiling
4042
_test_global_lock: LockClassInst
@@ -1328,3 +1330,222 @@ def test_lock_getattr(self) -> None:
13281330
# After releasing, it should not be owned
13291331
lock.release()
13301332
assert not lock._is_owned()
1333+
1334+
1335+
class TestThreadingSemaphoreCollector(BaseThreadingLockCollectorTest):
1336+
"""Test Semaphore profiling"""
1337+
1338+
@property
1339+
def collector_class(self) -> Type[ThreadingSemaphoreCollector]:
1340+
return ThreadingSemaphoreCollector
1341+
1342+
@property
1343+
def lock_class(self) -> Type[threading.Semaphore]:
1344+
return threading.Semaphore
1345+
1346+
def test_semaphore_with_value(self) -> None:
1347+
"""Test that Semaphore works with different initial values."""
1348+
with self.collector_class(capture_pct=100):
1349+
from ddtrace.profiling.collector._lock import _ProfiledLock
1350+
1351+
# Test with value=1
1352+
sem1 = self.lock_class(1)
1353+
assert isinstance(sem1, _ProfiledLock)
1354+
assert sem1.acquire(timeout=1)
1355+
assert not sem1.acquire(timeout=0.01) # Should block
1356+
sem1.release()
1357+
1358+
# Test with value=3
1359+
sem3 = self.lock_class(3)
1360+
assert isinstance(sem3, _ProfiledLock)
1361+
for i in range(3):
1362+
assert sem3.acquire(timeout=1), f"Acquire {i+1} failed"
1363+
assert not sem3.acquire(timeout=0.01) # Should block at 4th
1364+
for i in range(3):
1365+
sem3.release()
1366+
1367+
# Test with default value (1)
1368+
sem_default = self.lock_class()
1369+
assert isinstance(sem_default, _ProfiledLock)
1370+
assert sem_default.acquire(timeout=1)
1371+
assert not sem_default.acquire(timeout=0.01) # Should block
1372+
sem_default.release()
1373+
1374+
def test_semaphore_multiple_acquires(self) -> None:
1375+
"""Test that Semaphore correctly handles multiple acquires."""
1376+
with self.collector_class(capture_pct=100):
1377+
from ddtrace.profiling.collector._lock import _ProfiledLock
1378+
1379+
sem = self.lock_class(2)
1380+
assert isinstance(sem, _ProfiledLock)
1381+
1382+
# Should be able to acquire twice
1383+
assert sem.acquire(timeout=1)
1384+
assert sem.acquire(timeout=1)
1385+
1386+
# Third acquire should fail (timeout)
1387+
assert not sem.acquire(timeout=0.01)
1388+
1389+
# Release one and try again
1390+
sem.release()
1391+
assert sem.acquire(timeout=1)
1392+
1393+
# Clean up
1394+
sem.release()
1395+
sem.release()
1396+
1397+
def test_semaphore_non_blocking_acquire(self) -> None:
1398+
"""Test non-blocking acquire behavior."""
1399+
with self.collector_class(capture_pct=100):
1400+
from ddtrace.profiling.collector._lock import _ProfiledLock
1401+
1402+
sem = self.lock_class(2)
1403+
assert isinstance(sem, _ProfiledLock)
1404+
1405+
# Non-blocking acquires should succeed immediately
1406+
assert sem.acquire(blocking=False)
1407+
assert sem.acquire(blocking=False)
1408+
1409+
# Third should fail immediately (not timeout)
1410+
assert not sem.acquire(blocking=False)
1411+
1412+
# Clean up
1413+
sem.release()
1414+
sem.release()
1415+
1416+
def test_semaphore_concurrent_threads(self) -> None:
1417+
"""Test that multiple threads can hold semaphore simultaneously."""
1418+
import time
1419+
1420+
with self.collector_class(capture_pct=100):
1421+
from ddtrace.profiling.collector._lock import _ProfiledLock
1422+
1423+
sem = self.lock_class(3) # Allow 3 threads
1424+
assert isinstance(sem, _ProfiledLock)
1425+
1426+
results = []
1427+
threads_holding = []
1428+
1429+
def worker(worker_id):
1430+
# Acquire the semaphore
1431+
if sem.acquire(timeout=2):
1432+
threads_holding.append(worker_id)
1433+
results.append(f"worker-{worker_id}-acquired")
1434+
time.sleep(0.05) # Hold it briefly
1435+
results.append(f"worker-{worker_id}-releasing")
1436+
sem.release()
1437+
else:
1438+
results.append(f"worker-{worker_id}-timeout")
1439+
1440+
# Start 5 threads, but only 3 can hold semaphore at once
1441+
threads = []
1442+
for i in range(5):
1443+
t = threading.Thread(target=worker, args=(i,))
1444+
threads.append(t)
1445+
t.start()
1446+
1447+
# Wait for all threads
1448+
for t in threads:
1449+
t.join(timeout=5)
1450+
1451+
# All workers should have acquired and released
1452+
assert len([r for r in results if "acquired" in r]) == 5
1453+
assert len([r for r in results if "releasing" in r]) == 5
1454+
1455+
# At most 3 threads should have been holding at once
1456+
# (This is approximate since we're just checking they all got through)
1457+
1458+
def test_semaphore_blocking_contention(self) -> None:
1459+
"""Test that threads block when semaphore is at capacity."""
1460+
import time
1461+
1462+
with self.collector_class(capture_pct=100):
1463+
from ddtrace.profiling.collector._lock import _ProfiledLock
1464+
1465+
sem = self.lock_class(1) # Only 1 thread allowed
1466+
assert isinstance(sem, _ProfiledLock)
1467+
1468+
acquired_times = []
1469+
released_times = []
1470+
1471+
def holder():
1472+
"""Hold the semaphore for a while"""
1473+
sem.acquire()
1474+
acquired_times.append(time.time())
1475+
time.sleep(0.1) # Hold for 100ms
1476+
released_times.append(time.time())
1477+
sem.release()
1478+
1479+
def waiter():
1480+
"""Try to acquire - should wait"""
1481+
start = time.time()
1482+
sem.acquire(timeout=1)
1483+
acquired_times.append(time.time())
1484+
wait_time = time.time() - start
1485+
sem.release()
1486+
return wait_time
1487+
1488+
# Start holder thread
1489+
holder_thread = threading.Thread(target=holder)
1490+
holder_thread.start()
1491+
time.sleep(0.01) # Ensure holder gets it first
1492+
1493+
# Start waiter thread - should block
1494+
waiter_result = []
1495+
waiter_thread = threading.Thread(target=lambda: waiter_result.append(waiter()))
1496+
waiter_thread.start()
1497+
1498+
# Wait for both
1499+
holder_thread.join(timeout=2)
1500+
waiter_thread.join(timeout=2)
1501+
1502+
# Waiter should have waited for holder to release
1503+
assert len(waiter_result) == 1
1504+
assert waiter_result[0] >= 0.08 # Should have waited ~100ms
1505+
1506+
def test_semaphore_zero_value(self) -> None:
1507+
"""Test semaphore with value=0 (initially blocking)."""
1508+
import time
1509+
1510+
with self.collector_class(capture_pct=100):
1511+
from ddtrace.profiling.collector._lock import _ProfiledLock
1512+
1513+
sem = self.lock_class(0) # No permits available
1514+
assert isinstance(sem, _ProfiledLock)
1515+
1516+
# Immediate acquire should fail
1517+
assert not sem.acquire(blocking=False)
1518+
assert not sem.acquire(timeout=0.01)
1519+
1520+
# Release to make one available
1521+
sem.release()
1522+
1523+
# Now acquire should work
1524+
assert sem.acquire(timeout=1)
1525+
sem.release()
1526+
1527+
def test_semaphore_bounded_semaphore(self) -> None:
1528+
"""Test that BoundedSemaphore gets wrapped and prevents over-release."""
1529+
with self.collector_class(capture_pct=100):
1530+
from ddtrace.profiling.collector._lock import _ProfiledLock
1531+
1532+
# BoundedSemaphore might be wrapped if it inherits from Semaphore patching
1533+
bounded_sem = threading.BoundedSemaphore(2)
1534+
1535+
# Check if it's wrapped (may not be if BoundedSemaphore needs separate collector)
1536+
if isinstance(bounded_sem, _ProfiledLock):
1537+
# Test basic functionality
1538+
assert bounded_sem.acquire(timeout=1)
1539+
bounded_sem.release()
1540+
1541+
# BoundedSemaphore should raise ValueError on over-release
1542+
bounded_sem.acquire()
1543+
bounded_sem.release()
1544+
try:
1545+
bounded_sem.release() # Over-release
1546+
assert False, "Should have raised ValueError"
1547+
except ValueError:
1548+
pass # Expected
1549+
else:
1550+
# Document that BoundedSemaphore is not currently wrapped
1551+
print("Note: BoundedSemaphore not wrapped, needs separate collector")

0 commit comments

Comments
 (0)