31 changes: 31 additions & 0 deletions pom.xml
@@ -477,12 +477,43 @@ under the License.
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<!--
AWS SDK modules for Iceberg REST catalog + S3 tests.
iceberg-spark-runtime treats the AWS SDK as provided scope, so tests
that exercise Iceberg's S3FileIO (via ResolvingFileIO) must supply these.
AwsProperties references all service client types in method signatures,
and Java serialization introspection resolves them at class-load time.
-->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>${amazon-awssdk-v2.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
<version>${amazon-awssdk-v2.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb</artifactId>
<version>${amazon-awssdk-v2.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>glue</artifactId>
<version>${amazon-awssdk-v2.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>kms</artifactId>
<version>${amazon-awssdk-v2.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.codehaus.jackson</groupId>
17 changes: 17 additions & 0 deletions spark/pom.xml
@@ -169,10 +169,27 @@ under the License.
<groupId>org.testcontainers</groupId>
<artifactId>minio</artifactId>
</dependency>
<!-- AWS SDK modules required by Iceberg's S3FileIO (see parent pom for details) -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>glue</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>kms</artifactId>
</dependency>
<!-- Jetty and Iceberg dependencies for testing native Iceberg scan -->
<!-- Note: The specific versions are defined in profiles below based on Spark version -->
</dependencies>
@@ -237,6 +237,32 @@ object IcebergReflection extends Logging {
}
}

/**
* Gets storage properties from an Iceberg table's FileIO.
*
* This extracts credentials from the FileIO implementation, which is critical for REST catalog
* credential vending. The REST catalog returns temporary S3 credentials per-table via the
* loadTable response, stored in the table's FileIO (typically ResolvingFileIO).
*
* The properties() method is not on the FileIO interface -- it exists on specific
* implementations like ResolvingFileIO and S3FileIO. Returns None gracefully when unavailable.
*/
def getFileIOProperties(table: Any): Option[Map[String, String]] = {
import scala.jdk.CollectionConverters._
getFileIO(table).flatMap { fileIO =>
findMethodInHierarchy(fileIO.getClass, "properties").flatMap { propsMethod =>
propsMethod.invoke(fileIO) match {
case javaMap: java.util.Map[_, _] =>
val scalaMap = javaMap.asScala.collect { case (k: String, v: String) =>
k -> v
}.toMap
if (scalaMap.nonEmpty) Some(scalaMap) else None
case _ => None
}
}
}
}
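A minimal usage sketch (not part of this change) of how a caller might consume the reflective accessor above; `table` stands for any loaded Iceberg table object, and the `s3.` filter is illustrative:

```scala
// Hypothetical helper, assuming IcebergReflection is in scope: pull only the
// vended S3 credentials off a loaded table, falling back to an empty map when
// the table's FileIO exposes no properties.
def vendedS3Credentials(table: Any): Map[String, String] =
  IcebergReflection
    .getFileIOProperties(table)
    .getOrElse(Map.empty)
    .filter { case (key, _) => key.startsWith("s3.") }
```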

/**
* Gets the schema from an Iceberg table.
*/
17 changes: 13 additions & 4 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
Expand Up @@ -49,7 +49,7 @@ import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflecti
import org.apache.comet.objectstore.NativeConfig
import org.apache.comet.parquet.{Native, SupportsComet}
import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported}
import org.apache.comet.serde.operator.CometNativeScan
import org.apache.comet.serde.operator.{CometIcebergNativeScan, CometNativeScan}
import org.apache.comet.shims.{CometTypeShim, ShimFileFormat, ShimSubqueryBroadcast}

/**
@@ -387,9 +387,18 @@ case class CometScanRule(session: SparkSession)

val hadoopS3Options = NativeConfig.extractObjectStoreOptions(hadoopConf, effectiveUri)

val catalogProperties =
org.apache.comet.serde.operator.CometIcebergNativeScan
.hadoopToIcebergS3Properties(hadoopS3Options)
val hadoopDerivedProperties =
CometIcebergNativeScan.hadoopToIcebergS3Properties(hadoopS3Options)

// Extract vended credentials from FileIO (REST catalog credential vending).
// FileIO properties take precedence over Hadoop-derived properties because
// they contain per-table credentials vended by the REST catalog.
val fileIOProperties = tableOpt
.flatMap(IcebergReflection.getFileIOProperties)
.map(CometIcebergNativeScan.filterStorageProperties)
.getOrElse(Map.empty)

val catalogProperties = hadoopDerivedProperties ++ fileIOProperties

val result = CometIcebergNativeScanMetadata
.extract(scanExec.scan, effectiveLocation, catalogProperties)
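The ordering of the merge above is what gives the FileIO credentials precedence; a minimal sketch of the rule, with illustrative keys and values:

```scala
// Scala's `++` keeps the right-hand value for duplicate keys, so per-table
// credentials vended through FileIO override statically configured Hadoop values.
val hadoopDerived =
  Map("s3.access-key-id" -> "static-key", "s3.endpoint" -> "http://minio:9000")
val fromFileIO =
  Map("s3.access-key-id" -> "vended-key", "s3.secret-access-key" -> "vended-secret")

val merged = hadoopDerived ++ fromFileIO
assert(merged("s3.access-key-id") == "vended-key") // FileIO value wins on conflict
assert(merged("s3.endpoint") == "http://minio:9000") // non-conflicting keys survive
```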
@@ -488,6 +488,21 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
}
}

/** Storage-related property prefixes passed through to native FileIO. */
private val storagePropertyPrefixes =
Seq("s3.", "gcs.", "adls.", "client.")

/**
* Filters a properties map to only include storage-related keys. FileIO.properties() may
* contain catalog URIs, bearer tokens, and other non-storage settings that should not be passed
* to the native FileIO builder.
*/
def filterStorageProperties(props: Map[String, String]): Map[String, String] = {
props.filter { case (key, _) =>
storagePropertyPrefixes.exists(prefix => key.startsWith(prefix))
}
}
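A short usage sketch (illustrative keys, not taken from the diff) of what the filter keeps and drops:

```scala
// "s3." and "client." entries pass through; catalog-level settings such as a
// REST URI or bearer token (key names here are assumptions) are dropped.
val raw = Map(
  "s3.access-key-id" -> "vended-key",
  "s3.secret-access-key" -> "vended-secret",
  "client.region" -> "us-east-1",
  "uri" -> "http://localhost:8181",
  "token" -> "bearer-token")

val storageOnly = CometIcebergNativeScan.filterStorageProperties(raw)
// storageOnly retains only the "s3.*" and "client.*" entries.
```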

/**
* Transforms Hadoop S3A configuration keys to Iceberg FileIO property keys.
*
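The body of `hadoopToIcebergS3Properties` is collapsed in this diff; the sketch below shows the kind of key translation it performs. The exact set of handled keys is an assumption based on common S3A settings, not the PR's implementation:

```scala
// Assumed mapping for illustration only; the real method may handle more keys.
val exampleS3aToIceberg = Map(
  "fs.s3a.access.key" -> "s3.access-key-id",
  "fs.s3a.secret.key" -> "s3.secret-access-key",
  "fs.s3a.endpoint" -> "s3.endpoint",
  "fs.s3a.path.style.access" -> "s3.path-style-access")

def sketchHadoopToIcebergS3(hadoopOpts: Map[String, String]): Map[String, String] =
  hadoopOpts.flatMap { case (k, v) => exampleS3aToIceberg.get(k).map(_ -> v) }
```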
@@ -98,6 +98,14 @@ public class RESTCatalogAdapter implements RESTClient {
private final SupportsNamespaces asNamespaceCatalog;
private final ViewCatalog asViewCatalog;

// Optional credentials to inject into loadTable responses, simulating REST catalog
// credential vending. When non-empty, these are added to LoadTableResponse.config().
private Map<String, String> vendedCredentials = ImmutableMap.of();

public void setVendedCredentials(Map<String, String> credentials) {
this.vendedCredentials = credentials;
}

public RESTCatalogAdapter(Catalog catalog) {
this.catalog = catalog;
this.asNamespaceCatalog =
@@ -279,6 +287,26 @@ private static OAuthTokenResponse handleOAuthRequest(Object body) {
@SuppressWarnings({"MethodLength", "checkstyle:CyclomaticComplexity"})
public <T extends RESTResponse> T handleRequest(
Route route, Map<String, String> vars, Object body, Class<T> responseType) {
T response = doHandleRequest(route, vars, body, responseType);
// Inject vended credentials into any LoadTableResponse, simulating REST catalog
// credential vending. This covers CREATE_TABLE, LOAD_TABLE, UPDATE_TABLE, etc.
if (!vendedCredentials.isEmpty() && response instanceof LoadTableResponse) {
LoadTableResponse original = (LoadTableResponse) response;
@SuppressWarnings("unchecked")
T withCreds =
(T)
LoadTableResponse.builder()
.withTableMetadata(original.tableMetadata())
.addAllConfig(original.config())
.addAllConfig(vendedCredentials)
.build();
return withCreds;
}
return response;
}

private <T extends RESTResponse> T doHandleRequest(
Route route, Map<String, String> vars, Object body, Class<T> responseType) {
switch (route) {
case TOKENS:
return castResponse(responseType, handleOAuthRequest(body));
@@ -23,7 +23,9 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.comet.CometIcebergNativeScanExec
import org.apache.spark.sql.execution.SparkPlan

class IcebergReadFromS3Suite extends CometS3TestBase {
import org.apache.comet.iceberg.RESTCatalogHelper

class IcebergReadFromS3Suite extends CometS3TestBase with RESTCatalogHelper {

override protected val testBucketName = "test-iceberg-bucket"

@@ -227,4 +229,74 @@ class IcebergReadFromS3Suite extends CometS3TestBase {

spark.sql("DROP TABLE s3_catalog.db.mor_delete_test")
}

test("REST catalog credential vending rejects wrong credentials") {
assume(icebergAvailable, "Iceberg not available in classpath")

val wrongCreds = Map(
"s3.access-key-id" -> "WRONG_ACCESS_KEY",
"s3.secret-access-key" -> "WRONG_SECRET_KEY",
"s3.endpoint" -> minioContainer.getS3URL,
"s3.path-style-access" -> "true")
val warehouse = s"s3a://$testBucketName/warehouse-bad-creds"

withRESTCatalog(vendedCredentials = wrongCreds, warehouseLocation = Some(warehouse)) {
(restUri, _, _) =>
withSQLConf(
"spark.sql.catalog.bad_cat" -> "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.bad_cat.catalog-impl" -> "org.apache.iceberg.rest.RESTCatalog",
"spark.sql.catalog.bad_cat.uri" -> restUri,
"spark.sql.catalog.bad_cat.warehouse" -> warehouse) {

spark.sql("CREATE NAMESPACE bad_cat.db")

// CREATE TABLE succeeds (metadata only, no S3 access needed)
spark.sql("CREATE TABLE bad_cat.db.test (id INT) USING iceberg")

// INSERT fails because S3FileIO uses the wrong vended credentials
val e = intercept[Exception] {
spark.sql("INSERT INTO bad_cat.db.test VALUES (1)")
}
assert(e.getMessage.contains("403"), s"Expected S3 403 error but got: ${e.getMessage}")
}
}
}

test("REST catalog credential vending with native Iceberg scan on S3") {
assume(icebergAvailable, "Iceberg not available in classpath")

val vendedCreds = Map(
"s3.access-key-id" -> userName,
"s3.secret-access-key" -> password,
"s3.endpoint" -> minioContainer.getS3URL,
"s3.path-style-access" -> "true")
val warehouse = s"s3a://$testBucketName/warehouse-vending"

withRESTCatalog(vendedCredentials = vendedCreds, warehouseLocation = Some(warehouse)) {
(restUri, _, _) =>
withSQLConf(
"spark.sql.catalog.vend_cat" -> "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.vend_cat.catalog-impl" -> "org.apache.iceberg.rest.RESTCatalog",
"spark.sql.catalog.vend_cat.uri" -> restUri,
"spark.sql.catalog.vend_cat.warehouse" -> warehouse,
CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {

spark.sql("CREATE NAMESPACE vend_cat.db")

spark.sql("""
CREATE TABLE vend_cat.db.simple (
id INT, name STRING, value DOUBLE
) USING iceberg
""")
spark.sql("""
INSERT INTO vend_cat.db.simple
VALUES (1, 'Alice', 10.5), (2, 'Bob', 20.3), (3, 'Charlie', 30.7)
""")
checkIcebergNativeScan("SELECT * FROM vend_cat.db.simple ORDER BY id")

spark.sql("DROP TABLE vend_cat.db.simple")
spark.sql("DROP NAMESPACE vend_cat.db")
}
}
}
}
@@ -26,7 +26,22 @@ import java.nio.file.Files
trait RESTCatalogHelper {

/** Helper to set up REST catalog with embedded Jetty server (Spark 3.x / Jetty 9.4) */
def withRESTCatalog(f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
def withRESTCatalog(f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit =
withRESTCatalog()(f)

/**
* Helper to set up REST catalog with optional credential vending.
*
* @param vendedCredentials
* Storage credentials to inject into loadTable responses, simulating REST catalog credential
* vending. When non-empty, these are added to every LoadTableResponse.config().
* @param warehouseLocation
* Override the warehouse location (e.g., for S3). Defaults to a local temp directory.
*/
def withRESTCatalog(
vendedCredentials: Map[String, String] = Map.empty,
warehouseLocation: Option[String] = None)(
f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
import org.apache.iceberg.inmemory.InMemoryCatalog
import org.apache.iceberg.CatalogProperties
import org.apache.iceberg.rest.{RESTCatalogAdapter, RESTCatalogServlet}
@@ -35,12 +50,18 @@ trait RESTCatalogHelper {
import org.eclipse.jetty.server.handler.gzip.GzipHandler

val warehouseDir = Files.createTempDirectory("comet-rest-catalog-test").toFile
val effectiveWarehouse = warehouseLocation.getOrElse(warehouseDir.getAbsolutePath)

val backendCatalog = new InMemoryCatalog()
backendCatalog.initialize(
"in-memory",
java.util.Map.of(CatalogProperties.WAREHOUSE_LOCATION, warehouseDir.getAbsolutePath))
java.util.Map.of(CatalogProperties.WAREHOUSE_LOCATION, effectiveWarehouse))

val adapter = new RESTCatalogAdapter(backendCatalog)
if (vendedCredentials.nonEmpty) {
import scala.jdk.CollectionConverters._
adapter.setVendedCredentials(vendedCredentials.asJava)
}
val servlet = new RESTCatalogServlet(adapter)

val servletContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
@@ -26,7 +26,22 @@ import java.nio.file.Files
trait RESTCatalogHelper {

/** Helper to set up REST catalog with embedded Jetty server (Spark 4.0 / Jetty 11) */
def withRESTCatalog(f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
def withRESTCatalog(f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit =
withRESTCatalog()(f)

/**
* Helper to set up REST catalog with optional credential vending.
*
* @param vendedCredentials
* Storage credentials to inject into loadTable responses, simulating REST catalog credential
* vending. When non-empty, these are added to every LoadTableResponse.config().
* @param warehouseLocation
* Override the warehouse location (e.g., for S3). Defaults to a local temp directory.
*/
def withRESTCatalog(
vendedCredentials: Map[String, String] = Map.empty,
warehouseLocation: Option[String] = None)(
f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
import org.apache.iceberg.inmemory.InMemoryCatalog
import org.apache.iceberg.CatalogProperties
import org.apache.iceberg.rest.{RESTCatalogAdapter, RESTCatalogServlet}
@@ -35,12 +50,18 @@ trait RESTCatalogHelper {
import org.eclipse.jetty.server.handler.gzip.GzipHandler

val warehouseDir = Files.createTempDirectory("comet-rest-catalog-test").toFile
val effectiveWarehouse = warehouseLocation.getOrElse(warehouseDir.getAbsolutePath)

val backendCatalog = new InMemoryCatalog()
backendCatalog.initialize(
"in-memory",
java.util.Map.of(CatalogProperties.WAREHOUSE_LOCATION, warehouseDir.getAbsolutePath))
java.util.Map.of(CatalogProperties.WAREHOUSE_LOCATION, effectiveWarehouse))

val adapter = new RESTCatalogAdapter(backendCatalog)
if (vendedCredentials.nonEmpty) {
import scala.jdk.CollectionConverters._
adapter.setVendedCredentials(vendedCredentials.asJava)
}
val servlet = new RESTCatalogServlet(adapter)

val servletContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)