Skip to content

Commit

Permalink
Add ADS client
Browse files Browse the repository at this point in the history
Signed-off-by: Renuka Fernando <renukapiyumal@gmail.com>
  • Loading branch information
renuka-fernando committed Oct 31, 2022
1 parent 69fb579 commit 4c746ee
Show file tree
Hide file tree
Showing 2 changed files with 277 additions and 0 deletions.
153 changes: 153 additions & 0 deletions pkg/adsclient/sotw/v3/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright 2022 Envoyproxy Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package sotw provides an implementation of GRPC SoTW (State of The World) part of XDS client
package sotw

import (
"context"
"errors"
"io"
"sync"

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/golang/protobuf/ptypes/any"
status "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpcStatus "google.golang.org/grpc/status"
)

var (
errInit = errors.New("ads client: grpc connection is not initialized (use InitConnect() method to initialize connection)")
errNilResp = errors.New("ads client: nil response from xds management server")
)

type AdsClient interface {
// Initialize the gRPC connection with management server and send the initial Discovery Request.
InitConnect(clientConn grpc.ClientConnInterface, opts ...grpc.CallOption) error
// Fetch waits for a response from management server and returns response or error.
Fetch() (*Response, error)
// Ack acknowledge the validity of the last received response to management server.
Ack() error
// Nack acknowledge the invalidity of the last received response to management server.
Nack(message string) error
}

type Response struct {
Resources []*any.Any
}

type adsClient struct {
ctx context.Context
mu sync.RWMutex
nodeID string
typeURL string

// streamClient is the ADS discovery client
streamClient discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient
// lastAckedResponse is the last response acked by the ADS client
lastAckedResponse *discovery.DiscoveryResponse
// lastReceivedResponse is the last response received from management server
lastReceivedResponse *discovery.DiscoveryResponse
}

func NewAdsClient(ctx context.Context, nodeID string, typeURL string) AdsClient {
return &adsClient{
ctx: ctx,
nodeID: nodeID,
typeURL: typeURL,
}
}

func (c *adsClient) InitConnect(clientConn grpc.ClientConnInterface, opts ...grpc.CallOption) error {
streamClient, err := discovery.NewAggregatedDiscoveryServiceClient(clientConn).StreamAggregatedResources(c.ctx, opts...)
if err != nil {
return err
}
c.streamClient = streamClient
return c.Ack()
}

func (c *adsClient) Fetch() (*Response, error) {
if c.streamClient == nil {
return nil, errInit
}
resp, err := c.streamClient.Recv()
if err != nil {
return nil, err
}
if resp == nil {
return nil, errNilResp
}

c.mu.Lock()
c.lastReceivedResponse = resp
c.mu.Unlock()

return &Response{
Resources: resp.GetResources(),
}, err
}

func (c *adsClient) Ack() error {
c.mu.Lock()
c.lastAckedResponse = c.lastReceivedResponse
c.mu.Unlock()
return c.send(nil)
}

func (c *adsClient) Nack(message string) error {
errorDetail := &status.Status{
Message: message,
}
return c.send(errorDetail)
}

// IsConnError checks the provided error is due to the gRPC connection
// and returns true if it is due to the gRPC connection.
//
// In this case the gRPC connection with the server should be re initialized with the
// AdsClient.InitConnect method.
func IsConnError(err error) bool {
if err == nil {
return false
}
if err == io.EOF {
return true
}
errStatus, ok := grpcStatus.FromError(err)
if !ok {
return false
}
return errStatus.Code() == codes.Unavailable || errStatus.Code() == codes.Canceled
}

func (c *adsClient) send(errorDetail *status.Status) error {
c.mu.RLock()
req := &discovery.DiscoveryRequest{
Node: &core.Node{Id: c.nodeID},
VersionInfo: c.lastAckedResponse.GetVersionInfo(),
TypeUrl: c.typeURL,
ResponseNonce: c.lastReceivedResponse.GetNonce(),
ErrorDetail: errorDetail,
}
c.mu.RUnlock()

if c.streamClient == nil {
return errInit
}
return c.streamClient.Send(req)
}
124 changes: 124 additions & 0 deletions pkg/adsclient/sotw/v3/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package sotw_test

import (
"context"
"fmt"
"net"
"testing"
"time"

clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
client "github.com/envoyproxy/go-control-plane/pkg/adsclient/sotw/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/stretchr/testify/assert"
)

func TestFetch(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

snapCache := cache.NewSnapshotCache(true, cache.IDHash{}, nil)
go func() {
err := startAdsServer(t, ctx, snapCache)
assert.NoError(t, err)
}()

conn, err := grpc.Dial(":18001", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
assert.NoError(t, err)
defer conn.Close()

c := client.NewAdsClient(ctx, "node_1", resource.ClusterType)
c.InitConnect(conn)

t.Run("Test initial fetch", testInitialFetch(t, ctx, snapCache, c))
t.Run("Test next fetch", testNextFetch(t, ctx, snapCache, c))
}

func testInitialFetch(t *testing.T, ctx context.Context, snapCache cache.SnapshotCache, c client.AdsClient) func(t *testing.T) {
return func(t *testing.T) {
go func() {
// watch for configs
resp, err := c.Fetch()
assert.NoError(t, err)
assert.Equal(t, 3, len(resp.Resources))
for i, r := range resp.Resources {
cluster := &clusterv3.Cluster{}
anypb.UnmarshalTo(r, cluster, proto.UnmarshalOptions{})
assert.Equal(t, fmt.Sprint("cluster_", i), cluster.Name)
}

err = c.Ack()
assert.NoError(t, err)
}()

snapshot, err := cache.NewSnapshot("1", map[resource.Type][]types.Resource{
resource.ClusterType: {
&clusterv3.Cluster{Name: "cluster_1"},
&clusterv3.Cluster{Name: "cluster_2"},
&clusterv3.Cluster{Name: "cluster_3"},
},
})
assert.NoError(t, err)

err = snapshot.Consistent()
assert.NoError(t, err)
snapCache.SetSnapshot(ctx, "node_1", snapshot)
}
}

func testNextFetch(t *testing.T, ctx context.Context, snapCache cache.SnapshotCache, c client.AdsClient) func(t *testing.T) {
return func(t *testing.T) {
go func() {
// watch for configs
resp, err := c.Fetch()
assert.NoError(t, err)
assert.Equal(t, 2, len(resp.Resources))
for i, r := range resp.Resources {
cluster := &clusterv3.Cluster{}
anypb.UnmarshalTo(r, cluster, proto.UnmarshalOptions{})
assert.Equal(t, fmt.Sprint("cluster_", i), cluster.Name)
}

err = c.Ack()
assert.NoError(t, err)
}()

snapshot, err := cache.NewSnapshot("2", map[resource.Type][]types.Resource{
resource.ClusterType: {
&clusterv3.Cluster{Name: "cluster_1"},
&clusterv3.Cluster{Name: "cluster_2"},
},
})
assert.NoError(t, err)

err = snapshot.Consistent()
assert.NoError(t, err)
snapCache.SetSnapshot(ctx, "node_1", snapshot)
}
}

func startAdsServer(t *testing.T, ctx context.Context, snapCache cache.SnapshotCache) error {
lis, err := net.Listen("tcp", ":18001")
if err != nil {
return err
}

grpcServer := grpc.NewServer()
s := server.NewServer(ctx, snapCache, nil)
discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, s)

if e := grpcServer.Serve(lis); e != nil {
err = e
}

return err
}

0 comments on commit 4c746ee

Please sign in to comment.