Merge pull request #1749 from BerriAI/litellm_vertex_ai_model_garden
feat(vertex_ai.py): vertex ai model garden support
krrishdholakia authored Feb 2, 2024
2 parents 443225a + 5db2d4d commit 7fc03bf
Showing 4 changed files with 220 additions and 38 deletions.
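
For context: this change lets a self-deployed Vertex AI Model Garden endpoint be called through litellm's normal completion interface. The snippet below is a hedged usage sketch rather than part of the commit; it assumes the deployed endpoint's numeric ID is passed as the model name (the diff feeds `model` into `client.endpoint_path(..., endpoint=model)`) under the `vertex_ai/` provider prefix, and that project and region come from litellm's Vertex AI settings. All IDs and names are placeholders.

    import litellm

    # Assumed Vertex AI settings (placeholders)
    litellm.vertex_project = "my-gcp-project"
    litellm.vertex_location = "us-central1"

    # Assumed model string: "vertex_ai/" prefix + the deployed Model Garden endpoint ID
    response = litellm.completion(
        model="vertex_ai/1234567890123456789",
        messages=[{"role": "user", "content": "Hello from Model Garden"}],
    )
    print(response.choices[0].message.content)

Note that with `stream=True` this path does not stream from the endpoint itself: the full prediction is fetched and then replayed word by word through the `TextStreamer` class added in this diff.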
237 changes: 199 additions & 38 deletions litellm/llms/vertex_ai.py
@@ -75,6 +75,41 @@ def get_config(cls):
}


import asyncio


class TextStreamer:
"""
Fake streaming iterator for Vertex AI Model Garden calls
"""

def __init__(self, text):
self.text = text.split() # let's assume words as a streaming unit
self.index = 0

def __iter__(self):
return self

def __next__(self):
if self.index < len(self.text):
result = self.text[self.index]
self.index += 1
return result
else:
raise StopIteration

def __aiter__(self):
return self

async def __anext__(self):
if self.index < len(self.text):
result = self.text[self.index]
self.index += 1
return result
else:
raise StopAsyncIteration # once we run out of data to stream, we raise this error


def _get_image_bytes_from_url(image_url: str) -> bytes:
try:
response = requests.get(image_url)
@@ -236,12 +271,17 @@ def completion(
Part,
GenerationConfig,
)
from google.cloud import aiplatform
from google.protobuf import json_format # type: ignore
from google.protobuf.struct_pb2 import Value # type: ignore
from google.cloud.aiplatform_v1beta1.types import content as gapic_content_types
import google.auth

## Load credentials with the correct quota project ref: https://github.com/googleapis/python-aiplatform/issues/2557#issuecomment-1709284744
creds, _ = google.auth.default(quota_project_id=vertex_project)
vertexai.init(project=vertex_project, location=vertex_location, credentials=creds)
vertexai.init(
project=vertex_project, location=vertex_location, credentials=creds
)

## Load Config
config = litellm.VertexAIConfig.get_config()
@@ -275,6 +315,11 @@ def completion(

request_str = ""
response_obj = None
async_client = None
instances = None
client_options = {
"api_endpoint": f"{vertex_location}-aiplatform.googleapis.com"
}
if (
model in litellm.vertex_language_models
or model in litellm.vertex_vision_models
@@ -294,39 +339,51 @@ def completion(
llm_model = CodeGenerationModel.from_pretrained(model)
mode = "text"
request_str += f"llm_model = CodeGenerationModel.from_pretrained({model})\n"
else: # vertex_code_llm_models
elif model in litellm.vertex_code_chat_models: # vertex_code_llm_models
llm_model = CodeChatModel.from_pretrained(model)
mode = "chat"
request_str += f"llm_model = CodeChatModel.from_pretrained({model})\n"
else: # assume vertex model garden
client = aiplatform.gapic.PredictionServiceClient(
client_options=client_options
)

if acompletion == True: # [TODO] expand support to vertex ai chat + text models
instances = [optional_params]
instances[0]["prompt"] = prompt
instances = [
json_format.ParseDict(instance_dict, Value())
for instance_dict in instances
]
llm_model = client.endpoint_path(
project=vertex_project, location=vertex_location, endpoint=model
)

mode = "custom"
request_str += f"llm_model = client.endpoint_path(project={vertex_project}, location={vertex_location}, endpoint={model})\n"

if acompletion == True:
data = {
"llm_model": llm_model,
"mode": mode,
"prompt": prompt,
"logging_obj": logging_obj,
"request_str": request_str,
"model": model,
"model_response": model_response,
"encoding": encoding,
"messages": messages,
"print_verbose": print_verbose,
"client_options": client_options,
"instances": instances,
"vertex_location": vertex_location,
"vertex_project": vertex_project,
**optional_params,
}
if optional_params.get("stream", False) is True:
# async streaming
return async_streaming(
llm_model=llm_model,
mode=mode,
prompt=prompt,
logging_obj=logging_obj,
request_str=request_str,
model=model,
model_response=model_response,
messages=messages,
print_verbose=print_verbose,
**optional_params,
)
return async_completion(
llm_model=llm_model,
mode=mode,
prompt=prompt,
logging_obj=logging_obj,
request_str=request_str,
model=model,
model_response=model_response,
encoding=encoding,
messages=messages,
print_verbose=print_verbose,
**optional_params,
)
return async_streaming(**data)

return async_completion(**data)

if mode == "vision":
print_verbose("\nMaking VertexAI Gemini Pro Vision Call")
@@ -471,7 +528,36 @@ def completion(
},
)
completion_response = llm_model.predict(prompt, **optional_params).text
elif mode == "custom":
"""
Vertex AI Model Garden
"""
request_str += (
f"client.predict(endpoint={llm_model}, instances={instances})\n"
)
## LOGGING
logging_obj.pre_call(
input=prompt,
api_key=None,
additional_args={
"complete_input_dict": optional_params,
"request_str": request_str,
},
)

response = client.predict(
endpoint=llm_model,
instances=instances,
).predictions
completion_response = response[0]
if (
isinstance(completion_response, str)
and "\nOutput:\n" in completion_response
):
completion_response = completion_response.split("\nOutput:\n", 1)[1]
if "stream" in optional_params and optional_params["stream"] == True:
response = TextStreamer(completion_response)
return response
## LOGGING
logging_obj.post_call(
input=prompt, api_key=None, original_response=completion_response
@@ -539,6 +625,10 @@ async def async_completion(
encoding=None,
messages=None,
print_verbose=None,
client_options=None,
instances=None,
vertex_project=None,
vertex_location=None,
**optional_params,
):
"""
@@ -627,7 +717,43 @@ async def async_completion(
)
response_obj = await llm_model.predict_async(prompt, **optional_params)
completion_response = response_obj.text
elif mode == "custom":
"""
Vertex AI Model Garden
"""
from google.cloud import aiplatform

async_client = aiplatform.gapic.PredictionServiceAsyncClient(
client_options=client_options
)
llm_model = async_client.endpoint_path(
project=vertex_project, location=vertex_location, endpoint=model
)

request_str += (
f"client.predict(endpoint={llm_model}, instances={instances})\n"
)
## LOGGING
logging_obj.pre_call(
input=prompt,
api_key=None,
additional_args={
"complete_input_dict": optional_params,
"request_str": request_str,
},
)

response_obj = await async_client.predict(
endpoint=llm_model,
instances=instances,
)
response = response_obj.predictions
completion_response = response[0]
if (
isinstance(completion_response, str)
and "\nOutput:\n" in completion_response
):
completion_response = completion_response.split("\nOutput:\n", 1)[1]
## LOGGING
logging_obj.post_call(
input=prompt, api_key=None, original_response=completion_response
@@ -657,14 +783,12 @@ async def async_completion(
# init prompt tokens
# this block attempts to get usage from response_obj if it exists, if not it uses the litellm token counter
prompt_tokens, completion_tokens, total_tokens = 0, 0, 0
if response_obj is not None:
if hasattr(response_obj, "usage_metadata") and hasattr(
response_obj.usage_metadata, "prompt_token_count"
):
prompt_tokens = response_obj.usage_metadata.prompt_token_count
completion_tokens = (
response_obj.usage_metadata.candidates_token_count
)
if response_obj is not None and (
hasattr(response_obj, "usage_metadata")
and hasattr(response_obj.usage_metadata, "prompt_token_count")
):
prompt_tokens = response_obj.usage_metadata.prompt_token_count
completion_tokens = response_obj.usage_metadata.candidates_token_count
else:
prompt_tokens = len(encoding.encode(prompt))
completion_tokens = len(
@@ -693,8 +817,13 @@ async def async_streaming(
model_response: ModelResponse,
logging_obj=None,
request_str=None,
encoding=None,
messages=None,
print_verbose=None,
client_options=None,
instances=None,
vertex_project=None,
vertex_location=None,
**optional_params,
):
"""
@@ -763,15 +892,47 @@ async def async_streaming(
},
)
response = llm_model.predict_streaming_async(prompt, **optional_params)
elif mode == "custom":
from google.cloud import aiplatform

async_client = aiplatform.gapic.PredictionServiceAsyncClient(
client_options=client_options
)
llm_model = async_client.endpoint_path(
project=vertex_project, location=vertex_location, endpoint=model
)

request_str += f"client.predict(endpoint={llm_model}, instances={instances})\n"
## LOGGING
logging_obj.pre_call(
input=prompt,
api_key=None,
additional_args={
"complete_input_dict": optional_params,
"request_str": request_str,
},
)

response_obj = await async_client.predict(
endpoint=llm_model,
instances=instances,
)
response = response_obj.predictions
completion_response = response[0]
if (
isinstance(completion_response, str)
and "\nOutput:\n" in completion_response
):
completion_response = completion_response.split("\nOutput:\n", 1)[1]
if "stream" in optional_params and optional_params["stream"] == True:
response = TextStreamer(completion_response)
streamwrapper = CustomStreamWrapper(
completion_stream=response,
model=model,
custom_llm_provider="vertex_ai",
logging_obj=logging_obj,
)
async for transformed_chunk in streamwrapper:
yield transformed_chunk
return streamwrapper


def embedding():
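
The async paths above (`mode == "custom"` in `async_completion` and `async_streaming`) can be exercised through `acompletion`. Another hedged sketch under the same assumptions (placeholder endpoint ID, project, and region); with `stream=True` the chunks are the words replayed by `TextStreamer`, wrapped in litellm's `CustomStreamWrapper`:

    import asyncio
    import litellm

    litellm.vertex_project = "my-gcp-project"   # placeholder
    litellm.vertex_location = "us-central1"     # placeholder

    async def main():
        # Assumed call shape: acompletion with stream=True returns an async iterator of chunks
        stream = await litellm.acompletion(
            model="vertex_ai/1234567890123456789",  # placeholder endpoint ID
            messages=[{"role": "user", "content": "Tell me a short joke"}],
            stream=True,
        )
        async for chunk in stream:
            print(chunk.choices[0].delta.content or "", end="", flush=True)

    asyncio.run(main())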
3 changes: 3 additions & 0 deletions litellm/tests/test_completion.py
@@ -279,6 +279,9 @@ def test_completion_azure_gpt4_vision():
except openai.RateLimitError as e:
print("got a rate liimt error", e)
pass
except openai.APIStatusError as e:
print("got an api status error", e)
pass
except Exception as e:
pytest.fail(f"Error occurred: {e}")

1 change: 1 addition & 0 deletions litellm/tests/test_streaming.py
@@ -465,6 +465,7 @@ def test_completion_mistral_api_stream():
def test_completion_deep_infra_stream():
# deep infra currently includes role in the 2nd chunk
# waiting for them to make a fix on this
litellm.set_verbose = True
try:
messages = [
{"role": "system", "content": "You are a helpful assistant."},
17 changes: 17 additions & 0 deletions litellm/utils.py
@@ -6977,6 +6977,21 @@ def exception_type(
llm_provider="azure",
response=original_exception.response,
)
elif original_exception.status_code == 503:
exception_mapping_worked = True
raise ServiceUnavailableError(
message=f"AzureException - {original_exception.message}",
model=model,
llm_provider="azure",
response=original_exception.response,
)
elif original_exception.status_code == 504: # gateway timeout error
exception_mapping_worked = True
raise Timeout(
message=f"AzureException - {original_exception.message}",
model=model,
llm_provider="azure",
)
else:
exception_mapping_worked = True
raise APIError(
@@ -8061,6 +8076,7 @@ def chunk_creator(self, chunk):
if len(original_chunk.choices) > 0:
try:
delta = dict(original_chunk.choices[0].delta)
print_verbose(f"original delta: {delta}")
model_response.choices[0].delta = Delta(**delta)
except Exception as e:
model_response.choices[0].delta = Delta()
@@ -8069,6 +8085,7 @@ def chunk_creator(self, chunk):
model_response.system_fingerprint = (
original_chunk.system_fingerprint
)
print_verbose(f"self.sent_first_chunk: {self.sent_first_chunk}")
if self.sent_first_chunk == False:
model_response.choices[0].delta["role"] = "assistant"
self.sent_first_chunk = True
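
The exception-mapping hunk in utils.py above extends the Azure handling so that HTTP 503 and 504 responses are raised as litellm's typed exceptions instead of falling through to the generic `APIError`. A hedged sketch of what callers can now catch (deployment name and message are placeholders):

    import litellm

    try:
        litellm.completion(
            model="azure/my-deployment",  # placeholder Azure deployment
            messages=[{"role": "user", "content": "ping"}],
        )
    except litellm.ServiceUnavailableError as e:
        print("Azure returned 503, mapped to ServiceUnavailableError:", e)
    except litellm.Timeout as e:
        print("Azure returned 504 (gateway timeout), mapped to Timeout:", e)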
