
Commit

[HOPSWORKS-2828][fix] Add serving env vars in notebooks and jobs (#786)
* [HOPSWORKS-2828][fix] Add serving env vars in notebooks and jobs

* [HOPSWORKS-2828][fix] Fix file selection in serving page
javierdlrm committed Feb 10, 2022
1 parent 1f479c1 commit d0d2141
Showing 16 changed files with 166 additions and 61 deletions.
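
The recurring change across these files is the same: a ServingConfig bean is @Inject-ed into the Jupyter, Flink and Spark controllers and passed, together with the Hopsworks Users object, down to setFrameworkProperties, so that model-serving environment variables reach notebook and job configurations. The standalone sketch below only illustrates that idea; it is not the Hopsworks API, the variable names (SERVING_API_KEY, SERVING_ENDPOINT) and helper methods are assumptions made for illustration, and only the spark.yarn.appMasterEnv.* / spark.executorEnv.* prefixes are standard Spark configuration keys.

import java.util.HashMap;
import java.util.Map;

// Standalone illustration only -- not Hopsworks code. It mimics the shape of the
// change: a serving-config source produces the env vars a serving client needs,
// and they are merged into the framework properties handed to Spark/Flink.
public class ServingEnvVarsSketch {

  // Stand-in for the role ServingConfig plays in this diff; the keys below are
  // hypothetical, not the actual names used by Hopsworks.
  static Map<String, String> servingEnvVars(String apiKey, String servingEndpoint) {
    Map<String, String> env = new HashMap<>();
    env.put("SERVING_API_KEY", apiKey);            // hypothetical: per-user API key
    env.put("SERVING_ENDPOINT", servingEndpoint);  // hypothetical: serving REST endpoint
    return env;
  }

  // Stand-in for SparkConfigurationUtil.setFrameworkProperties: expose the env vars
  // to both the driver (application master) and the executors.
  static Map<String, String> frameworkProperties(Map<String, String> servingEnv) {
    Map<String, String> props = new HashMap<>();
    for (Map.Entry<String, String> e : servingEnv.entrySet()) {
      props.put("spark.yarn.appMasterEnv." + e.getKey(), e.getValue());
      props.put("spark.executorEnv." + e.getKey(), e.getValue());
    }
    return props;
  }

  public static void main(String[] args) {
    Map<String, String> props =
        frameworkProperties(servingEnvVars("<api-key>", "https://hopsworks.example:443"));
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}

Threading the Users object through the call chain (generateConfiguration, createSparkMagicConfig, getYarnRunner, the YarnJob constructors) is what makes a per-user credential available where these properties are built, which is presumably why ApiKeyException joins the throws clauses in the hunks below.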
@@ -44,6 +44,8 @@
import com.logicalclocks.servicediscoverclient.service.Service;
import freemarker.template.TemplateException;
import io.hops.hopsworks.common.jobs.JobController;
import io.hops.hopsworks.common.serving.ServingConfig;
import io.hops.hopsworks.exceptions.ApiKeyException;
import io.hops.hopsworks.exceptions.JobException;
import io.hops.hopsworks.common.hive.HiveController;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
@@ -64,6 +66,7 @@
import io.hops.hopsworks.common.util.templates.jupyter.SparkMagicConfigTemplate;
import io.hops.hopsworks.common.util.templates.jupyter.SparkMagicConfigTemplateBuilder;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import org.apache.commons.io.FileUtils;

@@ -111,12 +114,14 @@ public class JupyterConfigFilesGenerator {
private HiveController hiveController;
@EJB
private JobController jobController;
@Inject
private ServingConfig servingConfig;

public JupyterPaths generateJupyterPaths(Project project, String hdfsUser, String secretConfig) {
return new JupyterPaths(settings.getJupyterDir(), project.getName(), hdfsUser, secretConfig);
}

public JupyterPaths generateConfiguration(Project project, String secretConfig, String hdfsUser,
public JupyterPaths generateConfiguration(Project project, String secretConfig, String hdfsUser, Users hopsworksUser,
JupyterSettings js, Integer port, String allowOrigin)
throws ServiceException, JobException {
boolean newDir = false;
@@ -125,8 +130,8 @@ public JupyterPaths generateConfiguration(Project project, String secretConfig,

try {
newDir = createJupyterDirs(jp);
createConfigFiles(jp, hdfsUser, project, port, js, allowOrigin);
} catch (IOException | ServiceException | ServiceDiscoveryException e) {
createConfigFiles(jp, hdfsUser, hopsworksUser, project, port, js, allowOrigin);
} catch (IOException | ServiceException | ServiceDiscoveryException | ApiKeyException e) {
if (newDir) { // if the folder was newly created delete it
removeProjectUserDirRecursive(jp);
}
@@ -266,8 +271,8 @@ public void createJupyterNotebookConfig(Writer out, Project project, int port,
}

public void createSparkMagicConfig(Writer out, Project project, JupyterSettings js, String hdfsUser,
String confDirPath, Map<String, String> extraEnvVars) throws IOException, ServiceDiscoveryException,
JobException {
Users hopsworksUser, String confDirPath) throws IOException, ServiceDiscoveryException,
JobException, ApiKeyException {

SparkJobConfiguration sparkJobConfiguration = (SparkJobConfiguration) js.getJobConfig();

@@ -291,9 +296,9 @@ public void createSparkMagicConfig(Writer out, Project project, JupyterSettings
constructServiceFQDNWithPort(ServiceDiscoveryController.HopsworksService.HOPSWORKS_APP);

finalSparkConfiguration.putAll(
sparkConfigurationUtil.setFrameworkProperties(project, sparkJobConfiguration, settings, hdfsUser,
extraJavaOptions, kafkaBrokers.getKafkaBrokersString(), hopsworksRestEndpoint, serviceDiscoveryController,
extraEnvVars));
sparkConfigurationUtil.setFrameworkProperties(project, sparkJobConfiguration, settings, hdfsUser, hopsworksUser,
extraJavaOptions, kafkaBrokers.getKafkaBrokersString(), hopsworksRestEndpoint, servingConfig,
serviceDiscoveryController));

StringBuilder sparkConfBuilder = new StringBuilder();
ArrayList<String> keys = new ArrayList<>(finalSparkConfiguration.keySet());
@@ -337,9 +342,9 @@ public void createSparkMagicConfig(Writer out, Project project, JupyterSettings
}

// returns true if one of the conf files were created anew
private void createConfigFiles(JupyterPaths jp, String hdfsUser, Project project,
private void createConfigFiles(JupyterPaths jp, String hdfsUser, Users hopsworksUser, Project project,
Integer port, JupyterSettings js, String allowOrigin)
throws IOException, ServiceException, ServiceDiscoveryException, JobException {
throws IOException, ServiceException, ServiceDiscoveryException, JobException, ApiKeyException {
String confDirPath = jp.getConfDirPath();
String kernelsDir = jp.getKernelsDir();
String certsDir = jp.getCertificatesDir();
@@ -366,7 +371,7 @@ private void createConfigFiles(JupyterPaths jp, String hdfsUser, Project project

if (!sparkmagic_config_file.exists()) {
try (Writer out = new FileWriter(sparkmagic_config_file, false)) {
createSparkMagicConfig(out, project, js, hdfsUser, confDirPath, null);
createSparkMagicConfig(out, project, js, hdfsUser, hopsworksUser, confDirPath);
}
}
}
@@ -61,7 +61,7 @@ public class ErasureCodeJob extends YarnJob {
public ErasureCodeJob(Jobs job, AsynchronousJobExecutor services,
Users user, String jobUser, String hadoopDir, Settings settings,
ServiceDiscoveryController serviceDiscoveryController) {
super(job, services, user, jobUser, hadoopDir, settings,null, null, serviceDiscoveryController);
super(job, services, user, jobUser, hadoopDir, settings,null, null, null, serviceDiscoveryController);

if (!(job.getJobConfig() instanceof ErasureCodeJobConfiguration)) {
throw new IllegalArgumentException(
@@ -42,6 +42,7 @@
import com.google.common.base.Strings;
import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.serving.ServingConfig;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.persistence.entity.hdfs.inode.Inode;
import io.hops.hopsworks.persistence.entity.hdfs.user.HdfsUsers;
@@ -79,6 +80,7 @@
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.inject.Inject;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -119,6 +121,8 @@ public class FlinkController {
private KafkaBrokers kafkaBrokers;
@EJB
private ServiceDiscoveryController serviceDiscoveryController;
@Inject
private ServingConfig servingConfig;


public Execution startJob(final Jobs job, final Users user)
@@ -144,7 +148,7 @@ public Execution startJob(final Jobs job, final Users user)
try {
flinkjob = proxyUser.doAs((PrivilegedExceptionAction<FlinkJob>) () -> new FlinkJob(job, submitter, user,
hdfsUsersBean.getHdfsUserName(job.getProject(), job.getCreator()), settings,
kafkaBrokers.getKafkaBrokersString(), hopsworksRestEndpoint, serviceDiscoveryController));
kafkaBrokers.getKafkaBrokersString(), hopsworksRestEndpoint, servingConfig, serviceDiscoveryController));
} catch (InterruptedException ex) {
LOGGER.log(Level.SEVERE, null, ex);
}
@@ -40,6 +40,7 @@
package io.hops.hopsworks.common.jobs.flink;

import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import io.hops.hopsworks.common.serving.ServingConfig;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType;
import io.hops.hopsworks.persistence.entity.jobs.configuration.flink.FlinkJobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
@@ -67,10 +68,10 @@ public class FlinkJob extends YarnJob {
private FlinkYarnRunnerBuilder flinkBuilder;

FlinkJob(Jobs job, AsynchronousJobExecutor services, Users user, String jobUser,
Settings settings, String kafkaBrokersString, String hopsworksRestEndpoint,
Settings settings, String kafkaBrokersString, String hopsworksRestEndpoint, ServingConfig servingConfig,
ServiceDiscoveryController serviceDiscoveryController) {
super(job, services, user, jobUser, settings.getHadoopSymbolicLinkDir(), settings,
kafkaBrokersString, hopsworksRestEndpoint, serviceDiscoveryController);
kafkaBrokersString, hopsworksRestEndpoint, servingConfig, serviceDiscoveryController);

if (!(job.getJobConfig() instanceof FlinkJobConfiguration)) {
throw new IllegalArgumentException("Job must contain a FlinkJobConfiguration object. Received: "
@@ -92,8 +93,9 @@ protected boolean setupJob() throws JobException {
}
try {
runner = flinkBuilder
.getYarnRunner(jobs.getProject(), jobUser, services.getFileOperations(hdfsUser.getUserName()),
yarnClient, services, settings, kafkaBrokersString, hopsworksRestEndpoint, serviceDiscoveryController);
.getYarnRunner(jobs.getProject(), jobUser, user, services.getFileOperations(hdfsUser.getUserName()),
yarnClient, services, settings, kafkaBrokersString, hopsworksRestEndpoint, servingConfig,
serviceDiscoveryController);

} catch (IOException | ServiceDiscoveryException e) {
LOG.log(Level.SEVERE,
@@ -45,13 +45,15 @@
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.jobs.AsynchronousJobExecutor;
import io.hops.hopsworks.common.jobs.yarn.YarnRunner;
import io.hops.hopsworks.common.serving.ServingConfig;
import io.hops.hopsworks.common.util.FlinkConfigurationUtil;
import io.hops.hopsworks.common.util.ProjectUtils;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType;
import io.hops.hopsworks.persistence.entity.jobs.configuration.flink.FlinkJobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.conf.Configuration;
@@ -105,9 +107,9 @@ void addDynamicProperty(String name, String value) {
dynamicProperties.put(name, value);
}

YarnRunner getYarnRunner(Project project, String jobUser, DistributedFileSystemOps dfsClient,
YarnRunner getYarnRunner(Project project, String jobUser, Users hopsworksUser, DistributedFileSystemOps dfsClient,
YarnClient yarnClient, AsynchronousJobExecutor services, Settings settings,
String kafkaBrokersString, String hopsworksRestEndpoint,
String kafkaBrokersString, String hopsworksRestEndpoint, ServingConfig servingConfig,
ServiceDiscoveryController serviceDiscoveryController)
throws IOException, ServiceDiscoveryException {

@@ -134,8 +136,8 @@ YarnRunner getYarnRunner(Project project, String jobUser, DistributedFileSystemO
project.getName().toLowerCase() + "," + job.getName() + "," + job.getId() + "," + YarnRunner.APPID_PLACEHOLDER);

Map<String, String> finalJobProps = flinkConfigurationUtil
.setFrameworkProperties(project, job.getJobConfig(), settings, jobUser, extraJavaOptions,
kafkaBrokersString, hopsworksRestEndpoint, serviceDiscoveryController, null);
.setFrameworkProperties(project, job.getJobConfig(), settings, jobUser, hopsworksUser, extraJavaOptions,
kafkaBrokersString, hopsworksRestEndpoint, servingConfig, serviceDiscoveryController);

//Parse properties from Spark config file
Yaml yaml = new Yaml();
@@ -48,6 +48,7 @@
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.jupyter.JupyterController;
import io.hops.hopsworks.common.kafka.KafkaBrokers;
import io.hops.hopsworks.common.serving.ServingConfig;
import io.hops.hopsworks.common.util.HopsUtils;
import io.hops.hopsworks.persistence.entity.jobs.configuration.history.JobState;
import io.hops.hopsworks.persistence.entity.jobs.history.Execution;
@@ -72,6 +73,7 @@
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.inject.Inject;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.jar.Attributes;
@@ -107,6 +109,8 @@ public class SparkController {
private KafkaBrokers kafkaBrokers;
@EJB
private ServiceDiscoveryController serviceDiscoveryController;
@Inject
private ServingConfig servingConfig;

/**
* Start the Spark job as the given user.
@@ -277,7 +281,7 @@ private SparkJob createSparkJob(String username, Jobs job, Users user) throws Jo
sparkjob = proxyUser.doAs((PrivilegedExceptionAction<SparkJob>) () ->
new SparkJob(job, submitter, user, settings.getHadoopSymbolicLinkDir(),
hdfsUsersBean.getHdfsUserName(job.getProject(), user),
settings, kafkaBrokers.getKafkaBrokersString(), hopsworksRestEndpoint,
settings, kafkaBrokers.getKafkaBrokersString(), hopsworksRestEndpoint, servingConfig,
serviceDiscoveryController));
} catch (InterruptedException ex) {
LOGGER.log(Level.SEVERE, null, ex);
@@ -38,6 +38,7 @@
*/
package io.hops.hopsworks.common.jobs.spark;

import io.hops.hopsworks.common.serving.ServingConfig;
import io.hops.hopsworks.persistence.entity.jobs.configuration.JobType;
import io.hops.hopsworks.persistence.entity.jobs.configuration.spark.SparkJobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
@@ -65,8 +66,8 @@ public class SparkJob extends YarnJob {

SparkJob(Jobs job, AsynchronousJobExecutor services, Users user, final String hadoopDir,
String jobUser, Settings settings, String kafkaBrokersString, String hopsworksRestEndpoint,
ServiceDiscoveryController serviceDiscoveryController) {
super(job, services, user, jobUser, hadoopDir, settings, kafkaBrokersString, hopsworksRestEndpoint,
ServingConfig servingConfig, ServiceDiscoveryController serviceDiscoveryController) {
super(job, services, user, jobUser, hadoopDir, settings, kafkaBrokersString, hopsworksRestEndpoint, servingConfig,
serviceDiscoveryController);
if (!(job.getJobConfig() instanceof SparkJobConfiguration)) {
throw new IllegalArgumentException(
@@ -108,9 +109,9 @@ protected boolean setupJob() {
try {
runner = runnerbuilder.
getYarnRunner(jobs.getProject(),
jobUser,
jobUser, user,
services, services.getFileOperations(hdfsUser.getUserName()), yarnClient,
settings, kafkaBrokersString, hopsworksRestEndpoint, serviceDiscoveryController);
settings, kafkaBrokersString, hopsworksRestEndpoint, servingConfig, serviceDiscoveryController);
} catch (Exception e) {
LOG.log(Level.WARNING,
"Failed to create YarnRunner.", e);
@@ -40,6 +40,8 @@

import com.google.common.base.Strings;
import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import io.hops.hopsworks.common.serving.ServingConfig;
import io.hops.hopsworks.exceptions.ApiKeyException;
import io.hops.hopsworks.exceptions.JobException;
import io.hops.hopsworks.persistence.entity.jobs.configuration.spark.SparkJobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
@@ -54,6 +56,7 @@
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.common.util.SparkConfigurationUtil;
import io.hops.hopsworks.common.util.templates.ConfigProperty;
import io.hops.hopsworks.persistence.entity.user.Users;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -116,11 +119,12 @@ public SparkYarnRunnerBuilder(Jobs job) {
* @return The YarnRunner instance to launch the Spark job on Yarn.
* @throws IOException If creation failed.
*/
public YarnRunner getYarnRunner(Project project, String jobUser, AsynchronousJobExecutor services,
final DistributedFileSystemOps dfsClient, final YarnClient yarnClient,
Settings settings, String kafkaBrokersString, String hopsworksRestEndpoint,
public YarnRunner getYarnRunner(Project project, String jobUser, Users hopsworksUser,
AsynchronousJobExecutor services, final DistributedFileSystemOps dfsClient,
final YarnClient yarnClient, Settings settings, String kafkaBrokersString,
String hopsworksRestEndpoint, ServingConfig servingConfig,
ServiceDiscoveryController serviceDiscoveryController)
throws IOException, ServiceDiscoveryException, JobException {
throws IOException, ServiceDiscoveryException, JobException, ApiKeyException {

Map<String, ConfigProperty> jobHopsworksProps = new HashMap<>();
JobType jobType = job.getJobConfig().getJobType();
@@ -194,7 +198,8 @@ public YarnRunner getYarnRunner(Project project, String jobUser, AsynchronousJob
Map<String, String> finalJobProps = new HashMap<>();

finalJobProps.putAll(sparkConfigurationUtil.setFrameworkProperties(project, job.getJobConfig(), settings,
jobUser, extraJavaOptions, kafkaBrokersString, hopsworksRestEndpoint, serviceDiscoveryController, null));
jobUser, hopsworksUser, extraJavaOptions, kafkaBrokersString, hopsworksRestEndpoint, servingConfig,
serviceDiscoveryController));

finalJobProps.put(Settings.SPARK_YARN_APPMASTER_SPARK_USER, jobUser);
finalJobProps.put(Settings.SPARK_EXECUTOR_SPARK_USER, jobUser);
@@ -39,6 +39,7 @@

package io.hops.hopsworks.common.jobs.yarn;

import io.hops.hopsworks.common.serving.ServingConfig;
import io.hops.hopsworks.persistence.entity.jobs.configuration.yarn.LocalResourceDTO;
import io.hops.hopsworks.persistence.entity.jobs.configuration.yarn.YarnJobConfiguration;
import io.hops.hopsworks.persistence.entity.jobs.description.Jobs;
@@ -82,6 +83,7 @@ public abstract class YarnJob extends HopsJob {
protected Settings settings;
protected String kafkaBrokersString;
protected String hopsworksRestEndpoint;
protected ServingConfig servingConfig;

protected ServiceDiscoveryController serviceDiscoveryController;

@@ -98,7 +100,7 @@ public abstract class YarnJob extends HopsJob {
YarnJobConfiguration object.
*/
public YarnJob(Jobs job, AsynchronousJobExecutor services, Users user, String jobUser, String hadoopDir,
Settings settings, String kafkaBrokersString, String hopsworksRestEndpoint,
Settings settings, String kafkaBrokersString, String hopsworksRestEndpoint, ServingConfig servingConfig,
ServiceDiscoveryController serviceDiscoveryController) {
super(job, services, user, hadoopDir);

@@ -114,6 +116,7 @@ public YarnJob(Jobs job, AsynchronousJobExecutor services, Users user, String jo
this.settings = settings;
this.kafkaBrokersString = kafkaBrokersString;
this.hopsworksRestEndpoint = hopsworksRestEndpoint;
this.servingConfig = servingConfig;
this.serviceDiscoveryController = serviceDiscoveryController;
}

@@ -135,7 +135,7 @@ public JupyterDTO startJupyterServer(Project project, String secretConfig, Strin
String prog = settings.getSudoersDir() + "/jupyter.sh";

Integer port = ThreadLocalRandom.current().nextInt(40000, 59999);
JupyterPaths jp = jupyterConfigFilesGenerator.generateConfiguration(project, secretConfig, hdfsUser,
JupyterPaths jp = jupyterConfigFilesGenerator.generateConfiguration(project, secretConfig, hdfsUser, user,
js, port, allowOrigin);
String secretDir = settings.getStagingDir() + Settings.PRIVATE_DIRS + js.getSecret();
