diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/jupyter/config/JupyterConfigFilesGenerator.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/jupyter/config/JupyterConfigFilesGenerator.java index fdaa8a15b9..bbf7c3ad82 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/jupyter/config/JupyterConfigFilesGenerator.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/jupyter/config/JupyterConfigFilesGenerator.java @@ -44,6 +44,8 @@ import com.logicalclocks.servicediscoverclient.service.Service; import freemarker.template.TemplateException; import io.hops.hopsworks.common.jobs.JobController; +import io.hops.hopsworks.common.serving.ServingConfig; +import io.hops.hopsworks.exceptions.ApiKeyException; import io.hops.hopsworks.exceptions.JobException; import io.hops.hopsworks.persistence.entity.jobs.configuration.DockerJobConfiguration; import io.hops.hopsworks.common.hive.HiveController; @@ -65,6 +67,7 @@ import io.hops.hopsworks.common.util.templates.jupyter.SparkMagicConfigTemplate; import io.hops.hopsworks.common.util.templates.jupyter.SparkMagicConfigTemplateBuilder; import io.hops.hopsworks.exceptions.ServiceException; +import io.hops.hopsworks.persistence.entity.user.Users; import io.hops.hopsworks.restutils.RESTCodes; import org.apache.commons.io.FileUtils; @@ -112,12 +115,14 @@ public class JupyterConfigFilesGenerator { private HiveController hiveController; @EJB private JobController jobController; + @Inject + private ServingConfig servingConfig; public JupyterPaths generateJupyterPaths(Project project, String hdfsUser, String secretConfig) { return new JupyterPaths(settings.getJupyterDir(), project.getName(), hdfsUser, secretConfig); } - public JupyterPaths generateConfiguration(Project project, String secretConfig, String hdfsUser, + public JupyterPaths generateConfiguration(Project project, String secretConfig, String hdfsUser, Users hopsworksUser, JupyterSettings js, Integer port, String allowOrigin) throws ServiceException, JobException { boolean newDir = false; @@ -126,8 +131,8 @@ public JupyterPaths generateConfiguration(Project project, String secretConfig, try { newDir = createJupyterDirs(jp); - createConfigFiles(jp, hdfsUser, project, port, js, allowOrigin); - } catch (IOException | ServiceException | ServiceDiscoveryException e) { + createConfigFiles(jp, hdfsUser, hopsworksUser, project, port, js, allowOrigin); + } catch (IOException | ServiceException | ServiceDiscoveryException | ApiKeyException e) { if (newDir) { // if the folder was newly created delete it removeProjectUserDirRecursive(jp); } @@ -269,8 +274,8 @@ public void createJupyterNotebookConfig(Writer out, Project project, int port, } public void createSparkMagicConfig(Writer out, Project project, JupyterSettings js, String hdfsUser, - String confDirPath, Map extraEnvVars) throws IOException, ServiceDiscoveryException, - JobException { + Users hopsworksUser, String confDirPath) throws IOException, ServiceDiscoveryException, + JobException, ApiKeyException { SparkJobConfiguration sparkJobConfiguration = (SparkJobConfiguration) js.getJobConfig(); @@ -294,9 +299,9 @@ public void createSparkMagicConfig(Writer out, Project project, JupyterSettings constructServiceFQDNWithPort(ServiceDiscoveryController.HopsworksService.HOPSWORKS_APP); finalSparkConfiguration.putAll( - sparkConfigurationUtil.setFrameworkProperties(project, sparkJobConfiguration, settings, hdfsUser, - extraJavaOptions, kafkaBrokers.getKafkaBrokersString(), 
hopsworksRestEndpoint, serviceDiscoveryController, - extraEnvVars)); + sparkConfigurationUtil.setFrameworkProperties(project, sparkJobConfiguration, settings, hdfsUser, hopsworksUser, + extraJavaOptions, kafkaBrokers.getKafkaBrokersString(), hopsworksRestEndpoint, servingConfig, + serviceDiscoveryController)); StringBuilder sparkConfBuilder = new StringBuilder(); ArrayList keys = new ArrayList<>(finalSparkConfiguration.keySet()); @@ -340,9 +345,9 @@ public void createSparkMagicConfig(Writer out, Project project, JupyterSettings } // creates the conf files if they do not already exist - private void createConfigFiles(JupyterPaths jp, String hdfsUser, Project project, + private void createConfigFiles(JupyterPaths jp, String hdfsUser, Users hopsworksUser, Project project, Integer port, JupyterSettings js, String allowOrigin) - throws IOException, ServiceException, ServiceDiscoveryException, JobException { + throws IOException, ServiceException, ServiceDiscoveryException, JobException, ApiKeyException { String confDirPath = jp.getConfDirPath(); String kernelsDir = jp.getKernelsDir(); String certsDir = jp.getCertificatesDir(); @@ -369,7 +374,7 @@ private void createConfigFiles(JupyterPaths jp, String hdfsUser, Project project if (!sparkmagic_config_file.exists()) { try (Writer out = new FileWriter(sparkmagic_config_file, false)) { - createSparkMagicConfig(out, project, js, hdfsUser, confDirPath, null); + createSparkMagicConfig(out, project, js, hdfsUser, hopsworksUser, confDirPath); } } } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/erasureCode/ErasureCodeJob.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/erasureCode/ErasureCodeJob.java index a340adfd0a..66a88cc171 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/erasureCode/ErasureCodeJob.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/erasureCode/ErasureCodeJob.java @@ -61,7 +61,7 @@ public class ErasureCodeJob extends YarnJob { public ErasureCodeJob(Jobs job, AsynchronousJobExecutor services, Users user, String jobUser, String hadoopDir, Settings settings, ServiceDiscoveryController serviceDiscoveryController) { - super(job, services, user, jobUser, hadoopDir, settings,null, null, serviceDiscoveryController); + super(job, services, user, jobUser, hadoopDir, settings, null, null, null, serviceDiscoveryController); if (!(job.getJobConfig() instanceof ErasureCodeJobConfiguration)) { throw new IllegalArgumentException( diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/flink/FlinkController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/flink/FlinkController.java index 738d0692ed..dd613f79b6 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/flink/FlinkController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/flink/FlinkController.java @@ -42,6 +42,7 @@ import com.google.common.base.Strings; import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException; import io.hops.hopsworks.common.hosts.ServiceDiscoveryController; +import io.hops.hopsworks.common.serving.ServingConfig; import io.hops.hopsworks.exceptions.ServiceException; import io.hops.hopsworks.persistence.entity.hdfs.inode.Inode; import io.hops.hopsworks.persistence.entity.hdfs.user.HdfsUsers; @@ -79,6 +80,7 @@ import javax.ejb.Stateless; import javax.ejb.TransactionAttribute; import javax.ejb.TransactionAttributeType; +import javax.inject.Inject; import java.io.File; import
java.io.FileInputStream; import java.io.IOException; @@ -119,6 +121,8 @@ public class FlinkController { private KafkaBrokers kafkaBrokers; @EJB private ServiceDiscoveryController serviceDiscoveryController; + @Inject + private ServingConfig servingConfig; public Execution startJob(final Jobs job, final Users user) @@ -144,7 +148,7 @@ public Execution startJob(final Jobs job, final Users user) try { flinkjob = proxyUser.doAs((PrivilegedExceptionAction) () -> new FlinkJob(job, submitter, user, hdfsUsersBean.getHdfsUserName(job.getProject(), job.getCreator()), settings, - kafkaBrokers.getKafkaBrokersString(), hopsworksRestEndpoint, serviceDiscoveryController)); + kafkaBrokers.getKafkaBrokersString(), hopsworksRestEndpoint, servingConfig, serviceDiscoveryController)); } catch (InterruptedException ex) { LOGGER.log(Level.SEVERE, null, ex); } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/flink/FlinkJob.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/flink/FlinkJob.java index 577a7a3863..d5ee239bb6 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/flink/FlinkJob.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/flink/FlinkJob.java @@ -40,6 +40,7 @@ package io.hops.hopsworks.common.jobs.flink; import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException; +import io.hops.hopsworks.common.serving.ServingConfig; import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType; import io.hops.hopsworks.persistence.entity.jobs.configuration.flink.FlinkJobConfiguration; import io.hops.hopsworks.persistence.entity.jobs.description.Jobs; @@ -67,10 +68,10 @@ public class FlinkJob extends YarnJob { private FlinkYarnRunnerBuilder flinkBuilder; FlinkJob(Jobs job, AsynchronousJobExecutor services, Users user, String jobUser, - Settings settings, String kafkaBrokersString, String hopsworksRestEndpoint, + Settings settings, String kafkaBrokersString, String hopsworksRestEndpoint, ServingConfig servingConfig, ServiceDiscoveryController serviceDiscoveryController) { super(job, services, user, jobUser, settings.getHadoopSymbolicLinkDir(), settings, - kafkaBrokersString, hopsworksRestEndpoint, serviceDiscoveryController); + kafkaBrokersString, hopsworksRestEndpoint, servingConfig, serviceDiscoveryController); if (!(job.getJobConfig() instanceof FlinkJobConfiguration)) { throw new IllegalArgumentException("Job must contain a FlinkJobConfiguration object. 
Received: " @@ -92,8 +93,9 @@ protected boolean setupJob() throws JobException { } try { runner = flinkBuilder - .getYarnRunner(jobs.getProject(), jobUser, services.getFileOperations(hdfsUser.getUserName()), - yarnClient, services, settings, kafkaBrokersString, hopsworksRestEndpoint, serviceDiscoveryController); + .getYarnRunner(jobs.getProject(), jobUser, user, services.getFileOperations(hdfsUser.getUserName()), + yarnClient, services, settings, kafkaBrokersString, hopsworksRestEndpoint, servingConfig, + serviceDiscoveryController); } catch (IOException | ServiceDiscoveryException e) { LOG.log(Level.SEVERE, diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/flink/FlinkYarnRunnerBuilder.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/flink/FlinkYarnRunnerBuilder.java index 49fa1c6994..8f41c0ef9e 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/flink/FlinkYarnRunnerBuilder.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/flink/FlinkYarnRunnerBuilder.java @@ -45,6 +45,7 @@ import io.hops.hopsworks.common.hosts.ServiceDiscoveryController; import io.hops.hopsworks.common.jobs.AsynchronousJobExecutor; import io.hops.hopsworks.common.jobs.yarn.YarnRunner; +import io.hops.hopsworks.common.serving.ServingConfig; import io.hops.hopsworks.common.util.FlinkConfigurationUtil; import io.hops.hopsworks.common.util.ProjectUtils; import io.hops.hopsworks.common.util.Settings; @@ -52,6 +53,7 @@ import io.hops.hopsworks.persistence.entity.jobs.configuration.flink.FlinkJobConfiguration; import io.hops.hopsworks.persistence.entity.jobs.description.Jobs; import io.hops.hopsworks.persistence.entity.project.Project; +import io.hops.hopsworks.persistence.entity.user.Users; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.hadoop.conf.Configuration; @@ -105,9 +107,9 @@ void addDynamicProperty(String name, String value) { dynamicProperties.put(name, value); } - YarnRunner getYarnRunner(Project project, String jobUser, DistributedFileSystemOps dfsClient, + YarnRunner getYarnRunner(Project project, String jobUser, Users hopsworksUser, DistributedFileSystemOps dfsClient, YarnClient yarnClient, AsynchronousJobExecutor services, Settings settings, - String kafkaBrokersString, String hopsworksRestEndpoint, + String kafkaBrokersString, String hopsworksRestEndpoint, ServingConfig servingConfig, ServiceDiscoveryController serviceDiscoveryController) throws IOException, ServiceDiscoveryException { @@ -134,8 +136,8 @@ YarnRunner getYarnRunner(Project project, String jobUser, DistributedFileSystemO project.getName().toLowerCase() + "," + job.getName() + "," + job.getId() + "," + YarnRunner.APPID_PLACEHOLDER); Map finalJobProps = flinkConfigurationUtil - .setFrameworkProperties(project, job.getJobConfig(), settings, jobUser, extraJavaOptions, - kafkaBrokersString, hopsworksRestEndpoint, serviceDiscoveryController, null); + .setFrameworkProperties(project, job.getJobConfig(), settings, jobUser, hopsworksUser, extraJavaOptions, + kafkaBrokersString, hopsworksRestEndpoint, servingConfig, serviceDiscoveryController); //Parse properties from Spark config file Yaml yaml = new Yaml(); diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/spark/SparkController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/spark/SparkController.java index 245c896a3b..83999da431 100644 --- 
a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/spark/SparkController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/spark/SparkController.java @@ -48,6 +48,7 @@ import io.hops.hopsworks.common.hosts.ServiceDiscoveryController; import io.hops.hopsworks.common.jupyter.JupyterController; import io.hops.hopsworks.common.kafka.KafkaBrokers; +import io.hops.hopsworks.common.serving.ServingConfig; import io.hops.hopsworks.common.util.HopsUtils; import io.hops.hopsworks.persistence.entity.jobs.configuration.history.JobState; import io.hops.hopsworks.persistence.entity.jobs.history.Execution; @@ -72,6 +73,7 @@ import javax.ejb.Stateless; import javax.ejb.TransactionAttribute; import javax.ejb.TransactionAttributeType; +import javax.inject.Inject; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.jar.Attributes; @@ -107,6 +109,8 @@ public class SparkController { private KafkaBrokers kafkaBrokers; @EJB private ServiceDiscoveryController serviceDiscoveryController; + @Inject + private ServingConfig servingConfig; /** * Start the Spark job as the given user. @@ -277,7 +281,7 @@ private SparkJob createSparkJob(String username, Jobs job, Users user) throws Jo sparkjob = proxyUser.doAs((PrivilegedExceptionAction) () -> new SparkJob(job, submitter, user, settings.getHadoopSymbolicLinkDir(), hdfsUsersBean.getHdfsUserName(job.getProject(), user), - settings, kafkaBrokers.getKafkaBrokersString(), hopsworksRestEndpoint, + settings, kafkaBrokers.getKafkaBrokersString(), hopsworksRestEndpoint, servingConfig, serviceDiscoveryController)); } catch (InterruptedException ex) { LOGGER.log(Level.SEVERE, null, ex); diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/spark/SparkJob.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/spark/SparkJob.java index d7c8fa726b..f2e5aa1fd0 100755 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/spark/SparkJob.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/spark/SparkJob.java @@ -38,6 +38,7 @@ */ package io.hops.hopsworks.common.jobs.spark; +import io.hops.hopsworks.common.serving.ServingConfig; import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType; import io.hops.hopsworks.persistence.entity.jobs.configuration.spark.SparkJobConfiguration; import io.hops.hopsworks.persistence.entity.jobs.description.Jobs; @@ -65,8 +66,8 @@ public class SparkJob extends YarnJob { SparkJob(Jobs job, AsynchronousJobExecutor services, Users user, final String hadoopDir, String jobUser, Settings settings, String kafkaBrokersString, String hopsworksRestEndpoint, - ServiceDiscoveryController serviceDiscoveryController) { - super(job, services, user, jobUser, hadoopDir, settings, kafkaBrokersString, hopsworksRestEndpoint, + ServingConfig servingConfig, ServiceDiscoveryController serviceDiscoveryController) { + super(job, services, user, jobUser, hadoopDir, settings, kafkaBrokersString, hopsworksRestEndpoint, servingConfig, serviceDiscoveryController); if (!(job.getJobConfig() instanceof SparkJobConfiguration)) { throw new IllegalArgumentException( @@ -108,9 +109,9 @@ protected boolean setupJob() { try { runner = runnerbuilder. 
getYarnRunner(jobs.getProject(), - jobUser, + jobUser, user, services, services.getFileOperations(hdfsUser.getUserName()), yarnClient, - settings, kafkaBrokersString, hopsworksRestEndpoint, serviceDiscoveryController); + settings, kafkaBrokersString, hopsworksRestEndpoint, servingConfig, serviceDiscoveryController); } catch (Exception e) { LOG.log(Level.WARNING, "Failed to create YarnRunner.", e); diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/spark/SparkYarnRunnerBuilder.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/spark/SparkYarnRunnerBuilder.java index 15500653a5..7bcc4160b5 100755 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/spark/SparkYarnRunnerBuilder.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/spark/SparkYarnRunnerBuilder.java @@ -40,6 +40,8 @@ import com.google.common.base.Strings; import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException; +import io.hops.hopsworks.common.serving.ServingConfig; +import io.hops.hopsworks.exceptions.ApiKeyException; import io.hops.hopsworks.exceptions.JobException; import io.hops.hopsworks.persistence.entity.jobs.configuration.spark.SparkJobConfiguration; import io.hops.hopsworks.persistence.entity.jobs.description.Jobs; @@ -54,6 +56,7 @@ import io.hops.hopsworks.common.util.Settings; import io.hops.hopsworks.common.util.SparkConfigurationUtil; import io.hops.hopsworks.common.util.templates.ConfigProperty; +import io.hops.hopsworks.persistence.entity.user.Users; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -116,11 +119,12 @@ public SparkYarnRunnerBuilder(Jobs job) { * @return The YarnRunner instance to launch the Spark job on Yarn. * @throws IOException If creation failed. 
*/ - public YarnRunner getYarnRunner(Project project, String jobUser, AsynchronousJobExecutor services, - final DistributedFileSystemOps dfsClient, final YarnClient yarnClient, - Settings settings, String kafkaBrokersString, String hopsworksRestEndpoint, + public YarnRunner getYarnRunner(Project project, String jobUser, Users hopsworksUser, + AsynchronousJobExecutor services, final DistributedFileSystemOps dfsClient, + final YarnClient yarnClient, Settings settings, String kafkaBrokersString, + String hopsworksRestEndpoint, ServingConfig servingConfig, ServiceDiscoveryController serviceDiscoveryController) - throws IOException, ServiceDiscoveryException, JobException { + throws IOException, ServiceDiscoveryException, JobException, ApiKeyException { Map jobHopsworksProps = new HashMap<>(); JobType jobType = job.getJobConfig().getJobType(); @@ -194,7 +198,8 @@ public YarnRunner getYarnRunner(Project project, String jobUser, AsynchronousJob Map finalJobProps = new HashMap<>(); finalJobProps.putAll(sparkConfigurationUtil.setFrameworkProperties(project, job.getJobConfig(), settings, - jobUser, extraJavaOptions, kafkaBrokersString, hopsworksRestEndpoint, serviceDiscoveryController, null)); + jobUser, hopsworksUser, extraJavaOptions, kafkaBrokersString, hopsworksRestEndpoint, servingConfig, + serviceDiscoveryController)); finalJobProps.put(Settings.SPARK_YARN_APPMASTER_SPARK_USER, jobUser); finalJobProps.put(Settings.SPARK_EXECUTOR_SPARK_USER, jobUser); diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnJob.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnJob.java index 6bbcd2fdb7..3d47673fa8 100755 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnJob.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnJob.java @@ -39,6 +39,7 @@ package io.hops.hopsworks.common.jobs.yarn; +import io.hops.hopsworks.common.serving.ServingConfig; import io.hops.hopsworks.persistence.entity.jobs.configuration.yarn.LocalResourceDTO; import io.hops.hopsworks.persistence.entity.jobs.configuration.yarn.YarnJobConfiguration; import io.hops.hopsworks.persistence.entity.jobs.description.Jobs; @@ -82,6 +83,7 @@ public abstract class YarnJob extends HopsJob { protected Settings settings; protected String kafkaBrokersString; protected String hopsworksRestEndpoint; + protected ServingConfig servingConfig; protected ServiceDiscoveryController serviceDiscoveryController; @@ -98,7 +100,7 @@ public abstract class YarnJob extends HopsJob { YarnJobConfiguration object. 
*/ public YarnJob(Jobs job, AsynchronousJobExecutor services, Users user, String jobUser, String hadoopDir, - Settings settings, String kafkaBrokersString, String hopsworksRestEndpoint, + Settings settings, String kafkaBrokersString, String hopsworksRestEndpoint, ServingConfig servingConfig, ServiceDiscoveryController serviceDiscoveryController) { super(job, services, user, hadoopDir); @@ -114,6 +116,7 @@ public YarnJob(Jobs job, AsynchronousJobExecutor services, Users user, String jo this.settings = settings; this.kafkaBrokersString = kafkaBrokersString; this.hopsworksRestEndpoint = hopsworksRestEndpoint; + this.servingConfig = servingConfig; this.serviceDiscoveryController = serviceDiscoveryController; } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jupyter/LocalHostJupyterProcessMgr.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jupyter/LocalHostJupyterProcessMgr.java index 7a5f32603e..cf4d615311 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jupyter/LocalHostJupyterProcessMgr.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jupyter/LocalHostJupyterProcessMgr.java @@ -135,7 +135,7 @@ public JupyterDTO startJupyterServer(Project project, String secretConfig, Strin String prog = settings.getSudoersDir() + "/jupyter.sh"; Integer port = ThreadLocalRandom.current().nextInt(40000, 59999); - JupyterPaths jp = jupyterConfigFilesGenerator.generateConfiguration(project, secretConfig, hdfsUser, + JupyterPaths jp = jupyterConfigFilesGenerator.generateConfiguration(project, secretConfig, hdfsUser, user, js, port, allowOrigin); String secretDir = settings.getStagingDir() + Settings.PRIVATE_DIRS + js.getSecret(); diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/serving/LocalhostServingConfig.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/serving/LocalhostServingConfig.java new file mode 100755 index 0000000000..dbae98ae62 --- /dev/null +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/serving/LocalhostServingConfig.java @@ -0,0 +1,31 @@ +/* + * This file is part of Hopsworks + * Copyright (C) 2022, Logical Clocks AB. All rights reserved + * + * Hopsworks is free software: you can redistribute it and/or modify it under the terms of + * the GNU Affero General Public License as published by the Free Software Foundation, + * either version 3 of the License, or (at your option) any later version. + * + * Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; + * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR + * PURPOSE. See the GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License along with this program. + * If not, see <https://www.gnu.org/licenses/>.
+ */ + +package io.hops.hopsworks.common.serving; + +import io.hops.hopsworks.common.integrations.LocalhostStereotype; + +import javax.ejb.Stateless; + +@LocalhostStereotype +@Stateless +public class LocalhostServingConfig implements ServingConfig { + + @Override + public String getClassName() { + return LocalhostServingConfig.class.getName(); + } +} \ No newline at end of file diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/serving/ServingConfig.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/serving/ServingConfig.java new file mode 100755 index 0000000000..cc65ef49c9 --- /dev/null +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/serving/ServingConfig.java @@ -0,0 +1,37 @@ +/* + * This file is part of Hopsworks + * Copyright (C) 2022, Logical Clocks AB. All rights reserved + * + * Hopsworks is free software: you can redistribute it and/or modify it under the terms of + * the GNU Affero General Public License as published by the Free Software Foundation, + * either version 3 of the License, or (at your option) any later version. + * + * Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; + * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR + * PURPOSE. See the GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License along with this program. + * If not, see <https://www.gnu.org/licenses/>. + */ + +package io.hops.hopsworks.common.serving; + +import io.hops.hopsworks.exceptions.ApiKeyException; +import io.hops.hopsworks.persistence.entity.user.Users; + +import java.io.UnsupportedEncodingException; +import java.util.Map; + +/** + * Interface for managing serving configuration. Different types of serving + * config (e.g. localhost or Kubernetes) should implement this interface.
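+ * <p>
+ * Illustrative only (not part of the original patch): callers obtain the active
+ * implementation via CDI injection and must tolerate a {@code null} result, since
+ * the default below and the localhost implementation return no variables. Assuming
+ * an injected {@code servingConfig} and an {@code env} map in scope:
+ * <pre>
+ *   Map&lt;String, String&gt; vars = servingConfig.getEnvVars(user, false);
+ *   if (vars != null) {
+ *     vars.forEach(env::put);
+ *   }
+ * </pre>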
+ */ +public interface ServingConfig { + + default Map getEnvVars(Users user, boolean includeSecrets) throws ApiKeyException, + UnsupportedEncodingException { + return null; + } + + String getClassName(); +} diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/ConfigurationUtil.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/ConfigurationUtil.java index a880f5fe7b..309197d482 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/ConfigurationUtil.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/ConfigurationUtil.java @@ -18,19 +18,22 @@ import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException; import io.hops.hopsworks.common.hosts.ServiceDiscoveryController; +import io.hops.hopsworks.common.serving.ServingConfig; +import io.hops.hopsworks.exceptions.ApiKeyException; import io.hops.hopsworks.exceptions.JobException; import io.hops.hopsworks.persistence.entity.project.Project; import io.hops.hopsworks.persistence.entity.jobs.configuration.JobConfiguration; +import io.hops.hopsworks.persistence.entity.user.Users; import java.io.IOException; import java.util.Map; public abstract class ConfigurationUtil { public abstract Map setFrameworkProperties(Project project, JobConfiguration jobConfiguration, - Settings settings, String hdfsUser, + Settings settings, String hdfsUser, Users hopsworksUser, Map extraJavaOptions, String kafkaBrokersString, String hopsworksRestEndpoint, - ServiceDiscoveryController serviceDiscoveryController, - Map extraEnvVars) - throws IOException, ServiceDiscoveryException, JobException; + ServingConfig servingConfig, + ServiceDiscoveryController serviceDiscoveryController) + throws IOException, ServiceDiscoveryException, JobException, ApiKeyException; } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/FlinkConfigurationUtil.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/FlinkConfigurationUtil.java index fb6fa1cf56..7e78dfc9e9 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/FlinkConfigurationUtil.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/FlinkConfigurationUtil.java @@ -15,12 +15,14 @@ */ package io.hops.hopsworks.common.util; +import io.hops.hopsworks.common.serving.ServingConfig; import io.hops.hopsworks.persistence.entity.project.Project; import io.hops.hopsworks.common.hdfs.Utils; import io.hops.hopsworks.common.hosts.ServiceDiscoveryController; import io.hops.hopsworks.persistence.entity.jobs.configuration.JobConfiguration; import io.hops.hopsworks.persistence.entity.jobs.configuration.flink.FlinkJobConfiguration; import io.hops.hopsworks.common.util.templates.ConfigProperty; +import io.hops.hopsworks.persistence.entity.user.Users; import java.io.IOException; import java.util.HashMap; @@ -29,11 +31,10 @@ public class FlinkConfigurationUtil extends ConfigurationUtil { @Override public Map setFrameworkProperties(Project project, JobConfiguration jobConfiguration, - Settings settings, String hdfsUser, Map - extraJavaOptions, String kafkaBrokersString, - String hopsworksRestEndpoint, - ServiceDiscoveryController serviceDiscoveryController, - Map extraEnvVars) + Settings settings, String hdfsUser, Users hopsworksUser, + Map extraJavaOptions, String kafkaBrokersString, + String hopsworksRestEndpoint, ServingConfig servingConfig, + ServiceDiscoveryController serviceDiscoveryController) throws IOException { FlinkJobConfiguration flinkJobConfiguration = 
(FlinkJobConfiguration) jobConfiguration; diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/SparkConfigurationUtil.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/SparkConfigurationUtil.java index 59af9a4eb7..0f2614d25a 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/SparkConfigurationUtil.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/SparkConfigurationUtil.java @@ -19,15 +19,18 @@ import com.google.common.base.Strings; import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException; import io.hops.hopsworks.common.hosts.ServiceDiscoveryController; +import io.hops.hopsworks.common.serving.ServingConfig; +import io.hops.hopsworks.common.util.templates.ConfigProperty; +import io.hops.hopsworks.common.util.templates.ConfigReplacementPolicy; +import io.hops.hopsworks.exceptions.ApiKeyException; import io.hops.hopsworks.exceptions.JobException; -import io.hops.hopsworks.persistence.entity.project.Project; -import io.hops.hopsworks.persistence.entity.jobs.configuration.JobConfiguration; -import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType; import io.hops.hopsworks.persistence.entity.jobs.configuration.DistributionStrategy; import io.hops.hopsworks.persistence.entity.jobs.configuration.ExperimentType; +import io.hops.hopsworks.persistence.entity.jobs.configuration.JobConfiguration; +import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType; import io.hops.hopsworks.persistence.entity.jobs.configuration.spark.SparkJobConfiguration; -import io.hops.hopsworks.common.util.templates.ConfigProperty; -import io.hops.hopsworks.common.util.templates.ConfigReplacementPolicy; +import io.hops.hopsworks.persistence.entity.project.Project; +import io.hops.hopsworks.persistence.entity.user.Users; import io.hops.hopsworks.restutils.RESTCodes; import java.io.File; @@ -37,14 +40,13 @@ import java.util.logging.Level; public class SparkConfigurationUtil extends ConfigurationUtil { + public Map setFrameworkProperties(Project project, JobConfiguration jobConfiguration, - Settings settings, String hdfsUser, - Map extraJavaOptions, - String kafkaBrokersString, String hopsworksRestEndpoint, - ServiceDiscoveryController serviceDiscoveryController, - Map extraEnvVars) - throws IOException, ServiceDiscoveryException, JobException { - SparkJobConfiguration sparkJobConfiguration = (SparkJobConfiguration)jobConfiguration; + Settings settings, String hdfsUser, Users hopsworksUser, Map extraJavaOptions, + String kafkaBrokersString, String hopsworksRestEndpoint, ServingConfig servingConfig, + ServiceDiscoveryController serviceDiscoveryController) + throws IOException, ServiceDiscoveryException, JobException, ApiKeyException { + SparkJobConfiguration sparkJobConfiguration = (SparkJobConfiguration) jobConfiguration; validateExecutorMemory(sparkJobConfiguration.getExecutorMemory(), settings); @@ -186,8 +188,11 @@ public Map setFrameworkProperties(Project project, JobConfigurat HopsUtils.IGNORE); // add extra env vars - if (extraEnvVars != null) { - extraEnvVars.forEach((key, value) -> addToSparkEnvironment(sparkProps, key, value, HopsUtils.IGNORE)); + if (servingConfig != null) { + Map servingEnvVars = servingConfig.getEnvVars(hopsworksUser, true); + if (servingEnvVars != null) { + servingEnvVars.forEach((key, value) -> addToSparkEnvironment(sparkProps, key, value, HopsUtils.IGNORE)); + } } addLibHdfsOpts(userSparkProperties, settings, sparkProps, sparkJobConfiguration); diff --git 
a/hopsworks-kube/src/main/java/io/hops/hopsworks/kube/common/KubeIstioClientService.java b/hopsworks-kube/src/main/java/io/hops/hopsworks/kube/common/KubeIstioClientService.java index 3c1a5342a4..66a8671f21 100644 --- a/hopsworks-kube/src/main/java/io/hops/hopsworks/kube/common/KubeIstioClientService.java +++ b/hopsworks-kube/src/main/java/io/hops/hopsworks/kube/common/KubeIstioClientService.java @@ -35,6 +35,10 @@ public class KubeIstioClientService { @EJB private KubeIstioHostPort kubeIstioHostPort; + public String getIstioEndpoint(Pair istioIngressHostPort) { + return "http://" + istioIngressHostPort.getL() + ":" + istioIngressHostPort.getR(); + } + public Pair getIstioIngressHostPort() { return getIstioIngressHostPort(Host.NODE, Port.HTTP); } diff --git a/hopsworks-kube/src/main/java/io/hops/hopsworks/kube/jupyter/KubeJupyterManager.java b/hopsworks-kube/src/main/java/io/hops/hopsworks/kube/jupyter/KubeJupyterManager.java index bd439f211d..230059fe60 100644 --- a/hopsworks-kube/src/main/java/io/hops/hopsworks/kube/jupyter/KubeJupyterManager.java +++ b/hopsworks-kube/src/main/java/io/hops/hopsworks/kube/jupyter/KubeJupyterManager.java @@ -37,7 +37,6 @@ import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; import io.fabric8.kubernetes.api.model.apps.DeploymentSpecBuilder; import io.fabric8.kubernetes.client.KubernetesClientException; -import io.hops.common.Pair; import io.hops.hopsworks.common.dao.hdfsUser.HdfsUsersFacade; import io.hops.hopsworks.common.dao.jupyter.config.JupyterConfigFilesGenerator; import io.hops.hopsworks.common.dao.jupyter.config.JupyterDTO; @@ -47,10 +46,11 @@ import io.hops.hopsworks.common.dao.user.UserFacade; import io.hops.hopsworks.common.hdfs.HdfsUsersController; import io.hops.hopsworks.common.hosts.ServiceDiscoveryController; +import io.hops.hopsworks.common.jobs.JobController; import io.hops.hopsworks.common.jupyter.JupyterManager; import io.hops.hopsworks.common.jupyter.JupyterManagerImpl; import io.hops.hopsworks.common.jupyter.TokenGenerator; -import io.hops.hopsworks.common.jobs.JobController; +import io.hops.hopsworks.common.serving.ServingConfig; import io.hops.hopsworks.common.util.HopsUtils; import io.hops.hopsworks.common.util.ProjectUtils; import io.hops.hopsworks.common.util.Settings; @@ -58,10 +58,9 @@ import io.hops.hopsworks.common.util.templates.jupyter.KernelTemplate; import io.hops.hopsworks.common.util.templates.jupyter.SparkMagicConfigTemplate; import io.hops.hopsworks.exceptions.ApiKeyException; -import io.hops.hopsworks.exceptions.ServiceException; import io.hops.hopsworks.exceptions.JobException; +import io.hops.hopsworks.exceptions.ServiceException; import io.hops.hopsworks.kube.common.KubeClientService; -import io.hops.hopsworks.kube.common.KubeIstioClientService; import io.hops.hopsworks.kube.common.KubeStereotype; import io.hops.hopsworks.kube.project.KubeProjectConfigMaps; import io.hops.hopsworks.kube.security.KubeApiKeyUtils; @@ -71,15 +70,16 @@ import io.hops.hopsworks.persistence.entity.jupyter.JupyterSettings; import io.hops.hopsworks.persistence.entity.project.Project; import io.hops.hopsworks.persistence.entity.user.Users; -import io.hops.hopsworks.persistence.entity.user.security.apiKey.ApiKey; import io.hops.hopsworks.restutils.RESTCodes; import javax.ejb.EJB; import javax.ejb.Stateless; import javax.ejb.TransactionAttribute; import javax.ejb.TransactionAttributeType; +import javax.inject.Inject; import java.io.IOException; import java.io.StringWriter; +import java.io.UnsupportedEncodingException; import 
java.io.Writer; import java.util.ArrayList; import java.util.HashMap; @@ -147,8 +147,8 @@ public class KubeJupyterManager extends JupyterManagerImpl implements JupyterMan private JobController jobController; @EJB private KubeApiKeyUtils kubeApiKeyUtils; - @EJB - private KubeIstioClientService kubeIstioClientService; + @Inject + private ServingConfig servingConfig; @Override @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED) @@ -180,23 +180,12 @@ public JupyterDTO startJupyterServer(Project project, String secretConfig, Strin // jupyter notebook config Writer jupyterNotebookConfig = new StringWriter(); jupyterConfigFilesGenerator.createJupyterNotebookConfig(jupyterNotebookConfig, project, nodePort, - jupyterSettings, hdfsUser, jupyterPaths.getCertificatesDir(), allowOrigin); - - // get serving api key - Optional apiKey = kubeApiKeyUtils.getServingApiKey(user); - if (!apiKey.isPresent()) { - throw new ApiKeyException(RESTCodes.ApiKeyErrorCode.KEY_NOT_FOUND, Level.SEVERE, - "Serving API key for user " + user.getUsername() + " not found"); - } - String secretName = kubeApiKeyUtils.getServingApiKeySecretName(apiKey.get().getPrefix()); - String secret = kubeApiKeyUtils.getServingApiKeyValueFromKubeSecret(secretName); + jupyterSettings, hdfsUser, jupyterPaths.getCertificatesDir(), allowOrigin); // spark config Writer sparkMagicConfig = new StringWriter(); - Map extraEnvVars = new HashMap<>(); - extraEnvVars.put("SERVING_API_KEY", secret); - jupyterConfigFilesGenerator.createSparkMagicConfig(sparkMagicConfig, project, jupyterSettings, hdfsUser, - jupyterPaths.getConfDirPath(), extraEnvVars); + jupyterConfigFilesGenerator.createSparkMagicConfig(sparkMagicConfig, project, jupyterSettings, hdfsUser, user, + jupyterPaths.getConfDirPath()); //If user selected Experiments or Spark we should use the default docker config for the Python kernel if(!jupyterSettings.isPythonKernel()) { @@ -277,7 +266,8 @@ public String serviceAndDeploymentName(String kubeProjectUsername) { private List buildContainer(JupyterPaths jupyterPaths, String anacondaEnv, String pythonKernelName, String secretDir, String certificatesDir, String hdfsUser, String token, ResourceRequirements resourceRequirements, Integer nodePort, String jupyterMode, boolean isGit, - Project project, Users user, Map filebeatEnv) throws ServiceDiscoveryException { + Project project, Users user, Map filebeatEnv) + throws ServiceDiscoveryException, UnsupportedEncodingException, ApiKeyException { String jupyterHome = jupyterPaths.getNotebookPath(); String hadoopHome = settings.getHadoopSymbolicLinkDir(); String hadoopConfDir = hadoopHome + "/etc/hadoop"; @@ -304,16 +294,14 @@ private List buildContainer(JupyterPaths jupyterPaths, String anacond environment.add(new EnvVarBuilder().withName("PYTHONHASHSEED").withValue("0").build()); environment.add(new EnvVarBuilder().withName("IS_GIT").withValue(Boolean.toString(isGit)).build()); environment.add(new EnvVarBuilder().withName("FLINK_LIB_DIR").withValue(settings.getFlinkLibDir()).build()); + + // serving env vars environment.add(new EnvVarBuilder().withName("SERVING_API_KEY").withValueFrom( new EnvVarSourceBuilder().withNewSecretKeyRef(KubeApiKeyUtils.SERVING_API_KEY_SECRET_KEY, kubeApiKeyUtils.getProjectServingApiKeySecretName(user), false).build()).build()); + Map servingEnvVars = servingConfig.getEnvVars(user, false); + // getEnvVars may return null, e.g. when KFServing is not installed + if (servingEnvVars != null) { + servingEnvVars.forEach((key, value) -> environment.add(new EnvVarBuilder().withName(key).withValue(value).build())); + } - if (settings.getKubeKFServingInstalled()) { - Pair
istioIngressHostPort = kubeIstioClientService.getIstioIngressHostPort(); - environment.add(new EnvVarBuilder().withName("ISTIO_ENDPOINT") - .withValue("http://" + istioIngressHostPort.getL() + ":" + istioIngressHostPort.getR()).build()); - } - List containers = new ArrayList<>(); VolumeMount logMount = new VolumeMountBuilder() .withName("logs") @@ -459,7 +447,7 @@ private PodSpec buildPodSpec(Project project, String kubeProjectUser, private Deployment buildDeployment(String name, String kubeProjectUser, JupyterPaths jupyterPaths, String anacondaEnv, String pythonKernelName, String secretDir, String certificatesDir, String hadoopUser, String token, DockerJobConfiguration dockerConfig, Integer nodePort, String jupyterMode, boolean isGit, Project project, - Users user) throws ServiceDiscoveryException { + Users user) throws ServiceDiscoveryException, UnsupportedEncodingException, ApiKeyException { ResourceRequirements resourceRequirements = kubeClientService. buildResourceRequirements(dockerConfig.getResourceConfig()); diff --git a/hopsworks-kube/src/main/java/io/hops/hopsworks/kube/serving/KubeServingConfig.java b/hopsworks-kube/src/main/java/io/hops/hopsworks/kube/serving/KubeServingConfig.java new file mode 100755 index 0000000000..198a19232d --- /dev/null +++ b/hopsworks-kube/src/main/java/io/hops/hopsworks/kube/serving/KubeServingConfig.java @@ -0,0 +1,77 @@ +/* + * Copyright (C) 2022, Logical Clocks AB. All rights reserved + */ + +package io.hops.hopsworks.kube.serving; + +import io.hops.common.Pair; +import io.hops.hopsworks.common.serving.ServingConfig; +import io.hops.hopsworks.common.util.Settings; +import io.hops.hopsworks.exceptions.ApiKeyException; +import io.hops.hopsworks.kube.common.KubeIstioClientService; +import io.hops.hopsworks.kube.common.KubeStereotype; +import io.hops.hopsworks.kube.security.KubeApiKeyUtils; +import io.hops.hopsworks.persistence.entity.user.Users; +import io.hops.hopsworks.persistence.entity.user.security.apiKey.ApiKey; +import io.hops.hopsworks.restutils.RESTCodes; + +import javax.ejb.EJB; +import javax.ejb.Stateless; +import javax.ejb.TransactionAttribute; +import javax.ejb.TransactionAttributeType; +import java.io.UnsupportedEncodingException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.logging.Level; + +@KubeStereotype +@Stateless +@TransactionAttribute(TransactionAttributeType.NEVER) +public class KubeServingConfig implements ServingConfig { + + @EJB + private KubeApiKeyUtils kubeApiKeyUtils; + @EJB + private KubeIstioClientService kubeIstioClientService; + @EJB + private Settings settings; + + @Override + public Map getEnvVars(Users user, boolean includeSecrets) throws ApiKeyException, + UnsupportedEncodingException { + return settings.getKubeKFServingInstalled() + ? 
getKFServingEnvVars(user, includeSecrets) // kfserving env vars + : null; + } + + @Override + public String getClassName() { + return KubeServingConfig.class.getName(); + } + + private Map getKFServingEnvVars(Users user, boolean includeSecrets) throws ApiKeyException, + UnsupportedEncodingException { + Map envVars = new HashMap<>(); + + if (includeSecrets) { + // add serving api key + Optional apiKey = kubeApiKeyUtils.getServingApiKey(user); + if (!apiKey.isPresent()) { + throw new ApiKeyException(RESTCodes.ApiKeyErrorCode.KEY_NOT_FOUND, Level.SEVERE, + "Serving API key for user " + user.getUsername() + " not found"); + } + String secretName = kubeApiKeyUtils.getServingApiKeySecretName(apiKey.get().getPrefix()); + String secret = kubeApiKeyUtils.getServingApiKeyValueFromKubeSecret(secretName); + envVars.put("SERVING_API_KEY", secret); + } + + // add istio endpoint + Pair istioIngressHostPort = kubeIstioClientService.getIstioIngressHostPort(); + String istioEndpoint = kubeIstioClientService.getIstioEndpoint(istioIngressHostPort); + envVars.put("ISTIO_ENDPOINT", istioEndpoint); + + return envVars; + } +} + \ No newline at end of file diff --git a/hopsworks-web/yo/app/scripts/controllers/servingCtrl.js b/hopsworks-web/yo/app/scripts/controllers/servingCtrl.js index 67ce29644c..f737ff7ea4 100644 --- a/hopsworks-web/yo/app/scripts/controllers/servingCtrl.js +++ b/hopsworks-web/yo/app/scripts/controllers/servingCtrl.js @@ -791,7 +791,7 @@ angular.module('hopsWorksApp') self.artifactVersion = self.artifactVersions.slice(-1)[0]; self.editServing.artifactVersion = self.artifactVersion.key; } else { - version = self.findArtifactVersion(self.editServing.artifactVersion, self.artifactVersions.slice(-1)[0]); + const version = self.findArtifactVersion(self.editServing.artifactVersion, self.artifactVersions.slice(-1)[0]); self.artifactVersion = version; self.editServing.artifactVersion = self.artifactVersion.key; } @@ -1028,10 +1028,12 @@ angular.module('hopsWorksApp') self.validateTransformer(); if (self.artifactVersions && self.artifactVersions.length > 0) { self.artifactVersion = self.artifactVersions[0]; - for (var idx in self.artifactVersions) { - if (self.artifactVersions[idx].value === self.artifactModelOnly) { - self.artifactVersion = self.artifactVersions[idx]; - break; + if (!self.editServing.predictor) { + for (var idx in self.artifactVersions) { + if (self.artifactVersions[idx].value === self.artifactModelOnly) { + self.artifactVersion = self.artifactVersions[idx]; + break; + } } } self.editServing.artifactVersion = self.artifactVersion.key; diff --git a/jobs-ee/src/main/java/io/hops/hopsworks/jobs/KubeExecutionController.java b/jobs-ee/src/main/java/io/hops/hopsworks/jobs/KubeExecutionController.java index eb73e0d5f6..b7d76d3082 100644 --- a/jobs-ee/src/main/java/io/hops/hopsworks/jobs/KubeExecutionController.java +++ b/jobs-ee/src/main/java/io/hops/hopsworks/jobs/KubeExecutionController.java @@ -46,9 +46,11 @@ import io.hops.hopsworks.common.jobs.yarn.YarnLogUtil; import io.hops.hopsworks.common.jupyter.JupyterController; import io.hops.hopsworks.common.kafka.KafkaBrokers; +import io.hops.hopsworks.common.serving.ServingConfig; import io.hops.hopsworks.common.util.HopsUtils; import io.hops.hopsworks.common.util.ProjectUtils; import io.hops.hopsworks.common.util.Settings; +import io.hops.hopsworks.exceptions.ApiKeyException; import io.hops.hopsworks.exceptions.GenericException; import io.hops.hopsworks.exceptions.JobException; import io.hops.hopsworks.exceptions.ProjectException; @@ -74,6 
+76,7 @@ import javax.ejb.Stateless; import javax.ejb.TransactionAttribute; import javax.ejb.TransactionAttributeType; +import javax.inject.Inject; import java.io.File; import java.io.IOException; import java.nio.file.Path; @@ -128,6 +131,8 @@ public class KubeExecutionController extends AbstractExecutionController impleme private KubeProjectConfigMaps kubeProjectConfigMaps; @EJB private KubeApiKeyUtils kubeApiKeyUtils; + @Inject + private ServingConfig servingConfig; @Override @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED) @@ -568,7 +573,7 @@ private List buildContainers(JobType jobType, private List getPrimaryContainerEnv(JobType jobType, Users user, String hadoopUser, Project project, Execution execution, String certificatesDir, String secretsDir, String anacondaEnv, - DockerJobConfiguration dockerJobConfiguration) throws ServiceDiscoveryException, IOException { + DockerJobConfiguration dockerJobConfiguration) throws ServiceDiscoveryException, IOException, ApiKeyException { List environment = new ArrayList<>(); switch (jobType) { @@ -632,9 +637,14 @@ private List getPrimaryContainerEnv(JobType jobType, Users user, String environment.add(new EnvVarBuilder().withName("APP_ARGS").withValue(execution.getArgs()).build()); environment.add(new EnvVarBuilder().withName("APP_FILES") .withValue(((PythonJobConfiguration)dockerJobConfiguration).getFiles()).build()); + + // serving env vars environment.add(new EnvVarBuilder().withName("SERVING_API_KEY").withValueFrom( new EnvVarSourceBuilder().withNewSecretKeyRef(KubeApiKeyUtils.SERVING_API_KEY_SECRET_KEY, kubeApiKeyUtils.getProjectServingApiKeySecretName(user), false).build()).build()); + Map servingEnvVars = servingConfig.getEnvVars(user, false); + // getEnvVars may return null, e.g. when KFServing is not installed + if (servingEnvVars != null) { + servingEnvVars.forEach((key, value) -> environment.add( + new EnvVarBuilder().withName(key).withValue(value).build())); + } break; case DOCKER: if (dockerJobConfiguration.getEnvVars() != null && !dockerJobConfiguration.getEnvVars().isEmpty()) {
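
Taken together, the pattern this patch introduces can be summarized by the sketch below. It is illustrative only and not part of the diff: the class name ServingEnvExample and the buildEnv method are invented for the example. A bean injects the ServingConfig interface, the container resolves it to LocalhostServingConfig or KubeServingConfig through the active stereotype, and the returned variables (which may be null, for instance on Kubernetes without KFServing) are merged into the job or container environment, mirroring the call sites in SparkConfigurationUtil, KubeJupyterManager, and KubeExecutionController above.

import io.hops.hopsworks.common.serving.ServingConfig;
import io.hops.hopsworks.exceptions.ApiKeyException;
import io.hops.hopsworks.persistence.entity.user.Users;

import javax.ejb.Stateless;
import javax.inject.Inject;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical consumer of the new interface; mirrors the call sites in this patch.
@Stateless
public class ServingEnvExample {

  // CDI resolves this to LocalhostServingConfig or KubeServingConfig,
  // depending on which stereotype is active in the deployment.
  @Inject
  private ServingConfig servingConfig;

  public Map<String, String> buildEnv(Users user)
      throws ApiKeyException, UnsupportedEncodingException {
    Map<String, String> env = new HashMap<>();
    // getEnvVars may return null (localhost, or Kubernetes without KFServing
    // installed), so guard before merging, as SparkConfigurationUtil does.
    Map<String, String> servingEnvVars = servingConfig.getEnvVars(user, false);
    if (servingEnvVars != null) {
      env.putAll(servingEnvVars);
    }
    return env;
  }
}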