Skip to content

Commit dc175e2

Browse files
committed
[SPARK-53927][BUILD][DSTREAM] Upgrade kinesis client and fix kinesis integration tests (ENABLE_KINESIS_TESTS=1)
1 parent ea71991 commit dc175e2

File tree

4 files changed

+25
-23
lines changed

4 files changed

+25
-23
lines changed

connector/kinesis-asl/pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,16 @@
6464
<version>${aws.java.sdk.version}</version>
6565
</dependency>
6666
<dependency>
67-
<groupId>com.amazonaws</groupId>
67+
<groupId>software.amazon.kinesis</groupId>
6868
<artifactId>amazon-kinesis-producer</artifactId>
6969
<version>${aws.kinesis.producer.version}</version>
7070
<scope>test</scope>
71+
<exclusions>
72+
<exclusion>
73+
<groupId>com.kjetland</groupId>
74+
<artifactId>mbknor-jackson-jsonschema_2.12</artifactId>
75+
</exclusion>
76+
</exclusions>
7177
</dependency>
7278
<!-- manage this up explicitly to match Spark; com.amazonaws:aws-java-sdk-pom specifies
7379
2.6.7 but says we can manage it up -->

connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,14 @@ package org.apache.spark.streaming.kinesis
1818

1919
import java.nio.ByteBuffer
2020
import java.nio.charset.StandardCharsets
21+
import java.util.concurrent.{Executors, TimeUnit}
2122

2223
import scala.collection.mutable
2324
import scala.collection.mutable.ArrayBuffer
2425

25-
import com.amazonaws.services.kinesis.producer.{KinesisProducer => KPLProducer,
26-
KinesisProducerConfiguration, UserRecordResult}
2726
import com.google.common.util.concurrent.{FutureCallback, Futures}
28-
29-
import org.apache.spark.util.ThreadUtils
27+
import software.amazon.kinesis.producer.{KinesisProducer => KPLProducer,
28+
KinesisProducerConfiguration, UserRecordResult}
3029

3130
private[kinesis] class KPLBasedKinesisTestUtils(streamShardCount: Int = 2)
3231
extends KinesisTestUtils(streamShardCount) {
@@ -53,6 +52,7 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG
5352
}
5453

5554
override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = {
55+
val executor = Executors.newSingleThreadExecutor()
5656
val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
5757
data.foreach { num =>
5858
val str = num.toString
@@ -63,15 +63,17 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG
6363

6464
override def onSuccess(result: UserRecordResult): Unit = {
6565
val shardId = result.getShardId
66-
val seqNumber = result.getSequenceNumber()
66+
val seqNumber = result.getSequenceNumber
6767
val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
6868
new ArrayBuffer[(Int, String)]())
6969
sentSeqNumbers += ((num, seqNumber))
7070
}
7171
}
72-
Futures.addCallback(future, kinesisCallBack, ThreadUtils.sameThreadExecutorService())
72+
Futures.addCallback(future, kinesisCallBack, executor)
7373
}
7474
producer.flushSync()
75-
shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq)
75+
executor.shutdown()
76+
executor.awaitTermination(10, TimeUnit.SECONDS)
77+
shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq.sortBy(_._2))
7678
}
7779
}

connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
3232
import com.amazonaws.services.dynamodbv2.document.DynamoDB
3333
import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClient}
3434
import com.amazonaws.services.kinesis.model._
35+
import com.amazonaws.waiters.WaiterParameters
3536

3637
import org.apache.spark.internal.Logging
3738
import org.apache.spark.internal.LogKeys.{STREAM_NAME, TABLE_NAME}
@@ -61,6 +62,8 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
6162
client
6263
}
6364

65+
private lazy val streamExistsWaiter = kinesisClient.waiters().streamExists()
66+
6467
private lazy val dynamoDB = {
6568
val dynamoDBClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain())
6669
dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
@@ -184,18 +187,9 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
184187
}
185188

186189
private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
187-
val startTimeNs = System.nanoTime()
188-
while (System.nanoTime() - startTimeNs < TimeUnit.SECONDS.toNanos(createStreamTimeoutSeconds)) {
189-
Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
190-
describeStream(streamNameToWaitFor).foreach { description =>
191-
val streamStatus = description.getStreamStatus()
192-
logDebug(s"\t- current state: $streamStatus\n")
193-
if ("ACTIVE".equals(streamStatus)) {
194-
return
195-
}
196-
}
197-
}
198-
require(false, s"Stream $streamName never became active")
190+
val describeStreamRequest = new DescribeStreamRequest()
191+
.withStreamName(streamNameToWaitFor)
192+
streamExistsWaiter.run(new WaiterParameters(describeStreamRequest))
199193
}
200194
}
201195

pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,12 +158,12 @@
158158
<codahale.metrics.version>4.2.33</codahale.metrics.version>
159159
<!-- Should be consistent with SparkBuild.scala and docs -->
160160
<avro.version>1.12.0</avro.version>
161-
<aws.kinesis.client.version>1.12.0</aws.kinesis.client.version>
161+
<aws.kinesis.client.version>1.15.3</aws.kinesis.client.version>
162162
<!-- Should be consistent with Kinesis client dependency -->
163-
<aws.java.sdk.version>1.11.655</aws.java.sdk.version>
163+
<aws.java.sdk.version>1.12.681</aws.java.sdk.version>
164164
<aws.java.sdk.v2.version>2.29.52</aws.java.sdk.v2.version>
165165
<!-- the producer is used in tests -->
166-
<aws.kinesis.producer.version>0.12.8</aws.kinesis.producer.version>
166+
<aws.kinesis.producer.version>1.0.5</aws.kinesis.producer.version>
167167
<!-- Do not use 3.0.0: https://github.com/GoogleCloudDataproc/hadoop-connectors/issues/1114 -->
168168
<gcs-connector.version>hadoop3-2.2.28</gcs-connector.version>
169169
<analyticsaccelerator-s3.version>1.3.0</analyticsaccelerator-s3.version>

0 commit comments

Comments (0)