update to AWS SDK v2
QlikFrederic committed Nov 11, 2024
1 parent 7615204 · commit c999469
Showing 4 changed files with 30 additions and 14 deletions.
.github/workflows/build_and_test.yml (2 changes: 1 addition & 1 deletion)

@@ -1055,7 +1055,7 @@ jobs:
kubectl create clusterrolebinding serviceaccounts-cluster-admin --clusterrole=cluster-admin --group=system:serviceaccounts || true
kubectl apply -f https://raw.githubusercontent.com/volcano-sh/volcano/v1.7.0/installer/volcano-development.yaml || true
eval $(minikube docker-env)
-build/sbt -Psparkr -Pkubernetes -Pvolcano -Pkubernetes-integration-tests -Dspark.kubernetes.test.volcanoMaxConcurrencyJobNum=1 -Dtest.exclude.tags=local "kubernetes-integration-tests/test"
+build/sbt -Phadoop-3 -Psparkr -Pkubernetes -Pvolcano -Pkubernetes-integration-tests -Dspark.kubernetes.test.volcanoMaxConcurrencyJobNum=1 -Dtest.exclude.tags=local "kubernetes-integration-tests/test"
- name: Upload Spark on K8S integration tests log files
if: failure()
uses: actions/upload-artifact@v3
pom.xml (1 change: 1 addition & 0 deletions)

@@ -162,6 +162,7 @@
<aws.kinesis.client.version>1.12.0</aws.kinesis.client.version>
<!-- Should be consistent with Kinesis client dependency -->
<aws.java.sdk.version>1.11.655</aws.java.sdk.version>
+<aws.java.sdk.v2.version>2.20.160</aws.java.sdk.v2.version>
<!-- the producer is used in tests -->
<aws.kinesis.producer.version>0.12.8</aws.kinesis.producer.version>
<gcs-connector.version>hadoop3-2.2.14</gcs-connector.version>
resource-managers/kubernetes/integration-tests/pom.xml (6 changes: 3 additions & 3 deletions)

@@ -210,9 +210,9 @@
</activation>
<dependencies>
<dependency>
-<groupId>com.amazonaws</groupId>
-<artifactId>aws-java-sdk-bundle</artifactId>
-<version>1.11.375</version>
+<groupId>software.amazon.awssdk</groupId>
+<artifactId>bundle</artifactId>
+<version>${aws.java.sdk.v2.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala (35 changes: 25 additions & 10 deletions)

@@ -17,22 +17,25 @@
package org.apache.spark.deploy.k8s.integrationtest

import java.io.File
-import java.net.URL
+import java.net.{URI, URL}
import java.nio.file.Files

import scala.collection.JavaConverters._

-import com.amazonaws.auth.BasicAWSCredentials
-import com.amazonaws.services.s3.AmazonS3Client
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder
import org.apache.hadoop.util.VersionInfo
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Minutes, Span}
+import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
+import software.amazon.awssdk.core.sync.RequestBody
+import software.amazon.awssdk.regions.Region
+import software.amazon.awssdk.services.s3.S3Client
+import software.amazon.awssdk.services.s3.model.{CreateBucketRequest, PutObjectRequest}

import org.apache.spark.SparkException
import org.apache.spark.deploy.k8s.integrationtest.DepsTestsSuite.{DEPS_TIMEOUT, FILE_CONTENTS, HOST_PATH}
-import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{INTERVAL, MinikubeTag, SPARK_PI_MAIN_CLASS, TIMEOUT}
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._
import org.apache.spark.deploy.k8s.integrationtest.Utils.getExamplesJarName
import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube
import org.apache.spark.internal.config.{ARCHIVES, PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
@@ -45,6 +48,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
val BUCKET = "spark"
val ACCESS_KEY = "minio"
val SECRET_KEY = "miniostorage"
+val REGION = "us-west-2"

private def getMinioContainer(): Container = {
val envVars = Map (
@@ -302,20 +306,27 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
private def getS3Client(
endPoint: String,
accessKey: String = ACCESS_KEY,
-secretKey: String = SECRET_KEY): AmazonS3Client = {
-val credentials = new BasicAWSCredentials(accessKey, secretKey)
-val s3client = new AmazonS3Client(credentials)
-s3client.setEndpoint(endPoint)
+secretKey: String = SECRET_KEY): S3Client = {
+val credentials = AwsBasicCredentials.create(accessKey, secretKey)
+val s3client = S3Client.builder()
+.credentialsProvider(StaticCredentialsProvider.create(credentials))
+.endpointOverride(URI.create(endPoint))
+.region(Region.of(REGION))
+.build()
s3client
}

private def createS3Bucket(accessKey: String, secretKey: String, endPoint: String): Unit = {
Eventually.eventually(TIMEOUT, INTERVAL) {
try {
val s3client = getS3Client(endPoint, accessKey, secretKey)
-s3client.createBucket(BUCKET)
+val createBucketRequest = CreateBucketRequest.builder()
+.bucket(BUCKET)
+.build()
+s3client.createBucket(createBucketRequest)
} catch {
case e: Exception =>
+logError(s"Failed to create bucket $BUCKET", e)
throw new SparkException(s"Failed to create bucket $BUCKET.", e)
}
}
@@ -328,7 +339,11 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
Eventually.eventually(TIMEOUT, INTERVAL) {
try {
val s3client = getS3Client(endPoint)
-s3client.putObject(BUCKET, objectKey, objectContent)
+val putObjectRequest = PutObjectRequest.builder()
+.bucket(BUCKET)
+.key(objectKey)
+.build()
+s3client.putObject(putObjectRequest, RequestBody.fromString(objectContent))
} catch {
case e: Exception =>
throw new SparkException(s"Failed to create object $BUCKET/$objectKey.", e)
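Taken together, the DepsTestsSuite changes show the canonical SDK v1-to-v2 migration: the mutable AmazonS3Client configured through setters becomes an immutable S3Client assembled by a builder, and each operation takes an explicit request object instead of positional arguments. The sketch below condenses that round trip into one self-contained program; the endpoint URL, bucket name, and object key are illustrative placeholders for a local MinIO instance, and the explicit close() is general good practice rather than something this commit adds.

import java.net.URI

import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.{CreateBucketRequest, PutObjectRequest}

object S3V2RoundTrip {
  def main(args: Array[String]): Unit = {
    // Placeholder endpoint and credentials for a local MinIO instance.
    val endPoint = "http://127.0.0.1:9000"
    val credentials = AwsBasicCredentials.create("minio", "miniostorage")

    // v1: new AmazonS3Client(credentials); s3client.setEndpoint(endPoint)
    // v2: an immutable client assembled through a builder.
    val s3 = S3Client.builder()
      .credentialsProvider(StaticCredentialsProvider.create(credentials))
      .endpointOverride(URI.create(endPoint))
      .region(Region.of("us-west-2"))
      .build()

    try {
      // v1: s3client.createBucket("spark")
      // v2: every call takes an explicit request object.
      s3.createBucket(CreateBucketRequest.builder().bucket("spark").build())

      // v1: s3client.putObject(bucket, key, content)
      // v2: the payload travels separately as a RequestBody.
      s3.putObject(
        PutObjectRequest.builder().bucket("spark").key("greeting.txt").build(),
        RequestBody.fromString("hello"))
    } finally {
      s3.close() // v2 clients hold pooled HTTP connections and are AutoCloseable
    }
  }
}

Separating the payload into a RequestBody is what replaces v1's overloaded putObject variants: the same request object works whether the content comes from a string, a file, bytes, or a stream.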
