diff --git a/main.go b/main.go index 57aa86fca..dd6c544eb 100644 --- a/main.go +++ b/main.go @@ -36,6 +36,7 @@ import ( submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned" "github.com/submariner-io/submariner/pkg/gateway" "github.com/submariner-io/submariner/pkg/natdiscovery" + "github.com/submariner-io/submariner/pkg/node" "github.com/submariner-io/submariner/pkg/types" "github.com/submariner-io/submariner/pkg/versions" "golang.org/x/net/http/httpproxy" @@ -123,6 +124,14 @@ func main() { logger.FatalOnError(subv1.AddToScheme(scheme.Scheme), "Error adding submariner types to the scheme") + ctx := signals.SetupSignalHandler() + + if submSpec.WaitForNode { + node.WaitForLocalNodeReady(ctx, k8sClient) + + return + } + gw, err := gateway.New(&gateway.Config{ LeaderElectionConfig: gateway.LeaderElectionConfig{ LeaseDuration: time.Duration(gwLeadershipConfig.LeaseDuration) * time.Second, @@ -146,7 +155,7 @@ func main() { }) logger.FatalOnError(err, "Error creating gateway instance") - err = gw.Run(signals.SetupSignalHandler()) + err = gw.Run(ctx) logger.FatalOnError(err, "Error running the gateway") } diff --git a/pkg/node/node.go b/pkg/node/node.go index 5897e1dcf..a215ecef1 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -25,17 +25,19 @@ import ( "github.com/pkg/errors" "github.com/submariner-io/admiral/pkg/log" + "github.com/submariner-io/admiral/pkg/resource" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/util/retry" + nodeutil "k8s.io/component-helpers/node/util" logf "sigs.k8s.io/controller-runtime/pkg/log" ) var logger = log.Logger{Logger: logf.Log.WithName("Node")} -var nodeRetry = wait.Backoff{ +var Retry = wait.Backoff{ Steps: 5, Duration: 5 * time.Second, Factor: 1.2, @@ -50,7 +52,7 @@ func GetLocalNode(clientset kubernetes.Interface) (*v1.Node, error) { var node *v1.Node - err := retry.OnError(nodeRetry, func(err error) bool { + err := retry.OnError(Retry, func(err error) bool { logger.Warningf("Error reading the local node - retrying: %v", err) return true }, func() error { @@ -66,3 +68,27 @@ func GetLocalNode(clientset kubernetes.Interface) (*v1.Node, error) { return node, errors.Wrapf(err, "failed to get local node %q", nodeName) } + +func WaitForLocalNodeReady(ctx context.Context, client kubernetes.Interface) { + // In most cases the node will already be ready; otherwise, wait forever or until the context is cancelled. + err := wait.PollUntilContextCancel(ctx, time.Second, true, func(_ context.Context) (bool, error) { + localNode, err := GetLocalNode(client) //nolint:contextcheck // TODO - should pass the context parameter + + if err != nil { + logger.Error(err, "Error retrieving local node") + } else { + _, condition := nodeutil.GetNodeCondition(&localNode.Status, v1.NodeReady) + if condition != nil && condition.Status == v1.ConditionTrue { + logger.Info("Local node ready") + return true, nil + } + + logger.Infof("Local node not ready - waiting. Conditions: %s", resource.ToJSON(localNode.Status.Conditions)) + } + + return false, nil + }) + if err != nil { + logger.Error(err, "Error waiting for local node") + } +} diff --git a/pkg/node/node_suite_test.go b/pkg/node/node_suite_test.go new file mode 100644 index 000000000..90ccdc67a --- /dev/null +++ b/pkg/node/node_suite_test.go @@ -0,0 +1,40 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package node_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/submariner-io/admiral/pkg/log/kzerolog" +) + +func init() { + kzerolog.AddFlags(nil) +} + +var _ = BeforeSuite(func() { + kzerolog.InitK8sLogging() +}) + +func TestNode(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Node Suite") +} diff --git a/pkg/node/node_test.go b/pkg/node/node_test.go new file mode 100644 index 000000000..6c15464ae --- /dev/null +++ b/pkg/node/node_test.go @@ -0,0 +1,157 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package node_test + +import ( + "context" + "os" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/submariner-io/admiral/pkg/fake" + "github.com/submariner-io/submariner/pkg/node" + corev1 "k8s.io/api/core/v1" + v1meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + fakeK8s "k8s.io/client-go/kubernetes/fake" + nodeutil "k8s.io/component-helpers/node/util" +) + +const localNodeName = "local-node" + +var _ = Describe("GetLocalNode", func() { + t := newTestDriver() + + When("the local Node resource exists", func() { + It("should return the resource", func() { + Expect(node.GetLocalNode(t.client)).To(Equal(t.node)) + }) + }) + + When("the local Node resource does not exist", func() { + BeforeEach(func() { + t.initialObjs = []runtime.Object{} + }) + + It("should return an error", func() { + _, err := node.GetLocalNode(t.client) + Expect(err).To(HaveOccurred()) + }) + }) + + When("the local Node retrieval initially fails", func() { + JustBeforeEach(func() { + fake.FailOnAction(&t.client.Fake, "nodes", "get", nil, true) + }) + + It("should eventually return the resource", func() { + Expect(node.GetLocalNode(t.client)).To(Equal(t.node)) + }) + }) + + When("the NODE_NAME env var isn't set", func() { + BeforeEach(func() { + os.Unsetenv("NODE_NAME") + }) + + It("should return an error", func() { + _, err := node.GetLocalNode(t.client) + Expect(err).To(HaveOccurred()) + }) + }) +}) + +var _ = Describe("WaitForLocalNodeReady", func() { + t := newTestDriver() + + var ( + cancel context.CancelFunc + completed chan struct{} + ) + + JustBeforeEach(func() { + var ctx context.Context + + ctx, cancel = context.WithCancel(context.Background()) + completed = make(chan struct{}, 1) + + go func() { + node.WaitForLocalNodeReady(ctx, t.client) + close(completed) + }() + + DeferCleanup(cancel) + + Consistently(completed).ShouldNot(BeClosed()) + }) + + When("the local Node becomes ready", func() { + It("should return", func() { + Expect(nodeutil.SetNodeCondition(t.client, localNodeName, corev1.NodeCondition{ + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + })).To(Succeed()) + + Eventually(completed, 3*time.Second).Should(BeClosed()) + }) + }) + + When("the context is cancelled", func() { + It("should return", func() { + cancel() + + Eventually(completed, 3*time.Second).Should(BeClosed()) + }) + }) +}) + +type testDriver struct { + client *fakeK8s.Clientset + node *corev1.Node + initialObjs []runtime.Object +} + +func newTestDriver() *testDriver { + t := &testDriver{} + + BeforeEach(func() { + node.Retry = wait.Backoff{ + Steps: 2, + Duration: 10 * time.Millisecond, + } + + t.node = &corev1.Node{ + ObjectMeta: v1meta.ObjectMeta{ + Name: localNodeName, + }, + } + + t.initialObjs = []runtime.Object{t.node} + + os.Setenv("NODE_NAME", localNodeName) + }) + + JustBeforeEach(func() { + t.client = fakeK8s.NewClientset(t.initialObjs...) + }) + + return t +} diff --git a/pkg/routeagent_driver/main.go b/pkg/routeagent_driver/main.go index 583b44df7..4af716d55 100644 --- a/pkg/routeagent_driver/main.go +++ b/pkg/routeagent_driver/main.go @@ -56,7 +56,6 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/clientcmd" - nodeutil "k8s.io/component-helpers/node/util" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager/signals" ) @@ -84,7 +83,7 @@ func main() { logger.Info("Starting submariner-route-agent using the event framework") // set up signals so we handle the first shutdown signal gracefully - stopCh := signals.SetupSignalHandler().Done() + ctx := signals.SetupSignalHandler() // Clean up "sockets" created as directories by previous versions removeInvalidSockets() @@ -113,7 +112,7 @@ func main() { logger.FatalOnError(err, "Error building the REST mapper") if env.WaitForNode { - waitForNodeReady(k8sClientSet) + node.WaitForLocalNodeReady(ctx, k8sClientSet) return } @@ -181,6 +180,8 @@ func main() { }) logger.FatalOnError(err, "Error creating controller for event handling") + stopCh := ctx.Done() + err = ctl.Start(stopCh) logger.FatalOnError(err, "Error starting controller") @@ -223,27 +224,6 @@ func uninstall(registry *event.Registry) { } } -func waitForNodeReady(k8sClientSet *kubernetes.Clientset) { - // In most cases the node will already be ready; otherwise, wait for ever - for { - localNode, err := node.GetLocalNode(k8sClientSet) - - if err != nil { - logger.Error(err, "Error retrieving local node") - } else if localNode != nil { - _, condition := nodeutil.GetNodeCondition(&localNode.Status, corev1.NodeReady) - if condition != nil && condition.Status == corev1.ConditionTrue { - logger.Info("Node ready") - return - } - - logger.Infof("Node not ready, waiting: %v", localNode.Status) - } - - time.Sleep(1 * time.Second) - } -} - func removeInvalidSockets() { // This can be removed once we stop supporting upgrades from 0.16.0 or older for _, dir := range []string{"/run/openvswitch/db.sock", "/var/run/openvswitch/ovnnb_db.sock", "/var/run/ovn-ic/ovnnb_db.sock"} { diff --git a/pkg/types/types.go b/pkg/types/types.go index 6f5c8f3c9..4dbcf83a1 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -46,6 +46,7 @@ type SubmarinerSpecification struct { HealthCheckEnabled bool `default:"true"` Uninstall bool HaltOnCertError bool `split_words:"true"` + WaitForNode bool HealthCheckInterval int HealthCheckMaxPacketLossCount int MetricsPort int `default:"32780"`