Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
6e1f219
code base workflow for remote zeppelin server control default thrift …
cloverhearts Nov 11, 2016
0570ae8
add remote works controller class and include interpreter factory
cloverhearts Nov 14, 2016
2523238
Implement eventForWait class
cloverhearts Nov 14, 2016
3d34f9e
Implement getParagraphRunner transaction.
cloverhearts Nov 15, 2016
3ed556c
remove debug console message.
cloverhearts Nov 15, 2016
afb9db7
Merge branch 'master' into workflow
cloverhearts Nov 15, 2016
5a7886f
fix sio support
cloverhearts Nov 15, 2016
55e8704
support spark r
cloverhearts Nov 15, 2016
3f75bd5
support scald
cloverhearts Nov 15, 2016
6fbe08a
Merge branch 'master' into workflow
cloverhearts Nov 17, 2016
2628a20
fix thrift
cloverhearts Nov 17, 2016
4b1ef08
fix thrift interface
cloverhearts Nov 17, 2016
c074f07
fix sio support
cloverhearts Nov 17, 2016
f11fed4
Merge branch 'workflow' into extends-zrun-remote-transaction
cloverhearts Nov 17, 2016
8d42c16
Merge branch 'master' into extends-zrun-remote-transaction
cloverhearts Nov 18, 2016
8cbe46c
remote remoteworksController in interpreter.java
cloverhearts Nov 18, 2016
9ab05af
Change structure and remove remoteWorksManager
cloverhearts Nov 18, 2016
f9661c8
Merge branch 'master' into extends-zrun-remote-transaction
cloverhearts Nov 21, 2016
10c2a47
Implement runNote and re implement run method
cloverhearts Nov 21, 2016
292319a
add test case for extends z.run and z.runNote
cloverhearts Nov 22, 2016
342752d
add document for extends z.run and z.runNote
cloverhearts Nov 22, 2016
8a54917
Merge branch 'master' into extends-zrun-remote-transaction
cloverhearts Nov 23, 2016
7562535
remove unused import and asterisk import
cloverhearts Nov 23, 2016
5ec4640
change defined protocol for thrift
cloverhearts Nov 23, 2016
3862166
regenerate thrfit class
cloverhearts Nov 25, 2016
e6cd82c
Merge branch 'master' into extends-zrun-remote-transaction
cloverhearts Nov 25, 2016
5a80a5a
last test case time check to print string
cloverhearts Nov 26, 2016
f2e3bcf
fix TestCase
cloverhearts Nov 26, 2016
2a2c173
Merge branch 'master' into extends-zrun-remote-transaction
cloverhearts Nov 26, 2016
03a3a2b
testcase change z.run(2, context) to z.run(2)
cloverhearts Nov 28, 2016
113b475
Merge branch 'master' into extends-zrun-remote-transaction
cloverhearts Nov 28, 2016
41fa9d7
restore unless changed and import
cloverhearts Nov 28, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion docs/displaysystem/front-end-angular.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,17 @@ How does the front-end AngularJS API compares to the [back-end API](./back-end-a
<td>Executing Paragraph</td>
<td>z.runParagraph(paragraphId)</td>
<td>z.run(paragraphId)</td>
</tr>
</tr>
<tr>
<td>Executing Paragraph (Specific paragraphs in other notes) (</td>
<td></td>
<td>z.run(noteid, paragraphId)</td>
</tr>
<tr>
<td>Executing note</td>
<td></td>
<td>z.runNote(noteId)</td>
</tr>
<tbody>
<tbody>
</table>
Expand Down
119 changes: 94 additions & 25 deletions spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
Expand Down Expand Up @@ -300,32 +301,103 @@ public static String showDF(SparkContext sc,

/**
* Run paragraph by id
* @param id
* @param noteId
* @param paragraphId
*/
@ZeppelinApi
public void run(String id) {
run(id, interpreterContext);
public void run(String noteId, String paragraphId) {
run(noteId, paragraphId, interpreterContext);
}

/**
* Run paragraph by id
* @param id
* @param paragraphId
*/
@ZeppelinApi
public void run(String paragraphId) {
String noteId = interpreterContext.getNoteId();
run(noteId, paragraphId, interpreterContext);
}

/**
* Run paragraph by id
* @param noteId
* @param context
*/
@ZeppelinApi
public void run(String id, InterpreterContext context) {
if (id.equals(context.getParagraphId())) {
public void run(String noteId, String paragraphId, InterpreterContext context) {
if (paragraphId.equals(context.getParagraphId())) {
throw new InterpreterException("Can not run current Paragraph");
}

for (InterpreterContextRunner r : context.getRunners()) {
if (id.equals(r.getParagraphId())) {
r.run();
return;
List<InterpreterContextRunner> runners =
getInterpreterContextRunner(noteId, paragraphId, context);

if (runners.size() <= 0) {
throw new InterpreterException("Paragraph " + paragraphId + " not found " + runners.size());
}

for (InterpreterContextRunner r : runners) {
r.run();
}

}

public void runNote(String noteId) {
runNote(noteId, interpreterContext);
}

public void runNote(String noteId, InterpreterContext context) {
String runningNoteId = context.getNoteId();
String runningParagraphId = context.getParagraphId();
List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context);

if (runners.size() <= 0) {
throw new InterpreterException("Note " + noteId + " not found " + runners.size());
}

for (InterpreterContextRunner r : runners) {
if (r.getNoteId().equals(runningNoteId) && r.getParagraphId().equals(runningParagraphId)) {
continue;
}
r.run();
}
}


/**
* get Zeppelin Paragraph Runner from zeppelin server
* @param noteId
*/
@ZeppelinApi
public List<InterpreterContextRunner> getInterpreterContextRunner(
String noteId, InterpreterContext interpreterContext) {
List<InterpreterContextRunner> runners = new LinkedList<>();
RemoteWorksController remoteWorksController = interpreterContext.getRemoteWorksController();

throw new InterpreterException("Paragraph " + id + " not found");
if (remoteWorksController != null) {
runners = remoteWorksController.getRemoteContextRunner(noteId);
}

return runners;
}

/**
* get Zeppelin Paragraph Runner from zeppelin server
* @param noteId
* @param paragraphId
*/
@ZeppelinApi
public List<InterpreterContextRunner> getInterpreterContextRunner(
String noteId, String paragraphId, InterpreterContext interpreterContext) {
List<InterpreterContextRunner> runners = new LinkedList<>();
RemoteWorksController remoteWorksController = interpreterContext.getRemoteWorksController();

if (remoteWorksController != null) {
runners = remoteWorksController.getRemoteContextRunner(noteId, paragraphId);
}

return runners;
}

/**
Expand All @@ -334,20 +406,22 @@ public void run(String id, InterpreterContext context) {
*/
@ZeppelinApi
public void run(int idx) {
run(idx, interpreterContext);
String noteId = interpreterContext.getNoteId();
run(noteId, idx, interpreterContext);
}

/**
* Run paragraph at index
* @param idx index starting from 0
* @param context interpreter context
*/
public void run(int idx, InterpreterContext context) {
if (idx >= context.getRunners().size()) {
public void run(String noteId, int idx, InterpreterContext context) {
List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context);
if (idx >= runners.size()) {
throw new InterpreterException("Index out of bound");
}

InterpreterContextRunner runner = context.getRunners().get(idx);
InterpreterContextRunner runner = runners.get(idx);
if (runner.getParagraphId().equals(context.getParagraphId())) {
throw new InterpreterException("Can not run current Paragraph");
}
Expand All @@ -366,13 +440,14 @@ public void run(List<Object> paragraphIdOrIdx) {
*/
@ZeppelinApi
public void run(List<Object> paragraphIdOrIdx, InterpreterContext context) {
String noteId = context.getNoteId();
for (Object idOrIdx : paragraphIdOrIdx) {
if (idOrIdx instanceof String) {
String id = (String) idOrIdx;
run(id, context);
String paragraphId = (String) idOrIdx;
run(noteId, paragraphId, context);
} else if (idOrIdx instanceof Integer) {
Integer idx = (Integer) idOrIdx;
run(idx, context);
run(noteId, idx, context);
} else {
throw new InterpreterException("Paragraph " + idOrIdx + " not found");
}
Expand All @@ -389,13 +464,7 @@ public void runAll() {
*/
@ZeppelinApi
public void runAll(InterpreterContext context) {
for (InterpreterContextRunner r : context.getRunners()) {
if (r.getParagraphId().equals(context.getParagraphId())) {
// skip itself
continue;
}
r.run();
}
runNote(context.getNoteId());
}

@ZeppelinApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public static void remove() {
private List<InterpreterContextRunner> runners;
private String className;
private RemoteEventClientWrapper client;
private RemoteWorksController remoteWorksController;

public InterpreterContext(String noteId,
String paragraphId,
Expand All @@ -75,6 +76,24 @@ public InterpreterContext(String noteId,
List<InterpreterContextRunner> runners,
InterpreterOutput out
) {
this(noteId, paragraphId, replName, paragraphTitle, paragraphText, authenticationInfo,
config, gui, angularObjectRegistry, resourcePool, runners, out, null);
}

public InterpreterContext(String noteId,
String paragraphId,
String replName,
String paragraphTitle,
String paragraphText,
AuthenticationInfo authenticationInfo,
Map<String, Object> config,
GUI gui,
AngularObjectRegistry angularObjectRegistry,
ResourcePool resourcePool,
List<InterpreterContextRunner> runners,
InterpreterOutput out,
RemoteWorksController remoteWorksController
) {
this.noteId = noteId;
this.paragraphId = paragraphId;
this.replName = replName;
Expand All @@ -87,6 +106,7 @@ public InterpreterContext(String noteId,
this.resourcePool = resourcePool;
this.runners = runners;
this.out = out;
this.remoteWorksController = remoteWorksController;
}

public InterpreterContext(String noteId,
Expand All @@ -101,9 +121,11 @@ public InterpreterContext(String noteId,
ResourcePool resourcePool,
List<InterpreterContextRunner> contextRunners,
InterpreterOutput output,
RemoteWorksController remoteWorksController,
RemoteInterpreterEventClient eventClient) {
this(noteId, paragraphId, replName, paragraphTitle, paragraphText, authenticationInfo,
config, gui, angularObjectRegistry, resourcePool, contextRunners, output);
config, gui, angularObjectRegistry, resourcePool, contextRunners, output,
remoteWorksController);
this.client = new RemoteEventClient(eventClient);
}

Expand Down Expand Up @@ -162,4 +184,12 @@ public void setClassName(String className) {
public RemoteEventClientWrapper getClient() {
return client;
}

public RemoteWorksController getRemoteWorksController() {
return remoteWorksController;
}

public void setRemoteWorksController(RemoteWorksController remoteWorksController) {
this.remoteWorksController = remoteWorksController;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.interpreter;

import java.util.List;

/**
* zeppelin job for Remote works controller by interpreter
*
*/
public interface RemoteWorksController {
List<InterpreterContextRunner> getRemoteContextRunner(String noteId);
List<InterpreterContextRunner> getRemoteContextRunner(String noteId, String paragraphId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.interpreter;

/**
* Remote Zeppelin Server Resource
*/
public class RemoteZeppelinServerResource {
/**
* Resource Type for Zeppelin Server
*/
public enum Type{
PARAGRAPH_RUNNERS
}

private String ownerKey;
private Type resourceType;
private Object data;

public Type getResourceType() {
return resourceType;
}

public String getOwnerKey() {
return ownerKey;
}

public void setOwnerKey(String ownerKey) {
this.ownerKey = ownerKey;
}

public void setResourceType(Type resourceType) {
this.resourceType = resourceType;
}

public Object getData() {
return data;
}

public void setData(Object data) {
this.data = data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import com.google.gson.Gson;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
import org.apache.zeppelin.resource.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -48,6 +50,22 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
private final Map<ResourceId, Object> getResourceResponse = new HashMap<>();
private final Gson gson = new Gson();

/**
* Run paragraph
* @param runner
*/
public void getZeppelinServerNoteRunner(
String eventOwnerKey, ZeppelinServerResourceParagraphRunner runner) {
RemoteZeppelinServerResource eventBody = new RemoteZeppelinServerResource();
eventBody.setResourceType(RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS);
eventBody.setOwnerKey(eventOwnerKey);
eventBody.setData(runner);

sendEvent(new RemoteInterpreterEvent(
RemoteInterpreterEventType.REMOTE_ZEPPELIN_SERVER_RESOURCE,
gson.toJson(eventBody)));
}

/**
* Run paragraph
* @param runner
Expand Down
Loading