Update in-flight request counter, switch to FastAPI + Uvicorn #838

Merged
merged 64 commits into master from fastapi
Mar 4, 2020
Commits
aa6cee5
FastAPI demo
deliahu Feb 24, 2020
72abaed
Switch gunicorn to uvicorn
vishalbollu Feb 25, 2020
c0aaaf9
flask to fast-api rough
vishalbollu Feb 26, 2020
b79b7d1
Merge branch 'master' of github.com:cortexlabs/cortex into fastapi
deliahu Feb 27, 2020
d3df3af
Cleanup gunicorn code
vishalbollu Feb 27, 2020
0f2aacd
Add max_queue_length
deliahu Feb 27, 2020
7de8652
Update run.sh
deliahu Feb 27, 2020
0c56233
Set somaxconn
deliahu Feb 27, 2020
1c005dd
Use regular function instead of async for predict endpoint
vishalbollu Feb 27, 2020
da08f18
Update log formatter
deliahu Feb 28, 2020
4363113
Remove api is live log line
deliahu Feb 28, 2020
bbf4304
Update tf.requirements.txt
deliahu Feb 28, 2020
37dd3ec
Add pyyaml
deliahu Feb 28, 2020
263061f
Use configuration in APIs
deliahu Feb 28, 2020
75359d8
Add healthchecks and liveness probes
vishalbollu Feb 28, 2020
789498e
Add comment for uvicorn config
deliahu Feb 28, 2020
1a8fc9c
Add license
deliahu Feb 28, 2020
7efa210
Update avg request header
deliahu Feb 28, 2020
23fb6d6
Use background tasks and dedicated threadpool for requests
vishalbollu Feb 28, 2020
241ec7a
Merge branch 'fastapi' of github.com:cortexlabs/cortex into fastapi
vishalbollu Feb 28, 2020
14f09e5
Remove extra log
deliahu Feb 28, 2020
2fcd86d
Put latency metrics post request and use json-tricks if default json …
vishalbollu Feb 28, 2020
e8115d6
Merge branch 'fastapi' of github.com:cortexlabs/cortex into fastapi
vishalbollu Feb 28, 2020
d70e4bb
Add timestamp readiness probe
deliahu Feb 28, 2020
74f0799
Merge branch 'fastapi' of github.com:cortexlabs/cortex into fastapi
deliahu Feb 28, 2020
5130d9b
Write current time to api readiness file
deliahu Feb 28, 2020
e410f5f
Update liveness and readiness checks
deliahu Feb 29, 2020
e1c9453
Update OOM calculation
deliahu Feb 29, 2020
2294d08
Move request tracking file deletion to background
deliahu Feb 29, 2020
c1a0b51
Use request ID for request tracker files
deliahu Feb 29, 2020
e2268cd
Move request_start_time
deliahu Feb 29, 2020
f69e581
Fix lint
deliahu Feb 29, 2020
9705cfe
Remove unnecessary sed
vishalbollu Mar 1, 2020
5c6c47a
Revert compute spec in example
vishalbollu Mar 1, 2020
f70976e
Rename start_time to total_time in metrics calculation
vishalbollu Mar 1, 2020
7551540
Remove unecessary exception variable delcartions
vishalbollu Mar 1, 2020
1b432eb
Remove spacing in tf.requirements.txt
vishalbollu Mar 2, 2020
f7c1952
Reorder serve.py
vishalbollu Mar 2, 2020
df7d74b
Undo compute commit in image-classifier
vishalbollu Mar 2, 2020
743a3ea
Update liveness check
deliahu Mar 2, 2020
af807d5
Update serve.py
deliahu Mar 2, 2020
e948769
Misc cleanup
deliahu Mar 2, 2020
bd84de4
Merge branch 'master' of github.com:cortexlabs/cortex into fastapi
deliahu Mar 2, 2020
c238020
Update autoscaling.md
deliahu Mar 3, 2020
1362395
Update docs
deliahu Mar 3, 2020
9185612
Update autoscaling.md
deliahu Mar 3, 2020
3125261
Update autoscaling.md
deliahu Mar 3, 2020
301a4bf
Remove whitespace
deliahu Mar 3, 2020
764dd01
Update docs
deliahu Mar 3, 2020
5538e10
queue_length -> replica_concurrency
deliahu Mar 3, 2020
885433a
Check if MetricDataResults has atleast 1 element
vishalbollu Mar 3, 2020
2ca47a3
Move request tracking to critical path instead of background task
vishalbollu Mar 3, 2020
f1a0633
Update autoscaler logs
deliahu Mar 3, 2020
a9b5886
Update autoscaling.md
deliahu Mar 3, 2020
f417eef
Divide max_replica_concurrency by number of workers
vishalbollu Mar 4, 2020
19d2f38
Update autoscaling documentation
vishalbollu Mar 4, 2020
4f0241a
Update autoscaling.md
deliahu Mar 4, 2020
965b528
Set backlog to so_max_conn
vishalbollu Mar 4, 2020
058b252
Use rounding instead of integer divide to calculate CORTEX_MAX_WORKER…
vishalbollu Mar 4, 2020
4819973
Merge branch 'fastapi' of github.com:cortexlabs/cortex into fastapi
vishalbollu Mar 4, 2020
486bdbc
Rename pytorch sentiment-analyzer
deliahu Mar 4, 2020
1daedd9
Accept any free form json
vishalbollu Mar 4, 2020
3c7d983
Update autoscaling.md
vishalbollu Mar 4, 2020
1688a5f
Merge branch 'master' into fastapi
deliahu Mar 4, 2020
2 changes: 1 addition & 1 deletion cli/cmd/get.go
@@ -181,7 +181,7 @@ func apiTable(apis []spec.API, statuses []status.Status, allMetrics []metrics.Me
{Title: "requested"},
{Title: "failed", Hidden: totalFailed == 0},
{Title: "last update"},
{Title: "avg inference"},
{Title: "avg request"},
{Title: "2XX"},
{Title: "4XX", Hidden: total4XX == 0},
{Title: "5XX", Hidden: total5XX == 0},
42 changes: 40 additions & 2 deletions docs/deployments/autoscaling.md
@@ -4,10 +4,48 @@ _WARNING: you are on the master branch, please refer to the docs on the branch t

Cortex autoscales your web services based on your configuration.

## Replica Parallelism

* `workers_per_replica` (default: 1): Each replica runs a web server with `workers_per_replica` workers, each of which runs in its own process. For APIs running with multiple CPUs per replica, using 1-3 workers per unit of CPU generally leads to optimal throughput. For example, if `cpu` is 2, a `workers_per_replica` value between 2 and 6 is reasonable. The optimal number will vary based on the workload and the CPU request for the API.

* `threads_per_worker` (default: 1): Each worker uses a thread pool of size `threads_per_worker` to process requests. For applications that are not CPU intensive, such as high-I/O workloads (e.g. downloading files) or GPU-based inference, increasing the number of threads per worker can increase throughput. For CPU-bound applications such as running your model inference on a CPU, using 1 thread per worker is recommended to avoid unnecessary context switching. Some applications are not thread-safe, and therefore must be run with 1 thread per worker.

`workers_per_replica` * `threads_per_worker` represents the number of requests that your replica can process in parallel. For example, if `workers_per_replica` is 2 and `threads_per_worker` is 2, and the replica is hit with 5 concurrent requests, 4 would begin processing immediately, 1 would wait for a thread to become available, and the concurrency for the replica would be 5. If the replica is hit with 3 concurrent requests, all 3 would begin processing immediately, and the replica concurrency would be 3.
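
As a rough illustration of this arithmetic, here is a minimal sketch (not Cortex code; the configuration values are hypothetical):

```go
package main

import "fmt"

// Sketch only: how workers_per_replica and threads_per_worker determine how many
// concurrent requests a replica processes at once vs. holds in its queue.
func main() {
	workersPerReplica := 2 // hypothetical values
	threadsPerWorker := 2
	parallelism := workersPerReplica * threadsPerWorker // 4 requests can be processed in parallel

	inFlight := 5 // concurrent requests sent to the replica
	processing := inFlight
	if processing > parallelism {
		processing = parallelism
	}
	queued := inFlight - processing

	// replica concurrency = processing + queued = in-flight requests
	fmt.Printf("processing=%d queued=%d concurrency=%d\n", processing, queued, inFlight)
	// Output: processing=4 queued=1 concurrency=5
}
```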

## Autoscaling Replicas

Cortex adjusts the number of replicas that are serving predictions by monitoring each API's in-flight requests. The number of replicas will be at least `min_replicas` and no more than `max_replicas`.

* `min_replicas`: The lower bound on how many replicas can be running for an API.

* `max_replicas`: The upper bound on how many replicas can be running for an API.

* `target_replica_concurrency` (default: `workers_per_replica` * `threads_per_worker`): This is the desired number of in-flight requests per replica, and is the metric which the autoscaler uses to make scaling decisions.

Replica concurrency is simply how many requests have been sent to a replica and have not yet been responded to (also referred to as in-flight requests). Therefore, it includes requests which are currently being processed and requests which are waiting in the replica's queue.

The autoscaler uses this formula to determine the number of desired replicas:

`desired replicas = sum(in-flight requests in each replica) / target_replica_concurrency`

For example, setting `target_replica_concurrency` to `workers_per_replica` * `threads_per_worker` (the default) causes the cluster to adjust the number of replicas so that on average, requests are immediately processed without waiting in a queue, and workers/threads are never idle (a simplified sketch of this calculation follows this list).

* `max_replica_concurrency` (default: 1024): This is the maximum number of in-flight requests per replica before requests are rejected with HTTP error code 503. `max_replica_concurrency` includes requests that are currently being processed as well as requests that are waiting in the replica's queue (a replica can actively process `workers_per_replica` * `threads_per_worker` requests concurrently, and will hold any additional requests in a local queue). Decreasing `max_replica_concurrency` and configuring the client to retry when it receives 503 responses will improve queue fairness by preventing requests from sitting in long queues.

Note (if `workers_per_replica` > 1): Because requests are randomly assigned to workers within a replica (which leads to unbalanced worker queues), clients may receive 503 responses before reaching `max_replica_concurrency`. For example, if you set `workers_per_replica: 2` and `max_replica_concurrency: 100`, each worker will have a maximum queue length of 50 requests. If your replica receives 90 requests, it is possible that more than 50 of them are routed to one worker, in which case each request beyond that worker's first 50 will receive a 503 response.

* `window` (default: 60s): The time over which to average the API-wide in-flight requests (i.e. the sum of in-flight requests across the API's replicas). The longer the window, the slower the autoscaler will react to changes in API-wide in-flight requests, since the metric is averaged over the `window`. The API-wide in-flight request count is calculated every 10 seconds, so `window` must be a multiple of 10 seconds.

* `downscale_stabilization_period` (default: 5m): The API will not scale below the highest recommendation made during this period. Every 10 seconds, the autoscaler makes a recommendation based on all of the other configuration parameters described here. It will then take the max of the current recommendation and all recommendations made during the `downscale_stabilization_period`, and use that to determine the final number of replicas to scale to. Increasing this value will cause the cluster to react more slowly to decreased traffic, and will reduce thrashing.

* `upscale_stabilization_period` (default: 0m): The API will not scale above the lowest recommendation made during this period. Every 10 seconds, the autoscaler makes a recommendation based on all of the other configuration parameters described here. It will then take the min of the current recommendation and all recommendations made during the `upscale_stabilization_period`, and use that to determine the final number of replicas to scale to. Increasing this value will cause the cluster to react more slowly to increased traffic, and will reduce thrashing. The default is 0 minutes, which means that the cluster will react quickly to increased traffic.

* `max_downscale_factor` (default: 0.5): The maximum factor by which to scale down the API on a single scaling event. For example, if `max_downscale_factor` is 0.5 and there are 10 running replicas, the autoscaler will not recommend fewer than 5 replicas. Increasing this number will allow the cluster to shrink more quickly in response to dramatic dips in traffic.

* `max_upscale_factor` (default: 10): The maximum factor by which to scale up the API on a single scaling event. For example, if `max_upscale_factor` is 10 and there are 5 running replicas, the autoscaler will not recommend more than 50 replicas. Increasing this number will allow the cluster to grow more quickly in response to dramatic spikes in traffic.

* `downscale_tolerance` (default: 0.1): Any recommendation falling within this factor below the current number of replicas will not trigger a scale down event. For example, if `downscale_tolerance` is 0.1 and there are 20 running replicas, a recommendation of 18 or 19 replicas will not be acted on, and the API will remain at 20 replicas. Increasing this value will prevent thrashing, but setting it too high will prevent the cluster from maintaining its optimal size.

* `upscale_tolerance` (default: 0.1): Any recommendation falling within this factor above the current number of replicas will not trigger a scale up event. For example, if `upscale_tolerance` is 0.1 and there are 20 running replicas, a recommendation of 21 or 22 replicas will not be acted on, and the API will remain at 20 replicas. Increasing this value will prevent thrashing, but setting it too high will prevent the cluster from maintaining its optimal size.
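
As referenced above, here is a simplified sketch of how these parameters could combine into a single scaling recommendation. It is illustrative only, not the actual Cortex autoscaler; all function and variable names are hypothetical, and the `window` averaging and stabilization periods are omitted for brevity.

```go
package main

import (
	"fmt"
	"math"
)

// recommend sketches how a recommendation could be derived from the parameters above.
// Window averaging and stabilization periods are omitted for brevity.
func recommend(currentReplicas int, totalInFlight, targetReplicaConcurrency float64,
	minReplicas, maxReplicas int,
	maxDownscaleFactor, maxUpscaleFactor, downscaleTolerance, upscaleTolerance float64) int {

	// desired replicas = sum(in-flight requests in each replica) / target_replica_concurrency
	raw := totalInFlight / targetReplicaConcurrency
	cur := float64(currentReplicas)

	// recommendations within the tolerance band around the current count are ignored
	if raw < cur && raw >= cur*(1-downscaleTolerance) {
		raw = cur
	} else if raw > cur && raw <= cur*(1+upscaleTolerance) {
		raw = cur
	}

	// a single scaling event cannot move the replica count by more than these factors
	raw = math.Max(raw, cur*maxDownscaleFactor)
	raw = math.Min(raw, cur*maxUpscaleFactor)

	// clamp to the configured replica bounds
	desired := int(math.Ceil(raw))
	if desired < minReplicas {
		desired = minReplicas
	}
	if desired > maxReplicas {
		desired = maxReplicas
	}
	return desired
}

func main() {
	// e.g. 20 replicas, 90 in-flight requests API-wide, target of 4 in-flight requests per replica
	fmt.Println(recommend(20, 90, 4, 1, 100, 0.5, 10, 0.1, 0.1)) // prints 23
}
```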

## Autoscaling Nodes

Cortex spins up and down nodes based on the aggregate resource requests of all APIs. The number of nodes will be at least `min_instances` and no more than `max_instances` (configured during installation and modifiable via `cortex cluster update` or the [AWS console](https://docs.aws.amazon.com/autoscaling/ec2/userguide/as-manual-scaling.html)).
Cortex spins up and down nodes based on the aggregate resource requests of all APIs. The number of nodes will be at least `min_instances` and no more than `max_instances` ([configured during installation](../cluster-management/config.md) and modifiable via `cortex cluster update`).
6 changes: 4 additions & 2 deletions docs/deployments/onnx.md
@@ -29,8 +29,9 @@ You can deploy ONNX models as web services by defining a class that implements C
init_replicas: <int> # initial number of replicas (default: <min_replicas>)
workers_per_replica: <int> # the number of parallel serving workers to run on each replica (default: 1)
threads_per_worker: <int> # the number of threads per worker (default: 1)
target_queue_length: <float> # the desired queue length per replica (default: 0)
window: <duration> # the time over which to average the API's queue length (default: 60s)
target_replica_concurrency: <float> # the desired number of in-flight requests per replica, which the autoscaler tries to maintain (default: workers_per_replica * threads_per_worker)
max_replica_concurrency: <int> # the maximum number of in-flight requests per replica before requests are rejected with error code 503 (default: 1024)
window: <duration> # the time over which to average the API's concurrency (default: 60s)
downscale_stabilization_period: <duration> # the API will not scale below the highest recommendation made during this period (default: 5m)
upscale_stabilization_period: <duration> # the API will not scale above the lowest recommendation made during this period (default: 0m)
max_downscale_factor: <float> # the maximum factor by which to scale down the API on a single scaling event (default: 0.5)
@@ -129,6 +130,7 @@ dill==0.3.1.1
msgpack==0.6.2
numpy==1.18.0
onnxruntime==1.1.0
pyyaml==5.3
requests==2.22.0
```

6 changes: 4 additions & 2 deletions docs/deployments/python.md
@@ -33,8 +33,9 @@ In addition to supporting Python models via the Python Predictor interface, Cort
init_replicas: <int> # initial number of replicas (default: <min_replicas>)
workers_per_replica: <int> # the number of parallel serving workers to run on each replica (default: 1)
threads_per_worker: <int> # the number of threads per worker (default: 1)
target_queue_length: <float> # the desired queue length per replica (default: 0)
window: <duration> # the time over which to average the API's queue length (default: 60s)
target_replica_concurrency: <float> # the desired number of in-flight requests per replica, which the autoscaler tries to maintain (default: workers_per_replica * threads_per_worker)
max_replica_concurrency: <int> # the maximum number of in-flight requests per replica before requests are rejected with error code 503 (default: 1024)
window: <duration> # the time over which to average the API's concurrency (default: 60s)
downscale_stabilization_period: <duration> # the API will not scale below the highest recommendation made during this period (default: 5m)
upscale_stabilization_period: <duration> # the API will not scale above the lowest recommendation made during this period (default: 0m)
max_downscale_factor: <float> # the maximum factor by which to scale down the API on a single scaling event (default: 0.5)
@@ -155,6 +156,7 @@ numpy==1.18.0
pandas==0.25.3
opencv-python==4.1.2.30
Pillow==6.2.1
pyyaml==5.3
requests==2.22.0
scikit-image==0.16.2
scikit-learn==0.22
6 changes: 4 additions & 2 deletions docs/deployments/tensorflow.md
@@ -30,8 +30,9 @@ You can deploy TensorFlow models as web services by defining a class that implem
init_replicas: <int> # initial number of replicas (default: <min_replicas>)
workers_per_replica: <int> # the number of parallel serving workers to run on each replica (default: 1)
threads_per_worker: <int> # the number of threads per worker (default: 1)
target_queue_length: <float> # the desired queue length per replica (default: 0)
window: <duration> # the time over which to average the API's queue length (default: 60s)
target_replica_concurrency: <float> # the desired number of in-flight requests per replica, which the autoscaler tries to maintain (default: workers_per_replica * threads_per_worker)
max_replica_concurrency: <int> # the maximum number of in-flight requests per replica before requests are rejected with error code 503 (default: 1024)
window: <duration> # the time over which to average the API's concurrency (default: 60s)
downscale_stabilization_period: <duration> # the API will not scale below the highest recommendation made during this period (default: 5m)
upscale_stabilization_period: <duration> # the API will not scale above the lowest recommendation made during this period (default: 0m)
max_downscale_factor: <float> # the maximum factor by which to scale down the API on a single scaling event (default: 0.5)
@@ -122,6 +123,7 @@ msgpack==0.6.2
numpy==1.18.0
requests==2.22.0
opencv-python==4.1.2.30
pyyaml==5.3
tensor2tensor==1.15.4
tensorflow-hub==0.7.0
tensorflow==2.1.0
2 changes: 1 addition & 1 deletion examples/pytorch/sentiment-analyzer/cortex.yaml
@@ -1,6 +1,6 @@
# WARNING: you are on the master branch, please refer to the examples on the branch that matches your `cortex version`

- name: analyzer
- name: sentiment-analyzer
predictor:
type: python
path: predictor.py
2 changes: 1 addition & 1 deletion images/request-monitor/Dockerfile
@@ -10,7 +10,7 @@ RUN GO111MODULE=on CGO_ENABLED=0 GOOS=linux go build -installsuffix cgo -o reque

FROM alpine:3.11

RUN apk --no-cache add ca-certificates bash iproute2
RUN apk --no-cache add ca-certificates bash

COPY --from=builder /go/src/github.com/cortexlabs/cortex/images/request-monitor/request-monitor /root/
RUN chmod +x /root/request-monitor
36 changes: 17 additions & 19 deletions images/request-monitor/request-monitor.go
@@ -17,12 +17,9 @@ limitations under the License.
package main

import (
"bytes"
"fmt"
"log"
"os"
"os/exec"
"strings"
"sync"
"time"

@@ -79,14 +76,16 @@ func main() {
client = cloudwatch.New(sess)
requestCounter := Counter{}

os.OpenFile("/request_monitor_ready.txt", os.O_RDONLY|os.O_CREATE, 0666)

for {
if _, err := os.Stat("/mnt/health_check.txt"); err == nil {
if _, err := os.Stat("/mnt/api_readiness.txt"); err == nil {
break
} else if os.IsNotExist(err) {
fmt.Println("waiting...")
fmt.Println("waiting for replica to be ready...")
time.Sleep(_tickInterval)
} else {
log.Printf("error encountered while looking for /mnt/health_check.txt") // unexpected
log.Printf("error encountered while looking for /mnt/api_readiness.txt") // unexpected
time.Sleep(_tickInterval)
}
}
@@ -164,22 +163,21 @@ func publishStats(apiName string, counter *Counter, client *cloudwatch.CloudWatc
}
}

func updateOpenConnections(requestCounter *Counter, timer *time.Timer) {
cmd := exec.Command("ss")
var out bytes.Buffer
cmd.Stdout = &out
err := cmd.Run()
func getFileCount() int {
dir, err := os.Open("/mnt/requests")
if err != nil {
log.Fatal(err)
panic(err)
}

output := out.String()
count := 0
for _, str := range strings.Split(output, "\n") {
if strings.Contains(str, ":8888 ") && strings.Contains(str, "ESTAB") {
count++
}
defer dir.Close()
fileNames, err := dir.Readdirnames(0)
if err != nil {
panic(err)
}
return len(fileNames)
}

func updateOpenConnections(requestCounter *Counter, timer *time.Timer) {
count := getFileCount()
requestCounter.Append(count)
timer.Reset(_requestSampleInterval)
}
33 changes: 21 additions & 12 deletions pkg/lib/k8s/pod.go
@@ -19,6 +19,7 @@ package k8s
import (
"bytes"
"regexp"
"strings"
"time"

"github.com/cortexlabs/cortex/pkg/lib/errors"
@@ -157,18 +158,20 @@ func GetPodStatus(pod *kcore.Pod) PodStatus {
for _, containerStatus := range pod.Status.ContainerStatuses {
if containerStatus.LastTerminationState.Terminated != nil {
exitCode := containerStatus.LastTerminationState.Terminated.ExitCode
if exitCode == 137 {
return PodStatusKilledOOM
}
reason := strings.ToLower(containerStatus.LastTerminationState.Terminated.Reason)
if _killStatuses[exitCode] {
if strings.Contains(reason, "oom") {
return PodStatusKilledOOM
}
return PodStatusKilled
}
} else if containerStatus.State.Terminated != nil {
exitCode := containerStatus.State.Terminated.ExitCode
if exitCode == 137 {
return PodStatusKilledOOM
}
reason := strings.ToLower(containerStatus.State.Terminated.Reason)
if _killStatuses[exitCode] {
if strings.Contains(reason, "oom") {
return PodStatusKilledOOM
}
return PodStatusKilled
}
}
@@ -200,23 +203,29 @@ func PodStatusFromContainerStatuses(containerStatuses []kcore.ContainerStatus) P
numRunning++
} else if containerStatus.State.Terminated != nil {
exitCode := containerStatus.State.Terminated.ExitCode
reason := strings.ToLower(containerStatus.State.Terminated.Reason)
if exitCode == 0 {
numSucceeded++
} else if exitCode == 137 {
numKilledOOM++
} else if _killStatuses[exitCode] {
numKilled++
if strings.Contains(reason, "oom") {
numKilledOOM++
} else {
numKilled++
}
} else {
numFailed++
}
} else if containerStatus.LastTerminationState.Terminated != nil {
exitCode := containerStatus.LastTerminationState.Terminated.ExitCode
reason := strings.ToLower(containerStatus.LastTerminationState.Terminated.Reason)
if exitCode == 0 {
numSucceeded++
} else if exitCode == 137 {
numKilledOOM++
} else if _killStatuses[exitCode] {
numKilled++
if strings.Contains(reason, "oom") {
numKilledOOM++
} else {
numKilled++
}
} else {
numFailed++
}