diff --git a/cli/local/docker_spec.go b/cli/local/docker_spec.go index 76c6ca742b..0691781db9 100644 --- a/cli/local/docker_spec.go +++ b/cli/local/docker_spec.go @@ -19,6 +19,7 @@ package local import ( "context" "fmt" + "math" "path/filepath" "strings" @@ -88,9 +89,10 @@ func getAPIEnv(api *spec.API, awsClient *aws.Client) []string { "CORTEX_MODELS="+strings.Join(api.ModelNames(), ","), "CORTEX_API_SPEC="+filepath.Join("/mnt/workspace", filepath.Base(api.Key)), "CORTEX_PROJECT_DIR="+_projectDir, - "CORTEX_WORKERS_PER_REPLICA=1", - "CORTEX_THREADS_PER_WORKER=1", - "CORTEX_MAX_WORKER_CONCURRENCY=1000", + "CORTEX_PROCESSES_PER_REPLICA="+s.Int32(api.Predictor.ProcessesPerReplica), + "CORTEX_THREADS_PER_PROCESS="+s.Int32(api.Predictor.ThreadsPerProcess), + // add 1 because it was required to achieve the target concurrency for 1 process, 1 thread + "CORTEX_MAX_PROCESS_CONCURRENCY="+s.Int64(1+int64(math.Round(float64(consts.DefaultMaxReplicaConcurrency)/float64(api.Predictor.ProcessesPerReplica)))), "CORTEX_SO_MAX_CONN=1000", "AWS_REGION="+awsClient.Region, ) diff --git a/docs/deployments/api-configuration.md b/docs/deployments/api-configuration.md index c75b09509f..0195710211 100644 --- a/docs/deployments/api-configuration.md +++ b/docs/deployments/api-configuration.md @@ -13,6 +13,8 @@ Reference the section below which corresponds to your Predictor type: [Python](# predictor: type: python path: # path to a python file with a PythonPredictor class definition, relative to the Cortex root (required) + processes_per_replica: # the number of parallel serving processes to run on each replica (default: 1) + threads_per_process: # the number of threads per process (default: 1) config: # arbitrary dictionary passed to the constructor of the Predictor (optional) python_path: # path to the root of your Python folder that will be appended to PYTHONPATH (default: folder containing cortex.yaml) image: # docker image to use for the Predictor (default: cortexlabs/python-predictor-cpu or cortexlabs/python-predictor-gpu based on compute) @@ -33,9 +35,7 @@ Reference the section below which corresponds to your Predictor type: [Python](# min_replicas: # minimum number of replicas (default: 1) max_replicas: # maximum number of replicas (default: 100) init_replicas: # initial number of replicas (default: ) - workers_per_replica: # the number of parallel serving workers to run on each replica (default: 1) - threads_per_worker: # the number of threads per worker (default: 1) - target_replica_concurrency: # the desired number of in-flight requests per replica, which the autoscaler tries to maintain (default: workers_per_replica * threads_per_worker) + target_replica_concurrency: # the desired number of in-flight requests per replica, which the autoscaler tries to maintain (default: processes_per_replica * threads_per_process) max_replica_concurrency: # the maximum number of in-flight requests per replica before requests are rejected with error code 503 (default: 1024) window: # the time over which to average the API's concurrency (default: 60s) downscale_stabilization_period: # the API will not scale below the highest recommendation made during this period (default: 5m) @@ -49,7 +49,7 @@ Reference the section below which corresponds to your Predictor type: [Python](# max_unavailable: # maximum number of replicas that can be unavailable during an update; can be an absolute number, e.g. 5, or a percentage of desired replicas, e.g. 
10% (default: 25%) ``` -See additional documentation for [autoscaling](autoscaling.md), [compute](compute.md), [networking](networking.md), [prediction monitoring](prediction-monitoring.md), and [overriding API images](system-packages.md). +See additional documentation for [parallelism](parallelism.md), [autoscaling](autoscaling.md), [compute](compute.md), [networking](networking.md), [prediction monitoring](prediction-monitoring.md), and [overriding API images](system-packages.md). ## TensorFlow Predictor @@ -65,6 +65,8 @@ See additional documentation for [autoscaling](autoscaling.md), [compute](comput model_path: # S3 path to an exported model (e.g. s3://my-bucket/exported_model) (required) signature_key: # name of the signature def to use for prediction (required if your model has more than one signature def) ... + processes_per_replica: # the number of parallel serving processes to run on each replica (default: 1) + threads_per_process: # the number of threads per process (default: 1) config: # arbitrary dictionary passed to the constructor of the Predictor (optional) python_path: # path to the root of your Python folder that will be appended to PYTHONPATH (default: folder containing cortex.yaml) image: # docker image to use for the Predictor (default: cortexlabs/tensorflow-predictor) @@ -86,9 +88,7 @@ See additional documentation for [autoscaling](autoscaling.md), [compute](comput min_replicas: # minimum number of replicas (default: 1) max_replicas: # maximum number of replicas (default: 100) init_replicas: # initial number of replicas (default: ) - workers_per_replica: # the number of parallel serving workers to run on each replica (default: 1) - threads_per_worker: # the number of threads per worker (default: 1) - target_replica_concurrency: # the desired number of in-flight requests per replica, which the autoscaler tries to maintain (default: workers_per_replica * threads_per_worker) + target_replica_concurrency: # the desired number of in-flight requests per replica, which the autoscaler tries to maintain (default: processes_per_replica * threads_per_process) max_replica_concurrency: # the maximum number of in-flight requests per replica before requests are rejected with error code 503 (default: 1024) window: # the time over which to average the API's concurrency (default: 60s) downscale_stabilization_period: # the API will not scale below the highest recommendation made during this period (default: 5m) @@ -102,7 +102,7 @@ See additional documentation for [autoscaling](autoscaling.md), [compute](comput max_unavailable: # maximum number of replicas that can be unavailable during an update; can be an absolute number, e.g. 5, or a percentage of desired replicas, e.g. 10% (default: 25%) ``` -See additional documentation for [autoscaling](autoscaling.md), [compute](compute.md), [networking](networking.md), [prediction monitoring](prediction-monitoring.md), and [overriding API images](system-packages.md). +See additional documentation for [parallelism](parallelism.md), [autoscaling](autoscaling.md), [compute](compute.md), [networking](networking.md), [prediction monitoring](prediction-monitoring.md), and [overriding API images](system-packages.md). ## ONNX Predictor @@ -117,6 +117,8 @@ See additional documentation for [autoscaling](autoscaling.md), [compute](comput model_path: # S3 path to an exported model (e.g. s3://my-bucket/exported_model.onnx) (required) signature_key: # name of the signature def to use for prediction (required if your model has more than one signature def) ... 
+ processes_per_replica: # the number of parallel serving processes to run on each replica (default: 1) + threads_per_process: # the number of threads per process (default: 1) config: # arbitrary dictionary passed to the constructor of the Predictor (optional) python_path: # path to the root of your Python folder that will be appended to PYTHONPATH (default: folder containing cortex.yaml) image: # docker image to use for the Predictor (default: cortexlabs/onnx-predictor-gpu or cortexlabs/onnx-predictor-cpu based on compute) @@ -136,9 +138,7 @@ See additional documentation for [autoscaling](autoscaling.md), [compute](comput min_replicas: # minimum number of replicas (default: 1) max_replicas: # maximum number of replicas (default: 100) init_replicas: # initial number of replicas (default: ) - workers_per_replica: # the number of parallel serving workers to run on each replica (default: 1) - threads_per_worker: # the number of threads per worker (default: 1) - target_replica_concurrency: # the desired number of in-flight requests per replica, which the autoscaler tries to maintain (default: workers_per_replica * threads_per_worker) + target_replica_concurrency: # the desired number of in-flight requests per replica, which the autoscaler tries to maintain (default: processes_per_replica * threads_per_process) max_replica_concurrency: # the maximum number of in-flight requests per replica before requests are rejected with error code 503 (default: 1024) window: # the time over which to average the API's concurrency (default: 60s) downscale_stabilization_period: # the API will not scale below the highest recommendation made during this period (default: 5m) @@ -152,4 +152,4 @@ See additional documentation for [autoscaling](autoscaling.md), [compute](comput max_unavailable: # maximum number of replicas that can be unavailable during an update; can be an absolute number, e.g. 5, or a percentage of desired replicas, e.g. 10% (default: 25%) ``` -See additional documentation for [autoscaling](autoscaling.md), [compute](compute.md), [networking](networking.md), [prediction monitoring](prediction-monitoring.md), and [overriding API images](system-packages.md). +See additional documentation for [parallelism](parallelism.md), [autoscaling](autoscaling.md), [compute](compute.md), [networking](networking.md), [prediction monitoring](prediction-monitoring.md), and [overriding API images](system-packages.md). diff --git a/docs/deployments/autoscaling.md b/docs/deployments/autoscaling.md index 7f3c9303f0..13d5798173 100644 --- a/docs/deployments/autoscaling.md +++ b/docs/deployments/autoscaling.md @@ -4,21 +4,13 @@ _WARNING: you are on the master branch, please refer to the docs on the branch t Cortex autoscales your web services on a per-API basis based on your configuration. -## Replica Parallelism - -* `workers_per_replica` (default: 1): Each replica runs a web server with `workers_per_replica` workers, each of which runs in it's own process. For APIs running with multiple CPUs per replica, using 1-3 workers per unit of CPU generally leads to optimal throughput. For example, if `cpu` is 2, a value between 2 and 6 `workers_per_replica` is reasonable. The optimal number will vary based on the workload and the CPU request for the API. - -* `threads_per_worker` (default: 1): Each worker uses a thread pool of size `threads_per_worker` to process requests. For applications that are not CPU intensive such as high I/O (e.g. 
downloading files), GPU-based inference or Inferentia ASIC-based inference, increasing the number of threads per worker can increase throughput. For CPU-bound applications such as running your model inference on a CPU, using 1 thread per worker is recommended to avoid unnecessary context switching. Some applications are not thread-safe, and therefore must be run with 1 thread per worker. - -`workers_per_replica` * `threads_per_worker` represents the number of requests that your replica can work in parallel. For example, if `workers_per_replica` is 2 and `threads_per_worker` is 2, and the replica was hit with 5 concurrent requests, 4 would immediately begin to be processed, 1 would be waiting for a thread to become available, and the concurrency for the replica would be 5. If the replica was hit with 3 concurrent requests, all three would begin processing immediately, and the replica concurrency would be 3. - ## Autoscaling Replicas * `min_replicas`: The lower bound on how many replicas can be running for an API. * `max_replicas`: The upper bound on how many replicas can be running for an API. -* `target_replica_concurrency` (default: `workers_per_replica` * `threads_per_worker`): This is the desired number of in-flight requests per replica, and is the metric which the autoscaler uses to make scaling decisions. +* `target_replica_concurrency` (default: `processes_per_replica` * `threads_per_process`): This is the desired number of in-flight requests per replica, and is the metric which the autoscaler uses to make scaling decisions. Replica concurrency is simply how many requests have been sent to a replica and have not yet been responded to (also referred to as in-flight requests). Therefore, it includes requests which are currently being processed and requests which are waiting in the replica's queue. @@ -26,11 +18,11 @@ Cortex autoscales your web services on a per-API basis based on your configurati `desired replicas = sum(in-flight requests accross all replicas) / target_replica_concurrency` - For example, setting `target_replica_concurrency` to `workers_per_replica` * `threads_per_worker` (the default) causes the cluster to adjust the number of replicas so that on average, requests are immediately processed without waiting in a queue, and workers/threads are never idle. + For example, setting `target_replica_concurrency` to `processes_per_replica` * `threads_per_process` (the default) causes the cluster to adjust the number of replicas so that on average, requests are immediately processed without waiting in a queue, and processes/threads are never idle. -* `max_replica_concurrency` (default: 1024): This is the maximum number of in-flight requests per replica before requests are rejected with HTTP error code 503. `max_replica_concurrency` includes requests that are currently being processed as well as requests that are waiting in the replica's queue (a replica can actively process `workers_per_replica` * `threads_per_worker` requests concurrently, and will hold any additional requests in a local queue). Decreasing `max_replica_concurrency` and configuring the client to retry when it receives 503 responses will improve queue fairness by preventing requests from sitting in long queues. +* `max_replica_concurrency` (default: 1024): This is the maximum number of in-flight requests per replica before requests are rejected with HTTP error code 503. 
`max_replica_concurrency` includes requests that are currently being processed as well as requests that are waiting in the replica's queue (a replica can actively process `processes_per_replica` * `threads_per_process` requests concurrently, and will hold any additional requests in a local queue). Decreasing `max_replica_concurrency` and configuring the client to retry when it receives 503 responses will improve queue fairness by preventing requests from sitting in long queues. - *Note (if `workers_per_replica` > 1): In reality, there is a queue per worker; for most purposes thinking of it as a per-replica queue will be sufficient, although in some cases the distinction is relevant. Because requests are randomly assigned to workers within a replica (which leads to unbalanced worker queues), clients may receive 503 responses before reaching `max_replica_concurrency`. For example, if you set `workers_per_replica: 2` and `max_replica_concurrency: 100`, each worker will be allowed to handle 50 requests concurrently. If your replica receives 90 requests that take the same amount of time to process, there is a 24.6% possibility that more than 50 requests are routed to 1 worker, and each request that is routed to that worker above 50 is responded to with a 503. To address this, it is recommended to implement client retries for 503 errors, or to increase `max_replica_concurrency` to minimize the probability of getting 503 responses.* + *Note (if `processes_per_replica` > 1): In reality, there is a queue per process; for most purposes thinking of it as a per-replica queue will be sufficient, although in some cases the distinction is relevant. Because requests are randomly assigned to processes within a replica (which leads to unbalanced process queues), clients may receive 503 responses before reaching `max_replica_concurrency`. For example, if you set `processes_per_replica: 2` and `max_replica_concurrency: 100`, each process will be allowed to handle 50 requests concurrently. If your replica receives 90 requests that take the same amount of time to process, there is a 24.6% possibility that more than 50 requests are routed to 1 process, and each request that is routed to that process above 50 is responded to with a 503. To address this, it is recommended to implement client retries for 503 errors, or to increase `max_replica_concurrency` to minimize the probability of getting 503 responses.* * `window` (default: 60s): The time over which to average the API wide in-flight requests (which is the sum of in-flight requests in each replica). The longer the window, the slower the autoscaler will react to changes in API wide in-flight requests, since it is averaged over the `window`. API wide in-flight requests is calculated every 10 seconds, so `window` must be a multiple of 10 seconds. diff --git a/docs/deployments/gpus.md b/docs/deployments/gpus.md index 649407152b..09df57eafe 100644 --- a/docs/deployments/gpus.md +++ b/docs/deployments/gpus.md @@ -11,9 +11,9 @@ To use GPUs: ## Tips -### If using `workers_per_replica` > 1, TensorFlow-based models, and Python Predictor +### If using `processes_per_replica` > 1, TensorFlow-based models, and Python Predictor -When using `workers_per_replica` > 1 with TensorFlow-based models (including Keras) in the Python Predictor, loading the model in separate processes at the same time will throw a `CUDA_ERROR_OUT_OF_MEMORY: out of memory` error. 
This is because the first process that loads the model will allocate all of the GPU's memory and leave none to other processes. To prevent this from happening, the per-process GPU memory usage can be limited. There are two methods: +When using `processes_per_replica` > 1 with TensorFlow-based models (including Keras) in the Python Predictor, loading the model in separate processes at the same time will throw a `CUDA_ERROR_OUT_OF_MEMORY: out of memory` error. This is because the first process that loads the model will allocate all of the GPU's memory and leave none to other processes. To prevent this from happening, the per-process GPU memory usage can be limited. There are two methods: 1\) Configure the model to allocate only as much memory as it requires, via [tf.config.experimental.set_memory_growth()](https://www.tensorflow.org/api_docs/python/tf/config/experimental/set_memory_growth): diff --git a/docs/deployments/inferentia.md b/docs/deployments/inferentia.md index b77932bbb0..152f37244e 100644 --- a/docs/deployments/inferentia.md +++ b/docs/deployments/inferentia.md @@ -22,9 +22,9 @@ Each Inferentia ASIC comes with 4 NeuronCores and 8GB of cache memory. To better A [NeuronCore Group](https://github.com/aws/aws-neuron-sdk/blob/master/docs/tensorflow-neuron/tutorial-NeuronCore-Group.md) (NCG) is a set of NeuronCores that is used to load and run a compiled model. NCGs exist to aggregate NeuronCores to improve hardware performance. Models can be shared within an NCG, but this would require the device driver to dynamically context switch between each model, which degrades performance. Therefore we've decided to only allow one model per NCG (unless you are using a [multi-model endpoint](../guides/multi-model.md), in which case there will be multiple models on a single NCG, and there will be context switching). -Each Cortex API worker will have its own copy of the model and will run on its own NCG (the number of API workers is configured by the [`workers_per_replica`](autoscaling.md#replica-parallelism) field in the API configuration). Each NCG will have an equal share of NeuronCores. Therefore, the size of each NCG will be `4 * inf / workers_per_replica` (`inf` refers to your API's `compute` request, and it's multiplied by 4 because there are 4 NeuronCores per Inferentia chip). +Each Cortex API process will have its own copy of the model and will run on its own NCG (the number of API processes is configured by the [`processes_per_replica`](parallelism.md) field in the API configuration). Each NCG will have an equal share of NeuronCores. Therefore, the size of each NCG will be `4 * inf / processes_per_replica` (`inf` refers to your API's `compute` request, and it's multiplied by 4 because there are 4 NeuronCores per Inferentia chip). -For example, if your API requests 2 `inf` chips, there will be 8 NeuronCores available. If you set `workers_per_replica` to 1, there will be one copy of your model running on a single NCG of size 8 NeuronCores. If `workers_per_replica` is 2, there will be two copies of your model, each running on a separate NCG of size 4 NeuronCores. If `workers_per_replica` is 4, there will be 4 NCGs of size 2 NeuronCores, and if If `workers_per_replica` is 8, there will be 8 NCGs of size 1 NeuronCores. In this scenario, these are the only valid values for `workers_per_replica`. In other words the total number of requested NeuronCores (which equals 4 * the number of requested Inferentia chips) must be divisible by `workers_per_replica`.
+For example, if your API requests 2 `inf` chips, there will be 8 NeuronCores available. If you set `processes_per_replica` to 1, there will be one copy of your model running on a single NCG of size 8 NeuronCores. If `processes_per_replica` is 2, there will be two copies of your model, each running on a separate NCG of size 4 NeuronCores. If `processes_per_replica` is 4, there will be 4 NCGs of size 2 NeuronCores, and if `processes_per_replica` is 8, there will be 8 NCGs of size 1 NeuronCore. In this scenario, these are the only valid values for `processes_per_replica`. In other words, the total number of requested NeuronCores (which equals 4 * the number of requested Inferentia chips) must be divisible by `processes_per_replica`. The 8GB cache memory is shared between all 4 NeuronCores of an Inferentia chip. Therefore an NCG with 8 NeuronCores (i.e. 2 Inf chips) will have access to 16GB of cache memory. An NGC with 2 NeuronCores will have access to 8GB of cache memory, which will be shared with the other NGC of size 2 running on the same Inferentia chip. @@ -34,7 +34,7 @@ Before a model can be deployed on Inferentia chips, it must be compiled for Infe By default, the Neuron compiler will compile a model to use 1 NeuronCore, but can be manually set to a different size (1, 2, 4, etc). -For optimal performance, your model should be compiled to run on the number of NeuronCores available to it. The number of NeuronCores will be `4 * inf / workers_per_replica` (`inf` refers to your API's `compute` request, and it's multiplied by 4 because there are 4 NeuronCores per Inferentia chip). See [NeuronCore Groups](#neuron-core-groups) above for an example, and see [Improving performance](#improving-performance) below for a discussion of choosing the appropriate number of NeuronCores. +For optimal performance, your model should be compiled to run on the number of NeuronCores available to it. The number of NeuronCores will be `4 * inf / processes_per_replica` (`inf` refers to your API's `compute` request, and it's multiplied by 4 because there are 4 NeuronCores per Inferentia chip). See [NeuronCore Groups](#neuron-core-groups) above for an example, and see [Improving performance](#improving-performance) below for a discussion of choosing the appropriate number of NeuronCores. Here is an example of compiling a TensorFlow SavedModel for Inferentia: @@ -73,7 +73,7 @@ See AWS's [TensorFlow](https://github.com/aws/aws-neuron-sdk/blob/master/docs/te A few things can be done to improve performance using compiled models on Cortex: -1. There's a minimum number of NeuronCores for which a model can be compiled. That number depends on the model's architecture. Generally, compiling a model for more cores than its required minimum helps to distribute the model's operators across multiple cores, which in turn [can lead to lower latency](https://github.com/aws/aws-neuron-sdk/blob/master/docs/technotes/neuroncore-pipeline.md). However, compiling a model for more NeuronCores means that you'll have to set `workers_per_replica` to be lower so that the NeuronCore Group has access to the number of NeuronCores for which you compiled your model. This is acceptable if latency is your top priority, but if throughput is more important to you, this tradeoff is usually not worth it. To maximize throughput, compile your model for as few NeuronCores as possible and increase `workers_per_replica` to the maximum possible (see above for a sample calculation). +1.
There's a minimum number of NeuronCores for which a model can be compiled. That number depends on the model's architecture. Generally, compiling a model for more cores than its required minimum helps to distribute the model's operators across multiple cores, which in turn [can lead to lower latency](https://github.com/aws/aws-neuron-sdk/blob/master/docs/technotes/neuroncore-pipeline.md). However, compiling a model for more NeuronCores means that you'll have to set `processes_per_replica` to be lower so that the NeuronCore Group has access to the number of NeuronCores for which you compiled your model. This is acceptable if latency is your top priority, but if throughput is more important to you, this tradeoff is usually not worth it. To maximize throughput, compile your model for as few NeuronCores as possible and increase `processes_per_replica` to the maximum possible (see above for a sample calculation). 1. Try to achieve a near [100% placement](https://github.com/aws/aws-neuron-sdk/blob/b28262e3072574c514a0d72ad3fe5ca48686d449/src/examples/tensorflow/keras_resnet50/pb2sm_compile.py#L59) of your model's graph onto the NeuronCores. During the compilation phase, any operators that can't execute on NeuronCores will be compiled to execute on the machine's CPU and memory instead. Even if just a few percent of the operations reside on the host's CPU/memory, the maximum throughput of the instance can be significantly limited. diff --git a/docs/deployments/parallelism.md b/docs/deployments/parallelism.md new file mode 100644 index 0000000000..9e6ff89ed0 --- /dev/null +++ b/docs/deployments/parallelism.md @@ -0,0 +1,11 @@ +# Parallelism + +_WARNING: you are on the master branch, please refer to the docs on the branch that matches your `cortex version`_ + +Replica parallelism can be configured with the following fields in the `predictor` configuration: + +* `processes_per_replica` (default: 1): Each replica runs a web server with `processes_per_replica` processes. For APIs running with multiple CPUs per replica, using 1-3 processes per unit of CPU generally leads to optimal throughput. For example, if `cpu` is 2, a value between 2 and 6 `processes_per_replica` is reasonable. The optimal number will vary based on the workload's characteristics and the CPU compute request for the API. + +* `threads_per_process` (default: 1): Each process uses a thread pool of size `threads_per_process` to process requests. For applications that are not CPU intensive such as high I/O (e.g. downloading files), GPU-based inference, or Inferentia-based inference, increasing the number of threads per process can increase throughput. For CPU-bound applications such as running your model inference on a CPU, using 1 thread per process is recommended to avoid unnecessary context switching. Some applications are not thread-safe, and therefore must be run with 1 thread per process. + +`processes_per_replica` * `threads_per_process` represents the total number of requests that your replica can work on concurrently. For example, if `processes_per_replica` is 2 and `threads_per_process` is 2, and the replica was hit with 5 concurrent requests, 4 would immediately begin to be processed and 1 would be waiting for a thread to become available. If the replica was hit with 3 concurrent requests, all three would begin processing immediately. 
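To make the concurrency model described in the new `parallelism.md` concrete, here is a small standalone Python sketch (not part of the diff; the request handling is simulated, and the thread pools only approximate Cortex's separate uvicorn worker processes) of the `processes_per_replica: 2`, `threads_per_process: 2` example above:

```python
import time
from concurrent.futures import ThreadPoolExecutor

PROCESSES_PER_REPLICA = 2  # in Cortex these would be separate uvicorn worker processes
THREADS_PER_PROCESS = 2    # size of each process's thread pool (CORTEX_THREADS_PER_PROCESS)

def handle_request(i):
    time.sleep(0.5)  # simulate model inference
    return i

# Approximate one replica: one pool per "process", giving 2 * 2 = 4 slots for in-flight requests.
pools = [ThreadPoolExecutor(max_workers=THREADS_PER_PROCESS) for _ in range(PROCESSES_PER_REPLICA)]

# 5 concurrent requests: 4 begin processing immediately, the 5th waits for a free thread.
futures = [pools[i % PROCESSES_PER_REPLICA].submit(handle_request, i) for i in range(5)]
print([f.result() for f in futures])  # [0, 1, 2, 3, 4]
```

In the real serving stack the queue is per process rather than per replica, which is why the `max_replica_concurrency` note in `autoscaling.md` discusses unbalanced process queues.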
diff --git a/docs/guides/batch-runner.md b/docs/guides/batch-runner.md index ea42a02f13..3625935e63 100644 --- a/docs/guides/batch-runner.md +++ b/docs/guides/batch-runner.md @@ -37,10 +37,10 @@ class PythonPredictor: batches = payload # Increasing max_workers will increase how many replicas will be used for the batch request. - # Assuming default values for target_replica_concurrency, workers_per_replica, and threads_per_worker + # Assuming default values for target_replica_concurrency, processes_per_replica, and threads_per_process # in your prediction API, the number of replicas created will be equal to max_workers. # If you have changed these values, the number of replicas created will be equal to max_workers / target_replica_concurrency - # (note that the default value of target_replica_concurrency is workers_per_replica * threads_per_worker). + # (note that the default value of target_replica_concurrency is processes_per_replica * threads_per_process). # If max_workers starts to get large, you will also want to set the inference API's max_replica_concurrency to avoid long and imbalanced queue lengths # Here are the autoscaling docs: https://docs.cortex.dev/deployments/autoscaling with ThreadPoolExecutor(max_workers=5) as executor: @@ -91,7 +91,7 @@ class PythonPredictor: endpoint: http://***.elb.us-west-2.amazonaws.com/iris-classifier autoscaling: max_replicas: 1 # this API may need to autoscale depending on how many batch requests, but disable it to start - threads_per_worker: 1 # set this to the number of batch requests you'd like to be able to be able to work on at a time + threads_per_process: 1 # set this to the number of batch requests you'd like to be able to work on at a time # once that number is exceeded, they will be queued, which may be ok # setting this too high may lead to out of memory errors compute: diff --git a/docs/summary.md b/docs/summary.md index 7acaef6b43..3ca691d44d 100644 --- a/docs/summary.md +++ b/docs/summary.md @@ -15,6 +15,7 @@ * [Predictor implementation](deployments/predictors.md) * [API configuration](deployments/api-configuration.md) * [API deployment](deployments/deployment.md) +* [Parallelism](deployments/parallelism.md) * [Autoscaling](deployments/autoscaling.md) * [Networking](deployments/networking.md) * [Compute](deployments/compute.md) diff --git a/examples/tensorflow/image-classifier-resnet50/README.md b/examples/tensorflow/image-classifier-resnet50/README.md index 80e8adb26f..ecefb266bf 100644 --- a/examples/tensorflow/image-classifier-resnet50/README.md +++ b/examples/tensorflow/image-classifier-resnet50/README.md @@ -47,8 +47,8 @@ Usage: throughput_test.py [OPTIONS] IMG_URL ENDPOINT with CPU, GPU or Inferentia devices. Options: - -w, --workers INTEGER Number of workers for prediction requests. [default: 1] - -t, --threads INTEGER Number of threads per worker for prediction requests. [default: 1] + -p, --processes INTEGER Number of processes for prediction requests. [default: 1] + -t, --threads INTEGER Number of threads per process for prediction requests. [default: 1] -s, --samples INTEGER Number of samples to run per thread. [default: 10] -i, --time-based FLOAT How long the thread making predictions will run for in seconds. If set, -s option will be ignored. @@ -71,9 +71,9 @@ export IMG_URL=https://i.imgur.com/213xcvs.jpg # this is the cat image shown in Then, deploy each API one at a time and check the results: -1.
Running `python throughput_test.py -i 30 -w 4 -t 48` with the [cortex_inf.yaml](cortex_inf.yaml) API running on an `inf1.2xlarge` instance will get **~510 inferences/sec** with an average latency of **80 ms**. -1. Running `python throughput_test.py -i 30 -w 4 -t 2` with the [cortex_cpu.yaml](cortex_cpu.yaml) API running on an `c5.xlarge` instance will get **~16.2 inferences/sec** with an average latency of **200 ms**. -1. Running `python throughput_test.py -i 30 -w 4 -t 24` with the [cortex_gpu.yaml](cortex_gpu.yaml) API running on an `g4dn.xlarge` instance will get **~125 inferences/sec** with an average latency of **85 ms**. Optimizing the model with TensorRT to use FP16 on TF-serving only seems to achieve a 10% performance improvement - one thing to consider is that the TensorRT engines hadn't been built beforehand, so this might have affected the results negatively. +1. Running `python throughput_test.py -i 30 -p 4 -t 48` with the [cortex_inf.yaml](cortex_inf.yaml) API running on an `inf1.2xlarge` instance will get **~510 inferences/sec** with an average latency of **80 ms**. +1. Running `python throughput_test.py -i 30 -p 4 -t 2` with the [cortex_cpu.yaml](cortex_cpu.yaml) API running on a `c5.xlarge` instance will get **~16.2 inferences/sec** with an average latency of **200 ms**. +1. Running `python throughput_test.py -i 30 -p 4 -t 24` with the [cortex_gpu.yaml](cortex_gpu.yaml) API running on a `g4dn.xlarge` instance will get **~125 inferences/sec** with an average latency of **85 ms**. Optimizing the model with TensorRT to use FP16 on TF-serving only seems to achieve a 10% performance improvement - one thing to consider is that the TensorRT engines hadn't been built beforehand, so this might have affected the results negatively. *Note: `inf1.xlarge` isn't used because the major bottleneck with `inf` instances for this example is with the CPU, and `inf1.2xlarge` has twice the amount of CPU cores for same number of Inferentia ASICs (which is 1), which translates to almost double the throughput.* diff --git a/examples/tensorflow/image-classifier-resnet50/cortex_cpu.yaml b/examples/tensorflow/image-classifier-resnet50/cortex_cpu.yaml index 7ac68d3dad..b94643e32e 100644 --- a/examples/tensorflow/image-classifier-resnet50/cortex_cpu.yaml +++ b/examples/tensorflow/image-classifier-resnet50/cortex_cpu.yaml @@ -5,6 +5,8 @@ type: tensorflow path: predictor.py model_path: s3://cortex-examples/tensorflow/resnet50 + processes_per_replica: 4 + threads_per_process: 16 config: classes: https://s3.amazonaws.com/deep-learning-models/image-models/imagenet_class_index.json input_shape: [224, 224] @@ -12,6 +14,3 @@ compute: cpu: 3 mem: 4G - autoscaling: - workers_per_replica: 4 - threads_per_worker: 16 diff --git a/examples/tensorflow/image-classifier-resnet50/cortex_gpu.yaml b/examples/tensorflow/image-classifier-resnet50/cortex_gpu.yaml index 101a3fe5e1..b2e8187dfb 100644 --- a/examples/tensorflow/image-classifier-resnet50/cortex_gpu.yaml +++ b/examples/tensorflow/image-classifier-resnet50/cortex_gpu.yaml @@ -5,6 +5,8 @@ type: tensorflow path: predictor.py model_path: s3://cortex-examples/tensorflow/resnet50 + processes_per_replica: 4 + threads_per_process: 24 config: classes: https://s3.amazonaws.com/deep-learning-models/image-models/imagenet_class_index.json input_shape: [224, 224] @@ -13,6 +15,3 @@ gpu: 1 cpu: 3 mem: 4G - autoscaling: - workers_per_replica: 4 - threads_per_worker: 24 diff --git a/examples/tensorflow/image-classifier-resnet50/cortex_inf.yaml
b/examples/tensorflow/image-classifier-resnet50/cortex_inf.yaml index 525f2c0dc8..83328c7992 100644 --- a/examples/tensorflow/image-classifier-resnet50/cortex_inf.yaml +++ b/examples/tensorflow/image-classifier-resnet50/cortex_inf.yaml @@ -5,6 +5,8 @@ type: tensorflow path: predictor.py model_path: s3://cortex-examples/tensorflow/resnet50_neuron + processes_per_replica: 4 + threads_per_process: 256 config: classes: https://s3.amazonaws.com/deep-learning-models/image-models/imagenet_class_index.json input_shape: [224, 224] @@ -14,6 +16,4 @@ cpu: 3 mem: 4G autoscaling: - workers_per_replica: 4 - threads_per_worker: 256 max_replica_concurrency: 16384 diff --git a/examples/tensorflow/image-classifier-resnet50/throughput_test.py b/examples/tensorflow/image-classifier-resnet50/throughput_test.py index 47f9b11def..b4a1728f41 100644 --- a/examples/tensorflow/image-classifier-resnet50/throughput_test.py +++ b/examples/tensorflow/image-classifier-resnet50/throughput_test.py @@ -22,12 +22,12 @@ @click.argument("img_url", type=str, envvar="IMG_URL") @click.argument("endpoint", type=str, envvar="ENDPOINT") @click.option( - "--workers", - "-w", + "--processes", + "-p", type=int, default=1, show_default=True, - help="Number of workers for prediction requests.", + help="Number of processes for prediction requests.", ) @click.option( "--threads", @@ -35,7 +35,7 @@ type=int, default=1, show_default=True, - help="Number of threads per worker for prediction requests.", + help="Number of threads per process for prediction requests.", ) @click.option( "--samples", @@ -60,7 +60,7 @@ show_default=True, help="Number of images sent for inference in one request.", ) -def main(img_url, endpoint, workers, threads, samples, time_based, batch_size): +def main(img_url, endpoint, processes, threads, samples, time_based, batch_size): # get the image in bytes representation image = get_url_image(img_url) image_bytes = image_to_jpeg_bytes(image) @@ -73,9 +73,9 @@ def main(img_url, endpoint, workers, threads, samples, time_based, batch_size): print("Starting the inference throughput test...") results = [] start = time.time() - with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor: + with concurrent.futures.ProcessPoolExecutor(max_workers=processes) as executor: results = executor_submitter( - executor, workers, process_worker, threads, data, endpoint, samples, time_based + executor, processes, process_worker, threads, data, endpoint, samples, time_based ) end = time.time() elapsed = end - start diff --git a/examples/tensorflow/license-plate-reader/README.md b/examples/tensorflow/license-plate-reader/README.md index 294d8ea873..d7ff404a65 100644 --- a/examples/tensorflow/license-plate-reader/README.md +++ b/examples/tensorflow/license-plate-reader/README.md @@ -63,7 +63,7 @@ For another prediction, let's use a generic image from the web. Export [this ima ## Deployment - Full Version -The recommended number of instances to run this smoothly on a video stream is about 12 GPU instances (2 GPU instances for *YOLOv3* and 10 for *CRNN* + *CRAFT*). `cortex_full.yaml` is already set up to use these 12 instances. Note: this is the optimal number of instances when using the `g4dn.xlarge` instance type. For the client to work smoothly, the number of workers per replica can be adjusted, especially for `p3` or `g4` instances, where the GPU has a lot of compute capacity. 
+The recommended number of instances to run this smoothly on a video stream is about 12 GPU instances (2 GPU instances for *YOLOv3* and 10 for *CRNN* + *CRAFT*). `cortex_full.yaml` is already set up to use these 12 instances. Note: this is the optimal number of instances when using the `g4dn.xlarge` instance type. For the client to work smoothly, the number of processes per replica can be adjusted, especially for `p3` or `g4` instances, where the GPU has a lot of compute capacity. If you don't have access to this many GPU-equipped instances, you could just lower the number and expect dropped frames. It will still prove the point, albeit at a much lower framerate and with higher latency. More on that [here](https://github.com/RobertLucian/cortex-license-plate-reader-client). diff --git a/examples/tensorflow/license-plate-reader/cortex_full.yaml b/examples/tensorflow/license-plate-reader/cortex_full.yaml index e43e3e87ab..77e3e0e4c9 100644 --- a/examples/tensorflow/license-plate-reader/cortex_full.yaml +++ b/examples/tensorflow/license-plate-reader/cortex_full.yaml @@ -5,6 +5,8 @@ type: tensorflow path: predictor_yolo.py model_path: s3://cortex-examples/tensorflow/license-plate-reader/yolov3_tf + processes_per_replica: 4 + threads_per_process: 3 signature_key: serving_default config: model_config: config.json @@ -15,13 +17,13 @@ autoscaling: min_replicas: 2 max_replicas: 2 - workers_per_replica: 4 - threads_per_worker: 3 - name: crnn predictor: type: python path: predictor_crnn.py + processes_per_replica: 1 + threads_per_process: 1 compute: cpu: 1 gpu: 1 @@ -29,5 +31,3 @@ autoscaling: min_replicas: 10 max_replicas: 10 - workers_per_replica: 1 - threads_per_worker: 1 diff --git a/images/tensorflow-serving-inf/run.sh b/images/tensorflow-serving-inf/run.sh index 7765af41df..0194eb305f 100644 --- a/images/tensorflow-serving-inf/run.sh +++ b/images/tensorflow-serving-inf/run.sh @@ -16,9 +16,9 @@ set -e -for i in $(seq 1 $TF_WORKERS); do +for i in $(seq 1 $TF_PROCESSES); do echo -e "\n\n" >> /tmp/supervisord.conf - worker=$i port=$((CORTEX_TF_BASE_SERVING_PORT+i-1)) envsubst < /tmp/template.conf >> /tmp/supervisord.conf + process=$i port=$((CORTEX_TF_BASE_SERVING_PORT+i-1)) envsubst < /tmp/template.conf >> /tmp/supervisord.conf done mv /tmp/supervisord.conf /etc/supervisor/conf.d/supervisord.conf diff --git a/images/tensorflow-serving-inf/template.conf b/images/tensorflow-serving-inf/template.conf index 28df8485fc..d841f1651d 100644 --- a/images/tensorflow-serving-inf/template.conf +++ b/images/tensorflow-serving-inf/template.conf @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-[program:tensorflow-$worker] +[program:tensorflow-$process] command=tensorflow_model_server_neuron --port=$port --model_config_file=$TF_EMPTY_MODEL_CONFIG stdout_logfile=/dev/fd/1 stdout_logfile_maxbytes=0 diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go index 0a39b6cd61..5f641e78ae 100644 --- a/pkg/consts/consts.go +++ b/pkg/consts/consts.go @@ -52,6 +52,7 @@ var ( MaxClassesPerMonitoringRequest = 20 // cloudwatch.GeMetricData can get up to 100 metrics per request, avoid multiple requests and have room for other stats DashboardTitle = "# cortex monitoring dashboard" + DefaultMaxReplicaConcurrency = int64(1024) NeuronCoresPerInf = int64(4) ) diff --git a/pkg/operator/operator/k8s_specs.go b/pkg/operator/operator/k8s_specs.go index b70df3bfd5..1dc0f5a184 100644 --- a/pkg/operator/operator/k8s_specs.go +++ b/pkg/operator/operator/k8s_specs.go @@ -617,21 +617,21 @@ func getEnvVars(api *spec.API, container string) []kcore.EnvVar { }, }, kcore.EnvVar{ - Name: "CORTEX_WORKERS_PER_REPLICA", - Value: s.Int32(api.Autoscaling.WorkersPerReplica), + Name: "CORTEX_PROCESSES_PER_REPLICA", + Value: s.Int32(api.Predictor.ProcessesPerReplica), }, kcore.EnvVar{ - Name: "CORTEX_THREADS_PER_WORKER", - Value: s.Int32(api.Autoscaling.ThreadsPerWorker), + Name: "CORTEX_THREADS_PER_PROCESS", + Value: s.Int32(api.Predictor.ThreadsPerProcess), }, kcore.EnvVar{ Name: "CORTEX_MAX_REPLICA_CONCURRENCY", Value: s.Int64(api.Autoscaling.MaxReplicaConcurrency), }, kcore.EnvVar{ - Name: "CORTEX_MAX_WORKER_CONCURRENCY", - // add 1 because it was required to achieve the target concurrency for 1 worker, 1 thread - Value: s.Int64(1 + int64(math.Round(float64(api.Autoscaling.MaxReplicaConcurrency)/float64(api.Autoscaling.WorkersPerReplica)))), + Name: "CORTEX_MAX_PROCESS_CONCURRENCY", + // add 1 because it was required to achieve the target concurrency for 1 process, 1 thread + Value: s.Int64(1 + int64(math.Round(float64(api.Autoscaling.MaxReplicaConcurrency)/float64(api.Predictor.ProcessesPerReplica)))), }, kcore.EnvVar{ Name: "CORTEX_SO_MAX_CONN", @@ -703,7 +703,7 @@ func getEnvVars(api *spec.API, container string) []kcore.EnvVar { envVars = append(envVars, kcore.EnvVar{ Name: "NEURONCORE_GROUP_SIZES", - Value: s.Int64(api.Compute.Inf * consts.NeuronCoresPerInf / int64(api.Autoscaling.WorkersPerReplica)), + Value: s.Int64(api.Compute.Inf * consts.NeuronCoresPerInf / int64(api.Predictor.ProcessesPerReplica)), }, kcore.EnvVar{ Name: "NEURON_RTD_ADDRESS", @@ -716,8 +716,8 @@ func getEnvVars(api *spec.API, container string) []kcore.EnvVar { if container == _tfServingContainerName { envVars = append(envVars, kcore.EnvVar{ - Name: "TF_WORKERS", - Value: s.Int32(api.Autoscaling.WorkersPerReplica), + Name: "TF_PROCESSES", + Value: s.Int32(api.Predictor.ProcessesPerReplica), }, kcore.EnvVar{ Name: "CORTEX_TF_BASE_SERVING_PORT", @@ -760,7 +760,7 @@ func tensorflowServingContainer(api *spec.API, volumeMounts []kcore.VolumeMount, } if api.Compute.Inf > 0 { - numPorts := api.Autoscaling.WorkersPerReplica + numPorts := api.Predictor.ProcessesPerReplica for i := int32(1); i < numPorts; i++ { ports = append(ports, kcore.ContainerPort{ ContainerPort: _tfBaseServingPortInt32 + i, diff --git a/pkg/types/spec/errors.go b/pkg/types/spec/errors.go index 1bf4957f1e..999649bd54 100644 --- a/pkg/types/spec/errors.go +++ b/pkg/types/spec/errors.go @@ -61,7 +61,7 @@ const ( ErrRegistryAccountIDMismatch = "spec.registry_account_id_mismatch" ErrCannotAccessECRWithAnonymousAWSCreds = "spec.cannot_access_ecr_with_anonymous_aws_creds" 
ErrComputeResourceConflict = "spec.compute_resource_conflict" - ErrInvalidNumberOfInfWorkers = "spec.invalid_number_of_inf_workers" + ErrInvalidNumberOfInfProcesses = "spec.invalid_number_of_inf_processes" ErrInvalidNumberOfInfs = "spec.invalid_number_of_infs" ) @@ -325,11 +325,11 @@ func ErrorComputeResourceConflict(resourceA, resourceB string) error { }) } -func ErrorInvalidNumberOfInfWorkers(workersPerReplica int64, numInf int64, numNeuronCores int64) error { - acceptableWorkers := libmath.FactorsInt64(numNeuronCores) +func ErrorInvalidNumberOfInfProcesses(processesPerReplica int64, numInf int64, numNeuronCores int64) error { + acceptableProcesses := libmath.FactorsInt64(numNeuronCores) return errors.WithStack(&errors.Error{ - Kind: ErrInvalidNumberOfInfWorkers, - Message: fmt.Sprintf("cannot evenly distribute %d Inferentia %s (%d NeuronCores total) over %d worker(s) - acceptable numbers of workers are %s", numInf, s.PluralS("ASIC", numInf), numNeuronCores, workersPerReplica, s.UserStrsOr(acceptableWorkers)), + Kind: ErrInvalidNumberOfInfProcesses, + Message: fmt.Sprintf("cannot evenly distribute %d Inferentia %s (%d NeuronCores total) over %d processes - acceptable numbers of processes are %s", numInf, s.PluralS("ASIC", numInf), numNeuronCores, processesPerReplica, s.UserStrsOr(acceptableProcesses)), }) } diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go index 5f42efe14c..4588fa9d91 100644 --- a/pkg/types/spec/validations.go +++ b/pkg/types/spec/validations.go @@ -119,6 +119,21 @@ func predictorValidation() *cr.StructFieldValidation { DockerImageOrEmpty: true, }, }, + { + StructField: "ProcessesPerReplica", + Int32Validation: &cr.Int32Validation{ + Default: 1, + GreaterThanOrEqualTo: pointer.Int32(1), + LessThanOrEqualTo: pointer.Int32(20), + }, + }, + { + StructField: "ThreadsPerProcess", + Int32Validation: &cr.Int32Validation{ + Default: 1, + GreaterThanOrEqualTo: pointer.Int32(1), + }, + }, { StructField: "Config", InterfaceMapValidation: &cr.InterfaceMapValidation{ @@ -286,21 +301,6 @@ func autoscalingValidation(provider types.ProviderType) *cr.StructFieldValidatio GreaterThan: pointer.Int32(0), }, }, - { - StructField: "WorkersPerReplica", - Int32Validation: &cr.Int32Validation{ - Default: 1, - GreaterThanOrEqualTo: pointer.Int32(1), - LessThanOrEqualTo: pointer.Int32(20), - }, - }, - { - StructField: "ThreadsPerWorker", - Int32Validation: &cr.Int32Validation{ - Default: 1, - GreaterThanOrEqualTo: pointer.Int32(1), - }, - }, { StructField: "TargetReplicaConcurrency", Float64PtrValidation: &cr.Float64PtrValidation{ @@ -310,7 +310,7 @@ func autoscalingValidation(provider types.ProviderType) *cr.StructFieldValidatio { StructField: "MaxReplicaConcurrency", Int64Validation: &cr.Int64Validation{ - Default: 1024, + Default: consts.DefaultMaxReplicaConcurrency, GreaterThan: pointer.Int64(0), LessThanOrEqualTo: pointer.Int64(math.MaxUint16), }, @@ -941,9 +941,10 @@ func validatePythonPath(pythonPath string, projectFiles ProjectFiles) error { func validateAutoscaling(api *userconfig.API) error { autoscaling := api.Autoscaling + predictor := api.Predictor if autoscaling.TargetReplicaConcurrency == nil { - autoscaling.TargetReplicaConcurrency = pointer.Float64(float64(autoscaling.WorkersPerReplica * autoscaling.ThreadsPerWorker)) + autoscaling.TargetReplicaConcurrency = pointer.Float64(float64(predictor.ProcessesPerReplica * predictor.ThreadsPerProcess)) } if *autoscaling.TargetReplicaConcurrency > float64(autoscaling.MaxReplicaConcurrency) { @@ -964,9 +965,9 @@ func 
validateAutoscaling(api *userconfig.API) error { if api.Compute.Inf > 0 { numNeuronCores := api.Compute.Inf * consts.NeuronCoresPerInf - workersPerReplica := int64(api.Autoscaling.WorkersPerReplica) - if !libmath.IsDivisibleByInt64(numNeuronCores, workersPerReplica) { - return ErrorInvalidNumberOfInfWorkers(workersPerReplica, api.Compute.Inf, numNeuronCores) + processesPerReplica := int64(predictor.ProcessesPerReplica) + if !libmath.IsDivisibleByInt64(numNeuronCores, processesPerReplica) { + return ErrorInvalidNumberOfInfProcesses(processesPerReplica, api.Compute.Inf, numNeuronCores) } } diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go index a66025b453..4eb8566b69 100644 --- a/pkg/types/userconfig/api.go +++ b/pkg/types/userconfig/api.go @@ -50,6 +50,8 @@ type Predictor struct { PythonPath *string `json:"python_path" yaml:"python_path"` Image string `json:"image" yaml:"image"` TensorFlowServingImage string `json:"tensorflow_serving_image" yaml:"tensorflow_serving_image"` + ProcessesPerReplica int32 `json:"processes_per_replica" yaml:"processes_per_replica"` + ThreadsPerProcess int32 `json:"threads_per_process" yaml:"threads_per_process"` Config map[string]interface{} `json:"config" yaml:"config"` Env map[string]string `json:"env" yaml:"env"` SignatureKey *string `json:"signature_key" yaml:"signature_key"` @@ -83,8 +85,6 @@ type Autoscaling struct { MinReplicas int32 `json:"min_replicas" yaml:"min_replicas"` MaxReplicas int32 `json:"max_replicas" yaml:"max_replicas"` InitReplicas int32 `json:"init_replicas" yaml:"init_replicas"` - WorkersPerReplica int32 `json:"workers_per_replica" yaml:"workers_per_replica"` - ThreadsPerWorker int32 `json:"threads_per_worker" yaml:"threads_per_worker"` TargetReplicaConcurrency *float64 `json:"target_replica_concurrency" yaml:"target_replica_concurrency"` MaxReplicaConcurrency int64 `json:"max_replica_concurrency" yaml:"max_replica_concurrency"` Window time.Duration `json:"window" yaml:"window"` @@ -176,10 +176,10 @@ func (api *API) ToK8sAnnotations() map[string]string { return map[string]string{ EndpointAnnotationKey: *api.Networking.Endpoint, APIGatewayAnnotationKey: api.Networking.APIGateway.String(), + ProcessesPerReplicaAnnotationKey: s.Int32(api.Predictor.ProcessesPerReplica), + ThreadsPerProcessAnnotationKey: s.Int32(api.Predictor.ThreadsPerProcess), MinReplicasAnnotationKey: s.Int32(api.Autoscaling.MinReplicas), MaxReplicasAnnotationKey: s.Int32(api.Autoscaling.MaxReplicas), - WorkersPerReplicaAnnotationKey: s.Int32(api.Autoscaling.WorkersPerReplica), - ThreadsPerWorkerAnnotationKey: s.Int32(api.Autoscaling.ThreadsPerWorker), TargetReplicaConcurrencyAnnotationKey: s.Float64(*api.Autoscaling.TargetReplicaConcurrency), MaxReplicaConcurrencyAnnotationKey: s.Int64(api.Autoscaling.MaxReplicaConcurrency), WindowAnnotationKey: api.Autoscaling.Window.String(), @@ -215,18 +215,6 @@ func AutoscalingFromAnnotations(k8sObj kmeta.Object) (*Autoscaling, error) { } a.MaxReplicas = maxReplicas - workersPerReplica, err := k8s.ParseInt32Annotation(k8sObj, WorkersPerReplicaAnnotationKey) - if err != nil { - return nil, err - } - a.WorkersPerReplica = workersPerReplica - - threadsPerWorker, err := k8s.ParseInt32Annotation(k8sObj, ThreadsPerWorkerAnnotationKey) - if err != nil { - return nil, err - } - a.ThreadsPerWorker = threadsPerWorker - targetReplicaConcurrency, err := k8s.ParseFloat64Annotation(k8sObj, TargetReplicaConcurrencyAnnotationKey) if err != nil { return nil, err @@ -336,17 +324,19 @@ func (predictor *Predictor) UserStr() string { if 
predictor.SignatureKey != nil { sb.WriteString(fmt.Sprintf("%s: %s\n", SignatureKeyKey, *predictor.SignatureKey)) } - if predictor.PythonPath != nil { - sb.WriteString(fmt.Sprintf("%s: %s\n", PythonPathKey, *predictor.PythonPath)) + sb.WriteString(fmt.Sprintf("%s: %s\n", ProcessesPerReplicaKey, s.Int32(predictor.ProcessesPerReplica))) + sb.WriteString(fmt.Sprintf("%s: %s\n", ThreadsPerProcessKey, s.Int32(predictor.ThreadsPerProcess))) + if len(predictor.Config) > 0 { + sb.WriteString(fmt.Sprintf("%s:\n", ConfigKey)) + d, _ := yaml.Marshal(&predictor.Config) + sb.WriteString(s.Indent(string(d), " ")) } sb.WriteString(fmt.Sprintf("%s: %s\n", ImageKey, predictor.Image)) if predictor.TensorFlowServingImage != "" { sb.WriteString(fmt.Sprintf("%s: %s\n", TensorFlowServingImageKey, predictor.TensorFlowServingImage)) } - if len(predictor.Config) > 0 { - sb.WriteString(fmt.Sprintf("%s:\n", ConfigKey)) - d, _ := yaml.Marshal(&predictor.Config) - sb.WriteString(s.Indent(string(d), " ")) + if predictor.PythonPath != nil { + sb.WriteString(fmt.Sprintf("%s: %s\n", PythonPathKey, *predictor.PythonPath)) } if len(predictor.Env) > 0 { sb.WriteString(fmt.Sprintf("%s:\n", EnvKey)) @@ -443,8 +433,6 @@ func (autoscaling *Autoscaling) UserStr() string { sb.WriteString(fmt.Sprintf("%s: %s\n", MinReplicasKey, s.Int32(autoscaling.MinReplicas))) sb.WriteString(fmt.Sprintf("%s: %s\n", MaxReplicasKey, s.Int32(autoscaling.MaxReplicas))) sb.WriteString(fmt.Sprintf("%s: %s\n", InitReplicasKey, s.Int32(autoscaling.InitReplicas))) - sb.WriteString(fmt.Sprintf("%s: %s\n", WorkersPerReplicaKey, s.Int32(autoscaling.WorkersPerReplica))) - sb.WriteString(fmt.Sprintf("%s: %s\n", ThreadsPerWorkerKey, s.Int32(autoscaling.ThreadsPerWorker))) sb.WriteString(fmt.Sprintf("%s: %s\n", TargetReplicaConcurrencyKey, s.Float64(*autoscaling.TargetReplicaConcurrency))) sb.WriteString(fmt.Sprintf("%s: %s\n", MaxReplicaConcurrencyKey, s.Int64(autoscaling.MaxReplicaConcurrency))) sb.WriteString(fmt.Sprintf("%s: %s\n", WindowKey, autoscaling.Window.String())) diff --git a/pkg/types/userconfig/config_key.go b/pkg/types/userconfig/config_key.go index 1d20375f27..ebfd0a27d7 100644 --- a/pkg/types/userconfig/config_key.go +++ b/pkg/types/userconfig/config_key.go @@ -30,6 +30,8 @@ const ( TypeKey = "type" PathKey = "path" ModelPathKey = "model_path" + ProcessesPerReplicaKey = "processes_per_replica" + ThreadsPerProcessKey = "threads_per_process" ModelsKey = "models" PythonPathKey = "python_path" ImageKey = "image" @@ -60,8 +62,6 @@ const ( MinReplicasKey = "min_replicas" MaxReplicasKey = "max_replicas" InitReplicasKey = "init_replicas" - WorkersPerReplicaKey = "workers_per_replica" - ThreadsPerWorkerKey = "threads_per_worker" TargetReplicaConcurrencyKey = "target_replica_concurrency" MaxReplicaConcurrencyKey = "max_replica_concurrency" WindowKey = "window" @@ -79,10 +79,10 @@ const ( // K8s annotation EndpointAnnotationKey = "networking.cortex.dev/endpoint" APIGatewayAnnotationKey = "networking.cortex.dev/api-gateway" + ProcessesPerReplicaAnnotationKey = "predictor.cortex.dev/processes-per-replica" + ThreadsPerProcessAnnotationKey = "predictor.cortex.dev/threads-per-process" MinReplicasAnnotationKey = "autoscaling.cortex.dev/min-replicas" MaxReplicasAnnotationKey = "autoscaling.cortex.dev/max-replicas" - WorkersPerReplicaAnnotationKey = "autoscaling.cortex.dev/workers-per-replica" - ThreadsPerWorkerAnnotationKey = "autoscaling.cortex.dev/threads-per-worker" TargetReplicaConcurrencyAnnotationKey = "autoscaling.cortex.dev/target-replica-concurrency" 
MaxReplicaConcurrencyAnnotationKey = "autoscaling.cortex.dev/max-replica-concurrency" WindowAnnotationKey = "autoscaling.cortex.dev/window" diff --git a/pkg/workloads/cortex/serve/serve.py b/pkg/workloads/cortex/serve/serve.py index b0c65ed251..8af83e1592 100644 --- a/pkg/workloads/cortex/serve/serve.py +++ b/pkg/workloads/cortex/serve/serve.py @@ -53,7 +53,7 @@ loop = asyncio.get_event_loop() loop.set_default_executor( - ThreadPoolExecutor(max_workers=int(os.environ["CORTEX_THREADS_PER_WORKER"])) + ThreadPoolExecutor(max_workers=int(os.environ["CORTEX_THREADS_PER_PROCESS"])) ) app = FastAPI() diff --git a/pkg/workloads/cortex/serve/start_uvicorn.py b/pkg/workloads/cortex/serve/start_uvicorn.py index 260e461391..200b9a2b39 100644 --- a/pkg/workloads/cortex/serve/start_uvicorn.py +++ b/pkg/workloads/cortex/serve/start_uvicorn.py @@ -34,15 +34,15 @@ def load_tensorflow_serving_models(): from cortex.lib.server.tensorflow import TensorFlowServing - # determine if multiple TF workers are required - num_workers = 1 + # determine if multiple TF processes are required + num_processes = 1 has_multiple_servers = os.getenv("CORTEX_MULTIPLE_TF_SERVERS") if has_multiple_servers: - num_workers = int(os.environ["CORTEX_WORKERS_PER_REPLICA"]) + num_processes = int(os.environ["CORTEX_PROCESSES_PER_REPLICA"]) - # initialize models for each TF worker + # initialize models for each TF process base_paths = [os.path.join(model_dir, name) for name in models] - for w in range(int(num_workers)): + for w in range(int(num_processes)): tfs = TensorFlowServing(f"{tf_serving_host}:{tf_base_serving_port+w}") tfs.add_models_config(models, base_paths, replace_models=False) @@ -60,9 +60,9 @@ def main(): has_multiple_servers = os.getenv("CORTEX_MULTIPLE_TF_SERVERS") if has_multiple_servers: base_serving_port = int(os.environ["CORTEX_TF_BASE_SERVING_PORT"]) - num_workers = int(os.environ["CORTEX_WORKERS_PER_REPLICA"]) + num_processes = int(os.environ["CORTEX_PROCESSES_PER_REPLICA"]) used_ports = {} - for w in range(int(num_workers)): + for w in range(int(num_processes)): used_ports[str(base_serving_port + w)] = False with open("/run/used_ports.json", "w+") as f: json.dump(used_ports, f) @@ -86,8 +86,8 @@ def main(): "cortex.serve.wsgi:app", host="0.0.0.0", port=int(os.environ["CORTEX_SERVING_PORT"]), - workers=int(os.environ["CORTEX_WORKERS_PER_REPLICA"]), - limit_concurrency=int(os.environ["CORTEX_MAX_WORKER_CONCURRENCY"]), + workers=int(os.environ["CORTEX_PROCESSES_PER_REPLICA"]), + limit_concurrency=int(os.environ["CORTEX_MAX_PROCESS_CONCURRENCY"]), backlog=int(os.environ["CORTEX_SO_MAX_CONN"]), log_config=log_config, log_level="info",
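For reference, a minimal Python sketch (not part of the diff; the values are illustrative) of the `CORTEX_MAX_PROCESS_CONCURRENCY` formula that now appears in both `docker_spec.go` and `k8s_specs.go`:

```python
import math

def max_process_concurrency(max_replica_concurrency: int, processes_per_replica: int) -> int:
    # Mirrors the Go expression:
    #   1 + int64(math.Round(float64(maxReplicaConcurrency) / float64(processesPerReplica)))
    # math.floor(x + 0.5) matches Go's half-away-from-zero rounding for positive inputs;
    # the +1 is there because it was required to achieve the target concurrency for 1 process, 1 thread.
    return 1 + int(math.floor(max_replica_concurrency / processes_per_replica + 0.5))

print(max_process_concurrency(1024, 1))   # 1025 (DefaultMaxReplicaConcurrency, 1 process)
print(max_process_concurrency(1024, 4))   # 257  (DefaultMaxReplicaConcurrency, 4 processes)
print(max_process_concurrency(16384, 4))  # 4097 (the cortex_inf.yaml example in this diff)
```

Each uvicorn process then enforces this value as its `limit_concurrency`, as shown in the `start_uvicorn.py` hunk above.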