diff --git a/scaler/handlers.go b/scaler/handlers.go index 482039a6..4920f2dc 100644 --- a/scaler/handlers.go +++ b/scaler/handlers.go @@ -99,9 +99,9 @@ func (e *impl) StreamIsActive( if err != nil { e.lggr.Error( err, - "error getting active status in stream, continuing", + "error getting active status in stream", ) - continue + return err } server.Send(&externalscaler.IsActiveResponse{ Result: active.Result, diff --git a/scaler/handlers_test.go b/scaler/handlers_test.go index f6c4fa80..fd28954f 100644 --- a/scaler/handlers_test.go +++ b/scaler/handlers_test.go @@ -3,6 +3,7 @@ package main import ( context "context" "fmt" + "net" "testing" "time" @@ -11,6 +12,8 @@ import ( "github.com/kedacore/http-add-on/pkg/routing" externalscaler "github.com/kedacore/http-add-on/proto" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/test/bufconn" ) func standardTarget() routing.Target { @@ -22,7 +25,8 @@ func standardTarget() routing.Target { 123, ) } -func TestIsActive(t *testing.T) { + +func TestStreamIsActive(t *testing.T) { type testCase struct { name string host string @@ -88,9 +92,136 @@ func TestIsActive(t *testing.T) { lggr := logr.Discard() table := routing.NewTable() ticker, pinger, err := newFakeQueuePinger(ctx, lggr) + r.NoError(err) + defer ticker.Stop() + tc.setup(table, pinger) + + hdl := newImpl( + lggr, + pinger, + table, + 123, + 200, + ) + + bufSize := 1024 * 1024 + lis := bufconn.Listen(bufSize) + grpcServer := grpc.NewServer() + defer grpcServer.Stop() + externalscaler.RegisterExternalScalerServer( + grpcServer, + hdl, + ) + go grpcServer.Serve(lis) + + bufDialFunc := func(context.Context, string) (net.Conn, error) { + return lis.Dial() + } + + conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialFunc), grpc.WithInsecure()) if err != nil { - t.Fatalf("failed to create queue pinger: %v", err) + t.Fatalf("Failed to dial bufnet: %v", err) } + defer conn.Close() + + client := externalscaler.NewExternalScalerClient(conn) + + testRef := &externalscaler.ScaledObjectRef{ + ScalerMetadata: map[string]string{ + "host": tc.host, + }, + } + + // First will see if we can establish the stream and handle this + // error. + streamClient, err := client.StreamIsActive(ctx, testRef) + if err != nil { + t.Fatalf("StreamIsActive failed: %v", err) + } + + // Next, as in TestIsActive, we check for any error, expected + // or unexpected, for each table test. + res, err := streamClient.Recv() + + if tc.expectedErr && err != nil { + return + } else if err != nil { + t.Fatalf("expected no error but got: %v", err) + } + + if tc.expected != res.Result { + t.Fatalf("Expected IsActive result %v, got: %v", tc.expected, res.Result) + } + }) + } +} + +func TestIsActive(t *testing.T) { + type testCase struct { + name string + host string + expected bool + expectedErr bool + setup func(*routing.Table, *queuePinger) + } + + testCases := []testCase{ + { + name: "Simple host inactive", + host: t.Name(), + expected: false, + expectedErr: false, + setup: func(table *routing.Table, q *queuePinger) { + table.AddTarget(t.Name(), standardTarget()) + q.pingMut.Lock() + defer q.pingMut.Unlock() + q.allCounts[t.Name()] = 0 + }, + }, + { + name: "Host is 'interceptor'", + host: "interceptor", + expected: true, + expectedErr: false, + setup: func(*routing.Table, *queuePinger) {}, + }, + { + name: "Simple host active", + host: t.Name(), + expected: true, + expectedErr: false, + setup: func(table *routing.Table, q *queuePinger) { + table.AddTarget(t.Name(), standardTarget()) + q.pingMut.Lock() + defer q.pingMut.Unlock() + q.allCounts[t.Name()] = 1 + }, + }, + { + name: "No host present, but host in routing table", + host: t.Name(), + expected: false, + expectedErr: false, + setup: func(table *routing.Table, q *queuePinger) { + table.AddTarget(t.Name(), standardTarget()) + }, + }, + { + name: "Host doesn't exist", + host: t.Name(), + expected: false, + expectedErr: true, + setup: func(*routing.Table, *queuePinger) {}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + r := require.New(t) + ctx := context.Background() + lggr := logr.Discard() + table := routing.NewTable() + ticker, pinger, err := newFakeQueuePinger(ctx, lggr) r.NoError(err) defer ticker.Stop() tc.setup(table, pinger)