diff --git a/README.md b/README.md
index cca45d4391d..905b21456d6 100644
--- a/README.md
+++ b/README.md
@@ -199,6 +199,56 @@ Yarn
# ./conf/zeppelin-env.sh
export SPARK_HOME=/path/to/spark_dir
+Yarn Cluster
+
+- Install the Livy server on the cluster master node, from: https://github.com/cloudera/hue/tree/master/apps/spark/java
+- Start the Livy server in **yarn mode**; refer to the Livy documentation.
+- You need to whitelist Spark configuration properties in Livy in order to be able to send custom Spark configuration. Copy the file `spark-user-configurable-options.template` in the conf dir of Livy to `spark-user-configurable-options.conf`.
+- Zeppelin note conf:
+
+ ```
+    livy.server.host = cluster-master-node-ip:8998
+ spark.driver.cores
+ spark.driver.memory
+ spark.executor.instances
+ spark.executor.cores
+ spark.dynamicAllocation.enabled
+ spark.dynamicAllocation.cachedExecutorIdleTimeout
+ spark.dynamicAllocation.minExecutors
+ spark.dynamicAllocation.initialExecutors
+ spark.dynamicAllocation.maxExecutors
+ ```
+
+- examples
+ - spark
+
+ ```
+ %livy.spark
+ object HelloWorld {
+ def main(args: Array[String]) {
+ println("Hello, world!")
+ }
+ }
+ HelloWorld.main(null)
+ ```
+
+ - pyspark
+
+ ```
+ %livy.pyspark
+ def welcome(name):
+ print 'Hello', name
+ welcome('Livy')
+ ```
+
+ - sparkR
+
+ ```
+ %livy.sparkr
+ msg <- "Hello sparkR"
+ print(msg)
+ ```
+
### Run
./bin/zeppelin-daemon.sh start
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index 16e26b76f6f..6aedd1c94ba 100755
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -138,7 +138,7 @@
zeppelin.interpreters
- org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter
+ org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.RestSparkInterpreter,org.apache.zeppelin.livy.RestSparkRInterpreter,org.apache.zeppelin.livy.RestPySparkInterpreter
Comma separated interpreter configurations. First interpreter become a default
diff --git a/livy-spark/src/main/java/org/apache/zeppelin/livy/Http.java b/livy-spark/src/main/java/org/apache/zeppelin/livy/Http.java
new file mode 100644
index 00000000000..ebf382b842c
--- /dev/null
+++ b/livy-spark/src/main/java/org/apache/zeppelin/livy/Http.java
@@ -0,0 +1,50 @@
+package org.apache.zeppelin.livy;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.ning.http.client.AsyncHttpClient;
+import com.ning.http.client.Response;
+
+/**
+ *
+ * */
+public class Http {
+
+ static AsyncHttpClient asyncHttpClient = new AsyncHttpClient();
+
+ private static Response getFromFuture(Future f) {
+ try {
+ Response r = f.get();
+ return r;
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+
+ public static Response get(String url) {
+ Future f = asyncHttpClient.prepareGet(url).execute();
+ return getFromFuture(f);
+ }
+
+
+ public static Response post(String url, String jsonString) {
+ final AsyncHttpClient.BoundRequestBuilder builder = asyncHttpClient.preparePost(url);
+
+ builder.addHeader("Content-Type", "application/json");
+ builder.setBody(jsonString);
+ Future f = builder.execute();
+ return getFromFuture(f);
+
+ }
+
+ public static Response delete(String url) {
+ Future f = asyncHttpClient.prepareDelete(url).execute();
+ return getFromFuture(f);
+ }
+
+}
diff --git a/livy-spark/src/main/java/org/apache/zeppelin/livy/RestPySparkInterpreter.java b/livy-spark/src/main/java/org/apache/zeppelin/livy/RestPySparkInterpreter.java
new file mode 100644
index 00000000000..d058a528083
--- /dev/null
+++ b/livy-spark/src/main/java/org/apache/zeppelin/livy/RestPySparkInterpreter.java
@@ -0,0 +1,232 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.livy;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.zeppelin.livy.Http.*;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.http.client.Response;
+
+/**
+ * Zeppelin interpreter that runs {@code %livy.pyspark} paragraphs by sending
+ * the paragraph text to a remote Livy server over its REST API, via a lazily
+ * created per-interpreter {@link Session}.
+ */
+public class RestPySparkInterpreter extends Interpreter {
+
+  // Created lazily on the first interpret() call; torn down in close().
+  private Session session = null;
+  private final String host;
+
+  public RestPySparkInterpreter(Properties property) {
+    super(property);
+    host = getProperty("livy.server.host");
+  }
+
+  public static Logger logger = LoggerFactory.getLogger(RestPySparkInterpreter.class);
+
+  // Registers this class under the %livy.pyspark alias and declares the
+  // Spark/Livy properties (with defaults) shown in Zeppelin's settings UI.
+  static {
+
+    Interpreter
+        .register(
+            "pyspark",
+            "livy",
+            RestPySparkInterpreter.class.getName(),
+            new InterpreterPropertyBuilder()
+                .add("spark.app.name",
+                    getSystemDefault(null, "spark.app.name",
+                        "Zeppelin_yarn_cluster"), "The name of spark application.")
+                .add("livy.server.host",
+                    getSystemDefault(
+                        null, "livy.server.host", "localhost:8998"),
+                    "The host of livy server.")
+                .add("spark.driver.cores", getSystemDefault(
+                    null, "spark.driver.cores", "1"),
+                    "Driver cores. ex) 1, 2")
+                .add("spark.driver.memory", getSystemDefault(
+                    null, "spark.driver.memory", "512m"),
+                    "Driver memory. ex) 512m, 32g")
+                .add("spark.executor.instances",
+                    getSystemDefault(null, "spark.executor.instances", "3"),
+                    "Executor instances. ex) 1, 4")
+                .add("spark.executor.cores", getSystemDefault(
+                    null, "spark.executor.cores", "1"),
+                    "Num cores per executor. ex) 1, 4")
+                .add("spark.executor.memory",
+                    getSystemDefault(
+                        null, "spark.executor.memory", "512m"),
+                    "Executor memory per worker instance. ex) 512m, 32g")
+                .add("spark.dynamicAllocation.enabled",
+                    getSystemDefault(
+                        null, "spark.dynamicAllocation.enabled", "false"),
+                    "Use dynamic resource allocation")
+                .add(
+                    "spark.dynamicAllocation.cachedExecutorIdleTimeout",
+                    getSystemDefault(
+                        null, "spark.dynamicAllocation.cachedExecutorIdleTimeout",
+                        "120s"), "Remove an executor which has cached data blocks")
+                .add("spark.dynamicAllocation.minExecutors",
+                    getSystemDefault(
+                        null, "spark.dynamicAllocation.minExecutors", "0"),
+                    "Lower bound for the number of executors if dynamic allocation is enabled. ")
+                .add("spark.dynamicAllocation.initialExecutors",
+                    getSystemDefault(
+                        null, "spark.dynamicAllocation.initialExecutors", "1"),
+                    "Initial number of executors to run if dynamic allocation is enabled. ")
+                .add("spark.dynamicAllocation.maxExecutors",
+                    getSystemDefault(
+                        null, "spark.dynamicAllocation.maxExecutors", "10"),
+                    "Upper bound for the number of executors if dynamic allocation is enabled. ")
+                .build());
+  }
+
+  /**
+   * Resolves a configuration value by checking, in order: the environment
+   * variable {@code envName}, the JVM system property {@code propertyName},
+   * and finally {@code defaultValue}.
+   * NOTE(review): this method is duplicated verbatim in RestSparkInterpreter
+   * and RestSparkRInterpreter; consider a shared base class or helper.
+   */
+  public static String getSystemDefault(
+      String envName,
+      String propertyName,
+      String defaultValue) {
+
+    if (envName != null && !envName.isEmpty()) {
+      String envValue = System.getenv().get(envName);
+      if (envValue != null) {
+        return envValue;
+      }
+    }
+
+    if (propertyName != null && !propertyName.isEmpty()) {
+      String propValue = System.getProperty(propertyName);
+      if (propValue != null) {
+        return propValue;
+      }
+    }
+    return defaultValue;
+  }
+
+  @Override
+  public void open() {
+    // No-op: the Livy session is created lazily in interpret().
+  }
+
+  @Override
+  public void close() {
+    // Release the remote Livy session (and its cluster resources).
+    if (session != null) {
+      SessionFactory.deleteSession(session);
+    }
+  }
+
+  // Returns true if an HTTP GET against the configured host yields any
+  // response status, i.e. something answers there.
+  // NOTE(review): this only checks reachability, not that it is really Livy.
+  private boolean checkLivyServer() {
+    try {
+      Response r = get(host);
+      if (r.hasResponseStatus()) {
+        return true;
+      }
+      return false;
+    } catch (Exception e) {
+      logger.info("Interpreter exception", e);
+      return false;
+    }
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+
+    // Fail fast if the Livy server cannot be reached at all.
+    if (!checkLivyServer()) {
+      return new InterpreterResult(Code.ERROR,
+          "you need to have the livy server running in the master node of the cluster " +
+          "and set the property: livy.server.host to :8998");
+    }
+
+    // Refresh our view of an existing session from the server.
+    // NOTE(review): getSession presumably returns null when the session no
+    // longer exists server-side, triggering re-creation below -- confirm.
+    if (session != null) {
+      try {
+        session = SessionFactory.getSession(session);
+      } catch (IOException e) {
+        logger.info("Interpreter exception", e);
+        return new InterpreterResult(Code.ERROR,
+            "you need to have the livy server running in the master node of the cluster " +
+            "and set the property: livy.server.host to :8998");
+      }
+    }
+    // No usable session: ask Livy to start a new pyspark session.
+    if (session == null) {
+      try {
+        session = SessionFactory.createSession(host, "pyspark", getProperty());
+
+        if (session == null) {
+          return new InterpreterResult(Code.ERROR, "Can not create a session, please try again.");
+        }
+
+        if (session.state.equals("error")) {
+          SessionFactory.deleteSession(session);
+
+          return new InterpreterResult(Code.ERROR,
+              "Resources aren't enough or error happened while creating session,"
+              + " please try again.");
+        }
+
+      } catch (IOException e) {
+        logger.info("Interpreter exception", e);
+        return new InterpreterResult(Code.ERROR,
+            "you need to have the livy server running in the master node of the cluster " +
+            "and set the property: livy.server.host to :8998");
+      }
+    }
+    // Submit the paragraph as a Livy statement and wait for its result.
+    Statement statement = new Statement();
+    try {
+      statement = session.createStatement(st);
+    } catch (IOException e) {
+      logger.info("Interpreter exception", e);
+      return new InterpreterResult(Code.ERROR, "Can not create a statement, please try again.");
+
+    }
+    // "available" + "ok" means the statement finished successfully; its
+    // text/plain payload is the paragraph output.
+    if (statement.state.equals("available") && statement.output.status.equals("ok")) {
+      return new InterpreterResult(Code.SUCCESS, statement.output.data.get("text/plain"));
+    }
+
+    return new InterpreterResult(Code.ERROR, statement.output.evalue);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+    // Not supported: a running Livy statement is left to finish.
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.SIMPLE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    // Progress reporting is not available through this REST integration.
+    return 0;
+  }
+
+  @Override
+  public List completion(String buf, int cursor) {
+    // Code completion is not supported; null means "no suggestions".
+    return null;
+  }
+
+}
diff --git a/livy-spark/src/main/java/org/apache/zeppelin/livy/RestSparkInterpreter.java b/livy-spark/src/main/java/org/apache/zeppelin/livy/RestSparkInterpreter.java
new file mode 100644
index 00000000000..a3d9fe9d3fd
--- /dev/null
+++ b/livy-spark/src/main/java/org/apache/zeppelin/livy/RestSparkInterpreter.java
@@ -0,0 +1,232 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.livy;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.zeppelin.livy.Http.*;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.http.client.Response;
+
+/**
+ * Zeppelin interpreter that runs {@code %livy.spark} (Scala) paragraphs by
+ * sending the paragraph text to a remote Livy server over its REST API, via a
+ * lazily created per-interpreter {@link Session}.
+ */
+public class RestSparkInterpreter extends Interpreter {
+
+  // Created lazily on the first interpret() call; torn down in close().
+  private Session session = null;
+  private final String host;
+
+  public RestSparkInterpreter(Properties property) {
+    super(property);
+    host = getProperty("livy.server.host");
+  }
+
+  public static Logger logger = LoggerFactory.getLogger(RestSparkInterpreter.class);
+
+  // Registers this class under the %livy.spark alias and declares the
+  // Spark/Livy properties (with defaults) shown in Zeppelin's settings UI.
+  static {
+
+    Interpreter
+        .register(
+            "spark",
+            "livy",
+            RestSparkInterpreter.class.getName(),
+            new InterpreterPropertyBuilder()
+                .add("spark.app.name",
+                    getSystemDefault(null, "spark.app.name",
+                        "Zeppelin_yarn_cluster"), "The name of spark application.")
+                .add("livy.server.host",
+                    getSystemDefault(
+                        null, "livy.server.host", "localhost:8998"),
+                    "The host of livy server.")
+                .add("spark.driver.cores", getSystemDefault(
+                    null, "spark.driver.cores", "1"),
+                    "Driver cores. ex) 1, 2")
+                .add("spark.driver.memory", getSystemDefault(
+                    null, "spark.driver.memory", "512m"),
+                    "Driver memory. ex) 512m, 32g")
+                .add("spark.executor.instances",
+                    getSystemDefault(null, "spark.executor.instances", "3"),
+                    "Executor instances. ex) 1, 4")
+                .add("spark.executor.cores", getSystemDefault(
+                    null, "spark.executor.cores", "1"),
+                    "Num cores per executor. ex) 1, 4")
+                .add("spark.executor.memory",
+                    getSystemDefault(
+                        null, "spark.executor.memory", "512m"),
+                    "Executor memory per worker instance. ex) 512m, 32g")
+                .add("spark.dynamicAllocation.enabled",
+                    getSystemDefault(
+                        null, "spark.dynamicAllocation.enabled", "false"),
+                    "Use dynamic resource allocation")
+                .add(
+                    "spark.dynamicAllocation.cachedExecutorIdleTimeout",
+                    getSystemDefault(
+                        null, "spark.dynamicAllocation.cachedExecutorIdleTimeout",
+                        "120s"), "Remove an executor which has cached data blocks")
+                .add("spark.dynamicAllocation.minExecutors",
+                    getSystemDefault(
+                        null, "spark.dynamicAllocation.minExecutors", "0"),
+                    "Lower bound for the number of executors if dynamic allocation is enabled. ")
+                .add("spark.dynamicAllocation.initialExecutors",
+                    getSystemDefault(
+                        null, "spark.dynamicAllocation.initialExecutors", "1"),
+                    "Initial number of executors to run if dynamic allocation is enabled. ")
+                .add("spark.dynamicAllocation.maxExecutors",
+                    getSystemDefault(
+                        null, "spark.dynamicAllocation.maxExecutors", "10"),
+                    "Upper bound for the number of executors if dynamic allocation is enabled. ")
+                .build());
+  }
+
+  /**
+   * Resolves a configuration value by checking, in order: the environment
+   * variable {@code envName}, the JVM system property {@code propertyName},
+   * and finally {@code defaultValue}.
+   * NOTE(review): this method is duplicated verbatim in RestPySparkInterpreter
+   * and RestSparkRInterpreter; consider a shared base class or helper.
+   */
+  public static String getSystemDefault(
+      String envName,
+      String propertyName,
+      String defaultValue) {
+
+    if (envName != null && !envName.isEmpty()) {
+      String envValue = System.getenv().get(envName);
+      if (envValue != null) {
+        return envValue;
+      }
+    }
+
+    if (propertyName != null && !propertyName.isEmpty()) {
+      String propValue = System.getProperty(propertyName);
+      if (propValue != null) {
+        return propValue;
+      }
+    }
+    return defaultValue;
+  }
+
+  @Override
+  public void open() {
+    // No-op: the Livy session is created lazily in interpret().
+  }
+
+  @Override
+  public void close() {
+    // Release the remote Livy session (and its cluster resources).
+    if (session != null) {
+      SessionFactory.deleteSession(session);
+    }
+  }
+
+  // Returns true if an HTTP GET against the configured host yields any
+  // response status, i.e. something answers there.
+  // NOTE(review): this only checks reachability, not that it is really Livy.
+  private boolean checkLivyServer() {
+    try {
+      Response r = get(host);
+      if (r.hasResponseStatus()) {
+        return true;
+      }
+      return false;
+    } catch (Exception e) {
+      logger.info("Interpreter exception", e);
+      return false;
+    }
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+
+    // Fail fast if the Livy server cannot be reached at all.
+    if (!checkLivyServer()) {
+      return new InterpreterResult(Code.ERROR,
+          "you need to have the livy server running in the master node of the cluster " +
+          "and set the property: livy.server.host to :8998");
+    }
+
+    // Refresh our view of an existing session from the server.
+    // NOTE(review): getSession presumably returns null when the session no
+    // longer exists server-side, triggering re-creation below -- confirm.
+    if (session != null) {
+      try {
+        session = SessionFactory.getSession(session);
+      } catch (IOException e) {
+        logger.info("Interpreter exception", e);
+        return new InterpreterResult(Code.ERROR,
+            "you need to have the livy server running in the master node of the cluster " +
+            "and set the property: livy.server.host to :8998");
+      }
+    }
+    // No usable session: ask Livy to start a new spark session.
+    if (session == null) {
+      try {
+        session = SessionFactory.createSession(host, "spark", getProperty());
+
+        if (session == null) {
+          return new InterpreterResult(Code.ERROR, "Can not create a session, please try again.");
+        }
+
+        if (session.state.equals("error")) {
+          SessionFactory.deleteSession(session);
+
+          return new InterpreterResult(Code.ERROR,
+              "Resources aren't enough or error happened while creating session,"
+              + " please try again.");
+        }
+
+      } catch (IOException e) {
+        logger.info("Interpreter exception", e);
+        return new InterpreterResult(Code.ERROR,
+            "you need to have the livy server running in the master node of the cluster " +
+            "and set the property: livy.server.host to :8998");
+      }
+    }
+    // Submit the paragraph as a Livy statement and wait for its result.
+    Statement statement = new Statement();
+    try {
+      statement = session.createStatement(st);
+    } catch (IOException e) {
+      logger.info("Interpreter exception", e);
+      return new InterpreterResult(Code.ERROR, "Can not create a statement, please try again.");
+
+    }
+    // "available" + "ok" means the statement finished successfully; its
+    // text/plain payload is the paragraph output.
+    if (statement.state.equals("available") && statement.output.status.equals("ok")) {
+      return new InterpreterResult(Code.SUCCESS, statement.output.data.get("text/plain"));
+    }
+
+    return new InterpreterResult(Code.ERROR, statement.output.evalue);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+    // Not supported: a running Livy statement is left to finish.
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.SIMPLE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    // Progress reporting is not available through this REST integration.
+    return 0;
+  }
+
+  @Override
+  public List completion(String buf, int cursor) {
+    // Code completion is not supported; null means "no suggestions".
+    return null;
+  }
+
+}
diff --git a/livy-spark/src/main/java/org/apache/zeppelin/livy/RestSparkRInterpreter.java b/livy-spark/src/main/java/org/apache/zeppelin/livy/RestSparkRInterpreter.java
new file mode 100644
index 00000000000..e580e1dec77
--- /dev/null
+++ b/livy-spark/src/main/java/org/apache/zeppelin/livy/RestSparkRInterpreter.java
@@ -0,0 +1,232 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.livy;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.zeppelin.livy.Http.*;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.http.client.Response;
+
+/**
+ * Zeppelin interpreter that runs {@code %livy.sparkr} paragraphs by sending
+ * the paragraph text to a remote Livy server over its REST API, via a lazily
+ * created per-interpreter {@link Session}.
+ */
+public class RestSparkRInterpreter extends Interpreter {
+
+  // Created lazily on the first interpret() call; torn down in close().
+  private Session session = null;
+  private final String host;
+
+  public RestSparkRInterpreter(Properties property) {
+    super(property);
+    host = getProperty("livy.server.host");
+  }
+
+  public static Logger logger = LoggerFactory.getLogger(RestSparkRInterpreter.class);
+
+  // Registers this class under the %livy.sparkr alias and declares the
+  // Spark/Livy properties (with defaults) shown in Zeppelin's settings UI.
+  static {
+
+    Interpreter
+        .register(
+            "sparkr",
+            "livy",
+            RestSparkRInterpreter.class.getName(),
+            new InterpreterPropertyBuilder()
+                .add("spark.app.name",
+                    getSystemDefault(null, "spark.app.name",
+                        "Zeppelin_yarn_cluster"), "The name of spark application.")
+                .add("livy.server.host",
+                    getSystemDefault(
+                        null, "livy.server.host", "localhost:8998"),
+                    "The host of livy server.")
+                .add("spark.driver.cores", getSystemDefault(
+                    null, "spark.driver.cores", "1"),
+                    "Driver cores. ex) 1, 2")
+                .add("spark.driver.memory", getSystemDefault(
+                    null, "spark.driver.memory", "512m"),
+                    "Driver memory. ex) 512m, 32g")
+                .add("spark.executor.instances",
+                    getSystemDefault(null, "spark.executor.instances", "3"),
+                    "Executor instances. ex) 1, 4")
+                .add("spark.executor.cores", getSystemDefault(
+                    null, "spark.executor.cores", "1"),
+                    "Num cores per executor. ex) 1, 4")
+                .add("spark.executor.memory",
+                    getSystemDefault(
+                        null, "spark.executor.memory", "512m"),
+                    "Executor memory per worker instance. ex) 512m, 32g")
+                .add("spark.dynamicAllocation.enabled",
+                    getSystemDefault(
+                        null, "spark.dynamicAllocation.enabled", "false"),
+                    "Use dynamic resource allocation")
+                .add(
+                    "spark.dynamicAllocation.cachedExecutorIdleTimeout",
+                    getSystemDefault(
+                        null, "spark.dynamicAllocation.cachedExecutorIdleTimeout",
+                        "120s"), "Remove an executor which has cached data blocks")
+                .add("spark.dynamicAllocation.minExecutors",
+                    getSystemDefault(
+                        null, "spark.dynamicAllocation.minExecutors", "0"),
+                    "Lower bound for the number of executors if dynamic allocation is enabled. ")
+                .add("spark.dynamicAllocation.initialExecutors",
+                    getSystemDefault(
+                        null, "spark.dynamicAllocation.initialExecutors", "1"),
+                    "Initial number of executors to run if dynamic allocation is enabled. ")
+                .add("spark.dynamicAllocation.maxExecutors",
+                    getSystemDefault(
+                        null, "spark.dynamicAllocation.maxExecutors", "10"),
+                    "Upper bound for the number of executors if dynamic allocation is enabled. ")
+                .build());
+  }
+
+  /**
+   * Resolves a configuration value by checking, in order: the environment
+   * variable {@code envName}, the JVM system property {@code propertyName},
+   * and finally {@code defaultValue}.
+   * NOTE(review): this method is duplicated verbatim in RestSparkInterpreter
+   * and RestPySparkInterpreter; consider a shared base class or helper.
+   */
+  public static String getSystemDefault(
+      String envName,
+      String propertyName,
+      String defaultValue) {
+
+    if (envName != null && !envName.isEmpty()) {
+      String envValue = System.getenv().get(envName);
+      if (envValue != null) {
+        return envValue;
+      }
+    }
+
+    if (propertyName != null && !propertyName.isEmpty()) {
+      String propValue = System.getProperty(propertyName);
+      if (propValue != null) {
+        return propValue;
+      }
+    }
+    return defaultValue;
+  }
+
+  @Override
+  public void open() {
+    // No-op: the Livy session is created lazily in interpret().
+  }
+
+  @Override
+  public void close() {
+    // Release the remote Livy session (and its cluster resources).
+    if (session != null) {
+      SessionFactory.deleteSession(session);
+    }
+  }
+
+  // Returns true if an HTTP GET against the configured host yields any
+  // response status, i.e. something answers there.
+  // NOTE(review): this only checks reachability, not that it is really Livy.
+  private boolean checkLivyServer() {
+    try {
+      Response r = get(host);
+      if (r.hasResponseStatus()) {
+        return true;
+      }
+      return false;
+    } catch (Exception e) {
+      logger.info("Interpreter exception", e);
+      return false;
+    }
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+
+    // Fail fast if the Livy server cannot be reached at all.
+    if (!checkLivyServer()) {
+      return new InterpreterResult(Code.ERROR,
+          "you need to have the livy server running in the master node of the cluster " +
+          "and set the property: livy.server.host to :8998");
+    }
+
+    // Refresh our view of an existing session from the server.
+    // NOTE(review): getSession presumably returns null when the session no
+    // longer exists server-side, triggering re-creation below -- confirm.
+    if (session != null) {
+      try {
+        session = SessionFactory.getSession(session);
+      } catch (IOException e) {
+        logger.info("Interpreter exception", e);
+        return new InterpreterResult(Code.ERROR,
+            "you need to have the livy server running in the master node of the cluster " +
+            "and set the property: livy.server.host to :8998");
+      }
+    }
+    // No usable session: ask Livy to start a new sparkr session.
+    if (session == null) {
+      try {
+        session = SessionFactory.createSession(host, "sparkr", getProperty());
+
+        if (session == null) {
+          return new InterpreterResult(Code.ERROR, "Can not create a session, please try again.");
+        }
+
+        if (session.state.equals("error")) {
+          SessionFactory.deleteSession(session);
+
+          return new InterpreterResult(Code.ERROR,
+              "Resources aren't enough or error happened while creating session,"
+              + " please try again.");
+        }
+
+      } catch (IOException e) {
+        logger.info("Interpreter exception", e);
+        return new InterpreterResult(Code.ERROR,
+            "you need to have the livy server running in the master node of the cluster " +
+            "and set the property: livy.server.host to :8998");
+      }
+    }
+    // Submit the paragraph as a Livy statement and wait for its result.
+    Statement statement = new Statement();
+    try {
+      statement = session.createStatement(st);
+    } catch (IOException e) {
+      logger.info("Interpreter exception", e);
+      return new InterpreterResult(Code.ERROR, "Can not create a statement, please try again.");
+
+    }
+    // "available" + "ok" means the statement finished successfully; its
+    // text/plain payload is the paragraph output.
+    if (statement.state.equals("available") && statement.output.status.equals("ok")) {
+      return new InterpreterResult(Code.SUCCESS, statement.output.data.get("text/plain"));
+    }
+
+    return new InterpreterResult(Code.ERROR, statement.output.evalue);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+    // Not supported: a running Livy statement is left to finish.
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.SIMPLE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    // Progress reporting is not available through this REST integration.
+    return 0;
+  }
+
+  @Override
+  public List completion(String buf, int cursor) {
+    // Code completion is not supported; null means "no suggestions".
+    return null;
+  }
+
+}
diff --git a/livy-spark/src/main/java/org/apache/zeppelin/livy/Session.java b/livy-spark/src/main/java/org/apache/zeppelin/livy/Session.java
new file mode 100644
index 00000000000..74b70729303
--- /dev/null
+++ b/livy-spark/src/main/java/org/apache/zeppelin/livy/Session.java
@@ -0,0 +1,66 @@
+package org.apache.zeppelin.livy;
+
+import static org.apache.zeppelin.livy.Http.*;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import com.google.gson.Gson;
+import com.ning.http.client.Response;
+/**
+ * Client-side view of one Livy session. Public fields are populated from
+ * Livy's JSON payloads via Gson, so their names mirror the REST API.
+ */
+public class Session {
+
+  // Fields below are deserialized straight from Livy's session JSON.
+  public int id;
+  public String name;
+  public String state;
+  public String kind;
+  public String[] log;
+  // Base URL of this session on the Livy server; statement endpoints are
+  // built by appending "/statements" to it.
+  public String url;
+  public String driverMemory;
+  public String driverCores;
+  public String executorMemory;
+  public String executorCores;
+  public String numExecutors;
+
+  Gson gson = new Gson();
+
+  public Session() {
+  }
+
+  /**
+   * POSTs the given code to this session's /statements endpoint, then blocks
+   * on a single-use executor running a StatementCallable until the statement
+   * is resolved, and returns the final Statement.
+   * NOTE(review): StatementCallable is assumed to poll the statement until it
+   * reaches a terminal state -- confirm; failures of the future are only
+   * printed and the last fetched statement is returned as-is.
+   */
+  public Statement createStatement(String st) throws IOException {
+    HashMap command = new HashMap();
+
+    command.put("code", st);
+    String data = gson.toJson(command);
+    Response r = post(this.url + "/statements", data);
+    String json = r.getResponseBody();
+    Statement statement = gson.fromJson(json, Statement.class);
+    Callable callableTask = new StatementCallable(this, statement);
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+    Future future = executor.submit(callableTask);
+    try {
+      statement = future.get();
+    } catch (InterruptedException | ExecutionException e) {
+      e.printStackTrace();
+    }
+
+    executor.shutdown();
+    return statement;
+  }
+
+  /**
+   * Fetches the current server-side state of the given statement via GET on
+   * /statements/{id} and returns the refreshed Statement.
+   */
+  public Statement getStatement(Statement statement) throws IOException {
+    Response r = get(this.url + "/statements/" + statement.id);
+    String json = r.getResponseBody();
+    statement = gson.fromJson(json, Statement.class);
+    return statement;
+  }
+
+}
diff --git a/livy-spark/src/main/java/org/apache/zeppelin/livy/SessionCallable.java b/livy-spark/src/main/java/org/apache/zeppelin/livy/SessionCallable.java
new file mode 100644
index 00000000000..0ef7eab7927
--- /dev/null
+++ b/livy-spark/src/main/java/org/apache/zeppelin/livy/SessionCallable.java
@@ -0,0 +1,28 @@
+package org.apache.zeppelin.livy;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+/**
+ *
+ *
+ */
+public class SessionCallable implements Callable {
+ int count = 0;
+ private Session session;
+
+ public SessionCallable(Session session) {
+ this.session = session;
+ }
+
+ @Override
+ public Session call() throws Exception {
+ while (!this.session.state.equals("idle")) {
+ if (this.session.state.equals("error")){
+ return this.session;
+ }
+ this.session = SessionFactory.getSession(this.session);
+ TimeUnit.MILLISECONDS.sleep(2000);
+ }
+ return this.session;
+ }
+}
diff --git a/livy-spark/src/main/java/org/apache/zeppelin/livy/SessionFactory.java b/livy-spark/src/main/java/org/apache/zeppelin/livy/SessionFactory.java
new file mode 100644
index 00000000000..317a6632e60
--- /dev/null
+++ b/livy-spark/src/main/java/org/apache/zeppelin/livy/SessionFactory.java
@@ -0,0 +1,121 @@
+package org.apache.zeppelin.livy;
+
+import static org.apache.zeppelin.livy.Http.delete;
+import static org.apache.zeppelin.livy.Http.get;
+import static org.apache.zeppelin.livy.Http.post;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import com.google.gson.Gson;
+import com.ning.http.client.Response;
+
+/**
+ *
+ *
+ */
+public class SessionFactory {
+
+ public SessionFactory() {
+ }
+
+ public static Session createSession(String host, String kind,
+ Properties property) throws IOException {
+ HashMap conf = new HashMap();
+ Gson gson = new Gson();
+
+ String appName = property.getProperty("spark.app.name");
+
+ conf.put("spark.driver.cores", property.getProperty("spark.driver.cores"));
+ conf.put("spark.executor.cores", property.getProperty("spark.executor.cores"));
+ conf.put("spark.driver.memory", property.getProperty("spark.driver.memory"));
+ conf.put("spark.executor.memory", property.getProperty("spark.executor.memory"));
+
+ if (!property.getProperty("spark.dynamicAllocation.enabled").equals("true")) {
+ conf.put("spark.executor.instances", property.getProperty("spark.executor.instances"));
+ }
+
+ if (property.getProperty("spark.dynamicAllocation.enabled").equals("true")) {
+ conf.put("spark.dynamicAllocation.enabled",
+ property.getProperty("spark.dynamicAllocation.enabled"));
+ conf.put("spark.shuffle.service.enabled", "true");
+ conf.put("spark.dynamicAllocation.cachedExecutorIdleTimeout",
+ property.getProperty("spark.dynamicAllocation.cachedExecutorIdleTimeout"));
+ conf.put("spark.dynamicAllocation.minExecutors",
+ property.getProperty("spark.dynamicAllocation.minExecutors"));
+ conf.put("spark.dynamicAllocation.initialExecutors",
+ property.getProperty("spark.dynamicAllocation.initialExecutors"));
+ conf.put("spark.dynamicAllocation.maxExecutors",
+ property.getProperty("spark.dynamicAllocation.maxExecutors"));
+ }
+
+ String confData = gson.toJson(conf);
+ String data = "{\"kind\": \"" + kind + "\", \"name\": \"" +
+ appName + "\", \"conf\": " + confData
+ + "}";
+
+ Response r = post(host + "/sessions", data);
+ String json = r.getResponseBody();
+ Session session = gson.fromJson(json, Session.class);
+ session.url = host + "/sessions/" + session.id;
+
+ Callable callableTask = new SessionCallable(session);
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ Future future = executor.submit(callableTask);
+
+ int maxWaitTime = 180000;
+ int curentWaitTime = 0;
+ // Waiting 3 minutes for session creation otherwise kill
+ while (!future.isDone()) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(2000);
+ curentWaitTime += 2000;
+ if (curentWaitTime == maxWaitTime) {
+ future.cancel(true);
+ executor.shutdown();
+ SessionFactory.deleteSession(session);
+ return null;
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ }
+ try {
+ session = future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ }
+
+ executor.shutdown();
+
+ return session;
+ }
+
+ public static Session getSession(Session session) throws IOException {
+ String url = session.url;
+ Response r = get(url);
+ if (r.getStatusCode() >= 300) {
+ return null;
+ }
+ String json = r.getResponseBody();
+ Gson gson = new Gson();
+ session = gson.fromJson(json, Session.class);
+ session.url = url;
+
+ return session;
+ }
+
+ public static void deleteSession(Session session) {
+ delete(session.url);
+
+ }
+
+}
diff --git a/livy-spark/src/main/java/org/apache/zeppelin/livy/Statement.java b/livy-spark/src/main/java/org/apache/zeppelin/livy/Statement.java
new file mode 100644
index 00000000000..88cfc5b96a7
--- /dev/null
+++ b/livy-spark/src/main/java/org/apache/zeppelin/livy/Statement.java
@@ -0,0 +1,16 @@
+package org.apache.zeppelin.livy;
+
+/**
+ * JSON model of a Livy statement as returned by the /statements REST API.
+ * Fields are public so gson can populate them directly.
+ */
+public class Statement {
+
+ public int id; // statement id assigned by Livy
+ public String state; // execution state; StatementCallable polls while this is "running"
+ public StatementOutput output; // "output" section parsed from the JSON response
+
+ public Statement() {
+ }
+
+}
diff --git a/livy-spark/src/main/java/org/apache/zeppelin/livy/StatementCallable.java b/livy-spark/src/main/java/org/apache/zeppelin/livy/StatementCallable.java
new file mode 100644
index 00000000000..859d0e49e14
--- /dev/null
+++ b/livy-spark/src/main/java/org/apache/zeppelin/livy/StatementCallable.java
@@ -0,0 +1,32 @@
+package org.apache.zeppelin.livy;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Polls a Livy statement until it leaves the "running" state, refreshing
+ * it through the owning session every two seconds.
+ */
+public class StatementCallable implements Callable<Statement> {
+
+  private Session session;
+  private Statement statement;
+
+  public StatementCallable(Session session, Statement statement) {
+    this.session = session;
+    this.statement = statement;
+  }
+
+  /**
+   * Blocks until the statement is no longer "running" and returns its
+   * final snapshot (including output).
+   */
+  @Override
+  public Statement call() throws Exception {
+    while (this.statement.state.equals("running")) {
+      this.statement = this.session.getStatement(this.statement);
+      TimeUnit.MILLISECONDS.sleep(2000);
+    }
+    return this.statement;
+  }
+}
diff --git a/livy-spark/src/main/java/org/apache/zeppelin/livy/StatementOutput.java b/livy-spark/src/main/java/org/apache/zeppelin/livy/StatementOutput.java
new file mode 100644
index 00000000000..5803641e2cc
--- /dev/null
+++ b/livy-spark/src/main/java/org/apache/zeppelin/livy/StatementOutput.java
@@ -0,0 +1,19 @@
+package org.apache.zeppelin.livy;
+
+import java.util.HashMap;
+/**
+ * JSON model of the "output" section of a Livy statement response.
+ * Fields are public so gson can populate them directly.
+ */
+public class StatementOutput {
+
+ public String status; // output status reported by Livy -- presumably "ok" / "error"; confirm against Livy API
+ public int executionCount;
+ public String ename; // NOTE(review): inferred from naming -- exception name on errors
+ public String evalue; // NOTE(review): inferred from naming -- exception value on errors
+ public HashMap data; // raw result payload; raw type keeps gson happy but loses type info
+
+ public StatementOutput() {
+ }
+
+}
diff --git a/livy-spark/src/test/java/org/apache/zeppelin/livy/RestSparkInterpreterTest.java b/livy-spark/src/test/java/org/apache/zeppelin/livy/RestSparkInterpreterTest.java
new file mode 100644
index 00000000000..2b7d6583ab2
--- /dev/null
+++ b/livy-spark/src/test/java/org/apache/zeppelin/livy/RestSparkInterpreterTest.java
@@ -0,0 +1,88 @@
+package org.apache.zeppelin.livy;
+
+import static org.junit.Assert.*;
+
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.livy.RestSparkInterpreter;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Integration tests for RestSparkInterpreter; they require a Livy server
+ * listening on localhost:8998.
+ */
+public class RestSparkInterpreterTest {
+
+  private static RestSparkInterpreter yspark;
+  private static InterpreterContext context;
+
+  @BeforeClass
+  public static void setUp() {
+    Properties p = new Properties();
+    p.setProperty("livy.server.host", "localhost:8998"); // was "locahost" -- typo broke every test
+    yspark = new RestSparkInterpreter(p);
+    yspark.open();
+    context = new InterpreterContext(null, null, null, null, null, null, null, null, null, null,
+        null);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    yspark.close();
+  }
+
+  // No livy.server.host configured: interpret() should explain how to set it up.
+  @Test
+  public void testServerShutdown() {
+    Properties p = new Properties();
+    RestSparkInterpreter yspark1 = new RestSparkInterpreter(p);
+    yspark1.open();
+    InterpreterResult result = yspark1.interpret("val a=1", context);
+    assertEquals("you need to have the livy server running in the master node of the cluster and set the property: livy.server.host to :8998",
+        result.message());
+  }
+
+  @Test
+  public void testSyntaxError() {
+    InterpreterResult result = yspark.interpret("sc.paralize(1 to 10)", context);
+    assertEquals(Code.ERROR, result.code());
+  }
+
+  @Test
+  public void testNormalCommand() {
+    InterpreterResult result = yspark.interpret("print(\"1\")", context);
+    assertEquals("1", result.message());
+  }
+
+  @Test
+  public void testWithNumberExecutorCores() {
+    yspark.interpret("sc.parallelize(1 to 1000000)", context);
+    InterpreterResult result = yspark.interpret("sc.parallelize(1 to 1000000).partitions.size",
+        context);
+    // NOTE(review): assumes the configured executors yield 3 partitions -- environment dependent.
+    boolean message = result.message().startsWith("Int = 3", 6);
+    assertTrue(message);
+  }
+
+  // Requesting more memory than the cluster offers should fail session creation.
+  @Test
+  public void testOverResources() {
+    Properties p = new Properties();
+    p.setProperty("livy.server.host", "localhost:8998");
+    p.setProperty("spark.executor.memory", "20G");
+    RestSparkInterpreter yspark1 = new RestSparkInterpreter(p);
+    yspark1.open();
+    InterpreterResult result = yspark1.interpret("sc.parallelize(1 to 1000000).partitions.size",
+        context);
+    yspark1.close();
+    assertEquals(Code.ERROR, result.code());
+    assertEquals("Resources aren't enough or error happened while creating session,"
+        + " please try again.", result.message());
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index dfb5eb9c35c..d223b38d346 100755
--- a/pom.xml
+++ b/pom.xml
@@ -89,6 +89,7 @@
zeppelin-display
spark-dependencies
spark
+ livy-spark
markdown
angular
shell
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index a9c300b1d8a..e3b871cad02 100755
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -459,7 +459,10 @@ public static enum ConfVars {
+ "org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,"
+ "org.apache.zeppelin.scalding.ScaldingInterpreter,"
+ "org.apache.zeppelin.jdbc.JDBCInterpreter,"
- + "org.apache.zeppelin.hbase.HbaseInterpreter"),
+ + "org.apache.zeppelin.hbase.HbaseInterpreter,"
+ + "org.apache.zeppelin.livy.RestSparkInterpreter,"
+ + "org.apache.zeppelin.livy.RestSparkRInterpreter,"
+ + "org.apache.zeppelin.livy.RestPySparkInterpreter"),
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"),
ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),