Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public class RemoteInterpreter extends Interpreter {
static Map<String, RemoteInterpreterProcess> interpreterGroupReference
= new HashMap<String, RemoteInterpreterProcess>();

private InterpreterContextRunnerPool interpreterContextRunnerPool;
private int connectTimeout;

public RemoteInterpreter(Properties property,
Expand All @@ -72,7 +71,6 @@ public RemoteInterpreter(Properties property,
this.interpreterRunner = interpreterRunner;
this.interpreterPath = interpreterPath;
env = new HashMap<String, String>();
interpreterContextRunnerPool = new InterpreterContextRunnerPool();
this.connectTimeout = connectTimeout;
}

Expand Down Expand Up @@ -195,6 +193,9 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
throw new InterpreterException(e1);
}

InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess
.getInterpreterContextRunnerPool();

List<InterpreterContextRunner> runners = context.getRunners();
if (runners != null && runners.size() != 0) {
// assume all runners in this InterpreterContext have the same note id
Expand Down Expand Up @@ -338,7 +339,7 @@ public void setInterpreterGroup(InterpreterGroup interpreterGroup) {
|| (!intpProcess.isRunning() && intpProcess.getPort() == -1)) {
interpreterGroupReference.put(getInterpreterGroupKey(interpreterGroup),
new RemoteInterpreterProcess(interpreterRunner,
interpreterPath, env, interpreterContextRunnerPool, connectTimeout));
interpreterPath, env, connectTimeout));

logger.info("setInterpreterGroup = "
+ getInterpreterGroupKey(interpreterGroup) + " class=" + className
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,19 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
public RemoteInterpreterProcess(String intpRunner,
String intpDir,
Map<String, String> env,
InterpreterContextRunnerPool interpreterContextRunnerPool, int connectTimeout) {
this(intpRunner, intpDir, env, interpreterContextRunnerPool,
new RemoteInterpreterEventPoller(), connectTimeout);
int connectTimeout) {
this(intpRunner, intpDir, env, new RemoteInterpreterEventPoller(), connectTimeout);
}

RemoteInterpreterProcess(String intpRunner,
String intpDir,
Map<String, String> env,
InterpreterContextRunnerPool interpreterContextRunnerPool,
RemoteInterpreterEventPoller remoteInterpreterEventPoller,
int connectTimeout) {
this.interpreterRunner = intpRunner;
this.interpreterDir = intpDir;
this.env = env;
this.interpreterContextRunnerPool = interpreterContextRunnerPool;
this.interpreterContextRunnerPool = new InterpreterContextRunnerPool();
referenceCount = new AtomicInteger(0);
this.remoteInterpreterEventPoller = remoteInterpreterEventPoller;
this.connectTimeout = connectTimeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ public class RemoteInterpreterProcessTest {
@Test
public void testStartStop() {
InterpreterGroup intpGroup = new InterpreterGroup();
RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>(),
new InterpreterContextRunnerPool(), 10 * 1000);
RemoteInterpreterProcess rip = new RemoteInterpreterProcess(
"../bin/interpreter.sh", "nonexists", new HashMap<String, String>(),
10 * 1000);
assertFalse(rip.isRunning());
assertEquals(0, rip.referenceCount());
assertEquals(1, rip.reference(intpGroup));
Expand All @@ -48,8 +49,9 @@ public void testStartStop() {
@Test
public void testClientFactory() throws Exception {
InterpreterGroup intpGroup = new InterpreterGroup();
RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>(),
new InterpreterContextRunnerPool(), mock(RemoteInterpreterEventPoller.class), 10 * 1000);
RemoteInterpreterProcess rip = new RemoteInterpreterProcess(
"../bin/interpreter.sh", "nonexists", new HashMap<String, String>(),
mock(RemoteInterpreterEventPoller.class), 10 * 1000);
rip.reference(intpGroup);
assertEquals(0, rip.getNumActiveClient());
assertEquals(0, rip.getNumIdleClient());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,27 @@ public void pySparkTest() throws IOException {
ZeppelinServer.notebook.removeNote(note.id());
}

@Test
public void zRunTest() throws IOException {
// create new note
Note note = ZeppelinServer.notebook.createNote();
Paragraph p0 = note.addParagraph();
p0.setText("z.run(1)");
Paragraph p1 = note.addParagraph();
p1.setText("val a=10");
Paragraph p2 = note.addParagraph();
p2.setText("print(a)");

note.run(p0.getId());
waitForFinish(p0);

note.run(p2.getId());
waitForFinish(p2);
assertEquals("10", p2.getResult().message());

ZeppelinServer.notebook.removeNote(note.id());
}

/**
* Get spark version number as a numerical value.
* eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
Expand Down