diff --git a/build.gradle b/build.gradle index ccd05f67e60e..96667cab36cd 100644 --- a/build.gradle +++ b/build.gradle @@ -381,6 +381,13 @@ project(':iceberg-core') { exclude group: 'junit' } testImplementation libs.awaitility + testImplementation libs.testcontainers + testImplementation libs.testcontainers.db2 + testImplementation libs.testcontainers.oracle + testImplementation libs.testcontainers.postgres + testImplementation libs.db2.jdbc + testImplementation libs.oracle.jdbc + testImplementation libs.postgres.jdbc } } diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java index 0c8fbe41df9e..253d22925571 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -94,6 +95,9 @@ public class JdbcCatalog extends BaseMetastoreViewCatalog private CloseableGroup closeableGroup; private JdbcUtil.SchemaVersion schemaVersion = JdbcUtil.SchemaVersion.V0; + @SuppressWarnings(value = "UnusedVariable") + private String schemaName = null; + public JdbcCatalog() { this(null, null, true); } @@ -141,6 +145,9 @@ public void initialize(String name, Map properties) { this.connections = new JdbcClientPool(uri, properties); } + this.schemaName = + PropertyUtil.propertyAsString(properties, JdbcUtil.SCHEMA_NAME_PROPERTY, null); + this.initializeCatalogTables = PropertyUtil.propertyAsBoolean( properties, JdbcUtil.INIT_CATALOG_TABLES_PROPERTY, initializeCatalogTables); @@ -148,7 +155,19 @@ public void initialize(String name, Map properties) { initializeCatalogTables(); } - updateSchemaIfRequired(); + final boolean updateSchemaIfRequired = + PropertyUtil.propertyAsBoolean( + properties, + JdbcUtil.UPDATE_CATALOG_SCHEMA_IF_NECESSARY_PROPERTY, + initializeCatalogTables); + if (updateSchemaIfRequired) { + updateSchemaIfRequired(); + } else { + schemaVersion = + JdbcUtil.SchemaVersion.valueOf( + PropertyUtil.propertyAsString( + properties, JdbcUtil.SCHEMA_VERSION_PROPERTY, JdbcUtil.SchemaVersion.V0.name())); + } this.closeableGroup = new CloseableGroup(); closeableGroup.addCloseable(metricsReporter()); @@ -161,29 +180,18 @@ private void atomicCreateTable(String tableName, String sqlCommand, String reaso throws SQLException, InterruptedException { connections.run( conn -> { - DatabaseMetaData dbMeta = conn.getMetaData(); // check the existence of a table name Predicate tableTest = name -> { try { - ResultSet result = - dbMeta.getTables( - null /* catalog name */, - null /* schemaPattern */, - name /* tableNamePattern */, - null /* types */); - return result.next(); + return tableExists(conn, tableName); } catch (SQLException e) { return false; } }; - // some databases force table name to upper case -- check that last. - Predicate tableExists = - name -> tableTest.test(name) || tableTest.test(name.toUpperCase(Locale.ROOT)); - - if (tableExists.test(tableName)) { + if (tableTest.test(tableName)) { return true; } @@ -193,7 +201,7 @@ private void atomicCreateTable(String tableName, String sqlCommand, String reaso return true; } catch (SQLException e) { // see if table was created by another thread or process. - if (tableExists.test(tableName)) { + if (tableTest.test(tableName)) { return true; } throw e; @@ -206,12 +214,18 @@ private void initializeCatalogTables() { try { atomicCreateTable( - JdbcUtil.CATALOG_TABLE_VIEW_NAME, - JdbcUtil.V0_CREATE_CATALOG_SQL, + JdbcUtil.tableName(JdbcUtil.CATALOG_TABLE_VIEW_NAME, schemaName), + JdbcUtil.withTableName( + JdbcUtil.V0_CREATE_CATALOG_SQL_TEMPLATE, + JdbcUtil.CATALOG_TABLE_VIEW_NAME, + schemaName), "to store iceberg catalog tables"); atomicCreateTable( - JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME, - JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE_SQL, + JdbcUtil.tableName(JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME, schemaName), + JdbcUtil.withTableName( + JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE_SQL_TEMPLATE, + JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME, + schemaName), "to store iceberg catalog namespace properties"); } catch (SQLTimeoutException e) { throw new UncheckedSQLException(e, "Cannot initialize JDBC catalog: Query timed out"); @@ -229,11 +243,10 @@ private void updateSchemaIfRequired() { try { connections.run( conn -> { - DatabaseMetaData dbMeta = conn.getMetaData(); - ResultSet typeColumn = - dbMeta.getColumns( - null, null, JdbcUtil.CATALOG_TABLE_VIEW_NAME, JdbcUtil.RECORD_TYPE); - if (typeColumn.next()) { + if (columnExists( + conn, + JdbcUtil.tableName(JdbcUtil.CATALOG_TABLE_VIEW_NAME, schemaName), + JdbcUtil.RECORD_TYPE)) { LOG.debug("{} already supports views", JdbcUtil.CATALOG_TABLE_VIEW_NAME); schemaVersion = JdbcUtil.SchemaVersion.V1; return true; @@ -245,7 +258,21 @@ private void updateSchemaIfRequired() { .equalsIgnoreCase(JdbcUtil.SchemaVersion.V1.name())) { LOG.debug("{} is being updated to support views", JdbcUtil.CATALOG_TABLE_VIEW_NAME); schemaVersion = JdbcUtil.SchemaVersion.V1; - return conn.prepareStatement(JdbcUtil.V1_UPDATE_CATALOG_SQL).execute(); + if (isOracle(conn)) { + return conn.prepareStatement( + JdbcUtil.withTableName( + JdbcUtil.V1_UPDATE_CATALOG_ORACLE_SQL_TEMPLATE, + JdbcUtil.CATALOG_TABLE_VIEW_NAME, + schemaName)) + .execute(); + } else { + return conn.prepareStatement( + JdbcUtil.withTableName( + JdbcUtil.V1_UPDATE_CATALOG_SQL_TEMPLATE, + JdbcUtil.CATALOG_TABLE_VIEW_NAME, + schemaName)) + .execute(); + } } else { LOG.warn(VIEW_WARNING_LOG_MESSAGE); return true; @@ -264,10 +291,81 @@ private void updateSchemaIfRequired() { } } + private boolean isOracle(Connection conn) { + try { + String databaseProductName = conn.getMetaData().getDatabaseProductName(); + return databaseProductName != null + && databaseProductName.toLowerCase(Locale.ROOT).contains("oracle"); + } catch (SQLException e) { + throw new UncheckedSQLException(e, "Cannot determine database product name"); + } + } + + private boolean tableExists(Connection conn, String tableNamePattern) throws SQLException { + DatabaseMetaData dbMeta = conn.getMetaData(); + + String[] tableVariants = + new String[] { + tableNamePattern, + tableNamePattern == null ? null : tableNamePattern.toUpperCase(Locale.ROOT), + tableNamePattern == null ? null : tableNamePattern.toLowerCase(Locale.ROOT) + }; + + for (String t : tableVariants) { + if (t == null) { + continue; + } + try (ResultSet rs = dbMeta.getColumns(null, null, t, null)) { + if (rs.next()) { + return true; + } + } + } + return false; + } + + private boolean columnExists(Connection conn, String tableNamePattern, String columnNamePattern) + throws SQLException { + DatabaseMetaData dbMeta = conn.getMetaData(); + + String[] tableVariants = + new String[] { + tableNamePattern, + tableNamePattern == null ? null : tableNamePattern.toUpperCase(Locale.ROOT), + tableNamePattern == null ? null : tableNamePattern.toLowerCase(Locale.ROOT) + }; + String[] columnVariants = + new String[] { + columnNamePattern, + columnNamePattern == null ? null : columnNamePattern.toUpperCase(Locale.ROOT), + columnNamePattern == null ? null : columnNamePattern.toLowerCase(Locale.ROOT) + }; + + for (String t : tableVariants) { + for (String c : columnVariants) { + if (t == null || c == null) { + continue; + } + try (ResultSet rs = dbMeta.getColumns(null, null, t, c)) { + if (rs.next()) { + return true; + } + } + } + } + return false; + } + @Override protected TableOperations newTableOps(TableIdentifier tableIdentifier) { return new JdbcTableOperations( - connections, io, catalogName, tableIdentifier, catalogProperties, schemaVersion); + connections, + io, + catalogName, + tableIdentifier, + catalogProperties, + schemaName, + schemaVersion); } @Override @@ -275,7 +373,8 @@ protected ViewOperations newViewOps(TableIdentifier viewIdentifier) { if (schemaVersion != JdbcUtil.SchemaVersion.V1) { throw new UnsupportedOperationException(VIEW_WARNING_LOG_MESSAGE); } - return new JdbcViewOperations(connections, io, catalogName, viewIdentifier, catalogProperties); + return new JdbcViewOperations( + connections, io, catalogName, viewIdentifier, catalogProperties, schemaName); } @Override @@ -301,8 +400,14 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) { int deletedRecords = execute( (schemaVersion == JdbcUtil.SchemaVersion.V1) - ? JdbcUtil.V1_DROP_TABLE_SQL - : JdbcUtil.V0_DROP_TABLE_SQL, + ? JdbcUtil.withTableName( + JdbcUtil.V1_DROP_TABLE_SQL_TEMPLATE, + JdbcUtil.CATALOG_TABLE_VIEW_NAME, + schemaName) + : JdbcUtil.withTableName( + JdbcUtil.V0_DROP_TABLE_SQL_TEMPLATE, + JdbcUtil.CATALOG_TABLE_VIEW_NAME, + schemaName), catalogName, JdbcUtil.namespaceToString(identifier.namespace()), identifier.name()); @@ -331,8 +436,10 @@ public List listTables(Namespace namespace) { JdbcUtil.stringToTableIdentifier( row.getString(JdbcUtil.TABLE_NAMESPACE), row.getString(JdbcUtil.TABLE_NAME)), (schemaVersion == JdbcUtil.SchemaVersion.V1) - ? JdbcUtil.V1_LIST_TABLE_SQL - : JdbcUtil.V0_LIST_TABLE_SQL, + ? JdbcUtil.withTableName( + JdbcUtil.V1_LIST_TABLE_SQL_TEMPLATE, JdbcUtil.CATALOG_TABLE_VIEW_NAME, schemaName) + : JdbcUtil.withTableName( + JdbcUtil.V0_LIST_TABLE_SQL_TEMPLATE, JdbcUtil.CATALOG_TABLE_VIEW_NAME, schemaName), catalogName, JdbcUtil.namespaceToString(namespace)); } @@ -370,8 +477,14 @@ public void renameTable(TableIdentifier from, TableIdentifier to) { } }, (schemaVersion == JdbcUtil.SchemaVersion.V1) - ? JdbcUtil.V1_RENAME_TABLE_SQL - : JdbcUtil.V0_RENAME_TABLE_SQL, + ? JdbcUtil.withTableName( + JdbcUtil.V1_RENAME_TABLE_SQL_TEMPLATE, + JdbcUtil.CATALOG_TABLE_VIEW_NAME, + schemaName) + : JdbcUtil.withTableName( + JdbcUtil.V0_RENAME_TABLE_SQL_TEMPLATE, + JdbcUtil.CATALOG_TABLE_VIEW_NAME, + schemaName), JdbcUtil.namespaceToString(to.namespace()), to.name(), catalogName, @@ -425,12 +538,18 @@ public List listNamespaces() { namespaces.addAll( fetch( row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.TABLE_NAMESPACE)), - JdbcUtil.LIST_ALL_NAMESPACES_SQL, + JdbcUtil.withTableName( + JdbcUtil.LIST_ALL_NAMESPACES_SQL_TEMPLATE, + JdbcUtil.CATALOG_TABLE_VIEW_NAME, + schemaName), catalogName)); namespaces.addAll( fetch( row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.NAMESPACE_NAME)), - JdbcUtil.LIST_ALL_PROPERTY_NAMESPACES_SQL, + JdbcUtil.withTableName( + JdbcUtil.LIST_ALL_PROPERTY_NAMESPACES_SQL_TEMPLATE, + JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME, + schemaName), catalogName)); namespaces = @@ -460,13 +579,19 @@ public List listNamespaces(Namespace namespace) throws NoSuchNamespac namespaces.addAll( fetch( row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.TABLE_NAMESPACE)), - JdbcUtil.LIST_NAMESPACES_SQL, + JdbcUtil.withTableName( + JdbcUtil.LIST_NAMESPACES_SQL_TEMPLATE, + JdbcUtil.CATALOG_TABLE_VIEW_NAME, + schemaName), catalogName, JdbcUtil.namespaceToString(namespace) + "%")); namespaces.addAll( fetch( row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.NAMESPACE_NAME)), - JdbcUtil.LIST_PROPERTY_NAMESPACES_SQL, + JdbcUtil.withTableName( + JdbcUtil.LIST_PROPERTY_NAMESPACES_SQL_TEMPLATE, + JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME, + schemaName), catalogName, JdbcUtil.namespaceToString(namespace) + "%")); @@ -548,7 +673,10 @@ public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyExcept int deletedRows = execute( - JdbcUtil.DELETE_ALL_NAMESPACE_PROPERTIES_SQL, + JdbcUtil.withTableName( + JdbcUtil.DELETE_ALL_NAMESPACE_PROPERTIES_SQL_TEMPLATE, + JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME, + schemaName), catalogName, JdbcUtil.namespaceToString(namespace)); @@ -628,7 +756,7 @@ public void close() { @Override public boolean namespaceExists(Namespace namespace) { - return JdbcUtil.namespaceExists(catalogName, connections, namespace); + return JdbcUtil.namespaceExists(schemaName, catalogName, connections, namespace); } @Override @@ -647,7 +775,8 @@ public boolean dropView(TableIdentifier identifier) { int deletedRecords = execute( - JdbcUtil.DROP_VIEW_SQL, + JdbcUtil.withTableName( + JdbcUtil.DROP_VIEW_SQL_TEMPLATE, JdbcUtil.CATALOG_TABLE_VIEW_NAME, schemaName), catalogName, JdbcUtil.namespaceToString(identifier.namespace()), identifier.name()); @@ -679,7 +808,8 @@ public List listViews(Namespace namespace) { row -> JdbcUtil.stringToTableIdentifier( row.getString(JdbcUtil.TABLE_NAMESPACE), row.getString(JdbcUtil.TABLE_NAME)), - JdbcUtil.LIST_VIEW_SQL, + JdbcUtil.withTableName( + JdbcUtil.LIST_VIEW_SQL_TEMPLATE, JdbcUtil.CATALOG_TABLE_VIEW_NAME, schemaName), catalogName, JdbcUtil.namespaceToString(namespace)); } @@ -720,7 +850,8 @@ public void renameView(TableIdentifier from, TableIdentifier to) { "Cannot rename %s to %s. View already exists", from, to); } }, - JdbcUtil.RENAME_VIEW_SQL, + JdbcUtil.withTableName( + JdbcUtil.RENAME_VIEW_SQL_TEMPLATE, JdbcUtil.CATALOG_TABLE_VIEW_NAME, schemaName), JdbcUtil.namespaceToString(to.namespace()), to.name(), catalogName, @@ -814,7 +945,10 @@ private Map fetchProperties(Namespace namespace) { new AbstractMap.SimpleImmutableEntry<>( row.getString(JdbcUtil.NAMESPACE_PROPERTY_KEY), row.getString(JdbcUtil.NAMESPACE_PROPERTY_VALUE)), - JdbcUtil.GET_ALL_NAMESPACE_PROPERTIES_SQL, + JdbcUtil.withTableName( + JdbcUtil.GET_ALL_NAMESPACE_PROPERTIES_SQL_TEMPLATE, + JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME, + schemaName), catalogName, namespaceName); @@ -823,48 +957,100 @@ private Map fetchProperties(Namespace namespace) { private boolean insertProperties(Namespace namespace, Map properties) { String namespaceName = JdbcUtil.namespaceToString(namespace); - String[] args = - properties.entrySet().stream() - .flatMap( - entry -> Stream.of(catalogName, namespaceName, entry.getKey(), entry.getValue())) - .toArray(String[]::new); - int insertedRecords = execute(JdbcUtil.insertPropertiesStatement(properties.size()), args); + try { + int[] insertedRecords = + connections.run( + conn -> { + try (PreparedStatement preparedStatement = + conn.prepareStatement( + JdbcUtil.withTableName( + JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL_TEMPLATE, + JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME, + schemaName))) { + for (Map.Entry entry : properties.entrySet()) { + preparedStatement.setString(1, catalogName); + preparedStatement.setString(2, namespaceName); + preparedStatement.setString(3, entry.getKey()); + preparedStatement.setString(4, entry.getValue()); + preparedStatement.addBatch(); + } + return preparedStatement.executeBatch(); + } + }); + + int successCount = 0; + for (int result : insertedRecords) { + if (result >= 0) { + successCount++; + } + } - if (insertedRecords == properties.size()) { - return true; - } + if (successCount == properties.size()) { + return true; + } - throw new IllegalStateException( - String.format( - Locale.ROOT, - "Failed to insert: %d of %d succeeded", - insertedRecords, - properties.size())); + throw new IllegalStateException( + String.format( + Locale.ROOT, + "Failed to insert: %d of %d succeeded", + successCount, + properties.size())); + } catch (SQLException e) { + throw new UncheckedSQLException( + e, "Failed to insert properties for namespace: %s", namespace); + } catch (InterruptedException e) { + throw new UncheckedInterruptedException(e, "Interrupted in SQL command"); + } } private boolean updateProperties(Namespace namespace, Map properties) { String namespaceName = JdbcUtil.namespaceToString(namespace); - Stream caseArgs = - properties.entrySet().stream() - .flatMap(entry -> Stream.of(entry.getKey(), entry.getValue())); - Stream whereArgs = - Stream.concat(Stream.of(catalogName, namespaceName), properties.keySet().stream()); - String[] args = Stream.concat(caseArgs, whereArgs).toArray(String[]::new); + try { + int[] updatedRecords = + connections.run( + conn -> { + try (PreparedStatement preparedStatement = + conn.prepareStatement( + JdbcUtil.withTableName( + JdbcUtil.UPDATE_NAMESPACE_PROPERTY_SQL_TEMPLATE, + JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME, + schemaName))) { + for (Map.Entry entry : properties.entrySet()) { + preparedStatement.setString(1, entry.getValue()); + preparedStatement.setString(2, catalogName); + preparedStatement.setString(3, namespaceName); + preparedStatement.setString(4, entry.getKey()); + preparedStatement.addBatch(); + } + return preparedStatement.executeBatch(); + } + }); + + int successCount = 0; + for (int result : updatedRecords) { + if (result >= 0) { + successCount++; + } + } - int updatedRecords = execute(JdbcUtil.updatePropertiesStatement(properties.size()), args); + if (successCount == properties.size()) { + return true; + } - if (updatedRecords == properties.size()) { - return true; + throw new IllegalStateException( + String.format( + Locale.ROOT, + "Failed to update: %d of %d succeeded", + successCount, + properties.size())); + } catch (SQLException e) { + throw new UncheckedSQLException( + e, "Failed to update properties for namespace: %s", namespace); + } catch (InterruptedException e) { + throw new UncheckedInterruptedException(e, "Interrupted in SQL command"); } - - throw new IllegalStateException( - String.format( - Locale.ROOT, - "Failed to update: %d of %d succeeded", - updatedRecords, - properties.size())); } private boolean deleteProperties(Namespace namespace, Set properties) { @@ -873,7 +1059,7 @@ private boolean deleteProperties(Namespace namespace, Set properties) { Stream.concat(Stream.of(catalogName, namespaceName), properties.stream()) .toArray(String[]::new); - return execute(JdbcUtil.deletePropertiesStatement(properties), args) > 0; + return execute(JdbcUtil.deletePropertiesStatement(properties, schemaName), args) > 0; } @Override diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java index 619296ad3336..7d26041d069c 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java @@ -50,6 +50,7 @@ class JdbcTableOperations extends BaseMetastoreTableOperations { private final JdbcClientPool connections; private final Map catalogProperties; private final JdbcUtil.SchemaVersion schemaVersion; + private final String schemaName; protected JdbcTableOperations( JdbcClientPool dbConnPool, @@ -57,12 +58,14 @@ protected JdbcTableOperations( String catalogName, TableIdentifier tableIdentifier, Map catalogProperties, + String schemaName, JdbcUtil.SchemaVersion schemaVersion) { this.catalogName = catalogName; this.tableIdentifier = tableIdentifier; this.fileIO = fileIO; this.connections = dbConnPool; this.catalogProperties = catalogProperties; + this.schemaName = schemaName; this.schemaVersion = schemaVersion; } @@ -71,7 +74,8 @@ public void doRefresh() { Map table; try { - table = JdbcUtil.loadTable(schemaVersion, connections, catalogName, tableIdentifier); + table = + JdbcUtil.loadTable(schemaName, schemaVersion, connections, catalogName, tableIdentifier); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new UncheckedInterruptedException(e, "Interrupted during refresh"); @@ -106,7 +110,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { String newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata); try { Map table = - JdbcUtil.loadTable(schemaVersion, connections, catalogName, tableIdentifier); + JdbcUtil.loadTable(schemaName, schemaVersion, connections, catalogName, tableIdentifier); if (base != null) { validateMetadataLocation(table, base); @@ -153,6 +157,7 @@ private void updateTable(String newMetadataLocation, String oldMetadataLocation) throws SQLException, InterruptedException { int updatedRecords = JdbcUtil.updateTable( + schemaName, schemaVersion, connections, catalogName, @@ -171,23 +176,25 @@ private void updateTable(String newMetadataLocation, String oldMetadataLocation) private void createTable(String newMetadataLocation) throws SQLException, InterruptedException { Namespace namespace = tableIdentifier.namespace(); if (PropertyUtil.propertyAsBoolean(catalogProperties, JdbcUtil.STRICT_MODE_PROPERTY, false) - && !JdbcUtil.namespaceExists(catalogName, connections, namespace)) { + && !JdbcUtil.namespaceExists(schemaName, catalogName, connections, namespace)) { throw new NoSuchNamespaceException( "Cannot create table %s in catalog %s. Namespace %s does not exist", tableIdentifier, catalogName, namespace); } if (schemaVersion == JdbcUtil.SchemaVersion.V1 - && JdbcUtil.viewExists(catalogName, connections, tableIdentifier)) { + && JdbcUtil.viewExists(schemaName, catalogName, connections, tableIdentifier)) { throw new AlreadyExistsException("View with same name already exists: %s", tableIdentifier); } - if (JdbcUtil.tableExists(schemaVersion, catalogName, connections, tableIdentifier)) { + if (JdbcUtil.tableExists( + schemaName, schemaVersion, catalogName, connections, tableIdentifier)) { throw new AlreadyExistsException("Table already exists: %s", tableIdentifier); } int insertRecord = JdbcUtil.doCommitCreateTable( + schemaName, schemaVersion, connections, catalogName, diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java index 544e9f39c7cb..99a251652117 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java @@ -31,17 +31,23 @@ import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; +import org.apache.iceberg.relocated.com.google.common.base.Strings; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Maps; final class JdbcUtil { // property to control strict-mode (aka check if namespace exists when creating a table) static final String STRICT_MODE_PROPERTY = JdbcCatalog.PROPERTY_PREFIX + "strict-mode"; + // property to control the schema name used for the catalog tables + static final String SCHEMA_NAME_PROPERTY = JdbcCatalog.PROPERTY_PREFIX + "schema"; // property to control if view support is added to the existing database static final String SCHEMA_VERSION_PROPERTY = JdbcCatalog.PROPERTY_PREFIX + "schema-version"; // property to control if catalog tables are created during initialization static final String INIT_CATALOG_TABLES_PROPERTY = JdbcCatalog.PROPERTY_PREFIX + "init-catalog-tables"; + // property to control if catalog schema is updated if necessary during initialization + static final String UPDATE_CATALOG_SCHEMA_IF_NECESSARY_PROPERTY = + JdbcCatalog.PROPERTY_PREFIX + "update-catalog-schema-if-necessary"; static final String RETRYABLE_STATUS_CODES = "retryable_status_codes"; @@ -50,6 +56,8 @@ enum SchemaVersion { V1 } + public static final String TABLE_NAME_PLACEHOLDER = "${tableName}"; + // Catalog Table & View static final String CATALOG_TABLE_VIEW_NAME = "iceberg_tables"; static final String CATALOG_NAME = "catalog_name"; @@ -59,9 +67,9 @@ enum SchemaVersion { static final String TABLE_RECORD_TYPE = "TABLE"; static final String VIEW_RECORD_TYPE = "VIEW"; - private static final String V1_DO_COMMIT_TABLE_SQL = + private static final String V1_DO_COMMIT_TABLE_SQL_TEMPLATE = "UPDATE " - + CATALOG_TABLE_VIEW_NAME + + TABLE_NAME_PLACEHOLDER + " SET " + JdbcTableOperations.METADATA_LOCATION_PROP + " = ? , " @@ -83,9 +91,9 @@ enum SchemaVersion { + " OR " + RECORD_TYPE + " IS NULL)"; - private static final String V1_DO_COMMIT_VIEW_SQL = + private static final String V1_DO_COMMIT_VIEW_SQL_TEMPLATE = "UPDATE " - + CATALOG_TABLE_VIEW_NAME + + TABLE_NAME_PLACEHOLDER + " SET " + JdbcTableOperations.METADATA_LOCATION_PROP + " = ? , " @@ -105,9 +113,9 @@ enum SchemaVersion { + "'" + VIEW_RECORD_TYPE + "'"; - private static final String V0_DO_COMMIT_SQL = + private static final String V0_DO_COMMIT_SQL_TEMPLATE = "UPDATE " - + CATALOG_TABLE_VIEW_NAME + + TABLE_NAME_PLACEHOLDER + " SET " + JdbcTableOperations.METADATA_LOCATION_PROP + " = ? , " @@ -122,9 +130,9 @@ enum SchemaVersion { + " = ? AND " + JdbcTableOperations.METADATA_LOCATION_PROP + " = ?"; - static final String V0_CREATE_CATALOG_SQL = + static final String V0_CREATE_CATALOG_SQL_TEMPLATE = "CREATE TABLE " - + CATALOG_TABLE_VIEW_NAME + + TABLE_NAME_PLACEHOLDER + "(" + CATALOG_NAME + " VARCHAR(255) NOT NULL," @@ -144,12 +152,14 @@ enum SchemaVersion { + TABLE_NAME + ")" + ")"; - static final String V1_UPDATE_CATALOG_SQL = - "ALTER TABLE " + CATALOG_TABLE_VIEW_NAME + " ADD COLUMN " + RECORD_TYPE + " VARCHAR(5)"; + static final String V1_UPDATE_CATALOG_ORACLE_SQL_TEMPLATE = + "ALTER TABLE " + TABLE_NAME_PLACEHOLDER + " ADD (" + RECORD_TYPE + " VARCHAR2(5))"; + static final String V1_UPDATE_CATALOG_SQL_TEMPLATE = + "ALTER TABLE " + TABLE_NAME_PLACEHOLDER + " ADD COLUMN " + RECORD_TYPE + " VARCHAR(5)"; - private static final String GET_VIEW_SQL = + private static final String GET_VIEW_SQL_TEMPLATE = "SELECT * FROM " - + CATALOG_TABLE_VIEW_NAME + + TABLE_NAME_PLACEHOLDER + " WHERE " + CATALOG_NAME + " = ? AND " @@ -162,9 +172,9 @@ enum SchemaVersion { + "'" + VIEW_RECORD_TYPE + "'"; - private static final String V1_GET_TABLE_SQL = + private static final String V1_GET_TABLE_SQL_TEMPLATE = "SELECT * FROM " - + CATALOG_TABLE_VIEW_NAME + + TABLE_NAME_PLACEHOLDER + " WHERE " + CATALOG_NAME + " = ? AND " @@ -180,9 +190,9 @@ enum SchemaVersion { + " OR " + RECORD_TYPE + " IS NULL)"; - private static final String V0_GET_TABLE_SQL = + private static final String V0_GET_TABLE_SQL_TEMPLATE = "SELECT * FROM " - + CATALOG_TABLE_VIEW_NAME + + TABLE_NAME_PLACEHOLDER + " WHERE " + CATALOG_NAME + " = ? AND " @@ -190,9 +200,9 @@ enum SchemaVersion { + " = ? AND " + TABLE_NAME + " = ?"; - static final String LIST_VIEW_SQL = + static final String LIST_VIEW_SQL_TEMPLATE = "SELECT * FROM " - + CATALOG_TABLE_VIEW_NAME + + TABLE_NAME_PLACEHOLDER + " WHERE " + CATALOG_NAME + " = ? AND " @@ -203,9 +213,9 @@ enum SchemaVersion { + "'" + VIEW_RECORD_TYPE + "'"; - static final String V1_LIST_TABLE_SQL = + static final String V1_LIST_TABLE_SQL_TEMPLATE = "SELECT * FROM " - + CATALOG_TABLE_VIEW_NAME + + TABLE_NAME_PLACEHOLDER + " WHERE " + CATALOG_NAME + " = ? AND " @@ -219,17 +229,17 @@ enum SchemaVersion { + " OR " + RECORD_TYPE + " IS NULL)"; - static final String V0_LIST_TABLE_SQL = + static final String V0_LIST_TABLE_SQL_TEMPLATE = "SELECT * FROM " - + CATALOG_TABLE_VIEW_NAME + + TABLE_NAME_PLACEHOLDER + " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " = ?"; - static final String RENAME_VIEW_SQL = + static final String RENAME_VIEW_SQL_TEMPLATE = "UPDATE " - + CATALOG_TABLE_VIEW_NAME + + TABLE_NAME_PLACEHOLDER + " SET " + TABLE_NAMESPACE + " = ?, " @@ -247,9 +257,9 @@ enum SchemaVersion { + "'" + VIEW_RECORD_TYPE + "'"; - static final String V1_RENAME_TABLE_SQL = + static final String V1_RENAME_TABLE_SQL_TEMPLATE = "UPDATE " - + CATALOG_TABLE_VIEW_NAME + + TABLE_NAME_PLACEHOLDER + " SET " + TABLE_NAMESPACE + " = ?, " @@ -270,9 +280,9 @@ enum SchemaVersion { + " OR " + RECORD_TYPE + " IS NULL)"; - static final String V0_RENAME_TABLE_SQL = + static final String V0_RENAME_TABLE_SQL_TEMPLATE = "UPDATE " - + CATALOG_TABLE_VIEW_NAME + + TABLE_NAME_PLACEHOLDER + " SET " + TABLE_NAMESPACE + " = ?, " @@ -285,9 +295,9 @@ enum SchemaVersion { + " = ? AND " + TABLE_NAME + " = ?"; - static final String DROP_VIEW_SQL = + static final String DROP_VIEW_SQL_TEMPLATE = "DELETE FROM " - + CATALOG_TABLE_VIEW_NAME + + TABLE_NAME_PLACEHOLDER + " WHERE " + CATALOG_NAME + " = ? AND " @@ -300,9 +310,9 @@ enum SchemaVersion { + "'" + VIEW_RECORD_TYPE + "'"; - static final String V1_DROP_TABLE_SQL = + static final String V1_DROP_TABLE_SQL_TEMPLATE = "DELETE FROM " - + CATALOG_TABLE_VIEW_NAME + + TABLE_NAME_PLACEHOLDER + " WHERE " + CATALOG_NAME + " = ? AND " @@ -318,9 +328,9 @@ enum SchemaVersion { + " OR " + RECORD_TYPE + " IS NULL)"; - static final String V0_DROP_TABLE_SQL = + static final String V0_DROP_TABLE_SQL_TEMPLATE = "DELETE FROM " - + CATALOG_TABLE_VIEW_NAME + + TABLE_NAME_PLACEHOLDER + " WHERE " + CATALOG_NAME + " = ? AND " @@ -328,11 +338,11 @@ enum SchemaVersion { + " = ? AND " + TABLE_NAME + " = ?"; - private static final String GET_NAMESPACE_SQL = + private static final String GET_NAMESPACE_SQL_TEMPLATE = "SELECT " + TABLE_NAMESPACE + " FROM " - + CATALOG_TABLE_VIEW_NAME + + TABLE_NAME_PLACEHOLDER + " WHERE " + CATALOG_NAME + " = ? AND " @@ -340,29 +350,28 @@ enum SchemaVersion { + TABLE_NAMESPACE + " = ? OR " + TABLE_NAMESPACE - + " LIKE ? ESCAPE '!')" - + " LIMIT 1"; - static final String LIST_NAMESPACES_SQL = + + " LIKE ? ESCAPE '!')"; + static final String LIST_NAMESPACES_SQL_TEMPLATE = "SELECT DISTINCT " + TABLE_NAMESPACE + " FROM " - + CATALOG_TABLE_VIEW_NAME + + TABLE_NAME_PLACEHOLDER + " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " LIKE ?"; - static final String LIST_ALL_NAMESPACES_SQL = + static final String LIST_ALL_NAMESPACES_SQL_TEMPLATE = "SELECT DISTINCT " + TABLE_NAMESPACE + " FROM " - + CATALOG_TABLE_VIEW_NAME + + TABLE_NAME_PLACEHOLDER + " WHERE " + CATALOG_NAME + " = ?"; - private static final String V1_DO_COMMIT_CREATE_SQL = + private static final String V1_DO_COMMIT_CREATE_SQL_TEMPLATE = "INSERT INTO " - + CATALOG_TABLE_VIEW_NAME + + TABLE_NAME_PLACEHOLDER + " (" + CATALOG_NAME + ", " @@ -377,9 +386,9 @@ enum SchemaVersion { + RECORD_TYPE + ") " + " VALUES (?,?,?,?,null,?)"; - private static final String V0_DO_COMMIT_CREATE_SQL = + private static final String V0_DO_COMMIT_CREATE_SQL_TEMPLATE = "INSERT INTO " - + CATALOG_TABLE_VIEW_NAME + + TABLE_NAME_PLACEHOLDER + " (" + CATALOG_NAME + ", " @@ -399,16 +408,16 @@ enum SchemaVersion { static final String NAMESPACE_PROPERTY_KEY = "property_key"; static final String NAMESPACE_PROPERTY_VALUE = "property_value"; - static final String CREATE_NAMESPACE_PROPERTIES_TABLE_SQL = + static final String CREATE_NAMESPACE_PROPERTIES_TABLE_SQL_TEMPLATE = "CREATE TABLE " - + NAMESPACE_PROPERTIES_TABLE_NAME + + TABLE_NAME_PLACEHOLDER + "(" + CATALOG_NAME + " VARCHAR(255) NOT NULL," + NAMESPACE_NAME + " VARCHAR(255) NOT NULL," + NAMESPACE_PROPERTY_KEY - + " VARCHAR(255)," + + " VARCHAR(255) NOT NULL," + NAMESPACE_PROPERTY_VALUE + " VARCHAR(1000)," + "PRIMARY KEY (" @@ -419,11 +428,11 @@ enum SchemaVersion { + NAMESPACE_PROPERTY_KEY + ")" + ")"; - static final String GET_NAMESPACE_PROPERTIES_SQL = + static final String GET_NAMESPACE_PROPERTIES_SQL_TEMPLATE = "SELECT " + NAMESPACE_NAME + " FROM " - + NAMESPACE_PROPERTIES_TABLE_NAME + + TABLE_NAME_PLACEHOLDER + " WHERE " + CATALOG_NAME + " = ? AND " @@ -433,9 +442,9 @@ enum SchemaVersion { + NAMESPACE_NAME + " LIKE ? ESCAPE '!' " + " ) "; - static final String INSERT_NAMESPACE_PROPERTIES_SQL = + static final String INSERT_NAMESPACE_PROPERTIES_SQL_TEMPLATE = "INSERT INTO " - + NAMESPACE_PROPERTIES_TABLE_NAME + + TABLE_NAME_PLACEHOLDER + " (" + CATALOG_NAME + ", " @@ -444,20 +453,32 @@ enum SchemaVersion { + NAMESPACE_PROPERTY_KEY + ", " + NAMESPACE_PROPERTY_VALUE - + ") VALUES "; - static final String INSERT_PROPERTIES_VALUES_BASE = "(?,?,?,?)"; - static final String GET_ALL_NAMESPACE_PROPERTIES_SQL = + + ") VALUES (?,?,?,?)"; + + static final String UPDATE_NAMESPACE_PROPERTY_SQL_TEMPLATE = + "UPDATE " + + TABLE_NAME_PLACEHOLDER + + " SET " + + NAMESPACE_PROPERTY_VALUE + + " = ? WHERE " + + CATALOG_NAME + + " = ? AND " + + NAMESPACE_NAME + + " = ? AND " + + NAMESPACE_PROPERTY_KEY + + " = ?"; + static final String GET_ALL_NAMESPACE_PROPERTIES_SQL_TEMPLATE = "SELECT * " + " FROM " - + NAMESPACE_PROPERTIES_TABLE_NAME + + TABLE_NAME_PLACEHOLDER + " WHERE " + CATALOG_NAME + " = ? AND " + NAMESPACE_NAME + " = ? "; - static final String DELETE_NAMESPACE_PROPERTIES_SQL = + static final String DELETE_NAMESPACE_PROPERTIES_SQL_TEMPLATE = "DELETE FROM " - + NAMESPACE_PROPERTIES_TABLE_NAME + + TABLE_NAME_PLACEHOLDER + " WHERE " + CATALOG_NAME + " = ? AND " @@ -465,29 +486,29 @@ enum SchemaVersion { + " = ? AND " + NAMESPACE_PROPERTY_KEY + " IN "; - static final String DELETE_ALL_NAMESPACE_PROPERTIES_SQL = + static final String DELETE_ALL_NAMESPACE_PROPERTIES_SQL_TEMPLATE = "DELETE FROM " - + NAMESPACE_PROPERTIES_TABLE_NAME + + TABLE_NAME_PLACEHOLDER + " WHERE " + CATALOG_NAME + " = ? AND " + NAMESPACE_NAME + " = ?"; - static final String LIST_PROPERTY_NAMESPACES_SQL = + static final String LIST_PROPERTY_NAMESPACES_SQL_TEMPLATE = "SELECT DISTINCT " + NAMESPACE_NAME + " FROM " - + NAMESPACE_PROPERTIES_TABLE_NAME + + TABLE_NAME_PLACEHOLDER + " WHERE " + CATALOG_NAME + " = ? AND " + NAMESPACE_NAME + " LIKE ?"; - static final String LIST_ALL_PROPERTY_NAMESPACES_SQL = + static final String LIST_ALL_PROPERTY_NAMESPACES_SQL_TEMPLATE = "SELECT DISTINCT " + NAMESPACE_NAME + " FROM " - + NAMESPACE_PROPERTIES_TABLE_NAME + + TABLE_NAME_PLACEHOLDER + " WHERE " + CATALOG_NAME + " = ?"; @@ -529,6 +550,7 @@ static Properties filterAndRemovePrefix(Map properties, String p private static int update( boolean isTable, + String schemaName, SchemaVersion schemaVersion, JdbcClientPool connections, String catalogName, @@ -541,8 +563,13 @@ private static int update( try (PreparedStatement sql = conn.prepareStatement( (schemaVersion == SchemaVersion.V1) - ? (isTable ? V1_DO_COMMIT_TABLE_SQL : V1_DO_COMMIT_VIEW_SQL) - : V0_DO_COMMIT_SQL)) { + ? (isTable + ? withTableName( + V1_DO_COMMIT_TABLE_SQL_TEMPLATE, CATALOG_TABLE_VIEW_NAME, schemaName) + : withTableName( + V1_DO_COMMIT_VIEW_SQL_TEMPLATE, CATALOG_TABLE_VIEW_NAME, schemaName)) + : withTableName( + V0_DO_COMMIT_SQL_TEMPLATE, CATALOG_TABLE_VIEW_NAME, schemaName))) { // UPDATE sql.setString(1, newMetadataLocation); sql.setString(2, oldMetadataLocation); @@ -558,6 +585,7 @@ private static int update( } static int updateTable( + String schemaName, SchemaVersion schemaVersion, JdbcClientPool connections, String catalogName, @@ -567,6 +595,7 @@ static int updateTable( throws SQLException, InterruptedException { return update( true, + schemaName, schemaVersion, connections, catalogName, @@ -576,6 +605,7 @@ static int updateTable( } static int updateView( + String schemaName, JdbcClientPool connections, String catalogName, TableIdentifier viewIdentifier, @@ -584,6 +614,7 @@ static int updateView( throws SQLException, InterruptedException { return update( false, + schemaName, SchemaVersion.V1, connections, catalogName, @@ -594,6 +625,7 @@ static int updateView( private static Map tableOrView( boolean isTable, + String schemaName, SchemaVersion schemaVersion, JdbcClientPool connections, String catalogName, @@ -606,8 +638,13 @@ private static Map tableOrView( try (PreparedStatement sql = conn.prepareStatement( isTable - ? ((schemaVersion == SchemaVersion.V1) ? V1_GET_TABLE_SQL : V0_GET_TABLE_SQL) - : GET_VIEW_SQL)) { + ? ((schemaVersion == SchemaVersion.V1) + ? withTableName( + V1_GET_TABLE_SQL_TEMPLATE, CATALOG_TABLE_VIEW_NAME, schemaName) + : withTableName( + V0_GET_TABLE_SQL_TEMPLATE, CATALOG_TABLE_VIEW_NAME, schemaName)) + : withTableName( + GET_VIEW_SQL_TEMPLATE, CATALOG_TABLE_VIEW_NAME, schemaName))) { sql.setString(1, catalogName); sql.setString(2, namespaceToString(identifier.namespace())); sql.setString(3, identifier.name()); @@ -633,25 +670,28 @@ private static Map tableOrView( } static Map loadTable( + String schemaName, SchemaVersion schemaVersion, JdbcClientPool connections, String catalogName, TableIdentifier identifier) throws SQLException, InterruptedException { - return tableOrView(true, schemaVersion, connections, catalogName, identifier); + return tableOrView(true, schemaName, schemaVersion, connections, catalogName, identifier); } static Map loadView( + String schemaName, SchemaVersion schemaVersion, JdbcClientPool connections, String catalogName, TableIdentifier identifier) throws SQLException, InterruptedException { - return tableOrView(false, schemaVersion, connections, catalogName, identifier); + return tableOrView(false, schemaName, schemaVersion, connections, catalogName, identifier); } private static int doCommitCreate( boolean isTable, + String schemaName, SchemaVersion schemaVersion, JdbcClientPool connections, String catalogName, @@ -664,8 +704,10 @@ private static int doCommitCreate( try (PreparedStatement sql = conn.prepareStatement( (schemaVersion == SchemaVersion.V1) - ? V1_DO_COMMIT_CREATE_SQL - : V0_DO_COMMIT_CREATE_SQL)) { + ? withTableName( + V1_DO_COMMIT_CREATE_SQL_TEMPLATE, CATALOG_TABLE_VIEW_NAME, schemaName) + : withTableName( + V0_DO_COMMIT_CREATE_SQL_TEMPLATE, CATALOG_TABLE_VIEW_NAME, schemaName))) { sql.setString(1, catalogName); sql.setString(2, namespaceToString(namespace)); sql.setString(3, identifier.name()); @@ -680,6 +722,7 @@ private static int doCommitCreate( } static int doCommitCreateTable( + String schemaName, SchemaVersion schemaVersion, JdbcClientPool connections, String catalogName, @@ -689,6 +732,7 @@ static int doCommitCreateTable( throws SQLException, InterruptedException { return doCommitCreate( true, + schemaName, schemaVersion, connections, catalogName, @@ -698,6 +742,7 @@ static int doCommitCreateTable( } static int doCommitCreateView( + String schemaName, JdbcClientPool connections, String catalogName, Namespace namespace, @@ -706,6 +751,7 @@ static int doCommitCreateView( throws SQLException, InterruptedException { return doCommitCreate( false, + schemaName, SchemaVersion.V1, connections, catalogName, @@ -715,70 +761,41 @@ static int doCommitCreateView( } static boolean viewExists( - String catalogName, JdbcClientPool connections, TableIdentifier viewIdentifier) { + String schemaName, + String catalogName, + JdbcClientPool connections, + TableIdentifier viewIdentifier) { return exists( connections, - GET_VIEW_SQL, + withTableName(GET_VIEW_SQL_TEMPLATE, CATALOG_TABLE_VIEW_NAME, schemaName), catalogName, namespaceToString(viewIdentifier.namespace()), viewIdentifier.name()); } static boolean tableExists( + String schemaName, SchemaVersion schemaVersion, String catalogName, JdbcClientPool connections, TableIdentifier tableIdentifier) { return exists( connections, - (schemaVersion == SchemaVersion.V1) ? V1_GET_TABLE_SQL : V0_GET_TABLE_SQL, + (schemaVersion == SchemaVersion.V1) + ? withTableName(V1_GET_TABLE_SQL_TEMPLATE, CATALOG_TABLE_VIEW_NAME, schemaName) + : withTableName(V0_GET_TABLE_SQL_TEMPLATE, CATALOG_TABLE_VIEW_NAME, schemaName), catalogName, namespaceToString(tableIdentifier.namespace()), tableIdentifier.name()); } - static String updatePropertiesStatement(int size) { + static String deletePropertiesStatement(Set properties, String schemaName) { StringBuilder sqlStatement = new StringBuilder( - "UPDATE " - + NAMESPACE_PROPERTIES_TABLE_NAME - + " SET " - + NAMESPACE_PROPERTY_VALUE - + " = CASE"); - for (int i = 0; i < size; i += 1) { - sqlStatement.append(" WHEN " + NAMESPACE_PROPERTY_KEY + " = ? THEN ?"); - } - - sqlStatement.append( - " END WHERE " - + CATALOG_NAME - + " = ? AND " - + NAMESPACE_NAME - + " = ? AND " - + NAMESPACE_PROPERTY_KEY - + " IN "); - - String values = String.join(",", Collections.nCopies(size, String.valueOf('?'))); - sqlStatement.append("(").append(values).append(")"); - - return sqlStatement.toString(); - } - - static String insertPropertiesStatement(int size) { - StringBuilder sqlStatement = new StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL); - - for (int i = 0; i < size; i++) { - if (i != 0) { - sqlStatement.append(", "); - } - sqlStatement.append(JdbcUtil.INSERT_PROPERTIES_VALUES_BASE); - } - - return sqlStatement.toString(); - } - - static String deletePropertiesStatement(Set properties) { - StringBuilder sqlStatement = new StringBuilder(JdbcUtil.DELETE_NAMESPACE_PROPERTIES_SQL); + withTableName( + JdbcUtil.DELETE_NAMESPACE_PROPERTIES_SQL_TEMPLATE, + NAMESPACE_PROPERTIES_TABLE_NAME, + schemaName)); String values = String.join(",", Collections.nCopies(properties.size(), String.valueOf('?'))); sqlStatement.append("(").append(values).append(")"); @@ -786,20 +803,28 @@ static String deletePropertiesStatement(Set properties) { } static boolean namespaceExists( - String catalogName, JdbcClientPool connections, Namespace namespace) { + String schemaName, String catalogName, JdbcClientPool connections, Namespace namespace) { String namespaceEquals = JdbcUtil.namespaceToString(namespace); // when namespace has sub-namespace then additionally checking it with LIKE statement. // catalog.db can exists as: catalog.db.ns1 or catalog.db.ns1.ns2 String namespaceStartsWith = namespaceEquals.replace("!", "!!").replace("_", "!_").replace("%", "!%") + ".%"; - if (exists(connections, GET_NAMESPACE_SQL, catalogName, namespaceEquals, namespaceStartsWith)) { + if (exists( + connections, + withTableName(GET_NAMESPACE_SQL_TEMPLATE, CATALOG_TABLE_VIEW_NAME, schemaName), + catalogName, + namespaceEquals, + namespaceStartsWith)) { return true; } return exists( connections, - JdbcUtil.GET_NAMESPACE_PROPERTIES_SQL, + withTableName( + JdbcUtil.GET_NAMESPACE_PROPERTIES_SQL_TEMPLATE, + NAMESPACE_PROPERTIES_TABLE_NAME, + schemaName), catalogName, namespaceEquals, namespaceStartsWith); @@ -811,6 +836,9 @@ private static boolean exists(JdbcClientPool connections, String sql, String... return connections.run( conn -> { try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) { + // Limit result set to 1 row for performance (works with all databases) + preparedStatement.setMaxRows(1); + for (int pos = 0; pos < args.length; pos += 1) { preparedStatement.setString(pos + 1, args[pos]); } @@ -831,4 +859,17 @@ private static boolean exists(JdbcClientPool connections, String sql, String... throw new UncheckedInterruptedException(e, "Interrupted in SQL query"); } } + + public static String tableName(String tableName, String schemaName) { + if (Strings.isNullOrEmpty(schemaName)) { + return tableName; + } else { + return schemaName + "." + tableName; + } + } + + public static String withTableName( + String sqlTemplate, String catalogTableViewName, String schemaName) { + return sqlTemplate.replace(TABLE_NAME_PLACEHOLDER, tableName(catalogTableViewName, schemaName)); + } } diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcViewOperations.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcViewOperations.java index 10f46941d694..210f9e64205f 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcViewOperations.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcViewOperations.java @@ -50,18 +50,30 @@ public class JdbcViewOperations extends BaseViewOperations { private final FileIO fileIO; private final JdbcClientPool connections; private final Map catalogProperties; + private final String schemaName; protected JdbcViewOperations( JdbcClientPool dbConnPool, FileIO fileIO, String catalogName, TableIdentifier viewIdentifier, - Map catalogProperties) { + Map catalogProperties, + String schemaName) { this.catalogName = catalogName; this.viewIdentifier = viewIdentifier; this.fileIO = fileIO; this.connections = dbConnPool; this.catalogProperties = catalogProperties; + this.schemaName = schemaName; + } + + protected JdbcViewOperations( + JdbcClientPool dbConnPool, + FileIO fileIO, + String catalogName, + TableIdentifier viewIdentifier, + Map catalogProperties) { + this(dbConnPool, fileIO, catalogName, viewIdentifier, catalogProperties, null); } @Override @@ -69,7 +81,9 @@ protected void doRefresh() { Map view; try { - view = JdbcUtil.loadView(JdbcUtil.SchemaVersion.V1, connections, catalogName, viewIdentifier); + view = + JdbcUtil.loadView( + schemaName, JdbcUtil.SchemaVersion.V1, connections, catalogName, viewIdentifier); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new UncheckedInterruptedException(e, "Interrupted during refresh"); @@ -99,7 +113,8 @@ protected void doCommit(ViewMetadata base, ViewMetadata metadata) { String newMetadataLocation = writeNewMetadataIfRequired(metadata); try { Map view = - JdbcUtil.loadView(JdbcUtil.SchemaVersion.V1, connections, catalogName, viewIdentifier); + JdbcUtil.loadView( + schemaName, JdbcUtil.SchemaVersion.V1, connections, catalogName, viewIdentifier); if (base != null) { validateMetadataLocation(view, base); String oldMetadataLocation = base.metadataFileLocation(); @@ -165,7 +180,12 @@ private void updateView(String newMetadataLocation, String oldMetadataLocation) throws SQLException, InterruptedException { int updatedRecords = JdbcUtil.updateView( - connections, catalogName, viewIdentifier, newMetadataLocation, oldMetadataLocation); + schemaName, + connections, + catalogName, + viewIdentifier, + newMetadataLocation, + oldMetadataLocation); if (updatedRecords == 1) { LOG.debug("Successfully committed to existing view: {}", viewIdentifier); @@ -178,23 +198,24 @@ private void updateView(String newMetadataLocation, String oldMetadataLocation) private void createView(String newMetadataLocation) throws SQLException, InterruptedException { Namespace namespace = viewIdentifier.namespace(); if (PropertyUtil.propertyAsBoolean(catalogProperties, JdbcUtil.STRICT_MODE_PROPERTY, false) - && !JdbcUtil.namespaceExists(catalogName, connections, namespace)) { + && !JdbcUtil.namespaceExists(schemaName, catalogName, connections, namespace)) { throw new NoSuchNamespaceException( "Cannot create view %s in catalog %s. Namespace %s does not exist", viewIdentifier, catalogName, namespace); } - if (JdbcUtil.tableExists(JdbcUtil.SchemaVersion.V1, catalogName, connections, viewIdentifier)) { + if (JdbcUtil.tableExists( + schemaName, JdbcUtil.SchemaVersion.V1, catalogName, connections, viewIdentifier)) { throw new AlreadyExistsException("Table with same name already exists: %s", viewIdentifier); } - if (JdbcUtil.viewExists(catalogName, connections, viewIdentifier)) { + if (JdbcUtil.viewExists(schemaName, catalogName, connections, viewIdentifier)) { throw new AlreadyExistsException("View already exists: %s", viewIdentifier); } int insertRecord = JdbcUtil.doCommitCreateView( - connections, catalogName, namespace, viewIdentifier, newMetadataLocation); + schemaName, connections, catalogName, namespace, viewIdentifier, newMetadataLocation); if (insertRecord == 1) { LOG.debug("Successfully committed to new view: {}", viewIdentifier); diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java index 943b277f5c9c..26f166c1b39c 100644 --- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java +++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java @@ -20,6 +20,8 @@ import static org.apache.iceberg.NullOrder.NULLS_FIRST; import static org.apache.iceberg.SortDirection.ASC; +import static org.apache.iceberg.jdbc.JdbcUtil.CATALOG_TABLE_VIEW_NAME; +import static org.apache.iceberg.jdbc.JdbcUtil.withTableName; import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -197,7 +199,11 @@ public void testDisableInitCatalogTablesOverridesDefault() throws Exception { assertThatThrownBy(() -> jdbcCatalog.listNamespaces()) .isInstanceOf(UncheckedSQLException.class) - .hasMessage(String.format("Failed to execute query: %s", JdbcUtil.LIST_ALL_NAMESPACES_SQL)); + .hasMessage( + String.format( + "Failed to execute query: %s", + withTableName( + JdbcUtil.LIST_ALL_NAMESPACES_SQL_TEMPLATE, CATALOG_TABLE_VIEW_NAME, null))); } @Test @@ -1081,7 +1087,7 @@ public void testCommitExceptionWithoutMessage() { try (MockedStatic mockedStatic = Mockito.mockStatic(JdbcUtil.class)) { mockedStatic - .when(() -> JdbcUtil.loadTable(any(), any(), any(), any())) + .when(() -> JdbcUtil.loadTable(any(), any(), any(), any(), any())) .thenThrow(new SQLException()); assertThatThrownBy(() -> ops.commit(ops.current(), metadataV1)) .isInstanceOf(UncheckedSQLException.class) @@ -1101,7 +1107,7 @@ public void testCommitExceptionWithMessage() { try (MockedStatic mockedStatic = Mockito.mockStatic(JdbcUtil.class)) { mockedStatic - .when(() -> JdbcUtil.loadTable(any(), any(), any(), any())) + .when(() -> JdbcUtil.loadTable(any(), any(), any(), any(), any())) .thenThrow(new SQLException("constraint failed")); assertThatThrownBy(() -> ops.commit(ops.current(), metadataV1)) .isInstanceOf(AlreadyExistsException.class) @@ -1157,7 +1163,12 @@ private void initLegacySchema(String jdbcUrl) throws SQLException { try (Connection connection = dataSource.getConnection()) { // create "old style" SQL schema - connection.prepareStatement(JdbcUtil.V0_CREATE_CATALOG_SQL).executeUpdate(); + + connection + .prepareStatement( + JdbcUtil.withTableName( + JdbcUtil.V0_CREATE_CATALOG_SQL_TEMPLATE, CATALOG_TABLE_VIEW_NAME, null)) + .executeUpdate(); connection .prepareStatement( "INSERT INTO " diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalogWithDatabaseContainers.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalogWithDatabaseContainers.java new file mode 100644 index 000000000000..88dcb6673d40 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalogWithDatabaseContainers.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.jdbc; + +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.testcontainers.containers.Db2Container; +import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.OracleContainer; +import org.testcontainers.containers.PostgreSQLContainer; + +public class TestJdbcCatalogWithDatabaseContainers { + + public static final String DEFAULT = "DEFAULT"; + public static final String ICEBERG_SCHEMA = "iceberg_schema"; + static Configuration conf = new Configuration(); + static final Schema SCHEMA = + new Schema( + required(1, "id", Types.IntegerType.get(), "unique ID"), + required(2, "data", Types.StringType.get())); + @TempDir java.nio.file.Path tableDir; + + @Test + public void testInitializeAndOperationsWithSqlite() { + // public void testInitialize() { + Map properties = Maps.newHashMap(); + properties.put(CatalogProperties.WAREHOUSE_LOCATION, this.tableDir.toAbsolutePath().toString()); + // properties.put(CatalogProperties.URI, "jdbc:sqlite:build/tmp/test-catalog.db"); + properties.put(CatalogProperties.URI, "jdbc:sqlite:file::memory:?icebergDB"); + properties.put(JdbcUtil.SCHEMA_VERSION_PROPERTY, JdbcUtil.SchemaVersion.V1.name()); + JdbcCatalog jdbcCatalog = new JdbcCatalog(); + jdbcCatalog.setConf(conf); + jdbcCatalog.initialize("test_jdbc_catalog", properties); + // second initialization should not fail even if tables are already created + jdbcCatalog.initialize("test_jdbc_catalog", properties); + jdbcCatalog.initialize("test_jdbc_catalog", properties); + testCatalogOperations(jdbcCatalog); + } + + @ParameterizedTest(name = "schema {0}") + @ValueSource(strings = {DEFAULT, ICEBERG_SCHEMA}) + public void testInitializeAndOperationsWithPostgres(String schemaName) { + var jdbcCatalog = + testInitializeWithDatabaseContainer( + new PostgreSQLContainer(), schemaName.equals(DEFAULT) ? null : schemaName); + testCatalogOperations(jdbcCatalog); + } + + @ParameterizedTest(name = "schema {0}") + @ValueSource(strings = {DEFAULT, ICEBERG_SCHEMA}) + public void testInitializeAndOperationsWithDb2(String schemaName) { + var jdbcCatalog = + testInitializeWithDatabaseContainer( + new Db2Container(), schemaName.equals(DEFAULT) ? null : schemaName); + testCatalogOperations(jdbcCatalog); + } + + @ParameterizedTest(name = "schema {0}") + @ValueSource(strings = {DEFAULT, ICEBERG_SCHEMA}) + public void testInitializeAndOperationsWithOracle(String schemaName) { + var jdbcCatalog = + testInitializeWithDatabaseContainer( + new OracleContainer().withUsername(schemaName.equals(DEFAULT) ? "test" : schemaName), + schemaName.equals(DEFAULT) ? null : schemaName); + testCatalogOperations(jdbcCatalog); + } + + private JdbcCatalog testInitializeWithDatabaseContainer( + JdbcDatabaseContainer dbContainer, String schemaName) { + dbContainer.start(); + try { + if (dbContainer instanceof PostgreSQLContainer) { + Class.forName("org.postgresql.Driver"); + } else if (dbContainer instanceof OracleContainer) { + Class.forName("oracle.jdbc.OracleDriver"); + } else if (dbContainer instanceof Db2Container) { + Class.forName("com.ibm.db2.jcc.DB2Driver"); + } + } catch (ClassNotFoundException e) { + throw new RuntimeException("JDBC driver not found", e); + } + createSchemaIfNecessary(dbContainer, schemaName); + Map properties = Maps.newHashMap(); + properties.put(CatalogProperties.WAREHOUSE_LOCATION, this.tableDir.toAbsolutePath().toString()); + properties.put(JdbcUtil.SCHEMA_VERSION_PROPERTY, JdbcUtil.SchemaVersion.V1.name()); + if (schemaName != null) { + properties.put(JdbcUtil.SCHEMA_NAME_PROPERTY, schemaName); + } + properties.put(CatalogProperties.URI, dbContainer.getJdbcUrl()); + properties.put(JdbcCatalog.PROPERTY_PREFIX + CatalogProperties.USER, dbContainer.getUsername()); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", dbContainer.getPassword()); + JdbcCatalog jdbcCatalog = new JdbcCatalog(); + jdbcCatalog.setConf(conf); + jdbcCatalog.initialize( + "test_" + dbContainer.getClass().getSimpleName().toLowerCase() + "_jdbc_catalog", + properties); + + assertTableCreation(dbContainer, schemaName); + return jdbcCatalog; + } + + private static void assertTableCreation(JdbcDatabaseContainer dbContainer, String schemaName) { + // assert that the tables were created in the correct schema + try (Connection conn = dbContainer.createConnection("")) { + DatabaseMetaData metaData = conn.getMetaData(); + String schema = null; + if (dbContainer instanceof PostgreSQLContainer) { + schema = schemaName == null ? "public" : schemaName; + } else if (dbContainer instanceof OracleContainer) { + schema = + schemaName == null ? dbContainer.getUsername().toUpperCase() : schemaName.toUpperCase(); + } else if (dbContainer instanceof Db2Container) { + schema = + schemaName == null ? dbContainer.getUsername().toUpperCase() : schemaName.toUpperCase(); + } + try (ResultSet tables = metaData.getTables(null, schema, null, new String[] {"TABLE"})) { + Set tableNames = Sets.newHashSet(); + while (tables.next()) { + tableNames.add(tables.getString("TABLE_NAME").toLowerCase()); + } + Set.of("iceberg_namespace_properties", "iceberg_tables") + .forEach( + expectedTable -> { + assertThat(tableNames) + .withFailMessage( + () -> + "expected table " + + expectedTable + + " not found in schema " + + schemaName) + .contains(expectedTable); + }); + } + } catch (SQLException e) { + throw new RuntimeException("Failed to verify catalog tables in database", e); + } + } + + private static void createSchemaIfNecessary( + JdbcDatabaseContainer dbContainer, String schemaName) { + // create the schema if specified + if (schemaName != null) { + try (Connection conn = dbContainer.createConnection("")) { + try (var stmt = conn.createStatement()) { + if (dbContainer instanceof PostgreSQLContainer) { + stmt.execute("CREATE SCHEMA IF NOT EXISTS " + schemaName); + } else if (dbContainer instanceof OracleContainer) { + // we are using the user as schema in Oracle + } else if (dbContainer instanceof Db2Container) { + stmt.execute("CREATE SCHEMA " + schemaName); + } + } + } catch (SQLException e) { + throw new RuntimeException("Failed to create schema " + schemaName, e); + } + } + } + + /** + * Helper method to test basic catalog operations. This method tests the main functionality of + * JdbcCatalog including namespace, table, and view operations. It is called by the + * database-specific initialization tests to ensure that the catalog works correctly across + * different database backends. + * + * @param jdbcCatalog the initialized JdbcCatalog instance to test + */ + private void testCatalogOperations(JdbcCatalog jdbcCatalog) { + // Test catalog name (Catalog interface) + assertThat(jdbcCatalog.name()).isNotNull(); + + // Test namespace creation with metadata (SupportsNamespaces interface) + Namespace testNamespace = Namespace.of("test_db", "test_ns"); + Map namespaceMetadata = Maps.newHashMap(); + namespaceMetadata.put("owner", "test_user"); + namespaceMetadata.put("description", "Test namespace"); + jdbcCatalog.createNamespace(testNamespace, namespaceMetadata); + assertThat(jdbcCatalog.namespaceExists(testNamespace)).isTrue(); + + // Test loading namespace metadata (SupportsNamespaces interface) + Map loadedMetadata = jdbcCatalog.loadNamespaceMetadata(testNamespace); + assertThat(loadedMetadata).containsEntry("owner", "test_user"); + assertThat(loadedMetadata).containsEntry("description", "Test namespace"); + + // Test listing top-level namespaces (SupportsNamespaces interface) + assertThat(jdbcCatalog.listNamespaces()).contains(Namespace.of("test_db")); + + // Test listing child namespaces (SupportsNamespaces interface) + List childNamespaces = jdbcCatalog.listNamespaces(Namespace.of("test_db")); + assertThat(childNamespaces).contains(testNamespace); + + // Test setting namespace properties (SupportsNamespaces interface) + Map newProperties = Maps.newHashMap(); + newProperties.put("location", "/test/location"); + newProperties.put("owner", "updated_user"); + boolean propsSet = jdbcCatalog.setProperties(testNamespace, newProperties); + assertThat(propsSet).isTrue(); + + // Verify properties were updated + Map updatedMetadata = jdbcCatalog.loadNamespaceMetadata(testNamespace); + assertThat(updatedMetadata).containsEntry("location", "/test/location"); + assertThat(updatedMetadata).containsEntry("owner", "updated_user"); + assertThat(updatedMetadata).containsEntry("description", "Test namespace"); + + // Test removing namespace properties (SupportsNamespaces interface) + boolean propsRemoved = + jdbcCatalog.removeProperties(testNamespace, Sets.newHashSet("description")); + assertThat(propsRemoved).isTrue(); + + // Verify property was removed + Map metadataAfterRemoval = jdbcCatalog.loadNamespaceMetadata(testNamespace); + assertThat(metadataAfterRemoval).doesNotContainKey("description"); + assertThat(metadataAfterRemoval).containsEntry("owner", "updated_user"); + + // Test table operations (Catalog interface) + TableIdentifier tableId = TableIdentifier.of(testNamespace, "test_table"); + Table table = jdbcCatalog.createTable(tableId, SCHEMA, PartitionSpec.unpartitioned()); + assertThat(table).isNotNull(); + assertThat(jdbcCatalog.tableExists(tableId)).isTrue(); + assertThat(jdbcCatalog.listTables(testNamespace)).contains(tableId); + + // Test table loading (Catalog interface) + Table loadedTable = jdbcCatalog.loadTable(tableId); + assertThat(loadedTable.schema().asStruct()).isEqualTo(SCHEMA.asStruct()); + + // Test table renaming (Catalog interface) + TableIdentifier renamedTableId = TableIdentifier.of(testNamespace, "renamed_table"); + jdbcCatalog.renameTable(tableId, renamedTableId); + assertThat(jdbcCatalog.tableExists(renamedTableId)).isTrue(); + assertThat(jdbcCatalog.tableExists(tableId)).isFalse(); + + // Test view operations (ViewCatalog interface - only available in SchemaVersion.V1) + TableIdentifier viewId = TableIdentifier.of(testNamespace, "test_view"); + jdbcCatalog + .buildView(viewId) + .withQuery("spark", "SELECT * FROM renamed_table") + .withSchema(SCHEMA) + .withDefaultNamespace(testNamespace) + .create(); + + assertThat(jdbcCatalog.viewExists(viewId)).isTrue(); + assertThat(jdbcCatalog.listViews(testNamespace)).contains(viewId); + + // Test view renaming (ViewCatalog interface) + TableIdentifier renamedViewId = TableIdentifier.of(testNamespace, "renamed_view"); + jdbcCatalog.renameView(viewId, renamedViewId); + assertThat(jdbcCatalog.viewExists(renamedViewId)).isTrue(); + assertThat(jdbcCatalog.viewExists(viewId)).isFalse(); + + // Test dropping view (ViewCatalog interface) + assertThat(jdbcCatalog.dropView(renamedViewId)).isTrue(); + assertThat(jdbcCatalog.viewExists(renamedViewId)).isFalse(); + + // Test dropping table (Catalog interface) + assertThat(jdbcCatalog.dropTable(renamedTableId)).isTrue(); + assertThat(jdbcCatalog.tableExists(renamedTableId)).isFalse(); + + // Test dropping namespace (SupportsNamespaces interface) + assertThat(jdbcCatalog.dropNamespace(testNamespace)).isTrue(); + assertThat(jdbcCatalog.namespaceExists(testNamespace)).isFalse(); + } +} diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java index 4ac3a9301b4a..0425db8b6033 100644 --- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java +++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.jdbc; +import static org.apache.iceberg.jdbc.JdbcUtil.CATALOG_TABLE_VIEW_NAME; +import static org.apache.iceberg.jdbc.JdbcUtil.withTableName; import static org.assertj.core.api.Assertions.assertThat; import java.nio.file.Files; @@ -62,10 +64,16 @@ public void testV0toV1SqlStatements() throws Exception { try (JdbcClientPool connections = new JdbcClientPool(jdbcUrl, Maps.newHashMap())) { // create "old style" SQL schema - connections.newClient().prepareStatement(JdbcUtil.V0_CREATE_CATALOG_SQL).executeUpdate(); + connections + .newClient() + .prepareStatement( + JdbcUtil.withTableName( + JdbcUtil.V0_CREATE_CATALOG_SQL_TEMPLATE, CATALOG_TABLE_VIEW_NAME, null)) + .executeUpdate(); // inserting tables JdbcUtil.doCommitCreateTable( + null, JdbcUtil.SchemaVersion.V0, connections, "TEST", @@ -73,6 +81,7 @@ public void testV0toV1SqlStatements() throws Exception { TableIdentifier.of(Namespace.of("namespace1"), "table1"), "testLocation"); JdbcUtil.doCommitCreateTable( + null, JdbcUtil.SchemaVersion.V0, connections, "TEST", @@ -81,7 +90,11 @@ public void testV0toV1SqlStatements() throws Exception { "testLocation"); try (PreparedStatement statement = - connections.newClient().prepareStatement(JdbcUtil.V0_LIST_TABLE_SQL)) { + connections + .newClient() + .prepareStatement( + withTableName( + JdbcUtil.V0_LIST_TABLE_SQL_TEMPLATE, CATALOG_TABLE_VIEW_NAME, null))) { statement.setString(1, "TEST"); statement.setString(2, "namespace1"); ResultSet tables = statement.executeQuery(); @@ -92,10 +105,15 @@ public void testV0toV1SqlStatements() throws Exception { } // updating the schema from V0 to V1 - connections.newClient().prepareStatement(JdbcUtil.V1_UPDATE_CATALOG_SQL).execute(); + connections + .newClient() + .prepareStatement( + withTableName(JdbcUtil.V1_UPDATE_CATALOG_SQL_TEMPLATE, CATALOG_TABLE_VIEW_NAME, null)) + .execute(); // trying to add a table on the updated schema JdbcUtil.doCommitCreateTable( + null, JdbcUtil.SchemaVersion.V1, connections, "TEST", @@ -105,7 +123,11 @@ public void testV0toV1SqlStatements() throws Exception { // testing the tables after migration and new table added try (PreparedStatement statement = - connections.newClient().prepareStatement(JdbcUtil.V0_LIST_TABLE_SQL)) { + connections + .newClient() + .prepareStatement( + withTableName( + JdbcUtil.V0_LIST_TABLE_SQL_TEMPLATE, CATALOG_TABLE_VIEW_NAME, null))) { statement.setString(1, "TEST"); statement.setString(2, "namespace1"); ResultSet tables = statement.executeQuery(); @@ -123,6 +145,7 @@ public void testV0toV1SqlStatements() throws Exception { // update a table (commit) created on V1 schema int updated = JdbcUtil.updateTable( + null, JdbcUtil.SchemaVersion.V1, connections, "TEST", @@ -134,6 +157,7 @@ public void testV0toV1SqlStatements() throws Exception { // update a table (commit) migrated from V0 schema updated = JdbcUtil.updateTable( + null, JdbcUtil.SchemaVersion.V1, connections, "TEST", diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcViewCatalog.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcViewCatalog.java index 38c908a0c03c..f90eb9a3176b 100644 --- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcViewCatalog.java +++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcViewCatalog.java @@ -110,7 +110,7 @@ public void testCommitExceptionWithoutMessage() { try (MockedStatic mockedStatic = Mockito.mockStatic(JdbcUtil.class)) { mockedStatic - .when(() -> JdbcUtil.loadView(any(), any(), any(), any())) + .when(() -> JdbcUtil.loadView(any(), any(), any(), any(), any())) .thenThrow(new SQLException()); assertThatThrownBy(() -> ops.commit(ops.current(), metadataV1)) .isInstanceOf(UncheckedSQLException.class) @@ -137,7 +137,7 @@ public void testCommitExceptionWithMessage() { try (MockedStatic mockedStatic = Mockito.mockStatic(JdbcUtil.class)) { mockedStatic - .when(() -> JdbcUtil.loadView(any(), any(), any(), any())) + .when(() -> JdbcUtil.loadView(any(), any(), any(), any(), any())) .thenThrow(new SQLException("constraint failed")); assertThatThrownBy(() -> ops.commit(ops.current(), metadataV1)) .isInstanceOf(AlreadyExistsException.class) diff --git a/core/src/test/resources/container-license-acceptance.txt b/core/src/test/resources/container-license-acceptance.txt new file mode 100644 index 000000000000..f11647f10eac --- /dev/null +++ b/core/src/test/resources/container-license-acceptance.txt @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +ibmcom/db2:11.5.0.0a \ No newline at end of file diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index eeabe54f5f05..f648c503ea91 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -86,6 +86,9 @@ spark34 = "3.4.4" spark35 = "3.5.6" spark40 = "4.0.0" sqlite-jdbc = "3.50.3.0" +db2-jdbc = "12.1.2.0" +postgres-jdbc = "42.2.27" +oracle-jdbc = "21.7.0.0" testcontainers = "1.21.3" tez08 = { strictly = "0.8.4"} # see rich version usage explanation above @@ -216,7 +219,14 @@ nessie-versioned-storage-inmemory-tests = { module = "org.projectnessie.nessie:n nessie-versioned-storage-testextension = { module = "org.projectnessie.nessie:nessie-versioned-storage-testextension", version.ref = "nessie" } orc-tools = { module = "org.apache.orc:orc-tools", version.ref = "orc" } sqlite-jdbc = { module = "org.xerial:sqlite-jdbc", version.ref = "sqlite-jdbc" } +db2-jdbc = { module = "com.ibm.db2:jcc", version.ref = "db2-jdbc" } +postgres-jdbc = { module = "org.postgresql:postgresql", version.ref = "postgres-jdbc" } +oracle-jdbc = { module = "com.oracle.database.jdbc:ojdbc11", version.ref = "oracle-jdbc" } + testcontainers = { module = "org.testcontainers:testcontainers", version.ref = "testcontainers" } +testcontainers-db2 = { module = "org.testcontainers:db2", version.ref = "testcontainers" } +testcontainers-oracle = { module = "org.testcontainers:oracle-xe", version.ref = "testcontainers" } +testcontainers-postgres = { module = "org.testcontainers:postgresql", version.ref = "testcontainers" } testcontainers-junit-jupiter = { module = "org.testcontainers:junit-jupiter", version.ref = "testcontainers" } testcontainers-minio = { module = "org.testcontainers:minio", version.ref = "testcontainers" } tez08-dag = { module = "org.apache.tez:tez-dag", version.ref = "tez08" }