Skip to content

Commit 06564a7

Browse files
authored
refactor(ipc): more general shared file (#15085)
## Description We refactor the file queue into a more generic shared string file for IPC that can be used by more components.
1 parent 48a3a00 commit 06564a7

File tree

6 files changed

+151
-111
lines changed

6 files changed

+151
-111
lines changed

ddtrace/internal/_file_queue.py

Lines changed: 0 additions & 105 deletions
This file was deleted.

ddtrace/internal/ipc.py

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
from contextlib import contextmanager
2+
import os
3+
import secrets
4+
import tempfile
5+
import typing
6+
7+
from ddtrace.internal._unpatched import unpatched_open
8+
from ddtrace.internal.compat import Path
9+
from ddtrace.internal.logger import get_logger
10+
11+
12+
log = get_logger(__name__)
13+
14+
15+
MAX_FILE_SIZE = 8192
16+
17+
18+
class BaseLock:
19+
def __init__(self, file: typing.IO[typing.Any]):
20+
self.file = file
21+
22+
def acquire(self):
23+
...
24+
25+
def release(self):
26+
...
27+
28+
def __enter__(self):
29+
self.acquire()
30+
return self
31+
32+
def __exit__(self, exc_type, exc_value, exc_tb):
33+
self.release()
34+
35+
36+
try:
37+
# Unix based file locking
38+
# Availability: Unix, not Emscripten, not WASI.
39+
import fcntl
40+
41+
class BaseUnixLock(BaseLock):
42+
__acquire_mode__: typing.Optional[int] = None
43+
44+
def acquire(self):
45+
if self.__acquire_mode__ is None:
46+
msg = f"Cannot use lock of type {type(self)} directly"
47+
raise ValueError(msg)
48+
49+
fcntl.lockf(self.file, self.__acquire_mode__)
50+
51+
def release(self):
52+
fcntl.lockf(self.file, fcntl.LOCK_UN)
53+
54+
class ReadLock(BaseUnixLock):
55+
__acquire_mode__ = fcntl.LOCK_SH
56+
57+
class WriteLock(BaseUnixLock):
58+
__acquire_mode__ = fcntl.LOCK_EX
59+
60+
@contextmanager
61+
def open_file(path, mode):
62+
yield unpatched_open(path, mode)
63+
64+
except ModuleNotFoundError:
65+
# Availability: Windows
66+
import msvcrt
67+
68+
class BaseWinLock(BaseLock):
69+
def acquire(self):
70+
f = self.file
71+
f.seek(0)
72+
msvcrt.locking(f.fileno(), msvcrt.LK_RLCK, MAX_FILE_SIZE)
73+
74+
def release(self):
75+
f = self.file
76+
f.seek(0)
77+
msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK, MAX_FILE_SIZE)
78+
79+
ReadLock = WriteLock = BaseWinLock # type: ignore
80+
81+
def open_file(path, mode):
82+
import _winapi
83+
84+
# force all modes to be read/write binary
85+
mode = "r+b"
86+
flag = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
87+
fd_flag = os.O_RDWR | os.O_CREAT | os.O_BINARY | os.O_RANDOM
88+
SHARED_READ_WRITE = 0x7
89+
OPEN_ALWAYS = 4
90+
RANDOM_ACCESS = 0x10000000
91+
handle = _winapi.CreateFile(path, flag, SHARED_READ_WRITE, 0, OPEN_ALWAYS, RANDOM_ACCESS, 0)
92+
fd = msvcrt.open_osfhandle(handle, fd_flag | os.O_NOINHERIT)
93+
return unpatched_open(fd, mode)
94+
95+
96+
TMPDIR = Path(tempfile.gettempdir())
97+
98+
99+
class SharedStringFile:
100+
"""A simple shared-file implementation for multiprocess communication."""
101+
102+
def __init__(self) -> None:
103+
self.filename: typing.Optional[str] = str(TMPDIR / secrets.token_hex(8))
104+
105+
def put(self, data: str) -> None:
106+
"""Put a string into the file."""
107+
if self.filename is None:
108+
return
109+
110+
try:
111+
with open_file(self.filename, "ab") as f, WriteLock(f):
112+
f.seek(0, os.SEEK_END)
113+
dt = (data + "\x00").encode()
114+
if f.tell() + len(dt) <= MAX_FILE_SIZE:
115+
f.write(dt)
116+
except Exception: # nosec
117+
pass
118+
119+
def peekall(self) -> typing.List[str]:
120+
"""Peek at all strings from the file."""
121+
if self.filename is None:
122+
return []
123+
124+
try:
125+
with open_file(self.filename, "r+b") as f, ReadLock(f):
126+
f.seek(0)
127+
return f.read().strip(b"\x00").decode().split("\x00")
128+
except Exception: # nosec
129+
return []
130+
131+
def snatchall(self) -> typing.List[str]:
132+
"""Retrieve and remove all strings from the file."""
133+
if self.filename is None:
134+
return []
135+
136+
try:
137+
with open_file(self.filename, "r+b") as f, WriteLock(f):
138+
f.seek(0)
139+
strings = f.read().strip(b"\x00").decode().split("\x00")
140+
141+
f.seek(0)
142+
f.truncate()
143+
144+
return strings
145+
except Exception: # nosec
146+
return []

ddtrace/settings/_config.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -510,9 +510,9 @@ def __init__(self):
510510
self._extra_services_queue = None
511511
if self._remote_config_enabled and not in_aws_lambda():
512512
# lazy load slow import
513-
from ddtrace.internal._file_queue import File_Queue
513+
from ddtrace.internal.ipc import SharedStringFile
514514

515-
self._extra_services_queue = File_Queue()
515+
self._extra_services_queue = SharedStringFile()
516516

517517
self._unparsed_service_mapping = _get_config("DD_SERVICE_MAPPING", "")
518518
self.service_mapping = parse_tags_str(self._unparsed_service_mapping)
@@ -704,7 +704,7 @@ def _get_extra_services(self):
704704
# type: () -> set[str]
705705
if self._extra_services_queue is None:
706706
return set()
707-
self._extra_services.update(self._extra_services_queue.get_all())
707+
self._extra_services.update(set(self._extra_services_queue.snatchall()) - {""})
708708
while len(self._extra_services) > 64:
709709
self._extra_services.pop()
710710
return self._extra_services

tests/internal/service_name/test_extra_services_names.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ def test_config_extra_service_names_duplicates(run_python_code_in_subprocess):
5656
5757
extra_services = ddtrace.config._get_extra_services()
5858
extra_services.discard("sqlite") # coverage
59-
assert extra_services == {"extra_service_1"}
59+
assert extra_services == {"extra_service_1"}, extra_services
6060
"""
6161

6262
env = os.environ.copy()

tests/profiling/suitespec.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ components:
3333
core:
3434
- ddtrace/internal/__init__.py
3535
- ddtrace/internal/_exceptions.py
36-
- ddtrace/internal/_file_queue.py
3736
- ddtrace/internal/_rand.pyi
3837
- ddtrace/internal/_rand.pyx
3938
- ddtrace/internal/_stdint.h

tests/suitespec.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ components:
3636
core:
3737
- ddtrace/internal/__init__.py
3838
- ddtrace/internal/_exceptions.py
39-
- ddtrace/internal/_file_queue.py
4039
- ddtrace/internal/_rand.pyi
4140
- ddtrace/internal/_rand.pyx
4241
- ddtrace/internal/_stdint.h
@@ -53,6 +52,7 @@ components:
5352
- ddtrace/internal/forksafe.py
5453
- ddtrace/internal/gitmetadata.py
5554
- ddtrace/internal/glob_matching.py
55+
- ddtrace/internal/ipc.py
5656
- ddtrace/internal/logger.py
5757
- ddtrace/_logger.py
5858
- ddtrace/internal/hostname.py

0 commit comments

Comments
 (0)