Skip to content

Commit

Permalink
[AMORO-1846]: Extract new Interface for Catalog and Table to support …
Browse files Browse the repository at this point in the history
…multi-format tables in one catalog (apache#1852)

* extract common catalog api to support multi formats

* extract common catalog api to support multi formats

* fix checkstyle

* implement iceberg FormatCatalog

* implement iceberg FormatCatalog

* add some tests

* fix ut
  • Loading branch information
baiyangtx authored Aug 22, 2023
1 parent 4bbab6d commit 76da3c4
Show file tree
Hide file tree
Showing 25 changed files with 1,089 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@
public enum TableFormat {
ICEBERG,
MIXED_ICEBERG,
MIXED_HIVE;
MIXED_HIVE,
PAIMON;
}
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ public void cleanUp() {
}

public void createCatalog(CatalogMeta catalogMeta) {
dropCatalog(catalogMeta.getCatalogName());
catalogs.add(catalogMeta);
}

Expand Down Expand Up @@ -360,7 +361,8 @@ public long allocateTransactionId(TableIdentifier tableIdentifier, String transa
}

@Override
public Blocker block(TableIdentifier tableIdentifier, List<BlockableOperation> operations,
public Blocker block(
TableIdentifier tableIdentifier, List<BlockableOperation> operations,
Map<String, String> properties) throws TException {
Map<String, Blocker> blockers = this.tableBlockers.computeIfAbsent(tableIdentifier, t -> new HashMap<>());
long now = System.currentTimeMillis();
Expand Down
25 changes: 25 additions & 0 deletions core/src/main/java/com/netease/arctic/AlreadyExistsException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic;

public class AlreadyExistsException extends RuntimeException {
public AlreadyExistsException(String message) {
super(message);
}
}
78 changes: 78 additions & 0 deletions core/src/main/java/com/netease/arctic/AmoroCatalog.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic;

import java.util.List;

/**
* Base Catalog interface for Amoro.
*/
public interface AmoroCatalog {

/**
* Show database list of metastore.
*
* @return database list of metastore
*/
List<String> listDatabases();

/**
* Check whether database exists.
*
* @param database a database name
* @return true if the database exists, false otherwise
*/
boolean exist(String database);

/**
* Check whether table exists.
*
* @param database a database name
* @param table a table name
* @return true if the table exists, false otherwise
*/
boolean exist(String database, String table);

/**
* create database catalog.
*
* @param database database name
* @throws AlreadyExistsException when database already exists.
*/
void createDatabase(String database);

/**
* drop database from catalog.
*
* @param database database name
* @throws NoSuchDatabaseException when database not exists.
*/
void dropDatabase(String database);

/**
* load table from catalog.
*
* @param database database name
* @param table table name
* @return table instance
* @throws NoSuchDatabaseException when database not exists.
* @throws NoSuchTableException when table not exists.
*/
AmoroTable<?> loadTable(String database, String table);
}
58 changes: 58 additions & 0 deletions core/src/main/java/com/netease/arctic/AmoroTable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic;

import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.table.TableIdentifier;
import java.util.Map;

public interface AmoroTable<T> {

/**
* Returns the {@link TableIdentifier} of this table
*/
TableIdentifier id();

/**
* Returns the name of this table
*/
default String name() {
return id().toString();
}

/**
* Returns the {@link TableFormat} of this table
*/
TableFormat format();

/**
* Returns the properties of this table
*/
Map<String, String> properties();

/**
* Returns the original of this table
*/
T originalTable();

/**
* Returns the current snapshot of this table
*/
Snapshot currentSnapshot();
}
171 changes: 171 additions & 0 deletions core/src/main/java/com/netease/arctic/CommonUnifiedCatalog.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic;

import com.netease.arctic.ams.api.CatalogMeta;
import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.table.TableIdentifier;
import com.netease.arctic.table.TableMetaStore;
import com.netease.arctic.utils.CatalogUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CommonUnifiedCatalog implements UnifiedCatalog {

private Supplier<CatalogMeta> metaSupplier;
private CatalogMeta meta;
private Map<TableFormat, FormatCatalog> formatCatalogs = Maps.newHashMap();
private Map<String, String> properties = Maps.newHashMap();

public CommonUnifiedCatalog(
Supplier<CatalogMeta> catalogMetaSupplier, CatalogMeta meta, Map<String, String> properties
) {
this.meta = meta;
this.properties.putAll(properties);
this.metaSupplier = catalogMetaSupplier;
initializeFormatCatalogs();
}

@Override
public List<String> listDatabases() {
return findFirstFormatCatalog(TableFormat.values()).listDatabases();
}

@Override
public boolean exist(String database) {
return listDatabases().contains(database);
}

@Override
public boolean exist(String database, String table) {
return formatCatalogAsOrder(TableFormat.values())
.anyMatch(formatCatalog -> formatCatalog.exist(database, table));
}

@Override
public void createDatabase(String database) {
if (exist(database)) {
throw new AlreadyExistsException("Database: " + database + " already exists.");
}

findFirstFormatCatalog(TableFormat.values()).createDatabase(database);
}

@Override
public void dropDatabase(String database) {
if (!exist(database)) {
throw new NoSuchDatabaseException("Database: " + database + " does not exist.");
}
if (listTableMetas(database).size() > 0) {
throw new IllegalStateException("Database: " + database + " is not empty.");
}
findFirstFormatCatalog(TableFormat.values()).dropDatabase(database);
}

@Override
public AmoroTable<?> loadTable(String database, String table) {
if (!exist(database)) {
throw new NoSuchDatabaseException("Database: " + database + " does not exist.");
}

return formatCatalogAsOrder(TableFormat.MIXED_ICEBERG, TableFormat.ICEBERG, TableFormat.PAIMON)
.map(formatCatalog -> {
try {
return formatCatalog.loadTable(database, table);
} catch (NoSuchTableException e) {
return null;
}
})
.filter(Objects::nonNull).findFirst()
.orElseThrow(() -> new NoSuchTableException("Table: " + table + " does not exist."));
}

@Override
public String name() {
return this.meta.getCatalogName();
}

@Override
public List<TableMeta> listTableMetas(String database) {
if (!exist(database)) {
throw new NoSuchDatabaseException("Database: " + database + " does not exist.");
}
TableFormat[] formats = new TableFormat[] {TableFormat.MIXED_ICEBERG, TableFormat.ICEBERG, TableFormat.PAIMON};

Map<String, TableFormat> tableNameToFormat = Maps.newHashMap();
for (TableFormat format : formats) {
if (formatCatalogs.containsKey(format)) {
formatCatalogs.get(format)
.listTables(database)
.forEach(table -> tableNameToFormat.putIfAbsent(table, format));
}
}

return tableNameToFormat.keySet().stream()
.map(tableName -> {
TableFormat format = tableNameToFormat.get(tableName);
return TableMeta.of(TableIdentifier.of(this.meta.getCatalogName(), database, tableName), format);
}).collect(Collectors.toList());
}

@Override
public synchronized void refresh() {
this.meta = metaSupplier.get();
CatalogUtil.mergeCatalogProperties(meta, properties);
}

protected void initializeFormatCatalogs() {
ServiceLoader<FormatCatalogFactory> loader = ServiceLoader.load(FormatCatalogFactory.class);
Set<TableFormat> formats = CatalogUtil.tableFormats(this.meta);
TableMetaStore store = CatalogUtil.buildMetaStore(this.meta);
Map<TableFormat, FormatCatalog> formatCatalogs = Maps.newConcurrentMap();
for (FormatCatalogFactory factory : loader) {
if (formats.contains(factory.format())) {
FormatCatalog catalog = factory.create(
name(), meta.getCatalogType(), meta.getCatalogProperties(), store.getConfiguration());
formatCatalogs.put(factory.format(), catalog);
}
}
this.formatCatalogs = formatCatalogs;
}

/**
* get format catalogs as given format order
*/
private Stream<FormatCatalog> formatCatalogAsOrder(TableFormat... formats) {
return Stream.of(formats)
.filter(formatCatalogs::containsKey)
.map(formatCatalogs::get);
}

private FormatCatalog findFirstFormatCatalog(TableFormat... formats) {
return Stream.of(formats)
.filter(formatCatalogs::containsKey)
.map(formatCatalogs::get)
.findFirst()
.orElseThrow(() -> new IllegalStateException("No format catalog found."));
}
}
37 changes: 37 additions & 0 deletions core/src/main/java/com/netease/arctic/FormatCatalog.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic;

import java.util.List;

/**
* A catalog for a specific table format.
*/
public interface FormatCatalog extends AmoroCatalog {

/**
* Show table list of database.
*
* @param database database name
* @return table list of database
* @throws NoSuchDatabaseException when database not exists.
*/
List<String> listTables(String database);

}
Loading

0 comments on commit 76da3c4

Please sign in to comment.