Add end-to-end tests for local cache #292

Merged: 7 commits, Nov 18, 2024

1 change: 1 addition & 0 deletions tests/e2e-kubernetes/e2e_test.go
@@ -53,6 +53,7 @@ var CSITestSuites = []func() framework.TestSuite{
custom_testsuites.InitS3CSIMultiVolumeTestSuite,
custom_testsuites.InitS3MountOptionsTestSuite,
custom_testsuites.InitS3CSICredentialsTestSuite,
custom_testsuites.InitS3CSICacheTestSuite,
}

// This executes testSuites for csi volumes.
237 changes: 237 additions & 0 deletions tests/e2e-kubernetes/testsuites/cache.go
@@ -0,0 +1,237 @@
package custom_testsuites

import (
"context"
"fmt"
"path/filepath"
"slices"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
e2evolume "k8s.io/kubernetes/test/e2e/framework/volume"
storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
admissionapi "k8s.io/pod-security-admission/api"
"k8s.io/utils/ptr"
)

const volumeName1 = "volume1"
const root = int64(0)
const defaultNonRootGroup = int64(2000)

type s3CSICacheTestSuite struct {
tsInfo storageframework.TestSuiteInfo
}

func InitS3CSICacheTestSuite() storageframework.TestSuite {
return &s3CSICacheTestSuite{
tsInfo: storageframework.TestSuiteInfo{
Name: "cache",
TestPatterns: []storageframework.TestPattern{
storageframework.DefaultFsPreprovisionedPV,
},
},
}
}

func (t *s3CSICacheTestSuite) GetTestSuiteInfo() storageframework.TestSuiteInfo {
return t.tsInfo
}

func (t *s3CSICacheTestSuite) SkipUnsupportedTests(_ storageframework.TestDriver, pattern storageframework.TestPattern) {
if pattern.VolType != storageframework.PreprovisionedPV {
e2eskipper.Skipf("Suite %q does not support %v", t.tsInfo.Name, pattern.VolType)
}
}

func (t *s3CSICacheTestSuite) DefineTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
f := framework.NewFrameworkWithCustomTimeouts(NamespacePrefix+"cache", storageframework.GetDriverTimeouts(driver))
// This is required for now due to the hack mentioned in the `ensureCacheDirExistsInNode` function; see the comments there for more context.
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged

type local struct {
config *storageframework.PerTestConfig

// A list of cleanup functions to be called after each test to clean resources created during the test.
cleanup []func(context.Context) error
}

var l local

deferCleanup := func(f func(context.Context) error) {
l.cleanup = append(l.cleanup, f)
}

cleanup := func(ctx context.Context) {
Review comment (Contributor): Not blocking: Don't we have this setup in a load of other tests? At some point, maybe we should try and de-duplicate.

Reply (Author): Yeah, that's true. It's a bit tricky because we create and use a local variable in the function scope and reuse it with each test run via BeforeEach lifecycle events, see the local struct in each test suite. Might need to change that pattern a bit to have something reusable.

var errs []error
slices.Reverse(l.cleanup) // clean items in reverse order similar to how `defer` works
for _, f := range l.cleanup {
errs = append(errs, f(ctx))
}
framework.ExpectNoError(errors.NewAggregate(errs), "while cleaning up resources")
}
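
The review thread above suggests de-duplicating this cleanup pattern across suites. A minimal sketch of one way that could look, assuming a shared helper in util.go; the resourceCleaner type and its methods are hypothetical and not part of this PR:

// Hypothetical helper sketched from the review discussion above; not part of this PR.
// resourceCleaner collects per-test cleanup functions and runs them in reverse
// order, aggregating errors, so each suite would no longer need its own
// `local.cleanup` slice and `cleanup` closure.
type resourceCleaner struct {
	cleanup []func(context.Context) error
}

// Defer registers a cleanup function to be run after the test.
func (c *resourceCleaner) Defer(f func(context.Context) error) {
	c.cleanup = append(c.cleanup, f)
}

// Run calls the registered functions in reverse order, similar to `defer`.
func (c *resourceCleaner) Run(ctx context.Context) {
	var errs []error
	slices.Reverse(c.cleanup)
	for _, f := range c.cleanup {
		errs = append(errs, f(ctx))
	}
	framework.ExpectNoError(errors.NewAggregate(errs), "while cleaning up resources")
}

Each suite's BeforeEach would then create a fresh resourceCleaner and register its Run method with DeferCleanup, mirroring the per-test local struct used here.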
BeforeEach(func(ctx context.Context) {
l = local{}
l.config = driver.PrepareTest(ctx, f)
DeferCleanup(cleanup)
})

// checkBasicFileOperations verifies that basic file operations work in the given `basePath` inside the `pod`.
checkBasicFileOperations := func(ctx context.Context, pod *v1.Pod, bucketName, basePath string) {
framework.Logf("Checking basic file operations inside pod %s at %s", pod.UID, basePath)

dir := filepath.Join(basePath, "test-dir")
first := filepath.Join(basePath, "first")
second := filepath.Join(dir, "second")

seed := time.Now().UTC().UnixNano()
testWriteSize := 1024 // 1KB

checkWriteToPath(f, pod, first, testWriteSize, seed)
checkListingPathWithEntries(f, pod, basePath, []string{"first"})
// Test reading multiple times to ensure cached reads work
for i := 0; i < 3; i++ {
checkReadFromPath(f, pod, first, testWriteSize, seed)
}

// Now remove the file from S3
deleteObjectFromS3(ctx, bucketName, "first")

// Ensure the data is still read from the cache - without the cache this would fail because the object has been removed from the underlying bucket
checkReadFromPath(f, pod, first, testWriteSize, seed)

e2evolume.VerifyExecInPodSucceed(f, pod, fmt.Sprintf("mkdir %s && cd %s && echo 'second!' > %s", dir, dir, second))
e2evolume.VerifyExecInPodSucceed(f, pod, fmt.Sprintf("cat %s | grep -q 'second!'", second))
checkListingPathWithEntries(f, pod, dir, []string{"second"})
checkListingPathWithEntries(f, pod, basePath, []string{"test-dir"})
checkDeletingPath(f, pod, first)
checkDeletingPath(f, pod, second)
}

createPod := func(ctx context.Context, additionalMountOptions []string, podModifiers ...func(*v1.Pod)) (*v1.Pod, string) {
cacheDir := randomCacheDir()

vol := createVolumeResourceWithMountOptions(ctx, l.config, pattern, append(additionalMountOptions, fmt.Sprintf("cache %s", cacheDir)))
deferCleanup(vol.CleanupResource)

bucketName := bucketNameFromVolumeResource(vol)

pod := e2epod.MakePod(f.Namespace.Name, nil, []*v1.PersistentVolumeClaim{vol.Pvc}, admissionapi.LevelBaseline, "")
ensureCacheDirExistsInNode(pod, cacheDir)
for _, pm := range podModifiers {
pm(pod)
}

pod, err := createPod(ctx, f.ClientSet, f.Namespace.Name, pod)
framework.ExpectNoError(err)
deferCleanup(func(ctx context.Context) error { return e2epod.DeletePodWithWait(ctx, f.ClientSet, pod) })

return pod, bucketName
}

Describe("Cache", func() {
Describe("Local", func() {
It("basic file operations as root", func(ctx context.Context) {
pod, bucketName := createPod(ctx, []string{"allow-delete"}, func(pod *v1.Pod) {
pod.Spec.Containers[0].SecurityContext.RunAsUser = ptr.To(root)
pod.Spec.Containers[0].SecurityContext.RunAsGroup = ptr.To(root)
})
checkBasicFileOperations(ctx, pod, bucketName, e2epod.VolumeMountPath1)
})

It("basic file operations as non-root", func(ctx context.Context) {
mountOptions := []string{
"allow-delete",
"allow-other",
fmt.Sprintf("uid=%d", *e2epod.GetDefaultNonRootUser()),
fmt.Sprintf("gid=%d", defaultNonRootGroup),
Review comment (Contributor): Nit: Would these constants be useful as functions/moved to e2epod?

Reply (Author): e2epod is a Kubernetes package, but I can move these constants to util.go in a follow-up PR.

}
pod, bucketName := createPod(ctx, mountOptions, func(pod *v1.Pod) {
pod.Spec.Containers[0].SecurityContext.RunAsUser = e2epod.GetDefaultNonRootUser()
pod.Spec.Containers[0].SecurityContext.RunAsGroup = ptr.To(defaultNonRootGroup)
pod.Spec.Containers[0].SecurityContext.RunAsNonRoot = ptr.To(true)
})

checkBasicFileOperations(ctx, pod, bucketName, e2epod.VolumeMountPath1)
})
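
Following the review thread above about the uid/gid constants, moving them into util.go could look roughly like this. A hypothetical sketch; the names below are not part of this PR:

// Hypothetical additions to util.go sketched from the review discussion above.
// DefaultNonRootGroup mirrors e2epod.GetDefaultNonRootUser() so suites can
// reference the non-root group the same way they reference the non-root user.
const DefaultNonRootGroup = int64(2000)

// GetDefaultNonRootGroup returns a pointer suitable for SecurityContext fields.
func GetDefaultNonRootGroup() *int64 {
	return ptr.To(DefaultNonRootGroup)
}

The pod modifier above could then use GetDefaultNonRootGroup() instead of the suite-local defaultNonRootGroup constant.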

It("two containers in the same pod using the same cache", func(ctx context.Context) {
testFile := filepath.Join(e2epod.VolumeMountPath1, "helloworld.txt")

pod, _ := createPod(ctx, []string{"allow-delete"}, func(pod *v1.Pod) {
// Make it an init container to ensure it runs before regular containers
pod.Spec.InitContainers = append(pod.Spec.InitContainers, v1.Container{
Name: "populate-cache",
Image: e2epod.GetDefaultTestImage(),
Command: e2epod.GenerateScriptCmd(
fmt.Sprintf("echo 'hello world!' > %s && cat %s | grep -q 'hello world!'", testFile, testFile)),
VolumeMounts: []v1.VolumeMount{
{
Name: volumeName1,
MountPath: e2epod.VolumeMountPath1,
},
},
})
})

e2evolume.VerifyExecInPodSucceed(f, pod, fmt.Sprintf("cat %s | grep -q 'hello world!'", testFile))
})
})
})
}

// deleteObjectFromS3 deletes an object from the given bucket by using the S3 SDK.
// This is useful for creating side effects that bypass Mountpoint.
func deleteObjectFromS3(ctx context.Context, bucket string, key string) {
client := s3.NewFromConfig(awsConfig(ctx))
_, err := client.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
framework.ExpectNoError(err)
}

func randomCacheDir() string {
return filepath.Join("/tmp/mp-cache", uuid.New().String())
}

// ensureCacheDirExistsInNode adds a hostPath volume for the given `cacheDir` with the `DirectoryOrCreate` type.
// This hack is required because the Mountpoint process runs on the underlying host and not inside the container,
// so we need to ensure the cache directory exists on the host.
// This hack will hopefully go away with https://github.com/awslabs/mountpoint-s3-csi-driver/issues/279.
func ensureCacheDirExistsInNode(pod *v1.Pod, cacheDir string) {
cacheVolumeMount := v1.VolumeMount{
Name: "make-cache-dir",
MountPath: "/cache",
}

// The directory created with `DirectoryOrCreate` will have 0755 permissions and will be owned by kubelet.
// Unless we change the permissions here, non-root containers won't be able to access the cache dir.
pod.Spec.InitContainers = append(pod.Spec.DeepCopy().InitContainers, v1.Container{
Name: "chmod-cache-dir",
Image: e2epod.GetDefaultTestImage(),
Command: e2epod.GenerateScriptCmd("chmod -R 777 /cache"),
SecurityContext: &v1.SecurityContext{
RunAsUser: ptr.To(root),
RunAsGroup: ptr.To(root),
},
VolumeMounts: []v1.VolumeMount{cacheVolumeMount},
})
pod.Spec.Volumes = append(pod.Spec.Volumes, v1.Volume{
Name: "make-cache-dir",
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: cacheDir,
Type: ptr.To(v1.HostPathDirectoryOrCreate),
},
},
})
pod.Spec.Containers[0].VolumeMounts = append(pod.Spec.Containers[0].VolumeMounts, cacheVolumeMount)
}
16 changes: 16 additions & 0 deletions tests/e2e-kubernetes/testsuites/util.go
@@ -7,6 +7,7 @@ import (
"fmt"
"math/rand"
"os"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
@@ -76,6 +77,16 @@ func checkListingPath(f *framework.Framework, pod *v1.Pod, path string) {
e2evolume.VerifyExecInPodSucceed(f, pod, fmt.Sprintf("ls %s", path))
}

func checkListingPathWithEntries(f *framework.Framework, pod *v1.Pod, path string, entries []string) {
cmd := fmt.Sprintf("ls %s", path)
stdout, stderr, err := e2evolume.PodExec(f, pod, cmd)
framework.ExpectNoError(err,
"%q should succeed, but failed with error message %q\nstdout: %s\nstderr: %s",
cmd, err, stdout, stderr)

gomega.Expect(strings.Fields(stdout)).To(gomega.Equal(entries))
}

func createVolumeResourceWithMountOptions(ctx context.Context, config *storageframework.PerTestConfig, pattern storageframework.TestPattern, mountOptions []string) *storageframework.VolumeResource {
f := config.Framework
r := storageframework.VolumeResource{
@@ -129,6 +140,11 @@ func createVolumeResourceWithMountOptions(ctx context.Context, config *storagefr
return &r
}

func bucketNameFromVolumeResource(vol *storageframework.VolumeResource) string {
pvc := vol.Pv.Spec.PersistentVolumeSource
return pvc.CSI.VolumeHandle
}

func createPod(ctx context.Context, client clientset.Interface, namespace string, pod *v1.Pod) (*v1.Pod, error) {
framework.Logf("Creating Pod %s in %s (SA: %s)", pod.Name, namespace, pod.Spec.ServiceAccountName)
pod, err := client.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})