From c459f054a5e4020bf8391b0170431d9e8116a41f Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Tue, 23 Jun 2020 07:09:45 +0300 Subject: [PATCH 01/14] Move parallelism fields from autoscaling to predictor --- pkg/operator/operator/k8s_specs.go | 12 +++++----- pkg/types/spec/validations.go | 35 +++++++++++++++--------------- pkg/types/userconfig/api.go | 24 +++++--------------- pkg/types/userconfig/config_key.go | 4 ++-- 4 files changed, 32 insertions(+), 43 deletions(-) diff --git a/pkg/operator/operator/k8s_specs.go b/pkg/operator/operator/k8s_specs.go index 45d31119d2..6591d8892d 100644 --- a/pkg/operator/operator/k8s_specs.go +++ b/pkg/operator/operator/k8s_specs.go @@ -618,11 +618,11 @@ func getEnvVars(api *spec.API, container string) []kcore.EnvVar { }, kcore.EnvVar{ Name: "CORTEX_WORKERS_PER_REPLICA", - Value: s.Int32(api.Autoscaling.WorkersPerReplica), + Value: s.Int32(api.Predictor.WorkersPerReplica), }, kcore.EnvVar{ Name: "CORTEX_THREADS_PER_WORKER", - Value: s.Int32(api.Autoscaling.ThreadsPerWorker), + Value: s.Int32(api.Predictor.ThreadsPerWorker), }, kcore.EnvVar{ Name: "CORTEX_MAX_REPLICA_CONCURRENCY", @@ -631,7 +631,7 @@ func getEnvVars(api *spec.API, container string) []kcore.EnvVar { kcore.EnvVar{ Name: "CORTEX_MAX_WORKER_CONCURRENCY", // add 1 because it was required to achieve the target concurrency for 1 worker, 1 thread - Value: s.Int64(1 + int64(math.Round(float64(api.Autoscaling.MaxReplicaConcurrency)/float64(api.Autoscaling.WorkersPerReplica)))), + Value: s.Int64(1 + int64(math.Round(float64(api.Autoscaling.MaxReplicaConcurrency)/float64(api.Predictor.WorkersPerReplica)))), }, kcore.EnvVar{ Name: "CORTEX_SO_MAX_CONN", @@ -703,7 +703,7 @@ func getEnvVars(api *spec.API, container string) []kcore.EnvVar { envVars = append(envVars, kcore.EnvVar{ Name: "NEURONCORE_GROUP_SIZES", - Value: s.Int64(api.Compute.Inf * consts.NeuronCoresPerInf / int64(api.Autoscaling.WorkersPerReplica)), + Value: s.Int64(api.Compute.Inf * consts.NeuronCoresPerInf / int64(api.Predictor.WorkersPerReplica)), }, kcore.EnvVar{ Name: "NEURON_RTD_ADDRESS", @@ -717,7 +717,7 @@ func getEnvVars(api *spec.API, container string) []kcore.EnvVar { envVars = append(envVars, kcore.EnvVar{ Name: "TF_WORKERS", - Value: s.Int32(api.Autoscaling.WorkersPerReplica), + Value: s.Int32(api.Predictor.WorkersPerReplica), }, kcore.EnvVar{ Name: "CORTEX_TF_BASE_SERVING_PORT", @@ -760,7 +760,7 @@ func tensorflowServingContainer(api *spec.API, volumeMounts []kcore.VolumeMount, } if api.Compute.Inf > 0 { - numPorts := api.Autoscaling.WorkersPerReplica + numPorts := api.Predictor.WorkersPerReplica for i := int32(1); i < numPorts; i++ { ports = append(ports, kcore.ContainerPort{ ContainerPort: _tfBaseServingPortInt32 + i, diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go index 6bda48aef5..4c899b2883 100644 --- a/pkg/types/spec/validations.go +++ b/pkg/types/spec/validations.go @@ -104,6 +104,21 @@ func predictorValidation() *cr.StructFieldValidation { Required: true, }, }, + { + StructField: "WorkersPerReplica", + Int32Validation: &cr.Int32Validation{ + Default: 1, + GreaterThanOrEqualTo: pointer.Int32(1), + LessThanOrEqualTo: pointer.Int32(20), + }, + }, + { + StructField: "ThreadsPerWorker", + Int32Validation: &cr.Int32Validation{ + Default: 1, + GreaterThanOrEqualTo: pointer.Int32(1), + }, + }, { StructField: "Model", StringPtrValidation: &cr.StringPtrValidation{}, @@ -286,21 +301,6 @@ func autoscalingValidation(provider types.ProviderType) *cr.StructFieldValidatio GreaterThan: pointer.Int32(0), }, }, - { - StructField: "WorkersPerReplica", - Int32Validation: &cr.Int32Validation{ - Default: 1, - GreaterThanOrEqualTo: pointer.Int32(1), - LessThanOrEqualTo: pointer.Int32(20), - }, - }, - { - StructField: "ThreadsPerWorker", - Int32Validation: &cr.Int32Validation{ - Default: 1, - GreaterThanOrEqualTo: pointer.Int32(1), - }, - }, { StructField: "TargetReplicaConcurrency", Float64PtrValidation: &cr.Float64PtrValidation{ @@ -942,9 +942,10 @@ func validatePythonPath(pythonPath string, projectFiles ProjectFiles) error { func validateAutoscaling(api *userconfig.API) error { autoscaling := api.Autoscaling + predictor := api.Predictor if autoscaling.TargetReplicaConcurrency == nil { - autoscaling.TargetReplicaConcurrency = pointer.Float64(float64(autoscaling.WorkersPerReplica * autoscaling.ThreadsPerWorker)) + autoscaling.TargetReplicaConcurrency = pointer.Float64(float64(predictor.WorkersPerReplica * predictor.ThreadsPerWorker)) } if *autoscaling.TargetReplicaConcurrency > float64(autoscaling.MaxReplicaConcurrency) { @@ -965,7 +966,7 @@ func validateAutoscaling(api *userconfig.API) error { if api.Compute.Inf > 0 { numNeuronCores := api.Compute.Inf * consts.NeuronCoresPerInf - workersPerReplica := int64(api.Autoscaling.WorkersPerReplica) + workersPerReplica := int64(predictor.WorkersPerReplica) if !libmath.IsDivisibleByInt64(numNeuronCores, workersPerReplica) { return ErrorInvalidNumberOfInfWorkers(workersPerReplica, api.Compute.Inf, numNeuronCores) } diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go index 32ea51f875..843f421fd0 100644 --- a/pkg/types/userconfig/api.go +++ b/pkg/types/userconfig/api.go @@ -47,6 +47,8 @@ type API struct { type Predictor struct { Type PredictorType `json:"type" yaml:"type"` Path string `json:"path" yaml:"path"` + WorkersPerReplica int32 `json:"workers_per_replica" yaml:"workers_per_replica"` + ThreadsPerWorker int32 `json:"threads_per_worker" yaml:"threads_per_worker"` Model *string `json:"model" yaml:"model"` Models []*ModelResource `json:"models" yaml:"models"` PythonPath *string `json:"python_path" yaml:"python_path"` @@ -83,8 +85,6 @@ type Autoscaling struct { MinReplicas int32 `json:"min_replicas" yaml:"min_replicas"` MaxReplicas int32 `json:"max_replicas" yaml:"max_replicas"` InitReplicas int32 `json:"init_replicas" yaml:"init_replicas"` - WorkersPerReplica int32 `json:"workers_per_replica" yaml:"workers_per_replica"` - ThreadsPerWorker int32 `json:"threads_per_worker" yaml:"threads_per_worker"` TargetReplicaConcurrency *float64 `json:"target_replica_concurrency" yaml:"target_replica_concurrency"` MaxReplicaConcurrency int64 `json:"max_replica_concurrency" yaml:"max_replica_concurrency"` Window time.Duration `json:"window" yaml:"window"` @@ -177,8 +177,8 @@ func (api *API) ToK8sAnnotations() map[string]string { APIGatewayAnnotationKey: api.Networking.APIGateway.String(), MinReplicasAnnotationKey: s.Int32(api.Autoscaling.MinReplicas), MaxReplicasAnnotationKey: s.Int32(api.Autoscaling.MaxReplicas), - WorkersPerReplicaAnnotationKey: s.Int32(api.Autoscaling.WorkersPerReplica), - ThreadsPerWorkerAnnotationKey: s.Int32(api.Autoscaling.ThreadsPerWorker), + WorkersPerReplicaAnnotationKey: s.Int32(api.Predictor.WorkersPerReplica), + ThreadsPerWorkerAnnotationKey: s.Int32(api.Predictor.ThreadsPerWorker), TargetReplicaConcurrencyAnnotationKey: s.Float64(*api.Autoscaling.TargetReplicaConcurrency), MaxReplicaConcurrencyAnnotationKey: s.Int64(api.Autoscaling.MaxReplicaConcurrency), WindowAnnotationKey: api.Autoscaling.Window.String(), @@ -214,18 +214,6 @@ func AutoscalingFromAnnotations(k8sObj kmeta.Object) (*Autoscaling, error) { } a.MaxReplicas = maxReplicas - workersPerReplica, err := k8s.ParseInt32Annotation(k8sObj, WorkersPerReplicaAnnotationKey) - if err != nil { - return nil, err - } - a.WorkersPerReplica = workersPerReplica - - threadsPerWorker, err := k8s.ParseInt32Annotation(k8sObj, ThreadsPerWorkerAnnotationKey) - if err != nil { - return nil, err - } - a.ThreadsPerWorker = threadsPerWorker - targetReplicaConcurrency, err := k8s.ParseFloat64Annotation(k8sObj, TargetReplicaConcurrencyAnnotationKey) if err != nil { return nil, err @@ -331,6 +319,8 @@ func (predictor *Predictor) UserStr() string { var sb strings.Builder sb.WriteString(fmt.Sprintf("%s: %s\n", TypeKey, predictor.Type)) sb.WriteString(fmt.Sprintf("%s: %s\n", PathKey, predictor.Path)) + sb.WriteString(fmt.Sprintf("%s: %s\n", WorkersPerReplicaKey, s.Int32(predictor.WorkersPerReplica))) + sb.WriteString(fmt.Sprintf("%s: %s\n", ThreadsPerWorkerKey, s.Int32(predictor.ThreadsPerWorker))) if predictor.Model != nil { sb.WriteString(fmt.Sprintf("%s: %s\n", ModelKey, *predictor.Model)) } @@ -442,8 +432,6 @@ func (autoscaling *Autoscaling) UserStr() string { sb.WriteString(fmt.Sprintf("%s: %s\n", MinReplicasKey, s.Int32(autoscaling.MinReplicas))) sb.WriteString(fmt.Sprintf("%s: %s\n", MaxReplicasKey, s.Int32(autoscaling.MaxReplicas))) sb.WriteString(fmt.Sprintf("%s: %s\n", InitReplicasKey, s.Int32(autoscaling.InitReplicas))) - sb.WriteString(fmt.Sprintf("%s: %s\n", WorkersPerReplicaKey, s.Int32(autoscaling.WorkersPerReplica))) - sb.WriteString(fmt.Sprintf("%s: %s\n", ThreadsPerWorkerKey, s.Int32(autoscaling.ThreadsPerWorker))) sb.WriteString(fmt.Sprintf("%s: %s\n", TargetReplicaConcurrencyKey, s.Float64(*autoscaling.TargetReplicaConcurrency))) sb.WriteString(fmt.Sprintf("%s: %s\n", MaxReplicaConcurrencyKey, s.Int64(autoscaling.MaxReplicaConcurrency))) sb.WriteString(fmt.Sprintf("%s: %s\n", WindowKey, autoscaling.Window.String())) diff --git a/pkg/types/userconfig/config_key.go b/pkg/types/userconfig/config_key.go index 6d0937916a..ef325987a2 100644 --- a/pkg/types/userconfig/config_key.go +++ b/pkg/types/userconfig/config_key.go @@ -31,6 +31,8 @@ const ( // Predictor TypeKey = "type" PathKey = "path" + WorkersPerReplicaKey = "workers_per_replica" + ThreadsPerWorkerKey = "threads_per_worker" ModelKey = "model" ModelsKey = "models" PythonPathKey = "python_path" @@ -60,8 +62,6 @@ const ( MinReplicasKey = "min_replicas" MaxReplicasKey = "max_replicas" InitReplicasKey = "init_replicas" - WorkersPerReplicaKey = "workers_per_replica" - ThreadsPerWorkerKey = "threads_per_worker" TargetReplicaConcurrencyKey = "target_replica_concurrency" MaxReplicaConcurrencyKey = "max_replica_concurrency" WindowKey = "window" From f07a54a2d15e4c42782881395eb15e6422f34fcc Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Tue, 23 Jun 2020 19:53:34 +0300 Subject: [PATCH 02/14] Set workers/threads vars for local deployments --- cli/local/docker_spec.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cli/local/docker_spec.go b/cli/local/docker_spec.go index 4be4df8b7e..f575146a22 100644 --- a/cli/local/docker_spec.go +++ b/cli/local/docker_spec.go @@ -88,8 +88,8 @@ func getAPIEnv(api *spec.API, awsClient *aws.Client) []string { "CORTEX_MODELS="+strings.Join(api.ModelNames(), ","), "CORTEX_API_SPEC="+filepath.Join("/mnt/workspace", filepath.Base(api.Key)), "CORTEX_PROJECT_DIR="+_projectDir, - "CORTEX_WORKERS_PER_REPLICA=1", - "CORTEX_THREADS_PER_WORKER=1", + "CORTEX_WORKERS_PER_REPLICA="+s.Int32(api.Predictor.WorkersPerReplica), + "CORTEX_THREADS_PER_WORKER="+s.Int32(api.Predictor.ThreadsPerWorker), "CORTEX_MAX_WORKER_CONCURRENCY=1000", "CORTEX_SO_MAX_CONN=1000", "AWS_REGION="+awsClient.Region, From 17ba5f46d85325e10083ab1b45a9c98d319ba189 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Tue, 23 Jun 2020 20:11:03 +0300 Subject: [PATCH 03/14] Move parallelism fields in examples --- .../tensorflow/image-classifier-resnet50/cortex_cpu.yaml | 5 ++--- .../tensorflow/image-classifier-resnet50/cortex_gpu.yaml | 5 ++--- .../tensorflow/image-classifier-resnet50/cortex_inf.yaml | 4 ++-- examples/tensorflow/license-plate-reader/cortex_full.yaml | 8 ++++---- 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/examples/tensorflow/image-classifier-resnet50/cortex_cpu.yaml b/examples/tensorflow/image-classifier-resnet50/cortex_cpu.yaml index bcd2821013..01e82bb273 100644 --- a/examples/tensorflow/image-classifier-resnet50/cortex_cpu.yaml +++ b/examples/tensorflow/image-classifier-resnet50/cortex_cpu.yaml @@ -4,6 +4,8 @@ predictor: type: tensorflow path: predictor.py + workers_per_replica: 4 + threads_per_worker: 16 model: s3://cortex-examples/tensorflow/resnet50 config: classes: https://s3.amazonaws.com/deep-learning-models/image-models/imagenet_class_index.json @@ -12,6 +14,3 @@ compute: cpu: 3 mem: 4G - autoscaling: - workers_per_replica: 4 - threads_per_worker: 16 diff --git a/examples/tensorflow/image-classifier-resnet50/cortex_gpu.yaml b/examples/tensorflow/image-classifier-resnet50/cortex_gpu.yaml index 33ad90327e..ae5ae6d817 100644 --- a/examples/tensorflow/image-classifier-resnet50/cortex_gpu.yaml +++ b/examples/tensorflow/image-classifier-resnet50/cortex_gpu.yaml @@ -4,6 +4,8 @@ predictor: type: tensorflow path: predictor.py + workers_per_replica: 4 + threads_per_worker: 24 model: s3://cortex-examples/tensorflow/resnet50 config: classes: https://s3.amazonaws.com/deep-learning-models/image-models/imagenet_class_index.json @@ -13,6 +15,3 @@ gpu: 1 cpu: 3 mem: 4G - autoscaling: - workers_per_replica: 4 - threads_per_worker: 24 diff --git a/examples/tensorflow/image-classifier-resnet50/cortex_inf.yaml b/examples/tensorflow/image-classifier-resnet50/cortex_inf.yaml index e2e48c2d0a..357326ae6a 100644 --- a/examples/tensorflow/image-classifier-resnet50/cortex_inf.yaml +++ b/examples/tensorflow/image-classifier-resnet50/cortex_inf.yaml @@ -4,6 +4,8 @@ predictor: type: tensorflow path: predictor.py + workers_per_replica: 4 + threads_per_worker: 256 model: s3://cortex-examples/tensorflow/resnet50_neuron config: classes: https://s3.amazonaws.com/deep-learning-models/image-models/imagenet_class_index.json @@ -14,6 +16,4 @@ cpu: 3 mem: 4G autoscaling: - workers_per_replica: 4 - threads_per_worker: 256 max_replica_concurrency: 16384 diff --git a/examples/tensorflow/license-plate-reader/cortex_full.yaml b/examples/tensorflow/license-plate-reader/cortex_full.yaml index 01a32ddd53..01ca27f471 100644 --- a/examples/tensorflow/license-plate-reader/cortex_full.yaml +++ b/examples/tensorflow/license-plate-reader/cortex_full.yaml @@ -4,6 +4,8 @@ predictor: type: tensorflow path: predictor_yolo.py + workers_per_replica: 4 + threads_per_worker: 3 model: s3://cortex-examples/tensorflow/license-plate-reader/yolov3_tf signature_key: serving_default config: @@ -15,13 +17,13 @@ autoscaling: min_replicas: 2 max_replicas: 2 - workers_per_replica: 4 - threads_per_worker: 3 - name: crnn predictor: type: python path: predictor_crnn.py + workers_per_replica: 1 + threads_per_worker: 1 compute: cpu: 1 gpu: 1 @@ -29,5 +31,3 @@ autoscaling: min_replicas: 10 max_replicas: 10 - workers_per_replica: 1 - threads_per_worker: 1 From 9cd3f0043aef61ed1aa35296d3260572426a811c Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Tue, 23 Jun 2020 20:31:39 +0300 Subject: [PATCH 04/14] Adjust docs --- docs/deployments/api-configuration.md | 12 ++++++------ docs/deployments/autoscaling.md | 8 -------- docs/deployments/parallelism.md | 11 +++++++++++ docs/summary.md | 1 + 4 files changed, 18 insertions(+), 14 deletions(-) create mode 100644 docs/deployments/parallelism.md diff --git a/docs/deployments/api-configuration.md b/docs/deployments/api-configuration.md index 9bd13cedd1..b96f2d37c6 100644 --- a/docs/deployments/api-configuration.md +++ b/docs/deployments/api-configuration.md @@ -15,6 +15,8 @@ Reference the section below which corresponds to your Predictor type: [Python](# predictor: type: python path: # path to a python file with a PythonPredictor class definition, relative to the Cortex root (required) + workers_per_replica: # the number of parallel serving workers to run on each replica (default: 1) + threads_per_worker: # the number of threads per worker (default: 1) config: # arbitrary dictionary passed to the constructor of the Predictor (optional) python_path: # path to the root of your Python folder that will be appended to PYTHONPATH (default: folder containing cortex.yaml) image: # docker image to use for the Predictor (default: cortexlabs/python-predictor-cpu or cortexlabs/python-predictor-gpu based on compute) @@ -33,8 +35,6 @@ Reference the section below which corresponds to your Predictor type: [Python](# min_replicas: # minimum number of replicas (default: 1) max_replicas: # maximum number of replicas (default: 100) init_replicas: # initial number of replicas (default: ) - workers_per_replica: # the number of parallel serving workers to run on each replica (default: 1) - threads_per_worker: # the number of threads per worker (default: 1) target_replica_concurrency: # the desired number of in-flight requests per replica, which the autoscaler tries to maintain (default: workers_per_replica * threads_per_worker) max_replica_concurrency: # the maximum number of in-flight requests per replica before requests are rejected with error code 503 (default: 1024) window: # the time over which to average the API's concurrency (default: 60s) @@ -60,6 +60,8 @@ See additional documentation for [autoscaling](autoscaling.md), [compute](comput predictor: type: tensorflow path: # path to a python file with a TensorFlowPredictor class definition, relative to the Cortex root (required) + workers_per_replica: # the number of parallel serving workers to run on each replica (default: 1) + threads_per_worker: # the number of threads per worker (default: 1) model: # S3 path to an exported model (e.g. s3://my-bucket/exported_model) (either this or 'models' must be provided) signature_key: # name of the signature def to use for prediction (required if your model has more than one signature def) models: # use this when multiple models per API are desired (either this or 'model' must be provided) @@ -86,8 +88,6 @@ See additional documentation for [autoscaling](autoscaling.md), [compute](comput min_replicas: # minimum number of replicas (default: 1) max_replicas: # maximum number of replicas (default: 100) init_replicas: # initial number of replicas (default: ) - workers_per_replica: # the number of parallel serving workers to run on each replica (default: 1) - threads_per_worker: # the number of threads per worker (default: 1) target_replica_concurrency: # the desired number of in-flight requests per replica, which the autoscaler tries to maintain (default: workers_per_replica * threads_per_worker) max_replica_concurrency: # the maximum number of in-flight requests per replica before requests are rejected with error code 503 (default: 1024) window: # the time over which to average the API's concurrency (default: 60s) @@ -113,6 +113,8 @@ See additional documentation for [autoscaling](autoscaling.md), [compute](comput predictor: type: onnx path: # path to a python file with an ONNXPredictor class definition, relative to the Cortex root (required) + workers_per_replica: # the number of parallel serving workers to run on each replica (default: 1) + threads_per_worker: # the number of threads per worker (default: 1) model: # S3 path to an exported model (e.g. s3://my-bucket/exported_model.onnx) (either this or 'models' must be provided) models: # use this when multiple models per API are desired (either this or 'model' must be provided) - name: # unique name for the model (e.g. iris-classifier) (required) @@ -136,8 +138,6 @@ See additional documentation for [autoscaling](autoscaling.md), [compute](comput min_replicas: # minimum number of replicas (default: 1) max_replicas: # maximum number of replicas (default: 100) init_replicas: # initial number of replicas (default: ) - workers_per_replica: # the number of parallel serving workers to run on each replica (default: 1) - threads_per_worker: # the number of threads per worker (default: 1) target_replica_concurrency: # the desired number of in-flight requests per replica, which the autoscaler tries to maintain (default: workers_per_replica * threads_per_worker) max_replica_concurrency: # the maximum number of in-flight requests per replica before requests are rejected with error code 503 (default: 1024) window: # the time over which to average the API's concurrency (default: 60s) diff --git a/docs/deployments/autoscaling.md b/docs/deployments/autoscaling.md index 7f3c9303f0..265d90f849 100644 --- a/docs/deployments/autoscaling.md +++ b/docs/deployments/autoscaling.md @@ -4,14 +4,6 @@ _WARNING: you are on the master branch, please refer to the docs on the branch t Cortex autoscales your web services on a per-API basis based on your configuration. -## Replica Parallelism - -* `workers_per_replica` (default: 1): Each replica runs a web server with `workers_per_replica` workers, each of which runs in it's own process. For APIs running with multiple CPUs per replica, using 1-3 workers per unit of CPU generally leads to optimal throughput. For example, if `cpu` is 2, a value between 2 and 6 `workers_per_replica` is reasonable. The optimal number will vary based on the workload and the CPU request for the API. - -* `threads_per_worker` (default: 1): Each worker uses a thread pool of size `threads_per_worker` to process requests. For applications that are not CPU intensive such as high I/O (e.g. downloading files), GPU-based inference or Inferentia ASIC-based inference, increasing the number of threads per worker can increase throughput. For CPU-bound applications such as running your model inference on a CPU, using 1 thread per worker is recommended to avoid unnecessary context switching. Some applications are not thread-safe, and therefore must be run with 1 thread per worker. - -`workers_per_replica` * `threads_per_worker` represents the number of requests that your replica can work in parallel. For example, if `workers_per_replica` is 2 and `threads_per_worker` is 2, and the replica was hit with 5 concurrent requests, 4 would immediately begin to be processed, 1 would be waiting for a thread to become available, and the concurrency for the replica would be 5. If the replica was hit with 3 concurrent requests, all three would begin processing immediately, and the replica concurrency would be 3. - ## Autoscaling Replicas * `min_replicas`: The lower bound on how many replicas can be running for an API. diff --git a/docs/deployments/parallelism.md b/docs/deployments/parallelism.md new file mode 100644 index 0000000000..a5ea4f8968 --- /dev/null +++ b/docs/deployments/parallelism.md @@ -0,0 +1,11 @@ +# Parallelism + +_WARNING: you are on the master branch, please refer to the docs on the branch that matches your `cortex version`_ + +Replica parallelism can be tweaked with the following fields: + +* `workers_per_replica` (default: 1): Each replica runs a web server with `workers_per_replica` workers, each of which runs in it's own process. For APIs running with multiple CPUs per replica, using 1-3 workers per unit of CPU generally leads to optimal throughput. For example, if `cpu` is 2, a value between 2 and 6 `workers_per_replica` is reasonable. The optimal number will vary based on the workload and the CPU request for the API. + +* `threads_per_worker` (default: 1): Each worker uses a thread pool of size `threads_per_worker` to process requests. For applications that are not CPU intensive such as high I/O (e.g. downloading files), GPU-based inference or Inferentia ASIC-based inference, increasing the number of threads per worker can increase throughput. For CPU-bound applications such as running your model inference on a CPU, using 1 thread per worker is recommended to avoid unnecessary context switching. Some applications are not thread-safe, and therefore must be run with 1 thread per worker. + +`workers_per_replica` * `threads_per_worker` represents the number of requests that your replica can work in parallel. For example, if `workers_per_replica` is 2 and `threads_per_worker` is 2, and the replica was hit with 5 concurrent requests, 4 would immediately begin to be processed, 1 would be waiting for a thread to become available, and the concurrency for the replica would be 5. If the replica was hit with 3 concurrent requests, all three would begin processing immediately, and the replica concurrency would be 3. diff --git a/docs/summary.md b/docs/summary.md index 7acaef6b43..3ca691d44d 100644 --- a/docs/summary.md +++ b/docs/summary.md @@ -15,6 +15,7 @@ * [Predictor implementation](deployments/predictors.md) * [API configuration](deployments/api-configuration.md) * [API deployment](deployments/deployment.md) +* [Parellelism](deployments/parallelism.md) * [Autoscaling](deployments/autoscaling.md) * [Networking](deployments/networking.md) * [Compute](deployments/compute.md) From 686304f2ed62823d94a54cf27ea1f3a8d54268f5 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Tue, 23 Jun 2020 20:53:59 +0300 Subject: [PATCH 05/14] Reposition parallelism fields in config --- docs/deployments/api-configuration.md | 8 +++---- pkg/types/spec/validations.go | 30 +++++++++++++-------------- pkg/types/userconfig/api.go | 8 +++---- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/docs/deployments/api-configuration.md b/docs/deployments/api-configuration.md index b96f2d37c6..4cd8c4cd1d 100644 --- a/docs/deployments/api-configuration.md +++ b/docs/deployments/api-configuration.md @@ -60,8 +60,6 @@ See additional documentation for [autoscaling](autoscaling.md), [compute](comput predictor: type: tensorflow path: # path to a python file with a TensorFlowPredictor class definition, relative to the Cortex root (required) - workers_per_replica: # the number of parallel serving workers to run on each replica (default: 1) - threads_per_worker: # the number of threads per worker (default: 1) model: # S3 path to an exported model (e.g. s3://my-bucket/exported_model) (either this or 'models' must be provided) signature_key: # name of the signature def to use for prediction (required if your model has more than one signature def) models: # use this when multiple models per API are desired (either this or 'model' must be provided) @@ -69,6 +67,8 @@ See additional documentation for [autoscaling](autoscaling.md), [compute](comput model: # S3 path to an exported model (e.g. s3://my-bucket/exported_model) (required) signature_key: # name of the signature def to use for prediction (required if your model has more than one signature def) ... + workers_per_replica: # the number of parallel serving workers to run on each replica (default: 1) + threads_per_worker: # the number of threads per worker (default: 1) config: # arbitrary dictionary passed to the constructor of the Predictor (optional) python_path: # path to the root of your Python folder that will be appended to PYTHONPATH (default: folder containing cortex.yaml) image: # docker image to use for the Predictor (default: cortexlabs/tensorflow-predictor) @@ -113,14 +113,14 @@ See additional documentation for [autoscaling](autoscaling.md), [compute](comput predictor: type: onnx path: # path to a python file with an ONNXPredictor class definition, relative to the Cortex root (required) - workers_per_replica: # the number of parallel serving workers to run on each replica (default: 1) - threads_per_worker: # the number of threads per worker (default: 1) model: # S3 path to an exported model (e.g. s3://my-bucket/exported_model.onnx) (either this or 'models' must be provided) models: # use this when multiple models per API are desired (either this or 'model' must be provided) - name: # unique name for the model (e.g. iris-classifier) (required) model: # S3 path to an exported model (e.g. s3://my-bucket/exported_model.onnx) (required) signature_key: # name of the signature def to use for prediction (required if your model has more than one signature def) ... + workers_per_replica: # the number of parallel serving workers to run on each replica (default: 1) + threads_per_worker: # the number of threads per worker (default: 1) config: # arbitrary dictionary passed to the constructor of the Predictor (optional) python_path: # path to the root of your Python folder that will be appended to PYTHONPATH (default: folder containing cortex.yaml) image: # docker image to use for the Predictor (default: cortexlabs/onnx-predictor-gpu or cortexlabs/onnx-predictor-cpu based on compute) diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go index 4c899b2883..c42a35f207 100644 --- a/pkg/types/spec/validations.go +++ b/pkg/types/spec/validations.go @@ -104,21 +104,6 @@ func predictorValidation() *cr.StructFieldValidation { Required: true, }, }, - { - StructField: "WorkersPerReplica", - Int32Validation: &cr.Int32Validation{ - Default: 1, - GreaterThanOrEqualTo: pointer.Int32(1), - LessThanOrEqualTo: pointer.Int32(20), - }, - }, - { - StructField: "ThreadsPerWorker", - Int32Validation: &cr.Int32Validation{ - Default: 1, - GreaterThanOrEqualTo: pointer.Int32(1), - }, - }, { StructField: "Model", StringPtrValidation: &cr.StringPtrValidation{}, @@ -148,6 +133,21 @@ func predictorValidation() *cr.StructFieldValidation { DockerImageOrEmpty: true, }, }, + { + StructField: "WorkersPerReplica", + Int32Validation: &cr.Int32Validation{ + Default: 1, + GreaterThanOrEqualTo: pointer.Int32(1), + LessThanOrEqualTo: pointer.Int32(20), + }, + }, + { + StructField: "ThreadsPerWorker", + Int32Validation: &cr.Int32Validation{ + Default: 1, + GreaterThanOrEqualTo: pointer.Int32(1), + }, + }, { StructField: "Config", InterfaceMapValidation: &cr.InterfaceMapValidation{ diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go index 843f421fd0..6c4ca2addd 100644 --- a/pkg/types/userconfig/api.go +++ b/pkg/types/userconfig/api.go @@ -47,13 +47,13 @@ type API struct { type Predictor struct { Type PredictorType `json:"type" yaml:"type"` Path string `json:"path" yaml:"path"` - WorkersPerReplica int32 `json:"workers_per_replica" yaml:"workers_per_replica"` - ThreadsPerWorker int32 `json:"threads_per_worker" yaml:"threads_per_worker"` Model *string `json:"model" yaml:"model"` Models []*ModelResource `json:"models" yaml:"models"` PythonPath *string `json:"python_path" yaml:"python_path"` Image string `json:"image" yaml:"image"` TensorFlowServingImage string `json:"tensorflow_serving_image" yaml:"tensorflow_serving_image"` + WorkersPerReplica int32 `json:"workers_per_replica" yaml:"workers_per_replica"` + ThreadsPerWorker int32 `json:"threads_per_worker" yaml:"threads_per_worker"` Config map[string]interface{} `json:"config" yaml:"config"` Env map[string]string `json:"env" yaml:"env"` SignatureKey *string `json:"signature_key" yaml:"signature_key"` @@ -319,8 +319,6 @@ func (predictor *Predictor) UserStr() string { var sb strings.Builder sb.WriteString(fmt.Sprintf("%s: %s\n", TypeKey, predictor.Type)) sb.WriteString(fmt.Sprintf("%s: %s\n", PathKey, predictor.Path)) - sb.WriteString(fmt.Sprintf("%s: %s\n", WorkersPerReplicaKey, s.Int32(predictor.WorkersPerReplica))) - sb.WriteString(fmt.Sprintf("%s: %s\n", ThreadsPerWorkerKey, s.Int32(predictor.ThreadsPerWorker))) if predictor.Model != nil { sb.WriteString(fmt.Sprintf("%s: %s\n", ModelKey, *predictor.Model)) } @@ -333,6 +331,8 @@ func (predictor *Predictor) UserStr() string { if predictor.SignatureKey != nil { sb.WriteString(fmt.Sprintf("%s: %s\n", SignatureKeyKey, *predictor.SignatureKey)) } + sb.WriteString(fmt.Sprintf("%s: %s\n", WorkersPerReplicaKey, s.Int32(predictor.WorkersPerReplica))) + sb.WriteString(fmt.Sprintf("%s: %s\n", ThreadsPerWorkerKey, s.Int32(predictor.ThreadsPerWorker))) if predictor.PythonPath != nil { sb.WriteString(fmt.Sprintf("%s: %s\n", PythonPathKey, *predictor.PythonPath)) } From bf525de54a4fce1a1a7b03d801220db4fcca956f Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Tue, 23 Jun 2020 21:03:42 +0300 Subject: [PATCH 06/14] Match field order with that in the docs --- pkg/types/userconfig/api.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go index 6c4ca2addd..fd5fd8603d 100644 --- a/pkg/types/userconfig/api.go +++ b/pkg/types/userconfig/api.go @@ -333,17 +333,17 @@ func (predictor *Predictor) UserStr() string { } sb.WriteString(fmt.Sprintf("%s: %s\n", WorkersPerReplicaKey, s.Int32(predictor.WorkersPerReplica))) sb.WriteString(fmt.Sprintf("%s: %s\n", ThreadsPerWorkerKey, s.Int32(predictor.ThreadsPerWorker))) - if predictor.PythonPath != nil { - sb.WriteString(fmt.Sprintf("%s: %s\n", PythonPathKey, *predictor.PythonPath)) + if len(predictor.Config) > 0 { + sb.WriteString(fmt.Sprintf("%s:\n", ConfigKey)) + d, _ := yaml.Marshal(&predictor.Config) + sb.WriteString(s.Indent(string(d), " ")) } sb.WriteString(fmt.Sprintf("%s: %s\n", ImageKey, predictor.Image)) if predictor.TensorFlowServingImage != "" { sb.WriteString(fmt.Sprintf("%s: %s\n", TensorFlowServingImageKey, predictor.TensorFlowServingImage)) } - if len(predictor.Config) > 0 { - sb.WriteString(fmt.Sprintf("%s:\n", ConfigKey)) - d, _ := yaml.Marshal(&predictor.Config) - sb.WriteString(s.Indent(string(d), " ")) + if predictor.PythonPath != nil { + sb.WriteString(fmt.Sprintf("%s: %s\n", PythonPathKey, *predictor.PythonPath)) } if len(predictor.Env) > 0 { sb.WriteString(fmt.Sprintf("%s:\n", EnvKey)) From 0c4896c0a416b01e02417cb695b6c388becb4d3c Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Tue, 23 Jun 2020 23:30:18 +0300 Subject: [PATCH 07/14] Rename parallelism fields --- cli/local/docker_spec.go | 6 ++--- docs/deployments/api-configuration.md | 18 +++++++-------- docs/deployments/autoscaling.md | 8 +++---- docs/deployments/gpus.md | 4 ++-- docs/deployments/inferentia.md | 8 +++---- docs/deployments/parallelism.md | 6 ++--- docs/guides/batch-runner.md | 6 ++--- .../image-classifier-resnet50/cortex_cpu.yaml | 4 ++-- .../image-classifier-resnet50/cortex_gpu.yaml | 4 ++-- .../image-classifier-resnet50/cortex_inf.yaml | 4 ++-- .../license-plate-reader/cortex_full.yaml | 8 +++---- images/tensorflow-serving-inf/run.sh | 2 +- pkg/operator/operator/k8s_specs.go | 22 +++++++++---------- pkg/types/spec/errors.go | 10 ++++----- pkg/types/spec/validations.go | 12 +++++----- pkg/types/userconfig/api.go | 12 +++++----- pkg/types/userconfig/config_key.go | 8 +++---- pkg/workloads/cortex/serve/serve.py | 2 +- pkg/workloads/cortex/serve/start_uvicorn.py | 18 +++++++-------- 19 files changed, 81 insertions(+), 81 deletions(-) diff --git a/cli/local/docker_spec.go b/cli/local/docker_spec.go index f575146a22..50566cac48 100644 --- a/cli/local/docker_spec.go +++ b/cli/local/docker_spec.go @@ -88,9 +88,9 @@ func getAPIEnv(api *spec.API, awsClient *aws.Client) []string { "CORTEX_MODELS="+strings.Join(api.ModelNames(), ","), "CORTEX_API_SPEC="+filepath.Join("/mnt/workspace", filepath.Base(api.Key)), "CORTEX_PROJECT_DIR="+_projectDir, - "CORTEX_WORKERS_PER_REPLICA="+s.Int32(api.Predictor.WorkersPerReplica), - "CORTEX_THREADS_PER_WORKER="+s.Int32(api.Predictor.ThreadsPerWorker), - "CORTEX_MAX_WORKER_CONCURRENCY=1000", + "CORTEX_PROCESSES_PER_REPLICA="+s.Int32(api.Predictor.ProcessesPerReplica), + "CORTEX_THREADS_PER_PROCESS="+s.Int32(api.Predictor.ThreadsPerProcess), + "CORTEX_MAX_PROCESS_CONCURRENCY=1000", "CORTEX_SO_MAX_CONN=1000", "AWS_REGION="+awsClient.Region, ) diff --git a/docs/deployments/api-configuration.md b/docs/deployments/api-configuration.md index 4cd8c4cd1d..a1b39f21f2 100644 --- a/docs/deployments/api-configuration.md +++ b/docs/deployments/api-configuration.md @@ -15,8 +15,8 @@ Reference the section below which corresponds to your Predictor type: [Python](# predictor: type: python path: # path to a python file with a PythonPredictor class definition, relative to the Cortex root (required) - workers_per_replica: # the number of parallel serving workers to run on each replica (default: 1) - threads_per_worker: # the number of threads per worker (default: 1) + processes_per_replica: # the number of parallel serving processes to run on each replica (default: 1) + threads_per_process: # the number of threads per process (default: 1) config: # arbitrary dictionary passed to the constructor of the Predictor (optional) python_path: # path to the root of your Python folder that will be appended to PYTHONPATH (default: folder containing cortex.yaml) image: # docker image to use for the Predictor (default: cortexlabs/python-predictor-cpu or cortexlabs/python-predictor-gpu based on compute) @@ -35,7 +35,7 @@ Reference the section below which corresponds to your Predictor type: [Python](# min_replicas: # minimum number of replicas (default: 1) max_replicas: # maximum number of replicas (default: 100) init_replicas: # initial number of replicas (default: ) - target_replica_concurrency: # the desired number of in-flight requests per replica, which the autoscaler tries to maintain (default: workers_per_replica * threads_per_worker) + target_replica_concurrency: # the desired number of in-flight requests per replica, which the autoscaler tries to maintain (default: processes_per_replica * threads_per_process) max_replica_concurrency: # the maximum number of in-flight requests per replica before requests are rejected with error code 503 (default: 1024) window: # the time over which to average the API's concurrency (default: 60s) downscale_stabilization_period: # the API will not scale below the highest recommendation made during this period (default: 5m) @@ -67,8 +67,8 @@ See additional documentation for [autoscaling](autoscaling.md), [compute](comput model: # S3 path to an exported model (e.g. s3://my-bucket/exported_model) (required) signature_key: # name of the signature def to use for prediction (required if your model has more than one signature def) ... - workers_per_replica: # the number of parallel serving workers to run on each replica (default: 1) - threads_per_worker: # the number of threads per worker (default: 1) + processes_per_replica: # the number of parallel serving processes to run on each replica (default: 1) + threads_per_process: # the number of threads per process (default: 1) config: # arbitrary dictionary passed to the constructor of the Predictor (optional) python_path: # path to the root of your Python folder that will be appended to PYTHONPATH (default: folder containing cortex.yaml) image: # docker image to use for the Predictor (default: cortexlabs/tensorflow-predictor) @@ -88,7 +88,7 @@ See additional documentation for [autoscaling](autoscaling.md), [compute](comput min_replicas: # minimum number of replicas (default: 1) max_replicas: # maximum number of replicas (default: 100) init_replicas: # initial number of replicas (default: ) - target_replica_concurrency: # the desired number of in-flight requests per replica, which the autoscaler tries to maintain (default: workers_per_replica * threads_per_worker) + target_replica_concurrency: # the desired number of in-flight requests per replica, which the autoscaler tries to maintain (default: processes_per_replica * threads_per_process) max_replica_concurrency: # the maximum number of in-flight requests per replica before requests are rejected with error code 503 (default: 1024) window: # the time over which to average the API's concurrency (default: 60s) downscale_stabilization_period: # the API will not scale below the highest recommendation made during this period (default: 5m) @@ -119,8 +119,8 @@ See additional documentation for [autoscaling](autoscaling.md), [compute](comput model: # S3 path to an exported model (e.g. s3://my-bucket/exported_model.onnx) (required) signature_key: # name of the signature def to use for prediction (required if your model has more than one signature def) ... - workers_per_replica: # the number of parallel serving workers to run on each replica (default: 1) - threads_per_worker: # the number of threads per worker (default: 1) + processes_per_replica: # the number of parallel serving processes to run on each replica (default: 1) + threads_per_process: # the number of threads per process (default: 1) config: # arbitrary dictionary passed to the constructor of the Predictor (optional) python_path: # path to the root of your Python folder that will be appended to PYTHONPATH (default: folder containing cortex.yaml) image: # docker image to use for the Predictor (default: cortexlabs/onnx-predictor-gpu or cortexlabs/onnx-predictor-cpu based on compute) @@ -138,7 +138,7 @@ See additional documentation for [autoscaling](autoscaling.md), [compute](comput min_replicas: # minimum number of replicas (default: 1) max_replicas: # maximum number of replicas (default: 100) init_replicas: # initial number of replicas (default: ) - target_replica_concurrency: # the desired number of in-flight requests per replica, which the autoscaler tries to maintain (default: workers_per_replica * threads_per_worker) + target_replica_concurrency: # the desired number of in-flight requests per replica, which the autoscaler tries to maintain (default: processes_per_replica * threads_per_process) max_replica_concurrency: # the maximum number of in-flight requests per replica before requests are rejected with error code 503 (default: 1024) window: # the time over which to average the API's concurrency (default: 60s) downscale_stabilization_period: # the API will not scale below the highest recommendation made during this period (default: 5m) diff --git a/docs/deployments/autoscaling.md b/docs/deployments/autoscaling.md index 265d90f849..13d5798173 100644 --- a/docs/deployments/autoscaling.md +++ b/docs/deployments/autoscaling.md @@ -10,7 +10,7 @@ Cortex autoscales your web services on a per-API basis based on your configurati * `max_replicas`: The upper bound on how many replicas can be running for an API. -* `target_replica_concurrency` (default: `workers_per_replica` * `threads_per_worker`): This is the desired number of in-flight requests per replica, and is the metric which the autoscaler uses to make scaling decisions. +* `target_replica_concurrency` (default: `processes_per_replica` * `threads_per_process`): This is the desired number of in-flight requests per replica, and is the metric which the autoscaler uses to make scaling decisions. Replica concurrency is simply how many requests have been sent to a replica and have not yet been responded to (also referred to as in-flight requests). Therefore, it includes requests which are currently being processed and requests which are waiting in the replica's queue. @@ -18,11 +18,11 @@ Cortex autoscales your web services on a per-API basis based on your configurati `desired replicas = sum(in-flight requests accross all replicas) / target_replica_concurrency` - For example, setting `target_replica_concurrency` to `workers_per_replica` * `threads_per_worker` (the default) causes the cluster to adjust the number of replicas so that on average, requests are immediately processed without waiting in a queue, and workers/threads are never idle. + For example, setting `target_replica_concurrency` to `processes_per_replica` * `threads_per_process` (the default) causes the cluster to adjust the number of replicas so that on average, requests are immediately processed without waiting in a queue, and processes/threads are never idle. -* `max_replica_concurrency` (default: 1024): This is the maximum number of in-flight requests per replica before requests are rejected with HTTP error code 503. `max_replica_concurrency` includes requests that are currently being processed as well as requests that are waiting in the replica's queue (a replica can actively process `workers_per_replica` * `threads_per_worker` requests concurrently, and will hold any additional requests in a local queue). Decreasing `max_replica_concurrency` and configuring the client to retry when it receives 503 responses will improve queue fairness by preventing requests from sitting in long queues. +* `max_replica_concurrency` (default: 1024): This is the maximum number of in-flight requests per replica before requests are rejected with HTTP error code 503. `max_replica_concurrency` includes requests that are currently being processed as well as requests that are waiting in the replica's queue (a replica can actively process `processes_per_replica` * `threads_per_process` requests concurrently, and will hold any additional requests in a local queue). Decreasing `max_replica_concurrency` and configuring the client to retry when it receives 503 responses will improve queue fairness by preventing requests from sitting in long queues. - *Note (if `workers_per_replica` > 1): In reality, there is a queue per worker; for most purposes thinking of it as a per-replica queue will be sufficient, although in some cases the distinction is relevant. Because requests are randomly assigned to workers within a replica (which leads to unbalanced worker queues), clients may receive 503 responses before reaching `max_replica_concurrency`. For example, if you set `workers_per_replica: 2` and `max_replica_concurrency: 100`, each worker will be allowed to handle 50 requests concurrently. If your replica receives 90 requests that take the same amount of time to process, there is a 24.6% possibility that more than 50 requests are routed to 1 worker, and each request that is routed to that worker above 50 is responded to with a 503. To address this, it is recommended to implement client retries for 503 errors, or to increase `max_replica_concurrency` to minimize the probability of getting 503 responses.* + *Note (if `processes_per_replica` > 1): In reality, there is a queue per process; for most purposes thinking of it as a per-replica queue will be sufficient, although in some cases the distinction is relevant. Because requests are randomly assigned to processes within a replica (which leads to unbalanced process queues), clients may receive 503 responses before reaching `max_replica_concurrency`. For example, if you set `processes_per_replica: 2` and `max_replica_concurrency: 100`, each process will be allowed to handle 50 requests concurrently. If your replica receives 90 requests that take the same amount of time to process, there is a 24.6% possibility that more than 50 requests are routed to 1 process, and each request that is routed to that process above 50 is responded to with a 503. To address this, it is recommended to implement client retries for 503 errors, or to increase `max_replica_concurrency` to minimize the probability of getting 503 responses.* * `window` (default: 60s): The time over which to average the API wide in-flight requests (which is the sum of in-flight requests in each replica). The longer the window, the slower the autoscaler will react to changes in API wide in-flight requests, since it is averaged over the `window`. API wide in-flight requests is calculated every 10 seconds, so `window` must be a multiple of 10 seconds. diff --git a/docs/deployments/gpus.md b/docs/deployments/gpus.md index 649407152b..09df57eafe 100644 --- a/docs/deployments/gpus.md +++ b/docs/deployments/gpus.md @@ -11,9 +11,9 @@ To use GPUs: ## Tips -### If using `workers_per_replica` > 1, TensorFlow-based models, and Python Predictor +### If using `processes_per_replica` > 1, TensorFlow-based models, and Python Predictor -When using `workers_per_replica` > 1 with TensorFlow-based models (including Keras) in the Python Predictor, loading the model in separate processes at the same time will throw a `CUDA_ERROR_OUT_OF_MEMORY: out of memory` error. This is because the first process that loads the model will allocate all of the GPU's memory and leave none to other processes. To prevent this from happening, the per-process GPU memory usage can be limited. There are two methods: +When using `processes_per_replica` > 1 with TensorFlow-based models (including Keras) in the Python Predictor, loading the model in separate processes at the same time will throw a `CUDA_ERROR_OUT_OF_MEMORY: out of memory` error. This is because the first process that loads the model will allocate all of the GPU's memory and leave none to other processes. To prevent this from happening, the per-process GPU memory usage can be limited. There are two methods: 1\) Configure the model to allocate only as much memory as it requires, via [tf.config.experimental.set_memory_growth()](https://www.tensorflow.org/api_docs/python/tf/config/experimental/set_memory_growth): diff --git a/docs/deployments/inferentia.md b/docs/deployments/inferentia.md index b77932bbb0..152f37244e 100644 --- a/docs/deployments/inferentia.md +++ b/docs/deployments/inferentia.md @@ -22,9 +22,9 @@ Each Inferentia ASIC comes with 4 NeuronCores and 8GB of cache memory. To better A [NeuronCore Group](https://github.com/aws/aws-neuron-sdk/blob/master/docs/tensorflow-neuron/tutorial-NeuronCore-Group.md) (NCG) is a set of NeuronCores that is used to load and run a compiled model. NCGs exist to aggregate NeuronCores to improve hardware performance. Models can be shared within an NCG, but this would require the device driver to dynamically context switch between each model, which degrades performance. Therefore we've decided to only allow one model per NCG (unless you are using a [multi-model endpoint](../guides/multi-model.md), in which case there will be multiple models on a single NCG, and there will be context switching). -Each Cortex API worker will have its own copy of the model and will run on its own NCG (the number of API workers is configured by the [`workers_per_replica`](autoscaling.md#replica-parallelism) field in the API configuration). Each NCG will have an equal share of NeuronCores. Therefore, the size of each NCG will be `4 * inf / workers_per_replica` (`inf` refers to your API's `compute` request, and it's multiplied by 4 because there are 4 NeuronCores per Inferentia chip). +Each Cortex API process will have its own copy of the model and will run on its own NCG (the number of API processes is configured by the [`processes_per_replica`](autoscaling.md#replica-parallelism) field in the API configuration). Each NCG will have an equal share of NeuronCores. Therefore, the size of each NCG will be `4 * inf / processes_per_replica` (`inf` refers to your API's `compute` request, and it's multiplied by 4 because there are 4 NeuronCores per Inferentia chip). -For example, if your API requests 2 `inf` chips, there will be 8 NeuronCores available. If you set `workers_per_replica` to 1, there will be one copy of your model running on a single NCG of size 8 NeuronCores. If `workers_per_replica` is 2, there will be two copies of your model, each running on a separate NCG of size 4 NeuronCores. If `workers_per_replica` is 4, there will be 4 NCGs of size 2 NeuronCores, and if If `workers_per_replica` is 8, there will be 8 NCGs of size 1 NeuronCores. In this scenario, these are the only valid values for `workers_per_replica`. In other words the total number of requested NeuronCores (which equals 4 * the number of requested Inferentia chips) must be divisible by `workers_per_replica`. +For example, if your API requests 2 `inf` chips, there will be 8 NeuronCores available. If you set `processes_per_replica` to 1, there will be one copy of your model running on a single NCG of size 8 NeuronCores. If `processes_per_replica` is 2, there will be two copies of your model, each running on a separate NCG of size 4 NeuronCores. If `processes_per_replica` is 4, there will be 4 NCGs of size 2 NeuronCores, and if If `processes_per_replica` is 8, there will be 8 NCGs of size 1 NeuronCores. In this scenario, these are the only valid values for `processes_per_replica`. In other words the total number of requested NeuronCores (which equals 4 * the number of requested Inferentia chips) must be divisible by `processes_per_replica`. The 8GB cache memory is shared between all 4 NeuronCores of an Inferentia chip. Therefore an NCG with 8 NeuronCores (i.e. 2 Inf chips) will have access to 16GB of cache memory. An NGC with 2 NeuronCores will have access to 8GB of cache memory, which will be shared with the other NGC of size 2 running on the same Inferentia chip. @@ -34,7 +34,7 @@ Before a model can be deployed on Inferentia chips, it must be compiled for Infe By default, the Neuron compiler will compile a model to use 1 NeuronCore, but can be manually set to a different size (1, 2, 4, etc). -For optimal performance, your model should be compiled to run on the number of NeuronCores available to it. The number of NeuronCores will be `4 * inf / workers_per_replica` (`inf` refers to your API's `compute` request, and it's multiplied by 4 because there are 4 NeuronCores per Inferentia chip). See [NeuronCore Groups](#neuron-core-groups) above for an example, and see [Improving performance](#improving-performance) below for a discussion of choosing the appropriate number of NeuronCores. +For optimal performance, your model should be compiled to run on the number of NeuronCores available to it. The number of NeuronCores will be `4 * inf / processes_per_replica` (`inf` refers to your API's `compute` request, and it's multiplied by 4 because there are 4 NeuronCores per Inferentia chip). See [NeuronCore Groups](#neuron-core-groups) above for an example, and see [Improving performance](#improving-performance) below for a discussion of choosing the appropriate number of NeuronCores. Here is an example of compiling a TensorFlow SavedModel for Inferentia: @@ -73,7 +73,7 @@ See AWS's [TensorFlow](https://github.com/aws/aws-neuron-sdk/blob/master/docs/te A few things can be done to improve performance using compiled models on Cortex: -1. There's a minimum number of NeuronCores for which a model can be compiled. That number depends on the model's architecture. Generally, compiling a model for more cores than its required minimum helps to distribute the model's operators across multiple cores, which in turn [can lead to lower latency](https://github.com/aws/aws-neuron-sdk/blob/master/docs/technotes/neuroncore-pipeline.md). However, compiling a model for more NeuronCores means that you'll have to set `workers_per_replica` to be lower so that the NeuronCore Group has access to the number of NeuronCores for which you compiled your model. This is acceptable if latency is your top priority, but if throughput is more important to you, this tradeoff is usually not worth it. To maximize throughput, compile your model for as few NeuronCores as possible and increase `workers_per_replica` to the maximum possible (see above for a sample calculation). +1. There's a minimum number of NeuronCores for which a model can be compiled. That number depends on the model's architecture. Generally, compiling a model for more cores than its required minimum helps to distribute the model's operators across multiple cores, which in turn [can lead to lower latency](https://github.com/aws/aws-neuron-sdk/blob/master/docs/technotes/neuroncore-pipeline.md). However, compiling a model for more NeuronCores means that you'll have to set `processes_per_replica` to be lower so that the NeuronCore Group has access to the number of NeuronCores for which you compiled your model. This is acceptable if latency is your top priority, but if throughput is more important to you, this tradeoff is usually not worth it. To maximize throughput, compile your model for as few NeuronCores as possible and increase `processes_per_replica` to the maximum possible (see above for a sample calculation). 1. Try to achieve a near [100% placement](https://github.com/aws/aws-neuron-sdk/blob/b28262e3072574c514a0d72ad3fe5ca48686d449/src/examples/tensorflow/keras_resnet50/pb2sm_compile.py#L59) of your model's graph onto the NeuronCores. During the compilation phase, any operators that can't execute on NeuronCores will be compiled to execute on the machine's CPU and memory instead. Even if just a few percent of the operations reside on the host's CPU/memory, the maximum throughput of the instance can be significantly limited. diff --git a/docs/deployments/parallelism.md b/docs/deployments/parallelism.md index a5ea4f8968..ae3dc89b3a 100644 --- a/docs/deployments/parallelism.md +++ b/docs/deployments/parallelism.md @@ -4,8 +4,8 @@ _WARNING: you are on the master branch, please refer to the docs on the branch t Replica parallelism can be tweaked with the following fields: -* `workers_per_replica` (default: 1): Each replica runs a web server with `workers_per_replica` workers, each of which runs in it's own process. For APIs running with multiple CPUs per replica, using 1-3 workers per unit of CPU generally leads to optimal throughput. For example, if `cpu` is 2, a value between 2 and 6 `workers_per_replica` is reasonable. The optimal number will vary based on the workload and the CPU request for the API. +* `processes_per_replica` (default: 1): Each replica runs a web server with `processes_per_replica` processes, each of which runs in it's own process. For APIs running with multiple CPUs per replica, using 1-3 processes per unit of CPU generally leads to optimal throughput. For example, if `cpu` is 2, a value between 2 and 6 `processes_per_replica` is reasonable. The optimal number will vary based on the workload and the CPU request for the API. -* `threads_per_worker` (default: 1): Each worker uses a thread pool of size `threads_per_worker` to process requests. For applications that are not CPU intensive such as high I/O (e.g. downloading files), GPU-based inference or Inferentia ASIC-based inference, increasing the number of threads per worker can increase throughput. For CPU-bound applications such as running your model inference on a CPU, using 1 thread per worker is recommended to avoid unnecessary context switching. Some applications are not thread-safe, and therefore must be run with 1 thread per worker. +* `threads_per_process` (default: 1): Each process uses a thread pool of size `threads_per_process` to process requests. For applications that are not CPU intensive such as high I/O (e.g. downloading files), GPU-based inference or Inferentia ASIC-based inference, increasing the number of threads per process can increase throughput. For CPU-bound applications such as running your model inference on a CPU, using 1 thread per process is recommended to avoid unnecessary context switching. Some applications are not thread-safe, and therefore must be run with 1 thread per process. -`workers_per_replica` * `threads_per_worker` represents the number of requests that your replica can work in parallel. For example, if `workers_per_replica` is 2 and `threads_per_worker` is 2, and the replica was hit with 5 concurrent requests, 4 would immediately begin to be processed, 1 would be waiting for a thread to become available, and the concurrency for the replica would be 5. If the replica was hit with 3 concurrent requests, all three would begin processing immediately, and the replica concurrency would be 3. +`processes_per_replica` * `threads_per_process` represents the number of requests that your replica can work in parallel. For example, if `processes_per_replica` is 2 and `threads_per_process` is 2, and the replica was hit with 5 concurrent requests, 4 would immediately begin to be processed, 1 would be waiting for a thread to become available, and the concurrency for the replica would be 5. If the replica was hit with 3 concurrent requests, all three would begin processing immediately, and the replica concurrency would be 3. diff --git a/docs/guides/batch-runner.md b/docs/guides/batch-runner.md index ea42a02f13..3625935e63 100644 --- a/docs/guides/batch-runner.md +++ b/docs/guides/batch-runner.md @@ -37,10 +37,10 @@ class PythonPredictor: batches = payload # Increasing max_workers will increase how many replicas will be used for the batch request. - # Assuming default values for target_replica_concurrency, workers_per_replica, and threads_per_worker + # Assuming default values for target_replica_concurrency, processes_per_replica, and threads_per_process # in your prediction API, the number of replicas created will be equal to max_workers. # If you have changed these values, the number of replicas created will be equal to max_workers / target_replica_concurrency - # (note that the default value of target_replica_concurrency is workers_per_replica * threads_per_worker). + # (note that the default value of target_replica_concurrency is processes_per_replica * threads_per_process). # If max_workers starts to get large, you will also want to set the inference API's max_replica_concurrency to avoid long and imbalanced queue lengths # Here are the autoscaling docs: https://docs.cortex.dev/deployments/autoscaling with ThreadPoolExecutor(max_workers=5) as executor: @@ -91,7 +91,7 @@ class PythonPredictor: endpoint: http://***.elb.us-west-2.amazonaws.com/iris-classifier autoscaling: max_replicas: 1 # this API may need to autoscale depending on how many batch requests, but disable it to start - threads_per_worker: 1 # set this to the number of batch requests you'd like to be able to be able to work on at a time + threads_per_process: 1 # set this to the number of batch requests you'd like to be able to be able to work on at a time # once that number is exceeded, they will be queued, which may be ok # setting this too high may lead to out of memory errors compute: diff --git a/examples/tensorflow/image-classifier-resnet50/cortex_cpu.yaml b/examples/tensorflow/image-classifier-resnet50/cortex_cpu.yaml index 01e82bb273..8f555599e9 100644 --- a/examples/tensorflow/image-classifier-resnet50/cortex_cpu.yaml +++ b/examples/tensorflow/image-classifier-resnet50/cortex_cpu.yaml @@ -4,8 +4,8 @@ predictor: type: tensorflow path: predictor.py - workers_per_replica: 4 - threads_per_worker: 16 + processes_per_replica: 4 + threads_per_process: 16 model: s3://cortex-examples/tensorflow/resnet50 config: classes: https://s3.amazonaws.com/deep-learning-models/image-models/imagenet_class_index.json diff --git a/examples/tensorflow/image-classifier-resnet50/cortex_gpu.yaml b/examples/tensorflow/image-classifier-resnet50/cortex_gpu.yaml index ae5ae6d817..cb66774697 100644 --- a/examples/tensorflow/image-classifier-resnet50/cortex_gpu.yaml +++ b/examples/tensorflow/image-classifier-resnet50/cortex_gpu.yaml @@ -4,8 +4,8 @@ predictor: type: tensorflow path: predictor.py - workers_per_replica: 4 - threads_per_worker: 24 + processes_per_replica: 4 + threads_per_process: 24 model: s3://cortex-examples/tensorflow/resnet50 config: classes: https://s3.amazonaws.com/deep-learning-models/image-models/imagenet_class_index.json diff --git a/examples/tensorflow/image-classifier-resnet50/cortex_inf.yaml b/examples/tensorflow/image-classifier-resnet50/cortex_inf.yaml index 357326ae6a..3a57ad5717 100644 --- a/examples/tensorflow/image-classifier-resnet50/cortex_inf.yaml +++ b/examples/tensorflow/image-classifier-resnet50/cortex_inf.yaml @@ -4,8 +4,8 @@ predictor: type: tensorflow path: predictor.py - workers_per_replica: 4 - threads_per_worker: 256 + processes_per_replica: 4 + threads_per_process: 256 model: s3://cortex-examples/tensorflow/resnet50_neuron config: classes: https://s3.amazonaws.com/deep-learning-models/image-models/imagenet_class_index.json diff --git a/examples/tensorflow/license-plate-reader/cortex_full.yaml b/examples/tensorflow/license-plate-reader/cortex_full.yaml index 01ca27f471..2f744e83e5 100644 --- a/examples/tensorflow/license-plate-reader/cortex_full.yaml +++ b/examples/tensorflow/license-plate-reader/cortex_full.yaml @@ -4,8 +4,8 @@ predictor: type: tensorflow path: predictor_yolo.py - workers_per_replica: 4 - threads_per_worker: 3 + processes_per_replica: 4 + threads_per_process: 3 model: s3://cortex-examples/tensorflow/license-plate-reader/yolov3_tf signature_key: serving_default config: @@ -22,8 +22,8 @@ predictor: type: python path: predictor_crnn.py - workers_per_replica: 1 - threads_per_worker: 1 + processes_per_replica: 1 + threads_per_process: 1 compute: cpu: 1 gpu: 1 diff --git a/images/tensorflow-serving-inf/run.sh b/images/tensorflow-serving-inf/run.sh index 7765af41df..d5df21f4d1 100644 --- a/images/tensorflow-serving-inf/run.sh +++ b/images/tensorflow-serving-inf/run.sh @@ -16,7 +16,7 @@ set -e -for i in $(seq 1 $TF_WORKERS); do +for i in $(seq 1 $TF_PROCESSES); do echo -e "\n\n" >> /tmp/supervisord.conf worker=$i port=$((CORTEX_TF_BASE_SERVING_PORT+i-1)) envsubst < /tmp/template.conf >> /tmp/supervisord.conf done diff --git a/pkg/operator/operator/k8s_specs.go b/pkg/operator/operator/k8s_specs.go index 6591d8892d..057896987a 100644 --- a/pkg/operator/operator/k8s_specs.go +++ b/pkg/operator/operator/k8s_specs.go @@ -617,21 +617,21 @@ func getEnvVars(api *spec.API, container string) []kcore.EnvVar { }, }, kcore.EnvVar{ - Name: "CORTEX_WORKERS_PER_REPLICA", - Value: s.Int32(api.Predictor.WorkersPerReplica), + Name: "CORTEX_PROCESSES_PER_REPLICA", + Value: s.Int32(api.Predictor.ProcessesPerReplica), }, kcore.EnvVar{ - Name: "CORTEX_THREADS_PER_WORKER", - Value: s.Int32(api.Predictor.ThreadsPerWorker), + Name: "CORTEX_THREADS_PER_PROCESS", + Value: s.Int32(api.Predictor.ThreadsPerProcess), }, kcore.EnvVar{ Name: "CORTEX_MAX_REPLICA_CONCURRENCY", Value: s.Int64(api.Autoscaling.MaxReplicaConcurrency), }, kcore.EnvVar{ - Name: "CORTEX_MAX_WORKER_CONCURRENCY", - // add 1 because it was required to achieve the target concurrency for 1 worker, 1 thread - Value: s.Int64(1 + int64(math.Round(float64(api.Autoscaling.MaxReplicaConcurrency)/float64(api.Predictor.WorkersPerReplica)))), + Name: "CORTEX_MAX_PROCESS_CONCURRENCY", + // add 1 because it was required to achieve the target concurrency for 1 process, 1 thread + Value: s.Int64(1 + int64(math.Round(float64(api.Autoscaling.MaxReplicaConcurrency)/float64(api.Predictor.ProcessesPerReplica)))), }, kcore.EnvVar{ Name: "CORTEX_SO_MAX_CONN", @@ -703,7 +703,7 @@ func getEnvVars(api *spec.API, container string) []kcore.EnvVar { envVars = append(envVars, kcore.EnvVar{ Name: "NEURONCORE_GROUP_SIZES", - Value: s.Int64(api.Compute.Inf * consts.NeuronCoresPerInf / int64(api.Predictor.WorkersPerReplica)), + Value: s.Int64(api.Compute.Inf * consts.NeuronCoresPerInf / int64(api.Predictor.ProcessesPerReplica)), }, kcore.EnvVar{ Name: "NEURON_RTD_ADDRESS", @@ -716,8 +716,8 @@ func getEnvVars(api *spec.API, container string) []kcore.EnvVar { if container == _tfServingContainerName { envVars = append(envVars, kcore.EnvVar{ - Name: "TF_WORKERS", - Value: s.Int32(api.Predictor.WorkersPerReplica), + Name: "TF_PROCESSES", + Value: s.Int32(api.Predictor.ProcessesPerReplica), }, kcore.EnvVar{ Name: "CORTEX_TF_BASE_SERVING_PORT", @@ -760,7 +760,7 @@ func tensorflowServingContainer(api *spec.API, volumeMounts []kcore.VolumeMount, } if api.Compute.Inf > 0 { - numPorts := api.Predictor.WorkersPerReplica + numPorts := api.Predictor.ProcessesPerReplica for i := int32(1); i < numPorts; i++ { ports = append(ports, kcore.ContainerPort{ ContainerPort: _tfBaseServingPortInt32 + i, diff --git a/pkg/types/spec/errors.go b/pkg/types/spec/errors.go index 935d6dd96f..8288405da9 100644 --- a/pkg/types/spec/errors.go +++ b/pkg/types/spec/errors.go @@ -61,7 +61,7 @@ const ( ErrRegistryAccountIDMismatch = "spec.registry_account_id_mismatch" ErrCannotAccessECRWithAnonymousAWSCreds = "spec.cannot_access_ecr_with_anonymous_aws_creds" ErrComputeResourceConflict = "spec.compute_resource_conflict" - ErrInvalidNumberOfInfWorkers = "spec.invalid_number_of_inf_workers" + ErrInvalidNumberOfInfProcesses = "spec.invalid_number_of_inf_processes" ErrInvalidNumberOfInfs = "spec.invalid_number_of_infs" ) @@ -325,11 +325,11 @@ func ErrorComputeResourceConflict(resourceA, resourceB string) error { }) } -func ErrorInvalidNumberOfInfWorkers(workersPerReplica int64, numInf int64, numNeuronCores int64) error { - acceptableWorkers := libmath.FactorsInt64(numNeuronCores) +func ErrorInvalidNumberOfInfProcesses(processesPerReplica int64, numInf int64, numNeuronCores int64) error { + acceptableProcesses := libmath.FactorsInt64(numNeuronCores) return errors.WithStack(&errors.Error{ - Kind: ErrInvalidNumberOfInfWorkers, - Message: fmt.Sprintf("cannot evenly distribute %d Inferentia %s (%d NeuronCores total) over %d worker(s) - acceptable numbers of workers are %s", numInf, s.PluralS("ASIC", numInf), numNeuronCores, workersPerReplica, s.UserStrsOr(acceptableWorkers)), + Kind: ErrInvalidNumberOfInfProcesses, + Message: fmt.Sprintf("cannot evenly distribute %d Inferentia %s (%d NeuronCores total) over %d processes - acceptable numbers of processes are %s", numInf, s.PluralS("ASIC", numInf), numNeuronCores, processesPerReplica, s.UserStrsOr(acceptableProcesses)), }) } diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go index c42a35f207..700998c972 100644 --- a/pkg/types/spec/validations.go +++ b/pkg/types/spec/validations.go @@ -134,7 +134,7 @@ func predictorValidation() *cr.StructFieldValidation { }, }, { - StructField: "WorkersPerReplica", + StructField: "ProcessesPerReplica", Int32Validation: &cr.Int32Validation{ Default: 1, GreaterThanOrEqualTo: pointer.Int32(1), @@ -142,7 +142,7 @@ func predictorValidation() *cr.StructFieldValidation { }, }, { - StructField: "ThreadsPerWorker", + StructField: "ThreadsPerProcess", Int32Validation: &cr.Int32Validation{ Default: 1, GreaterThanOrEqualTo: pointer.Int32(1), @@ -945,7 +945,7 @@ func validateAutoscaling(api *userconfig.API) error { predictor := api.Predictor if autoscaling.TargetReplicaConcurrency == nil { - autoscaling.TargetReplicaConcurrency = pointer.Float64(float64(predictor.WorkersPerReplica * predictor.ThreadsPerWorker)) + autoscaling.TargetReplicaConcurrency = pointer.Float64(float64(predictor.ProcessesPerReplica * predictor.ThreadsPerProcess)) } if *autoscaling.TargetReplicaConcurrency > float64(autoscaling.MaxReplicaConcurrency) { @@ -966,9 +966,9 @@ func validateAutoscaling(api *userconfig.API) error { if api.Compute.Inf > 0 { numNeuronCores := api.Compute.Inf * consts.NeuronCoresPerInf - workersPerReplica := int64(predictor.WorkersPerReplica) - if !libmath.IsDivisibleByInt64(numNeuronCores, workersPerReplica) { - return ErrorInvalidNumberOfInfWorkers(workersPerReplica, api.Compute.Inf, numNeuronCores) + processesPerReplica := int64(predictor.ProcessesPerReplica) + if !libmath.IsDivisibleByInt64(numNeuronCores, processesPerReplica) { + return ErrorInvalidNumberOfInfProcesses(processesPerReplica, api.Compute.Inf, numNeuronCores) } } diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go index fd5fd8603d..8a188e8f61 100644 --- a/pkg/types/userconfig/api.go +++ b/pkg/types/userconfig/api.go @@ -52,8 +52,8 @@ type Predictor struct { PythonPath *string `json:"python_path" yaml:"python_path"` Image string `json:"image" yaml:"image"` TensorFlowServingImage string `json:"tensorflow_serving_image" yaml:"tensorflow_serving_image"` - WorkersPerReplica int32 `json:"workers_per_replica" yaml:"workers_per_replica"` - ThreadsPerWorker int32 `json:"threads_per_worker" yaml:"threads_per_worker"` + ProcessesPerReplica int32 `json:"processes_per_replica" yaml:"processes_per_replica"` + ThreadsPerProcess int32 `json:"threads_per_process" yaml:"threads_per_process"` Config map[string]interface{} `json:"config" yaml:"config"` Env map[string]string `json:"env" yaml:"env"` SignatureKey *string `json:"signature_key" yaml:"signature_key"` @@ -177,8 +177,8 @@ func (api *API) ToK8sAnnotations() map[string]string { APIGatewayAnnotationKey: api.Networking.APIGateway.String(), MinReplicasAnnotationKey: s.Int32(api.Autoscaling.MinReplicas), MaxReplicasAnnotationKey: s.Int32(api.Autoscaling.MaxReplicas), - WorkersPerReplicaAnnotationKey: s.Int32(api.Predictor.WorkersPerReplica), - ThreadsPerWorkerAnnotationKey: s.Int32(api.Predictor.ThreadsPerWorker), + ProcessesPerReplicaAnnotationKey: s.Int32(api.Predictor.ProcessesPerReplica), + ThreadsPerProcessAnnotationKey: s.Int32(api.Predictor.ThreadsPerProcess), TargetReplicaConcurrencyAnnotationKey: s.Float64(*api.Autoscaling.TargetReplicaConcurrency), MaxReplicaConcurrencyAnnotationKey: s.Int64(api.Autoscaling.MaxReplicaConcurrency), WindowAnnotationKey: api.Autoscaling.Window.String(), @@ -331,8 +331,8 @@ func (predictor *Predictor) UserStr() string { if predictor.SignatureKey != nil { sb.WriteString(fmt.Sprintf("%s: %s\n", SignatureKeyKey, *predictor.SignatureKey)) } - sb.WriteString(fmt.Sprintf("%s: %s\n", WorkersPerReplicaKey, s.Int32(predictor.WorkersPerReplica))) - sb.WriteString(fmt.Sprintf("%s: %s\n", ThreadsPerWorkerKey, s.Int32(predictor.ThreadsPerWorker))) + sb.WriteString(fmt.Sprintf("%s: %s\n", ProcessesPerReplicaKey, s.Int32(predictor.ProcessesPerReplica))) + sb.WriteString(fmt.Sprintf("%s: %s\n", ThreadsPerProcessKey, s.Int32(predictor.ThreadsPerProcess))) if len(predictor.Config) > 0 { sb.WriteString(fmt.Sprintf("%s:\n", ConfigKey)) d, _ := yaml.Marshal(&predictor.Config) diff --git a/pkg/types/userconfig/config_key.go b/pkg/types/userconfig/config_key.go index ef325987a2..d4be650f4f 100644 --- a/pkg/types/userconfig/config_key.go +++ b/pkg/types/userconfig/config_key.go @@ -31,8 +31,8 @@ const ( // Predictor TypeKey = "type" PathKey = "path" - WorkersPerReplicaKey = "workers_per_replica" - ThreadsPerWorkerKey = "threads_per_worker" + ProcessesPerReplicaKey = "processes_per_replica" + ThreadsPerProcessKey = "threads_per_process" ModelKey = "model" ModelsKey = "models" PythonPathKey = "python_path" @@ -80,8 +80,8 @@ const ( APIGatewayAnnotationKey = "networking.cortex.dev/api-gateway" MinReplicasAnnotationKey = "autoscaling.cortex.dev/min-replicas" MaxReplicasAnnotationKey = "autoscaling.cortex.dev/max-replicas" - WorkersPerReplicaAnnotationKey = "autoscaling.cortex.dev/workers-per-replica" - ThreadsPerWorkerAnnotationKey = "autoscaling.cortex.dev/threads-per-worker" + ProcessesPerReplicaAnnotationKey = "autoscaling.cortex.dev/processes-per-replica" + ThreadsPerProcessAnnotationKey = "autoscaling.cortex.dev/threads-per-process" TargetReplicaConcurrencyAnnotationKey = "autoscaling.cortex.dev/target-replica-concurrency" MaxReplicaConcurrencyAnnotationKey = "autoscaling.cortex.dev/max-replica-concurrency" WindowAnnotationKey = "autoscaling.cortex.dev/window" diff --git a/pkg/workloads/cortex/serve/serve.py b/pkg/workloads/cortex/serve/serve.py index b0c65ed251..8af83e1592 100644 --- a/pkg/workloads/cortex/serve/serve.py +++ b/pkg/workloads/cortex/serve/serve.py @@ -53,7 +53,7 @@ loop = asyncio.get_event_loop() loop.set_default_executor( - ThreadPoolExecutor(max_workers=int(os.environ["CORTEX_THREADS_PER_WORKER"])) + ThreadPoolExecutor(max_workers=int(os.environ["CORTEX_THREADS_PER_PROCESS"])) ) app = FastAPI() diff --git a/pkg/workloads/cortex/serve/start_uvicorn.py b/pkg/workloads/cortex/serve/start_uvicorn.py index 260e461391..200b9a2b39 100644 --- a/pkg/workloads/cortex/serve/start_uvicorn.py +++ b/pkg/workloads/cortex/serve/start_uvicorn.py @@ -34,15 +34,15 @@ def load_tensorflow_serving_models(): from cortex.lib.server.tensorflow import TensorFlowServing - # determine if multiple TF workers are required - num_workers = 1 + # determine if multiple TF processes are required + num_processes = 1 has_multiple_servers = os.getenv("CORTEX_MULTIPLE_TF_SERVERS") if has_multiple_servers: - num_workers = int(os.environ["CORTEX_WORKERS_PER_REPLICA"]) + num_processes = int(os.environ["CORTEX_PROCESSES_PER_REPLICA"]) - # initialize models for each TF worker + # initialize models for each TF process base_paths = [os.path.join(model_dir, name) for name in models] - for w in range(int(num_workers)): + for w in range(int(num_processes)): tfs = TensorFlowServing(f"{tf_serving_host}:{tf_base_serving_port+w}") tfs.add_models_config(models, base_paths, replace_models=False) @@ -60,9 +60,9 @@ def main(): has_multiple_servers = os.getenv("CORTEX_MULTIPLE_TF_SERVERS") if has_multiple_servers: base_serving_port = int(os.environ["CORTEX_TF_BASE_SERVING_PORT"]) - num_workers = int(os.environ["CORTEX_WORKERS_PER_REPLICA"]) + num_processes = int(os.environ["CORTEX_PROCESSES_PER_REPLICA"]) used_ports = {} - for w in range(int(num_workers)): + for w in range(int(num_processes)): used_ports[str(base_serving_port + w)] = False with open("/run/used_ports.json", "w+") as f: json.dump(used_ports, f) @@ -86,8 +86,8 @@ def main(): "cortex.serve.wsgi:app", host="0.0.0.0", port=int(os.environ["CORTEX_SERVING_PORT"]), - workers=int(os.environ["CORTEX_WORKERS_PER_REPLICA"]), - limit_concurrency=int(os.environ["CORTEX_MAX_WORKER_CONCURRENCY"]), + workers=int(os.environ["CORTEX_PROCESSES_PER_REPLICA"]), + limit_concurrency=int(os.environ["CORTEX_MAX_PROCESS_CONCURRENCY"]), backlog=int(os.environ["CORTEX_SO_MAX_CONN"]), log_config=log_config, log_level="info", From cef3692b7080814f7c6a74da34e424dd1597d997 Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Tue, 23 Jun 2020 13:55:59 -0700 Subject: [PATCH 08/14] Update license-plate-reader README.md --- examples/tensorflow/license-plate-reader/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/tensorflow/license-plate-reader/README.md b/examples/tensorflow/license-plate-reader/README.md index 294d8ea873..d7ff404a65 100644 --- a/examples/tensorflow/license-plate-reader/README.md +++ b/examples/tensorflow/license-plate-reader/README.md @@ -63,7 +63,7 @@ For another prediction, let's use a generic image from the web. Export [this ima ## Deployment - Full Version -The recommended number of instances to run this smoothly on a video stream is about 12 GPU instances (2 GPU instances for *YOLOv3* and 10 for *CRNN* + *CRAFT*). `cortex_full.yaml` is already set up to use these 12 instances. Note: this is the optimal number of instances when using the `g4dn.xlarge` instance type. For the client to work smoothly, the number of workers per replica can be adjusted, especially for `p3` or `g4` instances, where the GPU has a lot of compute capacity. +The recommended number of instances to run this smoothly on a video stream is about 12 GPU instances (2 GPU instances for *YOLOv3* and 10 for *CRNN* + *CRAFT*). `cortex_full.yaml` is already set up to use these 12 instances. Note: this is the optimal number of instances when using the `g4dn.xlarge` instance type. For the client to work smoothly, the number of processes per replica can be adjusted, especially for `p3` or `g4` instances, where the GPU has a lot of compute capacity. If you don't have access to this many GPU-equipped instances, you could just lower the number and expect dropped frames. It will still prove the point, albeit at a much lower framerate and with higher latency. More on that [here](https://github.com/RobertLucian/cortex-license-plate-reader-client). From 88280b76c8e31faf4e8f285a9cb8517b306fc00d Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Wed, 24 Jun 2020 00:18:43 +0300 Subject: [PATCH 09/14] Split concurrency over multiple procs for local --- cli/local/docker_spec.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cli/local/docker_spec.go b/cli/local/docker_spec.go index e2d5fb60e1..acb64bdd3c 100644 --- a/cli/local/docker_spec.go +++ b/cli/local/docker_spec.go @@ -19,6 +19,7 @@ package local import ( "context" "fmt" + "math" "path/filepath" "strings" @@ -90,7 +91,8 @@ func getAPIEnv(api *spec.API, awsClient *aws.Client) []string { "CORTEX_PROJECT_DIR="+_projectDir, "CORTEX_PROCESSES_PER_REPLICA="+s.Int32(api.Predictor.ProcessesPerReplica), "CORTEX_THREADS_PER_PROCESS="+s.Int32(api.Predictor.ThreadsPerProcess), - "CORTEX_MAX_PROCESS_CONCURRENCY=1000", + // add 1 because it was required to achieve the target concurrency for 1 process, 1 thread + "CORTEX_MAX_PROCESS_CONCURRENCY="+s.Int64(1+int64(math.Round(1000/float64(api.Predictor.ProcessesPerReplica)))), "CORTEX_SO_MAX_CONN=1000", "AWS_REGION="+awsClient.Region, ) From b6da31264c7199b20c998855aef48996c0690a5a Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Wed, 24 Jun 2020 01:00:46 +0300 Subject: [PATCH 10/14] Set default max concurrency through a const --- cli/local/docker_spec.go | 2 +- pkg/consts/consts.go | 1 + pkg/types/spec/validations.go | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cli/local/docker_spec.go b/cli/local/docker_spec.go index acb64bdd3c..0691781db9 100644 --- a/cli/local/docker_spec.go +++ b/cli/local/docker_spec.go @@ -92,7 +92,7 @@ func getAPIEnv(api *spec.API, awsClient *aws.Client) []string { "CORTEX_PROCESSES_PER_REPLICA="+s.Int32(api.Predictor.ProcessesPerReplica), "CORTEX_THREADS_PER_PROCESS="+s.Int32(api.Predictor.ThreadsPerProcess), // add 1 because it was required to achieve the target concurrency for 1 process, 1 thread - "CORTEX_MAX_PROCESS_CONCURRENCY="+s.Int64(1+int64(math.Round(1000/float64(api.Predictor.ProcessesPerReplica)))), + "CORTEX_MAX_PROCESS_CONCURRENCY="+s.Int64(1+int64(math.Round(float64(consts.DefaultMaxReplicaConcurrency)/float64(api.Predictor.ProcessesPerReplica)))), "CORTEX_SO_MAX_CONN=1000", "AWS_REGION="+awsClient.Region, ) diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go index 0a39b6cd61..5f641e78ae 100644 --- a/pkg/consts/consts.go +++ b/pkg/consts/consts.go @@ -52,6 +52,7 @@ var ( MaxClassesPerMonitoringRequest = 20 // cloudwatch.GeMetricData can get up to 100 metrics per request, avoid multiple requests and have room for other stats DashboardTitle = "# cortex monitoring dashboard" + DefaultMaxReplicaConcurrency = int64(1024) NeuronCoresPerInf = int64(4) ) diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go index 452eea3d39..4588fa9d91 100644 --- a/pkg/types/spec/validations.go +++ b/pkg/types/spec/validations.go @@ -310,7 +310,7 @@ func autoscalingValidation(provider types.ProviderType) *cr.StructFieldValidatio { StructField: "MaxReplicaConcurrency", Int64Validation: &cr.Int64Validation{ - Default: 1024, + Default: consts.DefaultMaxReplicaConcurrency, GreaterThan: pointer.Int64(0), LessThanOrEqualTo: pointer.Int64(math.MaxUint16), }, From a4613109c48c77c6acf8c017a35bc090f1729e9b Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Tue, 23 Jun 2020 15:35:33 -0700 Subject: [PATCH 11/14] Update annotations --- pkg/types/userconfig/api.go | 4 ++-- pkg/types/userconfig/config_key.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go index f29ead74d2..4eb8566b69 100644 --- a/pkg/types/userconfig/api.go +++ b/pkg/types/userconfig/api.go @@ -176,10 +176,10 @@ func (api *API) ToK8sAnnotations() map[string]string { return map[string]string{ EndpointAnnotationKey: *api.Networking.Endpoint, APIGatewayAnnotationKey: api.Networking.APIGateway.String(), - MinReplicasAnnotationKey: s.Int32(api.Autoscaling.MinReplicas), - MaxReplicasAnnotationKey: s.Int32(api.Autoscaling.MaxReplicas), ProcessesPerReplicaAnnotationKey: s.Int32(api.Predictor.ProcessesPerReplica), ThreadsPerProcessAnnotationKey: s.Int32(api.Predictor.ThreadsPerProcess), + MinReplicasAnnotationKey: s.Int32(api.Autoscaling.MinReplicas), + MaxReplicasAnnotationKey: s.Int32(api.Autoscaling.MaxReplicas), TargetReplicaConcurrencyAnnotationKey: s.Float64(*api.Autoscaling.TargetReplicaConcurrency), MaxReplicaConcurrencyAnnotationKey: s.Int64(api.Autoscaling.MaxReplicaConcurrency), WindowAnnotationKey: api.Autoscaling.Window.String(), diff --git a/pkg/types/userconfig/config_key.go b/pkg/types/userconfig/config_key.go index 37f2c813c1..ebfd0a27d7 100644 --- a/pkg/types/userconfig/config_key.go +++ b/pkg/types/userconfig/config_key.go @@ -79,10 +79,10 @@ const ( // K8s annotation EndpointAnnotationKey = "networking.cortex.dev/endpoint" APIGatewayAnnotationKey = "networking.cortex.dev/api-gateway" + ProcessesPerReplicaAnnotationKey = "predictor.cortex.dev/processes-per-replica" + ThreadsPerProcessAnnotationKey = "predictor.cortex.dev/threads-per-process" MinReplicasAnnotationKey = "autoscaling.cortex.dev/min-replicas" MaxReplicasAnnotationKey = "autoscaling.cortex.dev/max-replicas" - ProcessesPerReplicaAnnotationKey = "autoscaling.cortex.dev/processes-per-replica" - ThreadsPerProcessAnnotationKey = "autoscaling.cortex.dev/threads-per-process" TargetReplicaConcurrencyAnnotationKey = "autoscaling.cortex.dev/target-replica-concurrency" MaxReplicaConcurrencyAnnotationKey = "autoscaling.cortex.dev/max-replica-concurrency" WindowAnnotationKey = "autoscaling.cortex.dev/window" From bc7c7b42097f0c0e3d9ac23bff4cd8e69d8c3296 Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Tue, 23 Jun 2020 15:41:45 -0700 Subject: [PATCH 12/14] Update parallelism.md --- docs/deployments/parallelism.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/deployments/parallelism.md b/docs/deployments/parallelism.md index ae3dc89b3a..9e6ff89ed0 100644 --- a/docs/deployments/parallelism.md +++ b/docs/deployments/parallelism.md @@ -2,10 +2,10 @@ _WARNING: you are on the master branch, please refer to the docs on the branch that matches your `cortex version`_ -Replica parallelism can be tweaked with the following fields: +Replica parallelism can be configured with the following fields in the `predictor` configuration: -* `processes_per_replica` (default: 1): Each replica runs a web server with `processes_per_replica` processes, each of which runs in it's own process. For APIs running with multiple CPUs per replica, using 1-3 processes per unit of CPU generally leads to optimal throughput. For example, if `cpu` is 2, a value between 2 and 6 `processes_per_replica` is reasonable. The optimal number will vary based on the workload and the CPU request for the API. +* `processes_per_replica` (default: 1): Each replica runs a web server with `processes_per_replica` processes. For APIs running with multiple CPUs per replica, using 1-3 processes per unit of CPU generally leads to optimal throughput. For example, if `cpu` is 2, a value between 2 and 6 `processes_per_replica` is reasonable. The optimal number will vary based on the workload's characteristics and the CPU compute request for the API. -* `threads_per_process` (default: 1): Each process uses a thread pool of size `threads_per_process` to process requests. For applications that are not CPU intensive such as high I/O (e.g. downloading files), GPU-based inference or Inferentia ASIC-based inference, increasing the number of threads per process can increase throughput. For CPU-bound applications such as running your model inference on a CPU, using 1 thread per process is recommended to avoid unnecessary context switching. Some applications are not thread-safe, and therefore must be run with 1 thread per process. +* `threads_per_process` (default: 1): Each process uses a thread pool of size `threads_per_process` to process requests. For applications that are not CPU intensive such as high I/O (e.g. downloading files), GPU-based inference, or Inferentia-based inference, increasing the number of threads per process can increase throughput. For CPU-bound applications such as running your model inference on a CPU, using 1 thread per process is recommended to avoid unnecessary context switching. Some applications are not thread-safe, and therefore must be run with 1 thread per process. -`processes_per_replica` * `threads_per_process` represents the number of requests that your replica can work in parallel. For example, if `processes_per_replica` is 2 and `threads_per_process` is 2, and the replica was hit with 5 concurrent requests, 4 would immediately begin to be processed, 1 would be waiting for a thread to become available, and the concurrency for the replica would be 5. If the replica was hit with 3 concurrent requests, all three would begin processing immediately, and the replica concurrency would be 3. +`processes_per_replica` * `threads_per_process` represents the total number of requests that your replica can work on concurrently. For example, if `processes_per_replica` is 2 and `threads_per_process` is 2, and the replica was hit with 5 concurrent requests, 4 would immediately begin to be processed and 1 would be waiting for a thread to become available. If the replica was hit with 3 concurrent requests, all three would begin processing immediately. From 4d31d773223a484f9c5417196c436a70406cebb6 Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Tue, 23 Jun 2020 16:05:16 -0700 Subject: [PATCH 13/14] Update api-configuration.md --- docs/deployments/api-configuration.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/deployments/api-configuration.md b/docs/deployments/api-configuration.md index 6e264be440..0195710211 100644 --- a/docs/deployments/api-configuration.md +++ b/docs/deployments/api-configuration.md @@ -49,7 +49,7 @@ Reference the section below which corresponds to your Predictor type: [Python](# max_unavailable: # maximum number of replicas that can be unavailable during an update; can be an absolute number, e.g. 5, or a percentage of desired replicas, e.g. 10% (default: 25%) ``` -See additional documentation for [autoscaling](autoscaling.md), [compute](compute.md), [networking](networking.md), [prediction monitoring](prediction-monitoring.md), and [overriding API images](system-packages.md). +See additional documentation for [parallelism](parallelism.md), [autoscaling](autoscaling.md), [compute](compute.md), [networking](networking.md), [prediction monitoring](prediction-monitoring.md), and [overriding API images](system-packages.md). ## TensorFlow Predictor @@ -102,7 +102,7 @@ See additional documentation for [autoscaling](autoscaling.md), [compute](comput max_unavailable: # maximum number of replicas that can be unavailable during an update; can be an absolute number, e.g. 5, or a percentage of desired replicas, e.g. 10% (default: 25%) ``` -See additional documentation for [autoscaling](autoscaling.md), [compute](compute.md), [networking](networking.md), [prediction monitoring](prediction-monitoring.md), and [overriding API images](system-packages.md). +See additional documentation for [parallelism](parallelism.md), [autoscaling](autoscaling.md), [compute](compute.md), [networking](networking.md), [prediction monitoring](prediction-monitoring.md), and [overriding API images](system-packages.md). ## ONNX Predictor @@ -152,4 +152,4 @@ See additional documentation for [autoscaling](autoscaling.md), [compute](comput max_unavailable: # maximum number of replicas that can be unavailable during an update; can be an absolute number, e.g. 5, or a percentage of desired replicas, e.g. 10% (default: 25%) ``` -See additional documentation for [autoscaling](autoscaling.md), [compute](compute.md), [networking](networking.md), [prediction monitoring](prediction-monitoring.md), and [overriding API images](system-packages.md). +See additional documentation for [parallelism](parallelism.md), [autoscaling](autoscaling.md), [compute](compute.md), [networking](networking.md), [prediction monitoring](prediction-monitoring.md), and [overriding API images](system-packages.md). From f0272c96df0a28eefecc8e8e5ac9ca520f81c1a9 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Wed, 24 Jun 2020 02:40:02 +0300 Subject: [PATCH 14/14] Use "process" more to make it more consistent --- .../tensorflow/image-classifier-resnet50/README.md | 10 +++++----- .../image-classifier-resnet50/throughput_test.py | 14 +++++++------- images/tensorflow-serving-inf/run.sh | 2 +- images/tensorflow-serving-inf/template.conf | 2 +- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/examples/tensorflow/image-classifier-resnet50/README.md b/examples/tensorflow/image-classifier-resnet50/README.md index 80e8adb26f..ecefb266bf 100644 --- a/examples/tensorflow/image-classifier-resnet50/README.md +++ b/examples/tensorflow/image-classifier-resnet50/README.md @@ -47,8 +47,8 @@ Usage: throughput_test.py [OPTIONS] IMG_URL ENDPOINT with CPU, GPU or Inferentia devices. Options: - -w, --workers INTEGER Number of workers for prediction requests. [default: 1] - -t, --threads INTEGER Number of threads per worker for prediction requests. [default: 1] + -w, --processes INTEGER Number of processes for prediction requests. [default: 1] + -t, --threads INTEGER Number of threads per process for prediction requests. [default: 1] -s, --samples INTEGER Number of samples to run per thread. [default: 10] -i, --time-based FLOAT How long the thread making predictions will run for in seconds. If set, -s option will be ignored. @@ -71,9 +71,9 @@ export IMG_URL=https://i.imgur.com/213xcvs.jpg # this is the cat image shown in Then, deploy each API one at a time and check the results: -1. Running `python throughput_test.py -i 30 -w 4 -t 48` with the [cortex_inf.yaml](cortex_inf.yaml) API running on an `inf1.2xlarge` instance will get **~510 inferences/sec** with an average latency of **80 ms**. -1. Running `python throughput_test.py -i 30 -w 4 -t 2` with the [cortex_cpu.yaml](cortex_cpu.yaml) API running on an `c5.xlarge` instance will get **~16.2 inferences/sec** with an average latency of **200 ms**. -1. Running `python throughput_test.py -i 30 -w 4 -t 24` with the [cortex_gpu.yaml](cortex_gpu.yaml) API running on an `g4dn.xlarge` instance will get **~125 inferences/sec** with an average latency of **85 ms**. Optimizing the model with TensorRT to use FP16 on TF-serving only seems to achieve a 10% performance improvement - one thing to consider is that the TensorRT engines hadn't been built beforehand, so this might have affected the results negatively. +1. Running `python throughput_test.py -i 30 -p 4 -t 48` with the [cortex_inf.yaml](cortex_inf.yaml) API running on an `inf1.2xlarge` instance will get **~510 inferences/sec** with an average latency of **80 ms**. +1. Running `python throughput_test.py -i 30 -p 4 -t 2` with the [cortex_cpu.yaml](cortex_cpu.yaml) API running on an `c5.xlarge` instance will get **~16.2 inferences/sec** with an average latency of **200 ms**. +1. Running `python throughput_test.py -i 30 -p 4 -t 24` with the [cortex_gpu.yaml](cortex_gpu.yaml) API running on an `g4dn.xlarge` instance will get **~125 inferences/sec** with an average latency of **85 ms**. Optimizing the model with TensorRT to use FP16 on TF-serving only seems to achieve a 10% performance improvement - one thing to consider is that the TensorRT engines hadn't been built beforehand, so this might have affected the results negatively. *Note: `inf1.xlarge` isn't used because the major bottleneck with `inf` instances for this example is with the CPU, and `inf1.2xlarge` has twice the amount of CPU cores for same number of Inferentia ASICs (which is 1), which translates to almost double the throughput.* diff --git a/examples/tensorflow/image-classifier-resnet50/throughput_test.py b/examples/tensorflow/image-classifier-resnet50/throughput_test.py index 47f9b11def..b4a1728f41 100644 --- a/examples/tensorflow/image-classifier-resnet50/throughput_test.py +++ b/examples/tensorflow/image-classifier-resnet50/throughput_test.py @@ -22,12 +22,12 @@ @click.argument("img_url", type=str, envvar="IMG_URL") @click.argument("endpoint", type=str, envvar="ENDPOINT") @click.option( - "--workers", - "-w", + "--processes", + "-p", type=int, default=1, show_default=True, - help="Number of workers for prediction requests.", + help="Number of processes for prediction requests.", ) @click.option( "--threads", @@ -35,7 +35,7 @@ type=int, default=1, show_default=True, - help="Number of threads per worker for prediction requests.", + help="Number of threads per process for prediction requests.", ) @click.option( "--samples", @@ -60,7 +60,7 @@ show_default=True, help="Number of images sent for inference in one request.", ) -def main(img_url, endpoint, workers, threads, samples, time_based, batch_size): +def main(img_url, endpoint, processes, threads, samples, time_based, batch_size): # get the image in bytes representation image = get_url_image(img_url) image_bytes = image_to_jpeg_bytes(image) @@ -73,9 +73,9 @@ def main(img_url, endpoint, workers, threads, samples, time_based, batch_size): print("Starting the inference throughput test...") results = [] start = time.time() - with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor: + with concurrent.futures.ProcessPoolExecutor(max_workers=processes) as executor: results = executor_submitter( - executor, workers, process_worker, threads, data, endpoint, samples, time_based + executor, processes, process_worker, threads, data, endpoint, samples, time_based ) end = time.time() elapsed = end - start diff --git a/images/tensorflow-serving-inf/run.sh b/images/tensorflow-serving-inf/run.sh index d5df21f4d1..0194eb305f 100644 --- a/images/tensorflow-serving-inf/run.sh +++ b/images/tensorflow-serving-inf/run.sh @@ -18,7 +18,7 @@ set -e for i in $(seq 1 $TF_PROCESSES); do echo -e "\n\n" >> /tmp/supervisord.conf - worker=$i port=$((CORTEX_TF_BASE_SERVING_PORT+i-1)) envsubst < /tmp/template.conf >> /tmp/supervisord.conf + process=$i port=$((CORTEX_TF_BASE_SERVING_PORT+i-1)) envsubst < /tmp/template.conf >> /tmp/supervisord.conf done mv /tmp/supervisord.conf /etc/supervisor/conf.d/supervisord.conf diff --git a/images/tensorflow-serving-inf/template.conf b/images/tensorflow-serving-inf/template.conf index 28df8485fc..d841f1651d 100644 --- a/images/tensorflow-serving-inf/template.conf +++ b/images/tensorflow-serving-inf/template.conf @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -[program:tensorflow-$worker] +[program:tensorflow-$process] command=tensorflow_model_server_neuron --port=$port --model_config_file=$TF_EMPTY_MODEL_CONFIG stdout_logfile=/dev/fd/1 stdout_logfile_maxbytes=0