diff --git a/Dockerfile b/Dockerfile index f9dd164ed..495dda63d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -45,7 +45,7 @@ ENV GOMODCACHE /.cache/mod RUN --mount=type=cache,target=/.cache go install sigs.k8s.io/controller-tools/cmd/controller-gen@v0.8.0 RUN --mount=type=cache,target=/.cache go install k8s.io/code-generator/cmd/conversion-gen@v0.23.1 RUN --mount=type=cache,target=/.cache go install mvdan.cc/gofumpt/gofumports@v0.1.1 -RUN curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | bash -s -- -b /toolchain/bin v1.43.0 +RUN curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/v1.43.0/install.sh | bash -s -- -b /toolchain/bin v1.43.0 WORKDIR /src COPY ./go.mod ./ COPY ./go.sum ./ diff --git a/sfyra/go.mod b/sfyra/go.mod index 585ec2cdd..be2f6de4d 100644 --- a/sfyra/go.mod +++ b/sfyra/go.mod @@ -16,7 +16,7 @@ require ( github.com/stretchr/testify v1.7.1 github.com/talos-systems/cluster-api-control-plane-provider-talos v0.4.5 github.com/talos-systems/go-debug v0.2.1 - github.com/talos-systems/go-loadbalancer v0.1.1 + github.com/talos-systems/go-loadbalancer v0.1.2 github.com/talos-systems/go-procfs v0.1.0 github.com/talos-systems/go-retry v0.3.1 github.com/talos-systems/net v0.3.2 diff --git a/sfyra/go.sum b/sfyra/go.sum index 3482db06a..7e3a731d6 100644 --- a/sfyra/go.sum +++ b/sfyra/go.sum @@ -1241,8 +1241,9 @@ github.com/talos-systems/go-cmd v0.1.0/go.mod h1:kf+rZzTEmlDiYQ6ulslvRONnKLQH8x8 github.com/talos-systems/go-debug v0.2.1 h1:VSN8P1zXWeHWgUBZn4cVT3keBcecCAJBG9Up+F6N2KM= github.com/talos-systems/go-debug v0.2.1/go.mod h1:pR4NjsZQNFqGx3n4qkD4MIj1F2CxyIF8DCiO1+05JO0= github.com/talos-systems/go-kmsg v0.1.1/go.mod h1:dppwQn+/mrdvsziGMbXjzfc4E+75oZhr39UIP6LgL0w= -github.com/talos-systems/go-loadbalancer v0.1.1 h1:qjC0uWHu6O7VXG9EN4ovVPg79sRbypXTrJZJskdaa2k= github.com/talos-systems/go-loadbalancer v0.1.1/go.mod h1:L0uLhCBUQVkdBqtnxqbrw+wzopQyoeluPos8okqdxLo= +github.com/talos-systems/go-loadbalancer v0.1.2 h1:gWgOY5c4gVrahbDOAPHA2UbDY8QSgiR8S6bRXoC5Wqw= +github.com/talos-systems/go-loadbalancer v0.1.2/go.mod h1:ptznwIJopZLeXBviQnIGqEuN1xJ5xVQSB7KWqGvqK8A= github.com/talos-systems/go-procfs v0.1.0 h1:AuS3/4fx5Me6CUyPVDxBH79eSSnl+8C83tzGmsMAPzs= github.com/talos-systems/go-procfs v0.1.0/go.mod h1:ATyUGFQIW8OnbnmvqefZWVPgL9g+CAmXHfkgny21xX8= github.com/talos-systems/go-retry v0.1.0/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lIW2MS5VdDaMtoKM= diff --git a/sfyra/pkg/loadbalancer/loadbalancer.go b/sfyra/pkg/loadbalancer/loadbalancer.go index b27ec3861..ab6258aa1 100644 --- a/sfyra/pkg/loadbalancer/loadbalancer.go +++ b/sfyra/pkg/loadbalancer/loadbalancer.go @@ -13,19 +13,18 @@ import ( "net" "reflect" "sort" - "strconv" "sync" "time" cacpt "github.com/talos-systems/cluster-api-control-plane-provider-talos/api/v1alpha3" - "github.com/talos-systems/go-loadbalancer/loadbalancer" + "github.com/talos-systems/go-loadbalancer/controlplane" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" capiv1 "sigs.k8s.io/cluster-api/api/v1beta1" "sigs.k8s.io/controller-runtime/pkg/client" - sidero "github.com/talos-systems/sidero/app/caps-controller-manager/api/v1alpha3" + infrav1 "github.com/talos-systems/sidero/app/caps-controller-manager/api/v1alpha3" metal "github.com/talos-systems/sidero/app/sidero-controller-manager/api/v1alpha1" ) @@ -33,14 +32,12 @@ import ( type ControlPlane struct { client client.Client - endpoint string - lb loadbalancer.TCP - prevUpstreams []string clusterNamespace, clusterName string - ctx context.Context + lb *controlplane.LoadBalancer + ctxCancel context.CancelFunc wg sync.WaitGroup @@ -54,43 +51,34 @@ func NewControlPlane(client client.Client, address net.IP, port int, clusterName clusterName: clusterName, } - cp.lb.DialTimeout = 5 * time.Second - cp.lb.KeepAlivePeriod = time.Second - cp.lb.TCPUserTimeout = 5 * time.Second - - cp.ctx, cp.ctxCancel = context.WithCancel(context.Background()) + logWriter := log.Writer() + if !verboseLog { + logWriter = ioutil.Discard + } var err error - if port == 0 { - port, err = findListenPort(address) - if err != nil { - return nil, err - } + cp.lb, err = controlplane.NewLoadBalancer(address.String(), port, logWriter) + if err != nil { + return nil, err } - cp.endpoint = net.JoinHostPort(address.String(), strconv.Itoa(port)) + upstreamCh := make(chan []string) - if !verboseLog { - // send logs to /dev/null - cp.lb.Logger = log.New(ioutil.Discard, "", 0) - } + var ctx context.Context - // create route without any upstreams yet - if err := cp.lb.AddRoute(cp.endpoint, nil); err != nil { - return nil, err - } + ctx, cp.ctxCancel = context.WithCancel(context.Background()) cp.wg.Add(1) - go cp.reconcileLoop() + go cp.reconcileLoop(ctx, upstreamCh) - return &cp, cp.lb.Start() + return &cp, cp.lb.Start(upstreamCh) } // GetEndpoint returns loadbalancer endpoint. func (cp *ControlPlane) GetEndpoint() string { - return cp.endpoint + return cp.lb.Endpoint() } // Close the load balancer. @@ -98,14 +86,10 @@ func (cp *ControlPlane) Close() error { cp.ctxCancel() cp.wg.Wait() - if err := cp.lb.Close(); err != nil { - return err - } - - return cp.lb.Wait() + return cp.lb.Shutdown() } -func (cp *ControlPlane) reconcileLoop() { +func (cp *ControlPlane) reconcileLoop(ctx context.Context, upstreamCh chan<- []string) { defer cp.wg.Done() const interval = 15 * time.Second @@ -114,28 +98,34 @@ func (cp *ControlPlane) reconcileLoop() { defer ticker.Stop() for { - if err := cp.reconcile(); err != nil { + if err := cp.reconcile(ctx); err != nil { log.Printf("load balancer reconcile failed: %s", err) + } else { + select { + case upstreamCh <- cp.prevUpstreams: + case <-ctx.Done(): + return + } } select { - case <-cp.ctx.Done(): + case <-ctx.Done(): return case <-ticker.C: } } } -func (cp *ControlPlane) reconcile() error { +func (cp *ControlPlane) reconcile(ctx context.Context) error { var cluster capiv1.Cluster - if err := cp.client.Get(cp.ctx, types.NamespacedName{Namespace: cp.clusterNamespace, Name: cp.clusterName}, &cluster); err != nil { + if err := cp.client.Get(ctx, types.NamespacedName{Namespace: cp.clusterNamespace, Name: cp.clusterName}, &cluster); err != nil { return err } var controlPlane cacpt.TalosControlPlane - if err := cp.client.Get(cp.ctx, types.NamespacedName{Namespace: cluster.Spec.ControlPlaneRef.Namespace, Name: cluster.Spec.ControlPlaneRef.Name}, &controlPlane); err != nil { + if err := cp.client.Get(ctx, types.NamespacedName{Namespace: cluster.Spec.ControlPlaneRef.Namespace, Name: cluster.Spec.ControlPlaneRef.Name}, &controlPlane); err != nil { return err } @@ -146,16 +136,18 @@ func (cp *ControlPlane) reconcile() error { return err } - if err := cp.client.List(cp.ctx, &machines, client.MatchingLabelsSelector{Selector: labelSelector}); err != nil { + if err := cp.client.List(ctx, &machines, client.MatchingLabelsSelector{Selector: labelSelector}); err != nil { return err } var upstreams []string for _, machine := range machines.Items { - var metalMachine sidero.MetalMachine + // we could have looked up addresses via Machine.status, but as we still have tests with Talos 0.13 (before SideroLink was introduced), + // we need to keep this way of looking up addresses. + var metalMachine infrav1.MetalMachine - if err := cp.client.Get(cp.ctx, types.NamespacedName{Namespace: machine.Spec.InfrastructureRef.Namespace, Name: machine.Spec.InfrastructureRef.Name}, &metalMachine); err != nil { + if err := cp.client.Get(ctx, types.NamespacedName{Namespace: machine.Spec.InfrastructureRef.Namespace, Name: machine.Spec.InfrastructureRef.Name}, &metalMachine); err != nil { continue } @@ -165,7 +157,7 @@ func (cp *ControlPlane) reconcile() error { continue } - if err := cp.client.Get(cp.ctx, types.NamespacedName{Namespace: metalMachine.Spec.ServerRef.Namespace, Name: metalMachine.Spec.ServerRef.Name}, &server); err != nil { + if err := cp.client.Get(ctx, types.NamespacedName{Namespace: metalMachine.Spec.ServerRef.Namespace, Name: metalMachine.Spec.ServerRef.Name}, &server); err != nil { return err } @@ -179,21 +171,10 @@ func (cp *ControlPlane) reconcile() error { sort.Strings(upstreams) if !reflect.DeepEqual(cp.prevUpstreams, upstreams) { - log.Printf("new control plane loadbalancer %q routes: %v", cp.endpoint, upstreams) + log.Printf("new control plane loadbalancer %q routes: %v", cp.lb.Endpoint(), upstreams) } cp.prevUpstreams = upstreams - return cp.lb.ReconcileRoute(cp.endpoint, upstreams) -} - -func findListenPort(address net.IP) (int, error) { - l, err := net.Listen("tcp", net.JoinHostPort(address.String(), "0")) - if err != nil { - return 0, err - } - - port := l.Addr().(*net.TCPAddr).Port - - return port, l.Close() + return nil }