@@ -25,6 +25,11 @@ import com.amazonaws.services.s3.AmazonS3
2525import com.amazonaws.services.s3.AmazonS3ClientBuilder
2626import com.amazonaws.services.s3.model.ListObjectsRequest
2727import com.amazonaws.services.s3.model.ObjectListing
28+ import com.azure.storage.blob.BlobContainerClient
29+ import com.azure.storage.blob.BlobContainerClientBuilder
30+ import com.azure.storage.blob.models.ListBlobsOptions
31+ import com.azure.storage.common.StorageSharedKeyCredential
32+ import java.time.Duration
2833import org.apache.hadoop.conf.Configuration
2934import org.apache.hadoop.fs.FileStatus
3035import org.apache.hadoop.fs.FileSystem
@@ -79,38 +84,64 @@ Suite.metaClass.checkRecycleTable = { String token, String instanceId, String cl
7984 suite. getLogger(). info(" checkRecycleTable(): getObjStoreInfoApiResult:${ getObjStoreInfoApiResult} " . toString())
8085
8186 if (getObjStoreInfoApiResult. result. toString(). contains(" obj_info" )) {
82- String ak, sk, endpoint, region, prefix, bucket
87+ String ak, sk, endpoint, region, prefix, bucket, provider
8388 if (! getObjStoreInfoApiResult. result. toString(). contains(" storage_vault=[" )){
8489 ak = getObjStoreInfoApiResult. result. obj_info[0 ]. ak
8590 sk = getObjStoreInfoApiResult. result. obj_info[0 ]. sk
8691 endpoint = getObjStoreInfoApiResult. result. obj_info[0 ]. endpoint
8792 region = getObjStoreInfoApiResult. result. obj_info[0 ]. region
8893 prefix = getObjStoreInfoApiResult. result. obj_info[0 ]. prefix
8994 bucket = getObjStoreInfoApiResult. result. obj_info[0 ]. bucket
95+ provider = getObjStoreInfoApiResult. result. obj_info[0 ]. provider
9096 }else {
9197 ak = getObjStoreInfoApiResult. result. storage_vault[0 ]. obj_info. ak
9298 sk = getObjStoreInfoApiResult. result. storage_vault[0 ]. obj_info. sk
9399 endpoint = getObjStoreInfoApiResult. result. storage_vault[0 ]. obj_info. endpoint
94100 region = getObjStoreInfoApiResult. result. storage_vault[0 ]. obj_info. region
95101 prefix = getObjStoreInfoApiResult. result. storage_vault[0 ]. obj_info. prefix
96102 bucket = getObjStoreInfoApiResult. result. storage_vault[0 ]. obj_info. bucket
103+ provider = getObjStoreInfoApiResult. result. storage_vault[0 ]. obj_info. provider
97104 }
98105 suite. getLogger(). info(" ak:${ ak} , sk:${ sk} , endpoint:${ endpoint} , prefix:${ prefix} " . toString())
99106
100- def credentials = new BasicAWSCredentials (ak, sk)
101- def endpointConfiguration = new EndpointConfiguration (endpoint, region)
102- def s3Client = AmazonS3ClientBuilder . standard(). withEndpointConfiguration(endpointConfiguration)
103- .withCredentials(new AWSStaticCredentialsProvider (credentials)). build()
104-
105107 assertTrue (tabletIdList. size() > 0 )
106- for (tabletId : tabletIdList) {
107- suite. getLogger(). info(" tableName: ${ tableName} , tabletId:${ tabletId} " );
108- def objectListing = s3Client. listObjects(
109- new ListObjectsRequest (). withMaxKeys(1 ). withBucketName(bucket). withPrefix(" ${ prefix} /data/${ tabletId} /" ))
110108
111- suite. getLogger(). info(" tableName: ${ tableName} , tabletId:${ tabletId} , objectListing:${ objectListing.getObjectSummaries()} " . toString())
112- if (! objectListing. getObjectSummaries(). isEmpty()) {
113- return false ;
109+ if (provider?. equalsIgnoreCase(" AZURE" )) {
110+ // Use Azure Blob Storage SDK
111+ String uri = String . format(" https://%s/%s" , endpoint, bucket);
112+ StorageSharedKeyCredential cred = new StorageSharedKeyCredential (ak, sk);
113+ BlobContainerClient containerClient = new BlobContainerClientBuilder ()
114+ .credential(cred)
115+ .endpoint(uri)
116+ .buildClient();
117+
118+ for (tabletId : tabletIdList) {
119+ suite. getLogger(). info(" tableName: ${ tableName} , tabletId:${ tabletId} " );
120+ def blobs = containerClient. listBlobs(
121+ new ListBlobsOptions (). setPrefix(" ${ prefix} /data/${ tabletId} /" ). setMaxResultsPerPage(1 ),
122+ Duration . ofMinutes(1 ));
123+ def blobsList = blobs. stream(). toList()
124+ suite. getLogger(). info(" tableName: ${ tableName} , tabletId:${ tabletId} , blobs:${ blobsList} " . toString())
125+ if (! blobsList. isEmpty()) {
126+ return false ;
127+ }
128+ }
129+ } else {
130+ // Use AWS S3 SDK
131+ def credentials = new BasicAWSCredentials (ak, sk)
132+ def endpointConfiguration = new EndpointConfiguration (endpoint, region)
133+ def s3Client = AmazonS3ClientBuilder . standard(). withEndpointConfiguration(endpointConfiguration)
134+ .withCredentials(new AWSStaticCredentialsProvider (credentials)). build()
135+
136+ for (tabletId : tabletIdList) {
137+ suite. getLogger(). info(" tableName: ${ tableName} , tabletId:${ tabletId} " );
138+ def objectListing = s3Client. listObjects(
139+ new ListObjectsRequest (). withMaxKeys(1 ). withBucketName(bucket). withPrefix(" ${ prefix} /data/${ tabletId} /" ))
140+
141+ suite. getLogger(). info(" tableName: ${ tableName} , tabletId:${ tabletId} , objectListing:${ objectListing.getObjectSummaries()} " . toString())
142+ if (! objectListing. getObjectSummaries(). isEmpty()) {
143+ return false ;
144+ }
114145 }
115146 }
116147 return true ;
@@ -179,40 +210,63 @@ Suite.metaClass.checkRecycleInternalStage = { String token, String instanceId, S
179210 suite. getLogger(). info(" checkRecycleTable(): getObjStoreInfoApiResult:${ getObjStoreInfoApiResult} " . toString())
180211
181212 if (getObjStoreInfoApiResult. result. toString(). contains(" obj_info" )) {
182- String ak, sk, endpoint, region, prefix, bucket
213+ String ak, sk, endpoint, region, prefix, bucket, provider
183214 if (! getObjStoreInfoApiResult. result. toString(). contains(" storage_vault=[" )){
184215 ak = getObjStoreInfoApiResult. result. obj_info[0 ]. ak
185216 sk = getObjStoreInfoApiResult. result. obj_info[0 ]. sk
186217 endpoint = getObjStoreInfoApiResult. result. obj_info[0 ]. endpoint
187218 region = getObjStoreInfoApiResult. result. obj_info[0 ]. region
188219 prefix = getObjStoreInfoApiResult. result. obj_info[0 ]. prefix
189220 bucket = getObjStoreInfoApiResult. result. obj_info[0 ]. bucket
221+ provider = getObjStoreInfoApiResult. result. obj_info[0 ]. provider
190222 }else {
191223 ak = getObjStoreInfoApiResult. result. storage_vault[0 ]. obj_info. ak
192224 sk = getObjStoreInfoApiResult. result. storage_vault[0 ]. obj_info. sk
193225 endpoint = getObjStoreInfoApiResult. result. storage_vault[0 ]. obj_info. endpoint
194226 region = getObjStoreInfoApiResult. result. storage_vault[0 ]. obj_info. region
195227 prefix = getObjStoreInfoApiResult. result. storage_vault[0 ]. obj_info. prefix
196228 bucket = getObjStoreInfoApiResult. result. storage_vault[0 ]. obj_info. bucket
229+ provider = getObjStoreInfoApiResult. result. storage_vault[0 ]. obj_info. provider
197230 }
198231 suite. getLogger(). info(" ak:${ ak} , sk:${ sk} , endpoint:${ endpoint} , prefix:${ prefix} " . toString())
199232
200- def credentials = new BasicAWSCredentials (ak, sk)
201- def endpointConfiguration = new EndpointConfiguration (endpoint, region)
202- def s3Client = AmazonS3ClientBuilder . standard(). withEndpointConfiguration(endpointConfiguration)
203- .withCredentials(new AWSStaticCredentialsProvider (credentials)). build()
204-
205233 // for root and admin, userId equal userName
206234 String userName = suite. context. config. jdbcUser;
207235 String userId = suite. context. config. jdbcUser;
208- def objectListing = s3Client. listObjects(
209- new ListObjectsRequest (). withMaxKeys(1 )
210- .withBucketName(bucket)
211- .withPrefix(" ${ prefix} /stage/${ userName} /${ userId} /${ fileName} " ))
212-
213- suite. getLogger(). info(" ${ prefix} /stage/${ userName} /${ userId} /${ fileName} , objectListing:${ objectListing.getObjectSummaries()} " . toString())
214- if (! objectListing. getObjectSummaries(). isEmpty()) {
215- return false ;
236+
237+ if (provider?. equalsIgnoreCase(" AZURE" )) {
238+ // Use Azure Blob Storage SDK
239+ String uri = String . format(" https://%s/%s" , endpoint, bucket);
240+ StorageSharedKeyCredential cred = new StorageSharedKeyCredential (ak, sk);
241+ BlobContainerClient containerClient = new BlobContainerClientBuilder ()
242+ .credential(cred)
243+ .endpoint(uri)
244+ .buildClient();
245+
246+ def blobs = containerClient. listBlobs(
247+ new ListBlobsOptions (). setPrefix(" ${ prefix} /stage/${ userName} /${ userId} /${ fileName} " ). setMaxResultsPerPage(1 ),
248+ Duration . ofMinutes(1 ));
249+ def blobsList = blobs. stream(). toList()
250+ suite. getLogger(). info(" ${ prefix} /stage/${ userName} /${ userId} /${ fileName} , blobs:${ blobsList} " . toString())
251+ if (! blobsList. isEmpty()) {
252+ return false ;
253+ }
254+ } else {
255+ // Use AWS S3 SDK
256+ def credentials = new BasicAWSCredentials (ak, sk)
257+ def endpointConfiguration = new EndpointConfiguration (endpoint, region)
258+ def s3Client = AmazonS3ClientBuilder . standard(). withEndpointConfiguration(endpointConfiguration)
259+ .withCredentials(new AWSStaticCredentialsProvider (credentials)). build()
260+
261+ def objectListing = s3Client. listObjects(
262+ new ListObjectsRequest (). withMaxKeys(1 )
263+ .withBucketName(bucket)
264+ .withPrefix(" ${ prefix} /stage/${ userName} /${ userId} /${ fileName} " ))
265+
266+ suite. getLogger(). info(" ${ prefix} /stage/${ userName} /${ userId} /${ fileName} , objectListing:${ objectListing.getObjectSummaries()} " . toString())
267+ if (! objectListing. getObjectSummaries(). isEmpty()) {
268+ return false ;
269+ }
216270 }
217271 return true ;
218272 }
@@ -262,54 +316,92 @@ Suite.metaClass.checkRecycleExpiredStageObjects = { String token, String instanc
262316 suite. getLogger(). info(" checkRecycleExpiredStageObjects(): getObjStoreInfoApiResult:${ getObjStoreInfoApiResult} " . toString())
263317
264318 if (getObjStoreInfoApiResult. result. toString(). contains(" obj_info" )) {
265- String ak, sk, endpoint, region, prefix, bucket
319+ String ak, sk, endpoint, region, prefix, bucket, provider
266320 if (! getObjStoreInfoApiResult. result. toString(). contains(" storage_vault=[" )){
267321 ak = getObjStoreInfoApiResult. result. obj_info[0 ]. ak
268322 sk = getObjStoreInfoApiResult. result. obj_info[0 ]. sk
269323 endpoint = getObjStoreInfoApiResult. result. obj_info[0 ]. endpoint
270324 region = getObjStoreInfoApiResult. result. obj_info[0 ]. region
271325 prefix = getObjStoreInfoApiResult. result. obj_info[0 ]. prefix
272326 bucket = getObjStoreInfoApiResult. result. obj_info[0 ]. bucket
327+ provider = getObjStoreInfoApiResult. result. obj_info[0 ]. provider
273328 }else {
274329 ak = getObjStoreInfoApiResult. result. storage_vault[0 ]. obj_info. ak
275330 sk = getObjStoreInfoApiResult. result. storage_vault[0 ]. obj_info. sk
276331 endpoint = getObjStoreInfoApiResult. result. storage_vault[0 ]. obj_info. endpoint
277332 region = getObjStoreInfoApiResult. result. storage_vault[0 ]. obj_info. region
278333 prefix = getObjStoreInfoApiResult. result. storage_vault[0 ]. obj_info. prefix
279334 bucket = getObjStoreInfoApiResult. result. storage_vault[0 ]. obj_info. bucket
335+ provider = getObjStoreInfoApiResult. result. storage_vault[0 ]. obj_info. provider
280336 }
281337 suite. getLogger(). info(" ak:${ ak} , sk:${ sk} , endpoint:${ endpoint} , prefix:${ prefix} " . toString())
282338
283- def credentials = new BasicAWSCredentials (ak, sk)
284- def endpointConfiguration = new EndpointConfiguration (endpoint, region)
285- def s3Client = AmazonS3ClientBuilder . standard(). withEndpointConfiguration(endpointConfiguration)
286- .withCredentials(new AWSStaticCredentialsProvider (credentials)). build()
287-
288339 // for root and admin, userId equal userName
289340 String userName = suite. context. config. jdbcUser;
290341 String userId = suite. context. config. jdbcUser;
291- def objectListing = s3Client. listObjects(
292- new ListObjectsRequest ()
293- .withBucketName(bucket)
294- .withPrefix(" ${ prefix} /stage/${ userName} /${ userId} /" ))
295-
296- suite. getLogger(). info(" ${ prefix} /stage/${ userName} /${ userId} /, objectListing:${ objectListing.getObjectSummaries()} " . toString())
297- Set<String > fileNames = new HashSet<> ()
298- for (def os : objectListing. getObjectSummaries()) {
299- def split = os. key. split(" /" )
300- if (split. length <= 0 ) {
301- continue
342+
343+ if (provider?. equalsIgnoreCase(" AZURE" )) {
344+ // Use Azure Blob Storage SDK
345+ String uri = String . format(" https://%s/%s" , endpoint, bucket);
346+ StorageSharedKeyCredential cred = new StorageSharedKeyCredential (ak, sk);
347+ BlobContainerClient containerClient = new BlobContainerClientBuilder ()
348+ .credential(cred)
349+ .endpoint(uri)
350+ .buildClient();
351+
352+ def blobs = containerClient. listBlobs(
353+ new ListBlobsOptions (). setPrefix(" ${ prefix} /stage/${ userName} /${ userId} /" ),
354+ Duration . ofMinutes(1 ));
355+
356+ suite. getLogger(). info(" ${ prefix} /stage/${ userName} /${ userId} /, blobs count:${ blobs.stream().count()} " . toString())
357+ Set<String > fileNames = new HashSet<> ()
358+ for (def blob : blobs) {
359+ def split = blob. getName(). split(" /" )
360+ if (split. length <= 0 ) {
361+ continue
362+ }
363+ fileNames. add(split[split. length-1 ])
302364 }
303- fileNames. add(split[split. length-1 ])
304- }
305- for (def f : nonExistFileNames) {
306- if (fileNames. contains(f)) {
307- return false
365+ for (def f : nonExistFileNames) {
366+ if (fileNames. contains(f)) {
367+ return false
368+ }
308369 }
309- }
310- for (def f : existFileNames) {
311- if (! fileNames. contains(f)) {
312- return false
370+ for (def f : existFileNames) {
371+ if (! fileNames. contains(f)) {
372+ return false
373+ }
374+ }
375+ } else {
376+ // Use AWS S3 SDK
377+ def credentials = new BasicAWSCredentials (ak, sk)
378+ def endpointConfiguration = new EndpointConfiguration (endpoint, region)
379+ def s3Client = AmazonS3ClientBuilder . standard(). withEndpointConfiguration(endpointConfiguration)
380+ .withCredentials(new AWSStaticCredentialsProvider (credentials)). build()
381+
382+ def objectListing = s3Client. listObjects(
383+ new ListObjectsRequest ()
384+ .withBucketName(bucket)
385+ .withPrefix(" ${ prefix} /stage/${ userName} /${ userId} /" ))
386+
387+ suite. getLogger(). info(" ${ prefix} /stage/${ userName} /${ userId} /, objectListing:${ objectListing.getObjectSummaries()} " . toString())
388+ Set<String > fileNames = new HashSet<> ()
389+ for (def os : objectListing. getObjectSummaries()) {
390+ def split = os. key. split(" /" )
391+ if (split. length <= 0 ) {
392+ continue
393+ }
394+ fileNames. add(split[split. length-1 ])
395+ }
396+ for (def f : nonExistFileNames) {
397+ if (fileNames. contains(f)) {
398+ return false
399+ }
400+ }
401+ for (def f : existFileNames) {
402+ if (! fileNames. contains(f)) {
403+ return false
404+ }
313405 }
314406 }
315407 return true
0 commit comments