Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -544,15 +544,14 @@ class DirectKafkaStreamSuite

test("using rate controller") {
val topic = "backpressure"
val topicPartitions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
kafkaTestUtils.createTopic(topic, 2)
kafkaTestUtils.createTopic(topic, 1)
val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
val executorKafkaParams = new JHashMap[String, Object](kafkaParams)
KafkaUtils.fixKafkaParams(executorKafkaParams)

val batchIntervalMilliseconds = 100
val batchIntervalMilliseconds = 500
val estimator = new ConstantEstimator(100)
val messages = Map("foo" -> 200)
val messages = Map("foo" -> 5000)
kafkaTestUtils.sendMessages(topic, messages)

val sparkConf = new SparkConf()
Expand Down Expand Up @@ -596,7 +595,7 @@ class DirectKafkaStreamSuite
estimator.updateRate(rate) // Set a new rate.
// Expect blocks of data equal to "rate", scaled by the interval length in secs.
val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
eventually(timeout(5.seconds), interval(10 milliseconds)) {
// Assert that rate estimator values are used to determine maxMessagesPerPartition.
// Funky "-" in message makes the complete assertion message read better.
assert(collectedData.asScala.exists(_.size == expectedSize),
Expand Down