diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh index 61959ca2a304..0671cde1be70 100755 --- a/bin/docker-image-tool.sh +++ b/bin/docker-image-tool.sh @@ -53,7 +53,7 @@ function build { # contain a lot of duplicated jars with the main Spark directory. In a proper distribution, # the examples directory is cleaned up before generating the distribution tarball, so this # issue does not occur. - IMG_PATH=resource-managers/kubernetes/docker/src/main/dockerfiles + IMG_PATH=resource-managers/kubernetes/docker/src JARS=assembly/target/scala-$SPARK_SCALA_VERSION/jars BUILD_ARGS=( ${BUILD_PARAMS} @@ -68,7 +68,7 @@ function build { ) else # Not passed as arguments to docker, but used to validate the Spark directory. - IMG_PATH="kubernetes/dockerfiles" + IMG_PATH="kubernetes/src" JARS=jars BUILD_ARGS=(${BUILD_PARAMS}) fi @@ -91,10 +91,11 @@ function build { --build-arg base_img=$(image_ref spark) ) - local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"} - local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/spark/bindings/python/Dockerfile"} - local RDOCKERFILE=${RDOCKERFILE:-"$IMG_PATH/spark/bindings/R/Dockerfile"} + local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/main/dockerfiles/spark/Dockerfile"} + local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/main/dockerfiles/spark/bindings/python/Dockerfile"} + local RDOCKERFILE=${RDOCKERFILE:-"$IMG_PATH/main/dockerfiles/spark/bindings/R/Dockerfile"} + # Spark Base docker build $NOCACHEARG "${BUILD_ARGS[@]}" \ -t $(image_ref spark) \ -f "$BASEDOCKERFILE" . @@ -102,12 +103,15 @@ function build { error "Failed to build Spark JVM Docker image, please refer to Docker build output for details." fi + # PySpark docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \ -t $(image_ref spark-py) \ -f "$PYDOCKERFILE" . if [ $? -ne 0 ]; then error "Failed to build PySpark Docker image, please refer to Docker build output for details." fi + + # SparkR docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \ -t $(image_ref spark-r) \ -f "$RDOCKERFILE" . diff --git a/dev/make-distribution.sh b/dev/make-distribution.sh index 84f4ae9a64ff..7beff7552d3a 100755 --- a/dev/make-distribution.sh +++ b/dev/make-distribution.sh @@ -191,7 +191,8 @@ fi # Only create and copy the dockerfiles directory if the kubernetes artifacts were built. 
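+# The copied src tree now also carries the test dockerfiles, and the
+# integration-tests scripts are shipped alongside them so the Kerberos test
+# images can be built from a distribution.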
if [ -d "$SPARK_HOME"/resource-managers/kubernetes/core/target/ ]; then mkdir -p "$DISTDIR/kubernetes/" - cp -a "$SPARK_HOME"/resource-managers/kubernetes/docker/src/main/dockerfiles "$DISTDIR/kubernetes/" + cp -a "$SPARK_HOME"/resource-managers/kubernetes/docker/src "$DISTDIR/kubernetes/" + cp -a "$SPARK_HOME"/resource-managers/kubernetes/integration-tests/scripts "$DISTDIR/kubernetes/" cp -a "$SPARK_HOME"/resource-managers/kubernetes/integration-tests/tests "$DISTDIR/kubernetes/" fi diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile index 5f469c30a96f..f7b214b10b6a 100644 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile @@ -19,7 +19,7 @@ FROM openjdk:8-alpine ARG spark_jars=jars ARG example_jars=examples/jars -ARG img_path=kubernetes/dockerfiles +ARG img_path=kubernetes/src/main/dockerfiles ARG k8s_tests=kubernetes/tests # Before building the docker image, first build and make a Spark distribution following diff --git a/resource-managers/kubernetes/docker/src/test/data/people.txt b/resource-managers/kubernetes/docker/src/test/data/people.txt new file mode 100644 index 000000000000..30f7501874b6 --- /dev/null +++ b/resource-managers/kubernetes/docker/src/test/data/people.txt @@ -0,0 +1,3 @@ +Michael, 29 +Andy, 30 +Justin, 19 \ No newline at end of file diff --git a/resource-managers/kubernetes/docker/src/test/dockerfiles/hadoop/Dockerfile b/resource-managers/kubernetes/docker/src/test/dockerfiles/hadoop/Dockerfile new file mode 100644 index 000000000000..d9b67dadde4c --- /dev/null +++ b/resource-managers/kubernetes/docker/src/test/dockerfiles/hadoop/Dockerfile @@ -0,0 +1,46 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+
+FROM centos:7
+
+ARG hadoop_version
+ARG k_img_path=kubernetes/src/test
+
+RUN yum -y install krb5-server krb5-workstation
+RUN yum -y install java-1.8.0-openjdk-headless
+RUN yum -y install apache-commons-daemon-jsvc
+RUN yum -y install net-tools
+RUN yum -y install telnet telnet-server
+RUN yum -y install which
+
+RUN sed -i -e 's/#//' -e 's/default_ccache_name/# default_ccache_name/' /etc/krb5.conf
+
+RUN useradd -u 1098 hdfs
+
+ADD hadoop-${hadoop_version}.tar.gz /
+RUN ln -s hadoop-${hadoop_version} hadoop
+RUN chown -R -L hdfs /hadoop
+
+COPY ${k_img_path}/hadoop/conf/ssl-server.xml /hadoop/etc/hadoop/
+COPY ${k_img_path}/hadoop/conf/yarn-site.xml /hadoop/etc/hadoop/
+
+COPY ${k_img_path}/scripts/start-namenode.sh /
+COPY ${k_img_path}/scripts/start-datanode.sh /
+COPY ${k_img_path}/scripts/populate-data.sh /
+COPY ${k_img_path}/scripts/start-kdc.sh /
+
+COPY ${k_img_path}/data/people.txt /
diff --git a/resource-managers/kubernetes/docker/src/test/dockerfiles/spark/kerberos/Dockerfile b/resource-managers/kubernetes/docker/src/test/dockerfiles/spark/kerberos/Dockerfile
new file mode 100644
index 000000000000..01d69a3c8cb3
--- /dev/null
+++ b/resource-managers/kubernetes/docker/src/test/dockerfiles/spark/kerberos/Dockerfile
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ARG base_img
+FROM $base_img
+
+ARG k_img_path=kubernetes/src/test
+
+COPY ${k_img_path}/scripts/run-kerberos-test.sh /opt/spark/
+COPY ${k_img_path}/hadoop/conf /opt/spark/hconf
diff --git a/resource-managers/kubernetes/docker/src/test/hadoop/conf/ssl-server.xml b/resource-managers/kubernetes/docker/src/test/hadoop/conf/ssl-server.xml
new file mode 100644
index 000000000000..45cfa1870792
--- /dev/null
+++ b/resource-managers/kubernetes/docker/src/test/hadoop/conf/ssl-server.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+  <property>
+    <name>ssl.server.truststore.location</name>
+    <value>/var/keytabs/hdfs.jks</value>
+  </property>
+
+  <property>
+    <name>ssl.server.truststore.password</name>
+    <value>changeme</value>
+  </property>
+
+  <property>
+    <name>ssl.server.keystore.location</name>
+    <value>/var/keytabs/hdfs.jks</value>
+  </property>
+
+  <property>
+    <name>ssl.server.keystore.password</name>
+    <value>changeme</value>
+  </property>
+
+  <property>
+    <name>ssl.server.keystore.keypassword</name>
+    <value>changeme</value>
+  </property>
+</configuration>
diff --git a/resource-managers/kubernetes/docker/src/test/hadoop/conf/yarn-site.xml b/resource-managers/kubernetes/docker/src/test/hadoop/conf/yarn-site.xml
new file mode 100755
index 000000000000..b8ff146d98a3
--- /dev/null
+++ b/resource-managers/kubernetes/docker/src/test/hadoop/conf/yarn-site.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+  <property>
+    <name>yarn.resourcemanager.principal</name>
+    <value>yarn/_HOST@CLUSTER.LOCAL</value>
+  </property>
+</configuration>
diff --git a/resource-managers/kubernetes/docker/src/test/scripts/populate-data.sh b/resource-managers/kubernetes/docker/src/test/scripts/populate-data.sh
new file mode 100644
index 000000000000..4e2d8f3254d3
--- /dev/null
+++ b/resource-managers/kubernetes/docker/src/test/scripts/populate-data.sh
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# +export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk +export PATH=/hadoop/bin:$PATH +export HADOOP_CONF_DIR=/hadoop/etc/hadoop +export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true -Dsun.security.krb5.debug=true ${HADOOP_OPTS}" +export KRB5CCNAME=KRBCONF +mkdir -p /hadoop/etc/data +cp ${TMP_KRB_LOC} /etc/krb5.conf +cp ${TMP_CORE_LOC} /hadoop/etc/hadoop/core-site.xml +cp ${TMP_HDFS_LOC} /hadoop/etc/hadoop/hdfs-site.xml + +until kinit -kt /var/keytabs/hdfs.keytab hdfs/nn.${NAMESPACE}.svc.cluster.local; do sleep 2; done + +until (echo > /dev/tcp/nn.${NAMESPACE}.svc.cluster.local/9000) >/dev/null 2>&1; do sleep 2; done + +hdfs dfsadmin -safemode wait + + +hdfs dfs -mkdir -p /user/userone/ +hdfs dfs -copyFromLocal /people.txt /user/userone + +hdfs dfs -chmod -R 755 /user/userone +hdfs dfs -chown -R ifilonenko /user/userone diff --git a/resource-managers/kubernetes/docker/src/test/scripts/run-kerberos-test.sh b/resource-managers/kubernetes/docker/src/test/scripts/run-kerberos-test.sh new file mode 100644 index 000000000000..56542fed6622 --- /dev/null +++ b/resource-managers/kubernetes/docker/src/test/scripts/run-kerberos-test.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +sed -i -e 's/#//' -e 's/default_ccache_name/# default_ccache_name/' /etc/krb5.conf +export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true -Dsun.security.krb5.debug=true" +export HADOOP_JAAS_DEBUG=true +export HADOOP_ROOT_LOGGER=DEBUG,console +cp ${TMP_KRB_LOC} /etc/krb5.conf +cp ${TMP_CORE_LOC} /opt/spark/hconf/core-site.xml +cp ${TMP_HDFS_LOC} /opt/spark/hconf/hdfs-site.xml +mkdir -p /etc/krb5.conf.d +/opt/spark/bin/spark-submit \ + --deploy-mode cluster \ + --class ${CLASS_NAME} \ + --master k8s://${MASTER_URL} \ + --conf spark.kubernetes.namespace=${NAMESPACE} \ + --conf spark.executor.instances=1 \ + --conf spark.app.name=spark-hdfs \ + --conf spark.driver.extraClassPath=/opt/spark/hconf/core-site.xml:/opt/spark/hconf/hdfs-site.xml:/opt/spark/hconf/yarn-site.xml:/etc/krb5.conf \ + --conf spark.kubernetes.container.image=${BASE_SPARK_IMAGE} \ + --conf spark.kubernetes.kerberos.krb5.path=/etc/krb5.conf \ + --conf spark.kerberos.keytab=/var/keytabs/hdfs.keytab \ + --conf spark.kerberos.principal=hdfs/nn.${NAMESPACE}.svc.cluster.local@CLUSTER.LOCAL \ + --conf spark.kubernetes.driver.label.spark-app-locator=${APP_LOCATOR_LABEL} \ + ${SUBMIT_RESOURCE} \ + hdfs://nn.${NAMESPACE}.svc.cluster.local:9000/user/userone/people.txt diff --git a/resource-managers/kubernetes/docker/src/test/scripts/start-datanode.sh b/resource-managers/kubernetes/docker/src/test/scripts/start-datanode.sh new file mode 100644 index 000000000000..d87ea659ae64 --- /dev/null +++ b/resource-managers/kubernetes/docker/src/test/scripts/start-datanode.sh @@ -0,0 +1,32 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk +export PATH=/hadoop/bin:$PATH +export HADOOP_CONF_DIR=/hadoop/etc/hadoop +mkdir -p /hadoop/etc/data +cp ${TMP_KRB_LOC} /etc/krb5.conf +cp ${TMP_CORE_LOC} /hadoop/etc/hadoop/core-site.xml +cp ${TMP_HDFS_LOC} /hadoop/etc/hadoop/hdfs-site.xml + +until kinit -kt /var/keytabs/hdfs.keytab hdfs/nn.${NAMESPACE}.svc.cluster.local; do sleep 15; done + +echo "KDC is up and ready to go... starting up" + +kdestroy + +hdfs datanode diff --git a/resource-managers/kubernetes/docker/src/test/scripts/start-kdc.sh b/resource-managers/kubernetes/docker/src/test/scripts/start-kdc.sh new file mode 100644 index 000000000000..820ee29650b4 --- /dev/null +++ b/resource-managers/kubernetes/docker/src/test/scripts/start-kdc.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk +export PATH=/hadoop/bin:$PATH +export HADOOP_CONF_DIR=/hadoop/etc/hadoop +mkdir -p /hadoop/etc/data +cp ${TMP_KRB_LOC} /etc/krb5.conf +cp ${TMP_CORE_LOC} /hadoop/etc/hadoop/core-site.xml +cp ${TMP_HDFS_LOC} /hadoop/etc/hadoop/hdfs-site.xml + +/usr/sbin/kdb5_util -P changeme create -s + + +## password only user +/usr/sbin/kadmin.local -q "addprinc -randkey userone" +/usr/sbin/kadmin.local -q "ktadd -k /var/keytabs/userone.keytab userone" + +/usr/sbin/kadmin.local -q "addprinc -randkey HTTP/server.${NAMESPACE}.svc.cluster.local" +/usr/sbin/kadmin.local -q "ktadd -k /var/keytabs/server.keytab HTTP/server.${NAMESPACE}.svc.cluster.local" + +/usr/sbin/kadmin.local -q "addprinc -randkey hdfs/nn.${NAMESPACE}.svc.cluster.local" +/usr/sbin/kadmin.local -q "addprinc -randkey HTTP/nn.${NAMESPACE}.svc.cluster.local" +/usr/sbin/kadmin.local -q "addprinc -randkey hdfs/dn1.${NAMESPACE}.svc.cluster.local" +/usr/sbin/kadmin.local -q "addprinc -randkey HTTP/dn1.${NAMESPACE}.svc.cluster.local" + +/usr/sbin/kadmin.local -q "ktadd -k /var/keytabs/hdfs.keytab hdfs/nn.${NAMESPACE}.svc.cluster.local" +/usr/sbin/kadmin.local -q "ktadd -k /var/keytabs/hdfs.keytab HTTP/nn.${NAMESPACE}.svc.cluster.local" +/usr/sbin/kadmin.local -q "ktadd -k /var/keytabs/hdfs.keytab hdfs/dn1.${NAMESPACE}.svc.cluster.local" +/usr/sbin/kadmin.local -q "ktadd -k /var/keytabs/hdfs.keytab HTTP/dn1.${NAMESPACE}.svc.cluster.local" + +chown hdfs /var/keytabs/hdfs.keytab + +keytool -genkey -alias nn.${NAMESPACE}.svc.cluster.local -keyalg rsa -keysize 1024 -dname "CN=nn.${NAMESPACE}.svc.cluster.local" -keypass changeme -keystore /var/keytabs/hdfs.jks -storepass changeme +keytool -genkey -alias dn1.${NAMESPACE}.svc.cluster.local -keyalg rsa -keysize 1024 -dname "CN=dn1.${NAMESPACE}.svc.cluster.local" -keypass changeme -keystore /var/keytabs/hdfs.jks -storepass changeme + +chmod 700 /var/keytabs/hdfs.jks +chown hdfs /var/keytabs/hdfs.jks + + +krb5kdc -n diff --git a/resource-managers/kubernetes/docker/src/test/scripts/start-namenode.sh b/resource-managers/kubernetes/docker/src/test/scripts/start-namenode.sh new file mode 100644 index 000000000000..d23ec8265412 --- /dev/null +++ b/resource-managers/kubernetes/docker/src/test/scripts/start-namenode.sh @@ -0,0 +1,33 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk +export PATH=/hadoop/bin:$PATH +export HADOOP_CONF_DIR=/hadoop/etc/hadoop +mkdir -p /hadoop/etc/data +cp ${TMP_KRB_LOC} /etc/krb5.conf +cp ${TMP_CORE_LOC} /hadoop/etc/hadoop/core-site.xml +cp ${TMP_HDFS_LOC} /hadoop/etc/hadoop/hdfs-site.xml + +until kinit -kt /var/keytabs/hdfs.keytab hdfs/nn.${NAMESPACE}.svc.cluster.local; do sleep 15; done + +echo "KDC is up and ready to go... starting up" + +kdestroy + +hdfs namenode -format +hdfs namenode diff --git a/resource-managers/kubernetes/integration-tests/kerberos-yml/data-populator-job.yml b/resource-managers/kubernetes/integration-tests/kerberos-yml/data-populator-job.yml new file mode 100755 index 000000000000..294506f7965d --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/kerberos-yml/data-populator-job.yml @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +apiVersion: batch/v1 +kind: Job +metadata: + name: data-populator +spec: + manualSelector: true + backoffLimit: 4 + selector: + matchLabels: + name: hdfs-data-populator + kerberosService: data-populator + job: kerberostest + template: + metadata: + annotations: + pod.beta.kubernetes.io/hostname: data-populator + labels: + name: hdfs-data-populator + kerberosService: data-populator + job: kerberostest + spec: + containers: + - command: ["sh"] + args: ["/populate-data.sh"] + name: data-populator + imagePullPolicy: IfNotPresent + volumeMounts: + - mountPath: /var/keytabs + name: data-populator-keytab + restartPolicy: OnFailure + volumes: + - name: data-populator-keytab + persistentVolumeClaim: + claimName: server-keytab diff --git a/resource-managers/kubernetes/integration-tests/kerberos-yml/dn1-service.yml b/resource-managers/kubernetes/integration-tests/kerberos-yml/dn1-service.yml new file mode 100755 index 000000000000..963e66da8015 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/kerberos-yml/dn1-service.yml @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +apiVersion: v1 +kind: Service +metadata: + annotations: + service.alpha.kubernetes.io/tolerate-unready-endpoints: "true" + labels: + kerberosService: dn1 + job: kerberostest + name: dn1 +spec: + clusterIP: None + ports: + - protocol: TCP + port: 55555 + targetPort: 0 + selector: + kerberosService: dn1 diff --git a/resource-managers/kubernetes/integration-tests/kerberos-yml/dn1-set.yml b/resource-managers/kubernetes/integration-tests/kerberos-yml/dn1-set.yml new file mode 100755 index 000000000000..77484be5c6e0 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/kerberos-yml/dn1-set.yml @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: dn1 +spec: + replicas: 1 + selector: + matchLabels: + name: hdfs-dn1 + kerberosService: dn1 + job: kerberostest + template: + metadata: + annotations: + pod.beta.kubernetes.io/hostname: dn1 + labels: + name: hdfs-dn1 + kerberosService: dn1 + job: kerberostest + spec: + containers: + - command: ["sh"] + args: ["/start-datanode.sh"] + name: dn1 + imagePullPolicy: IfNotPresent + volumeMounts: + - mountPath: /var/keytabs + name: dn1-keytab + restartPolicy: Always + volumes: + - name: dn1-keytab + persistentVolumeClaim: + claimName: server-keytab diff --git a/resource-managers/kubernetes/integration-tests/kerberos-yml/kerberos-service.yml b/resource-managers/kubernetes/integration-tests/kerberos-yml/kerberos-service.yml new file mode 100755 index 000000000000..c1ff280b2bda --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/kerberos-yml/kerberos-service.yml @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +apiVersion: v1 +kind: Service +metadata: + annotations: + service.alpha.kubernetes.io/tolerate-unready-endpoints: "true" + labels: + kerberosService: kerberos + job: kerberostest + name: kerberos +spec: + clusterIP: None + ports: + - protocol: TCP + port: 55555 + targetPort: 0 + selector: + kerberosService: kerberos diff --git a/resource-managers/kubernetes/integration-tests/kerberos-yml/kerberos-set.yml b/resource-managers/kubernetes/integration-tests/kerberos-yml/kerberos-set.yml new file mode 100755 index 000000000000..5000e115c364 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/kerberos-yml/kerberos-set.yml @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: kerberos +spec: + replicas: 1 + selector: + matchLabels: + name: hdfs-kerberos + kerberosService: kerberos + job: kerberostest + template: + metadata: + annotations: + pod.beta.kubernetes.io/hostname: kerberos + labels: + name: hdfs-kerberos + kerberosService: kerberos + job: kerberostest + spec: + containers: + - command: ["sh"] + args: ["/start-kdc.sh"] + name: kerberos + imagePullPolicy: IfNotPresent + volumeMounts: + - mountPath: /var/keytabs + name: kerb-keytab + restartPolicy: Always + volumes: + - name: kerb-keytab + persistentVolumeClaim: + claimName: server-keytab diff --git a/resource-managers/kubernetes/integration-tests/kerberos-yml/kerberos-test.yml b/resource-managers/kubernetes/integration-tests/kerberos-yml/kerberos-test.yml new file mode 100755 index 000000000000..493de8ce6ba7 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/kerberos-yml/kerberos-test.yml @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +apiVersion: batch/v1 +kind: Job +metadata: + name: kerberos-test +spec: + manualSelector: true + backoffLimit: 1 + selector: + matchLabels: + name: kerberos-test + template: + metadata: + labels: + name: kerberos-test + spec: + containers: + - command: ["/bin/bash"] + args: ["/opt/spark/run-kerberos-test.sh"] + name: kerberos-test + imagePullPolicy: IfNotPresent + volumeMounts: + - mountPath: /var/keytabs + name: kerberos-test-keytab + restartPolicy: OnFailure + volumes: + - name: kerberos-test-keytab + persistentVolumeClaim: + claimName: server-keytab diff --git a/resource-managers/kubernetes/integration-tests/kerberos-yml/nn-service.yml b/resource-managers/kubernetes/integration-tests/kerberos-yml/nn-service.yml new file mode 100755 index 000000000000..7d6cfd415a28 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/kerberos-yml/nn-service.yml @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +apiVersion: v1 +kind: Service +metadata: + annotations: + service.alpha.kubernetes.io/tolerate-unready-endpoints: "true" + labels: + kerberosService: nn + job: kerberostest + name: nn +spec: + clusterIP: None + ports: + - protocol: TCP + port: 9000 + targetPort: 9000 + selector: + kerberosService: nn diff --git a/resource-managers/kubernetes/integration-tests/kerberos-yml/nn-set.yml b/resource-managers/kubernetes/integration-tests/kerberos-yml/nn-set.yml new file mode 100755 index 000000000000..9329edc79164 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/kerberos-yml/nn-set.yml @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+  name: nn
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      name: hdfs-nn
+      kerberosService: nn
+      job: kerberostest
+  template:
+    metadata:
+      annotations:
+        pod.beta.kubernetes.io/hostname: nn
+      labels:
+        name: hdfs-nn
+        kerberosService: nn
+        job: kerberostest
+    spec:
+      containers:
+      - command: ["sh"]
+        args: ["/start-namenode.sh"]
+        name: nn
+        ports:
+        - containerPort: 9000
+        imagePullPolicy: IfNotPresent
+        volumeMounts:
+        - mountPath: /var/keytabs
+          name: nn-keytab
+      restartPolicy: Always
+      volumes:
+      - name: nn-keytab
+        persistentVolumeClaim:
+          claimName: server-keytab
+      - name: nn-hadoop
+        persistentVolumeClaim:
+          claimName: nn-hadoop
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
index a07fe1feea3e..63793f8e76ff 100644
--- a/resource-managers/kubernetes/integration-tests/pom.xml
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -26,14 +26,16 @@
   <artifactId>spark-kubernetes-integration-tests_2.11</artifactId>
   <properties>
-    <download-maven-plugin.version>1.3.0</download-maven-plugin.version>
+    <download-maven-plugin.version>1.4.1</download-maven-plugin.version>
     <exec-maven-plugin.version>1.4.0</exec-maven-plugin.version>
     <kubernetes-client.version>4.1.0</kubernetes-client.version>
+    <hadoop-common-version>2.7.7</hadoop-common-version>
     <scala-maven-plugin.version>3.2.2</scala-maven-plugin.version>
     <scalatest-maven-plugin.version>1.0</scalatest-maven-plugin.version>
     <sbt.project.name>kubernetes-integration-tests</sbt.project.name>
     <spark.kubernetes.test.unpackSparkDir>${project.build.directory}/spark-dist-unpacked</spark.kubernetes.test.unpackSparkDir>
+    <spark.kubernetes.test.hadoopTgz>${project.build.directory}/hadoop-dist-loc</spark.kubernetes.test.hadoopTgz>
     <spark.kubernetes.test.imageTag>N/A</spark.kubernetes.test.imageTag>
     <spark.kubernetes.test.imageTagFile>${project.build.directory}/imageTag.txt</spark.kubernetes.test.imageTagFile>
     <spark.kubernetes.test.deployMode>minikube</spark.kubernetes.test.deployMode>
@@ -71,6 +73,25 @@
+      <plugin>
+        <groupId>com.googlecode.maven-download-plugin</groupId>
+        <artifactId>download-maven-plugin</artifactId>
+        <version>${download-maven-plugin.version}</version>
+        <executions>
+          <execution>
+            <id>install-hadoop-distribution</id>
+            <phase>pre-integration-test</phase>
+            <goals>
+              <goal>wget</goal>
+            </goals>
+            <configuration>
+              <url>http://apache.mirrors.lucidnetworks.net/hadoop/common/hadoop-${hadoop-common-version}/hadoop-${hadoop-common-version}.tar.gz</url>
+              <outputDirectory>${spark.kubernetes.test.hadoopTgz}</outputDirectory>
+              <outputFileName>hadoop-${hadoop-common-version}.tar.gz</outputFileName>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>exec-maven-plugin</artifactId>
@@ -102,12 +123,17 @@
               <argument>--spark-tgz</argument>
               <argument>${spark.kubernetes.test.sparkTgz}</argument>
+
+              <argument>--hadoop-tgz</argument>
+              <argument>${spark.kubernetes.test.hadoopTgz}/hadoop-${hadoop-common-version}.tar.gz</argument>
+
+              <argument>--hadoop-version</argument>
+              <argument>${hadoop-common-version}</argument>
             </arguments>
           </configuration>
         </execution>
       </executions>
     </plugin>
-
     <plugin>
       <groupId>org.apache.maven.plugins</groupId>
       <artifactId>maven-surefire-plugin</artifactId>
diff --git a/resource-managers/kubernetes/integration-tests/scripts/setup-integration-test-env.sh b/resource-managers/kubernetes/integration-tests/scripts/setup-integration-test-env.sh
index ccfb8e767c52..31633145ea89 100755
--- a/resource-managers/kubernetes/integration-tests/scripts/setup-integration-test-env.sh
+++ b/resource-managers/kubernetes/integration-tests/scripts/setup-integration-test-env.sh
@@ -23,6 +23,8 @@ DEPLOY_MODE="minikube"
 IMAGE_REPO="docker.io/kubespark"
 IMAGE_TAG="N/A"
 SPARK_TGZ="N/A"
+HADOOP_TGZ="N/A"
+HADOOP_VERSION="N/A"
 
 # Parse arguments
 while (( "$#" )); do
@@ -51,6 +53,14 @@ while (( "$#" )); do
       SPARK_TGZ="$2"
       shift
       ;;
+    --hadoop-tgz)
+      HADOOP_TGZ="$2"
+      shift
+      ;;
+    --hadoop-version)
+      HADOOP_VERSION="$2"
+      shift
+      ;;
     *)
       break
       ;;
@@ -63,6 +73,16 @@
 then
   echo "Must specify a Spark tarball to build Docker images against with --spark-tgz." && exit 1;
 fi
 
+if [[ $HADOOP_TGZ == "N/A" ]];
+then
+  echo "Must specify a Hadoop tarball to build hadoop Docker images against with --hadoop-tgz." && exit 1;
+fi
+
+if [[ $HADOOP_VERSION == "N/A" ]];
+then
+  echo "Must specify a Hadoop version with --hadoop-version." && exit 1;
+fi
+
 rm -rf $UNPACKED_SPARK_TGZ
 mkdir -p $UNPACKED_SPARK_TGZ
 tar -xzvf $SPARK_TGZ --strip-components=1 -C $UNPACKED_SPARK_TGZ;
@@ -71,6 +91,7 @@
 if [[ $IMAGE_TAG == "N/A" ]];
 then
   IMAGE_TAG=$(uuidgen);
   cd $UNPACKED_SPARK_TGZ
+  cp $HADOOP_TGZ $UNPACKED_SPARK_TGZ/
   if [[ $DEPLOY_MODE == cloud ]] ;
   then
     $UNPACKED_SPARK_TGZ/bin/docker-image-tool.sh -r $IMAGE_REPO -t $IMAGE_TAG build
@@ -83,6 +104,8 @@ then
   else
     # -m option for minikube.
     $UNPACKED_SPARK_TGZ/bin/docker-image-tool.sh -m -r $IMAGE_REPO -t $IMAGE_TAG build
+    chmod +x $UNPACKED_SPARK_TGZ/kubernetes/scripts/setup-krb-integration-test-env.sh
+    $UNPACKED_SPARK_TGZ/kubernetes/scripts/setup-krb-integration-test-env.sh -r $IMAGE_REPO -t $IMAGE_TAG -v $HADOOP_VERSION
   fi
   cd -
 fi
diff --git a/resource-managers/kubernetes/integration-tests/scripts/setup-krb-integration-test-env.sh b/resource-managers/kubernetes/integration-tests/scripts/setup-krb-integration-test-env.sh
new file mode 100644
index 000000000000..dc0cdecbffd4
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/scripts/setup-krb-integration-test-env.sh
@@ -0,0 +1,80 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+function error {
+  echo "$@" 1>&2
+  exit 1
+}
+
+function image_ref {
+  local image="$1"
+  local add_repo="${2:-1}"
+  if [ -n "$REPO" ]; then
+    image="$REPO/$image"
+  fi
+  if [ -n "$TAG" ]; then
+    image="$image:$TAG"
+  fi
+  echo "$image"
+}
+
+function build {
+  local IMG_PATH="kubernetes/src"
+
+  if [ ! -d "$IMG_PATH" ]; then
+    error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
+  fi
+
+  local KRB_BUILD_ARGS=(
+    --build-arg
+    base_img=$(image_ref spark)
+  )
+  local HADOOP_BUILD_ARGS=(
+    --build-arg
+    hadoop_version="$HVERSION"
+  )
+  local HDOCKERFILE="$IMG_PATH/test/dockerfiles/hadoop/Dockerfile"
+  local KDOCKERFILE="$IMG_PATH/test/dockerfiles/spark/kerberos/Dockerfile"
+
+  docker build $NOCACHEARG "${HADOOP_BUILD_ARGS[@]}" \
+    -t $(image_ref hadoop-base) \
+    -f "$HDOCKERFILE" .
+
+  docker build $NOCACHEARG "${KRB_BUILD_ARGS[@]}" \
+    -t $(image_ref spark-kerberos) \
+    -f "$KDOCKERFILE" .
+}
+
+REPO=
+TAG=
+NOCACHEARG=
+HVERSION=
+while getopts r:t:v:n option
+do
+  case "${option}"
+  in
+    r) REPO=${OPTARG};;
+    t) TAG=${OPTARG};;
+    v) HVERSION=${OPTARG};;
+    n) NOCACHEARG="--no-cache";;
+  esac
+done
+
+eval $(minikube docker-env)
+build
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KerberosTestSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KerberosTestSuite.scala
new file mode 100755
index 000000000000..5dea0f998b4c
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KerberosTestSuite.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.deploy.k8s.integrationtest.KerberosTestSuite._
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{k8sTestTag, INTERVAL, TIMEOUT}
+import org.apache.spark.deploy.k8s.integrationtest.kerberos._
+
+private[spark] trait KerberosTestSuite { k8sSuite: KubernetesSuite =>
+
+  test("Secure HDFS test with HDFS keytab (Cluster Mode)", k8sTestTag) {
+    val kubernetesClient = kubernetesTestComponents.kubernetesClient
+
+    // Launches a single-node, pseudo-distributed, kerberized Hadoop cluster
+    kerberizedHadoopClusterLauncher.launchKerberizedCluster(kerberosUtils)
+
+    // Launches the Kerberos test
+    val driverWatcherCache = new KerberosJobWatcherCache(
+      kerberosUtils,
+      Map("spark-app-locator" -> appLocator))
+
+    driverWatcherCache.deploy(
+      kerberosUtils.getKerberosTest(
+        containerLocalSparkDistroExamplesJar,
+        HDFS_TEST_CLASS,
+        appLocator,
+        KERB_YAML_LOCATION)
+    )
+    driverWatcherCache.stopWatch()
+
+    val expectedLogOnCompletion = Seq(
+      "File contents: [Michael,",
+      "Returned length(s) of: 3.0")
+    val driverPod = kubernetesClient
+      .pods()
+      .inNamespace(kubernetesTestComponents.namespace)
+      .withLabel("spark-app-locator", appLocator)
+      .list()
+      .getItems
+      .get(0)
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      expectedLogOnCompletion.foreach { e =>
+        assert(kubernetesClient
+          .pods()
+          .withName(driverPod.getMetadata.getName)
+          .getLog
+          .contains(e), "The application did not complete.")
+      }
+    }
+  }
+}
+
+private[spark] object KerberosTestSuite {
+  val HDFS_TEST_CLASS = "org.apache.spark.examples.HdfsTest"
+  val KERB_YAML_LOCATION = "kerberos-test"
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index
e2e5880255e2..e3044fc85caf 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -34,11 +34,13 @@ import scala.collection.JavaConverters._ import org.apache.spark.SparkFunSuite import org.apache.spark.deploy.k8s.integrationtest.TestConfig._ import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory} +import org.apache.spark.deploy.k8s.integrationtest.kerberos.{KerberizedHadoopClusterLauncher, KerberosUtils} import org.apache.spark.internal.Logging private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite + with KerberosTestSuite with Logging with Eventually with Matchers { import KubernetesSuite._ @@ -46,6 +48,8 @@ private[spark] class KubernetesSuite extends SparkFunSuite private var sparkHomeDir: Path = _ private var pyImage: String = _ private var rImage: String = _ + private var hImage: String = _ + private var kImage: String = _ protected var image: String = _ protected var testBackend: IntegrationTestBackend = _ @@ -54,6 +58,9 @@ private[spark] class KubernetesSuite extends SparkFunSuite protected var sparkAppConf: SparkAppConf = _ protected var containerLocalSparkDistroExamplesJar: String = _ protected var appLocator: String = _ + // Kerberos related testing + protected var kerberizedHadoopClusterLauncher: KerberizedHadoopClusterLauncher = _ + protected var kerberosUtils: KerberosUtils = _ // Default memory limit is 1024M + 384M (minimum overhead constant) private val baseMemory = s"${1024 + 384}Mi" @@ -87,6 +94,8 @@ private[spark] class KubernetesSuite extends SparkFunSuite image = s"$imageRepo/spark:$imageTag" pyImage = s"$imageRepo/spark-py:$imageTag" rImage = s"$imageRepo/spark-r:$imageTag" + hImage = s"$imageRepo/hadoop-base:$imageTag" + kImage = s"$imageRepo/spark-kerberos:$imageTag" val sparkDistroExamplesJarFile: File = sparkHomeDir.resolve(Paths.get("examples", "jars")) .toFile @@ -96,6 +105,17 @@ private[spark] class KubernetesSuite extends SparkFunSuite testBackend = IntegrationTestBackendFactory.getTestBackend testBackend.initialize() kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient) + kerberizedHadoopClusterLauncher = new KerberizedHadoopClusterLauncher( + KERBEROS_LABEL, + kubernetesTestComponents.kubernetesClient.inNamespace(kubernetesTestComponents.namespace), + kubernetesTestComponents.namespace) + kerberosUtils = new KerberosUtils( + image, + hImage, + kImage, + kubernetesTestComponents.serviceAccountName, + kubernetesTestComponents.kubernetesClient, + kubernetesTestComponents.namespace) } override def afterAll(): Unit = { @@ -124,6 +144,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite kubernetesTestComponents.deleteNamespace() } deleteDriverPod() + deleteKubernetesPVs() } protected def runSparkPiAndVerifyCompletion( @@ -344,6 +365,26 @@ private[spark] class KubernetesSuite extends SparkFunSuite .get() == null) } } + + private def deleteKubernetesPVs(): Unit = { + // Temporary hack until client library for fabric8 is updated to get around + // the NPE that comes about when I do .list().getItems().asScala + try { + val pvList = kubernetesTestComponents.kubernetesClient + 
+        .persistentVolumes().withLabels(KERBEROS_LABEL.asJava)
+        .list().getItems.asScala
+      if (pvList.nonEmpty) {
+        kubernetesTestComponents.kubernetesClient
+          .persistentVolumes().withLabels(KERBEROS_LABEL.asJava).delete()
+      }
+      Eventually.eventually(TIMEOUT, INTERVAL) {
+        kubernetesTestComponents.kubernetesClient
+          .persistentVolumes().withLabels(KERBEROS_LABEL.asJava)
+          .list().getItems.asScala.isEmpty should be (true)
+      }
+    } catch {
+      case _: java.lang.NullPointerException =>
+        // Swallow the fabric8 client NPE described in the comment above.
+    }
+  }
 }
 
 private[spark] object KubernetesSuite {
@@ -353,4 +394,5 @@
   val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest"
   val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
   val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
+  val KERBEROS_LABEL = Map("job" -> "kerberostest")
 }
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
index 6494cbc18f33..af85aae21678 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest.backend.minikube
 
-import java.io.File
 import java.nio.file.Paths
 
 import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala
index cb9324179d70..921dc1685795 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala
@@ -17,10 +17,15 @@
 package org.apache.spark.deploy.k8s.integrationtest.backend.minikube
 
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import org.scalatest.Matchers
+import org.scalatest.concurrent.Eventually
+import scala.collection.JavaConverters._
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{INTERVAL, KERBEROS_LABEL, TIMEOUT}
 import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend
 
-private[spark] object MinikubeTestBackend extends IntegrationTestBackend {
+private[spark] object MinikubeTestBackend
+  extends IntegrationTestBackend with Eventually with Matchers {
 
   private var defaultClient: DefaultKubernetesClient = _
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/ContainerNameEqualityPredicate.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/ContainerNameEqualityPredicate.scala
new file mode 100644
index 000000000000..aaed74a6533f
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/ContainerNameEqualityPredicate.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest.kerberos
+
+import java.lang.Boolean
+
+import io.fabric8.kubernetes.api.builder.Predicate
+import io.fabric8.kubernetes.api.model.ContainerBuilder
+
+private[spark] class ContainerNameEqualityPredicate(containerName: String)
+  extends Predicate[ContainerBuilder] {
+  override def apply(item: ContainerBuilder): Boolean = {
+    item.getName == containerName
+  }
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/KerberizedHadoopClusterLauncher.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/KerberizedHadoopClusterLauncher.scala
new file mode 100755
index 000000000000..99eeccac36ea
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/KerberizedHadoopClusterLauncher.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest.kerberos
+
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.internal.Logging
+
+/**
+ * This class is responsible for launching a pseudo-distributed, single-node,
+ * kerberized Hadoop cluster to test secure HDFS interaction. Because each node
+ * (the KDC, the data node, and the name node) relies on Persistent Volumes and
+ * Config Maps being set up, and on a particular pod-launch order, this class
+ * leverages Watchers and thread locks to ensure that order is always preserved
+ * and the cluster is the same for every run.
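+ *
+ * The enforced order is: persistent volumes and their claims, then the
+ * HADOOP_CONF_DIR config map, then the KDC, name node, and data node pods,
+ * and finally the data-populator job.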
+ */
+private[spark] class KerberizedHadoopClusterLauncher(
+    labels: Map[String, String],
+    kubernetesClient: KubernetesClient,
+    namespace: String)
+  extends Logging {
+
+  def launchKerberizedCluster(kerberosUtils: KerberosUtils): Unit = {
+    // These utils let each step in the launch process re-use common
+    // functionality for setting up Hadoop nodes.
+    // Launches persistent volumes and their claims for sharing keytabs across pods
+    val pvWatcherCache = new KerberosPVWatcherCache(kerberosUtils, labels)
+    pvWatcherCache.deploy(kerberosUtils.getNNStorage)
+    pvWatcherCache.deploy(kerberosUtils.getKTStorage)
+    pvWatcherCache.stopWatch()
+
+    // Launches the config map for the files in HADOOP_CONF_DIR
+    val cmWatcherCache = new KerberosCMWatcherCache(kerberosUtils)
+    cmWatcherCache.deploy(kerberosUtils.getConfigMap)
+    cmWatcherCache.stopWatch()
+
+    // Launches the Hadoop cluster pods: KDC --> NN --> DN1
+    val podWatcherCache = new KerberosPodWatcherCache(kerberosUtils, labels)
+    podWatcherCache.deploy(kerberosUtils.getKDC)
+    podWatcherCache.deploy(kerberosUtils.getNN)
+    podWatcherCache.deploy(kerberosUtils.getDN)
+    podWatcherCache.stopWatch()
+
+    // Launches the data-populator job to populate HDFS
+    val jobWatcherCache = new KerberosJobWatcherCache(kerberosUtils, labels)
+    jobWatcherCache.deploy(kerberosUtils.getDP)
+    jobWatcherCache.stopWatch()
+  }
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/KerberosCMWatcherCache.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/KerberosCMWatcherCache.scala
new file mode 100755
index 000000000000..709c613fdc8d
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/KerberosCMWatcherCache.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.integrationtest.kerberos
+
+import io.fabric8.kubernetes.api.model.ConfigMap
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watch, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import org.scalatest.Matchers
+import org.scalatest.concurrent.Eventually
+import scala.collection.JavaConverters._
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{INTERVAL, TIMEOUT}
+import org.apache.spark.internal.Logging
+
+/**
+ * This class is responsible for ensuring that no logic progresses in the cluster launcher
+ * until a config map with the HADOOP_CONF_DIR specifications has been created.
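+ *
+ * Each watcher-cache in this package implements the same small contract. A
+ * sketch of that trait, inferred from its usage in these files (the actual
+ * definition lives elsewhere in this package and may differ):
+ * {{{
+ * private[spark] trait WatcherCacheConfiguration[T] {
+ *   def check(name: String): Boolean
+ *   def deploy(storage: T): Unit
+ *   def stopWatch(): Unit
+ * }
+ * }}}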
+ */
+private[spark] class KerberosCMWatcherCache(kerberosUtils: KerberosUtils)
+  extends WatcherCacheConfiguration[ConfigMapStorage] with Logging with Eventually with Matchers {
+  private val kubernetesClient = kerberosUtils.getClient
+  private val namespace = kerberosUtils.getNamespace
+  private val requiredFiles = Seq("core-site.xml", "hdfs-site.xml", "krb5.conf")
+  private val cmCache = scala.collection.mutable.Map[String, Map[String, String]]()
+  private val configMapName = kerberosUtils.getConfigMap.resource.getMetadata.getName
+  // Watching ConfigMaps
+  logInfo("Beginning the watch of the Kerberos Config Map")
+  private val watcher: Watch = kubernetesClient
+    .configMaps()
+    .withName(configMapName)
+    .watch(new Watcher[ConfigMap] {
+      override def onClose(cause: KubernetesClientException): Unit =
+        logInfo("Ending the watch of Kerberos Config Map")
+      override def eventReceived(action: Watcher.Action, resource: ConfigMap): Unit = {
+        val name = resource.getMetadata.getName
+        action match {
+          case Action.DELETED | Action.ERROR =>
+            logInfo(s"$name either deleted or error")
+            cmCache.remove(name)
+          case Action.ADDED | Action.MODIFIED =>
+            val data = resource.getData.asScala.toMap
+            logInfo(s"$name includes ${data.keys.mkString(",")}")
+            cmCache(name) = data
+        }
+      }
+    })
+
+  // Checks that the config map contains all of the required HADOOP_CONF_DIR files
+  override def check(name: String): Boolean = {
+    cmCache.get(name).exists { data => requiredFiles.forall(data.keys.toSeq.contains) }
+  }
+
+  override def deploy(storage: ConfigMapStorage): Unit = {
+    logInfo("Launching the ConfigMap")
+    kerberosUtils.getClient.configMaps()
+      .inNamespace(namespace).createOrReplace(storage.resource)
+    // Make sure the config map has the correct files before progressing
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      check(configMapName) should be (true)
+    }
+  }
+
+  override def stopWatch(): Unit = {
+    // Closing the watcher
+    watcher.close()
+  }
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/KerberosJobWatcherCache.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/KerberosJobWatcherCache.scala
new file mode 100755
index 000000000000..2e9ed256b913
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/KerberosJobWatcherCache.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.integrationtest.kerberos
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watch, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import org.scalatest.Matchers
+import org.scalatest.concurrent.Eventually
+import scala.collection.JavaConverters._
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{INTERVAL, TIMEOUT}
+import org.apache.spark.internal.Logging
+
+/**
+ * This class is responsible for ensuring that the driver pod launched by the KerberosTestPod
+ * is running before trying to grab its logs, for the sake of monitoring successful completion.
+ */
+private[spark] class KerberosJobWatcherCache(
+    kerberosUtils: KerberosUtils,
+    labels: Map[String, String])
+  extends WatcherCacheConfiguration[JobStorage] with Logging with Eventually with Matchers {
+  private val kubernetesClient = kerberosUtils.getClient
+  private val namespace = kerberosUtils.getNamespace
+  private var jobName: String = ""
+  private val podCache = scala.collection.mutable.Map[String, String]()
+  private val watcher: Watch = kubernetesClient
+    .pods()
+    .withLabels(labels.asJava)
+    .watch(new Watcher[Pod] {
+      override def onClose(cause: KubernetesClientException): Unit =
+        logInfo("Ending the watch of Job pod")
+      override def eventReceived(action: Watcher.Action, resource: Pod): Unit = {
+        val name = resource.getMetadata.getName
+        action match {
+          case Action.DELETED | Action.ERROR =>
+            logInfo(s"$name either deleted or error")
+            podCache.remove(name)
+          case Action.ADDED | Action.MODIFIED =>
+            val phase = resource.getStatus.getPhase
+            logInfo(s"$name is at phase: $phase")
+            podCache(name) = phase
+            jobName = name
+        }
+      }
+    })
+
+  // The data populator must additionally show evidence of Kerberos authentication in its logs
+  private def additionalCheck(name: String): Boolean = {
+    name match {
+      case _ if name.startsWith("data-populator") =>
+        hasInLogs(name, "Entered Krb5Context.initSecContext")
+      case _ => true
+    }
+  }
+
+  override def check(name: String): Boolean =
+    podCache.get(name).contains("Succeeded") &&
+    additionalCheck(name)
+
+  override def deploy(storage: JobStorage): Unit = {
+    kubernetesClient.batch().jobs().inNamespace(namespace).create(storage.resource)
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      check(jobName) should be (true)
+    }
+  }
+
+  override def stopWatch(): Unit = {
+    // Close the watcher thread
+    watcher.close()
+  }
+
+  def hasInLogs(name: String, expectation: String): Boolean = {
+    kubernetesClient.pods().withName(name).getLog().contains(expectation)
+  }
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/KerberosPVWatcherCache.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/KerberosPVWatcherCache.scala
new file mode 100755
index 000000000000..88dbac79d9b3
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/KerberosPVWatcherCache.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.integrationtest.kerberos
+
+import io.fabric8.kubernetes.api.model.{PersistentVolume, PersistentVolumeClaim}
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watch, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import org.scalatest.Matchers
+import org.scalatest.concurrent.Eventually
+import scala.collection.JavaConverters._
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{INTERVAL, TIMEOUT}
+import org.apache.spark.internal.Logging
+
+/**
+ * This class is responsible for ensuring that the persistent volume claims are bound
+ * to the correct persistent volume and that both are created before launching the
+ * pods that expect to use them.
+ */
+private[spark] class KerberosPVWatcherCache(
+    kerberosUtils: KerberosUtils,
+    labels: Map[String, String])
+  extends WatcherCacheConfiguration[PVStorage] with Logging with Eventually with Matchers {
+  private val kubernetesClient = kerberosUtils.getClient
+  private val namespace = kerberosUtils.getNamespace
+
+  // Caches for PVs and PVCs
+  private val pvCache = scala.collection.mutable.Map[String, String]()
+  private val pvcCache = scala.collection.mutable.Map[String, String]()
+
+  // Watching PVs
+  logInfo("Beginning the watch of Persistent Volumes")
+  private val pvWatcher: Watch = kubernetesClient
+    .persistentVolumes()
+    .withLabels(labels.asJava)
+    .watch(new Watcher[PersistentVolume] {
+      override def onClose(cause: KubernetesClientException): Unit =
+        logInfo("Ending the watch of Persistent Volumes", cause)
+      override def eventReceived(action: Watcher.Action, resource: PersistentVolume): Unit = {
+        val name = resource.getMetadata.getName
+        action match {
+          case Action.DELETED | Action.ERROR =>
+            logInfo(s"$name either deleted or error")
+            pvCache.remove(name)
+          case Action.ADDED | Action.MODIFIED =>
+            val phase = resource.getStatus.getPhase
+            logInfo(s"$name is at stage: $phase")
+            pvCache(name) = phase
+        }
+      }
+    })
+
+  // Watching PVCs
+  logInfo("Beginning the watch of Persistent Volume Claims")
+  private val pvcWatcher: Watch = kubernetesClient
+    .persistentVolumeClaims()
+    .withLabels(labels.asJava)
+    .watch(new Watcher[PersistentVolumeClaim] {
+      override def onClose(cause: KubernetesClientException): Unit =
+        logInfo("Ending the watch of Persistent Volume Claims", cause)
+      override def eventReceived(
+          action: Watcher.Action,
+          resource: PersistentVolumeClaim): Unit = {
+        val name = resource.getMetadata.getName
+        action match {
+          case Action.DELETED | Action.ERROR =>
+            logInfo(s"$name either deleted or error")
+            pvcCache.remove(name)
+          case Action.ADDED | Action.MODIFIED =>
+            val volumeName = resource.getSpec.getVolumeName
+            val state = resource.getStatus.getPhase
+            logInfo(s"$name claims volume $volumeName and is $state")
+            pvcCache(name) = s"$volumeName $state"
+        }
+      }
+    })
+
+  // Check that the PVC is bound to the correct PV
+  override def check(name: String): Boolean = {
+    pvCache.get(name).contains("Bound") &&
+    pvcCache.get(name).contains(s"$name Bound")
+  }
+
+  override def deploy(pv: PVStorage): Unit = {
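+    // Deployment ordering matters here: the PV must be created first and report
+    // Available before the PVC is created, so that the claim binds to this volume.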
logInfo("Launching the Persistent Storage") + kubernetesClient + .persistentVolumes().create(pv.persistentVolume) + // Making sure PV is Available for creation of PVC + Eventually.eventually(TIMEOUT, INTERVAL) { + (pvCache(pv.name) == "Available") should be (true) + } + kubernetesClient + .persistentVolumeClaims().inNamespace(namespace).create(pv.persistentVolumeClaim) + Eventually.eventually(TIMEOUT, INTERVAL) { + check(pv.name) should be (true) + } + } + + override def stopWatch(): Unit = { + // Closing Watchers + pvWatcher.close() + pvcWatcher.close() + } +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/KerberosPodWatcherCache.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/KerberosPodWatcherCache.scala new file mode 100755 index 000000000000..f1f8b2533228 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/KerberosPodWatcherCache.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.integrationtest.kerberos + +import io.fabric8.kubernetes.api.model.{Pod, Service} +import io.fabric8.kubernetes.client.{KubernetesClientException, Watch, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import org.scalatest.Matchers +import org.scalatest.concurrent.Eventually +import scala.collection.JavaConverters._ + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{INTERVAL, TIMEOUT} +import org.apache.spark.internal.Logging + +/** + * This class is used to ensure that the Hadoop cluster that is launched is executed + * in this order: KDC --> NN --> DN --> Data-Populator and that each one of these nodes + * is running before launching the Kerberos test. 
+ */
+private[spark] class KerberosPodWatcherCache(
+    kerberosUtils: KerberosUtils,
+    labels: Map[String, String])
+  extends WatcherCacheConfiguration[ServiceStorage] with Logging with Eventually with Matchers {
+
+  private val kubernetesClient = kerberosUtils.getClient
+  private val namespace = kerberosUtils.getNamespace
+  private val podCache = scala.collection.mutable.Map[String, String]()
+  private val serviceCache = scala.collection.mutable.Map[String, String]()
+  private var kdcName: String = _
+  private var nnName: String = _
+  private var dnName: String = _
+  private val podWatcher: Watch = kubernetesClient
+    .pods()
+    .withLabels(labels.asJava)
+    .watch(new Watcher[Pod] {
+      override def onClose(cause: KubernetesClientException): Unit =
+        logInfo("Ending the watch of Pods")
+      override def eventReceived(action: Watcher.Action, resource: Pod): Unit = {
+        val name = resource.getMetadata.getName
+        val keyName = podNameParse(name)
+        action match {
+          case Action.DELETED | Action.ERROR =>
+            logInfo(s"$name either deleted or error")
+            podCache.remove(keyName)
+          case Action.ADDED | Action.MODIFIED =>
+            val phase = resource.getStatus.getPhase
+            logInfo(s"$name is at phase: $phase")
+            if (keyName == "kerberos") { kdcName = name }
+            if (keyName == "nn") { nnName = name }
+            if (keyName == "dn1") { dnName = name }
+            podCache(keyName) = phase
+        }
+      }
+    })
+
+  private val serviceWatcher: Watch = kubernetesClient
+    .services()
+    .withLabels(labels.asJava)
+    .watch(new Watcher[Service] {
+      override def onClose(cause: KubernetesClientException): Unit =
+        logInfo("Ending the watch of Services")
+      override def eventReceived(action: Watcher.Action, resource: Service): Unit = {
+        val name = resource.getMetadata.getName
+        action match {
+          case Action.DELETED | Action.ERROR =>
+            logInfo(s"$name either deleted or error")
+            serviceCache.remove(name)
+          case Action.ADDED | Action.MODIFIED =>
+            val bound = resource.getSpec.getSelector.get("kerberosService")
+            logInfo(s"$name is bound to $bound")
+            serviceCache(name) = bound
+        }
+      }
+    })
+
+  // Node-specific log lines that signal each node has finished starting up
+  private def additionalCheck(name: String): Boolean = {
+    name match {
+      case "kerberos" => hasInLogs(kdcName, "krb5kdc: starting")
+      case "nn" => hasInLogs(nnName, "createNameNode")
+      case "dn1" => hasInLogs(dnName, "Got finalize command for block pool")
+      case _ => true
+    }
+  }
+
+  override def check(name: String): Boolean = {
+    podCache.get(name).contains("Running") &&
+    serviceCache.get(name).contains(name) &&
+    additionalCheck(name)
+  }
+
+  override def deploy(srvc: ServiceStorage): Unit = {
+    logInfo("Launching the Deployment")
+    kubernetesClient
+      .apps().statefulSets().inNamespace(namespace).create(srvc.podSet)
+    // Make sure the pod is Running before creating its service
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      podCache.get(srvc.name).contains("Running") should be (true)
+    }
+    kubernetesClient.services().inNamespace(namespace).create(srvc.service)
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      check(srvc.name) should be (true)
+    }
+  }
+
+  override def stopWatch(): Unit = {
+    // Close the watcher threads
+    podWatcher.close()
+    serviceWatcher.close()
+  }
+
+  // Map a pod name to the short node key used in the caches; pods matching the labels
+  // but none of the known prefixes (e.g. the data populator) keep their full name
+  private def podNameParse(name: String): String = {
+    name match {
+      case _ if name.startsWith("kerberos") => "kerberos"
+      case _ if name.startsWith("nn") => "nn"
+      case _ if name.startsWith("dn1") => "dn1"
+      case _ => name
+    }
+  }
+
+  def hasInLogs(name: String, expectation: String): Boolean = {
+    kubernetesClient.pods().withName(name).getLog().contains(expectation)
+  }
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/KerberosStorage.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/KerberosStorage.scala
new file mode 100755
index 000000000000..4e0b5ec5b749
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/KerberosStorage.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest.kerberos
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.apps.StatefulSet
+import io.fabric8.kubernetes.api.model.batch.Job
+
+private[spark] sealed trait KerberosStorage
+
+private[spark] case class PVStorage(
+    name: String,
+    persistentVolumeClaim: PersistentVolumeClaim,
+    persistentVolume: PersistentVolume)
+  extends KerberosStorage
+
+private[spark] case class ServiceStorage(
+    name: String,
+    podSet: StatefulSet,
+    service: Service)
+  extends KerberosStorage
+
+private[spark] case class JobStorage(
+    resource: Job)
+  extends KerberosStorage
+
+private[spark] case class ConfigMapStorage(
+    resource: ConfigMap)
+  extends KerberosStorage
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/KerberosUtils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/KerberosUtils.scala
new file mode 100755
index 000000000000..8eb1481c8e25
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/KerberosUtils.scala
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.spark.deploy.k8s.integrationtest.kerberos + +import java.io.{File, FileInputStream} + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.api.model.apps.{StatefulSet, StatefulSetBuilder} +import io.fabric8.kubernetes.api.model.batch.{Job, JobBuilder} +import io.fabric8.kubernetes.client.KubernetesClient +import org.apache.commons.io.FileUtils.readFileToString + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.KERBEROS_LABEL + +/** + * This class is responsible for handling all Utils and Constants necessary for testing + */ +private[spark] class KerberosUtils( + sparkImage: String, + hadoopImage: String, + kerberosImage: String, + serviceAccountName: String, + kubernetesClient: KubernetesClient, + namespace: String) { + def getClient: KubernetesClient = kubernetesClient + def getNamespace: String = namespace + def yamlLocation(loc: String): String = s"kerberos-yml/$loc.yml" + def loadFromYaml(resource: String): FileInputStream = + new FileInputStream(new File(yamlLocation(resource))) + private val regex = "REPLACE_ME".r + private val regexDP = "# default_ccache_name = MEMORY".r + private val defaultCacheDP = "default_ccache_name = KRBCONF" + private def locationResolver(loc: String) = s"test-data/hadoop-conf/$loc" + private val kerberosFiles = Seq("krb5.conf", "core-site.xml", "hdfs-site.xml") + private val kerberosConfTupList = + kerberosFiles.map { file => + (file, regex.replaceAllIn(readFileToString(new File(locationResolver(file))), namespace))} ++ + Seq(("krb5-dp.conf", regexDP.replaceAllIn(regex.replaceAllIn(readFileToString( + new File(locationResolver("krb5.conf"))), namespace), defaultCacheDP))) + private val KRB_VOLUME = "krb5-conf" + private val KRB_FILE_DIR = "/mnt" + private val KRB_CONFIG_MAP_NAME = "krb-config-map" + private val keyPaths: Seq[KeyToPath] = (kerberosFiles ++ Seq("krb5-dp.conf")) + .map { file => + new KeyToPathBuilder() + .withKey(file) + .withPath(file) + .build() + }.toList + private def createPVTemplate(name: String, pathType: String) : PersistentVolume = + new PersistentVolumeBuilder() + .withNewMetadata() + .withName(name) + .withLabels(Map( + "type" -> "local", + "job" -> "kerberostest").asJava) + .endMetadata() + .withNewSpec() + .withStorageClassName(name) + .withCapacity(Map("storage" -> new Quantity("200Mi")).asJava) + .withAccessModes("ReadWriteMany") + .withHostPath(new HostPathVolumeSource(s"$KRB_FILE_DIR/$namespace/$pathType", "")) + .endSpec() + .build() + private def createPVCTemplate(name: String) : PersistentVolumeClaim = + new PersistentVolumeClaimBuilder() + .withNewMetadata() + .withName(name) + .withLabels(Map( + "job" -> "kerberostest").asJava) + .endMetadata() + .withNewSpec() + .withStorageClassName(name) + .withVolumeName(name) + .withAccessModes("ReadWriteMany") + .withNewResources() + .withRequests(Map("storage" -> new Quantity("200Mi")).asJava) + .endResources() + .endSpec() + .build() + private val pvNN = "nn-hadoop" + private val pvKT = "server-keytab" + private val persistentVolumeMap: Map[String, PersistentVolume] = Map( + pvNN -> createPVTemplate(pvNN, "nn"), + pvKT -> createPVTemplate(pvKT, "keytab")) + + private def buildKerberosPV(pvType: String) = { + PVStorage(pvType, createPVCTemplate(pvType), persistentVolumeMap(pvType)) + } + + def getNNStorage: PVStorage = buildKerberosPV(pvNN) + def getKTStorage: PVStorage = buildKerberosPV(pvKT) + def getLabels: Map[String, String] = KERBEROS_LABEL + def getKeyPaths: 
Seq[KeyToPath] = keyPaths + def getConfigMap: ConfigMapStorage = + ConfigMapStorage( + new ConfigMapBuilder() + .withNewMetadata() + .withName(KRB_CONFIG_MAP_NAME) + .endMetadata() + .addToData(kerberosConfTupList.toMap.asJava) + .build() + ) + private val kdcNode = Seq("kerberos-set", "kerberos-service") + private val nnNode = Seq("nn-set", "nn-service") + private val dnNode = Seq("dn1-set", "dn1-service") + private val dataPopulator = "data-populator-job" + private val hadoopContainerEnvs = Seq( + new EnvVarBuilder() + .withName("NAMESPACE") + .withValue(namespace) + .build(), + new EnvVarBuilder() + .withName("TMP_KRB_LOC") + .withValue(s"$KRB_FILE_DIR/${kerberosFiles.head}") + .build(), + new EnvVarBuilder() + .withName("TMP_KRB_DP_LOC") + .withValue(s"$KRB_FILE_DIR/krb5-dp.conf") + .build(), + new EnvVarBuilder() + .withName("TMP_CORE_LOC") + .withValue(s"$KRB_FILE_DIR/${kerberosFiles(1)}") + .build(), + new EnvVarBuilder() + .withName("TMP_HDFS_LOC") + .withValue(s"$KRB_FILE_DIR/${kerberosFiles(2)}") + .build() + ).asJava + private val krbVolume = + new VolumeBuilder() + .withName(KRB_VOLUME) + .withNewConfigMap() + .withName(KRB_CONFIG_MAP_NAME) + .withItems(keyPaths.asJava) + .endConfigMap() + .build() + private val krbVolumeMount = + new VolumeMountBuilder() + .withName(KRB_VOLUME) + .withMountPath(KRB_FILE_DIR) + .build() + + private def buildHadoopClusterDeployment(name: String, seqPair: Seq[String]) = { + val statefulSet = + kubernetesClient.load(loadFromYaml(seqPair.head)).get().get(0).asInstanceOf[StatefulSet] + ServiceStorage( + name, + new StatefulSetBuilder(statefulSet) + .editSpec() + .editTemplate() + .editSpec() + .addNewVolumeLike(krbVolume).endVolume() + .editMatchingContainer(new ContainerNameEqualityPredicate( + statefulSet.getMetadata.getName)) + .addAllToEnv(hadoopContainerEnvs) + .addNewVolumeMountLike(krbVolumeMount).endVolumeMount() + .withImage(hadoopImage) + .endContainer() + .endSpec() + .endTemplate() + .endSpec() + .build(), + kubernetesClient.load(loadFromYaml(seqPair(1))).get().get(0).asInstanceOf[Service] ) + } + private def buildDP(yamlLocation: String): JobStorage = { + val job = kubernetesClient.load(loadFromYaml(yamlLocation)).get().get(0).asInstanceOf[Job] + JobStorage( + new JobBuilder(job) + .editSpec() + .editTemplate() + .editSpec() + .addNewVolumeLike(krbVolume).endVolume() + .editMatchingContainer(new ContainerNameEqualityPredicate( + job.getMetadata.getName)) + .addAllToEnv(hadoopContainerEnvs) + .addNewVolumeMountLike(krbVolumeMount).endVolumeMount() + .withImage(hadoopImage) + .endContainer() + .endSpec() + .endTemplate() + .endSpec() + .build() + ) + } + def getKDC: ServiceStorage = buildHadoopClusterDeployment("kerberos", kdcNode) + def getNN: ServiceStorage = buildHadoopClusterDeployment("nn", nnNode) + def getDN: ServiceStorage = buildHadoopClusterDeployment("dn1", dnNode) + def getDP: JobStorage = buildDP(dataPopulator) + private val HADOOP_CONF_DIR_PATH = "/opt/spark/hconf" + private val krb5TestkeyPaths = + kerberosFiles.map { file => + new KeyToPathBuilder() + .withKey(file) + .withPath(file) + .build() + }.toList + + def getKerberosTest( + resource: String, + className: String, + appLabel: String, + yamlLocation: String): JobStorage = { + val job = kubernetesClient.load(loadFromYaml(yamlLocation)).get().get(0).asInstanceOf[Job] + JobStorage( + new JobBuilder(job) + .editSpec() + .editTemplate() + .editOrNewMetadata() + .addToLabels(Map("name" -> "kerberos-test").asJava) + .endMetadata() + .editSpec() + 
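+            // Run the test pod under the provided service account; the Kerberos
+            // config and HDFS endpoints are injected via the env vars below.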
.withServiceAccountName(serviceAccountName)
+            .addNewVolume()
+              .withName(KRB_VOLUME)
+              .withNewConfigMap()
+                .withName(KRB_CONFIG_MAP_NAME)
+                .withItems(krb5TestkeyPaths.asJava)
+                .endConfigMap()
+              .endVolume()
+            .editMatchingContainer(new ContainerNameEqualityPredicate(
+              job.getMetadata.getName))
+              .addNewEnv()
+                .withName("NAMESPACE")
+                .withValue(namespace)
+                .endEnv()
+              .addNewEnv()
+                .withName("MASTER_URL")
+                .withValue(kubernetesClient.getMasterUrl.toString)
+                .endEnv()
+              .addNewEnv()
+                .withName("SUBMIT_RESOURCE")
+                .withValue(resource)
+                .endEnv()
+              .addNewEnv()
+                .withName("CLASS_NAME")
+                .withValue(className)
+                .endEnv()
+              .addNewEnv()
+                .withName("HADOOP_CONF_DIR")
+                .withValue(HADOOP_CONF_DIR_PATH)
+                .endEnv()
+              .addNewEnv()
+                .withName("APP_LOCATOR_LABEL")
+                .withValue(appLabel)
+                .endEnv()
+              .addNewEnv()
+                .withName("SPARK_PRINT_LAUNCH_COMMAND")
+                .withValue("true")
+                .endEnv()
+              .addNewEnv()
+                .withName("TMP_KRB_LOC")
+                .withValue(s"$KRB_FILE_DIR/${kerberosFiles.head}")
+                .endEnv()
+              .addNewEnv()
+                .withName("TMP_CORE_LOC")
+                .withValue(s"$KRB_FILE_DIR/${kerberosFiles(1)}")
+                .endEnv()
+              .addNewEnv()
+                .withName("TMP_HDFS_LOC")
+                .withValue(s"$KRB_FILE_DIR/${kerberosFiles(2)}")
+                .endEnv()
+              .addNewEnv()
+                .withName("BASE_SPARK_IMAGE")
+                .withValue(sparkImage)
+                .endEnv()
+              .addNewVolumeMount()
+                .withName(KRB_VOLUME)
+                .withMountPath(KRB_FILE_DIR)
+                .endVolumeMount()
+              .withImage(kerberosImage)
+              .endContainer()
+            .endSpec()
+          .endTemplate()
+        .endSpec()
+      .build())
+  }
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/WatcherCacheConfiguration.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/WatcherCacheConfiguration.scala
new file mode 100644
index 000000000000..955c23062656
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/kerberos/WatcherCacheConfiguration.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest.kerberos
+
+/**
+ * A collection of functions that together represent a WatcherCache. The purpose of these
+ * WatcherCaches is to watch a KerberosStorage object and ensure that it is properly
+ * created by blocking on a condition.
+ */
+private[spark] trait WatcherCacheConfiguration[T <: KerberosStorage] {
+
+  /**
+   * This function defines the boolean condition that blocks the completion of the
+   * deploy() call until it is satisfied.
+   */
+  def check(name: String): Boolean
+
+  /**
+   * This function deploys the KerberosStorage object by having the KubernetesClient
+   * create the underlying Kubernetes resource.
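+   * Implementations are expected to block (for example via Eventually) until
+   * check() passes for the deployed resource.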
+ */
+  def deploy(storage: T): Unit
+
+  /**
+   * This function closes all Watcher threads.
+   */
+  def stopWatch(): Unit
+}
diff --git a/resource-managers/kubernetes/integration-tests/test-data/hadoop-conf/core-site.xml b/resource-managers/kubernetes/integration-tests/test-data/hadoop-conf/core-site.xml
new file mode 100755
index 000000000000..9a6ae2c50526
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/test-data/hadoop-conf/core-site.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+  <property>
+    <name>hadoop.security.authentication</name>
+    <value>kerberos</value>
+  </property>
+
+  <property>
+    <name>hadoop.security.authorization</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.defaultFS</name>
+    <value>hdfs://nn.REPLACE_ME.svc.cluster.local:9000</value>
+  </property>
+  <property>
+    <name>hadoop.rpc.protection</name>
+    <value>authentication</value>
+  </property>
+</configuration>
diff --git a/resource-managers/kubernetes/integration-tests/test-data/hadoop-conf/hdfs-site.xml b/resource-managers/kubernetes/integration-tests/test-data/hadoop-conf/hdfs-site.xml
new file mode 100755
index 000000000000..66dc969c46b6
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/test-data/hadoop-conf/hdfs-site.xml
@@ -0,0 +1,157 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+
+  <property>
+    <name>dfs.replication</name>
+    <value>1</value>
+  </property>
+
+  <property>
+    <name>dfs.permissions</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>dfs.block.access.token.enable</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>dfs.namenode.keytab.file</name>
+    <value>/var/keytabs/hdfs.keytab</value>
+  </property>
+  <property>
+    <name>dfs.namenode.kerberos.principal</name>
+    <value>hdfs/nn.REPLACE_ME.svc.cluster.local@CLUSTER.LOCAL</value>
+  </property>
+  <property>
+    <name>dfs.namenode.kerberos.internal.spnego.principal</name>
+    <value>HTTP/nn.REPLACE_ME.svc.cluster.local@CLUSTER.LOCAL</value>
+  </property>
+  <property>
+    <name>dfs.namenode.rpc-address</name>
+    <value>nn.REPLACE_ME.svc.cluster.local:9000</value>
+  </property>
+
+  <property>
+    <name>dfs.namenode.delegation.token.max-lifetime</name>
+    <value>3600000</value>
+  </property>
+  <property>
+    <name>dfs.namenode.delegation.token.renew-interval</name>
+    <value>3600000</value>
+  </property>
+
+  <property>
+    <name>dfs.data.transfer.protection</name>
+    <value>integrity</value>
+  </property>
+  <property>
+    <name>dfs.datanode.address</name>
+    <value>0.0.0.0:10019</value>
+  </property>
+  <property>
+    <name>dfs.datanode.http.address</name>
+    <value>0.0.0.0:10022</value>
+  </property>
+  <property>
+    <name>dfs.http.policy</name>
+    <value>HTTPS_ONLY</value>
+  </property>
+
+  <property>
+    <name>dfs.namenode.keytab.file</name>
+    <value>/var/keytabs/hdfs.keytab</value>
+  </property>
+  <property>
+    <name>dfs.namenode.kerberos.principal</name>
+    <value>hdfs/nn.REPLACE_ME.svc.cluster.local@CLUSTER.LOCAL</value>
+  </property>
+  <property>
+    <name>dfs.namenode.kerberos.internal.spnego.principal</name>
+    <value>HTTP/nn.REPLACE_ME.svc.cluster.local@CLUSTER.LOCAL</value>
+  </property>
+
+  <property>
+    <name>dfs.namenode.datanode.registration.ip-hostname-check</name>
+    <value>false</value>
+  </property>
+  <property>
+    <name>dfs.datanode.data.dir.perm</name>
+    <value>700</value>
+  </property>
+  <property>
+    <name>dfs.namenode.name.dir</name>
+    <value>file:///hadoop/etc/data</value>
+  </property>
+  <property>
+    <name>dfs.datanode.name.dir</name>
+    <value>file:///hadoop/etc/data</value>
+  </property>
+  <property>
+    <name>dfs.data.dir</name>
+    <value>file:///hadoop/etc/data</value>
+  </property>
+  <property>
+    <name>dfs.datanode.keytab.file</name>
+    <value>/var/keytabs/hdfs.keytab</value>
+  </property>
+  <property>
+    <name>dfs.datanode.kerberos.principal</name>
+    <value>hdfs/dn1.REPLACE_ME.svc.cluster.local@CLUSTER.LOCAL</value>
+  </property>
+  <property>
+    <name>dfs.encrypt.data.transfer</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>dfs.encrypt.data.transfer.cipher.suites</name>
+    <value>AES/CTR/NoPadding</value>
+  </property>
+  <property>
+    <name>dfs.encrypt.data.transfer.cipher.key.bitlength</name>
+    <value>256</value>
+  </property>
+
+  <property>
+    <name>dfs.webhdfs.enabled</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>dfs.web.authentication.kerberos.principal</name>
+    <value>HTTP/dn1.REPLACE_ME.svc.cluster.local@CLUSTER.LOCAL</value>
+  </property>
+  <property>
+    <name>dfs.web.authentication.kerberos.keytab</name>
+    <value>/var/keytabs/hdfs.keytab</value>
+  </property>
+
+</configuration>
diff --git a/resource-managers/kubernetes/integration-tests/test-data/hadoop-conf/krb5.conf b/resource-managers/kubernetes/integration-tests/test-data/hadoop-conf/krb5.conf
new file mode 100755
index 000000000000..144f77d8995d
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/test-data/hadoop-conf/krb5.conf
@@ -0,0 +1,25 @@
+includedir /etc/krb5.conf.d/
+
+[logging]
+default = FILE:/var/log/krb5libs.log
+kdc = FILE:/var/log/krb5kdc.log
+admin_server = FILE:/var/log/kadmind.log
+
+[libdefaults]
+dns_lookup_realm = false
+ticket_lifetime = 24h
+renew_lifetime = 7d
+forwardable = true
+rdns = false
+default_realm = CLUSTER.LOCAL
+# default_ccache_name = MEMORY
+
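+# NOTE: KerberosUtils rewrites the commented-out line above to
+# "default_ccache_name = KRBCONF" when generating the krb5-dp.conf variant
+# used by the data populator.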
+[realms] +CLUSTER.LOCAL = { + kdc = kerberos.REPLACE_ME.svc.cluster.local + admin_server = kerberos.REPLACE_ME.svc.cluster.local +} + +[domain_realm] +.cluster.local = CLUSTER.LOCAL +cluster.local = CLUSTER.LOCAL
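
For context, a minimal sketch of how the pieces above compose, assuming a fabric8 DefaultKubernetesClient and placeholder namespace/image values; the sketch is illustrative only and not part of the patch, while KerberosUtils, KerberizedHadoopClusterLauncher, and getLabels are the classes and methods introduced above.

import io.fabric8.kubernetes.client.DefaultKubernetesClient

object KerberizedClusterExample {
  def main(args: Array[String]): Unit = {
    // Client against the current kube context; namespace and images are assumed values.
    val client = new DefaultKubernetesClient()
    val namespace = "kerberos-test"
    val utils = new KerberosUtils(
      sparkImage = "spark:latest",
      hadoopImage = "spark-hadoop-base:latest",
      kerberosImage = "spark-kerberos-test:latest",
      serviceAccountName = "default",
      kubernetesClient = client,
      namespace = namespace)
    // Launches the PVs/PVCs, the HADOOP_CONF_DIR ConfigMap, the KDC/NN/DN pods,
    // and finally the data-populator job, blocking at each stage until ready.
    new KerberizedHadoopClusterLauncher(utils.getLabels, client, namespace)
      .launchKerberizedCluster(utils)
    client.close()
  }
}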