Skip to content

Commit

Permalink
test: improve DNS resolver test stability
Browse files Browse the repository at this point in the history
Run a health check before the test, as the test depends on CoreDNS being
healthy, and previous tests might disturb the cluster.

Also refactor by using watch instead of retries, make pods terminate
fast.

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
  • Loading branch information
smira committed Apr 29, 2024
1 parent 5aa0299 commit b690ffe
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 21 deletions.
10 changes: 9 additions & 1 deletion internal/integration/api/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"time"

"github.com/siderolabs/go-pointer"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -103,6 +104,7 @@ file locks (-x) unlimited
},
},
},
TerminationGracePeriodSeconds: pointer.To[int64](0),
},
}, metav1.CreateOptions{})

Expand All @@ -122,6 +124,11 @@ file locks (-x) unlimited

// TestDNSResolver verifies that external DNS resolving works from a pod.
func (suite *CommonSuite) TestDNSResolver() {
if suite.Cluster != nil {
// cluster should be healthy for kube-dns resolving to work
suite.AssertClusterHealthy(suite.ctx)
}

const (
namespace = "default"
pod = "dns-test"
Expand All @@ -143,6 +150,7 @@ func (suite *CommonSuite) TestDNSResolver() {
},
},
},
TerminationGracePeriodSeconds: pointer.To[int64](0),
},
}, metav1.CreateOptions{})

Expand All @@ -151,7 +159,7 @@ func (suite *CommonSuite) TestDNSResolver() {
defer suite.Clientset.CoreV1().Pods(namespace).Delete(suite.ctx, pod, metav1.DeleteOptions{}) //nolint:errcheck

// wait for the pod to be ready
suite.Require().NoError(suite.WaitForPodToBeRunning(suite.ctx, 10*time.Minute, namespace, pod))
suite.Require().NoError(suite.WaitForPodToBeRunning(suite.ctx, time.Minute, namespace, pod))

stdout, stderr, err := suite.ExecuteCommandInPod(suite.ctx, namespace, pod, "wget https://www.google.com/")
suite.Require().NoError(err)
Expand Down
31 changes: 21 additions & 10 deletions internal/integration/base/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,12 @@ func (apiSuite *APISuite) SetupSuite() {
apiSuite.Talosconfig, err = clientconfig.Open(apiSuite.TalosConfig)
apiSuite.Require().NoError(err)

opts := []client.OptionFunc{
client.WithConfig(apiSuite.Talosconfig),
}

if apiSuite.Endpoint != "" {
opts = append(opts, client.WithEndpoints(apiSuite.Endpoint))
apiSuite.Client = apiSuite.GetClientWithEndpoints(apiSuite.Endpoint)
} else {
apiSuite.Client = apiSuite.GetClientWithEndpoints()
}

apiSuite.Client, err = client.New(context.TODO(), opts...)
apiSuite.Require().NoError(err)

// clear any connection refused errors left after the previous tests
nodes := apiSuite.DiscoverNodeInternalIPs(context.TODO())

Expand All @@ -78,6 +73,19 @@ func (apiSuite *APISuite) SetupSuite() {
}
}

// GetClientWithEndpoints returns Talos API client with provided endpoints.
func (apiSuite *APISuite) GetClientWithEndpoints(endpoints ...string) *client.Client {
opts := []client.OptionFunc{
client.WithConfig(apiSuite.Talosconfig),
client.WithEndpoints(endpoints...),
}

cli, err := client.New(context.TODO(), opts...)
apiSuite.Require().NoError(err)

return cli
}

// DiscoverNodes provides list of Talos nodes in the cluster.
//
// As there's no way to provide this functionality via Talos API, it works the following way:
Expand Down Expand Up @@ -590,6 +598,9 @@ func (apiSuite *APISuite) ResetNode(ctx context.Context, node string, resetSpec

nodeCtx := client.WithNode(ctx, node)

nodeClient := apiSuite.GetClientWithEndpoints(node)
defer nodeClient.Close() //nolint:errcheck

// any reset should lead to a reboot, so read boot_id before reboot
bootIDBefore, err := apiSuite.ReadBootID(nodeCtx)
apiSuite.Require().NoError(err)
Expand All @@ -612,15 +623,15 @@ func (apiSuite *APISuite) ResetNode(ctx context.Context, node string, resetSpec
preReset, err := apiSuite.HashKubeletCert(ctx, node)
apiSuite.Require().NoError(err)

resp, err := apiSuite.Client.ResetGenericWithResponse(nodeCtx, resetSpec)
resp, err := nodeClient.ResetGenericWithResponse(nodeCtx, resetSpec)
apiSuite.Require().NoError(err)

actorID := resp.Messages[0].ActorId

eventCh := make(chan client.EventResult)

// watch for events
apiSuite.Require().NoError(apiSuite.Client.EventsWatchV2(nodeCtx, eventCh, client.WithActorID(actorID), client.WithTailEvents(-1)))
apiSuite.Require().NoError(nodeClient.EventsWatchV2(nodeCtx, eventCh, client.WithActorID(actorID), client.WithTailEvents(-1)))

waitTimer := time.NewTimer(5 * time.Minute)
defer waitTimer.Stop()
Expand Down
67 changes: 57 additions & 10 deletions internal/integration/base/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,20 +195,67 @@ func (k8sSuite *K8sSuite) WaitForEventExists(ctx context.Context, ns string, che

// WaitForPodToBeRunning waits for the pod with the given namespace and name to be running.
func (k8sSuite *K8sSuite) WaitForPodToBeRunning(ctx context.Context, timeout time.Duration, namespace, podName string) error {
return retry.Constant(timeout, retry.WithUnits(time.Second*10)).Retry(
func() error {
pod, err := k8sSuite.Clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return retry.ExpectedErrorf("error getting pod: %s", err)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

watcher, err := k8sSuite.Clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", podName).String(),
})
if err != nil {
return err
}

defer watcher.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case event := <-watcher.ResultChan():
if event.Type == watch.Error {
return fmt.Errorf("error watching pod: %v", event.Object)
}

if pod.Status.Phase != corev1.PodRunning {
return retry.ExpectedErrorf("pod is not running yet: %s", pod.Status.Phase)
pod, ok := event.Object.(*corev1.Pod)
if !ok {
continue
}

return nil
},
)
if pod.Name == podName && pod.Status.Phase == corev1.PodRunning {
return nil
}
}
}
}

// WaitForPodToBeDeleted waits for the pod with the given namespace and name to be deleted.
func (k8sSuite *K8sSuite) WaitForPodToBeDeleted(ctx context.Context, timeout time.Duration, namespace, podName string) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

watcher, err := k8sSuite.Clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", podName).String(),
})
if err != nil {
return err
}

defer watcher.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case event := <-watcher.ResultChan():
if event.Type == watch.Deleted {
return nil
}

if event.Type == watch.Error {
return fmt.Errorf("error watching pod: %v", event.Object)
}
}
}
}

// ExecuteCommandInPod executes the given command in the pod with the given namespace and name.
Expand Down

0 comments on commit b690ffe

Please sign in to comment.