@@ -23,24 +23,32 @@ import (
2323 "encoding/json"
2424 "fmt"
2525 "net"
26+ "sync"
2627 "testing"
2728
2829 "github.com/google/go-cmp/cmp"
2930 "github.com/google/go-cmp/cmp/cmpopts"
3031 "github.com/google/uuid"
3132 "google.golang.org/grpc"
3233 "google.golang.org/grpc/codes"
34+ "google.golang.org/grpc/credentials/insecure"
35+ "google.golang.org/grpc/internal"
36+ "google.golang.org/grpc/internal/stubserver"
3337 "google.golang.org/grpc/internal/testutils"
3438 "google.golang.org/grpc/internal/testutils/xds/e2e"
3539 "google.golang.org/grpc/internal/testutils/xds/fakeserver"
3640 "google.golang.org/grpc/internal/xds/bootstrap"
41+ "google.golang.org/grpc/resolver"
3742 "google.golang.org/grpc/status"
3843 "google.golang.org/grpc/xds/internal/clients"
44+ "google.golang.org/grpc/xds/internal/xdsclient"
3945 "google.golang.org/protobuf/testing/protocmp"
4046
4147 v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
4248 v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
4349 v3lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3"
50+ testgrpc "google.golang.org/grpc/interop/grpc_testing"
51+ testpb "google.golang.org/grpc/interop/grpc_testing"
4452 "google.golang.org/protobuf/types/known/durationpb"
4553)
4654
@@ -437,3 +445,113 @@ func (s) TestReportLoad_StreamCreation(t *testing.T) {
437445 defer sCancel3 ()
438446 cancel3 (sCtx3 )
439447}
448+
449+ // TestConcurrentReportLoad verifies that the client can safely handle concurrent
450+ // requests to initiate load reporting streams. It launches multiple goroutines
451+ // that all call client.ReportLoad simultaneously.
452+ func (s ) TestConcurrentReportLoad (t * testing.T ) {
453+ ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
454+ defer cancel ()
455+
456+ mgmtServer := e2e .StartManagementServer (t , e2e.ManagementServerOptions {SupportLoadReportingService : true })
457+ nodeID := uuid .New ().String ()
458+ bc := e2e .DefaultBootstrapContents (t , nodeID , mgmtServer .Address )
459+ client := createXDSClient (t , bc )
460+
461+ serverConfig , err := bootstrap .ServerConfigForTesting (bootstrap.ServerConfigTestingOptions {URI : mgmtServer .Address })
462+ if err != nil {
463+ t .Fatalf ("Failed to create server config for testing: %v" , err )
464+ }
465+
466+ // Call ReportLoad() concurrently from multiple go routines.
467+ var wg sync.WaitGroup
468+ const numGoroutines = 10
469+ wg .Add (numGoroutines )
470+ for range numGoroutines {
471+ go func () {
472+ defer wg .Done ()
473+ _ , cancelStore := client .ReportLoad (serverConfig )
474+ defer cancelStore (ctx )
475+ }()
476+ }
477+ wg .Wait ()
478+ }
479+
480+ // TestConcurrentChannels verifies that we can create multiple gRPC channels
481+ // concurrently with a shared XDSClient, each of which will create a new LRS
482+ // stream without any race.
483+ func (s ) TestConcurrentChannels (t * testing.T ) {
484+ // TODO(emchandwani) : Unskip after https://github.com/grpc/grpc-go/pull/8526 gets merged.
485+ t .Skip ()
486+ ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
487+ defer cancel ()
488+
489+ mgmtServer := e2e .StartManagementServer (t , e2e.ManagementServerOptions {AllowResourceSubset : true , SupportLoadReportingService : true })
490+
491+ nodeID := uuid .New ().String ()
492+ bc := e2e .DefaultBootstrapContents (t , nodeID , mgmtServer .Address )
493+
494+ if internal .NewXDSResolverWithPoolForTesting == nil {
495+ t .Fatalf ("internal.NewXDSResolverWithConfigForTesting is nil" )
496+ }
497+
498+ config , err := bootstrap .NewConfigFromContents (bc )
499+ if err != nil {
500+ t .Fatalf ("Failed to parse bootstrap contents: %s, %v" , string (bc ), err )
501+ }
502+ pool := xdsclient .NewPool (config )
503+
504+ resolverBuilder := internal .NewXDSResolverWithPoolForTesting .(func (* xdsclient.Pool ) (resolver.Builder , error ))
505+ xdsResolver , err := resolverBuilder (pool )
506+ if err != nil {
507+ t .Fatalf ("Failed to create xDS resolver for testing: %v" , err )
508+ }
509+
510+ server := stubserver .StartTestService (t , nil )
511+ defer server .Stop ()
512+
513+ // Configure the management server with resources that enable LRS.
514+ const serviceName = "my-service-e2e-lrs-test"
515+ resources := e2e .DefaultClientResources (e2e.ResourceParams {
516+ DialTarget : serviceName ,
517+ NodeID : nodeID ,
518+ Host : "localhost" ,
519+ Port : testutils .ParsePort (t , server .Address ),
520+ SecLevel : e2e .SecurityLevelNone ,
521+ })
522+ resources .Clusters [0 ].LrsServer = & v3corepb.ConfigSource {
523+ ConfigSourceSpecifier : & v3corepb.ConfigSource_Self {
524+ Self : & v3corepb.SelfConfigSource {},
525+ },
526+ }
527+ if err := mgmtServer .Update (ctx , resources ); err != nil {
528+ t .Fatal (err )
529+ }
530+
531+ var wg sync.WaitGroup
532+ const (
533+ numGoroutines = 10
534+ numRPCs = 10
535+ )
536+ for range numGoroutines {
537+ wg .Add (1 )
538+ go func () {
539+ defer wg .Done ()
540+ for range numRPCs {
541+ cc , err := grpc .NewClient (fmt .Sprintf ("xds:///%s" , serviceName ), grpc .WithTransportCredentials (insecure .NewCredentials ()), grpc .WithResolvers (xdsResolver ))
542+ if err != nil {
543+ t .Errorf ("grpc.NewClient() failed: %v" , err )
544+ return
545+ }
546+ defer cc .Close ()
547+
548+ testClient := testgrpc .NewTestServiceClient (cc )
549+ if _ , err := testClient .EmptyCall (ctx , & testpb.Empty {}); err != nil {
550+ t .Errorf ("EmptyCall() failed: %v" , err )
551+ return
552+ }
553+ }
554+ }()
555+ }
556+ wg .Wait ()
557+ }
0 commit comments