diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
index 6e7f45aaa415c6..e083e43f898176 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
@@ -78,4 +78,6 @@ NotificationEventResponse getNextNotification(long lastEventId,
 
     void acquireSharedLock(String queryId, long txnId, String user, TableName tblName,
             List<String> partitionNames, long timeoutMs);
+
+    String getCatalogLocation(String catalogName);
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java
index afbd929c5a1417..b1cf3d8fac6e25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java
@@ -474,4 +474,9 @@ protected String getDatabaseQuery() {
     protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
         throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");
     }
+
+    @Override
+    public String getCatalogLocation(String catalogName) {
+        throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
index abb3fda24b3605..f0294069fde9b0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hive.metastore.LockComponentBuilder;
 import org.apache.hadoop.hive.metastore.LockRequestBuilder;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Catalog;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
@@ -468,5 +469,20 @@ private ThriftHMSClient getClient() throws MetaException {
 
             Thread.currentThread().setContextClassLoader(classLoader);
         }
     }
+
+    @Override
+    public String getCatalogLocation(String catalogName) {
+        try (ThriftHMSClient client = getClient()) {
+            try {
+                Catalog catalog = client.client.getCatalog(catalogName);
+                return catalog.getLocationUri();
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
+        } catch (Exception e) {
+            throw new HMSClientException("failed to get location for %s from hms client", e, catalogName);
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index c8ff468ab29d2b..8cdde9801107c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -17,14 +17,28 @@
 
 package org.apache.doris.datasource.iceberg;
 
+import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.analysis.DropDbStmt;
+import org.apache.doris.analysis.DropTableStmt;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.StructField;
+import org.apache.doris.catalog.StructType;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeNameFormat;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.InitCatalogLog;
 import org.apache.doris.datasource.SessionContext;
+import org.apache.doris.external.iceberg.util.DorisTypeToType;
+import org.apache.doris.external.iceberg.util.DorisTypeVisitor;
+import org.apache.doris.external.iceberg.util.IcebergUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -32,7 +46,9 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public abstract class IcebergExternalCatalog extends ExternalCatalog {
@@ -108,4 +124,52 @@ public org.apache.iceberg.Table getIcebergTable(String dbName, String tblName) {
                 .getIcebergMetadataCache()
                 .getIcebergTable(catalog, id, dbName, tblName, getProperties());
     }
+
+    @Override
+    public void createDb(CreateDbStmt stmt) throws DdlException {
+        makeSureInitialized();
+        SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog;
+        String dbName = stmt.getFullDbName();
+        Map<String, String> properties = stmt.getProperties();
+        nsCatalog.createNamespace(Namespace.of(dbName), properties);
+        // TODO: refresh the db cache after creation; the new db stays invisible to SHOW until a manual REFRESH.
+    }
+
+    @Override
+    public void dropDb(DropDbStmt stmt) throws DdlException {
+        makeSureInitialized();
+        SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog;
+        String dbName = stmt.getDbName();
+        // Drop remotely first so a failure does not leave the local cache inconsistent with the catalog.
+        nsCatalog.dropNamespace(Namespace.of(dbName));
+        if (dbNameToId.containsKey(dbName)) {
+            Long dbId = dbNameToId.remove(dbName);
+            idToDb.remove(dbId);
+        }
+    }
+
+    @Override
+    public void createTable(CreateTableStmt stmt) throws UserException {
+        makeSureInitialized();
+        String dbName = stmt.getDbName();
+        String tableName = stmt.getTableName();
+        List<Column> columns = stmt.getColumns();
+        List<StructField> fields = columns.stream()
+                .map(col -> new StructField(col.getName(), col.getType(), col.getComment(), col.isAllowNull()))
+                .collect(Collectors.toList());
+        StructType structType = new StructType(new ArrayList<>(fields));
+        org.apache.iceberg.types.Type visit = DorisTypeVisitor.visit(structType, new DorisTypeToType(structType));
+        Schema schema = new Schema(visit.asNestedType().asStructType().fields());
+        Map<String, String> properties = stmt.getProperties();
+        PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(properties, schema);
+        catalog.createTable(TableIdentifier.of(dbName, tableName), schema, partitionSpec, properties);
+    }
+
+    @Override
+    public void dropTable(DropTableStmt stmt) throws DdlException {
+        makeSureInitialized();
+        String dbName = stmt.getDbName();
+        String tableName = stmt.getTableName();
+        catalog.dropTable(TableIdentifier.of(dbName, tableName));
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
index 0300477361bf46..2c0e18e56f5935 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
@@ -17,13 +17,22 @@
 
 package org.apache.doris.datasource.iceberg;
 
+import org.apache.doris.catalog.AuthType;
+import org.apache.doris.catalog.HdfsResource;
+import org.apache.doris.common.Config;
 import org.apache.doris.datasource.CatalogProperty;
+import org.apache.doris.datasource.HMSClientException;
+import org.apache.doris.datasource.hive.HMSCachedClient;
+import org.apache.doris.datasource.hive.HMSCachedClientFactory;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.datasource.property.constants.HMSProperties;
 
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.hive.HiveCatalog;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -44,8 +53,39 @@ protected void initLocalObjectsImpl() {
         // initialize hive catalog
         Map<String, String> catalogProperties = new HashMap<>();
         String metastoreUris = catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, "");
         catalogProperties.put(CatalogProperties.URI, metastoreUris);
+        HiveConf hiveConf = new HiveConf();
+        for (Map.Entry<String, String> kv : catalogProperty.getHadoopProperties().entrySet()) {
+            hiveConf.set(kv.getKey(), kv.getValue());
+        }
+        // Make the metastore address visible to the Thrift cached client built below.
+        hiveConf.set(HMSProperties.HIVE_METASTORE_URIS, metastoreUris);
+        // NOTE: use ConfVars.varname (the real config key); Enum.name() would set a bogus key.
+        hiveConf.set(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.varname,
+                String.valueOf(Config.hive_metastore_client_timeout_second));
+        String authentication = catalogProperty.getOrDefault(
+                HdfsResource.HADOOP_SECURITY_AUTHENTICATION, "");
+        if (AuthType.KERBEROS.getDesc().equals(authentication)) {
+            hiveConf.set(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, authentication);
+            UserGroupInformation.setConfiguration(hiveConf);
+            try {
+                /**
+                 * Because metastore client is created by using
+                 * {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient#getProxy}
+                 * it will relogin when TGT is expired, so we don't need to relogin manually.
+                 */
+                UserGroupInformation.loginUserFromKeytab(
+                        catalogProperty.getOrDefault(HdfsResource.HADOOP_KERBEROS_PRINCIPAL, ""),
+                        catalogProperty.getOrDefault(HdfsResource.HADOOP_KERBEROS_KEYTAB, ""));
+            } catch (IOException e) {
+                throw new HMSClientException("login with kerberos auth failed for catalog %s", e, this.getName());
+            }
+        }
+        HMSCachedClient cachedClient = HMSCachedClientFactory.createCachedClient(hiveConf, 1, null);
+        // "hive" is the default catalog name in Hive Metastore; its location is the warehouse root.
+        String location = cachedClient.getCatalogLocation("hive");
+        catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, location);
         hiveCatalog.initialize(icebergCatalogType, catalogProperties);
         catalog = hiveCatalog;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/DorisTypeToType.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/DorisTypeToType.java
index 531de582ea53d6..52dd7446cc25ff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/DorisTypeToType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/DorisTypeToType.java
@@ -24,11 +24,12 @@
 import org.apache.doris.catalog.StructField;
 import org.apache.doris.catalog.StructType;
 
+import com.google.common.collect.Lists;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
 import java.util.List;
 
 /**
  * Convert Doris type to Iceberg type
  */
@@ -54,8 +55,21 @@ private int getNextId() {
 
     @Override
     public Type struct(StructType struct, List<Type> types) {
-        throw new UnsupportedOperationException(
-                "Not a supported type: " + struct.toSql(0));
+        List<StructField> fields = struct.getFields();
+        List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(fields.size());
+        boolean isRoot = root == struct;
+        for (int i = 0; i < fields.size(); i++) {
+            StructField field = fields.get(i);
+            Type type = types.get(i);
+
+            int id = isRoot ? i : getNextId();
+            if (field.getContainsNull()) {
+                newFields.add(Types.NestedField.optional(id, field.getName(), type, field.getComment()));
+            } else {
+                newFields.add(Types.NestedField.required(id, field.getName(), type, field.getComment()));
+            }
+        }
+        return Types.StructType.of(newFields);
     }
 
     @Override
@@ -65,52 +79,61 @@ public Type field(StructField field, Type typeResult) {
         return typeResult;
     }
 
     @Override
     public Type array(ArrayType array, Type elementType) {
-        throw new UnsupportedOperationException(
-                "Not a supported type: " + array.toSql(0));
+        if (array.getContainsNull()) {
+            return Types.ListType.ofOptional(getNextId(), elementType);
+        } else {
+            return Types.ListType.ofRequired(getNextId(), elementType);
+        }
     }
 
     @Override
     public Type map(MapType map, Type keyType, Type valueType) {
-        throw new UnsupportedOperationException(
-                "Not a supported type: " + map.toSql(0));
+        if (map.getIsValueContainsNull()) {
+            return Types.MapType.ofOptional(getNextId(), getNextId(), keyType, valueType);
+        } else {
+            return Types.MapType.ofRequired(getNextId(), getNextId(), keyType, valueType);
+        }
     }
 
     @Override
     public Type atomic(org.apache.doris.catalog.Type atomic) {
-        if (atomic.getPrimitiveType().equals(PrimitiveType.BOOLEAN)) {
+        PrimitiveType primitiveType = atomic.getPrimitiveType();
+        if (primitiveType.equals(PrimitiveType.BOOLEAN)) {
             return Types.BooleanType.get();
-        } else if (atomic.getPrimitiveType().equals(PrimitiveType.TINYINT)
-                || atomic.getPrimitiveType().equals(PrimitiveType.SMALLINT)
-                || atomic.getPrimitiveType().equals(PrimitiveType.INT)) {
+        } else if (primitiveType.equals(PrimitiveType.TINYINT)
+                || primitiveType.equals(PrimitiveType.SMALLINT)
+                || primitiveType.equals(PrimitiveType.INT)) {
             return Types.IntegerType.get();
-        } else if (atomic.getPrimitiveType().equals(PrimitiveType.BIGINT)
-                || atomic.getPrimitiveType().equals(PrimitiveType.LARGEINT)) {
+        } else if (primitiveType.equals(PrimitiveType.BIGINT)
+                || primitiveType.equals(PrimitiveType.LARGEINT)) {
             return Types.LongType.get();
-        } else if (atomic.getPrimitiveType().equals(PrimitiveType.FLOAT)) {
+        } else if (primitiveType.equals(PrimitiveType.FLOAT)) {
             return Types.FloatType.get();
-        } else if (atomic.getPrimitiveType().equals(PrimitiveType.DOUBLE)) {
+        } else if (primitiveType.equals(PrimitiveType.DOUBLE)) {
             return Types.DoubleType.get();
-        } else if (atomic.getPrimitiveType().equals(PrimitiveType.CHAR)
-                || atomic.getPrimitiveType().equals(PrimitiveType.VARCHAR)) {
+        } else if (primitiveType.equals(PrimitiveType.CHAR)
+                || primitiveType.equals(PrimitiveType.VARCHAR)
+                || primitiveType.equals(PrimitiveType.STRING)) {
             return Types.StringType.get();
-        } else if (atomic.getPrimitiveType().equals(PrimitiveType.DATE)
-                || atomic.getPrimitiveType().equals(PrimitiveType.DATEV2)) {
+        } else if (primitiveType.equals(PrimitiveType.DATE)
+                || primitiveType.equals(PrimitiveType.DATEV2)) {
             return Types.DateType.get();
-        } else if (atomic.getPrimitiveType().equals(PrimitiveType.TIME)
-                || atomic.getPrimitiveType().equals(PrimitiveType.TIMEV2)) {
+        } else if (primitiveType.equals(PrimitiveType.TIME)
+                || primitiveType.equals(PrimitiveType.TIMEV2)) {
             return Types.TimeType.get();
-        } else if (atomic.getPrimitiveType().equals(PrimitiveType.DECIMALV2)
-                || atomic.getPrimitiveType().isDecimalV3Type()) {
+        } else if (primitiveType.equals(PrimitiveType.DECIMALV2)
+                || primitiveType.isDecimalV3Type()) {
             return Types.DecimalType.of(
                     ((ScalarType) atomic).getScalarPrecision(), ((ScalarType) atomic).getScalarScale());
-        } else if (atomic.getPrimitiveType().equals(PrimitiveType.DATETIME)
-                || atomic.getPrimitiveType().equals(PrimitiveType.DATETIMEV2)) {
-            return Types.TimestampType.withZone();
+        } else if (primitiveType.equals(PrimitiveType.DATETIME)
+                || primitiveType.equals(PrimitiveType.DATETIMEV2)) {
+            // Doris DATETIME carries no zone info, so map to timestamp WITHOUT zone.
+            return Types.TimestampType.withoutZone();
         }
         // unsupported type: PrimitiveType.HLL BITMAP BINARY
         throw new UnsupportedOperationException(
-                "Not a supported type: " + atomic.getPrimitiveType());
+                "Not a supported type: " + primitiveType);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
index 4c3ad20a3e1a1f..9e82149d6617c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
@@ -36,6 +36,7 @@
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.thrift.TExprOpcode;
 
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -45,6 +46,9 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Iceberg utils
@@ -58,6 +62,7 @@ public Integer initialValue() {
         }
     };
     static long MILLIS_TO_NANO_TIME = 1000;
+    private static final Pattern PARTITION_REG = Pattern.compile("(\\w+)\\((\\d+)?,?(\\w+)\\)");
 
     public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
         if (expr == null) {
@@ -234,4 +239,60 @@ private static SlotRef convertDorisExprToSlotRef(Expr expr) {
         }
         return slotRef;
     }
+
+    /**
+     * Builds an Iceberg PartitionSpec from the "partition" table property,
+     * e.g. "partition"="c1;day(c1);bucket(4,c3)". The consumed "partition"
+     * entry is removed from {@code properties} before returning.
+     */
+    public static PartitionSpec solveIcebergPartitionSpec(Map<String, String> properties, Schema schema) {
+        if (properties.containsKey("partition")) {
+            PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
+            String par = properties.get("partition").replaceAll(" ", "");
+            String[] pars = par.split(";");
+            for (String func : pars) {
+                if (func.contains("(")) {
+                    Matcher matcher = PARTITION_REG.matcher(func);
+                    if (matcher.matches()) {
+                        switch (matcher.group(1).toLowerCase()) {
+                            case "bucket":
+                                builder.bucket(matcher.group(3), Integer.parseInt(matcher.group(2)));
+                                break;
+                            case "year":
+                            case "years":
+                                builder.year(matcher.group(3));
+                                break;
+                            case "month":
+                            case "months":
+                                builder.month(matcher.group(3));
+                                break;
+                            case "date":
+                            case "day":
+                            case "days":
+                                builder.day(matcher.group(3));
+                                break;
+                            case "date_hour":
+                            case "hour":
+                            case "hours":
+                                builder.hour(matcher.group(3));
+                                break;
+                            case "truncate":
+                                builder.truncate(matcher.group(3), Integer.parseInt(matcher.group(2)));
+                                break;
+                            default:
+                                LOG.warn("unsupported partition for " + matcher.group(1));
+                        }
+                    } else {
+                        LOG.warn("failed to get partition info from " + func);
+                    }
+                } else {
+                    builder.identity(func);
+                }
+            }
+            properties.remove("partition");
+            return builder.build();
+        } else {
+            return PartitionSpec.unpartitioned();
+        }
+    }
 }