@@ -520,7 +520,7 @@ def test_spark_credentials_can_delete_after_purge(root_client, snowflake_catalog
     attempts = 0

     # watch the data directory. metadata will be deleted first, so if data directory is clear, we can expect
-    # metadatat diretory to be clear also
+    # metadata directory to be clear also
     while 'Contents' in objects and len(objects['Contents']) > 0 and attempts < 60:
         time.sleep(1)  # seconds, not milliseconds ;)
         objects = s3.list_objects(Bucket=test_bucket, Delimiter='/',
@@ -1149,6 +1149,81 @@ def test_spark_ctas(snowflake_catalog, polaris_catalog_url, snowman):
         spark.sql(f"drop table {table_name}_t2 PURGE")


+@pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'False').lower() != 'true',
+                    reason='AWS_TEST_ENABLED is not set or is false')
+def test_spark_credentials_s3_exception_on_metadata_file_deletion(root_client, snowflake_catalog, polaris_catalog_url,
+                                                                  snowman, snowman_catalog_client, test_bucket,
+                                                                  aws_bucket_base_location_prefix):
+    """
+    Create a table using Spark, then call the loadTable API directly with the snowman token to fetch vended
+    credentials for the table. Use those credentials to delete the table's metadata files, then call loadTable
+    again; the request should fail with a 404.
+    :param root_client:
+    :param snowflake_catalog:
+    :param polaris_catalog_url:
+    :param snowman:
+    :param snowman_catalog_client:
+    :param test_bucket:
+    :param aws_bucket_base_location_prefix:
+    :return:
+    """
+    with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret}',
+                             catalog_name=snowflake_catalog.name,
+                             polaris_url=polaris_catalog_url) as spark:
+        spark.sql(f'USE {snowflake_catalog.name}')
+        spark.sql('CREATE NAMESPACE db1')
+        spark.sql('CREATE NAMESPACE db1.schema')
+        spark.sql('USE db1.schema')
+        spark.sql('CREATE TABLE iceberg_table (col1 int, col2 string)')
+
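+    # Request the table via the catalog REST client, asking Polaris to vend temporary,
+    # table-scoped S3 credentials along with the table metadata.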
+    response = snowman_catalog_client.load_table(snowflake_catalog.name, unquote('db1%1Fschema'),
+                                                 "iceberg_table",
+                                                 "vended-credentials")
+    assert response.config is not None
+    assert 's3.access-key-id' in response.config
+    assert 's3.secret-access-key' in response.config
+    assert 's3.session-token' in response.config
+
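+    # Build an S3 client from the vended credentials alone, so every call below exercises
+    # exactly the access the catalog handed out.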
+    s3 = boto3.client('s3',
+                      aws_access_key_id=response.config['s3.access-key-id'],
+                      aws_secret_access_key=response.config['s3.secret-access-key'],
+                      aws_session_token=response.config['s3.session-token'])
+
+    # List the table's metadata files
+    objects = s3.list_objects(Bucket=test_bucket, Delimiter='/',
+                              Prefix=f'{aws_bucket_base_location_prefix}/snowflake_catalog/db1/schema/iceberg_table/metadata/')
+    assert objects is not None
+    assert 'Contents' in objects
+    assert len(objects['Contents']) > 0
+
+    # Verify the metadata file exists and is non-empty
+    metadata_file = next(f for f in objects['Contents'] if f['Key'].endswith('metadata.json'))
+    assert metadata_file is not None
+
+    metadata_contents = s3.get_object(Bucket=test_bucket, Key=metadata_file['Key'])
+    assert metadata_contents is not None
+    assert metadata_contents['ContentLength'] > 0
+
+    # Delete the metadata files; delete_objects expects a list of {'Key': ...} entries,
+    # not the raw list_objects response
+    s3.delete_objects(Bucket=test_bucket,
+                      Delete={'Objects': [{'Key': obj['Key']} for obj in objects['Contents']]})
1208+
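+    # With metadata.json gone, Polaris can no longer read the table, so loadTable should
+    # now surface the storage-layer 404.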
+    with pytest.raises(Exception) as exc_info:
+        snowman_catalog_client.load_table(snowflake_catalog.name, unquote('db1%1Fschema'),
+                                          "iceberg_table",
+                                          "vended-credentials")
+    assert '404' in str(exc_info.value)
+
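+    # Clean up: drop the table and namespaces created above so later tests start from a clean catalog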
+    with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret}',
+                             catalog_name=snowflake_catalog.name,
+                             polaris_url=polaris_catalog_url) as spark:
+        spark.sql(f'USE {snowflake_catalog.name}')
+        spark.sql('USE db1.schema')
+        spark.sql('DROP TABLE iceberg_table PURGE')
+        spark.sql(f'USE {snowflake_catalog.name}')
+        spark.sql('DROP NAMESPACE db1.schema')
+        spark.sql('DROP NAMESPACE db1')
+
 def create_catalog_role(api, catalog, role_name):
     catalog_role = CatalogRole(name=role_name)
     try: