Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Refactoring concurrency in scaler's pending HTTP queue fetcher logic #291

Merged
merged 14 commits into from
Oct 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion operator/controllers/ping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func TestPingInterceptors(t *testing.T) {
r.NoError(err)
defer srv.Close()
ctx := context.Background()
endpoints := k8s.FakeEndpointsForURL(url, ns, svcName, 2)
endpoints, err := k8s.FakeEndpointsForURL(url, ns, svcName, 2)
r.NoError(err)
cl := fake.NewClientBuilder().WithObjects(endpoints).Build()
r.NoError(pingInterceptors(
ctx,
Expand Down
31 changes: 28 additions & 3 deletions pkg/k8s/fake_endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package k8s

import (
"net/url"
"strconv"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -16,13 +17,36 @@ func FakeEndpointsForURL(
namespace,
name string,
num int,
) *v1.Endpoints {
addrs := make([]v1.EndpointAddress, num)
) (*v1.Endpoints, error) {
urls := make([]*url.URL, num)
for i := 0; i < num; i++ {
urls[i] = u
}
return FakeEndpointsForURLs(urls, namespace, name)
}

// FakeEndpointsForURLs creates and returns a new
// *v1.Endpoints with a single v1.EndpointSubset in it
// that has each url in the urls parameter in it.
func FakeEndpointsForURLs(
urls []*url.URL,
namespace,
name string,
) (*v1.Endpoints, error) {
addrs := make([]v1.EndpointAddress, len(urls))
ports := make([]v1.EndpointPort, len(urls))
for i, u := range urls {
addrs[i] = v1.EndpointAddress{
Hostname: u.Hostname(),
IP: u.Hostname(),
}
portInt, err := strconv.Atoi(u.Port())
if err != nil {
return nil, err
}
ports[i] = v1.EndpointPort{
Port: int32(portInt),
}
}
return &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -32,7 +56,8 @@ func FakeEndpointsForURL(
Subsets: []v1.EndpointSubset{
{
Addresses: addrs,
Ports: ports,
},
},
}
}, nil
}
9 changes: 9 additions & 0 deletions pkg/queue/queue_counts.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ func NewCounts() *Counts {
}
}

// Aggregate returns the sum of the pending counts recorded
// for every host in q.Counts.
func (q *Counts) Aggregate() int {
	total := 0
	for _, hostCount := range q.Counts {
		total += hostCount
	}
	return total
}

// MarshalJSON implements json.Marshaler
func (q *Counts) MarshalJSON() ([]byte, error) {
return json.Marshal(q.Counts)
Expand Down
23 changes: 23 additions & 0 deletions pkg/queue/queue_counts_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package queue

import (
"testing"

"github.com/stretchr/testify/require"
)

// TestAggregate verifies that Counts.Aggregate returns the sum
// of the per-host pending counts.
func TestAggregate(t *testing.T) {
	r := require.New(t)
	counts := NewCounts()
	counts.Counts = map[string]int{
		"host1": 123,
		"host2": 234,
		"host3": 456,
		"host4": 567,
	}
	// Assert against a hard-coded literal rather than re-summing
	// the map with the same loop the implementation uses. A
	// recomputed oracle would pass even if both loops shared a
	// defect; a literal pins the expected behavior independently.
	const expectedAgg = 123 + 234 + 456 + 567
	r.Equal(expectedAgg, counts.Aggregate())
}
2 changes: 2 additions & 0 deletions scaler/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type config struct {
// UpdateRoutingTableDur is the duration between manual
// updates to the routing table.
UpdateRoutingTableDur time.Duration `envconfig:"KEDA_HTTP_SCALER_ROUTING_TABLE_UPDATE_DUR" default:"100ms"`
// QueueTickDuration is the duration between queue requests
QueueTickDuration time.Duration `envconfig:"KEDA_HTTP_QUEUE_TICK_DURATION" default:"500ms"`
// This will be the 'Target Pending Requests' for the interceptor
TargetPendingRequestsInterceptor int `envconfig:"KEDA_HTTP_SCALER_TARGET_PENDING_REQUESTS_INTERCEPTOR" default:"100"`
}
Expand Down
174 changes: 169 additions & 5 deletions scaler/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ func TestIsActive(t *testing.T) {
ctx := context.Background()
lggr := logr.Discard()
table := routing.NewTable()
ticker, pinger := newFakeQueuePinger(ctx, lggr)
ticker, pinger, err := newFakeQueuePinger(ctx, lggr)
r.NoError(err)
defer ticker.Stop()
pinger.pingMut.Lock()
pinger.allCounts[host] = 0
Expand Down Expand Up @@ -70,6 +71,36 @@ func TestGetMetricSpec(t *testing.T) {
target = int64(200)
)
ctx := context.Background()
// <<<<<<< HEAD
// lggr := logr.Discard()
// table := routing.NewTable()
// table.AddTarget(host, routing.NewTarget(
// "testsrv",
// 8080,
// "testdepl",
// int32(target),
// ))
// ticker, pinger, err := newFakeQueuePinger(ctx, lggr)
// r.NoError(err)
// defer ticker.Stop()
// hdl := newImpl(lggr, pinger, table, 123, 200)
// meta := map[string]string{
// "host": host,
// "targetPendingRequests": strconv.Itoa(int(target)),
// }
// ref := &externalscaler.ScaledObjectRef{
// ScalerMetadata: meta,
// }
// ret, err := hdl.GetMetricSpec(ctx, ref)
// r.NoError(err)
// r.NotNil(ret)
// r.Equal(1, len(ret.MetricSpecs))
// spec := ret.MetricSpecs[0]
// r.Equal(host, spec.MetricName)
// r.Equal(target, spec.TargetSize)
// }
// =======
// >>>>>>> 30fb204671f165b0a251a0e50634472d2a86960d

type testCase struct {
name string
Expand All @@ -79,6 +110,24 @@ func TestGetMetricSpec(t *testing.T) {
newRoutingTableFn func() *routing.Table
checker func(*testing.T, *externalscaler.GetMetricSpecResponse, error)
}
// <<<<<<< HEAD
// table := routing.NewTable()
// ticker, pinger, err := newFakeQueuePinger(ctx, lggr)
// r.NoError(err)
// defer ticker.Stop()
// hdl := newImpl(lggr, pinger, table, 123, 200)

// // no 'host' in the ScalerObjectRef's metadata field
// res, err := hdl.GetMetrics(ctx, req)
// r.Error(err)
// r.Nil(res)
// r.Contains(
// err.Error(),
// "no 'host' field found in ScaledObject metadata",
// )
// }
// =======
// >>>>>>> 30fb204671f165b0a251a0e50634472d2a86960d

cases := []testCase{
{
Expand Down Expand Up @@ -141,6 +190,17 @@ func TestGetMetricSpec(t *testing.T) {
},
}

// <<<<<<< HEAD
// table := routing.NewTable()
// ticker, pinger, err := newFakeQueuePinger(ctx, lggr)
// r.NoError(err)

// defer ticker.Stop()
// hdl := newImpl(lggr, pinger, table, 123, 200)

// req := &externalscaler.GetMetricsRequest{
// ScaledObjectRef: &externalscaler.ScaledObjectRef{},
// =======
for i, c := range cases {
testName := fmt.Sprintf("test case #%d: %s", i, c.name)
// capture tc in scope so that we can run the below test
Expand All @@ -150,7 +210,13 @@ func TestGetMetricSpec(t *testing.T) {
t.Parallel()
lggr := logr.Discard()
table := testCase.newRoutingTableFn()
ticker, pinger := newFakeQueuePinger(ctx, lggr)
ticker, pinger, err := newFakeQueuePinger(ctx, lggr)
if err != nil {
t.Fatalf(
"error creating new fake queue pinger and related components: %s",
err,
)
}
defer ticker.Stop()
hdl := newImpl(
lggr,
Expand All @@ -165,6 +231,7 @@ func TestGetMetricSpec(t *testing.T) {
ret, err := hdl.GetMetricSpec(ctx, &scaledObjectRef)
testCase.checker(t, ret, err)
})
// >>>>>>> 30fb204671f165b0a251a0e50634472d2a86960d
}
}

Expand Down Expand Up @@ -206,15 +273,52 @@ func TestGetMetrics(t *testing.T) {
return nil, nil, err
}

// <<<<<<< HEAD
// // create a fake interceptor
// fakeSrv, fakeSrvURL, endpoints, err := startFakeQueueEndpointServer(
// ns,
// svcName,
// q,
// 1,
// )
// r.NoError(err)
// defer fakeSrv.Close()

// table := routing.NewTable()
// // create a fake queue pinger. this is the simulated
// // scaler that pings the above fake interceptor
// ticker, pinger, err := newFakeQueuePinger(
// ctx,
// lggr,
// func(opts *fakeQueuePingerOpts) { opts.endpoints = endpoints },
// func(opts *fakeQueuePingerOpts) { opts.tickDur = 1 * time.Millisecond },
// func(opts *fakeQueuePingerOpts) { opts.port = fakeSrvURL.Port() },
// )
// r.NoError(err)
// defer ticker.Stop()
// // start the pinger watch loop
// go func() {

// pinger.start(ctx, ticker)
// }()

// // sleep for more than enough time for the pinger to do its
// // first tick
// time.Sleep(50 * time.Millisecond)
// =======
// create a fake queue pinger. this is the simulated
// scaler that pings the above fake interceptor
ticker, pinger := newFakeQueuePinger(
ticker, pinger, err := newFakeQueuePinger(
ctx,
lggr,
func(opts *fakeQueuePingerOpts) { opts.endpoints = endpoints },
func(opts *fakeQueuePingerOpts) { opts.tickDur = queuePingerTickDur },
func(opts *fakeQueuePingerOpts) { opts.port = fakeSrvURL.Port() },
)
if err != nil {
return nil, nil, err
}
// >>>>>>> 30fb204671f165b0a251a0e50634472d2a86960d

// sleep for a bit to ensure the pinger has time to do its first tick
time.Sleep(10 * queuePingerTickDur)
Expand All @@ -233,7 +337,10 @@ func TestGetMetrics(t *testing.T) {
lggr logr.Logger,
) (*routing.Table, *queuePinger, func(), error) {
table := routing.NewTable()
ticker, pinger := newFakeQueuePinger(ctx, lggr)
ticker, pinger, err := newFakeQueuePinger(ctx, lggr)
if err != nil {
return nil, nil, nil, err
}
return table, pinger, func() { ticker.Stop() }, nil
},
checkFn: func(t *testing.T, res *externalscaler.GetMetricsResponse, err error) {
Expand All @@ -260,7 +367,10 @@ func TestGetMetrics(t *testing.T) {
) (*routing.Table, *queuePinger, func(), error) {
table := routing.NewTable()
// create queue and ticker without the host in it
ticker, pinger := newFakeQueuePinger(ctx, lggr)
ticker, pinger, err := newFakeQueuePinger(ctx, lggr)
if err != nil {
return nil, nil, nil, err
}
return table, pinger, func() { ticker.Stop() }, nil
},
checkFn: func(t *testing.T, res *externalscaler.GetMetricsResponse, err error) {
Expand Down Expand Up @@ -342,6 +452,59 @@ func TestGetMetrics(t *testing.T) {
},
}

// <<<<<<< HEAD
// r := require.New(t)
// ctx := context.Background()
// lggr := logr.Discard()

// // we need to create a new queuePinger with valid endpoints
// // to query this time, so that when counts are requested by
// // the internal queuePinger logic, there is a valid host from
// // which to request those counts
// q := queue.NewFakeCounter()
// // NOTE: don't call .Resize here or you'll have to make sure
// // to receive on q.ResizedCh
// q.RetMap["host1"] = pendingQLen
// q.RetMap["host2"] = pendingQLen

// // create a fake interceptor
// fakeSrv, fakeSrvURL, endpoints, err := startFakeQueueEndpointServer(
// ns,
// svcName,
// q,
// 1,
// )
// r.NoError(err)
// defer fakeSrv.Close()

// table := routing.NewTable()
// // create a fake queue pinger. this is the simulated
// // scaler that pings the above fake interceptor
// const tickDur = 5 * time.Millisecond
// ticker, pinger, err := newFakeQueuePinger(
// ctx,
// lggr,
// func(opts *fakeQueuePingerOpts) { opts.endpoints = endpoints },
// func(opts *fakeQueuePingerOpts) { opts.tickDur = tickDur },
// func(opts *fakeQueuePingerOpts) { opts.port = fakeSrvURL.Port() },
// )
// r.NoError(err)
// defer ticker.Stop()

// // sleep for more than enough time for the pinger to do its
// // first tick
// time.Sleep(tickDur * 5)

// hdl := newImpl(lggr, pinger, table, 123, 200)
// res, err := hdl.GetMetrics(ctx, req)
// r.NoError(err)
// r.NotNil(res)
// r.Equal(1, len(res.MetricValues))
// metricVal := res.MetricValues[0]
// r.Equal("interceptor", metricVal.MetricName)
// aggregate := pinger.aggregate()
// r.Equal(int64(aggregate), metricVal.MetricValue)
// =======
for i, c := range testCases {
tc := c
name := fmt.Sprintf("test case %d: %s", i, tc.name)
Expand Down Expand Up @@ -370,4 +533,5 @@ func TestGetMetrics(t *testing.T) {
tc.checkFn(t, res, err)
})
}
// >>>>>>> 30fb204671f165b0a251a0e50634472d2a86960d
}
Loading