@@ -377,7 +377,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
"pyspark " + sparkInterpreter.getSparkContext().version() + " is not supported"));
return new InterpreterResult(Code.ERROR, errorMessage);
}
String jobGroup = sparkInterpreter.getJobGroup(context);
String jobGroup = Utils.buildJobGroupId(context);
ZeppelinContext z = sparkInterpreter.getZeppelinContext();
z.setInterpreterContext(context);
z.setGui(context.getGui());
54 changes: 43 additions & 11 deletions spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -42,6 +42,8 @@
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.DAGScheduler;
import org.apache.spark.scheduler.Pool;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.ui.SparkUI;
import org.apache.spark.ui.jobs.JobProgressListener;
@@ -57,6 +59,7 @@
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.resource.WellKnownResourceName;
import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@@ -112,7 +115,7 @@ public class SparkInterpreter extends Interpreter {

private InterpreterOutputStream out;
private SparkDependencyResolver dep;
private String sparkUrl;
private static String sparkUrl;

/**
* completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10)
@@ -156,7 +159,36 @@ public boolean isSparkContextInitialized() {
}

static JobProgressListener setupListeners(SparkContext context) {
JobProgressListener pl = new JobProgressListener(context.getConf());
JobProgressListener pl = new JobProgressListener(context.getConf()) {
@Override
public synchronized void onJobStart(SparkListenerJobStart jobStart) {
super.onJobStart(jobStart);
int jobId = jobStart.jobId();
String jobGroupId = jobStart.properties().getProperty("spark.jobGroup.id");
String jobUrl = getJobUrl(jobId);
String noteId = Utils.getNoteId(jobGroupId);
String paragraphId = Utils.getParagraphId(jobGroupId);
if (jobUrl != null && noteId != null && paragraphId != null) {
RemoteEventClientWrapper eventClient = ZeppelinContext.getEventClient();
Map<String, String> infos = new java.util.HashMap<>();
infos.put("jobUrl", jobUrl);
infos.put("label", "SPARK JOB");
infos.put("tooltip", "View in Spark web UI");
if (eventClient != null) {
eventClient.onParaInfosReceived(noteId, paragraphId, infos);
}
}
}

private String getJobUrl(int jobId) {
String jobUrl = null;
if (sparkUrl != null) {
jobUrl = sparkUrl + "/jobs/job?id=" + jobId;
}
return jobUrl;
}

};
try {
Object listenerBus = context.getClass().getMethod("listenerBus").invoke(context);

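Taken together, the listener override above turns each Spark job start into a paragraph-scoped info event: it reads the spark.jobGroup.id property, parses the note and paragraph IDs back out of it, and pushes a link to the Spark web UI. A minimal sketch of what one onJobStart invocation computes — the URL, IDs, and job id below are examples, not values from this PR:

package org.apache.zeppelin.spark;

import java.util.HashMap;
import java.util.Map;

// Sketch only; lives in the same package as Utils so the helpers resolve.
public class JobStartInfoSketch {
  public static void main(String[] args) {
    String sparkUrl = "http://localhost:4040";                 // cached Spark UI address
    String jobGroupId = "zeppelin-2C3XYZ-20161211-123456_789"; // spark.jobGroup.id property
    int jobId = 0;

    String jobUrl = sparkUrl + "/jobs/job?id=" + jobId;        // link into the Spark web UI
    String noteId = Utils.getNoteId(jobGroupId);               // "2C3XYZ"
    String paragraphId = Utils.getParagraphId(jobGroupId);     // "20161211-123456_789"

    Map<String, String> infos = new HashMap<>();
    infos.put("jobUrl", jobUrl);
    infos.put("label", "SPARK JOB");
    infos.put("tooltip", "View in Spark web UI");
    // The listener then delivers these via
    // eventClient.onParaInfosReceived(noteId, paragraphId, infos).
    System.out.println(noteId + " / " + paragraphId + " -> " + infos);
  }
}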
@@ -950,7 +982,10 @@ public void open() {
numReferenceOfSparkContext.incrementAndGet();
}

private String getSparkUIUrl() {
public String getSparkUIUrl() {
if (sparkUrl != null) {
return sparkUrl;
}
Option<SparkUI> sparkUiOption = (Option<SparkUI>) Utils.invokeMethod(sc, "ui");
SparkUI sparkUi = sparkUiOption.get();
String sparkWebUrl = sparkUi.appUIAddress();
@@ -971,8 +1006,9 @@ public void populateSparkWebUrl(InterpreterContext ctx) {
Map<String, String> infos = new java.util.HashMap<>();
if (sparkUrl != null) {
infos.put("url", sparkUrl);
logger.info("Sending metainfos to Zeppelin server: {}", infos.toString());
if (ctx != null && ctx.getClient() != null) {
logger.info("Sending metainfos to Zeppelin server: {}", infos.toString());
getZeppelinContext().setEventClient(ctx.getClient());
ctx.getClient().onMetaInfosReceived(infos);
}
}
@@ -1105,10 +1141,6 @@ public Object getLastObject() {
return obj;
}

String getJobGroup(InterpreterContext context){
return "zeppelin-" + context.getParagraphId();
}

/**
* Interpret a single line.
*/
@@ -1129,7 +1161,7 @@ public InterpreterResult interpret(String line, InterpreterContext context) {
public InterpreterResult interpret(String[] lines, InterpreterContext context) {
synchronized (this) {
z.setGui(context.getGui());
sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
sc.setJobGroup(Utils.buildJobGroupId(context), "Zeppelin", false);
InterpreterResult r = interpretInput(lines, context);
sc.clearJobGroup();
return r;
@@ -1252,12 +1284,12 @@ private void putLatestVarInResourcePool(InterpreterContext context) {

@Override
public void cancel(InterpreterContext context) {
sc.cancelJobGroup(getJobGroup(context));
sc.cancelJobGroup(Utils.buildJobGroupId(context));
}

@Override
public int getProgress(InterpreterContext context) {
String jobGroup = getJobGroup(context);
String jobGroup = Utils.buildJobGroupId(context);
int completedTasks = 0;
int totalTasks = 0;

@@ -102,7 +102,12 @@ String getJobGroup(InterpreterContext context){
@Override
public InterpreterResult interpret(String lines, InterpreterContext interpreterContext) {

getSparkInterpreter().populateSparkWebUrl(interpreterContext);
SparkInterpreter sparkInterpreter = getSparkInterpreter();
sparkInterpreter.populateSparkWebUrl(interpreterContext);

String jobGroup = Utils.buildJobGroupId(interpreterContext);
sparkInterpreter.getSparkContext().setJobGroup(jobGroup, "Zeppelin", false);

String imageWidth = getProperty("zeppelin.R.image.width");

String[] sl = lines.split("\n");
@@ -122,7 +127,6 @@ public InterpreterResult interpret(String lines, InterpreterContext interpreterContext) {
}
}

String jobGroup = getJobGroup(interpreterContext);
String setJobGroup = "";
// assign setJobGroup to dummy__, otherwise it would print NULL for this statement
if (Utils.isSpark2()) {
@@ -45,10 +45,6 @@ public class SparkSqlInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(SparkSqlInterpreter.class);
AtomicInteger num = new AtomicInteger(0);

private String getJobGroup(InterpreterContext context){
return "zeppelin-" + context.getParagraphId();
}

private int maxResult;

public SparkSqlInterpreter(Properties property) {
@@ -105,7 +101,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
sc.setLocalProperty("spark.scheduler.pool", null);
}

sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
sc.setJobGroup(Utils.buildJobGroupId(context), "Zeppelin", false);
Object rdd = null;
try {
// method signature of sqlc.sql() is changed
@@ -134,10 +130,11 @@ public InterpreterResult interpret(String st, InterpreterContext context) {

@Override
public void cancel(InterpreterContext context) {
SQLContext sqlc = getSparkInterpreter().getSQLContext();
SparkInterpreter sparkInterpreter = getSparkInterpreter();
SQLContext sqlc = sparkInterpreter.getSQLContext();
SparkContext sc = sqlc.sparkContext();

sc.cancelJobGroup(getJobGroup(context));
sc.cancelJobGroup(Utils.buildJobGroupId(context));
}

@Override
17 changes: 17 additions & 0 deletions spark/src/main/java/org/apache/zeppelin/spark/Utils.java
@@ -17,6 +17,7 @@

package org.apache.zeppelin.spark;

import org.apache.zeppelin.interpreter.InterpreterContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -106,4 +107,20 @@ static boolean isSpark2() {
return false;
}
}

public static String buildJobGroupId(InterpreterContext context) {
return "zeppelin-" + context.getNoteId() + "-" + context.getParagraphId();
}

public static String getNoteId(String jobgroupId) {
int indexOf = jobgroupId.indexOf("-");
int secondIndex = jobgroupId.indexOf("-", indexOf + 1);
return jobgroupId.substring(indexOf + 1, secondIndex);
}

public static String getParagraphId(String jobgroupId) {
int indexOf = jobgroupId.indexOf("-");
int secondIndex = jobgroupId.indexOf("-", indexOf + 1);
return jobgroupId.substring(secondIndex + 1, jobgroupId.length());
}
}
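Note the parsing assumption in getNoteId/getParagraphId above: both split on the first two '-' characters, which works because the "zeppelin" prefix and note IDs contain no dashes, while paragraph IDs may (e.g. 20161211-123456_789). A quick round-trip check of the format, with made-up IDs:

package org.apache.zeppelin.spark;

// Round-trip sketch for the helpers above; the IDs are examples only.
public class JobGroupIdRoundTrip {
  public static void main(String[] args) {
    String noteId = "2C3XYZ";
    String paragraphId = "20161211-123456_789";
    String jobGroupId = "zeppelin-" + noteId + "-" + paragraphId; // what buildJobGroupId produces

    System.out.println(Utils.getNoteId(jobGroupId).equals(noteId));           // true
    System.out.println(Utils.getParagraphId(jobGroupId).equals(paragraphId)); // true
  }
}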
@@ -46,6 +46,7 @@
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
@@ -61,6 +62,7 @@ public class ZeppelinContext {
// Map interpreter class name (to be used by hook registry) from
// given replName in paragraph
private static final Map<String, String> interpreterClassMap;
private static RemoteEventClientWrapper eventClient;
static {
interpreterClassMap = new HashMap<>();
interpreterClassMap.put("spark", "org.apache.zeppelin.spark.SparkInterpreter");
@@ -221,7 +223,7 @@ public static String showDF(SparkContext sc,
Object df, int maxResult) {
Object[] rows = null;
Method take;
String jobGroup = "zeppelin-" + interpreterContext.getParagraphId();
String jobGroup = Utils.buildJobGroupId(interpreterContext);
sc.setJobGroup(jobGroup, "Zeppelin", false);

try {
@@ -930,4 +932,21 @@ public ResourceSet getAll() {
return resourcePool.getAll();
}

/**
* Get the event client
*/
@ZeppelinApi
public static RemoteEventClientWrapper getEventClient() {
return eventClient;
}

/**
 * Set the event client (only the first client set is kept)
 */
@ZeppelinApi
public void setEventClient(RemoteEventClientWrapper eventClient) {
if (ZeppelinContext.eventClient == null) {
ZeppelinContext.eventClient = eventClient;
}
}
}
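A consequence of the null guard above: the event client is bound once per JVM by whichever paragraph runs first, and later calls are no-ops (the test setup below relies on this ordering). A standalone sketch of the write-once pattern, independent of Zeppelin classes:

// Sketch only, mirroring the guard in ZeppelinContext.setEventClient:
// only the first non-null client ever registered is kept.
public class WriteOnceDemo {
  private static String eventClient;

  static void setEventClient(String client) {
    if (eventClient == null) {  // same guard as ZeppelinContext.setEventClient
      eventClient = client;
    }
  }

  public static void main(String[] args) {
    setEventClient("first");          // stored
    setEventClient("second");         // ignored; a client is already bound
    System.out.println(eventClient);  // prints: first
  }
}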
@@ -23,11 +23,13 @@
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.resource.LocalResourcePool;
import org.apache.zeppelin.resource.WellKnownResourceName;
@@ -54,6 +56,8 @@ public class SparkInterpreterTest {
public static InterpreterGroup intpGroup;
private InterpreterContext context;
public static Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterTest.class);
private static Map<String, Map<String, String>> paraIdToInfosMap =
new HashMap<>();

/**
* Get spark version number as a numerical value.
@@ -92,14 +96,40 @@ public void setUp() throws Exception {
repl.open();
}

final RemoteEventClientWrapper remoteEventClientWrapper = new RemoteEventClientWrapper() {

@Override
public void onParaInfosReceived(String noteId, String paragraphId,
Map<String, String> infos) {
if (infos != null) {
paraIdToInfosMap.put(paragraphId, infos);
}
}

@Override
public void onMetaInfosReceived(Map<String, String> infos) {
}
};
context = new InterpreterContext("note", "id", null, "title", "text",
new AuthenticationInfo(),
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LocalResourcePool("id"),
new LinkedList<InterpreterContextRunner>(),
new InterpreterOutput(null));
new InterpreterOutput(null)) {

@Override
public RemoteEventClientWrapper getClient() {
@Leemoonsoo (Member) commented on Dec 11, 2016:

#1637 also exposed one of RemoteEventClient's methods to the interpreter, but in a slightly different way: it created a new class, RemoteWorksController, to wrap RemoteEventClient.
https://github.com/apache/zeppelin/pull/1637/files#diff-5f11952b2fbf4e41e273df20c409ec4b

This PR's approach and #1637's approach are basically the same, but for future refactoring I think we need to decide whether to a) create a new class for every feature exported from RemoteEventClient (like RemoteWorksController), or b) create a common wrapper class RemoteEventClientWrapper and export all the necessary methods from it.

A contributor replied:

@Leemoonsoo Regarding the refactoring, I mean not only RemoteEventClient but also the other interpreter-related components, like InterpreterFactory, InterpreterSetting, InterpreterGroup, RemoteInterpreter, etc. I just feel the boundary and responsibility of these classes/components is not clear right now. InterpreterFactory is too complicated and should delegate much of its work to InterpreterSetting, and currently there is no single component that manages the lifecycle of interpreters. I know refactoring is a little risky and may affect functionality, so perhaps we should make this part of 0.8, if you think it is feasible and necessary. I can write a more detailed document and gather more feedback from the community. What do you think?

@Leemoonsoo (Member) replied:

The reason I mentioned RemoteEventClient in particular is that it is part of the interpreter development API, and we want to minimize API changes on every release (which doesn't mean this PR has to take care of it).

Agreed, we can refactor the overall interpreter layer once we freeze 0.7.

return remoteEventClientWrapper;
}
};
// The first paragraph interpreted will set the EventClient wrapper:
// SparkInterpreter.interpret(String, InterpreterContext) ->
// SparkInterpreter.populateSparkWebUrl(InterpreterContext) ->
// ZeppelinContext.setEventClient(RemoteEventClientWrapper)
// Run a dummy statement so we don't have any race conditions among tests.
repl.interpret("sc", context);
}
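The review thread above weighs two ways of exposing RemoteEventClient features to interpreters. For concreteness, a minimal sketch of both options as Java interfaces — the names here are hypothetical, and option (b) is essentially the RemoteEventClientWrapper this PR extends:

import java.util.Map;

// (a) One dedicated controller per exported feature, as #1637 did with
//     RemoteWorksController (interface name below is illustrative):
interface ParagraphInfoController {
  void onParaInfosReceived(String noteId, String paragraphId, Map<String, String> infos);
}

// (b) One common wrapper exporting every needed event-client method, which is
//     the direction this PR takes with RemoteEventClientWrapper:
interface EventClientFacade {
  void onMetaInfosReceived(Map<String, String> infos);
  void onParaInfosReceived(String noteId, String paragraphId, Map<String, String> infos);
}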

@Test
@@ -273,4 +303,27 @@ public void testCompletion() {
List<InterpreterCompletion> completions = repl.completion("sc.", "sc.".length());
assertTrue(completions.size() > 0);
}

@Test
public void testParagraphUrls() {
String paraId = "test_para_job_url";
InterpreterContext intpCtx = new InterpreterContext("note", paraId, null, "title", "text",
new AuthenticationInfo(),
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LocalResourcePool("id"),
new LinkedList<InterpreterContextRunner>(),
new InterpreterOutput(null));
repl.interpret("sc.parallelize(1 to 10).map(x => {x}).collect", intpCtx);
Map<String, String> paraInfos = paraIdToInfosMap.get(intpCtx.getParagraphId());
String jobUrl = null;
if (paraInfos != null) {
jobUrl = paraInfos.get("jobUrl");
}
String sparkUIUrl = repl.getSparkUIUrl();
assertNotNull(jobUrl);
assertTrue(jobUrl.startsWith(sparkUIUrl + "/jobs/job?id="));

}
}
@@ -1,5 +1,6 @@
package org.apache.zeppelin.interpreter.remote;

import java.util.HashMap;
import java.util.Map;

/**
@@ -21,4 +22,13 @@ public void onMetaInfosReceived(Map<String, String> infos) {
client.onMetaInfosReceived(infos);
}

@Override
public void onParaInfosReceived(String noteId, String paragraphId, Map<String, String> infos) {
Map<String, String> paraInfos = new HashMap<String, String>(infos);
paraInfos.put("noteId", noteId);
paraInfos.put("paraId", paragraphId);
client.onParaInfosReceived(paraInfos);
}


}
@@ -12,4 +12,7 @@ public interface RemoteEventClientWrapper {

public void onMetaInfosReceived(Map<String, String> infos);

public void onParaInfosReceived(String noteId, String paragraphId,
Map<String, String> infos);

}
@@ -470,6 +470,10 @@ public void onMetaInfosReceived(Map<String, String> infos) {
gson.toJson(infos)));
}

public void onParaInfosReceived(Map<String, String> infos) {
sendEvent(new RemoteInterpreterEvent(RemoteInterpreterEventType.PARA_INFOS,
gson.toJson(infos)));
}
/**
* Wait for eventQueue becomes empty
*/