Skip to content

Commit ea30e1d

Browse files
committed
refactor(ipc): more general shared file
We refactor the file queue into a more generic shared string file for IPC that can be used by more components.
1 parent c426d0f commit ea30e1d

File tree

3 files changed

+154
-108
lines changed

3 files changed

+154
-108
lines changed

ddtrace/internal/_file_queue.py

Lines changed: 0 additions & 105 deletions
This file was deleted.

ddtrace/internal/ipc.py

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
import os
2+
import secrets
3+
import tempfile
4+
import typing
5+
6+
from ddtrace.internal._unpatched import unpatched_open
7+
from ddtrace.internal.compat import Path
8+
from ddtrace.internal.logger import get_logger
9+
10+
11+
log = get_logger(__name__)
12+
13+
14+
MAX_FILE_SIZE = 8192
15+
16+
17+
class BaseLock:
18+
def __init__(self, file: typing.IO[typing.Any]):
19+
self.file = file
20+
21+
def acquire(self):
22+
...
23+
24+
def release(self):
25+
...
26+
27+
def __enter__(self):
28+
self.acquire()
29+
return self
30+
31+
def __exit__(self, exc_type, exc_value, exc_tb):
32+
self.release()
33+
34+
35+
try:
36+
# Unix based file locking
37+
# Availability: Unix, not Emscripten, not WASI.
38+
import fcntl
39+
40+
class ReadLock(BaseLock):
41+
def __init__(self, file: typing.IO[typing.Any]):
42+
self.file = file
43+
44+
def acquire(self):
45+
fcntl.lockf(self.file, fcntl.LOCK_SH)
46+
47+
def release(self):
48+
fcntl.lockf(self.file, fcntl.LOCK_UN)
49+
50+
class WriteLock(BaseLock):
51+
def __init__(self, file: typing.IO[typing.Any]):
52+
self.file = file
53+
54+
def acquire(self):
55+
fcntl.lockf(self.file, fcntl.LOCK_EX)
56+
57+
def release(self):
58+
fcntl.lockf(self.file, fcntl.LOCK_UN)
59+
60+
def open_file(path, mode):
61+
return unpatched_open(path, mode)
62+
63+
except ModuleNotFoundError:
64+
# Availability: Windows
65+
import msvcrt
66+
67+
class ReadLock(BaseLock):
68+
def __init__(self, file: typing.IO[typing.Any]):
69+
self.file = file
70+
71+
def acquire(self):
72+
fcntl.lockf(self.file, fcntl.LOCK_SH)
73+
74+
def release(self):
75+
fcntl.lockf(self.file, fcntl.LOCK_UN)
76+
77+
class WriteLock(BaseLock):
78+
def __init__(self, file: typing.IO[typing.Any]):
79+
self.file = file
80+
81+
def acquire(self):
82+
self.file.seek(0)
83+
msvcrt.locking(self.file.fileno(), msvcrt.LK_RLCK, MAX_FILE_SIZE)
84+
85+
def release(self):
86+
self.file.seek(0)
87+
msvcrt.locking(self.file.fileno(), msvcrt.LK_UNLCK, MAX_FILE_SIZE)
88+
89+
def open_file(path, mode):
90+
import _winapi
91+
92+
# force all modes to be read/write binary
93+
mode = "r+b"
94+
flag = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
95+
fd_flag = os.O_RDWR | os.O_CREAT | os.O_BINARY | os.O_RANDOM
96+
SHARED_READ_WRITE = 0x7
97+
OPEN_ALWAYS = 4
98+
RANDOM_ACCESS = 0x10000000
99+
handle = _winapi.CreateFile(path, flag, SHARED_READ_WRITE, 0, OPEN_ALWAYS, RANDOM_ACCESS, 0)
100+
fd = msvcrt.open_osfhandle(handle, fd_flag | os.O_NOINHERIT)
101+
return unpatched_open(fd, mode)
102+
103+
104+
TMPDIR = Path(tempfile.gettempdir())
105+
106+
107+
class SharedStringFile:
108+
"""A simple shared-file implementation for multiprocess communication."""
109+
110+
def __init__(self) -> None:
111+
self.filename: typing.Optional[str] = str(TMPDIR / secrets.token_hex(8))
112+
113+
def put(self, data: str) -> None:
114+
"""Put a string into the file."""
115+
if self.filename is None:
116+
return
117+
118+
try:
119+
with open_file(self.filename, "ab") as f, WriteLock(f):
120+
f.seek(0, os.SEEK_END)
121+
dt = (data + "\x00").encode()
122+
if f.tell() + len(dt) <= MAX_FILE_SIZE:
123+
f.write(dt)
124+
except Exception: # nosec
125+
pass
126+
127+
def peekall(self) -> typing.List[str]:
128+
"""Peek at all strings from the file."""
129+
if self.filename is None:
130+
return []
131+
132+
try:
133+
with open_file(self.filename, "r+b") as f, ReadLock(f):
134+
return f.read().strip(b"\x00").decode().split("\x00")
135+
except Exception: # nosec
136+
return []
137+
138+
def snatchall(self) -> typing.List[str]:
139+
"""Retrieve and remove all strings from the file."""
140+
if self.filename is None:
141+
return []
142+
143+
try:
144+
with open_file(self.filename, "r+b") as f, WriteLock(f):
145+
try:
146+
return f.read().strip(b"\x00").decode().split("\x00")
147+
finally:
148+
f.seek(0)
149+
f.truncate()
150+
except Exception: # nosec
151+
return []

ddtrace/settings/_config.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -510,9 +510,9 @@ def __init__(self):
510510
self._extra_services_queue = None
511511
if self._remote_config_enabled and not in_aws_lambda():
512512
# lazy load slow import
513-
from ddtrace.internal._file_queue import File_Queue
513+
from ddtrace.internal.ipc import SharedStringFile
514514

515-
self._extra_services_queue = File_Queue()
515+
self._extra_services_queue = SharedStringFile()
516516

517517
self._unparsed_service_mapping = _get_config("DD_SERVICE_MAPPING", "")
518518
self.service_mapping = parse_tags_str(self._unparsed_service_mapping)
@@ -703,7 +703,7 @@ def _get_extra_services(self):
703703
# type: () -> set[str]
704704
if self._extra_services_queue is None:
705705
return set()
706-
self._extra_services.update(self._extra_services_queue.get_all())
706+
self._extra_services.update(set(self._extra_services_queue.snatchall()))
707707
while len(self._extra_services) > 64:
708708
self._extra_services.pop()
709709
return self._extra_services

0 commit comments

Comments
 (0)