diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 15166c822903..3d6de985a62f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -81,10 +81,17 @@ class V2SessionCatalog(catalog: SessionCatalog) new CaseInsensitiveStringMap(propertiesWithPath.asJava) } + private def hasCustomSessionCatalog: Boolean = { + catalog.conf.contains(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key) + } + override def loadTable(ident: Identifier): Table = { try { val table = catalog.getTableMetadata(ident.asTableIdentifier) - if (table.provider.isDefined) { + // The custom session catalog may extend `DelegatingCatalogExtension` and rely on the returned + // table here. To avoid breaking it we do not resolve the table provider and still return + // `V1Table` if the custom session catalog is present. + if (table.provider.isDefined && !hasCustomSessionCatalog) { val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table) // Check if the table is in the v1 table cache to skip the v2 table lookup. if (catalog.getCachedTable(qualifiedTableName) != null) {