Skip to content

Commit 456912a

Browse files
committed
Add fsspec gen ai upload hook
1 parent 7eb9203 commit 456912a

File tree

9 files changed

+479
-5
lines changed

9 files changed

+479
-5
lines changed

docs-requirements.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ pymemcache~=1.3
99
# Required by conf
1010
django>=2.2
1111

12+
# Required by opentelemetry-util-genai
13+
fsspec>=2025.9.0
14+
1215
# Required by instrumentation and exporter packages
1316
aio_pika~=7.2.0
1417
aiohttp~=3.0

docs/conf.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@
130130
None,
131131
),
132132
"redis": ("https://redis.readthedocs.io/en/latest/", None),
133+
"fsspec": ("https://filesystem-spec.readthedocs.io/en/latest/", None),
133134
}
134135

135136
# http://www.sphinx-doc.org/en/master/config.html#confval-nitpicky

docs/instrumentation-genai/util.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,7 @@ OpenTelemetry Python - GenAI Util
2525
:members:
2626
:undoc-members:
2727
:show-inheritance:
28+
29+
.. automodule:: opentelemetry.util.genai._fsspec_upload
30+
:members:
31+
:show-inheritance:

tox.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1060,7 +1060,7 @@ deps =
10601060
{[testenv]test_deps}
10611061
{toxinidir}/opentelemetry-instrumentation
10621062
{toxinidir}/util/opentelemetry-util-http
1063-
{toxinidir}/util/opentelemetry-util-genai
1063+
{toxinidir}/util/opentelemetry-util-genai[fsspec]
10641064
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-vertexai[instruments]
10651065
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-google-genai[instruments]
10661066
{toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka[instruments]

util/opentelemetry-util-genai/pyproject.toml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@ dependencies = [
3030
"opentelemetry-api>=1.31.0",
3131
]
3232

33+
[project.entry-points.opentelemetry_genai_upload_hook]
34+
fsspec = "opentelemetry.util.genai._fsspec_upload:fsspec_upload_hook"
35+
3336
[project.optional-dependencies]
34-
test = [
35-
"pytest>=7.0.0",
36-
]
37+
test = ["pytest>=7.0.0"]
38+
fsspec = ["fsspec>=2025.9.0"]
3739

3840
[project.urls]
3941
Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/util/opentelemetry-util-genai"
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
import json
18+
import logging
19+
import posixpath
20+
import threading
21+
from concurrent.futures import Future, ThreadPoolExecutor
22+
from dataclasses import asdict, dataclass
23+
from functools import partial
24+
from os import environ
25+
from typing import Any, Callable, Literal, TextIO, cast
26+
from uuid import uuid4
27+
28+
from opentelemetry._logs import LogRecord
29+
from opentelemetry.trace import Span
30+
from opentelemetry.util.genai import types
31+
from opentelemetry.util.genai.environment_variables import (
32+
OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH,
33+
)
34+
from opentelemetry.util.genai.upload_hook import UploadHook, _NoOpUploadHook
35+
36+
# If fsspec is not installed the hook will be a no-op.
37+
try:
38+
import fsspec
39+
except ImportError:
40+
fsspec = None
41+
42+
_logger = logging.getLogger(__name__)
43+
44+
45+
@dataclass
class Completion:
    """The three message lists that together describe one GenAI completion.

    Built by ``FsspecUploadHook.upload`` from its keyword arguments so the
    payloads can be handed around (e.g. to ``calculate_ref_path``) as one value.
    """

    # input messages of the completion
    inputs: list[types.InputMessage]
    # output messages of the completion
    outputs: list[types.OutputMessage]
    # message parts making up the system instruction
    system_instruction: list[types.MessagePart]
50+
51+
52+
@dataclass
class CompletionRefs:
    """Destination paths for the three files uploaded per completion.

    Returned by ``FsspecUploadHook.calculate_ref_path``; each field is the full
    path (base path joined with a file name) for one part of the completion.
    """

    # where the input messages are written
    inputs_ref: str
    # where the output messages are written
    outputs_ref: str
    # where the system instruction is written
    system_instruction_ref: str
57+
58+
59+
# JSON-serializable payload written to a single uploaded file: a list of plain dicts.
JsonEncodeable = list[dict[str, Any]]

# Maps each upload path to a zero-argument callable that builds the payload for
# that path, so the conversion can be deferred until the upload actually runs.
UploadData = dict[str, Callable[[], JsonEncodeable]]
63+
64+
if fsspec is not None:
65+
# save a copy for the type checker
66+
fsspec_copy = fsspec
67+
68+
def fsspec_open(urlpath: str, mode: Literal["w"]) -> TextIO:
    """Open ``urlpath`` via ``fsspec.open`` and present the result as a ``TextIO``.

    Exists to give the untyped ``fsspec.open`` call a precise signature for the
    type checker; both arguments are forwarded unchanged.
    """
    opened = fsspec_copy.open(urlpath, mode)  # pyright: ignore[reportUnknownMemberType]
    return cast(TextIO, opened)
71+
72+
class FsspecUploader:
    """Writes GenAI completion payloads to a storage backend through fsspec.

    ``FsspecUploadHook`` submits this class's ``upload`` method to its thread
    pool to perform the actual write to external storage.
    """

    def upload(  # pylint: disable=no-self-use
        self,
        path: str,
        json_encodeable: Callable[[], JsonEncodeable],
    ) -> None:
        """Write the payload produced by ``json_encodeable`` to ``path`` as compact JSON.

        The destination is opened first and ``json_encodeable`` is only called
        once the file is open; the JSON is emitted without any whitespace
        around separators.
        """
        compact_separators = (",", ":")
        with fsspec_open(path, "w") as sink:
            json.dump(json_encodeable(), sink, separators=compact_separators)
86+
87+
class FsspecUploadHook(UploadHook):
    """An upload hook using ``fsspec`` to upload to external storage

    This hook can be selected as the
    :func:`~opentelemetry.util.genai.upload_hook.load_upload_hook` implementation by
    setting :envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK` to ``fsspec``.
    :envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH` must be configured to specify the
    base path for uploads.

    Both the ``fsspec`` and ``opentelemetry-sdk`` packages should be installed, or a no-op
    implementation will be used instead. You can use ``opentelemetry-util-genai[fsspec]``
    as a requirement to achieve this.

    Uploads are executed on a background thread pool. At most ``max_size`` uploads may be
    pending at any time; uploads submitted beyond that limit are dropped with a warning.
    """

    def __init__(
        self,
        *,
        uploader: FsspecUploader,
        base_path: str,
        max_size: int = 20,
    ) -> None:
        """Create the hook.

        Args:
            uploader: object whose ``upload(path, json_encodeable)`` performs each write.
            base_path: prefix joined with the generated file names to form upload paths.
            max_size: number of worker threads and maximum number of pending uploads.
        """
        self._base_path = base_path
        self._uploader = uploader
        self._max_size = max_size

        # Use a ThreadPoolExecutor for its queueing and thread management. The semaphore
        # limits the number of queued tasks. If the queue is full, data will be dropped.
        self._executor = ThreadPoolExecutor(max_workers=max_size)
        self._semaphore = threading.BoundedSemaphore(max_size)

    def _submit_all(self, upload_data: UploadData) -> None:
        """Queue one upload per entry of ``upload_data`` without blocking.

        Every queued upload holds one semaphore slot until its future completes. If no
        slot is free, or the executor has already been shut down, the current entry and
        all remaining entries are dropped.
        """

        def done(future: Future[None]) -> None:
            # Free the slot before inspecting the result so capacity is restored even
            # when the upload itself failed.
            self._semaphore.release()

            try:
                future.result()
            except Exception:  # pylint: disable=broad-except
                _logger.exception("fsspec uploader failed")

        for path, json_encodeable in upload_data.items():
            # could not acquire, drop data
            if not self._semaphore.acquire(blocking=False):  # pylint: disable=consider-using-with
                _logger.warning(
                    "fsspec upload queue is full, dropping upload %s",
                    path,
                )
                return

            try:
                fut = self._executor.submit(
                    self._uploader.upload, path, json_encodeable
                )
            except RuntimeError:
                # submit() raises after shutdown, so no future exists whose callback
                # would release the slot acquired above; give it back here to keep
                # the semaphore balanced.
                self._semaphore.release()
                _logger.info(
                    "attempting to upload file after FsspecUploadHook.shutdown() was already called"
                )
                break

            # Registered outside the try block so the release in the except branch
            # can only ever run for a failed submit, never alongside ``done``.
            fut.add_done_callback(done)

    def calculate_ref_path(self, completion: Completion) -> CompletionRefs:
        """Generate a path to the reference

        The default implementation uses :func:`~uuid.uuid4` to generate a random name per completion.
        The same random name is shared by the inputs, outputs and system instruction
        files, which differ only in their suffix. ``completion`` is not inspected by
        this implementation.
        """
        # TODO: experiment with using the trace_id and span_id, or fetching
        # gen_ai.response.id from the active span.

        uuid_str = str(uuid4())
        return CompletionRefs(
            inputs_ref=posixpath.join(
                self._base_path, f"{uuid_str}_inputs.json"
            ),
            outputs_ref=posixpath.join(
                self._base_path, f"{uuid_str}_outputs.json"
            ),
            system_instruction_ref=posixpath.join(
                self._base_path, f"{uuid_str}_system_instruction.json"
            ),
        )

    def upload(
        self,
        *,
        inputs: list[types.InputMessage],
        outputs: list[types.OutputMessage],
        system_instruction: list[types.MessagePart],
        span: Span | None = None,
        log_record: LogRecord | None = None,
        **kwargs: Any,
    ) -> None:
        """Queue the three parts of a completion for upload as separate JSON files.

        The dataclass-to-dict conversion is deferred into the queued callables, so this
        method only computes the destination paths and submits the work. ``span``,
        ``log_record`` and ``kwargs`` are accepted but not used yet.
        """
        completion = Completion(
            inputs=inputs,
            outputs=outputs,
            system_instruction=system_instruction,
        )
        # generate the paths to upload to
        ref_names = self.calculate_ref_path(completion)

        def to_dict(
            dataclass_list: list[types.InputMessage]
            | list[types.OutputMessage]
            | list[types.MessagePart],
        ) -> list[dict[str, Any]]:
            return [asdict(dc) for dc in dataclass_list]

        self._submit_all(
            {
                ref_names.inputs_ref: partial(to_dict, completion.inputs),
                ref_names.outputs_ref: partial(
                    to_dict, completion.outputs
                ),
                ref_names.system_instruction_ref: partial(
                    to_dict, completion.system_instruction
                ),
            },
        )

        # TODO: stamp the refs on telemetry

    def shutdown(self) -> None:
        """Shut down the thread pool; later uploads are dropped with an info log."""
        # TODO: support timeout
        self._executor.shutdown()
209+
210+
def fsspec_upload_hook() -> UploadHook:
    """Build the fsspec-backed upload hook from the environment.

    Reads ``OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH``. When it is unset or
    empty a no-op hook is returned; otherwise an ``FsspecUploadHook`` writing
    under that base path is returned.
    """
    configured_path = environ.get(OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH)
    if configured_path:
        return FsspecUploadHook(
            uploader=FsspecUploader(),
            base_path=configured_path,
        )

    # Without a destination there is nowhere to upload to.
    return _NoOpUploadHook()
219+
else:
220+
221+
def fsspec_upload_hook() -> UploadHook:
    """Fallback factory used when ``fsspec`` could not be imported: always a no-op hook."""
    noop_hook = _NoOpUploadHook()
    return noop_hook

util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,27 @@
2121
)
2222
"""
2323
.. envvar:: OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK
24+
25+
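The only known value is ``fsspec``, which uploads prompts and responses with ``fsspec`` to the location configured by ``OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH``.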
26+
"""
27+
28+
OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH = (
29+
"OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH"
30+
)
31+
"""
32+
.. envvar:: OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH
33+
34+
An :func:`fsspec.open` compatible URI/path for uploading prompts and responses. Can be a local
35+
path like ``file:./prompts`` or a cloud storage URI such as ``gs://my_bucket``. For more
36+
information, see
37+
38+
* `Instantiate a file-system
39+
<https://filesystem-spec.readthedocs.io/en/latest/usage.html#instantiate-a-file-system>`_ for supported values and how to
40+
install support for additional backend implementations.
41+
* `Configuration
42+
<https://filesystem-spec.readthedocs.io/en/latest/features.html#configuration>`_ for
43+
configuring a backend with environment variables.
44+
* `URL Chaining
45+
<https://filesystem-spec.readthedocs.io/en/latest/features.html#url-chaining>`_ for advanced
46+
use cases.
2447
"""
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
pytest==7.4.4
2-
-e opentelemetry-instrumentation
2+
fsspec==2025.9.0
3+
-e opentelemetry-instrumentation

0 commit comments

Comments
 (0)