diff --git a/hive/src/main/java/org/apache/zeppelin/hive/HiveInterpreter.java b/hive/src/main/java/org/apache/zeppelin/hive/HiveInterpreter.java index b1f3339674e..912b55e2651 100644 --- a/hive/src/main/java/org/apache/zeppelin/hive/HiveInterpreter.java +++ b/hive/src/main/java/org/apache/zeppelin/hive/HiveInterpreter.java @@ -23,6 +23,7 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -73,9 +74,11 @@ public class HiveInterpreter extends Interpreter { static final String DEFAULT_PASSWORD = DEFAULT_KEY + DOT + PASSWORD_KEY; private final HashMap propertiesMap; - private final Map keyConnectionMap; private final Map paragraphIdStatementMap; + private final Map> propertyKeyUnusedConnectionListMap; + private final Map paragraphIdConnectionMap; + static { Interpreter.register( "hql", @@ -92,8 +95,9 @@ public class HiveInterpreter extends Interpreter { public HiveInterpreter(Properties property) { super(property); propertiesMap = new HashMap<>(); - keyConnectionMap = new HashMap<>(); + propertyKeyUnusedConnectionListMap = new HashMap<>(); paragraphIdStatementMap = new HashMap<>(); + paragraphIdConnectionMap = new HashMap<>(); } public HashMap getPropertiesMap() { @@ -142,15 +146,22 @@ public void open() { @Override public void close() { try { + for (List connectionList : propertyKeyUnusedConnectionListMap.values()) { + for (Connection c : connectionList) { + c.close(); + } + } + for (Statement statement : paragraphIdStatementMap.values()) { statement.close(); } paragraphIdStatementMap.clear(); - for (Connection connection : keyConnectionMap.values()) { + for (Connection connection : paragraphIdConnectionMap.values()) { connection.close(); } - keyConnectionMap.clear(); + paragraphIdConnectionMap.clear(); + } catch (SQLException e) { logger.error("Error while closing...", e); } @@ -158,12 +169,14 @@ public void close() { public Connection getConnection(String propertyKey) throws ClassNotFoundException, SQLException { Connection connection = null; - if (keyConnectionMap.containsKey(propertyKey)) { - connection = keyConnectionMap.get(propertyKey); - if (connection.isClosed() || !connection.isValid(10)) { - connection.close(); - connection = null; - keyConnectionMap.remove(propertyKey); + if (propertyKeyUnusedConnectionListMap.containsKey(propertyKey)) { + ArrayList connectionList = propertyKeyUnusedConnectionListMap.get(propertyKey); + if (0 != connectionList.size()) { + connection = propertyKeyUnusedConnectionListMap.get(propertyKey).remove(0); + if (null != connection && connection.isClosed()) { + connection.close(); + connection = null; + } } } if (null == connection) { @@ -177,28 +190,40 @@ public Connection getConnection(String propertyKey) throws ClassNotFoundExceptio } else { connection = DriverManager.getConnection(url, properties); } - keyConnectionMap.put(propertyKey, connection); } return connection; } public Statement getStatement(String propertyKey, String paragraphId) throws SQLException, ClassNotFoundException { - Statement statement = null; - if (paragraphIdStatementMap.containsKey(paragraphId)) { - statement = paragraphIdStatementMap.get(paragraphId); - if (statement.isClosed()) { - statement = null; - paragraphIdStatementMap.remove(paragraphId); - } + Connection connection; + if (paragraphIdConnectionMap.containsKey(paragraphId)) { + // Never enter for now. + connection = paragraphIdConnectionMap.get(paragraphId); + } else { + connection = getConnection(propertyKey); } - if (null == statement) { - statement = getConnection(propertyKey).createStatement(); - paragraphIdStatementMap.put(paragraphId, statement); + + Statement statement = connection.createStatement(); + if (isStatementClosed(statement)) { + connection = getConnection(propertyKey); + statement = connection.createStatement(); } + paragraphIdConnectionMap.put(paragraphId, connection); + paragraphIdStatementMap.put(paragraphId, statement); + return statement; } + private boolean isStatementClosed(Statement statement) { + try { + return statement.isClosed(); + } catch (Throwable t) { + logger.debug("{} doesn't support isClosed method", statement); + return false; + } + } + public InterpreterResult executeSql(String propertyKey, String sql, InterpreterContext interpreterContext) { String paragraphId = interpreterContext.getParagraphId(); @@ -259,7 +284,7 @@ public InterpreterResult executeSql(String propertyKey, String sql, } statement.close(); } finally { - removeStatement(paragraphId); + moveConnectionToUnused(propertyKey, paragraphId); } } @@ -271,8 +296,19 @@ public InterpreterResult executeSql(String propertyKey, String sql, } } - private void removeStatement(String paragraphId) { - paragraphIdStatementMap.remove(paragraphId); + private void moveConnectionToUnused(String propertyKey, String paragraphId) { + if (paragraphIdConnectionMap.containsKey(paragraphId)) { + Connection connection = paragraphIdConnectionMap.remove(paragraphId); + if (null != connection) { + if (propertyKeyUnusedConnectionListMap.containsKey(propertyKey)) { + propertyKeyUnusedConnectionListMap.get(propertyKey).add(connection); + } else { + ArrayList connectionList = new ArrayList<>(); + connectionList.add(connection); + propertyKeyUnusedConnectionListMap.put(propertyKey, connectionList); + } + } + } } @Override