diff --git a/.golangci.yml b/.golangci.yml index 3584c77a..e1ae7792 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -4,13 +4,10 @@ run: linters: disable-all: true enable: - - deadcode - errcheck - gosimple - govet - ineffassign - staticcheck - - structcheck - typecheck - unused - - varcheck diff --git a/Makefile b/Makefile index bf173127..da4d6d3a 100644 --- a/Makefile +++ b/Makefile @@ -273,7 +273,7 @@ output-dir: .PHONY: golangci-lint golangci-lint: - $(call go-get-tool,$(GOLANGCI_LINT),github.com/golangci/golangci-lint/cmd/golangci-lint@v1.56.2) + $(call go-get-tool,$(GOLANGCI_LINT),github.com/golangci/golangci-lint/cmd/golangci-lint@v1.62.2) .PHONY: proto-compiler proto-compiler: protoc protoc-gen-go protoc-gen-go-grpc diff --git a/api/v1/webhook_suite_test.go b/api/v1/webhook_suite_test.go index 14d3d56b..ab7ee16a 100644 --- a/api/v1/webhook_suite_test.go +++ b/api/v1/webhook_suite_test.go @@ -134,7 +134,7 @@ var _ = BeforeSuite(func() { if err != nil { return err } - conn.Close() + _ = conn.Close() return nil }).Should(Succeed()) diff --git a/build/frontend/Dockerfile b/build/frontend/Dockerfile index db52809e..9f7c06a9 100644 --- a/build/frontend/Dockerfile +++ b/build/frontend/Dockerfile @@ -3,7 +3,7 @@ ARG USER=meridio ARG UID=10001 ARG HOME=/home/${USER} -FROM golang:1.22 as build +FROM golang:1.23 as build ENV GO111MODULE=on ENV CGO_ENABLED=0 diff --git a/build/ipam/Dockerfile b/build/ipam/Dockerfile index 661cbb9c..04722830 100644 --- a/build/ipam/Dockerfile +++ b/build/ipam/Dockerfile @@ -3,7 +3,7 @@ ARG USER=meridio ARG UID=10004 ARG HOME=/home/${USER} -FROM golang:1.22 as build +FROM golang:1.23 as build ENV GO111MODULE=on ARG meridio_version=0.0.0-unknown diff --git a/build/nsp/Dockerfile b/build/nsp/Dockerfile index 9e2dd488..981e867c 100644 --- a/build/nsp/Dockerfile +++ b/build/nsp/Dockerfile @@ -3,7 +3,7 @@ ARG USER=meridio ARG UID=10003 ARG HOME=/home/${USER} -FROM golang:1.22 as build +FROM golang:1.23 as build ARG 
meridio_version=0.0.0-unknown ENV GO111MODULE=on diff --git a/build/operator/Dockerfile b/build/operator/Dockerfile index a13dc910..f5df53c2 100644 --- a/build/operator/Dockerfile +++ b/build/operator/Dockerfile @@ -3,7 +3,7 @@ ARG USER=meridio ARG UID=10005 ARG HOME=/home/${USER} -FROM golang:1.22 as build +FROM golang:1.23 as build ARG LDFLAGS WORKDIR /workspace diff --git a/build/proxy/Dockerfile b/build/proxy/Dockerfile index ae6ab619..6c3dc754 100644 --- a/build/proxy/Dockerfile +++ b/build/proxy/Dockerfile @@ -3,7 +3,7 @@ ARG USER=meridio ARG UID=10005 ARG HOME=/home/${USER} -FROM golang:1.22 as build +FROM golang:1.23 as build ARG meridio_version=0.0.0-unknown ENV GO111MODULE=on diff --git a/build/stateless-lb/Dockerfile b/build/stateless-lb/Dockerfile index 27b1e660..96acd9dc 100644 --- a/build/stateless-lb/Dockerfile +++ b/build/stateless-lb/Dockerfile @@ -3,7 +3,7 @@ ARG USER=meridio ARG UID=10002 ARG HOME=/home/${USER} -FROM golang:1.22 as build +FROM golang:1.23 as build ARG meridio_version=0.0.0-unknown ENV GO111MODULE=on diff --git a/build/tapa/Dockerfile b/build/tapa/Dockerfile index 8097aef5..efd3fd65 100644 --- a/build/tapa/Dockerfile +++ b/build/tapa/Dockerfile @@ -3,7 +3,7 @@ ARG USER=tapa ARG UID=10005 ARG HOME=/home/${USER} -FROM golang:1.22 as build +FROM golang:1.23 as build ARG meridio_version=0.0.0-unknown ENV GO111MODULE=on diff --git a/cmd/frontend/internal/bird/bird.go b/cmd/frontend/internal/bird/bird.go index 876553a7..5f3e4a7d 100644 --- a/cmd/frontend/internal/bird/bird.go +++ b/cmd/frontend/internal/bird/bird.go @@ -124,7 +124,9 @@ func (b *RoutingService) Run(ctx context.Context, monitorLogs bool) error { return } // when context is done, close File thus signalling EOF to bufio Scan() - defer w.Close() + defer func() { + _ = w.Close() + }() <-ctx.Done() b.logger.Info("Context closed, terminate log monitoring") }() diff --git a/cmd/frontend/internal/bird/configure.go b/cmd/frontend/internal/bird/configure.go index 6cf2cea5..f0295fa4 
100644 --- a/cmd/frontend/internal/bird/configure.go +++ b/cmd/frontend/internal/bird/configure.go @@ -51,7 +51,9 @@ func (r *routingConfig) Apply() error { if err != nil { return fmt.Errorf("create %v, err: %v", r.path, err) } - defer file.Close() + defer func() { + _ = file.Close() + }() _, err = file.WriteString(r.config) if err != nil { diff --git a/cmd/frontend/main.go b/cmd/frontend/main.go index 88b15522..76d88094 100644 --- a/cmd/frontend/main.go +++ b/cmd/frontend/main.go @@ -113,7 +113,9 @@ func main() { if err != nil { log.Fatal(logger, "Dial NSP", "error", err) } - defer conn.Close() + defer func() { + _ = conn.Close() + }() // monitor status of NSP connection and adjust probe status accordingly if err := connection.Monitor(ctx, health.NSPCliSvc, conn); err != nil { @@ -131,7 +133,9 @@ func main() { if err != nil { log.Fatal(logger, "Dial loadbalancer", "error", err) } - defer lbConn.Close() + defer func() { + _ = lbConn.Close() + }() // create and start frontend service c := &feConfig.Config{ diff --git a/cmd/ipam/main.go b/cmd/ipam/main.go index f100b58d..7c2020f4 100644 --- a/cmd/ipam/main.go +++ b/cmd/ipam/main.go @@ -137,7 +137,9 @@ func main() { if err != nil { log.Fatal(logger, "Dial NSP err", "error", err) } - defer conn.Close() + defer func() { + _ = conn.Close() + }() // monitor status of NSP connection and adjust probe status accordingly if err := connection.Monitor(ctx, health.NSPCliSvc, conn); err != nil { diff --git a/cmd/operator/main.go b/cmd/operator/main.go index 172f7880..360d3906 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -87,7 +87,9 @@ func setupTLSCert(socket string) error { certDir := "/tmp/k8s-webhook-server/serving-certs" go func() { - defer client.Close() + defer func() { + _ = client.Close() + }() err := client.WatchX509Context(ctx, &x509Watcher{CertDir: certDir}) if err != nil && status.Code(err) != codes.Canceled { log.Fatal(setupLog, "error watching X.509 context", "error", err) @@ -132,7 +134,7 @@ func 
main() { "Enabling this will ensure there is only one active controller manager.") if os.Getenv(common.LogLevelEnv) == "" { // trace as default value - os.Setenv(common.LogLevelEnv, "trace") + _ = os.Setenv(common.LogLevelEnv, "trace") } ver := flag.Bool("version", false, "Print version and quit") diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index 3735b3b3..94b59ed3 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -164,7 +164,9 @@ func main() { if err != nil { log.Fatal(logger, "Dialing IPAM", "error", err) } - defer conn.Close() + defer func() { + _ = conn.Close() + }() // monitor status of IPAM connection and adjust probe status accordingly if err := connection.Monitor(signalCtx, health.IPAMCliSvc, conn); err != nil { @@ -193,7 +195,9 @@ func main() { if err != nil { log.Fatal(logger, "Dialing NSP", "error", err) } - defer nspConn.Close() + defer func() { + _ = nspConn.Close() + }() // monitor status of NSP connection and adjust probe status accordingly if err := connection.Monitor(signalCtx, health.NSPCliSvc, nspConn); err != nil { @@ -234,14 +238,18 @@ func main() { cancelSignalCtx() return } - defer cc.Close() + defer func() { + _ = cc.Close() + }() monitorClient := networkservice.NewMonitorConnectionClient(cc) go nsmmonitor.ConnectionMonitor(ctx, config.Name, monitorClient) // create and start NSC that connects all remote NSE belonging to the right service interfaceMonitorClient := interfacemonitor.NewClient(interfaceMonitor, p, netUtils) nsmClient := service.GetNSC(ctx, &config, nsmAPIClient, p, interfaceMonitorClient, monitorClient) - defer nsmClient.Close() + defer func() { + _ = nsmClient.Close() + }() go func() { service.StartNSC(nsmClient, config.NetworkServiceName) cancelSignalCtx() // let others with proper clean-up gracefully terminate diff --git a/cmd/stateless-lb/config.go b/cmd/stateless-lb/config.go index a138d600..25a677ae 100644 --- a/cmd/stateless-lb/config.go +++ b/cmd/stateless-lb/config.go @@ -27,25 +27,27 @@ import ( // 
Config for the proxy type Config struct { - Name string `default:"load-balancer" desc:"Name of the pod"` - ServiceName string `default:"load-balancer" desc:"Name of providing service" split_words:"true"` - ConnectTo url.URL `default:"unix:///var/lib/networkservicemesh/nsm.io.sock" desc:"url to connect to NSM" split_words:"true"` - DialTimeout time.Duration `default:"5s" desc:"timeout to dial NSMgr" split_words:"true"` - RequestTimeout time.Duration `default:"15s" desc:"timeout to request NSE" split_words:"true"` - MaxTokenLifetime time.Duration `default:"24h" desc:"maximum lifetime of tokens" split_words:"true"` - NSPService string `default:"nsp-service:7778" desc:"IP (or domain) and port of the NSP Service" split_words:"true"` - ConduitName string `default:"load-balancer" desc:"Name of the conduit" split_words:"true"` - TrenchName string `default:"default" desc:"Trench the pod is running on" split_words:"true"` - LogLevel string `default:"DEBUG" desc:"Log level" split_words:"true"` - Nfqueue string `default:"0:3" desc:"netfilter queue(s) to be used by nfqlb" split_words:"true"` - NfqueueFanout bool `default:"false" desc:"enable fanout nfqueue option" split_words:"true"` - IdentifierOffsetStart int `default:"5000" desc:"Each Stream will get a unique identifier range starting from that value" split_words:"true"` - GRPCKeepaliveTime time.Duration `default:"30s" desc:"gRPC keepalive timeout" envconfig:"grpc_keepalive_time"` - GRPCProbeRPCTimeout time.Duration `default:"1s" desc:"RPC timeout of internal gRPC health probe" envconfig:"grpc_probe_rpc_timeout"` - GRPCMaxBackoff time.Duration `default:"5s" desc:"Upper bound on gRPC connection backoff delay" envconfig:"grpc_max_backoff"` - MetricsEnabled bool `default:"false" desc:"Enable the metrics collection" split_words:"true"` - MetricsPort int `default:"2223" desc:"Specify the port used to expose the metrics" split_words:"true"` - Socket url.URL `default:"unix:///var/lib/meridio/lb.sock" desc:"Server socket to host 
Stream Availability Service" split_words:"true"` + Name string `default:"load-balancer" desc:"Name of the pod"` + ServiceName string `default:"load-balancer" desc:"Name of providing service" split_words:"true"` + ConnectTo url.URL `default:"unix:///var/lib/networkservicemesh/nsm.io.sock" desc:"url to connect to NSM" split_words:"true"` + DialTimeout time.Duration `default:"5s" desc:"timeout to dial NSMgr" split_words:"true"` + RequestTimeout time.Duration `default:"15s" desc:"timeout to request NSE" split_words:"true"` + MaxTokenLifetime time.Duration `default:"24h" desc:"maximum lifetime of tokens" split_words:"true"` + NSPService string `default:"nsp-service:7778" desc:"IP (or domain) and port of the NSP Service" split_words:"true"` + ConduitName string `default:"load-balancer" desc:"Name of the conduit" split_words:"true"` + TrenchName string `default:"default" desc:"Trench the pod is running on" split_words:"true"` + LogLevel string `default:"DEBUG" desc:"Log level" split_words:"true"` + Nfqueue string `default:"0:3" desc:"netfilter queue(s) to be used by nfqlb" split_words:"true"` + NfqueueFanout bool `default:"false" desc:"enable fanout nfqueue option" split_words:"true"` + IdentifierOffsetStart int `default:"5000" desc:"Each Stream will get a unique identifier range starting from that value" split_words:"true"` + GRPCKeepaliveTime time.Duration `default:"30s" desc:"gRPC keepalive timeout" envconfig:"grpc_keepalive_time"` + GRPCProbeRPCTimeout time.Duration `default:"1s" desc:"RPC timeout of internal gRPC health probe" envconfig:"grpc_probe_rpc_timeout"` + GRPCMaxBackoff time.Duration `default:"5s" desc:"Upper bound on gRPC connection backoff delay" envconfig:"grpc_max_backoff"` + MetricsEnabled bool `default:"false" desc:"Enable the metrics collection" split_words:"true"` + MetricsPort int `default:"2223" desc:"Specify the port used to expose the metrics" split_words:"true"` + Socket url.URL `default:"unix:///var/lib/meridio/lb.sock" desc:"Server socket to 
host Stream Availability Service" split_words:"true"` + Namespace string `default:"default" desc:"Namespace the pod is running on" split_words:"true"` + TargetDisconnectMonitoring bool `default:"true" desc:"Enable listening to Target disconnect events to clean-up linux neighbor cache" split_words:"true"` } // IsValid checks if the configuration is valid diff --git a/cmd/stateless-lb/internal/neighborcache/neighborcache.go b/cmd/stateless-lb/internal/neighborcache/neighborcache.go new file mode 100644 index 00000000..ee0b21c1 --- /dev/null +++ b/cmd/stateless-lb/internal/neighborcache/neighborcache.go @@ -0,0 +1,90 @@ +/* +Copyright (c) 2024 OpenInfra Foundation Europe + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package neighborcache + +import ( + "context" + "net" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/nordix/meridio/pkg/log" + "github.com/vishvananda/netlink" +) + +// RemoveInvalid attempts to remove potentially invalid neighbor entries for +// which NSM has reported that the connection was closed, implying that the +// interface has either disappeared or is about to disappear along with its +// IP and MAC addresses. 
Thus, even if the same IP address reappears shortly +// due to NSM heal successfully fixing or, more accurately, re-establishing the +// connection, communication disturbances caused by an old invalid neighbor +// cache entry can be avoided, which would have otherwise occurred due to the +// behavior of the neighbor state machine (DELAY state and unicast probes). +// Note: The LB monitors TAPA -> Proxy connections, where the SrcIpAddrs refer +// to TAPA-side IPs, including the ones used as Target IPs by the LB. +func RemoveInvalid(ctx context.Context, connectionEvent *networkservice.ConnectionEvent) { + if connectionEvent.Type != networkservice.ConnectionEventType_DELETE { + return + } + logger := log.FromContextOrGlobal(ctx).WithValues("func", "RemoveInvalid") + // Fetch neighbor cache from kernel + neighborList, err := netlink.NeighList(0, 0) + if err != nil { + logger.Info("Could not fetch neighbor list", "err", err) + return + } + // Convert neighbor list to a map + neighborMap := make(map[string][]netlink.Neigh) + for _, neigh := range neighborList { + ipStr := neigh.IP.String() + neighborMap[ipStr] = append(neighborMap[ipStr], neigh) + } + + // Remove any of the NSM SrcIpAddrs from the neighbor cache if they are present + eventPrinted := false + for _, connection := range connectionEvent.Connections { + if connection.GetPath() == nil || len(connection.GetPath().GetPathSegments()) < 1 { + continue + } + if connection.GetContext() == nil || connection.GetContext().GetIpContext() == nil { + continue + } + ipContext := connection.GetContext().GetIpContext() + for _, ipStr := range ipContext.SrcIpAddrs { + if ip, _, err := net.ParseCIDR(ipStr); err == nil { + // Check if neighbor map has an entry for this IP + neighs, ok := neighborMap[ip.String()] + if !ok { + continue + } + if !eventPrinted { + eventPrinted = true + logger.Info("Connection event", "event", connectionEvent) + } + for _, neigh := range neighs { + logger.Info("Delete from neighbor cache", "neigh", 
neigh, "MAC", neigh.HardwareAddr.String()) + err := netlink.NeighDel(&netlink.Neigh{ + LinkIndex: neigh.LinkIndex, + IP: ip, + }) + if err != nil { + logger.Info("Failed to delete from neighbor cache", "neigh", neigh, "MAC", neigh.HardwareAddr.String(), "err", err) + } + } + } + } + } +} diff --git a/cmd/stateless-lb/main.go b/cmd/stateless-lb/main.go index 9eefe62f..3b73d661 100644 --- a/cmd/stateless-lb/main.go +++ b/cmd/stateless-lb/main.go @@ -24,8 +24,10 @@ import ( "fmt" "io" "net" + "net/url" "os" "os/signal" + "strings" "sync" "syscall" "time" @@ -35,6 +37,7 @@ import ( "github.com/networkservicemesh/api/pkg/api/networkservice" kernelmech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel" "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/noop" + "github.com/networkservicemesh/api/pkg/api/registry" "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms" "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/kernel" "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/sendfd" @@ -43,6 +46,8 @@ import ( nsmlog "github.com/networkservicemesh/sdk/pkg/tools/log" lbAPI "github.com/nordix/meridio/api/loadbalancer/v1" nspAPI "github.com/nordix/meridio/api/nsp/v1" + "github.com/nordix/meridio/cmd/stateless-lb/internal/neighborcache" + conduitns "github.com/nordix/meridio/pkg/conduit" "github.com/nordix/meridio/pkg/debug" "github.com/nordix/meridio/pkg/endpoint" "github.com/nordix/meridio/pkg/health" @@ -139,13 +144,13 @@ func main() { netUtils := &linuxKernel.KernelUtils{} - // create and start health server + // Create and start health server ctx = health.CreateChecker(ctx) if err := health.RegisterReadinessSubservices(ctx, health.LBReadinessServices...); err != nil { logger.Error(err, "RegisterReadinessSubservices") } - // note: NSM endpoint service is hosted from early on by its server, thus it can be probed - // irrespective of its registration status at NSM + // 
Note: The NSM endpoint service is hosted from the start by its server, + // so it can be probed regardless of its registration status at NSM. if err := health.RegisterLivenessSubservices(ctx, health.LBLivenessServices...); err != nil { logger.Error(err, "RegisterLivenessSubservices") } @@ -173,14 +178,16 @@ func main() { if err != nil { log.Fatal(logger, "Dial NSP", "error", err) } - defer conn.Close() + defer func() { + _ = conn.Close() + }() - // monitor status of NSP connection and adjust probe status accordingly + // Monitor status of NSP connection and adjust probe status accordingly if err := connection.Monitor(ctx, health.NSPCliSvc, conn); err != nil { logger.Error(err, "NSP connection state monitor") } - stream.SetInterfaceNamePrefix(config.ServiceName) // deduce the NSM interfacename prefix for the netfilter defrag rules + stream.SetInterfaceNamePrefix(config.ServiceName) // Determine the NSM interface name prefix for the netfilter defragmentation rules targetRegistryClient := nspAPI.NewTargetRegistryClient(conn) configurationManagerClient := nspAPI.NewConfigurationManagerClient(conn) conduit := &nspAPI.Conduit{ @@ -225,7 +232,7 @@ func main() { return } - // start server to host Stream Forwarding Availability service + // Start server to host Stream Forwarding Availability service lis, err := createStreamAvailabilityListener(config) if err != nil { logger.Error(err, "createStreamAvailabilityListener") @@ -239,7 +246,7 @@ func main() { }), ) defer func() { - // attempt graceful shutdown to allow sending out pending msgs + // Attempt graceful shutdown to allow sending out pending msgs stopped := make(chan struct{}) go func() { s.GracefulStop() @@ -248,7 +255,7 @@ func main() { waitTimer := time.NewTimer(time.Second) select { case <-waitTimer.C: - s.Stop() // graceful shutdown not finished in time, force stop immediately + s.Stop() // Graceful shutdown not finished in time, force stop immediately case <-stopped: waitTimer.Stop() select { @@ -257,7 +264,8 @@ 
func main() { } } }() - // announces forwarding availability of streams (i.e. if the LB can forward traffic towards application targets) + // Announces the forwarding availability of streams + // (i.e., whether the LB can forward traffic towards application targets) streamFwdAvailabilityService := stream.NewForwardingAvailabilityService( context.Background(), &lbAPI.Target{ @@ -280,8 +288,8 @@ func main() { configurationManagerClient, conduit, netUtils, - lbFactory, // to spawn nfqlb instance for each Stream created - nfa, // netfilter kernel configuration to steer VIP traffic to nfqlb process + lbFactory, // To spawn nfqlb instance for each Stream created + nfa, // Netfilter kernel configuration to steer VIP traffic to nfqlb process config.IdentifierOffsetStart, targetHitsMetrics, neighborMonitor, @@ -290,7 +298,7 @@ func main() { interfaceMonitorEndpoint := interfacemonitor.NewServer(interfaceMonitor, sns, netUtils) - // Note: naming the interface is left to NSM (refer to getNameFromConnection()) + // Note: Naming the interface is left to NSM (refer to getNameFromConnection()) // However NSM does not seem to ensure uniqueness either. Might need to revisit... 
responderEndpoint := []networkservice.NetworkServiceServer{ mechanisms.NewServer(map[string]networkservice.NetworkServiceServer{ @@ -310,10 +318,10 @@ func main() { MaxTokenLifetime: config.MaxTokenLifetime, GRPCMaxBackoff: config.GRPCMaxBackoff, } - nsmAPIClient := nsm.NewAPIClient(context.Background(), apiClientConfig) // background context to allow endpoint unregistration on tear down + nsmAPIClient := nsm.NewAPIClient(context.Background(), apiClientConfig) // Background context to allow endpoint unregistration on tear down defer nsmAPIClient.Delete() - // connect NSMgr and start NSM connection monitoring (to log events of interest) + // Connect local NSMgr cc, err := grpc.DialContext(ctx, grpcutils.URLToTarget(&nsmAPIClient.Config.ConnectTo), nsmAPIClient.GRPCDialOption..., @@ -323,9 +331,16 @@ func main() { cancel() return } - defer cc.Close() + defer func() { + _ = cc.Close() + }() + // Start monitoring NSM connections the LB is part of monitorClient := networkservice.NewMonitorConnectionClient(cc) go nsmmonitor.ConnectionMonitor(ctx, config.Name, monitorClient) + // Start cluster-wide monitoring of NSM connection Delete events between TAPA and Proxy + if config.TargetDisconnectMonitoring { + go startClusterConnectionMonitor(ctx, config, cc, nsmAPIClient.GRPCDialOption) + } endpointConfig := &endpoint.Config{ Name: config.Name, @@ -334,7 +349,7 @@ func main() { MaxTokenLifetime: config.MaxTokenLifetime, } ep, err := endpoint.NewEndpoint( - context.Background(), // use background context to allow endpoint unregistration on tear down + context.Background(), // Use background context to allow endpoint unregistration on tear down endpointConfig, nsmAPIClient.NetworkServiceRegistryClient, nsmAPIClient.NetworkServiceEndpointRegistryClient, @@ -346,7 +361,7 @@ func main() { defer func() { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second*3)) defer cancel() - ep.Delete(ctx) // let endpoint unregister with NSM to inform proxies in time 
+ ep.Delete(ctx) // Let endpoint unregister with NSM to inform proxies in time logger.Info("LB endpoint deleted") }() @@ -367,7 +382,7 @@ func main() { cancel() }() sns.Start() - // monitor availibilty of frontends (advertise NSE to proxies only if there's feasible FE) + // Monitor availability of frontends (advertise NSE to proxies only if there's feasible FE) fns := NewFrontendNetworkService(ctx, targetRegistryClient, ep, NewServiceControlDispatcher(sns)) go func() { logger.Info("Start frontend monitoring service") @@ -938,3 +953,42 @@ func (sns *SimpleNetworkService) updateVips(vips []*nspAPI.Vip) error { } return nil } + +// startClusterConnectionMonitor starts cluster-wide monitoring of NSM connection +// Delete events between TAPA and Proxy, removing invalid Target entries from +// the Linux neighbor cache to prevent connection disturbances. +func startClusterConnectionMonitor( + ctx context.Context, + config Config, + cc grpc.ClientConnInterface, + dialOptions []grpc.DialOption, +) { + nsmmonitor.ClusterConnectionMonitor( + ctx, + // Create a registry client to learn NSMgr URLs + registry.NewNetworkServiceEndpointRegistryClient(cc), + // Use provided gRPC dial options to connect NSMgrs + dialOptions, + // Monitor only connections matching the specified network service name + // Note: A server running an old API version will simply ignore the unknown + // field in the selector, resulting in a wildcard match. 
+ &networkservice.MonitorScopeSelector{ + NetworkServices: []string{ + conduitns.GetNetworkServiceNameWithProxy( + config.ConduitName, + config.TrenchName, + config.Namespace, + ), + }, + }, + // Replace "local" URLs with the local NSMgr's URL to avoid connecting the local forwarder + func(connectTo *url.URL) *url.URL { + if strings.HasPrefix(connectTo.String(), "inode://") { + return &config.ConnectTo + } + return connectTo + }, + // Callback function to remove invalid entries from the neighbor cache + neighborcache.RemoveInvalid, + ) +} diff --git a/cmd/tapa/main.go b/cmd/tapa/main.go index 555233ef..45aab573 100644 --- a/cmd/tapa/main.go +++ b/cmd/tapa/main.go @@ -170,7 +170,9 @@ func main() { if err != nil { log.Fatal(logger, "dial to NSMgr", "error", err) } - defer cc.Close() + defer func() { + _ = cc.Close() + }() monitorClient := networkservice.NewMonitorConnectionClient(cc) if err := os.RemoveAll(config.Socket); err != nil { diff --git a/config/templates/charts/meridio/deployment/stateless-lb-frontend.yaml b/config/templates/charts/meridio/deployment/stateless-lb-frontend.yaml index b4db04da..4850722b 100644 --- a/config/templates/charts/meridio/deployment/stateless-lb-frontend.yaml +++ b/config/templates/charts/meridio/deployment/stateless-lb-frontend.yaml @@ -115,6 +115,8 @@ spec: value: # to be filled by operator - name: NSM_METRICS_ENABLED value: "true" + - name: NSM_NAMESPACE + value: # to be filled by operator volumeMounts: - name: spire-agent-socket mountPath: /run/spire/sockets diff --git a/deployments/helm/templates/load-balancer.yaml b/deployments/helm/templates/load-balancer.yaml index 331b77e7..9765e5d0 100644 --- a/deployments/helm/templates/load-balancer.yaml +++ b/deployments/helm/templates/load-balancer.yaml @@ -57,6 +57,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: NSM_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace - name: NSM_SERVICE_NAME value: {{ template "meridio.loadBalancer.networkServiceName" 
. }} - name: NSM_CONDUIT_NAME diff --git a/docs/components/stateless-lb.md b/docs/components/stateless-lb.md index bccf6506..0e9c91b5 100644 --- a/docs/components/stateless-lb.md +++ b/docs/components/stateless-lb.md @@ -38,6 +38,8 @@ NSM_GRPC_MAX_BACKOFF | time.Duration | Upper bound on gRPC connection backoff de NSM_METRICS_ENABLED | bool | Enable the metrics collection| false NSM_METRICS_PORT | int | Specify the port used to expose the metrics | 2223 NSM_SOCKET | url.URL | Server socket to host Stream Availability Service | unix:///var/lib/meridio/lb.sock +NSM_NAMESPACE | string | Namespace the pod is running on | default +NSM_TARGET_DISCONNECT_MONITORING | bool | Enable listening to Target disconnect events to clean-up linux neighbor cache | true ## Command Line @@ -91,6 +93,6 @@ Sysctl: net.ipv4.conf.all.rp_filter=0 | Allow packets to have a source IPv4 addr Sysctl: net.ipv4.conf.default.rp_filter=0 | Allow packets to have a source IPv6 address which does not correspond to any routing destination address. Sysctl: net.ipv4.fwmark_reflect=1 | Allow LB generated outbound ICMP Frag Needed reply to use VIP as source address. Sysctl: net.ipv6.fwmark_reflect=1 | Allow LB generated outbound ICMPv6 Packet Too Big reply to use VIP as source address. -NET_ADMIN | The load balancer configures IP rules and IP routes to steer packets (processed by [nfqueue-loadbalancer program](https://github.com/Nordix/nfqueue-loadbalancer)) to targets. The user space load balancer program relies on [libnetfilter_queue](https://netfilter.org/projects/libnetfilter_queue). +NET_ADMIN | The load balancer configures IP rules and IP routes to steer packets (processed by [nfqueue-loadbalancer program](https://github.com/Nordix/nfqueue-loadbalancer)) to targets. The user space load balancer program relies on [libnetfilter_queue](https://netfilter.org/projects/libnetfilter_queue). 
The load balancer can remove entries from its Linux neighbor cache that correspond to targets for which the MAC address is no longer valid. IPC_LOCK | The user space load balancer program uses shared memory. IPC_OWNER | The user space load balancer program uses shared memory. diff --git a/examples/target/build/example-target/Dockerfile b/examples/target/build/example-target/Dockerfile index af296247..f5860079 100644 --- a/examples/target/build/example-target/Dockerfile +++ b/examples/target/build/example-target/Dockerfile @@ -2,7 +2,7 @@ ARG USER=tapa-user ARG UID=10006 ARG HOME=/home/${USER} -FROM golang:1.22 as build +FROM golang:1.23 as build ENV GO111MODULE=on diff --git a/examples/target/go.mod b/examples/target/go.mod index ed12b853..354a4088 100644 --- a/examples/target/go.mod +++ b/examples/target/go.mod @@ -1,8 +1,8 @@ module github.com/nordix/meridio/examples/target -go 1.22 +go 1.23 -toolchain go1.22.0 +toolchain go1.23.0 require ( github.com/nordix/meridio v0.8.0 diff --git a/go.mod b/go.mod index 7a7091b6..c63ffb9e 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,11 @@ module github.com/nordix/meridio -go 1.22 +go 1.23 + +toolchain go1.23.0 require ( + github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29 github.com/edwarnicke/grpcfd v1.1.4 github.com/faisal-memon/sviddisk v0.0.0-20211007205134-77ccea0b9271 github.com/go-logr/logr v1.4.1 @@ -10,9 +13,9 @@ require ( github.com/golang/mock v1.6.0 github.com/google/nftables v0.2.0 github.com/kelseyhightower/envconfig v1.4.0 - github.com/networkservicemesh/api v1.13.2 - github.com/networkservicemesh/sdk v1.13.2 - github.com/networkservicemesh/sdk-sriov v1.13.2 + github.com/networkservicemesh/api v1.14.2-rc.1 + github.com/networkservicemesh/sdk v1.14.2-rc.1 + github.com/networkservicemesh/sdk-sriov v1.14.2-rc.1 github.com/onsi/ginkgo v1.16.5 github.com/onsi/ginkgo/v2 v2.14.0 github.com/onsi/gomega v1.30.0 @@ -22,13 +25,13 @@ require ( github.com/sirupsen/logrus v1.9.0 
github.com/spiffe/go-spiffe/v2 v2.1.7 github.com/stretchr/testify v1.9.0 - github.com/vishvananda/netlink v1.2.1-beta.2.0.20220630165224-c591ada0fb2b + github.com/vishvananda/netlink v1.3.1-0.20240922070040-084abd93d350 go.opentelemetry.io/otel v1.20.0 go.opentelemetry.io/otel/exporters/prometheus v0.43.0 go.opentelemetry.io/otel/metric v1.20.0 go.opentelemetry.io/otel/sdk v1.20.0 go.opentelemetry.io/otel/sdk/metric v1.20.0 - go.uber.org/goleak v1.3.0 + go.uber.org/goleak v1.3.1-0.20241121203838-4ff5fa6529ee go.uber.org/zap v1.26.0 golang.org/x/net v0.24.0 golang.org/x/sys v0.19.0 @@ -55,7 +58,6 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/edwarnicke/exechelper v1.0.2 // indirect - github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29 // indirect github.com/edwarnicke/serialize v1.0.7 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch/v5 v5.8.0 // indirect @@ -70,7 +72,7 @@ require ( github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/gobwas/glob v0.2.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang-jwt/jwt/v4 v4.5.0 // indirect + github.com/golang-jwt/jwt/v4 v4.5.1 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect @@ -111,7 +113,7 @@ require ( github.com/tchap/go-patricia/v2 v2.3.1 // indirect github.com/tklauser/go-sysconf v0.3.13 // indirect github.com/tklauser/numcpus v0.7.0 // indirect - github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 // indirect + github.com/vishvananda/netns v0.0.4 // indirect github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/yashtewari/glob-intersection v0.1.0 // indirect diff 
--git a/go.sum b/go.sum index 2d782559..eb5c3176 100644 --- a/go.sum +++ b/go.sum @@ -108,8 +108,8 @@ github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= -github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= +github.com/golang-jwt/jwt/v4 v4.5.1 h1:JdqV9zKUdtaa9gdPlywC3aeoEsR681PlKC+4F5gQgeo= +github.com/golang-jwt/jwt/v4 v4.5.1/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v1.1.2 h1:DVjP2PbBOzHyzA+dn3WhHIq4NdVu3Q+pvivFICf/7fo= github.com/golang/glog v1.1.2/go.mod h1:zR+okUeTbrL6EL3xHUDxZuEtGv04p5shwip1+mL/rLQ= @@ -178,8 +178,8 @@ github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dv github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.16.6 h1:91SKEy4K37vkp255cJ8QesJhjyRO0hn9i9G0GoUwLsk= -github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 
h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -213,12 +213,12 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= -github.com/networkservicemesh/api v1.13.2 h1:HiQTPED7l2rvGZUFN2FklOh6RGXS6Ha/xZPc8DKLZmQ= -github.com/networkservicemesh/api v1.13.2/go.mod h1:B8FmS3XZ7NZY7ZEtdcNg2NHYppDHlr4kl4eecdZN9eI= -github.com/networkservicemesh/sdk v1.13.2 h1:lvyfy9NSZdSLiaOM9Ic0CLcZLiLhahs67HJDOw8QfO8= -github.com/networkservicemesh/sdk v1.13.2/go.mod h1:Zij694LKc/YFsmo2cw0XI20NIN/RIuWit+hvr8CLIkM= -github.com/networkservicemesh/sdk-sriov v1.13.2 h1:B+v5GUIJmK0hdC6k5mHGEy1FvOTORls/Pm1JNL23KBw= -github.com/networkservicemesh/sdk-sriov v1.13.2/go.mod h1:Y78htZPdQ0X2JJ1XjpqlyryVEmbaG1ER5FYwtg7WSgc= +github.com/networkservicemesh/api v1.14.2-rc.1 h1:m2efoCNFebl6iVoST70SaGPTqgv/woHMQR0csmJ3uyM= +github.com/networkservicemesh/api v1.14.2-rc.1/go.mod h1:GT0Yw1LYFSTxlDyJjBDhIxT82rJ2czZ0TiyzxSyKzvg= +github.com/networkservicemesh/sdk v1.14.2-rc.1 h1:Xf/eUfzQNBDWZcOsqwer/6Zky+HH9MyX7erfadX71YU= +github.com/networkservicemesh/sdk v1.14.2-rc.1/go.mod h1:SfccFwcMWl81JZ7fTKTHueBIWsx7DPQiLePcpnW2H6U= +github.com/networkservicemesh/sdk-sriov v1.14.2-rc.1 h1:zUpPY6uxF+Vm/YU461DbRIhRvrPGOXESXsK9xBuppvs= +github.com/networkservicemesh/sdk-sriov v1.14.2-rc.1/go.mod h1:o9fVOhtlhSQVy/HhP7G5HCvBr5Ihj+ZBZuB+VLu5qMg= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= @@ -290,11 +290,10 @@ github.com/tklauser/go-sysconf v0.3.13/go.mod 
h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5I github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4= github.com/tklauser/numcpus v0.7.0/go.mod h1:bb6dMVcj8A42tSE7i32fsIUCbQNllK5iDguyOZRUzAY= -github.com/vishvananda/netlink v1.2.1-beta.2.0.20220630165224-c591ada0fb2b h1:CyMWBGvc1ZOvUBxW51DVTSIIAeJWWJJs+Ko3ouM/AVI= -github.com/vishvananda/netlink v1.2.1-beta.2.0.20220630165224-c591ada0fb2b/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhgX83tXhKS2B/PRMpOho= -github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= -github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 h1:gga7acRE695APm9hlsSMoOoE65U4/TcqNj90mc69Rlg= -github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= +github.com/vishvananda/netlink v1.3.1-0.20240922070040-084abd93d350 h1:w5OI+kArIBVksl8UGn6ARQshtPCQvDsbuA9NQie3GIg= +github.com/vishvananda/netlink v1.3.1-0.20240922070040-084abd93d350/go.mod h1:i6NetklAujEcC6fK0JPjT8qSwWyO0HLn4UKG+hGqeJs= +github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= +github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0= @@ -336,8 +335,8 @@ go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v8 go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.10/go.mod 
h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= -go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= -go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/goleak v1.3.1-0.20241121203838-4ff5fa6529ee h1:uOMbcH1Dmxv45VkkpZQYoerZFeDncWpjbN7ATiQOO7c= +go.uber.org/goleak v1.3.1-0.20241121203838-4ff5fa6529ee/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= @@ -402,9 +401,7 @@ golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200217220822-9197077df867/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200728102440-3e129f6d46b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -415,8 +412,10 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/pkg/controllers/attractor/stateless-lb-frontend.go b/pkg/controllers/attractor/stateless-lb-frontend.go index 0d04f7ea..7c17793c 100644 --- a/pkg/controllers/attractor/stateless-lb-frontend.go +++ b/pkg/controllers/attractor/stateless-lb-frontend.go @@ -75,6 +75,7 @@ func (l *LoadBalancer) getLbEnvVars(allEnv []corev1.EnvVar) []corev1.EnvVar { {Name: "NSM_TRENCH_NAME", Value: l.trench.ObjectMeta.Name}, {Name: "NSM_NSP_SERVICE", Value: common.NSPServiceWithPort(l.trench)}, {Name: "NSM_LOG_LEVEL", Value: common.GetLogLevel()}, + {Name: "NSM_NAMESPACE", Value: l.attractor.ObjectMeta.Namespace}, } if rpcTimeout := common.GetGRPCProbeRPCTimeout(); rpcTimeout != "" { operatorEnv = append(operatorEnv, corev1.EnvVar{Name: "NSM_GRPC_PROBE_RPC_TIMEOUT", Value: rpcTimeout}) diff --git a/pkg/controllers/common/networkAttachment.go b/pkg/controllers/common/networkAttachment.go index 56953d54..949b6525 100644 --- a/pkg/controllers/common/networkAttachment.go +++ b/pkg/controllers/common/networkAttachment.go @@ -18,7 +18,6 @@ package common import ( "encoding/json" - "fmt" "regexp" "strings" @@ -85,7 +84,7 @@ func parsePodNetworkObjectName(podnetwork string) (string, string, string, error for _, v := range allItems { matched := v.reg.MatchString(v.value) if !matched 
&& len([]rune(v.value)) > 0 { - return "", "", "", errors.Errorf(fmt.Sprintf("parsePodNetworkObjectName: Failed to parse: one or more items did not match comma-delimited format. Mismatch @ '%v'", v.value)) + return "", "", "", errors.Errorf("parsePodNetworkObjectName: Failed to parse: one or more items did not match comma-delimited format. Mismatch @ '%v'", v.value) } } diff --git a/pkg/debug/collect.go b/pkg/debug/collect.go index c3c5e5de..18272196 100644 --- a/pkg/debug/collect.go +++ b/pkg/debug/collect.go @@ -190,7 +190,7 @@ func listRules() []*Rule { rules = append(rules, &Rule{ Table: r.Table, Priority: r.Priority, - Mark: r.Mark, + Mark: int(r.Mark), Source: source, Destination: destination, }) diff --git a/pkg/health/probe/probe.go b/pkg/health/probe/probe.go index 748c517e..eb081bb4 100644 --- a/pkg/health/probe/probe.go +++ b/pkg/health/probe/probe.go @@ -118,7 +118,9 @@ func (hp *HealthProbe) Request(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to connect service %q, %w", hp.addr, err) } - defer conn.Close() + defer func() { + _ = conn.Close() + }() rpcCtx, rpcCancel := context.WithTimeout(ctx, hp.rpcTimeout) defer rpcCancel() diff --git a/pkg/ipam/storage/sqlite/sqlite_test.go b/pkg/ipam/storage/sqlite/sqlite_test.go index 9fee1f28..30057fbd 100644 --- a/pkg/ipam/storage/sqlite/sqlite_test.go +++ b/pkg/ipam/storage/sqlite/sqlite_test.go @@ -30,7 +30,9 @@ const dbFileName = "test.db" func Test_Add_Get(t *testing.T) { _ = os.Remove(dbFileName) - defer os.Remove(dbFileName) + defer func() { + _ = os.Remove(dbFileName) + }() store, err := sqlite.New(dbFileName) assert.Nil(t, err) @@ -74,7 +76,9 @@ func Test_Add_Get(t *testing.T) { func Test_GetChilds(t *testing.T) { _ = os.Remove(dbFileName) - defer os.Remove(dbFileName) + defer func() { + _ = os.Remove(dbFileName) + }() store, err := sqlite.New(dbFileName) assert.Nil(t, err) @@ -106,7 +110,9 @@ func Test_GetChilds(t *testing.T) { func Test_Delete(t *testing.T) { _ = 
os.Remove(dbFileName) - defer os.Remove(dbFileName) + defer func() { + _ = os.Remove(dbFileName) + }() store, err := sqlite.New(dbFileName) assert.Nil(t, err) diff --git a/pkg/kernel/fwmark.go b/pkg/kernel/fwmark.go index d8382a32..4fbe8b6d 100644 --- a/pkg/kernel/fwmark.go +++ b/pkg/kernel/fwmark.go @@ -116,7 +116,7 @@ func (fwmr *FWMarkRoute) configure() error { rule := netlink.NewRule() rule.Table = fwmr.tableID - rule.Mark = fwmr.fwmark + rule.Mark = uint32(fwmr.fwmark) rule.Family = fwmr.family() err := netlink.RuleAdd(rule) if err != nil { diff --git a/pkg/nsm/client.go b/pkg/nsm/client.go index eb6b5957..be105629 100644 --- a/pkg/nsm/client.go +++ b/pkg/nsm/client.go @@ -172,7 +172,7 @@ func (apiClient *APIClient) Delete() { apiClient.cancel() } if apiClient.GRPCClient != nil { - apiClient.GRPCClient.Close() + _ = apiClient.GRPCClient.Close() } } diff --git a/pkg/nsm/monitor/clusterconnection.go b/pkg/nsm/monitor/clusterconnection.go new file mode 100644 index 00000000..8aaa497b --- /dev/null +++ b/pkg/nsm/monitor/clusterconnection.go @@ -0,0 +1,252 @@ +/* +Copyright (c) 2024 OpenInfra Foundation Europe + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package monitor + +import ( + "context" + "fmt" + "net/url" + "time" + + "github.com/edwarnicke/genericsync" + "github.com/go-logr/logr" + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/networkservicemesh/api/pkg/api/registry" + "github.com/nordix/meridio/pkg/log" + "google.golang.org/grpc" +) + +const defaultMonitorConnectTimeout time.Duration = 15 * time.Second + +var nsmgrs genericsync.Map[string, bool] + +// ClusterConnectionMonitor - +// Based on NSM's cmd-dashboard-backend. +// +// Goal is to learn all NSMgr URLs via forwarder NSE objects, and create a +// MonitorConnectionClient towards each, thus monitor connections of the whole +// cluster matching the MonitorScopeSelector. +// +// It is preferred to have a registryClient parameter that is connected to the local NSMgr: +// 1. The Find stream towards a local NSMgr seems to get closed after 15 (or 30?) seconds. Which is a rather +// favorable behaviour. That's because our Find() is merely watching forwarder NSEs, yet we do intend to connect +// to NSMgr for Connection Monitoring purposes. +// Hence, if an NSMgr ConnectionMonitor stream terminates it's not self-explanatory that the forwarder NSE should +// experience any change to trigger an event so that we could restore the ConnectionMonitor ASAP. (At least not +// after NSMgr container restart.) +// 2. No dependency towards the NSM kubernetes namespace is required. (Forwarder NSE local to our service can have +// its URL replaced with inode by NSMgr. Therefore, allow replacing the URL in such cases with preferably with the +// URL of the worker local NSMgr.) +// +// Note: Due to the nature of nsmgr registry server, if registryClient is connected to local NSMgr, then NSMgr +// localBypassNSEServer component will restore the endpoint's URL on Find(). 
That's because Find()'s main use +// case is to let forwarders lookup NSEs upon NSC connection requests and connect them (either directly or via +// remote NSMgr depending on their location). +// (If registry client had a recvfd chain component, it would allow dialing and connecting the local forwarder's +// Connection Monitor server. But since the aim is to connect to NSMgr for connection monitoring, this should +// be avoided. Parameter checkMonitorServer() can be used to replace URLs with "unix:///proc" prefix...) +func ClusterConnectionMonitor( + ctx context.Context, + registryClient registry.NetworkServiceEndpointRegistryClient, + dialOptions []grpc.DialOption, + selector *networkservice.MonitorScopeSelector, + checkMonitorServer func(*url.URL) *url.URL, + callback func(context.Context, *networkservice.ConnectionEvent), +) { + logger := log.FromContextOrGlobal(ctx).WithValues("func", "ClusterConnectionMonitor") + + for ; ctx.Err() == nil; time.Sleep(time.Millisecond * 100) { + var nseChannel = getNetworkServiceEndpointChannel(ctx, logger, registryClient) + + channelReadLoop: + for ctx.Err() == nil { + select { + case <-ctx.Done(): + return + case nse, ok := <-nseChannel: + if !ok { + break channelReadLoop + } + + nsmgrURL, err := url.Parse(nse.GetNetworkServiceEndpoint().Url) + if err != nil { + logger.Error(err, "Failed to parse raw NSMgr URL", "URL", nse.GetNetworkServiceEndpoint().Url) + continue + } + + // Note: A registry client that is connected to local NSMgr gets the URL of local endpoints + // instead of the URL of the local NSMgr. As GetNetworkServiceEndpoint() looks up forwarder + // endpoints, an "update" here would indicate forwarder related changes. Yet, the intention + // is to create Connection Monitor streams towards NSMgrs. Hence, the URL shall be manually + // overridden in such case.
+ if checkMonitorServer != nil { + nsmgrURL = checkMonitorServer(nsmgrURL) + } + + nsmgrAddr := nsmgrURL.String() + if _, exists := nsmgrs.Load(nsmgrAddr); !exists { + nsmgrs.Store(nsmgrAddr, true) + logger.V(1).Info("Extracted NSMgr address", "addr", nsmgrAddr) + + // Start a goroutine for each NSMgr + go func() { + logger := logger.WithValues("address", nsmgrAddr) + defer cleanupManager(logger, nsmgrAddr) + nsmgrConn, err := grpc.DialContext( + ctx, + func() string { + switch nsmgrURL.Scheme { + case "tcp": + return nsmgrURL.Host + } + return nsmgrURL.String() + }(), + dialOptions..., + ) + if err != nil { + logger.Error(err, "Failed to dial NSMgr") + return + } + + streamCtx, streamCancel := context.WithCancel(ctx) + defer streamCancel() + clientConnections := networkservice.NewMonitorConnectionClient(nsmgrConn) + stream, err := createMonitorStream(streamCtx, clientConnections, selector) + if err != nil { + logger.Error(err, "Error from NSMgr MonitorConnectionClient") + return + } + logger.V(1).Info("Connected NSMgr") + + for ctx.Err() == nil { + event, err := stream.Recv() + if err != nil { + logger.Error(err, "Error from NSMgr monitorConnection stream") + // XXX: What if stream gets an error, yet the NSMgr is alive + // or a NSMgr container restart occurred. The answer is that + // probably the stream is not supposed to get suddenly closed + // if NSMgr is alive. + // On the other hand, our NSE channel merely monitors forwarder + // NSEs that should not change upon above events. + // However, seemingly the nse monitor stream along with the + // endpoint channel get closed every 15 (30?) seconds in our case. + // Which is beneficial, because at least after 15 (30?) seconds + // delay the NSMgr MonitorConnection stream could get re-created.
+ break + } + if callback != nil { + callback(ctx, event) + } + } + }() + } + } + } + } +} + +// Lookup every forwarder NSE to extract NSMgr URL, that could be used to +// establish a connection towards each NSMgr and monitor connections. +// Since we are merely interested in NSMgrs, no need to check NSE state or +// lifetime. +func getNetworkServiceEndpointChannel( + ctx context.Context, logger logr.Logger, + registryClient registry.NetworkServiceEndpointRegistryClient, +) <-chan *registry.NetworkServiceEndpointResponse { + streamNse, err := registryClient.Find(ctx, &registry.NetworkServiceEndpointQuery{ + Watch: true, + NetworkServiceEndpoint: &registry.NetworkServiceEndpoint{ + NetworkServiceNames: []string{"forwarder"}, + }, + }) + if err != nil { + logger.Error(err, "Failed to perform Find NSE request") + } + + return readNetworkServiceEndpointChannel(streamNse, logger) +} + +func cleanupManager(logger logr.Logger, nsmgrAddr string) { + logger.V(1).Info("Clean up NSMgr", "addr", nsmgrAddr) + nsmgrs.Delete(nsmgrAddr) +} + +func readNetworkServiceEndpointChannel( + stream registry.NetworkServiceEndpointRegistry_FindClient, + logger logr.Logger, +) <-chan *registry.NetworkServiceEndpointResponse { + result := make(chan *registry.NetworkServiceEndpointResponse) + go func() { + defer func() { + close(result) + }() + for msg, err := stream.Recv(); err == nil; msg, err = stream.Recv() { + select { + case result <- msg: + continue + case <-stream.Context().Done(): + logger.V(1).Info("context closed") + return + } + } + }() + return result +} + +// createMonitorStream - +// Attempts to create the monitor stream within defaultMonitorConnectTimeout. +// Expects a cancellable context to be passed in. +// The caller must call cancel() if an error is returned to avoid context leaks. +// +// Rationale: +// We cannot be certain that DialContext is configured via dial options to block +// until a connection is established.
Therefore, this function ensures the monitor +// RPC call does not get stuck indefinitely if the server disappears before a +// connection is established or if the call cannot return in a timely manner. +func createMonitorStream( + ctx context.Context, + clientConnections networkservice.MonitorConnectionClient, + selector *networkservice.MonitorScopeSelector, +) (networkservice.MonitorConnection_MonitorConnectionsClient, error) { + var stream networkservice.MonitorConnection_MonitorConnectionsClient + var err error + // Create a context with a timeout for the stream creation attempt + dialTimeoutCtx, cancel := context.WithTimeout(ctx, defaultMonitorConnectTimeout) + defer cancel() + + // Try to create the stream + done := make(chan struct{}) + go func() { + defer close(done) + stream, err = clientConnections.MonitorConnections(ctx, selector) + }() + + select { + case <-done: + // Stream creation returned (success or failure) + if err != nil { + // Error returned during stream creation + return nil, fmt.Errorf("failed to create monitor stream: %w", err) + } + // Success + return stream, err + case <-dialTimeoutCtx.Done(): + // Timeout reached while attempting to create the stream + // Cancel the RPC context to clean up any ongoing RPC and avoid goroutine leak + return nil, fmt.Errorf("failed to create stream within timeout: %w", dialTimeoutCtx.Err()) + } +} diff --git a/pkg/nsp/registry/sqlite/sqlite.go b/pkg/nsp/registry/sqlite/sqlite.go index 9f8e7f45..a745d021 100644 --- a/pkg/nsp/registry/sqlite/sqlite.go +++ b/pkg/nsp/registry/sqlite/sqlite.go @@ -60,7 +60,7 @@ func (trsql *TargetRegistrySQLite) Close() error { if err != nil { return fmt.Errorf("failed to close db connection: %w", err) } - sqlDB.Close() + _ = sqlDB.Close() return nil } diff --git a/pkg/nsp/registry/sqlite/sqlite_test.go b/pkg/nsp/registry/sqlite/sqlite_test.go index 665ba9d6..270437af 100644 --- a/pkg/nsp/registry/sqlite/sqlite_test.go +++ b/pkg/nsp/registry/sqlite/sqlite_test.go @@ -31,12 
+31,12 @@ func TestTargetRegistrySQLite_Set(t *testing.T) { t.Cleanup(func() { goleak.VerifyNone(t) }) dbFile := "test.db" - os.Remove(dbFile) + _ = os.Remove(dbFile) db, err := sqlite.New(dbFile) assert.Nil(t, err) defer func() { - db.Close() - os.Remove(dbFile) + _ = db.Close() + _ = os.Remove(dbFile) }() ctx := context.Background()