diff --git a/.github/workflows/e2e-test.yml b/.github/workflows/e2e-test.yml index a978b224c2..855eb7ea3b 100644 --- a/.github/workflows/e2e-test.yml +++ b/.github/workflows/e2e-test.yml @@ -788,11 +788,25 @@ jobs: kubectl get pods -n kserve kubectl describe pods -n kserve + - name: Log the config map + run: | + kubectl describe configmaps -n kserve inferenceservice-config + - name: Run E2E tests timeout-minutes: 30 run: | ./test/scripts/gh-actions/run-e2e-tests.sh "raw" "6" + - name: Patch inferenceservice config for cluster ip none + run: | + kubectl patch configmaps -n kserve inferenceservice-config --patch-file config/overlays/test/configmap/inferenceservice-enable-cluster-ip.yaml + kubectl describe configmaps -n kserve inferenceservice-config + + - name: Run E2E tests - cluster ip none + timeout-minutes: 30 + run: | + ./test/scripts/gh-actions/run-e2e-tests.sh "rawcipn" "1" + - name: Check system status if: always() run: | diff --git a/.github/workflows/verify-codegen.yml b/.github/workflows/verify-codegen.yml index ff1fd0b1d8..4845ebb2df 100644 --- a/.github/workflows/verify-codegen.yml +++ b/.github/workflows/verify-codegen.yml @@ -58,6 +58,6 @@ jobs: for x in $(git diff-index --name-only HEAD -- ./pkg ./python ./charts); do echo "::error file=$x::Please run make generate.%0A$(git diff $x | urlencode)" done - echo "${{ github.repository }} is out of date. Please run make generate" + echo "${{ github.repository }} is out of date. Please run make generate | manifest" exit 1 fi diff --git a/charts/kserve-resources/README.md b/charts/kserve-resources/README.md index a83a4abc5a..b71aec6464 100644 --- a/charts/kserve-resources/README.md +++ b/charts/kserve-resources/README.md @@ -37,8 +37,10 @@ $ helm install kserve oci://ghcr.io/kserve/charts/kserve --version v0.14.0 | kserve.controller.imagePullSecrets | list | `[]` | Reference to one or more secrets to be used when pulling images. For more information, see [Pull an Image from a Private Registry](https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/). For example: imagePullSecrets: - name: "image-pull-secret" | | kserve.controller.knativeAddressableResolver | object | `{"enabled":false}` | Indicates whether to create an addressable resolver ClusterRole for Knative Eventing. This ClusterRole grants the necessary permissions for the Knative's DomainMapping reconciler to resolve InferenceService addressables. | | kserve.controller.labels | object | `{}` | Optional additional labels to add to the controller deployment. | +| kserve.controller.metricsBindAddress | string | `"127.0.0.1"` | Metrics bind address | +| kserve.controller.metricsBindPort | string | `"8080"` | Metrics bind port | | kserve.controller.nodeSelector | object | `{}` | The nodeSelector on Pods tells Kubernetes to schedule Pods on the nodes with matching labels. For more information, see [Assigning Pods to Nodes](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/). | -| kserve.controller.podAnnotations | object | `{}` | Optional additional labels to add to the controller Pods. | +| kserve.controller.podAnnotations | object | `{}` | Optional additional annotations to add to the controller Pods. | | kserve.controller.podLabels | object | `{}` | Optional additional labels to add to the controller Pods. | | kserve.controller.rbacProxy.resources.limits.cpu | string | `"100m"` | | | kserve.controller.rbacProxy.resources.limits.memory | string | `"300Mi"` | | @@ -52,18 +54,22 @@ $ helm install kserve oci://ghcr.io/kserve/charts/kserve --version v0.14.0 | kserve.controller.rbacProxyImage | string | `"quay.io/brancz/kube-rbac-proxy:v0.18.0"` | KServe controller manager rbac proxy contrainer image | | kserve.controller.resources | object | `{"limits":{"cpu":"100m","memory":"300Mi"},"requests":{"cpu":"100m","memory":"300Mi"}}` | Resources to provide to the kserve controller pod. For example: requests: cpu: 10m memory: 32Mi For more information, see [Resource Management for Pods and Containers](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/). | | kserve.controller.securityContext | object | `{"runAsNonRoot":true}` | Pod Security Context. For more information, see [Configure a Security Context for a Pod or Container](https://kubernetes.io/docs/tasks/configure-pod-container/security-context/). | +| kserve.controller.serviceAnnotations | object | `{}` | Optional additional annotations to add to the controller service. | | kserve.controller.tag | string | `"v0.14.0"` | KServe controller contrainer image tag. | | kserve.controller.tolerations | list | `[]` | A list of Kubernetes Tolerations, if required. For more information, see [Toleration v1 core](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#toleration-v1-core). For example: tolerations: - key: foo.bar.com/role operator: Equal value: master effect: NoSchedule | | kserve.controller.topologySpreadConstraints | list | `[]` | A list of Kubernetes TopologySpreadConstraints, if required. For more information, see [Topology spread constraint v1 core](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#topologyspreadconstraint-v1-core For example: topologySpreadConstraints: - maxSkew: 2 topologyKey: topology.kubernetes.io/zone whenUnsatisfiable: ScheduleAnyway labelSelector: matchLabels: app.kubernetes.io/instance: kserve-controller-manager app.kubernetes.io/component: controller | +| kserve.controller.webhookServiceAnnotations | object | `{}` | Optional additional annotations to add to the webhook service. | | kserve.localmodel.agent.hostPath | string | `"/mnt/models"` | | | kserve.localmodel.agent.image | string | `"kserve/kserve-localmodelnode-agent"` | | | kserve.localmodel.agent.nodeSelector | object | `{}` | | +| kserve.localmodel.agent.reconcilationFrequencyInSecs | int | `60` | | | kserve.localmodel.agent.tag | string | `"v0.14.0"` | | | kserve.localmodel.controller.image | string | `"kserve/kserve-localmodel-controller"` | | | kserve.localmodel.controller.tag | string | `"v0.14.0"` | | | kserve.localmodel.enabled | bool | `false` | | | kserve.localmodel.jobNamespace | string | `"kserve-localmodel-jobs"` | | -| kserve.localmodel.securityContext.FSGroup | int | `1000` | | +| kserve.localmodel.jobTTLSecondsAfterFinished | int | `3600` | | +| kserve.localmodel.securityContext.fsGroup | int | `1000` | | | kserve.metricsaggregator.enableMetricAggregation | string | `"false"` | configures metric aggregation annotation. This adds the annotation serving.kserve.io/enable-metric-aggregation to every service with the specified boolean value. If true enables metric aggregation in queue-proxy by setting env vars in the queue proxy container to configure scraping ports. | | kserve.metricsaggregator.enablePrometheusScraping | string | `"false"` | If true, prometheus annotations are added to the pod to scrape the metrics. If serving.kserve.io/enable-metric-aggregation is false, the prometheus port is set with the default prometheus scraping port 9090, otherwise the prometheus port annotation is set with the metric aggregation port. | | kserve.modelmesh.config.modelmeshImage | string | `"kserve/modelmesh"` | | @@ -88,6 +94,7 @@ $ helm install kserve oci://ghcr.io/kserve/charts/kserve --version v0.14.0 | kserve.router.image | string | `"kserve/router"` | | | kserve.router.tag | string | `"v0.14.0"` | | | kserve.security.autoMountServiceAccountToken | bool | `true` | | +| kserve.service.serviceClusterIPNone | bool | `false` | | | kserve.servingruntime.art.defaultVersion | string | `"v0.14.0"` | | | kserve.servingruntime.art.image | string | `"kserve/art-explainer"` | | | kserve.servingruntime.art.imagePullSecrets | list | `[]` | | diff --git a/charts/kserve-resources/templates/configmap.yaml b/charts/kserve-resources/templates/configmap.yaml index 07788c91bf..34c7bcc6a1 100644 --- a/charts/kserve-resources/templates/configmap.yaml +++ b/charts/kserve-resources/templates/configmap.yaml @@ -420,6 +420,19 @@ data: "defaultDeploymentMode": "Serverless" } + # ====================================== SERVICE CONFIGURATION ====================================== + # Example + service: |- + { + "serviceClusterIPNone": "false" + } + service: |- + { + # ServiceClusterIPNone is a flag to indicate if the service should have a clusterIP set to None. + # If the DeploymentMode is Raw, the default value for ServiceClusterIPNone if not set is false + # "serviceClusterIPNone": "false" + } + # ====================================== METRICS CONFIGURATION ====================================== # Example metricsAggregator: |- @@ -451,7 +464,12 @@ data: # defaultJobImage specifies the default image used for the download job. "defaultJobImage" : "kserve/storage-initializer:latest", # Kubernetes modifies the filesystem group ID on the attached volume. - "FSGroup": 1000 + "fsGroup": 1000, + # TTL for the download job after it is finished. + "jobTTLSecondsAfterFinished": 3600, + # The frequency at which the local model agent reconciles the local models + # This is to detect if models are missing from local disk + "reconcilationFrequencyInSecs": {{ .Values.kserve.localmodel.agent.reconcilationFrequencyInSecs }} } agent: |- @@ -503,6 +521,10 @@ data: { "defaultDeploymentMode": "{{ .Values.kserve.controller.deploymentMode }}" } + service: |- + { + "serviceClusterIPNone": "{{ .Values.kserve.service.serviceClusterIPNone }}" + } explainers: |- { "art": { @@ -560,8 +582,10 @@ data: { "enabled": {{ .Values.kserve.localmodel.enabled }}, "jobNamespace": "{{ .Values.kserve.localmodel.jobNamespace }}", - "defaultJobImage" : "kserve/storage-initializer:latest", - "FSGroup": {{ .Values.kserve.localmodel.securityContext.FSGroup }} + "jobTTLSecondsAfterFinished": {{ .Values.kserve.localmodel.jobTTLSecondsAfterFinished }}, + "defaultJobImage": "kserve/storage-initializer:latest", + "fsGroup": {{ .Values.kserve.localmodel.securityContext.fsGroup }}, + "reconcilationFrequencyInSecs": {{ .Values.kserve.localmodel.agent.reconcilationFrequencyInSecs }} } security: |- { diff --git a/charts/kserve-resources/templates/deployment.yaml b/charts/kserve-resources/templates/deployment.yaml index 26339844b9..350f015e25 100644 --- a/charts/kserve-resources/templates/deployment.yaml +++ b/charts/kserve-resources/templates/deployment.yaml @@ -86,7 +86,7 @@ spec: {{- toYaml . | nindent 10 }} {{- end }} args: - - "--metrics-addr=127.0.0.1:8080" + - "--metrics-addr={{ .Values.kserve.controller.metricsBindAddress }}:{{ .Values.kserve.controller.metricsBindPort }}" - "--leader-elect" env: - name: POD_NAMESPACE diff --git a/charts/kserve-resources/templates/service.yaml b/charts/kserve-resources/templates/service.yaml index 148d7eac46..70772e0a54 100644 --- a/charts/kserve-resources/templates/service.yaml +++ b/charts/kserve-resources/templates/service.yaml @@ -4,6 +4,9 @@ kind: Service metadata: name: kserve-webhook-server-service namespace: {{ .Release.Namespace }} + {{- with .Values.kserve.controller.webhookServiceAnnotations }} + annotations: {{ toYaml . | nindent 4 }} + {{- end }} spec: ports: - port: 443 @@ -20,6 +23,9 @@ metadata: labels: control-plane: kserve-controller-manager controller-tools.k8s.io: "1.0" + {{- with .Values.kserve.controller.serviceAnnotations }} + annotations: {{ toYaml . | nindent 4 }} + {{- end }} spec: selector: control-plane: kserve-controller-manager diff --git a/charts/kserve-resources/values.yaml b/charts/kserve-resources/values.yaml index eb1a482d14..dfe52b9a54 100644 --- a/charts/kserve-resources/values.yaml +++ b/charts/kserve-resources/values.yaml @@ -7,6 +7,8 @@ kserve: router: image: kserve/router tag: *defaultVersion + service: + serviceClusterIPNone: false storage: image: kserve/storage-initializer tag: *defaultVersion @@ -111,9 +113,15 @@ kserve: # -- Optional additional annotations to add to the controller deployment. annotations: {} - # -- Optional additional labels to add to the controller Pods. + # -- Optional additional annotations to add to the controller Pods. podAnnotations: {} + # -- Optional additional annotations to add to the controller service. + serviceAnnotations: {} + + # -- Optional additional annotations to add to the webhook service. + webhookServiceAnnotations: {} + # -- Pod Security Context. # For more information, see [Configure a Security Context for a Pod or Container](https://kubernetes.io/docs/tasks/configure-pod-container/security-context/). securityContext: @@ -130,6 +138,12 @@ kserve: readOnlyRootFilesystem: true runAsNonRoot: true + # -- Metrics bind address + metricsBindAddress: "127.0.0.1" + + # -- Metrics bind port + metricsBindPort: "8080" + gateway: # -- Ingress domain for RawDeployment mode, for Serverless it is configured in Knative. domain: example.com @@ -401,12 +415,14 @@ kserve: image: kserve/kserve-localmodel-controller tag: *defaultVersion jobNamespace: kserve-localmodel-jobs + jobTTLSecondsAfterFinished: 3600 securityContext: - FSGroup: 1000 + fsGroup: 1000 agent: nodeSelector: {} hostPath: /mnt/models image: kserve/kserve-localmodelnode-agent tag: *defaultVersion + reconcilationFrequencyInSecs: 60 security: autoMountServiceAccountToken: true diff --git a/config/configmap/inferenceservice.yaml b/config/configmap/inferenceservice.yaml index 5468fc58ce..3c8ac538ac 100644 --- a/config/configmap/inferenceservice.yaml +++ b/config/configmap/inferenceservice.yaml @@ -430,7 +430,20 @@ data: # ModelMesh https://kserve.github.io/website/master/admin/modelmesh/ "defaultDeploymentMode": "Serverless" } - + + # ====================================== SERVICE CONFIGURATION ====================================== + # Example + service: |- + { + "serviceClusterIPNone": false + } + service: |- + { + # ServiceClusterIPNone is a boolean flag to indicate if the service should have a clusterIP set to None. + # If the DeploymentMode is Raw, the default value for ServiceClusterIPNone if not set is false + # "serviceClusterIPNone": false + } + # ====================================== METRICS CONFIGURATION ====================================== # Example metricsAggregator: |- @@ -462,7 +475,12 @@ data: # defaultJobImage specifies the default image used for the download job. "defaultJobImage" : "kserve/storage-initializer:latest", # Kubernetes modifies the filesystem group ID on the attached volume. - "FSGroup": 1000 + "fsGroup": 1000, + # TTL for the download job after it is finished. + "jobTTLSecondsAfterFinished": 3600, + # The frequency at which the local model agent reconciles the local models + # This is to detect if models are missing from local disk + "reconcilationFrequencyInSecs": 60 } explainers: |- @@ -577,10 +595,15 @@ data: "enabled": false, "jobNamespace": "kserve-localmodel-jobs", "defaultJobImage" : "kserve/storage-initializer:latest", - "FSGroup": 1000 + "fsGroup": 1000 } security: |- { "autoMountServiceAccountToken": true } + + service: |- + { + "serviceClusterIPNone": true + } \ No newline at end of file diff --git a/config/crd/full/serving.kserve.io_localmodelcaches.yaml b/config/crd/full/serving.kserve.io_localmodelcaches.yaml index e40ba50ac2..b14727a772 100644 --- a/config/crd/full/serving.kserve.io_localmodelcaches.yaml +++ b/config/crd/full/serving.kserve.io_localmodelcaches.yaml @@ -32,8 +32,12 @@ spec: - type: string pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ x-kubernetes-int-or-string: true - nodeGroup: - type: string + nodeGroups: + items: + type: string + maxItems: 1 + minItems: 1 + type: array sourceModelUri: type: string x-kubernetes-validations: @@ -41,7 +45,7 @@ spec: rule: self == oldSelf required: - modelSize - - nodeGroup + - nodeGroups - sourceModelUri type: object status: diff --git a/config/overlays/test/configmap/inferenceservice-enable-cluster-ip.yaml b/config/overlays/test/configmap/inferenceservice-enable-cluster-ip.yaml new file mode 100644 index 0000000000..b877d07a24 --- /dev/null +++ b/config/overlays/test/configmap/inferenceservice-enable-cluster-ip.yaml @@ -0,0 +1,10 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: inferenceservice-config + namespace: kserve +data: + service: |- + { + "serviceClusterIPNone": true + } diff --git a/config/overlays/test/configmap/inferenceservice.yaml b/config/overlays/test/configmap/inferenceservice.yaml index 61a919cd73..f1aafac433 100644 --- a/config/overlays/test/configmap/inferenceservice.yaml +++ b/config/overlays/test/configmap/inferenceservice.yaml @@ -74,4 +74,8 @@ data: "memoryLimit": "500Mi", "cpuRequest": "100m", "cpuLimit": "100m" + } + service: |- + { + "serviceClusterIPNone": false } \ No newline at end of file diff --git a/hack/update-codegen.sh b/hack/update-codegen.sh index bdfa2bae45..03c4e83b54 100755 --- a/hack/update-codegen.sh +++ b/hack/update-codegen.sh @@ -22,6 +22,9 @@ SCRIPT_DIR="$(dirname "${BASH_SOURCE[0]}")" SCRIPT_ROOT="${SCRIPT_DIR}/.." CODEGEN_VERSION=$(cd "${SCRIPT_ROOT}" && grep 'k8s.io/code-generator' go.mod | awk '{print $2}') +# For debugging purposes +echo "Codegen version ${CODEGEN_VERSION}" + if [ -z "${GOPATH:-}" ]; then GOPATH=$(go env GOPATH) export GOPATH diff --git a/hack/violation_exceptions.list b/hack/violation_exceptions.list index cdb2b85bfb..2e4589521d 100644 --- a/hack/violation_exceptions.list +++ b/hack/violation_exceptions.list @@ -1,6 +1,7 @@ API rule violation: list_type_missing,github.com/kserve/kserve/pkg/apis/serving/v1alpha1,BuiltInAdapter,Env API rule violation: list_type_missing,github.com/kserve/kserve/pkg/apis/serving/v1alpha1,InferenceGraphList,Items API rule violation: list_type_missing,github.com/kserve/kserve/pkg/apis/serving/v1alpha1,InferenceRouter,Steps +API rule violation: list_type_missing,github.com/kserve/kserve/pkg/apis/serving/v1alpha1,LocalModelCacheSpec,NodeGroups API rule violation: list_type_missing,github.com/kserve/kserve/pkg/apis/serving/v1alpha1,LocalModelNodeSpec,LocalModels API rule violation: list_type_missing,github.com/kserve/kserve/pkg/apis/serving/v1alpha1,ServingRuntimePodSpec,Containers API rule violation: list_type_missing,github.com/kserve/kserve/pkg/apis/serving/v1alpha1,ServingRuntimePodSpec,ImagePullSecrets diff --git a/pkg/agent/storage/https.go b/pkg/agent/storage/https.go index cf6bf608d0..694137255e 100644 --- a/pkg/agent/storage/https.go +++ b/pkg/agent/storage/https.go @@ -191,7 +191,7 @@ func extractZipFiles(reader io.Reader, dest string) error { return fmt.Errorf("unable to open file: %w", err) } - _, err = io.CopyN(file, rc, DEFAULT_MAX_DECOMPRESSION_SIZE) // gosec G110 + _, ioErr := io.CopyN(file, rc, DEFAULT_MAX_DECOMPRESSION_SIZE) // gosec G110 closeErr := file.Close() if closeErr != nil { return closeErr @@ -200,7 +200,7 @@ func extractZipFiles(reader io.Reader, dest string) error { if closeErr != nil { return closeErr } - if err != nil { + if ioErr != nil && !errors.Is(ioErr, io.EOF) { return fmt.Errorf("unable to copy file content: %w", err) } } @@ -246,7 +246,8 @@ func extractTarFiles(reader io.Reader, dest string) error { } // gosec G110 - if _, err := io.CopyN(newFile, tr, DEFAULT_MAX_DECOMPRESSION_SIZE); err != nil { + _, ioErr := io.CopyN(newFile, tr, DEFAULT_MAX_DECOMPRESSION_SIZE) + if ioErr != nil && !errors.Is(ioErr, io.EOF) { return fmt.Errorf("unable to copy contents to %s: %w", header.Name, err) } } diff --git a/pkg/apis/serving/v1alpha1/doc.go b/pkg/apis/serving/v1alpha1/doc.go index 7fdd7c9ff0..4e9c37f6ed 100644 --- a/pkg/apis/serving/v1alpha1/doc.go +++ b/pkg/apis/serving/v1alpha1/doc.go @@ -15,7 +15,7 @@ limitations under the License. */ // +groupName=serving.kserve.io -// +k8s:deepcopy-gen=package +// +kubebuilder:object:generate=true // Package v1alpha1 contains API Schema definitions for the serving v1alpha1 API group package v1alpha1 diff --git a/pkg/apis/serving/v1alpha1/inference_graph.go b/pkg/apis/serving/v1alpha1/inference_graph.go index f9c90844c7..7f0b12db6c 100644 --- a/pkg/apis/serving/v1alpha1/inference_graph.go +++ b/pkg/apis/serving/v1alpha1/inference_graph.go @@ -25,7 +25,6 @@ import ( // InferenceGraph is the Schema for the InferenceGraph API for multiple models // +k8s:openapi-gen=true -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +genclient // +kubebuilder:object:root=true // +kubebuilder:subresource:status @@ -310,7 +309,6 @@ type InferenceGraphStatus struct { // InferenceGraphList contains a list of InferenceGraph // +k8s:openapi-gen=true // +kubebuilder:object:root=true -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type InferenceGraphList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` diff --git a/pkg/apis/serving/v1alpha1/inference_graph_validation.go b/pkg/apis/serving/v1alpha1/inference_graph_validation.go index bed0a2c3e6..f51090f600 100644 --- a/pkg/apis/serving/v1alpha1/inference_graph_validation.go +++ b/pkg/apis/serving/v1alpha1/inference_graph_validation.go @@ -58,12 +58,11 @@ var ( ) // +kubebuilder:object:generate=false -// +k8s:deepcopy-gen=false // +k8s:openapi-gen=false // InferenceGraphValidator is responsible for setting default values on the InferenceGraph resources // when created or updated. // -// NOTE: The +kubebuilder:object:generate=false and +k8s:deepcopy-gen=false marker prevents controller-gen from generating DeepCopy methods, +// NOTE: The +kubebuilder:object:generate=false marker prevents controller-gen from generating DeepCopy methods, // as it is used only for temporary operations and does not need to be deeply copied. type InferenceGraphValidator struct{} diff --git a/pkg/apis/serving/v1alpha1/local_model_cache_types.go b/pkg/apis/serving/v1alpha1/local_model_cache_types.go index 579cc276e8..745b2ba954 100644 --- a/pkg/apis/serving/v1alpha1/local_model_cache_types.go +++ b/pkg/apis/serving/v1alpha1/local_model_cache_types.go @@ -30,12 +30,14 @@ type LocalModelCacheSpec struct { // Model size to make sure it does not exceed the disk space reserved for local models. The limit is defined on the NodeGroup. ModelSize resource.Quantity `json:"modelSize" validate:"required"` // group of nodes to cache the model on. - NodeGroup string `json:"nodeGroup" validate:"required"` + // Todo: support more than 1 node groups + // +kubebuilder:validation:MinItems=1 + // +kubebuilder:validation:MaxItems=1 + NodeGroups []string `json:"nodeGroups" validate:"required"` } // LocalModelCache // +k8s:openapi-gen=true -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +genclient // +kubebuilder:object:root=true // +kubebuilder:subresource:status @@ -51,7 +53,6 @@ type LocalModelCache struct { // LocalModelCacheList // +k8s:openapi-gen=true // +kubebuilder:object:root=true -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type LocalModelCacheList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` diff --git a/pkg/apis/serving/v1alpha1/local_model_node_group_types.go b/pkg/apis/serving/v1alpha1/local_model_node_group_types.go index 6f5e979fbe..afd595d81f 100644 --- a/pkg/apis/serving/v1alpha1/local_model_node_group_types.go +++ b/pkg/apis/serving/v1alpha1/local_model_node_group_types.go @@ -34,7 +34,6 @@ type LocalModelNodeGroupSpec struct { } // +k8s:openapi-gen=true -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +genclient // +kubebuilder:object:root=true // +kubebuilder:resource:scope="Cluster" @@ -48,7 +47,6 @@ type LocalModelNodeGroup struct { // +k8s:openapi-gen=true // +kubebuilder:object:root=true -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type LocalModelNodeGroupList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` diff --git a/pkg/apis/serving/v1alpha1/local_model_node_types.go b/pkg/apis/serving/v1alpha1/local_model_node_types.go index 0b4a42f3bf..bfb1a219b8 100644 --- a/pkg/apis/serving/v1alpha1/local_model_node_types.go +++ b/pkg/apis/serving/v1alpha1/local_model_node_types.go @@ -32,7 +32,6 @@ type LocalModelNodeSpec struct { } // +k8s:openapi-gen=true -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +genclient // +kubebuilder:object:root=true // +kubebuilder:subresource:status @@ -47,7 +46,6 @@ type LocalModelNode struct { // +k8s:openapi-gen=true // +kubebuilder:object:root=true -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type LocalModelNodeList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` diff --git a/pkg/apis/serving/v1alpha1/servingruntime_types.go b/pkg/apis/serving/v1alpha1/servingruntime_types.go index 0a1ff63ff7..551d655449 100644 --- a/pkg/apis/serving/v1alpha1/servingruntime_types.go +++ b/pkg/apis/serving/v1alpha1/servingruntime_types.go @@ -208,7 +208,6 @@ type BuiltInAdapter struct { // ServingRuntime is the Schema for the servingruntimes API // +k8s:openapi-gen=true -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +genclient // +kubebuilder:object:root=true // +kubebuilder:printcolumn:name="Disabled",type="boolean",JSONPath=".spec.disabled" @@ -226,7 +225,6 @@ type ServingRuntime struct { // ServingRuntimeList contains a list of ServingRuntime // +k8s:openapi-gen=true // +kubebuilder:object:root=true -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type ServingRuntimeList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` @@ -235,7 +233,6 @@ type ServingRuntimeList struct { // ClusterServingRuntime is the Schema for the servingruntimes API // +k8s:openapi-gen=true -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +genclient // +kubebuilder:object:root=true // +kubebuilder:resource:scope="Cluster" @@ -254,7 +251,6 @@ type ClusterServingRuntime struct { // ClusterServingRuntimeList contains a list of ServingRuntime // +k8s:openapi-gen=true // +kubebuilder:object:root=true -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type ClusterServingRuntimeList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` diff --git a/pkg/apis/serving/v1alpha1/storage_container_types.go b/pkg/apis/serving/v1alpha1/storage_container_types.go index 1ee7054ffa..a0ee3bcbf5 100644 --- a/pkg/apis/serving/v1alpha1/storage_container_types.go +++ b/pkg/apis/serving/v1alpha1/storage_container_types.go @@ -52,7 +52,6 @@ const ( ) // +k8s:openapi-gen=true -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +genclient // +kubebuilder:object:root=true // +kubebuilder:resource:scope="Cluster" @@ -68,7 +67,6 @@ type ClusterStorageContainer struct { // +k8s:openapi-gen=true // +kubebuilder:object:root=true -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type ClusterStorageContainerList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` diff --git a/pkg/apis/serving/v1alpha1/trained_model.go b/pkg/apis/serving/v1alpha1/trained_model.go index 137deb6ec8..0b48822a59 100644 --- a/pkg/apis/serving/v1alpha1/trained_model.go +++ b/pkg/apis/serving/v1alpha1/trained_model.go @@ -23,7 +23,6 @@ import ( // TrainedModel is the Schema for the TrainedModel API // +k8s:openapi-gen=true -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +genclient // +kubebuilder:object:root=true // +kubebuilder:subresource:status @@ -41,7 +40,6 @@ type TrainedModel struct { // TrainedModelList contains a list of TrainedModel // +k8s:openapi-gen=true // +kubebuilder:object:root=true -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type TrainedModelList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` diff --git a/pkg/apis/serving/v1alpha1/trainedmodel_webhook.go b/pkg/apis/serving/v1alpha1/trainedmodel_webhook.go index f9f95c1184..c81df71d78 100644 --- a/pkg/apis/serving/v1alpha1/trainedmodel_webhook.go +++ b/pkg/apis/serving/v1alpha1/trainedmodel_webhook.go @@ -50,12 +50,11 @@ var ( ) // +kubebuilder:object:generate=false -// +k8s:deepcopy-gen=false // +k8s:openapi-gen=false // TrainedModelValidator is responsible for setting default values on the TrainedModel resources // when created or updated. // -// NOTE: The +kubebuilder:object:generate=false and +k8s:deepcopy-gen=false marker prevents controller-gen from generating DeepCopy methods, +// NOTE: The +kubebuilder:object:generate=false marker prevents controller-gen from generating DeepCopy methods, // as it is used only for temporary operations and does not need to be deeply copied. type TrainedModelValidator struct{} diff --git a/pkg/apis/serving/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/serving/v1alpha1/zz_generated.deepcopy.go index e547cf7c88..c1a6c62857 100644 --- a/pkg/apis/serving/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/serving/v1alpha1/zz_generated.deepcopy.go @@ -421,6 +421,11 @@ func (in *LocalModelCacheList) DeepCopyObject() runtime.Object { func (in *LocalModelCacheSpec) DeepCopyInto(out *LocalModelCacheSpec) { *out = *in out.ModelSize = in.ModelSize.DeepCopy() + if in.NodeGroups != nil { + in, out := &in.NodeGroups, &out.NodeGroups + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LocalModelCacheSpec. diff --git a/pkg/apis/serving/v1beta1/configmap.go b/pkg/apis/serving/v1beta1/configmap.go index 24ae8aa7ea..14150e2342 100644 --- a/pkg/apis/serving/v1beta1/configmap.go +++ b/pkg/apis/serving/v1beta1/configmap.go @@ -36,6 +36,7 @@ const ( DeployConfigName = "deploy" LocalModelConfigName = "localModel" SecurityConfigName = "security" + ServiceConfigName = "service" ) const ( @@ -95,10 +96,12 @@ type DeployConfig struct { // +kubebuilder:object:generate=false type LocalModelConfig struct { - Enabled bool `json:"enabled"` - JobNamespace string `json:"jobNamespace"` - DefaultJobImage string `json:"defaultJobImage,omitempty"` - FSGroup *int64 `json:"fsGroup,omitempty"` + Enabled bool `json:"enabled"` + JobNamespace string `json:"jobNamespace"` + DefaultJobImage string `json:"defaultJobImage,omitempty"` + FSGroup *int64 `json:"fsGroup,omitempty"` + JobTTLSecondsAfterFinished *int32 `json:"jobTTLSecondsAfterFinished,omitempty"` + ReconcilationFrequencyInSecs *int64 `json:"reconcilationFrequencyInSecs,omitempty"` } // +kubebuilder:object:generate=false @@ -106,6 +109,13 @@ type SecurityConfig struct { AutoMountServiceAccountToken bool `json:"autoMountServiceAccountToken"` } +// +kubebuilder:object:generate=false +type ServiceConfig struct { + // ServiceClusterIPNone is a boolean flag to indicate if the service should have a clusterIP set to None. + // If the DeploymentMode is Raw, the default value for ServiceClusterIPNone is false when the value is absent. + ServiceClusterIPNone bool `json:"serviceClusterIPNone,omitempty"` +} + func NewInferenceServicesConfig(clientset kubernetes.Interface) (*InferenceServicesConfig, error) { configMap, err := clientset.CoreV1().ConfigMaps(constants.KServeNamespace).Get(context.TODO(), constants.InferenceServiceConfigMapName, metav1.GetOptions{}) if err != nil { @@ -236,3 +246,19 @@ func NewSecurityConfig(clientset kubernetes.Interface) (*SecurityConfig, error) } return securityConfig, nil } + +func NewServiceConfig(clientset kubernetes.Interface) (*ServiceConfig, error) { + configMap, err := clientset.CoreV1().ConfigMaps(constants.KServeNamespace).Get(context.TODO(), constants.InferenceServiceConfigMapName, metav1.GetOptions{}) + + if err != nil { + return nil, err + } + serviceConfig := &ServiceConfig{} + if service, ok := configMap.Data[ServiceConfigName]; ok { + err := json.Unmarshal([]byte(service), &serviceConfig) + if err != nil { + return nil, fmt.Errorf("unable to parse service config json: %w", err) + } + } + return serviceConfig, nil +} diff --git a/pkg/apis/serving/v1beta1/configmap_test.go b/pkg/apis/serving/v1beta1/configmap_test.go index b5ee02da06..71e1e53568 100644 --- a/pkg/apis/serving/v1beta1/configmap_test.go +++ b/pkg/apis/serving/v1beta1/configmap_test.go @@ -46,6 +46,9 @@ var ( "additionalIngressDomains": ["%s","%s"] }`, KnativeIngressGateway, KnativeLocalGatewayService, KnativeLocalGateway, LocalGatewayService, IngressDomain, AdditionalDomain, AdditionalDomainExtra) + ServiceConfigData = fmt.Sprintf(`{ + "serviceClusterIPNone" : %t + }`, true) ) func TestNewInferenceServiceConfig(t *testing.T) { @@ -110,3 +113,39 @@ func TestNewDeployConfig(t *testing.T) { g.Expect(err).Should(gomega.BeNil()) g.Expect(deployConfig).ShouldNot(gomega.BeNil()) } + +func TestNewServiceConfig(t *testing.T) { + g := gomega.NewGomegaWithT(t) + // nothing declared + empty := fakeclientset.NewSimpleClientset(&v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: constants.InferenceServiceConfigMapName, Namespace: constants.KServeNamespace}, + }) + emp, err := NewServiceConfig(empty) + g.Expect(err).Should(gomega.BeNil()) + g.Expect(emp).ShouldNot(gomega.BeNil()) + + // with value + withTrue := fakeclientset.NewSimpleClientset(&v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: constants.InferenceServiceConfigMapName, Namespace: constants.KServeNamespace}, + Data: map[string]string{ + ServiceConfigName: ServiceConfigData, + }, + }) + wt, err := NewServiceConfig(withTrue) + g.Expect(err).Should(gomega.BeNil()) + g.Expect(wt).ShouldNot(gomega.BeNil()) + g.Expect(wt.ServiceClusterIPNone).Should(gomega.BeTrue()) + + // no value, should be nil + noValue := fakeclientset.NewSimpleClientset(&v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: constants.InferenceServiceConfigMapName, Namespace: constants.KServeNamespace}, + Data: map[string]string{ + ServiceConfigName: `{}`, + }, + }) + nv, err := NewServiceConfig(noValue) + g.Expect(err).Should(gomega.BeNil()) + g.Expect(nv).ShouldNot(gomega.BeNil()) + g.Expect(nv.ServiceClusterIPNone).Should(gomega.BeFalse()) + +} diff --git a/pkg/apis/serving/v1beta1/doc.go b/pkg/apis/serving/v1beta1/doc.go index aaa65a3532..73df97c2cc 100644 --- a/pkg/apis/serving/v1beta1/doc.go +++ b/pkg/apis/serving/v1beta1/doc.go @@ -16,7 +16,7 @@ limitations under the License. // Package v1beta1 contains API Schema definitions for the serving v1beta1 API group // +k8s:openapi-gen=true -// +k8s:deepcopy-gen=package,register +// +kubebuilder:object:generate=true // +k8s:defaulter-gen=TypeMeta // +groupName=serving.kserve.io package v1beta1 diff --git a/pkg/apis/serving/v1beta1/inference_service.go b/pkg/apis/serving/v1beta1/inference_service.go index df220507c9..18eb1b4456 100644 --- a/pkg/apis/serving/v1beta1/inference_service.go +++ b/pkg/apis/serving/v1beta1/inference_service.go @@ -81,7 +81,6 @@ type Batcher struct { // InferenceService is the Schema for the InferenceServices API // +k8s:openapi-gen=true -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +genclient // +kubebuilder:object:root=true // +kubebuilder:subresource:status @@ -106,7 +105,6 @@ type InferenceService struct { // InferenceServiceList contains a list of Service // +k8s:openapi-gen=true -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +kubebuilder:object:root=true type InferenceServiceList struct { metav1.TypeMeta `json:",inline"` diff --git a/pkg/apis/serving/v1beta1/inference_service_defaults.go b/pkg/apis/serving/v1beta1/inference_service_defaults.go index d65280e398..2f65c122c1 100644 --- a/pkg/apis/serving/v1beta1/inference_service_defaults.go +++ b/pkg/apis/serving/v1beta1/inference_service_defaults.go @@ -49,12 +49,11 @@ var ( ) // +kubebuilder:object:generate=false -// +k8s:deepcopy-gen=false // +k8s:openapi-gen=false // InferenceServiceDefaulter is responsible for setting default values on the InferenceService // when created or updated. // -// NOTE: The +kubebuilder:object:generate=false and +k8s:deepcopy-gen=false marker prevents controller-gen from generating DeepCopy methods, +// NOTE: The +kubebuilder:object:generate=false marker prevents controller-gen from generating DeepCopy methods, // as it is used only for temporary operations and does not need to be deeply copied. type InferenceServiceDefaulter struct { } @@ -117,8 +116,9 @@ func (d *InferenceServiceDefaulter) Default(ctx context.Context, obj runtime.Obj return err } + _, localModelDisabledForIsvc := isvc.ObjectMeta.Annotations[constants.DisableLocalModelKey] var models *v1alpha1.LocalModelCacheList - if localModelConfig.Enabled { + if !localModelDisabledForIsvc && localModelConfig.Enabled { var c client.Client if c, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}); err != nil { mutatorLogger.Error(err, "Failed to start client") @@ -468,5 +468,7 @@ func (isvc *InferenceService) setLocalModelLabel(models *v1alpha1.LocalModelCach } isvc.Labels[constants.LocalModelLabel] = localModel.Name isvc.Annotations[constants.LocalModelSourceUriAnnotationKey] = localModel.Spec.SourceModelUri + // TODO: node group needs to be retrieved from isvc node group annotation when we support multiple node groups + isvc.Annotations[constants.LocalModelPVCNameAnnotationKey] = localModel.Name + "-" + localModel.Spec.NodeGroups[0] mutatorLogger.Info("LocalModelCache found", "model", localModel.Name, "namespace", isvc.Namespace, "isvc", isvc.Name) } diff --git a/pkg/apis/serving/v1beta1/inference_service_defaults_test.go b/pkg/apis/serving/v1beta1/inference_service_defaults_test.go index 4b4069c132..cc0e7ee2b7 100644 --- a/pkg/apis/serving/v1beta1/inference_service_defaults_test.go +++ b/pkg/apis/serving/v1beta1/inference_service_defaults_test.go @@ -815,7 +815,7 @@ func TestLocalModelAnnotation(t *testing.T) { Spec: v1alpha1.LocalModelCacheSpec{ SourceModelUri: "gs://testbucket/testmodel", ModelSize: resource.MustParse("123Gi"), - NodeGroup: "gpu", + NodeGroups: []string{"gpu"}, }, } localModels := &v1alpha1.LocalModelCacheList{Items: []v1alpha1.LocalModelCache{*localModel}} diff --git a/pkg/apis/serving/v1beta1/inference_service_validation.go b/pkg/apis/serving/v1beta1/inference_service_validation.go index 2b2c5046d8..f0ec1f02c2 100644 --- a/pkg/apis/serving/v1beta1/inference_service_validation.go +++ b/pkg/apis/serving/v1beta1/inference_service_validation.go @@ -50,12 +50,11 @@ var ( ) // +kubebuilder:object:generate=false -// +k8s:deepcopy-gen=false // +k8s:openapi-gen=false // InferenceServiceValidator is responsible for validating the InferenceService resource // when it is created, updated, or deleted. // -// NOTE: The +kubebuilder:object:generate=false and +k8s:deepcopy-gen=false marker prevents controller-gen from generating DeepCopy methods, +// NOTE: The +kubebuilder:object:generate=false marker prevents controller-gen from generating DeepCopy methods, // as this struct is used only for temporary operations and does not need to be deeply copied. type InferenceServiceValidator struct{} diff --git a/pkg/apis/serving/v1beta1/v1beta1.go b/pkg/apis/serving/v1beta1/v1beta1.go index 64bbc75069..bc9c45d9fb 100644 --- a/pkg/apis/serving/v1beta1/v1beta1.go +++ b/pkg/apis/serving/v1beta1/v1beta1.go @@ -18,7 +18,7 @@ limitations under the License. // Package v1beta1 contains API Schema definitions for the serving v1beta1 API group // +k8s:openapi-gen=true -// +k8s:deepcopy-gen=package,register +// +kubebuilder:object:generate=true // +k8s:defaulter-gen=TypeMeta // +groupName=serving.kserve.io package v1beta1 diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 9ff748f486..89b2d25e22 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -85,6 +85,7 @@ var ( InferenceServiceGKEAcceleratorAnnotationKey = KServeAPIGroupName + "/gke-accelerator" DeploymentMode = KServeAPIGroupName + "/deploymentMode" EnableRoutingTagAnnotationKey = KServeAPIGroupName + "/enable-tag-routing" + DisableLocalModelKey = KServeAPIGroupName + "/disable-localmodel" AutoscalerClass = KServeAPIGroupName + "/autoscalerClass" AutoscalerMetrics = KServeAPIGroupName + "/metrics" TargetUtilizationPercentage = KServeAPIGroupName + "/targetUtilizationPercentage" @@ -126,6 +127,7 @@ var ( PredictorProtocolAnnotationKey = InferenceServiceInternalAnnotationsPrefix + "/predictor-protocol" LocalModelLabel = InferenceServiceInternalAnnotationsPrefix + "/localmodel" LocalModelSourceUriAnnotationKey = InferenceServiceInternalAnnotationsPrefix + "/localmodel-sourceuri" + LocalModelPVCNameAnnotationKey = InferenceServiceInternalAnnotationsPrefix + "/localmodel-pvc-name" ) // kserve networking constants diff --git a/pkg/controller/v1alpha1/localmodel/controller.go b/pkg/controller/v1alpha1/localmodel/controller.go index 7a86f06158..a55feee6dc 100644 --- a/pkg/controller/v1alpha1/localmodel/controller.go +++ b/pkg/controller/v1alpha1/localmodel/controller.go @@ -201,9 +201,11 @@ func (c *LocalModelReconciler) ReconcileForIsvcs(ctx context.Context, localModel } for namespace := range namespaces { + // TODO: node group needs to be retrieved from isvc node group annotation when we support multiple node groups + pvcName := localModel.Name + "-" + localModel.Spec.NodeGroups[0] pv := v1.PersistentVolume{ ObjectMeta: metav1.ObjectMeta{ - Name: localModel.Name + "-" + namespace, + Name: pvcName + "-" + namespace, }, Spec: nodeGroup.Spec.PersistentVolumeSpec, } @@ -213,7 +215,7 @@ func (c *LocalModelReconciler) ReconcileForIsvcs(ctx context.Context, localModel pvc := v1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ - Name: localModel.Name, + Name: pvcName, Namespace: namespace, }, Spec: nodeGroup.Spec.PersistentVolumeClaimSpec, @@ -246,7 +248,7 @@ func (c *LocalModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) } nodeGroup := &v1alpha1api.LocalModelNodeGroup{} - nodeGroupNamespacedName := types.NamespacedName{Name: localModel.Spec.NodeGroup} + nodeGroupNamespacedName := types.NamespacedName{Name: localModel.Spec.NodeGroups[0]} if err := c.Get(ctx, nodeGroupNamespacedName, nodeGroup); err != nil { return reconcile.Result{}, err } @@ -335,9 +337,9 @@ func (c *LocalModelReconciler) nodeFunc(ctx context.Context, obj client.Object) for _, model := range models.Items { nodeGroup := &v1alpha1api.LocalModelNodeGroup{} - nodeGroupNamespacedName := types.NamespacedName{Name: model.Spec.NodeGroup} + nodeGroupNamespacedName := types.NamespacedName{Name: model.Spec.NodeGroups[0]} if err := c.Get(ctx, nodeGroupNamespacedName, nodeGroup); err != nil { - c.Log.Info("get nodegroup failed", "name", model.Spec.NodeGroup) + c.Log.Info("get nodegroup failed", "name", model.Spec.NodeGroups[0]) continue } matches, err := checkNodeAffinity(&nodeGroup.Spec.PersistentVolumeSpec, *node) diff --git a/pkg/controller/v1alpha1/localmodel/controller_test.go b/pkg/controller/v1alpha1/localmodel/controller_test.go index f5ccd950ac..a798fe457f 100644 --- a/pkg/controller/v1alpha1/localmodel/controller_test.go +++ b/pkg/controller/v1alpha1/localmodel/controller_test.go @@ -44,7 +44,7 @@ var _ = Describe("CachedModel controller", func() { localModelSpec = v1alpha1.LocalModelCacheSpec{ SourceModelUri: sourceModelUri, ModelSize: resource.MustParse("123Gi"), - NodeGroup: "gpu", + NodeGroups: []string{"gpu"}, } clusterStorageContainerSpec = v1alpha1.StorageContainerSpec{ SupportedUriFormats: []v1alpha1.SupportedUriFormat{{Prefix: "s3://"}}, @@ -292,8 +292,8 @@ var _ = Describe("CachedModel controller", func() { }, timeout, interval).Should(BeTrue()) // Expects a pv and a pvc are created in the isvcNamespace - pvLookupKey := types.NamespacedName{Name: modelName + "-" + isvcNamespace} - pvcLookupKey := types.NamespacedName{Name: modelName, Namespace: isvcNamespace} + pvLookupKey := types.NamespacedName{Name: modelName + "-" + nodeGroup.Name + "-" + isvcNamespace} + pvcLookupKey := types.NamespacedName{Name: modelName + "-" + nodeGroup.Name, Namespace: isvcNamespace} persistentVolume := &v1.PersistentVolume{} Eventually(func() bool { diff --git a/pkg/controller/v1alpha1/localmodelnode/controller.go b/pkg/controller/v1alpha1/localmodelnode/controller.go index 6480ef484a..620dfbc41a 100644 --- a/pkg/controller/v1alpha1/localmodelnode/controller.go +++ b/pkg/controller/v1alpha1/localmodelnode/controller.go @@ -31,14 +31,14 @@ import ( "maps" "os" "path/filepath" - "reflect" + "time" "github.com/go-logr/logr" v1alpha1api "github.com/kserve/kserve/pkg/apis/serving/v1alpha1" "github.com/kserve/kserve/pkg/apis/serving/v1beta1" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" - apierr "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" @@ -62,35 +62,22 @@ const ( ) var ( - defaultJobImage = "kserve/storage-initializer:latest" // Can be overwritten by the value in the configmap - FSGroup *int64 - jobNamespace string - nodeName = os.Getenv("NODE_NAME") // Name of current node, passed as an env variable via downward API - modelsRootFolder = filepath.Join(MountPath, `models`) - removeAll = os.RemoveAll // For patching os.RemoveAll in controller tests - readDir = os.ReadDir // For patching os.ReadDir in controller tests + defaultJobImage = "kserve/storage-initializer:latest" // Can be overwritten by the value in the configmap + FSGroup *int64 + jobNamespace string + jobTTLSecondsAfterFinished int32 = 3600 // One hour. Can be overwritten by the value in the configmap + reconcilationFreqency time.Duration = time.Minute // Reconcile every one minute to check if model folders exist. Can be overwritten by the value in configmap + nodeName = os.Getenv("NODE_NAME") // Name of current node, passed as an env variable via downward API + modelsRootFolder = filepath.Join(MountPath, "models") + fsHelper FileSystemInterface ) -// Launch a new job or return an existing job -func (c *LocalModelNodeReconciler) launchJob(ctx context.Context, jobName string, localModelNode *v1alpha1api.LocalModelNode, modelInfo v1alpha1api.LocalModelInfo, claimName string) (*batchv1.Job, error) { +func (c *LocalModelNodeReconciler) launchJob(ctx context.Context, localModelNode v1alpha1api.LocalModelNode, modelInfo v1alpha1api.LocalModelInfo) (*batchv1.Job, error) { + jobName := modelInfo.ModelName + "-" + localModelNode.ObjectMeta.Name container, err := c.getContainerSpecForStorageUri(ctx, modelInfo.SourceModelUri) if err != nil { return nil, err } - jobs := c.Clientset.BatchV1().Jobs(jobNamespace) - - job, err := jobs.Get(ctx, jobName, metav1.GetOptions{}) - - // In tests, job is an empty struct, using this bool is easier than checking for empty struct - jobFound := true - if err != nil { - if apierr.IsNotFound(err) { - jobFound = false - } else { - c.Log.Error(err, "Failed to get job", "name", jobName) - return job, err - } - } container.Args = []string{modelInfo.SourceModelUri, MountPath} container.VolumeMounts = []v1.VolumeMount{ @@ -101,12 +88,14 @@ func (c *LocalModelNodeReconciler) launchJob(ctx context.Context, jobName string SubPath: filepath.Join("models", modelInfo.ModelName), }, } - expectedJob := &batchv1.Job{ + job := &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ - Name: jobName + "dryrun", - Namespace: jobNamespace, + GenerateName: jobName, + Namespace: jobNamespace, + Labels: map[string]string{"model": modelInfo.ModelName, "node": localModelNode.Name}, }, Spec: batchv1.JobSpec{ + TTLSecondsAfterFinished: &jobTTLSecondsAfterFinished, Template: v1.PodTemplateSpec{ Spec: v1.PodSpec{ NodeName: nodeName, @@ -117,7 +106,7 @@ func (c *LocalModelNodeReconciler) launchJob(ctx context.Context, jobName string Name: PvcSourceMountName, VolumeSource: v1.VolumeSource{ PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: claimName, + ClaimName: modelInfo.ModelName, }, }, }, @@ -129,33 +118,15 @@ func (c *LocalModelNodeReconciler) launchJob(ctx context.Context, jobName string }, }, } - dryrunJob, err := jobs.Create(ctx, expectedJob, metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}}) - if err != nil { - return nil, err - } - if job != nil && reflect.DeepEqual(job.Spec.Template.Spec, dryrunJob.Spec.Template.Spec) { - return job, nil - } - if jobFound { - bg := metav1.DeletePropagationBackground - err = jobs.Delete(ctx, job.Name, metav1.DeleteOptions{ - PropagationPolicy: &bg, - }) - if err != nil { - c.Log.Error(err, "Failed to delete job.", "name", job.Name) - return nil, err - } - } - - if err := controllerutil.SetControllerReference(localModelNode, expectedJob, c.Scheme); err != nil { + if err := controllerutil.SetControllerReference(&localModelNode, job, c.Scheme); err != nil { c.Log.Error(err, "Failed to set controller reference", "name", modelInfo.ModelName) return nil, err } - expectedJob.Name = jobName - job, err = jobs.Create(ctx, expectedJob, metav1.CreateOptions{}) + jobs := c.Clientset.BatchV1().Jobs(jobNamespace) + job, err = jobs.Create(ctx, job, metav1.CreateOptions{}) c.Log.Info("Creating job", "name", job.Name, "namespace", job.Namespace) if err != nil { - c.Log.Error(err, "Failed to create job.", "name", expectedJob.Name) + c.Log.Error(err, "Failed to create job.", "name", job.Name) return nil, err } return job, err @@ -192,36 +163,106 @@ func (c *LocalModelNodeReconciler) getContainerSpecForStorageUri(ctx context.Con return defaultContainer, nil } +func (c *LocalModelNodeReconciler) getLatestJob(ctx context.Context, modelName string, nodeName string, excludeSucceeded bool) (*batchv1.Job, error) { + jobList := &batchv1.JobList{} + labelSelector := map[string]string{ + "model": modelName, + "node": nodeName, + } + if err := c.Client.List(ctx, jobList, client.InNamespace(jobNamespace), client.MatchingLabels(labelSelector)); err != nil { + if errors.IsNotFound(err) { + c.Log.Info("Job not found", "model", modelName) + return nil, nil + } + return nil, err + } + c.Log.Info("Found jobs", "model", modelName, "num of jobs", len(jobList.Items)) + var latestJob *batchv1.Job + for i, job := range jobList.Items { + if excludeSucceeded && job.Status.Succeeded > 0 { + continue + } + if latestJob == nil || job.CreationTimestamp.After(latestJob.CreationTimestamp.Time) { + latestJob = &jobList.Items[i] + } + } + return latestJob, nil +} + +func getModelStatusFromJobStatus(jobStatus batchv1.JobStatus) v1alpha1api.ModelStatus { + switch { + case jobStatus.Succeeded > 0: + return v1alpha1api.ModelDownloaded + case jobStatus.Failed > 0: + return v1alpha1api.ModelDownloadError + case jobStatus.Ready != nil && *jobStatus.Ready > 0: + return v1alpha1api.ModelDownloading + default: + return v1alpha1api.ModelDownloadPending + } +} + // Create jobs to download models if the model is not present locally // Update the status of the LocalModelNode CR func (c *LocalModelNodeReconciler) downloadModels(ctx context.Context, localModelNode *v1alpha1api.LocalModelNode) error { newStatus := map[string]v1alpha1api.ModelStatus{} for _, modelInfo := range localModelNode.Spec.LocalModels { - if status, ok := localModelNode.Status.ModelStatus[modelInfo.ModelName]; ok { - if status == v1alpha1api.ModelDownloaded { - newStatus[modelInfo.ModelName] = v1alpha1api.ModelDownloaded - continue - } - } - - jobName := modelInfo.ModelName + "-" + localModelNode.ObjectMeta.Name - - job, err := c.launchJob(ctx, jobName, localModelNode, modelInfo, modelInfo.ModelName) + c.Log.Info("checking model from spec", "model", modelInfo.ModelName) + var job *batchv1.Job + folderExists, err := fsHelper.hasModelFolder(modelInfo.ModelName) if err != nil { - c.Log.Error(err, "Job error", "name", jobName) + c.Log.Error(err, "Failed to check model folder", "model", modelInfo.ModelName) return err } - - switch { - case job.Status.Succeeded > 0: - newStatus[modelInfo.ModelName] = v1alpha1api.ModelDownloaded - case job.Status.Failed > 0: - newStatus[modelInfo.ModelName] = v1alpha1api.ModelDownloadError - case job.Status.Ready != nil && *job.Status.Ready > 0: - newStatus[modelInfo.ModelName] = v1alpha1api.ModelDownloading - default: - newStatus[modelInfo.ModelName] = v1alpha1api.ModelDownloadPending + if folderExists { + c.Log.Info("Model folder found", "model", modelInfo.ModelName) + // If folder exists and the job has been successfully completed, do nothing + // If the job is cleaned up, no new job is created because the status is already set to ModelDownloaded + if status, ok := localModelNode.Status.ModelStatus[modelInfo.ModelName]; ok { + if status == v1alpha1api.ModelDownloaded { + newStatus[modelInfo.ModelName] = v1alpha1api.ModelDownloaded + continue + } + } + job, err = c.getLatestJob(ctx, modelInfo.ModelName, nodeName, false) + if err != nil { + c.Log.Error(err, "Failed to getLatestJob", "model", modelInfo.ModelName, "node", nodeName) + return err + } + // If job is not found, create a new one. Because download could be incomplete. + if job == nil { + c.Log.Info("Model folder exists, creating download job", "model", modelInfo.ModelName) + job, err = c.launchJob(ctx, *localModelNode, modelInfo) + if err != nil { + c.Log.Error(err, "Failed to create Job", "model", modelInfo.ModelName, "node", nodeName) + return err + } + } + } else { + // Folder does not exist + c.Log.Info("Model folder not found", "model", modelInfo.ModelName) + job, err = c.getLatestJob(ctx, modelInfo.ModelName, nodeName, true) + if err != nil { + c.Log.Error(err, "Failed to getLatestJob", "model", modelInfo.ModelName, "node", nodeName) + return err + } + if job != nil { + c.Log.Info("model status from latest job", "model", modelInfo.ModelName, "status", getModelStatusFromJobStatus(job.Status)) + } + // Recreate job if it has been terminated because the model is missing locally + // If the job has failed, we do not retry here because there are retries on the job. + // To retry the download, users can manually fix the issue and delete the failed job. + if job == nil || job.Status.Succeeded > 0 { + c.Log.Info("Download model", "model", modelInfo.ModelName) + job, err = c.launchJob(ctx, *localModelNode, modelInfo) + if err != nil { + c.Log.Error(err, "Failed to create Job", "model", modelInfo.ModelName, "node", nodeName) + return err + } + } } + newStatus[modelInfo.ModelName] = getModelStatusFromJobStatus(job.Status) + c.Log.Info("Downloading models:", "model", modelInfo.ModelName, "node", localModelNode.ObjectMeta.Name, "status", newStatus[modelInfo.ModelName]) } @@ -236,6 +277,8 @@ func (c *LocalModelNodeReconciler) downloadModels(ctx context.Context, localMode c.Log.Error(err, "Update local model cache status error", "name", localModelNode.Name) return err } + c.Log.Info("status updated", "name", localModelNode.Name, "num of models in status", len(localModelNode.Status.ModelStatus)) + return nil } @@ -243,9 +286,13 @@ func (c *LocalModelNodeReconciler) downloadModels(ctx context.Context, localMode func (c *LocalModelNodeReconciler) deleteModels(localModelNode v1alpha1api.LocalModelNode) error { // 1. Scan model dir and get a list of existing folders representing downloaded models foldersToRemove := map[string]struct{}{} - entries, err := readDir(modelsRootFolder) + if err := fsHelper.ensureModelRootFolderExists(); err != nil { + c.Log.Error(err, "Failed to ensure model root folder exists") + return err + } + entries, err := fsHelper.getModelFolders() if err != nil { - c.Log.Error(err, "Failed to list model folder", "folder", modelsRootFolder) + c.Log.Error(err, "Failed to list model folder") } for _, entry := range entries { // Models could only exist in sub dir @@ -264,9 +311,43 @@ func (c *LocalModelNodeReconciler) deleteModels(localModelNode v1alpha1api.Local c.Log.Info("Found model(s) to remove", "num of models", len(foldersToRemove)) for modelName := range foldersToRemove { c.Log.Info("Removing model", "model", modelName) - modelFolder := filepath.Join(modelsRootFolder, modelName) - if err := removeAll(modelFolder); err != nil { - c.Log.Error(err, "Failed to remove model directory", "dir", modelFolder) + if err := fsHelper.removeModel(modelName); err != nil { + c.Log.Error(err, "Failed to remove model directory", "model", modelName) + } + } + } + return nil +} + +func (c *LocalModelNodeReconciler) cleanupJobs(ctx context.Context, localModelNode v1alpha1api.LocalModelNode) error { + // 1. Get all jobs for the LocalModelNode + jobs := &batchv1.JobList{} + labelSelector := map[string]string{"node": localModelNode.Name} + if err := c.Client.List(ctx, jobs, client.InNamespace(jobNamespace), client.MatchingLabels(labelSelector)); err != nil { + c.Log.Error(err, "Failed to list jobs", "node", localModelNode.Name) + return err + } + + // 2. Get a list of models that are in the spec + modelsInSpec := map[string]struct{}{} + for _, modelInfo := range localModelNode.Spec.LocalModels { + modelsInSpec[modelInfo.ModelName] = struct{}{} + } + + // 3. Delete jobs that are not in the spec + for i := range jobs.Items { + job := jobs.Items[i] + modelName, ok := job.Labels["model"] + if !ok { + c.Log.Info("Job does not have model label", "job", job.Name) + continue + } + if _, ok := modelsInSpec[modelName]; !ok { + c.Log.Info("Deleting job", "job", job.Name, "model", modelName) + propagationPolicy := metav1.DeletePropagationBackground + if err := c.Client.Delete(ctx, &job, &client.DeleteOptions{PropagationPolicy: &propagationPolicy}); err != nil { + c.Log.Error(err, "Failed to delete job", "job", job.Name) + return err } } } @@ -281,6 +362,11 @@ func (c *LocalModelNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque c.Log.Info("Agent reconciling LocalModelNode", "name", req.Name, "node", nodeName) + // fsHelper is a global variable to allow mocking in tests + if fsHelper == nil { + fsHelper = NewFileSystemHelper(modelsRootFolder) + } + // Create Jobs to download models if the model is not present locally. // 1. Check if LocalModelNode CR is for current node localModelNode := v1alpha1api.LocalModelNode{} @@ -289,26 +375,39 @@ func (c *LocalModelNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque return reconcile.Result{}, client.IgnoreNotFound(err) } - // 2. Kick off download jobs for all models in spec + // 2. Cleanup jobs for models that are not in the spec + if err := c.cleanupJobs(ctx, localModelNode); err != nil { + c.Log.Error(err, "Job cleanup err") + return reconcile.Result{}, err + } + + // 3. Kick off download jobs for all models in spec localModelConfig, err := v1beta1.NewLocalModelConfig(c.Clientset) if err != nil { c.Log.Error(err, "Failed to get local model config") return reconcile.Result{}, err } - defaultJobImage = localModelConfig.DefaultJobImage jobNamespace = localModelConfig.JobNamespace FSGroup = localModelConfig.FSGroup + if localModelConfig.ReconcilationFrequencyInSecs != nil { + reconcilationFreqency = time.Duration(*localModelConfig.ReconcilationFrequencyInSecs) * time.Second + } + if localModelConfig.JobTTLSecondsAfterFinished != nil { + jobTTLSecondsAfterFinished = *localModelConfig.JobTTLSecondsAfterFinished + } + if err := c.downloadModels(ctx, &localModelNode); err != nil { c.Log.Error(err, "Model download err") return reconcile.Result{}, err } - // 3. Delete models that are not in the spec. This function does not modify the resource. + // 4. Delete models that are not in the spec. This function does not modify the resource. if err := c.deleteModels(localModelNode); err != nil { c.Log.Error(err, "Model deletion err") return reconcile.Result{}, err } - return reconcile.Result{}, nil + // Requeue to check local folders periodically + return reconcile.Result{RequeueAfter: reconcilationFreqency}, nil } func (c *LocalModelNodeReconciler) SetupWithManager(mgr ctrl.Manager) error { diff --git a/pkg/controller/v1alpha1/localmodelnode/controller_test.go b/pkg/controller/v1alpha1/localmodelnode/controller_test.go index 763618c5e0..408694363c 100644 --- a/pkg/controller/v1alpha1/localmodelnode/controller_test.go +++ b/pkg/controller/v1alpha1/localmodelnode/controller_test.go @@ -18,8 +18,9 @@ package localmodelnode import ( "context" + "fmt" "io/fs" - "path/filepath" + "os" "time" "github.com/kserve/kserve/pkg/apis/serving/v1alpha1" @@ -33,6 +34,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" ) type MockFileInfo struct { @@ -46,7 +48,60 @@ func (m *MockFileInfo) IsDir() bool { return m.isDir } func (m *MockFileInfo) Type() fs.FileMode { return 0 } func (m *MockFileInfo) Info() (fs.FileInfo, error) { return nil, nil } -var _ = Describe("CachedModel controller", func() { +type mockFileSystem struct { + FileSystemInterface + // represents the dirs under /mnt/models/models + subDirs []os.DirEntry +} + +func (f *mockFileSystem) removeModel(model string) error { + newEntries := []os.DirEntry{} + for _, dirEntry := range f.subDirs { + if dirEntry.Name() != model { + newEntries = append(newEntries, dirEntry) + } + } + f.subDirs = newEntries + return nil +} + +func (f *mockFileSystem) hasModelFolder(modelName string) (bool, error) { + for _, dirEntry := range f.subDirs { + if dirEntry.Name() == modelName { + return true, nil + } + } + return false, nil +} + +func (f *mockFileSystem) mockModel(dir os.DirEntry) { + for _, dirEntry := range f.subDirs { + if dirEntry.Name() == dir.Name() { + return + } + } + f.subDirs = append(f.subDirs, dir) +} + +func (f *mockFileSystem) getModelFolders() ([]os.DirEntry, error) { + return f.subDirs, nil +} + +func (f *mockFileSystem) ensureModelRootFolderExists() error { + return nil +} + +func (f *mockFileSystem) clear() { + f.subDirs = []os.DirEntry{} +} + +func newMockFileSystem() *mockFileSystem { + return &mockFileSystem{ + subDirs: []os.DirEntry{}, + } +} + +var _ = Describe("LocalModelNode controller", func() { const ( timeout = time.Second * 10 duration = time.Second * 10 @@ -120,13 +175,10 @@ var _ = Describe("CachedModel controller", func() { ) Context("When creating a local model", func() { - It("Should create download jobs and update model status from jobs", func() { - // Mock readDir to return no models in the local disk - // Todo: fix this mock when we trigger re-download jobs when models don't exist in the local disk - readDir = func(_ string) ([]fs.DirEntry, error) { - return []fs.DirEntry{}, nil - } - + It("Should create download jobs, update model status from jobs, and handle model deletion", func() { + ctx := context.Background() + fsMock.clear() + fsMock.mockModel(&MockFileInfo{name: modelName, isDir: true}) var configMap = &v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: constants.InferenceServiceConfigMapName, @@ -166,13 +218,19 @@ var _ = Describe("CachedModel controller", func() { defer k8sClient.Delete(ctx, localModelNode) // Wait for the download job to be created - job := &batchv1.Job{} + jobs := &batchv1.JobList{} + labelSelector := map[string]string{ + "model": modelName, + "node": nodeName, + } Eventually(func() bool { - err := k8sClient.Get(ctx, types.NamespacedName{Name: modelName + "-" + nodeName, Namespace: modelCacheNamespace}, job) - return err == nil + err := k8sClient.List(ctx, jobs, client.InNamespace(jobNamespace), client.MatchingLabels(labelSelector)) + return err == nil && len(jobs.Items) == 1 }, timeout, interval).Should(BeTrue(), "Download job should be created") // Now let's update the job status to be successful + fsMock.mockModel(&MockFileInfo{name: modelName, isDir: true}) + job := &jobs.Items[0] job.Status.Succeeded = 1 Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed()) @@ -180,12 +238,15 @@ var _ = Describe("CachedModel controller", func() { Eventually(func() bool { err := k8sClient.Get(ctx, types.NamespacedName{Name: nodeName}, localModelNode) if err != nil { + fmt.Fprintf(GinkgoWriter, "get err") return false } modelStatus, ok := localModelNode.Status.ModelStatus[modelName] if !ok { + fmt.Fprintf(GinkgoWriter, "model not found in status\n") return false } + fmt.Fprintf(GinkgoWriter, "model status %v\n", modelStatus) return modelStatus == v1alpha1.ModelDownloaded }, timeout, interval).Should(BeTrue(), "LocaModelNode status should be downloaded") @@ -203,7 +264,9 @@ var _ = Describe("CachedModel controller", func() { return !ok }, timeout, interval).Should(BeTrue(), "Model should be removed from the status field") }) - It("Should delete models from local disk if the model is not in the spec", func() { + It("Should recreate download jobs if the model is missing from local disk", func() { + fsMock.clear() + var configMap = &v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: constants.InferenceServiceConfigMapName, @@ -214,20 +277,94 @@ var _ = Describe("CachedModel controller", func() { Expect(k8sClient.Create(context.TODO(), configMap)).NotTo(HaveOccurred()) defer k8sClient.Delete(context.TODO(), configMap) - // Mock readDir to return a fake model folder - readDir = func(_ string) ([]fs.DirEntry, error) { - return []fs.DirEntry{ - &MockFileInfo{name: modelName, isDir: true}, - }, nil + clusterStorageContainer := &v1alpha1.ClusterStorageContainer{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: clusterStorageContainerSpec, + } + Expect(k8sClient.Create(ctx, clusterStorageContainer)).Should(Succeed()) + defer k8sClient.Delete(ctx, clusterStorageContainer) + + nodeGroup := &v1alpha1.LocalModelNodeGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gpu", + }, + Spec: localModelNodeGroupSpec, } + Expect(k8sClient.Create(ctx, nodeGroup)).Should(Succeed()) + defer k8sClient.Delete(ctx, nodeGroup) - removeAllCalled := false - var pathRemoved string - removeAll = func(path string) error { - pathRemoved = path - removeAllCalled = true - return nil + nodeName = "worker2" + localModelNode := &v1alpha1.LocalModelNode{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + }, + Spec: localModelNodeSpec, } + Expect(k8sClient.Create(ctx, localModelNode)).Should(Succeed()) + defer k8sClient.Delete(ctx, localModelNode) + + // Wait for the download job to be created + jobs := &batchv1.JobList{} + labelSelector := map[string]string{ + "model": modelName, + "node": nodeName, + } + Eventually(func() bool { + err := k8sClient.List(ctx, jobs, client.InNamespace(jobNamespace), client.MatchingLabels(labelSelector)) + return err == nil && len(jobs.Items) == 1 + }, timeout, interval).Should(BeTrue(), "Download job should be created") + + // Now let's update the job status to be successful + fsMock.mockModel(&MockFileInfo{name: modelName, isDir: true}) + job := &jobs.Items[0] + job.Status.Succeeded = 1 + Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed()) + + // LocalModelNode status should be updated + Eventually(func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{Name: nodeName}, localModelNode) + if err != nil { + fmt.Fprintf(GinkgoWriter, "get err") + return false + } + modelStatus, ok := localModelNode.Status.ModelStatus[modelName] + if !ok { + fmt.Fprintf(GinkgoWriter, "model not found in status\n") + return false + } + fmt.Fprintf(GinkgoWriter, "model status %v\n", modelStatus) + return modelStatus == v1alpha1.ModelDownloaded + }, timeout, interval).Should(BeTrue(), "LocaModelNode status should be downloaded") + + // Delete the model folder + fsMock.clear() + + // Manually trigger reconcillation + patch := client.MergeFrom(localModelNode.DeepCopy()) + localModelNode.Annotations = map[string]string{"foo": "bar"} + Expect(k8sClient.Patch(ctx, localModelNode, patch)).Should(Succeed()) + + Eventually(func() bool { + err := k8sClient.List(ctx, jobs, client.InNamespace(jobNamespace), client.MatchingLabels(labelSelector)) + return err == nil && len(jobs.Items) == 2 + }, timeout, interval).Should(BeTrue(), "New job should be created") + }) + It("Should delete models from local disk if the model is not in the spec", func() { + fsMock.clear() + var configMap = &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: constants.InferenceServiceConfigMapName, + Namespace: constants.KServeNamespace, + }, + Data: configs, + } + Expect(k8sClient.Create(context.TODO(), configMap)).NotTo(HaveOccurred()) + defer k8sClient.Delete(context.TODO(), configMap) + + // Mock readDir to return a fake model folder + fsMock.mockModel(&MockFileInfo{name: modelName, isDir: true}) nodeName = "worker" // Definied in controller.go, representing the name of the curent node // Creates a LocalModelNode with no models but the controller should find a model from local disk and delete it @@ -243,9 +380,69 @@ var _ = Describe("CachedModel controller", func() { defer k8sClient.Delete(ctx, localModelNode) Eventually(func() bool { - return removeAllCalled - }, timeout, interval).Should(BeTrue()) - Expect(pathRemoved).Should(Equal(filepath.Join(modelsRootFolder, modelName)), "Should remove the model folder") + dirs, err := fsMock.getModelFolders() + if err != nil { + return false + } + for _, dir := range dirs { + if dir.Name() == modelName { + return false + } + } + return true + }, timeout, interval).Should(BeTrue(), "Should remove the model folder") + }) + // This test creates a LocalModelNode with a model, then deletes the model from the spec and checks if the job is deleted + It("Should delete jobs if the model is not present", func() { + fsMock.clear() + var configMap = &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: constants.InferenceServiceConfigMapName, + Namespace: constants.KServeNamespace, + }, + Data: configs, + } + Expect(k8sClient.Create(ctx, configMap)).NotTo(HaveOccurred()) + defer k8sClient.Delete(ctx, configMap) + + // Mock readDir to return a fake model folder + fsMock.mockModel(&MockFileInfo{name: modelName, isDir: true}) + + nodeName = "test3" // Definied in controller.go, representing the name of the curent node + // Creates a LocalModelNode with no models but the controller should find a model from local disk and delete it + localModelNode := &v1alpha1.LocalModelNode{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + }, + Spec: localModelNodeSpec, + } + Expect(k8sClient.Create(ctx, localModelNode)).Should(Succeed()) + defer k8sClient.Delete(ctx, localModelNode) + + jobs := &batchv1.JobList{} + labelSelector := map[string]string{ + "model": modelName, + "node": nodeName, + } + Eventually(func() bool { + err := k8sClient.List(ctx, jobs, client.InNamespace(jobNamespace), client.MatchingLabels(labelSelector)) + return err == nil && len(jobs.Items) == 1 + }, timeout, interval).Should(BeTrue(), "Download job should be created") + + // Remove the model from the spec + // Use patch to avoid conflict + patch := client.MergeFrom(localModelNode.DeepCopy()) + localModelNode.Spec = v1alpha1.LocalModelNodeSpec{ + LocalModels: []v1alpha1.LocalModelInfo{}, + } + Expect(k8sClient.Patch(ctx, localModelNode, patch)).Should(Succeed()) + Eventually(func() bool { + err := k8sClient.List(ctx, jobs, client.InNamespace(jobNamespace), client.MatchingLabels(labelSelector)) + if err != nil { + return false + } + return len(jobs.Items) == 0 + }, timeout, interval).Should(BeTrue(), "Download job should be deleted") }) }) }) diff --git a/pkg/controller/v1alpha1/localmodelnode/file_utils.go b/pkg/controller/v1alpha1/localmodelnode/file_utils.go new file mode 100644 index 0000000000..35565dd2dd --- /dev/null +++ b/pkg/controller/v1alpha1/localmodelnode/file_utils.go @@ -0,0 +1,73 @@ +/* +Copyright 2024 The KServe Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package localmodelnode + +import ( + "os" + "path/filepath" +) + +type FileSystemInterface interface { + removeModel(modelName string) error + hasModelFolder(modelName string) (bool, error) + getModelFolders() ([]os.DirEntry, error) + ensureModelRootFolderExists() error +} + +type FileSystemHelper struct { + modelsRootFolder string +} + +func NewFileSystemHelper(modelsRootFolder string) *FileSystemHelper { + return &FileSystemHelper{ + modelsRootFolder: modelsRootFolder, + } +} + +// should be used only in this struct +func (f *FileSystemHelper) getModelFolderPrivate(modelName string) string { + return filepath.Join(f.modelsRootFolder, modelName) +} + +func (f *FileSystemHelper) removeModel(modelName string) error { + path := f.getModelFolderPrivate(modelName) + return os.RemoveAll(path) +} + +func (f *FileSystemHelper) getModelFolders() ([]os.DirEntry, error) { + return os.ReadDir(f.modelsRootFolder) +} + +func (f *FileSystemHelper) hasModelFolder(modelName string) (bool, error) { + folder := f.getModelFolderPrivate(modelName) + _, err := os.ReadDir(folder) + if err == nil { + return true, nil + } + if os.IsNotExist(err) { + return false, nil + } + return false, err +} + +func (f *FileSystemHelper) ensureModelRootFolderExists() error { + // If the folder already exists, this will do nothing + if err := os.MkdirAll(f.modelsRootFolder, os.ModePerm); err != nil { + return err + } + return nil +} diff --git a/pkg/controller/v1alpha1/localmodelnode/suite_test.go b/pkg/controller/v1alpha1/localmodelnode/suite_test.go index 8cc7b9fe12..c382106af1 100644 --- a/pkg/controller/v1alpha1/localmodelnode/suite_test.go +++ b/pkg/controller/v1alpha1/localmodelnode/suite_test.go @@ -52,6 +52,7 @@ var ( testEnv *envtest.Environment cancel context.CancelFunc ctx context.Context + fsMock *mockFileSystem ) func TestAPIs(t *testing.T) { @@ -103,6 +104,9 @@ var _ = BeforeSuite(func() { Expect(k8sClient.Create(context.Background(), kserveNamespaceObj)).Should(Succeed()) Expect(k8sClient.Create(context.Background(), jobsNamespaceObj)).Should(Succeed()) + fsMock = newMockFileSystem() + fsHelper = fsMock + k8sManager, err := ctrl.NewManager(cfg, ctrl.Options{ Scheme: scheme.Scheme, Metrics: metricsserver.Options{ diff --git a/pkg/controller/v1beta1/inferenceservice/reconcilers/raw/raw_kube_reconciler.go b/pkg/controller/v1beta1/inferenceservice/reconcilers/raw/raw_kube_reconciler.go index 64554f1a24..77d494cec3 100644 --- a/pkg/controller/v1beta1/inferenceservice/reconcilers/raw/raw_kube_reconciler.go +++ b/pkg/controller/v1beta1/inferenceservice/reconcilers/raw/raw_kube_reconciler.go @@ -19,6 +19,11 @@ package raw import ( "fmt" + "github.com/kserve/kserve/pkg/apis/serving/v1beta1" + autoscaler "github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/reconcilers/autoscaler" + deployment "github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/reconcilers/deployment" + "github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/reconcilers/ingress" + service "github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/reconcilers/service" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -27,13 +32,11 @@ import ( knapis "knative.dev/pkg/apis" "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/kserve/kserve/pkg/apis/serving/v1beta1" - autoscaler "github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/reconcilers/autoscaler" - deployment "github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/reconcilers/deployment" - "github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/reconcilers/ingress" - service "github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/reconcilers/service" + logf "sigs.k8s.io/controller-runtime/pkg/log" ) +var log = logf.Log.WithName("RawKubeReconciler") + // RawKubeReconciler reconciles the Native K8S Resources type RawKubeReconciler struct { client client.Client @@ -61,19 +64,28 @@ func NewRawKubeReconciler(client client.Client, if err != nil { return nil, err } + var multiNodeEnabled bool if workerPodSpec != nil { multiNodeEnabled = true } + + // do not return error as service config is optional + serviceConfig, err1 := v1beta1.NewServiceConfig(clientset) + if err1 != nil { + log.Error(err1, "failed to get service config") + } + depl, err := deployment.NewDeploymentReconciler(client, clientset, scheme, componentMeta, workerComponentMeta, componentExt, podSpec, workerPodSpec) if err != nil { return nil, err } + return &RawKubeReconciler{ client: client, scheme: scheme, Deployment: depl, - Service: service.NewServiceReconciler(client, scheme, componentMeta, componentExt, podSpec, multiNodeEnabled), + Service: service.NewServiceReconciler(client, scheme, componentMeta, componentExt, podSpec, multiNodeEnabled, serviceConfig), Scaler: as, URL: url, }, nil diff --git a/pkg/controller/v1beta1/inferenceservice/reconcilers/service/service_reconciler.go b/pkg/controller/v1beta1/inferenceservice/reconcilers/service/service_reconciler.go index 084fcb51d1..0d42ca4c5c 100644 --- a/pkg/controller/v1beta1/inferenceservice/reconcilers/service/service_reconciler.go +++ b/pkg/controller/v1beta1/inferenceservice/reconcilers/service/service_reconciler.go @@ -49,17 +49,18 @@ func NewServiceReconciler(client client.Client, scheme *runtime.Scheme, componentMeta metav1.ObjectMeta, componentExt *v1beta1.ComponentExtensionSpec, - podSpec *corev1.PodSpec, multiNodeEnabled bool) *ServiceReconciler { + podSpec *corev1.PodSpec, multiNodeEnabled bool, + serviceConfig *v1beta1.ServiceConfig) *ServiceReconciler { return &ServiceReconciler{ client: client, scheme: scheme, - ServiceList: createService(componentMeta, componentExt, podSpec, multiNodeEnabled), + ServiceList: createService(componentMeta, componentExt, podSpec, multiNodeEnabled, serviceConfig), componentExt: componentExt, } } func createService(componentMeta metav1.ObjectMeta, componentExt *v1beta1.ComponentExtensionSpec, - podSpec *corev1.PodSpec, multiNodeEnabled bool) []*corev1.Service { + podSpec *corev1.PodSpec, multiNodeEnabled bool, serviceConfig *v1beta1.ServiceConfig) []*corev1.Service { var svcList []*corev1.Service var isWorkerContainer bool @@ -73,11 +74,11 @@ func createService(componentMeta metav1.ObjectMeta, componentExt *v1beta1.Compon if !multiNodeEnabled { // If multiNodeEnabled is false, only defaultSvc will be created. - defaultSvc := createDefaultSvc(componentMeta, componentExt, podSpec) + defaultSvc := createDefaultSvc(componentMeta, componentExt, podSpec, serviceConfig) svcList = append(svcList, defaultSvc) } else if multiNodeEnabled && !isWorkerContainer { // If multiNodeEnabled is true, both defaultSvc and headSvc will be created. - defaultSvc := createDefaultSvc(componentMeta, componentExt, podSpec) + defaultSvc := createDefaultSvc(componentMeta, componentExt, podSpec, serviceConfig) svcList = append(svcList, defaultSvc) headSvc := createHeadlessSvc(componentMeta) @@ -88,7 +89,7 @@ func createService(componentMeta metav1.ObjectMeta, componentExt *v1beta1.Compon } func createDefaultSvc(componentMeta metav1.ObjectMeta, componentExt *v1beta1.ComponentExtensionSpec, - podSpec *corev1.PodSpec) *corev1.Service { + podSpec *corev1.PodSpec, serviceConfig *v1beta1.ServiceConfig) *corev1.Service { var servicePorts []corev1.ServicePort if len(podSpec.Containers) != 0 { @@ -165,12 +166,9 @@ func createDefaultSvc(componentMeta metav1.ObjectMeta, componentExt *v1beta1.Com "app": constants.GetRawServiceLabel(componentMeta.Name), }, Ports: servicePorts, - // TODO - add a control flag - // Need to add a control flag to properly set it, enable/disable this behavior. - // Follow up issue to align with upstream: https://issues.redhat.com/browse/RHOAIENG-5077 - ClusterIP: corev1.ClusterIPNone, }, } + if val, ok := componentMeta.Labels[constants.ODHKserveRawAuth]; ok && val == "true" { if service.ObjectMeta.Annotations == nil { service.ObjectMeta.Annotations = make(map[string]string) @@ -198,6 +196,11 @@ func createDefaultSvc(componentMeta metav1.ObjectMeta, componentExt *v1beta1.Com } service.Spec.Ports = ports } + + if serviceConfig != nil && serviceConfig.ServiceClusterIPNone { + service.Spec.ClusterIP = corev1.ClusterIPNone + } + return service } diff --git a/pkg/controller/v1beta1/inferenceservice/reconcilers/service/service_reconciler_test.go b/pkg/controller/v1beta1/inferenceservice/reconcilers/service/service_reconciler_test.go index 27846174f8..37e285c995 100644 --- a/pkg/controller/v1beta1/inferenceservice/reconcilers/service/service_reconciler_test.go +++ b/pkg/controller/v1beta1/inferenceservice/reconcilers/service/service_reconciler_test.go @@ -21,11 +21,14 @@ import ( "github.com/google/go-cmp/cmp" "github.com/kserve/kserve/pkg/apis/serving/v1beta1" "github.com/kserve/kserve/pkg/constants" + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" ) +var emptyServiceConfig = &v1beta1.ServiceConfig{} + func TestCreateDefaultDeployment(t *testing.T) { type args struct { @@ -127,7 +130,6 @@ func TestCreateDefaultDeployment(t *testing.T) { TargetPort: intstr.IntOrString{IntVal: 8080}, }, }, - ClusterIP: corev1.ClusterIPNone, Selector: map[string]string{ constants.RawDeploymentAppLabel: "isvc.default-predictor", }, @@ -159,7 +161,6 @@ func TestCreateDefaultDeployment(t *testing.T) { TargetPort: intstr.IntOrString{IntVal: 8080}, }, }, - ClusterIP: corev1.ClusterIPNone, Selector: map[string]string{ "app": "isvc.default-predictor", }, @@ -185,7 +186,7 @@ func TestCreateDefaultDeployment(t *testing.T) { constants.RawDeploymentAppLabel: "isvc.default-predictor", constants.InferenceServiceGenerationPodLabelKey: "1", }, - ClusterIP: "None", + ClusterIP: corev1.ClusterIPNone, PublishNotReadyAddresses: true, }, }, @@ -220,7 +221,7 @@ func TestCreateDefaultDeployment(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := createService(tt.args.componentMeta, tt.args.componentExt, tt.args.podSpec, tt.args.multiNodeEnabled) + got := createService(tt.args.componentMeta, tt.args.componentExt, tt.args.podSpec, tt.args.multiNodeEnabled, emptyServiceConfig) for i, service := range got { if diff := cmp.Diff(tt.expected[i], service); diff != "" { t.Errorf("Test %q unexpected service (-want +got): %v", tt.name, diff) @@ -230,3 +231,52 @@ func TestCreateDefaultDeployment(t *testing.T) { }) } } + +func TestCreateServiceRawServiceConfigEmpty(t *testing.T) { + // nothing expected + runTestServiceCreate(emptyServiceConfig, "", t) +} + +func TestCreateServiceRawServiceAndConfigNil(t *testing.T) { + serviceConfig := &v1beta1.ServiceConfig{} + serviceConfig = nil + // no service means empty + runTestServiceCreate(serviceConfig, "", t) +} + +func TestCreateServiceRawFalseAndConfigTrue(t *testing.T) { + serviceConfig := &v1beta1.ServiceConfig{ + ServiceClusterIPNone: true, + } + runTestServiceCreate(serviceConfig, corev1.ClusterIPNone, t) +} + +func TestCreateServiceRawTrueAndConfigFalse(t *testing.T) { + serviceConfig := &v1beta1.ServiceConfig{ + ServiceClusterIPNone: false, + } + runTestServiceCreate(serviceConfig, "", t) +} + +func TestCreateServiceRawFalseAndConfigNil(t *testing.T) { + runTestServiceCreate(emptyServiceConfig, "", t) +} + +func TestCreateServiceRawTrueAndConfigNil(t *testing.T) { + // service is there, but no property, should be empty + runTestServiceCreate(emptyServiceConfig, "", t) +} + +func runTestServiceCreate(serviceConfig *v1beta1.ServiceConfig, expectedClusterIP string, t *testing.T) { + componentMeta := metav1.ObjectMeta{ + Name: "test-service", + Namespace: "default", + } + componentExt := &v1beta1.ComponentExtensionSpec{} + podSpec := &corev1.PodSpec{} + + service := createService(componentMeta, componentExt, podSpec, false, serviceConfig) + assert.Equal(t, componentMeta, service[0].ObjectMeta, "Expected ObjectMeta to be equal") + assert.Equal(t, map[string]string{"app": "isvc.test-service"}, service[0].Spec.Selector, "Expected Selector to be equal") + assert.Equal(t, expectedClusterIP, service[0].Spec.ClusterIP, "Expected ClusterIP to be equal") +} diff --git a/pkg/openapi/openapi_generated.go b/pkg/openapi/openapi_generated.go index 321ec13399..56e61580ab 100644 --- a/pkg/openapi/openapi_generated.go +++ b/pkg/openapi/openapi_generated.go @@ -98,6 +98,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/kserve/kserve/pkg/apis/serving/v1beta1.PredictorSpec": schema_pkg_apis_serving_v1beta1_PredictorSpec(ref), "github.com/kserve/kserve/pkg/apis/serving/v1beta1.SKLearnSpec": schema_pkg_apis_serving_v1beta1_SKLearnSpec(ref), "github.com/kserve/kserve/pkg/apis/serving/v1beta1.SecurityConfig": schema_pkg_apis_serving_v1beta1_SecurityConfig(ref), + "github.com/kserve/kserve/pkg/apis/serving/v1beta1.ServiceConfig": schema_pkg_apis_serving_v1beta1_ServiceConfig(ref), "github.com/kserve/kserve/pkg/apis/serving/v1beta1.StorageSpec": schema_pkg_apis_serving_v1beta1_StorageSpec(ref), "github.com/kserve/kserve/pkg/apis/serving/v1beta1.TFServingSpec": schema_pkg_apis_serving_v1beta1_TFServingSpec(ref), "github.com/kserve/kserve/pkg/apis/serving/v1beta1.TorchServeSpec": schema_pkg_apis_serving_v1beta1_TorchServeSpec(ref), @@ -854,16 +855,23 @@ func schema_pkg_apis_serving_v1alpha1_LocalModelCacheSpec(ref common.ReferenceCa Ref: ref("k8s.io/apimachinery/pkg/api/resource.Quantity"), }, }, - "nodeGroup": { + "nodeGroups": { SchemaProps: spec.SchemaProps{ - Description: "group of nodes to cache the model on.", - Default: "", - Type: []string{"string"}, - Format: "", + Description: "group of nodes to cache the model on. Todo: support more than 1 node groups", + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, }, }, }, - Required: []string{"sourceModelUri", "modelSize", "nodeGroup"}, + Required: []string{"sourceModelUri", "modelSize", "nodeGroups"}, }, }, Dependencies: []string{ @@ -5976,6 +5984,18 @@ func schema_pkg_apis_serving_v1beta1_LocalModelConfig(ref common.ReferenceCallba Format: "int64", }, }, + "jobTTLSecondsAfterFinished": { + SchemaProps: spec.SchemaProps{ + Type: []string{"integer"}, + Format: "int32", + }, + }, + "reconcilationFrequencyInSecs": { + SchemaProps: spec.SchemaProps{ + Type: []string{"integer"}, + Format: "int64", + }, + }, }, Required: []string{"enabled", "jobNamespace"}, }, @@ -9200,6 +9220,25 @@ func schema_pkg_apis_serving_v1beta1_SecurityConfig(ref common.ReferenceCallback } } +func schema_pkg_apis_serving_v1beta1_ServiceConfig(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "serviceClusterIPNone": { + SchemaProps: spec.SchemaProps{ + Description: "ServiceClusterIPNone is a boolean flag to indicate if the service should have a clusterIP set to None. If the DeploymentMode is Raw, the default value for ServiceClusterIPNone is false when the value is absent.", + Type: []string{"boolean"}, + Format: "", + }, + }, + }, + }, + }, + } +} + func schema_pkg_apis_serving_v1beta1_StorageSpec(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ diff --git a/pkg/openapi/swagger.json b/pkg/openapi/swagger.json index 7e5dca8396..428586dbed 100644 --- a/pkg/openapi/swagger.json +++ b/pkg/openapi/swagger.json @@ -419,17 +419,20 @@ "required": [ "sourceModelUri", "modelSize", - "nodeGroup" + "nodeGroups" ], "properties": { "modelSize": { "description": "Model size to make sure it does not exceed the disk space reserved for local models. The limit is defined on the NodeGroup.", "$ref": "#/definitions/resource.Quantity" }, - "nodeGroup": { - "description": "group of nodes to cache the model on.", - "type": "string", - "default": "" + "nodeGroups": { + "description": "group of nodes to cache the model on. Todo: support more than 1 node groups", + "type": "array", + "items": { + "type": "string", + "default": "" + } }, "sourceModelUri": { "description": "Original StorageUri", @@ -3313,6 +3316,14 @@ "jobNamespace": { "type": "string", "default": "" + }, + "jobTTLSecondsAfterFinished": { + "type": "integer", + "format": "int32" + }, + "reconcilationFrequencyInSecs": { + "type": "integer", + "format": "int64" } } }, @@ -5126,6 +5137,15 @@ } } }, + "v1beta1.ServiceConfig": { + "type": "object", + "properties": { + "serviceClusterIPNone": { + "description": "ServiceClusterIPNone is a boolean flag to indicate if the service should have a clusterIP set to None. If the DeploymentMode is Raw, the default value for ServiceClusterIPNone is false when the value is absent.", + "type": "boolean" + } + } + }, "v1beta1.StorageSpec": { "type": "object", "properties": { diff --git a/pkg/webhook/admission/localmodelcache/local_model_cache_validation.go b/pkg/webhook/admission/localmodelcache/local_model_cache_validation.go index e0a736dd5c..a9d36b87cc 100644 --- a/pkg/webhook/admission/localmodelcache/local_model_cache_validation.go +++ b/pkg/webhook/admission/localmodelcache/local_model_cache_validation.go @@ -36,12 +36,11 @@ var ( ) // +kubebuilder:object:generate=false -// +k8s:deepcopy-gen=false // +k8s:openapi-gen=false // LocalModelCacheValidator is responsible for validating the LocalModelCache resource // when it is created, updated, or deleted. // -// NOTE: The +kubebuilder:object:generate=false and +k8s:deepcopy-gen=false marker prevents controller-gen from generating DeepCopy methods, +// NOTE: The +kubebuilder:object:generate=false marker prevents controller-gen from generating DeepCopy methods, // as this struct is used only for temporary operations and does not need to be deeply copied. type LocalModelCacheValidator struct { client.Client diff --git a/pkg/webhook/admission/localmodelcache/local_model_cache_validation_test.go b/pkg/webhook/admission/localmodelcache/local_model_cache_validation_test.go index 3987aee492..60d0cc0fd9 100644 --- a/pkg/webhook/admission/localmodelcache/local_model_cache_validation_test.go +++ b/pkg/webhook/admission/localmodelcache/local_model_cache_validation_test.go @@ -67,12 +67,12 @@ func makeTestLocalModelCache() v1alpha1.LocalModelCache { }, Spec: v1alpha1.LocalModelCacheSpec{ ModelSize: resource.MustParse("1Gi"), - NodeGroup: "gpu1", + NodeGroups: []string{"gpu1"}, SourceModelUri: storageURI, }, Status: v1alpha1.LocalModelCacheStatus{ InferenceServices: []v1alpha1.NamespacedName{ - v1alpha1.NamespacedName{ + { Namespace: "default", Name: "sklearn-iris", }, @@ -89,7 +89,7 @@ func makeTestLocalModelCacheWithSameStorageURI() v1alpha1.LocalModelCache { }, Spec: v1alpha1.LocalModelCacheSpec{ ModelSize: resource.MustParse("1Gi"), - NodeGroup: "gpu1", + NodeGroups: []string{"gpu1"}, SourceModelUri: storageURI, }, } diff --git a/pkg/webhook/admission/pod/storage_initializer_injector.go b/pkg/webhook/admission/pod/storage_initializer_injector.go index e5a28040bc..070fa2d9d6 100644 --- a/pkg/webhook/admission/pod/storage_initializer_injector.go +++ b/pkg/webhook/admission/pod/storage_initializer_injector.go @@ -247,7 +247,11 @@ func (mi *StorageInitializerInjector) InjectStorageInitializer(pod *v1.Pod) erro if !strings.HasPrefix(subPath, "/") { subPath = "/" + subPath } - srcURI = "pvc://" + modelName + "/models/" + modelName + subPath + if pvcName, ok := pod.ObjectMeta.Annotations[constants.LocalModelPVCNameAnnotationKey]; ok { + srcURI = "pvc://" + pvcName + "/models/" + modelName + subPath + } else { + return fmt.Errorf("Annotation %s not found", constants.LocalModelPVCNameAnnotationKey) + } } podVolumes := []v1.Volume{} diff --git a/pkg/webhook/admission/pod/storage_initializer_injector_test.go b/pkg/webhook/admission/pod/storage_initializer_injector_test.go index 479b73a85a..b1a9085f84 100644 --- a/pkg/webhook/admission/pod/storage_initializer_injector_test.go +++ b/pkg/webhook/admission/pod/storage_initializer_injector_test.go @@ -4003,30 +4003,35 @@ func TestLocalModelPVC(t *testing.T) { storageUri string localModelLabel string localModelSourceUriLabel string + pvcName string expectedSubPath string }{ "basic": { storageUri: "s3://foo", localModelLabel: "bar", localModelSourceUriLabel: "s3://foo", + pvcName: "model-h100", expectedSubPath: "models/bar/", }, "extra / at the end": { storageUri: "s3://foo/", localModelLabel: "bar", localModelSourceUriLabel: "s3://foo", + pvcName: "model-h100", expectedSubPath: "models/bar/", }, "subfolder": { storageUri: "s3://foo/model1", localModelLabel: "bar", localModelSourceUriLabel: "s3://foo", + pvcName: "model-h100", expectedSubPath: "models/bar/model1", }, "subfolder2": { storageUri: "s3://foo/model1", localModelLabel: "bar", localModelSourceUriLabel: "s3://foo/", + pvcName: "model-h100", expectedSubPath: "models/bar/model1", }, } @@ -4042,6 +4047,7 @@ func TestLocalModelPVC(t *testing.T) { Annotations: map[string]string{ constants.StorageInitializerSourceUriInternalAnnotationKey: scenario.storageUri, constants.LocalModelSourceUriAnnotationKey: scenario.localModelSourceUriLabel, + constants.LocalModelPVCNameAnnotationKey: scenario.pvcName, }, Labels: map[string]string{ constants.LocalModelLabel: scenario.localModelLabel, @@ -4079,7 +4085,7 @@ func TestLocalModelPVC(t *testing.T) { { Name: "kserve-pvc-source", VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: scenario.localModelLabel, ReadOnly: false}, + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: scenario.pvcName, ReadOnly: false}, }, }, }, diff --git a/python/huggingface_server_cpu_openvino.Dockerfile b/python/huggingface_server_cpu_openvino.Dockerfile index d833791afe..4230ad9e4a 100644 --- a/python/huggingface_server_cpu_openvino.Dockerfile +++ b/python/huggingface_server_cpu_openvino.Dockerfile @@ -39,9 +39,11 @@ RUN cd huggingfaceserver && poetry install --no-interaction --no-cache # Clone vllm # Install Python build tools and other dependencies then build vllm from source +# temporary fix for dependency conflict for https://github.com/vllm-project/vllm/issues/11398 WORKDIR /vllm RUN git clone --branch $VLLM_VERSION --depth 1 https://github.com/vllm-project/vllm.git . && \ pip install --upgrade pip && \ + sed -i 's/@main//' ./requirements-openvino.txt && \ pip install -r requirements-build.txt --extra-index-url https://download.pytorch.org/whl/cpu && \ PIP_EXTRA_INDEX_URL="https://download.pytorch.org/whl/cpu" VLLM_TARGET_DEVICE="openvino" python -m pip install -v . diff --git a/python/kserve/docs/V1alpha1LocalModelCacheSpec.md b/python/kserve/docs/V1alpha1LocalModelCacheSpec.md index 56b9df57f7..f274a299fb 100644 --- a/python/kserve/docs/V1alpha1LocalModelCacheSpec.md +++ b/python/kserve/docs/V1alpha1LocalModelCacheSpec.md @@ -5,7 +5,7 @@ LocalModelCacheSpec Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **model_size** | [**ResourceQuantity**](ResourceQuantity.md) | | -**node_group** | **str** | group of nodes to cache the model on. | [default to ''] +**node_groups** | **list[str]** | group of nodes to cache the model on. Todo: support more than 1 node groups | **source_model_uri** | **str** | Original StorageUri | [default to ''] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/python/kserve/docs/V1beta1LocalModelConfig.md b/python/kserve/docs/V1beta1LocalModelConfig.md index 21ba6d8123..bc3df9c3d3 100644 --- a/python/kserve/docs/V1beta1LocalModelConfig.md +++ b/python/kserve/docs/V1beta1LocalModelConfig.md @@ -7,6 +7,8 @@ Name | Type | Description | Notes **enabled** | **bool** | | [default to False] **fs_group** | **int** | | [optional] **job_namespace** | **str** | | [default to ''] +**job_ttl_seconds_after_finished** | **int** | | [optional] +**reconcilation_frequency_in_secs** | **int** | | [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/python/kserve/docs/V1beta1ServiceConfig.md b/python/kserve/docs/V1beta1ServiceConfig.md new file mode 100644 index 0000000000..51e99d468b --- /dev/null +++ b/python/kserve/docs/V1beta1ServiceConfig.md @@ -0,0 +1,10 @@ +# V1beta1ServiceConfig + +## Properties +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**service_cluster_ip_none** | **bool** | ServiceClusterIPNone is a boolean flag to indicate if the service should have a clusterIP set to None. If the DeploymentMode is Raw, the default value for ServiceClusterIPNone is false when the value is absent. | [optional] + +[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) + + diff --git a/python/kserve/kserve/models/__init__.py b/python/kserve/kserve/models/__init__.py index c87f489e9b..d46daa2524 100644 --- a/python/kserve/kserve/models/__init__.py +++ b/python/kserve/kserve/models/__init__.py @@ -97,6 +97,7 @@ from kserve.models.v1beta1_predictor_spec import V1beta1PredictorSpec from kserve.models.v1beta1_sk_learn_spec import V1beta1SKLearnSpec from kserve.models.v1beta1_security_config import V1beta1SecurityConfig +from kserve.models.v1beta1_service_config import V1beta1ServiceConfig from kserve.models.v1beta1_storage_spec import V1beta1StorageSpec from kserve.models.v1beta1_tf_serving_spec import V1beta1TFServingSpec from kserve.models.v1beta1_torch_serve_spec import V1beta1TorchServeSpec diff --git a/python/kserve/kserve/models/v1alpha1_local_model_cache_spec.py b/python/kserve/kserve/models/v1alpha1_local_model_cache_spec.py index 7596d41f84..e02c60696f 100644 --- a/python/kserve/kserve/models/v1alpha1_local_model_cache_spec.py +++ b/python/kserve/kserve/models/v1alpha1_local_model_cache_spec.py @@ -48,29 +48,29 @@ class V1alpha1LocalModelCacheSpec(object): """ openapi_types = { 'model_size': 'ResourceQuantity', - 'node_group': 'str', + 'node_groups': 'list[str]', 'source_model_uri': 'str' } attribute_map = { 'model_size': 'modelSize', - 'node_group': 'nodeGroup', + 'node_groups': 'nodeGroups', 'source_model_uri': 'sourceModelUri' } - def __init__(self, model_size=None, node_group='', source_model_uri='', local_vars_configuration=None): # noqa: E501 + def __init__(self, model_size=None, node_groups=None, source_model_uri='', local_vars_configuration=None): # noqa: E501 """V1alpha1LocalModelCacheSpec - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration() self.local_vars_configuration = local_vars_configuration self._model_size = None - self._node_group = None + self._node_groups = None self._source_model_uri = None self.discriminator = None self.model_size = model_size - self.node_group = node_group + self.node_groups = node_groups self.source_model_uri = source_model_uri @property @@ -97,29 +97,29 @@ def model_size(self, model_size): self._model_size = model_size @property - def node_group(self): - """Gets the node_group of this V1alpha1LocalModelCacheSpec. # noqa: E501 + def node_groups(self): + """Gets the node_groups of this V1alpha1LocalModelCacheSpec. # noqa: E501 - group of nodes to cache the model on. # noqa: E501 + group of nodes to cache the model on. Todo: support more than 1 node groups # noqa: E501 - :return: The node_group of this V1alpha1LocalModelCacheSpec. # noqa: E501 - :rtype: str + :return: The node_groups of this V1alpha1LocalModelCacheSpec. # noqa: E501 + :rtype: list[str] """ - return self._node_group + return self._node_groups - @node_group.setter - def node_group(self, node_group): - """Sets the node_group of this V1alpha1LocalModelCacheSpec. + @node_groups.setter + def node_groups(self, node_groups): + """Sets the node_groups of this V1alpha1LocalModelCacheSpec. - group of nodes to cache the model on. # noqa: E501 + group of nodes to cache the model on. Todo: support more than 1 node groups # noqa: E501 - :param node_group: The node_group of this V1alpha1LocalModelCacheSpec. # noqa: E501 - :type: str + :param node_groups: The node_groups of this V1alpha1LocalModelCacheSpec. # noqa: E501 + :type: list[str] """ - if self.local_vars_configuration.client_side_validation and node_group is None: # noqa: E501 - raise ValueError("Invalid value for `node_group`, must not be `None`") # noqa: E501 + if self.local_vars_configuration.client_side_validation and node_groups is None: # noqa: E501 + raise ValueError("Invalid value for `node_groups`, must not be `None`") # noqa: E501 - self._node_group = node_group + self._node_groups = node_groups @property def source_model_uri(self): diff --git a/python/kserve/kserve/models/v1beta1_local_model_config.py b/python/kserve/kserve/models/v1beta1_local_model_config.py index c574dd52e7..f6b55844e9 100644 --- a/python/kserve/kserve/models/v1beta1_local_model_config.py +++ b/python/kserve/kserve/models/v1beta1_local_model_config.py @@ -50,17 +50,21 @@ class V1beta1LocalModelConfig(object): 'default_job_image': 'str', 'enabled': 'bool', 'fs_group': 'int', - 'job_namespace': 'str' + 'job_namespace': 'str', + 'job_ttl_seconds_after_finished': 'int', + 'reconcilation_frequency_in_secs': 'int' } attribute_map = { 'default_job_image': 'defaultJobImage', 'enabled': 'enabled', 'fs_group': 'fsGroup', - 'job_namespace': 'jobNamespace' + 'job_namespace': 'jobNamespace', + 'job_ttl_seconds_after_finished': 'jobTTLSecondsAfterFinished', + 'reconcilation_frequency_in_secs': 'reconcilationFrequencyInSecs' } - def __init__(self, default_job_image=None, enabled=False, fs_group=None, job_namespace='', local_vars_configuration=None): # noqa: E501 + def __init__(self, default_job_image=None, enabled=False, fs_group=None, job_namespace='', job_ttl_seconds_after_finished=None, reconcilation_frequency_in_secs=None, local_vars_configuration=None): # noqa: E501 """V1beta1LocalModelConfig - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration() @@ -70,6 +74,8 @@ def __init__(self, default_job_image=None, enabled=False, fs_group=None, job_nam self._enabled = None self._fs_group = None self._job_namespace = None + self._job_ttl_seconds_after_finished = None + self._reconcilation_frequency_in_secs = None self.discriminator = None if default_job_image is not None: @@ -78,6 +84,10 @@ def __init__(self, default_job_image=None, enabled=False, fs_group=None, job_nam if fs_group is not None: self.fs_group = fs_group self.job_namespace = job_namespace + if job_ttl_seconds_after_finished is not None: + self.job_ttl_seconds_after_finished = job_ttl_seconds_after_finished + if reconcilation_frequency_in_secs is not None: + self.reconcilation_frequency_in_secs = reconcilation_frequency_in_secs @property def default_job_image(self): @@ -167,6 +177,48 @@ def job_namespace(self, job_namespace): self._job_namespace = job_namespace + @property + def job_ttl_seconds_after_finished(self): + """Gets the job_ttl_seconds_after_finished of this V1beta1LocalModelConfig. # noqa: E501 + + + :return: The job_ttl_seconds_after_finished of this V1beta1LocalModelConfig. # noqa: E501 + :rtype: int + """ + return self._job_ttl_seconds_after_finished + + @job_ttl_seconds_after_finished.setter + def job_ttl_seconds_after_finished(self, job_ttl_seconds_after_finished): + """Sets the job_ttl_seconds_after_finished of this V1beta1LocalModelConfig. + + + :param job_ttl_seconds_after_finished: The job_ttl_seconds_after_finished of this V1beta1LocalModelConfig. # noqa: E501 + :type: int + """ + + self._job_ttl_seconds_after_finished = job_ttl_seconds_after_finished + + @property + def reconcilation_frequency_in_secs(self): + """Gets the reconcilation_frequency_in_secs of this V1beta1LocalModelConfig. # noqa: E501 + + + :return: The reconcilation_frequency_in_secs of this V1beta1LocalModelConfig. # noqa: E501 + :rtype: int + """ + return self._reconcilation_frequency_in_secs + + @reconcilation_frequency_in_secs.setter + def reconcilation_frequency_in_secs(self, reconcilation_frequency_in_secs): + """Sets the reconcilation_frequency_in_secs of this V1beta1LocalModelConfig. + + + :param reconcilation_frequency_in_secs: The reconcilation_frequency_in_secs of this V1beta1LocalModelConfig. # noqa: E501 + :type: int + """ + + self._reconcilation_frequency_in_secs = reconcilation_frequency_in_secs + def to_dict(self): """Returns the model properties as a dict""" result = {} diff --git a/python/kserve/kserve/models/v1beta1_service_config.py b/python/kserve/kserve/models/v1beta1_service_config.py new file mode 100644 index 0000000000..f30e70f6d2 --- /dev/null +++ b/python/kserve/kserve/models/v1beta1_service_config.py @@ -0,0 +1,136 @@ +# Copyright 2023 The KServe Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# coding: utf-8 + +""" + KServe + + Python SDK for KServe # noqa: E501 + + The version of the OpenAPI document: v0.1 + Generated by: https://openapi-generator.tech +""" + + +import pprint +import re # noqa: F401 + +import six + +from kserve.configuration import Configuration + + +class V1beta1ServiceConfig(object): + """NOTE: This class is auto generated by OpenAPI Generator. + Ref: https://openapi-generator.tech + + Do not edit the class manually. + """ + + """ + Attributes: + openapi_types (dict): The key is attribute name + and the value is attribute type. + attribute_map (dict): The key is attribute name + and the value is json key in definition. + """ + openapi_types = { + 'service_cluster_ip_none': 'bool' + } + + attribute_map = { + 'service_cluster_ip_none': 'serviceClusterIPNone' + } + + def __init__(self, service_cluster_ip_none=None, local_vars_configuration=None): # noqa: E501 + """V1beta1ServiceConfig - a model defined in OpenAPI""" # noqa: E501 + if local_vars_configuration is None: + local_vars_configuration = Configuration() + self.local_vars_configuration = local_vars_configuration + + self._service_cluster_ip_none = None + self.discriminator = None + + if service_cluster_ip_none is not None: + self.service_cluster_ip_none = service_cluster_ip_none + + @property + def service_cluster_ip_none(self): + """Gets the service_cluster_ip_none of this V1beta1ServiceConfig. # noqa: E501 + + ServiceClusterIPNone is a boolean flag to indicate if the service should have a clusterIP set to None. If the DeploymentMode is Raw, the default value for ServiceClusterIPNone is false when the value is absent. # noqa: E501 + + :return: The service_cluster_ip_none of this V1beta1ServiceConfig. # noqa: E501 + :rtype: bool + """ + return self._service_cluster_ip_none + + @service_cluster_ip_none.setter + def service_cluster_ip_none(self, service_cluster_ip_none): + """Sets the service_cluster_ip_none of this V1beta1ServiceConfig. + + ServiceClusterIPNone is a boolean flag to indicate if the service should have a clusterIP set to None. If the DeploymentMode is Raw, the default value for ServiceClusterIPNone is false when the value is absent. # noqa: E501 + + :param service_cluster_ip_none: The service_cluster_ip_none of this V1beta1ServiceConfig. # noqa: E501 + :type: bool + """ + + self._service_cluster_ip_none = service_cluster_ip_none + + def to_dict(self): + """Returns the model properties as a dict""" + result = {} + + for attr, _ in six.iteritems(self.openapi_types): + value = getattr(self, attr) + if isinstance(value, list): + result[attr] = list(map( + lambda x: x.to_dict() if hasattr(x, "to_dict") else x, + value + )) + elif hasattr(value, "to_dict"): + result[attr] = value.to_dict() + elif isinstance(value, dict): + result[attr] = dict(map( + lambda item: (item[0], item[1].to_dict()) + if hasattr(item[1], "to_dict") else item, + value.items() + )) + else: + result[attr] = value + + return result + + def to_str(self): + """Returns the string representation of the model""" + return pprint.pformat(self.to_dict()) + + def __repr__(self): + """For `print` and `pprint`""" + return self.to_str() + + def __eq__(self, other): + """Returns true if both objects are equal""" + if not isinstance(other, V1beta1ServiceConfig): + return False + + return self.to_dict() == other.to_dict() + + def __ne__(self, other): + """Returns true if both objects are not equal""" + if not isinstance(other, V1beta1ServiceConfig): + return True + + return self.to_dict() != other.to_dict() diff --git a/python/kserve/test/test_v1alpha1_local_model_cache.py b/python/kserve/test/test_v1alpha1_local_model_cache.py index 017a524e63..4babd35f88 100644 --- a/python/kserve/test/test_v1alpha1_local_model_cache.py +++ b/python/kserve/test/test_v1alpha1_local_model_cache.py @@ -26,15 +26,16 @@ from __future__ import absolute_import -import unittest import datetime +import unittest -import kserve -from kserve.models.v1alpha1_local_model_cache import ( +from kserve.models.v1alpha1_local_model_cache import ( # noqa: E501 V1alpha1LocalModelCache, -) # noqa: E501 +) from kserve.rest import ApiException +import kserve + class TestV1alpha1LocalModelCache(unittest.TestCase): """V1alpha1LocalModelCache unit test stubs""" @@ -58,7 +59,7 @@ def make_instance(self, include_optional): metadata=None, spec=kserve.models.v1alpha1_local_model_cache_spec.V1alpha1LocalModelCacheSpec( model_size="1Gi", - node_group="0", + node_groups=["0"], source_model_uri="0", ), status=None, diff --git a/python/kserve/test/test_v1alpha1_local_model_cache_list.py b/python/kserve/test/test_v1alpha1_local_model_cache_list.py index 283a913cc1..bfadefb06a 100644 --- a/python/kserve/test/test_v1alpha1_local_model_cache_list.py +++ b/python/kserve/test/test_v1alpha1_local_model_cache_list.py @@ -26,15 +26,16 @@ from __future__ import absolute_import -import unittest import datetime +import unittest -import kserve -from kserve.models.v1alpha1_local_model_cache_list import ( +from kserve.models.v1alpha1_local_model_cache_list import ( # noqa: E501 V1alpha1LocalModelCacheList, -) # noqa: E501 +) from kserve.rest import ApiException +import kserve + class TestV1alpha1LocalModelCacheList(unittest.TestCase): """V1alpha1LocalModelCacheList unit test stubs""" @@ -61,7 +62,7 @@ def make_instance(self, include_optional): metadata=None, spec=kserve.models.v1alpha1_local_model_cache_spec.V1alpha1LocalModelCacheSpec( model_size="1Gi", - node_group="0", + node_groups=["0"], source_model_uri="0", ), status=None, @@ -79,7 +80,7 @@ def make_instance(self, include_optional): metadata=None, spec=kserve.models.v1alpha1_local_model_cache_spec.V1alpha1LocalModelCacheSpec( model_size="1Gi", - node_group="0", + node_groups=["0"], source_model_uri="0", ), status=None, diff --git a/python/kserve/test/test_v1alpha1_local_model_cache_spec.py b/python/kserve/test/test_v1alpha1_local_model_cache_spec.py index 954f861d9d..656954f8ee 100644 --- a/python/kserve/test/test_v1alpha1_local_model_cache_spec.py +++ b/python/kserve/test/test_v1alpha1_local_model_cache_spec.py @@ -26,15 +26,16 @@ from __future__ import absolute_import -import unittest import datetime +import unittest -import kserve -from kserve.models.v1alpha1_local_model_cache_spec import ( +from kserve.models.v1alpha1_local_model_cache_spec import ( # noqa: E501 V1alpha1LocalModelCacheSpec, -) # noqa: E501 +) from kserve.rest import ApiException +import kserve + class TestV1alpha1LocalModelCacheSpec(unittest.TestCase): """V1alpha1LocalModelCacheSpec unit test stubs""" @@ -53,12 +54,12 @@ def make_instance(self, include_optional): # model = kserve.models.v1alpha1_local_model_cache_spec.V1alpha1LocalModelCacheSpec() # noqa: E501 if include_optional: return V1alpha1LocalModelCacheSpec( - model_size="1Gi", node_group="0", source_model_uri="0" + model_size="1Gi", node_groups=["0"], source_model_uri="0" ) else: return V1alpha1LocalModelCacheSpec( model_size="1Gi", - node_group="0", + node_groups=["0"], source_model_uri="0", ) diff --git a/python/kserve/test/test_v1beta1_service_config.py b/python/kserve/test/test_v1beta1_service_config.py new file mode 100644 index 0000000000..1b239307f7 --- /dev/null +++ b/python/kserve/test/test_v1beta1_service_config.py @@ -0,0 +1,64 @@ +# Copyright 2023 The KServe Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# coding: utf-8 + +""" + KServe + + Python SDK for KServe # noqa: E501 + + The version of the OpenAPI document: v0.1 + Generated by: https://openapi-generator.tech +""" + + +from __future__ import absolute_import + +import unittest +import datetime + +import kserve +from kserve.models.v1beta1_service_config import V1beta1ServiceConfig # noqa: E501 +from kserve.rest import ApiException + + +class TestV1beta1ServiceConfig(unittest.TestCase): + """V1beta1ServiceConfig unit test stubs""" + + def setUp(self): + pass + + def tearDown(self): + pass + + def make_instance(self, include_optional): + """Test V1beta1ServiceConfig + include_option is a boolean, when False only required + params are included, when True both required and + optional params are included""" + # model = kserve.models.v1beta1_service_config.V1beta1ServiceConfig() # noqa: E501 + if include_optional: + return V1beta1ServiceConfig(service_cluster_ip_none=True) + else: + return V1beta1ServiceConfig() + + def testV1beta1ServiceConfig(self): + """Test V1beta1ServiceConfig""" + inst_req_only = self.make_instance(include_optional=False) + inst_req_and_optional = self.make_instance(include_optional=True) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/crds/serving.kserve.io_inferenceservices.yaml b/test/crds/serving.kserve.io_inferenceservices.yaml index 29e70c8922..732fc5bd60 100644 --- a/test/crds/serving.kserve.io_inferenceservices.yaml +++ b/test/crds/serving.kserve.io_inferenceservices.yaml @@ -26185,8 +26185,12 @@ spec: - type: string pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ x-kubernetes-int-or-string: true - nodeGroup: - type: string + nodeGroups: + items: + type: string + maxItems: 1 + minItems: 1 + type: array sourceModelUri: type: string x-kubernetes-validations: @@ -26194,7 +26198,7 @@ spec: rule: self == oldSelf required: - modelSize - - nodeGroup + - nodeGroups - sourceModelUri type: object status: diff --git a/test/crds/serving.kserve.io_localmodelcaches.yaml b/test/crds/serving.kserve.io_localmodelcaches.yaml deleted file mode 100644 index e40ba50ac2..0000000000 --- a/test/crds/serving.kserve.io_localmodelcaches.yaml +++ /dev/null @@ -1,83 +0,0 @@ ---- -apiVersion: apiextensions.k8s.io/v1 -kind: CustomResourceDefinition -metadata: - annotations: - controller-gen.kubebuilder.io/version: v0.16.2 - name: localmodelcaches.serving.kserve.io -spec: - group: serving.kserve.io - names: - kind: LocalModelCache - listKind: LocalModelCacheList - plural: localmodelcaches - singular: localmodelcache - scope: Cluster - versions: - - name: v1alpha1 - schema: - openAPIV3Schema: - properties: - apiVersion: - type: string - kind: - type: string - metadata: - type: object - spec: - properties: - modelSize: - anyOf: - - type: integer - - type: string - pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ - x-kubernetes-int-or-string: true - nodeGroup: - type: string - sourceModelUri: - type: string - x-kubernetes-validations: - - message: StorageUri is immutable - rule: self == oldSelf - required: - - modelSize - - nodeGroup - - sourceModelUri - type: object - status: - properties: - copies: - properties: - available: - type: integer - failed: - type: integer - total: - type: integer - type: object - inferenceServices: - items: - properties: - name: - type: string - namespace: - type: string - type: object - type: array - nodeStatus: - additionalProperties: - enum: - - "" - - NodeNotReady - - NodeDownloadPending - - NodeDownloading - - NodeDownloaded - - NodeDownloadError - type: string - type: object - type: object - type: object - served: true - storage: true - subresources: - status: {} diff --git a/test/e2e/logger/test_raw_logger.py b/test/e2e/logger/test_raw_logger.py index fea1e69b8b..e8c8fa06fc 100644 --- a/test/e2e/logger/test_raw_logger.py +++ b/test/e2e/logger/test_raw_logger.py @@ -15,13 +15,15 @@ import os from kubernetes import client -from kserve import KServeClient -from kserve import constants -from kserve import V1beta1PredictorSpec -from kserve import V1beta1SKLearnSpec -from kserve import V1beta1InferenceServiceSpec -from kserve import V1beta1InferenceService -from kserve import V1beta1LoggerSpec +from kserve import ( + KServeClient, + constants, + V1beta1PredictorSpec, + V1beta1SKLearnSpec, + V1beta1InferenceServiceSpec, + V1beta1InferenceService, + V1beta1LoggerSpec, +) from kubernetes.client import V1ResourceRequirements from kubernetes.client import V1Container import pytest @@ -30,14 +32,67 @@ kserve_client = KServeClient(config_file=os.environ.get("KUBECONFIG", "~/.kube/config")) +annotations = {"serving.kserve.io/deploymentMode": "RawDeployment"} @pytest.mark.raw @pytest.mark.asyncio(scope="session") async def test_kserve_logger(rest_v1_client): msg_dumper = "message-dumper-raw" - annotations = {"serving.kserve.io/deploymentMode": "RawDeployment"} + before(msg_dumper) + service_name = "isvc-logger-raw" + predictor = V1beta1PredictorSpec( + min_replicas=1, + logger=V1beta1LoggerSpec( + mode="all", + url="http://" + + msg_dumper + + "-predictor" + + "." + + KSERVE_TEST_NAMESPACE + + ".svc.cluster.local", + ), + sklearn=V1beta1SKLearnSpec( + storage_uri="gs://kfserving-examples/models/sklearn/1.0/model", + resources=V1ResourceRequirements( + requests={"cpu": "10m", "memory": "128Mi"}, + limits={"cpu": "100m", "memory": "256Mi"}, + ), + ), + ) + base_test(msg_dumper, service_name, predictor, rest_v1_client) + + +@pytest.mark.rawcipn +async def test_kserve_logger_cipn(rest_v1_client): + msg_dumper = "message-dumper-raw-cipn" + before(msg_dumper) + + service_name = "isvc-logger-raw-cipn" + predictor = V1beta1PredictorSpec( + min_replicas=1, + logger=V1beta1LoggerSpec( + mode="all", + url="http://" + + msg_dumper + + "-predictor" + + "." + + KSERVE_TEST_NAMESPACE + + ".svc.cluster.local:8080", + ), + sklearn=V1beta1SKLearnSpec( + storage_uri="gs://kfserving-examples/models/sklearn/1.0/model", + resources=V1ResourceRequirements( + requests={"cpu": "10m", "memory": "128Mi"}, + limits={"cpu": "100m", "memory": "256Mi"}, + ), + ), + ) + await base_test(msg_dumper, service_name, predictor, rest_v1_client) + + +def before(msg_dumper): predictor = V1beta1PredictorSpec( min_replicas=1, containers=[ @@ -64,27 +119,8 @@ async def test_kserve_logger(rest_v1_client): kserve_client.create(isvc) kserve_client.wait_isvc_ready(msg_dumper, namespace=KSERVE_TEST_NAMESPACE) - service_name = "isvc-logger-raw" - predictor = V1beta1PredictorSpec( - min_replicas=1, - logger=V1beta1LoggerSpec( - mode="all", - url="http://" - + msg_dumper - + "-predictor" - + "." - + KSERVE_TEST_NAMESPACE - + ".svc.cluster.local", - ), - sklearn=V1beta1SKLearnSpec( - storage_uri="gs://kfserving-examples/models/sklearn/1.0/model", - resources=V1ResourceRequirements( - requests={"cpu": "10m", "memory": "128Mi"}, - limits={"cpu": "100m", "memory": "256Mi"}, - ), - ), - ) +async def base_test(msg_dumper, service_name, predictor, rest_v1_client): isvc = V1beta1InferenceService( api_version=constants.KSERVE_V1BETA1, kind=constants.KSERVE_KIND, @@ -120,10 +156,8 @@ async def test_kserve_logger(rest_v1_client): container="kserve-container", ) print(log) - # TODO, as part of the https://issues.redhat.com/browse/RHOAIENG-5077 - # add the control flag here to check the logs when headless service is disabled - # assert ("org.kubeflow.serving.inference.request" in log) - # assert ("org.kubeflow.serving.inference.response" in log) + assert "org.kubeflow.serving.inference.request" in log + assert "org.kubeflow.serving.inference.response" in log kserve_client.delete(service_name, KSERVE_TEST_NAMESPACE) kserve_client.delete(msg_dumper, KSERVE_TEST_NAMESPACE) diff --git a/test/e2e/pytest.ini b/test/e2e/pytest.ini index 67bf287f15..18c5d0120e 100644 --- a/test/e2e/pytest.ini +++ b/test/e2e/pytest.ini @@ -10,6 +10,7 @@ markers = graph: inference graph tests helm: helm e2e tests raw: raw e2e tests + rawcipn: raw e2e tests with cluster ip none kourier: e2e tests using kourier as networking layer collocation: transformer and predictor collocation e2e tests predictor: predictor e2e tests including grpc