From 43fa7b9f277c81eeb55843b6ceea25788073fddc Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Tue, 15 Mar 2016 19:11:20 -0400 Subject: [PATCH 01/15] Added serialization to resourcepoolutils. --- .../org/apache/zeppelin/resource/ResourcePoolUtils.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java index 1825bfed217..678b0cc7033 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java @@ -18,6 +18,8 @@ package org.apache.zeppelin.resource; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; @@ -55,7 +57,10 @@ public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsio try { client = remoteInterpreterProcess.getClient(); List resourceList = client.resourcePoolGetAll(); - Gson gson = new Gson(); + GsonBuilder gsonBuilder = new GsonBuilder(); + gsonBuilder.registerTypeAdapter(Resource.class, new ResourceSerializer()); + Gson gson = gsonBuilder.create(); + for (String res : resourceList) { resourceSet.add(gson.fromJson(res, Resource.class)); } From 5de57565fac57465618995dc656a1d0b8802b002 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Tue, 15 Mar 2016 19:06:35 -0400 Subject: [PATCH 02/15] Added pool persistance. --- .../apache/zeppelin/resource/Resource.java | 2 +- .../zeppelin/resource/ResourceSerializer.java | 46 +++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java index 6988b3ea762..9e38d402302 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java @@ -23,7 +23,7 @@ * Information and reference to the resource */ public class Resource { - private final transient Object r; + private final Object r; private final boolean serializable; private final ResourceId resourceId; private final String className; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java new file mode 100644 index 00000000000..a0b35fa63cc --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java @@ -0,0 +1,46 @@ +package org.apache.zeppelin.resource; + +import java.lang.reflect.Type; + +import com.google.gson.Gson; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; +/** + * Serializes and Deserializes resources if they are serializable. + */ +public class ResourceSerializer implements JsonDeserializer, JsonSerializer { + + public ResourceSerializer() { + } + + @Override + public JsonElement serialize(Resource src, Type typeOfSrc, JsonSerializationContext context) { + // This is straightforward at the moment. + return context.serialize(src); + } + + @Override + public Resource deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) + throws JsonParseException { + // This requires that we use the class that's stored in the element to deserialize. + JsonObject obj = json.getAsJsonObject(); + String className = obj.getAsJsonPrimitive("className").getAsString(); + + Gson gson = new Gson(); + Object r; + try { + r = gson.fromJson(obj.get("r"), Class.forName(className)); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Unable to deserialize the resource"); + } + ResourceId id = gson.fromJson(obj.get("resourceId"), ResourceId.class); + + return new Resource(id, r); + } + +} \ No newline at end of file From e633c444cee7cb3938dc591f03d0250063cc6b73 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Wed, 16 Mar 2016 10:59:29 -0400 Subject: [PATCH 03/15] Added test. --- .../zeppelin/resource/ResourceSerializer.java | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java index a0b35fa63cc..4db1f8742c0 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zeppelin.resource; import java.lang.reflect.Type; @@ -21,7 +37,8 @@ public ResourceSerializer() { @Override public JsonElement serialize(Resource src, Type typeOfSrc, JsonSerializationContext context) { // This is straightforward at the moment. - return context.serialize(src); + Gson gson = new Gson(); + return gson.toJsonTree(src); } @Override @@ -43,4 +60,4 @@ public Resource deserialize(JsonElement json, Type typeOfT, JsonDeserializationC return new Resource(id, r); } -} \ No newline at end of file +} From 71609fae4d3476c19d23596ccd044c826fa3c7a9 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Mon, 28 Mar 2016 12:14:36 -0400 Subject: [PATCH 04/15] Changed resource pool utils so that it will work anywhere. --- ...terpreterProcessResourcePoolConnector.java | 65 +++++++++++++++++++ .../apache/zeppelin/resource/Resource.java | 2 +- .../zeppelin/resource/ResourcePoolUtils.java | 12 ++-- 3 files changed, 72 insertions(+), 7 deletions(-) create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java new file mode 100644 index 00000000000..7315af7c266 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +import java.util.List; + +import org.apache.thrift.TException; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; + +import com.google.gson.Gson; +/** + * Makes a remote interpreter service client act as a resource pool connector. + */ +public class RemoteInterpreterProcessResourcePoolConnector implements ResourcePoolConnector { + + private Client client; + + public RemoteInterpreterProcessResourcePoolConnector(Client client) { + this.client = client; + } + + @Override + public ResourceSet getAllResources() { + try { + List resourceList = client.resourcePoolGetAll(); + ResourceSet resources = new ResourceSet(); + Gson gson = new Gson(); + + for (String res : resourceList) { + RemoteResource r = gson.fromJson(res, RemoteResource.class); + r.setResourcePoolConnector(this); + resources.add(r); + } + + return resources; + } catch (TException e) { + throw new RuntimeException(e); + } + } + + @Override + public Object readResource(ResourceId id) { + try { + // TODO(Object): Deserialize object + return client.resourceGet(id.getNoteId(), id.getParagraphId(), id.getName()); + } catch (TException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java index 9e38d402302..6988b3ea762 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java @@ -23,7 +23,7 @@ * Information and reference to the resource */ public class Resource { - private final Object r; + private final transient Object r; private final boolean serializable; private final ResourceId resourceId; private final String className; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java index 678b0cc7033..3aadffb8524 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java @@ -56,13 +56,13 @@ public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsio boolean broken = false; try { client = remoteInterpreterProcess.getClient(); - List resourceList = client.resourcePoolGetAll(); - GsonBuilder gsonBuilder = new GsonBuilder(); - gsonBuilder.registerTypeAdapter(Resource.class, new ResourceSerializer()); - Gson gson = gsonBuilder.create(); + RemoteInterpreterProcessResourcePoolConnector remoteConnector = + new RemoteInterpreterProcessResourcePoolConnector(client); + //List resourceList = client.resourcePoolGetAll(); + //gsonBuilder.registerTypeAdapter(Resource.class, new ResourceSerializer()); - for (String res : resourceList) { - resourceSet.add(gson.fromJson(res, Resource.class)); + for (Resource r: remoteConnector.getAllResources()) { + resourceSet.add(r); } } catch (Exception e) { logger.error(e.getMessage(), e); From 6a22e3bbf819a9102292f2bd7630e468cd9b2ab9 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Mon, 28 Mar 2016 13:42:00 -0400 Subject: [PATCH 05/15] Removed unnecessary serialization. --- .../java/org/apache/zeppelin/resource/ResourcePoolUtils.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java index 3aadffb8524..4eb1d6a8896 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java @@ -58,8 +58,6 @@ public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsio client = remoteInterpreterProcess.getClient(); RemoteInterpreterProcessResourcePoolConnector remoteConnector = new RemoteInterpreterProcessResourcePoolConnector(client); - //List resourceList = client.resourcePoolGetAll(); - //gsonBuilder.registerTypeAdapter(Resource.class, new ResourceSerializer()); for (Resource r: remoteConnector.getAllResources()) { resourceSet.add(r); From 17fed3d52f1cf74a56913ff661b3884e1d30296b Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Mon, 28 Mar 2016 17:02:18 -0400 Subject: [PATCH 06/15] Added explicit save to the serializer. --- .../org/apache/zeppelin/resource/ResourceSerializer.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java index 4db1f8742c0..887f3078525 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java @@ -38,7 +38,12 @@ public ResourceSerializer() { public JsonElement serialize(Resource src, Type typeOfSrc, JsonSerializationContext context) { // This is straightforward at the moment. Gson gson = new Gson(); - return gson.toJsonTree(src); + JsonElement elem = gson.toJsonTree(src); + JsonObject obj = elem.getAsJsonObject(); + if (src.isSerializable()) { + obj.add("r", gson.toJsonTree(src.get())); + } + return obj; } @Override From 067da3703af14d614b835f5117bb04fbbe9bde7c Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Mon, 21 Mar 2016 15:46:03 -0400 Subject: [PATCH 07/15] Added vfs resource pool. --- .../zeppelin/resource/VFSResourcePool.java | 218 ++++++++++++++++++ 1 file changed, 218 insertions(+) create mode 100644 zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java new file mode 100644 index 00000000000..ce0a01a5504 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java @@ -0,0 +1,218 @@ +package org.apache.zeppelin.resource; + +import org.slf4j.Logger; +import org.apache.zeppelin.conf.ZeppelinConfiguration; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Properties; + +import javax.management.RuntimeErrorException; + +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +import org.apache.commons.vfs2.FileObject; +import org.apache.commons.vfs2.FileSelectInfo; +import org.apache.commons.vfs2.FileSelector; +import org.apache.commons.vfs2.FileSystemException; +import org.apache.commons.vfs2.FileSystemManager; +import org.apache.commons.vfs2.FileType; +import org.apache.commons.vfs2.NameScope; +import org.apache.commons.vfs2.VFS; + +/** + * Resource pool that saves resources to the local file system. + * + */ +public class VFSResourcePool extends DistributedResourcePool { + Logger logger = LoggerFactory.getLogger(VFSResourcePool.class); + + + @Override + public void put(String name, Object object) { + try { + FileObject rootDir = getRootDir(); + FileObject resultDir = rootDir.resolveFile(name, NameScope.CHILD); + if (!resultDir.exists()) { + resultDir.createFolder(); + } + + if (!isDirectory(resultDir)) { + throw new IOException(resultDir.getName().toString() + " is not a directory"); + } + Gson gson = new Gson(); + + FileObject resultFile = resultDir.resolveFile("result.dat", NameScope.CHILD); + // false means not appending. creates file if not exists + OutputStream out = resultFile.getContent().getOutputStream(false); + ObjectOutputStream oos = new ObjectOutputStream(out); + oos.writeObject(object); + out.close(); + } catch (IOException e) { + super.put(name, object); + throw new RuntimeException(e); + } + } + + @Override + public Resource get(String name) { + return get(name, true); + } + + @Override + public Resource get(String noteId, String paragraphId, String name) { + Resource r = get(noteId, paragraphId, name, true); + return new Resource( + new ResourceId(this.id(), noteId, paragraphId, name), r.get()); + } + + @Override + public Resource get(String name, boolean remote) { + try { + FileObject rootDir = getRootDir(); + FileObject resultDir = rootDir.resolveFile(name, NameScope.CHILD); + if (!resultDir.exists() || !isDirectory(resultDir)) + return null; + FileObject resultFile = resultDir.resolveFile("result.dat", NameScope.CHILD); + + InputStream instream = resultFile.getContent().getInputStream(); + ObjectInputStream ois = new ObjectInputStream(instream); + + try { + Object o = ois.readObject(); + return new Resource(new ResourceId(this.id(), name), o); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + + } catch (IOException e) { + } + return super.get(name, remote); + } + + @Override + public Resource get(String noteId, String paragraphId, String name, boolean remote) { + return get(noteId + "___" + paragraphId + "___" + name, remote); + } + + @Override + public ResourceSet getAll() { + return super.getAll(true); + } + + @Override + public ResourceSet getAll(boolean remote) { + return super.getAll(remote); + } + + @Override + public void put(String noteId, String paragraphId, String name, Object object) { + put(noteId + "___" + paragraphId + "___name", object); + } + + @Override + public Resource remove(String name) { + try { + Resource r = get(name); + FileObject rootDir = getRootDir(); + FileObject resultDir = rootDir.resolveFile(name, NameScope.CHILD); + if (resultDir.exists()) { + resultDir.delete(new FileSelector() { + @Override + public boolean traverseDescendents(FileSelectInfo fileInfo) throws Exception { + return true; + } + + @Override + public boolean includeFile(FileSelectInfo fileInfo) throws Exception { + return true; + } + }); + } + return r; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Resource remove(String noteId, String paragraphId, String name) { + return remove(noteId + "___" + paragraphId + "___" + name); + } + + private FileSystemManager fsManager; + private URI filesystemRoot; + public VFSResourcePool(String id, ResourcePoolConnector connector, Properties property) { + super(id, connector, property); + try { + this.filesystemRoot = new + URI(property.getProperty("Resource_Path", "notebook/zeppelin_resources")); + } catch (URISyntaxException e1) { + throw new RuntimeException(e1); + } + + if (filesystemRoot.getScheme() == null) { // it is local path + try { + this.filesystemRoot = new URI(new File( filesystemRoot.getPath()).getAbsolutePath()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } else { + this.filesystemRoot = filesystemRoot; + } + + try { + fsManager = VFS.getManager(); + FileObject file; + file = fsManager.resolveFile(filesystemRoot.getPath()); + if (!file.exists()) { + logger.info("Notebook dir doesn't exist, create."); + file.createFolder(); + } + } catch (FileSystemException e) { + throw new RuntimeException("Unable to load new file system."); + } + } + + private FileObject getRootDir() throws IOException { + FileObject rootDir = fsManager.resolveFile(getPath("/")); + // Does nothing if the folder already exists. + rootDir.createFolder(); + + + if (!isDirectory(rootDir)) { + throw new IOException("Root path is not a directory"); + } + + return rootDir; + } + + private boolean isDirectory(FileObject fo) throws IOException { + if (fo == null) return false; + if (fo.getType() == FileType.FOLDER) { + return true; + } else { + return false; + } + } + + private String getPath(String path) { + if (path == null || path.trim().length() == 0) { + return filesystemRoot.toString(); + } + if (path.startsWith("/")) { + return filesystemRoot.toString() + path; + } else { + return filesystemRoot.toString() + "/" + path; + } + } +} From 0cfeb33364d1f8779f616373eaf780d3ac62f6af Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Tue, 5 Apr 2016 16:37:40 -0400 Subject: [PATCH 08/15] Added pluggable resource pool. --- zeppelin-interpreter/pom.xml | 12 ++++- .../remote/RemoteInterpreterServer.java | 51 +++++++++++++++---- .../resource/DistributedResourcePool.java | 6 ++- .../zeppelin/resource/VFSResourcePool.java | 33 +++++++++--- 4 files changed, 85 insertions(+), 17 deletions(-) diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml index 67b4d5fbc9a..da1f8d023ef 100644 --- a/zeppelin-interpreter/pom.xml +++ b/zeppelin-interpreter/pom.xml @@ -214,8 +214,18 @@ + + org.apache.commons + commons-vfs2 + 2.0 + + + plexus-utils + org.codehaus.plexus + + + - diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 6e369c0694a..db0cb366b1b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -134,17 +134,46 @@ public static void main(String[] args) System.exit(0); } + private DistributedResourcePool getResourcePool() + /* InterpreterGroup group, + Properties prop, + RemoteInterpreterEventClient client) */ + throws TException { + if (resourcePool != null) + return resourcePool; + try { + String resourcePoolClassName = (String) interpreterGroup.getProperty() + .getOrDefault("ResourcePoolClass", + "org.apache.zeppelin.resource.DistributedResourcePool"); + logger.debug("Getting resource pool {}", resourcePoolClassName); + Class resourcePoolClass = Class.forName(resourcePoolClassName); + + Constructor constructor = resourcePoolClass + .getConstructor(new Class[] {String.class, + ResourcePoolConnector.class, + Properties.class }); + resourcePool = (DistributedResourcePool) constructor.newInstance(interpreterGroup.getId(), + this.eventClient, + interpreterGroup.getProperty()); + interpreterGroup.setResourcePool(resourcePool); + return resourcePool; + } catch (SecurityException | NoSuchMethodException | + InstantiationException | IllegalAccessException | + IllegalArgumentException | InvocationTargetException | + ClassNotFoundException e) { + logger.error(e.toString(), e); + throw new TException(e); + } + } @Override public void createInterpreter(String interpreterGroupId, String noteId, String className, - Map properties) throws TException { + Map properties) throws TException { if (interpreterGroup == null) { interpreterGroup = new InterpreterGroup(interpreterGroupId); angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this); - resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient); interpreterGroup.setAngularObjectRegistry(angularObjectRegistry); - interpreterGroup.setResourcePool(resourcePool); } try { @@ -170,7 +199,12 @@ public void createInterpreter(String interpreterGroupId, String noteId, String } logger.info("Instantiate interpreter {}", className); + + interpreterGroup.setResourcePool(getResourcePool()); + repl.setInterpreterGroup(interpreterGroup); + + //setResourcePool(interpreterGroup, p, eventClient); } catch (ClassNotFoundException | NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { @@ -178,7 +212,6 @@ public void createInterpreter(String interpreterGroupId, String noteId, String throw new TException(e); } } - private Interpreter getInterpreter(String noteId, String className) throws TException { if (interpreterGroup == null) { throw new TException( @@ -388,7 +421,7 @@ public void cancel(String noteId, String className, RemoteInterpreterContext int @Override public int getProgress(String noteId, String className, - RemoteInterpreterContext interpreterContext) + RemoteInterpreterContext interpreterContext) throws TException { Interpreter intp = getInterpreter(noteId, className); return intp.getProgress(convert(interpreterContext)); @@ -411,7 +444,7 @@ public List completion(String noteId, String className, String buf, int private InterpreterContext convert(RemoteInterpreterContext ric) { List contextRunners = new LinkedList(); List runners = gson.fromJson(ric.getRunners(), - new TypeToken>() { + new TypeToken>() { }.getType()); for (InterpreterContextRunner r : runners) { @@ -572,7 +605,7 @@ public void angularObjectUpdate(String name, String noteId, String paragraphId, if (value == null) { try { value = gson.fromJson(object, - new TypeToken>() { + new TypeToken>() { }.getType()); } catch (Exception e) { // it's not a generic json object, too. okay, proceed to threat as a string type @@ -608,7 +641,7 @@ public void angularObjectAdd(String name, String noteId, String paragraphId, Str try { value = gson.fromJson(object, new TypeToken>() { - }.getType()); + }.getType()); } catch (Exception e) { // it's okay. proceed to treat object as a string logger.debug(e.getMessage(), e); @@ -624,7 +657,7 @@ public void angularObjectAdd(String name, String noteId, String paragraphId, Str @Override public void angularObjectRemove(String name, String noteId, String paragraphId) throws - TException { + TException { AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry(); registry.remove(name, noteId, paragraphId, false); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java index ba31f017b83..57cef050f5f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java @@ -16,16 +16,20 @@ */ package org.apache.zeppelin.resource; +import java.util.Properties; + /** * distributed resource pool */ public class DistributedResourcePool extends LocalResourcePool { private final ResourcePoolConnector connector; + protected Properties property; - public DistributedResourcePool(String id, ResourcePoolConnector connector) { + public DistributedResourcePool(String id, ResourcePoolConnector connector, Properties property) { super(id); this.connector = connector; + this.property = property; } @Override diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java index ce0a01a5504..045c8261820 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java @@ -1,7 +1,7 @@ + package org.apache.zeppelin.resource; import org.slf4j.Logger; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import java.io.File; import java.io.IOException; @@ -89,7 +89,16 @@ public Resource get(String name, boolean remote) { try { Object o = ois.readObject(); - return new Resource(new ResourceId(this.id(), name), o); + String[] splitName = name.split("___"); + ResourceId r = null; + if (splitName.length == 3) + { + r = new ResourceId(this.id(), splitName[0], splitName[1], splitName[2]); + } + else { + r = new ResourceId(this.id(), name); + } + return new Resource(r, o); } catch (ClassNotFoundException e) { throw new RuntimeException(e); } @@ -106,12 +115,26 @@ public Resource get(String noteId, String paragraphId, String name, boolean remo @Override public ResourceSet getAll() { - return super.getAll(true); + return getAll(true); } @Override public ResourceSet getAll(boolean remote) { - return super.getAll(remote); + ResourceSet resources = new ResourceSet(); + try { + FileObject rootDir = getRootDir(); + for (FileObject resourceDir: rootDir.getChildren()) + { + if (resourceDir.getType() == FileType.FOLDER) + resources.add(get(resourceDir.getName().getBaseName())); + } + } + catch (IOException ex) { + throw new RuntimeException(ex); + } + if (remote) + resources.addAll(super.getAll(remote)); + return resources; } @Override @@ -187,8 +210,6 @@ private FileObject getRootDir() throws IOException { FileObject rootDir = fsManager.resolveFile(getPath("/")); // Does nothing if the folder already exists. rootDir.createFolder(); - - if (!isDirectory(rootDir)) { throw new IOException("Root path is not a directory"); } From 52cf1be713c1462d768f8df2ad804e5a551d9298 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Wed, 6 Apr 2016 09:00:53 -0400 Subject: [PATCH 09/15] Added missing rename. --- .../main/java/org/apache/zeppelin/resource/VFSResourcePool.java | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename {zeppelin-zengine => zeppelin-interpreter}/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java (100%) diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java similarity index 100% rename from zeppelin-zengine/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java rename to zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java From a1b0bbda1105453656bdaf1977761272aba0c59e Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Wed, 6 Apr 2016 11:11:49 -0400 Subject: [PATCH 10/15] Added a constructor to DistributedResourcePools to fix tests. --- .../apache/zeppelin/resource/DistributedResourcePool.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java index 57cef050f5f..12292f202ad 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java @@ -25,6 +25,13 @@ public class DistributedResourcePool extends LocalResourcePool { private final ResourcePoolConnector connector; protected Properties property; + + public DistributedResourcePool(String id, ResourcePoolConnector connector) { + super(id); + this.connector = connector; + this.property = new Properties(); + } + public DistributedResourcePool(String id, ResourcePoolConnector connector, Properties property) { super(id); From f24127299ee9890b303268cca10c07fdfd60caf9 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Wed, 6 Apr 2016 12:09:08 -0400 Subject: [PATCH 11/15] Switched to java 7 method for java.util.properties. --- .../zeppelin/interpreter/remote/RemoteInterpreterServer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index db0cb366b1b..4505bffa331 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -143,7 +143,7 @@ private DistributedResourcePool getResourcePool() return resourcePool; try { String resourcePoolClassName = (String) interpreterGroup.getProperty() - .getOrDefault("ResourcePoolClass", + .getProperty("ResourcePoolClass", "org.apache.zeppelin.resource.DistributedResourcePool"); logger.debug("Getting resource pool {}", resourcePoolClassName); Class resourcePoolClass = Class.forName(resourcePoolClassName); From 3ed89b083f2aa6f1183c65c8618695bba01e248d Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Wed, 6 Apr 2016 13:47:02 -0400 Subject: [PATCH 12/15] Removed the need for resource pools during tests. --- .../remote/RemoteInterpreterServer.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 4505bffa331..89066671c4d 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -157,12 +157,10 @@ private DistributedResourcePool getResourcePool() interpreterGroup.getProperty()); interpreterGroup.setResourcePool(resourcePool); return resourcePool; - } catch (SecurityException | NoSuchMethodException | - InstantiationException | IllegalAccessException | - IllegalArgumentException | InvocationTargetException | - ClassNotFoundException e) { + } catch (Exception e) { logger.error(e.toString(), e); - throw new TException(e); + return null; + // throw new TException(e); } } @@ -386,11 +384,13 @@ protected Object jobRun() throws Throwable { } // put result into resource pool - context.getResourcePool().put( - context.getNoteId(), - context.getParagraphId(), - WellKnownResourceName.ParagraphResult.toString(), - combinedResult); + if (context.getResourcePool() != null) { + context.getResourcePool().put( + context.getNoteId(), + context.getParagraphId(), + WellKnownResourceName.ParagraphResult.toString(), + combinedResult); + } return combinedResult; } finally { InterpreterContext.remove(); From 0fa4b3288ba95b8ab8f8d69d0239a72c8b707429 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Wed, 6 Apr 2016 14:37:12 -0400 Subject: [PATCH 13/15] Adding missing license file. --- .../zeppelin/resource/VFSResourcePool.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java index 045c8261820..856b03a15b7 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java @@ -1,4 +1,19 @@ - +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zeppelin.resource; import org.slf4j.Logger; From 69d11d30f7b7f110cbe81a5b66d24dd5a30eb8b7 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Mon, 11 Apr 2016 11:03:14 -0400 Subject: [PATCH 14/15] Set mock interpreter to register properly. --- .../interpreter/remote/RemoteInterpreterServer.java | 13 ++++++++----- .../remote/mock/MockInterpreterResourcePool.java | 2 +- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 89066671c4d..2ec532b648a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -142,24 +142,27 @@ private DistributedResourcePool getResourcePool() if (resourcePool != null) return resourcePool; try { - String resourcePoolClassName = (String) interpreterGroup.getProperty() - .getProperty("ResourcePoolClass", + Properties prop = interpreterGroup.getProperty(); + //Happens during tests. + if (prop == null) + prop = new Properties(); + String resourcePoolClassName = (String) prop.getProperty("ResourcePoolClass", "org.apache.zeppelin.resource.DistributedResourcePool"); logger.debug("Getting resource pool {}", resourcePoolClassName); Class resourcePoolClass = Class.forName(resourcePoolClassName); - + Constructor constructor = resourcePoolClass .getConstructor(new Class[] {String.class, ResourcePoolConnector.class, Properties.class }); resourcePool = (DistributedResourcePool) constructor.newInstance(interpreterGroup.getId(), this.eventClient, - interpreterGroup.getProperty()); + prop); interpreterGroup.setResourcePool(resourcePool); return resourcePool; } catch (Exception e) { logger.error(e.toString(), e); - return null; + return new DistributedResourcePool(interpreterGroup.getId(), this.eventClient); // throw new TException(e); } } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java index 3826b903115..8e408da58bd 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java @@ -37,7 +37,7 @@ public class MockInterpreterResourcePool extends Interpreter { Interpreter.register( "resourcePoolTest", "resourcePool", - MockInterpreterA.class.getName(), + MockInterpreterResourcePool.class.getName(), new InterpreterPropertyBuilder() .add("p1", "v1", "property1").build()); From df46376213a3f9c9d766f0f108fdac72125cdc93 Mon Sep 17 00:00:00 2001 From: Rusty Phillips Date: Fri, 15 Apr 2016 13:38:21 -0400 Subject: [PATCH 15/15] Switched interpreter to use default configuration. --- .../remote/RemoteInterpreterServer.java | 4 ++-- .../zeppelin/conf/ZeppelinConfiguration.java | 2 ++ .../interpreter/InterpreterFactory.java | 21 ++++++++++++++++--- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 2ec532b648a..b19fd9cd09e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -146,8 +146,8 @@ private DistributedResourcePool getResourcePool() //Happens during tests. if (prop == null) prop = new Properties(); - String resourcePoolClassName = (String) prop.getProperty("ResourcePoolClass", - "org.apache.zeppelin.resource.DistributedResourcePool"); + String resourcePoolClassName = (String) prop.getProperty( + "zeppelin.interpreter.resourcePoolClass"); logger.debug("Getting resource pool {}", resourcePoolClassName); Class resourcePoolClass = Class.forName(resourcePoolClassName); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index eafbbbe224d..9fc32cd5a13 100755 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -504,6 +504,8 @@ public static enum ConfVars { // i.e. http://localhost:8080 ZEPPELIN_ALLOWED_ORIGINS("zeppelin.server.allowed.origins", "*"), ZEPPELIN_ANONYMOUS_ALLOWED("zeppelin.anonymous.allowed", true), + ZEPPELIN_RESOURCE_POOL_CLASS("zeppelin.interpreter.resourcePoolClass", + "org.apacheorg.apache.zeppelin.resource.DistributedResourcePool"); ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE("zeppelin.websocket.max.text.message.size", "1024000"); private String varName; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 269b54a1fce..e060a45ab80 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -198,6 +198,8 @@ private void init() throws InterpreterException, IOException, RepositoryExceptio logger.info("Interpreter setting group {} : id={}, name={}", setting.getGroup(), settingId, setting.getName()); } + + } private void loadFromFile() throws IOException { @@ -234,16 +236,19 @@ private void loadFromFile() throws IOException { // enable/disable option on GUI). // previously created setting should turn this feature on here. setting.getOption().setRemote(true); - + + Properties mergedProperties = + this.getInterpreterPropertiesFromZeppelinConf(); + mergedProperties.putAll(setting.getProperties());; + InterpreterSetting intpSetting = new InterpreterSetting( setting.id(), setting.getName(), setting.getGroup(), setting.getInterpreterInfos(), - setting.getProperties(), + mergedProperties, setting.getDependencies(), setting.getOption()); - InterpreterGroup interpreterGroup = createInterpreterGroup(setting.id(), setting.getOption()); intpSetting.setInterpreterGroup(interpreterGroup); @@ -260,6 +265,16 @@ private void loadFromFile() throws IOException { } } } + + private Properties getInterpreterPropertiesFromZeppelinConf() { + Iterator keySet = this.conf.getKeys("zeppelin.interpreter"); + Properties p = new Properties(); + while (keySet.hasNext()) { + String key = keySet.next(); + p.setProperty(key, this.conf.getProperty(key).toString()); + } + return p; + } private void loadInterpreterDependencies(InterpreterSetting intSetting) throws IOException, RepositoryException {