|
1 | 1 | package e2e |
2 | 2 |
|
3 | 3 | import ( |
| 4 | + "fmt" |
4 | 5 | "testing" |
| 6 | + "time" |
5 | 7 |
|
6 | 8 | . "github.com/onsi/gomega" |
| 9 | + corev1 "k8s.io/api/core/v1" |
| 10 | + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
7 | 11 |
|
8 | 12 | rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" |
9 | 13 | "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" |
@@ -52,6 +56,80 @@ func TestRayClusterAuthOptions(t *testing.T) { |
52 | 56 | VerifyContainerAuthTokenEnvVars(test, rayCluster, &workerPod.Spec.Containers[utils.RayContainerIndex]) |
53 | 57 | } |
54 | 58 |
|
55 | | - // TODO(andrewsykim): add job submission test with and without token once a Ray version with token support is released. |
| 59 | + // Get auth token for job submission tests |
| 60 | + authToken := getAuthTokenFromPod(test, rayCluster, headPod) |
| 61 | + g.Expect(authToken).NotTo(BeEmpty(), "Auth token should be present") |
| 62 | + |
| 63 | + // Test job submission with auth token using Ray Job CLI |
| 64 | + test.T().Run("Submit job with auth token should succeed", func(_ *testing.T) { |
| 65 | + LogWithTimestamp(test.T(), "Testing job submission WITH auth token") |
| 66 | + |
| 67 | + submissionId := fmt.Sprintf("test-job-with-auth-%d", time.Now().Unix()) |
| 68 | + |
| 69 | + // Submit job via Ray Job CLI with auth token |
| 70 | + // Set RAY_AUTH_TOKEN environment variable for authentication |
| 71 | + submitCmd := []string{ |
| 72 | + "bash", "-c", |
| 73 | + fmt.Sprintf("RAY_AUTH_TOKEN=%s ray job submit --address http://127.0.0.1:8265 --submission-id %s --no-wait -- python -c 'import ray; ray.init(); print(\"Job with auth succeeded\")'", |
| 74 | + authToken, submissionId), |
| 75 | + } |
| 76 | + |
| 77 | + stdout, _ := ExecPodCmd(test, headPod, headPod.Spec.Containers[utils.RayContainerIndex].Name, submitCmd) |
| 78 | + |
| 79 | + // Verify job was submitted successfully |
| 80 | + g.Expect(stdout.String()).To(ContainSubstring(submissionId), "Job submission should succeed with valid auth token") |
| 81 | + |
| 82 | + // Verify job status is queryable with auth token (confirms auth works) |
| 83 | + g.Eventually(func(g Gomega) { |
| 84 | + statusCmd := []string{ |
| 85 | + "bash", "-c", |
| 86 | + fmt.Sprintf("RAY_AUTH_TOKEN=%s ray job status --address http://127.0.0.1:8265 %s", authToken, submissionId), |
| 87 | + } |
| 88 | + stdout, _ := ExecPodCmd(test, headPod, headPod.Spec.Containers[utils.RayContainerIndex].Name, statusCmd) |
| 89 | + g.Expect(stdout.String()).To(ContainSubstring("succeeded")) |
| 90 | + }, TestTimeoutShort).Should(Succeed()) |
| 91 | + |
| 92 | + LogWithTimestamp(test.T(), "Successfully submitted and verified job with auth token") |
| 93 | + }) |
| 94 | + |
| 95 | + test.T().Run("Submit job with incorrect auth token should fail", func(_ *testing.T) { |
| 96 | + LogWithTimestamp(test.T(), "Testing job submission WITH incorrect auth token (should fail)") |
| 97 | + |
| 98 | + submissionId := fmt.Sprintf("test-job-bad-auth-%d", time.Now().Unix()) |
| 99 | + |
| 100 | + // Submit job via Ray Job CLI with INCORRECT auth token |
| 101 | + incorrectToken := "incorrect-token-12345" |
| 102 | + submitCmd := []string{ |
| 103 | + "bash", "-c", |
| 104 | + fmt.Sprintf("RAY_AUTH_TOKEN=%s ray job submit --address http://127.0.0.1:8265 --submission-id %s --no-wait -- python -c 'print(\"Should not run\")'", |
| 105 | + incorrectToken, submissionId), |
| 106 | + } |
| 107 | + |
| 108 | + _, stderr := ExecPodCmd(test, headPod, headPod.Spec.Containers[utils.RayContainerIndex].Name, submitCmd, true) |
| 109 | + |
| 110 | + // Verify response indicates authentication failure |
| 111 | + g.Expect(stderr.String()).To(ContainSubstring("Unauthenticated"), "Job submission should fail with Unauthorized when auth token is incorrect") |
| 112 | + |
| 113 | + LogWithTimestamp(test.T(), "Job submission correctly rejected with incorrect auth token") |
| 114 | + }) |
56 | 115 | }) |
57 | 116 | } |
| 117 | + |
| 118 | +// getAuthTokenFromPod extracts the auth token from the pod's environment variables. |
| 119 | +// It reads the token from the secret referenced by the RAY_AUTH_TOKEN environment variable. |
| 120 | +func getAuthTokenFromPod(test Test, rayCluster *rayv1.RayCluster, pod *corev1.Pod) string { |
| 121 | + test.T().Helper() |
| 122 | + g := NewWithT(test.T()) |
| 123 | + |
| 124 | + for _, envVar := range pod.Spec.Containers[utils.RayContainerIndex].Env { |
| 125 | + if envVar.Name == utils.RAY_AUTH_TOKEN_ENV_VAR { |
| 126 | + if envVar.ValueFrom != nil && envVar.ValueFrom.SecretKeyRef != nil { |
| 127 | + secret, err := test.Client().Core().CoreV1().Secrets(rayCluster.Namespace). |
| 128 | + Get(test.Ctx(), envVar.ValueFrom.SecretKeyRef.Name, metav1.GetOptions{}) |
| 129 | + g.Expect(err).NotTo(HaveOccurred()) |
| 130 | + return string(secret.Data[envVar.ValueFrom.SecretKeyRef.Key]) |
| 131 | + } |
| 132 | + } |
| 133 | + } |
| 134 | + return "" |
| 135 | +} |
0 commit comments