Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TestStreamIsActive #384

Merged
merged 9 commits into from
Jan 27, 2022
4 changes: 2 additions & 2 deletions scaler/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ func (e *impl) StreamIsActive(
if err != nil {
e.lggr.Error(
err,
"error getting active status in stream, continuing",
"error getting active status in stream",
)
continue
return err
}
server.Send(&externalscaler.IsActiveResponse{
Result: active.Result,
Expand Down
135 changes: 133 additions & 2 deletions scaler/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
context "context"
"fmt"
"net"
"testing"
"time"

Expand All @@ -11,6 +12,8 @@ import (
"github.com/kedacore/http-add-on/pkg/routing"
externalscaler "github.com/kedacore/http-add-on/proto"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"
)

func standardTarget() routing.Target {
Expand All @@ -22,7 +25,8 @@ func standardTarget() routing.Target {
123,
)
}
func TestIsActive(t *testing.T) {

func TestStreamIsActive(t *testing.T) {
type testCase struct {
name string
host string
Expand Down Expand Up @@ -88,9 +92,136 @@ func TestIsActive(t *testing.T) {
lggr := logr.Discard()
table := routing.NewTable()
ticker, pinger, err := newFakeQueuePinger(ctx, lggr)
r.NoError(err)
defer ticker.Stop()
tc.setup(table, pinger)

hdl := newImpl(
lggr,
pinger,
table,
123,
200,
)

bufSize := 1024 * 1024
lis := bufconn.Listen(bufSize)
grpcServer := grpc.NewServer()
asw101 marked this conversation as resolved.
Show resolved Hide resolved
defer grpcServer.Stop()
externalscaler.RegisterExternalScalerServer(
grpcServer,
hdl,
)
go grpcServer.Serve(lis)

bufDialFunc := func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}

conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialFunc), grpc.WithInsecure())
if err != nil {
t.Fatalf("failed to create queue pinger: %v", err)
t.Fatalf("Failed to dial bufnet: %v", err)
}
defer conn.Close()

client := externalscaler.NewExternalScalerClient(conn)

testRef := &externalscaler.ScaledObjectRef{
ScalerMetadata: map[string]string{
"host": tc.host,
},
}

// First will see if we can establish the stream and handle this
// error.
streamClient, err := client.StreamIsActive(ctx, testRef)
if err != nil {
t.Fatalf("StreamIsActive failed: %v", err)
}

// Next, as in TestIsActive, we check for any error, expected
// or unexpected, for each table test.
res, err := streamClient.Recv()

if tc.expectedErr && err != nil {
return
} else if err != nil {
t.Fatalf("expected no error but got: %v", err)
}

if tc.expected != res.Result {
t.Fatalf("Expected IsActive result %v, got: %v", tc.expected, res.Result)
}
})
}
}

func TestIsActive(t *testing.T) {
type testCase struct {
name string
host string
expected bool
expectedErr bool
setup func(*routing.Table, *queuePinger)
}

testCases := []testCase{
{
name: "Simple host inactive",
host: t.Name(),
expected: false,
expectedErr: false,
setup: func(table *routing.Table, q *queuePinger) {
table.AddTarget(t.Name(), standardTarget())
q.pingMut.Lock()
defer q.pingMut.Unlock()
q.allCounts[t.Name()] = 0
},
},
{
name: "Host is 'interceptor'",
host: "interceptor",
expected: true,
expectedErr: false,
setup: func(*routing.Table, *queuePinger) {},
},
{
name: "Simple host active",
host: t.Name(),
expected: true,
expectedErr: false,
setup: func(table *routing.Table, q *queuePinger) {
table.AddTarget(t.Name(), standardTarget())
q.pingMut.Lock()
defer q.pingMut.Unlock()
q.allCounts[t.Name()] = 1
},
},
{
name: "No host present, but host in routing table",
host: t.Name(),
expected: false,
expectedErr: false,
setup: func(table *routing.Table, q *queuePinger) {
table.AddTarget(t.Name(), standardTarget())
},
},
{
name: "Host doesn't exist",
host: t.Name(),
expected: false,
expectedErr: true,
setup: func(*routing.Table, *queuePinger) {},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
r := require.New(t)
ctx := context.Background()
lggr := logr.Discard()
table := routing.NewTable()
ticker, pinger, err := newFakeQueuePinger(ctx, lggr)
r.NoError(err)
defer ticker.Stop()
tc.setup(table, pinger)
Expand Down