From 0f757bdc372b0e726e3fa785bd48aaabff67fe53 Mon Sep 17 00:00:00 2001 From: wyxxxcat Date: Fri, 14 Nov 2025 15:42:51 +0800 Subject: [PATCH] 1 --- .../plugins/cloud_recycler_plugin.groovy | 198 +++++++++++++----- .../cloud_p0/recycler/test_checker.groovy | 58 +++-- 2 files changed, 190 insertions(+), 66 deletions(-) diff --git a/regression-test/plugins/cloud_recycler_plugin.groovy b/regression-test/plugins/cloud_recycler_plugin.groovy index bdd2f9d2f183b5..25dc6d331fc17c 100644 --- a/regression-test/plugins/cloud_recycler_plugin.groovy +++ b/regression-test/plugins/cloud_recycler_plugin.groovy @@ -25,6 +25,11 @@ import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.AmazonS3ClientBuilder import com.amazonaws.services.s3.model.ListObjectsRequest import com.amazonaws.services.s3.model.ObjectListing +import com.azure.storage.blob.BlobContainerClient +import com.azure.storage.blob.BlobContainerClientBuilder +import com.azure.storage.blob.models.ListBlobsOptions +import com.azure.storage.common.StorageSharedKeyCredential +import java.time.Duration import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileStatus import org.apache.hadoop.fs.FileSystem @@ -79,7 +84,7 @@ Suite.metaClass.checkRecycleTable = { String token, String instanceId, String cl suite.getLogger().info("checkRecycleTable(): getObjStoreInfoApiResult:${getObjStoreInfoApiResult}".toString()) if (getObjStoreInfoApiResult.result.toString().contains("obj_info")) { - String ak, sk, endpoint, region, prefix, bucket + String ak, sk, endpoint, region, prefix, bucket, provider if(!getObjStoreInfoApiResult.result.toString().contains("storage_vault=[")){ ak = getObjStoreInfoApiResult.result.obj_info[0].ak sk = getObjStoreInfoApiResult.result.obj_info[0].sk @@ -87,6 +92,7 @@ Suite.metaClass.checkRecycleTable = { String token, String instanceId, String cl region = getObjStoreInfoApiResult.result.obj_info[0].region prefix = getObjStoreInfoApiResult.result.obj_info[0].prefix bucket = getObjStoreInfoApiResult.result.obj_info[0].bucket + provider = getObjStoreInfoApiResult.result.obj_info[0].provider }else{ ak = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.ak sk = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.sk @@ -94,23 +100,48 @@ Suite.metaClass.checkRecycleTable = { String token, String instanceId, String cl region = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.region prefix = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.prefix bucket = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.bucket + provider = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.provider } suite.getLogger().info("ak:${ak}, sk:${sk}, endpoint:${endpoint}, prefix:${prefix}".toString()) - def credentials = new BasicAWSCredentials(ak, sk) - def endpointConfiguration = new EndpointConfiguration(endpoint, region) - def s3Client = AmazonS3ClientBuilder.standard().withEndpointConfiguration(endpointConfiguration) - .withCredentials(new AWSStaticCredentialsProvider(credentials)).build() - assertTrue(tabletIdList.size() > 0) - for (tabletId : tabletIdList) { - suite.getLogger().info("tableName: ${tableName}, tabletId:${tabletId}"); - def objectListing = s3Client.listObjects( - new ListObjectsRequest().withMaxKeys(1).withBucketName(bucket).withPrefix("${prefix}/data/${tabletId}/")) - suite.getLogger().info("tableName: ${tableName}, tabletId:${tabletId}, objectListing:${objectListing.getObjectSummaries()}".toString()) - if (!objectListing.getObjectSummaries().isEmpty()) { - return false; + if (provider?.equalsIgnoreCase("AZURE")) { + // Use Azure Blob Storage SDK + String uri = String.format("https://%s/%s", endpoint, bucket); + StorageSharedKeyCredential cred = new StorageSharedKeyCredential(ak, sk); + BlobContainerClient containerClient = new BlobContainerClientBuilder() + .credential(cred) + .endpoint(uri) + .buildClient(); + + for (tabletId : tabletIdList) { + suite.getLogger().info("tableName: ${tableName}, tabletId:${tabletId}"); + def blobs = containerClient.listBlobs( + new ListBlobsOptions().setPrefix("${prefix}/data/${tabletId}/").setMaxResultsPerPage(1), + Duration.ofMinutes(1)); + def blobsList = blobs.stream().toList() + suite.getLogger().info("tableName: ${tableName}, tabletId:${tabletId}, blobs:${blobsList}".toString()) + if (!blobsList.isEmpty()) { + return false; + } + } + } else { + // Use AWS S3 SDK + def credentials = new BasicAWSCredentials(ak, sk) + def endpointConfiguration = new EndpointConfiguration(endpoint, region) + def s3Client = AmazonS3ClientBuilder.standard().withEndpointConfiguration(endpointConfiguration) + .withCredentials(new AWSStaticCredentialsProvider(credentials)).build() + + for (tabletId : tabletIdList) { + suite.getLogger().info("tableName: ${tableName}, tabletId:${tabletId}"); + def objectListing = s3Client.listObjects( + new ListObjectsRequest().withMaxKeys(1).withBucketName(bucket).withPrefix("${prefix}/data/${tabletId}/")) + + suite.getLogger().info("tableName: ${tableName}, tabletId:${tabletId}, objectListing:${objectListing.getObjectSummaries()}".toString()) + if (!objectListing.getObjectSummaries().isEmpty()) { + return false; + } } } return true; @@ -179,7 +210,7 @@ Suite.metaClass.checkRecycleInternalStage = { String token, String instanceId, S suite.getLogger().info("checkRecycleTable(): getObjStoreInfoApiResult:${getObjStoreInfoApiResult}".toString()) if (getObjStoreInfoApiResult.result.toString().contains("obj_info")) { - String ak, sk, endpoint, region, prefix, bucket + String ak, sk, endpoint, region, prefix, bucket, provider if(!getObjStoreInfoApiResult.result.toString().contains("storage_vault=[")){ ak = getObjStoreInfoApiResult.result.obj_info[0].ak sk = getObjStoreInfoApiResult.result.obj_info[0].sk @@ -187,6 +218,7 @@ Suite.metaClass.checkRecycleInternalStage = { String token, String instanceId, S region = getObjStoreInfoApiResult.result.obj_info[0].region prefix = getObjStoreInfoApiResult.result.obj_info[0].prefix bucket = getObjStoreInfoApiResult.result.obj_info[0].bucket + provider = getObjStoreInfoApiResult.result.obj_info[0].provider }else{ ak = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.ak sk = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.sk @@ -194,25 +226,47 @@ Suite.metaClass.checkRecycleInternalStage = { String token, String instanceId, S region = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.region prefix = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.prefix bucket = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.bucket + provider = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.provider } suite.getLogger().info("ak:${ak}, sk:${sk}, endpoint:${endpoint}, prefix:${prefix}".toString()) - def credentials = new BasicAWSCredentials(ak, sk) - def endpointConfiguration = new EndpointConfiguration(endpoint, region) - def s3Client = AmazonS3ClientBuilder.standard().withEndpointConfiguration(endpointConfiguration) - .withCredentials(new AWSStaticCredentialsProvider(credentials)).build() - // for root and admin, userId equal userName String userName = suite.context.config.jdbcUser; String userId = suite.context.config.jdbcUser; - def objectListing = s3Client.listObjects( - new ListObjectsRequest().withMaxKeys(1) - .withBucketName(bucket) - .withPrefix("${prefix}/stage/${userName}/${userId}/${fileName}")) - - suite.getLogger().info("${prefix}/stage/${userName}/${userId}/${fileName}, objectListing:${objectListing.getObjectSummaries()}".toString()) - if (!objectListing.getObjectSummaries().isEmpty()) { - return false; + + if (provider?.equalsIgnoreCase("AZURE")) { + // Use Azure Blob Storage SDK + String uri = String.format("https://%s/%s", endpoint, bucket); + StorageSharedKeyCredential cred = new StorageSharedKeyCredential(ak, sk); + BlobContainerClient containerClient = new BlobContainerClientBuilder() + .credential(cred) + .endpoint(uri) + .buildClient(); + + def blobs = containerClient.listBlobs( + new ListBlobsOptions().setPrefix("${prefix}/stage/${userName}/${userId}/${fileName}").setMaxResultsPerPage(1), + Duration.ofMinutes(1)); + def blobsList = blobs.stream().toList() + suite.getLogger().info("${prefix}/stage/${userName}/${userId}/${fileName}, blobs:${blobsList}".toString()) + if (!blobsList.isEmpty()) { + return false; + } + } else { + // Use AWS S3 SDK + def credentials = new BasicAWSCredentials(ak, sk) + def endpointConfiguration = new EndpointConfiguration(endpoint, region) + def s3Client = AmazonS3ClientBuilder.standard().withEndpointConfiguration(endpointConfiguration) + .withCredentials(new AWSStaticCredentialsProvider(credentials)).build() + + def objectListing = s3Client.listObjects( + new ListObjectsRequest().withMaxKeys(1) + .withBucketName(bucket) + .withPrefix("${prefix}/stage/${userName}/${userId}/${fileName}")) + + suite.getLogger().info("${prefix}/stage/${userName}/${userId}/${fileName}, objectListing:${objectListing.getObjectSummaries()}".toString()) + if (!objectListing.getObjectSummaries().isEmpty()) { + return false; + } } return true; } @@ -262,7 +316,7 @@ Suite.metaClass.checkRecycleExpiredStageObjects = { String token, String instanc suite.getLogger().info("checkRecycleExpiredStageObjects(): getObjStoreInfoApiResult:${getObjStoreInfoApiResult}".toString()) if (getObjStoreInfoApiResult.result.toString().contains("obj_info")) { - String ak, sk, endpoint, region, prefix, bucket + String ak, sk, endpoint, region, prefix, bucket, provider if(!getObjStoreInfoApiResult.result.toString().contains("storage_vault=[")){ ak = getObjStoreInfoApiResult.result.obj_info[0].ak sk = getObjStoreInfoApiResult.result.obj_info[0].sk @@ -270,6 +324,7 @@ Suite.metaClass.checkRecycleExpiredStageObjects = { String token, String instanc region = getObjStoreInfoApiResult.result.obj_info[0].region prefix = getObjStoreInfoApiResult.result.obj_info[0].prefix bucket = getObjStoreInfoApiResult.result.obj_info[0].bucket + provider = getObjStoreInfoApiResult.result.obj_info[0].provider }else{ ak = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.ak sk = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.sk @@ -277,39 +332,76 @@ Suite.metaClass.checkRecycleExpiredStageObjects = { String token, String instanc region = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.region prefix = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.prefix bucket = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.bucket + provider = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.provider } suite.getLogger().info("ak:${ak}, sk:${sk}, endpoint:${endpoint}, prefix:${prefix}".toString()) - def credentials = new BasicAWSCredentials(ak, sk) - def endpointConfiguration = new EndpointConfiguration(endpoint, region) - def s3Client = AmazonS3ClientBuilder.standard().withEndpointConfiguration(endpointConfiguration) - .withCredentials(new AWSStaticCredentialsProvider(credentials)).build() - // for root and admin, userId equal userName String userName = suite.context.config.jdbcUser; String userId = suite.context.config.jdbcUser; - def objectListing = s3Client.listObjects( - new ListObjectsRequest() - .withBucketName(bucket) - .withPrefix("${prefix}/stage/${userName}/${userId}/")) - - suite.getLogger().info("${prefix}/stage/${userName}/${userId}/, objectListing:${objectListing.getObjectSummaries()}".toString()) - Set fileNames = new HashSet<>() - for (def os: objectListing.getObjectSummaries()) { - def split = os.key.split("/") - if (split.length <= 0 ) { - continue + + if (provider?.equalsIgnoreCase("AZURE")) { + // Use Azure Blob Storage SDK + String uri = String.format("https://%s/%s", endpoint, bucket); + StorageSharedKeyCredential cred = new StorageSharedKeyCredential(ak, sk); + BlobContainerClient containerClient = new BlobContainerClientBuilder() + .credential(cred) + .endpoint(uri) + .buildClient(); + + def blobs = containerClient.listBlobs( + new ListBlobsOptions().setPrefix("${prefix}/stage/${userName}/${userId}/"), + Duration.ofMinutes(1)); + + suite.getLogger().info("${prefix}/stage/${userName}/${userId}/, blobs count:${blobs.stream().count()}".toString()) + Set fileNames = new HashSet<>() + for (def blob: blobs) { + def split = blob.getName().split("/") + if (split.length <= 0 ) { + continue + } + fileNames.add(split[split.length-1]) } - fileNames.add(split[split.length-1]) - } - for(def f : nonExistFileNames) { - if (fileNames.contains(f)) { - return false + for(def f : nonExistFileNames) { + if (fileNames.contains(f)) { + return false + } } - } - for(def f : existFileNames) { - if (!fileNames.contains(f)) { - return false + for(def f : existFileNames) { + if (!fileNames.contains(f)) { + return false + } + } + } else { + // Use AWS S3 SDK + def credentials = new BasicAWSCredentials(ak, sk) + def endpointConfiguration = new EndpointConfiguration(endpoint, region) + def s3Client = AmazonS3ClientBuilder.standard().withEndpointConfiguration(endpointConfiguration) + .withCredentials(new AWSStaticCredentialsProvider(credentials)).build() + + def objectListing = s3Client.listObjects( + new ListObjectsRequest() + .withBucketName(bucket) + .withPrefix("${prefix}/stage/${userName}/${userId}/")) + + suite.getLogger().info("${prefix}/stage/${userName}/${userId}/, objectListing:${objectListing.getObjectSummaries()}".toString()) + Set fileNames = new HashSet<>() + for (def os: objectListing.getObjectSummaries()) { + def split = os.key.split("/") + if (split.length <= 0 ) { + continue + } + fileNames.add(split[split.length-1]) + } + for(def f : nonExistFileNames) { + if (fileNames.contains(f)) { + return false + } + } + for(def f : existFileNames) { + if (!fileNames.contains(f)) { + return false + } } } return true diff --git a/regression-test/suites/cloud_p0/recycler/test_checker.groovy b/regression-test/suites/cloud_p0/recycler/test_checker.groovy index 96cbce6810c7e5..8cf430f805651e 100644 --- a/regression-test/suites/cloud_p0/recycler/test_checker.groovy +++ b/regression-test/suites/cloud_p0/recycler/test_checker.groovy @@ -33,6 +33,13 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.RemoteIterator import org.apache.hadoop.security.UserGroupInformation +import com.azure.storage.blob.BlobContainerClient +import com.azure.storage.blob.BlobContainerClientBuilder +import com.azure.storage.blob.models.BlobItem +import com.azure.storage.blob.models.ListBlobsOptions +import com.azure.storage.common.StorageSharedKeyCredential +import java.time.Duration + suite("test_checker") { def token = "greedisgood9999" def instanceId = context.config.instanceId; @@ -77,7 +84,7 @@ suite("test_checker") { // Randomly delete segment file under tablet dir getObjStoreInfoApiResult = getObjStoreInfo(token, cloudUniqueId) if (getObjStoreInfoApiResult.result.toString().contains("obj_info")) { - String ak, sk, endpoint, region, prefix, bucket + String ak, sk, endpoint, region, prefix, bucket, provider if(!getObjStoreInfoApiResult.result.toString().contains("storage_vault=[")){ ak = getObjStoreInfoApiResult.result.obj_info[0].ak sk = getObjStoreInfoApiResult.result.obj_info[0].sk @@ -85,6 +92,7 @@ suite("test_checker") { region = getObjStoreInfoApiResult.result.obj_info[0].region prefix = getObjStoreInfoApiResult.result.obj_info[0].prefix bucket = getObjStoreInfoApiResult.result.obj_info[0].bucket + provider = getObjStoreInfoApiResult.result.obj_info[0].provider }else{ ak = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.ak sk = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.sk @@ -92,19 +100,43 @@ suite("test_checker") { region = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.region prefix = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.prefix bucket = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.bucket + provider = getObjStoreInfoApiResult.result.storage_vault[0].obj_info.provider + } + + if (provider?.equalsIgnoreCase("AZURE")) { + // Use Azure Blob Storage SDK + String uri = String.format("https://%s/%s", endpoint, bucket); + StorageSharedKeyCredential cred = new StorageSharedKeyCredential(ak, sk); + BlobContainerClient containerClient = new BlobContainerClientBuilder() + .credential(cred) + .endpoint(uri) + .buildClient(); + + def blobs = containerClient.listBlobs( + new ListBlobsOptions().setPrefix("${prefix}/data/${tabletId}/"), + Duration.ofMinutes(1)); + def blobsList = blobs.stream().toList() + assertTrue(!blobsList.isEmpty()) + Random random = new Random(caseStartTime); + def blobToDelete = blobsList[random.nextInt(blobsList.size())] + def blobName = blobToDelete.getName() + logger.info("delete blob: ${blobName}") + containerClient.getBlobClient(blobName).delete() + } else { + // Use AWS S3 SDK + def credentials = new BasicAWSCredentials(ak, sk) + def endpointConfiguration = new EndpointConfiguration(endpoint, region) + def s3Client = AmazonS3ClientBuilder.standard().withEndpointConfiguration(endpointConfiguration) + .withCredentials(new AWSStaticCredentialsProvider(credentials)).build() + def objectListing = s3Client.listObjects( + new ListObjectsRequest().withBucketName(bucket).withPrefix("${prefix}/data/${tabletId}/")) + def objectSummaries = objectListing.getObjectSummaries() + assertTrue(!objectSummaries.isEmpty()) + Random random = new Random(caseStartTime); + def objectKey = objectSummaries[random.nextInt(objectSummaries.size())].getKey() + logger.info("delete objectKey: ${objectKey}") + s3Client.deleteObject(new DeleteObjectRequest(bucket, objectKey)) } - def credentials = new BasicAWSCredentials(ak, sk) - def endpointConfiguration = new EndpointConfiguration(endpoint, region) - def s3Client = AmazonS3ClientBuilder.standard().withEndpointConfiguration(endpointConfiguration) - .withCredentials(new AWSStaticCredentialsProvider(credentials)).build() - def objectListing = s3Client.listObjects( - new ListObjectsRequest().withBucketName(bucket).withPrefix("${prefix}/data/${tabletId}/")) - def objectSummaries = objectListing.getObjectSummaries() - assertTrue(!objectSummaries.isEmpty()) - Random random = new Random(caseStartTime); - def objectKey = objectSummaries[random.nextInt(objectSummaries.size())].getKey() - logger.info("delete objectKey: ${objectKey}") - s3Client.deleteObject(new DeleteObjectRequest(bucket, objectKey)) } else if (getObjStoreInfoApiResult.result.toString().contains("storage_vault=[") && getObjStoreInfoApiResult.result.toString().contains("hdfs_info")) { System.setProperty("java.security.krb5.conf", "/etc/krb/krb5.conf") String fsUri = getObjStoreInfoApiResult.result.storage_vault[0].hdfs_info.build_conf.fs_name