From 3e49e0fd151555ef5289b692edfbc40fa182313f Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Wed, 2 Jul 2025 23:43:49 -0300 Subject: [PATCH 1/7] feat(tracer): enhance tracing functionality with helper methods for input extraction and logging finalization - Introduced `_extract_function_inputs` to streamline input extraction for logging. - Added `_finalize_step_logging` to encapsulate step timing and logging logic. - Implemented `_handle_trace_completion` for improved trace completion handling. - Enhanced `trace_async` decorator to support both async functions and async generators with optimized logging. - Refactored existing tracing logic to utilize new helper functions for better maintainability and readability. --- src/openlayer/lib/tracing/tracer.py | 390 +++++++++++++++++++--------- 1 file changed, 271 insertions(+), 119 deletions(-) diff --git a/src/openlayer/lib/tracing/tracer.py b/src/openlayer/lib/tracing/tracer.py index bc02ad88..5e2bb4cf 100644 --- a/src/openlayer/lib/tracing/tracer.py +++ b/src/openlayer/lib/tracing/tracer.py @@ -143,7 +143,141 @@ def add_chat_completion_step_to_trace(**kwargs) -> None: step.log(**kwargs) +# ----------------------------- Helper functions for tracing ---------------------------- # + +def _extract_function_inputs( + func_signature: inspect.Signature, + func_args: tuple, + func_kwargs: dict, + context_kwarg: Optional[str] = None +) -> dict: + """Extract and clean function inputs for logging.""" + bound = func_signature.bind(*func_args, **func_kwargs) + bound.apply_defaults() + inputs = dict(bound.arguments) + inputs.pop("self", None) + inputs.pop("cls", None) + + # Handle context kwarg if specified + if context_kwarg: + if context_kwarg in inputs: + log_context(inputs.get(context_kwarg)) + else: + logger.warning( + "Context kwarg `%s` not found in inputs of the current function.", + context_kwarg, + ) + + return inputs + +def _finalize_step_logging( + step: steps.Step, + inputs: dict, + output: Any, + start_time: float, + exception: Optional[Exception] = None +) -> None: + """Finalize step timing and logging.""" + if step.end_time is None: + step.end_time = time.time() + if step.latency is None: + step.latency = (step.end_time - start_time) * 1000 # in ms + + step.log( + inputs=inputs, + output=output, + end_time=step.end_time, + latency=step.latency, + ) + +def _handle_trace_completion( + is_root_step: bool, + step_name: str, + inference_pipeline_id: Optional[str] = None +) -> None: + """Handle trace completion and data streaming.""" + if is_root_step: + logger.debug("Ending the trace...") + current_trace = get_current_trace() + trace_data, input_variable_names = post_process_trace(current_trace) + + config = dict( + ConfigLlmData( + output_column_name="output", + input_variable_names=input_variable_names, + latency_column_name="latency", + cost_column_name="cost", + timestamp_column_name="inferenceTimestamp", + inference_id_column_name="inferenceId", + num_of_token_column_name="tokens", + ) + ) + if "groundTruth" in trace_data: + config.update({"ground_truth_column_name": "groundTruth"}) + if "context" in trace_data: + config.update({"context_column_name": "context"}) + + if isinstance(get_current_step(), steps.ChatCompletionStep): + config.update( + { + "prompt": get_current_step().inputs.get("prompt"), + } + ) + if _publish: + try: + _client.inference_pipelines.data.stream( + inference_pipeline_id=inference_pipeline_id + or utils.get_env_variable("OPENLAYER_INFERENCE_PIPELINE_ID"), + rows=[trace_data], + config=config, + ) + except Exception as err: # pylint: disable=broad-except + logger.error("Could not stream data to Openlayer %s", err) + else: + logger.debug("Ending step %s", step_name) + +@contextmanager +def _create_step_for_async_generator( + step_name: str, + step_args: tuple, + inference_pipeline_id: Optional[str] = None, + **step_kwargs +) -> Generator[Tuple[steps.Step, bool, Any], None, None]: + """Create and manage step for async generators without interfering with yields.""" + # Create step manually + new_step = steps.step_factory( + step_type=enums.StepType.USER_CALL, + name=step_name, + inputs=None, + output=None, + metadata=None + ) + new_step.start_time = time.time() + + parent_step = get_current_step() + is_root_step = parent_step is None + + if parent_step is None: + logger.debug("Starting a new trace...") + current_trace = traces.Trace() + _current_trace.set(current_trace) + _rag_context.set(None) + current_trace.add_step(new_step) + else: + logger.debug("Adding step %s to parent step %s", step_name, parent_step.name) + current_trace = get_current_trace() + parent_step.add_nested_step(new_step) + + token = _current_step.set(new_step) + + try: + yield new_step, is_root_step, token + finally: + _current_step.reset(token) + _handle_trace_completion(is_root_step, step_name, inference_pipeline_id) + # ----------------------------- Tracing decorator ---------------------------- # + def trace( *step_args, inference_pipeline_id: Optional[str] = None, @@ -193,135 +327,23 @@ def decorator(func): def wrapper(*func_args, **func_kwargs): if step_kwargs.get("name") is None: step_kwargs["name"] = func.__name__ + with create_step( *step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs ) as step: output = exception = None try: output = func(*func_args, **func_kwargs) - # pylint: disable=broad-except - except Exception as exc: - step.log(metadata={"Exceptions": str(exc)}) - exception = exc - end_time = time.time() - latency = (end_time - step.start_time) * 1000 # in ms - - bound = func_signature.bind(*func_args, **func_kwargs) - bound.apply_defaults() - inputs = dict(bound.arguments) - inputs.pop("self", None) - inputs.pop("cls", None) - - if context_kwarg: - if context_kwarg in inputs: - log_context(inputs.get(context_kwarg)) - else: - logger.warning( - "Context kwarg `%s` not found in inputs of the " - "current function.", - context_kwarg, - ) - - step.log( - inputs=inputs, - output=output, - end_time=end_time, - latency=latency, - ) - - if exception is not None: - raise exception - return output - - return wrapper - - return decorator - - -def trace_async( - *step_args, - inference_pipeline_id: Optional[str] = None, - context_kwarg: Optional[str] = None, - **step_kwargs, -): - """Decorator to trace a function. - - Examples - -------- - - To trace a function, simply decorate it with the ``@trace()`` decorator. By doing - so, the functions inputs, outputs, and metadata will be automatically logged to your - Openlayer project. - - >>> import os - >>> from openlayer.tracing import tracer - >>> - >>> # Set the environment variables - >>> os.environ["OPENLAYER_API_KEY"] = "YOUR_OPENLAYER_API_KEY_HERE" - >>> os.environ["OPENLAYER_PROJECT_NAME"] = "YOUR_OPENLAYER_PROJECT_NAME_HERE" - >>> - >>> # Decorate all the functions you want to trace - >>> @tracer.trace_async() - >>> async def main(user_query: str) -> str: - >>> context = retrieve_context(user_query) - >>> answer = generate_answer(user_query, context) - >>> return answer - >>> - >>> @tracer.trace_async() - >>> def retrieve_context(user_query: str) -> str: - >>> return "Some context" - >>> - >>> @tracer.trace_async() - >>> def generate_answer(user_query: str, context: str) -> str: - >>> return "Some answer" - >>> - >>> # Every time the main function is called, the data is automatically - >>> # streamed to your Openlayer project. E.g.: - >>> tracer.run_async_func(main("What is the meaning of life?")) - """ - - def decorator(func): - func_signature = inspect.signature(func) - - @wraps(func) - async def wrapper(*func_args, **func_kwargs): - if step_kwargs.get("name") is None: - step_kwargs["name"] = func.__name__ - with create_step( - *step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs - ) as step: - output = exception = None - try: - output = await func(*func_args, **func_kwargs) - # pylint: disable=broad-except except Exception as exc: step.log(metadata={"Exceptions": str(exc)}) exception = exc - end_time = time.time() - latency = (end_time - step.start_time) * 1000 # in ms - - bound = func_signature.bind(*func_args, **func_kwargs) - bound.apply_defaults() - inputs = dict(bound.arguments) - inputs.pop("self", None) - inputs.pop("cls", None) - - if context_kwarg: - if context_kwarg in inputs: - log_context(inputs.get(context_kwarg)) - else: - logger.warning( - "Context kwarg `%s` not found in inputs of the " - "current function.", - context_kwarg, - ) - - step.log( - inputs=inputs, - output=output, - end_time=end_time, - latency=latency, + + # Extract inputs and finalize logging using optimized helper + inputs = _extract_function_inputs( + func_signature, func_args, func_kwargs, context_kwarg ) + + _finalize_step_logging(step, inputs, output, step.start_time, exception) if exception is not None: raise exception @@ -402,3 +424,133 @@ def post_process_trace( trace_data["context"] = context return trace_data, input_variable_names + + +def trace_async( + *step_args, + inference_pipeline_id: Optional[str] = None, + context_kwarg: Optional[str] = None, + **step_kwargs, +): + """Decorator to trace async functions and async generators. + + This decorator automatically detects whether the function is a regular async function + or an async generator and handles both cases appropriately. + + Examples + -------- + + To trace a regular async function: + + >>> @tracer.trace_async() + >>> async def main(user_query: str) -> str: + >>> context = retrieve_context(user_query) + >>> answer = generate_answer(user_query, context) + >>> return answer + + To trace an async generator function: + + >>> @tracer.trace_async() + >>> async def stream_response(query: str): + >>> async for chunk in openai_client.chat.completions.create(...): + >>> yield chunk.choices[0].delta.content + """ + + def decorator(func): + func_signature = inspect.signature(func) + + if step_kwargs.get("name") is None: + step_kwargs["name"] = func.__name__ + step_name = step_kwargs["name"] + + if asyncio.iscoroutinefunction(func): + # Check if it's specifically an async generator function + if inspect.isasyncgenfunction(func): + # Create a specific async generator wrapper WITHOUT context manager + @wraps(func) + async def async_generator_wrapper(*func_args, **func_kwargs): + with _create_step_for_async_generator( + step_name, step_args, inference_pipeline_id, **step_kwargs + ) as (step, is_root_step, token): + output_chunks = [] + exception = None + + try: + # Execute the async generator function + async_gen = func(*func_args, **func_kwargs) + + # Yield each chunk and collect for logging + async for chunk in async_gen: + output_chunks.append(chunk) + yield chunk # This makes our wrapper an async generator + + except Exception as exc: + step.log(metadata={"Exceptions": str(exc)}) + exception = exc + raise + finally: + # Extract inputs and finalize logging + inputs = _extract_function_inputs( + func_signature, func_args, func_kwargs, context_kwarg + ) + + # Combine chunks for output + output = "".join(str(chunk) for chunk in output_chunks if chunk is not None) + + _finalize_step_logging(step, inputs, output, step.start_time, exception) + + return async_generator_wrapper + else: + # Create wrapper for regular async functions + @wraps(func) + async def async_function_wrapper(*func_args, **func_kwargs): + with create_step( + *step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs + ) as step: + output = exception = None + + try: + output = await func(*func_args, **func_kwargs) + except Exception as exc: + step.log(metadata={"Exceptions": str(exc)}) + exception = exc + raise + + # Extract inputs and finalize logging + inputs = _extract_function_inputs( + func_signature, func_args, func_kwargs, context_kwarg + ) + + _finalize_step_logging(step, inputs, output, step.start_time, exception) + + return output + + return async_function_wrapper + else: + # For sync functions, use the existing logic with optimizations + @wraps(func) + def sync_wrapper(*func_args, **func_kwargs): + with create_step( + *step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs + ) as step: + output = exception = None + try: + output = func(*func_args, **func_kwargs) + except Exception as exc: + step.log(metadata={"Exceptions": str(exc)}) + exception = exc + + # Extract inputs and finalize logging + inputs = _extract_function_inputs( + func_signature, func_args, func_kwargs, context_kwarg + ) + + _finalize_step_logging(step, inputs, output, step.start_time, exception) + + if exception is not None: + raise exception + return output + + return sync_wrapper + + return decorator From c45b1b1c7a63ea791eb29be9007ee04c0f54d6f4 Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Thu, 3 Jul 2025 00:06:20 -0300 Subject: [PATCH 2/7] feat(tracer): implement lazy initialization for Openlayer client - Refactored client initialization logic into a new `_get_client` function for better lazy loading. - Ensured the Openlayer client is only created when needed, improving resource management. - Updated data streaming calls to utilize the new client retrieval method, enhancing code readability and maintainability. --- src/openlayer/lib/tracing/tracer.py | 55 ++++++++++++++++++----------- 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/src/openlayer/lib/tracing/tracer.py b/src/openlayer/lib/tracing/tracer.py index 5e2bb4cf..dcc356d3 100644 --- a/src/openlayer/lib/tracing/tracer.py +++ b/src/openlayer/lib/tracing/tracer.py @@ -24,15 +24,24 @@ utils.get_env_variable("OPENLAYER_VERIFY_SSL") or "true" ).lower() in TRUE_LIST _client = None -if _publish: - if _verify_ssl: - _client = Openlayer() - else: - _client = Openlayer( - http_client=DefaultHttpxClient( - verify=False, - ), - ) + +def _get_client() -> Optional[Openlayer]: + """Get or create the Openlayer client with lazy initialization.""" + global _client + if not _publish: + return None + + if _client is None: + # Lazy initialization - create client when first needed + if _verify_ssl: + _client = Openlayer() + else: + _client = Openlayer( + http_client=DefaultHttpxClient( + verify=False, + ), + ) + return _client _current_step = contextvars.ContextVar("current_step") _current_trace = contextvars.ContextVar("current_trace") @@ -122,12 +131,14 @@ def create_step( ) if _publish: try: - _client.inference_pipelines.data.stream( - inference_pipeline_id=inference_pipeline_id - or utils.get_env_variable("OPENLAYER_INFERENCE_PIPELINE_ID"), - rows=[trace_data], - config=config, - ) + client = _get_client() + if client: + client.inference_pipelines.data.stream( + inference_pipeline_id=inference_pipeline_id + or utils.get_env_variable("OPENLAYER_INFERENCE_PIPELINE_ID"), + rows=[trace_data], + config=config, + ) except Exception as err: # pylint: disable=broad-except logger.error("Could not stream data to Openlayer %s", err) else: @@ -225,12 +236,14 @@ def _handle_trace_completion( ) if _publish: try: - _client.inference_pipelines.data.stream( - inference_pipeline_id=inference_pipeline_id - or utils.get_env_variable("OPENLAYER_INFERENCE_PIPELINE_ID"), - rows=[trace_data], - config=config, - ) + client = _get_client() + if client: + client.inference_pipelines.data.stream( + inference_pipeline_id=inference_pipeline_id + or utils.get_env_variable("OPENLAYER_INFERENCE_PIPELINE_ID"), + rows=[trace_data], + config=config, + ) except Exception as err: # pylint: disable=broad-except logger.error("Could not stream data to Openlayer %s", err) else: From 68d394482eba5094f6439bc9132e930cdcc53a2f Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 8 Jul 2025 15:01:19 -0300 Subject: [PATCH 3/7] feat(tracer): refactor step creation and logging for improved clarity and maintainability - Introduced `_create_and_initialize_step` to encapsulate step creation and parent-child relationships. - Enhanced `_handle_trace_completion` to streamline trace completion and data streaming logic. - Refactored existing tracing functions to utilize new helper methods, improving code readability. - Added `_log_step_exception` and `_process_wrapper_inputs_and_outputs` for better error handling and input/output processing. - Updated async generator handling to ensure proper tracing during iteration. --- src/openlayer/lib/tracing/tracer.py | 695 +++++++++++++++------------- 1 file changed, 383 insertions(+), 312 deletions(-) diff --git a/src/openlayer/lib/tracing/tracer.py b/src/openlayer/lib/tracing/tracer.py index dcc356d3..f7274a36 100644 --- a/src/openlayer/lib/tracing/tracer.py +++ b/src/openlayer/lib/tracing/tracer.py @@ -17,6 +17,8 @@ logger = logging.getLogger(__name__) +# ----------------------------- Module setup and globals ----------------------------- # + TRUE_LIST = ["true", "on", "1"] _publish = utils.get_env_variable("OPENLAYER_DISABLE_PUBLISH") not in TRUE_LIST @@ -47,6 +49,7 @@ def _get_client() -> Optional[Openlayer]: _current_trace = contextvars.ContextVar("current_trace") _rag_context = contextvars.ContextVar("rag_context") +# ----------------------------- Public API functions ----------------------------- # def get_current_trace() -> Optional[traces.Trace]: """Returns the current trace.""" @@ -73,26 +76,13 @@ def create_step( inference_pipeline_id: Optional[str] = None, ) -> Generator[steps.Step, None, None]: """Starts a trace and yields a Step object.""" - new_step: steps.Step = steps.step_factory( - step_type=step_type, name=name, inputs=inputs, output=output, metadata=metadata + new_step, is_root_step, token = _create_and_initialize_step( + step_name=name, + step_type=step_type, + inputs=inputs, + output=output, + metadata=metadata ) - new_step.start_time = time.time() - - parent_step: Optional[steps.Step] = get_current_step() - is_root_step: bool = parent_step is None - - if parent_step is None: - logger.debug("Starting a new trace...") - current_trace = traces.Trace() - _current_trace.set(current_trace) # Set the current trace in context - _rag_context.set(None) # Reset the context - current_trace.add_step(new_step) - else: - logger.debug("Adding step %s to parent step %s", name, parent_step.name) - current_trace = get_current_trace() - parent_step.add_nested_step(new_step) - - token = _current_step.set(new_step) try: yield new_step finally: @@ -103,46 +93,11 @@ def create_step( new_step.latency = latency _current_step.reset(token) - if is_root_step: - logger.debug("Ending the trace...") - trace_data, input_variable_names = post_process_trace(current_trace) - - config = dict( - ConfigLlmData( - output_column_name="output", - input_variable_names=input_variable_names, - latency_column_name="latency", - cost_column_name="cost", - timestamp_column_name="inferenceTimestamp", - inference_id_column_name="inferenceId", - num_of_token_column_name="tokens", - ) - ) - if "groundTruth" in trace_data: - config.update({"ground_truth_column_name": "groundTruth"}) - if "context" in trace_data: - config.update({"context_column_name": "context"}) - - if isinstance(new_step, steps.ChatCompletionStep): - config.update( - { - "prompt": new_step.inputs.get("prompt"), - } - ) - if _publish: - try: - client = _get_client() - if client: - client.inference_pipelines.data.stream( - inference_pipeline_id=inference_pipeline_id - or utils.get_env_variable("OPENLAYER_INFERENCE_PIPELINE_ID"), - rows=[trace_data], - config=config, - ) - except Exception as err: # pylint: disable=broad-except - logger.error("Could not stream data to Openlayer %s", err) - else: - logger.debug("Ending step %s", name) + _handle_trace_completion( + is_root_step=is_root_step, + step_name=name, + inference_pipeline_id=inference_pipeline_id + ) def add_chat_completion_step_to_trace(**kwargs) -> None: @@ -154,143 +109,6 @@ def add_chat_completion_step_to_trace(**kwargs) -> None: step.log(**kwargs) -# ----------------------------- Helper functions for tracing ---------------------------- # - -def _extract_function_inputs( - func_signature: inspect.Signature, - func_args: tuple, - func_kwargs: dict, - context_kwarg: Optional[str] = None -) -> dict: - """Extract and clean function inputs for logging.""" - bound = func_signature.bind(*func_args, **func_kwargs) - bound.apply_defaults() - inputs = dict(bound.arguments) - inputs.pop("self", None) - inputs.pop("cls", None) - - # Handle context kwarg if specified - if context_kwarg: - if context_kwarg in inputs: - log_context(inputs.get(context_kwarg)) - else: - logger.warning( - "Context kwarg `%s` not found in inputs of the current function.", - context_kwarg, - ) - - return inputs - -def _finalize_step_logging( - step: steps.Step, - inputs: dict, - output: Any, - start_time: float, - exception: Optional[Exception] = None -) -> None: - """Finalize step timing and logging.""" - if step.end_time is None: - step.end_time = time.time() - if step.latency is None: - step.latency = (step.end_time - start_time) * 1000 # in ms - - step.log( - inputs=inputs, - output=output, - end_time=step.end_time, - latency=step.latency, - ) - -def _handle_trace_completion( - is_root_step: bool, - step_name: str, - inference_pipeline_id: Optional[str] = None -) -> None: - """Handle trace completion and data streaming.""" - if is_root_step: - logger.debug("Ending the trace...") - current_trace = get_current_trace() - trace_data, input_variable_names = post_process_trace(current_trace) - - config = dict( - ConfigLlmData( - output_column_name="output", - input_variable_names=input_variable_names, - latency_column_name="latency", - cost_column_name="cost", - timestamp_column_name="inferenceTimestamp", - inference_id_column_name="inferenceId", - num_of_token_column_name="tokens", - ) - ) - if "groundTruth" in trace_data: - config.update({"ground_truth_column_name": "groundTruth"}) - if "context" in trace_data: - config.update({"context_column_name": "context"}) - - if isinstance(get_current_step(), steps.ChatCompletionStep): - config.update( - { - "prompt": get_current_step().inputs.get("prompt"), - } - ) - if _publish: - try: - client = _get_client() - if client: - client.inference_pipelines.data.stream( - inference_pipeline_id=inference_pipeline_id - or utils.get_env_variable("OPENLAYER_INFERENCE_PIPELINE_ID"), - rows=[trace_data], - config=config, - ) - except Exception as err: # pylint: disable=broad-except - logger.error("Could not stream data to Openlayer %s", err) - else: - logger.debug("Ending step %s", step_name) - -@contextmanager -def _create_step_for_async_generator( - step_name: str, - step_args: tuple, - inference_pipeline_id: Optional[str] = None, - **step_kwargs -) -> Generator[Tuple[steps.Step, bool, Any], None, None]: - """Create and manage step for async generators without interfering with yields.""" - # Create step manually - new_step = steps.step_factory( - step_type=enums.StepType.USER_CALL, - name=step_name, - inputs=None, - output=None, - metadata=None - ) - new_step.start_time = time.time() - - parent_step = get_current_step() - is_root_step = parent_step is None - - if parent_step is None: - logger.debug("Starting a new trace...") - current_trace = traces.Trace() - _current_trace.set(current_trace) - _rag_context.set(None) - current_trace.add_step(new_step) - else: - logger.debug("Adding step %s to parent step %s", step_name, parent_step.name) - current_trace = get_current_trace() - parent_step.add_nested_step(new_step) - - token = _current_step.set(new_step) - - try: - yield new_step, is_root_step, token - finally: - _current_step.reset(token) - _handle_trace_completion(is_root_step, step_name, inference_pipeline_id) - -# ----------------------------- Tracing decorator ---------------------------- # - def trace( *step_args, inference_pipeline_id: Optional[str] = None, @@ -348,15 +166,18 @@ def wrapper(*func_args, **func_kwargs): try: output = func(*func_args, **func_kwargs) except Exception as exc: - step.log(metadata={"Exceptions": str(exc)}) + _log_step_exception(step, exc) exception = exc # Extract inputs and finalize logging using optimized helper - inputs = _extract_function_inputs( - func_signature, func_args, func_kwargs, context_kwarg + _process_wrapper_inputs_and_outputs( + step=step, + func_signature=func_signature, + func_args=func_args, + func_kwargs=func_kwargs, + context_kwarg=context_kwarg, + output=output ) - - _finalize_step_logging(step, inputs, output, step.start_time, exception) if exception is not None: raise exception @@ -367,78 +188,6 @@ def wrapper(*func_args, **func_kwargs): return decorator -async def _invoke_with_context( - coroutine: Awaitable[Any], -) -> Tuple[contextvars.Context, Any]: - """Runs a coroutine and preserves the context variables set within it.""" - result = await coroutine - context = contextvars.copy_context() - return context, result - - -def run_async_func(coroutine: Awaitable[Any]) -> Any: - """Runs an async function while preserving the context. This is needed - for tracing async functions. - """ - context, result = asyncio.run(_invoke_with_context(coroutine)) - for key, value in context.items(): - key.set(value) - return result - - -def log_context(context: List[str]) -> None: - """Logs context information to the current step of the trace. - - The `context` parameter should be a list of strings representing the - context chunks retrieved by the context retriever.""" - current_step = get_current_step() - if current_step: - _rag_context.set(context) - current_step.log(metadata={"context": context}) - else: - logger.warning("No current step found to log context.") - - -# --------------------- Helper post-processing functions --------------------- # -def post_process_trace( - trace_obj: traces.Trace, -) -> Tuple[Dict[str, Any], List[str]]: - """Post processing of the trace data before uploading to Openlayer. - - This is done to ensure backward compatibility with data on Openlayer. - """ - root_step = trace_obj.steps[0] - - input_variables = root_step.inputs - if input_variables: - input_variable_names = list(input_variables.keys()) - else: - input_variable_names = [] - - processed_steps = trace_obj.to_dict() - - trace_data = { - "inferenceTimestamp": root_step.start_time, - "inferenceId": str(root_step.id), - "output": root_step.output, - "latency": root_step.latency, - "cost": processed_steps[0].get("cost", 0), - "tokens": processed_steps[0].get("tokens", 0), - "steps": processed_steps, - **root_step.metadata, - } - if root_step.ground_truth: - trace_data["groundTruth"] = root_step.ground_truth - if input_variables: - trace_data.update(input_variables) - - context = get_rag_context() - if context: - trace_data["context"] = context - - return trace_data, input_variable_names - - def trace_async( *step_args, inference_pipeline_id: Optional[str] = None, @@ -476,41 +225,76 @@ def decorator(func): step_kwargs["name"] = func.__name__ step_name = step_kwargs["name"] - if asyncio.iscoroutinefunction(func): + if asyncio.iscoroutinefunction(func) or inspect.isasyncgenfunction(func): # Check if it's specifically an async generator function if inspect.isasyncgenfunction(func): - # Create a specific async generator wrapper WITHOUT context manager + # For async generators, use class-based approach to delay trace creation + # until actual iteration begins (not when generator object is created) @wraps(func) - async def async_generator_wrapper(*func_args, **func_kwargs): - with _create_step_for_async_generator( - step_name, step_args, inference_pipeline_id, **step_kwargs - ) as (step, is_root_step, token): - output_chunks = [] - exception = None - - try: - # Execute the async generator function - async_gen = func(*func_args, **func_kwargs) + def async_generator_wrapper(*func_args, **func_kwargs): + class TracedAsyncGenerator: + def __init__(self): + self._original_gen = None + self._step = None + self._is_root_step = False + self._token = None + self._output_chunks = [] + self._trace_initialized = False - # Yield each chunk and collect for logging - async for chunk in async_gen: - output_chunks.append(chunk) - yield chunk # This makes our wrapper an async generator - - except Exception as exc: - step.log(metadata={"Exceptions": str(exc)}) - exception = exc - raise - finally: - # Extract inputs and finalize logging - inputs = _extract_function_inputs( - func_signature, func_args, func_kwargs, context_kwarg - ) + def __aiter__(self): + return self - # Combine chunks for output - output = "".join(str(chunk) for chunk in output_chunks if chunk is not None) + async def __anext__(self): + # Initialize tracing on first iteration only + if not self._trace_initialized: + self._original_gen = func(*func_args, **func_kwargs) + self._step, self._is_root_step, self._token = _create_step_for_async_generator( + step_name=step_name, + inference_pipeline_id=inference_pipeline_id, + **step_kwargs + ) + self._inputs = _extract_function_inputs( + func_signature=func_signature, + func_args=func_args, + func_kwargs=func_kwargs, + context_kwarg=context_kwarg + ) + self._trace_initialized = True - _finalize_step_logging(step, inputs, output, step.start_time, exception) + try: + chunk = await self._original_gen.__anext__() + self._output_chunks.append(chunk) + return chunk + except StopAsyncIteration: + # Finalize trace when generator is exhausted + output = _join_output_chunks(self._output_chunks) + _finalize_async_generator_step( + step=self._step, + token=self._token, + is_root_step=self._is_root_step, + step_name=step_name, + inputs=self._inputs, + output=output, + inference_pipeline_id=inference_pipeline_id + ) + raise + except Exception as exc: + # Handle exceptions + if self._step: + _log_step_exception(self._step, exc) + output = _join_output_chunks(self._output_chunks) + _finalize_async_generator_step( + step=self._step, + token=self._token, + is_root_step=self._is_root_step, + step_name=step_name, + inputs=self._inputs, + output=output, + inference_pipeline_id=inference_pipeline_id + ) + raise + + return TracedAsyncGenerator() return async_generator_wrapper else: @@ -525,17 +309,20 @@ async def async_function_wrapper(*func_args, **func_kwargs): try: output = await func(*func_args, **func_kwargs) except Exception as exc: - step.log(metadata={"Exceptions": str(exc)}) + _log_step_exception(step, exc) exception = exc raise # Extract inputs and finalize logging - inputs = _extract_function_inputs( - func_signature, func_args, func_kwargs, context_kwarg + _process_wrapper_inputs_and_outputs( + step=step, + func_signature=func_signature, + func_args=func_args, + func_kwargs=func_kwargs, + context_kwarg=context_kwarg, + output=output ) - _finalize_step_logging(step, inputs, output, step.start_time, exception) - return output return async_function_wrapper @@ -550,15 +337,18 @@ def sync_wrapper(*func_args, **func_kwargs): try: output = func(*func_args, **func_kwargs) except Exception as exc: - step.log(metadata={"Exceptions": str(exc)}) + _log_step_exception(step, exc) exception = exc # Extract inputs and finalize logging - inputs = _extract_function_inputs( - func_signature, func_args, func_kwargs, context_kwarg + _process_wrapper_inputs_and_outputs( + step=step, + func_signature=func_signature, + func_args=func_args, + func_kwargs=func_kwargs, + context_kwarg=context_kwarg, + output=output ) - - _finalize_step_logging(step, inputs, output, step.start_time, exception) if exception is not None: raise exception @@ -567,3 +357,284 @@ def sync_wrapper(*func_args, **func_kwargs): return sync_wrapper return decorator + + +def log_context(context: List[str]) -> None: + """Logs context information to the current step of the trace. + + The `context` parameter should be a list of strings representing the + context chunks retrieved by the context retriever.""" + current_step = get_current_step() + if current_step: + _rag_context.set(context) + current_step.log(metadata={"context": context}) + else: + logger.warning("No current step found to log context.") + + +def run_async_func(coroutine: Awaitable[Any]) -> Any: + """Runs an async function while preserving the context. This is needed + for tracing async functions. + """ + context, result = asyncio.run(_invoke_with_context(coroutine)) + for key, value in context.items(): + key.set(value) + return result + +# ----------------------------- Helper functions for create_step ----------------------------- # + +def _create_and_initialize_step( + step_name: str, + step_type: enums.StepType = enums.StepType.USER_CALL, + inputs: Optional[Any] = None, + output: Optional[Any] = None, + metadata: Optional[Dict[str, Any]] = None, +) -> Tuple[steps.Step, bool, Any]: + """Create a new step and initialize trace/parent relationships. + + Returns: + Tuple of (step, is_root_step, token) + """ + new_step = steps.step_factory( + step_type=step_type, + name=step_name, + inputs=inputs, + output=output, + metadata=metadata + ) + new_step.start_time = time.time() + + parent_step = get_current_step() + is_root_step = parent_step is None + + if parent_step is None: + logger.debug("Starting a new trace...") + current_trace = traces.Trace() + _current_trace.set(current_trace) + _rag_context.set(None) + current_trace.add_step(new_step) + else: + logger.debug("Adding step %s to parent step %s", step_name, parent_step.name) + current_trace = get_current_trace() + parent_step.add_nested_step(new_step) + + token = _current_step.set(new_step) + return new_step, is_root_step, token + + +def _handle_trace_completion( + is_root_step: bool, + step_name: str, + inference_pipeline_id: Optional[str] = None +) -> None: + """Handle trace completion and data streaming.""" + if is_root_step: + logger.debug("Ending the trace...") + current_trace = get_current_trace() + trace_data, input_variable_names = post_process_trace(current_trace) + + config = dict( + ConfigLlmData( + output_column_name="output", + input_variable_names=input_variable_names, + latency_column_name="latency", + cost_column_name="cost", + timestamp_column_name="inferenceTimestamp", + inference_id_column_name="inferenceId", + num_of_token_column_name="tokens", + ) + ) + if "groundTruth" in trace_data: + config.update({"ground_truth_column_name": "groundTruth"}) + if "context" in trace_data: + config.update({"context_column_name": "context"}) + + if isinstance(get_current_step(), steps.ChatCompletionStep): + config.update( + { + "prompt": get_current_step().inputs.get("prompt"), + } + ) + if _publish: + try: + client = _get_client() + if client: + client.inference_pipelines.data.stream( + inference_pipeline_id=inference_pipeline_id + or utils.get_env_variable("OPENLAYER_INFERENCE_PIPELINE_ID"), + rows=[trace_data], + config=config, + ) + except Exception as err: # pylint: disable=broad-except + logger.error("Could not stream data to Openlayer %s", err) + else: + logger.debug("Ending step %s", step_name) + +# ----------------------------- Helper functions for trace decorators ----------------------------- # + +def _log_step_exception(step: steps.Step, exception: Exception) -> None: + """Log exception metadata to a step.""" + step.log(metadata={"Exceptions": str(exception)}) + + +def _process_wrapper_inputs_and_outputs( + step: steps.Step, + func_signature: inspect.Signature, + func_args: tuple, + func_kwargs: dict, + context_kwarg: Optional[str], + output: Any, +) -> None: + """Extract function inputs and finalize step logging - common pattern across wrappers.""" + inputs = _extract_function_inputs( + func_signature=func_signature, + func_args=func_args, + func_kwargs=func_kwargs, + context_kwarg=context_kwarg + ) + _finalize_step_logging( + step=step, + inputs=inputs, + output=output, + start_time=step.start_time + ) + + +def _extract_function_inputs( + func_signature: inspect.Signature, + func_args: tuple, + func_kwargs: dict, + context_kwarg: Optional[str] = None +) -> dict: + """Extract and clean function inputs for logging.""" + bound = func_signature.bind(*func_args, **func_kwargs) + bound.apply_defaults() + inputs = dict(bound.arguments) + inputs.pop("self", None) + inputs.pop("cls", None) + + # Handle context kwarg if specified + if context_kwarg: + if context_kwarg in inputs: + log_context(inputs.get(context_kwarg)) + else: + logger.warning( + "Context kwarg `%s` not found in inputs of the current function.", + context_kwarg, + ) + + return inputs + + +def _finalize_step_logging( + step: steps.Step, + inputs: dict, + output: Any, + start_time: float, +) -> None: + """Finalize step timing and logging.""" + if step.end_time is None: + step.end_time = time.time() + if step.latency is None: + step.latency = (step.end_time - start_time) * 1000 # in ms + + step.log( + inputs=inputs, + output=output, + end_time=step.end_time, + latency=step.latency, + ) + +# ----------------------------- Async generator specific functions ----------------------------- # + +def _create_step_for_async_generator( + step_name: str, + inference_pipeline_id: Optional[str] = None, + **step_kwargs +) -> Tuple[steps.Step, bool, Any]: + """Create and initialize step for async generators - no context manager.""" + return _create_and_initialize_step( + step_name=step_name, + step_type=enums.StepType.USER_CALL, + inputs=None, + output=None, + metadata=None + ) + + +def _finalize_async_generator_step( + step: steps.Step, + token: Any, + is_root_step: bool, + step_name: str, + inputs: dict, + output: Any, + inference_pipeline_id: Optional[str] = None, +) -> None: + """Finalize async generator step - called when generator is consumed.""" + _current_step.reset(token) + _finalize_step_logging( + step=step, + inputs=inputs, + output=output, + start_time=step.start_time + ) + _handle_trace_completion( + is_root_step=is_root_step, + step_name=step_name, + inference_pipeline_id=inference_pipeline_id + ) + + +def _join_output_chunks(output_chunks: List[Any]) -> str: + """Join output chunks into a single string, filtering out None values.""" + return "".join(str(chunk) for chunk in output_chunks if chunk is not None) + +# ----------------------------- Utility functions ----------------------------- # + +async def _invoke_with_context( + coroutine: Awaitable[Any], +) -> Tuple[contextvars.Context, Any]: + """Runs a coroutine and preserves the context variables set within it.""" + result = await coroutine + context = contextvars.copy_context() + return context, result + + +def post_process_trace( + trace_obj: traces.Trace, +) -> Tuple[Dict[str, Any], List[str]]: + """Post processing of the trace data before uploading to Openlayer. + + This is done to ensure backward compatibility with data on Openlayer. + """ + root_step = trace_obj.steps[0] + + input_variables = root_step.inputs + if input_variables: + input_variable_names = list(input_variables.keys()) + else: + input_variable_names = [] + + processed_steps = trace_obj.to_dict() + + trace_data = { + "inferenceTimestamp": root_step.start_time, + "inferenceId": str(root_step.id), + "output": root_step.output, + "latency": root_step.latency, + "cost": processed_steps[0].get("cost", 0), + "tokens": processed_steps[0].get("tokens", 0), + "steps": processed_steps, + **root_step.metadata, + } + if root_step.ground_truth: + trace_data["groundTruth"] = root_step.ground_truth + if input_variables: + trace_data.update(input_variables) + + context = get_rag_context() + if context: + trace_data["context"] = context + + return trace_data, input_variable_names From 29578cd55544b7c26f5353c3b3c21d41a0badb6d Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 8 Jul 2025 17:57:51 -0300 Subject: [PATCH 4/7] refactor(tracer): streamline code formatting and improve readability --- src/openlayer/lib/tracing/tracer.py | 143 ++++++++++------------------ 1 file changed, 51 insertions(+), 92 deletions(-) diff --git a/src/openlayer/lib/tracing/tracer.py b/src/openlayer/lib/tracing/tracer.py index f7274a36..65414ca5 100644 --- a/src/openlayer/lib/tracing/tracer.py +++ b/src/openlayer/lib/tracing/tracer.py @@ -22,17 +22,16 @@ TRUE_LIST = ["true", "on", "1"] _publish = utils.get_env_variable("OPENLAYER_DISABLE_PUBLISH") not in TRUE_LIST -_verify_ssl = ( - utils.get_env_variable("OPENLAYER_VERIFY_SSL") or "true" -).lower() in TRUE_LIST +_verify_ssl = (utils.get_env_variable("OPENLAYER_VERIFY_SSL") or "true").lower() in TRUE_LIST _client = None + def _get_client() -> Optional[Openlayer]: """Get or create the Openlayer client with lazy initialization.""" global _client if not _publish: return None - + if _client is None: # Lazy initialization - create client when first needed if _verify_ssl: @@ -45,12 +44,14 @@ def _get_client() -> Optional[Openlayer]: ) return _client + _current_step = contextvars.ContextVar("current_step") _current_trace = contextvars.ContextVar("current_trace") _rag_context = contextvars.ContextVar("rag_context") # ----------------------------- Public API functions ----------------------------- # + def get_current_trace() -> Optional[traces.Trace]: """Returns the current trace.""" return _current_trace.get(None) @@ -77,11 +78,7 @@ def create_step( ) -> Generator[steps.Step, None, None]: """Starts a trace and yields a Step object.""" new_step, is_root_step, token = _create_and_initialize_step( - step_name=name, - step_type=step_type, - inputs=inputs, - output=output, - metadata=metadata + step_name=name, step_type=step_type, inputs=inputs, output=output, metadata=metadata ) try: yield new_step @@ -93,11 +90,7 @@ def create_step( new_step.latency = latency _current_step.reset(token) - _handle_trace_completion( - is_root_step=is_root_step, - step_name=name, - inference_pipeline_id=inference_pipeline_id - ) + _handle_trace_completion(is_root_step=is_root_step, step_name=name, inference_pipeline_id=inference_pipeline_id) def add_chat_completion_step_to_trace(**kwargs) -> None: @@ -158,17 +151,15 @@ def decorator(func): def wrapper(*func_args, **func_kwargs): if step_kwargs.get("name") is None: step_kwargs["name"] = func.__name__ - - with create_step( - *step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs - ) as step: + + with create_step(*step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs) as step: output = exception = None try: output = func(*func_args, **func_kwargs) except Exception as exc: _log_step_exception(step, exc) exception = exc - + # Extract inputs and finalize logging using optimized helper _process_wrapper_inputs_and_outputs( step=step, @@ -176,7 +167,7 @@ def wrapper(*func_args, **func_kwargs): func_args=func_args, func_kwargs=func_kwargs, context_kwarg=context_kwarg, - output=output + output=output, ) if exception is not None: @@ -220,7 +211,7 @@ def trace_async( def decorator(func): func_signature = inspect.signature(func) - + if step_kwargs.get("name") is None: step_kwargs["name"] = func.__name__ step_name = step_kwargs["name"] @@ -240,27 +231,25 @@ def __init__(self): self._token = None self._output_chunks = [] self._trace_initialized = False - + def __aiter__(self): return self - + async def __anext__(self): # Initialize tracing on first iteration only if not self._trace_initialized: self._original_gen = func(*func_args, **func_kwargs) self._step, self._is_root_step, self._token = _create_step_for_async_generator( - step_name=step_name, - inference_pipeline_id=inference_pipeline_id, - **step_kwargs + step_name=step_name, inference_pipeline_id=inference_pipeline_id, **step_kwargs ) self._inputs = _extract_function_inputs( func_signature=func_signature, func_args=func_args, func_kwargs=func_kwargs, - context_kwarg=context_kwarg + context_kwarg=context_kwarg, ) self._trace_initialized = True - + try: chunk = await self._original_gen.__anext__() self._output_chunks.append(chunk) @@ -275,7 +264,7 @@ async def __anext__(self): step_name=step_name, inputs=self._inputs, output=output, - inference_pipeline_id=inference_pipeline_id + inference_pipeline_id=inference_pipeline_id, ) raise except Exception as exc: @@ -290,29 +279,27 @@ async def __anext__(self): step_name=step_name, inputs=self._inputs, output=output, - inference_pipeline_id=inference_pipeline_id + inference_pipeline_id=inference_pipeline_id, ) raise - + return TracedAsyncGenerator() - + return async_generator_wrapper else: # Create wrapper for regular async functions @wraps(func) async def async_function_wrapper(*func_args, **func_kwargs): - with create_step( - *step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs - ) as step: + with create_step(*step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs) as step: output = exception = None - + try: output = await func(*func_args, **func_kwargs) except Exception as exc: _log_step_exception(step, exc) exception = exc raise - + # Extract inputs and finalize logging _process_wrapper_inputs_and_outputs( step=step, @@ -320,26 +307,24 @@ async def async_function_wrapper(*func_args, **func_kwargs): func_args=func_args, func_kwargs=func_kwargs, context_kwarg=context_kwarg, - output=output + output=output, ) - + return output - + return async_function_wrapper else: # For sync functions, use the existing logic with optimizations @wraps(func) def sync_wrapper(*func_args, **func_kwargs): - with create_step( - *step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs - ) as step: + with create_step(*step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs) as step: output = exception = None try: output = func(*func_args, **func_kwargs) except Exception as exc: _log_step_exception(step, exc) exception = exc - + # Extract inputs and finalize logging _process_wrapper_inputs_and_outputs( step=step, @@ -347,7 +332,7 @@ def sync_wrapper(*func_args, **func_kwargs): func_args=func_args, func_kwargs=func_kwargs, context_kwarg=context_kwarg, - output=output + output=output, ) if exception is not None: @@ -381,8 +366,10 @@ def run_async_func(coroutine: Awaitable[Any]) -> Any: key.set(value) return result + # ----------------------------- Helper functions for create_step ----------------------------- # + def _create_and_initialize_step( step_name: str, step_type: enums.StepType = enums.StepType.USER_CALL, @@ -391,17 +378,11 @@ def _create_and_initialize_step( metadata: Optional[Dict[str, Any]] = None, ) -> Tuple[steps.Step, bool, Any]: """Create a new step and initialize trace/parent relationships. - + Returns: Tuple of (step, is_root_step, token) """ - new_step = steps.step_factory( - step_type=step_type, - name=step_name, - inputs=inputs, - output=output, - metadata=metadata - ) + new_step = steps.step_factory(step_type=step_type, name=step_name, inputs=inputs, output=output, metadata=metadata) new_step.start_time = time.time() parent_step = get_current_step() @@ -422,11 +403,7 @@ def _create_and_initialize_step( return new_step, is_root_step, token -def _handle_trace_completion( - is_root_step: bool, - step_name: str, - inference_pipeline_id: Optional[str] = None -) -> None: +def _handle_trace_completion(is_root_step: bool, step_name: str, inference_pipeline_id: Optional[str] = None) -> None: """Handle trace completion and data streaming.""" if is_root_step: logger.debug("Ending the trace...") @@ -470,8 +447,10 @@ def _handle_trace_completion( else: logger.debug("Ending step %s", step_name) + # ----------------------------- Helper functions for trace decorators ----------------------------- # + def _log_step_exception(step: steps.Step, exception: Exception) -> None: """Log exception metadata to a step.""" step.log(metadata={"Exceptions": str(exception)}) @@ -487,24 +466,13 @@ def _process_wrapper_inputs_and_outputs( ) -> None: """Extract function inputs and finalize step logging - common pattern across wrappers.""" inputs = _extract_function_inputs( - func_signature=func_signature, - func_args=func_args, - func_kwargs=func_kwargs, - context_kwarg=context_kwarg - ) - _finalize_step_logging( - step=step, - inputs=inputs, - output=output, - start_time=step.start_time + func_signature=func_signature, func_args=func_args, func_kwargs=func_kwargs, context_kwarg=context_kwarg ) + _finalize_step_logging(step=step, inputs=inputs, output=output, start_time=step.start_time) def _extract_function_inputs( - func_signature: inspect.Signature, - func_args: tuple, - func_kwargs: dict, - context_kwarg: Optional[str] = None + func_signature: inspect.Signature, func_args: tuple, func_kwargs: dict, context_kwarg: Optional[str] = None ) -> dict: """Extract and clean function inputs for logging.""" bound = func_signature.bind(*func_args, **func_kwargs) @@ -512,7 +480,7 @@ def _extract_function_inputs( inputs = dict(bound.arguments) inputs.pop("self", None) inputs.pop("cls", None) - + # Handle context kwarg if specified if context_kwarg: if context_kwarg in inputs: @@ -522,7 +490,7 @@ def _extract_function_inputs( "Context kwarg `%s` not found in inputs of the current function.", context_kwarg, ) - + return inputs @@ -537,7 +505,7 @@ def _finalize_step_logging( step.end_time = time.time() if step.latency is None: step.latency = (step.end_time - start_time) * 1000 # in ms - + step.log( inputs=inputs, output=output, @@ -545,20 +513,16 @@ def _finalize_step_logging( latency=step.latency, ) + # ----------------------------- Async generator specific functions ----------------------------- # + def _create_step_for_async_generator( - step_name: str, - inference_pipeline_id: Optional[str] = None, - **step_kwargs + step_name: str, inference_pipeline_id: Optional[str] = None, **step_kwargs ) -> Tuple[steps.Step, bool, Any]: """Create and initialize step for async generators - no context manager.""" return _create_and_initialize_step( - step_name=step_name, - step_type=enums.StepType.USER_CALL, - inputs=None, - output=None, - metadata=None + step_name=step_name, step_type=enums.StepType.USER_CALL, inputs=None, output=None, metadata=None ) @@ -573,16 +537,9 @@ def _finalize_async_generator_step( ) -> None: """Finalize async generator step - called when generator is consumed.""" _current_step.reset(token) - _finalize_step_logging( - step=step, - inputs=inputs, - output=output, - start_time=step.start_time - ) + _finalize_step_logging(step=step, inputs=inputs, output=output, start_time=step.start_time) _handle_trace_completion( - is_root_step=is_root_step, - step_name=step_name, - inference_pipeline_id=inference_pipeline_id + is_root_step=is_root_step, step_name=step_name, inference_pipeline_id=inference_pipeline_id ) @@ -590,8 +547,10 @@ def _join_output_chunks(output_chunks: List[Any]) -> str: """Join output chunks into a single string, filtering out None values.""" return "".join(str(chunk) for chunk in output_chunks if chunk is not None) + # ----------------------------- Utility functions ----------------------------- # + async def _invoke_with_context( coroutine: Awaitable[Any], ) -> Tuple[contextvars.Context, Any]: From 6e64eac1da962ceae62af27302d61e5148063a80 Mon Sep 17 00:00:00 2001 From: Gustavo Cid Date: Wed, 9 Jul 2025 11:04:31 -0300 Subject: [PATCH 5/7] fix: update client retrieval for LangChain callback handler --- .../lib/integrations/langchain_callback.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/openlayer/lib/integrations/langchain_callback.py b/src/openlayer/lib/integrations/langchain_callback.py index d476dfb5..8f5dfd3f 100644 --- a/src/openlayer/lib/integrations/langchain_callback.py +++ b/src/openlayer/lib/integrations/langchain_callback.py @@ -163,13 +163,15 @@ def _process_and_upload_trace(self, root_step: steps.Step) -> None: if tracer._publish: try: - tracer._client.inference_pipelines.data.stream( - inference_pipeline_id=utils.get_env_variable( - "OPENLAYER_INFERENCE_PIPELINE_ID" - ), - rows=[trace_data], - config=config, - ) + client = tracer._get_client() + if client: + client.inference_pipelines.data.stream( + inference_pipeline_id=utils.get_env_variable( + "OPENLAYER_INFERENCE_PIPELINE_ID" + ), + rows=[trace_data], + config=config, + ) except Exception as err: # pylint: disable=broad-except tracer.logger.error("Could not stream data to Openlayer %s", err) From 999479f27fe074b260e2fb4aa957fde3e10129b2 Mon Sep 17 00:00:00 2001 From: Gustavo Cid Date: Wed, 9 Jul 2025 11:05:15 -0300 Subject: [PATCH 6/7] chore: format file --- src/openlayer/lib/tracing/tracer.py | 80 +++++++++++++++++++++++------ 1 file changed, 64 insertions(+), 16 deletions(-) diff --git a/src/openlayer/lib/tracing/tracer.py b/src/openlayer/lib/tracing/tracer.py index 65414ca5..37767446 100644 --- a/src/openlayer/lib/tracing/tracer.py +++ b/src/openlayer/lib/tracing/tracer.py @@ -22,7 +22,9 @@ TRUE_LIST = ["true", "on", "1"] _publish = utils.get_env_variable("OPENLAYER_DISABLE_PUBLISH") not in TRUE_LIST -_verify_ssl = (utils.get_env_variable("OPENLAYER_VERIFY_SSL") or "true").lower() in TRUE_LIST +_verify_ssl = ( + utils.get_env_variable("OPENLAYER_VERIFY_SSL") or "true" +).lower() in TRUE_LIST _client = None @@ -78,7 +80,11 @@ def create_step( ) -> Generator[steps.Step, None, None]: """Starts a trace and yields a Step object.""" new_step, is_root_step, token = _create_and_initialize_step( - step_name=name, step_type=step_type, inputs=inputs, output=output, metadata=metadata + step_name=name, + step_type=step_type, + inputs=inputs, + output=output, + metadata=metadata, ) try: yield new_step @@ -90,7 +96,11 @@ def create_step( new_step.latency = latency _current_step.reset(token) - _handle_trace_completion(is_root_step=is_root_step, step_name=name, inference_pipeline_id=inference_pipeline_id) + _handle_trace_completion( + is_root_step=is_root_step, + step_name=name, + inference_pipeline_id=inference_pipeline_id, + ) def add_chat_completion_step_to_trace(**kwargs) -> None: @@ -152,7 +162,9 @@ def wrapper(*func_args, **func_kwargs): if step_kwargs.get("name") is None: step_kwargs["name"] = func.__name__ - with create_step(*step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs) as step: + with create_step( + *step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs + ) as step: output = exception = None try: output = func(*func_args, **func_kwargs) @@ -239,8 +251,12 @@ async def __anext__(self): # Initialize tracing on first iteration only if not self._trace_initialized: self._original_gen = func(*func_args, **func_kwargs) - self._step, self._is_root_step, self._token = _create_step_for_async_generator( - step_name=step_name, inference_pipeline_id=inference_pipeline_id, **step_kwargs + self._step, self._is_root_step, self._token = ( + _create_step_for_async_generator( + step_name=step_name, + inference_pipeline_id=inference_pipeline_id, + **step_kwargs, + ) ) self._inputs = _extract_function_inputs( func_signature=func_signature, @@ -290,7 +306,11 @@ async def __anext__(self): # Create wrapper for regular async functions @wraps(func) async def async_function_wrapper(*func_args, **func_kwargs): - with create_step(*step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs) as step: + with create_step( + *step_args, + inference_pipeline_id=inference_pipeline_id, + **step_kwargs, + ) as step: output = exception = None try: @@ -317,7 +337,11 @@ async def async_function_wrapper(*func_args, **func_kwargs): # For sync functions, use the existing logic with optimizations @wraps(func) def sync_wrapper(*func_args, **func_kwargs): - with create_step(*step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs) as step: + with create_step( + *step_args, + inference_pipeline_id=inference_pipeline_id, + **step_kwargs, + ) as step: output = exception = None try: output = func(*func_args, **func_kwargs) @@ -382,7 +406,13 @@ def _create_and_initialize_step( Returns: Tuple of (step, is_root_step, token) """ - new_step = steps.step_factory(step_type=step_type, name=step_name, inputs=inputs, output=output, metadata=metadata) + new_step = steps.step_factory( + step_type=step_type, + name=step_name, + inputs=inputs, + output=output, + metadata=metadata, + ) new_step.start_time = time.time() parent_step = get_current_step() @@ -403,7 +433,9 @@ def _create_and_initialize_step( return new_step, is_root_step, token -def _handle_trace_completion(is_root_step: bool, step_name: str, inference_pipeline_id: Optional[str] = None) -> None: +def _handle_trace_completion( + is_root_step: bool, step_name: str, inference_pipeline_id: Optional[str] = None +) -> None: """Handle trace completion and data streaming.""" if is_root_step: logger.debug("Ending the trace...") @@ -466,13 +498,21 @@ def _process_wrapper_inputs_and_outputs( ) -> None: """Extract function inputs and finalize step logging - common pattern across wrappers.""" inputs = _extract_function_inputs( - func_signature=func_signature, func_args=func_args, func_kwargs=func_kwargs, context_kwarg=context_kwarg + func_signature=func_signature, + func_args=func_args, + func_kwargs=func_kwargs, + context_kwarg=context_kwarg, + ) + _finalize_step_logging( + step=step, inputs=inputs, output=output, start_time=step.start_time ) - _finalize_step_logging(step=step, inputs=inputs, output=output, start_time=step.start_time) def _extract_function_inputs( - func_signature: inspect.Signature, func_args: tuple, func_kwargs: dict, context_kwarg: Optional[str] = None + func_signature: inspect.Signature, + func_args: tuple, + func_kwargs: dict, + context_kwarg: Optional[str] = None, ) -> dict: """Extract and clean function inputs for logging.""" bound = func_signature.bind(*func_args, **func_kwargs) @@ -522,7 +562,11 @@ def _create_step_for_async_generator( ) -> Tuple[steps.Step, bool, Any]: """Create and initialize step for async generators - no context manager.""" return _create_and_initialize_step( - step_name=step_name, step_type=enums.StepType.USER_CALL, inputs=None, output=None, metadata=None + step_name=step_name, + step_type=enums.StepType.USER_CALL, + inputs=None, + output=None, + metadata=None, ) @@ -537,9 +581,13 @@ def _finalize_async_generator_step( ) -> None: """Finalize async generator step - called when generator is consumed.""" _current_step.reset(token) - _finalize_step_logging(step=step, inputs=inputs, output=output, start_time=step.start_time) + _finalize_step_logging( + step=step, inputs=inputs, output=output, start_time=step.start_time + ) _handle_trace_completion( - is_root_step=is_root_step, step_name=step_name, inference_pipeline_id=inference_pipeline_id + is_root_step=is_root_step, + step_name=step_name, + inference_pipeline_id=inference_pipeline_id, ) From 619ac01a4e00f2c38843ca964caafb971f99be87 Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Wed, 9 Jul 2025 11:55:02 -0300 Subject: [PATCH 7/7] refactor(tracer): simplify async step creation by consolidating functions - Replaced `_create_step_for_async_generator` with a direct call to `_create_and_initialize_step` to streamline async step creation. - Updated the parameters for step initialization to enhance clarity and maintainability. - Improved overall code readability by reducing function complexity. --- src/openlayer/lib/tracing/tracer.py | 24 ++++++------------------ 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/src/openlayer/lib/tracing/tracer.py b/src/openlayer/lib/tracing/tracer.py index 37767446..d27771ad 100644 --- a/src/openlayer/lib/tracing/tracer.py +++ b/src/openlayer/lib/tracing/tracer.py @@ -251,12 +251,12 @@ async def __anext__(self): # Initialize tracing on first iteration only if not self._trace_initialized: self._original_gen = func(*func_args, **func_kwargs) - self._step, self._is_root_step, self._token = ( - _create_step_for_async_generator( - step_name=step_name, - inference_pipeline_id=inference_pipeline_id, - **step_kwargs, - ) + self._step, self._is_root_step, self._token = _create_and_initialize_step( + step_name=step_name, + step_type=enums.StepType.USER_CALL, + inputs=None, + output=None, + metadata=None, ) self._inputs = _extract_function_inputs( func_signature=func_signature, @@ -557,18 +557,6 @@ def _finalize_step_logging( # ----------------------------- Async generator specific functions ----------------------------- # -def _create_step_for_async_generator( - step_name: str, inference_pipeline_id: Optional[str] = None, **step_kwargs -) -> Tuple[steps.Step, bool, Any]: - """Create and initialize step for async generators - no context manager.""" - return _create_and_initialize_step( - step_name=step_name, - step_type=enums.StepType.USER_CALL, - inputs=None, - output=None, - metadata=None, - ) - def _finalize_async_generator_step( step: steps.Step,