diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 5a8e0407158..0679fcc7b12 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -377,7 +377,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) { "pyspark " + sparkInterpreter.getSparkContext().version() + " is not supported")); return new InterpreterResult(Code.ERROR, errorMessage); } - String jobGroup = sparkInterpreter.getJobGroup(context); + String jobGroup = Utils.buildJobGroupId(context); ZeppelinContext z = sparkInterpreter.getZeppelinContext(); z.setInterpreterContext(context); z.setGui(context.getGui()); diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 0584a302c55..3c1288e1afc 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -42,6 +42,8 @@ import org.apache.spark.scheduler.ActiveJob; import org.apache.spark.scheduler.DAGScheduler; import org.apache.spark.scheduler.Pool; +import org.apache.spark.scheduler.SparkListenerApplicationEnd; +import org.apache.spark.scheduler.SparkListenerJobStart; import org.apache.spark.sql.SQLContext; import org.apache.spark.ui.SparkUI; import org.apache.spark.ui.jobs.JobProgressListener; @@ -57,6 +59,7 @@ import org.apache.zeppelin.interpreter.util.InterpreterOutputStream; import org.apache.zeppelin.resource.ResourcePool; import org.apache.zeppelin.resource.WellKnownResourceName; +import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; @@ -112,7 +115,7 @@ public class SparkInterpreter extends Interpreter { private InterpreterOutputStream out; private SparkDependencyResolver dep; - private String sparkUrl; + private static String sparkUrl; /** * completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10) @@ -156,7 +159,36 @@ public boolean isSparkContextInitialized() { } static JobProgressListener setupListeners(SparkContext context) { - JobProgressListener pl = new JobProgressListener(context.getConf()); + JobProgressListener pl = new JobProgressListener(context.getConf()) { + @Override + public synchronized void onJobStart(SparkListenerJobStart jobStart) { + super.onJobStart(jobStart); + int jobId = jobStart.jobId(); + String jobGroupId = jobStart.properties().getProperty("spark.jobGroup.id"); + String jobUrl = getJobUrl(jobId); + String noteId = Utils.getNoteId(jobGroupId); + String paragraphId = Utils.getParagraphId(jobGroupId); + if (jobUrl != null && noteId != null && paragraphId != null) { + RemoteEventClientWrapper eventClient = ZeppelinContext.getEventClient(); + Map infos = new java.util.HashMap<>(); + infos.put("jobUrl", jobUrl); + infos.put("label", "SPARK JOB"); + infos.put("tooltip", "View in Spark web UI"); + if (eventClient != null) { + eventClient.onParaInfosReceived(noteId, paragraphId, infos); + } + } + } + + private String getJobUrl(int jobId) { + String jobUrl = null; + if (sparkUrl != null) { + jobUrl = sparkUrl + "/jobs/job?id=" + jobId; + } + return jobUrl; + } + + }; try { Object listenerBus = context.getClass().getMethod("listenerBus").invoke(context); @@ -950,7 +982,10 @@ public void open() { numReferenceOfSparkContext.incrementAndGet(); } - private String getSparkUIUrl() { + public String getSparkUIUrl() { + if (sparkUrl != null) { + return sparkUrl; + } Option sparkUiOption = (Option) Utils.invokeMethod(sc, "ui"); SparkUI sparkUi = sparkUiOption.get(); String sparkWebUrl = sparkUi.appUIAddress(); @@ -971,8 +1006,9 @@ public void populateSparkWebUrl(InterpreterContext ctx) { Map infos = new java.util.HashMap<>(); if (sparkUrl != null) { infos.put("url", sparkUrl); - logger.info("Sending metainfos to Zeppelin server: {}", infos.toString()); if (ctx != null && ctx.getClient() != null) { + logger.info("Sending metainfos to Zeppelin server: {}", infos.toString()); + getZeppelinContext().setEventClient(ctx.getClient()); ctx.getClient().onMetaInfosReceived(infos); } } @@ -1105,10 +1141,6 @@ public Object getLastObject() { return obj; } - String getJobGroup(InterpreterContext context){ - return "zeppelin-" + context.getParagraphId(); - } - /** * Interpret a single line. */ @@ -1129,7 +1161,7 @@ public InterpreterResult interpret(String line, InterpreterContext context) { public InterpreterResult interpret(String[] lines, InterpreterContext context) { synchronized (this) { z.setGui(context.getGui()); - sc.setJobGroup(getJobGroup(context), "Zeppelin", false); + sc.setJobGroup(Utils.buildJobGroupId(context), "Zeppelin", false); InterpreterResult r = interpretInput(lines, context); sc.clearJobGroup(); return r; @@ -1252,12 +1284,12 @@ private void putLatestVarInResourcePool(InterpreterContext context) { @Override public void cancel(InterpreterContext context) { - sc.cancelJobGroup(getJobGroup(context)); + sc.cancelJobGroup(Utils.buildJobGroupId(context)); } @Override public int getProgress(InterpreterContext context) { - String jobGroup = getJobGroup(context); + String jobGroup = Utils.buildJobGroupId(context); int completedTasks = 0; int totalTasks = 0; diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java index 8f3e93c024c..16b1a21144f 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java @@ -102,7 +102,12 @@ String getJobGroup(InterpreterContext context){ @Override public InterpreterResult interpret(String lines, InterpreterContext interpreterContext) { - getSparkInterpreter().populateSparkWebUrl(interpreterContext); + SparkInterpreter sparkInterpreter = getSparkInterpreter(); + sparkInterpreter.populateSparkWebUrl(interpreterContext); + + String jobGroup = Utils.buildJobGroupId(interpreterContext); + sparkInterpreter.getSparkContext().setJobGroup(jobGroup, "Zeppelin", false); + String imageWidth = getProperty("zeppelin.R.image.width"); String[] sl = lines.split("\n"); @@ -122,7 +127,6 @@ public InterpreterResult interpret(String lines, InterpreterContext interpreterC } } - String jobGroup = getJobGroup(interpreterContext); String setJobGroup = ""; // assign setJobGroup to dummy__, otherwise it would print NULL for this statement if (Utils.isSpark2()) { diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java index e6fe137273a..1d5282f132e 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java @@ -45,10 +45,6 @@ public class SparkSqlInterpreter extends Interpreter { Logger logger = LoggerFactory.getLogger(SparkSqlInterpreter.class); AtomicInteger num = new AtomicInteger(0); - private String getJobGroup(InterpreterContext context){ - return "zeppelin-" + context.getParagraphId(); - } - private int maxResult; public SparkSqlInterpreter(Properties property) { @@ -105,7 +101,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) { sc.setLocalProperty("spark.scheduler.pool", null); } - sc.setJobGroup(getJobGroup(context), "Zeppelin", false); + sc.setJobGroup(Utils.buildJobGroupId(context), "Zeppelin", false); Object rdd = null; try { // method signature of sqlc.sql() is changed @@ -134,10 +130,11 @@ public InterpreterResult interpret(String st, InterpreterContext context) { @Override public void cancel(InterpreterContext context) { - SQLContext sqlc = getSparkInterpreter().getSQLContext(); + SparkInterpreter sparkInterpreter = getSparkInterpreter(); + SQLContext sqlc = sparkInterpreter.getSQLContext(); SparkContext sc = sqlc.sparkContext(); - sc.cancelJobGroup(getJobGroup(context)); + sc.cancelJobGroup(Utils.buildJobGroupId(context)); } @Override diff --git a/spark/src/main/java/org/apache/zeppelin/spark/Utils.java b/spark/src/main/java/org/apache/zeppelin/spark/Utils.java index 78304fd8713..17edb0d3257 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/Utils.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/Utils.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.spark; +import org.apache.zeppelin.interpreter.InterpreterContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,4 +107,20 @@ static boolean isSpark2() { return false; } } + + public static String buildJobGroupId(InterpreterContext context) { + return "zeppelin-" + context.getNoteId() + "-" + context.getParagraphId(); + } + + public static String getNoteId(String jobgroupId) { + int indexOf = jobgroupId.indexOf("-"); + int secondIndex = jobgroupId.indexOf("-", indexOf + 1); + return jobgroupId.substring(indexOf + 1, secondIndex); + } + + public static String getParagraphId(String jobgroupId) { + int indexOf = jobgroupId.indexOf("-"); + int secondIndex = jobgroupId.indexOf("-", indexOf + 1); + return jobgroupId.substring(secondIndex + 1, jobgroupId.length()); + } } diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java index d1234dfd987..d62b68e75ff 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java @@ -46,6 +46,7 @@ import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterHookRegistry; import org.apache.zeppelin.interpreter.RemoteWorksController; +import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper; import org.apache.zeppelin.spark.dep.SparkDependencyResolver; import org.apache.zeppelin.resource.Resource; import org.apache.zeppelin.resource.ResourcePool; @@ -61,6 +62,7 @@ public class ZeppelinContext { // Map interpreter class name (to be used by hook registry) from // given replName in parapgraph private static final Map interpreterClassMap; + private static RemoteEventClientWrapper eventClient; static { interpreterClassMap = new HashMap<>(); interpreterClassMap.put("spark", "org.apache.zeppelin.spark.SparkInterpreter"); @@ -221,7 +223,7 @@ public static String showDF(SparkContext sc, Object df, int maxResult) { Object[] rows = null; Method take; - String jobGroup = "zeppelin-" + interpreterContext.getParagraphId(); + String jobGroup = Utils.buildJobGroupId(interpreterContext); sc.setJobGroup(jobGroup, "Zeppelin", false); try { @@ -930,4 +932,21 @@ public ResourceSet getAll() { return resourcePool.getAll(); } + /** + * Get the event client + */ + @ZeppelinApi + public static RemoteEventClientWrapper getEventClient() { + return eventClient; + } + + /** + * Set event client + */ + @ZeppelinApi + public void setEventClient(RemoteEventClientWrapper eventClient) { + if (ZeppelinContext.eventClient == null) { + ZeppelinContext.eventClient = eventClient; + } + } } diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index 14108901ce7..8c78b663b2e 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -23,11 +23,13 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Properties; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.resource.LocalResourcePool; import org.apache.zeppelin.resource.WellKnownResourceName; @@ -54,6 +56,8 @@ public class SparkInterpreterTest { public static InterpreterGroup intpGroup; private InterpreterContext context; public static Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterTest.class); + private static Map> paraIdToInfosMap = + new HashMap<>(); /** * Get spark version number as a numerical value. @@ -92,6 +96,20 @@ public void setUp() throws Exception { repl.open(); } + final RemoteEventClientWrapper remoteEventClientWrapper = new RemoteEventClientWrapper() { + + @Override + public void onParaInfosReceived(String noteId, String paragraphId, + Map infos) { + if (infos != null) { + paraIdToInfosMap.put(paragraphId, infos); + } + } + + @Override + public void onMetaInfosReceived(Map infos) { + } + }; context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(), new HashMap(), @@ -99,7 +117,19 @@ public void setUp() throws Exception { new AngularObjectRegistry(intpGroup.getId(), null), new LocalResourcePool("id"), new LinkedList(), - new InterpreterOutput(null)); + new InterpreterOutput(null)) { + + @Override + public RemoteEventClientWrapper getClient() { + return remoteEventClientWrapper; + } + }; + // The first para interpretdr will set the Eventclient wrapper + //SparkInterpreter.interpret(String, InterpreterContext) -> + //SparkInterpreter.populateSparkWebUrl(InterpreterContext) -> + //ZeppelinContext.setEventClient(RemoteEventClientWrapper) + //running a dummy to ensure that we dont have any race conditions among tests + repl.interpret("sc", context); } @Test @@ -273,4 +303,27 @@ public void testCompletion() { List completions = repl.completion("sc.", "sc.".length()); assertTrue(completions.size() > 0); } + + @Test + public void testParagraphUrls() { + String paraId = "test_para_job_url"; + InterpreterContext intpCtx = new InterpreterContext("note", paraId, null, "title", "text", + new AuthenticationInfo(), + new HashMap(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("id"), + new LinkedList(), + new InterpreterOutput(null)); + repl.interpret("sc.parallelize(1 to 10).map(x => {x}).collect", intpCtx); + Map paraInfos = paraIdToInfosMap.get(intpCtx.getParagraphId()); + String jobUrl = null; + if (paraInfos != null) { + jobUrl = paraInfos.get("jobUrl"); + } + String sparkUIUrl = repl.getSparkUIUrl(); + assertNotNull(jobUrl); + assertTrue(jobUrl.startsWith(sparkUIUrl + "/jobs/job?id=")); + + } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClient.java index 3585a59eae4..5015a3f27df 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClient.java @@ -1,5 +1,6 @@ package org.apache.zeppelin.interpreter.remote; +import java.util.HashMap; import java.util.Map; /** @@ -21,4 +22,13 @@ public void onMetaInfosReceived(Map infos) { client.onMetaInfosReceived(infos); } + @Override + public void onParaInfosReceived(String noteId, String paragraphId, Map infos) { + Map paraInfos = new HashMap(infos); + paraInfos.put("noteId", noteId); + paraInfos.put("paraId", paragraphId); + client.onParaInfosReceived(paraInfos); + } + + } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClientWrapper.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClientWrapper.java index 339f7714a21..bf36cd6354f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClientWrapper.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClientWrapper.java @@ -12,4 +12,7 @@ public interface RemoteEventClientWrapper { public void onMetaInfosReceived(Map infos); + public void onParaInfosReceived(String noteId, String paragraphId, + Map infos); + } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java index 606d35f60ee..4b721f5ad60 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java @@ -470,6 +470,10 @@ public void onMetaInfosReceived(Map infos) { gson.toJson(infos))); } + public void onParaInfosReceived(Map infos) { + sendEvent(new RemoteInterpreterEvent(RemoteInterpreterEventType.PARA_INFOS, + gson.toJson(infos))); + } /** * Wait for eventQueue becomes empty */ diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index e794140e8bd..f46d31af6c2 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -243,10 +243,20 @@ public void run() { Map metaInfos = gson.fromJson(event.getData(), new TypeToken>() { }.getType()); - String id = interpreterGroup.getId(); - int indexOfColon = id.indexOf(":"); - String settingId = id.substring(0, indexOfColon); + String settingId = RemoteInterpreterUtils. + getInterpreterSettingId(interpreterGroup.getId()); listener.onMetaInfosReceived(settingId, metaInfos); + } else if (event.getType() == RemoteInterpreterEventType.PARA_INFOS) { + Map paraInfos = gson.fromJson(event.getData(), + new TypeToken>() { + }.getType()); + String noteId = paraInfos.get("noteId"); + String paraId = paraInfos.get("paraId"); + String settingId = RemoteInterpreterUtils. + getInterpreterSettingId(interpreterGroup.getId()); + if (noteId != null && paraId != null && settingId != null) { + listener.onParaInfosReceived(noteId, paraId, settingId, paraInfos); + } } logger.debug("Event from remote process {}", event.getType()); } catch (Exception e) { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java index 66b08c95a1d..0e9dc5128dc 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java @@ -40,4 +40,6 @@ public interface RemoteWorksEventListener { public void onFinished(Object resultObject); public void onError(); } + public void onParaInfosReceived(String noteId, String paragraphId, + String interpreterSettingId, Map metaInfos); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java index 2937e2d4c09..8308222d9a8 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.interpreter.remote; +import org.apache.zeppelin.interpreter.InterpreterGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,4 +64,13 @@ public static boolean checkIfRemoteEndpointAccessible(String host, int port) { return false; } } + + public static String getInterpreterSettingId(String intpGrpId) { + String settingId = null; + if (intpGrpId != null) { + int indexOfColon = intpGrpId.indexOf(":"); + settingId = intpGrpId.substring(0, indexOfColon); + } + return settingId; + } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java index 7ca406c6709..9e5a0b49f07 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java @@ -43,7 +43,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum { APP_STATUS_UPDATE(12), META_INFOS(13), REMOTE_ZEPPELIN_SERVER_RESOURCE(14), - RESOURCE_INVOKE_METHOD(15); + RESOURCE_INVOKE_METHOD(15), + PARA_INFOS(16); private final int value; @@ -94,6 +95,8 @@ public static RemoteInterpreterEventType findByValue(int value) { return REMOTE_ZEPPELIN_SERVER_RESOURCE; case 15: return RESOURCE_INVOKE_METHOD; + case 16: + return PARA_INFOS; default: return null; } diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift index 08a15ad9702..fc09adea5d4 100644 --- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift +++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift @@ -56,7 +56,8 @@ enum RemoteInterpreterEventType { APP_STATUS_UPDATE = 12, META_INFOS = 13, REMOTE_ZEPPELIN_SERVER_RESOURCE = 14, - RESOURCE_INVOKE_METHOD = 15 + RESOURCE_INVOKE_METHOD = 15, + PARA_INFOS = 16 } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java index e3dc6b4c1b3..3f865cb370d 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java @@ -182,4 +182,10 @@ public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorks public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception { } + + @Override + public void onParaInfosReceived(String noteId, String paragraphId, + String interpreterSettingId, Map metaInfos) { + } + } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index d7b2007e73c..ebb51004285 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -355,6 +355,10 @@ public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorks @Override public void onRemoteRunParagraph(String noteId, String PsaragraphID) throws Exception { + } + @Override + public void onParaInfosReceived(String noteId, String paragraphId, + String interpreterSettingId, Map metaInfos) { } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/json/NotebookTypeAdapterFactory.java b/zeppelin-server/src/main/java/org/apache/zeppelin/json/NotebookTypeAdapterFactory.java new file mode 100644 index 00000000000..a22c03b1518 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/json/NotebookTypeAdapterFactory.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.json; + +import java.io.IOException; + +import org.apache.zeppelin.socket.NotebookServer; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.TypeAdapter; +import com.google.gson.TypeAdapterFactory; +import com.google.gson.reflect.TypeToken; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; + +/** + * Custom adapter type factory + * Modify the jsonObject before serailaization/deserialization + * Check sample implementation at {@link NotebookServer} + * @param the type whose json is to be customized for serialization/deserialization + */ +public class NotebookTypeAdapterFactory implements TypeAdapterFactory { + private final Class customizedClass; + + public NotebookTypeAdapterFactory(Class customizedClass) { + this.customizedClass = customizedClass; + } + + @SuppressWarnings("unchecked") + // we use a runtime check to guarantee that 'C' and 'T' are equal + public final TypeAdapter create(Gson gson, TypeToken type) { + return type.getRawType() == customizedClass ? (TypeAdapter) customizeTypeAdapter(gson, + (TypeToken) type) : null; + } + + private TypeAdapter customizeTypeAdapter(Gson gson, TypeToken type) { + final TypeAdapter delegate = gson.getDelegateAdapter(this, type); + final TypeAdapter elementAdapter = gson.getAdapter(JsonElement.class); + return new TypeAdapter() { + @Override + public void write(JsonWriter out, C value) throws IOException { + JsonElement tree = delegate.toJsonTree(value); + beforeWrite(value, tree); + elementAdapter.write(out, tree); + } + + @Override + public C read(JsonReader in) throws IOException { + JsonElement tree = elementAdapter.read(in); + afterRead(tree); + return delegate.fromJsonTree(tree); + } + }; + } + + /** + * Override this to change {@code toSerialize} before it is written to the + * outgoing JSON stream. + */ + protected void beforeWrite(C source, JsonElement toSerialize) { + } + + /** + * Override this to change {@code deserialized} before it parsed into the + * application type. + */ + protected void afterRead(JsonElement deserialized) { + } +} diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java index 09280074375..57eb8514a31 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java @@ -51,6 +51,7 @@ import org.apache.zeppelin.rest.message.NewInterpreterSettingRequest; import org.apache.zeppelin.rest.message.UpdateInterpreterSettingRequest; import org.apache.zeppelin.server.JsonResponse; +import org.apache.zeppelin.socket.NotebookServer; /** * Interpreter Rest API @@ -61,14 +62,17 @@ public class InterpreterRestApi { private static final Logger logger = LoggerFactory.getLogger(InterpreterRestApi.class); private InterpreterFactory interpreterFactory; + private NotebookServer notebookServer; Gson gson = new Gson(); public InterpreterRestApi() { } - public InterpreterRestApi(InterpreterFactory interpreterFactory) { + public InterpreterRestApi(InterpreterFactory interpreterFactory, + NotebookServer notebookWsServer) { this.interpreterFactory = interpreterFactory; + this.notebookServer = notebookWsServer; } /** @@ -179,18 +183,20 @@ public Response removeSetting(@PathParam("settingId") String settingId) throws I @ZeppelinApi public Response restartSetting(String message, @PathParam("settingId") String settingId) { logger.info("Restart interpreterSetting {}, msg={}", settingId, message); + + InterpreterSetting setting = interpreterFactory.get(settingId); try { RestartInterpreterRequest request = gson.fromJson(message, RestartInterpreterRequest.class); String noteId = request == null ? null : request.getNoteId(); interpreterFactory.restart(settingId, noteId, SecurityUtils.getPrincipal()); + notebookServer.clearParagraphRuntimeInfo(setting); } catch (InterpreterException e) { logger.error("Exception in InterpreterRestApi while restartSetting ", e); return new JsonResponse<>(Status.NOT_FOUND, e.getMessage(), ExceptionUtils.getStackTrace(e)) .build(); } - InterpreterSetting setting = interpreterFactory.get(settingId); if (setting == null) { return new JsonResponse<>(Status.NOT_FOUND, "", settingId).build(); } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 371d0a131d0..045558f45a4 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -374,7 +374,7 @@ public Set getSingletons() { HeliumRestApi heliumApi = new HeliumRestApi(helium, notebook); singletons.add(heliumApi); - InterpreterRestApi interpreterApi = new InterpreterRestApi(replFactory); + InterpreterRestApi interpreterApi = new InterpreterRestApi(replFactory, notebookWsServer); singletons.add(interpreterApi); CredentialRestApi credentialApi = new CredentialRestApi(credentials); diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 68b015d0bac..e692b12fadf 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -20,6 +20,7 @@ import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -54,6 +55,7 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.json.NotebookTypeAdapterFactory; import org.apache.zeppelin.notebook.JobListenerFactory; import org.apache.zeppelin.notebook.Folder; import org.apache.zeppelin.notebook.Note; @@ -62,6 +64,7 @@ import org.apache.zeppelin.notebook.NotebookEventListener; import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.notebook.ParagraphJobListener; +import org.apache.zeppelin.notebook.ParagraphRuntimeInfo; import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision; import org.apache.zeppelin.notebook.socket.Message; import org.apache.zeppelin.notebook.socket.Message.OP; @@ -86,6 +89,10 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Queues; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; import com.google.gson.reflect.TypeToken; /** @@ -113,7 +120,20 @@ String getKey() { private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class); - Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").create(); + Gson gson = new GsonBuilder() + .registerTypeAdapterFactory(new NotebookTypeAdapterFactory(Paragraph.class) { + @Override + protected void beforeWrite(Paragraph source, JsonElement toSerialize) { + Map runtimeInfos = source.getRuntimeInfos(); + if (runtimeInfos != null) { + JsonElement jsonTree = gson.toJsonTree(runtimeInfos); + if (toSerialize instanceof JsonObject) { + JsonObject jsonObj = (JsonObject) toSerialize; + jsonObj.add("runtimeInfos", jsonTree); + } + } + } + }).setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").create(); final Map> noteSocketMap = new HashMap<>(); final Queue connectedSockets = new ConcurrentLinkedQueue<>(); final Map> userConnectedSockets = new ConcurrentHashMap<>(); @@ -2314,4 +2334,52 @@ private void broadcastToWatchers(String noteId, String subject, Message message) } } } + + @Override + public void onParaInfosReceived(String noteId, String paragraphId, + String interpreterSettingId, Map metaInfos) { + Note note = notebook().getNote(noteId); + if (note != null) { + Paragraph paragraph = note.getParagraph(paragraphId); + if (paragraph != null) { + InterpreterSetting setting = notebook().getInterpreterFactory() + .get(interpreterSettingId); + setting.addNoteToPara(noteId, paragraphId); + String label = metaInfos.get("label"); + String tooltip = metaInfos.get("tooltip"); + List keysToRemove = Arrays.asList("noteId", "paraId", "label", "tooltip"); + for (String removeKey : keysToRemove) { + metaInfos.remove(removeKey); + } + paragraph + .updateRuntimeInfos(label, tooltip, metaInfos, setting.getGroup(), setting.getId()); + broadcast( + note.getId(), + new Message(OP.PARAS_INFO).put("id", paragraphId).put("infos", + paragraph.getRuntimeInfos())); + } + } + } + + public void clearParagraphRuntimeInfo(InterpreterSetting setting) { + Map> noteIdAndParaMap = setting.getNoteIdAndParaMap(); + if (noteIdAndParaMap != null && !noteIdAndParaMap.isEmpty()) { + for (String noteId : noteIdAndParaMap.keySet()) { + Set paraIdSet = noteIdAndParaMap.get(noteId); + if (paraIdSet != null && !paraIdSet.isEmpty()) { + for (String paraId : paraIdSet) { + Note note = notebook().getNote(noteId); + if (note != null) { + Paragraph paragraph = note.getParagraph(paraId); + if (paragraph != null) { + paragraph.clearRuntimeInfo(setting.getId()); + broadcast(noteId, new Message(OP.PARAGRAPH).put("paragraph", paragraph)); + } + } + } + } + } + } + setting.clearNoteIdAndParaMap(); + } } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/AbstractZeppelinIT.java b/zeppelin-server/src/test/java/org/apache/zeppelin/AbstractZeppelinIT.java index f8498685580..41f4cec4841 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/AbstractZeppelinIT.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/AbstractZeppelinIT.java @@ -64,14 +64,14 @@ protected String getParagraphXPath(int paragraphNo) { protected boolean waitForParagraph(final int paragraphNo, final String state) { By locator = By.xpath(getParagraphXPath(paragraphNo) - + "//div[contains(@class, 'control')]//span[1][contains(.,'" + state + "')]"); + + "//div[contains(@class, 'control')]//span[2][contains(.,'" + state + "')]"); WebElement element = pollingWait(locator, MAX_PARAGRAPH_TIMEOUT_SEC); return element.isDisplayed(); } protected String getParagraphStatus(final int paragraphNo) { By locator = By.xpath(getParagraphXPath(paragraphNo) - + "//div[contains(@class, 'control')]//span[1]"); + + "//div[contains(@class, 'control')]/span[2]"); return driver.findElement(locator).getText(); } diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph-control.html b/zeppelin-web/src/app/notebook/paragraph/paragraph-control.html index 351fb5ffecc..5f9c4620ae0 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph-control.html +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph-control.html @@ -13,7 +13,27 @@ -->
- + + + + + {{paragraph.runtimeInfos.jobUrl.label}} + + + + + + {{paragraph.runtimeInfos.jobUrl.label}}S + + + + {{paragraph.status}} @@ -31,6 +51,20 @@ tooltip="Cancel (Ctrl+{{ (isMac ? 'Option' : 'Alt') }}+c)" ng-click="cancelParagraph(paragraph)" ng-show="paragraph.status=='RUNNING' || paragraph.status=='PENDING'"> + + Spark job + + + Spark Jobs + + + diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js index 342d41f37d2..e6f3244eee4 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js @@ -1036,6 +1036,12 @@ function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $locat } }); + $scope.$on('updateParaInfos', function(event, data) { + if (data.id === $scope.paragraph.id) { + $scope.paragraph.runtimeInfos = data.infos; + } + }); + $scope.$on('angularObjectRemove', function(event, data) { var noteId = $route.current.pathParams.noteId; if (!data.noteId || data.noteId === noteId) { @@ -1083,7 +1089,8 @@ function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $locat isEmpty(newPara.results) !== isEmpty(oldPara.results) || newPara.errorMessage !== oldPara.errorMessage || !angular.equals(newPara.settings, oldPara.settings) || - !angular.equals(newPara.config, oldPara.config))) + !angular.equals(newPara.config, oldPara.config) || + !angular.equals(newPara.runtimeInfos, oldPara.runtimeInfos))) } $scope.updateAllScopeTexts = function(oldPara, newPara) { @@ -1126,46 +1133,47 @@ function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $locat $scope.paragraph.results = newPara.results; } $scope.paragraph.settings = newPara.settings; + $scope.paragraph.runtimeInfos = newPara.runtimeInfos; if ($scope.editor) { $scope.editor.setReadOnly($scope.isRunning(newPara)); } if (!$scope.asIframe) { - $scope.paragraph.config = newPara.config; - initializeDefault(newPara.config); - } else { - newPara.config.editorHide = true; - newPara.config.tableHide = false; - $scope.paragraph.config = newPara.config; - } - }; - - $scope.updateParagraph = function(oldPara, newPara, updateCallback) { - // 1. get status, refreshed - const statusChanged = (newPara.status !== oldPara.status); - const resultRefreshed = (newPara.dateFinished !== oldPara.dateFinished) || - isEmpty(newPara.results) !== isEmpty(oldPara.results) || - newPara.status === 'ERROR' || (newPara.status === 'FINISHED' && statusChanged); - - // 2. update texts managed by $scope - $scope.updateAllScopeTexts(oldPara, newPara); - - // 3. execute callback to update result - updateCallback(); - - // 4. update remaining paragraph objects - $scope.updateParagraphObjectWhenUpdated(newPara); - - // 5. handle scroll down by key properly if new paragraph is added - if (statusChanged || resultRefreshed) { - // when last paragraph runs, zeppelin automatically appends new paragraph. - // this broadcast will focus to the newly inserted paragraph - const paragraphs = angular.element('div[id$="_paragraphColumn_main"]'); - if (paragraphs.length >= 2 && paragraphs[paragraphs.length - 2].id.indexOf($scope.paragraph.id) === 0) { - // rendering output can took some time. So delay scrolling event firing for sometime. - setTimeout(() => { $rootScope.$broadcast('scrollToCursor'); }, 500); - } - } + $scope.paragraph.config = newPara.config; + initializeDefault(newPara.config); + } else { + newPara.config.editorHide = true; + newPara.config.tableHide = false; + $scope.paragraph.config = newPara.config; + } + }; + + $scope.updateParagraph = function(oldPara, newPara, updateCallback) { + // 1. get status, refreshed + const statusChanged = (newPara.status !== oldPara.status); + const resultRefreshed = (newPara.dateFinished !== oldPara.dateFinished) || + isEmpty(newPara.results) !== isEmpty(oldPara.results) || + newPara.status === 'ERROR' || (newPara.status === 'FINISHED' && statusChanged); + + // 2. update texts managed by $scope + $scope.updateAllScopeTexts(oldPara, newPara); + + // 3. execute callback to update result + updateCallback(); + + // 4. update remaining paragraph objects + $scope.updateParagraphObjectWhenUpdated(newPara); + + // 5. handle scroll down by key properly if new paragraph is added + if (statusChanged || resultRefreshed) { + // when last paragraph runs, zeppelin automatically appends new paragraph. + // this broadcast will focus to the newly inserted paragraph + const paragraphs = angular.element('div[id$="_paragraphColumn_main"]'); + if (paragraphs.length >= 2 && paragraphs[paragraphs.length - 2].id.indexOf($scope.paragraph.id) === 0) { + // rendering output can took some time. So delay scrolling event firing for sometime. + setTimeout(() => { $rootScope.$broadcast('scrollToCursor'); }, 500); + } + } }; $scope.$on('runParagraphUsingSpell', function(event, data) { diff --git a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js index aceffbbe30f..df0ea4806fb 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js +++ b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js @@ -164,6 +164,8 @@ function websocketEvents($rootScope, $websocket, $location, baseUrlSrv) { $rootScope.$broadcast('updateNote', data.name, data.config, data.info); } else if (op === 'SET_NOTE_REVISION') { $rootScope.$broadcast('setNoteRevisionResult', data); + } else if (op === 'PARAS_INFO') { + $rootScope.$broadcast('updateParaInfos', data); } }); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index bd7d664841e..74424303daf 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.gson.annotations.SerializedName; @@ -48,6 +49,10 @@ public class InterpreterSetting { private String group; private transient Map infos; + // Map of the note and paragraphs which has runtime infos generated by this interpreter setting. + // This map is used to clear the infos in paragraph when the interpretersetting is restarted + private transient Map> runtimeInfosToBeCleared; + /** * properties can be either Properties or Map * properties should be: @@ -125,7 +130,7 @@ public String getName() { return name; } - String getGroup() { + public String getGroup() { return group; } @@ -401,4 +406,24 @@ public InterpreterRunner getInterpreterRunner() { public void setInterpreterRunner(InterpreterRunner interpreterRunner) { this.interpreterRunner = interpreterRunner; } + + public void addNoteToPara(String noteId, String paraId) { + if (runtimeInfosToBeCleared == null) { + runtimeInfosToBeCleared = new HashMap<>(); + } + Set paraIdSet = runtimeInfosToBeCleared.get(noteId); + if (paraIdSet == null) { + paraIdSet = new HashSet<>(); + runtimeInfosToBeCleared.put(noteId, paraIdSet); + } + paraIdSet.add(paraId); + } + + public Map> getNoteIdAndParaMap() { + return runtimeInfosToBeCleared; + } + + public void clearNoteIdAndParaMap() { + runtimeInfosToBeCleared = null; + } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 26f4e1a9032..224dd4b7697 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -379,6 +379,7 @@ public Paragraph clearParagraphOutput(String paragraphId) { for (Paragraph p : paragraphs) { if (p.getId().equals(paragraphId)) { p.setReturn(null, null); + p.clearRuntimeInfo(null); return p; } } @@ -563,6 +564,7 @@ public void run(String paragraphId) { return; } + p.clearRuntimeInfo(null); String requiredReplName = p.getRequiredReplName(); Interpreter intp = factory.getInterpreter(p.getUser(), getId(), requiredReplName); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 8b946f2c344..b853e070431 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -480,6 +480,7 @@ public Note loadNoteFromRepo(String id, AuthenticationInfo subject) { if (p.getDateFinished() != null && lastUpdatedDate.before(p.getDateFinished())) { lastUpdatedDate = p.getDateFinished(); } + p.clearRuntimeInfo(null); } Map> savedObjects = note.getAngularObjects(); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 27a707137a1..12501946730 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -70,9 +70,10 @@ public class Paragraph extends Job implements Serializable, Cloneable { // For backward compatibility of note.json format after ZEPPELIN-212 Object result; + private Map runtimeInfos; /** - * Applicaiton states in this paragraph + * Application states in this paragraph */ private final List apps = new LinkedList<>(); @@ -676,4 +677,52 @@ private boolean isValidInterpreter(String replName) { return false; } } + + public void updateRuntimeInfos(String label, String tooltip, Map infos, + String group, String intpSettingId) { + if (this.runtimeInfos == null) { + this.runtimeInfos = new HashMap(); + } + + if (infos != null) { + for (String key : infos.keySet()) { + ParagraphRuntimeInfo info = this.runtimeInfos.get(key); + if (info == null) { + info = new ParagraphRuntimeInfo(key, label, tooltip, group, intpSettingId); + this.runtimeInfos.put(key, info); + } + info.addValue(infos.get(key)); + } + } + } + + /** + * Remove runtimeinfo taht were got from the setting with id settingId + * @param settingId + */ + public void clearRuntimeInfo(String settingId) { + if (settingId != null) { + Set keys = runtimeInfos.keySet(); + if (keys.size() > 0) { + List infosToRemove = new ArrayList<>(); + for (String key : keys) { + ParagraphRuntimeInfo paragraphRuntimeInfo = runtimeInfos.get(key); + if (paragraphRuntimeInfo.getInterpreterSettingId().equals(settingId)) { + infosToRemove.add(key); + } + } + if (infosToRemove.size() > 0) { + for (String info : infosToRemove) { + runtimeInfos.remove(info); + } + } + } + } else { + this.runtimeInfos = null; + } + } + + public Map getRuntimeInfos() { + return runtimeInfos; + } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphRuntimeInfo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphRuntimeInfo.java new file mode 100644 index 00000000000..0042023d0b1 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphRuntimeInfo.java @@ -0,0 +1,39 @@ +package org.apache.zeppelin.notebook; + +import java.util.ArrayList; +import java.util.List; + +/** + * Store runtime infos of each para + * + */ +public class ParagraphRuntimeInfo { + + private String propertyName; // Name of the property + private String label; // Label to be used in UI + private String tooltip; // Tooltip text toshow in UI + private String group; // The interpretergroup from which the info was derived + private List values; // values for the property + private String interpreterSettingId; + + public ParagraphRuntimeInfo(String propertyName, String label, + String tooltip, String group, String intpSettingId) { + if (intpSettingId == null) { + throw new IllegalArgumentException("Interpreter setting Id cannot be null"); + } + this.propertyName = propertyName; + this.label = label; + this.tooltip = tooltip; + this.group = group; + this.interpreterSettingId = intpSettingId; + this.values = new ArrayList<>(); + } + + public void addValue(String value) { + values.add(value); + } + + public String getInterpreterSettingId() { + return interpreterSettingId; + } +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java index a6d15462152..d40afc22ffb 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java @@ -174,7 +174,8 @@ public static enum OP { NOTE_UPDATED, // [s-c] paragraph updated(name, config) RUN_ALL_PARAGRAPHS, // [c-s] run all paragraphs PARAGRAPH_EXECUTED_BY_SPELL, // [c-s] paragraph was executed by spell - RUN_PARAGRAPH_USING_SPELL // [s-c] run paragraph using spell + RUN_PARAGRAPH_USING_SPELL, // [s-c] run paragraph using spell + PARAS_INFO // [s-c] paragraph runtime infos } public static final Message EMPTY = new Message(null);