diff --git a/pom.xml b/pom.xml
index 1b33fc4757..e647cbdcc6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -477,12 +477,43 @@ under the License.
         <version>${testcontainers.version}</version>
         <scope>test</scope>
       </dependency>
       <dependency>
         <groupId>software.amazon.awssdk</groupId>
         <artifactId>s3</artifactId>
         <version>${amazon-awssdk-v2.version}</version>
         <scope>test</scope>
       </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>sts</artifactId>
+        <version>${amazon-awssdk-v2.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>dynamodb</artifactId>
+        <version>${amazon-awssdk-v2.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>glue</artifactId>
+        <version>${amazon-awssdk-v2.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>kms</artifactId>
+        <version>${amazon-awssdk-v2.version}</version>
+        <scope>test</scope>
+      </dependency>
       <dependency>
         <groupId>org.codehaus.jackson</groupId>
diff --git a/spark/pom.xml b/spark/pom.xml
index a9cd72f51b..1b207288c9 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -169,10 +169,27 @@ under the License.
     <dependency>
       <groupId>org.testcontainers</groupId>
       <artifactId>minio</artifactId>
     </dependency>
     <dependency>
       <groupId>software.amazon.awssdk</groupId>
       <artifactId>s3</artifactId>
     </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>sts</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>dynamodb</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>glue</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>kms</artifactId>
+    </dependency>
diff --git a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
index 7642749ad8..d778212392 100644
--- a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
+++ b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
@@ -237,6 +237,32 @@ object IcebergReflection extends Logging {
     }
   }
 
+  /**
+   * Gets storage properties from an Iceberg table's FileIO.
+   *
+   * This extracts credentials from the FileIO implementation, which is critical for REST catalog
+   * credential vending. The REST catalog returns temporary S3 credentials per-table via the
+   * loadTable response, stored in the table's FileIO (typically ResolvingFileIO).
+   *
+   * The properties() method is not on the FileIO interface -- it exists on specific
+   * implementations like ResolvingFileIO and S3FileIO. Returns None gracefully when unavailable.
+   */
+  def getFileIOProperties(table: Any): Option[Map[String, String]] = {
+    import scala.jdk.CollectionConverters._
+    getFileIO(table).flatMap { fileIO =>
+      findMethodInHierarchy(fileIO.getClass, "properties").flatMap { propsMethod =>
+        propsMethod.invoke(fileIO) match {
+          case javaMap: java.util.Map[_, _] =>
+            val scalaMap = javaMap.asScala.collect { case (k: String, v: String) =>
+              k -> v
+            }.toMap
+            if (scalaMap.nonEmpty) Some(scalaMap) else None
+          case _ => None
+        }
+      }
+    }
+  }
+
   /**
    * Gets the schema from an Iceberg table.
    */
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index bb37515ab9..404d209b49 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -49,7 +49,7 @@ import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflecti
 import org.apache.comet.objectstore.NativeConfig
 import org.apache.comet.parquet.{Native, SupportsComet}
 import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported}
-import org.apache.comet.serde.operator.CometNativeScan
+import org.apache.comet.serde.operator.{CometIcebergNativeScan, CometNativeScan}
 import org.apache.comet.shims.{CometTypeShim, ShimFileFormat, ShimSubqueryBroadcast}
 
 /**
@@ -387,9 +387,18 @@ case class CometScanRule(session: SparkSession)
 
           val hadoopS3Options =
             NativeConfig.extractObjectStoreOptions(hadoopConf, effectiveUri)
 
-          val catalogProperties =
-            org.apache.comet.serde.operator.CometIcebergNativeScan
-              .hadoopToIcebergS3Properties(hadoopS3Options)
+          val hadoopDerivedProperties =
+            CometIcebergNativeScan.hadoopToIcebergS3Properties(hadoopS3Options)
+
+          // Extract vended credentials from FileIO (REST catalog credential vending).
+          // FileIO properties take precedence over Hadoop-derived properties because
+          // they contain per-table credentials vended by the REST catalog.
+          val fileIOProperties = tableOpt
+            .flatMap(IcebergReflection.getFileIOProperties)
+            .map(CometIcebergNativeScan.filterStorageProperties)
+            .getOrElse(Map.empty)
+
+          val catalogProperties = hadoopDerivedProperties ++ fileIOProperties
 
           val result = CometIcebergNativeScanMetadata
             .extract(scanExec.scan, effectiveLocation, catalogProperties)
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
index 957f621032..c86b2a51bb 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
@@ -488,6 +488,21 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
     }
   }
 
+  /** Storage-related property prefixes passed through to native FileIO. */
+  private val storagePropertyPrefixes =
+    Seq("s3.", "gcs.", "adls.", "client.")
+
+  /**
+   * Filters a properties map to only include storage-related keys. FileIO.properties() may
+   * contain catalog URIs, bearer tokens, and other non-storage settings that should not be passed
+   * to the native FileIO builder.
+   */
+  def filterStorageProperties(props: Map[String, String]): Map[String, String] = {
+    props.filter { case (key, _) =>
+      storagePropertyPrefixes.exists(prefix => key.startsWith(prefix))
+    }
+  }
+
   /**
    * Transforms Hadoop S3A configuration keys to Iceberg FileIO property keys.
    *
diff --git a/spark/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java b/spark/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
index 7d5d6ce6b2..7b04110d3d 100644
--- a/spark/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
+++ b/spark/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
@@ -98,6 +98,14 @@ public class RESTCatalogAdapter implements RESTClient {
   private final SupportsNamespaces asNamespaceCatalog;
   private final ViewCatalog asViewCatalog;
 
+  // Optional credentials to inject into loadTable responses, simulating REST catalog
+  // credential vending. When non-empty, these are added to LoadTableResponse.config().
+  private Map<String, String> vendedCredentials = ImmutableMap.of();
+
+  public void setVendedCredentials(Map<String, String> credentials) {
+    this.vendedCredentials = credentials;
+  }
+
   public RESTCatalogAdapter(Catalog catalog) {
     this.catalog = catalog;
     this.asNamespaceCatalog =
@@ -279,6 +287,26 @@ private static OAuthTokenResponse handleOAuthRequest(Object body) {
   @SuppressWarnings({"MethodLength", "checkstyle:CyclomaticComplexity"})
   public <T extends RESTResponse> T handleRequest(
       Route route, Map<String, String> vars, Object body, Class<T> responseType) {
+    T response = doHandleRequest(route, vars, body, responseType);
+    // Inject vended credentials into any LoadTableResponse, simulating REST catalog
+    // credential vending. This covers CREATE_TABLE, LOAD_TABLE, UPDATE_TABLE, etc.
+    if (!vendedCredentials.isEmpty() && response instanceof LoadTableResponse) {
+      LoadTableResponse original = (LoadTableResponse) response;
+      @SuppressWarnings("unchecked")
+      T withCreds =
+          (T)
+              LoadTableResponse.builder()
+                  .withTableMetadata(original.tableMetadata())
+                  .addAllConfig(original.config())
+                  .addAllConfig(vendedCredentials)
+                  .build();
+      return withCreds;
+    }
+    return response;
+  }
+
+  private <T extends RESTResponse> T doHandleRequest(
+      Route route, Map<String, String> vars, Object body, Class<T> responseType) {
     switch (route) {
       case TOKENS:
         return castResponse(responseType, handleOAuthRequest(body));
diff --git a/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala b/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala
index 00955e6291..c1c90adfa3 100644
--- a/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala
+++ b/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala
@@ -23,7 +23,9 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.comet.CometIcebergNativeScanExec
 import org.apache.spark.sql.execution.SparkPlan
 
-class IcebergReadFromS3Suite extends CometS3TestBase {
+import org.apache.comet.iceberg.RESTCatalogHelper
+
+class IcebergReadFromS3Suite extends CometS3TestBase with RESTCatalogHelper {
 
   override protected val testBucketName = "test-iceberg-bucket"
 
@@ -227,4 +229,74 @@ class IcebergReadFromS3Suite extends CometS3TestBase {
 
     spark.sql("DROP TABLE s3_catalog.db.mor_delete_test")
   }
+
+  test("REST catalog credential vending rejects wrong credentials") {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    val wrongCreds = Map(
+      "s3.access-key-id" -> "WRONG_ACCESS_KEY",
+      "s3.secret-access-key" -> "WRONG_SECRET_KEY",
+      "s3.endpoint" -> minioContainer.getS3URL,
+      "s3.path-style-access" -> "true")
+    val warehouse = s"s3a://$testBucketName/warehouse-bad-creds"
+
+    withRESTCatalog(vendedCredentials = wrongCreds, warehouseLocation = Some(warehouse)) {
+      (restUri, _, _) =>
+        withSQLConf(
+          "spark.sql.catalog.bad_cat" -> "org.apache.iceberg.spark.SparkCatalog",
+          "spark.sql.catalog.bad_cat.catalog-impl" -> "org.apache.iceberg.rest.RESTCatalog",
"spark.sql.catalog.bad_cat.uri" -> restUri, + "spark.sql.catalog.bad_cat.warehouse" -> warehouse) { + + spark.sql("CREATE NAMESPACE bad_cat.db") + + // CREATE TABLE succeeds (metadata only, no S3 access needed) + spark.sql("CREATE TABLE bad_cat.db.test (id INT) USING iceberg") + + // INSERT fails because S3FileIO uses the wrong vended credentials + val e = intercept[Exception] { + spark.sql("INSERT INTO bad_cat.db.test VALUES (1)") + } + assert(e.getMessage.contains("403"), s"Expected S3 403 error but got: ${e.getMessage}") + } + } + } + + test("REST catalog credential vending with native Iceberg scan on S3") { + assume(icebergAvailable, "Iceberg not available in classpath") + + val vendedCreds = Map( + "s3.access-key-id" -> userName, + "s3.secret-access-key" -> password, + "s3.endpoint" -> minioContainer.getS3URL, + "s3.path-style-access" -> "true") + val warehouse = s"s3a://$testBucketName/warehouse-vending" + + withRESTCatalog(vendedCredentials = vendedCreds, warehouseLocation = Some(warehouse)) { + (restUri, _, _) => + withSQLConf( + "spark.sql.catalog.vend_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.vend_cat.catalog-impl" -> "org.apache.iceberg.rest.RESTCatalog", + "spark.sql.catalog.vend_cat.uri" -> restUri, + "spark.sql.catalog.vend_cat.warehouse" -> warehouse, + CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") { + + spark.sql("CREATE NAMESPACE vend_cat.db") + + spark.sql(""" + CREATE TABLE vend_cat.db.simple ( + id INT, name STRING, value DOUBLE + ) USING iceberg + """) + spark.sql(""" + INSERT INTO vend_cat.db.simple + VALUES (1, 'Alice', 10.5), (2, 'Bob', 20.3), (3, 'Charlie', 30.7) + """) + checkIcebergNativeScan("SELECT * FROM vend_cat.db.simple ORDER BY id") + + spark.sql("DROP TABLE vend_cat.db.simple") + spark.sql("DROP NAMESPACE vend_cat.db") + } + } + } } diff --git a/spark/src/test/spark-3.x/org/apache/comet/iceberg/RESTCatalogHelper.scala b/spark/src/test/spark-3.x/org/apache/comet/iceberg/RESTCatalogHelper.scala index 6230ee33e1..856700c2cf 100644 --- a/spark/src/test/spark-3.x/org/apache/comet/iceberg/RESTCatalogHelper.scala +++ b/spark/src/test/spark-3.x/org/apache/comet/iceberg/RESTCatalogHelper.scala @@ -26,7 +26,22 @@ import java.nio.file.Files trait RESTCatalogHelper { /** Helper to set up REST catalog with embedded Jetty server (Spark 3.x / Jetty 9.4) */ - def withRESTCatalog(f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = { + def withRESTCatalog(f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = + withRESTCatalog()(f) + + /** + * Helper to set up REST catalog with optional credential vending. + * + * @param vendedCredentials + * Storage credentials to inject into loadTable responses, simulating REST catalog credential + * vending. When non-empty, these are added to every LoadTableResponse.config(). + * @param warehouseLocation + * Override the warehouse location (e.g., for S3). Defaults to a local temp directory. 
+   */
+  def withRESTCatalog(
+      vendedCredentials: Map[String, String] = Map.empty,
+      warehouseLocation: Option[String] = None)(
+      f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
     import org.apache.iceberg.inmemory.InMemoryCatalog
     import org.apache.iceberg.CatalogProperties
     import org.apache.iceberg.rest.{RESTCatalogAdapter, RESTCatalogServlet}
@@ -35,12 +50,18 @@ trait RESTCatalogHelper {
     import org.eclipse.jetty.server.handler.gzip.GzipHandler
 
     val warehouseDir = Files.createTempDirectory("comet-rest-catalog-test").toFile
+    val effectiveWarehouse = warehouseLocation.getOrElse(warehouseDir.getAbsolutePath)
+
     val backendCatalog = new InMemoryCatalog()
     backendCatalog.initialize(
       "in-memory",
-      java.util.Map.of(CatalogProperties.WAREHOUSE_LOCATION, warehouseDir.getAbsolutePath))
+      java.util.Map.of(CatalogProperties.WAREHOUSE_LOCATION, effectiveWarehouse))
 
     val adapter = new RESTCatalogAdapter(backendCatalog)
+    if (vendedCredentials.nonEmpty) {
+      import scala.jdk.CollectionConverters._
+      adapter.setVendedCredentials(vendedCredentials.asJava)
+    }
     val servlet = new RESTCatalogServlet(adapter)
 
     val servletContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
diff --git a/spark/src/test/spark-4.0/org/apache/comet/iceberg/RESTCatalogHelper.scala b/spark/src/test/spark-4.0/org/apache/comet/iceberg/RESTCatalogHelper.scala
index ccd03c544d..bd53804b8d 100644
--- a/spark/src/test/spark-4.0/org/apache/comet/iceberg/RESTCatalogHelper.scala
+++ b/spark/src/test/spark-4.0/org/apache/comet/iceberg/RESTCatalogHelper.scala
@@ -26,7 +26,22 @@ import java.nio.file.Files
 trait RESTCatalogHelper {
 
   /** Helper to set up REST catalog with embedded Jetty server (Spark 4.0 / Jetty 11) */
-  def withRESTCatalog(f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
+  def withRESTCatalog(f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit =
+    withRESTCatalog()(f)
+
+  /**
+   * Helper to set up REST catalog with optional credential vending.
+   *
+   * @param vendedCredentials
+   *   Storage credentials to inject into loadTable responses, simulating REST catalog credential
+   *   vending. When non-empty, these are added to every LoadTableResponse.config().
+   * @param warehouseLocation
+   *   Override the warehouse location (e.g., for S3). Defaults to a local temp directory.
+   */
+  def withRESTCatalog(
+      vendedCredentials: Map[String, String] = Map.empty,
+      warehouseLocation: Option[String] = None)(
+      f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
     import org.apache.iceberg.inmemory.InMemoryCatalog
     import org.apache.iceberg.CatalogProperties
     import org.apache.iceberg.rest.{RESTCatalogAdapter, RESTCatalogServlet}
@@ -35,12 +50,18 @@ trait RESTCatalogHelper {
     import org.eclipse.jetty.server.handler.gzip.GzipHandler
 
     val warehouseDir = Files.createTempDirectory("comet-rest-catalog-test").toFile
+    val effectiveWarehouse = warehouseLocation.getOrElse(warehouseDir.getAbsolutePath)
+
     val backendCatalog = new InMemoryCatalog()
     backendCatalog.initialize(
       "in-memory",
-      java.util.Map.of(CatalogProperties.WAREHOUSE_LOCATION, warehouseDir.getAbsolutePath))
+      java.util.Map.of(CatalogProperties.WAREHOUSE_LOCATION, effectiveWarehouse))
 
     val adapter = new RESTCatalogAdapter(backendCatalog)
+    if (vendedCredentials.nonEmpty) {
+      import scala.jdk.CollectionConverters._
+      adapter.setVendedCredentials(vendedCredentials.asJava)
+    }
     val servlet = new RESTCatalogServlet(adapter)
 
     val servletContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
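For context on how these pieces compose, below is a minimal standalone sketch of the intended precedence: storage-related keys from FileIO.properties() (the vended credentials) are filtered by prefix and merged over the Hadoop-derived properties, so per-table REST-catalog credentials win. The object name, example values, and the standalone wiring are illustrative assumptions only; the real plumbing lives in CometScanRule and CometIcebergNativeScan as shown in the diff above.

// Illustrative sketch only: mirrors the merge order used in the patch, with hypothetical inputs.
object VendedCredentialMergeSketch {
  // Same prefixes the patch allows through to the native FileIO builder.
  private val storagePropertyPrefixes = Seq("s3.", "gcs.", "adls.", "client.")

  // Keep only storage-related keys (drops catalog URIs, bearer tokens, etc.).
  def filterStorageProperties(props: Map[String, String]): Map[String, String] =
    props.filter { case (key, _) =>
      storagePropertyPrefixes.exists(prefix => key.startsWith(prefix))
    }

  // Hadoop-derived defaults first, vended FileIO properties last so they take precedence.
  def mergeCatalogProperties(
      hadoopDerived: Map[String, String],
      fileIOProperties: Option[Map[String, String]]): Map[String, String] =
    hadoopDerived ++ fileIOProperties.map(filterStorageProperties).getOrElse(Map.empty)

  def main(args: Array[String]): Unit = {
    val hadoopDerived = Map(
      "s3.access-key-id" -> "static-key",
      "s3.endpoint" -> "http://localhost:9000")
    val vendedByRestCatalog = Some(
      Map(
        "s3.access-key-id" -> "vended-key", // per-table credential overrides the static one
        "s3.secret-access-key" -> "vended-secret",
        "uri" -> "http://rest-catalog:8181")) // non-storage key, filtered out

    val merged = mergeCatalogProperties(hadoopDerived, vendedByRestCatalog)
    assert(merged("s3.access-key-id") == "vended-key")
    assert(merged("s3.endpoint") == "http://localhost:9000")
    assert(!merged.contains("uri"))
    println(merged)
  }
}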