2626from functools import partial
2727from os import environ
2828from time import time
29- from typing import Any , Callable , Final , Literal , TextIO , cast
29+ from typing import Any , Callable , Final , Literal
3030from uuid import uuid4
3131
3232import fsspec
@@ -78,22 +78,17 @@ class CompletionRefs:
7878UploadData = dict [str , Callable [[], JsonEncodeable ]]
7979
8080
81- def fsspec_open (urlpath : str , mode : Literal ["w" ]) -> TextIO :
82- """typed wrapper around `fsspec.open`"""
83- return cast (TextIO , fsspec .open (urlpath , mode )) # pyright: ignore[reportUnknownMemberType]
84-
85-
86- class FsspecUploadCompletionHook (CompletionHook ):
81+ class UploadCompletionHook (CompletionHook ):
8782 """An completion hook using ``fsspec`` to upload to external storage
8883
8984 This function can be used as the
9085 :func:`~opentelemetry.util.genai.completion_hook.load_completion_hook` implementation by
91- setting :envvar:`OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK` to ``fsspec_upload ``.
86+ setting :envvar:`OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK` to ``upload ``.
9287 :envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH` must be configured to specify the
9388 base path for uploads.
9489
9590 Both the ``fsspec`` and ``opentelemetry-sdk`` packages should be installed, or a no-op
96- implementation will be used instead. You can use ``opentelemetry-util-genai[fsspec ]``
91+ implementation will be used instead. You can use ``opentelemetry-util-genai[upload ]``
9792 as a requirement to achieve this.
9893 """
9994
@@ -104,8 +99,9 @@ def __init__(
10499 max_size : int = 20 ,
105100 upload_format : Format | None = None ,
106101 ) -> None :
107- self ._base_path = base_path
108102 self ._max_size = max_size
103+ self ._fs , base_path = fsspec .url_to_fs (base_path )
104+ self ._base_path = self ._fs .unstrip_protocol (base_path )
109105
110106 if upload_format not in _FORMATS + (None ,):
111107 raise ValueError (
@@ -133,15 +129,15 @@ def done(future: Future[None]) -> None:
133129 try :
134130 future .result ()
135131 except Exception : # pylint: disable=broad-except
136- _logger .exception ("fsspec uploader failed" )
132+ _logger .exception ("uploader failed" )
137133 finally :
138134 self ._semaphore .release ()
139135
140136 for path , json_encodeable in upload_data .items ():
141137 # could not acquire, drop data
142138 if not self ._semaphore .acquire (blocking = False ): # pylint: disable=consider-using-with
143139 _logger .warning (
144- "fsspec upload queue is full, dropping upload %s" ,
140+ "upload queue is full, dropping upload %s" ,
145141 path ,
146142 )
147143 continue
@@ -153,7 +149,7 @@ def done(future: Future[None]) -> None:
153149 fut .add_done_callback (done )
154150 except RuntimeError :
155151 _logger .info (
156- "attempting to upload file after FsspecUploadCompletionHook .shutdown() was already called"
152+ "attempting to upload file after UploadCompletionHook .shutdown() was already called"
157153 )
158154 self ._semaphore .release ()
159155
@@ -188,7 +184,13 @@ def _do_upload(
188184 for message_idx , line in enumerate (message_lines ):
189185 line [_MESSAGE_INDEX_KEY ] = message_idx
190186
191- with fsspec_open (path , "w" ) as file :
187+ content_type = (
188+ "application/json"
189+ if self ._format == "json"
190+ else "application/jsonl"
191+ )
192+
193+ with self ._fs .open (path , "w" , content_type = content_type ) as file :
192194 for message in message_lines :
193195 json .dump (
194196 message ,
0 commit comments