diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 52f0ac027b93d9..6c15cd4dd1e2bf 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -64,8 +64,29 @@ DEFINE_Int32(brpc_port, "8060"); DEFINE_Int32(arrow_flight_sql_port, "-1"); -DEFINE_mString(public_access_ip, ""); -DEFINE_Int32(public_access_port, "-1"); +// If the external client cannot directly access priority_networks, set public_host to an address +// that the external client can access. +// There are usually two usage scenarios: +// 1. In a production environment, it is often inconvenient to expose Doris BE nodes to the external network. +// However, a reverse proxy (such as Nginx) can be added in front of all Doris BE nodes, and the external client will be +// randomly routed to a Doris BE node when connecting to Nginx. Set public_host to the host of Nginx. +// 2. If priority_networks is an internal network IP and the BE node has its own independent external IP, +// but Doris currently does not support modifying priority_networks, set public_host to the real external IP. +DEFINE_mString(public_host, ""); + +// If the BE node is connected to the external network through a reverse proxy such as Nginx +// and needs to use Arrow Flight SQL, add a server in Nginx to reverse proxy +// `Nginx:arrow_flight_sql_proxy_port` to `BE_priority_networks:arrow_flight_sql_port`. For example: +// upstream arrowflight { +// server 10.16.10.8:8069; +// server 10.16.10.8:8068; +//} +// server { +// listen 8167 http2; +// listen [::]:8167 http2; +// server_name doris.arrowflight.com; +// } +DEFINE_Int32(arrow_flight_sql_proxy_port, "-1"); // the number of bthreads for brpc, the default value is set to -1, // which means the number of bthreads is #cpu-cores diff --git a/be/src/common/config.h b/be/src/common/config.h index 25127f222e2484..22462202b2e28b 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -100,11 +100,29 @@ DECLARE_Int32(brpc_port); // Default -1, do not start arrow flight sql server. DECLARE_Int32(arrow_flight_sql_port); -// If priority_networks is incorrect but cannot be modified, set public_access_ip as BE’s real IP. -// For ADBC client fetch result, default is empty, the ADBC client uses the backend ip to fetch the result. -// If ADBC client cannot access the backend ip, can set public_access_ip to modify the fetch result ip. -DECLARE_mString(public_access_ip); -DECLARE_Int32(public_access_port); +// If the external client cannot directly access priority_networks, set public_host to an address +// that the external client can access. +// There are usually two usage scenarios: +// 1. In a production environment, it is often inconvenient to expose Doris BE nodes to the external network. +// However, a reverse proxy (such as Nginx) can be added in front of all Doris BE nodes, and the external client will be +// randomly routed to a Doris BE node when connecting to Nginx. Set public_host to the host of Nginx. +// 2. If priority_networks is an internal network IP and the BE node has its own independent external IP, +// but Doris currently does not support modifying priority_networks, set public_host to the real external IP. +DECLARE_mString(public_host); + +// If the BE node is connected to the external network through a reverse proxy such as Nginx +// and needs to use Arrow Flight SQL, add a server in Nginx to reverse proxy +// `Nginx:arrow_flight_sql_proxy_port` to `BE_priority_networks:arrow_flight_sql_port`. 
For example: +// upstream arrowflight { +// server 10.16.10.8:8069; +// server 10.16.10.8:8068; +//} +// server { +// listen 8167 http2; +// listen [::]:8167 http2; +// server_name doris.arrowflight.com; +// } +DECLARE_Int32(arrow_flight_sql_proxy_port); // the number of bthreads for brpc, the default value is set to -1, // which means the number of bthreads is #cpu-cores diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index c476398a1a4962..fb0b2f090bc045 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -908,9 +908,11 @@ void PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController st = serialize_arrow_schema(&schema, &schema_str); if (st.ok()) { result->set_schema(std::move(schema_str)); - if (!config::public_access_ip.empty() && config::public_access_port != -1) { - result->set_be_arrow_flight_ip(config::public_access_ip); - result->set_be_arrow_flight_port(config::public_access_port); + if (!config::public_host.empty()) { + result->set_be_arrow_flight_ip(config::public_host); + } + if (config::arrow_flight_sql_proxy_port != -1) { + result->set_be_arrow_flight_port(config::arrow_flight_sql_proxy_port); } } st.to_protobuf(result->mutable_status()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 265d1f894556eb..33411bc282f903 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -19,7 +19,6 @@ import org.apache.doris.analysis.BoolLiteral; import org.apache.doris.analysis.DecimalLiteral; -import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.FloatLiteral; import org.apache.doris.analysis.IntLiteral; import org.apache.doris.analysis.LiteralExpr; @@ -61,11 +60,11 @@ import org.apache.doris.plugin.AuditEvent.AuditEventBuilder; import org.apache.doris.resource.Tag; import org.apache.doris.service.arrowflight.results.FlightSqlChannel; +import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation; import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.Histogram; import org.apache.doris.system.Backend; import org.apache.doris.task.LoadTaskInfo; -import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TResultSinkType; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; @@ -131,10 +130,7 @@ public enum ConnectType { protected volatile String peerIdentity; private final Map preparedQuerys = new HashMap<>(); private String runningQuery; - private TNetworkAddress resultFlightServerAddr; - private TNetworkAddress resultInternalServiceAddr; - private ArrayList resultOutputExprs; - private TUniqueId finstId; + private final List flightSqlEndpointsLocations = Lists.newArrayList(); private boolean returnResultFromLocal = true; // mysql net protected volatile MysqlChannel mysqlChannel; @@ -702,36 +698,16 @@ public String getRunningQuery() { return runningQuery; } - public void setResultFlightServerAddr(TNetworkAddress resultFlightServerAddr) { - this.resultFlightServerAddr = resultFlightServerAddr; + public void addFlightSqlEndpointsLocation(FlightSqlEndpointsLocation flightSqlEndpointsLocation) { + this.flightSqlEndpointsLocations.add(flightSqlEndpointsLocation); } - public TNetworkAddress getResultFlightServerAddr() { - return resultFlightServerAddr; + public List 
getFlightSqlEndpointsLocations() { + return flightSqlEndpointsLocations; } - public void setResultInternalServiceAddr(TNetworkAddress resultInternalServiceAddr) { - this.resultInternalServiceAddr = resultInternalServiceAddr; - } - - public TNetworkAddress getResultInternalServiceAddr() { - return resultInternalServiceAddr; - } - - public void setResultOutputExprs(ArrayList resultOutputExprs) { - this.resultOutputExprs = resultOutputExprs; - } - - public ArrayList getResultOutputExprs() { - return resultOutputExprs; - } - - public void setFinstId(TUniqueId finstId) { - this.finstId = finstId; - } - - public TUniqueId getFinstId() { - return finstId; + public void clearFlightSqlEndpointsLocations() { + flightSqlEndpointsLocations.clear(); } public void setReturnResultFromLocal(boolean returnResultFromLocal) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 94fb92812962ca..774e4efa4328a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -83,6 +83,7 @@ import org.apache.doris.rpc.RpcException; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; +import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.LoadEtlTask; @@ -731,29 +732,27 @@ private void execInternal() throws Exception { enableParallelResultSink = queryOptions.isEnableParallelOutfile(); } - TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host; Set addrs = new HashSet<>(); for (FInstanceExecParam param : topParams.instanceExecParams) { if (addrs.contains(param.host)) { continue; } addrs.add(param.host); - receivers.add(new ResultReceiver(queryId, param.instanceId, addressToBackendID.get(param.host), - toBrpcHost(param.host), this.timeoutDeadline, - context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), enableParallelResultSink)); - } - - if (!context.isReturnResultFromLocal()) { - Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)); - if (enableParallelResultSink) { - context.setFinstId(queryId); + if (context.isReturnResultFromLocal()) { + receivers.add(new ResultReceiver(queryId, param.instanceId, addressToBackendID.get(param.host), + toBrpcHost(param.host), this.timeoutDeadline, + context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), enableParallelResultSink)); } else { - context.setFinstId(topParams.instanceExecParams.get(0).instanceId); + Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)); + TUniqueId finstId; + if (enableParallelResultSink) { + finstId = queryId; + } else { + finstId = topParams.instanceExecParams.get(0).instanceId; + } + context.addFlightSqlEndpointsLocation(new FlightSqlEndpointsLocation(finstId, + toArrowFlightHost(param.host), toBrpcHost(param.host), fragments.get(0).getOutputExprs())); } - context.setFinstId(topParams.instanceExecParams.get(0).instanceId); - context.setResultFlightServerAddr(toArrowFlightHost(execBeAddr)); - context.setResultInternalServiceAddr(toBrpcHost(execBeAddr)); - context.setResultOutputExprs(fragments.get(0).getOutputExprs()); } LOG.info("dispatch result sink of query {} to {}", DebugUtil.printId(queryId), @@ -764,7 +763,8 @@ private void execInternal() throws Exception { // set the broker address for OUTFILE 
sink ResultFileSink topResultFileSink = (ResultFileSink) topDataSink; FsBroker broker = Env.getCurrentEnv().getBrokerMgr() - .getBroker(topResultFileSink.getBrokerName(), execBeAddr.getHostname()); + .getBroker(topResultFileSink.getBrokerName(), + topParams.instanceExecParams.get(0).host.getHostname()); topResultFileSink.setBrokerAddr(broker.host, broker.port); } } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index 371680813502f4..154fd9f0b6b83c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java @@ -25,11 +25,13 @@ import org.apache.doris.mysql.MysqlCommand; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState.MysqlStateType; +import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation; import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry; import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager; import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.Message; @@ -129,21 +131,19 @@ private void getStreamStatementResult(String handle, ServerStreamListener listen String[] handleParts = handle.split(":"); String executedPeerIdentity = handleParts[0]; String queryId = handleParts[1]; + // The tokens used for authentication between getStreamStatement and getFlightInfoStatement are different. ConnectContext connectContext = flightSessionsManager.getConnectContext(executedPeerIdentity); try { - // The tokens used for authentication between getStreamStatement and getFlightInfoStatement are different. final FlightSqlResultCacheEntry flightSqlResultCacheEntry = Objects.requireNonNull( connectContext.getFlightSqlChannel().getResult(queryId)); final VectorSchemaRoot vectorSchemaRoot = flightSqlResultCacheEntry.getVectorSchemaRoot(); listener.start(vectorSchemaRoot); listener.putNext(); } catch (Exception e) { - listener.error(e); String errMsg = "get stream statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: " + connectContext.getState().getErrorMessage(); - LOG.warn(errMsg, e); - throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException(); + handleStreamException(e, errMsg, listener); } finally { listener.completed(); // The result has been sent or sent failed, delete it. @@ -187,6 +187,7 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con Preconditions.checkState(!query.isEmpty()); // After the previous query was executed, there was no getStreamStatement to take away the result. connectContext.getFlightSqlChannel().reset(); + connectContext.clearFlightSqlEndpointsLocations(); try (FlightSqlConnectProcessor flightSQLConnectProcessor = new FlightSqlConnectProcessor(connectContext)) { flightSQLConnectProcessor.handleQuery(query); if (connectContext.getState().getStateType() == MysqlStateType.ERR) { @@ -225,50 +226,59 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con } } else { // Now only query stmt will pull results from BE. 
- Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000); - if (schema == null) { + flightSQLConnectProcessor.fetchArrowFlightSchema(5000); + if (flightSQLConnectProcessor.getArrowSchema() == null) { throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null") .toRuntimeException(); } - TUniqueId queryId = connectContext.queryId(); - if (!connectContext.getSessionVariable().enableParallelResultSink()) { - // only one instance - queryId = connectContext.getFinstId(); - } - // Ticket contains the IP and Brpc Port of the Doris BE node where the query result is located. - final ByteString handle = ByteString.copyFromUtf8( - DebugUtil.printId(queryId) + "&" + connectContext.getResultInternalServiceAddr().hostname - + "&" + connectContext.getResultInternalServiceAddr().port + "&" + query); - TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle) - .build(); - Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray()); - // TODO Support multiple endpoints. - Location location; - if (flightSQLConnectProcessor.getPublicAccessAddr().isSetHostname()) { - // In a production environment, it is often inconvenient to expose Doris BE nodes - // to the external network. - // However, a reverse proxy (such as nginx) can be added to all Doris BE nodes, - // and the external client will be randomly routed to a Doris BE node when connecting to nginx. - // The query results of Arrow Flight SQL will be randomly saved on a Doris BE node. - // If it is different from the Doris BE node randomly routed by nginx, - // data forwarding needs to be done inside the Doris BE node. - location = Location.forGrpcInsecure(flightSQLConnectProcessor.getPublicAccessAddr().hostname, - flightSQLConnectProcessor.getPublicAccessAddr().port); - } else { - location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname, - connectContext.getResultFlightServerAddr().port); + List endpoints = Lists.newArrayList(); + for (FlightSqlEndpointsLocation endpointLoc : connectContext.getFlightSqlEndpointsLocations()) { + TUniqueId tid = endpointLoc.getFinstId(); + // Ticket contains the IP and Brpc Port of the Doris BE node where the query result is located. + final ByteString handle = ByteString.copyFromUtf8( + DebugUtil.printId(tid) + "&" + endpointLoc.getResultInternalServiceAddr().hostname + "&" + + endpointLoc.getResultInternalServiceAddr().port + "&" + query); + TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder() + .setStatementHandle(handle).build(); + Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray()); + Location location; + if (endpointLoc.getResultPublicAccessAddr().isSetHostname()) { + // In a production environment, it is often inconvenient to expose Doris BE nodes + // to the external network. + // However, a reverse proxy (such as nginx) can be added to all Doris BE nodes, + // and the external client will be randomly routed to a Doris BE node when connecting + // to nginx. + // The query results of Arrow Flight SQL will be randomly saved on a Doris BE node. + // If it is different from the Doris BE node randomly routed by nginx, + // data forwarding needs to be done inside the Doris BE node. 
+ if (endpointLoc.getResultPublicAccessAddr().isSetPort()) { + location = Location.forGrpcInsecure(endpointLoc.getResultPublicAccessAddr().hostname, + endpointLoc.getResultPublicAccessAddr().port); + } else { + location = Location.forGrpcInsecure(endpointLoc.getResultPublicAccessAddr().hostname, + endpointLoc.getResultFlightServerAddr().port); + } + } else { + location = Location.forGrpcInsecure(endpointLoc.getResultFlightServerAddr().hostname, + endpointLoc.getResultFlightServerAddr().port); + } + // By default, the query results of all BE nodes will be aggregated to one BE node. + // ADBC Client will only receive one endpoint and pull data from the BE node + // corresponding to this endpoint. + // `set global enable_parallel_result_sink=true;` to allow each BE to return query results + // separately. ADBC Client will receive multiple endpoints and pull data from each endpoint. + endpoints.add(new FlightEndpoint(ticket, location)); } - List endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); // TODO Set in BE callback after query end, Client will not callback. - return new FlightInfo(schema, descriptor, endpoints, -1, -1); + return new FlightInfo(flightSQLConnectProcessor.getArrowSchema(), descriptor, endpoints, -1, -1); } } } catch (Exception e) { String errMsg = "get flight info statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: " + connectContext.getState().getErrorMessage(); - LOG.warn(errMsg, e); + LOG.error(errMsg, e); throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException(); } finally { connectContext.setCommand(MysqlCommand.COM_SLEEP); @@ -349,7 +359,7 @@ public void createPreparedStatement(final ActionCreatePreparedStatementRequest r String errMsg = "create prepared statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage( e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: " + connectContext.getState().getErrorMessage(); - LOG.warn(errMsg, e); + LOG.error(errMsg, e); listener.onError(CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException()); return; } catch (final Throwable t) { @@ -395,7 +405,7 @@ public Runnable acceptPutPreparedStatementUpdate(CommandPreparedStatementUpdate } catch (Exception e) { String errMsg = "acceptPutPreparedStatementUpdate failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e); - LOG.warn(errMsg, e); + LOG.error(errMsg, e); throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException(); } }; @@ -439,7 +449,21 @@ public FlightInfo getFlightInfoCatalogs(final CommandGetCatalogs request, final @Override public void getStreamCatalogs(final CallContext context, final ServerStreamListener listener) { - throw CallStatus.UNIMPLEMENTED.withDescription("getStreamCatalogs unimplemented").toRuntimeException(); + try { + ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity()); + FlightSqlSchemaHelper flightSqlSchemaHelper = new FlightSqlSchemaHelper(connectContext); + final Schema schema = Schemas.GET_CATALOGS_SCHEMA; + + try (final VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator)) { + listener.start(vectorSchemaRoot); + vectorSchemaRoot.allocateNew(); + flightSqlSchemaHelper.getCatalogs(vectorSchemaRoot); + listener.putNext(); + listener.completed(); + } + } catch (final Exception e) { + handleStreamException(e, "", listener); + } } @Override @@ 
-451,7 +475,22 @@ public FlightInfo getFlightInfoSchemas(final CommandGetDbSchemas request, final @Override public void getStreamSchemas(final CommandGetDbSchemas command, final CallContext context, final ServerStreamListener listener) { - throw CallStatus.UNIMPLEMENTED.withDescription("getStreamSchemas unimplemented").toRuntimeException(); + try { + ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity()); + FlightSqlSchemaHelper flightSqlSchemaHelper = new FlightSqlSchemaHelper(connectContext); + flightSqlSchemaHelper.setParameterForGetDbSchemas(command); + final Schema schema = Schemas.GET_SCHEMAS_SCHEMA; + + try (VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator)) { + listener.start(vectorSchemaRoot); + vectorSchemaRoot.allocateNew(); + flightSqlSchemaHelper.getSchemas(vectorSchemaRoot); + listener.putNext(); + listener.completed(); + } + } catch (final Exception e) { + handleStreamException(e, "", listener); + } } @Override @@ -467,7 +506,23 @@ public FlightInfo getFlightInfoTables(final CommandGetTables request, final Call @Override public void getStreamTables(final CommandGetTables command, final CallContext context, final ServerStreamListener listener) { - throw CallStatus.UNIMPLEMENTED.withDescription("getStreamTables unimplemented").toRuntimeException(); + try { + ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity()); + FlightSqlSchemaHelper flightSqlSchemaHelper = new FlightSqlSchemaHelper(connectContext); + flightSqlSchemaHelper.setParameterForGetTables(command); + final Schema schema = command.getIncludeSchema() ? Schemas.GET_TABLES_SCHEMA + : Schemas.GET_TABLES_SCHEMA_NO_SCHEMA; + + try (VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator)) { + listener.start(vectorSchemaRoot); + vectorSchemaRoot.allocateNew(); + flightSqlSchemaHelper.getTables(vectorSchemaRoot); + listener.putNext(); + listener.completed(); + } + } catch (final Exception e) { + handleStreamException(e, "", listener); + } } @Override @@ -490,7 +545,6 @@ public FlightInfo getFlightInfoPrimaryKeys(final CommandGetPrimaryKeys request, @Override public void getStreamPrimaryKeys(final CommandGetPrimaryKeys command, final CallContext context, final ServerStreamListener listener) { - throw CallStatus.UNIMPLEMENTED.withDescription("getStreamPrimaryKeys unimplemented").toRuntimeException(); } @@ -533,9 +587,14 @@ public void getStreamCrossReference(CommandGetCrossReference command, CallContex private FlightInfo getFlightInfoForSchema(final T request, final FlightDescriptor descriptor, final Schema schema) { final Ticket ticket = new Ticket(Any.pack(request).toByteArray()); - // TODO Support multiple endpoints. 
final List endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); return new FlightInfo(schema, descriptor, endpoints, -1, -1); } + + private static void handleStreamException(Exception e, String errMsg, ServerStreamListener listener) { + LOG.error(errMsg, e); + listener.error(CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException()); + throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java index 9ba7a1118e674f..345d7d824a23ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java @@ -31,6 +31,7 @@ import org.apache.doris.qe.StmtExecutor; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; +import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; @@ -58,7 +59,7 @@ */ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoCloseable { private static final Logger LOG = LogManager.getLogger(FlightSqlConnectProcessor.class); - private TNetworkAddress publicAccessAddr = new TNetworkAddress(); + private Schema arrowSchema; public FlightSqlConnectProcessor(ConnectContext context) { super(context); @@ -67,8 +68,8 @@ public FlightSqlConnectProcessor(ConnectContext context) { context.setReturnResultFromLocal(true); } - public TNetworkAddress getPublicAccessAddr() { - return publicAccessAddr; + public Schema getArrowSchema() { + return arrowSchema; } public void prepare(MysqlCommand command) { @@ -107,78 +108,87 @@ public void handleQuery(String query) throws ConnectionException { // handleFieldList(tableName); // } - public Schema fetchArrowFlightSchema(int timeoutMs) { - TNetworkAddress address = ctx.getResultInternalServiceAddr(); - TUniqueId tid; - if (ctx.getSessionVariable().enableParallelResultSink()) { - tid = ctx.queryId(); - } else { - // only one instance - tid = ctx.getFinstId(); + public void fetchArrowFlightSchema(int timeoutMs) { + if (ctx.getFlightSqlEndpointsLocations().isEmpty()) { + throw new RuntimeException("fetch arrow flight schema failed, no FlightSqlEndpointsLocations."); } - ArrayList resultOutputExprs = ctx.getResultOutputExprs(); - Types.PUniqueId queryId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build(); - try { - InternalService.PFetchArrowFlightSchemaRequest request = - InternalService.PFetchArrowFlightSchemaRequest.newBuilder() - .setFinstId(queryId) - .build(); - - Future future - = BackendServiceProxy.getInstance().fetchArrowFlightSchema(address, request); - InternalService.PFetchArrowFlightSchemaResult pResult; - pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS); - if (pResult == null) { - throw new RuntimeException(String.format("fetch arrow flight schema timeout, queryId: %s", - DebugUtil.printId(tid))); - } - Status resultStatus = new Status(pResult.getStatus()); - if (resultStatus.getErrorCode() != TStatusCode.OK) { - throw new RuntimeException(String.format("fetch arrow flight schema failed, queryId: %s, errmsg: %s", - DebugUtil.printId(tid), resultStatus)); - } - if (pResult.hasBeArrowFlightIp() && 
pResult.hasBeArrowFlightPort()) { - publicAccessAddr.hostname = pResult.getBeArrowFlightIp().toStringUtf8(); - publicAccessAddr.port = pResult.getBeArrowFlightPort(); - } - if (pResult.hasSchema() && pResult.getSchema().size() > 0) { - RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE); - ArrowStreamReader arrowStreamReader = new ArrowStreamReader( - new ByteArrayInputStream(pResult.getSchema().toByteArray()), - rootAllocator - ); - try { - VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot(); - List fieldVectors = root.getFieldVectors(); - if (fieldVectors.size() != resultOutputExprs.size()) { - throw new RuntimeException(String.format( - "Schema size %s' is not equal to arrow field size %s, queryId: %s.", - fieldVectors.size(), resultOutputExprs.size(), DebugUtil.printId(tid))); + for (FlightSqlEndpointsLocation endpointLoc : ctx.getFlightSqlEndpointsLocations()) { + TNetworkAddress address = endpointLoc.getResultInternalServiceAddr(); + TUniqueId tid = endpointLoc.getFinstId(); + ArrayList resultOutputExprs = endpointLoc.getResultOutputExprs(); + Types.PUniqueId queryId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build(); + try { + InternalService.PFetchArrowFlightSchemaRequest request + = InternalService.PFetchArrowFlightSchemaRequest.newBuilder().setFinstId(queryId).build(); + + Future future = BackendServiceProxy.getInstance() + .fetchArrowFlightSchema(address, request); + InternalService.PFetchArrowFlightSchemaResult pResult; + pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS); + if (pResult == null) { + throw new RuntimeException( + String.format("fetch arrow flight schema timeout, queryId: %s", DebugUtil.printId(tid))); + } + Status resultStatus = new Status(pResult.getStatus()); + if (resultStatus.getErrorCode() != TStatusCode.OK) { + throw new RuntimeException( + String.format("fetch arrow flight schema failed, queryId: %s, errmsg: %s", + DebugUtil.printId(tid), resultStatus)); + } + + TNetworkAddress resultPublicAccessAddr = new TNetworkAddress(); + if (pResult.hasBeArrowFlightIp()) { + resultPublicAccessAddr.setHostname(pResult.getBeArrowFlightIp().toStringUtf8()); + } + if (pResult.hasBeArrowFlightPort()) { + resultPublicAccessAddr.setPort(pResult.getBeArrowFlightPort()); + } + endpointLoc.setResultPublicAccessAddr(resultPublicAccessAddr); + if (pResult.hasSchema() && pResult.getSchema().size() > 0) { + RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE); + ArrowStreamReader arrowStreamReader = new ArrowStreamReader( + new ByteArrayInputStream(pResult.getSchema().toByteArray()), rootAllocator); + try { + Schema schema; + VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot(); + List fieldVectors = root.getFieldVectors(); + if (fieldVectors.size() != resultOutputExprs.size()) { + throw new RuntimeException( + String.format("Schema size %s' is not equal to arrow field size %s, queryId: %s.", + fieldVectors.size(), resultOutputExprs.size(), DebugUtil.printId(tid))); + } + schema = root.getSchema(); + if (arrowSchema == null) { + arrowSchema = schema; + } else if (!arrowSchema.equals(schema)) { + throw new RuntimeException(String.format( + "The schema returned by results BE is different, first schema: %s, " + + "new schema: %s, queryId: %s,backend: %s", arrowSchema, schema, + DebugUtil.printId(tid), address)); + } + } catch (Exception e) { + throw new RuntimeException("Read Arrow Flight Schema failed.", e); } - return root.getSchema(); - } catch (Exception e) { - throw new RuntimeException("Read Arrow Flight 
Schema failed.", e); + } else { + throw new RuntimeException( + String.format("get empty arrow flight schema, queryId: %s", DebugUtil.printId(tid))); } - } else { - throw new RuntimeException(String.format("get empty arrow flight schema, queryId: %s", - DebugUtil.printId(tid))); + } catch (RpcException e) { + throw new RuntimeException( + String.format("arrow flight schema fetch catch rpc exception, queryId: %s,backend: %s", + DebugUtil.printId(tid), address), e); + } catch (InterruptedException e) { + throw new RuntimeException( + String.format("arrow flight schema future get interrupted exception, queryId: %s,backend: %s", + DebugUtil.printId(tid), address), e); + } catch (ExecutionException e) { + throw new RuntimeException( + String.format("arrow flight schema future get execution exception, queryId: %s,backend: %s", + DebugUtil.printId(tid), address), e); + } catch (TimeoutException e) { + throw new RuntimeException(String.format("arrow flight schema fetch timeout, queryId: %s,backend: %s", + DebugUtil.printId(tid), address), e); } - } catch (RpcException e) { - throw new RuntimeException(String.format( - "arrow flight schema fetch catch rpc exception, queryId: %s,backend: %s", - DebugUtil.printId(tid), address), e); - } catch (InterruptedException e) { - throw new RuntimeException(String.format( - "arrow flight schema future get interrupted exception, queryId: %s,backend: %s", - DebugUtil.printId(tid), address), e); - } catch (ExecutionException e) { - throw new RuntimeException(String.format( - "arrow flight schema future get execution exception, queryId: %s,backend: %s", - DebugUtil.printId(tid), address), e); - } catch (TimeoutException e) { - throw new RuntimeException(String.format( - "arrow flight schema fetch timeout, queryId: %s,backend: %s", - DebugUtil.printId(tid), address), e); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java new file mode 100644 index 00000000000000..4b314a16c5fe94 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java @@ -0,0 +1,395 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.service.arrowflight; + +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.service.ExecuteEnv; +import org.apache.doris.service.FrontendServiceImpl; +import org.apache.doris.thrift.TColumnDef; +import org.apache.doris.thrift.TColumnDesc; +import org.apache.doris.thrift.TDescribeTablesParams; +import org.apache.doris.thrift.TDescribeTablesResult; +import org.apache.doris.thrift.TGetDbsParams; +import org.apache.doris.thrift.TGetDbsResult; +import org.apache.doris.thrift.TGetTablesParams; +import org.apache.doris.thrift.TListTableStatusResult; +import org.apache.doris.thrift.TTableStatus; + +import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils; +import org.apache.arrow.flight.sql.FlightSqlColumnMetadata; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetDbSchemas; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTables; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ZeroVector; +import org.apache.arrow.vector.complex.BaseRepeatedValueVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.ipc.WriteChannel; +import org.apache.arrow.vector.ipc.message.MessageSerializer; +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.Text; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.channels.Channels; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class FlightSqlSchemaHelper { + private static final Logger LOG = LogManager.getLogger(FlightSqlSchemaHelper.class); + private final ConnectContext ctx; + private final FrontendServiceImpl impl; + private boolean includeSchema; + private String catalogFilterPattern = null; + private String dbSchemaFilterPattern = null; + private String tableNameFilterPattern = null; + private List tableTypesList = null; + + public FlightSqlSchemaHelper(ConnectContext context) { + ctx = context; + impl = new FrontendServiceImpl(ExecuteEnv.getInstance()); + } + + private static final byte[] EMPTY_SERIALIZED_SCHEMA = getSerializedSchema(Collections.emptyList()); + + /** + * Convert Doris data type to an arrowType. + *

+ * Ref: `convert_to_arrow_type` in be/src/util/arrow/row_batch.cpp. + * which is consistent with the type of Arrow data returned by Doris Arrow Flight Sql query. + */ + private static ArrowType getArrowType(PrimitiveType primitiveType, Integer precision, Integer scale, + String timeZone) { + switch (primitiveType) { + case BOOLEAN: + return new ArrowType.Bool(); + case TINYINT: + return new ArrowType.Int(8, true); + case SMALLINT: + return new ArrowType.Int(16, true); + case INT: + case IPV4: + return new ArrowType.Int(32, true); + case BIGINT: + return new ArrowType.Int(64, true); + case FLOAT: + return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE); + case DOUBLE: + return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE); + case LARGEINT: + case VARCHAR: + case STRING: + case CHAR: + case DATETIME: + case DATE: + case JSONB: + case IPV6: + case VARIANT: + return new ArrowType.Utf8(); + case DATEV2: + return new ArrowType.Date(DateUnit.MILLISECOND); + case DATETIMEV2: + if (scale > 3) { + return new ArrowType.Timestamp(TimeUnit.MICROSECOND, timeZone); + } else if (scale > 0) { + return new ArrowType.Timestamp(TimeUnit.MILLISECOND, timeZone); + } else { + return new ArrowType.Timestamp(TimeUnit.SECOND, timeZone); + } + case DECIMAL32: + case DECIMAL64: + case DECIMAL128: + return new ArrowType.Decimal(precision, scale, 128); + case DECIMAL256: + return new ArrowType.Decimal(precision, scale, 256); + case DECIMALV2: + return new ArrowType.Decimal(27, 9, 128); + case HLL: + case BITMAP: + case QUANTILE_STATE: + return new ArrowType.Binary(); + case MAP: + return new ArrowType.Map(false); + case ARRAY: + return new ArrowType.List(); + case STRUCT: + return new ArrowType.Struct(); + default: + return new ArrowType.Null(); + } + } + + private static ArrowType columnDescToArrowType(final TColumnDesc desc) { + PrimitiveType primitiveType = PrimitiveType.fromThrift(desc.getColumnType()); + Integer precision = desc.isSetColumnPrecision() ? desc.getColumnPrecision() : null; + Integer scale = desc.isSetColumnScale() ? desc.getColumnScale() : null; + // TODO there is no timezone in TColumnDesc, so use current timezone. 
+ String timeZone = JdbcToArrowUtils.getUtcCalendar().getTimeZone().getID(); + return getArrowType(primitiveType, precision, scale, timeZone); + } + + private static Map createFlightSqlColumnMetadata(final String dbName, final String tableName, + final TColumnDesc desc) { + final FlightSqlColumnMetadata.Builder columnMetadataBuilder = new FlightSqlColumnMetadata.Builder().schemaName( + dbName).tableName(tableName).typeName(PrimitiveType.fromThrift(desc.getColumnType()).toString()) + .isAutoIncrement(false).isCaseSensitive(false).isReadOnly(true).isSearchable(true); + + if (desc.isSetColumnPrecision()) { + columnMetadataBuilder.precision(desc.getColumnPrecision()); + } + if (desc.isSetColumnScale()) { + columnMetadataBuilder.scale(desc.getColumnScale()); + } + return columnMetadataBuilder.build().getMetadataMap(); + } + + protected static byte[] getSerializedSchema(List fields) { + if (EMPTY_SERIALIZED_SCHEMA == null && fields == null) { + fields = Collections.emptyList(); + } else if (fields == null) { + return Arrays.copyOf(EMPTY_SERIALIZED_SCHEMA, EMPTY_SERIALIZED_SCHEMA.length); + } + + final ByteArrayOutputStream columnOutputStream = new ByteArrayOutputStream(); + final Schema schema = new Schema(fields); + + try { + MessageSerializer.serialize(new WriteChannel(Channels.newChannel(columnOutputStream)), schema); + } catch (final IOException e) { + throw new RuntimeException("IO Error when serializing schema '" + schema + "'.", e); + } + + return columnOutputStream.toByteArray(); + } + + /** + * Set in the Tables request object the parameter that user passed via CommandGetTables. + */ + public void setParameterForGetTables(CommandGetTables command) { + includeSchema = command.getIncludeSchema(); + catalogFilterPattern = command.hasCatalog() ? command.getCatalog() : "internal"; + dbSchemaFilterPattern = command.hasDbSchemaFilterPattern() ? command.getDbSchemaFilterPattern() : null; + tableNameFilterPattern = command.hasTableNameFilterPattern() ? command.getTableNameFilterPattern() : null; + tableTypesList = command.getTableTypesList().isEmpty() ? null : command.getTableTypesList(); + } + + /** + * Set in the Schemas request object the parameter that user passed via CommandGetDbSchemas. + */ + public void setParameterForGetDbSchemas(CommandGetDbSchemas command) { + catalogFilterPattern = command.hasCatalog() ? command.getCatalog() : "internal"; + dbSchemaFilterPattern = command.hasDbSchemaFilterPattern() ? command.getDbSchemaFilterPattern() : null; + } + + /** + * Call FrontendServiceImpl->getDbNames. + */ + private TGetDbsResult getDbNames() throws TException { + TGetDbsParams getDbsParams = new TGetDbsParams(); + if (catalogFilterPattern != null) { + getDbsParams.setCatalog(catalogFilterPattern); + } + if (dbSchemaFilterPattern != null) { + getDbsParams.setPattern(dbSchemaFilterPattern); + } + getDbsParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + return impl.getDbNames(getDbsParams); + } + + /** + * Call FrontendServiceImpl->listTableStatus. + */ + private TListTableStatusResult listTableStatus(String dbName, String catalogName) throws TException { + TGetTablesParams getTablesParams = new TGetTablesParams(); + getTablesParams.setDb(dbName); + if (!catalogName.isEmpty()) { + getTablesParams.setCatalog(catalogName); + } + if (tableNameFilterPattern != null) { + getTablesParams.setPattern(tableNameFilterPattern); + } + if (tableTypesList != null) { + getTablesParams.setType(tableTypesList.get(0)); // currently only one type is supported. 
+ } + getTablesParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + return impl.listTableStatus(getTablesParams); + } + + /** + * Call FrontendServiceImpl->describeTables. + */ + private TDescribeTablesResult describeTables(String dbName, String catalogName, List tablesName) + throws TException { + TDescribeTablesParams describeTablesParams = new TDescribeTablesParams(); + describeTablesParams.setDb(dbName); + if (!catalogName.isEmpty()) { + describeTablesParams.setCatalog(catalogName); + } + describeTablesParams.setTablesName(tablesName); + describeTablesParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + return impl.describeTables(describeTablesParams); + } + + /** + * Construct > + */ + private Map> buildTableToFields(String dbName, TDescribeTablesResult describeTablesResult, + List tablesName) { + Map> tableToFields = new HashMap<>(); + int columnIndex = 0; + for (int tableIndex = 0; tableIndex < describeTablesResult.getTablesOffsetSize(); tableIndex++) { + String tableName = tablesName.get(tableIndex); + final List fields = new ArrayList<>(); + Integer tableOffset = describeTablesResult.getTablesOffset().get(tableIndex); + for (; columnIndex < tableOffset; columnIndex++) { + TColumnDef columnDef = describeTablesResult.getColumns().get(columnIndex); + TColumnDesc columnDesc = columnDef.getColumnDesc(); + final ArrowType columnArrowType = columnDescToArrowType(columnDesc); + + List columnArrowTypeChildren; + // Arrow complex types may require children fields for parsing the schema on C++ + switch (columnArrowType.getTypeID()) { + case List: + case LargeList: + case FixedSizeList: + columnArrowTypeChildren = Collections.singletonList( + Field.notNullable(BaseRepeatedValueVector.DATA_VECTOR_NAME, + ZeroVector.INSTANCE.getField().getType())); + break; + case Map: + columnArrowTypeChildren = Collections.singletonList( + Field.notNullable(MapVector.DATA_VECTOR_NAME, new ArrowType.List())); + break; + case Struct: + columnArrowTypeChildren = Collections.emptyList(); + break; + default: + columnArrowTypeChildren = null; + break; + } + + final Field field = new Field(columnDesc.getColumnName(), + new FieldType(columnDesc.isIsAllowNull(), columnArrowType, null, + createFlightSqlColumnMetadata(dbName, tableName, columnDesc)), columnArrowTypeChildren); + fields.add(field); + } + tableToFields.put(tableName, fields); + } + return tableToFields; + } + + /** + * for FlightSqlProducer Schemas.GET_CATALOGS_SCHEMA + */ + public void getCatalogs(VectorSchemaRoot vectorSchemaRoot) throws TException { + VarCharVector catalogNameVector = (VarCharVector) vectorSchemaRoot.getVector("catalog_name"); + + Set catalogsSet = new LinkedHashSet<>(); + catalogsSet.add("internal"); // An ordered Set with "internal" first. 
+ for (CatalogIf catalog : Env.getCurrentEnv().getCatalogMgr().listCatalogs()) { + catalogsSet.add(catalog.getName()); + } + + int catalogIndex = 0; + for (String catalog : catalogsSet) { + catalogNameVector.setSafe(catalogIndex, new Text(catalog)); + catalogIndex++; + } + vectorSchemaRoot.setRowCount(catalogIndex); + } + + /** + * for FlightSqlProducer Schemas.GET_SCHEMAS_SCHEMA + */ + public void getSchemas(VectorSchemaRoot vectorSchemaRoot) throws TException { + VarCharVector catalogNameVector = (VarCharVector) vectorSchemaRoot.getVector("catalog_name"); + VarCharVector schemaNameVector = (VarCharVector) vectorSchemaRoot.getVector("db_schema_name"); + + TGetDbsResult getDbsResult = getDbNames(); + for (int dbIndex = 0; dbIndex < getDbsResult.getDbs().size(); dbIndex++) { + String dbName = getDbsResult.getDbs().get(dbIndex); + String catalogName = getDbsResult.isSetCatalogs() ? getDbsResult.getCatalogs().get(dbIndex) : ""; + catalogNameVector.setSafe(dbIndex, new Text(catalogName)); + schemaNameVector.setSafe(dbIndex, new Text(dbName)); + } + vectorSchemaRoot.setRowCount(getDbsResult.getDbs().size()); + } + + /** + * for FlightSqlProducer Schemas.GET_TABLES_SCHEMA_NO_SCHEMA and Schemas.GET_TABLES_SCHEMA + */ + public void getTables(VectorSchemaRoot vectorSchemaRoot) throws TException { + VarCharVector catalogNameVector = (VarCharVector) vectorSchemaRoot.getVector("catalog_name"); + VarCharVector schemaNameVector = (VarCharVector) vectorSchemaRoot.getVector("db_schema_name"); + VarCharVector tableNameVector = (VarCharVector) vectorSchemaRoot.getVector("table_name"); + VarCharVector tableTypeVector = (VarCharVector) vectorSchemaRoot.getVector("table_type"); + VarBinaryVector schemaVector = (VarBinaryVector) vectorSchemaRoot.getVector("table_schema"); + + int tablesCount = 0; + TGetDbsResult getDbsResult = getDbNames(); + for (int dbIndex = 0; dbIndex < getDbsResult.getDbs().size(); dbIndex++) { + String dbName = getDbsResult.getDbs().get(dbIndex); + String catalogName = getDbsResult.isSetCatalogs() ? 
getDbsResult.getCatalogs().get(dbIndex) : ""; + TListTableStatusResult listTableStatusResult = listTableStatus(dbName, catalogName); + + Map> tableToFields; + if (includeSchema) { + List tablesName = new ArrayList<>(); + for (TTableStatus tableStatus : listTableStatusResult.getTables()) { + tablesName.add(tableStatus.getName()); + } + TDescribeTablesResult describeTablesResult = describeTables(dbName, catalogName, tablesName); + tableToFields = buildTableToFields(dbName, describeTablesResult, tablesName); + } else { + tableToFields = null; + } + + for (TTableStatus tableStatus : listTableStatusResult.getTables()) { + catalogNameVector.setSafe(tablesCount, new Text(catalogName)); + schemaNameVector.setSafe(tablesCount, new Text(dbName)); + tableNameVector.setSafe(tablesCount, new Text(tableStatus.getName())); + tableTypeVector.setSafe(tablesCount, new Text(tableStatus.getType())); + if (includeSchema) { + List fields = tableToFields.get(tableStatus.getName()); + schemaVector.setSafe(tablesCount, getSerializedSchema(fields)); + } + tablesCount++; + } + } + vectorSchemaRoot.setRowCount(tablesCount); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthUtils.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthUtils.java index b605dff66b6a21..c93e0b5a309442 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthUtils.java @@ -49,7 +49,12 @@ public static FlightAuthResult authenticateCredentials(String username, String p Logger logger) { try { List currentUserIdentity = Lists.newArrayList(); - + // If the password is empty, DBeaver will pass the string "null" for authentication. + // This behavior of DBeaver is strange, but we have to be compatible with it; + // it may also be a problem with the Arrow Flight JDBC driver. + // Here, "null" is converted to null; if the user's password really is the string "null", + // authentication will fail. Usually, the user's password will not be "null", let's hope so. + password = (password.equals("null")) ? null : password; Env.getCurrentEnv().getAuth().checkPlainPassword(username, remoteIp, password, currentUserIdentity); Preconditions.checkState(currentUserIdentity.size() == 1); return FlightAuthResult.of(username, currentUserIdentity.get(0), remoteIp); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlEndpointsLocation.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlEndpointsLocation.java new file mode 100644 index 00000000000000..61adc797cc5dc4 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlEndpointsLocation.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.service.arrowflight.results; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TUniqueId; + +import java.util.ArrayList; + +public class FlightSqlEndpointsLocation { + private TUniqueId finstId; + private TNetworkAddress resultFlightServerAddr; + private TNetworkAddress resultInternalServiceAddr; + private TNetworkAddress resultPublicAccessAddr; + private ArrayList resultOutputExprs; + + public FlightSqlEndpointsLocation(TUniqueId finstId, TNetworkAddress resultFlightServerAddr, + TNetworkAddress resultInternalServiceAddr, ArrayList resultOutputExprs) { + this.finstId = finstId; + this.resultFlightServerAddr = resultFlightServerAddr; + this.resultInternalServiceAddr = resultInternalServiceAddr; + this.resultPublicAccessAddr = new TNetworkAddress(); + this.resultOutputExprs = resultOutputExprs; + } + + public TUniqueId getFinstId() { + return finstId; + } + + public TNetworkAddress getResultFlightServerAddr() { + return resultFlightServerAddr; + } + + public TNetworkAddress getResultInternalServiceAddr() { + return resultInternalServiceAddr; + } + + public void setResultPublicAccessAddr(TNetworkAddress resultPublicAccessAddr) { + this.resultPublicAccessAddr = resultPublicAccessAddr; + } + + public TNetworkAddress getResultPublicAccessAddr() { + return resultPublicAccessAddr; + } + + public ArrayList getResultOutputExprs() { + return resultOutputExprs; + } +} diff --git a/regression-test/suites/query_p0/aggregate/aggregate_count1.groovy b/regression-test/suites/query_p0/aggregate/aggregate_count1.groovy index cf657cc8ef3e3d..3971f304e38646 100644 --- a/regression-test/suites/query_p0/aggregate/aggregate_count1.groovy +++ b/regression-test/suites/query_p0/aggregate/aggregate_count1.groovy @@ -17,7 +17,7 @@ * under the License. */ -suite("aggregate_count1", "query") { +suite("aggregate_count1", "query,arrow_flight_sql") { sql """ DROP TABLE IF EXISTS aggregate_count1 """ sql """create table if not exists aggregate_count1 ( name varchar(128), diff --git a/regression-test/suites/query_p0/aggregate/select_distinct.groovy b/regression-test/suites/query_p0/aggregate/select_distinct.groovy index 6456158bdadb0d..2d6a8679d87ed8 100644 --- a/regression-test/suites/query_p0/aggregate/select_distinct.groovy +++ b/regression-test/suites/query_p0/aggregate/select_distinct.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("select_distinct") { +suite("select_distinct", "arrow_flight_sql") { sql """DROP TABLE IF EXISTS decimal_a;""" sql """DROP TABLE IF EXISTS decimal_b;""" sql """DROP TABLE IF EXISTS decimal_c;""" diff --git a/regression-test/suites/query_p0/casesensetive_column/join_with_column_casesensetive.groovy b/regression-test/suites/query_p0/casesensetive_column/join_with_column_casesensetive.groovy index 45499fc6f248a2..8bd3b19088a486 100644 --- a/regression-test/suites/query_p0/casesensetive_column/join_with_column_casesensetive.groovy +++ b/regression-test/suites/query_p0/casesensetive_column/join_with_column_casesensetive.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-suite("join_with_column_casesensetive") { +suite("join_with_column_casesensetive", "arrow_flight_sql") { def tables=["ad_order_data_v1","ad_order_data"] for (String table in tables) { diff --git a/regression-test/suites/query_p0/cast/test_cast.groovy b/regression-test/suites/query_p0/cast/test_cast.groovy index 947d61bc828861..dae669e2965f04 100644 --- a/regression-test/suites/query_p0/cast/test_cast.groovy +++ b/regression-test/suites/query_p0/cast/test_cast.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite('test_cast') { +suite('test_cast', "arrow_flight_sql") { def date = "date '2020-01-01'" def datev2 = "datev2 '2020-01-01'" def datetime = "timestamp '2020-01-01 12:34:45'" diff --git a/regression-test/suites/query_p0/except/test_query_except.groovy b/regression-test/suites/query_p0/except/test_query_except.groovy index 1a2aa742d2910d..410e24f89b92de 100644 --- a/regression-test/suites/query_p0/except/test_query_except.groovy +++ b/regression-test/suites/query_p0/except/test_query_except.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_query_except") { +suite("test_query_except", "arrow_flight_sql") { // test query except, depend on query_test_data_load.groovy sql "use test_query_db" qt_select_except1 """ diff --git a/regression-test/suites/query_p0/group_concat/test_group_concat.groovy b/regression-test/suites/query_p0/group_concat/test_group_concat.groovy index 5054dc2ee3a91d..522d66ed64b30b 100644 --- a/regression-test/suites/query_p0/group_concat/test_group_concat.groovy +++ b/regression-test/suites/query_p0/group_concat/test_group_concat.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_group_concat", "query,p0") { +suite("test_group_concat", "query,p0,arrow_flight_sql") { qt_select """ SELECT group_concat(k6) FROM test_query_db.test where k6='false' """ diff --git a/regression-test/suites/query_p0/grouping_sets/test_grouping_sets1.groovy b/regression-test/suites/query_p0/grouping_sets/test_grouping_sets1.groovy index 1f12de6628a5eb..f8180b0ab43846 100644 --- a/regression-test/suites/query_p0/grouping_sets/test_grouping_sets1.groovy +++ b/regression-test/suites/query_p0/grouping_sets/test_grouping_sets1.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_grouping_sets1") { +suite("test_grouping_sets1", "arrow_flight_sql") { qt_select """ select col1 diff --git a/regression-test/suites/query_p0/having/having.groovy b/regression-test/suites/query_p0/having/having.groovy index fb32b3834af4d6..bbad236c973870 100644 --- a/regression-test/suites/query_p0/having/having.groovy +++ b/regression-test/suites/query_p0/having/having.groovy @@ -19,7 +19,7 @@ // /testing/trino-product-tests/src/main/resources/sql-tests/testcases/aggregate // and modified by Doris. 
-suite("having", "query,p0") { +suite("having", "query,p0,arrow_flight_sql") { sql """DROP TABLE IF EXISTS supplier""" sql """CREATE TABLE `supplier` ( `s_suppkey` int(11) NOT NULL, diff --git a/regression-test/suites/query_p0/intersect/test_intersect.groovy b/regression-test/suites/query_p0/intersect/test_intersect.groovy index 1c007b95d7d07d..7919bec324b876 100644 --- a/regression-test/suites/query_p0/intersect/test_intersect.groovy +++ b/regression-test/suites/query_p0/intersect/test_intersect.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_intersect") { +suite("test_intersect", "arrow_flight_sql") { qt_select """ SELECT * FROM (SELECT k1 FROM test_query_db.baseall INTERSECT SELECT k1 FROM test_query_db.test) a ORDER BY k1 diff --git a/regression-test/suites/query_p0/join/test_join2.groovy b/regression-test/suites/query_p0/join/test_join2.groovy index 6125b9a873f77e..9158133948f754 100644 --- a/regression-test/suites/query_p0/join/test_join2.groovy +++ b/regression-test/suites/query_p0/join/test_join2.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_join2", "query,p0") { +suite("test_join2", "query,p0,arrow_flight_sql") { def DBname = "regression_test_join2" def TBname1 = "J1_TBL" def TBname2 = "J2_TBL" diff --git a/regression-test/suites/query_p0/join/test_left_join1.groovy b/regression-test/suites/query_p0/join/test_left_join1.groovy index d4cbeeee65eda2..104adab4a850d0 100644 --- a/regression-test/suites/query_p0/join/test_left_join1.groovy +++ b/regression-test/suites/query_p0/join/test_left_join1.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_left_join1", "query,p0") { +suite("test_left_join1", "query,p0,arrow_flight_sql") { def tableName = "test_left_join1" sql """drop table if exists ${tableName}""" diff --git a/regression-test/suites/query_p0/join/test_nestedloop_outer_join.groovy b/regression-test/suites/query_p0/join/test_nestedloop_outer_join.groovy index ad19e554690ee7..f99dfa042446e9 100644 --- a/regression-test/suites/query_p0/join/test_nestedloop_outer_join.groovy +++ b/regression-test/suites/query_p0/join/test_nestedloop_outer_join.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_nestedloop_outer_join", "query_p0") { +suite("test_nestedloop_outer_join", "query_p0,arrow_flight_sql") { def tbl1 = "test_nestedloop_outer_join1" def tbl2 = "test_nestedloop_outer_join2" diff --git a/regression-test/suites/query_p0/join/test_partitioned_hash_join.groovy b/regression-test/suites/query_p0/join/test_partitioned_hash_join.groovy index cbe09ec527ffbc..676cdd06274a68 100644 --- a/regression-test/suites/query_p0/join/test_partitioned_hash_join.groovy +++ b/regression-test/suites/query_p0/join/test_partitioned_hash_join.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-suite("test_partitioned_hash_join", "query,p0") { +suite("test_partitioned_hash_join", "query,p0,arrow_flight_sql") { sql "drop table if exists test_partitioned_hash_join_l" sql "drop table if exists test_partitioned_hash_join_r" sql """ create table test_partitioned_hash_join_l ( diff --git a/regression-test/suites/query_p0/lateral_view/lateral_view.groovy b/regression-test/suites/query_p0/lateral_view/lateral_view.groovy index a24623590cd0af..bfe6ca76872ea8 100644 --- a/regression-test/suites/query_p0/lateral_view/lateral_view.groovy +++ b/regression-test/suites/query_p0/lateral_view/lateral_view.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("lateral_view") { +suite("lateral_view", "arrow_flight_sql") { sql """ DROP TABLE IF EXISTS `test_explode_bitmap` """ sql """ CREATE TABLE `test_explode_bitmap` ( diff --git a/regression-test/suites/query_p0/limit/OffsetInSubqueryWithJoin.groovy b/regression-test/suites/query_p0/limit/OffsetInSubqueryWithJoin.groovy index da0c7231f425d1..caa75ac7be38ed 100644 --- a/regression-test/suites/query_p0/limit/OffsetInSubqueryWithJoin.groovy +++ b/regression-test/suites/query_p0/limit/OffsetInSubqueryWithJoin.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_offset_in_subquery_with_join", "query") { +suite("test_offset_in_subquery_with_join", "query,arrow_flight_sql") { // define a sql table def testTable = "test_offset_in_subquery_with_join" diff --git a/regression-test/suites/query_p0/literal_view/lietral_test.groovy b/regression-test/suites/query_p0/literal_view/lietral_test.groovy index 6e9d51f0a0d610..27b82c16247a73 100644 --- a/regression-test/suites/query_p0/literal_view/lietral_test.groovy +++ b/regression-test/suites/query_p0/literal_view/lietral_test.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("literal_view_test") { +suite("literal_view_test", "arrow_flight_sql") { sql """DROP TABLE IF EXISTS table1""" diff --git a/regression-test/suites/query_p0/operator/test_set_operator.groovy b/regression-test/suites/query_p0/operator/test_set_operator.groovy index cb05e18b3e870b..7d6219585e4c4c 100644 --- a/regression-test/suites/query_p0/operator/test_set_operator.groovy +++ b/regression-test/suites/query_p0/operator/test_set_operator.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_set_operators", "query,p0") { +suite("test_set_operators", "query,p0,arrow_flight_sql") { sql """ DROP TABLE IF EXISTS t1; diff --git a/regression-test/suites/query_p0/operator/test_sort_operator.groovy b/regression-test/suites/query_p0/operator/test_sort_operator.groovy index 24a2b8ef73a424..d76daff01f6fcc 100644 --- a/regression-test/suites/query_p0/operator/test_sort_operator.groovy +++ b/regression-test/suites/query_p0/operator/test_sort_operator.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-suite("test_sort_operator", "query,p0") { +suite("test_sort_operator", "query,p0,arrow_flight_sql") { sql """ DROP TABLE IF EXISTS dim_org_ful; diff --git a/regression-test/suites/query_p0/session_variable/test_default_limit.groovy b/regression-test/suites/query_p0/session_variable/test_default_limit.groovy index edda5d51790c56..2ce3b647142ae4 100644 --- a/regression-test/suites/query_p0/session_variable/test_default_limit.groovy +++ b/regression-test/suites/query_p0/session_variable/test_default_limit.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite('test_default_limit') { +suite('test_default_limit', "arrow_flight_sql") { sql 'drop table if exists baseall' sql 'drop table if exists bigtable' diff --git a/regression-test/suites/query_p0/show/test_show_create_table.groovy b/regression-test/suites/query_p0/show/test_show_create_table.groovy index 6325cbe319fd88..1e3fc7ff5cb527 100644 --- a/regression-test/suites/query_p0/show/test_show_create_table.groovy +++ b/regression-test/suites/query_p0/show/test_show_create_table.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_show_create_table", "query") { +suite("test_show_create_table", "query,arrow_flight_sql") { String tb_name = "tb_show_create_table"; try { sql """drop table if exists ${tb_name} """ diff --git a/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.groovy b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.groovy index 86a951d7ac33c6..cdab9472e27dbd 100644 --- a/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.groovy +++ b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_aggregate_all_functions") { +suite("test_aggregate_all_functions", "arrow_flight_sql") { sql "set batch_size = 4096" diff --git a/regression-test/suites/query_p0/sql_functions/case_function/test_case_function_null.groovy b/regression-test/suites/query_p0/sql_functions/case_function/test_case_function_null.groovy index 269a0bf0db87cf..5138db6e73b4ad 100644 --- a/regression-test/suites/query_p0/sql_functions/case_function/test_case_function_null.groovy +++ b/regression-test/suites/query_p0/sql_functions/case_function/test_case_function_null.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_case_function_null", "query,p0") { +suite("test_case_function_null", "query,p0,arrow_flight_sql") { sql """ drop table if exists case_null0 """ sql """ create table case_null0 ( `c0` decimalv3(17, 1) NULL, diff --git a/regression-test/suites/query_p0/sql_functions/hash_functions/test_hash_function.groovy b/regression-test/suites/query_p0/sql_functions/hash_functions/test_hash_function.groovy index 590ccd10821f61..d547e9fb287d71 100644 --- a/regression-test/suites/query_p0/sql_functions/hash_functions/test_hash_function.groovy +++ b/regression-test/suites/query_p0/sql_functions/hash_functions/test_hash_function.groovy @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. 
-suite("test_hash_function") { +suite("test_hash_function", "arrow_flight_sql") { sql "set batch_size = 4096;" sql "set enable_profile = true;" diff --git a/regression-test/suites/query_p0/sql_functions/ip_functions/test_ip_functions.groovy b/regression-test/suites/query_p0/sql_functions/ip_functions/test_ip_functions.groovy index 03e9788a58a3b8..5373217503a018 100644 --- a/regression-test/suites/query_p0/sql_functions/ip_functions/test_ip_functions.groovy +++ b/regression-test/suites/query_p0/sql_functions/ip_functions/test_ip_functions.groovy @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -suite("test_ip_functions") { +suite("test_ip_functions", "arrow_flight_sql") { sql "set batch_size = 4096;" qt_sql "SELECT ipv4_num_to_string(-1);" diff --git a/regression-test/suites/query_p0/sql_functions/json_function/test_query_json_insert.groovy b/regression-test/suites/query_p0/sql_functions/json_function/test_query_json_insert.groovy index c885e3ae3431f3..b5865034538a11 100644 --- a/regression-test/suites/query_p0/sql_functions/json_function/test_query_json_insert.groovy +++ b/regression-test/suites/query_p0/sql_functions/json_function/test_query_json_insert.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_query_json_insert", "query") { +suite("test_query_json_insert", "query,arrow_flight_sql") { qt_sql "select json_insert('{\"a\": 1, \"b\": [2, 3]}', '\$', null);" qt_sql "select json_insert('{\"k\": [1, 2]}', '\$.k[0]', null, '\$.[1]', null);" def tableName = "test_query_json_insert" diff --git a/regression-test/suites/query_p0/sql_functions/json_functions/test_json_function.groovy b/regression-test/suites/query_p0/sql_functions/json_functions/test_json_function.groovy index aa0deec96f46a2..4bd88bf131e727 100644 --- a/regression-test/suites/query_p0/sql_functions/json_functions/test_json_function.groovy +++ b/regression-test/suites/query_p0/sql_functions/json_functions/test_json_function.groovy @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -suite("test_json_function") { +suite("test_json_function", "arrow_flight_sql") { sql "set batch_size = 4096;" qt_sql "SELECT get_json_double('{\"k1\":1.3, \"k2\":\"2\"}', \"\$.k1\");" diff --git a/regression-test/suites/query_p0/sql_functions/math_functions/test_conv.groovy b/regression-test/suites/query_p0/sql_functions/math_functions/test_conv.groovy index 6c4867174d11ac..3a74abfe9c8b22 100644 --- a/regression-test/suites/query_p0/sql_functions/math_functions/test_conv.groovy +++ b/regression-test/suites/query_p0/sql_functions/math_functions/test_conv.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-suite("test_conv") { +suite("test_conv", "arrow_flight_sql") { qt_select "SELECT CONV(15,10,2)" sql """ drop table if exists test_conv; """ diff --git a/regression-test/suites/query_p0/sql_functions/search_functions/test_multi_string_search.groovy b/regression-test/suites/query_p0/sql_functions/search_functions/test_multi_string_search.groovy index 061665d3b9da6e..f1487d283dfcdf 100644 --- a/regression-test/suites/query_p0/sql_functions/search_functions/test_multi_string_search.groovy +++ b/regression-test/suites/query_p0/sql_functions/search_functions/test_multi_string_search.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_multi_string_search") { +suite("test_multi_string_search", "arrow_flight_sql") { def table_name = "test_multi_string_search_strings" sql """ DROP TABLE IF EXISTS ${table_name} """ diff --git a/regression-test/suites/query_p0/sql_functions/spatial_functions/test_gis_function.groovy b/regression-test/suites/query_p0/sql_functions/spatial_functions/test_gis_function.groovy index e98e11ba7e6888..f76cb44cb4ad4b 100644 --- a/regression-test/suites/query_p0/sql_functions/spatial_functions/test_gis_function.groovy +++ b/regression-test/suites/query_p0/sql_functions/spatial_functions/test_gis_function.groovy @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -suite("test_gis_function") { +suite("test_gis_function", "arrow_flight_sql") { sql "set batch_size = 4096;" qt_sql "SELECT ST_AsText(ST_Point(24.7, 56.7));" diff --git a/regression-test/suites/query_p0/sql_functions/string_functions/test_string_function.groovy b/regression-test/suites/query_p0/sql_functions/string_functions/test_string_function.groovy index f5d32653c818b5..6e18fb57eeb4cf 100644 --- a/regression-test/suites/query_p0/sql_functions/string_functions/test_string_function.groovy +++ b/regression-test/suites/query_p0/sql_functions/string_functions/test_string_function.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_string_function") { +suite("test_string_function", "arrow_flight_sql") { sql "set batch_size = 4096;" qt_sql "select elt(0, \"hello\", \"doris\");" diff --git a/regression-test/suites/query_p0/sql_functions/table_function/explode_split.groovy b/regression-test/suites/query_p0/sql_functions/table_function/explode_split.groovy index b7dd4d640799fb..53db931c03bb03 100644 --- a/regression-test/suites/query_p0/sql_functions/table_function/explode_split.groovy +++ b/regression-test/suites/query_p0/sql_functions/table_function/explode_split.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("explode_split") { +suite("explode_split", "arrow_flight_sql") { def tableName = "test_lv_str" sql """ DROP TABLE IF EXISTS ${tableName} """ diff --git a/regression-test/suites/query_p0/sql_functions/test_alias_function.groovy b/regression-test/suites/query_p0/sql_functions/test_alias_function.groovy index 8e0e94fa2df805..095ec89e220f1b 100644 --- a/regression-test/suites/query_p0/sql_functions/test_alias_function.groovy +++ b/regression-test/suites/query_p0/sql_functions/test_alias_function.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-suite('test_alias_function') { +suite('test_alias_function', "arrow_flight_sql") { sql ''' CREATE ALIAS FUNCTION IF NOT EXISTS f1(DATETIMEV2(3), INT) with PARAMETER (datetime1, int1) as date_trunc(days_sub(datetime1, int1), 'day')''' diff --git a/regression-test/suites/query_p0/sql_functions/test_predicate.groovy b/regression-test/suites/query_p0/sql_functions/test_predicate.groovy index 20b3c179ad5c01..6cca6b62c9960b 100644 --- a/regression-test/suites/query_p0/sql_functions/test_predicate.groovy +++ b/regression-test/suites/query_p0/sql_functions/test_predicate.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_predicate") { +suite("test_predicate", "arrow_flight_sql") { sql """drop table if exists t1;""" sql """ create table t1 ( diff --git a/regression-test/suites/query_p0/sql_functions/width_bucket_fuctions/test_width_bucket_function.groovy b/regression-test/suites/query_p0/sql_functions/width_bucket_fuctions/test_width_bucket_function.groovy index d0862a580ca600..1a455da92446f8 100644 --- a/regression-test/suites/query_p0/sql_functions/width_bucket_fuctions/test_width_bucket_function.groovy +++ b/regression-test/suites/query_p0/sql_functions/width_bucket_fuctions/test_width_bucket_function.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_width_bucket_function") { +suite("test_width_bucket_function", "arrow_flight_sql") { qt_sql "select width_bucket(1, 2, 3, 2)" qt_sql "select width_bucket(null, 2, 3, 2)" qt_sql "select width_bucket(6, 2, 6, 4)" diff --git a/regression-test/suites/query_p0/subquery/test_subquery2.groovy b/regression-test/suites/query_p0/subquery/test_subquery2.groovy index e572459cc72fe3..a14a44fa152b97 100644 --- a/regression-test/suites/query_p0/subquery/test_subquery2.groovy +++ b/regression-test/suites/query_p0/subquery/test_subquery2.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_subquery2") { +suite("test_subquery2", "arrow_flight_sql") { sql """DROP TABLE IF EXISTS subquerytest2""" diff --git a/regression-test/suites/query_p0/test_data_type_marks.groovy b/regression-test/suites/query_p0/test_data_type_marks.groovy index 79803d98723313..51fb7c9614e488 100644 --- a/regression-test/suites/query_p0/test_data_type_marks.groovy +++ b/regression-test/suites/query_p0/test_data_type_marks.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_data_type_marks") { +suite("test_data_type_marks", "arrow_flight_sql") { def tbName = "org" sql "DROP TABLE IF EXISTS ${tbName}" sql """ diff --git a/regression-test/suites/query_p0/test_dict_with_null.groovy b/regression-test/suites/query_p0/test_dict_with_null.groovy index b3738bb68aa1ba..83d253fa4d1b04 100644 --- a/regression-test/suites/query_p0/test_dict_with_null.groovy +++ b/regression-test/suites/query_p0/test_dict_with_null.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-suite("dict_with_null", "query") { +suite("dict_with_null", "query,arrow_flight_sql") { def tableName = "test_dict_with_null" sql "DROP TABLE IF EXISTS ${tableName}" sql """ diff --git a/regression-test/suites/query_p0/test_orderby_nullliteral.groovy b/regression-test/suites/query_p0/test_orderby_nullliteral.groovy index fe11c778af0b98..e806060c8bcb1c 100644 --- a/regression-test/suites/query_p0/test_orderby_nullliteral.groovy +++ b/regression-test/suites/query_p0/test_orderby_nullliteral.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("orderby_nullliteral", "query") { +suite("orderby_nullliteral", "query,arrow_flight_sql") { def tableName = "test_orderby_nullliteral" sql "DROP TABLE IF EXISTS ${tableName}" diff --git a/regression-test/suites/query_p0/test_select_constant.groovy b/regression-test/suites/query_p0/test_select_constant.groovy index 6015e19576c690..68f0a28a20e853 100644 --- a/regression-test/suites/query_p0/test_select_constant.groovy +++ b/regression-test/suites/query_p0/test_select_constant.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_select_constant") { +suite("test_select_constant", "arrow_flight_sql") { qt_select1 'select 100, "test", date("2021-01-02");' qt_select_geo1 'SELECT ST_AsText(ST_Point(123.12345678901234567890,89.1234567890));' } diff --git a/regression-test/suites/query_p0/test_select_with_predicate_like.groovy b/regression-test/suites/query_p0/test_select_with_predicate_like.groovy index 9491c4271ca530..0d01f1b958a11c 100644 --- a/regression-test/suites/query_p0/test_select_with_predicate_like.groovy +++ b/regression-test/suites/query_p0/test_select_with_predicate_like.groovy @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -suite("test_select_with_predicate_like") { +suite("test_select_with_predicate_like", "arrow_flight_sql") { def tables=["test_basic_agg"] for (String table in tables) { diff --git a/regression-test/suites/query_p0/test_select_with_predicate_prune.groovy b/regression-test/suites/query_p0/test_select_with_predicate_prune.groovy index 768e04b4c327b5..ccd1b9160fb148 100644 --- a/regression-test/suites/query_p0/test_select_with_predicate_prune.groovy +++ b/regression-test/suites/query_p0/test_select_with_predicate_prune.groovy @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -suite("test_select_with_predicate_prune") { +suite("test_select_with_predicate_prune", "arrow_flight_sql") { sql """ drop table if exists `test_select_with_predicate_prune`; """ diff --git a/regression-test/suites/query_p0/type_inference/test_largeint.groovy b/regression-test/suites/query_p0/type_inference/test_largeint.groovy index d5cbfa4b479838..161359cfa97e72 100644 --- a/regression-test/suites/query_p0/type_inference/test_largeint.groovy +++ b/regression-test/suites/query_p0/type_inference/test_largeint.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-suite("test_largeint") { +suite("test_largeint", "arrow_flight_sql") { def tbName = "test_largeint" sql "DROP TABLE IF EXISTS ${tbName}" sql """ diff --git a/regression-test/suites/query_p0/with/test_with_and_two_phase_agg.groovy b/regression-test/suites/query_p0/with/test_with_and_two_phase_agg.groovy index 99164a999c557e..d563ef1630517d 100644 --- a/regression-test/suites/query_p0/with/test_with_and_two_phase_agg.groovy +++ b/regression-test/suites/query_p0/with/test_with_and_two_phase_agg.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_with_and_two_phase_agg") { +suite("test_with_and_two_phase_agg", "arrow_flight_sql") { def tableName = "test_with_and_two_phase_agg_table" sql """ DROP TABLE IF EXISTS ${tableName} """ sql """