Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

recreate session for both session error and connection exception #516

Merged
merged 3 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ public void release() {
connection.signout(sessionID);
connection.close();
} catch (Exception e) {
log.warn("release session failed, " + e.getMessage());
// not print the warn to avoid confuse for session and connect,
// when connection is broken, release will failed, just make connection as null.
// log.warn("release session failed, " + e.getMessage());
}
connection = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package com.vesoft.nebula.client.graph;

import static com.vesoft.nebula.client.graph.exception.IOErrorException.E_CONNECT_BROKEN;

import com.alibaba.fastjson.JSON;
import com.vesoft.nebula.ErrorCode;
import com.vesoft.nebula.client.graph.data.HostAddress;
Expand Down Expand Up @@ -138,22 +140,27 @@ public ResultSet execute(String stmt) throws IOErrorException,
stmtCheck(stmt);
checkSessionPool();
NebulaSession nebulaSession = getSession();
ResultSet resultSet;
try {
resultSet = nebulaSession.execute(stmt);

// re-execute for session error
if (isSessionError(resultSet)) {
ResultSet resultSet = null;
int tryTimes = 3;
while (tryTimes-- > 0) {
try {
resultSet = nebulaSession.execute(stmt);
if (!isSessionError(resultSet)) {
useSpace(nebulaSession, resultSet);
return resultSet;
} else {
throw new IOErrorException(E_CONNECT_BROKEN, resultSet.getErrorMessage());
}
} catch (IOErrorException e) {
nebulaSession.release();
sessionList.remove(nebulaSession);
nebulaSession = getSession();
resultSet = nebulaSession.execute(stmt);
if (tryTimes > 0) {
nebulaSession = createSessionObject(SessionState.USED);
} else {
throw e;
}
}
} catch (IOErrorException e) {
useSpace(nebulaSession, null);
throw e;
}
useSpace(nebulaSession, resultSet);
return resultSet;
}

Expand Down Expand Up @@ -339,7 +346,8 @@ public HostAddress getAddress() {
private void useSpace(NebulaSession nebulaSession, ResultSet resultSet)
throws IOErrorException {
if (resultSet == null) {
releaseSession(nebulaSession);
nebulaSession.release();
sessionList.remove(nebulaSession);
return;
}
// space has been drop, close the SessionPool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void setUp() throws Exception {
NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
nebulaPoolConfig.setMaxConnSize(1);
Assert.assertTrue(pool.init(Arrays.asList(new HostAddress(
"127.0.0.1", 9670)),
"127.0.0.1", 9669)),
nebulaPoolConfig));
session = pool.getSession("root", "nebula", true);
ResultSet resp = session.execute("CREATE SPACE IF NOT EXISTS test_data"
Expand Down Expand Up @@ -502,16 +502,13 @@ public void tesDataset() {
Collections.sort(pathStrings);
String listString = String.join(", ",
pathStrings);

String expectString = "ColumnName: [p],"
+ " Values: [(\"a\" )-[:like@0{}]->(\"f\" )<-[:like@0{}]-(\"c\" )], "
+ "ColumnName: [p],"
+ " Values: [(\"a\" )-[:like@0{}]->(\"g\" )-[:like@0{}]->(\"c\" )], "
+ "ColumnName: [p],"
+ " Values: [(\"a\" )<-[:like@0{}]-(\"b\" )<-[:like@0{}]-(\"c\" )], "
+ "ColumnName: [p],"
+ " Values: [(\"a\" )<-[:like@0{}]-(\"d\" )-[:like@0{}]->(\"c\" )]";

String expectString = "ColumnName: [p], Values: [(\"a\" :player {})-[:like@0{}]->"
+ "(\"f\" :player {})<-[:like@0{}]-(\"c\" :player {})], ColumnName: [p], "
+ "Values: [(\"a\" :player {})-[:like@0{}]->(\"g\" :player {})-[:like@0{}]->"
+ "(\"c\" :player {})], ColumnName: [p], Values: [(\"a\" :player {})"
+ "<-[:like@0{}]-(\"b\" :player {})<-[:like@0{}]-(\"c\" :player {})], "
+ "ColumnName: [p], Values: [(\"a\" :player {})<-[:like@0{}]-(\"d\" :player {})"
+ "-[:like@0{}]->(\"c\" :player {})]";
Assert.assertEquals(expectString, listString);
} catch (IOErrorException
| InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,4 +324,51 @@ public void testReleaseBrokenSession() {
}
}

@Test
public void testBrokenConnectionForSession() {
List<HostAddress> addresses = Arrays.asList(new HostAddress(ip, 9669));
SessionPoolConfig config = new SessionPoolConfig(addresses, "space_for_session_pool",
"root", "nebula");
SessionPool sessionPool = new SessionPool(config);
assert sessionPool.init();

Runtime runtime = Runtime.getRuntime();
try {
try {
ResultSet resultSet = sessionPool.execute("SHOW SPACES;");
if (!resultSet.isSucceeded()) {
System.out.println("show spaces failed, ErrorCode:"
+ resultSet.getErrorCode() + " ErrorMessage:"
+ resultSet.getErrorMessage());
}
} catch (Exception e) {
e.printStackTrace();
assert false;
}

String cmd = "docker restart nebula-docker-compose_graphd0_1";
Process p = runtime.exec(cmd);
p.waitFor(5, TimeUnit.SECONDS);
ProcessUtil.printProcessStatus(cmd, p);
Thread.sleep(30000);

try {
ResultSet resultSet = sessionPool.execute("SHOW SPACES;");
if (!resultSet.isSucceeded()) {
System.out.println("show spaces failed, ErrorCode:"
+ resultSet.getErrorCode() + " ErrorMessage:"
+ resultSet.getErrorMessage());
}
} catch (Exception e) {
e.printStackTrace();
assert false;
}
sessionPool.close();
} catch (Exception e) {
e.printStackTrace();
Assert.assertFalse(e.getMessage(), false);
}

}

}