diff --git a/app/kuma-dp/pkg/dataplane/envoy/remote_bootstrap.go b/app/kuma-dp/pkg/dataplane/envoy/remote_bootstrap.go index cae5221c7112..71c3a508e943 100644 --- a/app/kuma-dp/pkg/dataplane/envoy/remote_bootstrap.go +++ b/app/kuma-dp/pkg/dataplane/envoy/remote_bootstrap.go @@ -191,6 +191,11 @@ func (b *remoteBootstrap) requestForBootstrap(ctx context.Context, client *http. }, SystemCaPath: params.SystemCaPath, } + if cfg.DataplaneRuntime.XDSConfigType == "" { + request.XDSConfigType = "sotw" + } else { + request.XDSConfigType = "delta" + } jsonBytes, err := json.MarshalIndent(request, "", " ") if err != nil { return nil, errors.Wrap(err, "could not marshal request to json") diff --git a/app/kuma-dp/pkg/dataplane/envoy/testdata/bootstrap-request-0.golden.json b/app/kuma-dp/pkg/dataplane/envoy/testdata/bootstrap-request-0.golden.json index 6f63ce578d65..faa20220fbcb 100644 --- a/app/kuma-dp/pkg/dataplane/envoy/testdata/bootstrap-request-0.golden.json +++ b/app/kuma-dp/pkg/dataplane/envoy/testdata/bootstrap-request-0.golden.json @@ -31,5 +31,6 @@ "certPath": "/tmp/cert.pem", "keyPath": "/tmp/key.pem" }, - "systemCaPath": "" + "systemCaPath": "", + "xdsConfigType": "sotw" } diff --git a/app/kuma-dp/pkg/dataplane/envoy/testdata/bootstrap-request-1.golden.json b/app/kuma-dp/pkg/dataplane/envoy/testdata/bootstrap-request-1.golden.json index 5e5d5da29c45..f5da93dc81d9 100644 --- a/app/kuma-dp/pkg/dataplane/envoy/testdata/bootstrap-request-1.golden.json +++ b/app/kuma-dp/pkg/dataplane/envoy/testdata/bootstrap-request-1.golden.json @@ -30,5 +30,6 @@ "certPath": "", "keyPath": "" }, - "systemCaPath": "" + "systemCaPath": "", + "xdsConfigType": "sotw" } diff --git a/app/kuma-dp/pkg/dataplane/envoy/testdata/bootstrap-request-2.golden.json b/app/kuma-dp/pkg/dataplane/envoy/testdata/bootstrap-request-2.golden.json index 473d5360187d..ec718bd63d64 100644 --- a/app/kuma-dp/pkg/dataplane/envoy/testdata/bootstrap-request-2.golden.json +++ b/app/kuma-dp/pkg/dataplane/envoy/testdata/bootstrap-request-2.golden.json @@ -29,5 +29,6 @@ "certPath": "", "keyPath": "" }, - "systemCaPath": "" + "systemCaPath": "", + "xdsConfigType": "sotw" } diff --git a/app/kuma-dp/pkg/dataplane/envoy/testdata/bootstrap-request-3.golden.json b/app/kuma-dp/pkg/dataplane/envoy/testdata/bootstrap-request-3.golden.json index 473d5360187d..ec718bd63d64 100644 --- a/app/kuma-dp/pkg/dataplane/envoy/testdata/bootstrap-request-3.golden.json +++ b/app/kuma-dp/pkg/dataplane/envoy/testdata/bootstrap-request-3.golden.json @@ -29,5 +29,6 @@ "certPath": "", "keyPath": "" }, - "systemCaPath": "" + "systemCaPath": "", + "xdsConfigType": "sotw" } diff --git a/go.mod b/go.mod index 89e3831fcc75..c90f4745e064 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/containernetworking/cni v1.2.3 github.com/containernetworking/plugins v1.5.1 github.com/emicklei/go-restful/v3 v3.12.1 - github.com/envoyproxy/go-control-plane v0.12.0 + github.com/envoyproxy/go-control-plane v0.13.0 github.com/envoyproxy/protoc-gen-validate v1.1.0 github.com/evanphx/json-patch/v5 v5.9.0 github.com/exaring/otelpgx v0.6.2 @@ -186,6 +186,7 @@ require ( github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect + github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/pquerna/otp v1.2.0 // indirect @@ -227,6 +228,3 @@ require ( sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect ) - -// remove once https://github.com/envoyproxy/go-control-plane/issues/875 is resolved -replace github.com/envoyproxy/go-control-plane v0.12.0 => github.com/envoyproxy/go-control-plane v0.11.2-0.20231010133108-1dfbe83bcebc diff --git a/go.sum b/go.sum index 6b3f0cd05908..489199cb5af9 100644 --- a/go.sum +++ b/go.sum @@ -83,8 +83,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/emicklei/go-restful/v3 v3.12.1 h1:PJMDIM/ak7btuL8Ex0iYET9hxM3CI2sjZtzpL63nKAU= github.com/emicklei/go-restful/v3 v3.12.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= -github.com/envoyproxy/go-control-plane v0.11.2-0.20231010133108-1dfbe83bcebc h1:k6n7EmQYjNHEKr8XI3rdtFIhb0UdJSyCWnt9gvgLx5g= -github.com/envoyproxy/go-control-plane v0.11.2-0.20231010133108-1dfbe83bcebc/go.mod h1:9ODlpdyEVNyci9DZ6cdQYkwYSW1YMrnSz3xnku3cjL0= +github.com/envoyproxy/go-control-plane v0.13.0 h1:HzkeUz1Knt+3bK+8LG1bxOO/jzWZmdxpwC51i202les= +github.com/envoyproxy/go-control-plane v0.13.0/go.mod h1:GRaKG3dwvFoTg4nj7aXdZnvMg4d7nvT/wl9WgVXn3Q8= github.com/envoyproxy/protoc-gen-validate v1.1.0 h1:tntQDh69XqOCOZsDz0lVJQez/2L6Uu2PdjCQwWCJ3bM= github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4= github.com/evanphx/json-patch v5.7.0+incompatible h1:vgGkfT/9f8zE6tvSCe74nfpAVDQ2tG6yudJd8LBksgI= @@ -328,6 +328,8 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo= +github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/pkg/config/app/kuma-cp/config.go b/pkg/config/app/kuma-cp/config.go index 5350c82c2efd..0768f4f67cba 100644 --- a/pkg/config/app/kuma-cp/config.go +++ b/pkg/config/app/kuma-cp/config.go @@ -464,6 +464,8 @@ type ExperimentalConfig struct { // If true skips persisted VIPs. Change to true only if generateMeshServices is enabled. // Do not enable on production. SkipPersistedVIPs bool `json:"skipPersistedVIPs" envconfig:"KUMA_EXPERIMENTAL_SKIP_PERSISTED_VIPS"` + // If true uses Delta xDS to deliver changes to sidecars. + UseDeltaXDS bool `json:"useDeltaXDS" envconfig:"KUMA_EXPERIMENTAL_USE_DELTA_XDS"` } type ExperimentalKDSEventBasedWatchdog struct { diff --git a/pkg/config/app/kuma-dp/config.go b/pkg/config/app/kuma-dp/config.go index 05e94feadebf..adfb20050894 100644 --- a/pkg/config/app/kuma-dp/config.go +++ b/pkg/config/app/kuma-dp/config.go @@ -217,6 +217,8 @@ type DataplaneRuntime struct { DynamicConfiguration DynamicConfiguration `json:"dynamicConfiguration" envconfig:"kuma_dataplane_runtime_dynamic_configuration"` // SystemCaPath defines path of system provided Ca SystemCaPath string `json:"systemCaPath,omitempty" envconfig:"kuma_dataplane_runtime_dynamic_system_ca_path"` + // XDSConfigType defines xDS communication type between Envoy and control-plane + XDSConfigType string `json:"xdsConfigType,omitempty" envconfig:"kuma_dataplane_runtime_dynamic_xds_config_type"` } type Metrics struct { @@ -335,6 +337,13 @@ func (d *DataplaneRuntime) Validate() error { if d.BinaryPath == "" { errs = multierr.Append(errs, errors.Errorf(".BinaryPath must be non-empty")) } + if d.XDSConfigType != "" { + switch d.XDSConfigType { + case "delta", "sotw": + default: + errs = multierr.Append(errs, errors.Errorf(".XDSConfigType can be one of: delta, sotw")) + } + } return errs } diff --git a/pkg/config/app/kuma-dp/testdata/invalid-config.golden.txt b/pkg/config/app/kuma-dp/testdata/invalid-config.golden.txt index 5ca12650bb24..59e5a9b2bf4a 100644 --- a/pkg/config/app/kuma-dp/testdata/invalid-config.golden.txt +++ b/pkg/config/app/kuma-dp/testdata/invalid-config.golden.txt @@ -1 +1 @@ -parsing configuration from file 'testdata/invalid-config.input.yaml' failed: configuration validation failed: .ControlPlane is not valid: .Retry is not valid: .Backoff must be a positive duration; .Dataplane is not valid: .ProxyType is not valid: not-a-proxy is not a valid proxy type; .Mesh must be non-empty; .Name must be non-empty; .DrainTime must be positive; .DataplaneRuntime is not valid: .BinaryPath must be non-empty +parsing configuration from file 'testdata/invalid-config.input.yaml' failed: configuration validation failed: .ControlPlane is not valid: .Retry is not valid: .Backoff must be a positive duration; .Dataplane is not valid: .ProxyType is not valid: not-a-proxy is not a valid proxy type; .Mesh must be non-empty; .Name must be non-empty; .DrainTime must be positive; .DataplaneRuntime is not valid: .BinaryPath must be non-empty; .XDSConfigType can be one of: delta, sotw diff --git a/pkg/config/app/kuma-dp/testdata/invalid-config.input.yaml b/pkg/config/app/kuma-dp/testdata/invalid-config.input.yaml index 7467f0431afa..f2b1e736c7fb 100644 --- a/pkg/config/app/kuma-dp/testdata/invalid-config.input.yaml +++ b/pkg/config/app/kuma-dp/testdata/invalid-config.input.yaml @@ -12,3 +12,4 @@ dataplane: proxyType: not-a-proxy dataplaneRuntime: binaryPath: + xdsConfigType: a diff --git a/pkg/config/app/kuma-dp/testdata/valid-config.input.yaml b/pkg/config/app/kuma-dp/testdata/valid-config.input.yaml index 5046ac0d7784..0ab02544046e 100644 --- a/pkg/config/app/kuma-dp/testdata/valid-config.input.yaml +++ b/pkg/config/app/kuma-dp/testdata/valid-config.input.yaml @@ -12,3 +12,4 @@ dataplaneRuntime: binaryPath: envoy.sh configDir: /var/run/envoy envoyLogLevel: trace + xdsConfigType: delta diff --git a/pkg/config/loader_test.go b/pkg/config/loader_test.go index 9416bc566280..2d37a3be9f1e 100644 --- a/pkg/config/loader_test.go +++ b/pkg/config/loader_test.go @@ -370,6 +370,7 @@ var _ = Describe("Config loader", func() { Expect(cfg.Experimental.SidecarContainers).To(BeTrue()) Expect(cfg.Experimental.SkipPersistedVIPs).To(BeTrue()) Expect(cfg.Experimental.GenerateMeshServices).To(BeTrue()) + Expect(cfg.Experimental.UseDeltaXDS).To(BeTrue()) Expect(cfg.Proxy.Gateway.GlobalDownstreamMaxConnections).To(BeNumerically("==", 1)) Expect(cfg.EventBus.BufferSize).To(Equal(uint(30))) @@ -754,6 +755,7 @@ experimental: sidecarContainers: true generateMeshServices: true skipPersistedVIPs: true + useDeltaXDS: true proxy: gateway: globalDownstreamMaxConnections: 1 @@ -1058,6 +1060,7 @@ meshService: "KUMA_EXPERIMENTAL_SIDECAR_CONTAINERS": "true", "KUMA_EXPERIMENTAL_GENERATE_MESH_SERVICES": "true", "KUMA_EXPERIMENTAL_SKIP_PERSISTED_VIPS": "true", + "KUMA_EXPERIMENTAL_USE_DELTA_XDS": "true", "KUMA_PROXY_GATEWAY_GLOBAL_DOWNSTREAM_MAX_CONNECTIONS": "1", "KUMA_TRACING_OPENTELEMETRY_ENDPOINT": "otel-collector:4317", "KUMA_TRACING_OPENTELEMETRY_ENABLED": "true", diff --git a/pkg/hds/tracker/callbacks.go b/pkg/hds/tracker/callbacks.go index 8d0cba34ab45..1b483b97277a 100644 --- a/pkg/hds/tracker/callbacks.go +++ b/pkg/hds/tracker/callbacks.go @@ -40,7 +40,8 @@ type tracker struct { sync.RWMutex // protects access to the fields below streamsAssociation map[xds.StreamID]core_model.ResourceKey - dpStreams map[core_model.ResourceKey]streams + // deltaStreamsAssociation map[xds.StreamID]core_model.ResourceKey + dpStreams map[core_model.ResourceKey]streams } func NewCallbacks( @@ -56,10 +57,11 @@ func NewCallbacks( return &tracker{ resourceManager: resourceManager, streamsAssociation: map[xds.StreamID]core_model.ResourceKey{}, - dpStreams: map[core_model.ResourceKey]streams{}, - config: config, - log: log, - metrics: metrics, + // deltaStreamsAssociation: map[xds.StreamID]core_model.ResourceKey{}, + dpStreams: map[core_model.ResourceKey]streams{}, + config: config, + log: log, + metrics: metrics, reconciler: &reconciler{ cache: cache, hasher: hasher, @@ -69,6 +71,14 @@ func NewCallbacks( } } +func (t *tracker) OnDeltaStreamOpen(ctx context.Context, streamID int64) error { + return t.OnStreamOpen(ctx, streamID) +} + +func (t *tracker) OnDeltaStreamClosed(streamID xds.StreamID) { + t.OnStreamClosed(streamID) +} + func (t *tracker) OnStreamOpen(ctx context.Context, streamID int64) error { t.metrics.StreamsActiveInc() return nil diff --git a/pkg/util/xds/callbacks.go b/pkg/util/xds/callbacks.go index 95bbb5a55fb3..4a40b900ce93 100644 --- a/pkg/util/xds/callbacks.go +++ b/pkg/util/xds/callbacks.go @@ -3,52 +3,50 @@ package xds import ( "context" - discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/structpb" ) -// DiscoveryRequest defines interface over real Envoy's DiscoveryRequest. -type DiscoveryRequest interface { +type Request interface { NodeId() string - // Node returns either a v2 or v3 Node Node() interface{} Metadata() *structpb.Struct - VersionInfo() string - GetTypeUrl() string GetResponseNonce() string - GetResourceNames() []string + GetTypeUrl() string HasErrors() bool ErrorMsg() string + VersionInfo() string + GetResourceNames() []string } -// DiscoveryResponse defines interface over real Envoy's DiscoveryResponse. -type DiscoveryResponse interface { +type Response interface { GetTypeUrl() string - VersionInfo() string GetResources() []*anypb.Any GetNonce() string + VersionInfo() string + GetNumberOfResources() int +} + +// DiscoveryRequest defines interface over real Envoy's DiscoveryRequest. +type DiscoveryRequest interface { + Request +} + +// DiscoveryResponse defines interface over real Envoy's DiscoveryResponse. +type DiscoveryResponse interface { + Response } type DeltaDiscoveryRequest interface { - NodeId() string - // Node returns either a v2 or v3 Node - Node() interface{} - Metadata() *structpb.Struct - GetTypeUrl() string - GetResponseNonce() string + Request GetResourceNamesSubscribe() []string GetInitialResourceVersions() map[string]string - HasErrors() bool - ErrorMsg() string } // DeltaDiscoveryResponse defines interface over real Envoy's DeltaDiscoveryResponse. type DeltaDiscoveryResponse interface { - GetTypeUrl() string - GetResources() []*discoveryv3.Resource + Response GetRemovedResources() []string - GetNonce() string } // Callbacks defines Callbacks for xDS streaming requests. The difference over real go-control-plane Callbacks is that it takes an DiscoveryRequest / DiscoveryResponse interface. @@ -94,4 +92,10 @@ type RestCallbacks interface { type MultiCallbacks interface { Callbacks RestCallbacks + DeltaCallbacks +} + +type MultiXDSCallbacks interface { + Callbacks + DeltaCallbacks } diff --git a/pkg/util/xds/v3/callbacks.go b/pkg/util/xds/v3/callbacks.go index 861b1321866b..95b535ff422a 100644 --- a/pkg/util/xds/v3/callbacks.go +++ b/pkg/util/xds/v3/callbacks.go @@ -6,6 +6,7 @@ import ( envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" envoy_xds "github.com/envoyproxy/go-control-plane/pkg/server/v3" + "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/structpb" "github.com/kumahq/kuma/pkg/util/xds" @@ -175,6 +176,10 @@ type discoveryResponse struct { *envoy_sd.DiscoveryResponse } +func (d *discoveryResponse) GetNumberOfResources() int { + return len(d.Resources) +} + func (d *discoveryResponse) VersionInfo() string { return d.GetVersionInfo() } @@ -211,12 +216,36 @@ func (d *deltaDiscoveryRequest) GetInitialResourceVersions() map[string]string { return d.InitialResourceVersions } +func (d *deltaDiscoveryRequest) GetResourceNames() []string { + return d.GetResourceNamesSubscribe() +} + +func (d *deltaDiscoveryRequest) VersionInfo() string { + return "" +} + var _ xds.DeltaDiscoveryRequest = &deltaDiscoveryRequest{} type deltaDiscoveryResponse struct { *envoy_sd.DeltaDiscoveryResponse } +func (d *deltaDiscoveryResponse) VersionInfo() string { + return d.SystemVersionInfo +} + +func (d *deltaDiscoveryResponse) GetResources() []*anypb.Any { + resources := []*anypb.Any{} + for _, res := range d.Resources { + resources = append(resources, res.Resource) + } + return resources +} + +func (d *deltaDiscoveryResponse) GetNumberOfResources() int { + return len(d.Resources) +} + var _ xds.DeltaDiscoveryResponse = &deltaDiscoveryResponse{} func (d *deltaDiscoveryResponse) GetTypeUrl() string { diff --git a/pkg/xds/auth/callbacks.go b/pkg/xds/auth/callbacks.go index 4578111a4e23..2666310567ee 100644 --- a/pkg/xds/auth/callbacks.go +++ b/pkg/xds/auth/callbacks.go @@ -10,6 +10,7 @@ import ( "google.golang.org/grpc/metadata" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" + "github.com/kumahq/kuma/pkg/core" core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh" core_manager "github.com/kumahq/kuma/pkg/core/resources/manager" "github.com/kumahq/kuma/pkg/core/resources/model" @@ -30,7 +31,7 @@ type DPNotFoundRetry struct { MaxTimes uint } -func NewCallbacks(resManager core_manager.ReadOnlyResourceManager, authenticator Authenticator, dpNotFoundRetry DPNotFoundRetry) util_xds.Callbacks { +func NewCallbacks(resManager core_manager.ReadOnlyResourceManager, authenticator Authenticator, dpNotFoundRetry DPNotFoundRetry) util_xds.MultiXDSCallbacks { if dpNotFoundRetry.Backoff == 0 { // backoff cannot be 0 dpNotFoundRetry.Backoff = 1 * time.Millisecond } @@ -38,6 +39,7 @@ func NewCallbacks(resManager core_manager.ReadOnlyResourceManager, authenticator resManager: resManager, authenticator: authenticator, streams: map[core_xds.StreamID]stream{}, + deltaStreams: map[core_xds.StreamID]stream{}, dpNotFoundRetry: dpNotFoundRetry, } } @@ -51,6 +53,7 @@ type authCallbacks struct { sync.RWMutex // protects streams streams map[core_xds.StreamID]stream + deltaStreams map[core_xds.StreamID]stream } type stream struct { @@ -62,7 +65,7 @@ type stream struct { nodeID string } -var _ util_xds.Callbacks = &authCallbacks{} +var _ util_xds.MultiXDSCallbacks = &authCallbacks{} func (a *authCallbacks) OnStreamOpen(ctx context.Context, streamID core_xds.StreamID, _ string) error { a.Lock() @@ -82,10 +85,38 @@ func (a *authCallbacks) OnStreamClosed(streamID core_xds.StreamID) { } func (a *authCallbacks) OnStreamRequest(streamID core_xds.StreamID, req util_xds.DiscoveryRequest) error { - s, err := a.stream(streamID, req) + return a.onStreamRequest(streamID, req, false) +} + +func (a *authCallbacks) OnDeltaStreamOpen(ctx context.Context, streamID core_xds.StreamID, _ string) error { + a.Lock() + defer a.Unlock() + + a.deltaStreams[streamID] = stream{ + ctx: ctx, + resource: nil, + } + + core.Log.V(1).Info("OnDeltaStreamOpen", "streamID", streamID) + return nil +} + +func (a *authCallbacks) OnDeltaStreamClosed(streamID int64) { + a.Lock() + delete(a.deltaStreams, streamID) + a.Unlock() +} + +func (a *authCallbacks) OnStreamDeltaRequest(streamID core_xds.StreamID, req util_xds.DeltaDiscoveryRequest) error { + return a.onStreamRequest(streamID, req, true) +} + +func (a *authCallbacks) onStreamRequest(streamID core_xds.StreamID, req util_xds.Request, isDelta bool) error { + s, err := a.stream(streamID, req, isDelta) if err != nil { return err } + core.Log.V(1).Info("OnStreamDeltaRequest auth", "req", req) credential, err := ExtractCredential(s.ctx) if err != nil { @@ -95,14 +126,24 @@ func (a *authCallbacks) OnStreamRequest(streamID core_xds.StreamID, req util_xds return errors.Wrap(err, "authentication failed") } a.Lock() - a.streams[streamID] = s + if isDelta { + a.deltaStreams[streamID] = s + } else { + a.streams[streamID] = s + } a.Unlock() return nil } -func (a *authCallbacks) stream(streamID core_xds.StreamID, req util_xds.DiscoveryRequest) (stream, error) { +func (a *authCallbacks) stream(streamID core_xds.StreamID, req util_xds.Request, isDelta bool) (stream, error) { a.RLock() - s, ok := a.streams[streamID] + var s stream + var ok bool + if isDelta { + s, ok = a.deltaStreams[streamID] + } else { + s, ok = a.streams[streamID] + } a.RUnlock() if !ok { return stream{}, errors.New("stream is not present") diff --git a/pkg/xds/bootstrap/components.go b/pkg/xds/bootstrap/components.go index 64dcc49197f0..afde7225440b 100644 --- a/pkg/xds/bootstrap/components.go +++ b/pkg/xds/bootstrap/components.go @@ -20,6 +20,7 @@ func RegisterBootstrap(rt core_runtime.Runtime) error { rt.Config().DpServer.Authn.EnableReloadableTokens, rt.Config().DpServer.Hds.Enabled, rt.Config().GetEnvoyAdminPort(), + rt.Config().Experimental.UseDeltaXDS, ) if err != nil { return err diff --git a/pkg/xds/bootstrap/generator.go b/pkg/xds/bootstrap/generator.go index ed6ccc16d55b..1d85688da423 100644 --- a/pkg/xds/bootstrap/generator.go +++ b/pkg/xds/bootstrap/generator.go @@ -38,6 +38,7 @@ func NewDefaultBootstrapGenerator( enableReloadableTokens bool, hdsEnabled bool, defaultAdminPort uint32, + deltaXdsEnabled bool, ) (BootstrapGenerator, error) { hostsAndIps, err := hostsAndIPsFromCertFile(dpServerCertFile) if err != nil { @@ -56,6 +57,7 @@ func NewDefaultBootstrapGenerator( hostsAndIps: hostsAndIps, hdsEnabled: hdsEnabled, defaultAdminPort: defaultAdminPort, + deltaXdsEnabled: deltaXdsEnabled, }, nil } @@ -69,6 +71,7 @@ type bootstrapGenerator struct { hostsAndIps SANSet hdsEnabled bool defaultAdminPort uint32 + deltaXdsEnabled bool } func (b *bootstrapGenerator) Generate(ctx context.Context, request types.BootstrapRequest) (proto.Message, KumaDpBootstrap, error) { @@ -125,6 +128,12 @@ func (b *bootstrapGenerator) Generate(ctx context.Context, request types.Bootstr } } + if request.XDSConfigType == "" { + params.UseDelta = b.deltaXdsEnabled + } else { + params.UseDelta = request.XDSConfigType == "delta" + } + switch mesh_proto.ProxyType(params.ProxyType) { case mesh_proto.IngressProxyType: zoneIngress, err := b.zoneIngressFor(ctx, request, proxyId) diff --git a/pkg/xds/bootstrap/generator_test.go b/pkg/xds/bootstrap/generator_test.go index c71953e5dcd5..b921efb1e64a 100644 --- a/pkg/xds/bootstrap/generator_test.go +++ b/pkg/xds/bootstrap/generator_test.go @@ -117,7 +117,7 @@ var _ = Describe("bootstrapGenerator", func() { proxyConfig = *given.proxyConfig } - generator, err := NewDefaultBootstrapGenerator(resManager, given.serverConfig, proxyConfig, filepath.Join("..", "..", "..", "test", "certs", "server-cert.pem"), given.dpAuthForProxyType, given.useTokenPath, given.hdsEnabled, 0) + generator, err := NewDefaultBootstrapGenerator(resManager, given.serverConfig, proxyConfig, filepath.Join("..", "..", "..", "test", "certs", "server-cert.pem"), given.dpAuthForProxyType, given.useTokenPath, given.hdsEnabled, 0, false) Expect(err).ToNot(HaveOccurred()) // when @@ -205,6 +205,34 @@ var _ = Describe("bootstrapGenerator", func() { expectedConfigFile: "generator.custom-config-minimal-request.golden.yaml", hdsEnabled: true, }), + Entry("custom config with minimal request and delta", testCase{ + dpAuthForProxyType: map[string]bool{}, + serverConfig: func() *bootstrap_config.BootstrapServerConfig { + return &bootstrap_config.BootstrapServerConfig{ + Params: &bootstrap_config.BootstrapParamsConfig{ + AdminAddress: "192.168.0.1", // by default, Envoy Admin interface should listen on loopback address + AdminAccessLogPath: "/var/log", + XdsHost: "localhost", + XdsPort: 15678, + XdsConnectTimeout: config_types.Duration{Duration: 2 * time.Second}, + }, + } + }(), + dataplane: func() *core_mesh.DataplaneResource { + dp := defaultDataplane() + dp.Spec.Networking.Admin.Port = 9902 + return dp + }, + request: types.BootstrapRequest{ + Mesh: "mesh", + Name: "name.namespace", + Version: defaultVersion, + Workdir: "/tmp", + XDSConfigType: "delta", + }, + expectedConfigFile: "generator.custom-config-minimal-request-and-delta.golden.yaml", + hdsEnabled: true, + }), Entry("custom config", testCase{ dpAuthForProxyType: authEnabled, serverConfig: func() *bootstrap_config.BootstrapServerConfig { @@ -543,7 +571,7 @@ var _ = Describe("bootstrapGenerator", func() { cfg := bootstrap_config.DefaultBootstrapServerConfig() proxyCfg := xds_config.DefaultProxyConfig() - generator, err := NewDefaultBootstrapGenerator(resManager, cfg, proxyCfg, filepath.Join("..", "..", "..", "test", "certs", "server-cert.pem"), map[string]bool{}, false, true, 9901) + generator, err := NewDefaultBootstrapGenerator(resManager, cfg, proxyCfg, filepath.Join("..", "..", "..", "test", "certs", "server-cert.pem"), map[string]bool{}, false, true, 9901, false) Expect(err).ToNot(HaveOccurred()) // when @@ -681,7 +709,7 @@ Provide CA that was used to sign a certificate used in the control plane by usin err = resManager.Create(context.Background(), dataplane, store.CreateByKey("name.namespace", "metrics")) Expect(err).ToNot(HaveOccurred()) - generator, err := NewDefaultBootstrapGenerator(resManager, config(), proxyCfg, filepath.Join("..", "..", "..", "test", "certs", "server-cert.pem"), authEnabled, false, false, 0) + generator, err := NewDefaultBootstrapGenerator(resManager, config(), proxyCfg, filepath.Join("..", "..", "..", "test", "certs", "server-cert.pem"), authEnabled, false, false, 0, false) Expect(err).ToNot(HaveOccurred()) // when @@ -776,7 +804,7 @@ Provide CA that was used to sign a certificate used in the control plane by usin err = resManager.Create(context.Background(), dataplane, store.CreateByKey("name.namespace", "metrics")) Expect(err).ToNot(HaveOccurred()) - generator, err := NewDefaultBootstrapGenerator(resManager, config(), proxyCfg, filepath.Join("..", "..", "..", "test", "certs", "server-cert.pem"), authEnabled, false, false, 0) + generator, err := NewDefaultBootstrapGenerator(resManager, config(), proxyCfg, filepath.Join("..", "..", "..", "test", "certs", "server-cert.pem"), authEnabled, false, false, 0, false) Expect(err).ToNot(HaveOccurred()) // when diff --git a/pkg/xds/bootstrap/parameters.go b/pkg/xds/bootstrap/parameters.go index 287c5b0bbe27..ab84368499b4 100644 --- a/pkg/xds/bootstrap/parameters.go +++ b/pkg/xds/bootstrap/parameters.go @@ -52,4 +52,5 @@ type configParameters struct { IsGatewayDataplane bool Resources types.ProxyResources SystemCaPath string + UseDelta bool } diff --git a/pkg/xds/bootstrap/server_test.go b/pkg/xds/bootstrap/server_test.go index 68585c16c473..929bfc3a87b6 100644 --- a/pkg/xds/bootstrap/server_test.go +++ b/pkg/xds/bootstrap/server_test.go @@ -95,7 +95,7 @@ var _ = Describe("Bootstrap Server", func() { proxyConfig := xds_config.DefaultProxyConfig() - generator, err := bootstrap.NewDefaultBootstrapGenerator(resManager, config, proxyConfig, filepath.Join("..", "..", "..", "test", "certs", "server-cert.pem"), authEnabled, false, true, 0) + generator, err := bootstrap.NewDefaultBootstrapGenerator(resManager, config, proxyConfig, filepath.Join("..", "..", "..", "test", "certs", "server-cert.pem"), authEnabled, false, true, 0, false) Expect(err).ToNot(HaveOccurred()) bootstrapHandler := bootstrap.BootstrapHandler{ Generator: generator, diff --git a/pkg/xds/bootstrap/template_v3.go b/pkg/xds/bootstrap/template_v3.go index b36cde651ec9..a84aae7e5d65 100644 --- a/pkg/xds/bootstrap/template_v3.go +++ b/pkg/xds/bootstrap/template_v3.go @@ -106,6 +106,11 @@ func genConfig(parameters configParameters, proxyConfig xds.Proxy, enableReloada }, }) } + configType := envoy_core_v3.ApiConfigSource_GRPC + if parameters.UseDelta { + configType = envoy_core_v3.ApiConfigSource_DELTA_GRPC + } + res := &envoy_bootstrap_v3.Bootstrap{ Node: &envoy_core_v3.Node{ Id: parameters.Id, @@ -168,7 +173,7 @@ func genConfig(parameters configParameters, proxyConfig xds.Proxy, enableReloada ResourceApiVersion: envoy_core_v3.ApiVersion_V3, }, AdsConfig: &envoy_core_v3.ApiConfigSource{ - ApiType: envoy_core_v3.ApiConfigSource_GRPC, + ApiType: configType, TransportApiVersion: envoy_core_v3.ApiVersion_V3, SetNodeOnFirstMessageOnly: true, GrpcServices: []*envoy_core_v3.GrpcService{ diff --git a/pkg/xds/bootstrap/testdata/generator.custom-config-minimal-request-and-delta.golden.yaml b/pkg/xds/bootstrap/testdata/generator.custom-config-minimal-request-and-delta.golden.yaml new file mode 100644 index 000000000000..d813a9ebc47f --- /dev/null +++ b/pkg/xds/bootstrap/testdata/generator.custom-config-minimal-request-and-delta.golden.yaml @@ -0,0 +1,145 @@ +admin: + accessLog: + - name: envoy.access_loggers.file + typedConfig: + '@type': type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog + path: /var/log + address: + socketAddress: + address: 192.168.0.1 + portValue: 9902 +defaultRegexEngine: + name: envoy.regex_engines.google_re2 + typedConfig: + '@type': type.googleapis.com/envoy.extensions.regex_engines.v3.GoogleRE2 +dynamicResources: + adsConfig: + apiType: DELTA_GRPC + grpcServices: + - envoyGrpc: + clusterName: ads_cluster + setNodeOnFirstMessageOnly: true + transportApiVersion: V3 + cdsConfig: + ads: {} + initialFetchTimeout: 0s + resourceApiVersion: V3 + ldsConfig: + ads: {} + initialFetchTimeout: 0s + resourceApiVersion: V3 +hdsConfig: + apiType: GRPC + grpcServices: + - envoyGrpc: + clusterName: ads_cluster + setNodeOnFirstMessageOnly: true + transportApiVersion: V3 +layeredRuntime: + layers: + - name: kuma + staticLayer: + re2.max_program_size.error_level: 4294967295 + re2.max_program_size.warn_level: 1000 +node: + cluster: backend + id: mesh.name.namespace + metadata: + dataplane.admin.address: 192.168.0.1 + dataplane.admin.port: "9902" + dataplane.proxyType: dataplane + features: [] + metricsCertPath: "" + metricsKeyPath: "" + systemCaPath: "" + version: + envoy: + build: hash/1.15.0/RELEASE + version: 1.15.0 + kumaDp: + buildDate: "2019-08-07T11:26:06Z" + gitCommit: 91ce236824a9d875601679aa80c63783fb0e8725 + gitTag: v0.0.1 + version: 0.0.1 + workdir: /tmp +staticResources: + clusters: + - connectTimeout: 2s + loadAssignment: + clusterName: access_log_sink + endpoints: + - lbEndpoints: + - endpoint: + address: + pipe: + path: /tmp/kuma-al-name.namespace-mesh.sock + name: access_log_sink + type: STATIC + typedExtensionProtocolOptions: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + '@type': type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicitHttpConfig: + http2ProtocolOptions: {} + upstreamConnectionOptions: + tcpKeepalive: + keepaliveInterval: 10 + keepaliveProbes: 3 + keepaliveTime: 10 + - connectTimeout: 2s + loadAssignment: + clusterName: ads_cluster + endpoints: + - lbEndpoints: + - endpoint: + address: + socketAddress: + address: localhost + portValue: 15678 + name: ads_cluster + transportSocket: + name: envoy.transport_sockets.tls + typedConfig: + '@type': type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext + commonTlsContext: + tlsParams: + tlsMinimumProtocolVersion: TLSv1_2 + validationContextSdsSecretConfig: + name: cp_validation_ctx + sni: localhost + type: STRICT_DNS + typedExtensionProtocolOptions: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + '@type': type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicitHttpConfig: + http2ProtocolOptions: {} + upstreamConnectionOptions: + tcpKeepalive: + keepaliveInterval: 10 + keepaliveProbes: 3 + keepaliveTime: 10 + secrets: + - name: cp_validation_ctx + validationContext: + matchTypedSubjectAltNames: + - matcher: + exact: localhost + sanType: DNS + - matcher: + exact: localhost + sanType: IP_ADDRESS + trustedCa: + inlineBytes: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURNekNDQWh1Z0F3SUJBZ0lRRGhsSW5mc1hZSGFtS04rMjlxblF2ekFOQmdrcWhraUc5dzBCQVFzRkFEQVAKTVEwd0N3WURWUVFERXdScmRXMWhNQjRYRFRJeE1EUXdNakV3TWpJeU5sb1hEVE14TURNek1URXdNakl5TmxvdwpEekVOTUFzR0ExVUVBeE1FYTNWdFlUQ0NBU0l3RFFZSktvWklodmNOQVFFQkJRQURnZ0VQQURDQ0FRb0NnZ0VCCkFMNEdHZytlMk83ZUExMkYwRjZ2MnJyOGoyaVZTRktlcG5adEwxNWxyQ2RzNmxxSzUwc1hXT3c4UEtacDJpaEEKWEpWVFNaekthc3lMRFRBUjlWWVFqVHBFNTI2RXp2dGR0aFNhZ2YzMlFXVyt3WTZMTXBFZGV4S09PQ3gyc2U1NQpSZDk3TDMzeVlQZmdYMTVPWWxpSFBEMDU2ampob3RITGROMmxweTcrU1REdlF5Um5YQXU3M1lrWTM3RWQ0aEk0CnQvVjZzb0h5RUdOY0RobTlwNWZCR3F6MG5qQmJRa3AybFRZNS9rajQycUI3UTZyQ00ydGJQc0VNb29lQUF3NW0KaHlZNHhqMHRQOXVjcWxVejhnYys2bzhIRE5zdDhOZUpYWmt0V24rQ095dGpyL056R2dTMjJrdlNEcGhpc0pvdApvMEZ5b0lPZEF0eEMxcXhYWFIrWHVVVUNBd0VBQWFPQmlqQ0JoekFPQmdOVkhROEJBZjhFQkFNQ0FxUXdIUVlEClZSMGxCQll3RkFZSUt3WUJCUVVIQXdFR0NDc0dBUVVGQndNQk1BOEdBMVVkRXdFQi93UUZNQU1CQWY4d0hRWUQKVlIwT0JCWUVGS1JMa2dJelgvT2pLdzlpZGVwdVEvUk10VCtBTUNZR0ExVWRFUVFmTUIyQ0NXeHZZMkZzYUc5egpkSWNRL1FDaEl3QUFBQUFBQUFBQUFBQUFBVEFOQmdrcWhraUc5dzBCQVFzRkFBT0NBUUVBUHM1eUpaaG9ZbEdXCkNwQThkU0lTaXZNOC84aUJOUTNmVndQNjNmdDBFSkxNVkd1MlJGWjQvVUFKL3JVUFNHTjh4aFhTazUrMWQ1NmEKL2thSDlyWDBIYVJJSEhseEE3aVBVS3hBajQ0eDlMS21xUEhUb0wzWGxXWTFBWHp2aWNXOWQrR00yRmFRZWUrSQpsZWFxTGJ6MEFadmxudTI3MVoxQ2VhQUN1VTlHbGp1anZ5aVRURTluYUhVRXF2SGdTcFB0aWxKYWx5SjUveklsClo5RjArVVd0M1RPWU1zNWcrU0N0ME13SFROYmlzYm1ld3BjRkZKemp0Mmt2dHJjOXQ5ZGtGODF4aGNTMTl3N3EKaDFBZVAzUlJsTGw3YnY5RUFWWEVtSWF2aWgvMjlQQTNaU3krcGJZTlc3ak5KSGpNUTRoUTBFK3hjQ2F6VS9PNAp5cFdHYWFudlBnPT0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo= +statsConfig: + statsTags: + - regex: ^grpc\.((.+)\.) + tagName: name + - regex: ^grpc.*streams_closed(_([0-9]+)) + tagName: status + - regex: ^kafka(\.(\S*[0-9]))\. + tagName: kafka_name + - regex: ^kafka\..*\.(.*?(?=_duration|$)) + tagName: kafka_type + - regex: (worker_([0-9]+)\.) + tagName: worker + - regex: ((.+?)\.)rbac\. + tagName: listener diff --git a/pkg/xds/bootstrap/types/bootstrap_request.go b/pkg/xds/bootstrap/types/bootstrap_request.go index df018fd9596b..eec6ae85ee5c 100644 --- a/pkg/xds/bootstrap/types/bootstrap_request.go +++ b/pkg/xds/bootstrap/types/bootstrap_request.go @@ -21,6 +21,7 @@ type BootstrapRequest struct { Workdir string `json:"workdir"` MetricsResources MetricsResources `json:"metricsResources"` SystemCaPath string `json:"systemCaPath"` + XDSConfigType string `json:"xdsConfigType,omitempty"` } type Version struct { diff --git a/pkg/xds/envoy/imports.go b/pkg/xds/envoy/imports.go index d3081bc99d0a..53b713f35492 100644 --- a/pkg/xds/envoy/imports.go +++ b/pkg/xds/envoy/imports.go @@ -127,6 +127,7 @@ import ( _ "github.com/envoyproxy/go-control-plane/envoy/data/tap/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/access_loggers/file/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/access_loggers/filters/cel/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/access_loggers/fluentd/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/access_loggers/grpc/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/access_loggers/open_telemetry/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/access_loggers/stream/v3" @@ -151,12 +152,14 @@ import ( _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/common/dependency/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/common/fault/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/common/matcher/action/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/common/set_filter_state/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/adaptive_concurrency/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/admission_control/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/alternate_protocols_cache/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/aws_lambda/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/aws_request_signing/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/bandwidth_limit/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/basic_auth/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/buffer/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/cache/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/cdn_loop/v3" @@ -164,6 +167,7 @@ import ( _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/compressor/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/connect_grpc_bridge/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/cors/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/credential_injector/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/csrf/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/custom_response/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/decompressor/v3" @@ -193,13 +197,16 @@ import ( _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/oauth2/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/on_demand/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/original_src/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/proto_message_logging/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/rate_limit_quota/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ratelimit/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/rbac/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/set_filter_state/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/set_metadata/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/stateful_session/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/tap/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/thrift_to_metadata/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/upstream_codec/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/wasm/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/listener/http_inspector/v3" @@ -214,12 +221,19 @@ import ( _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/dubbo_proxy/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/echo/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/ext_authz/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/generic_proxy/action/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/generic_proxy/codecs/dubbo/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/generic_proxy/codecs/http1/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/generic_proxy/matcher/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/generic_proxy/router/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/generic_proxy/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/local_ratelimit/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/mongo_proxy/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/ratelimit/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/rbac/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/redis_proxy/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/set_filter_state/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/sni_cluster/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/sni_dynamic_forward_proxy/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/tcp_proxy/v3" @@ -231,11 +245,14 @@ import ( _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/wasm/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/zookeeper_proxy/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/udp/dns_filter/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/udp/udp_proxy/session/dynamic_forward_proxy/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/udp/udp_proxy/session/http_capsule/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/udp/udp_proxy/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/formatter/cel/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/formatter/metadata/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/formatter/req_without_query/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/geoip_providers/common/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/geoip_providers/maxmind/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/health_check/event_sinks/file/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/health_checkers/redis/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/health_checkers/thrift/v3" @@ -246,6 +263,8 @@ import ( _ "github.com/envoyproxy/go-control-plane/envoy/extensions/http/early_header_mutation/header_mutation/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/http/header_formatters/preserve_case/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/http/header_validators/envoy_default/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/http/injected_credentials/generic/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/http/injected_credentials/oauth2/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/http/original_ip_detection/custom_header/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/http/original_ip_detection/xff/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/http/stateful_session/cookie/v3" @@ -270,13 +289,17 @@ import ( _ "github.com/envoyproxy/go-control-plane/envoy/extensions/matching/common_inputs/ssl/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/matching/input_matchers/consistent_hashing/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/matching/input_matchers/ip/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/matching/input_matchers/metadata/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/matching/input_matchers/runtime_fraction/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/network/dns_resolver/apple/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/network/dns_resolver/cares/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/network/dns_resolver/getaddrinfo/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/network/socket_interface/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/outlier_detection_monitors/common/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/outlier_detection_monitors/consecutive_errors/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/path/match/uri_template/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/path/rewrite/uri_template/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/quic/connection_debug_visitor/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/quic/connection_id_generator/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/quic/crypto_stream/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/quic/proof_source/v3" @@ -293,9 +316,13 @@ import ( _ "github.com/envoyproxy/go-control-plane/envoy/extensions/retry/host/omit_host_metadata/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/retry/host/previous_hosts/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/retry/priority/previous_priorities/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/router/cluster_specifiers/lua/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/stat_sinks/graphite_statsd/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/stat_sinks/open_telemetry/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/stat_sinks/wasm/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/string_matcher/lua/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/tracers/opentelemetry/resource_detectors/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/tracers/opentelemetry/samplers/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/alts/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/http_11_proxy/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/internal_upstream/v3" diff --git a/pkg/xds/runtime/context.go b/pkg/xds/runtime/context.go index 522655322753..0571e1133de6 100644 --- a/pkg/xds/runtime/context.go +++ b/pkg/xds/runtime/context.go @@ -17,7 +17,7 @@ type XDSRuntimeContext struct { DpProxyAuthenticator xds_auth.Authenticator ZoneProxyAuthenticator xds_auth.Authenticator Hooks *xds_hooks.Hooks - ServerCallbacks util_xds.Callbacks + ServerCallbacks util_xds.MultiXDSCallbacks Metrics *xds_metrics.Metrics } diff --git a/pkg/xds/server/callbacks/dataplane_callbacks.go b/pkg/xds/server/callbacks/dataplane_callbacks.go index f0c55a686ef1..59e88778f46c 100644 --- a/pkg/xds/server/callbacks/dataplane_callbacks.go +++ b/pkg/xds/server/callbacks/dataplane_callbacks.go @@ -34,15 +34,18 @@ type xdsCallbacks struct { util_xds.NoopCallbacks sync.RWMutex - dpStreams map[core_xds.StreamID]dpStream + dpStreams map[core_xds.StreamID]dpStream + dpDeltaStreams map[core_xds.StreamID]dpStream + // we don't need separate map for stream because we use here resource key activeStreams map[core_model.ResourceKey]int } -func DataplaneCallbacksToXdsCallbacks(callbacks DataplaneCallbacks) util_xds.Callbacks { +func DataplaneCallbacksToXdsCallbacks(callbacks DataplaneCallbacks) util_xds.MultiXDSCallbacks { return &xdsCallbacks{ - callbacks: callbacks, - dpStreams: map[core_xds.StreamID]dpStream{}, - activeStreams: map[core_model.ResourceKey]int{}, + callbacks: callbacks, + dpStreams: map[core_xds.StreamID]dpStream{}, + dpDeltaStreams: map[core_xds.StreamID]dpStream{}, + activeStreams: map[core_model.ResourceKey]int{}, } } @@ -51,12 +54,41 @@ type dpStream struct { ctx context.Context } -var _ util_xds.Callbacks = &xdsCallbacks{} +var _ util_xds.MultiXDSCallbacks = &xdsCallbacks{} func (d *xdsCallbacks) OnStreamClosed(streamID core_xds.StreamID) { + d.onStreamClosed(streamID, false) +} + +func (d *xdsCallbacks) OnDeltaStreamClosed(streamID core_xds.StreamID) { + d.onStreamClosed(streamID, true) +} + +func (d *xdsCallbacks) OnStreamRequest(streamID core_xds.StreamID, request util_xds.DiscoveryRequest) error { + return d.onStreamRequest(streamID, request, false) +} + +func (d *xdsCallbacks) OnStreamDeltaRequest(streamID core_xds.StreamID, request util_xds.DeltaDiscoveryRequest) error { + return d.onStreamRequest(streamID, request, true) +} + +func (d *xdsCallbacks) OnStreamOpen(ctx context.Context, streamID core_xds.StreamID, _ string) error { + return d.onStreamOpen(ctx, streamID, false) +} + +func (d *xdsCallbacks) OnDeltaStreamOpen(ctx context.Context, streamID core_xds.StreamID, _ string) error { + return d.onStreamOpen(ctx, streamID, true) +} + +func (d *xdsCallbacks) onStreamClosed(streamID core_xds.StreamID, isDelta bool) { var lastStreamDpKey *core_model.ResourceKey d.Lock() - dpStream := d.dpStreams[streamID] + var dpStream dpStream + if isDelta { + dpStream = d.dpDeltaStreams[streamID] + } else { + dpStream = d.dpStreams[streamID] + } if dpKey := dpStream.dp; dpKey != nil { d.activeStreams[*dpKey]-- if d.activeStreams[*dpKey] == 0 { @@ -64,7 +96,11 @@ func (d *xdsCallbacks) OnStreamClosed(streamID core_xds.StreamID) { delete(d.activeStreams, *dpKey) } } - delete(d.dpStreams, streamID) + if isDelta { + delete(d.dpDeltaStreams, streamID) + } else { + delete(d.dpStreams, streamID) + } d.Unlock() if lastStreamDpKey != nil { // execute callback after lock is freed, so heavy callback implementation won't block every callback for every DPP. @@ -72,7 +108,7 @@ func (d *xdsCallbacks) OnStreamClosed(streamID core_xds.StreamID) { } } -func (d *xdsCallbacks) OnStreamRequest(streamID core_xds.StreamID, request util_xds.DiscoveryRequest) error { +func (d *xdsCallbacks) onStreamRequest(streamID core_xds.StreamID, request util_xds.Request, isDelta bool) error { if request.NodeId() == "" { // from https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#ack-nack-and-versioning: // Only the first request on a stream is guaranteed to carry the node identifier. @@ -84,7 +120,13 @@ func (d *xdsCallbacks) OnStreamRequest(streamID core_xds.StreamID, request util_ } d.RLock() - alreadyProcessed := d.dpStreams[streamID].dp != nil + var alreadyProcessed bool + if isDelta { + alreadyProcessed = d.dpDeltaStreams[streamID].dp != nil + } else { + alreadyProcessed = d.dpStreams[streamID].dp != nil + } + d.RUnlock() if alreadyProcessed { return nil @@ -104,14 +146,27 @@ func (d *xdsCallbacks) OnStreamRequest(streamID core_xds.StreamID, request util_ // in case client will open 2 concurrent request for the same streamID then // we don't to increment the counter twice, so checking once again that stream // wasn't processed - alreadyProcessed = d.dpStreams[streamID].dp != nil + if isDelta { + alreadyProcessed = d.dpDeltaStreams[streamID].dp != nil + } else { + alreadyProcessed = d.dpStreams[streamID].dp != nil + } if alreadyProcessed { return nil } - dpStream := d.dpStreams[streamID] + var dpStream dpStream + if isDelta { + dpStream = d.dpDeltaStreams[streamID] + } else { + dpStream = d.dpStreams[streamID] + } dpStream.dp = &dpKey - d.dpStreams[streamID] = dpStream + if isDelta { + d.dpDeltaStreams[streamID] = dpStream + } else { + d.dpStreams[streamID] = dpStream + } activeStreams := d.activeStreams[dpKey] d.activeStreams[dpKey]++ @@ -129,13 +184,18 @@ func (d *xdsCallbacks) OnStreamRequest(streamID core_xds.StreamID, request util_ return nil } -func (d *xdsCallbacks) OnStreamOpen(ctx context.Context, streamID core_xds.StreamID, _ string) error { +func (d *xdsCallbacks) onStreamOpen(ctx context.Context, streamID core_xds.StreamID, isDelta bool) error { d.Lock() defer d.Unlock() dps := dpStream{ ctx: ctx, } - d.dpStreams[streamID] = dps + if isDelta { + d.dpDeltaStreams[streamID] = dps + } else { + d.dpStreams[streamID] = dps + } + return nil } diff --git a/pkg/xds/server/callbacks/dataplane_status_tracker.go b/pkg/xds/server/callbacks/dataplane_status_tracker.go index 8df8912364a7..bc23961c9514 100644 --- a/pkg/xds/server/callbacks/dataplane_status_tracker.go +++ b/pkg/xds/server/callbacks/dataplane_status_tracker.go @@ -21,6 +21,7 @@ var statusTrackerLog = core.Log.WithName("xds").WithName("status-tracker") type DataplaneStatusTracker interface { util_xds.Callbacks + util_xds.DeltaCallbacks GetStatusAccessor(streamID int64) (SubscriptionStatusAccessor, bool) } @@ -38,6 +39,7 @@ func NewDataplaneStatusTracker( runtimeInfo: runtimeInfo, createStatusSink: createStatusSink, streams: make(map[int64]*streamState), + deltaStreams: make(map[int64]*streamState), } } @@ -49,6 +51,7 @@ type dataplaneStatusTracker struct { createStatusSink DataplaneInsightSinkFactoryFunc mu sync.RWMutex // protects access to the fields below streams map[int64]*streamState + deltaStreams map[int64]*streamState } type streamState struct { @@ -61,73 +64,83 @@ type streamState struct { // OnStreamOpen is called once an xDS stream is open with a stream ID and the type URL (or "" for ADS). // Returning an error will end processing and close the stream. OnStreamClosed will still be called. func (c *dataplaneStatusTracker) OnStreamOpen(ctx context.Context, streamID int64, typ string) error { - c.mu.Lock() // write access to the map of all ADS streams - defer c.mu.Unlock() - - // initialize subscription - now := core.Now() - subscription := &mesh_proto.DiscoverySubscription{ - Id: core.NewUUID(), - ControlPlaneInstanceId: c.runtimeInfo.GetInstanceId(), - ConnectTime: util_proto.MustTimestampProto(now), - Status: mesh_proto.NewSubscriptionStatus(now), - Version: mesh_proto.NewVersion(), - } - // initialize state per ADS stream - state := &streamState{ - stop: make(chan struct{}), - subscription: subscription, - } - // save - c.streams[streamID] = state + return c.onStreamOpen(streamID, typ, false) +} - statusTrackerLog.V(1).Info("proxy connecting", "streamID", streamID, "type", typ, "subscriptionID", subscription.Id) - return nil +// OnDeltaStreamOpen is called once an Delta xDS stream is open with a stream ID and the type URL (or "" for ADS). +// Returning an error will end processing and close the stream. OnDeltaStreamOpen will still be called. +func (c *dataplaneStatusTracker) OnDeltaStreamOpen(_ context.Context, streamID int64, typ string) error { + return c.onStreamOpen(streamID, typ, true) } // OnStreamClosed is called immediately prior to closing an xDS stream with a stream ID. func (c *dataplaneStatusTracker) OnStreamClosed(streamID int64) { - c.mu.Lock() // write access to the map of all ADS streams - defer c.mu.Unlock() + c.onStreamClose(streamID, false) +} - state := c.streams[streamID] - if state == nil { - statusTrackerLog.Info("[WARNING] proxy disconnected but no state in the status_tracker", "streamID", streamID) - return - } +// OnDeltaStreamClosed is called immediately prior to closing an Delta xDS stream with a stream ID. +func (c *dataplaneStatusTracker) OnDeltaStreamClosed(streamID int64) { + c.onStreamClose(streamID, true) +} - delete(c.streams, streamID) +// OnStreamRequest is called once a request is received on a stream. +// Returning an error will end processing and close the stream. OnStreamClosed will still be called. +func (c *dataplaneStatusTracker) OnStreamRequest(streamID int64, req util_xds.DiscoveryRequest) error { + return c.onStreamRequest(streamID, req, false) +} - // finilize subscription - state.mu.Lock() // write access to the per Dataplane info - subscription := state.subscription - subscription.DisconnectTime = util_proto.MustTimestampProto(core.Now()) - state.mu.Unlock() +// OnStreamDeltaRequest is called once a request is received on a delta stream. +// Returning an error will end processing and close the stream. OnStreamDeltaRequest will still be called. +func (c *dataplaneStatusTracker) OnStreamDeltaRequest(streamID int64, req util_xds.DeltaDiscoveryRequest) error { + return c.onStreamRequest(streamID, req, true) +} - // trigger final flush - state.Close() +// OnStreamResponse is called immediately prior to sending a response on a stream. +func (c *dataplaneStatusTracker) OnStreamResponse(streamID int64, req util_xds.DiscoveryRequest, resp util_xds.DiscoveryResponse) { + c.onStreamResponse(streamID, req, resp, false) +} - log := statusTrackerLog.WithValues( - "streamID", streamID, - "proxyName", state.dataplaneId.Name, - "mesh", state.dataplaneId.Mesh, - "subscriptionID", state.subscription.Id, - ) +// OnStreamDeltaResponse is called immediately prior to sending a response on a delta stream. +func (c *dataplaneStatusTracker) OnStreamDeltaResponse(streamID int64, req util_xds.DeltaDiscoveryRequest, resp util_xds.DeltaDiscoveryResponse) { + c.onStreamResponse(streamID, req, resp, true) +} - if statusTrackerLog.V(1).Enabled() { - log = log.WithValues("subscription", subscription) +// To keep logs short, we want to log "Listeners" instead of full qualified Envoy type url name +func shortEnvoyType(typeURL string) string { + segments := strings.Split(typeURL, ".") + if len(segments) <= 1 { + return typeURL } + return segments[len(segments)-1] +} - log.Info("proxy disconnected") +func (c *dataplaneStatusTracker) GetStatusAccessor(streamID int64) (SubscriptionStatusAccessor, bool) { + state, ok := c.streams[streamID] + return state, ok } -// OnStreamRequest is called once a request is received on a stream. -// Returning an error will end processing and close the stream. OnStreamClosed will still be called. -func (c *dataplaneStatusTracker) OnStreamRequest(streamID int64, req util_xds.DiscoveryRequest) error { +var _ SubscriptionStatusAccessor = &streamState{} + +func (s *streamState) GetStatus() (core_model.ResourceKey, *mesh_proto.DiscoverySubscription) { + s.mu.RLock() // read access to the per Dataplane info + defer s.mu.RUnlock() + return s.dataplaneId, proto.Clone(s.subscription).(*mesh_proto.DiscoverySubscription) +} + +func (s *streamState) Close() { + close(s.stop) +} + +func (c *dataplaneStatusTracker) onStreamRequest(streamID int64, req util_xds.Request, isDelta bool) error { c.mu.RLock() // read access to the map of all ADS streams defer c.mu.RUnlock() - state := c.streams[streamID] + var state *streamState + if isDelta { + state = c.deltaStreams[streamID] + } else { + state = c.streams[streamID] + } state.mu.Lock() // write access to the per Dataplane info defer state.mu.Unlock() @@ -215,12 +228,16 @@ func (c *dataplaneStatusTracker) OnStreamRequest(streamID int64, req util_xds.Di return nil } -// OnStreamResponse is called immediately prior to sending a response on a stream. -func (c *dataplaneStatusTracker) OnStreamResponse(streamID int64, req util_xds.DiscoveryRequest, resp util_xds.DiscoveryResponse) { +func (c *dataplaneStatusTracker) onStreamResponse(streamID int64, req util_xds.Request, resp util_xds.Response, isDelta bool) { c.mu.RLock() // read access to the map of all ADS streams defer c.mu.RUnlock() - state := c.streams[streamID] + var state *streamState + if isDelta { + state = c.deltaStreams[streamID] + } else { + state = c.streams[streamID] + } state.mu.Lock() // write access to the per Dataplane info defer state.mu.Unlock() @@ -238,7 +255,7 @@ func (c *dataplaneStatusTracker) OnStreamResponse(streamID int64, req util_xds.D "type", shortEnvoyType(req.GetTypeUrl()), "resourceVersion", resp.VersionInfo(), "requestedResourceNames", req.GetResourceNames(), - "resourceCount", len(resp.GetResources()), + "resourceCount", resp.GetNumberOfResources(), ) if statusTrackerLog.V(1).Enabled() { log = log.WithValues( @@ -250,28 +267,74 @@ func (c *dataplaneStatusTracker) OnStreamResponse(streamID int64, req util_xds.D log.V(1).Info("config sent") } -// To keep logs short, we want to log "Listeners" instead of full qualified Envoy type url name -func shortEnvoyType(typeURL string) string { - segments := strings.Split(typeURL, ".") - if len(segments) <= 1 { - return typeURL +func (c *dataplaneStatusTracker) onStreamOpen(streamID int64, typ string, isDelta bool) error { + c.mu.Lock() // write access to the map of all ADS streams + defer c.mu.Unlock() + + // initialize subscription + now := core.Now() + subscription := &mesh_proto.DiscoverySubscription{ + Id: core.NewUUID(), + ControlPlaneInstanceId: c.runtimeInfo.GetInstanceId(), + ConnectTime: util_proto.MustTimestampProto(now), + Status: mesh_proto.NewSubscriptionStatus(now), + Version: mesh_proto.NewVersion(), + } + // initialize state per ADS stream + state := &streamState{ + stop: make(chan struct{}), + subscription: subscription, + } + // save + if isDelta { + c.deltaStreams[streamID] = state + } else { + c.streams[streamID] = state } - return segments[len(segments)-1] -} -func (c *dataplaneStatusTracker) GetStatusAccessor(streamID int64) (SubscriptionStatusAccessor, bool) { - state, ok := c.streams[streamID] - return state, ok + statusTrackerLog.V(1).Info("proxy connecting", "streamID", streamID, "type", typ, "subscriptionID", subscription.Id) + return nil } -var _ SubscriptionStatusAccessor = &streamState{} +func (c *dataplaneStatusTracker) onStreamClose(streamID int64, isDelta bool) { + c.mu.Lock() // write access to the map of all ADS streams + defer c.mu.Unlock() -func (s *streamState) GetStatus() (core_model.ResourceKey, *mesh_proto.DiscoverySubscription) { - s.mu.RLock() // read access to the per Dataplane info - defer s.mu.RUnlock() - return s.dataplaneId, proto.Clone(s.subscription).(*mesh_proto.DiscoverySubscription) -} + var state *streamState + if isDelta { + state = c.deltaStreams[streamID] + } else { + state = c.streams[streamID] + } + if state == nil { + statusTrackerLog.Info("[WARNING] proxy disconnected but no state in the status_tracker", "streamID", streamID) + return + } -func (s *streamState) Close() { - close(s.stop) + if isDelta { + delete(c.deltaStreams, streamID) + } else { + delete(c.streams, streamID) + } + // finilize subscription + state.mu.Lock() // write access to the per Dataplane info + subscription := state.subscription + subscription.DisconnectTime = util_proto.MustTimestampProto(core.Now()) + state.mu.Unlock() + + // trigger final flush + state.Close() + + log := statusTrackerLog.WithValues( + "streamID", streamID, + "proxyName", state.dataplaneId.Name, + "mesh", state.dataplaneId.Mesh, + "subscriptionID", state.subscription.Id, + ) + + if statusTrackerLog.V(1).Enabled() { + log = log.WithValues("subscription", subscription) + } + + log.Info("proxy disconnected") } diff --git a/pkg/xds/server/callbacks/nack_backoff.go b/pkg/xds/server/callbacks/nack_backoff.go index b5236da81390..76dcdfd84bfb 100644 --- a/pkg/xds/server/callbacks/nack_backoff.go +++ b/pkg/xds/server/callbacks/nack_backoff.go @@ -14,15 +14,23 @@ type nackBackoff struct { util_xds.NoopCallbacks } -var _ util_xds.Callbacks = &nackBackoff{} +var _ util_xds.MultiXDSCallbacks = &nackBackoff{} -func NewNackBackoff(backoff time.Duration) util_xds.Callbacks { +func NewNackBackoff(backoff time.Duration) util_xds.MultiXDSCallbacks { return &nackBackoff{ backoff: backoff, } } func (n *nackBackoff) OnStreamResponse(_ int64, request util_xds.DiscoveryRequest, _ util_xds.DiscoveryResponse) { + n.applyBackoff(request) +} + +func (n *nackBackoff) OnStreamDeltaResponse(_ int64, request util_xds.DeltaDiscoveryRequest, _ util_xds.DeltaDiscoveryResponse) { + n.applyBackoff(request) +} + +func (n *nackBackoff) applyBackoff(request util_xds.Request) { if request.HasErrors() { // When DiscoveryRequest contains errors, it means that Envoy rejected configuration generated by Control Plane // It may happen for several reasons: diff --git a/pkg/xds/server/v3/components.go b/pkg/xds/server/v3/components.go index 55cb870dd8c4..c8767b6213fa 100644 --- a/pkg/xds/server/v3/components.go +++ b/pkg/xds/server/v3/components.go @@ -5,6 +5,10 @@ import ( "time" envoy_service_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/config" + envoy_server_delta "github.com/envoyproxy/go-control-plane/pkg/server/delta/v3" + envoy_server_rest "github.com/envoyproxy/go-control-plane/pkg/server/rest/v3" + envoy_server_sotw "github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3" envoy_server "github.com/envoyproxy/go-control-plane/pkg/server/v3" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" @@ -65,10 +69,33 @@ func RegisterXDS( callbacks = append(callbacks, util_xds_v3.AdaptCallbacks(cb)) } - srv := envoy_server.NewServer(context.Background(), xdsContext.Cache(), callbacks) + deltaCallbacks := util_xds_v3.CallbacksChain{ + util_xds_v3.NewControlPlaneIdCallbacks(rt.GetInstanceId()), + util_xds_v3.AdaptDeltaCallbacks(statsCallbacks), + util_xds_v3.AdaptDeltaCallbacks(authCallbacks), + util_xds_v3.AdaptDeltaCallbacks(xds_callbacks.DataplaneCallbacksToXdsCallbacks(metadataTracker)), + util_xds_v3.AdaptDeltaCallbacks(xds_callbacks.DataplaneCallbacksToXdsCallbacks(xds_callbacks.NewDataplaneSyncTracker(watchdogFactory.New))), + util_xds_v3.AdaptDeltaCallbacks(xds_callbacks.DataplaneCallbacksToXdsCallbacks( + xds_callbacks.NewDataplaneLifecycle(rt.AppContext(), rt.ResourceManager(), authenticator, rt.Config().XdsServer.DataplaneDeregistrationDelay.Duration, rt.GetInstanceId())), + ), + util_xds_v3.AdaptDeltaCallbacks(DefaultDataplaneStatusTracker(rt, envoyCpCtx.Secrets)), + util_xds_v3.AdaptDeltaCallbacks(xds_callbacks.NewNackBackoff(rt.Config().XdsServer.NACKBackoff.Duration)), + } + + if cb := rt.XDS().ServerCallbacks; cb != nil { + deltaCallbacks = append(deltaCallbacks, util_xds_v3.AdaptDeltaCallbacks(cb)) + } + + rest := envoy_server_rest.NewServer(xdsContext.Cache(), callbacks) + sotw := envoy_server_sotw.NewServer(context.Background(), xdsContext.Cache(), callbacks) + ordered := func(o *config.Opts) { + o.Ordered = true + } + delta := envoy_server_delta.NewServer(context.Background(), xdsContext.Cache(), deltaCallbacks, ordered) + newServerAdvanced := envoy_server.NewServerAdvanced(rest, sotw, delta) xdsServerLog.Info("registering Aggregated Discovery Service V3 in Dataplane Server") - envoy_service_discovery.RegisterAggregatedDiscoveryServiceServer(rt.DpServer().GrpcServer(), srv) + envoy_service_discovery.RegisterAggregatedDiscoveryServiceServer(rt.DpServer().GrpcServer(), newServerAdvanced) return nil }