Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Dec 30, 2024
1 parent 8e09e1b commit 47427a9
Show file tree
Hide file tree
Showing 3 changed files with 352 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,21 +131,19 @@ private void getStreamStatementResult(String handle, ServerStreamListener listen
String[] handleParts = handle.split(":");
String executedPeerIdentity = handleParts[0];
String queryId = handleParts[1];
// The tokens used for authentication between getStreamStatement and getFlightInfoStatement are different.
ConnectContext connectContext = flightSessionsManager.getConnectContext(executedPeerIdentity);
try {
// The tokens used for authentication between getStreamStatement and getFlightInfoStatement are different.
final FlightSqlResultCacheEntry flightSqlResultCacheEntry = Objects.requireNonNull(
connectContext.getFlightSqlChannel().getResult(queryId));
final VectorSchemaRoot vectorSchemaRoot = flightSqlResultCacheEntry.getVectorSchemaRoot();
listener.start(vectorSchemaRoot);
listener.putNext();
} catch (Exception e) {
listener.error(e);
String errMsg = "get stream statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e)
+ ", error code: " + connectContext.getState().getErrorCode() + ", error msg: "
+ connectContext.getState().getErrorMessage();
LOG.warn(errMsg, e);
throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
handleStreamException(e, errMsg, listener);
} finally {
listener.completed();
// The result has been sent or sent failed, delete it.
Expand Down Expand Up @@ -280,7 +278,7 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con
String errMsg = "get flight info statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e)
+ ", error code: " + connectContext.getState().getErrorCode() + ", error msg: "
+ connectContext.getState().getErrorMessage();
LOG.warn(errMsg, e);
LOG.error(errMsg, e);
throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
} finally {
connectContext.setCommand(MysqlCommand.COM_SLEEP);
Expand Down Expand Up @@ -361,7 +359,7 @@ public void createPreparedStatement(final ActionCreatePreparedStatementRequest r
String errMsg = "create prepared statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(
e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: "
+ connectContext.getState().getErrorMessage();
LOG.warn(errMsg, e);
LOG.error(errMsg, e);
listener.onError(CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException());
return;
} catch (final Throwable t) {
Expand Down Expand Up @@ -407,7 +405,7 @@ public Runnable acceptPutPreparedStatementUpdate(CommandPreparedStatementUpdate
} catch (Exception e) {
String errMsg = "acceptPutPreparedStatementUpdate failed, " + e.getMessage() + ", "
+ Util.getRootCauseMessage(e);
LOG.warn(errMsg, e);
LOG.error(errMsg, e);
throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
}
};
Expand Down Expand Up @@ -479,7 +477,29 @@ public FlightInfo getFlightInfoTables(final CommandGetTables request, final Call
@Override
public void getStreamTables(final CommandGetTables command, final CallContext context,
final ServerStreamListener listener) {
throw CallStatus.UNIMPLEMENTED.withDescription("getStreamTables unimplemented").toRuntimeException();
try {
// TODO
// command.getDbSchemaFilterPattern();
// command.getTableNameFilterPattern();
// command.getTableTypesList();
// context.isCancelled();

ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity());
FlightSqlSchemaHelper flightSqlSchemaHelper = new FlightSqlSchemaHelper(connectContext);
final boolean includeSchema = command.getIncludeSchema();
final Schema schema = includeSchema ? Schemas.GET_TABLES_SCHEMA
: Schemas.GET_TABLES_SCHEMA_NO_SCHEMA;

try (VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator)) {
listener.start(vectorSchemaRoot);
vectorSchemaRoot.allocateNew();
flightSqlSchemaHelper.getTablesSchema(vectorSchemaRoot, includeSchema);
listener.putNext();
listener.completed();
}
} catch (final Exception e) {
handleStreamException(e, "", listener);
}
}

@Override
Expand Down Expand Up @@ -550,4 +570,10 @@ private <T extends Message> FlightInfo getFlightInfoForSchema(final T request, f

return new FlightInfo(schema, descriptor, endpoints, -1, -1);
}

private static void handleStreamException(Exception e, String errMsg, ServerStreamListener listener) {
LOG.error(errMsg, e);
listener.error(CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException());
throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
}
}
Loading

0 comments on commit 47427a9

Please sign in to comment.