Revert "Merge pull request #19 from gadget-inc/sc/dont-429"
This reverts commit 5f1f05b, reversing
changes made to fae2052.
scott-rc committed Aug 10, 2023
1 parent 37e7804 commit a6ba481
Showing 3 changed files with 20 additions and 111 deletions.
2 changes: 1 addition & 1 deletion pkg/executor/executortype/poolmgr/gp.go
@@ -639,7 +639,7 @@ func (gp *GenericPool) getFuncSvc(ctx context.Context, fn *fv1.Function) (*fscac

gp.fsCache.PodToFsvc.Store(pod.GetObjectMeta().GetName(), fsvc)
gp.podFSVCMap.Store(pod.ObjectMeta.Name, []interface{}{crd.CacheKey(fsvc.Function), fsvc.Address})
gp.fsCache.AddFunc(ctx, *fsvc, fn.GetRequestPerPod(), fn.GetConcurrency())
gp.fsCache.AddFunc(ctx, *fsvc, fn.GetRequestPerPod())

logger.Info("added function service",
zap.String("pod", pod.ObjectMeta.Name),
4 changes: 2 additions & 2 deletions pkg/executor/fscache/functionServiceCache.go
@@ -249,8 +249,8 @@ func (fsc *FunctionServiceCache) GetByFunctionUID(uid types.UID) (*FuncSvc, erro
}

// AddFunc adds a function service to pool cache.
func (fsc *FunctionServiceCache) AddFunc(ctx context.Context, fsvc FuncSvc, requestsPerPod, concurrency int) {
fsc.connFunctionCache.SetSvcValue(ctx, crd.CacheKey(fsvc.Function), fsvc.Address, &fsvc, fsvc.CPULimit, requestsPerPod, concurrency)
func (fsc *FunctionServiceCache) AddFunc(ctx context.Context, fsvc FuncSvc, requestsPerPod int) {
fsc.connFunctionCache.SetSvcValue(ctx, crd.CacheKey(fsvc.Function), fsvc.Address, &fsvc, fsvc.CPULimit, requestsPerPod)
now := time.Now()
fsvc.Ctime = now
fsvc.Atime = now
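Taken with the gp.go change above, this revert narrows the cache API back to requestsPerPod alone; concurrency is no longer threaded through. The restored call shapes, as they appear in this diff (PoolCache.SetSvcValue is shown further down, in poolcache.go):

gp.fsCache.AddFunc(ctx, *fsvc, fn.GetRequestPerPod())
func (fsc *FunctionServiceCache) AddFunc(ctx context.Context, fsvc FuncSvc, requestsPerPod int)
func (c *PoolCache) SetSvcValue(ctx context.Context, function, address string, value *FuncSvc, cpuLimit resource.Quantity, requestsPerPod int)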
125 changes: 17 additions & 108 deletions pkg/executor/fscache/poolcache.go
@@ -21,7 +21,6 @@ import (
"context"
"fmt"
"io"
"math/rand"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/resource"
@@ -46,15 +45,15 @@ const (
type (
funcSvcInfo struct {
val *FuncSvc
activeRequests int // number of requests currently being sent to the pod (can be greater than requestsPerPod if max concurrency is reached)
activeRequests int // number of requests served by function pod
currentCPUUsage resource.Quantity // current cpu usage of the specialized function pod
cpuLimit resource.Quantity // if currentCPUUsage is more than cpuLimit cache miss occurs in getValue request
}

funcSvcGroup struct {
svcWaiting int // number of requests waiting for a pod to finish specializing
svcs map[string]*funcSvcInfo // map of pod ip -> specialized pod
queue *Queue // requests waiting for a pod to finish specializing (does not include the first request that caused the pod to specialize)
svcWaiting int
svcs map[string]*funcSvcInfo
queue *Queue
}

// PoolCache implements a simple cache with values mapped by two keys [function][address].
@@ -72,10 +71,10 @@ type (
address string
dumpWriter io.Writer
value *FuncSvc
requestsPerPod int // number of requests to send to a single pod before specializing another one
requestsPerPod int
cpuUsage resource.Quantity
responseChannel chan *response
concurrency int // maximum number of pods that can be specialized for this function
concurrency int
}
response struct {
error
@@ -116,16 +115,13 @@ func (c *PoolCache) service() {
case getValue:
funcSvcGroup, ok := c.cache[req.function]
if !ok {
// this function has never been specialized before, return not found so that the executor specializes a pod
c.cache[req.function] = NewFuncSvcGroup()
c.cache[req.function].svcWaiting++
resp.error = ferror.MakeError(ferror.ErrorNotFound,
fmt.Sprintf("function Name '%v' not found", req.function))
req.responseChannel <- resp
continue
}

// this function has been specialized before, check if there's a specialized pod with room for another request
found := false
totalActiveRequests := 0
for addr := range funcSvcGroup.svcs {
@@ -142,21 +138,13 @@
break
}
}

if found {
// we found a specialized pod with room for another request, use it
req.responseChannel <- resp
continue
}

// none of our specialized pods have room for another request
// check if we're already specializing a pod, and if we are, check if it has room for another request
numPodsSpecializing := funcSvcGroup.svcWaiting - funcSvcGroup.queue.Len()
maxRequestsForSpecializingPods := numPodsSpecializing * req.requestsPerPod

if maxRequestsForSpecializingPods-funcSvcGroup.svcWaiting > 0 {
// we're already specializing a pod and it has room for another request
// increment the number of requests waiting for a pod to finish specializing and push this request onto the queue
specializationInProgress := funcSvcGroup.svcWaiting - funcSvcGroup.queue.Len()
capacity := ((specializationInProgress + len(funcSvcGroup.svcs)) * req.requestsPerPod) - (totalActiveRequests + funcSvcGroup.svcWaiting)
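// Worked example (hypothetical numbers): with svcWaiting=6 and queue.Len()=4,
// specializationInProgress is 2; with 1 specialized pod, requestsPerPod=5 and
// totalActiveRequests=4, capacity = ((2+1)*5) - (4+6) = 5 > 0, so the request waits in the queue.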
if capacity > 0 {
funcSvcGroup.svcWaiting++
svcWait := &svcWait{
svcChannel: make(chan *FuncSvc),
@@ -168,55 +156,14 @@
continue
}

numPodsSpecialized := len(funcSvcGroup.svcs)

// we're either not specializing a pod, or we are and none of them have room for another request
// check if we can specialize another pod
if req.concurrency > 0 && numPodsSpecialized+numPodsSpecializing >= req.concurrency {
// we can't specialize another pod because our specialized pods + specializing pods is >= this function's concurrency limit
// check if we have any pods that are already specialized
if numPodsSpecialized > 0 {
// we have specialized pod(s), just pick one at random and use it for this request
svc := randomSvc(funcSvcGroup.svcs)
otelUtils.LoggerWithTraceID(req.ctx, c.logger).
Info("max concurrency reached, sending request to random pod",
zap.String("function", req.function),
zap.String("address", svc.val.Address),
zap.Int("active_requests", svc.activeRequests),
zap.Int("svc_waiting", funcSvcGroup.svcWaiting),
zap.Int("queue_len", funcSvcGroup.queue.Len()),
)

svc.activeRequests++
resp.value = svc.val
req.responseChannel <- resp
continue
} else {
// we don't have any specialized pods yet, they're all still specializing
// increment the number of requests waiting for a pod to finish specializing and push this request onto the queue
otelUtils.LoggerWithTraceID(req.ctx, c.logger).
Info("max concurrency reached, queuing request for specializing pod",
zap.String("function", req.function),
zap.Int("svc_waiting", funcSvcGroup.svcWaiting),
zap.Int("queue_len", funcSvcGroup.queue.Len()),
)

funcSvcGroup.svcWaiting++
svcWait := &svcWait{
svcChannel: make(chan *FuncSvc),
ctx: req.ctx,
}
resp.svcWaitValue = svcWait
funcSvcGroup.queue.Push(svcWait)
req.responseChannel <- resp
continue
}
// concurrency should not be set to zero, and the
// sum of in-progress specializations and specialized pods should be less than req.concurrency
if req.concurrency > 0 && (specializationInProgress+len(funcSvcGroup.svcs)) >= req.concurrency {
resp.error = ferror.MakeError(ferror.ErrorTooManyRequests, fmt.Sprintf("function '%s' concurrency '%d' limit reached.", req.function, req.concurrency))
} else {
funcSvcGroup.svcWaiting++
resp.error = ferror.MakeError(ferror.ErrorNotFound, fmt.Sprintf("function '%s' all functions are busy", req.function))
}

// we can specialize another pod
// increment the number of requests waiting for a pod to finish specializing and return not found so that the executor specializes one
funcSvcGroup.svcWaiting++
resp.error = ferror.MakeError(ferror.ErrorNotFound, fmt.Sprintf("function '%s' all functions are busy", req.function))
req.responseChannel <- resp
case setValue:
if _, ok := c.cache[req.function]; !ok {
@@ -226,10 +173,8 @@
c.cache[req.function].svcs[req.address] = &funcSvcInfo{}
}
c.cache[req.function].svcs[req.address].val = req.value
// we increment the active requests here because the pod that was just set is being used to serve the request that caused it to specialize
c.cache[req.function].svcs[req.address].activeRequests++
if c.cache[req.function].svcWaiting > 0 {
// we decrement for same reason above, the request that was waiting for the pod to finish specializing is now being served
c.cache[req.function].svcWaiting--
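// The pod just registered is already serving the request that triggered its
// specialization; the capacity computed below determines how many queued
// waiters can also be handed to it (editorial reading of the restored logic).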
svcCapacity := req.requestsPerPod - c.cache[req.function].svcs[req.address].activeRequests
queueLen := c.cache[req.function].queue.Len()
@@ -249,29 +194,6 @@
close(popped.svcChannel)
c.cache[req.function].svcWaiting--
}
if len(c.cache[req.function].svcs) == req.concurrency {
// this is the last pod this function will specialize, make sure all the queued requests are served
otelUtils.LoggerWithTraceID(req.ctx, c.logger).
Info("max concurrency reached, sending queued requests to random pods",
zap.String("function", req.function),
zap.Int("svc_waiting", c.cache[req.function].svcWaiting),
zap.Int("queue_len", c.cache[req.function].queue.Len()),
)

for {
popped := c.cache[req.function].queue.Pop()
if popped == nil {
break
}
if popped.ctx.Err() == nil {
svc := randomSvc(c.cache[req.function].svcs)
popped.svcChannel <- svc.val
svc.activeRequests++
}
close(popped.svcChannel)
c.cache[req.function].svcWaiting--
}
}
}
if c.logger.Core().Enabled(zap.DebugLevel) {
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Increase active requests with setValue", zap.String("function", req.function), zap.String("address", req.address), zap.Int("activeRequests", c.cache[req.function].svcs[req.address].activeRequests))
@@ -412,7 +334,7 @@ func (c *PoolCache) ListAvailableValue() []*FuncSvc {
}

// SetSvcValue marks the value at key [function][address] as active (being used)
func (c *PoolCache) SetSvcValue(ctx context.Context, function, address string, value *FuncSvc, cpuLimit resource.Quantity, requestsPerPod, concurrency int) {
func (c *PoolCache) SetSvcValue(ctx context.Context, function, address string, value *FuncSvc, cpuLimit resource.Quantity, requestsPerPod int) {
respChannel := make(chan *response)
c.requestChannel <- &request{
ctx: ctx,
@@ -423,7 +345,6 @@ func (c *PoolCache) SetSvcValue(ctx context.Context, function, address string, v
cpuUsage: cpuLimit,
requestsPerPod: requestsPerPod,
responseChannel: respChannel,
concurrency: concurrency,
}
}

@@ -482,15 +403,3 @@ func (c *PoolCache) LogFnSvcGroup(ctx context.Context, file io.Writer) error {
resp := <-respChannel
return resp.error
}

func randomSvc(svcs map[string]*funcSvcInfo) *funcSvcInfo {
i := 0
iToChoose := rand.Intn(len(svcs))
for _, svc := range svcs {
if i == iToChoose {
return svc
}
i++
}
panic("unreachable")
}
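For reference, the post-revert getValue flow in service() reduces to four outcomes: reuse a specialized pod with room, queue behind an in-flight specialization, return 429 at the concurrency limit, or ask the executor to specialize a new pod. Below is a minimal, self-contained sketch of that branch order using plain ints and a hypothetical decide helper; it is not the real ferror/FuncSvc API, just the arithmetic as restored by this commit.

package main

import "fmt"

// decide mirrors the branch order restored by this revert:
// specialized = len(svcs), inProgress = svcWaiting - queue.Len().
// The real code first scans for a specialized pod with activeRequests <
// requestsPerPod; hasRoom stands in for that loop here.
func decide(hasRoom bool, specialized, inProgress, active, waiting, requestsPerPod, concurrency int) string {
	if hasRoom {
		return "reuse: send to an existing specialized pod"
	}
	capacity := ((inProgress + specialized) * requestsPerPod) - (active + waiting)
	if capacity > 0 {
		return "queue: wait for a specializing pod"
	}
	if concurrency > 0 && inProgress+specialized >= concurrency {
		return "429: concurrency limit reached"
	}
	return "not found: executor specializes another pod"
}

func main() {
	fmt.Println(decide(false, 1, 2, 4, 6, 5, 5))   // queue: capacity = 15-10 = 5
	fmt.Println(decide(false, 2, 3, 10, 15, 5, 5)) // 429: capacity 0, 5 pods >= limit 5
	fmt.Println(decide(false, 0, 0, 0, 0, 5, 5))   // not found: nothing running yet
}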
