diff --git a/cmd/dequeuer/main.go b/cmd/dequeuer/main.go
index f1707258ef..8bd6a1641d 100644
--- a/cmd/dequeuer/main.go
+++ b/cmd/dequeuer/main.go
@@ -50,6 +50,7 @@ func main() {
         statsdAddress     string
         apiKind           string
         adminPort         int
+        workers           int
     )
     flag.StringVar(&clusterConfigPath, "cluster-config", "", "cluster config path")
     flag.StringVar(&clusterUID, "cluster-uid", "", "cluster unique identifier")
@@ -61,6 +62,7 @@ func main() {
     flag.StringVar(&statsdAddress, "statsd-address", "", "address to push statsd metrics")
     flag.IntVar(&userContainerPort, "user-port", 8080, "target port to which the dequeued messages will be sent to")
     flag.IntVar(&adminPort, "admin-port", 0, "port where the admin server (for the probes) will be exposed")
+    flag.IntVar(&workers, "workers", 1, "number of workers pulling from the queue")

     flag.Parse()

@@ -166,6 +168,7 @@ func main() {
             Region:           clusterConfig.Region,
             QueueURL:         queueURL,
             StopIfNoMessages: true,
+            Workers:          workers,
         }

     case userconfig.AsyncAPIKind.String():
@@ -186,6 +189,7 @@ func main() {
             Region:           clusterConfig.Region,
             QueueURL:         queueURL,
             StopIfNoMessages: false,
+            Workers:          workers,
         }

         // report prometheus metrics for async api kinds
diff --git a/docs/workloads/async/autoscaling.md b/docs/workloads/async/autoscaling.md
index 036f675dc2..86eb969c22 100644
--- a/docs/workloads/async/autoscaling.md
+++ b/docs/workloads/async/autoscaling.md
@@ -4,6 +4,14 @@ Cortex auto-scales AsyncAPIs on a per-API basis based on your configuration.

 ## Autoscaling replicas

+### Relevant pod configuration
+
+In addition to the autoscaling configuration options (described below), there is one field in the pod configuration which is relevant to replica autoscaling:
+
+**`max_concurrency`** (default: 1): The maximum number of requests that will be concurrently sent into the container by Cortex. If your web server is designed to handle multiple concurrent requests, increasing `max_concurrency` will increase the throughput of a replica (and result in fewer total replicas for a given load).
+
+
 ### Autoscaling configuration

 **`min_replicas`** (default: 1): The lower bound on how many replicas can be running for an API. Scale-to-zero is supported.
@@ -14,13 +22,13 @@ Cortex auto-scales AsyncAPIs on a per-API basis based on your configuration.
-**`target_in_flight`** (default: 1): This is the desired number of in-flight requests per replica, and is the metric which the autoscaler uses to make scaling decisions. The number of in-flight requests is simply how many requests have been submitted and are not yet finished being processed. Therefore, this number includes requests which are actively being processed as well as requests which are waiting in the queue.
+**`target_in_flight`** (default: `max_concurrency` in the pod configuration): This is the desired number of in-flight requests per replica, and is the metric which the autoscaler uses to make scaling decisions. The number of in-flight requests is simply how many requests have been submitted and are not yet finished being processed. Therefore, this number includes requests which are actively being processed as well as requests which are waiting in the queue.

 The autoscaler uses this formula to determine the number of desired replicas:

 `desired replicas = total in-flight requests / target_in_flight`

-For example, setting `target_in_flight` to 1 (the default) causes the cluster to adjust the number of replicas so that on average, there are no requests waiting in the queue.
+For example, setting `target_in_flight` to `max_concurrency` (the default) causes the cluster to adjust the number of replicas so that on average, there are no requests waiting in the queue.
@@ -58,9 +66,9 @@ Cortex spins up and down instances based on the aggregate resource requests of a

 ## Overprovisioning

-The default value for `target_in_flight` is 1, which behaves well in many situations (see above for an explanation of how `target_in_flight` affects autoscaling). However, if your application is sensitive to spikes in traffic or if creating new replicas takes too long (see below), you may find it helpful to maintain extra capacity to handle the increased traffic while new replicas are being created. This can be accomplished by setting `target_in_flight` to a lower value. The smaller `target_in_flight` is, the more unused capacity your API will have, and the more room it will have to handle sudden increased load. The increased request rate will still trigger the autoscaler, and your API will stabilize again (maintaining the overprovisioned capacity).
+The default value for `target_in_flight` is `max_concurrency`, which behaves well in many situations (see above for an explanation of how `target_in_flight` affects autoscaling). However, if your application is sensitive to spikes in traffic or if creating new replicas takes too long (see below), you may find it helpful to maintain extra capacity to handle the increased traffic while new replicas are being created. This can be accomplished by setting `target_in_flight` to a lower value relative to the expected replica's concurrency. The smaller `target_in_flight` is, the more unused capacity your API will have, and the more room it will have to handle sudden increased load. The increased request rate will still trigger the autoscaler, and your API will stabilize again (maintaining the overprovisioned capacity).

-For example, if you wanted to overprovision by 25%, you could set `target_in_flight` to 0.8. If your API has an average of 4 concurrent requests, the autoscaler would maintain 5 live replicas (4/0.8 = 5).
+For example, if you've determined that each replica in your API can efficiently handle 2 concurrent requests, you would typically set `target_in_flight` to 2. In a scenario where your API is receiving 8 concurrent requests on average, the autoscaler would maintain 4 live replicas (8/2 = 4). If you wanted to overprovision by 25%, you could set `target_in_flight` to 1.6, causing the autoscaler to maintain 5 live replicas (8/1.6 = 5).

 ## Autoscaling responsiveness
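The documentation hunks above describe the autoscaler's formula, `desired replicas = total in-flight requests / target_in_flight`, and its new `max_concurrency`-based default. As a rough illustration only (this is not part of the diff and not the autoscaler's actual implementation; the function name and the rounding behavior are assumptions), a minimal Go sketch of that arithmetic using the numbers from the example:

package main

import (
    "fmt"
    "math"
)

// desiredReplicas applies the documented formula:
// desired replicas = total in-flight requests / target_in_flight
// (rounded up here, since replicas are whole units).
func desiredReplicas(totalInFlight, targetInFlight float64) int {
    return int(math.Ceil(totalInFlight / targetInFlight))
}

func main() {
    fmt.Println(desiredReplicas(8, 2))   // target_in_flight = max_concurrency = 2 -> 4 replicas
    fmt.Println(desiredReplicas(8, 1.6)) // overprovisioned by 25% -> 5 replicas
}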
diff --git a/docs/workloads/async/configuration.md b/docs/workloads/async/configuration.md
index 2ff686820f..f8ba4a4930 100644
--- a/docs/workloads/async/configuration.md
+++ b/docs/workloads/async/configuration.md
@@ -5,6 +5,7 @@ kind: AsyncAPI # must be "AsyncAPI" for async APIs (required)
 pod: # pod configuration (required)
   port: # port to which requests will be sent (default: 8080; exported as $CORTEX_PORT)
+  max_concurrency: # maximum number of requests that will be concurrently sent into the container (default: 1, max allowed: 100)
   containers: # configurations for the containers to run (at least one container must be provided)
     - name: # name of the container (required)
       image: # docker image to use for the container (required)
@@ -45,7 +46,7 @@ min_replicas: # minimum number of replicas (default: 1; min value: 0)
 max_replicas: # maximum number of replicas (default: 100)
 init_replicas: # initial number of replicas (default: )
-target_in_flight: # desired number of in-flight requests per replica (including requests actively being processed as well as queued), which the autoscaler tries to maintain (default: 1)
+target_in_flight: # desired number of in-flight requests per replica (including requests actively being processed as well as queued), which the autoscaler tries to maintain (default: <max_concurrency>)
 window: # duration over which to average the API's in-flight requests per replica (default: 60s)
 downscale_stabilization_period: # the API will not scale below the highest recommendation made during this period (default: 5m)
 upscale_stabilization_period: # the API will not scale above the lowest recommendation made during this period (default: 1m)
diff --git a/docs/workloads/realtime/autoscaling.md b/docs/workloads/realtime/autoscaling.md
index cd38639875..843f6bc09b 100644
--- a/docs/workloads/realtime/autoscaling.md
+++ b/docs/workloads/realtime/autoscaling.md
@@ -72,7 +72,7 @@ Cortex spins up and down instances based on the aggregate resource requests of a

 The default value for `target_in_flight` is `max_concurrency`, which behaves well in many situations (see above for an explanation of how `target_in_flight` affects autoscaling). However, if your application is sensitive to spikes in traffic or if creating new replicas takes too long (see below), you may find it helpful to maintain extra capacity to handle the increased traffic while new replicas are being created. This can be accomplished by setting `target_in_flight` to a lower value relative to the expected replica's concurrency. The smaller `target_in_flight` is, the more unused capacity your API will have, and the more room it will have to handle sudden increased load. The increased request rate will still trigger the autoscaler, and your API will stabilize again (maintaining the overprovisioned capacity).

-For example, if you've determined that each replica in your API can handle 2 concurrent requests, you would typically set `target_in_flight` to 2. In a scenario where your API is receiving 8 concurrent requests on average, the autoscaler would maintain 4 live replicas (8/2 = 4). If you wanted to overprovision by 25%, you could set `target_in_flight` to 1.6, causing the autoscaler maintain 5 live replicas (8/1.6 = 5).
+For example, if you've determined that each replica in your API can efficiently handle 2 concurrent requests, you would typically set `target_in_flight` to 2. In a scenario where your API is receiving 8 concurrent requests on average, the autoscaler would maintain 4 live replicas (8/2 = 4). If you wanted to overprovision by 25%, you could set `target_in_flight` to 1.6, causing the autoscaler to maintain 5 live replicas (8/1.6 = 5).

 ## Autoscaling responsiveness

diff --git a/pkg/dequeuer/batch_handler.go b/pkg/dequeuer/batch_handler.go
index f2c9e9d28b..ccf19fcf7b 100644
--- a/pkg/dequeuer/batch_handler.go
+++ b/pkg/dequeuer/batch_handler.go
@@ -158,7 +158,7 @@ func (h *BatchMessageHandler) handleBatch(message *sqs.Message) error {
         return nil
     }

-    endTime := time.Now().Sub(startTime)
+    endTime := time.Since(startTime)

     err = h.recordSuccess()
     if err != nil {
@@ -175,7 +175,7 @@ func (h *BatchMessageHandler) handleBatch(message *sqs.Message) error {
 func (h *BatchMessageHandler) onJobComplete(message *sqs.Message) error {
     shouldRunOnJobComplete := false
     h.log.Info("received job_complete message")
-    for true {
+    for {
         queueAttributes, err := GetQueueAttributes(h.aws, h.config.QueueURL)
         if err != nil {
             return err
@@ -223,8 +223,6 @@ func (h *BatchMessageHandler) onJobComplete(message *sqs.Message) error {

         time.Sleep(h.jobCompleteMessageDelay)
     }
-
-    return nil
 }

 func isOnJobCompleteMessage(message *sqs.Message) bool {
diff --git a/pkg/dequeuer/dequeuer.go b/pkg/dequeuer/dequeuer.go
index 16d3237652..beb42d0f1e 100644
--- a/pkg/dequeuer/dequeuer.go
+++ b/pkg/dequeuer/dequeuer.go
@@ -23,6 +23,7 @@ import (
     "github.com/aws/aws-sdk-go/service/sqs"
     awslib "github.com/cortexlabs/cortex/pkg/lib/aws"
     "github.com/cortexlabs/cortex/pkg/lib/errors"
+    "github.com/cortexlabs/cortex/pkg/lib/math"
     "github.com/cortexlabs/cortex/pkg/lib/telemetry"
     "go.uber.org/zap"
 )
@@ -40,6 +41,7 @@ type SQSDequeuerConfig struct {
     Region           string
     QueueURL         string
     StopIfNoMessages bool
+    Workers          int
 }

 type SQSDequeuer struct {
@@ -96,12 +98,37 @@ func (d *SQSDequeuer) ReceiveMessage() (*sqs.Message, error) {
 }

 func (d *SQSDequeuer) Start(messageHandler MessageHandler, readinessProbeFunc func() bool) error {
+    numWorkers := math.MaxInt(d.config.Workers, 1)
+
+    d.log.Infof("Starting %d workers", numWorkers)
+    errCh := make(chan error)
+    doneChs := make([]chan struct{}, numWorkers)
+    for i := 0; i < numWorkers; i++ {
+        doneChs[i] = make(chan struct{})
+        go func(i int) {
+            errCh <- d.worker(messageHandler, readinessProbeFunc, doneChs[i])
+        }(i)
+    }
+
+    select {
+    case err := <-errCh:
+        return err
+    case <-d.done:
+        for _, doneCh := range doneChs {
+            doneCh <- struct{}{}
+        }
+    }
+
+    return nil
+}
+
+func (d SQSDequeuer) worker(messageHandler MessageHandler, readinessProbeFunc func() bool, workerDone chan struct{}) error {
     noMessagesInPreviousIteration := false

 loop:
     for {
         select {
-        case <-d.done:
+        case <-workerDone:
             break loop
         default:
             if !readinessProbeFunc() {
@@ -134,8 +161,8 @@ loop:

             noMessagesInPreviousIteration = false
             receiptHandle := *message.ReceiptHandle
-            done := d.StartMessageRenewer(receiptHandle)
-            err = d.handleMessage(message, messageHandler, done)
+            renewerDone := d.StartMessageRenewer(receiptHandle)
+            err = d.handleMessage(message, messageHandler, renewerDone)
             if err != nil {
                 d.log.Error(err)
                 telemetry.Error(err)
@@ -196,7 +223,7 @@ func (d *SQSDequeuer) StartMessageRenewer(receiptHandle string) chan struct{} {
     startTime := time.Now()
     go func() {
         defer ticker.Stop()
-        for true {
+        for {
             select {
             case <-done:
                 return
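The new `Start` above fans polling out to `Workers` goroutines: each worker runs the former polling loop, the first worker error stops the pool, and a shutdown signal on `d.done` tells every worker to exit through its own done channel. For clarity, here is a self-contained Go sketch of the same fan-out pattern; it is illustrative only, and names such as `startWorkers` and `work` are placeholders rather than the dequeuer's actual API:

package main

import (
    "fmt"
    "time"
)

// startWorkers runs n workers concurrently. It returns the first worker error,
// or nil once stop is closed and every worker has been told to exit.
func startWorkers(n int, stop <-chan struct{}, work func(id int, done <-chan struct{}) error) error {
    errCh := make(chan error, n) // buffered so finished workers never block
    doneChs := make([]chan struct{}, n)
    for i := 0; i < n; i++ {
        doneChs[i] = make(chan struct{})
        go func(i int) {
            errCh <- work(i, doneChs[i])
        }(i)
    }

    select {
    case err := <-errCh: // first failing worker stops the pool
        return err
    case <-stop:
        for _, done := range doneChs {
            done <- struct{}{} // ask each worker to exit its polling loop
        }
    }
    return nil
}

func main() {
    stop := make(chan struct{})
    time.AfterFunc(100*time.Millisecond, func() { close(stop) })

    err := startWorkers(3, stop, func(id int, done <-chan struct{}) error {
        for {
            select {
            case <-done:
                return nil
            default:
                time.Sleep(10 * time.Millisecond) // a real worker would poll the queue here
            }
        }
    })
    fmt.Println("workers stopped, err:", err)
}

The rename of `done` to `renewerDone` inside the worker loop presumably serves the same readability goal: keeping the per-message renewer channel distinct from the new per-worker done channels.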
diff --git a/pkg/dequeuer/dequeuer_test.go b/pkg/dequeuer/dequeuer_test.go
index e23a6fa41c..87af744b34 100644
--- a/pkg/dequeuer/dequeuer_test.go
+++ b/pkg/dequeuer/dequeuer_test.go
@@ -31,6 +31,7 @@ import (
     "github.com/aws/aws-sdk-go/service/sqs"
     awslib "github.com/cortexlabs/cortex/pkg/lib/aws"
     "github.com/cortexlabs/cortex/pkg/lib/random"
+    "github.com/cortexlabs/cortex/pkg/lib/sets/strset"
     "github.com/ory/dockertest/v3"
     dc "github.com/ory/dockertest/v3/docker"
     "github.com/stretchr/testify/require"
@@ -179,6 +180,7 @@ func TestSQSDequeuer_ReceiveMessage(t *testing.T) {
             Region:           _localStackDefaultRegion,
             QueueURL:         queueURL,
             StopIfNoMessages: true,
+            Workers:          1,
         }, awsClient, logger,
     )
     require.NoError(t, err)
@@ -205,6 +207,7 @@ func TestSQSDequeuer_StartMessageRenewer(t *testing.T) {
             Region:           _localStackDefaultRegion,
             QueueURL:         queueURL,
             StopIfNoMessages: true,
+            Workers:          1,
         }, awsClient, logger,
     )
     require.NoError(t, err)
@@ -253,6 +256,7 @@ func TestSQSDequeuerTerminationOnEmptyQueue(t *testing.T) {
             Region:           _localStackDefaultRegion,
             QueueURL:         queueURL,
             StopIfNoMessages: true,
+            Workers:          1,
         }, awsClient, logger,
     )
     require.NoError(t, err)
@@ -303,6 +307,7 @@ func TestSQSDequeuer_Shutdown(t *testing.T) {
             Region:           _localStackDefaultRegion,
             QueueURL:         queueURL,
             StopIfNoMessages: true,
+            Workers:          1,
         }, awsClient, logger,
     )
     require.NoError(t, err)
@@ -345,6 +350,7 @@ func TestSQSDequeuer_Start_HandlerError(t *testing.T) {
             Region:           _localStackDefaultRegion,
             QueueURL:         queueURL,
             StopIfNoMessages: true,
+            Workers:          1,
         }, awsClient, logger,
     )
     require.NoError(t, err)
@@ -383,3 +389,72 @@ func TestSQSDequeuer_Start_HandlerError(t *testing.T) {
         return msg != nil
     }, 5*time.Second, time.Second)
 }
+
+func TestSQSDequeuer_MultipleWorkers(t *testing.T) {
+    t.Parallel()
+
+    awsClient := testAWSClient(t)
+    queueURL := createQueue(t, awsClient)
+
+    numMessages := 3
+    expectedMsgs := make([]string, numMessages)
+    for i := 0; i < numMessages; i++ {
+        message := fmt.Sprintf("%d", i)
+        expectedMsgs[i] = message
+        _, err := awsClient.SQS().SendMessage(&sqs.SendMessageInput{
+            MessageBody:            aws.String(message),
+            MessageDeduplicationId: aws.String(message),
+            MessageGroupId:         aws.String(message),
+            QueueUrl:               aws.String(queueURL),
+        })
+        require.NoError(t, err)
+    }
+
+    logger := newLogger(t)
+    defer func() { _ = logger.Sync() }()
+
+    dq, err := NewSQSDequeuer(
+        SQSDequeuerConfig{
+            Region:           _localStackDefaultRegion,
+            QueueURL:         queueURL,
+            StopIfNoMessages: true,
+            Workers:          numMessages,
+        }, awsClient, logger,
+    )
+    require.NoError(t, err)
+
+    dq.waitTimeSeconds = aws.Int64(0)
+    dq.notFoundSleepTime = 0
+
+    msgCh := make(chan string, numMessages)
+    handler := NewMessageHandlerFunc(
+        func(message *sqs.Message) error {
+            msgCh <- *message.Body
+            return nil
+        },
+    )
+
+    errCh := make(chan error)
+    go func() {
+        errCh <- dq.Start(handler, func() bool { return true })
+    }()
+
+    receivedMessages := make([]string, numMessages)
+    for i := 0; i < numMessages; i++ {
+        receivedMessages[i] = <-msgCh
+    }
+    dq.Shutdown()
+
+    // timeout test after 10 seconds
+    time.AfterFunc(10*time.Second, func() {
+        close(msgCh)
+        errCh <- errors.New("test timed out")
+    })
+
+    require.Len(t, receivedMessages, numMessages)
+
+    set := strset.FromSlice(receivedMessages)
+    require.True(t, set.Has(expectedMsgs...))
+
+    require.NoError(t, <-errCh)
+}
diff --git a/pkg/dequeuer/probes.go b/pkg/dequeuer/probes.go
index 294249f8ed..601fd4e971 100644
--- a/pkg/dequeuer/probes.go
+++ b/pkg/dequeuer/probes.go
@@ -35,10 +35,12 @@ func ProbesFromFile(probesPath string, logger *zap.SugaredLogger) ([]*probe.Prob
         return nil, err
     }

-    var probesSlice []*probe.Probe
+    probesSlice := make([]*probe.Probe, len(probesMap))
+    var i int
     for _, p := range probesMap {
         auxProbe := p
-        probesSlice = append(probesSlice, probe.NewProbe(&auxProbe, logger))
+        probesSlice[i] = probe.NewProbe(&auxProbe, logger)
+        i++
     }
     return probesSlice, nil
 }
diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go
index 71cce373ba..aac2fdeed7 100644
--- a/pkg/types/spec/validations.go
+++ b/pkg/types/spec/validations.go
@@ -189,6 +189,19 @@ func podValidation(kind userconfig.Kind) *cr.StructFieldValidation {
         )
     }

+    if kind == userconfig.AsyncAPIKind {
+        validation.StructValidation.StructFieldValidations = append(validation.StructValidation.StructFieldValidations,
+            &cr.StructFieldValidation{
+                StructField: "MaxConcurrency",
+                Int64Validation: &cr.Int64Validation{
+                    Default:           consts.DefaultMaxConcurrency,
+                    GreaterThan:       pointer.Int64(0),
+                    LessThanOrEqualTo: pointer.Int64(100),
+                },
+            },
+        )
+    }
+
     return validation
 }

@@ -818,7 +831,7 @@ func validateAutoscaling(api *userconfig.API) error {

     if api.Kind == userconfig.AsyncAPIKind {
         if autoscaling.TargetInFlight == nil {
-            autoscaling.TargetInFlight = pointer.Float64(1)
+            autoscaling.TargetInFlight = pointer.Float64(float64(pod.MaxConcurrency))
         }
     }

diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go
index c524c599e0..92bea39467 100644
--- a/pkg/types/userconfig/api.go
+++ b/pkg/types/userconfig/api.go
@@ -164,6 +164,10 @@ func (api *API) ToK8sAnnotations() map[string]string {
         annotations[MaxQueueLengthAnnotationKey] = s.Int64(api.Pod.MaxQueueLength)
     }

+    if api.Pod != nil && api.Kind == AsyncAPIKind {
+        annotations[MaxConcurrencyAnnotationKey] = s.Int64(api.Pod.MaxConcurrency)
+    }
+
     if api.Networking != nil {
         annotations[EndpointAnnotationKey] = *api.Networking.Endpoint
     }
@@ -339,6 +343,10 @@ func (pod *Pod) UserStr(kind Kind) string {
         sb.WriteString(fmt.Sprintf("%s: %s\n", MaxQueueLengthKey, s.Int64(pod.MaxQueueLength)))
     }

+    if kind == AsyncAPIKind {
+        sb.WriteString(fmt.Sprintf("%s: %s\n", MaxConcurrencyKey, s.Int64(pod.MaxConcurrency)))
+    }
+
     sb.WriteString(fmt.Sprintf("%s:\n", ContainersKey))
     for _, container := range pod.Containers {
         containerUserStr := s.Indent(container.UserStr(), "  ")
diff --git a/pkg/workloads/k8s.go b/pkg/workloads/k8s.go
index 0523c8cbfe..9f5c2f3b8b 100644
--- a/pkg/workloads/k8s.go
+++ b/pkg/workloads/k8s.go
@@ -133,6 +133,7 @@ func asyncDequeuerProxyContainer(api spec.API, queueURL string) (kcore.Container
             "--statsd-address", _statsdAddress,
             "--user-port", s.Int32(*api.Pod.Port),
             "--admin-port", consts.AdminPortStr,
+            "--workers", s.Int64(api.Pod.MaxConcurrency),
         },
         Env:     BaseEnvVars,
         EnvFrom: BaseClusterEnvVars(),
@@ -366,8 +367,8 @@ func userPodContainers(api spec.API) ([]kcore.Container, []kcore.Volume) {
         ClientConfigMount(),
     }

-    var containers []kcore.Container
-    for _, container := range api.Pod.Containers {
+    containers := make([]kcore.Container, len(api.Pod.Containers))
+    for i, container := range api.Pod.Containers {
         containerResourceList := kcore.ResourceList{}
         containerResourceLimitsList := kcore.ResourceList{}
         securityContext := kcore.SecurityContext{
@@ -433,7 +434,7 @@ func userPodContainers(api spec.API) ([]kcore.Container, []kcore.Volume) {
             })
         }

-        containers = append(containers, kcore.Container{
+        containers[i] = kcore.Container{
             Name:    container.Name,
             Image:   container.Image,
             Command: container.Command,
@@ -448,7 +449,7 @@ func userPodContainers(api spec.API) ([]kcore.Container, []kcore.Volume) {
             },
             ImagePullPolicy: kcore.PullAlways,
             SecurityContext: &securityContext,
-        })
+        }
     }

     return containers, volumes
@@ -498,10 +499,9 @@ func GenerateNodeAffinities(apiNodeGroups []string) *kcore.Affinity {
         nodeGroups = config.ClusterConfig.NodeGroups
     }

-    var requiredNodeGroups []string
-    var preferredAffinities []kcore.PreferredSchedulingTerm
-
-    for _, nodeGroup := range nodeGroups {
+    requiredNodeGroups := make([]string, len(nodeGroups))
+    preferredAffinities := make([]kcore.PreferredSchedulingTerm, len(nodeGroups))
+    for i, nodeGroup := range nodeGroups {
         var nodeGroupPrefix string
         if nodeGroup.Spot {
             nodeGroupPrefix = "cx-ws-"
@@ -509,7 +509,7 @@ func GenerateNodeAffinities(apiNodeGroups []string) *kcore.Affinity {
             nodeGroupPrefix = "cx-wd-"
         }

-        preferredAffinities = append(preferredAffinities, kcore.PreferredSchedulingTerm{
+        preferredAffinities[i] = kcore.PreferredSchedulingTerm{
             Weight: int32(nodeGroup.Priority),
             Preference: kcore.NodeSelectorTerm{
                 MatchExpressions: []kcore.NodeSelectorRequirement{
@@ -520,8 +520,9 @@ func GenerateNodeAffinities(apiNodeGroups []string) *kcore.Affinity {
                     },
                 },
             },
-        })
-        requiredNodeGroups = append(requiredNodeGroups, nodeGroupPrefix+nodeGroup.Name)
+        }
+
+        requiredNodeGroups[i] = nodeGroupPrefix + nodeGroup.Name
     }

     var requiredNodeSelector *kcore.NodeSelector
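The probes.go, `userPodContainers`, and `GenerateNodeAffinities` hunks all apply the same refactor: pre-sizing the slice with `make` and assigning by index instead of appending inside the loop, which avoids repeated reallocations when the final length is known up front. A standalone Go sketch of the idiom (illustrative only, not taken from the diff):

package main

import "fmt"

func main() {
    names := []string{"cx-ws-a", "cx-wd-b", "cx-wd-c"}

    // append-based: starts nil and may reallocate as it grows
    var tagged []string
    for _, n := range names {
        tagged = append(tagged, n+":v1")
    }

    // pre-sized: single allocation, assignment by index
    tagged2 := make([]string, len(names))
    for i, n := range names {
        tagged2[i] = n + ":v1"
    }

    fmt.Println(tagged)
    fmt.Println(tagged2)
}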