[Arctic-1362]: Refactor ArcticFileIO to decouple it from Hadoop (apache#1443)

* fileIo code refactor and use recoverable file io to implement arctic file io (see the interface sketch after the co-author list below)

* fileIo refactor in hive module

* replace ams module with refactored codes

* remove optimizer module

* fix compile errors

* Refactor core and hive

* fix terminal catalog connector check

* check terminal session NPE before release.

* modify ams-server and dashboard module and package names

* [Arctic-1372] Resolve compile error (apache#1417)

* resolve compile error.

* [ARCTIC-1062][AMS]Terminal support config spark properties in the local mode (apache#1094)

* [ARCTIC-1090][AMS]Terminal support add hadoop conf into sparkConf (apache#1099)

* add some PRs since v0.4

* [AMS-Refactor] fix runtime bug (apache#1421)

fix runtime bug

* return null when got thrift MISSING_RESULT error

* [ARCTIC][AMS]:display login error msg (apache#1424)

feat: display login error msg

Co-authored-by: zhubeibei <zhubeibei@corp.netease.com>

* Commit the DeleteFile delete operation separately in Iceberg Format (apache#1425)

* add 0.5.0-init.sql

* rename optimize_group to optimizer_groups in config.xml

* remove useless configs

* Reuse PropertyNames property definition in OptimizerConfig

* remove old optimizer module files

* fix some compile error in test codes

* [AMS-Refactor] fix runtime bug (apache#1430)

* fix runtime bug

* fix runtime bug

* add unit tests for optimizer

* rename util class name from utils to util

* remove resourceId from optimizerRegisterInfo

* support derby database

* format sql in mappers

* fix some warnings

* change toString implementation by guava

* [AMS-Refactor] fix runtime bug (apache#1433)

* fix runtime bug

* fix runtime bug

* fix runtime bug

* Mix-format committer

* Fix compile error

* fix optimizer executor retry bug

* [Arctic-1372][Refactor-AMS] Add unit test for executors (apache#1434)

* resolve compile error.

* resolve compile error.

* add Executor unit test

* [Arctic-1372][Refactor-AMS] Refactor package name (apache#1435)

* resolve compile error.

* resolve compile error.

* add Executor unit test

* refactor package level

* [AMS-refactor]: add committing status (apache#1439)

* feat: display login error msg

* feat: optimizers tables add committing status

---------

Co-authored-by: zhubeibei <zhubeibei@corp.netease.com>

* support ha mode for ams

* add fromSequence toSequence to TableOptimizingProcess

* fix unit test errors in module core

* fix unit tests in module hive

* add default external container

* adapt ams ha thrift url for optimizer container

* [AMS-Refactor] fix runtime bug (apache#1436)

* fix runtime bug

* fix runtime bug

* fix runtime bug

* fix runtime bug

* [AMS-Refactor] Abstracted test code (apache#1441)

* resolve compile error.

* resolve compile error.

* add Executor unit test

* refactor package level

* abstracted test code to reduce duplication of code

* [AMS-Refactor] merge `blocker` to refactor branch (apache#1427)

* introduce blocker to 0.5.0

* add exception compatibility

* fix get optimizer groups bug for optimizer controller

* [AMS-Refactor]Use ArcticSparkSessionCatalog for terminal (apache#1442)

* use ArcticSparkSessionCatalog in terminal

* remove useless import

---------

Co-authored-by: jinsilei <jinsilei@corp.netease.com>

* add TestCatalogService

* mixed iceberg plan

* support mixed hive plan

* refactor plan scan files

* refactor AbstractPartitionPlan

* refactor FileTree

* replace OptimizingTaskProperties with OptimizingInputProperties

* refactor MixedFormatTableFileScanHelper to UnkeyedTableFileScanHelper and KeyedTableFileScanHelper

* refactor unkeyed table split task

* add partition plan simple test case

* refactor DefaultPartitionEvaluator

* union addFile of AbstractPartitionPlan

* add isFullNecessary

* fix compile error

* split deleteFiles in RewriteFilesInput into readOnlyDeleteFiles and rewriteDeleteFiles

* add BasicMixedHivePartitionPlan

* refactor test

* add assert for unit test

* fix conflict

* [WIP][AMS-Refactor] merge allocating transaction id from AMS (apache#1428)

* support new transactionId in ams

* remove table_transaction_meta from derby

* not support allocate TransactionId from AMS in 0.5.0

* fix checkstyle

* remove currentTxId

* add max-txId to hidden properties

* refactor test case AbstractMixedTablePartitionPlan

* add TestTableService

* fix conflicts

* add TestKeyedTableFileScanHelper

* fix checkstyle

* fix scan for empty table

* Fix some error

* Fix some error

* Fix some error

* Fix some error

* Format code style

* Format code style

* refactor test case for plan and scan

* fix checkstyle

* Add only delete

* Add cdc

* DataTestHelpers support writeBaseStorePosDelete

* add plan test case for segment files and pos delete files

* convert Map<String, Long> partitionSequence to StructLikeMap<Long>

* fix thrift service proxy exception handling bug

* remove dir operation from ArcticFileIO

* fix plan for only one file

* [AMS-Refactor] fix runtime bug and add unit test (apache#1446)

1.fix runtime bug
2.add ams env and optimize result checker unit test

* persist from and to sequence for optimizing process

* add server catalog for mixed-hive format tables

* remove duplicate codes in mixed hive catalog

* optimizing adapt to blocker with snapshot id

* add TestKeyedOptimizingPlanner TestKeyedOptimizingEvaluator

* fix test

* add test case for TestKeyedOptimizingEvaluator

* expose hive client pool in mixed hive server catalog

* fix OptimizingPlanner isNecessary

* planner

* fix errors in AMSTableTestBase

* Optimizer Unit test

* refactor some service code

* add file statistics for PartitionEvaluator

* remove all usage for ArcticFileIO.list:FileStatus except AMS module

* mock TableRuntime

* check style

* Format code style

* Add Iceberg write data

* add test case for plan testWithDeleteFiles

* add plan test case testChangeFilesWithDelete

* add test case for plan evaluator for unkeyed table

* add test case for full optimizing

* refactor some service code1

* merge

* fix compile error

* modify blocker

* add TestIcebergTableFileScanHelper

* add TestIcebergPartitionPlan

* check style

* fix keyedTable data losing

* add test case for check from to sequence

* refactor to MixedIcebergPartition MixedHivePartitionPlan

* move TableSnapshot to package com.netease.arctic.server.table

* support set minorLeastInterval to -1

* fix optimize type of OptimizingPlanner

* fix evaluator cost

* fix different hive locations

* fix hive subdirectory

* fix new hive output dir

* ADD StatedPersistentBase and unit test

* merge from master

* Make TaskRuntime and TableRuntime extended from StatedPersistentBase

* fix conflicts

* fix AMS start up error

* fix trash manager location ut

* fix NestSqlSession close() bug

* add MixedHivePartitionEvaluator MixedIcebergPartitionEvaluator

* fix full optimize continuous execution

* fix getBaseSplitCount

* fix init BasicPartitionEvaluator

* rename BasicPartitionEvaluator to CommonPartitionEvaluator

* fix ams-mysql-init.sql

* remove repair.sh

* [AMS-Refactor] fix runtime bug and add unit test (apache#1459)

* fix runtime bug

* fix runtime bug

* fix runtime bug

* fix runtime bug

* 1.fix runtime bug
2.add ams env and optimize result checker unit test

* 1.fix runtime bug
2.controller adapt mixed-format

* 1.fix runtime bug
2.add mix-format table optimize unit test

* resolve comment

* resolve comment

* resolve comment

* Fix some bug

* Add Optimizing UT

* fix table not exist

* fix get operations of keyed table

* fix transaction detail of keyed table

* add target_change_snapshot_id and last_optimized_change_snapshotId to fix keyed table optimizing

* fix select/update/insert properties of task_runtime

* remove useless import

* fix check style

* unit-tests passed. core/hive

* remove usage of io.list

* close table trash

* Add Schedule policy

* load to sequence and from sequence

* fix not update currentStatusStartTime

* fix some bug

* fix output file

* Format code style

* do not show begin transaction snapshot

* fix create catalog rollback

* remove useless optimizing properties

* [Refactor-AMS] Fix some bugs and polish UT test (apache#1466)

* resolve compile error.

* resolve compile error.

* add Executor unit test

* refactor package level

* abstracted test code to reduce duplication of code

* fix some bugs and polish ut test

* fix ut case

* fix some unit test errors in ams server

* code coverage support for ams server

* add unit test for blocker

* add unit test for BlockerExpiringExecutor

* move unit test for Executor to server.table.executor

* addTaskQuota before retry

* add more test case for TestOptimizingQueue

* fix load optimize input

* add test case for TestOptimizingQueue

* remove useless dependencies

* reverse operations

* fix Transactions order by commit time

* fix checkstyle

* TableRuntimeRefreshExecutor always execute periodically

* [AMS-Refactor] Solve some restart problems (apache#1472)

* resolve compile error.

* resolve compile error.

* add Executor unit test

* refactor package level

* abstracted test code to reduce duplication of code

* fix some bugs and polish ut test

* fix ut case

* fix some bugs

* fix some bugs

* fix some bugs

* fix some bugs

* fix some bugs

* fix some bugs

* [ARCTIC][AMS]: update container settings info (apache#1473)

feat: container settings update

Co-authored-by: zhubeibei <zhubeibei@corp.netease.com>

* Fix UnKeyed commit error

* make ArcticRuntimeException methods more clear

* support s3 orphan file clean

* [WIP]Upgrade SQL script (apache#1450)

* add sql file

* add sql file

* update sql file

* fix

* init table_runtime

* init table_runtime

* add column

* add column

---------

Co-authored-by: jinsilei <jinsilei@corp.netease.com>

* change readme

* update upgrade sql script

* fix optimizer execution error message being null

* version control for slf4j-log4j12

* [AMS-Refactor] add scan table file list (apache#1468)

* add scan table file list

* add scan table file list

* add scan table file list

* add scan table file list

* resolve comment

* fix hive table upgrade logic

* resolve comment

* unit test path logic

* unit test create db

* rm spark ams properties tests

* update docs to adapt new ams configurations

* fix conflicts

* [AMS-Refactor] Optimizing trigger support hive/base max delay for each partition (apache#1480)

* refactor PartitionTransactionOperation: apply() returns partition properties

* core support table partition property 'base-op-time'

* optimizing support base max delay and hive max delay

* add partition evaluator weight and add unit test case

* add TestHiveOptimizingEvaluator

* add result cache for CommonPartitionEvaluator

* remove useless code

* modify property name

* change property to base.hive.refresh-interval and base.refresh-interval

* remove annotations.NotNull

* change to filesNotInHiveLocation

* fix configuration file values

* ignore spark and executor failed task

* fix pom dependency errors

* [Arctic-1160][Spark]: Unit test refactor for spark module (apache#1366)

* unit test framework for spark create table test

* refactor create table tests

* stash

* test framework

* test framework

* create table test

* create table test assert hive schema

* test create table like

* refactor test code

* create table like

* TestTable

* Test Tables

* junit5 tests

* junit5 tests

* junit5 tests  framework

* junit5 tests  framework

* junit5 tests  framework

* junit5 tests  framework

* add test

* add alter table test

* add update test

* CTAS

* some comments

* some comments

* add jacoco exclude packages

* insert into test

* optimize alter table test

* optimize truncate test

* some insert overwrite tests

* some insert overwrite tests

* optimize write for overwrite

* add desc test and optimize code

* add dataFrame API test

* merge into tests

* MultiDelegateSessionCatalog test case

* fix npe in unit tests

* fix resource clean-up in test

* remove some codes

* remove some codes

* new antlr for extend sql

* new antlr for extend sql

* remove unused codes

* remove un-used junit4 tests

* add ut for ast builder

* ctas test for query

* ctas test for query

* ctas test for query

* ctas test for query

* write avro files

* merge from master

* simplify command parser

* spark 3.2 new unit tests

* spark 3.2 new unit tests

* fix truncate problem

* fix merge into 3.1

* unit test case for spark 3.3

* fix merge into for 3.2 & 3.3

* fix merge into for 3.2 & 3.3

* remove useless comment

* code check style

* fix 3.3 check condition

* fix alter table add column

* fix alter table add column

* session catalog tests

* spark 3.3 sql extend parser

* spark 3.3 sql command parser

* enable jacoco in github ci

* add junit vintage engines

* flink 1.4 test

* add junit vintage engines

* disable jacoco code cover report for flink module

* fix ams compile error

* spark surefire plugin remove suite entry

* class load for spark 3.2

* remove useless code for optimize

* fix test impala in spark

* optimize drop partition test

* Add log when create HMS thread

* increment HMS pool size

* change log level for spark test logger

* test code style for spark/v3.1

* test code style for spark/v3.2

* test code style for spark/v3.3

* fix unit tests

* fix review comment

* remove CollectionUtil.asMap

* remove CollectionUtil.zip

---------

Co-authored-by: jinsilei <jinsilei@corp.netease.com>

* change insert table runtime table sql

* fix a ut bug in optimizer

* Add UnKeyed table flow UT

* Delete useless code

* fix spark compile error

* fix checkstyle

* merged from refactor-ams, fix codestyle problems

* fix ams merge error

* add license, rm useless file

* fix failed flink unit tests

* fix conflict with master

* fix github review comment

* fix unit tests

* remove useless import

* fix create db error

* fix checkstyle

* fix unit tests

* add java docs

---------

Co-authored-by: zhoujinsong <463763777@qq.com>
Co-authored-by: shidayang <530847445@qq.com>
Co-authored-by: majin1102 <majin1102@163.com>
Co-authored-by: HuangFru <68625618+HuangFru@users.noreply.github.com>
Co-authored-by: wangzeyu <hameizi369@gmail.com>
Co-authored-by: huiyuan_ <819747197@qq.com>
Co-authored-by: zhubeibei <zhubeibei@corp.netease.com>
Co-authored-by: wangzeyu <1249369293@qq.com>
Co-authored-by: wangtao <wangtao3@corp.netease.com>
Co-authored-by: wangtaohz <103108928+wangtaohz@users.noreply.github.com>
Co-authored-by: PlanetWalker <52364847+hellojinsilei@users.noreply.github.com>
Co-authored-by: jinsilei <jinsilei@corp.netease.com>
Co-authored-by: ZhouJinsong <zhoujinsong0505@163.com>
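
The decoupling named in the title and in the first bullet replaces Hadoop-specific listing (FileStatus) with capability checks on the IO abstraction. Below is a minimal, hedged sketch of that shape, built only from names that appear in this commit's imports and call sites; the real ArcticFileIO in the repository may declare more members, and the sketch interface name is illustrative.

// Hedged sketch, not the repository's actual declaration.
// com.netease.arctic.io supplies SupportsFileSystemOperations and PathInfo;
// org.apache.iceberg.io supplies SupportsPrefixOperations and FileInfo (see the imports in the diff below).
public interface ArcticFileIOSketch extends java.io.Closeable {

  boolean exists(String path);      // used directly at call sites in this diff
  void deleteFile(String path);     // used directly at call sites in this diff

  // True when the underlying storage has directory semantics (mkdir, rename, empty-dir checks).
  boolean supportFileSystemOperations();

  com.netease.arctic.io.SupportsFileSystemOperations asFileSystemIO();

  // True when the underlying storage can list objects by key prefix (typical for object stores).
  boolean supportPrefixOperations();

  org.apache.iceberg.io.SupportsPrefixOperations asPrefixFileIO();
}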
14 people authored and ShawHee committed Dec 29, 2023
1 parent dda5a7f commit b169be5
Showing 55 changed files with 885 additions and 642 deletions.
@@ -9,4 +9,4 @@ public class ArcticServiceConstants {
public static final long QUOTA_LOOK_BACK_TIME = 60 * 60 * 1000;

public static final long INVALID_SNAPSHOT_ID = -1L;
}
}
@@ -334,4 +334,4 @@ private void expandConfigMap(Map<String, Object> config, String prefix, Map<Stri
}
}
}
}
}
@@ -278,9 +278,9 @@ private DataFile moveTargetFiles(DataFile targetFile, String hiveLocation) {
if (!table.io().exists(newFilePath)) {
if (!table.io().exists(hiveLocation)) {
LOG.debug("{} hive location {} does not exist and need to mkdir before rename", table.id(), hiveLocation);
table.io().mkdirs(hiveLocation);
table.io().asFileSystemIO().makeDirectories(hiveLocation);
}
table.io().rename(oldFilePath, newFilePath);
table.io().asFileSystemIO().rename(oldFilePath, newFilePath);
LOG.debug("{} move file from {} to {}", table.id(), oldFilePath, newFilePath);
}

@@ -308,4 +308,4 @@ private static Set<String> getCommittedDataFilesFromSnapshotId(UnkeyedTable tabl

return committedFilePath;
}
}
}
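
In the moveTargetFiles hunk above, mkdirs and rename move from ArcticFileIO itself to asFileSystemIO(); those operations only exist on a filesystem-capable IO. A minimal sketch, assuming the capability check visible elsewhere in this diff (supportFileSystemOperations), of how such a call site can be guarded; the fallback exception is illustrative, not the repository's actual behavior.

import com.netease.arctic.io.ArcticFileIO;
import com.netease.arctic.io.SupportsFileSystemOperations;

class HiveFileMoveSketch {
  // Sketch only: directory-style operations go through asFileSystemIO(), because a decoupled
  // ArcticFileIO (for example one backed by an object store) may not support them at all.
  static void moveToHiveLocation(ArcticFileIO io, String hiveLocation, String oldPath, String newPath) {
    if (io.supportFileSystemOperations()) {
      SupportsFileSystemOperations fio = io.asFileSystemIO();
      if (!fio.exists(hiveLocation)) {
        fio.makeDirectories(hiveLocation);
      }
      fio.rename(oldPath, newPath);
    } else {
      // How non-filesystem IO should behave here is outside the scope of this sketch.
      throw new UnsupportedOperationException("filesystem operations are required to relocate hive files");
    }
  }
}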
@@ -20,6 +20,8 @@

import com.google.common.base.Strings;
import com.netease.arctic.io.ArcticFileIO;
import com.netease.arctic.io.PathInfo;
import com.netease.arctic.io.SupportsFileSystemOperations;
import com.netease.arctic.server.table.TableManager;
import com.netease.arctic.server.table.TableRuntime;
import com.netease.arctic.server.utils.HiveLocationUtil;
@@ -31,11 +33,11 @@
import com.netease.arctic.table.UnkeyedTable;
import com.netease.arctic.utils.CompatiblePropertyUtil;
import com.netease.arctic.utils.TableFileUtil;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -167,83 +169,95 @@ private static Set<String> getValidContentFiles(ArcticTable arcticTable) {
}

private static int clearInternalTableContentsFiles(
UnkeyedTable internalTable, long lastTime,
Set<String> exclude) {
int deleteFilesCnt = 0;
UnkeyedTable internalTable, long lastTime, Set<String> exclude) {
String dataLocation = internalTable.location() + File.separator + DATA_FOLDER_NAME;
if (internalTable.io().exists(dataLocation)) {
for (FileStatus fileStatus : internalTable.io().list(dataLocation)) {
deleteFilesCnt += deleteInvalidContentFiles(
internalTable.io(),
fileStatus,
lastTime,
exclude);

try (ArcticFileIO io = internalTable.io()) {
// listPrefix will not return the directory and the orphan file clean should clean the empty dir.
if (io.supportFileSystemOperations()) {
SupportsFileSystemOperations fio = io.asFileSystemIO();
return deleteInvalidFilesInFs(fio, dataLocation, lastTime, exclude);
} else if (io.supportPrefixOperations()) {
SupportsPrefixOperations pio = io.asPrefixFileIO();
return deleteInvalidFilesByPrefix(pio, dataLocation, lastTime, exclude);
} else {
LOG.warn(String.format(
"Table %s doesn't support a fileIo with listDirectory or listPrefix, so skip clear files.",
internalTable.name()
));
}
}
return deleteFilesCnt;

return 0;
}

private static int deleteInvalidContentFiles(
ArcticFileIO io,
FileStatus fileStatus,
Long lastTime,
Set<String> exclude) {
String location = fileStatus.getPath().toString();
if (io.isDirectory(location)) {
if (!io.isEmptyDirectory(location)) {
LOG.info("start orphan files clean in {}", location);
int deleteFileCnt = 0;
for (FileStatus file : io.list(location)) {
deleteFileCnt += deleteInvalidContentFiles(io, file, lastTime, exclude);
}
LOG.info("delete {} files in {}", deleteFileCnt, location);
private static int deleteInvalidFilesInFs(
SupportsFileSystemOperations fio, String location, long lastTime, Set<String> excludes
) {
if (!fio.exists(location)) {
return 0;
}

if (location.endsWith(METADATA_FOLDER_NAME) || location.endsWith(DATA_FOLDER_NAME)) {
return 0;
int deleteCount = 0;
for (PathInfo p : fio.listDirectory(location)) {
String uriPath = TableFileUtil.getUriPath(p.location());
if (p.isDirectory()) {
int deleted = deleteInvalidFilesInFs(fio, p.location(), lastTime, excludes);
deleteCount += deleted;
if (!p.location().endsWith(METADATA_FOLDER_NAME) &&
!p.location().endsWith(DATA_FOLDER_NAME) &&
p.createdAtMillis() < lastTime &&
fio.isEmptyDirectory(p.location())) {
TableFileUtil.deleteEmptyDirectory(fio, p.location(), excludes);
}
TableFileUtil.deleteEmptyDirectory(io, location, exclude);
return deleteFileCnt;
} else if (io.isEmptyDirectory(location) &&
fileStatus.getModificationTime() < lastTime) {
if (location.endsWith(METADATA_FOLDER_NAME) || location.endsWith(DATA_FOLDER_NAME)) {
return 0;
}

TableFileUtil.deleteEmptyDirectory(io, location, exclude);
LOG.info("delete empty dir : {}[{}]", location, formatTime(fileStatus.getModificationTime()));
return 0;
} else {
return 0;
}
} else {
if (!exclude.contains(TableFileUtil.getUriPath(location)) &&
!exclude.contains(TableFileUtil.getUriPath(new Path(location).getParent().toString())) &&
fileStatus.getModificationTime() < lastTime) {
io.deleteFile(location);
return 1;
String parentLocation = TableFileUtil.getParent(p.location());
String parentUriPath = TableFileUtil.getUriPath(parentLocation);
if (!excludes.contains(uriPath) &&
!excludes.contains(parentUriPath) &&
p.createdAtMillis() < lastTime) {
fio.deleteFile(uriPath);
deleteCount += 1;
}
}
}
return deleteCount;
}

return 0;
private static int deleteInvalidFilesByPrefix(
SupportsPrefixOperations pio, String prefix, long lastTime, Set<String> excludes
) {
int deleteCount = 0;
for (FileInfo fileInfo : pio.listPrefix(prefix)) {
String uriPath = TableFileUtil.getUriPath(fileInfo.location());
if (!excludes.contains(uriPath) && fileInfo.createdAtMillis() < lastTime) {
pio.deleteFile(fileInfo.location());
deleteCount += 1;
}
}
return deleteCount;
}

private static int clearInternalTableMetadata(UnkeyedTable internalTable, long lastTime) {
Set<String> validFiles = getValidMetadataFiles(internalTable);
LOG.info("{} table getRuntime {} valid files", internalTable.id(), validFiles.size());
Pattern excludeFileNameRegex = getExcludeFileNameRegex(internalTable);
LOG.info("{} table getRuntime exclude file name pattern {}", internalTable.id(), excludeFileNameRegex);
int deleteFilesCnt = 0;
String metadataLocation = internalTable.location() + File.separator + METADATA_FOLDER_NAME;
LOG.info("start orphan files clean in {}", metadataLocation);
for (FileStatus fileStatus : internalTable.io().list(metadataLocation)) {
deleteFilesCnt += deleteInvalidMetadata(
internalTable.io(),
fileStatus,
lastTime,
validFiles,
excludeFileNameRegex);

try (ArcticFileIO io = internalTable.io()) {
if (io.supportPrefixOperations()) {
SupportsPrefixOperations pio = io.asPrefixFileIO();
return deleteInvalidMetadataFile(pio, metadataLocation, lastTime, validFiles, excludeFileNameRegex);
} else {
LOG.warn(String.format(
"Table %s doesn't support a fileIo with listDirectory or listPrefix, so skip clear files.",
internalTable.name()
));
}
}
return deleteFilesCnt;
return 0;
}

private static Set<String> getValidMetadataFiles(UnkeyedTable internalTable) {
@@ -266,7 +280,8 @@ private static Set<String> getValidMetadataFiles(UnkeyedTable internalTable) {
validFiles.add(TableFileUtil.getUriPath(manifestFile.path()));
}

LOG.info("{} scan snapshot {}: {} and getRuntime {} files, complete {}/{}",
LOG.info(
"{} scan snapshot {}: {} and getRuntime {} files, complete {}/{}",
tableIdentifier,
snapshot.snapshotId(),
formatTime(snapshot.timestampMillis()),
@@ -298,26 +313,21 @@ private static Pattern getExcludeFileNameRegex(UnkeyedTable table) {
return null;
}

private static int deleteInvalidMetadata(
ArcticFileIO io,
FileStatus fileStatus,
Long lastTime,
Set<String> exclude,
Pattern excludeFileNameRegex) {
String location = fileStatus.getPath().toString();
if (io.isDirectory(location)) {
LOG.warn("unexpected dir in metadata/, {}", location);
return 0;
} else {
if (!exclude.contains(TableFileUtil.getUriPath(location)) && fileStatus.getModificationTime() < lastTime &&
(excludeFileNameRegex == null ||
!excludeFileNameRegex.matcher(TableFileUtil.getFileName(location)).matches())) {
io.deleteFile(location);
return 1;
} else {
return 0;
private static int deleteInvalidMetadataFile(
SupportsPrefixOperations pio, String location, long lastTime, Set<String> exclude, Pattern excludeRegex
) {
int count = 0;
for (FileInfo fileInfo : pio.listPrefix(location)) {
String uriPath = TableFileUtil.getUriPath(fileInfo.location());
if (!exclude.contains(uriPath) &&
fileInfo.createdAtMillis() < lastTime &&
(excludeRegex == null || !excludeRegex.matcher(
TableFileUtil.getFileName(fileInfo.location())).matches())) {
pio.deleteFile(uriPath);
count += 1;
}
}
return count;
}

private static String formatTime(long timestamp) {
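
The orphan-file cleaning above now dispatches on IO capabilities instead of assuming Hadoop's FileStatus listing: listDirectory() returns PathInfo entries (files and directories, so empty data directories can also be removed), while listPrefix() returns FileInfo entries (files only), which is why the prefix branch skips directory cleanup. A condensed, hedged sketch of that dispatch, using only types and calls that appear in the hunks above:

import com.netease.arctic.io.ArcticFileIO;
import com.netease.arctic.io.PathInfo;
import com.netease.arctic.io.SupportsFileSystemOperations;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.SupportsPrefixOperations;

class OrphanListingSketch {
  // Condensed sketch of the capability dispatch used by the cleaner above; the real code also
  // recurses into directories, applies the lastTime / exclude checks, and deletes stale files.
  static int countEntries(ArcticFileIO io, String location) {
    int count = 0;
    if (io.supportFileSystemOperations()) {
      SupportsFileSystemOperations fio = io.asFileSystemIO();
      for (PathInfo p : fio.listDirectory(location)) {
        // p.isDirectory() and p.createdAtMillis() drive recursion and age checks in the real code
        count++;
      }
    } else if (io.supportPrefixOperations()) {
      SupportsPrefixOperations pio = io.asPrefixFileIO();
      for (FileInfo f : pio.listPrefix(location)) {
        // f.createdAtMillis() drives the age check; empty directories cannot be detected here
        count++;
      }
    }
    return count;
  }
}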
2 changes: 1 addition & 1 deletion ams/server/src/main/resources/derby/ams-derby-init.sql
@@ -185,4 +185,4 @@ CREATE TABLE table_blocker (
PRIMARY KEY (blocker_id)
);

INSERT INTO catalog_metadata(catalog_name,catalog_metastore,storage_configs,auth_configs, catalog_properties) VALUES ('local_catalog','ams','{"storage.type":"hdfs","hive.site":"PGNvbmZpZ3VyYXRpb24+PC9jb25maWd1cmF0aW9uPg==","hadoop.core.site":"PGNvbmZpZ3VyYXRpb24+PC9jb25maWd1cmF0aW9uPg==","hadoop.hdfs.site":"PGNvbmZpZ3VyYXRpb24+PC9jb25maWd1cmF0aW9uPg=="}','{"auth.type":"simple","auth.simple.hadoop_username":"root"}','{"warehouse":"/tmp/arctic/warehouse","table-formats":"MIXED_ICEBERG"}');
INSERT INTO catalog_metadata(catalog_name,catalog_metastore,storage_configs,auth_configs, catalog_properties) VALUES ('local_catalog','ams','{"storage.type":"hdfs","hive.site":"PGNvbmZpZ3VyYXRpb24+PC9jb25maWd1cmF0aW9uPg==","hadoop.core.site":"PGNvbmZpZ3VyYXRpb24+PC9jb25maWd1cmF0aW9uPg==","hadoop.hdfs.site":"PGNvbmZpZ3VyYXRpb24+PC9jb25maWd1cmF0aW9uPg=="}','{"auth.type":"simple","auth.simple.hadoop_username":"root"}','{"warehouse":"/tmp/arctic/warehouse","table-formats":"MIXED_ICEBERG"}');
2 changes: 1 addition & 1 deletion ams/server/src/main/resources/mysql/ams-mysql-init.sql
@@ -213,4 +213,4 @@ CREATE TABLE `table_blocker` (
KEY `table_index` (`catalog_name`,`db_name`,`table_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Table blockers';

INSERT INTO catalog_metadata(catalog_name,catalog_metastore,storage_configs,auth_configs, catalog_properties) VALUES ('local_catalog','ams','{"storage.type":"hdfs","hive.site":"PGNvbmZpZ3VyYXRpb24+PC9jb25maWd1cmF0aW9uPg==","hadoop.core.site":"PGNvbmZpZ3VyYXRpb24+PC9jb25maWd1cmF0aW9uPg==","hadoop.hdfs.site":"PGNvbmZpZ3VyYXRpb24+PC9jb25maWd1cmF0aW9uPg=="}','{"auth.type":"simple","auth.simple.hadoop_username":"root"}','{"warehouse":"/tmp/arctic/warehouse","table-formats":"MIXED_ICEBERG"}');
INSERT INTO catalog_metadata(catalog_name,catalog_metastore,storage_configs,auth_configs, catalog_properties) VALUES ('local_catalog','ams','{"storage.type":"hdfs","hive.site":"PGNvbmZpZ3VyYXRpb24+PC9jb25maWd1cmF0aW9uPg==","hadoop.core.site":"PGNvbmZpZ3VyYXRpb24+PC9jb25maWd1cmF0aW9uPg==","hadoop.hdfs.site":"PGNvbmZpZ3VyYXRpb24+PC9jb25maWd1cmF0aW9uPg=="}','{"auth.type":"simple","auth.simple.hadoop_username":"root"}','{"warehouse":"/tmp/arctic/warehouse","table-formats":"MIXED_ICEBERG"}');
@@ -18,6 +18,8 @@
* limitations under the License.
*/

import com.netease.arctic.hive.table.SupportHive;
import com.netease.arctic.io.ArcticHadoopFileIO;
import com.netease.arctic.io.DataTestHelpers;
import com.netease.arctic.server.dashboard.model.TableOptimizingProcess;
import com.netease.arctic.table.ArcticTable;
@@ -28,7 +30,9 @@
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.thrift.TException;

import java.io.IOException;
@@ -136,7 +140,9 @@ private List<Record> readHiveTableData() throws TException, IOException {
}

private List<String> filesInLocation(String location) {
return arcticTable.io().list(location).stream().map(fileStatus -> fileStatus.getPath().toString())
ArcticHadoopFileIO io = ((SupportHive) arcticTable).io();
return Streams.stream(io.listDirectory(location))
.map(FileInfo::location)
.collect(Collectors.toList());
}
}
@@ -27,6 +27,7 @@
import com.netease.arctic.hive.TestHMS;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.utils.ConvertStructUtil;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
@@ -39,7 +40,6 @@
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;

import java.io.IOException;

public class AMSTableTestBase extends TableServiceTestBase {
@@ -63,7 +63,8 @@ public AMSTableTestBase(CatalogTestHelper catalogTestHelper, TableTestHelper tab
this(catalogTestHelper, tableTestHelper, false);
}

public AMSTableTestBase(CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper,
public AMSTableTestBase(
CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper,
boolean autoInitTable) {
this.catalogTestHelper = catalogTestHelper;
this.tableTestHelper = tableTestHelper;
@@ -81,9 +82,13 @@ public void init() throws IOException, TException {
tableMeta = buildTableMeta();
}
tableService().createCatalog(catalogMeta);
Database database = new Database();
database.setName(TableTestHelper.TEST_DB_NAME);
TEST_HMS.getHiveClient().createDatabase(database);
try {
Database database = new Database();
database.setName(TableTestHelper.TEST_DB_NAME);
TEST_HMS.getHiveClient().createDatabase(database);
} catch (AlreadyExistsException e) {
//pass
}
if (autoInitTable) {
createDatabase();
createTable();
@@ -117,15 +122,16 @@ protected TableMeta buildTableMeta() {

protected void createDatabase() {
if (TableFormat.ICEBERG.equals(catalogTestHelper.tableFormat())) {
((SupportsNamespaces)icebergCatalog).createNamespace(Namespace.of(TableTestHelper.TEST_DB_NAME));
} else {
((SupportsNamespaces) icebergCatalog).createNamespace(Namespace.of(TableTestHelper.TEST_DB_NAME));
} else if (!tableService().listDatabases(TableTestHelper.TEST_CATALOG_NAME)
.contains(TableTestHelper.TEST_DB_NAME)) {
tableService().createDatabase(TableTestHelper.TEST_CATALOG_NAME, TableTestHelper.TEST_DB_NAME);
}
}

protected void dropDatabase() {
if (TableFormat.ICEBERG.equals(catalogTestHelper.tableFormat())) {
((SupportsNamespaces)icebergCatalog).dropNamespace(Namespace.of(TableTestHelper.TEST_DB_NAME));
((SupportsNamespaces) icebergCatalog).dropNamespace(Namespace.of(TableTestHelper.TEST_DB_NAME));
} else {
tableService().dropDatabase(TableTestHelper.TEST_CATALOG_NAME, TableTestHelper.TEST_DB_NAME);
}
@@ -444,10 +444,10 @@ protected void fillTableProperties(TableMeta meta) {
protected String getDatabaseLocation() {
if (catalogMeta.getCatalogProperties() != null) {
String catalogWarehouse = catalogMeta.getCatalogProperties().getOrDefault(
CatalogMetaProperties.KEY_WAREHOUSE,null);
CatalogMetaProperties.KEY_WAREHOUSE, null);
if (catalogWarehouse == null) {
catalogWarehouse = catalogMeta.getCatalogProperties().getOrDefault(
CatalogMetaProperties.KEY_WAREHOUSE_DIR,null);
CatalogMetaProperties.KEY_WAREHOUSE_DIR, null);
}
if (catalogWarehouse == null) {
throw new NullPointerException("Catalog warehouse is null.");
@@ -238,4 +238,4 @@ public TableFormat format() {
return TableFormat.ICEBERG;
}
}
}
}