diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 420ebd5fda9..2bc895003e6 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -430,13 +430,13 @@ public List completion(String buf, int cursor) { statementSetNotifier.notify(); } + String[] completionList = null; synchronized (statementFinishedNotifier) { long startTime = System.currentTimeMillis(); while (statementOutput == null - && pythonScriptInitialized == false && pythonscriptRunning) { try { - if (System.currentTimeMillis() - startTime < MAX_TIMEOUT_SEC * 1000) { + if (System.currentTimeMillis() - startTime > MAX_TIMEOUT_SEC * 1000) { logger.error("pyspark completion didn't have response for {}sec.", MAX_TIMEOUT_SEC); break; } @@ -447,16 +447,20 @@ public List completion(String buf, int cursor) { return new LinkedList<>(); } } + if (statementError) { + return new LinkedList<>(); + } + InterpreterResult completionResult; + completionResult = new InterpreterResult(Code.SUCCESS, statementOutput); + Gson gson = new Gson(); + completionList = gson.fromJson(completionResult.message(), String[].class); } + //end code for completion - if (statementError) { + if (completionList == null) { return new LinkedList<>(); } - InterpreterResult completionResult = new InterpreterResult(Code.SUCCESS, statementOutput); - //end code for completion - Gson gson = new Gson(); - String[] completionList = gson.fromJson(completionResult.message(), String[].class); List results = new LinkedList<>(); for (String name: completionList) { results.add(new InterpreterCompletion(name, name)); diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java index 401e0fd463c..6a60fef7d77 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java @@ -25,6 +25,7 @@ import org.apache.zeppelin.interpreter.InterpreterOutputListener; import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.resource.LocalResourcePool; import org.apache.zeppelin.user.AuthenticationInfo; import org.junit.After; @@ -37,6 +38,7 @@ import java.io.File; import java.util.HashMap; import java.util.LinkedList; +import java.util.List; import java.util.Properties; import static org.junit.Assert.*; @@ -144,4 +146,11 @@ public void testBasicIntp() { } } + @Test + public void testCompletion() { + if (getSparkVersionNumber() > 11) { + List completions = pySparkInterpreter.completion("sc.", "sc.".length()); + assertTrue(completions.size() > 0); + } + } }