5757import org .apache .iceberg .PartitionSpec ;
5858import org .apache .iceberg .Schema ;
5959import org .apache .iceberg .TestHelpers ;
60+ import org .apache .iceberg .aws .AwsClientProperties ;
6061import org .apache .iceberg .aws .AwsProperties ;
6162import org .apache .iceberg .catalog .TableIdentifier ;
6263import org .apache .iceberg .common .DynMethods ;
9192import software .amazon .awssdk .core .sync .RequestBody ;
9293import software .amazon .awssdk .http .urlconnection .UrlConnectionHttpClient ;
9394import software .amazon .awssdk .regions .Region ;
95+ import software .amazon .awssdk .services .s3 .S3AsyncClient ;
9496import software .amazon .awssdk .services .s3 .S3Client ;
9597import software .amazon .awssdk .services .s3 .model .BucketAlreadyExistsException ;
9698import software .amazon .awssdk .services .s3 .model .BucketAlreadyOwnedByYouException ;
@@ -483,6 +485,55 @@ public void fileIOWithStorageCredentialsJavaSerialization()
483485 .isEqualTo (fileIO .credentials ());
484486 }
485487
488+ @ Test
489+ public void fileIOWithPrefixedS3ClientKryoSerialization () throws IOException {
490+ S3FileIO io = new S3FileIO ();
491+ io .setCredentials (
492+ ImmutableList .of (
493+ StorageCredential .create ("s3://my-bucket/my-path/table1" , Map .of ("key1" , "val1" ))));
494+ io .initialize (Map .of (AwsClientProperties .CLIENT_REGION , "us-east-1" ));
495+
496+ // there should be a client for the generic and specific storage prefix available
497+ assertThat (io .client ()).isInstanceOf (S3Client .class );
498+ assertThat (io .asyncClient ()).isInstanceOf (S3AsyncClient .class );
499+ assertThat (io .client ("s3://my-bucket/my-path" )).isInstanceOf (S3Client .class );
500+ assertThat (io .asyncClient ("s3://my-bucket/my-path" )).isInstanceOf (S3AsyncClient .class );
501+
502+ S3FileIO fileIO = TestHelpers .KryoHelpers .roundTripSerialize (io );
503+ assertThat (fileIO .credentials ()).isEqualTo (io .credentials ());
504+
505+ // make sure there's a client for the generic and specific storage prefix available after ser/de
506+ assertThat (fileIO .client ()).isInstanceOf (S3Client .class );
507+ assertThat (fileIO .asyncClient ()).isInstanceOf (S3AsyncClient .class );
508+ assertThat (fileIO .client ("s3://my-bucket/my-path" )).isInstanceOf (S3Client .class );
509+ assertThat (fileIO .asyncClient ("s3://my-bucket/my-path" )).isInstanceOf (S3AsyncClient .class );
510+ }
511+
512+ @ Test
513+ public void fileIOWithPrefixedS3ClientJavaSerialization ()
514+ throws IOException , ClassNotFoundException {
515+ S3FileIO io = new S3FileIO ();
516+ io .setCredentials (
517+ ImmutableList .of (
518+ StorageCredential .create ("s3://my-bucket/my-path/table1" , Map .of ("key1" , "val1" ))));
519+ io .initialize (Map .of (AwsClientProperties .CLIENT_REGION , "us-east-1" ));
520+
521+ // there should be a client for the generic and specific storage prefix available
522+ assertThat (io .client ()).isInstanceOf (S3Client .class );
523+ assertThat (io .asyncClient ()).isInstanceOf (S3AsyncClient .class );
524+ assertThat (io .client ("s3://my-bucket/my-path" )).isInstanceOf (S3Client .class );
525+ assertThat (io .asyncClient ("s3://my-bucket/my-path" )).isInstanceOf (S3AsyncClient .class );
526+
527+ S3FileIO fileIO = TestHelpers .roundTripSerialize (io );
528+ assertThat (fileIO .credentials ()).isEqualTo (io .credentials ());
529+
530+ // make sure there's a client for the generic and specific storage prefix available after ser/de
531+ assertThat (fileIO .client ()).isInstanceOf (S3Client .class );
532+ assertThat (fileIO .asyncClient ()).isInstanceOf (S3AsyncClient .class );
533+ assertThat (fileIO .client ("s3://my-bucket/my-path" )).isInstanceOf (S3Client .class );
534+ assertThat (fileIO .asyncClient ("s3://my-bucket/my-path" )).isInstanceOf (S3AsyncClient .class );
535+ }
536+
486537 @ Test
487538 public void testS3FileIOJavaSerialization () throws IOException , ClassNotFoundException {
488539 FileIO testS3FileIO = new S3FileIO ();
@@ -570,50 +621,83 @@ public void testInputFileWithManifest() throws IOException {
570621 @ Test
571622 public void resolvingFileIOLoadWithStorageCredentials ()
572623 throws IOException , ClassNotFoundException {
573- StorageCredential credential = StorageCredential .create ("prefix " , Map .of ("key1" , "val1" ));
624+ StorageCredential credential = StorageCredential .create ("s3://foo/bar " , Map .of ("key1" , "val1" ));
574625 List <StorageCredential > storageCredentials = ImmutableList .of (credential );
575626 ResolvingFileIO resolvingFileIO = new ResolvingFileIO ();
576627 resolvingFileIO .setCredentials (storageCredentials );
577- resolvingFileIO .initialize (ImmutableMap .of ());
628+ resolvingFileIO .initialize (ImmutableMap .of (AwsClientProperties . CLIENT_REGION , "us-east-1" ));
578629
579630 FileIO result =
580631 DynMethods .builder ("io" )
581632 .hiddenImpl (ResolvingFileIO .class , String .class )
582633 .build (resolvingFileIO )
583634 .invoke ("s3://foo/bar" );
584- assertThat (result )
585- .isInstanceOf (S3FileIO .class )
586- .asInstanceOf (InstanceOfAssertFactories .type (S3FileIO .class ))
587- .extracting (S3FileIO ::credentials )
588- .isEqualTo (storageCredentials );
635+ ObjectAssert <S3FileIO > io =
636+ assertThat (result )
637+ .isInstanceOf (S3FileIO .class )
638+ .asInstanceOf (InstanceOfAssertFactories .type (S3FileIO .class ));
639+ io .extracting (S3FileIO ::credentials ).isEqualTo (storageCredentials );
640+ io .satisfies (
641+ fileIO -> {
642+ // make sure there are two separate S3 clients for different prefixes and that the
643+ // underlying sync/async client is set
644+ assertThat (fileIO .client ("s3://foo/bar" ))
645+ .isNotSameAs (fileIO .client ())
646+ .isInstanceOf (S3Client .class );
647+ assertThat (fileIO .asyncClient ("s3://foo/bar" ))
648+ .isNotSameAs (fileIO .asyncClient ())
649+ .isInstanceOf (S3AsyncClient .class );
650+ });
589651
590652 // make sure credentials are still present after kryo serde
591- ResolvingFileIO io = TestHelpers .KryoHelpers .roundTripSerialize (resolvingFileIO );
592- assertThat (io .credentials ()).isEqualTo (storageCredentials );
653+ ResolvingFileIO resolvingIO = TestHelpers .KryoHelpers .roundTripSerialize (resolvingFileIO );
654+ assertThat (resolvingIO .credentials ()).isEqualTo (storageCredentials );
593655 result =
594656 DynMethods .builder ("io" )
595657 .hiddenImpl (ResolvingFileIO .class , String .class )
596- .build (io )
658+ .build (resolvingIO )
597659 .invoke ("s3://foo/bar" );
598- assertThat (result )
599- .isInstanceOf (S3FileIO .class )
600- .asInstanceOf (InstanceOfAssertFactories .type (S3FileIO .class ))
601- .extracting (S3FileIO ::credentials )
602- .isEqualTo (storageCredentials );
660+ io =
661+ assertThat (result )
662+ .isInstanceOf (S3FileIO .class )
663+ .asInstanceOf (InstanceOfAssertFactories .type (S3FileIO .class ));
664+ io .extracting (S3FileIO ::credentials ).isEqualTo (storageCredentials );
665+ io .satisfies (
666+ fileIO -> {
667+ // make sure there are two separate S3 clients for different prefixes and that the
668+ // underlying sync/async client is set
669+ assertThat (fileIO .client ("s3://foo/bar" ))
670+ .isNotSameAs (fileIO .client ())
671+ .isInstanceOf (S3Client .class );
672+ assertThat (fileIO .asyncClient ("s3://foo/bar" ))
673+ .isNotSameAs (fileIO .asyncClient ())
674+ .isInstanceOf (S3AsyncClient .class );
675+ });
603676
604677 // make sure credentials are still present after java serde
605- io = TestHelpers .roundTripSerialize (resolvingFileIO );
606- assertThat (io .credentials ()).isEqualTo (storageCredentials );
678+ resolvingIO = TestHelpers .roundTripSerialize (resolvingFileIO );
679+ assertThat (resolvingIO .credentials ()).isEqualTo (storageCredentials );
607680 result =
608681 DynMethods .builder ("io" )
609682 .hiddenImpl (ResolvingFileIO .class , String .class )
610- .build (io )
683+ .build (resolvingIO )
611684 .invoke ("s3://foo/bar" );
612- assertThat (result )
613- .isInstanceOf (S3FileIO .class )
614- .asInstanceOf (InstanceOfAssertFactories .type (S3FileIO .class ))
615- .extracting (S3FileIO ::credentials )
616- .isEqualTo (storageCredentials );
685+ io =
686+ assertThat (result )
687+ .isInstanceOf (S3FileIO .class )
688+ .asInstanceOf (InstanceOfAssertFactories .type (S3FileIO .class ));
689+ io .extracting (S3FileIO ::credentials ).isEqualTo (storageCredentials );
690+ io .satisfies (
691+ fileIO -> {
692+ // make sure there are two separate S3 clients for different prefixes and that the
693+ // underlying sync/async client is set
694+ assertThat (fileIO .client ("s3://foo/bar" ))
695+ .isNotSameAs (fileIO .client ())
696+ .isInstanceOf (S3Client .class );
697+ assertThat (fileIO .asyncClient ("s3://foo/bar" ))
698+ .isNotSameAs (fileIO .asyncClient ())
699+ .isInstanceOf (S3AsyncClient .class );
700+ });
617701 }
618702
619703 @ Test
@@ -622,17 +706,23 @@ public void noStorageCredentialConfigured() {
622706 fileIO .setCredentials (ImmutableList .of ());
623707 fileIO .initialize (
624708 ImmutableMap .of (
709+ AwsClientProperties .CLIENT_REGION ,
710+ "us-east-1" ,
625711 "s3.access-key-id" ,
626712 "keyIdFromProperties" ,
627713 "s3.secret-access-key" ,
628714 "accessKeyFromProperties" ,
629715 "s3.session-token" ,
630716 "sessionTokenFromProperties" ));
631717
718+ // make sure that the generic S3 Client is used for all storage paths if there are no storage
719+ // credentials configured
720+ assertThat (fileIO .client ("s3://my-bucket/table1" ))
721+ .isSameAs (fileIO .client ("s3://random-bucket/" ))
722+ .isSameAs (fileIO .client ("s3a://random-bucket/tableX" ));
723+
632724 ObjectAssert <S3FileIOProperties > s3FileIOProperties =
633- assertThat (fileIO )
634- .extracting ("s3FileIOProperties" )
635- .asInstanceOf (InstanceOfAssertFactories .type (S3FileIOProperties .class ));
725+ assertThat (fileIO .clientForStoragePath ("s3://my-bucket/table1" ).s3FileIOProperties ());
636726 s3FileIOProperties .extracting (S3FileIOProperties ::accessKeyId ).isEqualTo ("keyIdFromProperties" );
637727 s3FileIOProperties
638728 .extracting (S3FileIOProperties ::secretAccessKey )
@@ -659,17 +749,32 @@ public void singleStorageCredentialConfigured() {
659749 fileIO .setCredentials (ImmutableList .of (s3Credential ));
660750 fileIO .initialize (
661751 ImmutableMap .of (
752+ AwsClientProperties .CLIENT_REGION ,
753+ "us-east-1" ,
662754 "s3.access-key-id" ,
663755 "keyIdFromProperties" ,
664756 "s3.secret-access-key" ,
665757 "accessKeyFromProperties" ,
666758 "s3.session-token" ,
667759 "sessionTokenFromProperties" ));
668760
761+ // verify that the generic S3 client is used if the storage prefix doesn't match the prefixes in
762+ // the storage credentials
763+ assertThat (fileIO .client ("s3://my-bucket/table1" ))
764+ .isNotSameAs (fileIO .client ("s3://custom-uri/table2" ));
765+ ObjectAssert <S3FileIOProperties > genericProperties =
766+ assertThat (fileIO .clientForStoragePath ("s3://my-bucket/table1" ).s3FileIOProperties ());
767+ genericProperties .extracting (S3FileIOProperties ::accessKeyId ).isEqualTo ("keyIdFromProperties" );
768+ genericProperties
769+ .extracting (S3FileIOProperties ::secretAccessKey )
770+ .isEqualTo ("accessKeyFromProperties" );
771+ genericProperties
772+ .extracting (S3FileIOProperties ::sessionToken )
773+ .isEqualTo ("sessionTokenFromProperties" );
774+
669775 ObjectAssert <S3FileIOProperties > s3FileIOProperties =
670- assertThat (fileIO )
671- .extracting ("s3FileIOProperties" )
672- .asInstanceOf (InstanceOfAssertFactories .type (S3FileIOProperties .class ));
776+ assertThat (
777+ fileIO .clientForStoragePath ("s3://custom-uri/random/table1" ).s3FileIOProperties ());
673778 s3FileIOProperties .extracting (S3FileIOProperties ::accessKeyId ).isEqualTo ("keyIdFromCredential" );
674779 s3FileIOProperties
675780 .extracting (S3FileIOProperties ::secretAccessKey )
@@ -705,18 +810,58 @@ public void multipleStorageCredentialsConfigured() {
705810
706811 S3FileIO fileIO = new S3FileIO ();
707812 fileIO .setCredentials (ImmutableList .of (s3Credential1 , s3Credential2 ));
708- assertThatThrownBy (
709- () ->
710- fileIO .initialize (
711- ImmutableMap .of (
712- "s3.access-key-id" ,
713- "keyIdFromProperties" ,
714- "s3.secret-access-key" ,
715- "accessKeyFromProperties" ,
716- "s3.session-token" ,
717- "sessionTokenFromProperties" )))
718- .isInstanceOf (IllegalStateException .class )
719- .hasMessage ("Invalid S3 Credentials: only one S3 credential should exist" );
813+ fileIO .initialize (
814+ ImmutableMap .of (
815+ AwsClientProperties .CLIENT_REGION ,
816+ "us-east-1" ,
817+ "s3.access-key-id" ,
818+ "keyIdFromProperties" ,
819+ "s3.secret-access-key" ,
820+ "accessKeyFromProperties" ,
821+ "s3.session-token" ,
822+ "sessionTokenFromProperties" ));
823+
824+ // verify that the generic S3 client is used if the storage prefix doesn't match the prefixes in
825+ // the storage credentials
826+ assertThat (fileIO .client ("s3://my-bucket/table1" ))
827+ .isNotSameAs (fileIO .client ("s3://custom-uri/1" ))
828+ .isNotSameAs (fileIO .client ("s3://custom-uri/2" ));
829+ ObjectAssert <S3FileIOProperties > genericProperties =
830+ assertThat (fileIO .clientForStoragePath ("s3://my-bucket/table1" ).s3FileIOProperties ());
831+ genericProperties .extracting (S3FileIOProperties ::accessKeyId ).isEqualTo ("keyIdFromProperties" );
832+ genericProperties
833+ .extracting (S3FileIOProperties ::secretAccessKey )
834+ .isEqualTo ("accessKeyFromProperties" );
835+ genericProperties
836+ .extracting (S3FileIOProperties ::sessionToken )
837+ .isEqualTo ("sessionTokenFromProperties" );
838+
839+ // there are separate credentials configured for each storage path, so there should be separate
840+ // S3 client instances
841+ assertThat (fileIO .client ("s3://custom-uri/1" )).isNotSameAs (fileIO .client ("s3://custom-uri/2" ));
842+ ObjectAssert <S3FileIOProperties > s3FileIOProperties1 =
843+ assertThat (fileIO .clientForStoragePath ("s3://custom-uri/1/table1" ).s3FileIOProperties ());
844+ s3FileIOProperties1
845+ .extracting (S3FileIOProperties ::accessKeyId )
846+ .isEqualTo ("keyIdFromCredential1" );
847+ s3FileIOProperties1
848+ .extracting (S3FileIOProperties ::secretAccessKey )
849+ .isEqualTo ("accessKeyFromCredential1" );
850+ s3FileIOProperties1
851+ .extracting (S3FileIOProperties ::sessionToken )
852+ .isEqualTo ("sessionTokenFromCredential1" );
853+
854+ ObjectAssert <S3FileIOProperties > s3FileIOProperties2 =
855+ assertThat (fileIO .clientForStoragePath ("s3://custom-uri/2/table1" ).s3FileIOProperties ());
856+ s3FileIOProperties2
857+ .extracting (S3FileIOProperties ::accessKeyId )
858+ .isEqualTo ("keyIdFromCredential2" );
859+ s3FileIOProperties2
860+ .extracting (S3FileIOProperties ::secretAccessKey )
861+ .isEqualTo ("accessKeyFromCredential2" );
862+ s3FileIOProperties2
863+ .extracting (S3FileIOProperties ::sessionToken )
864+ .isEqualTo ("sessionTokenFromCredential2" );
720865 }
721866
722867 private void createRandomObjects (String prefix , int count ) {
0 commit comments