Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 60 additions & 24 deletions hive/src/main/java/org/apache/zeppelin/hive/HiveInterpreter.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -73,9 +74,11 @@ public class HiveInterpreter extends Interpreter {
static final String DEFAULT_PASSWORD = DEFAULT_KEY + DOT + PASSWORD_KEY;

private final HashMap<String, Properties> propertiesMap;
private final Map<String, Connection> keyConnectionMap;
private final Map<String, Statement> paragraphIdStatementMap;

private final Map<String, ArrayList<Connection>> propertyKeyUnusedConnectionListMap;
private final Map<String, Connection> paragraphIdConnectionMap;

static {
Interpreter.register(
"hql",
Expand All @@ -92,8 +95,9 @@ public class HiveInterpreter extends Interpreter {
public HiveInterpreter(Properties property) {
super(property);
propertiesMap = new HashMap<>();
keyConnectionMap = new HashMap<>();
propertyKeyUnusedConnectionListMap = new HashMap<>();
paragraphIdStatementMap = new HashMap<>();
paragraphIdConnectionMap = new HashMap<>();
}

public HashMap<String, Properties> getPropertiesMap() {
Expand Down Expand Up @@ -142,28 +146,37 @@ public void open() {
@Override
public void close() {
try {
for (List<Connection> connectionList : propertyKeyUnusedConnectionListMap.values()) {
for (Connection c : connectionList) {
c.close();
}
}

for (Statement statement : paragraphIdStatementMap.values()) {
statement.close();
}
paragraphIdStatementMap.clear();

for (Connection connection : keyConnectionMap.values()) {
for (Connection connection : paragraphIdConnectionMap.values()) {
connection.close();
}
keyConnectionMap.clear();
paragraphIdConnectionMap.clear();

} catch (SQLException e) {
logger.error("Error while closing...", e);
}
}

public Connection getConnection(String propertyKey) throws ClassNotFoundException, SQLException {
Connection connection = null;
if (keyConnectionMap.containsKey(propertyKey)) {
connection = keyConnectionMap.get(propertyKey);
if (connection.isClosed() || !connection.isValid(10)) {
connection.close();
connection = null;
keyConnectionMap.remove(propertyKey);
if (propertyKeyUnusedConnectionListMap.containsKey(propertyKey)) {
ArrayList<Connection> connectionList = propertyKeyUnusedConnectionListMap.get(propertyKey);
if (0 != connectionList.size()) {
connection = propertyKeyUnusedConnectionListMap.get(propertyKey).remove(0);
if (null != connection && connection.isClosed()) {
connection.close();
connection = null;
}
}
}
if (null == connection) {
Expand All @@ -177,28 +190,40 @@ public Connection getConnection(String propertyKey) throws ClassNotFoundExceptio
} else {
connection = DriverManager.getConnection(url, properties);
}
keyConnectionMap.put(propertyKey, connection);
}
return connection;
}

public Statement getStatement(String propertyKey, String paragraphId)
throws SQLException, ClassNotFoundException {
Statement statement = null;
if (paragraphIdStatementMap.containsKey(paragraphId)) {
statement = paragraphIdStatementMap.get(paragraphId);
if (statement.isClosed()) {
statement = null;
paragraphIdStatementMap.remove(paragraphId);
}
Connection connection;
if (paragraphIdConnectionMap.containsKey(paragraphId)) {
// Never enter for now.
connection = paragraphIdConnectionMap.get(paragraphId);
} else {
connection = getConnection(propertyKey);
}
if (null == statement) {
statement = getConnection(propertyKey).createStatement();
paragraphIdStatementMap.put(paragraphId, statement);

Statement statement = connection.createStatement();
if (isStatementClosed(statement)) {
connection = getConnection(propertyKey);
statement = connection.createStatement();
}
paragraphIdConnectionMap.put(paragraphId, connection);
paragraphIdStatementMap.put(paragraphId, statement);

return statement;
}

private boolean isStatementClosed(Statement statement) {
try {
return statement.isClosed();
} catch (Throwable t) {
logger.debug("{} doesn't support isClosed method", statement);
return false;
}
}

public InterpreterResult executeSql(String propertyKey, String sql,
InterpreterContext interpreterContext) {
String paragraphId = interpreterContext.getParagraphId();
Expand Down Expand Up @@ -259,7 +284,7 @@ public InterpreterResult executeSql(String propertyKey, String sql,
}
statement.close();
} finally {
removeStatement(paragraphId);
moveConnectionToUnused(propertyKey, paragraphId);
}
}

Expand All @@ -271,8 +296,19 @@ public InterpreterResult executeSql(String propertyKey, String sql,
}
}

private void removeStatement(String paragraphId) {
paragraphIdStatementMap.remove(paragraphId);
private void moveConnectionToUnused(String propertyKey, String paragraphId) {
if (paragraphIdConnectionMap.containsKey(paragraphId)) {
Connection connection = paragraphIdConnectionMap.remove(paragraphId);
if (null != connection) {
if (propertyKeyUnusedConnectionListMap.containsKey(propertyKey)) {
propertyKeyUnusedConnectionListMap.get(propertyKey).add(connection);
} else {
ArrayList<Connection> connectionList = new ArrayList<>();
connectionList.add(connection);
propertyKeyUnusedConnectionListMap.put(propertyKey, connectionList);
}
}
}
}

@Override
Expand Down