@@ -78,4 +78,6 @@ NotificationEventResponse getNextNotification(long lastEventId,

void acquireSharedLock(String queryId, long txnId, String user, TableName tblName,
List<String> partitionNames, long timeoutMs);

String getCatalogLocation(String catalogName);
}
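The new interface method keeps a narrow contract: a catalog name goes in, the catalog's warehouse location URI comes out. A minimal caller sketch; the helper class is hypothetical and not part of this PR, and "hive" is the built-in default catalog name in Hive Metastore 3.x:

import org.apache.doris.datasource.hive.HMSCachedClient;

// Hypothetical helper: resolves the warehouse root of the default HMS catalog
// through the new interface method.
public class WarehouseRootResolver {
    public static String resolve(HMSCachedClient client) {
        // "hive" is the default catalog in Hive Metastore 3.x
        return client.getCatalogLocation("hive");
    }
}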
@@ -474,4 +474,9 @@ protected String getDatabaseQuery() {
protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");
}

@Override
public String getCatalogLocation(String catalogName) {
throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");
}
}
@@ -39,6 +39,7 @@
import org.apache.hadoop.hive.metastore.LockComponentBuilder;
import org.apache.hadoop.hive.metastore.LockRequestBuilder;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Catalog;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
@@ -468,5 +469,20 @@ private ThriftHMSClient getClient() throws MetaException {
Thread.currentThread().setContextClassLoader(classLoader);
}
}

@Override
public String getCatalogLocation(String catalogName) {
try (ThriftHMSClient client = getClient()) {
try {
Catalog catalog = client.client.getCatalog(catalogName);
return catalog.getLocationUri();
} catch (Exception e) {
client.setThrowable(e);
throw e;
}
} catch (Exception e) {
throw new HMSClientException("failed to get location for %s from hms client", e, catalogName);
}
}
}
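Underneath the pooled wrapper, this is Hive 3.x's IMetaStoreClient.getCatalog call. A standalone sketch without Doris's retry and class-loader handling; the metastore URI is a placeholder:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Catalog;

// Standalone sketch of the underlying Thrift call; the URI is a placeholder.
public class CatalogLocationDemo {
    public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        conf.set("hive.metastore.uris", "thrift://hms-host:9083"); // placeholder
        IMetaStoreClient client = new HiveMetaStoreClient(conf);
        try {
            Catalog catalog = client.getCatalog("hive"); // default catalog
            System.out.println(catalog.getLocationUri());
        } finally {
            client.close();
        }
    }
}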

@@ -17,22 +17,38 @@

package org.apache.doris.datasource.iceberg;

import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DropDbStmt;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.external.iceberg.util.DorisTypeToType;
import org.apache.doris.external.iceberg.util.DorisTypeVisitor;
import org.apache.doris.external.iceberg.util.IcebergUtils;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public abstract class IcebergExternalCatalog extends ExternalCatalog {
@@ -108,4 +124,52 @@ public org.apache.iceberg.Table getIcebergTable(String dbName, String tblName) {
.getIcebergMetadataCache()
.getIcebergTable(catalog, id, dbName, tblName, getProperties());
}

@Override
public void createDb(CreateDbStmt stmt) throws DdlException {
makeSureInitialized();
SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog;
String dbName = stmt.getFullDbName();
Map<String, String> properties = stmt.getProperties();
nsCatalog.createNamespace(Namespace.of(dbName), properties);
// TODO: add a refresh step; otherwise a newly created database is not visible in SHOW until the catalog is refreshed
}

@Override
public void dropDb(DropDbStmt stmt) throws DdlException {
makeSureInitialized();
SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog;
String dbName = stmt.getDbName();
if (dbNameToId.containsKey(dbName)) {
Long dbId = dbNameToId.get(dbName);
idToDb.remove(dbId);
dbNameToId.remove(dbName);
}
nsCatalog.dropNamespace(Namespace.of(dbName));
}

@Override
public void createTable(CreateTableStmt stmt) throws UserException {
makeSureInitialized();
String dbName = stmt.getDbName();
String tableName = stmt.getTableName();
List<Column> columns = stmt.getColumns();
List<StructField> structFields = columns.stream()
.map(col -> new StructField(col.getName(), col.getType(), col.getComment(), col.isAllowNull()))
.collect(Collectors.toList());
StructType structType = new StructType(new ArrayList<>(structFields));
org.apache.iceberg.types.Type icebergType = DorisTypeVisitor.visit(structType, new DorisTypeToType(structType));
Schema schema = new Schema(icebergType.asNestedType().asStructType().fields());
Map<String, String> properties = stmt.getProperties();
PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(properties, schema);
catalog.createTable(TableIdentifier.of(dbName, tableName), schema, partitionSpec, properties);
}

@Override
public void dropTable(DropTableStmt stmt) throws DdlException {
makeSureInitialized();
String dbName = stmt.getDbName();
String tableName = stmt.getTableName();
catalog.dropTable(TableIdentifier.of(dbName, tableName));
}
}
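The four overrides above are thin adapters over the Iceberg catalog API: a namespace stands in for a database, and table DDL goes through TableIdentifier. A minimal sketch of that API surface, assuming an already-initialized Catalog; database and table names are illustrative:

import java.util.Collections;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;

// Sketch of the Iceberg calls the overrides delegate to; the catalog is
// assumed to be initialized elsewhere (e.g. a HiveCatalog).
public class IcebergDdlDemo {
    public static void run(Catalog catalog) {
        // createDb: Iceberg models databases as namespaces
        ((SupportsNamespaces) catalog).createNamespace(Namespace.of("demo_db"), Collections.emptyMap());

        // createTable: every field carries an explicit id; unpartitioned here
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.optional(2, "name", Types.StringType.get()));
        catalog.createTable(TableIdentifier.of("demo_db", "t1"), schema, PartitionSpec.unpartitioned());

        // dropTable / dropDb mirror the overrides above
        catalog.dropTable(TableIdentifier.of("demo_db", "t1"));
        ((SupportsNamespaces) catalog).dropNamespace(Namespace.of("demo_db"));
    }
}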
@@ -17,13 +17,22 @@

package org.apache.doris.datasource.iceberg;

import org.apache.doris.catalog.AuthType;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.HMSClientException;
import org.apache.doris.datasource.hive.HMSCachedClient;
import org.apache.doris.datasource.hive.HMSCachedClientFactory;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.HMSProperties;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.hive.HiveCatalog;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@@ -44,8 +53,34 @@ protected void initLocalObjectsImpl() {
// initialize hive catalog
Map<String, String> catalogProperties = new HashMap<>();
String metastoreUris = catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, "");

catalogProperties.put(CatalogProperties.URI, metastoreUris);
HiveConf hiveConf = new HiveConf();
for (Map.Entry<String, String> kv : catalogProperty.getHadoopProperties().entrySet()) {
hiveConf.set(kv.getKey(), kv.getValue());
}
hiveConf.set(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.varname,
String.valueOf(Config.hive_metastore_client_timeout_second));
String authentication = catalogProperty.getOrDefault(
HdfsResource.HADOOP_SECURITY_AUTHENTICATION, "");
if (AuthType.KERBEROS.getDesc().equals(authentication)) {
hiveConf.set(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, authentication);
UserGroupInformation.setConfiguration(hiveConf);
try {
/**
* Because the metastore client is created via
* {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient#getProxy},
* it re-logins automatically when the TGT expires, so we don't need to re-login manually.
*/
UserGroupInformation.loginUserFromKeytab(
catalogProperty.getOrDefault(HdfsResource.HADOOP_KERBEROS_PRINCIPAL, ""),
catalogProperty.getOrDefault(HdfsResource.HADOOP_KERBEROS_KEYTAB, ""));
} catch (IOException e) {
throw new HMSClientException("login with kerberos auth failed for catalog %s", e, this.getName());
}
}
HMSCachedClient cachedClient = HMSCachedClientFactory.createCachedClient(hiveConf, 1, null);
String location = cachedClient.getCatalogLocation("hive");
catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, location);
hiveCatalog.initialize(icebergCatalogType, catalogProperties);
catalog = hiveCatalog;
}
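Stripped of the Kerberos login and the HMS round-trip that resolves the warehouse path, the bootstrap above reduces to Iceberg's two standard catalog properties. A minimal sketch, assuming an Iceberg release with the no-arg HiveCatalog constructor; both property values are placeholders:

import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.hive.HiveCatalog;

// Minimal HiveCatalog bootstrap; both values are placeholders for what the
// code above resolves from catalog properties and from HMS itself.
public class HiveCatalogInitDemo {
    public static HiveCatalog init() {
        Map<String, String> props = new HashMap<>();
        props.put(CatalogProperties.URI, "thrift://hms-host:9083");
        props.put(CatalogProperties.WAREHOUSE_LOCATION, "hdfs://nameservice/user/hive/warehouse");
        HiveCatalog hiveCatalog = new HiveCatalog();
        hiveCatalog.initialize("iceberg_hms_demo", props); // catalog name + properties
        return hiveCatalog;
    }
}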
@@ -24,11 +24,13 @@
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;

import com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import java.util.List;


/**
* Convert Doris type to Iceberg type
*/
@@ -54,8 +56,21 @@ private int getNextId() {

@Override
public Type struct(StructType struct, List<Type> types) {
-throw new UnsupportedOperationException(
-"Not a supported type: " + struct.toSql(0));
+List<StructField> fields = struct.getFields();
+List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(fields.size());
+boolean isRoot = root == struct;
+for (int i = 0; i < fields.size(); i++) {
+StructField field = fields.get(i);
+Type type = types.get(i);
+
+int id = isRoot ? i : getNextId();
+if (field.getContainsNull()) {
+newFields.add(Types.NestedField.optional(id, field.getName(), type, field.getComment()));
+} else {
+newFields.add(Types.NestedField.required(id, field.getName(), type, field.getComment()));
+}
+}
+return Types.StructType.of(newFields);
}

@Override
@@ -65,52 +80,60 @@ public Type field(StructField field, Type typeResult) {

@Override
public Type array(ArrayType array, Type elementType) {
-throw new UnsupportedOperationException(
-"Not a supported type: " + array.toSql(0));
+if (array.getContainsNull()) {
+return Types.ListType.ofOptional(getNextId(), elementType);
+} else {
+return Types.ListType.ofRequired(getNextId(), elementType);
+}
}

@Override
public Type map(MapType map, Type keyType, Type valueType) {
-throw new UnsupportedOperationException(
-"Not a supported type: " + map.toSql(0));
+if (map.getIsValueContainsNull()) {
+return Types.MapType.ofOptional(getNextId(), getNextId(), keyType, valueType);
+} else {
+return Types.MapType.ofRequired(getNextId(), getNextId(), keyType, valueType);
+}
}

@Override
public Type atomic(org.apache.doris.catalog.Type atomic) {
-if (atomic.getPrimitiveType().equals(PrimitiveType.BOOLEAN)) {
+PrimitiveType primitiveType = atomic.getPrimitiveType();
+if (primitiveType.equals(PrimitiveType.BOOLEAN)) {
return Types.BooleanType.get();
-} else if (atomic.getPrimitiveType().equals(PrimitiveType.TINYINT)
-|| atomic.getPrimitiveType().equals(PrimitiveType.SMALLINT)
-|| atomic.getPrimitiveType().equals(PrimitiveType.INT)) {
+} else if (primitiveType.equals(PrimitiveType.TINYINT)
+|| primitiveType.equals(PrimitiveType.SMALLINT)
+|| primitiveType.equals(PrimitiveType.INT)) {
return Types.IntegerType.get();
-} else if (atomic.getPrimitiveType().equals(PrimitiveType.BIGINT)
-|| atomic.getPrimitiveType().equals(PrimitiveType.LARGEINT)) {
+} else if (primitiveType.equals(PrimitiveType.BIGINT)
+|| primitiveType.equals(PrimitiveType.LARGEINT)) {
return Types.LongType.get();
-} else if (atomic.getPrimitiveType().equals(PrimitiveType.FLOAT)) {
+} else if (primitiveType.equals(PrimitiveType.FLOAT)) {
return Types.FloatType.get();
-} else if (atomic.getPrimitiveType().equals(PrimitiveType.DOUBLE)) {
+} else if (primitiveType.equals(PrimitiveType.DOUBLE)) {
return Types.DoubleType.get();
-} else if (atomic.getPrimitiveType().equals(PrimitiveType.CHAR)
-|| atomic.getPrimitiveType().equals(PrimitiveType.VARCHAR)) {
+} else if (primitiveType.equals(PrimitiveType.CHAR)
+|| primitiveType.equals(PrimitiveType.VARCHAR)
+|| primitiveType.equals(PrimitiveType.STRING)) {
return Types.StringType.get();
-} else if (atomic.getPrimitiveType().equals(PrimitiveType.DATE)
-|| atomic.getPrimitiveType().equals(PrimitiveType.DATEV2)) {
+} else if (primitiveType.equals(PrimitiveType.DATE)
+|| primitiveType.equals(PrimitiveType.DATEV2)) {
return Types.DateType.get();
-} else if (atomic.getPrimitiveType().equals(PrimitiveType.TIME)
-|| atomic.getPrimitiveType().equals(PrimitiveType.TIMEV2)) {
+} else if (primitiveType.equals(PrimitiveType.TIME)
+|| primitiveType.equals(PrimitiveType.TIMEV2)) {
return Types.TimeType.get();
-} else if (atomic.getPrimitiveType().equals(PrimitiveType.DECIMALV2)
-|| atomic.getPrimitiveType().isDecimalV3Type()) {
+} else if (primitiveType.equals(PrimitiveType.DECIMALV2)
+|| primitiveType.isDecimalV3Type()) {
return Types.DecimalType.of(
((ScalarType) atomic).getScalarPrecision(),
((ScalarType) atomic).getScalarScale());
-} else if (atomic.getPrimitiveType().equals(PrimitiveType.DATETIME)
-|| atomic.getPrimitiveType().equals(PrimitiveType.DATETIMEV2)) {
-return Types.TimestampType.withZone();
+} else if (primitiveType.equals(PrimitiveType.DATETIME)
+|| primitiveType.equals(PrimitiveType.DATETIMEV2)) {
+return Types.TimestampType.withoutZone();
}
// unsupported types: HLL, BITMAP, BINARY

throw new UnsupportedOperationException(
-"Not a supported type: " + atomic.getPrimitiveType());
+"Not a supported type: " + primitiveType);
}
}
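The visitor's output obeys Iceberg's schema rules: every field, list element, and map key/value gets a unique field id, and Doris nullability maps to optional versus required. A hand-built example of the kind of schema the visitor produces; field names and ids are illustrative:

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Hand-built equivalent of the visitor's output for a small table:
// non-nullable -> required, nullable -> optional, nested ids allocated fresh.
public class TypeMappingDemo {
    public static void main(String[] args) {
        Schema schema = new Schema(
                Types.NestedField.required(0, "id", Types.LongType.get()),
                Types.NestedField.optional(1, "tags",
                        Types.ListType.ofOptional(2, Types.StringType.get())),
                Types.NestedField.optional(3, "attrs",
                        Types.MapType.ofOptional(4, 5, Types.StringType.get(), Types.IntegerType.get())));
        System.out.println(schema);
    }
}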