Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gRPC dial timeout to game server allocation policy (#1700) #1830

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cmd/allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,19 @@ type serviceHandler struct {
// Allocate implements the Allocate gRPC method definition
func (h *serviceHandler) Allocate(ctx context.Context, in *pb.AllocationRequest) (*pb.AllocationResponse, error) {
logger.WithField("request", in).Infof("allocation request received.")

switch ctx.Err() {
case context.Canceled:
logger.Info("allocation request canceled by client, abandoning")
return nil, status.Error(codes.Canceled, "allocation request canceled by client, abandoning")
case context.DeadlineExceeded:
logger.Info("allocation request deadline exceedeed, abandoning")
return nil, status.Error(codes.DeadlineExceeded, "allocation request deadline exceeded, abandoning")
}

gsa := converters.ConvertAllocationRequestToGSA(in)
resultObj, err := h.allocationCallback(gsa)

if err != nil {
logger.WithField("gsa", gsa).WithError(err).Info("allocation failed")
return nil, err
Expand Down
47 changes: 47 additions & 0 deletions cmd/allocator/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"
"os"
"testing"
"time"

pb "agones.dev/agones/pkg/allocation/go"
allocationv1 "agones.dev/agones/pkg/apis/allocation/v1"
Expand Down Expand Up @@ -106,6 +107,52 @@ func TestGetTlsCert(t *testing.T) {
assert.Equal(t, cert2.Certificate, retrievedCert2.Certificate, "expected the retrieved cert to be equal to the original one")
}

func TestContextDeadlineExceeded(t *testing.T) {
t.Parallel()

handler := serviceHandler{}
request := pb.AllocationRequest{}
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Nanosecond))
defer cancel()
_, err := handler.Allocate(ctx, &request)

if !assert.Error(t, err, "expecting failure") {
return
}

st, ok := status.FromError(err)

if !ok {
t.Errorf("expecting status error: %v", err)
}

assert.Equal(t, codes.DeadlineExceeded, st.Code())
assert.Contains(t, st.Message(), "allocation request deadline exceeded, abandoning")
}

func TestContextCancelation(t *testing.T) {
t.Parallel()

handler := serviceHandler{}
request := pb.AllocationRequest{}
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Minute))
cancel()
_, err := handler.Allocate(ctx, &request)

if !assert.Error(t, err, "expecting failure") {
return
}

st, ok := status.FromError(err)

if !ok {
t.Errorf("expecting status error: %v", err)
}

assert.Equal(t, codes.Canceled, st.Code())
assert.Contains(t, st.Message(), "allocation request canceled by client, abandoning")
}

func TestHandlingStatus(t *testing.T) {
t.Parallel()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ spec:
minItems: 1
clusterName:
type: string
timeout:
format: int64
minimum: 0
type: integer
backoffCap:
format: int64
minimum: 0
type: integer
namespace:
type: string
required:
Expand Down
8 changes: 8 additions & 0 deletions install/yaml/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,14 @@ spec:
minItems: 1
clusterName:
type: string
timeout:
format: int64
minimum: 0
type: integer
backoffCap:
format: int64
minimum: 0
type: integer
namespace:
type: string
required:
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/multicluster/v1/gameserverallocationpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ type ClusterConnectionInfo struct {
Namespace string `json:"namespace"`
// The PEM encoded server CA, used by the allocator client to authenticate the remote server.
ServerCA []byte `json:"serverCa,omitempty"`
// Optional: specifies how long (in milliseconds) gRPC client is willing to wait for the allocator service
// to complete request before it is terminated with the error DEADLINE_EXCEEDED
Timeout int64 `json:"timeout,omitempty"`
// Optional: the maximum duration (in milliseconds) applied to a backoff function
BackoffCap int64 `json:"backoffCap,omitempty"`
}

// +genclient
Expand Down
39 changes: 28 additions & 11 deletions pkg/gameserverallocations/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,6 @@ var allocationRetry = wait.Backoff{
Jitter: 0.1,
}

var remoteAllocationRetry = wait.Backoff{
Steps: 7,
Duration: 100 * time.Millisecond,
Factor: 2.0,
}

// Allocator handles game server allocation
type Allocator struct {
baseLogger *logrus.Entry
Expand All @@ -106,7 +100,7 @@ type Allocator struct {
pendingRequests chan request
readyGameServerCache *ReadyGameServerCache
topNGameServerCount int
remoteAllocationCallback func(string, grpc.DialOption, *pb.AllocationRequest) (*pb.AllocationResponse, error)
remoteAllocationCallback func(context.Context, string, grpc.DialOption, *pb.AllocationRequest) (*pb.AllocationResponse, error)
}

// request is an async request for allocation
Expand All @@ -133,15 +127,15 @@ func NewAllocator(policyInformer multiclusterinformerv1.GameServerAllocationPoli
secretSynced: secretInformer.Informer().HasSynced,
readyGameServerCache: readyGameServerCache,
topNGameServerCount: topNGameServerDefaultCount,
remoteAllocationCallback: func(endpoint string, dialOpts grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
remoteAllocationCallback: func(ctx context.Context, endpoint string, dialOpts grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
conn, err := grpc.Dial(endpoint, dialOpts)
if err != nil {
return nil, err
}
defer conn.Close() // nolint: errcheck

grpcClient := pb.NewAllocationServiceClient(conn)
return grpcClient.Allocate(context.Background(), request)
return grpcClient.Allocate(ctx, request)
},
}

Expand Down Expand Up @@ -335,14 +329,26 @@ func (c *Allocator) allocateFromRemoteCluster(gsa *allocationv1.GameServerAlloca
request := converters.ConvertGSAToAllocationRequest(gsa)
request.MultiClusterSetting.Enabled = false
request.Namespace = connectionInfo.Namespace
backoff := newRemoteAllocationBackoff(time.Duration(connectionInfo.BackoffCap) * time.Millisecond)

// Retry on remote call failures.
err = Retry(remoteAllocationRetry, func() error {
err = Retry(backoff, func() error {
for i, ip := range connectionInfo.AllocationEndpoints {
endpoint := addPort(ip)
c.loggerForGameServerAllocationKey("remote-allocation").WithField("request", request).WithField("endpoint", endpoint).Debug("forwarding allocation request")

allocationResponse, err = c.remoteAllocationCallback(endpoint, dialOpts, request)
ctx := context.Background()

if connectionInfo.Timeout > 0 {
deadline := time.Now().Add(time.Duration(connectionInfo.Timeout) * time.Millisecond)
ctxWithDeadline, cancel := context.WithDeadline(ctx, deadline)
defer cancel()

ctx = ctxWithDeadline
}

allocationResponse, err = c.remoteAllocationCallback(ctx, endpoint, dialOpts, request)

if err != nil {
c.baseLogger.Errorf("remote allocation failed with: %v", err)
// If there are multiple enpoints for the allocator connection and the current one is
Expand All @@ -362,6 +368,17 @@ func (c *Allocator) allocateFromRemoteCluster(gsa *allocationv1.GameServerAlloca
return converters.ConvertAllocationResponseToGSA(allocationResponse), err
}

func newRemoteAllocationBackoff(backoffCap time.Duration) wait.Backoff {
backoff := wait.Backoff{
Steps: 7,
Duration: 100 * time.Millisecond,
Factor: 2.0,
Cap: backoffCap,
}

return backoff
}

// createRemoteClusterDialOption creates a grpc client dial option with proper certs to make a remote call.
func (c *Allocator) createRemoteClusterDialOption(namespace string, connectionInfo *multiclusterv1.ClusterConnectionInfo) (grpc.DialOption, error) {
// TODO: disableMTLS works for a single cluster; still need to address how the flag interacts with multi-cluster authentication.
Expand Down
7 changes: 4 additions & 3 deletions pkg/gameserverallocations/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package gameserverallocations

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -968,7 +969,7 @@ func TestMultiClusterAllocationFromRemote(t *testing.T) {
},
}

c.allocator.remoteAllocationCallback = func(e string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
c.allocator.remoteAllocationCallback = func(ctx context.Context, e string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
assert.Equal(t, endpoint+":443", e)
serverResponse := pb.AllocationResponse{
GameServerName: expectedGSName,
Expand All @@ -991,7 +992,7 @@ func TestMultiClusterAllocationFromRemote(t *testing.T) {
retry := 0
endpoint := "z.z.z.z"

c.allocator.remoteAllocationCallback = func(endpoint string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
c.allocator.remoteAllocationCallback = func(ctx context.Context, endpoint string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
if count == 0 {
serverResponse := pb.AllocationResponse{}
count++
Expand Down Expand Up @@ -1087,7 +1088,7 @@ func TestMultiClusterAllocationFromRemote(t *testing.T) {
healthyEndpoint := "healthy_endpoint:443"

expectedGSName := "mocked"
c.allocator.remoteAllocationCallback = func(endpoint string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
c.allocator.remoteAllocationCallback = func(ctx context.Context, endpoint string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
if endpoint == unhealthyEndpoint {
return nil, errors.New("test error message")
}
Expand Down
Loading