diff --git a/hive/src/main/java/org/apache/zeppelin/hive/HiveInterpreter.java b/hive/src/main/java/org/apache/zeppelin/hive/HiveInterpreter.java index 5c3dee37387..1a17a766d37 100644 --- a/hive/src/main/java/org/apache/zeppelin/hive/HiveInterpreter.java +++ b/hive/src/main/java/org/apache/zeppelin/hive/HiveInterpreter.java @@ -20,7 +20,6 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.util.List; @@ -32,6 +31,7 @@ import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.JdbcInterpreterResult; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; import org.slf4j.Logger; @@ -117,10 +117,12 @@ public void close() { Statement currentStatement; private InterpreterResult executeSql(String sql) { try { + logger.debug("Running execute."); if (exceptionOnConnect != null) { return new InterpreterResult(Code.ERROR, exceptionOnConnect.getMessage()); } currentStatement = jdbcConnection.createStatement(); + logger.debug("Created statement."); StringBuilder msg = null; if (StringUtils.containsIgnoreCase(sql, "EXPLAIN ")) { //return the explain as text, make this visual explain later @@ -129,23 +131,12 @@ private InterpreterResult executeSql(String sql) { else { msg = new StringBuilder("%table "); } + logger.debug("Building JDBC result"); + ResultSet res = currentStatement.executeQuery(sql); + try { - ResultSetMetaData md = res.getMetaData(); - for (int i = 1; i < md.getColumnCount() + 1; i++) { - if (i == 1) { - msg.append(md.getColumnName(i)); - } else { - msg.append("\t" + md.getColumnName(i)); - } - } - msg.append("\n"); - while (res.next()) { - for (int i = 1; i < md.getColumnCount() + 1; i++) { - msg.append(res.getString(i) + "\t"); - } - msg.append("\n"); - } + return new JdbcInterpreterResult(Code.SUCCESS, res); } finally { try { @@ -157,8 +148,6 @@ private InterpreterResult executeSql(String sql) { } } - InterpreterResult rett = new InterpreterResult(Code.SUCCESS, msg.toString()); - return rett; } catch (SQLException ex) { logger.error("Can not run " + sql, ex); diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml index edfa5e3d31c..8bc15b48685 100644 --- a/zeppelin-interpreter/pom.xml +++ b/zeppelin-interpreter/pom.xml @@ -92,5 +92,12 @@ commons-lang3 3.3.2 + + + org.apache.commons + commons-vfs2 + 2.0 + + diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/JdbcInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/JdbcInterpreterResult.java new file mode 100644 index 00000000000..1931904b5b2 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/JdbcInterpreterResult.java @@ -0,0 +1,140 @@ +package org.apache.zeppelin.interpreter; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; + +import javax.sql.rowset.RowSetProvider; +import javax.sql.rowset.CachedRowSet; +import java.sql.SQLException; + + +import org.apache.commons.codec.binary.StringUtils; +import java.util.UUID; +import com.google.gson.GsonBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Unlike a regular interpreter result, a JdbcInterpreter Result caches its + * output so that it can be used later and persisted independently of the regular result. + * It also has a standard return for all tabular SQL data. + * @author Rusty Phillips {@literal: } + * + */ +public class JdbcInterpreterResult extends InterpreterResult { + + private ResultSet results; + private String id; + private String repoName; + + private InterpreterResult innerResult; + + private Logger logger() { + + Logger logger = LoggerFactory.getLogger(JdbcInterpreterResult.class); + return logger; + } + + + public String getRepoName() { + return repoName; + } + + public void setRepoName(String repoName) { + this.repoName = repoName; + } + + public ResultSet getResults() { + return results; + } + + + public JdbcInterpreterResult(Code code, String Id) + { + super(code); + this.type = Type.TABLE; + this.setId(Id); + } + + public JdbcInterpreterResult(Code code, ResultSet resultSet) { + this(code, resultSet, + UUID.randomUUID().toString()); + } + + public JdbcInterpreterResult(Code code, ResultSet resultSet, String Id) { + super(code); + try { + + this.type = Type.TABLE; + // Not necessary at this time, but may be something to consider in the future. + // the results in memory, it is not necessarily a better option than the previous one. + /* + CachedRowSet impl = RowSetProvider.newFactory().createCachedRowSet(); + logger().debug("Populating result set"); + impl.populate(resultSet); + */ + logger().debug("Finished populating result set."); + this.results = resultSet; + this.id = Id; + message(); + } catch (Exception ex) { + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + this.code = code.ERROR; + ex.printStackTrace(pw); + logger().debug("Failed to populate result set.\n{}\n{}", + ex.getMessage(), sw.toString()); + this.msg = ex.getMessage(); + } + } + + + @Override + public String message() { + if (this.msg != null) + { + return this.msg; + } + + StringBuilder msg = new StringBuilder(); + if (code == code.ERROR) { return this.msg; } + try { + if (this.results == null) + { + this.code = code.ERROR; + this.msg = "Unable to find any results table"; + return this.msg; + } + ResultSetMetaData md = this.results.getMetaData(); + for (int i = 1; i < md.getColumnCount() + 1; i++) { + if (i == 1) { + msg.append(md.getColumnName(i)); + } else { + msg.append("\t" + md.getColumnName(i)); + } + } + msg.append("\n"); + while (this.results.next()) { + for (int i = 1; i < md.getColumnCount() + 1; i++) { + msg.append(results.getString(i) + "\t"); + } + msg.append("\n"); + } + this.msg = msg.toString(); + } catch (SQLException ex) { + code = code.ERROR; + this.msg = ex.getMessage(); + } + return this.msg; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } +} diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java index f8f1bdd95a9..6a26fec7ddd 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java @@ -24,6 +24,7 @@ import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterSerializer; +import org.apache.zeppelin.interpreter.InterpreterResult; import com.google.gson.Gson; import com.google.gson.GsonBuilder; diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index a6e944da8be..fafc6f962a6 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -33,6 +33,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.interpreter.InterpreterFactory; +import org.apache.zeppelin.interpreter.ResultRepoFactory; import org.apache.zeppelin.notebook.Notebook; import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.apache.zeppelin.notebook.repo.NotebookRepoSync; @@ -68,6 +69,8 @@ public class ZeppelinServer extends Application { private static final Logger LOG = LoggerFactory.getLogger(ZeppelinServer.class); private SchedulerFactory schedulerFactory; + private ResultRepoFactory resultRepoFactory; + public static Notebook notebook; public static NotebookServer notebookServer; @@ -252,8 +255,10 @@ public ZeppelinServer() throws Exception { this.schedulerFactory = new SchedulerFactory(); - this.replFactory = new InterpreterFactory(conf, notebookServer); + this.resultRepoFactory = new ResultRepoFactory(conf); + this.replFactory = new InterpreterFactory(conf, notebookServer, resultRepoFactory); this.notebookRepo = new NotebookRepoSync(conf); + notebook = new Notebook(conf, notebookRepo, schedulerFactory, replFactory, notebookServer); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/FilesystemResultRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/FilesystemResultRepo.java new file mode 100644 index 00000000000..bac2f285351 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/FilesystemResultRepo.java @@ -0,0 +1,164 @@ +package org.apache.zeppelin.interpreter; + +import java.sql.ResultSet; +import org.apache.commons.vfs2.FileContent; +import org.apache.commons.vfs2.FileObject; +import org.apache.commons.vfs2.FileSystemManager; +import org.apache.commons.vfs2.FileType; +import org.apache.commons.vfs2.NameScope; +import org.apache.commons.vfs2.Selectors; +import org.apache.commons.vfs2.VFS; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.commons.io.IOUtils; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; + +import javax.sql.rowset.CachedRowSet; + +import java.util.UUID; + +/** + * Store the results back into the filesystem. + * @author Rusty Phillips {@literal: } + * + */ +public class FilesystemResultRepo extends RemoteResultRepo { + static { + RemoteResultRepo.register("File System", FilesystemResultRepo.class.getName()); + } + private FileSystemManager fsManager; + private URI filesystemRoot; + + private Properties properties; + private ZeppelinConfiguration conf; + + public FilesystemResultRepo(ZeppelinConfiguration conf) throws IOException { + this.conf = conf; + + try { + filesystemRoot = new URI(conf.getNotebookDir() + "/_results"); + } catch (URISyntaxException e1) { + throw new IOException(e1); + } + + if (filesystemRoot.getScheme() == null) { // it is local path + try { + this.filesystemRoot = new URI(new File( + conf.getRelativeDir(filesystemRoot.getPath())).getAbsolutePath()); + } catch (URISyntaxException e) { + throw new IOException(e); + } + } else { + this.filesystemRoot = filesystemRoot; + } + fsManager = VFS.getManager(); + } + + private String getPath(String path) { + if (path == null || path.trim().length() == 0) { + return filesystemRoot.toString(); + } + if (path.startsWith("/")) { + return filesystemRoot.toString() + path; + } else { + return filesystemRoot.toString() + "/" + path; + } + } + + private boolean isDirectory(FileObject fo) throws IOException { + if (fo == null) return false; + if (fo.getType() == FileType.FOLDER) { + return true; + } else { + return false; + } + } + + private FileObject getRootDir() throws IOException { + FileObject rootDir = fsManager.resolveFile(getPath("/")); + + if (!rootDir.exists()) { + throw new IOException("Root path does not exists"); + } + + if (!isDirectory(rootDir)) { + throw new IOException("Root path is not a directory"); + } + + return rootDir; + } + + + + @Override + public String save(String result, String id) throws IOException { + removeResult(id); + + FileObject rootDir = getRootDir(); + FileObject resultDir = rootDir.resolveFile(id, NameScope.CHILD); + if (!resultDir.exists()) { + resultDir.createFolder(); + } + + if (!isDirectory(resultDir)) { + throw new IOException(resultDir.getName().toString() + " is not a directory"); + } + + FileObject resultJson = resultDir.resolveFile("result.txt", NameScope.CHILD); + // false means not appending. creates file if not exists + OutputStream out = resultJson.getContent().getOutputStream(false); + out.write(result.getBytes(conf.getString(ConfVars.ZEPPELIN_ENCODING))); + out.close(); + return id; + } + + private String getResult(FileObject resultDir) throws IOException { + FileObject resultJson = resultDir.resolveFile("result.txt", NameScope.CHILD); + if (!resultJson.exists()) { + throw new IOException(resultJson.getName().toString() + " not found"); + } + + FileContent content = resultJson.getContent(); + InputStream ins = content.getInputStream(); + String text = IOUtils.toString(ins, conf.getString(ConfVars.ZEPPELIN_ENCODING)); + ins.close(); + + return text; + } + + @Override + public void removeResult(String id) throws IOException { + FileObject rootDir = fsManager.resolveFile(getPath("/")); + FileObject resultDir = rootDir.resolveFile(id, NameScope.CHILD); + + if (!resultDir.exists()) { + // nothing to do + return; + } + + if (!isDirectory(resultDir)) { + // it is not look like zeppelin note savings + throw new IOException("Can not remove " + resultDir.getName().toString()); + } + + resultDir.delete(Selectors.SELECT_SELF_AND_CHILDREN); + } + + @Override + public String get(String id) throws IOException { + FileObject rootDir = fsManager.resolveFile(getPath("/")); + FileObject noteDir = rootDir.resolveFile(id, NameScope.CHILD); + + return getResult(noteDir); + } + +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 8a1d6ff2752..5483af2f903 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -49,6 +49,8 @@ import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.notebook.ParagraphSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +63,17 @@ */ public class InterpreterFactory { Logger logger = LoggerFactory.getLogger(InterpreterFactory.class); + ResultRepoFactory resultRepoFactory; + + public ResultRepoFactory getResultRepoFactory() { + return resultRepoFactory; + } + + + public void setResultRepoFactory(ResultRepoFactory resultRepoFactory) { + this.resultRepoFactory = resultRepoFactory; + } + private Map cleanCl = Collections .synchronizedMap(new HashMap()); @@ -80,24 +93,28 @@ public class InterpreterFactory { AngularObjectRegistryListener angularObjectRegistryListener; public InterpreterFactory(ZeppelinConfiguration conf, - AngularObjectRegistryListener angularObjectRegistryListener) + AngularObjectRegistryListener angularObjectRegistryListener, + ResultRepoFactory resultrepo) throws InterpreterException, IOException { - this(conf, new InterpreterOption(true), angularObjectRegistryListener); + this(conf, new InterpreterOption(true), angularObjectRegistryListener, resultrepo); } public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption, - AngularObjectRegistryListener angularObjectRegistryListener) + AngularObjectRegistryListener angularObjectRegistryListener, ResultRepoFactory resultrepo) throws InterpreterException, IOException { this.conf = conf; this.defaultOption = defaultOption; this.angularObjectRegistryListener = angularObjectRegistryListener; + this.resultRepoFactory = resultrepo; String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS); interpreterClassList = replsConf.split(","); GsonBuilder builder = new GsonBuilder(); builder.setPrettyPrinting(); builder.registerTypeAdapter(Interpreter.class, new InterpreterSerializer()); + /* builder.registerTypeAdapter(JdbcInterpreterResult.class, + new InterpreterResultSerializer(resultRepoFactory)); */ gson = builder.create(); init(); @@ -197,6 +214,7 @@ private void loadFromFile() throws IOException { GsonBuilder builder = new GsonBuilder(); builder.setPrettyPrinting(); builder.registerTypeAdapter(Interpreter.class, new InterpreterSerializer()); + builder.registerTypeAdapter(Paragraph.class, new ParagraphSerializer(resultRepoFactory)); Gson gson = builder.create(); File settingFile = new File(conf.getInterpreterSettingPath()); @@ -227,8 +245,6 @@ private void loadFromFile() throws IOException { // previously created setting should turn this feature on here. setting.getOption().setRemote(true); - - InterpreterSetting intpSetting = new InterpreterSetting( setting.id(), setting.getName(), @@ -248,7 +264,6 @@ private void loadFromFile() throws IOException { this.interpreterBindings = info.interpreterBindings; } - private void saveToFile() throws IOException { String jsonString; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultSerializer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultSerializer.java new file mode 100644 index 00000000000..618752e0f25 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultSerializer.java @@ -0,0 +1,82 @@ +package org.apache.zeppelin.interpreter; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Type; +import java.sql.ResultSet; +import java.util.Map.Entry; + +import org.apache.zeppelin.interpreter.InterpreterResult.Code; + +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; + +/** + * Separates results from everything else. + * + * @author Rusty Phillips {@literal: } + * + */ +public class InterpreterResultSerializer + implements JsonSerializer, JsonDeserializer { + + RemoteResultRepo repo; + String id; + + public InterpreterResultSerializer() { + } + + public InterpreterResultSerializer(RemoteResultRepo repo, String ParagraphId) { + this.repo = repo; + this.id = ParagraphId; + } + + @Override + public InterpreterResult deserialize(JsonElement json, Type typeOfT, + JsonDeserializationContext context) + throws JsonParseException { + JsonObject object = json.getAsJsonObject(); + + String code = object.get("code").getAsString(); + String type = object.get("type").getAsString(); + InterpreterResult interpreterResult = new InterpreterResult(null); + + String result = null; + try { + for (Entry entry : object.entrySet()) { + try { + Field f; + f = InterpreterResult.class.getDeclaredField(entry.getKey()); + f.setAccessible(true); + f.set(interpreterResult, context.deserialize(entry.getValue(), f.getType())); + } catch (NoSuchFieldException ex) { + } + } + // Only fetch from the repo if it's not stored already. + if (interpreterResult.msg == null) + result = repo.get(id); + } catch (IOException | IllegalArgumentException | IllegalAccessException e) { + throw new JsonParseException(e.getMessage()); + } + return interpreterResult; + } + + @Override + public JsonElement serialize(InterpreterResult src, Type typeOfSrc, + JsonSerializationContext context) { + try { + JsonObject object = new JsonObject(); + repo.save(src.message(), id); + object.addProperty("code", src.code().toString()); + object.addProperty("type", src.type().toString()); + return object; + } catch (IOException ex) { + throw new JsonParseException(ex); + } + } +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteResultRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteResultRepo.java new file mode 100644 index 00000000000..14484b71784 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteResultRepo.java @@ -0,0 +1,68 @@ +package org.apache.zeppelin.interpreter; + +import java.sql.ResultSet; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.URL; +import java.util.Properties; + +import javax.tools.FileObject; + +import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; +import org.apache.commons.vfs2.FileType; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; + + +/** + * Describes how to save/load results. + * @author Rusty Phillips {@literal: } + * + */ +public class RemoteResultRepo { + protected Properties property; + + // TODO(Add Class loading): Make it so that result repositories load their classes. + private URL[] classloaderUrls; + + private RemoteResultRepo childRepo; + + public RemoteResultRepo() { + childRepo = null; + } + + public RemoteResultRepo(String ClassName, Properties props) throws ReflectiveOperationException { + if (!registeredRepos.containsValue(ClassName)) { + ClassName = registeredRepos.values().iterator().next(); + } + childRepo = (RemoteResultRepo) Class.forName(ClassName) + .getConstructor(new Class[] { Properties.class }).newInstance(props); + + } + + public URL[] getClassloaderUrls() { + return classloaderUrls; + } + + public void setClassloaderUrls(URL[] classloaderUrls) { + this.classloaderUrls = classloaderUrls; + } + + // Returns the thing that finds the result. + public String save(String result, String id) throws IOException + { return childRepo.save(result, id); } + public String get(String id) throws IOException { return childRepo.get(id); } + public void removeResult(String id) throws IOException { childRepo.removeResult(id); } + + public static Map registeredRepos = + Collections.synchronizedMap(new HashMap()); + + public static void register(String name, String repoClass) { + registeredRepos.put(name, repoClass); + } +} + diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ResultRepoFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ResultRepoFactory.java new file mode 100644 index 00000000000..2b8314ade92 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ResultRepoFactory.java @@ -0,0 +1,62 @@ +package org.apache.zeppelin.interpreter; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Map; + +import javax.management.ReflectionException; + +import java.util.HashMap; +import java.util.Collections; +/** + * Keeps track of the different varieties of result repositories. + * @author Rusty Phillips {@literal: } + * + */ +public class ResultRepoFactory { + Logger logger = LoggerFactory.getLogger(ResultRepoFactory.class); + private ZeppelinConfiguration conf; + + private Map repos = Collections + .synchronizedMap(new HashMap()); + + public ResultRepoFactory(ZeppelinConfiguration conf) + throws IOException, ReflectiveOperationException { + this.conf = conf; + for (String className: RemoteResultRepo.registeredRepos.values()) { + repos.put(className, (RemoteResultRepo) + Class.forName(className).getConstructor( + new Class[] {ZeppelinConfiguration.class }).newInstance(conf)); + } + } + + public RemoteResultRepo getDefaultRepo() throws IOException + { + return getRepoByClassName("org.apache.zeppelin.interpreter.FilesystemResultRepo"); + } + + public RemoteResultRepo getRepoByClassName(String className) + throws IOException { + if (repos.containsKey(className)) + return repos.get(className); + try { + ClassLoader cl = ClassLoader.getSystemClassLoader(); + cl.loadClass(className); + return repos.put(className, (RemoteResultRepo) + Class.forName(className).getConstructor( + new Class[] {ZeppelinConfiguration.class }).newInstance(conf) ); + + } catch (ClassNotFoundException | NoSuchMethodException | SecurityException + | InstantiationException | IllegalAccessException + | IllegalArgumentException | InvocationTargetException e) { + e.printStackTrace(); + throw new IOException(e); + } + } + + +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java index 509a064ab63..4e90c5a142c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java @@ -18,8 +18,6 @@ package org.apache.zeppelin.notebook; import java.io.IOException; -import java.util.LinkedList; -import java.util.List; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; @@ -27,6 +25,7 @@ import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.ResultRepoFactory; /** * Repl loader per note. @@ -39,6 +38,10 @@ public NoteInterpreterLoader(InterpreterFactory factory) { this.factory = factory; } + public ResultRepoFactory getResultFactory() { + return factory.getResultRepoFactory(); + } + public void setNoteId(String noteId) { this.noteId = noteId; } @@ -147,4 +150,5 @@ public Interpreter get(String replName) { throw new InterpreterException(replName + " interpreter not found"); } + } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 770172a43c3..efc74729e1b 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -35,6 +35,7 @@ import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.ResultRepoFactory; import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.apache.zeppelin.scheduler.SchedulerFactory; @@ -58,6 +59,7 @@ public class Notebook { Logger logger = LoggerFactory.getLogger(Notebook.class); private SchedulerFactory schedulerFactory; private InterpreterFactory replFactory; + /** Keep the order. */ Map notes = new LinkedHashMap(); private ZeppelinConfiguration conf; @@ -67,8 +69,9 @@ public class Notebook { private NotebookRepo notebookRepo; public Notebook(ZeppelinConfiguration conf, NotebookRepo notebookRepo, - SchedulerFactory schedulerFactory, - InterpreterFactory replFactory, JobListenerFactory jobListenerFactory) throws IOException, + SchedulerFactory schedulerFactory, InterpreterFactory replFactory, + JobListenerFactory jobListenerFactory) + throws IOException, SchedulerException { this.conf = conf; this.notebookRepo = notebookRepo; @@ -218,7 +221,7 @@ private Note loadNoteFromRepo(String id) { // set NoteInterpreterLoader NoteInterpreterLoader noteInterpreterLoader = new NoteInterpreterLoader( - replFactory); + replFactory); note.setReplLoader(noteInterpreterLoader); noteInterpreterLoader.setNoteId(note.id()); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index fc3646aaf57..698654ec265 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -27,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.Serializable; import java.util.*; @@ -43,8 +44,9 @@ public class Paragraph extends Job implements Serializable, Cloneable { String title; String text; Date dateUpdated; - private Map config; // paragraph configs like isOpen, colWidth, etc - public final GUI settings; // form and parameter settings + private Map config; // paragraph configs like isOpen, + // colWidth, etc + public final GUI settings; // form and parameter settings public Paragraph(Note note, JobListener listener, NoteInterpreterLoader replLoader) { super(generateId(), listener); @@ -55,11 +57,14 @@ public Paragraph(Note note, JobListener listener, NoteInterpreterLoader replLoad dateUpdated = null; settings = new GUI(); config = new HashMap(); + config.put("RESULT_REPO", + "org.apache.zeppelin.interpreter.FilesystemResultRepo"); + //replLoader.getResultFactory().getDefaultRepo().getClass().getName()); } private static String generateId() { - return "paragraph_" + System.currentTimeMillis() + "_" - + new Random(System.currentTimeMillis()).nextInt(); + return "paragraph_" + System.currentTimeMillis() + + "_" + new Random(System.currentTimeMillis()).nextInt(); } public String getText() { @@ -71,7 +76,6 @@ public void setText(String newText) { this.dateUpdated = new Date(); } - public String getTitle() { return title; } @@ -118,6 +122,7 @@ public static String getRequiredReplName(String text) { } private String getScriptBody() { + return getScriptBody(text); } @@ -204,6 +209,8 @@ protected Object jobRun() throws Throwable { script = Input.getSimpleQuery(settings.getParams(), scriptBody); } logger().debug("RUN : " + script); + // Delete existing result before setting new one. + InterpreterResult ret = repl.interpret(script, getInterpreterContext()); return ret; } @@ -228,15 +235,9 @@ private InterpreterContext getInterpreterContext() { runners.add(new ParagraphRunner(note, note.id(), p.getId())); } - InterpreterContext interpreterContext = new InterpreterContext( - note.id(), - getId(), - this.getTitle(), - this.getText(), - this.getConfig(), - this.settings, - registry, - runners); + InterpreterContext interpreterContext = new InterpreterContext + (note.id(), getId(), this.getTitle(), this.getText(), + this.getConfig(), this.settings, registry, runners); return interpreterContext; } @@ -254,13 +255,11 @@ public void run() { } } - private Logger logger() { Logger logger = LoggerFactory.getLogger(Paragraph.class); return logger; } - public Map getConfig() { return config; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphSerializer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphSerializer.java new file mode 100644 index 00000000000..1dfa9e5b41a --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphSerializer.java @@ -0,0 +1,117 @@ +package org.apache.zeppelin.notebook; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.lang.reflect.Type; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.commons.lang3.reflect.FieldUtils; +import javax.naming.spi.DirStateFactory.Result; +import org.apache.zeppelin.interpreter.FilesystemResultRepo; + +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResultSerializer; +import org.apache.zeppelin.interpreter.ResultRepoFactory; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; + +/** + * Serializes paragraphs. + * @author Rusty Phillips {@literal: } + * + */ +public class ParagraphSerializer implements JsonSerializer, JsonDeserializer { + + private ResultRepoFactory factory; + + public ParagraphSerializer(ResultRepoFactory factory) { + this.factory = factory; + } + + private static String getResultRepo(JsonElement json) + { + JsonElement repo = json.getAsJsonObject().get("config").getAsJsonObject().get("RESULT_REPO"); + if (repo == null) + return null; + return repo.getAsString(); + } + + @Override + public Paragraph deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) + throws JsonParseException { + try { + Paragraph p = new Paragraph(null, null, null); + // p = context.deserialize(json, typeOfT); + Field[] fields = Paragraph.class.getFields(); + String resultRepo = getResultRepo(json); + JsonObject object = json.getAsJsonObject(); + GsonBuilder builder = new GsonBuilder(); + String id = object.get("id").getAsString(); + + builder.setPrettyPrinting(); + if (resultRepo != null && factory != null) { + builder.registerTypeAdapter(InterpreterResult.class, + new InterpreterResultSerializer(factory.getRepoByClassName(resultRepo), id)); + } + Gson gson = builder.create(); + for (Entry entry: object.entrySet()) { + try { + Field f; + f = Paragraph.class.getDeclaredField(entry.getKey()); + f.setAccessible(true); + f.set(p, gson.fromJson(entry.getValue(), f.getType())); + } catch (NoSuchFieldException e) { + // If the paragraph format has changed, we do not want to do anything + // with the fields that are extraneous. + } + + } + return p; + } catch (SecurityException | IllegalArgumentException | IllegalAccessException | IOException ex) + { throw new JsonParseException(ex); } + + } + + + @Override + public JsonElement serialize(Paragraph src, Type typeOfSrc, JsonSerializationContext context) { + try { + Field[] fields = FieldUtils.getAllFields(Paragraph.class); + GsonBuilder builder = new GsonBuilder(); + JsonObject object = new JsonObject(); + Boolean serializeResult = src.getConfig().containsKey("RESULT_REPO") && factory != null; + if (serializeResult) { + builder.registerTypeAdapter(InterpreterResult.class, + new InterpreterResultSerializer( + factory.getRepoByClassName(src.getConfig().get("RESULT_REPO").toString()), + src.getId())); + } + + Gson gson = builder.create(); + + for (Field f: fields) + { + if ((f.getModifiers() & (Modifier.TRANSIENT | Modifier.STATIC | Modifier.VOLATILE)) == 0 ) + { + f.setAccessible(true); + object.add(f.getName(), gson.toJsonTree(f.get(src), f.getType())); + } + } + + return object; + } catch + (SecurityException | IllegalArgumentException | IllegalAccessException | IOException ex ) + { throw new JsonParseException(ex); } + } + +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/S3NotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/S3NotebookRepo.java index bb9e5d1571d..d8a6d4c4f3a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/S3NotebookRepo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/S3NotebookRepo.java @@ -29,9 +29,11 @@ import org.apache.commons.io.IOUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.interpreter.ResultRepoFactory; import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.NoteInfo; import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.notebook.ParagraphSerializer; import org.apache.zeppelin.scheduler.Job.Status; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,13 +80,19 @@ public class S3NotebookRepo implements NotebookRepo { private static String bucketName = ""; private String user = ""; - + private ResultRepoFactory factory; private ZeppelinConfiguration conf; public S3NotebookRepo(ZeppelinConfiguration conf) throws IOException { this.conf = conf; user = conf.getUser(); + try { + factory = new ResultRepoFactory(conf); + } catch (ReflectiveOperationException e) { + throw new IOException(e); + } + bucketName = conf.getBucketName(); } @@ -132,8 +140,8 @@ public List list() throws IOException { private Note getNote(String key) throws IOException { GsonBuilder gsonBuilder = new GsonBuilder(); gsonBuilder.setPrettyPrinting(); + gsonBuilder.registerTypeAdapter(Paragraph.class, new ParagraphSerializer(factory)); Gson gson = gsonBuilder.create(); - S3Object s3object = s3client.getObject(new GetObjectRequest( bucketName, key)); @@ -164,6 +172,7 @@ public Note get(String noteId) throws IOException { public void save(Note note) throws IOException { GsonBuilder gsonBuilder = new GsonBuilder(); gsonBuilder.setPrettyPrinting(); + gsonBuilder.registerTypeAdapter(Paragraph.class, new ParagraphSerializer(factory)); Gson gson = gsonBuilder.create(); String json = gson.toJson(note); String key = user + "/" + "notebook" + "/" + note.id() + "/" + "note.json"; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java index 3039f80ff7d..5ab835ff5f6 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java @@ -36,9 +36,11 @@ import org.apache.commons.vfs2.VFS; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.interpreter.ResultRepoFactory; import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.NoteInfo; import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.notebook.ParagraphSerializer; import org.apache.zeppelin.scheduler.Job.Status; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +58,8 @@ public class VFSNotebookRepo implements NotebookRepo { private URI filesystemRoot; private ZeppelinConfiguration conf; - + private ResultRepoFactory factory; + public VFSNotebookRepo(ZeppelinConfiguration conf) throws IOException { this.conf = conf; @@ -77,6 +80,13 @@ public VFSNotebookRepo(ZeppelinConfiguration conf) throws IOException { this.filesystemRoot = filesystemRoot; } fsManager = VFS.getManager(); + + try { + factory = new ResultRepoFactory(conf); + } catch (ReflectiveOperationException e) { + throw new IOException(e); + } + } private String getPath(String path) { @@ -148,6 +158,8 @@ private Note getNote(FileObject noteDir) throws IOException { } GsonBuilder gsonBuilder = new GsonBuilder(); + gsonBuilder.registerTypeAdapter(Paragraph.class, new ParagraphSerializer(factory)); + gsonBuilder.setPrettyPrinting(); Gson gson = gsonBuilder.create(); @@ -199,6 +211,7 @@ private FileObject getRootDir() throws IOException { @Override public void save(Note note) throws IOException { GsonBuilder gsonBuilder = new GsonBuilder(); + gsonBuilder.registerTypeAdapter(Paragraph.class, new ParagraphSerializer(factory)); gsonBuilder.setPrettyPrinting(); Gson gson = gsonBuilder.create(); String json = gson.toJson(note);