50 changes: 50 additions & 0 deletions README.md
@@ -199,6 +199,56 @@ Yarn
# ./conf/zeppelin-env.sh
export SPARK_HOME=/path/to/spark_dir

Yarn Cluster

- Install the Livy server on the cluster master node, from: https://github.com/cloudera/hue/tree/master/apps/spark/java
- Start the Livy server in **yarn mode** (refer to the Livy documentation).
- You need to whitelist Spark configuration options in Livy in order to send custom Spark configuration: copy the file `spark-user-configurable-options.template` in Livy's conf dir to `spark-user-configurable-options.conf` (see the sketch after the property list below).
- Zeppelin interpreter settings (set `livy.server.host`, then tune the remaining Spark properties as needed):

```
livy.server.host = cluster-master-node-ip:8998
spark.driver.cores
spark.driver.memory
spark.executor.instances
spark.executor.cores
spark.dynamicAllocation.enabled
spark.dynamicAllocation.cachedExecutorIdleTimeout
spark.dynamicAllocation.minExecutors
spark.dynamicAllocation.initialExecutors
spark.dynamicAllocation.maxExecutors
```
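
For the whitelist step above, `spark-user-configurable-options.conf` lists the Spark options Livy will accept from clients. A minimal sketch, assuming the one-option-per-line format of the shipped template:

```
# conf/spark-user-configurable-options.conf (sketch; format assumed from the template)
spark.driver.cores
spark.driver.memory
spark.executor.instances
spark.executor.cores
spark.executor.memory
spark.dynamicAllocation.enabled
spark.dynamicAllocation.cachedExecutorIdleTimeout
spark.dynamicAllocation.minExecutors
spark.dynamicAllocation.initialExecutors
spark.dynamicAllocation.maxExecutors
```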

- examples
- spark

```
%livy.spark
object HelloWorld {
def main(args: Array[String]) {
println("Hello, world!")
}
}
HelloWorld.main(null)
```

- pyspark

```
%livy.pyspark
def welcome(name):
print 'Hello', name
welcome('Livy')
```

- sparkR

```
%livy.sparkr
msg <- "Hello sparkR"
print(msg)
```

### Run
./bin/zeppelin-daemon.sh start

2 changes: 1 addition & 1 deletion conf/zeppelin-site.xml.template
@@ -138,7 +138,7 @@

<property>
<name>zeppelin.interpreters</name>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter</value>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.RestSparkInterpreter,org.apache.zeppelin.livy.RestSparkRInterpreter,org.apache.zeppelin.livy.RestPySparkInterpreter</value>
<description>Comma-separated interpreter configurations; the first interpreter becomes the default.</description>
</property>

50 changes: 50 additions & 0 deletions livy-spark/src/main/java/org/apache/zeppelin/livy/Http.java
@@ -0,0 +1,50 @@
package org.apache.zeppelin.livy;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.Response;

/**
 * Minimal blocking HTTP helper over the ning AsyncHttpClient, used by the
 * Livy interpreters to call the Livy server's REST API.
 */
public class Http {

static AsyncHttpClient asyncHttpClient = new AsyncHttpClient();

/** Blocks until the request completes; returns null on failure or interruption. */
private static Response getFromFuture(Future<Response> f) {
try {
Response r = f.get();
return r;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return null;
}


/** Issues a blocking GET and returns the response, or null on failure. */
public static Response get(String url) {
Future<Response> f = asyncHttpClient.prepareGet(url).execute();
return getFromFuture(f);
}


/** Issues a blocking POST with a JSON body and returns the response, or null on failure. */
public static Response post(String url, String jsonString) {
final AsyncHttpClient.BoundRequestBuilder builder = asyncHttpClient.preparePost(url);

builder.addHeader("Content-Type", "application/json");
builder.setBody(jsonString);
Future<Response> f = builder.execute();
return getFromFuture(f);
}

/** Issues a blocking DELETE and returns the response, or null on failure. */
public static Response delete(String url) {
Future<Response> f = asyncHttpClient.prepareDelete(url).execute();
return getFromFuture(f);
}

}
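
A usage sketch of this helper driving the Livy session lifecycle end to end. The endpoint paths and JSON shape follow the Livy REST API (`/sessions`, `/sessions/{id}/statements`); the demo class, host address, and hard-coded session id are assumptions for illustration:

```
package org.apache.zeppelin.livy;

import com.ning.http.client.Response;

/** Hypothetical demo driving the Livy REST API through the Http helper above. */
public class HttpExample {
  public static void main(String[] args) throws Exception {
    String livy = "http://localhost:8998";  // assumed Livy server address

    // Create a PySpark session; Livy answers with a JSON session description.
    Response created = Http.post(livy + "/sessions", "{\"kind\": \"pyspark\"}");
    System.out.println(created.getStatusCode() + " " + created.getResponseBody());

    // Run one statement in session 0 (in real code, take the id from the JSON above).
    Response stmt = Http.post(livy + "/sessions/0/statements",
        "{\"code\": \"print(1 + 1)\"}");
    System.out.println(stmt.getResponseBody());

    // Tear the session down and release the shared client's threads.
    Http.delete(livy + "/sessions/0");
    Http.asyncHttpClient.close();
  }
}
```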
232 changes: 232 additions & 0 deletions livy-spark/src/main/java/org/apache/zeppelin/livy/RestPySparkInterpreter.java
@@ -0,0 +1,232 @@

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.livy;

import java.io.IOException;
import java.util.List;
import java.util.Properties;

import static org.apache.zeppelin.livy.Http.*;

import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ning.http.client.Response;

/**
 * PySpark interpreter that runs notebook paragraphs remotely by submitting
 * them as statements to a Livy session over REST.
 */
public class RestPySparkInterpreter extends Interpreter {

private Session session = null;
private final String host;

public RestPySparkInterpreter(Properties property) {
super(property);
host = getProperty("livy.server.host");
}

public static Logger logger = LoggerFactory.getLogger(RestPySparkInterpreter.class);

static {

Interpreter
.register(
"pyspark",
"livy",
RestPySparkInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add("spark.app.name",
getSystemDefault(null, "spark.app.name",
"Zeppelin_yarn_cluster"), "The name of spark application.")
.add("livy.server.host",
getSystemDefault(
null, "livy.server.host", "localhost:8998"),
"The host of livy server.")
.add("spark.driver.cores", getSystemDefault(
null, "spark.driver.cores", "1"),
"Driver cores. ex) 1, 2")
.add("spark.driver.memory", getSystemDefault(
null, "spark.driver.memory", "512m"),
"Driver memory. ex) 512m, 32g")
.add("spark.executor.instances",
getSystemDefault(null, "spark.executor.instances", "3"),
"Executor instances. ex) 1, 4")
.add("spark.executor.cores", getSystemDefault(
null, "spark.executor.cores", "1"),
"Num cores per executor. ex) 1, 4")
.add("spark.executor.memory",
getSystemDefault(
null, "spark.executor.memory", "512m"),
"Executor memory per worker instance. ex) 512m, 32g")
.add("spark.dynamicAllocation.enabled",
getSystemDefault(
null, "spark.dynamicAllocation.enabled", "false"),
"Use dynamic resource allocation")
.add(
"spark.dynamicAllocation.cachedExecutorIdleTimeout",
getSystemDefault(
null, "spark.dynamicAllocation.cachedExecutorIdleTimeout",
"120s"), "Remove an executor which has cached data blocks")
.add("spark.dynamicAllocation.minExecutors",
getSystemDefault(
null, "spark.dynamicAllocation.minExecutors", "0"),
"Lower bound for the number of executors if dynamic allocation is enabled. ")
.add("spark.dynamicAllocation.initialExecutors",
getSystemDefault(
null, "spark.dynamicAllocation.initialExecutors", "1"),
"Initial number of executors to run if dynamic allocation is enabled. ")
.add("spark.dynamicAllocation.maxExecutors",
getSystemDefault(
null, "spark.dynamicAllocation.maxExecutors", "10"),
"Upper bound for the number of executors if dynamic allocation is enabled. ")
.build());
}

public static String getSystemDefault(
String envName,
String propertyName,
String defaultValue) {

if (envName != null && !envName.isEmpty()) {
String envValue = System.getenv().get(envName);
if (envValue != null) {
return envValue;
}
}

if (propertyName != null && !propertyName.isEmpty()) {
String propValue = System.getProperty(propertyName);
if (propValue != null) {
return propValue;
}
}
return defaultValue;
}
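
The lookup order here is environment variable first, then JVM system property, then the hard-coded default. A small demonstration (hypothetical demo class; assumes the Zeppelin classes are on the classpath, and note that touching `RestPySparkInterpreter` also runs its static registration block):

```
package org.apache.zeppelin.livy;

public class GetSystemDefaultDemo {
  public static void main(String[] args) {
    // Nothing set: the default wins.
    System.out.println(RestPySparkInterpreter.getSystemDefault(
        null, "spark.driver.memory", "512m"));   // prints 512m

    // A JVM system property overrides the default.
    System.setProperty("spark.driver.memory", "2g");
    System.out.println(RestPySparkInterpreter.getSystemDefault(
        null, "spark.driver.memory", "512m"));   // prints 2g

    // An environment variable, when named and set, overrides both.
    System.out.println(RestPySparkInterpreter.getSystemDefault(
        "SPARK_DRIVER_MEMORY", "spark.driver.memory", "512m"));
  }
}
```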

@Override
public void open() {
}

@Override
public void close() {
if (session != null) {
SessionFactory.deleteSession(session);
}
}

/** Returns true if the Livy server at {@code host} answers an HTTP GET. */
private boolean checkLivyServer() {
try {
Response r = get(host);
return r.hasResponseStatus();
} catch (Exception e) {
logger.info("Interpreter exception", e);
return false;
}
}

@Override
public InterpreterResult interpret(String st, InterpreterContext context) {

// Check if livy server is running in that host
if (!checkLivyServer()) {
return new InterpreterResult(Code.ERROR,
"You need the Livy server running on the master node of the cluster, " +
"with the property livy.server.host set to <master-node-hostname-or-ip>:8998");
}

// Reuse the existing session if it is still alive on the server.
if (session != null) {
try {
session = SessionFactory.getSession(session);
} catch (IOException e) {
logger.info("Interpreter exception", e);
return new InterpreterResult(Code.ERROR,
"You need the Livy server running on the master node of the cluster, " +
"with the property livy.server.host set to <master-node-hostname-or-ip>:8998");
}
}
// No usable session: create a new one on the Livy server.
if (session == null) {
try {
session = SessionFactory.createSession(host, "pyspark", getProperty());

if (session == null) {
return new InterpreterResult(Code.ERROR, "Can not create a session, please try again.");
}

if (session.state.equals("error")) {
SessionFactory.deleteSession(session);

return new InterpreterResult(Code.ERROR,
"Not enough resources, or an error occurred while creating the session;"
+ " please try again.");
}

} catch (IOException e) {
logger.info("Interpreter exception", e);
return new InterpreterResult(Code.ERROR,
"You need the Livy server running on the master node of the cluster, " +
"with the property livy.server.host set to <master-node-hostname-or-ip>:8998");
}
}
// Submit the paragraph as a Livy statement and wait for its result.
Statement statement;
try {
statement = session.createStatement(st);
} catch (IOException e) {
logger.info("Interpreter exception", e);
return new InterpreterResult(Code.ERROR, "Cannot create a statement, please try again.");
}
// Livy marks a finished statement "available"; the output status distinguishes success from error.
if (statement.state.equals("available") && statement.output.status.equals("ok")) {
return new InterpreterResult(Code.SUCCESS, statement.output.data.get("text/plain"));
}

return new InterpreterResult(Code.ERROR, statement.output.evalue);
}

@Override
public void cancel(InterpreterContext context) {
}

@Override
public FormType getFormType() {
return FormType.SIMPLE;
}

@Override
public int getProgress(InterpreterContext context) {
return 0;
}

@Override
public List<String> completion(String buf, int cursor) {
return null;
}

}
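
`Session`, `Statement`, and `SessionFactory` are defined elsewhere in this change. Judging only from how they are used above, their data shapes mirror Livy's JSON responses; a rough sketch (field names inferred, layout assumed):

```
package org.apache.zeppelin.livy;

import java.util.Map;

/** Hypothetical sketch of the session/statement shapes the interpreter relies on. */
class LivyShapesSketch {

  static class Session {
    int id;          // Livy session id
    String state;    // e.g. "starting", "idle", "error"
  }

  static class StatementOutput {
    String status;              // "ok" or "error"
    String evalue;              // error message when status is "error"
    Map<String, String> data;   // data.get("text/plain") carries the result
  }

  static class Statement {
    int id;
    String state;               // "available" once the statement has finished
    StatementOutput output;
  }
}
```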