Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import com.google.gson.Gson;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
import org.apache.zeppelin.rest.message.RestartInterpreterRequest;
import org.apache.zeppelin.utils.SecurityUtils;
import org.slf4j.Logger;
Expand All @@ -61,17 +62,17 @@
public class InterpreterRestApi {
private static final Logger logger = LoggerFactory.getLogger(InterpreterRestApi.class);

private InterpreterFactory interpreterFactory;
private InterpreterSettingManager interpreterSettingManager;
private NotebookServer notebookServer;

Gson gson = new Gson();

public InterpreterRestApi() {
}

public InterpreterRestApi(InterpreterFactory interpreterFactory,
NotebookServer notebookWsServer) {
this.interpreterFactory = interpreterFactory;
public InterpreterRestApi(InterpreterSettingManager interpreterSettingManager,
NotebookServer notebookWsServer) {
this.interpreterSettingManager = interpreterSettingManager;
this.notebookServer = notebookWsServer;
}

Expand All @@ -82,7 +83,7 @@ public InterpreterRestApi(InterpreterFactory interpreterFactory,
@Path("setting")
@ZeppelinApi
public Response listSettings() {
return new JsonResponse<>(Status.OK, "", interpreterFactory.get()).build();
return new JsonResponse<>(Status.OK, "", interpreterSettingManager.get()).build();
}

/**
Expand All @@ -93,7 +94,7 @@ public Response listSettings() {
@ZeppelinApi
public Response getSetting(@PathParam("settingId") String settingId) {
try {
InterpreterSetting setting = interpreterFactory.get(settingId);
InterpreterSetting setting = interpreterSettingManager.get(settingId);
if (setting == null) {
return new JsonResponse<>(Status.NOT_FOUND).build();
} else {
Expand Down Expand Up @@ -123,7 +124,7 @@ public Response newSettings(String message) {
}
Properties p = new Properties();
p.putAll(request.getProperties());
InterpreterSetting interpreterSetting = interpreterFactory
InterpreterSetting interpreterSetting = interpreterSettingManager
.createNewSetting(request.getName(), request.getGroup(), request.getDependencies(),
request.getOption(), p);
logger.info("new setting created with {}", interpreterSetting.getId());
Expand All @@ -144,7 +145,7 @@ public Response updateSetting(String message, @PathParam("settingId") String set
try {
UpdateInterpreterSettingRequest request =
gson.fromJson(message, UpdateInterpreterSettingRequest.class);
interpreterFactory
interpreterSettingManager
.setPropertyAndRestart(settingId, request.getOption(), request.getProperties(),
request.getDependencies());
} catch (InterpreterException e) {
Expand All @@ -156,7 +157,7 @@ public Response updateSetting(String message, @PathParam("settingId") String set
return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR, e.getMessage(),
ExceptionUtils.getStackTrace(e)).build();
}
InterpreterSetting setting = interpreterFactory.get(settingId);
InterpreterSetting setting = interpreterSettingManager.get(settingId);
if (setting == null) {
return new JsonResponse<>(Status.NOT_FOUND, "", settingId).build();
}
Expand All @@ -171,7 +172,7 @@ public Response updateSetting(String message, @PathParam("settingId") String set
@ZeppelinApi
public Response removeSetting(@PathParam("settingId") String settingId) throws IOException {
logger.info("Remove interpreterSetting {}", settingId);
interpreterFactory.remove(settingId);
interpreterSettingManager.remove(settingId);
return new JsonResponse(Status.OK).build();
}

Expand All @@ -184,12 +185,12 @@ public Response removeSetting(@PathParam("settingId") String settingId) throws I
public Response restartSetting(String message, @PathParam("settingId") String settingId) {
logger.info("Restart interpreterSetting {}, msg={}", settingId, message);

InterpreterSetting setting = interpreterFactory.get(settingId);
InterpreterSetting setting = interpreterSettingManager.get(settingId);
try {
RestartInterpreterRequest request = gson.fromJson(message, RestartInterpreterRequest.class);

String noteId = request == null ? null : request.getNoteId();
interpreterFactory.restart(settingId, noteId, SecurityUtils.getPrincipal());
interpreterSettingManager.restart(settingId, noteId, SecurityUtils.getPrincipal());
notebookServer.clearParagraphRuntimeInfo(setting);

} catch (InterpreterException e) {
Expand All @@ -209,7 +210,7 @@ public Response restartSetting(String message, @PathParam("settingId") String se
@GET
@ZeppelinApi
public Response listInterpreter(String message) {
Map<String, InterpreterSetting> m = interpreterFactory.getAvailableInterpreterSettings();
Map<String, InterpreterSetting> m = interpreterSettingManager.getAvailableInterpreterSettings();
return new JsonResponse<>(Status.OK, "", m).build();
}

Expand All @@ -220,7 +221,7 @@ public Response listInterpreter(String message) {
@Path("repository")
@ZeppelinApi
public Response listRepositories() {
List<RemoteRepository> interpreterRepositories = interpreterFactory.getRepositories();
List<RemoteRepository> interpreterRepositories = interpreterSettingManager.getRepositories();
return new JsonResponse<>(Status.OK, "", interpreterRepositories).build();
}

Expand All @@ -235,8 +236,8 @@ public Response listRepositories() {
public Response addRepository(String message) {
try {
Repository request = gson.fromJson(message, Repository.class);
interpreterFactory.addRepository(request.getId(), request.getUrl(), request.isSnapshot(),
request.getAuthentication(), request.getProxy());
interpreterSettingManager.addRepository(request.getId(), request.getUrl(),
request.isSnapshot(), request.getAuthentication(), request.getProxy());
logger.info("New repository {} added", request.getId());
} catch (Exception e) {
logger.error("Exception in InterpreterRestApi while adding repository ", e);
Expand All @@ -258,7 +259,7 @@ public Response getMetaInfo(@Context HttpServletRequest req,
return new JsonResponse<>(Status.BAD_REQUEST).build();
}
String propValue = null;
InterpreterSetting interpreterSetting = interpreterFactory.get(settingId);
InterpreterSetting interpreterSetting = interpreterSettingManager.get(settingId);
Map<String, String> infos = interpreterSetting.getInfos();
if (infos != null) {
propValue = infos.get(propName);
Expand All @@ -282,7 +283,7 @@ public Response getMetaInfo(@Context HttpServletRequest req,
public Response removeRepository(@PathParam("repoId") String repoId) {
logger.info("Remove repository {}", repoId);
try {
interpreterFactory.removeRepository(repoId);
interpreterSettingManager.removeRepository(repoId);
} catch (Exception e) {
logger.error("Exception in InterpreterRestApi while removing repository ", e);
return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR, e.getMessage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import org.apache.zeppelin.helium.HeliumApplicationFactory;
import org.apache.zeppelin.helium.HeliumBundleFactory;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.NotebookAuthorization;
import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
Expand Down Expand Up @@ -85,6 +87,7 @@ public class ZeppelinServer extends Application {
public static NotebookServer notebookWsServer;
public static Helium helium;

private final InterpreterSettingManager interpreterSettingManager;
private SchedulerFactory schedulerFactory;
private InterpreterFactory replFactory;
private SearchService noteSearchService;
Expand Down Expand Up @@ -139,14 +142,17 @@ public ZeppelinServer() throws Exception {
}

this.schedulerFactory = new SchedulerFactory();
this.interpreterSettingManager = new InterpreterSettingManager(conf, depResolver,
new InterpreterOption(true));
this.replFactory = new InterpreterFactory(conf, notebookWsServer,
notebookWsServer, heliumApplicationFactory, depResolver, SecurityUtils.isAuthenticated());
notebookWsServer, heliumApplicationFactory, depResolver, SecurityUtils.isAuthenticated(),
interpreterSettingManager);
this.notebookRepo = new NotebookRepoSync(conf);
this.noteSearchService = new LuceneSearch();
this.notebookAuthorization = NotebookAuthorization.init(conf);
this.credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath());
notebook = new Notebook(conf,
notebookRepo, schedulerFactory, replFactory, notebookWsServer,
notebookRepo, schedulerFactory, replFactory, interpreterSettingManager, notebookWsServer,
noteSearchService, notebookAuthorization, credentials);

// to update notebook from application event from remote process.
Expand Down Expand Up @@ -194,7 +200,7 @@ public static void main(String[] args) throws InterruptedException {
LOG.info("Shutting down Zeppelin Server ... ");
try {
jettyWebServer.stop();
notebook.getInterpreterFactory().shutdown();
notebook.getInterpreterSettingManager().shutdown();
notebook.close();
Thread.sleep(3000);
} catch (Exception e) {
Expand All @@ -217,7 +223,7 @@ public static void main(String[] args) throws InterruptedException {
}

jettyWebServer.join();
ZeppelinServer.notebook.getInterpreterFactory().close();
ZeppelinServer.notebook.getInterpreterSettingManager().close();
}

private static Server setupJettyServer(ZeppelinConfiguration conf) {
Expand Down Expand Up @@ -377,7 +383,8 @@ public Set<Object> getSingletons() {
HeliumRestApi heliumApi = new HeliumRestApi(helium, notebook);
singletons.add(heliumApi);

InterpreterRestApi interpreterApi = new InterpreterRestApi(replFactory, notebookWsServer);
InterpreterRestApi interpreterApi = new InterpreterRestApi(interpreterSettingManager,
notebookWsServer);
singletons.add(interpreterApi);

CredentialRestApi credentialApi = new CredentialRestApi(credentials);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.helium.HeliumPackage;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
Expand Down Expand Up @@ -473,7 +474,7 @@ private void broadcastToNoteBindedInterpreter(String interpreterGroupId, Message
Notebook notebook = notebook();
List<Note> notes = notebook.getAllNotes();
for (Note note : notes) {
List<String> ids = notebook.getInterpreterFactory().getInterpreters(note.getId());
List<String> ids = notebook.getInterpreterSettingManager().getInterpreters(note.getId());
for (String id : ids) {
if (id.equals(interpreterGroupId)) {
broadcast(note.getId(), m);
Expand Down Expand Up @@ -991,7 +992,7 @@ private void createNote(NotebookSocket conn, HashSet<String> userAndRoles, Noteb
if (!StringUtils.isEmpty(defaultInterpreterId)) {
List<String> interpreterSettingIds = new LinkedList<>();
interpreterSettingIds.add(defaultInterpreterId);
for (String interpreterSettingId : notebook.getInterpreterFactory().
for (String interpreterSettingId : notebook.getInterpreterSettingManager().
getDefaultInterpreterSettingList()) {
if (!interpreterSettingId.equals(defaultInterpreterId)) {
interpreterSettingIds.add(interpreterSettingId);
Expand Down Expand Up @@ -1334,7 +1335,7 @@ private void angularObjectUpdated(NotebookSocket conn, HashSet<String> userAndRo
Note note = notebook.getNote(noteId);
if (note != null) {
List<InterpreterSetting> settings =
notebook.getInterpreterFactory().getInterpreterSettings(note.getId());
notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId());
for (InterpreterSetting setting : settings) {
if (setting.getInterpreterGroup(user, note.getId()) == null) {
continue;
Expand Down Expand Up @@ -1376,7 +1377,7 @@ private void angularObjectUpdated(NotebookSocket conn, HashSet<String> userAndRo
// interpreter.
for (Note n : notebook.getAllNotes()) {
List<InterpreterSetting> settings =
notebook.getInterpreterFactory().getInterpreterSettings(note.getId());
notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId());
for (InterpreterSetting setting : settings) {
if (setting.getInterpreterGroup(user, n.getId()) == null) {
continue;
Expand Down Expand Up @@ -2197,7 +2198,7 @@ public NotebookEventListener getNotebookInformationListener() {
private void sendAllAngularObjects(Note note, String user, NotebookSocket conn)
throws IOException {
List<InterpreterSetting> settings =
notebook().getInterpreterFactory().getInterpreterSettings(note.getId());
notebook().getInterpreterSettingManager().getInterpreterSettings(note.getId());
if (settings == null || settings.size() == 0) {
return;
}
Expand Down Expand Up @@ -2235,7 +2236,7 @@ public void onUpdate(String interpreterGroupId, AngularObject object) {
}

List<InterpreterSetting> intpSettings =
notebook.getInterpreterFactory().getInterpreterSettings(note.getId());
notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId());
if (intpSettings.isEmpty()) {
continue;
}
Expand All @@ -2255,7 +2256,8 @@ public void onRemove(String interpreterGroupId, String name, String noteId, Stri
continue;
}

List<String> settingIds = notebook.getInterpreterFactory().getInterpreters(note.getId());
List<String> settingIds =
notebook.getInterpreterSettingManager().getInterpreters(note.getId());
for (String id : settingIds) {
if (interpreterGroupId.contains(id)) {
broadcast(note.getId(),
Expand All @@ -2274,21 +2276,25 @@ private void getEditorSetting(NotebookSocket conn, Message fromMessage) throws I
String user = fromMessage.principal;
Message resp = new Message(OP.EDITOR_SETTING);
resp.put("paragraphId", paragraphId);
resp.put("editor", notebook().getInterpreterFactory().getEditorSetting(user, noteId, replName));
Interpreter interpreter =
notebook().getInterpreterFactory().getInterpreter(user, noteId, replName);
resp.put("editor", notebook().getInterpreterSettingManager().
getEditorSetting(interpreter, user, noteId, replName));
conn.send(serializeMessage(resp));
return;
}

private void getInterpreterSettings(NotebookSocket conn, AuthenticationInfo subject)
throws IOException {
List<InterpreterSetting> availableSettings = notebook().getInterpreterFactory().get();
List<InterpreterSetting> availableSettings = notebook().getInterpreterSettingManager().get();
conn.send(serializeMessage(
new Message(OP.INTERPRETER_SETTINGS).put("interpreterSettings", availableSettings)));
}

@Override
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
InterpreterSetting interpreterSetting = notebook().getInterpreterFactory().get(settingId);
InterpreterSetting interpreterSetting =
notebook().getInterpreterSettingManager().get(settingId);
interpreterSetting.setInfos(metaInfos);
}

Expand Down Expand Up @@ -2342,8 +2348,8 @@ public void onParaInfosReceived(String noteId, String paragraphId,
if (note != null) {
Paragraph paragraph = note.getParagraph(paragraphId);
if (paragraph != null) {
InterpreterSetting setting = notebook().getInterpreterFactory()
.get(interpreterSettingId);
InterpreterSetting setting = notebook().getInterpreterSettingManager()
.get(interpreterSettingId);
setting.addNoteToPara(noteId, paragraphId);
String label = metaInfos.get("label");
String tooltip = metaInfos.get("tooltip");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static List<InterpreterSettingsList> getInterpreterBindings(Notebook note
setting.getInterpreterInfos(), true));
}

List<InterpreterSetting> availableSettings = notebook.getInterpreterFactory().get();
List<InterpreterSetting> availableSettings = notebook.getInterpreterSettingManager().get();
for (InterpreterSetting setting : availableSettings) {
boolean selected = false;
for (InterpreterSetting selectedSetting : selectedSettings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ private static void start(boolean withAuth) throws Exception {

// assume first one is spark
InterpreterSetting sparkIntpSetting = null;
for(InterpreterSetting intpSetting : ZeppelinServer.notebook.getInterpreterFactory().get()) {
for(InterpreterSetting intpSetting :
ZeppelinServer.notebook.getInterpreterSettingManager().get()) {
if (intpSetting.getName().equals("spark")) {
sparkIntpSetting = intpSetting;
}
Expand All @@ -208,7 +209,7 @@ private static void start(boolean withAuth) throws Exception {
sparkIntpSetting.setProperties(sparkProperties);
pySpark = true;
sparkR = true;
ZeppelinServer.notebook.getInterpreterFactory().restart(sparkIntpSetting.getId());
ZeppelinServer.notebook.getInterpreterSettingManager().restart(sparkIntpSetting.getId());
} else {
String sparkHome = getSparkHome();
if (sparkHome != null) {
Expand All @@ -225,7 +226,7 @@ private static void start(boolean withAuth) throws Exception {
sparkR = true;
}

ZeppelinServer.notebook.getInterpreterFactory().restart(sparkIntpSetting.getId());
ZeppelinServer.notebook.getInterpreterSettingManager().restart(sparkIntpSetting.getId());
}
}
}
Expand Down Expand Up @@ -292,10 +293,10 @@ private static boolean isActiveSparkHome(File dir) {
protected static void shutDown() throws Exception {
if (!wasRunning) {
// restart interpreter to stop all interpreter processes
List<String> settingList = ZeppelinServer.notebook.getInterpreterFactory()
List<String> settingList = ZeppelinServer.notebook.getInterpreterSettingManager()
.getDefaultInterpreterSettingList();
for (String setting : settingList) {
ZeppelinServer.notebook.getInterpreterFactory().restart(setting);
ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting);
}
if (shiroIni != null) {
FileUtils.deleteQuietly(shiroIni);
Expand Down
Loading