Conversation

Contributor

@nastra nastra commented Apr 15, 2025

No description provided.

@nastra nastra marked this pull request as draft April 15, 2025 10:33
@github-actions github-actions bot added the AWS label Apr 15, 2025
@nastra nastra force-pushed the s3fileio-multi-prefix-support branch 4 times, most recently from e12e4ff to c443354 Compare April 15, 2025 13:38
    private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
    private transient StackTraceElement[] createStack;
    private List<StorageCredential> storageCredentials = ImmutableList.of();
    private transient volatile Cache<String, PrefixedS3Client> s3ClientCache;
Contributor

@singhpk234 singhpk234 Apr 15, 2025
Would it be better if we used an ExecutionInterceptor, which gives us a handle on the request so that we can inspect it and see which path it refers to, rather than making a client per prefix?

Contributor Author

@nastra nastra Apr 16, 2025

I'm not really familiar with ExecutionInterceptor so I'll have to do some digging on how a potential solution would look with this. @singhpk234 is there an easy way to pass the storage credentials (that we currently pass to FileIO) to a given ExecutionInterceptor?

But generally speaking, we're only expecting a handful of different storage credentials in practice, so the cost of maintaining < 5 different client instances should be quite low.
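As an aside, the per-prefix client lookup described here can be sketched with a small self-contained example (a hypothetical `PrefixedClientCache` class, not the actual S3FileIO code): clients are created lazily, one per configured prefix, and a storage path resolves to its longest matching prefix. A plain `String` stands in for the real S3 client.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Hypothetical sketch, not the actual S3FileIO implementation: a lazily
// populated per-prefix client cache with longest-prefix matching.
public class PrefixedClientCache {
  private final List<String> prefixes;
  private final Map<String, String> clients = new ConcurrentHashMap<>();

  public PrefixedClientCache(List<String> configuredPrefixes) {
    // sort longest-first so the most specific prefix wins
    this.prefixes =
        configuredPrefixes.stream()
            .sorted(Comparator.comparingInt(String::length).reversed())
            .collect(Collectors.toList());
  }

  // Resolve the configured prefix whose client should serve this path,
  // creating the per-prefix "client" on first use.
  public String clientKeyFor(String path) {
    for (String prefix : prefixes) {
      if (path.startsWith(prefix)) {
        clients.computeIfAbsent(prefix, p -> "client-for:" + p);
        return prefix;
      }
    }
    throw new IllegalStateException("No client for storage path: " + path);
  }

  public int clientCount() {
    return clients.size();
  }
}
```

With a handful of configured prefixes, the cache never holds more than that handful of clients, which is the cost trade-off being discussed.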

Contributor

Here is what I had in mind and have seen people do in the past:

  1. An ExecutionInterceptor can be configured as part of ClientOverrideConfiguration when creating the S3 SDK client. I am assuming we wire up the vended-credentials provider the same way, by giving the creds map to the class constructor, so we can do the same for the ExecutionInterceptor and pass it in at SDK client creation.
  2. To provide refreshed creds, the ExecutionInterceptor gives us a handle on the request in modifyHttpRequest, where you can adjust the SDK headers,
    something like this:

    @Override
    public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context, ExecutionAttributes executionAttributes) {
        // you can also inspect which path this request refers to here,
        // and based on that path wire in the credentials that suit it best
        SdkHttpRequest.Builder builder = context.httpRequest().toBuilder();
        AwsCredentials credentials = credentialsProvider.resolveCredentials();

        if (credentials != null && credentials.accessKeyId() != null && credentials.secretAccessKey() != null) {
            builder.putHeader("X-Amz-Access-Key", credentials.accessKeyId());
            builder.putHeader("X-Amz-Secret-Key", credentials.secretAccessKey());
            if (credentials instanceof AwsSessionCredentials) {
                builder.putHeader("X-Amz-Security-Token", ((AwsSessionCredentials) credentials).sessionToken());
            }
        } else {
            System.err.println("Warning: AWS credentials not found.");
        }
        return builder.build();
    }

The main reason for not having one client per path is that each client carries its own connection pool, which we may have to tune (e.g. the minimum HTTP pool size). If we could guarantee the number of clients stays < 5 (which, based on my understanding, we can't right now), then I think we could consider it further.

Would love to know your thoughts on this, considering the above.

Contributor Author

Thanks for the suggestion @singhpk234, I really like that idea. I've opened #12827 to explore using a custom ExecutionInterceptor.

@nastra nastra force-pushed the s3fileio-multi-prefix-support branch 3 times, most recently from c3ad9bb to c4080aa Compare April 16, 2025 15:21
@nastra nastra requested a review from danielcweeks April 22, 2025 13:28
@nastra nastra force-pushed the s3fileio-multi-prefix-support branch from c4080aa to 2c3df22 Compare April 22, 2025 14:57
@nastra nastra marked this pull request as ready for review April 23, 2025 10:54
@nastra nastra requested a review from amogh-jahagirdar April 23, 2025 15:55
@nastra nastra force-pushed the s3fileio-multi-prefix-support branch from 33fbac8 to aa82932 Compare April 28, 2025 16:17
@nastra nastra force-pushed the s3fileio-multi-prefix-support branch 3 times, most recently from e36b267 to f8470d5 Compare May 5, 2025 09:03
Contributor

@danielcweeks danielcweeks left a comment
+1 Thanks @nastra!

Contributor

@amogh-jahagirdar amogh-jahagirdar left a comment

I thought I had already approved last week for some reason, but this has looked right to me since the recent changes. Thank you @nastra!

@amogh-jahagirdar amogh-jahagirdar merged commit 1449378 into apache:main May 22, 2025
42 checks passed
@nastra nastra deleted the s3fileio-multi-prefix-support branch May 22, 2025 17:35
@and124578963

Hey, I had a problem with this commit. As I understand it, my paths start with s3a://, not s3://. How can I configure this?
After reverting the commit, everything works well again.
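The scheme mismatch reported here can be illustrated with a minimal sketch (a hypothetical `SchemeNormalizer` helper, not Iceberg code): exact string prefix matching treats s3a:// and s3:// as different schemes unless they are normalized first.

```java
// Hypothetical illustration, not Iceberg code: prefix matching fails when the
// table location uses the Hadoop-style s3a:// scheme but the configured
// storage-credential prefix uses s3://. Normalizing the scheme before
// matching avoids the mismatch.
public class SchemeNormalizer {
  // Map Hadoop-style schemes (s3a, s3n) onto the canonical s3 scheme.
  public static String normalize(String path) {
    if (path.startsWith("s3a://") || path.startsWith("s3n://")) {
      return "s3://" + path.substring(path.indexOf("://") + 3);
    }
    return path;
  }

  // True if the (normalized) path falls under the (normalized) prefix.
  public static boolean matches(String path, String credentialPrefix) {
    return normalize(path).startsWith(normalize(credentialPrefix));
  }
}
```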

25/05/27 13:13:38 ERROR org.apache.spark.executor.Executor: Exception in task 2.0 in stage 2.0 (TID 10)
java.lang.IllegalStateException: [BUG] S3 client for storage path not available: s3a://spark-nt/load_test_delete_warehouse/gen_transaction_icb_year/data/TRANSACTION_DATE_year=2003/00115-938-a969331b-32bf-4df2-a897-8c9dbc4a7c1a-0-00001.parquet
	at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkState(Preconditions.java:603) ~[iceberg-spark-runtime-3.5_2.12-iceberg-comet-trunk.jar:?]
	at org.apache.iceberg.aws.s3.S3FileIO.clientForStoragePath(S3FileIO.java:427) ~[iceberg-spark-runtime-3.5_2.12-iceberg-comet-trunk.jar:?]
	at org.apache.iceberg.aws.s3.S3FileIO.newInputFile(S3FileIO.java:184) ~[iceberg-spark-runtime-3.5_2.12-iceberg-comet-trunk.jar:?]
	at org.apache.iceberg.encryption.EncryptingFileIO.wrap(EncryptingFileIO.java:150) ~[iceberg-spark-runtime-3.5_2.12-iceberg-comet-trunk.jar:?]
	at org.apache.iceberg.relocated.com.google.common.collect.Iterators$6.transform(Iterators.java:828) ~[iceberg-spark-runtime-3.5_2.12-iceberg-comet-trunk.jar:?]
	at org.apache.iceberg.relocated.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:51) ~[iceberg-spark-runtime-3.5_2.12-iceberg-comet-trunk.jar:?]
	at org.apache.iceberg.relocated.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:51) ~[iceberg-spark-runtime-3.5_2.12-iceberg-comet-trunk.jar:?]
	at org.apache.iceberg.encryption.EncryptingFileIO.bulkDecrypt(EncryptingFileIO.java:63) ~[iceberg-spark-runtime-3.5_2.12-iceberg-comet-trunk.jar:?]
	at org.apache.iceberg.spark.source.BaseReader.inputFiles(BaseReader.java:177) ~[iceberg-spark-runtime-3.5_2.12-iceberg-comet-trunk.jar:?]
	at org.apache.iceberg.spark.source.BaseReader.getInputFile(BaseReader.java:170) ~[iceberg-spark-runtime-3.5_2.12-iceberg-comet-trunk.jar:?]
	at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:100) ~[iceberg-spark-runtime-3.5_2.12-iceberg-comet-trunk.jar:?]
	at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:43) ~[iceberg-spark-runtime-3.5_2.12-iceberg-comet-trunk.jar:?]
	at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:134) ~[iceberg-spark-runtime-3.5_2.12-iceberg-comet-trunk.jar:?]
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120) ~[spark-sql_2.12-3.5.4.jar:3.5.4]
	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158) ~[spark-sql_2.12-3.5.4.jar:3.5.4]
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63) ~[spark-sql_2.12-3.5.4.jar:3.5.4]
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63) ~[spark-sql_2.12-3.5.4.jar:3.5.4]
	at scala.Option.exists(Option.scala:376) ~[scala-library-2.12.18.jar:?]
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63) ~[spark-sql_2.12-3.5.4.jar:3.5.4]
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97) ~[spark-sql_2.12-3.5.4.jar:3.5.4]
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63) ~[spark-sql_2.12-3.5.4.jar:3.5.4]
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) ~[spark-core_2.12-3.5.4.jar:3.5.4]
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.18.jar:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source) ~[?:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) ~[?:?]
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) ~[spark-sql_2.12-3.5.4.jar:3.5.4]
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43) ~[spark-sql_2.12-3.5.4.jar:3.5.4]
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.18.jar:?]
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.18.jar:?]
	at org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:41) ~[spark-core_2.12-3.5.4.jar:3.5.4]
	at org.apache.spark.RangePartitioner$.$anonfun$sketch$1(Partitioner.scala:322) ~[spark-core_2.12-3.5.4.jar:3.5.4]
	at org.apache.spark.RangePartitioner$.$anonfun$sketch$1$adapted(Partitioner.scala:320) ~[spark-core_2.12-3.5.4.jar:3.5.4]
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:910) ~[spark-core_2.12-3.5.4.jar:3.5.4]
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:910) ~[spark-core_2.12-3.5.4.jar:3.5.4]

Contributor Author

nastra commented May 27, 2025

@and124578963 can you please open a new issue with some additional details about your catalog configuration?
