From ab05f869eafefe72acf2542f0db8b03fec9cdf7b Mon Sep 17 00:00:00 2001 From: CloverHearts Date: Fri, 11 Dec 2015 19:50:52 +0900 Subject: [PATCH 1/5] implement pyspark completion --- .../zeppelin/spark/PySparkInterpreter.java | 93 ++++++++++++++++++- .../main/resources/python/zeppelin_pyspark.py | 47 +++++++++- .../paragraph/paragraph.controller.js | 29 +----- 3 files changed, 138 insertions(+), 31 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 273c89736ec..0bfad6a0b40 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -58,6 +58,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonParseException; +import com.google.gson.JsonParser; + import py4j.GatewayServer; /** @@ -368,12 +373,96 @@ public int getProgress(InterpreterContext context) { return sparkInterpreter.getProgress(context); } + @Override public List completion(String buf, int cursor) { - // not supported - return new LinkedList(); + if (buf.length() < cursor) { + cursor = buf.length(); + } + String completionString = getCompletionTargetString(buf, cursor); + String completionCommand = "completion.getCompletion('" + completionString + "')"; + + //start code for completion + SparkInterpreter sparkInterpreter = getSparkInterpreter(); + if (sparkInterpreter.getSparkVersion().isUnsupportedVersion() == false + && pythonscriptRunning == false) { + return new LinkedList(); + } + + outputStream.reset(); + + pythonInterpretRequest = new PythonInterpretRequest(completionCommand, ""); + statementOutput = null; + + synchronized (statementSetNotifier) { + statementSetNotifier.notify(); + } + + synchronized (statementFinishedNotifier) { + while (statementOutput == null) { + try { + statementFinishedNotifier.wait(1000); + } catch (InterruptedException e) { + // not working + logger.info("wait drop"); + return new LinkedList(); + } + } + } + + if (statementError) { + return new LinkedList(); + } + InterpreterResult completionResult = new InterpreterResult(Code.SUCCESS, statementOutput); + //end code for completion + + Gson gson = new Gson(); + + return gson.fromJson(completionResult.message(), LinkedList.class); } + private String getCompletionTargetString(String text, int cursor) { + String[] completionSeqCharaters = {" ", "\n", "\t"}; + int completionEndPosition = cursor; + int completionStartPosition = cursor; + int indexOfReverseSeqPostion = cursor; + + String resultCompletionText = ""; + String completionScriptText = ""; + try { + completionScriptText = text.substring(0, cursor); + } + catch (Exception e) { + logger.error(e.toString()); + return null; + } + completionEndPosition = completionScriptText.length(); + + String tempReverseCompletionText = new StringBuilder(completionScriptText).reverse().toString(); + + for (String seqCharacter : completionSeqCharaters) { + indexOfReverseSeqPostion = tempReverseCompletionText.indexOf(seqCharacter); + + if (indexOfReverseSeqPostion < completionStartPosition && indexOfReverseSeqPostion > 0) { + completionStartPosition = indexOfReverseSeqPostion; + } + + } + + if (completionStartPosition == completionEndPosition) { + completionStartPosition = 0; + } + else + { + completionStartPosition = completionEndPosition - completionStartPosition; + } + resultCompletionText = completionScriptText.substring( + completionStartPosition , completionEndPosition); + + return resultCompletionText; + } + + private SparkInterpreter getSparkInterpreter() { InterpreterGroup intpGroup = getInterpreterGroup(); LazyOpenInterpreter lazy = null; diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index 1b17772b736..4d8a097d378 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -15,7 +15,7 @@ # limitations under the License. # -import sys, getopt, traceback +import sys, getopt, traceback, json from py4j.java_gateway import java_import, JavaGateway, GatewayClient from py4j.protocol import Py4JJavaError @@ -107,6 +107,50 @@ def isAutoConvertEnabled(self): def isImportAllPackageUnderSparkSql(self): return self.version >= self.SPARK_1_3_0 +class PySparkCompletion: + def getGlobalCompletion(self): + objectDefList = [] + try: + for completionItem in list(globals().iterkeys()): + objectDefList.append(completionItem) + except: + return None + else: + return objectDefList + + def getMethodCompletion(self, text_value): + objectDefList = [] + completion_target = text_value + try: + if len(completion_target) <= 0: + return None + if text_value[-1] == ".": + completion_target = text_value[:-1] + exec("%s = %s(%s)" % ("objectDefList", "dir", completion_target)) + except: + return None + else: + return objectDefList + + + def getCompletion(self, text_value): + completionList = set() + + globalCompletionList = self.getGlobalCompletion() + if globalCompletionList != None: + for completionItem in list(globalCompletionList): + completionList.add(completionItem) + + if text_value != None: + objectCompletionList = self.getMethodCompletion(text_value) + if objectCompletionList != None: + for completionItem in list(objectCompletionList): + completionList.add(completionItem) + if len(completionList) <= 0: + print "" + else: + print json.dumps(list(completionList)) + output = Logger() sys.stdout = output @@ -149,6 +193,7 @@ def isImportAllPackageUnderSparkSql(self): sqlc = SQLContext(sc, intp.getSQLContext()) sqlContext = sqlc +completion = PySparkCompletion() z = PyZeppelinContext(intp.getZeppelinContext()) while True : diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js index dd66a593206..5aff815da09 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js @@ -540,35 +540,8 @@ angular.module('zeppelinWebApp') pos = session.getTextRange(new Range(0, 0, pos.row, pos.column)).length; var buf = session.getValue(); - var completionString = buf; - if (pos > 0) { - var completionStartPosition = pos; - var completionSeqCharaters = [' ', '\n']; - - // replace \r\n or \n\r other to \n - var reverseCompletionString = buf.replace(/\r?\n|\r/g, '\n').substr(0, pos).split('').reverse(); - for (var seqCharacterIndex in completionSeqCharaters) { - var indexOfReverseSeqPostion = reverseCompletionString.indexOf(completionSeqCharaters[seqCharacterIndex]); - - if (indexOfReverseSeqPostion < completionStartPosition && indexOfReverseSeqPostion > 0) { - completionStartPosition = indexOfReverseSeqPostion; - } - } - - if (completionStartPosition === pos) { - completionStartPosition = 0; - } - else - { - completionStartPosition = pos - completionStartPosition; - } - - completionString = buf.substr( completionStartPosition , pos); - pos = completionString.length -1; - } - - websocketMsgSrv.completion($scope.paragraph.id, completionString, pos); + websocketMsgSrv.completion($scope.paragraph.id, buf, pos); $scope.$on('completionList', function(event, data) { if (data.completions) { From f4dc4874cc52adb67c48cbe3498a2294e1a6d211 Mon Sep 17 00:00:00 2001 From: CloverHearts Date: Wed, 16 Dec 2015 10:08:47 +0900 Subject: [PATCH 2/5] Pyspark completion __ filtering --- spark/src/main/resources/python/zeppelin_pyspark.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index 4d8a097d378..3973e82276b 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -15,7 +15,7 @@ # limitations under the License. # -import sys, getopt, traceback, json +import sys, getopt, traceback, json, re from py4j.java_gateway import java_import, JavaGateway, GatewayClient from py4j.protocol import Py4JJavaError @@ -149,7 +149,8 @@ def getCompletion(self, text_value): if len(completionList) <= 0: print "" else: - print json.dumps(list(completionList)) + resultCompletion = filter(lambda x : not re.match("__.*__", x), list(completionList)) + print json.dumps(resultCompletion) output = Logger() From 1f8282519da9fc0bbcce5d3a1d6defc2de179dab Mon Sep 17 00:00:00 2001 From: CloverHearts Date: Wed, 16 Dec 2015 14:24:28 +0900 Subject: [PATCH 3/5] remove variable resultComplaetion --- spark/src/main/resources/python/zeppelin_pyspark.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index 3973e82276b..9e52f2569e0 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -149,8 +149,7 @@ def getCompletion(self, text_value): if len(completionList) <= 0: print "" else: - resultCompletion = filter(lambda x : not re.match("__.*__", x), list(completionList)) - print json.dumps(resultCompletion) + print json.dumps(filter(lambda x : not re.match("__.*__", x), list(completionList))) output = Logger() From d7efe8822cc8d8dc5088d008a7b1cb5c5c7a2c06 Mon Sep 17 00:00:00 2001 From: CloverHearts Date: Wed, 16 Dec 2015 17:06:19 -0800 Subject: [PATCH 4/5] change filter regex character --- spark/src/main/resources/python/zeppelin_pyspark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index 9e52f2569e0..d4ab46af670 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -149,7 +149,7 @@ def getCompletion(self, text_value): if len(completionList) <= 0: print "" else: - print json.dumps(filter(lambda x : not re.match("__.*__", x), list(completionList))) + print json.dumps(filter(lambda x : not re.match("__.+[^__]$", x), list(completionList))) output = Logger() From 68324099140e85f3aa7a035f80cd872a93c3901d Mon Sep 17 00:00:00 2001 From: CloverHearts Date: Wed, 16 Dec 2015 20:12:17 -0800 Subject: [PATCH 5/5] Changed PySpark Completion Filter regex --- spark/src/main/resources/python/zeppelin_pyspark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index d4ab46af670..7ab0be99938 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -149,7 +149,7 @@ def getCompletion(self, text_value): if len(completionList) <= 0: print "" else: - print json.dumps(filter(lambda x : not re.match("__.+[^__]$", x), list(completionList))) + print json.dumps(filter(lambda x : not re.match("^__.*", x), list(completionList))) output = Logger()