Skip to content

Commit

Permalink
(fix) Langfuse key based logging (#6372)
Browse files Browse the repository at this point in the history
* langfuse use helper for get_langfuse_logging_config

* fix get_langfuse_logger_for_request

* fix import

* fix get_langfuse_logger_for_request

* test_get_langfuse_logger_for_request_with_dynamic_params

* unit testing for test_get_langfuse_logger_for_request_with_no_dynamic_params

* parameterized langfuse testing

* fix langfuse test

* fix langfuse logging

* fix test_aaalangfuse_logging_metadata

* fix langfuse log metadata test

* fix langfuse logger

* use create_langfuse_logger_from_credentials

* fix test_get_langfuse_logger_for_request_with_no_dynamic_params

* fix correct langfuse/ folder structure

* use static methods for langfuse logger

* add commment on langfuse handler

* fix linting error

* add unit testing for langfuse logging

* fix linting

* fix failure handler langfuse
  • Loading branch information
ishaan-jaff authored Oct 23, 2024
1 parent b70147f commit 72a91ea
Show file tree
Hide file tree
Showing 7 changed files with 426 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import inspect
import os
import traceback
from typing import Any, Dict, Optional
from typing import TYPE_CHECKING, Any, Dict, Optional

from packaging.version import Version
from pydantic import BaseModel
Expand All @@ -16,6 +16,11 @@
from litellm.types.integrations.langfuse import *
from litellm.types.utils import StandardCallbackDynamicParams, StandardLoggingPayload

if TYPE_CHECKING:
from litellm.litellm_core_utils.litellm_logging import DynamicLoggingCache
else:
DynamicLoggingCache = Any


class LangFuseLogger:
# Class variables or attributes
Expand Down Expand Up @@ -798,53 +803,3 @@ def log_requester_metadata(clean_metadata: dict):
returned_metadata.update({"requester_metadata": requester_metadata})

return returned_metadata


def get_langfuse_logging_config(
standard_callback_dynamic_params: StandardCallbackDynamicParams,
globalLangfuseLogger: Optional[LangFuseLogger] = None,
) -> LangfuseLoggingConfig:
"""
This function is used to get the Langfuse logging config for the Langfuse Logger.
It checks if the dynamic parameters are provided in the standard_callback_dynamic_params and uses them to get the Langfuse logging config.
If no dynamic parameters are provided, it uses the default values.
"""
# only use dynamic params if langfuse credentials are passed dynamically
if _dynamic_langfuse_credentials_are_passed(standard_callback_dynamic_params):
return LangfuseLoggingConfig(
langfuse_secret=standard_callback_dynamic_params.get("langfuse_secret")
or standard_callback_dynamic_params.get("langfuse_secret_key"),
langfuse_public_key=standard_callback_dynamic_params.get(
"langfuse_public_key"
),
langfuse_host=standard_callback_dynamic_params.get("langfuse_host"),
)
elif globalLangfuseLogger is not None:
return LangfuseLoggingConfig(
langfuse_secret=globalLangfuseLogger.secret_key,
langfuse_public_key=globalLangfuseLogger.public_key,
langfuse_host=globalLangfuseLogger.langfuse_host,
)
raise ValueError(
"`langfuse` set as a callback but globalLangfuseLogger is not provided and dynamic params are not passed"
)


def _dynamic_langfuse_credentials_are_passed(
standard_callback_dynamic_params: StandardCallbackDynamicParams,
) -> bool:
"""
This function is used to check if the dynamic langfuse credentials are passed in standard_callback_dynamic_params
Returns:
bool: True if the dynamic langfuse credentials are passed, False otherwise
"""
if (
standard_callback_dynamic_params.get("langfuse_host") is not None
or standard_callback_dynamic_params.get("langfuse_public_key") is not None
or standard_callback_dynamic_params.get("langfuse_secret") is not None
or standard_callback_dynamic_params.get("langfuse_secret_key") is not None
):
return True
return False
168 changes: 168 additions & 0 deletions litellm/integrations/langfuse/langfuse_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
"""
This file contains the LangFuseHandler class
Used to get the LangFuseLogger for a given request
Handles Key/Team Based Langfuse Logging
"""

from typing import TYPE_CHECKING, Any, Dict, Optional

from litellm.litellm_core_utils.litellm_logging import StandardCallbackDynamicParams

from .langfuse import LangFuseLogger, LangfuseLoggingConfig

if TYPE_CHECKING:
from litellm.litellm_core_utils.litellm_logging import DynamicLoggingCache
else:
DynamicLoggingCache = Any


class LangFuseHandler:

@staticmethod
def get_langfuse_logger_for_request(
standard_callback_dynamic_params: StandardCallbackDynamicParams,
in_memory_dynamic_logger_cache: DynamicLoggingCache,
globalLangfuseLogger: Optional[LangFuseLogger] = None,
) -> LangFuseLogger:
"""
This function is used to get the LangFuseLogger for a given request
1. If dynamic credentials are passed
- check if a LangFuseLogger is cached for the dynamic credentials
- if cached LangFuseLogger is not found, create a new LangFuseLogger and cache it
2. If dynamic credentials are not passed return the globalLangfuseLogger
"""
temp_langfuse_logger: Optional[LangFuseLogger] = globalLangfuseLogger
if (
LangFuseHandler._dynamic_langfuse_credentials_are_passed(
standard_callback_dynamic_params
)
is False
):
return LangFuseHandler._return_global_langfuse_logger(
globalLangfuseLogger=globalLangfuseLogger,
in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache,
)

# get langfuse logging config to use for this request, based on standard_callback_dynamic_params
_credentials = LangFuseHandler.get_dynamic_langfuse_logging_config(
globalLangfuseLogger=globalLangfuseLogger,
standard_callback_dynamic_params=standard_callback_dynamic_params,
)
credentials_dict = dict(_credentials)

# check if langfuse logger is already cached
temp_langfuse_logger = in_memory_dynamic_logger_cache.get_cache(
credentials=credentials_dict, service_name="langfuse"
)

# if not cached, create a new langfuse logger and cache it
if temp_langfuse_logger is None:
temp_langfuse_logger = (
LangFuseHandler._create_langfuse_logger_from_credentials(
credentials=credentials_dict,
in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache,
)
)

return temp_langfuse_logger

@staticmethod
def _return_global_langfuse_logger(
globalLangfuseLogger: Optional[LangFuseLogger],
in_memory_dynamic_logger_cache: DynamicLoggingCache,
) -> LangFuseLogger:
"""
Returns the Global LangfuseLogger set on litellm
(this is the default langfuse logger - used when no dynamic credentials are passed)
If no Global LangfuseLogger is set, it will check in_memory_dynamic_logger_cache for a cached LangFuseLogger
This function is used to return the globalLangfuseLogger if it exists, otherwise it will check in_memory_dynamic_logger_cache for a cached LangFuseLogger
"""
if globalLangfuseLogger is not None:
return globalLangfuseLogger

credentials_dict: Dict[str, Any] = (
{}
) # the global langfuse logger uses Environment Variables, there are no dynamic credentials
globalLangfuseLogger = in_memory_dynamic_logger_cache.get_cache(
credentials=credentials_dict,
service_name="langfuse",
)
if globalLangfuseLogger is None:
globalLangfuseLogger = (
LangFuseHandler._create_langfuse_logger_from_credentials(
credentials=credentials_dict,
in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache,
)
)
return globalLangfuseLogger

@staticmethod
def _create_langfuse_logger_from_credentials(
credentials: Dict,
in_memory_dynamic_logger_cache: DynamicLoggingCache,
) -> LangFuseLogger:
"""
This function is used to
1. create a LangFuseLogger from the credentials
2. cache the LangFuseLogger to prevent re-creating it for the same credentials
"""

langfuse_logger = LangFuseLogger(
langfuse_public_key=credentials.get("langfuse_public_key"),
langfuse_secret=credentials.get("langfuse_secret"),
langfuse_host=credentials.get("langfuse_host"),
)
in_memory_dynamic_logger_cache.set_cache(
credentials=credentials,
service_name="langfuse",
logging_obj=langfuse_logger,
)
return langfuse_logger

@staticmethod
def get_dynamic_langfuse_logging_config(
standard_callback_dynamic_params: StandardCallbackDynamicParams,
globalLangfuseLogger: Optional[LangFuseLogger] = None,
) -> LangfuseLoggingConfig:
"""
This function is used to get the Langfuse logging config to use for a given request.
It checks if the dynamic parameters are provided in the standard_callback_dynamic_params and uses them to get the Langfuse logging config.
If no dynamic parameters are provided, it uses the `globalLangfuseLogger` values
"""
# only use dynamic params if langfuse credentials are passed dynamically
return LangfuseLoggingConfig(
langfuse_secret=standard_callback_dynamic_params.get("langfuse_secret")
or standard_callback_dynamic_params.get("langfuse_secret_key"),
langfuse_public_key=standard_callback_dynamic_params.get(
"langfuse_public_key"
),
langfuse_host=standard_callback_dynamic_params.get("langfuse_host"),
)

@staticmethod
def _dynamic_langfuse_credentials_are_passed(
standard_callback_dynamic_params: StandardCallbackDynamicParams,
) -> bool:
"""
This function is used to check if the dynamic langfuse credentials are passed in standard_callback_dynamic_params
Returns:
bool: True if the dynamic langfuse credentials are passed, False otherwise
"""
if (
standard_callback_dynamic_params.get("langfuse_host") is not None
or standard_callback_dynamic_params.get("langfuse_public_key") is not None
or standard_callback_dynamic_params.get("langfuse_secret") is not None
or standard_callback_dynamic_params.get("langfuse_secret_key") is not None
):
return True
return False
128 changes: 15 additions & 113 deletions litellm/litellm_core_utils/litellm_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@
from ..integrations.greenscale import GreenscaleLogger
from ..integrations.helicone import HeliconeLogger
from ..integrations.lago import LagoLogger
from ..integrations.langfuse import LangFuseLogger
from ..integrations.langfuse.langfuse import LangFuseLogger
from ..integrations.langfuse.langfuse_handler import LangFuseHandler
from ..integrations.langsmith import LangsmithLogger
from ..integrations.litedebugger import LiteDebugger
from ..integrations.literal_ai import LiteralAILogger
Expand Down Expand Up @@ -1116,74 +1117,13 @@ def success_handler( # noqa: PLR0915
print_verbose("reaches langfuse for streaming logging!")
result = kwargs["complete_streaming_response"]

temp_langfuse_logger = langFuseLogger
if langFuseLogger is None or (
(
self.standard_callback_dynamic_params.get(
"langfuse_public_key"
)
is not None
and self.standard_callback_dynamic_params.get(
"langfuse_public_key"
)
!= langFuseLogger.public_key
)
or (
self.standard_callback_dynamic_params.get(
"langfuse_secret"
)
is not None
and self.standard_callback_dynamic_params.get(
"langfuse_secret"
)
!= langFuseLogger.secret_key
)
or (
self.standard_callback_dynamic_params.get(
"langfuse_host"
)
is not None
and self.standard_callback_dynamic_params.get(
"langfuse_host"
)
!= langFuseLogger.langfuse_host
)
):
credentials = {
"langfuse_public_key": self.standard_callback_dynamic_params.get(
"langfuse_public_key"
),
"langfuse_secret": self.standard_callback_dynamic_params.get(
"langfuse_secret"
),
"langfuse_host": self.standard_callback_dynamic_params.get(
"langfuse_host"
),
}
temp_langfuse_logger = (
in_memory_dynamic_logger_cache.get_cache(
credentials=credentials, service_name="langfuse"
)
)
if temp_langfuse_logger is None:
temp_langfuse_logger = LangFuseLogger(
langfuse_public_key=self.standard_callback_dynamic_params.get(
"langfuse_public_key"
),
langfuse_secret=self.standard_callback_dynamic_params.get(
"langfuse_secret"
),
langfuse_host=self.standard_callback_dynamic_params.get(
"langfuse_host"
),
)
in_memory_dynamic_logger_cache.set_cache(
credentials=credentials,
service_name="langfuse",
logging_obj=temp_langfuse_logger,
)
if temp_langfuse_logger is not None:
_response = temp_langfuse_logger.log_event(
langfuse_logger_to_use = LangFuseHandler.get_langfuse_logger_for_request(
globalLangfuseLogger=langFuseLogger,
standard_callback_dynamic_params=self.standard_callback_dynamic_params,
in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache,
)
if langfuse_logger_to_use is not None:
_response = langfuse_logger_to_use.log_event(
kwargs=kwargs,
response_obj=result,
start_time=start_time,
Expand Down Expand Up @@ -1909,50 +1849,12 @@ def failure_handler( # noqa: PLR0915
): # copy.deepcopy raises errors as this could be a coroutine
kwargs[k] = v
# this only logs streaming once, complete_streaming_response exists i.e when stream ends
if langFuseLogger is None or (
(
self.standard_callback_dynamic_params.get(
"langfuse_public_key"
)
is not None
and self.standard_callback_dynamic_params.get(
"langfuse_public_key"
)
!= langFuseLogger.public_key
)
or (
self.standard_callback_dynamic_params.get(
"langfuse_public_key"
)
is not None
and self.standard_callback_dynamic_params.get(
"langfuse_public_key"
)
!= langFuseLogger.public_key
)
or (
self.standard_callback_dynamic_params.get(
"langfuse_host"
)
is not None
and self.standard_callback_dynamic_params.get(
"langfuse_host"
)
!= langFuseLogger.langfuse_host
)
):
langFuseLogger = LangFuseLogger(
langfuse_public_key=self.standard_callback_dynamic_params.get(
"langfuse_public_key"
),
langfuse_secret=self.standard_callback_dynamic_params.get(
"langfuse_secret"
),
langfuse_host=self.standard_callback_dynamic_params.get(
"langfuse_host"
),
)
_response = langFuseLogger.log_event(
langfuse_logger_to_use = LangFuseHandler.get_langfuse_logger_for_request(
globalLangfuseLogger=langFuseLogger,
standard_callback_dynamic_params=self.standard_callback_dynamic_params,
in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache,
)
_response = langfuse_logger_to_use.log_event(
start_time=start_time,
end_time=end_time,
response_obj=None,
Expand Down
Loading

0 comments on commit 72a91ea

Please sign in to comment.