From b1f4eaafb5eb027c32a4c3c0cd6ae98b9b21e994 Mon Sep 17 00:00:00 2001
From: chengxy
Date: Mon, 17 Apr 2023 17:38:39 +0800
Subject: [PATCH] support oracle catalog

---
 .../jdbc/catalog/JdbcCatalogUtils.java        |   7 +-
 .../connector/jdbc/catalog/OracleCatalog.java | 211 +++++++++++
 .../jdbc/dialect/oracle/OracleDialect.java    |   2 +-
 .../jdbc/dialect/oracle/OracleTypeMapper.java |  82 +++++
 .../jdbc/catalog/OracleCatalogTest.java       | 167 +++++++++
 .../jdbc/catalog/OracleCatalogTestBase.java   | 340 ++++++++++++++++++
 6 files changed, 807 insertions(+), 2 deletions(-)
 create mode 100644 flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/OracleCatalog.java
 create mode 100644 flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTypeMapper.java
 create mode 100644 flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/OracleCatalogTest.java
 create mode 100644 flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/OracleCatalogTestBase.java

diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
index 28bea805..15757a34 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
@@ -21,6 +21,7 @@
 import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
 import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
 import org.apache.flink.connector.jdbc.dialect.mysql.MySqlDialect;
+import org.apache.flink.connector.jdbc.dialect.oracle.OracleDialect;
 import org.apache.flink.connector.jdbc.dialect.psql.PostgresDialect;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -53,7 +54,10 @@ public static AbstractJdbcCatalog createCatalog(
         } else if (dialect instanceof MySqlDialect) {
             return new MySqlCatalog(
                     userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
-        } else {
+        } else if (dialect instanceof OracleDialect) {
+            return new OracleCatalog(
+                    userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
+        } else {
             throw new UnsupportedOperationException(
                     String.format("Catalog for '%s' is not supported yet.", dialect));
         }
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/OracleCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/OracleCatalog.java
new file mode 100644
index 00000000..b76281a8
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/OracleCatalog.java
@@ -0,0 +1,211 @@
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.connector.jdbc.dialect.oracle.OracleTypeMapper;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.*;
+import org.apache.flink.table.catalog.exceptions.*;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.util.*;
+import java.util.function.Predicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * OracleCatalog: looks up databases and tables in Oracle so that their metadata can be recreated.
+ */
+public class OracleCatalog extends AbstractJdbcCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OracleCatalog.class);
+
+    public static final String DEFAULT_DATABASE = "helowin";
+
+    public static final String IDENTIFIER = "jdbc";
+
+    private static final String ORACLE_DRIVER = "oracle.jdbc.OracleDriver";
+
+    private OracleTypeMapper dialectTypeMapper;
+
+    private static final Set<String> builtinDatabases =
+            new HashSet<String>() {
+                {
+                    add("SCOTT");
+                    add("ANONYMOUS");
+                    add("XS$NULL");
+                    add("DIP");
+                    add("SPATIAL_WFS_ADMIN_USR");
+                    add("SPATIAL_CSW_ADMIN_USR");
+                    add("APEX_PUBLIC_USER");
+                    add("ORACLE_OCM");
+                    add("MDDATA");
+                }
+            };
+
+    public OracleCatalog(
+            ClassLoader userClassLoader,
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl) {
+        super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
+        String driverVersion =
+                Preconditions.checkNotNull(getDriverVersion(), "Driver version must not be null.");
+        String databaseVersion =
+                Preconditions.checkNotNull(
+                        getDatabaseVersion(), "Database version must not be null.");
+        LOG.info("Driver version: {}, database version: {}", driverVersion, databaseVersion);
+        this.dialectTypeMapper = new OracleTypeMapper(databaseVersion, driverVersion);
+    }
+
+    private String getDatabaseVersion() {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+            return conn.getMetaData().getDatabaseProductVersion();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed in getting Oracle version by %s.", defaultUrl), e);
+        }
+    }
+
+    private String getDriverVersion() {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+            String driverVersion = conn.getMetaData().getDriverVersion();
+            Pattern regexp = Pattern.compile("\\d+?\\.\\d+?\\.\\d+");
+            Matcher matcher = regexp.matcher(driverVersion);
+            return matcher.find() ? matcher.group(0) : null;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed in getting Oracle driver version by %s.", defaultUrl),
+                    e);
+        }
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return extractColumnValuesBySQL(
+                this.defaultUrl,
+                "select username from sys.dba_users "
+                        + "where DEFAULT_TABLESPACE <> 'SYSTEM' and DEFAULT_TABLESPACE <> 'SYSAUX' "
+                        + "order by username",
+                1,
+                dbName -> !builtinDatabases.contains(dbName));
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                !StringUtils.isBlank(databaseName), "Database name must not be blank.");
+        if (listDatabases().contains(databaseName)) {
+            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+        } else {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
+        if (!databaseExists(databaseName)) { // note: this value is the Oracle instance name
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        List<String> listDatabases =
+                listDatabases().stream()
+                        .map(username -> "'" + username + "'")
+                        .collect(Collectors.toList());
+        return extractColumnValuesBySQL(
+                this.defaultUrl,
+                "SELECT OWNER||'.'||TABLE_NAME AS schemaTableName FROM sys.all_tables WHERE OWNER IN ("
+                        + String.join(",", listDatabases)
+                        + ") ORDER BY OWNER, TABLE_NAME",
+                1,
+                null,
+                null);
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+
+        String databaseName = tablePath.getDatabaseName();
+        String dbUrl = baseUrl + databaseName;
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<UniqueConstraint> primaryKey =
+                    getPrimaryKey(
+                            metaData,
+                            databaseName,
+                            getSchemaName(tablePath),
+                            getTableName(tablePath));
+            String statement =
+                    String.format("SELECT * FROM %s", getSchemaTableName(tablePath));
+            PreparedStatement ps = conn.prepareStatement(statement);
+            ResultSetMetaData resultSetMetaData = ps.getMetaData();
+
+            String[] columnNames = new String[resultSetMetaData.getColumnCount()];
+            DataType[] types = new DataType[resultSetMetaData.getColumnCount()];
+
+            for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+                columnNames[i - 1] = resultSetMetaData.getColumnName(i);
+                types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i);
+                if (resultSetMetaData.isNullable(i) == ResultSetMetaData.columnNoNulls) {
+                    types[i - 1] = types[i - 1].notNull();
+                }
+            }
+
+            Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types);
+            primaryKey.ifPresent(
+                    pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
+            Schema tableSchema = schemaBuilder.build();
+
+            Map<String, String> props = new HashMap<>();
+            props.put(FactoryUtil.CONNECTOR.key(), IDENTIFIER);
+            props.put("url", dbUrl);
+            props.put("username", username);
+            props.put("password", pwd);
+            props.put("table-name", getSchemaTableName(tablePath));
+            props.put("driver", ORACLE_DRIVER);
+            return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props);
+        } catch (Exception ex) {
+            throw new CatalogException(
+                    String.format("Failed getting Table %s", tablePath.getFullName()), ex);
+        }
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        String[] schemaTableNames = getSchemaTableName(tablePath).split("\\.");
+        return !extractColumnValuesBySQL(
+                        defaultUrl,
+                        "SELECT table_name FROM sys.all_tables where OWNER = ? and table_name = ?",
+                        1,
+                        null,
+                        schemaTableNames[0],
+                        schemaTableNames[1])
+                .isEmpty();
+    }
+
+    protected List<String> extractColumnValuesBySQL(
+            String connUrl,
+            String sql,
+            int columnIndex,
+            Predicate<String> filterFunc,
+            Object... params) {
+        List<String> columnValues = Lists.newArrayList();
+
+        try (Connection conn = DriverManager.getConnection(connUrl, username, pwd);
+                PreparedStatement ps = conn.prepareStatement(sql)) {
+            if (Objects.nonNull(params) && params.length > 0) {
+                for (int i = 0; i < params.length; i++) {
+                    ps.setObject(i + 1, params[i]);
+                }
+            }
+            ResultSet rs = ps.executeQuery();
+            while (rs.next()) {
+                String columnValue = rs.getString(columnIndex);
+                if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
+                    columnValues.add(columnValue);
+                }
+            }
+            return columnValues;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "The following SQL query could not be executed (%s): %s", connUrl, sql),
+                    e);
+        }
+    }
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTypeMapper.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTypeMapper.java
new file mode 100644
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTypeMapper.java
+                if (precision > 0 && precision < DecimalType.MAX_PRECISION) {
+                    return DataTypes.DECIMAL(precision, metaData.getScale(colIndex));
+                }
+                return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18);
+            case Types.DATE:
+                return DataTypes.DATE();
+            case Types.TIMESTAMP:
+            case Types.TIMESTAMP_WITH_TIMEZONE:
+            case OracleTypes.TIMESTAMPTZ:
+            case OracleTypes.TIMESTAMPLTZ:
+                return scale > 0 ? DataTypes.TIMESTAMP(scale) : DataTypes.TIMESTAMP();
+            case OracleTypes.INTERVALYM:
+                return DataTypes.INTERVAL(DataTypes.YEAR(), DataTypes.MONTH());
+            case OracleTypes.INTERVALDS:
+                return DataTypes.INTERVAL(DataTypes.DAY(), DataTypes.SECOND());
+            case Types.BOOLEAN:
+                return DataTypes.BOOLEAN();
+            default:
+                final String jdbcColumnName = metaData.getColumnName(colIndex);
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Doesn't support Oracle type '%s' on column '%s' in Oracle version %s, driver version %s yet.",
+                                oracleType, jdbcColumnName, databaseVersion, driverVersion));
+        }
+    }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/OracleCatalogTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/OracleCatalogTest.java
new file mode 100644
index 00000000..77ef31f4
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/OracleCatalogTest.java
@@ -0,0 +1,167 @@
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+public class OracleCatalogTest extends OracleCatalogTestBase {
+
+    @Test
+    void testGetDb_DatabaseNotExistException() {
+        assertThatThrownBy(() -> catalog.getDatabase("nonexistent"))
+                .isInstanceOf(DatabaseNotExistException.class)
+                .hasMessageContaining("Database nonexistent does not exist in Catalog");
+    }
+
+    @Test
+    void testListDatabases() {
+        List<String> actual = catalog.listDatabases();
+
+        assertThat(actual).isEqualTo(Arrays.asList("postgres", "test"));
+    }
+
+    @Test
+    void testDbExists() {
+        assertThat(catalog.databaseExists("nonexistent")).isFalse();
+
+        assertThat(catalog.databaseExists(OracleCatalog.DEFAULT_DATABASE)).isTrue();
+    }
+
+    // ------ tables ------
+
+    @Test
+    void testListTables() throws DatabaseNotExistException {
+        List<String> actual = catalog.listTables(OracleCatalog.DEFAULT_DATABASE);
+
+        assertThat(actual)
+                .isEqualTo(
+                        Arrays.asList(
+                                "public.array_table",
+                                "public.primitive_table",
+                                "public.primitive_table2",
+                                "public.serial_table",
+                                "public.t1",
+                                "public.t4",
+                                "public.t5"));
+
+        actual = catalog.listTables(TEST_DB);
+
+        assertThat(actual).isEqualTo(Arrays.asList("public.t2", "test_schema.t3"));
+    }
+
+    @Test
+    void testListTables_DatabaseNotExistException() {
+        assertThatThrownBy(() -> catalog.listTables("nonexistent"))
+                .isInstanceOf(DatabaseNotExistException.class);
+    }
+
+    @Test
+    void testTableExists() {
+        assertThat(catalog.tableExists(new ObjectPath(TEST_DB, "nonexist"))).isFalse();
+
+        assertThat(catalog.tableExists(new ObjectPath(OracleCatalog.DEFAULT_DATABASE, TABLE1)))
+                .isTrue();
+        assertThat(catalog.tableExists(new ObjectPath(TEST_DB, TABLE2))).isTrue();
+        assertThat(catalog.tableExists(new ObjectPath(TEST_DB, "test_schema.t3"))).isTrue();
+    }
+
+    @Test
+    void testGetTables_TableNotExistException() {
+        assertThatThrownBy(
+                        () ->
+                                catalog.getTable(
+                                        new ObjectPath(
+                                                TEST_DB,
+                                                PostgresTablePath.toFlinkTableName(
+                                                        TEST_SCHEMA, "anytable"))))
+                .isInstanceOf(TableNotExistException.class);
+    }
+
+    @Test
+    void testGetTables_TableNotExistException_NoSchema() {
+        assertThatThrownBy(
+                        () ->
+                                catalog.getTable(
+                                        new ObjectPath(
+                                                TEST_DB,
+                                                PostgresTablePath.toFlinkTableName(
+                                                        "nonexistschema", "anytable"))))
+                .isInstanceOf(TableNotExistException.class);
+    }
+
+    @Test
+    void testGetTables_TableNotExistException_NoDb() {
+        assertThatThrownBy(
+                        () ->
+                                catalog.getTable(
+                                        new ObjectPath(
+                                                "nonexistdb",
+                                                PostgresTablePath.toFlinkTableName(
+                                                        TEST_SCHEMA, "anytable"))))
+                .isInstanceOf(TableNotExistException.class);
+    }
+
+    @Test
+    void testGetTable() throws TableNotExistException {
+        // test postgres.public.user1
+        Schema schema = getSimpleTable().schema;
+
+        CatalogBaseTable table = catalog.getTable(new ObjectPath("postgres", TABLE1));
+
+        assertThat(table.getUnresolvedSchema()).isEqualTo(schema);
+
+        table = catalog.getTable(new ObjectPath("postgres", "public.t1"));
+
+        assertThat(table.getUnresolvedSchema()).isEqualTo(schema);
+
+        // test testdb.public.user2
+        table = catalog.getTable(new ObjectPath(TEST_DB, TABLE2));
+
+        assertThat(table.getUnresolvedSchema()).isEqualTo(schema);
+
+        table = catalog.getTable(new ObjectPath(TEST_DB, "public.t2"));
+
+        assertThat(table.getUnresolvedSchema()).isEqualTo(schema);
+
+        // test testdb.testschema.user2
+        table = catalog.getTable(new ObjectPath(TEST_DB, TEST_SCHEMA + ".t3"));
+
+        assertThat(table.getUnresolvedSchema()).isEqualTo(schema);
+    }
+
+    @Test
+    void testPrimitiveDataTypes() throws TableNotExistException {
+        CatalogBaseTable table =
+                catalog.getTable(
+                        new ObjectPath(OracleCatalog.DEFAULT_DATABASE, TABLE_PRIMITIVE_TYPE));
+
+        assertThat(table.getUnresolvedSchema()).isEqualTo(getPrimitiveTable().schema);
+    }
+
+    @Test
+    void testArrayDataTypes() throws TableNotExistException {
+        CatalogBaseTable table =
+                catalog.getTable(new ObjectPath(OracleCatalog.DEFAULT_DATABASE, TABLE_ARRAY_TYPE));
+
+        assertThat(table.getUnresolvedSchema()).isEqualTo(getArrayTable().schema);
+    }
+
+    @Test
+    void testSerialDataTypes() throws TableNotExistException {
+        CatalogBaseTable table =
+                catalog.getTable(new ObjectPath(OracleCatalog.DEFAULT_DATABASE, TABLE_SERIAL_TYPE));
+
+        assertThat(table.getUnresolvedSchema()).isEqualTo(getSerialTable().schema);
+    }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/OracleCatalogTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/OracleCatalogTestBase.java
new file mode 100644
index 00000000..7bf666d2
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/OracleCatalogTestBase.java
@@ -0,0 +1,340 @@
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.connector.jdbc.databases.oracle.OracleDatabase;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.types.logical.DecimalType;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class OracleCatalogTestBase implements OracleDatabase {
+
+    public static final Logger LOG = LoggerFactory.getLogger(OracleCatalogTestBase.class);
+
+    protected static final String TEST_CATALOG_NAME = "mypg";
+    protected static final String TEST_USERNAME = CONTAINER.getUsername();
+    protected static final String TEST_PWD = CONTAINER.getPassword();
+    protected static final String TEST_DB = "test";
+    protected static final String TEST_SCHEMA = "test_schema";
+    protected static final String TABLE1 = "t1";
+    protected static final String TABLE2 = "t2";
+    protected static final String TABLE3 = "t3";
+    protected static final String TABLE4 = "t4";
+    protected static final String TABLE5 = "t5";
+    protected static final String TABLE_PRIMITIVE_TYPE = "primitive_table";
+    protected static final String TABLE_PRIMITIVE_TYPE2 = "primitive_table2";
+    protected static final String TABLE_ARRAY_TYPE = "array_table";
+    protected static final String TABLE_SERIAL_TYPE = "serial_table";
+
+    protected static String baseUrl;
+    protected static OracleCatalog catalog;
+
+    @BeforeAll
+    static void init() throws SQLException {
+        // jdbc:oracle:thin:@//localhost:50807/helowin
+        String jdbcUrl = CONTAINER.getJdbcUrl();
+        // jdbc:oracle:thin:@//localhost:50807/
+        baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/"));
+
+        catalog =
+                new OracleCatalog(
+                        Thread.currentThread().getContextClassLoader(),
+                        TEST_CATALOG_NAME,
+                        OracleCatalog.DEFAULT_DATABASE,
+                        TEST_USERNAME,
+                        TEST_PWD,
+                        baseUrl);
+
+        // create test database and schema
+        createSchema(TEST_DB, TEST_SCHEMA);
+
+        // create test tables
+        // table: helowin.public.t1
+        // table: helowin.public.t4
+        // table: helowin.public.t5
+        createTable(PostgresTablePath.fromFlinkTableName(TABLE1), getSimpleTable().oracleSchemaSql);
+        createTable(PostgresTablePath.fromFlinkTableName(TABLE4), getSimpleTable().oracleSchemaSql);
+        createTable(PostgresTablePath.fromFlinkTableName(TABLE5), getSimpleTable().oracleSchemaSql);
+
+        // table: test.public.t2
+        // table: test.test_schema.t3
+        // table: helowin.public.dt
+        // table: helowin.public.dt2
+        createTable(
+                TEST_DB,
+                PostgresTablePath.fromFlinkTableName(TABLE2),
+                getSimpleTable().oracleSchemaSql);
+        createTable(
+                TEST_DB,
+                new PostgresTablePath(TEST_SCHEMA, TABLE3),
+                getSimpleTable().oracleSchemaSql);
+        createTable(
+                PostgresTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE),
+                getPrimitiveTable().oracleSchemaSql);
+        createTable(
+                PostgresTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE2),
+                getPrimitiveTable("test_pk2").oracleSchemaSql);
+        createTable(
+                PostgresTablePath.fromFlinkTableName(TABLE_ARRAY_TYPE),
+                getArrayTable().oracleSchemaSql);
+        createTable(
+                PostgresTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE),
+                getSerialTable().oracleSchemaSql);
+
+        executeSQL(
+                OracleCatalog.DEFAULT_DATABASE,
+                String.format(
+                        "insert into public.%s values (%s);", TABLE1, getSimpleTable().values));
+        executeSQL(
+                OracleCatalog.DEFAULT_DATABASE,
+                String.format(
+                        "insert into %s values (%s);",
+                        TABLE_PRIMITIVE_TYPE, getPrimitiveTable().values));
+        executeSQL(
+                OracleCatalog.DEFAULT_DATABASE,
+                String.format(
+                        "insert into %s values (%s);", TABLE_ARRAY_TYPE, getArrayTable().values));
+        executeSQL(
+                OracleCatalog.DEFAULT_DATABASE,
+                String.format(
+                        "insert into %s values (%s);", TABLE_SERIAL_TYPE, getSerialTable().values));
+    }
+
+    public static void createTable(PostgresTablePath tablePath, String tableSchemaSql)
+            throws SQLException {
+        executeSQL(
+                OracleCatalog.DEFAULT_DATABASE,
+                String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql));
+    }
+
+    public static void createTable(String db, PostgresTablePath tablePath, String tableSchemaSql)
+            throws SQLException {
+        executeSQL(
+                db, String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql));
+    }
+
+    public static void createSchema(String db, String schema) throws SQLException {
+        executeSQL(db, String.format("CREATE SCHEMA %s", schema));
+    }
+
+    public static void createDatabase(String database) throws SQLException {
+        executeSQL(String.format("CREATE DATABASE %s;", database));
+    }
+
+    public static void executeSQL(String sql) throws SQLException {
+        executeSQL("", sql);
+    }
+
+    public static void executeSQL(String db, String sql) throws SQLException {
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                String.format("%s/%s", baseUrl, db), TEST_USERNAME, TEST_PWD);
+                Statement statement = conn.createStatement()) {
+            statement.executeUpdate(sql);
+        } catch (SQLException e) {
+            throw e;
+        }
+    }
+
+    /** Object holding schema and corresponding sql. */
+    public static class TestTable {
+        Schema schema;
+        String oracleSchemaSql;
+        String values;
+
+        public TestTable(Schema schema, String oracleSchemaSql, String values) {
+            this.schema = schema;
+            this.oracleSchemaSql = oracleSchemaSql;
+            this.values = values;
+        }
+    }
+
+    public static OracleCatalogTestBase.TestTable getSimpleTable() {
+        return new OracleCatalogTestBase.TestTable(
+                Schema.newBuilder().column("id", DataTypes.INT()).build(), "id integer", "1");
+    }
+
+    // Oracle doesn't allow reusing the same primary key constraint name across tables,
+    // so the primary key name is parameterized to work around this.
+    public static OracleCatalogTestBase.TestTable getPrimitiveTable() {
+        return getPrimitiveTable("test_pk");
+    }
+
+    // TODO: add back timestamptz and time types.
+    // Flink currently doesn't support converting time's precision, failing with:
+    // TableException: Unsupported conversion from data type 'TIME(6)' (conversion class:
+    // java.sql.Time) to type information. Only data types that originated from type information
+    // fully support a reverse conversion.
+    public static OracleCatalogTestBase.TestTable getPrimitiveTable(String primaryKeyName) {
+        return new OracleCatalogTestBase.TestTable(
+                Schema.newBuilder()
+                        .column("int", DataTypes.INT().notNull())
+                        .column("bytea", DataTypes.BYTES())
+                        .column("short", DataTypes.SMALLINT().notNull())
+                        .column("long", DataTypes.BIGINT())
+                        .column("real", DataTypes.FLOAT())
+                        .column("double_precision", DataTypes.DOUBLE())
+                        .column("numeric", DataTypes.DECIMAL(10, 5))
+                        .column("decimal", DataTypes.DECIMAL(10, 1))
+                        .column("boolean", DataTypes.BOOLEAN())
+                        .column("text", DataTypes.STRING())
+                        .column("char", DataTypes.CHAR(1))
+                        .column("character", DataTypes.CHAR(3))
+                        .column("character_varying", DataTypes.VARCHAR(20))
+                        .column("timestamp", DataTypes.TIMESTAMP(5))
+                        // .field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4))
+                        .column("date", DataTypes.DATE())
+                        .column("time", DataTypes.TIME(0))
+                        .column("default_numeric", DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18))
+                        .primaryKeyNamed(primaryKeyName, "short", "int")
+                        .build(),
+                "int integer, "
+                        + "bytea bytea, "
+                        + "short smallint, "
+                        + "long bigint, "
+                        + "real real, "
+                        + "double_precision double precision, "
+                        + "numeric numeric(10, 5), "
+                        + "decimal decimal(10, 1), "
+                        + "boolean boolean, "
+                        + "text text, "
+                        + "char char, "
+                        + "character character(3), "
+                        + "character_varying character varying(20), "
+                        + "timestamp timestamp(5), "
+                        // + "timestamptz timestamptz(4), "
+                        + "date date,"
+                        + "time time(0), "
+                        + "default_numeric numeric, "
+                        + "CONSTRAINT "
+                        + primaryKeyName
+                        + " PRIMARY KEY (short, int)",
+                "1,"
+                        + "'2',"
+                        + "3,"
+                        + "4,"
+                        + "5.5,"
+                        + "6.6,"
+                        + "7.7,"
+                        + "8.8,"
+                        + "true,"
+                        + "'a',"
+                        + "'b',"
+                        + "'c',"
+                        + "'d',"
+                        + "'2016-06-22 19:10:25',"
+                        // + "'2006-06-22 19:10:25',"
+                        + "'2015-01-01',"
+                        + "'00:51:02.746572', "
+                        + "500");
+    }
+
+    // TODO: add back timestamptz once planner supports timestamp with timezone
+    public static OracleCatalogTestBase.TestTable getArrayTable() {
+        return new OracleCatalogTestBase.TestTable(
+                Schema.newBuilder()
+                        .column("int_arr", DataTypes.ARRAY(DataTypes.INT()))
+                        .column("bytea_arr", DataTypes.ARRAY(DataTypes.BYTES()))
+                        .column("short_arr", DataTypes.ARRAY(DataTypes.SMALLINT()))
+                        .column("long_arr", DataTypes.ARRAY(DataTypes.BIGINT()))
+                        .column("real_arr", DataTypes.ARRAY(DataTypes.FLOAT()))
+                        .column("double_precision_arr", DataTypes.ARRAY(DataTypes.DOUBLE()))
+                        .column("numeric_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 5)))
+                        .column(
+                                "numeric_arr_default",
+                                DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)))
+                        .column("decimal_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 2)))
+                        .column("boolean_arr", DataTypes.ARRAY(DataTypes.BOOLEAN()))
+                        .column("text_arr", DataTypes.ARRAY(DataTypes.STRING()))
+                        .column("char_arr", DataTypes.ARRAY(DataTypes.CHAR(1)))
+                        .column("character_arr", DataTypes.ARRAY(DataTypes.CHAR(3)))
+                        .column("character_varying_arr", DataTypes.ARRAY(DataTypes.VARCHAR(20)))
+                        .column("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP(5)))
+                        // .field("timestamptz_arr",
+                        // DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4)))
+                        .column("date_arr", DataTypes.ARRAY(DataTypes.DATE()))
+                        .column("time_arr", DataTypes.ARRAY(DataTypes.TIME(0)))
+                        .column("null_bytea_arr", DataTypes.ARRAY(DataTypes.BYTES()))
+                        .column("null_text_arr", DataTypes.ARRAY(DataTypes.STRING()))
+                        .build(),
+                "int_arr integer[], "
+                        + "bytea_arr bytea[], "
+                        + "short_arr smallint[], "
+                        + "long_arr bigint[], "
+                        + "real_arr real[], "
+                        + "double_precision_arr double precision[], "
+                        + "numeric_arr numeric(10, 5)[], "
+                        + "numeric_arr_default numeric[], "
+                        + "decimal_arr decimal(10,2)[], "
+                        + "boolean_arr boolean[], "
+                        + "text_arr text[], "
+                        + "char_arr char[], "
+                        + "character_arr character(3)[], "
+                        + "character_varying_arr character varying(20)[], "
+                        + "timestamp_arr timestamp(5)[], "
+                        // + "timestamptz_arr timestamptz(4)[], "
+                        + "date_arr date[], "
+                        + "time_arr time(0)[], "
+                        + "null_bytea_arr bytea[], "
+                        + "null_text_arr text[]",
+                String.format(
+                        "'{1,2,3}',"
+                                + "'{2,3,4}',"
+                                + "'{3,4,5}',"
+                                + "'{4,5,6}',"
+                                + "'{5.5,6.6,7.7}',"
+                                + "'{6.6,7.7,8.8}',"
+                                + "'{7.7,8.8,9.9}',"
+                                + "'{8.8,9.9,10.10}',"
+                                + "'{9.9,10.10,11.11}',"
+                                + "'{true,false,true}',"
+                                + "'{a,b,c}',"
+                                + "'{b,c,d}',"
+                                + "'{b,c,d}',"
+                                + "'{b,c,d}',"
+                                + "'{\"2016-06-22 19:10:25\", \"2019-06-22 19:10:25\"}',"
+                                // + "'{\"2006-06-22 19:10:25\", \"2009-06-22 19:10:25\"}',"
+                                + "'{\"2015-01-01\", \"2020-01-01\"}',"
+                                + "'{\"00:51:02.746572\", \"00:59:02.746572\"}',"
+                                + "NULL,"
+                                + "NULL"));
+    }
+
+    public static OracleCatalogTestBase.TestTable getSerialTable() {
+        return new OracleCatalogTestBase.TestTable(
+                Schema.newBuilder()
+                        // serial fields are returned as not null by ResultSetMetaData.columnNoNulls
+                        .column("f0", DataTypes.SMALLINT().notNull())
+                        .column("f1", DataTypes.INT().notNull())
+                        .column("f2", DataTypes.SMALLINT().notNull())
+                        .column("f3", DataTypes.INT().notNull())
+                        .column("f4", DataTypes.BIGINT().notNull())
+                        .column("f5", DataTypes.BIGINT().notNull())
+                        .build(),
+                "f0 smallserial, "
+                        + "f1 serial, "
+                        + "f2 serial2, "
+                        + "f3 serial4, "
+                        + "f4 serial8, "
+                        + "f5 bigserial",
+                "32767,"
+                        + "2147483647,"
+                        + "32767,"
+                        + "2147483647,"
+                        + "9223372036854775807,"
+                        + "9223372036854775807");
+    }
+}
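
Note for reviewers (not part of the patch): a minimal usage sketch of the new catalog through the Table API. It mirrors what OracleCatalogTestBase does; the catalog name, host, port, service name, and credentials below are placeholders (only the default database "helowin" comes from the patch), and it assumes the JdbcCatalog constructor that takes a user ClassLoader, matching the signature JdbcCatalogUtils.createCatalog() uses above.

import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OracleCatalogUsageSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // JdbcCatalogUtils.createCatalog() now routes Oracle base URLs to OracleCatalog,
        // so the generic JdbcCatalog wrapper can be used unchanged.
        JdbcCatalog catalog =
                new JdbcCatalog(
                        Thread.currentThread().getContextClassLoader(),
                        "my_oracle",                            // catalog name (placeholder)
                        "helowin",                              // default database (patch default)
                        "system",                               // username (placeholder)
                        "oracle",                               // password (placeholder)
                        "jdbc:oracle:thin:@//localhost:1521/"); // base URL without the database

        tEnv.registerCatalog("my_oracle", catalog);
        tEnv.useCatalog("my_oracle");

        // Tables are exposed as "<owner>.<table_name>", as produced by OracleCatalog#listTables.
        tEnv.executeSql("SHOW TABLES").print();
    }
}

Since OracleCatalog reuses IDENTIFIER = "jdbc", the same catalog should also be creatable from SQL with CREATE CATALOG ... WITH ('type' = 'jdbc', 'base-url' = ..., 'default-database' = ..., 'username' = ..., 'password' = ...), exactly as for the existing Postgres and MySQL catalogs.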