From 5de2d035fadff6aa43e463f33ac030f831ee1d58 Mon Sep 17 00:00:00 2001 From: mrzhao Date: Tue, 19 Sep 2023 05:55:11 +0800 Subject: [PATCH] [ZEPPELIN-5962] LivyInterpreter support more parameter when create session (#4657) * LivyInterpreter support more parameter when create session * Add test for ZEPPELIN-5962 --------- Co-authored-by: mrzhao --- .../zeppelin/livy/BaseLivyInterpreter.java | 19 ++++++++-- .../zeppelin/livy/LivyInterpreterIT.java | 36 +++++++++++++++++++ 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java index f7b576d8188..b897537ec26 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java @@ -19,6 +19,7 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import com.google.gson.JsonObject; import com.google.gson.annotations.SerializedName; import org.apache.commons.lang3.StringUtils; @@ -302,15 +303,19 @@ private SessionInfo createSession(String user, String kind) throws LivyException { try { Map conf = new HashMap<>(); + Map params = new HashMap<>(); for (Map.Entry entry : getProperties().entrySet()) { if (entry.getKey().toString().startsWith("livy.spark.") && !entry.getValue().toString().isEmpty()) { conf.put(entry.getKey().toString().substring(5), entry.getValue().toString()); + } else if (entry.getKey().toString().startsWith("livy.") && + !entry.getValue().toString().isEmpty()) { + params.put(entry.getKey().toString().substring(5), entry.getValue().toString()); } } CreateSessionRequest request = new CreateSessionRequest(kind, - user == null || user.equals("anonymous") ? null : user, conf); + user == null || user.equals("anonymous") ? null : user, conf, params); SessionInfo sessionInfo = SessionInfo.fromJson( callRestAPI("/sessions", "POST", request.toJson())); long start = System.currentTimeMillis(); @@ -776,15 +781,23 @@ private static class CreateSessionRequest { @SerializedName("proxyUser") public final String user; public final Map conf; + public final Map params; - CreateSessionRequest(String kind, String user, Map conf) { + CreateSessionRequest(String kind, String user, Map conf, + Map params) { this.kind = kind; this.user = user; this.conf = conf; + this.params = params; } public String toJson() { - return gson.toJson(this); + JsonObject jsonObject = new JsonObject(); + jsonObject.add("conf", gson.toJsonTree(conf)); + params.forEach(jsonObject::addProperty); + jsonObject.addProperty("kind", kind); + jsonObject.addProperty("proxyUser", user); + return gson.toJson(jsonObject); } } diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java index a6490858972..3a3a17c5afb 100644 --- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java +++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java @@ -644,6 +644,42 @@ public void run() { } } + @Test + void testLivyParams() throws InterpreterException { + if (!checkPreCondition()) { + return; + } + InterpreterGroup interpreterGroup = new InterpreterGroup("group_1"); + interpreterGroup.put("session_1", new ArrayList()); + Properties props = new Properties(properties); + props.setProperty("livy.spark.executor.cores", "4"); + props.setProperty("livy.name", "zeppelin-livy"); + LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(props); + sparkInterpreter.setInterpreterGroup(interpreterGroup); + interpreterGroup.get("session_1").add(sparkInterpreter); + AuthenticationInfo authInfo = new AuthenticationInfo("user1"); + MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener(); + InterpreterOutput output = new InterpreterOutput(outputListener); + InterpreterContext context = InterpreterContext.builder() + .setNoteId("noteId") + .setParagraphId("paragraphId") + .setAuthenticationInfo(authInfo) + .setInterpreterOut(output) + .build(); + sparkInterpreter.open(); + + try { + InterpreterResult result = sparkInterpreter.interpret("sc.version\n" + + "assert(sc.getConf.get(\"spark.executor.cores\") == \"4\" && " + + "sc.getConf.get(\"spark.app.name\") == \"zeppelin-livy\")" + , context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString()); + assertEquals(1, result.message().size()); + } finally { + sparkInterpreter.close(); + } + } + @Test void testLivyTutorialNote() throws IOException, InterpreterException { if (!checkPreCondition()) {