diff --git a/README.md b/README.md index 25280d0..0df8ba4 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ ## Paka Highlights - **Cloud-Agnostic Resource Provisioning**: paka starts by breaking down the barriers of cloud vendor lock-in, currently supporting EKS with plans to expand to more cloud services. -- **Optimized Model Execution**: Designed for efficiency, paka runs LLM models on CPUs, with imminent support for GPUs, ensuring optimal performance. Auto-scaling of model replicas based on CPU usage, request rate, and latency. +- **Optimized Model Execution**: Designed for efficiency, paka runs LLM models on CPUs and Nvidia GPUs, ensuring optimal performance. Auto-scaling of model replicas based on CPU usage, request rate, and latency. - **Scalable Batch Job Management**: paka excels in managing batch jobs that dynamically scale out and in, catering to varying workload demands without manual intervention. - **Seamless Application Deployment**: With support for running Langchain and LlamaIndex applications as functions, paka offers scalability to zero and back up, along with rolling updates to ensure no downtime. - **Comprehensive Monitoring and Tracing**: Embedded with built-in support for metrics collection via Prometheus and Grafana, along with tracing through Zipkin. @@ -105,6 +105,8 @@ paka cluster down -f cluster.yaml ## Dependencies - docker daemon +- pack cli (https://buildpacks.io/docs/for-platform-operators/how-to/integrate-ci/pack/) +- pulumi cli (https://www.pulumi.com/docs/install/) - aws cli and credentials for the AWS deployment ```bash # Make sure aws credentials and cli are set up. Your aws credentials should have access to the following services: diff --git a/examples/invoice_extraction/README.md b/examples/invoice_extraction/README.md new file mode 100644 index 0000000..368b8d5 --- /dev/null +++ b/examples/invoice_extraction/README.md @@ -0,0 +1,67 @@ +## Invoice Extraction +This code provides an example of how to build a RESTful API that converts an invoice PDF into a structured data format (JSON). It extracts text from the PDF and then uses the langchain and llama2-7B to extract structured data from the text. + +## Running the Example + +Follow the steps below to run the example: + +1. **Install the necessary dependencies:** + ```bash + pip install paka + + # Ensure AWS credentials and CLI are set up. Your AWS credentials should have access to the following services: + # - S3 + # - ECR + # - EKS + # - EC2 + aws configure + + # Install pack CLI and verify it is working (https://buildpacks.io/docs/for-platform-operators/how-to/integrate-ci/pack/) + pack --version + + # Install pulumi CLI and verify it is working (https://www.pulumi.com/docs/install/) + pulumi version + ``` + +2. **Ensure the Docker daemon is running:** + ```bash + docker info + ``` + +3. **Provision the cluster:** + ```bash + cd examples/invoice_extraction + + # Provision the cluster and update ~/.kube/config + paka cluster up -f cluster.yaml -u + + # Provision a cluster with Nvidia GPUs + paka cluster up -f gpu_cluster.yaml -u + ``` + +4. **Deploy the App:** + ```bash + # The command below will build the source and deploy it as a serverless function. + paka function deploy --name invoice-extraction --source . --entrypoint serve + ``` + +5. **Check the status of the functions:** + ```bash + paka function list + ``` + + If everything is successful, you should see the function in the list with a status of "READY". By default, the function is exposed through a publicly accessible REST API endpoint. 
+ +6. **Test the App:** + + Submit the PDF invoices by hitting the `/extract_invoice` endpoint of the deployed function. + + ```bash + curl -X POST -H "Content-Type: multipart/form-data" -F "file=@/path/to/invoices/invoice-2024-02-29.pdf" http://invoice-extraction.default.xxxx.sslip.io/extract_invoice + ``` + + If the invoice extraction is successful, you should see the structured data in the response, e.g. + + ```json + {"number":"#25927345","date":"2024-01-31T05:07:53","company":"Akamai Technologies, Inc.","company_address":"249 Arch St. Philadelphia, PA 19106 USA","tax_id":"United States EIN: 04-3432319","customer":"John Doe","customer_address":"1 Hacker Way Menlo Park, CA 94025","amount":"$5.00"} + ``` diff --git a/examples/invoice_extraction/gpu_cluster.yaml b/examples/invoice_extraction/gpu_cluster.yaml new file mode 100644 index 0000000..1d58d10 --- /dev/null +++ b/examples/invoice_extraction/gpu_cluster.yaml @@ -0,0 +1,30 @@ +aws: + cluster: + name: invoice-extraction + region: us-west-2 + namespace: default + nodeType: t2.medium + minNodes: 2 + maxNodes: 4 + prometheus: + enabled: true + tracing: + enabled: false + modelGroups: + - nodeType: g4dn.xlarge + minInstances: 1 + maxInstances: 1 + name: llama2-7b + resourceRequest: + cpu: 3600m + memory: 14Gi + awsGpu: # This would enable inference on CUDA devices + diskSize: 40 + autoScaleTriggers: + - type: prometheus + metadata: + serverAddress: http://kube-prometheus-stack-prometheus.prometheus.svc.cluster.local:9090 + metricName: max_qps + threshold: '5' + query: | + max(rate(istio_requests_total{destination_service_name="llama2-7b", destination_app="model-group", response_code="200"}[1m])) diff --git a/examples/invoice_extraction/invoices/invoice-2024-01-01.pdf b/examples/invoice_extraction/invoices/invoice-2024-01-01.pdf new file mode 100644 index 0000000..dc30047 Binary files /dev/null and b/examples/invoice_extraction/invoices/invoice-2024-01-01.pdf differ diff --git a/examples/invoice_extraction/invoices/invoice-2024-01-31.pdf b/examples/invoice_extraction/invoices/invoice-2024-01-31.pdf new file mode 100644 index 0000000..68b7c33 Binary files /dev/null and b/examples/invoice_extraction/invoices/invoice-2024-01-31.pdf differ diff --git a/examples/invoice_extraction/invoices/invoice-2024-02-29.pdf b/examples/invoice_extraction/invoices/invoice-2024-02-29.pdf new file mode 100644 index 0000000..9d0a736 Binary files /dev/null and b/examples/invoice_extraction/invoices/invoice-2024-02-29.pdf differ diff --git a/examples/invoice_extraction/invoices/invoice-2024-03-31.pdf b/examples/invoice_extraction/invoices/invoice-2024-03-31.pdf new file mode 100644 index 0000000..ce183e3 Binary files /dev/null and b/examples/invoice_extraction/invoices/invoice-2024-03-31.pdf differ diff --git a/examples/invoice_extraction/serve.py b/examples/invoice_extraction/serve.py index 12c5645..ad68474 100644 --- a/examples/invoice_extraction/serve.py +++ b/examples/invoice_extraction/serve.py @@ -49,8 +49,11 @@ def extract(pdf_path: str) -> str: Only returns the extracted JSON object, don't say anything else. 
""" + # Future paka code will be able to handle this + chat_template = f"[INST] <><>\n\n{template} [/INST]\n" + prompt = PromptTemplate( - template=template, + template=chat_template, input_variables=["invoice_text"], partial_variables={ "format_instructions": invoice_parser.get_format_instructions() @@ -60,7 +63,6 @@ def extract(pdf_path: str) -> str: llm = LlamaCpp( model_url=LLM_URL, temperature=0, - max_tokens=2500, streaming=False, ) diff --git a/examples/website_rag/README.md b/examples/website_rag/README.md index 12069ba..cb8de97 100644 --- a/examples/website_rag/README.md +++ b/examples/website_rag/README.md @@ -15,6 +15,12 @@ pip install paka # - EKS # - EC2 aws configure + +# Install pack CLI and verify it is working (https://buildpacks.io/docs/for-platform-operators/how-to/integrate-ci/pack/) +pack --version + +# Install pulumi CLI and verify it is working (https://www.pulumi.com/docs/install/) +pulumi version ``` ### Make sure docker daemon is running diff --git a/paka/__init__.py b/paka/__init__.py index 485f44a..e435dbd 100644 --- a/paka/__init__.py +++ b/paka/__init__.py @@ -1 +1,6 @@ -__version__ = "0.1.1" +from importlib.metadata import PackageNotFoundError, version + +try: + __version__ = version(__name__) +except PackageNotFoundError: + __version__ = "" diff --git a/paka/cli/cluster.py b/paka/cli/cluster.py index 018efb2..6b57467 100644 --- a/paka/cli/cluster.py +++ b/paka/cli/cluster.py @@ -4,6 +4,7 @@ import typer from paka.cli.utils import load_cluster_manager +from paka.k8s import remove_crd_finalizers from paka.k8s import update_kubeconfig as merge_update_kubeconfig from paka.logger import logger @@ -64,6 +65,18 @@ def down( "all resources and data will be permanently deleted.", default=False, ): + # Sometime finalizers might block CRD deletion, so we need to force delete those + # TODO: better way to handle this + remove_crd_finalizers( + "scaledobjects.keda.sh", + ) + remove_crd_finalizers( + "routes.serving.knative.dev", + ) + remove_crd_finalizers( + "ingresses.networking.internal.knative.dev", + ) + cluster_manager = load_cluster_manager(cluster_config) cluster_manager.destroy() diff --git a/paka/cluster/aws/eks.py b/paka/cluster/aws/eks.py index f65a268..c10eef8 100644 --- a/paka/cluster/aws/eks.py +++ b/paka/cluster/aws/eks.py @@ -14,6 +14,7 @@ from paka.cluster.keda import create_keda from paka.cluster.knative import create_knative_and_istio from paka.cluster.namespace import create_namespace +from paka.cluster.nvidia_device_plugin import install_nvidia_device_plugin from paka.cluster.prometheus import create_prometheus from paka.cluster.qdrant import create_qdrant from paka.cluster.redis import create_redis @@ -79,10 +80,6 @@ def create_node_group_for_model_group( node_group_name=f"{project}-{kubify_name(model_group.name)}-group", cluster=cluster, instance_types=[model_group.nodeType], - # Set the desired size of the node group to the minimum number of instances - # specified for the model group. - # Note: Scaling down to 0 is not supported, since cold starting time is - # too long for model group services. scaling_config=aws.eks.NodeGroupScalingConfigArgs( desired_size=model_group.minInstances, min_size=model_group.minInstances, @@ -95,8 +92,6 @@ def create_node_group_for_model_group( }, node_role_arn=worker_role.arn, subnet_ids=vpc.private_subnet_ids, - # Apply taints to ensure that only pods belonging to the same model group - # can be scheduled on this node group. 
taints=[ aws.eks.NodeGroupTaintArgs( effect="NO_SCHEDULE", key="app", value="model-group" @@ -105,6 +100,13 @@ def create_node_group_for_model_group( effect="NO_SCHEDULE", key="model", value=model_group.name ), ], + # Supported AMI types https://docs.aws.amazon.com/eks/latest/APIReference/API_Nodegroup.html#AmazonEKS-Type-Nodegroup-amiType + ami_type=("AL2_x86_64_GPU" if model_group.awsGpu else None), + disk_size=( + model_group.awsGpu.diskSize + if model_group.awsGpu + else model_group.diskSize + ), ) @@ -301,6 +303,9 @@ def create_eks_resources(kubeconfig_json: str) -> None: enable_cloudwatch(config, k8s_provider) create_prometheus(config, k8s_provider) create_zipkin(config, k8s_provider) + # Install the NVIDIA device plugin for GPU support + # Even if the cluster doesn't have GPUs, this won't cause any issues + install_nvidia_device_plugin(k8s_provider) # TODO: Set timeout to be the one used by knative update_elb_idle_timeout(kubeconfig_json, 300) diff --git a/paka/cluster/nvidia_device_plugin.py b/paka/cluster/nvidia_device_plugin.py new file mode 100644 index 0000000..ce0c919 --- /dev/null +++ b/paka/cluster/nvidia_device_plugin.py @@ -0,0 +1,87 @@ +import pulumi +import pulumi_kubernetes as k8s + + +def install_nvidia_device_plugin( + k8s_provider: k8s.Provider, version: str = "v0.15.0-rc.2" +) -> None: + """ + Installs the NVIDIA device plugin for GPU support in the cluster. + + This function deploys the NVIDIA device plugin to the cluster using a DaemonSet. + The device plugin allows Kubernetes to discover and manage GPU resources on the nodes. + + Args: + k8s_provider (k8s.Provider): The Kubernetes provider to use for deploying the device plugin. + + Returns: + None + """ + + k8s.apps.v1.DaemonSet( + "nvidia-device-plugin-daemonset", + metadata=k8s.meta.v1.ObjectMetaArgs( + namespace="kube-system", + ), + spec=k8s.apps.v1.DaemonSetSpecArgs( + selector=k8s.meta.v1.LabelSelectorArgs( + match_labels={ + "name": "nvidia-device-plugin-ds", + }, + ), + update_strategy=k8s.apps.v1.DaemonSetUpdateStrategyArgs( + type="RollingUpdate", + ), + template=k8s.core.v1.PodTemplateSpecArgs( + metadata=k8s.meta.v1.ObjectMetaArgs( + labels={ + "name": "nvidia-device-plugin-ds", + }, + ), + spec=k8s.core.v1.PodSpecArgs( + tolerations=[ + k8s.core.v1.TolerationArgs( + key="nvidia.com/gpu", + operator="Exists", + effect="NoSchedule", + ), + k8s.core.v1.TolerationArgs(operator="Exists"), + ], + priority_class_name="system-node-critical", + containers=[ + k8s.core.v1.ContainerArgs( + image=f"nvcr.io/nvidia/k8s-device-plugin:{version}", + name="nvidia-device-plugin-ctr", + env=[ + k8s.core.v1.EnvVarArgs( + name="FAIL_ON_INIT_ERROR", + value="false", + ) + ], + security_context=k8s.core.v1.SecurityContextArgs( + allow_privilege_escalation=False, + capabilities=k8s.core.v1.CapabilitiesArgs( + drop=["ALL"], + ), + ), + volume_mounts=[ + k8s.core.v1.VolumeMountArgs( + name="device-plugin", + mount_path="/var/lib/kubelet/device-plugins", + ) + ], + ) + ], + volumes=[ + k8s.core.v1.VolumeArgs( + name="device-plugin", + host_path=k8s.core.v1.HostPathVolumeSourceArgs( + path="/var/lib/kubelet/device-plugins", + ), + ) + ], + ), + ), + ), + opts=pulumi.ResourceOptions(provider=k8s_provider), + ) diff --git a/paka/config.py b/paka/config.py index 45c61ef..2808827 100644 --- a/paka/config.py +++ b/paka/config.py @@ -1,5 +1,5 @@ import re -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union from pydantic import BaseModel, field_validator, model_validator from ruamel.yaml 
import YAML @@ -33,10 +33,12 @@ class ResourceRequest(BaseModel): Attributes: cpu (str): The amount of CPU to request. memory (str): The amount of memory to request. + gpu (Optional[int]): The number of GPUs to request. Defaults to None. """ cpu: str memory: str + gpu: Optional[int] = None @field_validator("cpu", mode="before") def validate_cpu(cls, v: str) -> str: @@ -72,6 +74,54 @@ def validate_memory(cls, v: str) -> str: """ return validate_size(v, "Invalid memory format") + @field_validator("gpu") + def validate_gpu(cls, v: Optional[int]) -> Optional[int]: + """ + Validates the value of the gpu field. + + Args: + v (Optional[int]): The value of the gpu field. + + Returns: + Optional[int]: The input value if validation is successful. + + Raises: + ValueError: If the value is less than 0. + """ + if v is not None and v < 0: + raise ValueError("GPU count cannot be less than 0") + return v + + +class AwsGpuNode(BaseModel): + """ + Represents a configuration for an AWS GPU node. + + Attributes: + diskSize (int): The size of the disk for the GPU node in GB. + """ + + diskSize: int + + +class GcpGpuNode(BaseModel): + """ + Represents a Google Cloud Platform GPU node. + + Attributes: + imageType (str): The type of image used for the GPU node. + acceleratorType (str): The type of accelerator used for the GPU node. + acceleratorCount (int): The number of accelerators attached to the GPU node. + diskType (str): The type of disk used for the GPU node. + diskSize (int): The size of the disk attached to the GPU node in GB. + """ + + imageType: str + acceleratorType: str + acceleratorCount: int + diskType: str + diskSize: int + class CloudNode(BaseModel): """ @@ -79,10 +129,23 @@ class CloudNode(BaseModel): Attributes: nodeType (str): The type of the node. - + diskSize (int): The size of the disk attached to the node in GB. + awsGpu (Optional[AwsGpuNode]): The AWS GPU node configuration, if applicable. + gcpGpu (Optional[GcpGpuNode]): The GCP GPU node configuration, if applicable. 
""" nodeType: str + diskSize: int = 20 + awsGpu: Optional[AwsGpuNode] = None + gcpGpu: Optional[GcpGpuNode] = None + + @model_validator(mode="before") + def validate_gpu( + cls, values: Dict[str, Union[AwsGpuNode, GcpGpuNode]] + ) -> Dict[str, Union[AwsGpuNode, GcpGpuNode]]: + if values.get("awsGpu") and values.get("gcpGpu"): + raise ValueError("At most one of awsGpu or gcpGpu can exist") + return values class ModelGroup(BaseModel): diff --git a/paka/k8s.py b/paka/k8s.py index ff7efbd..c51ed53 100644 --- a/paka/k8s.py +++ b/paka/k8s.py @@ -621,5 +621,11 @@ def tail_logs(namespace: str, pod_name: str, container_name: str) -> None: logger.info(event) +def remove_crd_finalizers(name: str) -> None: + api = client.ApiextensionsV1Api() + body = [{"op": "remove", "path": "/metadata/finalizers"}] + api.patch_custom_resource_definition(name, body) + + # Load the kubeconfig when this module is imported try_load_kubeconfig() diff --git a/paka/kube_resources/model_group/model.py b/paka/kube_resources/model_group/model.py index 2e327fc..b8e1970 100644 --- a/paka/kube_resources/model_group/model.py +++ b/paka/kube_resources/model_group/model.py @@ -8,7 +8,10 @@ from botocore.exceptions import ClientError from paka.kube_resources.model_group.manifest import Manifest -from paka.kube_resources.model_group.supported_models import SUPPORTED_MODELS +from paka.kube_resources.model_group.supported_models import ( + SUPPORTED_MODELS, + SUPPORTED_MODELS_V2, +) from paka.logger import logger from paka.utils import read_current_cluster_data, to_yaml @@ -204,47 +207,56 @@ def download_model(name: str) -> None: Returns: None """ - if name not in SUPPORTED_MODELS: - logger.error( - f"Model {name} is not supported." - f"Available models are: {', '.join(SUPPORTED_MODELS.keys())}" - ) - raise Exception(f"Model {name} is not supported.") - model = SUPPORTED_MODELS[name] - - logger.info(f"Downloading model from {model.url}...") - # Get the model name from the URL - model_file_name = model.url.split("/")[-1] - model_path = f"{MODEL_PATH_PREFIX}/{name}" - - full_model_file_path = f"{model_path}/{model_file_name}" - bucket = read_current_cluster_data("bucket") - - if s3_file_prefix_exists(bucket, f"{model_path}/"): - logger.info(f"Model {name} already exists.") - return - - sha256 = download_file_to_s3(model.url, bucket, full_model_file_path) - if sha256 != model.sha256: - logger.error(f"SHA256 hash of the downloaded file does not match.") - # Delete the file - delete_s3_file(bucket, full_model_file_path) - raise Exception(f"SHA256 hash of the downloaded file does not match.") - - # Save model manifest - manifest = Manifest( - name=name, - sha256=model.sha256, - url=model.url, - type="gguf", # TODO: hard-coded for now - file=model_file_name, - ) + if name in SUPPORTED_MODELS_V2: + # new version of the model class + new_model = SUPPORTED_MODELS_V2[name] + if str(new_model) == "HuggingFaceModel": + new_model.upload_files() + return + else: + # old version + if name not in SUPPORTED_MODELS: + logger.error( + f"Model {name} is not supported." 
+ f"Available models are: {', '.join(SUPPORTED_MODELS.keys())}" + ) + raise Exception(f"Model {name} is not supported.") + + model = SUPPORTED_MODELS[name] + + logger.info(f"Downloading model from {model.url}...") + # Get the model name from the URL + model_file_name = model.url.split("/")[-1] + model_path = f"{MODEL_PATH_PREFIX}/{name}" + + full_model_file_path = f"{model_path}/{model_file_name}" + bucket = read_current_cluster_data("bucket") + + if s3_file_prefix_exists(bucket, f"{model_path}/"): + logger.info(f"Model {name} already exists.") + return + + sha256 = download_file_to_s3(model.url, bucket, full_model_file_path) + if sha256 != model.sha256: + logger.error(f"SHA256 hash of the downloaded file does not match.") + # Delete the file + delete_s3_file(bucket, full_model_file_path) + raise Exception(f"SHA256 hash of the downloaded file does not match.") + + # Save model manifest + manifest = Manifest( + name=name, + sha256=model.sha256, + url=model.url, + type="gguf", # TODO: hard-coded for now + file=model_file_name, + ) - manifest_yaml = to_yaml(manifest.model_dump(exclude_none=True)) - save_string_to_s3(bucket, f"{model_path}/manifest.yaml", manifest_yaml) + manifest_yaml = to_yaml(manifest.model_dump(exclude_none=True)) + save_string_to_s3(bucket, f"{model_path}/manifest.yaml", manifest_yaml) - logger.info(f"Model {name} downloaded successfully.") + logger.info(f"Model {name} downloaded successfully.") def get_model_file_name(model_name: str) -> str: diff --git a/paka/kube_resources/model_group/models/__init__.py b/paka/kube_resources/model_group/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/paka/kube_resources/model_group/models/base.py b/paka/kube_resources/model_group/models/base.py new file mode 100644 index 0000000..1c5a5ee --- /dev/null +++ b/paka/kube_resources/model_group/models/base.py @@ -0,0 +1,371 @@ +import concurrent.futures +import hashlib +from threading import Lock +from typing import Any, Dict, List + +import boto3 +import requests +from botocore.client import Config +from botocore.exceptions import ClientError +from tqdm import tqdm + +from paka.logger import logger +from paka.utils import read_current_cluster_data + +MODEL_PATH_PREFIX = "models" + + +class Model: + def __init__( + self, + name: str, + inference_devices: list[str] = ["cpu"], + quantization: str = "GPTQ", + runtime: str = "llama.cpp", + prompt_template: str = "chatml", + download_max_concurrency: int = 10, + s3_chunk_size: int = 8 * 1024 * 1024, + s3_max_concurrency: int = 20, + ) -> None: + """ + Initializes a Model object. + + Args: + name (str): The name of the model, repository id. + inference_devices (list[str], optional): The list (cpu, gpu, tpu, etc) of inference devices to use. Defaults to ['cpu']. + quantization (str, optional): The quantization method (GPTQ, AWQ, GGUF_Q4_0, etc) to use. Defaults to 'GPTQ'. + runtime (str, optional): The runtime (vLLM, pytorch, etc) to use. Defaults to 'llama.cpp'. + prompt_template (str, optional): The prompt template (chatml, llama-2, gemma, etc) to use. Defaults to 'chatml'. + download_max_concurrency (int, optional): The maximum number of concurrent downloads. Defaults to 10. + s3_chunk_size (int, optional): The size of each chunk to upload to S3 in bytes. Defaults to 8 * 1024 * 1024. + s3_max_concurrency (int, optional): The maximum number of concurrent uploads to S3. Defaults to 20. 
+ """ + # model info + self.name = name + self.inference_devices = inference_devices + self.quantization = quantization + self.runtime = runtime + self.prompt_template = prompt_template + + # s3 bucket + self.s3_bucket = read_current_cluster_data("bucket") + self.s3_chunk_size = s3_chunk_size + self.download_max_concurrency = download_max_concurrency + self.s3_max_concurrency = s3_max_concurrency + self.s3 = boto3.client("s3", config=Config(signature_version="s3v4")) + # Shared counter + self.counter: dict[str, int] = {} + self.counter_lock = Lock() + self.pbar: tqdm = None + self.files_size: dict[str, int] = {} + + def get_s3_file_path(self, file_path: str) -> str: + """ + Returns the S3 file path for a given file name. + + Args: + file_path (str): The path of the file. + + Returns: + str: The S3 file path. + """ + return f"{MODEL_PATH_PREFIX}/{file_path}" + + def download(self, url: str, sha256: str | None = None) -> None: + """ + Downloads a single file from a URL. + + Args: + url (str): The URL of the file to download. + sha256 (str, optional): The expected SHA256 hash of the downloaded file. + + Raises: + Exception: If the SHA256 hash of the downloaded file does not match the expected value. + """ + full_model_file_path = self.get_s3_file_path( + f"{self.name}/{url.split('/')[-1]}" + ) + if self.s3_file_exists(full_model_file_path): + logger.info(f"Model file {full_model_file_path} already exists.") + return + + logger.info(f"Downloading model from {url}") + completed_upload_id = None + try: + with requests.get(url, stream=True) as response: + response.raise_for_status() + upload_id, sha256_value = self.upload_to_s3( + response, full_model_file_path + ) + if sha256 is not None and sha256 != sha256_value: + self.delete_s3_file(full_model_file_path) + raise Exception( + f"SHA256 hash of the downloaded file does not match the expected value. {full_model_file_path}" + ) + completed_upload_id = upload_id + except Exception as e: + logger.error(f"An error occurred, download: {str(e)}") + raise e + finally: + # If an error occurred and upload was not completed + if completed_upload_id is None: + self.s3.abort_multipart_upload( + Bucket=self.s3_bucket, Key=full_model_file_path, UploadId=upload_id + ) + + def download_all(self, urls: list[str], sha256s: list[str | None] = []) -> None: + """ + Downloads multiple files from a list of URLs. + + Args: + urls (list[str]): A list of URLs of the files to download. + sha256s (list[str], optional): A list of expected SHA256 hashes for the downloaded files. + """ + with concurrent.futures.ThreadPoolExecutor( + max_workers=self.download_max_concurrency + ) as executor: + executor.map(self.download, urls, sha256s) + + def upload_to_s3( + self, response: requests.Response, s3_file_name: str + ) -> tuple[Any, str]: + """ + Uploads a single file to S3. + + Args: + response (requests.Response): The response object from the file download. + s3_file_name (str): The name of the file in S3. + + Returns: + tuple: A tuple containing the upload ID and the SHA256 hash of the file. 
+ """ + logger.info(f"Uploading model to {s3_file_name}") + sha256 = hashlib.sha256() + total_size = int(response.headers.get("content-length", 0)) + processed_size = 0 + + upload = self.s3.create_multipart_upload( + Bucket=self.s3_bucket, Key=s3_file_name + ) + upload_id = upload["UploadId"] + parts = [] + + with concurrent.futures.ThreadPoolExecutor( + max_workers=self.s3_max_concurrency + ) as executor: + futures: List[concurrent.futures.Future] = [] + part_number = 1 + + for chunk in response.iter_content(chunk_size=self.s3_chunk_size): + sha256.update(chunk) + while len(futures) >= self.s3_max_concurrency: + # Wait for one of the uploads to complete + done, _ = concurrent.futures.wait( + futures, return_when=concurrent.futures.FIRST_COMPLETED + ) + for future in done: + parts.append(future.result()) + futures.remove(future) + + # Submit new chunk for upload + future = executor.submit( + self.upload_part, + s3_file_name, + upload_id, + part_number, + chunk, + ) + futures.append(future) + part_number += 1 + processed_size += len(chunk) + progress = (processed_size / total_size) * 100 + print(f"Progress: {progress:.2f}%", end="\r") + + # Wait for all remaining uploads to complete + for future in concurrent.futures.as_completed(futures): + parts.append(future.result()) + + parts.sort(key=lambda part: part["PartNumber"]) + self.s3.complete_multipart_upload( + Bucket=self.s3_bucket, + Key=s3_file_name, + UploadId=upload_id, + MultipartUpload={"Parts": parts}, + ) + + logger.info(f"File uploaded to S3: {s3_file_name}") + sha256_value = sha256.hexdigest() + logger.info(f"SHA256 hash of the file: {sha256_value}") + return upload_id, sha256_value + + def upload_fs_to_s3(self, fs: Any, s3_file_name: str, upload_id: str) -> str: + """ + Uploads a single file to S3. + + Args: + fs (Any): The file stream object. + s3_file_name (str): The name of the file in S3. + upload_id: The upload ID of the multipart upload. + + Returns: + tuple: the SHA256 hash of the file. 
+ """ + self.pbar.postfix = f"{s3_file_name}" + sha256 = hashlib.sha256() + processed_size = 0 + parts = [] + + with concurrent.futures.ThreadPoolExecutor( + max_workers=self.s3_max_concurrency + ) as executor: + futures: List[concurrent.futures.Future] = [] + part_number = 1 + + for chunk in iter(lambda: fs.read(self.s3_chunk_size), b""): + sha256.update(chunk) + while len(futures) >= self.s3_max_concurrency: + # Wait for one of the uploads to complete + done, _ = concurrent.futures.wait( + futures, return_when=concurrent.futures.FIRST_COMPLETED + ) + for future in done: + parts.append(future.result()) + futures.remove(future) + + # Submit new chunk for upload + future = executor.submit( + self.upload_part, + s3_file_name, + upload_id, + part_number, + chunk, + ) + futures.append(future) + part_number += 1 + processed_size += len(chunk) + with self.counter_lock: + self.counter[s3_file_name] = processed_size + total_progress = sum(self.counter.values()) + self.pbar.update(total_progress - self.pbar.n) + + # Wait for all remaining uploads to complete + for future in concurrent.futures.as_completed(futures): + parts.append(future.result()) + + parts.sort(key=lambda part: part["PartNumber"]) + self.s3.complete_multipart_upload( + Bucket=self.s3_bucket, + Key=s3_file_name, + UploadId=upload_id, + MultipartUpload={"Parts": parts}, + ) + sha256_value = sha256.hexdigest() + self.pbar.set_postfix(f"Uploaded {s3_file_name}, SHA256: {sha256_value}") + return sha256_value + + def upload_part( + self, + s3_file_name: str, + upload_id: str, + part_number: int, + chunk: bytes, + ) -> Dict[str, Any]: + """ + Uploads a part of a file to S3. + + Args: + s3_file_name (str): The name of the file in S3. + upload_id (str): The upload ID of the multipart upload. + part_number (int): The part number of the chunk being uploaded. + chunk (bytes): The chunk of data to upload. + + Returns: + dict: A dictionary containing the part number and the ETag of the uploaded part. + """ + part = self.s3.upload_part( + Body=chunk, + Bucket=self.s3_bucket, + Key=s3_file_name, + UploadId=upload_id, + PartNumber=part_number, + ) + return {"PartNumber": part_number, "ETag": part["ETag"]} + + def s3_file_exists(self, s3_file_name: str) -> bool: + """ + Checks if a file exists in S3. + + Args: + s3_file_name (str): The name of the file in S3. + + Returns: + bool: True if the file exists, False otherwise. + """ + try: + self.s3.head_object(Bucket=self.s3_bucket, Key=s3_file_name) + return True + except ClientError as e: + if e.response["Error"]["Code"] == "404": + return False + else: + raise # some other error occurred + + def s3_file_prefix_exists(self, s3_file_name: str) -> bool: + """ + Checks if a file prefix exists in S3. + + Args: + s3_file_name (str): The prefix of the file name in S3. + + Returns: + bool: True if the file prefix exists, False otherwise. + """ + bucket = self.s3.Bucket(self.s3_bucket) + return any(bucket.objects.filter(Prefix=s3_file_name)) + + def delete_s3_file(self, s3_file_name: str) -> None: + """ + Deletes the specified file from the S3 bucket. + + Args: + s3_file_name (str): The name of the file to be deleted. 
+ + Returns: + None + """ + if self.s3_file_exists(s3_file_name): + self.s3.delete_object(Bucket=self.s3_bucket, Key=s3_file_name) + logger.info(f"{s3_file_name} deleted.") + else: + logger.info(f"{s3_file_name} not found.") + + def clear_counter(self) -> None: + with self.counter_lock: + self.counter = {} + + def create_pbar(self) -> None: + total_size = sum(self.files_size.values()) + self.pbar = tqdm(total=total_size, unit="B", unit_scale=True, desc="Uploading") + + def close_pbar(self) -> None: + self.pbar.close() + self.pbar = None + + def logging_for_class(self, message: str, type: str = "info") -> None: + """ + Logs an informational message. + + Args: + message (str): The message to log. + + Returns: + None + """ + if type == "info": + logger.info(f"{self.__str__()} ({self.name}): {message}") + elif type == "warn": + logger.warn(f"{self.__str__()} ({self.name}): {message}") + elif type == "error": + logger.error(f"{self.__str__()} ({self.name}): {message}") + else: + logger.info(f"{self.__str__()} ({self.name}): {message}") diff --git a/paka/kube_resources/model_group/models/http_source_model.py b/paka/kube_resources/model_group/models/http_source_model.py new file mode 100644 index 0000000..e69de29 diff --git a/paka/kube_resources/model_group/models/hugging_face_model.py b/paka/kube_resources/model_group/models/hugging_face_model.py new file mode 100644 index 0000000..3d4405c --- /dev/null +++ b/paka/kube_resources/model_group/models/hugging_face_model.py @@ -0,0 +1,203 @@ +import concurrent.futures +from typing import Any + +import boto3 +from huggingface_hub import HfFileSystem +from pydantic import BaseModel + +from paka.kube_resources.model_group.models.base import Model +from paka.logger import logger +from paka.utils import to_yaml + + +class Manifest(BaseModel): + repo_id: str + files: list[tuple[str, str]] + inference_devices: list[str] + quantization: str + runtime: str + prompt_template: str + + +class HuggingFaceModel(Model): + def __str__(self) -> str: + return "HuggingFaceModel" + + def __init__( + self, + repo_id: str, + files: list[str], + inference_devices: list[str] = ["cpu"], + quantization: str = "GPTQ", + runtime: str = "llama.cpp", + prompt_template: str = "chatml", + download_max_concurrency: int = 10, + s3_chunk_size: int = 8 * 1024 * 1024, + s3_max_concurrency: int = 20, + ) -> None: + super().__init__( + repo_id, + inference_devices, + quantization, + runtime, + prompt_template, + download_max_concurrency, + s3_chunk_size, + s3_max_concurrency, + ) + self.repo_id: str = repo_id + self.fs = HfFileSystem() + self.orginal_files = files + self.files: list[str] = [] + self.files_sha256: dict[str, str] = {} + self.completed_files: list[tuple[str, str]] = [] + + def validate_files(self) -> None: + """ + Validates the list of files to download. + """ + verified_files: list[str] = [] + for file in self.orginal_files: + match_files = self.fs.glob(f"{self.repo_id}/{file}") + if len(match_files) > 0: + verified_files = verified_files + match_files + else: + self.logging_for_class( + f"File {file} not found in repository {self.repo_id}", "warn" + ) + + for file in verified_files: + file_info = self.get_file_info(file) + self.files_size[file] = file_info["size"] + self.files_sha256[file] = ( + file_info["lfs"]["sha256"] + if "lfs" in file_info and file_info["lfs"] + else None + ) + + self.files = verified_files + + def get_file_info(self, hf_file_path: str) -> dict[str, Any]: + """ + Get information about a file on Hugging Face. 
+ + Args: + hf_file_path (str): The path to the file on Hugging Face. + + Returns: + dict: A dictionary containing information about the file. + """ + # Get the file information + file_info: dict[str, Any] = self.fs.stat(hf_file_path) + + return file_info + + def upload_file_to_s3(self, hf_file_path: str) -> None: + """ + Upload a file from Hugging Face to S3. + + Args: + hf_file_path (str): The path to the file on Hugging Face. + + Returns: + None + """ + full_model_file_path = self.get_s3_file_path(hf_file_path) + if self.s3_file_exists(full_model_file_path): + self.pbar.postfix(f"Model file {full_model_file_path} already exists.") + self.counter[full_model_file_path] = self.files_size[hf_file_path] + return + + upload_id = None + file_uploaded = False + sha256 = self.files_sha256[hf_file_path] + try: + with self.fs.open(hf_file_path, "rb") as hf_file: + upload = self.s3.create_multipart_upload( + Bucket=self.s3_bucket, Key=full_model_file_path + ) + upload_id = upload["UploadId"] + sha256_value = self.upload_fs_to_s3( + hf_file, full_model_file_path, upload_id + ) + if sha256 is not None and sha256 != sha256_value: + self.delete_s3_file(full_model_file_path) + raise Exception( + f"SHA256 hash of the downloaded file does not match the expected value. {full_model_file_path}" + ) + file_uploaded = True + except Exception as e: + self.logging_for_class(f"An error occurred, download: {str(e)}", "error") + raise e + finally: + # If an error occurred and upload was not completed + if upload_id is not None and not file_uploaded: + self.s3.abort_multipart_upload( + Bucket=self.s3_bucket, Key=full_model_file_path, UploadId=upload_id + ) + else: + self.completed_files.append((hf_file_path, sha256)) + self.logging_for_class( + f"Model file {full_model_file_path} uploaded successfully." + ) + + def save_manifest_yml(self) -> None: + """ + Saves the manifest YAML file for the model. + + This method creates a `Manifest` object with the specified parameters and converts it to YAML format. + The resulting YAML is then saved to an S3 bucket with the file path "{repo_id}/manifest.yml". + + Returns: + None + + Raises: + None + """ + manifest = Manifest( + repo_id=self.repo_id, + files=self.completed_files, + inference_devices=self.inference_devices, + quantization=self.quantization, + runtime=self.runtime, + prompt_template=self.prompt_template, + ) + manifest_yaml = to_yaml(manifest.model_dump(exclude_none=True)) + file_path = self.get_s3_file_path(f"{self.repo_id}/manifest.yml") + s3 = boto3.resource("s3") + s3.Object(self.s3_bucket, file_path).put(Body=manifest_yaml) + self.logging_for_class(f"Manifest file saved to {file_path}") + + def upload_files(self) -> None: + """ + Upload multiple files from Hugging Face to S3 in parallel. + Returns: + None + """ + self.logging_for_class("Uploading files to S3...") + self.validate_files() + self.create_pbar() + with concurrent.futures.ThreadPoolExecutor( + max_workers=self.download_max_concurrency + ) as executor: + futures = [ + executor.submit(self.upload_file_to_s3, file) for file in self.files + ] + concurrent.futures.wait(futures) + # Callback function to handle completion of all workers + self.handle_upload_completion() + + def handle_upload_completion(self) -> None: + """ + Callback function to handle completion of all workers. + This function will be called after all files have been uploaded. 
+ Returns: + None + """ + # Add your code here to handle the completion of all workers + # For example, you can log a message or perform any post-processing tasks + self.save_manifest_yml() + self.completed_files = [] + self.clear_counter() + self.close_pbar() + self.logging_for_class("All files have been uploaded.") diff --git a/paka/kube_resources/model_group/service.py b/paka/kube_resources/model_group/service.py index 804ff95..2e2d92c 100644 --- a/paka/kube_resources/model_group/service.py +++ b/paka/kube_resources/model_group/service.py @@ -8,9 +8,12 @@ from paka.kube_resources.model_group.model import MODEL_PATH_PREFIX, download_model from paka.utils import kubify_name, read_cluster_data -# We hardcode the image here for now +# `latest` will be stale because of the `IfNotPresent` policy +# We hardcode the image here for now, we can make it configurable later LLAMA_CPP_PYTHON_IMAGE = "ghcr.io/abetlen/llama-cpp-python:latest" +LLAMA_CPP_PYTHON_CUDA = "jijunleng/llama-cpp-python-cuda:latest" + try_load_kubeconfig() @@ -115,8 +118,9 @@ def create_pod( ], "env": [ client.V1EnvVar( - name="USE_MLOCK", # Model weights are locked in RAM or not - value="0", + name="N_GPU_LAYERS", + # -1 means all layers are GPU layers, 0 means no GPU layers + value=("-1" if model_group.awsGpu else "0"), ), client.V1EnvVar( name="MODEL", @@ -159,6 +163,17 @@ def create_pod( }, ) + if model_group.awsGpu: + if "resources" not in container_args: + container_args["resources"] = client.V1ResourceRequirements() + if container_args["resources"].limits is None: + container_args["resources"].limits = {} + gpu_count = 1 + if model_group.resourceRequest and model_group.resourceRequest.gpu: + gpu_count = model_group.resourceRequest.gpu + # Ah, we only support nvidia GPUs for now + container_args["resources"].limits["nvidia.com/gpu"] = gpu_count + return client.V1Pod( metadata=client.V1ObjectMeta( name=f"{kubify_name(model_group.name)}", @@ -488,7 +503,13 @@ def create_model_group_service( port = 8000 - pod = create_pod(namespace, config, model_group, LLAMA_CPP_PYTHON_IMAGE, port) + pod = create_pod( + namespace, + config, + model_group, + (LLAMA_CPP_PYTHON_CUDA if model_group.awsGpu else LLAMA_CPP_PYTHON_IMAGE), + port, + ) deployment = create_deployment(namespace, model_group, pod) apply_resource(deployment) diff --git a/paka/kube_resources/model_group/supported_models.py b/paka/kube_resources/model_group/supported_models.py index 749f16a..0f12b05 100644 --- a/paka/kube_resources/model_group/supported_models.py +++ b/paka/kube_resources/model_group/supported_models.py @@ -1,5 +1,7 @@ from collections import namedtuple +from paka.kube_resources.model_group.models.hugging_face_model import HuggingFaceModel + Model = namedtuple("Model", ["name", "url", "sha256"]) SUPPORTED_MODELS = { @@ -24,3 +26,14 @@ sha256="2413866ece3b8b9eedf6c2a4393d4b56dbfa363c173ca3ba3a2f2a44db158982", ), } + +SUPPORTED_MODELS_V2 = { + "llama2-7b": HuggingFaceModel( + repo_id="TheBloke/Llama-2-7B-Chat-GGUF", + files=["*.json", "llama-2-7b-chat.Q4_0.gguf", "llama-2-7b-chat.Q3_K_S.gguf"], + ), + "gte-base": HuggingFaceModel( + repo_id="jjleng/gte-base-gguf", + files=["gte-base.q4_0.gguf", "gte-base.f32.gguf", "gte-base.f16.gguf"], + ), +} diff --git a/pyproject.toml b/pyproject.toml index fe2d578..6af7d36 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "paka" -version = "0.1.1" +version = "0.1.2" description = "LLMOps tool designed to simplify the deployment and management of large language model (LLM) 
applications" authors = ["Jijun Leng"] readme = "README.md" @@ -40,6 +40,7 @@ requests = "^2.31.0" kubernetes = "^v29.0.0b1" boto3 = "^1.34.22" tabulate = "^0.9.0" +huggingface-hub = "^0.22.2" [tool.poetry.group.dev.dependencies] codespell = "^2.2.6" diff --git a/tests/config/snapshots/test_config/test_aws_yaml/aws_yaml.txt b/tests/config/snapshots/test_config/test_aws_yaml/aws_yaml.txt index 917e48e..8a0bfa2 100644 --- a/tests/config/snapshots/test_config/test_aws_yaml/aws_yaml.txt +++ b/tests/config/snapshots/test_config/test_aws_yaml/aws_yaml.txt @@ -9,6 +9,7 @@ aws: logRetentionDays: 14 modelGroups: - nodeType: t2.micro + diskSize: 20 name: test-model-group minInstances: 1 maxInstances: 2 diff --git a/tests/config/test_config.py b/tests/config/test_config.py index 401675d..1275813 100644 --- a/tests/config/test_config.py +++ b/tests/config/test_config.py @@ -52,6 +52,11 @@ def test_invalid_memory_resource_request() -> None: ResourceRequest(cpu="500m", memory="2G") +def test_invalid_gpu_resource_request() -> None: + with pytest.raises(ValueError, match="GPU count cannot be less than 0"): + ResourceRequest(cpu="500m", memory="2Gi", gpu=-1) + + def test_model_group() -> None: # Test with valid minInstances and maxInstances model_group = ModelGroup(name="test", minInstances=1, maxInstances=2) @@ -190,6 +195,7 @@ def test_parse_yaml() -> None: minInstances: 1 maxInstances: 1 name: llama2-7b + awsGpu: vectorStore: nodeType: t2.small replicas: 2 @@ -211,10 +217,36 @@ def test_parse_yaml() -> None: assert model_group.minInstances == 1 assert model_group.maxInstances == 1 assert model_group.name == "llama2-7b" + assert model_group.awsGpu is None assert config.aws.vectorStore is not None assert config.aws.vectorStore.nodeType == "t2.small" assert config.aws.vectorStore.replicas == 2 + yaml_str = """ + aws: + cluster: + name: test_cluster + region: us-west-2 + nodeType: t2.medium + minNodes: 2 + maxNodes: 4 + modelGroups: + - nodeType: c7a.xlarge + minInstances: 1 + maxInstances: 1 + name: llama2-7b + awsGpu: + diskSize: 100 + """ + config = parse_yaml(yaml_str) + assert isinstance(config, Config) + assert config.aws is not None + assert config.aws.modelGroups is not None + assert len(config.aws.modelGroups) == 1 + model_group = config.aws.modelGroups[0] + assert model_group.awsGpu is not None + assert model_group.awsGpu.diskSize == 100 + def test_round_trip() -> None: original_config = Config(aws=cloud_config) diff --git a/tests/model_group/models/__init__.py b/tests/model_group/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/model_group/models/test_base.py b/tests/model_group/models/test_base.py new file mode 100644 index 0000000..0ca7bac --- /dev/null +++ b/tests/model_group/models/test_base.py @@ -0,0 +1,79 @@ +import unittest +from unittest.mock import ANY, MagicMock, Mock, patch + +from paka.kube_resources.model_group.models.base import Model + + +class TestModel(unittest.TestCase): + def setUp(self) -> None: + self.model = Model("TheBloke/Llama-2-7B-Chat-GGUF") + + @patch("paka.kube_resources.model_group.models.base.read_current_cluster_data") + @patch("paka.kube_resources.model_group.models.base.boto3.client") + def test_init( + self, mock_boto3_client: Mock, mock_read_current_cluster_data: Mock + ) -> None: + mock_read_current_cluster_data.return_value = "test_bucket" + self.model = Model( + "TheBloke/Llama-2-7B-Chat-GGUF", + download_max_concurrency=5, + s3_chunk_size=4 * 1024 * 1024, + s3_max_concurrency=10, + ) + self.assertEqual(self.model.name, 
"TheBloke/Llama-2-7B-Chat-GGUF") + self.assertEqual(self.model.s3_bucket, "test_bucket") + self.assertEqual(self.model.s3_chunk_size, 4 * 1024 * 1024) + self.assertEqual(self.model.download_max_concurrency, 5) + self.assertEqual(self.model.s3_max_concurrency, 10) + mock_read_current_cluster_data.assert_called_once_with("bucket") + # mock_boto3_client.assert_called_once_with("s3", config=MagicMock(signature_version="s3v4")) + + @patch("paka.kube_resources.model_group.models.base.logger") + @patch("paka.kube_resources.model_group.models.base.requests.get") + @patch("paka.kube_resources.model_group.models.base.Model.s3_file_exists") + @patch("paka.kube_resources.model_group.models.base.Model.upload_part") + @patch("paka.kube_resources.model_group.models.base.Model.upload_to_s3") + def test_download( + self, + mock_upload_to_s3: Mock, + mock_upload_part: Mock, + mock_s3_file_exists: Mock, + mock_requests_get: Mock, + mock_logger: Mock, + ) -> None: + mock_s3_file_exists.return_value = False + mock_upload_part.return_value = {"PartNumber": 1, "ETag": "test_etag"} + mock_response = MagicMock() + mock_response.headers.get.return_value = "100" + mock_response.iter_content.return_value = [b"chunk1", b"chunk2"] + mock_response.raise_for_status.return_value = True + mock_requests_get.return_value.__enter__.return_value = mock_response + url = "https://huggingface.co/TheBloke/Llama-2-7B-Chat-GGUF/resolve/main/llama-2-7b-chat.Q4_0.gguf" + sha256 = "9958ee9b670594147b750bbc7d0540b928fa12dcc5dd4c58cc56ed2eb85e371b" + mock_upload_to_s3.return_value = ("test_upload_id", sha256) + self.model.download(url, sha256) + mock_s3_file_exists.assert_called_once_with( + "models/TheBloke/Llama-2-7B-Chat-GGUF/llama-2-7b-chat.Q4_0.gguf" + ) + mock_logger.info.assert_called_with(f"Downloading model from {url}") + mock_requests_get.assert_called_once_with(url, stream=True) + mock_response.raise_for_status.assert_called_once() + mock_upload_to_s3.assert_called_once_with( + mock_response, + "models/TheBloke/Llama-2-7B-Chat-GGUF/llama-2-7b-chat.Q4_0.gguf", + ) + + @patch.object(Model, "download") + def test_download_all(self, mock_download: Mock) -> None: + urls = [ + "https://huggingface.co/TheBloke/Llama-2-7B-Chat-GGUF/resolve/main/llama-2-7b-chat.Q4_0.gguf", + "https://huggingface.co/TheBloke/Llama-2-7B-Chat-GGUF/resolve/main/llama-2-7b-chat.Q2_K.gguf", + ] + sha256s: list[str | None] = [ + "9958ee9b670594147b750bbc7d0540b928fa12dcc5dd4c58cc56ed2eb85e371b", + None, + ] + + self.model.download_all(urls, sha256s) + self.assertEqual(mock_download.call_count, 2) + mock_download.assert_called_with(ANY, ANY) diff --git a/tests/model_group/models/test_hugging_face_model.py b/tests/model_group/models/test_hugging_face_model.py new file mode 100644 index 0000000..24f5722 --- /dev/null +++ b/tests/model_group/models/test_hugging_face_model.py @@ -0,0 +1,125 @@ +import unittest +from unittest.mock import ANY, MagicMock, Mock, patch + +from paka.kube_resources.model_group.models.hugging_face_model import ( + HuggingFaceModel, # replace with the actual module name +) + + +class TestHuggingFaceModel(unittest.TestCase): + def setUp(self) -> None: + self.model = HuggingFaceModel( + "TheBloke/Llama-2-7B-Chat-GGUF", + files=[ + "llama-2-7b-chat.Q4_0.gguf", + "llama-2-7b-chat.Q2_K.gguf", + ], + ) + + @patch.object(HuggingFaceModel, "upload_fs_to_s3") + @patch.object(HuggingFaceModel, "s3_file_exists") + @patch.object(HuggingFaceModel, "save_manifest_yml") + def test_upload_file_to_s3( + self, + mock_save_manifest_yml: Mock, + 
mock_s3_file_exists: Mock, + mock_upload_fs_to_s3: Mock, + ) -> None: + mock_s3_file_exists.return_value = False + mock_upload_fs_to_s3.return_value = ( + "9958ee9b670594147b750bbc7d0540b928fa12dcc5dd4c58cc56ed2eb85e371b" + ) + + hf_file_path = "TheBloke/Llama-2-7B-Chat-GGUF/llama-2-7b-chat.Q4_0.gguf" + full_model_file_path = self.model.get_s3_file_path(hf_file_path) + + # Act + self.model.s3 = MagicMock() + self.model.s3.create_multipart_upload = MagicMock() + self.model.s3.create_multipart_upload.return_value = {"UploadId": "test"} + self.model.files_sha256[hf_file_path] = ( + "9958ee9b670594147b750bbc7d0540b928fa12dcc5dd4c58cc56ed2eb85e371b" + ) + self.model.upload_file_to_s3(hf_file_path) + + # Assert + mock_upload_fs_to_s3.assert_called_once_with(ANY, full_model_file_path, "test") + + @patch.object(HuggingFaceModel, "upload_file_to_s3") + @patch.object(HuggingFaceModel, "save_manifest_yml") + @patch.object(HuggingFaceModel, "validate_files") + @patch.object(HuggingFaceModel, "create_pbar") + @patch.object(HuggingFaceModel, "close_pbar") + def test_upload_files( + self, + mock_close_pbar: Mock, + mock_create_pbar: Mock, + mock_validate_files: Mock, + mock_save_manifest_yml: Mock, + mock_upload_file_to_s3: Mock, + ) -> None: + # Act + self.model.files = ["file1", "file2"] + self.model.upload_files() + + # Assert + self.assertEqual(mock_upload_file_to_s3.call_count, 2) + mock_upload_file_to_s3.assert_called_with(ANY) + + @patch.object(HuggingFaceModel, "get_file_info") + @patch.object(HuggingFaceModel, "logging_for_class") + def test_validate_files( + self, mock_logging_for_class: Mock, mock_get_file_info: Mock + ) -> None: + # Arrange + self.model.fs = MagicMock() + self.model.fs.glob = MagicMock() + self.model.fs.glob.side_effect = [ + ["TheBloke/Llama-2-7B-Chat-GGUF/llama-2-7b-chat.Q4_0.gguf"], + ["TheBloke/Llama-2-7B-Chat-GGUF/llama-2-7b-chat.Q2_K.gguf"], + ] + mock_get_file_info.side_effect = [ + {"size": 100, "lfs": {"sha256": "abc123"}}, + {"size": 200, "lfs": {"sha256": "edb322"}}, + ] + + # Act + self.model.validate_files() + + # Assert + self.assertEqual(len(self.model.files_size), 2) + self.assertEqual(len(self.model.files_sha256), 2) + self.assertEqual( + self.model.files_size, + { + "TheBloke/Llama-2-7B-Chat-GGUF/llama-2-7b-chat.Q4_0.gguf": 100, + "TheBloke/Llama-2-7B-Chat-GGUF/llama-2-7b-chat.Q2_K.gguf": 200, + }, + ) + self.assertEqual( + self.model.files_sha256, + { + "TheBloke/Llama-2-7B-Chat-GGUF/llama-2-7b-chat.Q4_0.gguf": "abc123", + "TheBloke/Llama-2-7B-Chat-GGUF/llama-2-7b-chat.Q2_K.gguf": "edb322", + }, + ) + mock_logging_for_class.assert_not_called() + + @patch.object(HuggingFaceModel, "get_file_info") + @patch.object(HuggingFaceModel, "logging_for_class") + def test_validate_files_file_not_found( + self, mock_logging_for_class: Mock, mock_get_file_info: Mock + ) -> None: + # Arrange + self.model.fs = MagicMock() + self.model.fs.glob = MagicMock() + self.model.fs.glob.return_value = [] + + # Act + self.model.validate_files() + + # Assert + self.assertEqual(len(self.model.files_size), 0) + self.assertEqual(len(self.model.files_sha256), 0) + self.assertEqual(mock_logging_for_class.call_count, 2) + mock_get_file_info.assert_not_called() diff --git a/tests/model_group/test_model.py b/tests/model_group/test_model.py index 4093e26..e0583f7 100644 --- a/tests/model_group/test_model.py +++ b/tests/model_group/test_model.py @@ -71,13 +71,13 @@ def test_download_model() -> None: return_value=False, ) as mock_exists, patch( 
"paka.kube_resources.model_group.model.download_file_to_s3", - return_value=SUPPORTED_MODELS["llama2-7b"].sha256, + return_value=SUPPORTED_MODELS["mistral-7b"].sha256, ) as mock_download, patch( "paka.kube_resources.model_group.model.delete_s3_file" ) as mock_delete, patch( "paka.kube_resources.model_group.model.save_string_to_s3" ) as mock_save: - download_model("llama2-7b") + download_model("mistral-7b") mock_exists.assert_called_once() mock_download.assert_called_once() diff --git a/tests/model_group/test_service.py b/tests/model_group/test_service.py new file mode 100644 index 0000000..bac7d3b --- /dev/null +++ b/tests/model_group/test_service.py @@ -0,0 +1,98 @@ +from kubernetes.client import V1Pod + +from paka.config import ( + AwsGpuNode, + CloudConfig, + CloudModelGroup, + ClusterConfig, + Config, + ResourceRequest, +) +from paka.kube_resources.model_group.service import create_pod + + +def test_create_pod() -> None: + model_group = CloudModelGroup( + nodeType="c7a.xlarge", + minInstances=1, + maxInstances=1, + name="llama2-7b", + resourceRequest=ResourceRequest(cpu="100m", memory="256Mi", gpu=2), + awsGpu=AwsGpuNode(diskSize=100), + ) + + config = Config( + aws=CloudConfig( + cluster=ClusterConfig( + name="test_cluster", + region="us-west-2", + nodeType="t2.medium", + minNodes=2, + maxNodes=4, + ), + modelGroups=[model_group], + ) + ) + + pod = create_pod("test_namespace", config, model_group, "runtime_image", 8080) + + assert isinstance(pod, V1Pod) + + assert pod.metadata.name == "llama2-7b" + assert pod.metadata.namespace == "test_namespace" + assert len(pod.spec.containers) == 1 + container = pod.spec.containers[0] + assert container.name == "llama2-7b" + assert container.image == "runtime_image" + assert container.resources.requests["cpu"] == "100m" + assert container.resources.requests["memory"] == "256Mi" + assert container.resources.limits["nvidia.com/gpu"] == 2 + assert len(container.volume_mounts) == 1 + assert container.volume_mounts[0].name == "model-data" + assert container.volume_mounts[0].mount_path == "/data" + assert len(container.env) == 3 + assert container.env[0].name == "N_GPU_LAYERS" + assert container.env[0].value == "-1" # Offload all layers to GPU + assert container.env[1].name == "MODEL" + assert container.env[1].value == "/data/my_model.gguf" + assert container.env[2].name == "PORT" + assert container.env[2].value == "8080" + + model_group = CloudModelGroup( + nodeType="c7a.xlarge", + minInstances=1, + maxInstances=1, + name="llama2-7b", + ) + + config = Config( + aws=CloudConfig( + cluster=ClusterConfig( + name="test_cluster", + region="us-west-2", + nodeType="t2.medium", + minNodes=2, + maxNodes=4, + ), + modelGroups=[model_group], + ) + ) + + pod = create_pod("test_namespace", config, model_group, "runtime_image", 8080) + + assert isinstance(pod, V1Pod) + + assert len(pod.spec.containers) == 1 + container = pod.spec.containers[0] + assert container.name == "llama2-7b" + assert container.image == "runtime_image" + assert len(container.volume_mounts) == 1 + assert container.volume_mounts[0].name == "model-data" + assert container.volume_mounts[0].mount_path == "/data" + assert len(container.env) == 3 + assert container.env[0].name == "N_GPU_LAYERS" + assert container.env[0].value == "0" # Offload no layers to GPU + assert container.env[1].name == "MODEL" + assert container.env[1].value == "/data/my_model.gguf" + assert container.env[2].name == "PORT" + assert container.env[2].value == "8080"