From de576726c1c2c49da6bfa47903aeb03c7485b3c9 Mon Sep 17 00:00:00 2001 From: Zou Xinyi Date: Tue, 31 Dec 2024 17:53:06 +0800 Subject: [PATCH] 1 --- .../arrowflight/DorisFlightSqlProducer.java | 72 +++- .../arrowflight/FlightSqlSchemaHelper.java | 389 ++++++++++++++++++ .../arrowflight/auth2/FlightAuthUtils.java | 7 +- 3 files changed, 456 insertions(+), 12 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index b968ab04c57c83..59676639ad18b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java @@ -72,10 +72,12 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.WriteChannel; import org.apache.arrow.vector.ipc.message.MessageSerializer; import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.Text; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -131,21 +133,19 @@ private void getStreamStatementResult(String handle, ServerStreamListener listen String[] handleParts = handle.split(":"); String executedPeerIdentity = handleParts[0]; String queryId = handleParts[1]; + // The tokens used for authentication between getStreamStatement and getFlightInfoStatement are different. ConnectContext connectContext = flightSessionsManager.getConnectContext(executedPeerIdentity); try { - // The tokens used for authentication between getStreamStatement and getFlightInfoStatement are different. final FlightSqlResultCacheEntry flightSqlResultCacheEntry = Objects.requireNonNull( connectContext.getFlightSqlChannel().getResult(queryId)); final VectorSchemaRoot vectorSchemaRoot = flightSqlResultCacheEntry.getVectorSchemaRoot(); listener.start(vectorSchemaRoot); listener.putNext(); } catch (Exception e) { - listener.error(e); String errMsg = "get stream statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: " + connectContext.getState().getErrorMessage(); - LOG.warn(errMsg, e); - throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException(); + handleStreamException(e, errMsg, listener); } finally { listener.completed(); // The result has been sent or sent failed, delete it. @@ -280,7 +280,7 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con String errMsg = "get flight info statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: " + connectContext.getState().getErrorMessage(); - LOG.warn(errMsg, e); + LOG.error(errMsg, e); throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException(); } finally { connectContext.setCommand(MysqlCommand.COM_SLEEP); @@ -361,7 +361,7 @@ public void createPreparedStatement(final ActionCreatePreparedStatementRequest r String errMsg = "create prepared statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage( e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: " + connectContext.getState().getErrorMessage(); - LOG.warn(errMsg, e); + LOG.error(errMsg, e); listener.onError(CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException()); return; } catch (final Throwable t) { @@ -407,7 +407,7 @@ public Runnable acceptPutPreparedStatementUpdate(CommandPreparedStatementUpdate } catch (Exception e) { String errMsg = "acceptPutPreparedStatementUpdate failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e); - LOG.warn(errMsg, e); + LOG.error(errMsg, e); throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException(); } }; @@ -451,7 +451,21 @@ public FlightInfo getFlightInfoCatalogs(final CommandGetCatalogs request, final @Override public void getStreamCatalogs(final CallContext context, final ServerStreamListener listener) { - throw CallStatus.UNIMPLEMENTED.withDescription("getStreamCatalogs unimplemented").toRuntimeException(); + try (final VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(Schemas.GET_CATALOGS_SCHEMA, + rootAllocator)) { + listener.start(vectorSchemaRoot); + vectorSchemaRoot.allocateNew(); + VarCharVector catalogNameVector = (VarCharVector) vectorSchemaRoot.getVector("catalog_name"); + // Only show Internal Catalog, which is consistent with `jdbc:mysql`. + // Otherwise, if the configured ExternalCatalog cannot be connected, + // `catalog.getAllDbs()` will be stuck and wait until the timeout period ends. + catalogNameVector.setSafe(0, new Text("internal")); + vectorSchemaRoot.setRowCount(1); + listener.putNext(); + listener.completed(); + } catch (final Exception ex) { + handleStreamException(ex, "", listener); + } } @Override @@ -463,7 +477,22 @@ public FlightInfo getFlightInfoSchemas(final CommandGetDbSchemas request, final @Override public void getStreamSchemas(final CommandGetDbSchemas command, final CallContext context, final ServerStreamListener listener) { - throw CallStatus.UNIMPLEMENTED.withDescription("getStreamSchemas unimplemented").toRuntimeException(); + try { + ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity()); + FlightSqlSchemaHelper flightSqlSchemaHelper = new FlightSqlSchemaHelper(connectContext); + flightSqlSchemaHelper.setParameterForGetDbSchemas(command); + final Schema schema = Schemas.GET_SCHEMAS_SCHEMA; + + try (VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator)) { + listener.start(vectorSchemaRoot); + vectorSchemaRoot.allocateNew(); + flightSqlSchemaHelper.getSchemas(vectorSchemaRoot); + listener.putNext(); + listener.completed(); + } + } catch (final Exception e) { + handleStreamException(e, "", listener); + } } @Override @@ -479,7 +508,23 @@ public FlightInfo getFlightInfoTables(final CommandGetTables request, final Call @Override public void getStreamTables(final CommandGetTables command, final CallContext context, final ServerStreamListener listener) { - throw CallStatus.UNIMPLEMENTED.withDescription("getStreamTables unimplemented").toRuntimeException(); + try { + ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity()); + FlightSqlSchemaHelper flightSqlSchemaHelper = new FlightSqlSchemaHelper(connectContext); + flightSqlSchemaHelper.setParameterForGetTables(command); + final Schema schema = command.getIncludeSchema() ? Schemas.GET_TABLES_SCHEMA + : Schemas.GET_TABLES_SCHEMA_NO_SCHEMA; + + try (VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator)) { + listener.start(vectorSchemaRoot); + vectorSchemaRoot.allocateNew(); + flightSqlSchemaHelper.getTables(vectorSchemaRoot); + listener.putNext(); + listener.completed(); + } + } catch (final Exception e) { + handleStreamException(e, "", listener); + } } @Override @@ -545,9 +590,14 @@ public void getStreamCrossReference(CommandGetCrossReference command, CallContex private FlightInfo getFlightInfoForSchema(final T request, final FlightDescriptor descriptor, final Schema schema) { final Ticket ticket = new Ticket(Any.pack(request).toByteArray()); - // TODO Support multiple endpoints. final List endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); return new FlightInfo(schema, descriptor, endpoints, -1, -1); } + + private static void handleStreamException(Exception e, String errMsg, ServerStreamListener listener) { + LOG.error(errMsg, e); + listener.error(CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException()); + throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java new file mode 100644 index 00000000000000..eec1c47d9c21ca --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java @@ -0,0 +1,389 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.service.arrowflight; + +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.service.ExecuteEnv; +import org.apache.doris.service.FrontendServiceImpl; +import org.apache.doris.thrift.TColumnDef; +import org.apache.doris.thrift.TColumnDesc; +import org.apache.doris.thrift.TDescribeTablesParams; +import org.apache.doris.thrift.TDescribeTablesResult; +import org.apache.doris.thrift.TGetDbsParams; +import org.apache.doris.thrift.TGetDbsResult; +import org.apache.doris.thrift.TGetTablesParams; +import org.apache.doris.thrift.TListTableStatusResult; +import org.apache.doris.thrift.TTableStatus; + +import com.google.common.base.Preconditions; +import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils; +import org.apache.arrow.flight.sql.FlightSqlColumnMetadata; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetDbSchemas; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTables; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ZeroVector; +import org.apache.arrow.vector.complex.BaseRepeatedValueVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.ipc.WriteChannel; +import org.apache.arrow.vector.ipc.message.MessageSerializer; +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.Text; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.channels.Channels; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class FlightSqlSchemaHelper { + private static final Logger LOG = LogManager.getLogger(FlightSqlSchemaHelper.class); + private final ConnectContext ctx; + private final FrontendServiceImpl impl; + private boolean includeSchema; + private String catalogFilterPattern; + private String dbSchemaFilterPattern; + private String tableNameFilterPattern; + private List tableTypesList; + private static final byte[] EMPTY_SERIALIZED_SCHEMA = getSerializedSchema(Collections.emptyList()); + + public FlightSqlSchemaHelper(ConnectContext context) { + ctx = context; + impl = new FrontendServiceImpl(ExecuteEnv.getInstance()); + } + + /** + * Set in the Tables request object the parameter that user passed via CommandGetTables. + */ + public void setParameterForGetTables(CommandGetTables command) { + includeSchema = command.getIncludeSchema(); + // Only show Internal Catalog, which is consistent with `jdbc:mysql`. + // Otherwise, if the configured ExternalCatalog cannot be connected, + // `catalog.getAllDbs()` will be stuck and wait until the timeout period ends. + catalogFilterPattern = command.hasCatalog() ? command.getCatalog() : "internal"; + dbSchemaFilterPattern = command.hasDbSchemaFilterPattern() ? command.getDbSchemaFilterPattern() : null; + if (command.hasTableNameFilterPattern()) { + if (command.getTableNameFilterPattern().contains(".")) { + Preconditions.checkState(command.getTableNameFilterPattern().split("\\.", -1).length == 2); + dbSchemaFilterPattern = command.getTableNameFilterPattern().split("\\.", -1)[0]; + tableNameFilterPattern = command.getTableNameFilterPattern().split("\\.", -1)[1]; + } else { + tableNameFilterPattern = command.getTableNameFilterPattern(); + } + } else { + tableNameFilterPattern = null; + } + tableTypesList = command.getTableTypesList().isEmpty() ? null : command.getTableTypesList(); + } + + /** + * Set in the Schemas request object the parameter that user passed via CommandGetDbSchemas. + */ + public void setParameterForGetDbSchemas(CommandGetDbSchemas command) { + catalogFilterPattern = command.hasCatalog() ? command.getCatalog() : "internal"; + dbSchemaFilterPattern = command.hasDbSchemaFilterPattern() ? command.getDbSchemaFilterPattern() : null; + } + + /** + * Call FrontendServiceImpl->getDbNames. + */ + private TGetDbsResult getDbNames() throws TException { + TGetDbsParams getDbsParams = new TGetDbsParams(); + getDbsParams.setCatalog(catalogFilterPattern); + if (dbSchemaFilterPattern != null) { + getDbsParams.setPattern(dbSchemaFilterPattern); + } + getDbsParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + return impl.getDbNames(getDbsParams); + } + + /** + * Call FrontendServiceImpl->listTableStatus. + */ + private TListTableStatusResult listTableStatus(String dbName, String catalogName) throws TException { + TGetTablesParams getTablesParams = new TGetTablesParams(); + getTablesParams.setDb(dbName); + if (!catalogName.isEmpty()) { + getTablesParams.setCatalog(catalogName); + } + if (tableNameFilterPattern != null) { + getTablesParams.setPattern(tableNameFilterPattern); + } + if (tableTypesList != null) { + getTablesParams.setType(tableTypesList.get(0)); // currently only one type is supported. + } + getTablesParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + return impl.listTableStatus(getTablesParams); + } + + /** + * Call FrontendServiceImpl->describeTables. + */ + private TDescribeTablesResult describeTables(String dbName, String catalogName, List tablesName) + throws TException { + TDescribeTablesParams describeTablesParams = new TDescribeTablesParams(); + describeTablesParams.setDb(dbName); + if (!catalogName.isEmpty()) { + describeTablesParams.setCatalog(catalogName); + } + describeTablesParams.setTablesName(tablesName); + describeTablesParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + return impl.describeTables(describeTablesParams); + } + + /** + * Convert Doris data type to an arrowType. + *

+ * Ref: `convert_to_arrow_type` in be/src/util/arrow/row_batch.cpp. + * which is consistent with the type of Arrow data returned by Doris Arrow Flight Sql query. + */ + private static ArrowType getArrowType(PrimitiveType primitiveType, Integer precision, Integer scale, + String timeZone) { + switch (primitiveType) { + case BOOLEAN: + return new ArrowType.Bool(); + case TINYINT: + return new ArrowType.Int(8, true); + case SMALLINT: + return new ArrowType.Int(16, true); + case INT: + case IPV4: + return new ArrowType.Int(32, true); + case BIGINT: + return new ArrowType.Int(64, true); + case FLOAT: + return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE); + case DOUBLE: + return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE); + case LARGEINT: + case VARCHAR: + case STRING: + case CHAR: + case DATETIME: + case DATE: + case JSONB: + case IPV6: + case VARIANT: + return new ArrowType.Utf8(); + case DATEV2: + return new ArrowType.Date(DateUnit.MILLISECOND); + case DATETIMEV2: + if (scale > 3) { + return new ArrowType.Timestamp(TimeUnit.MICROSECOND, timeZone); + } else if (scale > 0) { + return new ArrowType.Timestamp(TimeUnit.MILLISECOND, timeZone); + } else { + return new ArrowType.Timestamp(TimeUnit.SECOND, timeZone); + } + case DECIMAL32: + case DECIMAL64: + case DECIMAL128: + return new ArrowType.Decimal(precision, scale, 128); + case DECIMAL256: + return new ArrowType.Decimal(precision, scale, 256); + case DECIMALV2: + return new ArrowType.Decimal(27, 9, 128); + case HLL: + case BITMAP: + case QUANTILE_STATE: + return new ArrowType.Binary(); + case MAP: + return new ArrowType.Map(false); + case ARRAY: + return new ArrowType.List(); + case STRUCT: + return new ArrowType.Struct(); + default: + return new ArrowType.Null(); + } + } + + private static ArrowType columnDescToArrowType(final TColumnDesc desc) { + PrimitiveType primitiveType = PrimitiveType.fromThrift(desc.getColumnType()); + Integer precision = desc.isSetColumnPrecision() ? desc.getColumnPrecision() : null; + Integer scale = desc.isSetColumnScale() ? desc.getColumnScale() : null; + // TODO there is no timezone in TColumnDesc, so use current timezone. + String timeZone = JdbcToArrowUtils.getUtcCalendar().getTimeZone().getID(); + return getArrowType(primitiveType, precision, scale, timeZone); + } + + private static Map createFlightSqlColumnMetadata(final String dbName, final String tableName, + final TColumnDesc desc) { + final FlightSqlColumnMetadata.Builder columnMetadataBuilder = new FlightSqlColumnMetadata.Builder().schemaName( + dbName).tableName(tableName).typeName(PrimitiveType.fromThrift(desc.getColumnType()).toString()) + .isAutoIncrement(false).isCaseSensitive(false).isReadOnly(true).isSearchable(true); + + if (desc.isSetColumnPrecision()) { + columnMetadataBuilder.precision(desc.getColumnPrecision()); + } + if (desc.isSetColumnScale()) { + columnMetadataBuilder.scale(desc.getColumnScale()); + } + return columnMetadataBuilder.build().getMetadataMap(); + } + + /** + * Construct > + */ + private Map> buildTableToFields(String dbName, TDescribeTablesResult describeTablesResult, + List tablesName) { + Map> tableToFields = new HashMap<>(); + int columnIndex = 0; + for (int tableIndex = 0; tableIndex < describeTablesResult.getTablesOffsetSize(); tableIndex++) { + String tableName = tablesName.get(tableIndex); + final List fields = new ArrayList<>(); + Integer tableOffset = describeTablesResult.getTablesOffset().get(tableIndex); + for (; columnIndex < tableOffset; columnIndex++) { + TColumnDef columnDef = describeTablesResult.getColumns().get(columnIndex); + TColumnDesc columnDesc = columnDef.getColumnDesc(); + final ArrowType columnArrowType = columnDescToArrowType(columnDesc); + + List columnArrowTypeChildren; + // Arrow complex types may require children fields for parsing the schema on C++ + switch (columnArrowType.getTypeID()) { + case List: + case LargeList: + case FixedSizeList: + columnArrowTypeChildren = Collections.singletonList( + Field.notNullable(BaseRepeatedValueVector.DATA_VECTOR_NAME, + ZeroVector.INSTANCE.getField().getType())); + break; + case Map: + columnArrowTypeChildren = Collections.singletonList( + Field.notNullable(MapVector.DATA_VECTOR_NAME, new ArrowType.List())); + break; + case Struct: + columnArrowTypeChildren = Collections.emptyList(); + break; + default: + columnArrowTypeChildren = null; + break; + } + + final Field field = new Field(columnDesc.getColumnName(), + new FieldType(columnDesc.isIsAllowNull(), columnArrowType, null, + createFlightSqlColumnMetadata(dbName, tableName, columnDesc)), columnArrowTypeChildren); + fields.add(field); + } + tableToFields.put(tableName, fields); + } + return tableToFields; + } + + protected static byte[] getSerializedSchema(List fields) { + if (EMPTY_SERIALIZED_SCHEMA == null && fields == null) { + fields = Collections.emptyList(); + } else if (fields == null) { + return Arrays.copyOf(EMPTY_SERIALIZED_SCHEMA, EMPTY_SERIALIZED_SCHEMA.length); + } + + final ByteArrayOutputStream columnOutputStream = new ByteArrayOutputStream(); + final Schema schema = new Schema(fields); + + try { + MessageSerializer.serialize(new WriteChannel(Channels.newChannel(columnOutputStream)), schema); + } catch (final IOException e) { + throw new RuntimeException("IO Error when serializing schema '" + schema + "'.", e); + } + + return columnOutputStream.toByteArray(); + } + + /** + * for FlightSqlProducer Schemas.GET_SCHEMAS_SCHEMA + */ + public void getSchemas(VectorSchemaRoot vectorSchemaRoot) throws TException { + VarCharVector catalogNameVector = (VarCharVector) vectorSchemaRoot.getVector("catalog_name"); + VarCharVector schemaNameVector = (VarCharVector) vectorSchemaRoot.getVector("db_schema_name"); + + TGetDbsResult getDbsResult = getDbNames(); + for (int dbIndex = 0; dbIndex < getDbsResult.getDbs().size(); dbIndex++) { + String dbName = getDbsResult.getDbs().get(dbIndex); + String catalogName = getDbsResult.isSetCatalogs() ? getDbsResult.getCatalogs().get(dbIndex) : ""; + catalogNameVector.setSafe(dbIndex, new Text(catalogName)); + schemaNameVector.setSafe(dbIndex, new Text(dbName)); + } + vectorSchemaRoot.setRowCount(getDbsResult.getDbs().size()); + } + + /** + * for FlightSqlProducer Schemas.GET_TABLES_SCHEMA_NO_SCHEMA and Schemas.GET_TABLES_SCHEMA + */ + public void getTables(VectorSchemaRoot vectorSchemaRoot) throws TException { + VarCharVector catalogNameVector = (VarCharVector) vectorSchemaRoot.getVector("catalog_name"); + VarCharVector schemaNameVector = (VarCharVector) vectorSchemaRoot.getVector("db_schema_name"); + VarCharVector tableNameVector = (VarCharVector) vectorSchemaRoot.getVector("table_name"); + VarCharVector tableTypeVector = (VarCharVector) vectorSchemaRoot.getVector("table_type"); + VarBinaryVector schemaVector = (VarBinaryVector) vectorSchemaRoot.getVector("table_schema"); + + int tablesCount = 0; + TGetDbsResult getDbsResult = getDbNames(); + for (int dbIndex = 0; dbIndex < getDbsResult.getDbs().size(); dbIndex++) { + String dbName = getDbsResult.getDbs().get(dbIndex); + String catalogName = getDbsResult.isSetCatalogs() ? getDbsResult.getCatalogs().get(dbIndex) : ""; + TListTableStatusResult listTableStatusResult = listTableStatus(dbName, catalogName); + + Map> tableToFields; + if (includeSchema) { + List tablesName = new ArrayList<>(); + for (TTableStatus tableStatus : listTableStatusResult.getTables()) { + tablesName.add(tableStatus.getName()); + } + TDescribeTablesResult describeTablesResult = describeTables(dbName, catalogName, tablesName); + tableToFields = buildTableToFields(dbName, describeTablesResult, tablesName); + } else { + tableToFields = null; + } + + for (TTableStatus tableStatus : listTableStatusResult.getTables()) { + catalogNameVector.setSafe(tablesCount, new Text(catalogName)); + schemaNameVector.setSafe(tablesCount, new Text(dbName)); + // DBeaver uses `Arrow Flight SQL JDBC Driver Core [16.1.0]` driver to connect to Doris. + // The metadata only shows one layer of `Tables`. All tables will be displayed together, + // so the database name and table name are spelled together for distinction. + // When DBeaver uses `MySQL Connector/J [mysql-connector-j-8.2.0` driver to connect to Doris, + // the metadata will show two layers of `Databases - Tables`. + // + // TODO, show two layers of original data `Databases - Tables` in DBeaver. + tableNameVector.setSafe(tablesCount, new Text(dbName + "." + tableStatus.getName())); + tableTypeVector.setSafe(tablesCount, new Text(tableStatus.getType())); + if (includeSchema) { + List fields = tableToFields.get(tableStatus.getName()); + schemaVector.setSafe(tablesCount, getSerializedSchema(fields)); + } + tablesCount++; + } + } + vectorSchemaRoot.setRowCount(tablesCount); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthUtils.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthUtils.java index b605dff66b6a21..c93e0b5a309442 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthUtils.java @@ -49,7 +49,12 @@ public static FlightAuthResult authenticateCredentials(String username, String p Logger logger) { try { List currentUserIdentity = Lists.newArrayList(); - + // If the password is empty, DBeaver will pass "null" string for authentication. + // This behavior of DBeaver is strange, but we have to be compatible with it, of course, + // it may be a problem with Arrow Flight Jdbc driver. + // Here, "null" is converted to null, if user's password is really the string "null", + // authentication will fail. Usually, the user's password will not be "null", let's hope so. + password = (password.equals("null")) ? null : password; Env.getCurrentEnv().getAuth().checkPlainPassword(username, remoteIp, password, currentUserIdentity); Preconditions.checkState(currentUserIdentity.size() == 1); return FlightAuthResult.of(username, currentUserIdentity.get(0), remoteIp);