Skip to content

Commit

Permalink
[AMORO-1341] [Flink]: fix checkstyle.
Browse files Browse the repository at this point in the history
  • Loading branch information
YesOrNo828 committed Nov 29, 2023
1 parent 862997e commit b0fd5da
Showing 1 changed file with 78 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@

package com.netease.arctic.flink.catalog;

import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.METASTORE_URL;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.TABLE_FORMAT;

import com.netease.arctic.TestAms;
import com.netease.arctic.ams.api.CatalogMeta;
import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.flink.catalog.factories.FlinkCatalogFactory;
import com.netease.arctic.hive.TestHMS;
import com.netease.arctic.hive.catalog.HiveCatalogTestHelper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
Expand All @@ -27,13 +36,6 @@
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.factories.FactoryUtil;

import com.netease.arctic.TestAms;
import com.netease.arctic.ams.api.CatalogMeta;
import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.flink.catalog.factories.FlinkCatalogFactory;
import com.netease.arctic.hive.TestHMS;
import com.netease.arctic.hive.catalog.HiveCatalogTestHelper;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.jupiter.params.provider.Arguments;
Expand All @@ -43,78 +45,75 @@
import java.util.Map;
import java.util.stream.Stream;

import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.METASTORE_URL;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.TABLE_FORMAT;

public class FlinkCatalogContext {

static final TestHMS TEST_HMS = new TestHMS();
static final TestAms TEST_AMS = new TestAms();
static final FlinkCatalogFactory flinkCatalogFactory = new FlinkCatalogFactory();

static ResolvedSchema resolvedSchema =
ResolvedSchema.of(
Column.physical("name", DataTypes.STRING()), Column.physical("age", DataTypes.INT()));
static Schema schema = Schema.newBuilder().fromResolvedSchema(resolvedSchema).build();

ObjectPath objectPath = new ObjectPath("default", "test_hive_from_flink");

static Stream<Arguments> getFlinkCatalogAndTable() {
return Stream.of(
Arguments.of(
initFlinkCatalog(TableFormat.MIXED_HIVE),
generateFlinkTable(TableFormat.MIXED_HIVE.toString()),
TableFormat.MIXED_HIVE),
Arguments.of(
initFlinkCatalog(TableFormat.MIXED_ICEBERG),
generateFlinkTable(TableFormat.MIXED_ICEBERG.toString()),
TableFormat.MIXED_ICEBERG));
}

static ResolvedCatalogTable generateFlinkTable(String tableFormat) {
return new ResolvedCatalogTable(
CatalogTable.of(
schema,
"Flink managed table",
new ArrayList<>(),
Collections.singletonMap(TABLE_FORMAT.key(), tableFormat)),
resolvedSchema);
}

void initial() throws Exception {
TEST_HMS.before();
TEST_AMS.before();
}

void close() {
TEST_AMS.after();
TEST_HMS.after();
}

static FlinkCatalog initFlinkCatalog(TableFormat tableFormat) {
FlinkCatalog flinkCatalog;
Map<String, String> factoryOptions = Maps.newHashMap();
CatalogMeta meta =
HiveCatalogTestHelper.build(TEST_HMS.getHiveConf(), tableFormat)
.buildCatalogMeta(TEST_HMS.getWareHouseLocation());
meta.setCatalogName(tableFormat.name().toLowerCase());

TEST_AMS.getAmsHandler().dropCatalog(meta.getCatalogName());
TEST_AMS.getAmsHandler().createCatalog(meta);

factoryOptions.put(METASTORE_URL.key(), TEST_AMS.getServerUrl() + "/" + meta.getCatalogName());
final FactoryUtil.DefaultCatalogContext context =
new FactoryUtil.DefaultCatalogContext(
"flink_catalog_name",
factoryOptions,
new Configuration(),
FlinkCatalogContext.class.getClassLoader());
flinkCatalog = (FlinkCatalog) flinkCatalogFactory.createCatalog(context);
flinkCatalog.open();
return flinkCatalog;
}

HiveMetaStoreClient getHMSClient() {
return TEST_HMS.getHiveClient();
}
static final TestHMS TEST_HMS = new TestHMS();
static final TestAms TEST_AMS = new TestAms();
static final FlinkCatalogFactory flinkCatalogFactory = new FlinkCatalogFactory();

static ResolvedSchema resolvedSchema =
ResolvedSchema.of(
Column.physical("name", DataTypes.STRING()), Column.physical("age", DataTypes.INT()));
static Schema schema = Schema.newBuilder().fromResolvedSchema(resolvedSchema).build();

ObjectPath objectPath = new ObjectPath("default", "test_hive_from_flink");

static Stream<Arguments> getFlinkCatalogAndTable() {
return Stream.of(
Arguments.of(
initFlinkCatalog(TableFormat.MIXED_HIVE),
generateFlinkTable(TableFormat.MIXED_HIVE.toString()),
TableFormat.MIXED_HIVE),
Arguments.of(
initFlinkCatalog(TableFormat.MIXED_ICEBERG),
generateFlinkTable(TableFormat.MIXED_ICEBERG.toString()),
TableFormat.MIXED_ICEBERG));
}

static ResolvedCatalogTable generateFlinkTable(String tableFormat) {
return new ResolvedCatalogTable(
CatalogTable.of(
schema,
"Flink managed table",
new ArrayList<>(),
Collections.singletonMap(TABLE_FORMAT.key(), tableFormat)),
resolvedSchema);
}

void initial() throws Exception {
TEST_HMS.before();
TEST_AMS.before();
}

void close() {
TEST_AMS.after();
TEST_HMS.after();
}

static FlinkCatalog initFlinkCatalog(TableFormat tableFormat) {
FlinkCatalog flinkCatalog;
Map<String, String> factoryOptions = Maps.newHashMap();
CatalogMeta meta =
HiveCatalogTestHelper.build(TEST_HMS.getHiveConf(), tableFormat)
.buildCatalogMeta(TEST_HMS.getWareHouseLocation());
meta.setCatalogName(tableFormat.name().toLowerCase());

TEST_AMS.getAmsHandler().dropCatalog(meta.getCatalogName());
TEST_AMS.getAmsHandler().createCatalog(meta);

factoryOptions.put(METASTORE_URL.key(), TEST_AMS.getServerUrl() + "/" + meta.getCatalogName());
final FactoryUtil.DefaultCatalogContext context =
new FactoryUtil.DefaultCatalogContext(
"flink_catalog_name",
factoryOptions,
new Configuration(),
FlinkCatalogContext.class.getClassLoader());
flinkCatalog = (FlinkCatalog) flinkCatalogFactory.createCatalog(context);
flinkCatalog.open();
return flinkCatalog;
}

HiveMetaStoreClient getHMSClient() {
return TEST_HMS.getHiveClient();
}
}

0 comments on commit b0fd5da

Please sign in to comment.