diff --git a/go.mod b/go.mod index c61f1486..066d2fe0 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.20 require ( github.com/cilium/ebpf v0.10.0 + github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29 github.com/edwarnicke/serialize v1.0.7 github.com/golang/protobuf v1.5.3 github.com/google/uuid v1.3.0 @@ -32,7 +33,6 @@ require ( github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/edwarnicke/exechelper v1.0.2 // indirect - github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29 // indirect github.com/edwarnicke/grpcfd v1.1.2 // indirect github.com/fsnotify/fsnotify v1.5.4 // indirect github.com/ftrvxmtrx/fd v0.0.0-20150925145434-c6d800382fff // indirect diff --git a/pkg/networkservice/vl3lb/client.go b/pkg/networkservice/vl3lb/client.go new file mode 100644 index 00000000..682089ba --- /dev/null +++ b/pkg/networkservice/vl3lb/client.go @@ -0,0 +1,114 @@ +// Copyright (c) 2023 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package vl3lb + +import ( + "context" + "net" + "net/url" + "time" + + "go.fd.io/govpp/api" + "google.golang.org/grpc" + + "github.com/golang/protobuf/ptypes/empty" + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/networkservicemesh/api/pkg/api/registry" + "github.com/networkservicemesh/govpp/binapi/ip_types" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" + registryclient "github.com/networkservicemesh/sdk/pkg/registry/chains/client" + registryrecvfd "github.com/networkservicemesh/sdk/pkg/registry/common/recvfd" + registrysendfd "github.com/networkservicemesh/sdk/pkg/registry/common/sendfd" +) + +type vl3lbClient struct { + port uint16 + targetPort uint16 + protocol ip_types.IPProto + selector map[string]string + + chainCtx context.Context + vppConn api.Connection + nseRegistryClient registry.NetworkServiceEndpointRegistryClient + dialTimeout time.Duration + dialOpts []grpc.DialOption +} + +// NewClient - return a new Client chain element implementing the vl3 load balancer +func NewClient(chainCtx context.Context, vppConn api.Connection, options ...Option) networkservice.NetworkServiceClient { + opts := &vl3LBOptions{ + port: 80, + targetPort: 80, + protocol: ip_types.IP_API_PROTO_TCP, + selector: make(map[string]string), + clientURL: &url.URL{Scheme: "unix", Host: "connect.to.socket"}, + } + for _, opt := range options { + opt(opts) + } + + nseRegistryClient := registryclient.NewNetworkServiceEndpointRegistryClient(chainCtx, + registryclient.WithClientURL(opts.clientURL), + registryclient.WithNSEAdditionalFunctionality( + registryrecvfd.NewNetworkServiceEndpointRegistryClient(), + registrysendfd.NewNetworkServiceEndpointRegistryClient(), + ), + registryclient.WithDialOptions(opts.dialOpts...), + ) + + return &vl3lbClient{ + port: opts.port, + targetPort: opts.targetPort, + protocol: opts.protocol, + selector: opts.selector, + chainCtx: chainCtx, + vppConn: vppConn, + nseRegistryClient: nseRegistryClient, + 
dialTimeout: opts.dialTimeout, + dialOpts: opts.dialOpts, + } +} + +func (lb *vl3lbClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { + requestContext := request.GetConnection().GetContext() + var previousIP net.IP + if requestContext != nil && requestContext.GetIpContext() != nil { + previousIP = requestContext.GetIpContext().GetSrcIPNets()[0].IP + } + conn, err := next.Client(ctx).Request(ctx, request, opts...) + if err != nil { + return nil, err + } + if !previousIP.Equal(conn.GetContext().GetIpContext().GetSrcIPNets()[0].IP) { + if cancel, ok := loadAndDeleteCancel(ctx); ok { + cancel() + } + cancelCtx, cancel := context.WithCancel(lb.chainCtx) + go lb.balanceService(cancelCtx, conn) + + storeCancel(ctx, cancel) + } + + return conn, nil +} + +func (lb *vl3lbClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) { + if cancel, ok := loadAndDeleteCancel(ctx); ok { + cancel() + } + return next.Client(ctx).Close(ctx, conn, opts...) +} diff --git a/pkg/networkservice/vl3lb/common.go b/pkg/networkservice/vl3lb/common.go new file mode 100644 index 00000000..2c46019b --- /dev/null +++ b/pkg/networkservice/vl3lb/common.go @@ -0,0 +1,160 @@ +// Copyright (c) 2023 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package vl3lb + +import ( + "context" + "net/url" + + "google.golang.org/grpc" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/networkservicemesh/api/pkg/api/registry" + "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" + "github.com/networkservicemesh/sdk/pkg/tools/log" +) + +// This function balances the load between the 'real' servers. +// The 'real' servers are searched by 'Selector' +// +// Briefly, the work looks like this: +// 1. Find all vl3-NSE +// 2. Connect to grpc monitor services of these vl3-NSE +// 3. Monitor all connections containing the vl3-NSE name +// 4. Filter out those connections that contain 'Selector' labels +// 5. Get SrcIPs from these connections +// 6. Configure VPP load balancing + +func (lb *vl3lbClient) balanceService(ctx context.Context, conn *networkservice.Connection) { + loggerLb := log.FromContext(ctx).WithField("LoadBalancer", conn.NetworkService) + + // 1. Find all vl3-NSE + nseStream, err := lb.nseRegistryClient.Find(ctx, ®istry.NetworkServiceEndpointQuery{ + NetworkServiceEndpoint: ®istry.NetworkServiceEndpoint{ + NetworkServiceNames: []string{conn.NetworkService}, + }, + Watch: true, + }) + if err != nil { + loggerLb.Errorf("error getting nses: %+v", err) + return + } + + lbVpp := newHandler(lb.vppConn, &endpoint{ + IP: conn.GetContext().GetIpContext().GetSrcIPNets()[0].IP, + Port: lb.port, + }, lb.protocol) + + monitoredNSEs := make(map[string]string) + for { + msg, err := nseStream.Recv() + if err != nil { + break + } + + // Do not monitor the same NSE multiple times + if msg.Deleted { + delete(monitoredNSEs, msg.GetNetworkServiceEndpoint().GetName()) + continue + } + if _, ok := monitoredNSEs[msg.GetNetworkServiceEndpoint().GetName()]; ok { + continue + } + monitoredNSEs[msg.GetNetworkServiceEndpoint().GetName()] = "" + + go lb.balanceNSE(ctx, loggerLb, lbVpp, msg.NetworkServiceEndpoint) + } +} + +func (lb *vl3lbClient) balanceNSE(ctx context.Context, loggerLb log.Logger, lbVpp 
*handler, nse *registry.NetworkServiceEndpoint) { + logger := loggerLb.WithField("NSE", nse.Name) + urlNSE, err := url.Parse(nse.Url) + if err != nil { + logger.Errorf("url.Parse: %+v", err) + return + } + + // 2. Connect to grpc monitor services of these vl3-NSE + dialCtx, cancelDial := context.WithTimeout(ctx, lb.dialTimeout) + defer cancelDial() + + ccMonitor, err := grpc.DialContext(dialCtx, grpcutils.URLToTarget(urlNSE), lb.dialOpts...) + if err != nil { + logger.Errorf("failed to dial: %v, URL: %v, err: %v", nse.Name, urlNSE.String(), err.Error()) + return + } + logger.Info("connected") + + // 3. Monitor all connections containing the vl3-NSE name + monitorClientNse := networkservice.NewMonitorConnectionClient(ccMonitor) + monitorCtx, cancelMonitor := context.WithCancel(ctx) + defer cancelMonitor() + + stream, err := monitorClientNse.MonitorConnections(monitorCtx, &networkservice.MonitorScopeSelector{ + PathSegments: []*networkservice.PathSegment{{ + Name: nse.Name, + }}, + }) + if err != nil { + logger.WithField("NSE", nse.Name).Errorf("failed to MonitorConnections: %v", err.Error()) + return + } + for { + event, err := stream.Recv() + if err != nil { + logger.WithField("NSE", nse.Name).Errorf("error MonitorConnections stream: %v", err.Error()) + _ = lbVpp.deleteServers(context.Background(), nse.Name, lbVpp.getServerIDsByVL3Name(nse.Name)) + break + } + + // 4. Filter out those connections that contain 'Selector' labels + add, del := filterConnections(event, lb.selector, lb.targetPort) + // 6. 
Configure VPP load balancing + if err = lbVpp.addServers(ctx, nse.Name, add); err != nil { + logger.Errorf("addServers error: %v", err.Error()) + } + if err = lbVpp.deleteServers(ctx, nse.Name, del); err != nil { + logger.Errorf("deleteServers error: %v", err.Error()) + } + } +} + +func filterConnections(event *networkservice.ConnectionEvent, selector map[string]string, targetPort uint16) (add map[string]*endpoint, del []string) { + add = make(map[string]*endpoint) + for _, eventConnection := range event.Connections { + for k, v := range selector { + if eventConnection.Labels[k] == v { + if event.GetType() == networkservice.ConnectionEventType_DELETE { + del = append(del, eventConnection.Id) + } else { + // 5. Get SrcIPs from these connections + switch eventConnection.GetState() { + case networkservice.State_DOWN: + del = append(del, eventConnection.Id) + default: + add[eventConnection.Id] = &endpoint{ + IP: eventConnection.GetContext().GetIpContext().GetSrcIPNets()[0].IP, + Port: targetPort, + } + } + break + } + } + } + } + return +} diff --git a/pkg/networkservice/vl3lb/doc.go b/pkg/networkservice/vl3lb/doc.go new file mode 100644 index 00000000..1a28cb3a --- /dev/null +++ b/pkg/networkservice/vl3lb/doc.go @@ -0,0 +1,18 @@ +// Copyright (c) 2023 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
// endpoint contains the main fields for the VPP plugin
type endpoint struct {
	IP   net.IP
	Port uint16
}

// equals returns true if both endpoints have the same IP (as compared by
// net.IP.Equal) and the same port. Two nil endpoints are equal; a nil endpoint
// never equals a non-nil one.
func (e *endpoint) equals(other *endpoint) bool {
	if e == nil || other == nil {
		return e == other
	}
	return e.IP.Equal(other.IP) && e.Port == other.Port
}

// string returns the endpoint in "host:port" form. IPv6 addresses are wrapped
// in brackets so that the port separator stays unambiguous.
func (e *endpoint) string() string {
	return net.JoinHostPort(e.IP.String(), fmt.Sprint(e.Port))
}

// handler works with load balancer servers.
It is based on CNAT VPP-plugin +type handler struct { + vppConn api.Connection + lbEndpoint cnat.CnatEndpoint + proto ip_types.IPProto + isRealIP uint8 + lbType cnat.CnatLbType + + // [vl3-NSE] --> [connID]*Endpoint + // We store it this way because the plugin does not add, but only updates existing entries. Therefore, to add/delete one entry, we must also pass the old ones. + servers genericsync.Map[string, *genericsync.Map[string, *endpoint]] +} + +// newHandler creates a Handler. +// endpoint contains Load Balancer parameters. The clients can reach the LB with endpoint.Addr:endpoint:Port +func newHandler(vppConn api.Connection, endpoint *endpoint, proto ip_types.IPProto) *handler { + return &handler{ + vppConn: vppConn, + lbEndpoint: cnat.CnatEndpoint{ + Addr: types.ToVppAddress(endpoint.IP), + SwIfIndex: interface_types.InterfaceIndex(^uint32(0)), + Port: endpoint.Port, + }, + proto: proto, + isRealIP: 1, + lbType: cnat.CNAT_LB_TYPE_MAGLEV, + } +} + +func cnatTranslationString(c *cnat.CnatTranslation) string { + str := fmt.Sprintf("%s:%d", c.Vip.Addr.String(), c.Vip.Port) + for _, p := range c.Paths { + str = fmt.Sprintf("%s to %s -> %s:%d, ", str, p.SrcEp.Addr, p.DstEp.Addr, p.DstEp.Port) + } + return str +} + +// addServers adds the real servers to the VPP plugin +func (c *handler) addServers(ctx context.Context, vl3NSEName string, add map[string]*endpoint) (err error) { + updateRequired := false + realServers, _ := c.servers.LoadOrStore(vl3NSEName, new(genericsync.Map[string, *endpoint])) + for k, v := range add { + if endpoint, ok := realServers.Load(k); !ok || !endpoint.equals(v) { + realServers.Store(k, v) + updateRequired = true + log.FromContext(ctx).WithField("vl3lb", "AddServers"). + WithField("vL3NSE", vl3NSEName). + WithField("serverID", k). 
+ WithField("server", v.string()).Debugf("completed") + } + } + + if updateRequired { + err = c.updateVPPCnat(ctx) + } + + return err +} + +// deleteServers deletes the real servers from the VPP plugin +func (c *handler) deleteServers(ctx context.Context, vl3NSEName string, del []string) (err error) { + realServers, ok := c.servers.Load(vl3NSEName) + if !ok { + return nil + } + + updateRequired := false + for _, id := range del { + realServers.Delete(id) + updateRequired = true + log.FromContext(ctx).WithField("vl3lb", "DeleteServers"). + WithField("vL3NSE", vl3NSEName). + WithField("serverID", id).Debugf("completed") + } + + if updateRequired { + var length int + realServers.Range(func(key string, value *endpoint) bool { + length++ + return true + }) + + if length == 0 { + c.servers.Delete(vl3NSEName) + log.FromContext(ctx).WithField("vl3lb", "DeleteServers"). + WithField("vL3NSE", vl3NSEName).Debug("vL3NSE entry was deleted") + } + + err = c.updateVPPCnat(ctx) + } + + return err +} + +// getServerIDsByVL3Name returns the list of the servers belonging to the vl3-NSE +func (c *handler) getServerIDsByVL3Name(vl3NSEName string) []string { + var list []string + realServers, loaded := c.servers.Load(vl3NSEName) + if loaded { + realServers.Range(func(key string, value *endpoint) bool { + list = append(list, key) + return true + }) + } + return list +} + +func (c *handler) updateVPPCnat(ctx context.Context) error { + var paths []cnat.CnatEndpointTuple + c.servers.Range(func(key string, realServers *genericsync.Map[string, *endpoint]) bool { + realServers.Range(func(key string, s *endpoint) bool { + paths = append(paths, cnat.CnatEndpointTuple{ + DstEp: cnat.CnatEndpoint{ + Addr: types.ToVppAddress(s.IP), + SwIfIndex: interface_types.InterfaceIndex(^uint32(0)), + Port: s.Port, + }, + SrcEp: cnat.CnatEndpoint{ + Addr: c.lbEndpoint.Addr, + SwIfIndex: interface_types.InterfaceIndex(^uint32(0)), + }, + }) + return true + }) + return true + }) + + if len(paths) == 0 { + now := 
time.Now() + cnatTranslationDel := cnat.CnatTranslationDel{ID: 0} + _, err := cnat.NewServiceClient(c.vppConn).CnatTranslationDel(ctx, &cnatTranslationDel) + if err != nil { + return err + } + + log.FromContext(ctx). + WithField("translationID", cnatTranslationDel.ID). + WithField("duration", time.Since(now)). + WithField("vppapi", "CnatTranslationDel").Debug("completed") + return nil + } + + now := time.Now() + cnatTranslation := cnat.CnatTranslation{ + Vip: c.lbEndpoint, + IPProto: c.proto, + IsRealIP: c.isRealIP, + ID: 0, + LbType: c.lbType, + NPaths: uint32(len(paths)), + Paths: paths, + } + reply, err := cnat.NewServiceClient(c.vppConn).CnatTranslationUpdate(ctx, &cnat.CnatTranslationUpdate{Translation: cnatTranslation}) + if err != nil { + return err + } + cnatTranslation.ID = reply.ID + + log.FromContext(ctx). + WithField("translation", cnatTranslationString(&cnatTranslation)). + WithField("duration", time.Since(now)). + WithField("vppapi", "CnatTranslationUpdate").Debug("completed") + return nil +} diff --git a/pkg/networkservice/vl3lb/metadata.go b/pkg/networkservice/vl3lb/metadata.go new file mode 100644 index 00000000..8c7d739a --- /dev/null +++ b/pkg/networkservice/vl3lb/metadata.go @@ -0,0 +1,41 @@ +// Copyright (c) 2023 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package vl3lb + +import ( + "context" + + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" +) + +type keyCancel struct{} + +// storeCancel sets the context.CancelFunc stored in per Connection.Id metadata. +func storeCancel(ctx context.Context, cancel context.CancelFunc) { + metadata.Map(ctx, true).Store(keyCancel{}, cancel) +} + +// loadAndDeleteCancel deletes the context.CancelFunc stored in per Connection.Id metadata, +// returning the previous value if any. The loaded result reports whether the key was present. +func loadAndDeleteCancel(ctx context.Context) (value context.CancelFunc, ok bool) { + rawValue, ok := metadata.Map(ctx, true).LoadAndDelete(keyCancel{}) + if !ok { + return + } + value, ok = rawValue.(context.CancelFunc) + return value, ok +} diff --git a/pkg/networkservice/vl3lb/options.go b/pkg/networkservice/vl3lb/options.go new file mode 100644 index 00000000..04042713 --- /dev/null +++ b/pkg/networkservice/vl3lb/options.go @@ -0,0 +1,92 @@ +// Copyright (c) 2023 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package vl3lb + +import ( + "net/url" + "time" + + "google.golang.org/grpc" + + "github.com/networkservicemesh/govpp/binapi/ip_types" +) + +type vl3LBOptions struct { + port uint16 + targetPort uint16 + protocol ip_types.IPProto + selector map[string]string + + clientURL *url.URL + dialTimeout time.Duration + dialOpts []grpc.DialOption +} + +// Option is an option pattern for forwarder chain elements +type Option func(o *vl3LBOptions) + +// WithPort - set a load balancer port +func WithPort(port uint16) Option { + return func(o *vl3LBOptions) { + o.port = port + if o.targetPort == 0 { + o.targetPort = port + } + } +} + +// WithTargetPort - set a real server target port +func WithTargetPort(targetPort uint16) Option { + return func(o *vl3LBOptions) { + o.targetPort = targetPort + } +} + +// WithProtocol - set IP protocol +func WithProtocol(protocol ip_types.IPProto) Option { + return func(o *vl3LBOptions) { + o.protocol = protocol + } +} + +// WithSelector - set a load balancer selector +func WithSelector(selector map[string]string) Option { + return func(o *vl3LBOptions) { + o.selector = selector + } +} + +// WithClientURL sets clientURL. +func WithClientURL(clientURL *url.URL) Option { + return func(c *vl3LBOptions) { + c.clientURL = clientURL + } +} + +// WithDialTimeout sets dial timeout for the client +func WithDialTimeout(dialTimeout time.Duration) Option { + return func(o *vl3LBOptions) { + o.dialTimeout = dialTimeout + } +} + +// WithDialOptions sets dial options +func WithDialOptions(opts ...grpc.DialOption) Option { + return func(o *vl3LBOptions) { + o.dialOpts = opts + } +}