Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 33 additions & 12 deletions spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,15 @@ public class SparkInterpreter extends Interpreter {
SparkInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add("spark.app.name", "Zeppelin", "The name of spark application")
.add("master", getMaster(),
.add("master",
getSystemDefault("MASTER", "spark.master", "local[*]"),
"spark master uri. ex) spark://masterhost:7077")
.add("spark.executor.memory", "1g", "executor memory per worker instance")
.add("spark.cores.max", "1", "total number of cores to use")
.add("spark.executor.memory",
getSystemDefault(null, "spark.executor.memory", "512m"),
"executor memory per worker instance. ex) 512m, 32g")
.add("spark.cores.max",
getSystemDefault(null, "spark.cores.max", ""),
"total number of cores to use. Empty value uses all available core")
.add("args", "", "spark commandline args").build());

}
Expand Down Expand Up @@ -142,26 +147,42 @@ public SparkContext createSparkContext() {
conf.set("spark.scheduler.mode", "FAIR");

Properties intpProperty = getProperty();

for (Object k : intpProperty.keySet()) {
String key = (String) k;
if (key.startsWith("spark.")) {
conf.set(key, intpProperty.getProperty(key));
Object value = intpProperty.get(key);
if (value != null
&& value instanceof String
&& !((String) value).trim().isEmpty()) {
conf.set(key, (String) value);
}
}
}

SparkContext sparkContext = new SparkContext(conf);
return sparkContext;
}

public static String getMaster() {
String envMaster = System.getenv().get("MASTER");
if (envMaster != null) {
return envMaster;
private static String getSystemDefault(
String envName,
String propertyName,
String defaultValue) {

if (envName != null && !envName.isEmpty()) {
String envValue = System.getenv().get(envName);
if (envValue != null) {
return envValue;
}
}
String propMaster = System.getProperty("spark.master");
if (propMaster != null) {
return propMaster;

if (propertyName != null && !propertyName.isEmpty()) {
String propValue = System.getProperty(propertyName);
if (propValue != null) {
return propValue;
}
}
return "local[*]";
return defaultValue;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ public Properties getProperty() {
.findRegisteredInterpreterByClassName(getClassName()).getProperties();
for (String k : defaultProperties.keySet()) {
if (!p.contains(k)) {
p.put(k, defaultProperties.get(k).getDefaultValue());
String value = defaultProperties.get(k).getDefaultValue();
if (value != null) {
p.put(k, defaultProperties.get(k).getDefaultValue());
}
}
}

Expand Down