Litellm dev 11 30 2024 #6974

Merged · 9 commits · Dec 3, 2024
10 changes: 10 additions & 0 deletions docs/my-website/docs/proxy/prometheus.md
@@ -192,3 +192,13 @@ Here is a screenshot of the metrics you can monitor with the LiteLLM Grafana Dashboard
|----------------------|--------------------------------------|
| `litellm_llm_api_failed_requests_metric` | **deprecated** use `litellm_proxy_failed_requests_metric` |
| `litellm_requests_metric` | **deprecated** use `litellm_proxy_total_requests_metric` |


## FAQ

### What are `_created` vs. `_total` metrics?

- `_created` metrics record when a metric series was first created (i.e., when the proxy starts up); their value is a timestamp, not a count
- `_total` metrics are counters that are incremented on each request

Consume the `_total` metrics for your counting purposes.
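For illustration (not part of this diff), here is a minimal sketch of why both series exist, assuming the standard `prometheus_client` library that backs these metrics; the metric name is hypothetical:

```python
# Illustrative sketch (not from the PR): a prometheus_client Counter exposes
# both a `_total` sample (the count) and a `_created` sample (a unix
# timestamp). The metric name below is hypothetical.
from prometheus_client import CollectorRegistry, Counter, generate_latest

registry = CollectorRegistry()
requests_counter = Counter(
    "example_proxy_requests", "Example request counter", registry=registry
)
requests_counter.inc()

# The exposition output contains, among the HELP/TYPE lines:
#   example_proxy_requests_total 1.0
#   example_proxy_requests_created 1.733e+09   <- creation timestamp, not a count
print(generate_latest(registry).decode())
```

A dashboard that aggregates `_created` values would just be summing timestamps; `rate()` / `increase()` style queries belong on the `_total` series.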
39 changes: 18 additions & 21 deletions enterprise/utils.py
@@ -2,7 +2,9 @@
from typing import Optional, List
from litellm._logging import verbose_logger
from litellm.proxy.proxy_server import PrismaClient, HTTPException
from litellm.llms.custom_httpx.http_handler import HTTPHandler
import collections
import httpx
from datetime import datetime


@@ -114,7 +116,6 @@ async def ui_get_spend_by_tags(


def _forecast_daily_cost(data: list):
import requests # type: ignore
from datetime import datetime, timedelta

if len(data) == 0:
@@ -136,17 +137,17 @@ def _forecast_daily_cost(data: list):

print("last entry date", last_entry_date)

# Assuming today_date is a datetime object
today_date = datetime.now()

# Calculate the last day of the month
last_day_of_todays_month = datetime(
today_date.year, today_date.month % 12 + 1, 1
) - timedelta(days=1)

print("last day of todays month", last_day_of_todays_month)
# Calculate the remaining days in the month
remaining_days = (last_day_of_todays_month - last_entry_date).days

print("remaining days", remaining_days)

current_spend_this_month = 0
series = {}
for entry in data:
@@ -176,13 +177,19 @@ def _forecast_daily_cost(data: list):
"Content-Type": "application/json",
}

response = requests.post(
url="https://trend-api-production.up.railway.app/forecast",
json=payload,
headers=headers,
)
# check the status code
response.raise_for_status()
client = HTTPHandler()

try:
response = client.post(
url="https://trend-api-production.up.railway.app/forecast",
json=payload,
headers=headers,
)
except httpx.HTTPStatusError as e:
raise HTTPException(
status_code=500,
detail={"error": f"Error getting forecast: {e.response.text}"},
)

json_response = response.json()
forecast_data = json_response["forecast"]
@@ -206,13 +213,3 @@ def _forecast_daily_cost(data: list):
f"Predicted Spend for { today_month } 2024, ${total_predicted_spend}"
)
return {"response": response_data, "predicted_spend": predicted_spend}

# print(f"Date: {entry['date']}, Spend: {entry['spend']}, Response: {response.text}")


# _forecast_daily_cost(
# [
# {"date": "2022-01-01", "spend": 100},

# ]
# )
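As a usage note (not part of the diff), the commented-out example above hints at the expected input shape. A hedged sketch of calling the reworked helper, assuming the `enterprise.utils` import path and network access to the forecast endpoint:

```python
# Hypothetical usage sketch: _forecast_daily_cost takes a list of
# {"date": "YYYY-MM-DD", "spend": <number>} entries and now issues the
# forecast request through litellm's HTTPHandler rather than `requests`.
from enterprise.utils import _forecast_daily_cost

result = _forecast_daily_cost(
    [
        {"date": "2022-01-01", "spend": 100},
        {"date": "2022-01-02", "spend": 150},
    ]
)
# Returns {"response": [...], "predicted_spend": "Predicted Spend for ..."}
print(result["predicted_spend"])
```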
6 changes: 5 additions & 1 deletion litellm/__init__.py
@@ -17,7 +17,11 @@
_turn_on_json,
log_level,
)
from litellm.constants import ROUTER_MAX_FALLBACKS
from litellm.constants import (
DEFAULT_BATCH_SIZE,
DEFAULT_FLUSH_INTERVAL_SECONDS,
ROUTER_MAX_FALLBACKS,
)
from litellm.types.guardrails import GuardrailItem
from litellm.proxy._types import (
KeyManagementSystem,
2 changes: 2 additions & 0 deletions litellm/constants.py
@@ -1 +1,3 @@
ROUTER_MAX_FALLBACKS = 5
DEFAULT_BATCH_SIZE = 512
DEFAULT_FLUSH_INTERVAL_SECONDS = 5
13 changes: 5 additions & 8 deletions litellm/integrations/custom_batch_logger.py
@@ -8,34 +8,31 @@
import time
from typing import List, Literal, Optional

import litellm
from litellm._logging import verbose_logger
from litellm.integrations.custom_logger import CustomLogger

DEFAULT_BATCH_SIZE = 512
DEFAULT_FLUSH_INTERVAL_SECONDS = 5


class CustomBatchLogger(CustomLogger):

def __init__(
self,
flush_lock: Optional[asyncio.Lock] = None,
batch_size: Optional[int] = DEFAULT_BATCH_SIZE,
flush_interval: Optional[int] = DEFAULT_FLUSH_INTERVAL_SECONDS,
batch_size: Optional[int] = None,
flush_interval: Optional[int] = None,
**kwargs,
) -> None:
"""
Args:
flush_lock (Optional[asyncio.Lock], optional): Lock to use when flushing the queue. Defaults to None. Only used for custom loggers that do batching
"""
self.log_queue: List = []
self.flush_interval = flush_interval or DEFAULT_FLUSH_INTERVAL_SECONDS
self.batch_size: int = batch_size or DEFAULT_BATCH_SIZE
self.flush_interval = flush_interval or litellm.DEFAULT_FLUSH_INTERVAL_SECONDS
self.batch_size: int = batch_size or litellm.DEFAULT_BATCH_SIZE
self.last_flush_time = time.time()
self.flush_lock = flush_lock

super().__init__(**kwargs)
pass

async def periodic_flush(self):
while True:
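For context (not part of the diff): with the defaults now living in `litellm.constants` and re-exported on the `litellm` module, a subclass of `CustomBatchLogger` can simply leave `batch_size` / `flush_interval` unset to pick them up. A minimal sketch; the subclass name and the send hook body are illustrative:

```python
# Minimal sketch (not from the PR) of a batching logger that relies on the
# new module-level defaults; the subclass and its send hook are illustrative.
import asyncio

import litellm
from litellm.integrations.custom_batch_logger import CustomBatchLogger


class MyBatchLogger(CustomBatchLogger):
    def __init__(self, **kwargs):
        self.flush_lock = asyncio.Lock()
        # Leaving batch_size / flush_interval as None makes the base class
        # fall back to litellm.DEFAULT_BATCH_SIZE and
        # litellm.DEFAULT_FLUSH_INTERVAL_SECONDS.
        super().__init__(flush_lock=self.flush_lock, **kwargs)

    async def async_send_batch(self):
        # Ship self.log_queue to your backend here, then clear it.
        self.log_queue.clear()
```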
26 changes: 22 additions & 4 deletions litellm/integrations/langsmith.py
@@ -70,6 +70,7 @@ def __init__(
self.log_queue: List[LangsmithQueueObject] = []
asyncio.create_task(self.periodic_flush())
self.flush_lock = asyncio.Lock()

super().__init__(**kwargs, flush_lock=self.flush_lock)

def get_credentials_from_env(
@@ -122,7 +123,7 @@ def _prepare_log_data(
"project_name", credentials["LANGSMITH_PROJECT"]
)
run_name = metadata.get("run_name", self.langsmith_default_run_name)
run_id = metadata.get("id", None)
run_id = metadata.get("id", metadata.get("run_id", None))
parent_run_id = metadata.get("parent_run_id", None)
trace_id = metadata.get("trace_id", None)
session_id = metadata.get("session_id", None)
@@ -173,14 +174,28 @@ def _prepare_log_data(
if dotted_order:
data["dotted_order"] = dotted_order

run_id: Optional[str] = data.get("id") # type: ignore
if "id" not in data or data["id"] is None:
"""
for /batch langsmith requires id, trace_id and dotted_order passed as params
"""
run_id = str(uuid.uuid4())
data["id"] = str(run_id)
data["trace_id"] = str(run_id)
data["dotted_order"] = self.make_dot_order(run_id=run_id)

data["id"] = run_id

if (
"trace_id" not in data
or data["trace_id"] is None
and (run_id is not None and isinstance(run_id, str))
):
data["trace_id"] = run_id

if (
"dotted_order" not in data
or data["dotted_order"] is None
and (run_id is not None and isinstance(run_id, str))
):
data["dotted_order"] = self.make_dot_order(run_id=run_id) # type: ignore

verbose_logger.debug("Langsmith Logging data on langsmith: %s", data)

@@ -364,6 +379,9 @@ async def _log_batch_on_langsmith(
elements_to_log = [queue_object["data"] for queue_object in queue_objects]

try:
verbose_logger.debug(
"Sending batch of %s runs to Langsmith", len(elements_to_log)
)
response = await self.async_httpx_client.post(
url=url,
json={"post": elements_to_log},
38 changes: 7 additions & 31 deletions litellm/litellm_core_utils/streaming_handler.py
@@ -437,29 +437,6 @@ def handle_cohere_chunk(self, chunk):
except Exception:
raise ValueError(f"Unable to parse response. Original response: {chunk}")

def handle_cohere_chat_chunk(self, chunk):
chunk = chunk.decode("utf-8")
data_json = json.loads(chunk)
print_verbose(f"chunk: {chunk}")
try:
text = ""
is_finished = False
finish_reason = ""
if "text" in data_json:
text = data_json["text"]
elif "is_finished" in data_json and data_json["is_finished"] is True:
is_finished = data_json["is_finished"]
finish_reason = data_json["finish_reason"]
else:
return
return {
"text": text,
"is_finished": is_finished,
"finish_reason": finish_reason,
}
except Exception:
raise ValueError(f"Unable to parse response. Original response: {chunk}")

def handle_azure_chunk(self, chunk):
is_finished = False
finish_reason = ""
@@ -949,7 +926,12 @@ def return_processed_chunk_logic( # noqa
"function_call" in completion_obj
and completion_obj["function_call"] is not None
)
or (
"provider_specific_fields" in response_obj
and response_obj["provider_specific_fields"] is not None
)
): # cannot set content of an OpenAI Object to be an empty string

self.safety_checker()
hold, model_response_str = self.check_special_tokens(
chunk=completion_obj["content"],
@@ -1058,6 +1040,7 @@ def return_processed_chunk_logic( # noqa
and model_response.choices[0].delta.audio is not None
):
return model_response

else:
if hasattr(model_response, "usage"):
self.chunks.append(model_response)
@@ -1066,6 +1049,7 @@ def chunk_creator(self, chunk): # type: ignore # noqa: PLR0915
def chunk_creator(self, chunk): # type: ignore # noqa: PLR0915
model_response = self.model_response_creator()
response_obj: dict = {}

try:
# return this for all models
completion_obj = {"content": ""}
@@ -1256,14 +1240,6 @@ def chunk_creator(self, chunk): # type: ignore # noqa: PLR0915
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
self.received_finish_reason = response_obj["finish_reason"]
elif self.custom_llm_provider == "cohere_chat":
response_obj = self.handle_cohere_chat_chunk(chunk)
if response_obj is None:
return
completion_obj["content"] = response_obj["text"]
if response_obj["is_finished"]:
self.received_finish_reason = response_obj["finish_reason"]

elif self.custom_llm_provider == "petals":
if len(self.completion_stream) == 0:
if self.received_finish_reason is not None: