From 7f0ee9b8847ca31403481c881e008afb5f887fdb Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Fri, 24 Jun 2022 15:08:08 +0000 Subject: [PATCH] Update S3 implementation (#86) # Description - Access style is honoured/implemented - Secrets are no longer read directly but are mounted - TLS for S3 access is not yet implemented (though a non-`None` entry will affect the endpoint returned from `operator-rs`) Fixes #85. --- CHANGELOG.md | 10 +- Cargo.lock | 53 ++++- deploy/crd/sparkapplication.crd.yaml | 39 +++- deploy/helm/spark-k8s-operator/crds/crds.yaml | 39 +++- deploy/manifests/crds.yaml | 39 +++- .../examples/example-sparkapp-s3-private.yaml | 5 +- docs/modules/ROOT/pages/usage.adoc | 24 ++- rust/crd/Cargo.toml | 2 +- rust/crd/src/constants.rs | 3 +- rust/crd/src/lib.rs | 184 +++++++++++------- rust/operator-binary/Cargo.toml | 4 +- .../src/spark_k8s_controller.rs | 90 ++++++--- .../10-deploy-spark-app.yaml.j2 | 2 +- .../spark-pi-private-s3/00-s3-secret.yaml | 12 ++ .../10-deploy-spark-app.yaml.j2 | 5 +- .../10-deploy-spark-app.yaml.j2 | 2 +- 16 files changed, 376 insertions(+), 137 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f26f77c9..c5615e3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,12 +4,20 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Added + +### Changed + +- BREAKING: Use current S3 connection/bucket structs ([#86]) + +[#86]: https://github.com/stackabletech/spark-k8s-operator/pull/86 + ## [0.2.0] - 2022-06-21 ### Added - Added new fields to govern image pull policy ([#75]) -- New `nodeSelector` fields for both the driver and the excutors ([#76]) +- New `nodeSelector` fields for both the driver and the executors ([#76]) - Mirror driver pod status to the corresponding spark application ([#77]) [#75]: https://github.com/stackabletech/spark-k8s-operator/pull/75 diff --git a/Cargo.lock b/Cargo.lock index 5571c52c..cc1ec73d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -274,8 +274,18 @@ version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.13.4", + "darling_macro 0.13.4", +] + +[[package]] +name = "darling" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4529658bdda7fd6769b8614be250cdcfc3aeb0ee72fe66f9e41e5e5eb73eac02" +dependencies = [ + "darling_core 0.14.1", + "darling_macro 0.14.1", ] [[package]] @@ -292,13 +302,38 @@ dependencies = [ "syn", ] +[[package]] +name = "darling_core" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "649c91bc01e8b1eac09fb91e8dbc7d517684ca6be8ebc75bb9cafc894f9fdb6f" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + [[package]] name = "darling_macro" version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" dependencies = [ - "darling_core", + "darling_core 0.13.4", + "quote", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddfc69c5bfcbd2fc09a0f38451d2daf0e372e367986a83906d1b0dbc88134fb5" +dependencies = [ + "darling_core 0.14.1", "quote", "syn", ] @@ -895,7 +930,7 @@ version = "0.71.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"203f7c5acf9d0dfb0b08d44ec1d66ace3d1dfe0cdd82e65e274f3f96615d666c" dependencies = [ - "darling", + "darling 0.13.4", "proc-macro2", "quote", "serde_json", @@ -1664,8 +1699,8 @@ dependencies = [ [[package]] name = "stackable-operator" -version = "0.19.0" -source = "git+https://github.com/stackabletech/operator-rs.git?tag=0.19.0#f8f2d5527b3463cc40f3d851172af7ae81db75c1" +version = "0.21.0" +source = "git+https://github.com/stackabletech/operator-rs.git?tag=0.21.0#dbfa6d45fb59fadd17f9b571255c0fdc4c522671" dependencies = [ "backoff", "chrono", @@ -1698,10 +1733,10 @@ dependencies = [ [[package]] name = "stackable-operator-derive" -version = "0.17.0" -source = "git+https://github.com/stackabletech/operator-rs.git?tag=0.19.0#f8f2d5527b3463cc40f3d851172af7ae81db75c1" +version = "0.21.0" +source = "git+https://github.com/stackabletech/operator-rs.git?tag=0.21.0#dbfa6d45fb59fadd17f9b571255c0fdc4c522671" dependencies = [ - "darling", + "darling 0.14.1", "proc-macro2", "quote", "syn", diff --git a/deploy/crd/sparkapplication.crd.yaml b/deploy/crd/sparkapplication.crd.yaml index 99b302ff..f5239cca 100644 --- a/deploy/crd/sparkapplication.crd.yaml +++ b/deploy/crd/sparkapplication.crd.yaml @@ -299,18 +299,51 @@ spec: inline: description: S3 connection definition as CRD. properties: + accessStyle: + description: "Which access style to use. Defaults to virtual hosted-style as most of the data products out there. Have a look at the official documentation on " + enum: + - Path + - VirtualHosted + nullable: true + type: string + credentials: + description: "If the S3 uses authentication you have to specify you S3 credentials. In the most cases a SecretClass providing `accessKey` and `secretKey` is sufficient." + nullable: true + properties: + scope: + description: "[Scope](https://docs.stackable.tech/secret-operator/scope.html) of the [SecretClass](https://docs.stackable.tech/secret-operator/secretclass.html)" + nullable: true + properties: + node: + default: false + type: boolean + pod: + default: false + type: boolean + services: + default: [] + items: + type: string + type: array + type: object + secretClass: + description: "[SecretClass](https://docs.stackable.tech/secret-operator/secretclass.html) containing the LDAP bind credentials" + type: string + required: + - secretClass + type: object host: + description: Hostname of the S3 server without any protocol or port nullable: true type: string port: + description: Port the S3 server listens on. If not specified the products will determine the port to use. format: uint16 minimum: 0.0 nullable: true type: integer - secretClass: - nullable: true - type: string tls: + description: If you want to use TLS when talking to S3 you can enable TLS encrypted communication with this setting. nullable: true properties: verification: diff --git a/deploy/helm/spark-k8s-operator/crds/crds.yaml b/deploy/helm/spark-k8s-operator/crds/crds.yaml index 0c31374d..1bb64143 100644 --- a/deploy/helm/spark-k8s-operator/crds/crds.yaml +++ b/deploy/helm/spark-k8s-operator/crds/crds.yaml @@ -301,18 +301,51 @@ spec: inline: description: S3 connection definition as CRD. properties: + accessStyle: + description: "Which access style to use. Defaults to virtual hosted-style as most of the data products out there. Have a look at the official documentation on " + enum: + - Path + - VirtualHosted + nullable: true + type: string + credentials: + description: "If the S3 uses authentication you have to specify you S3 credentials. 
In the most cases a SecretClass providing `accessKey` and `secretKey` is sufficient." + nullable: true + properties: + scope: + description: "[Scope](https://docs.stackable.tech/secret-operator/scope.html) of the [SecretClass](https://docs.stackable.tech/secret-operator/secretclass.html)" + nullable: true + properties: + node: + default: false + type: boolean + pod: + default: false + type: boolean + services: + default: [] + items: + type: string + type: array + type: object + secretClass: + description: "[SecretClass](https://docs.stackable.tech/secret-operator/secretclass.html) containing the LDAP bind credentials" + type: string + required: + - secretClass + type: object host: + description: Hostname of the S3 server without any protocol or port nullable: true type: string port: + description: Port the S3 server listens on. If not specified the products will determine the port to use. format: uint16 minimum: 0.0 nullable: true type: integer - secretClass: - nullable: true - type: string tls: + description: If you want to use TLS when talking to S3 you can enable TLS encrypted communication with this setting. nullable: true properties: verification: diff --git a/deploy/manifests/crds.yaml b/deploy/manifests/crds.yaml index 0540e287..e907f8ef 100644 --- a/deploy/manifests/crds.yaml +++ b/deploy/manifests/crds.yaml @@ -302,18 +302,51 @@ spec: inline: description: S3 connection definition as CRD. properties: + accessStyle: + description: "Which access style to use. Defaults to virtual hosted-style as most of the data products out there. Have a look at the official documentation on " + enum: + - Path + - VirtualHosted + nullable: true + type: string + credentials: + description: "If the S3 uses authentication you have to specify you S3 credentials. In the most cases a SecretClass providing `accessKey` and `secretKey` is sufficient." + nullable: true + properties: + scope: + description: "[Scope](https://docs.stackable.tech/secret-operator/scope.html) of the [SecretClass](https://docs.stackable.tech/secret-operator/secretclass.html)" + nullable: true + properties: + node: + default: false + type: boolean + pod: + default: false + type: boolean + services: + default: [] + items: + type: string + type: array + type: object + secretClass: + description: "[SecretClass](https://docs.stackable.tech/secret-operator/secretclass.html) containing the LDAP bind credentials" + type: string + required: + - secretClass + type: object host: + description: Hostname of the S3 server without any protocol or port nullable: true type: string port: + description: Port the S3 server listens on. If not specified the products will determine the port to use. format: uint16 minimum: 0.0 nullable: true type: integer - secretClass: - nullable: true - type: string tls: + description: If you want to use TLS when talking to S3 you can enable TLS encrypted communication with this setting. 
nullable: true properties: verification: diff --git a/docs/modules/ROOT/examples/example-sparkapp-s3-private.yaml b/docs/modules/ROOT/examples/example-sparkapp-s3-private.yaml index 761b5de0..3ea32d07 100644 --- a/docs/modules/ROOT/examples/example-sparkapp-s3-private.yaml +++ b/docs/modules/ROOT/examples/example-sparkapp-s3-private.yaml @@ -16,10 +16,11 @@ spec: inline: host: test-minio port: 9000 - secretClass: minio-credentials # <4> + accessStyle: Path + credentials: # <4> + secretClass: s3-credentials-class sparkConf: # <5> spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" # <6> - spark.hadoop.fs.s3a.path.style.access: "true" spark.driver.extraClassPath: "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar" spark.executor.extraClassPath: "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar" volumes: diff --git a/docs/modules/ROOT/pages/usage.adoc b/docs/modules/ROOT/pages/usage.adoc index 569d9f5b..f6ffbb03 100644 --- a/docs/modules/ROOT/pages/usage.adoc +++ b/docs/modules/ROOT/pages/usage.adoc @@ -2,7 +2,7 @@ == Create an Apache Spark job -If you followed the installation instructions, you should now have a Stackable Operator for Apache Spark up and running and you are ready to create your first Apache Spark kubernetes cluster. +If you followed the installation instructions, you should now have a Stackable Operator for Apache Spark up and running, and you are ready to create your first Apache Spark kubernetes cluster. The example below creates a job running on Apache Spark 3.2.1, using the spark-on-kubernetes paradigm described in the spark documentation. The application file is itself part of the spark distribution and `local` refers to the path on the driver/executors; there are no external dependencies. @@ -64,11 +64,11 @@ include::example$example-sparkapp-external-dependencies.yaml[] include::example$example-sparkapp-image.yaml[] ---- -<1> Job image: this contains the job artifact that will retrieved from the volume mount backed by the PVC +<1> Job image: this contains the job artifact that will be retrieved from the volume mount backed by the PVC <2> Job python artifact (local) <3> Job argument (external) <4> List of python job requirements: these will be installed in the pods via `pip` -<5> Spark dependencies: the credentials provider (the user knows what is relevant here) plus dependencies needed to access external resources (in this case, in s3) +<5> Spark dependencies: the credentials provider (the user knows what is relevant here) plus dependencies needed to access external resources (in this case, in an S3 store) <6> the name of the volume mount backed by a `PersistentVolumeClaim` that must be pre-existing <7> the path on the volume mount: this is referenced in the `sparkConf` section where the extra class path is defined for the driver and executors @@ -81,7 +81,7 @@ include::example$example-sparkapp-pvc.yaml[] <1> Job artifact located on S3. 
<2> Job main class -<3> Spark dependencies: the credentials provider (the user knows what is relevant here) plus dependencies needed to access external resources (in this case, in s3, accessed without credentials) +<3> Spark dependencies: the credentials provider (the user knows what is relevant here) plus dependencies needed to access external resources (in this case, in an S3 store, accessed without credentials) <4> the name of the volume mount backed by a `PersistentVolumeClaim` that must be pre-existing <5> the path on the volume mount: this is referenced in the `sparkConf` section where the extra class path is defined for the driver and executors @@ -92,12 +92,12 @@ include::example$example-sparkapp-pvc.yaml[] include::example$example-sparkapp-s3-private.yaml[] ---- -<1> Job python artifact (located in S3) +<1> Job python artifact (located in an S3 store) <2> Artifact class <3> S3 section, specifying the existing secret and S3 end-point (in this case, MinIO) -<4> Credentials secret +<4> Credentials referencing a secretClass (not shown in this example) <5> Spark dependencies: the credentials provider (the user knows what is relevant here) plus dependencies needed to access external resources... -<6> ...in this case, in s3, accessed with the credentials defined in the secret +<6> ...in this case, in an S3 store, accessed with the credentials defined in the secret <7> the name of the volume mount backed by a `PersistentVolumeClaim` that must be pre-existing <8> the path on the volume mount: this is referenced in the `sparkConf` section where the extra class path is defined for the driver and executors @@ -121,7 +121,7 @@ include::example$example-sparkapp-configmap.yaml[] == S3 bucket specification -You can specify S3 connection details directly inside the `SparkApplication` specification or by refering to an external `S3Bucket` custom resource. +You can specify S3 connection details directly inside the `SparkApplication` specification or by referring to an external `S3Bucket` custom resource. To specify S3 connection details directly as part of the `SparkApplication` resource you add an inline bucket configuration as shown below. @@ -134,7 +134,9 @@ s3bucket: # <1> inline: host: test-minio # <3> port: 9000 # <4> - secretClass: minio-credentials # <5> + accessStyle: Path + credentials: + secretClass: s3-credentials-class # <5> ---- <1> Entry point for the bucket configuration. <2> Bucket name. @@ -166,7 +168,9 @@ spec: inline: host: test-minio port: 9000 - secretClass: minio-credentials + accessStyle: Path + credentials: + secretClass: minio-credentials-class ---- This has the advantage that bucket configuration can be shared across `SparkApplication`s and reduces the cost of updating these details. 
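The `credentials` entry in both variants only names a `SecretClass`; that class and a matching `Secret` have to exist in the cluster. A minimal sketch of such a pair, mirroring the kuttl test fixture added in this PR (the names `s3-credentials-class` and `minio-credentials` and the MinIO key values are illustrative):

[source,yaml]
----
---
apiVersion: secrets.stackable.tech/v1alpha1
kind: SecretClass
metadata:
  name: s3-credentials-class        # referenced from credentials.secretClass
spec:
  backend:
    k8sSearch:
      searchNamespace:
        pod: {}                     # look up the Secret in the pod's namespace
---
apiVersion: v1
kind: Secret
metadata:
  name: minio-credentials
  labels:
    secrets.stackable.tech/class: s3-credentials-class  # binds the Secret to the class
stringData:
  accessKeyId: minioAccessKey       # illustrative values
  secretAccessKey: minioSecretKey
----

With this in place the operator mounts the selected secret at `/stackable/secrets` and the generated `spark-submit` call reads `accessKeyId` and `secretAccessKey` from that directory, so the credentials are no longer injected as environment variables. Likewise, setting `accessStyle: Path` now makes the operator add `spark.hadoop.fs.s3a.path.style.access=true` to the submit command, which is why the test and example manifests in this PR drop that entry from `sparkConf`.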
diff --git a/rust/crd/Cargo.toml b/rust/crd/Cargo.toml index 5fd7bc60..b6f795ba 100644 --- a/rust/crd/Cargo.toml +++ b/rust/crd/Cargo.toml @@ -8,7 +8,7 @@ repository = "https://github.com/stackabletech/spark-k8s-operator" version = "0.3.0-nightly" [dependencies] -stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag="0.19.0" } +stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag="0.21.0" } semver = "1.0" serde = { version = "1.0", features = ["derive"] } diff --git a/rust/crd/src/constants.rs b/rust/crd/src/constants.rs index 47f03d31..ffe871e3 100644 --- a/rust/crd/src/constants.rs +++ b/rust/crd/src/constants.rs @@ -17,7 +17,6 @@ pub const CONTAINER_NAME_DRIVER: &str = "spark-driver"; pub const CONTAINER_IMAGE_NAME_EXECUTOR: &str = "dummy-overwritten-by-command-line"; pub const CONTAINER_NAME_EXECUTOR: &str = "spark-executor"; -pub const ENV_AWS_ACCESS_KEY_ID: &str = "AWS_ACCESS_KEY_ID"; -pub const ENV_AWS_SECRET_ACCESS_KEY: &str = "AWS_SECRET_ACCESS_KEY"; pub const ACCESS_KEY_ID: &str = "accessKeyId"; pub const SECRET_ACCESS_KEY: &str = "secretAccessKey"; +pub const S3_SECRET_DIR_NAME: &str = "/stackable/secrets"; diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs index fbe7bf18..a586814b 100644 --- a/rust/crd/src/lib.rs +++ b/rust/crd/src/lib.rs @@ -4,10 +4,11 @@ pub mod constants; use constants::*; use stackable_operator::builder::VolumeBuilder; -use stackable_operator::commons::s3::{InlinedS3BucketSpec, S3BucketDef}; +use stackable_operator::commons::s3::{ + InlinedS3BucketSpec, S3AccessStyle, S3BucketDef, S3ConnectionSpec, +}; use stackable_operator::k8s_openapi::api::core::v1::{ - EmptyDirVolumeSource, EnvVar, EnvVarSource, LocalObjectReference, SecretKeySelector, Volume, - VolumeMount, + EmptyDirVolumeSource, EnvVar, LocalObjectReference, Volume, VolumeMount, }; use std::collections::{BTreeMap, HashMap}; @@ -166,7 +167,7 @@ impl SparkApplication { .map(|req| req.join(" ")) } - pub fn volumes(&self) -> Vec { + pub fn volumes(&self, s3bucket: &Option) -> Vec { let mut result: Vec = self .spec .volumes @@ -191,11 +192,24 @@ impl SparkApplication { .build(), ); } + + let s3_conn = s3bucket.as_ref().and_then(|i| i.connection.as_ref()); + + if let Some(S3ConnectionSpec { + credentials: Some(credentials), + .. 
+ }) = s3_conn + { + result.push(credentials.to_volume("s3-credentials")); + } result } - pub fn executor_volume_mounts(&self) -> Vec { - let mut result: Vec = self + pub fn executor_volume_mounts( + &self, + s3bucket: &Option, + ) -> Vec { + let result: Vec = self .spec .executor .as_ref() @@ -205,27 +219,11 @@ impl SparkApplication { .cloned() .collect(); - if self.spec.image.is_some() { - result.push(VolumeMount { - name: VOLUME_MOUNT_NAME_JOB.into(), - mount_path: VOLUME_MOUNT_PATH_JOB.into(), - ..VolumeMount::default() - }); - } - - if self.requirements().is_some() { - result.push(VolumeMount { - name: VOLUME_MOUNT_NAME_REQ.into(), - mount_path: VOLUME_MOUNT_PATH_REQ.into(), - ..VolumeMount::default() - }); - } - - result + self.add_common_volume_mounts(result, s3bucket) } - pub fn driver_volume_mounts(&self) -> Vec { - let mut result: Vec = self + pub fn driver_volume_mounts(&self, s3bucket: &Option) -> Vec { + let result: Vec = self .spec .driver .as_ref() @@ -234,22 +232,43 @@ impl SparkApplication { .flat_map(|v| v.iter()) .cloned() .collect(); + + self.add_common_volume_mounts(result, s3bucket) + } + + fn add_common_volume_mounts( + &self, + mut mounts: Vec, + s3bucket: &Option, + ) -> Vec { if self.spec.image.is_some() { - result.push(VolumeMount { + mounts.push(VolumeMount { name: VOLUME_MOUNT_NAME_JOB.into(), mount_path: VOLUME_MOUNT_PATH_JOB.into(), ..VolumeMount::default() }); } - if self.requirements().is_some() { - result.push(VolumeMount { + mounts.push(VolumeMount { name: VOLUME_MOUNT_NAME_REQ.into(), mount_path: VOLUME_MOUNT_PATH_REQ.into(), ..VolumeMount::default() }); } - result + let s3_conn = s3bucket.as_ref().and_then(|i| i.connection.as_ref()); + + if let Some(S3ConnectionSpec { + credentials: Some(_credentials), + .. + }) = s3_conn + { + mounts.push(VolumeMount { + name: "s3-credentials".into(), + mount_path: S3_SECRET_DIR_NAME.into(), + ..VolumeMount::default() + }); + } + mounts } pub fn recommended_labels(&self) -> BTreeMap { @@ -273,7 +292,9 @@ impl SparkApplication { let mode = self.mode().context(ObjectHasNoDeployModeSnafu)?; let name = self.metadata.name.clone().context(ObjectHasNoNameSnafu)?; - let mut submit_cmd = vec![ + let mut submit_cmd: Vec = vec![]; + + submit_cmd.extend(vec![ "/stackable/spark/bin/spark-submit".to_string(), "--verbose".to_string(), "--master k8s://https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT_HTTPS}".to_string(), @@ -287,24 +308,36 @@ impl SparkApplication { format!("--conf spark.kubernetes.driver.container.image={}", self.spec.spark_image.as_ref().context(NoSparkImageSnafu)?), format!("--conf spark.kubernetes.executor.container.image={}", self.spec.spark_image.as_ref().context(NoSparkImageSnafu)?), format!("--conf spark.kubernetes.authenticate.driver.serviceAccountName={}", serviceaccount_name), - ]; + ]); // See https://spark.apache.org/docs/latest/running-on-kubernetes.html#dependency-management // for possible S3 related properties if let Some(endpoint) = s3bucket.as_ref().and_then(|s3| s3.endpoint()) { submit_cmd.push(format!("--conf spark.hadoop.fs.s3a.endpoint={}", endpoint)); } - if s3bucket.as_ref().and_then(|s3| s3.secret_class()).is_some() { - // We don't use the secret at all here, instead we assume the Self::env() has been - // called and this environment variables are availables. 
- submit_cmd.push(format!( - "--conf spark.hadoop.fs.s3a.access.key=${}", - ENV_AWS_ACCESS_KEY_ID - )); - submit_cmd.push(format!( - "--conf spark.hadoop.fs.s3a.secret.key=${}", - ENV_AWS_SECRET_ACCESS_KEY - )); + + if let Some(conn) = s3bucket.as_ref().and_then(|i| i.connection.as_ref()) { + match conn.access_style { + Some(S3AccessStyle::Path) => { + submit_cmd + .push("--conf spark.hadoop.fs.s3a.path.style.access=true".to_string()); + } + Some(S3AccessStyle::VirtualHosted) => {} + None => {} + } + if conn.credentials.as_ref().is_some() { + // We don't use the credentials at all here but assume they are available + submit_cmd.push(format!( + "--conf spark.hadoop.fs.s3a.access.key=$(cat {secret_dir}/{file_name})", + secret_dir = S3_SECRET_DIR_NAME, + file_name = ACCESS_KEY_ID + )); + submit_cmd.push(format!( + "--conf spark.hadoop.fs.s3a.secret.key=$(cat {secret_dir}/{file_name})", + secret_dir = S3_SECRET_DIR_NAME, + file_name = SECRET_ACCESS_KEY + )); + } } // conf arguments that are not driver or executor specific @@ -350,23 +383,9 @@ impl SparkApplication { Ok(submit_cmd) } - pub fn env(&self, s3bucket: &Option) -> Vec { + pub fn env(&self) -> Vec { let tmp = self.spec.env.as_ref(); let mut e: Vec = tmp.iter().flat_map(|e| e.iter()).cloned().collect(); - if let Some(s3) = s3bucket { - if let Some(secret) = s3.secret_class() { - e.push(Self::env_var_from_secret( - ENV_AWS_ACCESS_KEY_ID, - secret.as_ref(), - ACCESS_KEY_ID, - )); - e.push(Self::env_var_from_secret( - ENV_AWS_SECRET_ACCESS_KEY, - secret.as_ref(), - SECRET_ACCESS_KEY, - )); - } - } if self.requirements().is_some() { e.push(EnvVar { name: "PYTHONPATH".to_string(), @@ -379,21 +398,6 @@ impl SparkApplication { e } - fn env_var_from_secret(var_name: &str, secret: &str, secret_key: &str) -> EnvVar { - EnvVar { - name: String::from(var_name), - value_from: Some(EnvVarSource { - secret_key_ref: Some(SecretKeySelector { - name: Some(String::from(secret)), - key: String::from(secret_key), - ..Default::default() - }), - ..Default::default() - }), - ..Default::default() - } - } - pub fn driver_node_selector(&self) -> Option> { self.spec .driver @@ -481,6 +485,10 @@ mod tests { use crate::ImagePullPolicy; use crate::LocalObjectReference; use crate::SparkApplication; + use stackable_operator::commons::s3::{ + S3AccessStyle, S3BucketSpec, S3ConnectionDef, S3ConnectionSpec, + }; + use stackable_operator::commons::tls::{Tls, TlsVerification}; use std::str::FromStr; #[test] @@ -711,4 +719,36 @@ spec: ImagePullPolicy::from_str("IfNotPresent").unwrap() ); } + + #[test] + fn test_ser_inline() { + let bucket = S3BucketSpec { + bucket_name: Some("test-bucket-name".to_owned()), + connection: Some(S3ConnectionDef::Inline(S3ConnectionSpec { + host: Some("host".to_owned()), + port: Some(8080), + credentials: None, + access_style: Some(S3AccessStyle::VirtualHosted), + tls: Some(Tls { + verification: TlsVerification::None {}, + }), + })), + }; + + assert_eq!( + serde_yaml::to_string(&bucket).unwrap(), + "--- +bucketName: test-bucket-name +connection: + inline: + host: host + port: 8080 + accessStyle: VirtualHosted + tls: + verification: + none: {} +" + .to_owned() + ) + } } diff --git a/rust/operator-binary/Cargo.toml b/rust/operator-binary/Cargo.toml index a291dacc..8fba7c3e 100644 --- a/rust/operator-binary/Cargo.toml +++ b/rust/operator-binary/Cargo.toml @@ -8,7 +8,7 @@ repository = "https://github.com/stackabletech/spark-k8s-operator" version = "0.3.0-nightly" [dependencies] -stackable-operator = { git = 
"https://github.com/stackabletech/operator-rs.git", tag="0.19.0" } +stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag="0.21.0" } stackable-spark-k8s-crd = { path = "../crd" } anyhow = "1.0" clap = { version = "3.2", features = ["derive"] } @@ -23,5 +23,5 @@ tracing-futures = { version = "0.2", features = ["futures-03"] } [build-dependencies] built = { version = "0.5", features = ["chrono", "git2"] } -stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag="0.19.0" } +stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag="0.21.0" } stackable-spark-k8s-crd = { path = "../crd" } diff --git a/rust/operator-binary/src/spark_k8s_controller.rs b/rust/operator-binary/src/spark_k8s_controller.rs index 09e8058e..71411a8e 100644 --- a/rust/operator-binary/src/spark_k8s_controller.rs +++ b/rust/operator-binary/src/spark_k8s_controller.rs @@ -1,6 +1,10 @@ use snafu::{OptionExt, ResultExt, Snafu}; -use stackable_operator::builder::{ConfigMapBuilder, ContainerBuilder, ObjectMetaBuilder}; +use stackable_operator::builder::{ + ConfigMapBuilder, ContainerBuilder, ObjectMetaBuilder, PodSecurityContextBuilder, +}; +use stackable_operator::commons::s3::InlinedS3BucketSpec; +use stackable_operator::commons::tls::{CaCert, TlsVerification}; use stackable_operator::k8s_openapi::api::batch::v1::{Job, JobSpec}; use stackable_operator::k8s_openapi::api::core::v1::{ ConfigMap, ConfigMapVolumeSource, Container, EmptyDirVolumeSource, EnvVar, Pod, PodSpec, @@ -71,6 +75,10 @@ pub enum Error { S3Bucket { source: stackable_operator::error::Error, }, + #[snafu(display("tls non-verification not supported"))] + S3TlsNoVerificationNotSupported, + #[snafu(display("ca-cert verification not supported"))] + S3TlsCaVerificationNotSupported, } type Result = std::result::Result; @@ -98,6 +106,29 @@ pub async fn reconcile( _ => None, }; + if let Some(conn) = s3bucket.as_ref().and_then(|i| i.connection.as_ref()) { + if let Some(tls) = &conn.tls { + match &tls.verification { + TlsVerification::None {} => return S3TlsNoVerificationNotSupportedSnafu.fail(), + TlsVerification::Server(server_verification) => { + match &server_verification.ca_cert { + CaCert::WebPki {} => {} + CaCert::SecretClass(_) => { + return S3TlsCaVerificationNotSupportedSnafu.fail() + } + } + } + } + } + } + + if let Some(conn) = s3bucket.as_ref().and_then(|i| i.connection.as_ref()) { + if conn.tls.as_ref().is_some() { + tracing::warn!("The resource indicates S3-access should use TLS: TLS-verification has not yet been implemented \ + but an HTTPS-endpoint will be used!"); + } + } + let (serviceaccount, rolebinding) = build_spark_role_serviceaccount(&spark_application)?; client .apply_patch(FIELD_MANAGER_SCOPE, &serviceaccount, &serviceaccount) @@ -144,14 +175,18 @@ pub async fn reconcile( container_builder.build() }); - let env_vars = spark_application.env(&s3bucket); + let env_vars = spark_application.env(); let init_containers: Vec = vec![job_container.clone(), requirements_container.clone()] .into_iter() .flatten() .collect(); - let pod_template_config_map = - pod_template_config_map(&spark_application, init_containers.as_ref(), &env_vars)?; + let pod_template_config_map = pod_template_config_map( + &spark_application, + init_containers.as_ref(), + &env_vars, + &s3bucket, + )?; client .apply_patch( FIELD_MANAGER_SCOPE, @@ -172,6 +207,7 @@ pub async fn reconcile( &job_container, &env_vars, &job_commands, + &s3bucket, )?; client .apply_patch(FIELD_MANAGER_SCOPE, &job, &job) 
@@ -190,18 +226,21 @@ fn pod_template( env: &[EnvVar], node_selector: Option>, ) -> Result { - let mut container = ContainerBuilder::new(container_name); - container - .add_volume_mounts(volume_mounts.to_vec()) + let mut cb = ContainerBuilder::new(container_name); + cb.add_volume_mounts(volume_mounts.to_vec()) .add_env_vars(env.to_vec()); if let Some(image_pull_policy) = spark_application.spark_image_pull_policy() { - container.image_pull_policy(image_pull_policy.to_string()); + cb.image_pull_policy(image_pull_policy.to_string()); } let mut pod_spec = PodSpec { - containers: vec![container.build()], + containers: vec![cb.build()], volumes: Some(volumes.to_vec()), + security_context: PodSecurityContextBuilder::new() + .fs_group(1000) + .build() + .into(), // Needed for secret-operator ..PodSpec::default() }; @@ -228,15 +267,16 @@ fn pod_template_config_map( spark_application: &SparkApplication, init_containers: &[Container], env: &[EnvVar], + s3bucket: &Option, ) -> Result { - let volumes = spark_application.volumes(); + let volumes = spark_application.volumes(s3bucket); let driver_template = pod_template( spark_application, CONTAINER_NAME_DRIVER, init_containers, volumes.as_ref(), - spark_application.driver_volume_mounts().as_ref(), + spark_application.driver_volume_mounts(s3bucket).as_ref(), env, spark_application.driver_node_selector(), )?; @@ -245,7 +285,7 @@ fn pod_template_config_map( CONTAINER_NAME_EXECUTOR, init_containers, volumes.as_ref(), - spark_application.executor_volume_mounts().as_ref(), + spark_application.executor_volume_mounts(s3bucket).as_ref(), env, spark_application.executor_node_selector(), )?; @@ -279,13 +319,14 @@ fn spark_job( job_container: &Option, env: &[EnvVar], job_commands: &[String], + s3bucket: &Option, ) -> Result { let mut volume_mounts = vec![VolumeMount { name: VOLUME_MOUNT_NAME_POD_TEMPLATES.into(), mount_path: VOLUME_MOUNT_PATH_POD_TEMPLATES.into(), ..VolumeMount::default() }]; - volume_mounts.extend(spark_application.driver_volume_mounts()); + volume_mounts.extend(spark_application.driver_volume_mounts(s3bucket)); if job_container.is_some() { volume_mounts.push(VolumeMount { name: VOLUME_MOUNT_NAME_JOB.into(), @@ -294,15 +335,10 @@ fn spark_job( }) } - let mut container = ContainerBuilder::new("spark-submit"); - container - .image(spark_image) - .command(vec!["/bin/bash".to_string()]) - .args(vec![ - "-c".to_string(), - "-x".to_string(), - job_commands.join(" "), - ]) + let mut cb = ContainerBuilder::new("spark-submit"); + cb.image(spark_image) + .command(vec!["/bin/sh".to_string()]) + .args(vec!["-c".to_string(), job_commands.join(" ")]) .add_volume_mounts(volume_mounts) .add_env_vars(env.to_vec()) // TODO: move this to the image @@ -313,7 +349,7 @@ fn spark_job( }]); if let Some(image_pull_policy) = spark_application.spark_image_pull_policy() { - container.image_pull_policy(image_pull_policy.to_string()); + cb.image_pull_policy(image_pull_policy.to_string()); } let mut volumes = vec![Volume { @@ -324,7 +360,7 @@ fn spark_job( }), ..Volume::default() }]; - volumes.extend(spark_application.volumes()); + volumes.extend(spark_application.volumes(s3bucket)); if job_container.is_some() { volumes.push(Volume { @@ -342,12 +378,16 @@ fn spark_job( .build(), ), spec: Some(PodSpec { - containers: vec![container.build()], + containers: vec![cb.build()], init_containers: job_container.as_ref().map(|c| vec![c.clone()]), restart_policy: Some("Never".to_string()), service_account_name: serviceaccount.metadata.name.clone(), volumes: Some(volumes), 
image_pull_secrets: spark_application.spark_image_pull_secrets(), + security_context: PodSecurityContextBuilder::new() + .fs_group(1000) + .build() + .into(), // Needed for secret-operator ..PodSpec::default() }), }; diff --git a/tests/templates/kuttl/spark-ny-public-s3/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/spark-ny-public-s3/10-deploy-spark-app.yaml.j2 index 4eb813cf..07e3cf17 100644 --- a/tests/templates/kuttl/spark-ny-public-s3/10-deploy-spark-app.yaml.j2 +++ b/tests/templates/kuttl/spark-ny-public-s3/10-deploy-spark-app.yaml.j2 @@ -34,9 +34,9 @@ spec: inline: host: test-minio port: 9000 + accessStyle: Path sparkConf: spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider" - spark.hadoop.fs.s3a.path.style.access: "true" spark.driver.extraClassPath: "/dependencies/jars/hadoop-aws-{{ test_scenario['values']['hadoop'] }}.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar" spark.executor.extraClassPath: "/dependencies/jars/hadoop-aws-{{ test_scenario['values']['hadoop'] }}.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar" driver: diff --git a/tests/templates/kuttl/spark-pi-private-s3/00-s3-secret.yaml b/tests/templates/kuttl/spark-pi-private-s3/00-s3-secret.yaml index 2e32aff6..0b9c799a 100644 --- a/tests/templates/kuttl/spark-pi-private-s3/00-s3-secret.yaml +++ b/tests/templates/kuttl/spark-pi-private-s3/00-s3-secret.yaml @@ -3,7 +3,19 @@ apiVersion: v1 kind: Secret metadata: name: minio-credentials + labels: + secrets.stackable.tech/class: s3-credentials-class timeout: 240 stringData: accessKeyId: minioAccessKey secretAccessKey: minioSecretKey +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: s3-credentials-class +spec: + backend: + k8sSearch: + searchNamespace: + pod: {} diff --git a/tests/templates/kuttl/spark-pi-private-s3/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/spark-pi-private-s3/10-deploy-spark-app.yaml.j2 index 2fa270ba..2ced72ce 100644 --- a/tests/templates/kuttl/spark-pi-private-s3/10-deploy-spark-app.yaml.j2 +++ b/tests/templates/kuttl/spark-pi-private-s3/10-deploy-spark-app.yaml.j2 @@ -16,14 +16,15 @@ spec: inline: host: test-minio port: 9000 - secretClass: minio-credentials + accessStyle: Path + credentials: + secretClass: s3-credentials-class volumes: - name: spark-pi-deps persistentVolumeClaim: claimName: spark-pi-pvc sparkConf: spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" - spark.hadoop.fs.s3a.path.style.access: "true" spark.driver.extraClassPath: "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar" spark.executor.extraClassPath: "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar" driver: diff --git a/tests/templates/kuttl/spark-pi-public-s3/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/spark-pi-public-s3/10-deploy-spark-app.yaml.j2 index 427205fe..f7143249 100644 --- a/tests/templates/kuttl/spark-pi-public-s3/10-deploy-spark-app.yaml.j2 +++ b/tests/templates/kuttl/spark-pi-public-s3/10-deploy-spark-app.yaml.j2 @@ -21,9 +21,9 @@ spec: inline: host: test-minio port: 9000 + accessStyle: Path sparkConf: spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider" - spark.hadoop.fs.s3a.path.style.access: "true" spark.driver.extraClassPath: "/dependencies/jars/hadoop-aws-{{ test_scenario['values']['hadoop'] }}.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar" 
spark.executor.extraClassPath: "/dependencies/jars/hadoop-aws-{{ test_scenario['values']['hadoop'] }}.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar" driver: