[SPARK-20979][SS]Add RateSource to generate values for tests and benchmark #18199
Conversation
Test build #77730 has started for PR 18199 at commit
retest this please
Test build #77749 has finished for PR 18199 at commit
It would be great to document how this source works. Also it would be great to say that we try our best to reach `tuplesPerSecond`, but we may be resource constrained, and `numPartitions` can be tweaked to help reach the desired rate.
```scala
val tuplesPerSecond = params.get("tuplesPerSecond").map(_.toLong).getOrElse(1L)
if (tuplesPerSecond <= 0) {
  throw new IllegalArgumentException(
    s"Invalid value '${params("tuplesPerSecond")}' for option 'tuplesPerSecond', " +
```
nit: `Invalid value '${params("tuplesPerSecond")}'. The option 'tuplesPerSecond' must be a positive`?
```scala
      "must be positive")
}
```
```scala
val rampUpTimeSeconds = params.get("rampUpTimeSeconds").map(_.toLong).getOrElse(0L)
```
I wonder if we should take this value as a duration string, e.g. `option("rampUpTime", "5s")`?
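For reference, reducing such a duration string to whole seconds could look like the sketch below. This is a purely illustrative helper with made-up names; the PR's final option does accept strings like "5s", but how Spark actually parses them is not reproduced here.

```scala
// Illustrative only: reduce a duration string like "5s", "2m", or "1h" to
// whole seconds. A bare number is treated as seconds.
def parseDurationSeconds(s: String): Long = {
  val pattern = """(\d+)\s*([smh]?)""".r
  s.trim match {
    case pattern(n, unit) =>
      val base = n.toLong
      unit match {
        case "" | "s" => base
        case "m"      => base * 60L
        case "h"      => base * 3600L
      }
    case other =>
      throw new IllegalArgumentException(s"Invalid duration: '$other'")
  }
}
```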
```scala
private val maxSeconds = Long.MaxValue / tuplesPerSecond
```
```scala
if (rampUpTimeSeconds > maxSeconds) {
  throw new ArithmeticException("integer overflow. Max offset with tuplesPerSecond " +
```
nit: `Integer`. Also, it may be better to write `$tuplesPerSecond tuplesPerSecond` instead of `tuplesPerSecond $tuplesPerSecond`.
```scala
    s"$tuplesPerSecond is $maxSeconds, but 'rampUpTimeSeconds' is $rampUpTimeSeconds.")
}
```
```scala
private val startTimeMs = {
```
Do we need to go to this complexity for this source?
It's better to add versioning at the beginning. Just a lesson from the Kafka source.
```scala
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
  val startSeconds = start.flatMap(LongOffset.convert(_).map(_.offset)).getOrElse(0L)
  val endSeconds = LongOffset.convert(end).map(_.offset).getOrElse(0L)
  assert(startSeconds <= endSeconds)
```
nit: A meaningful assertion message would be useful
```scala
val endSeconds = LongOffset.convert(end).map(_.offset).getOrElse(0L)
assert(startSeconds <= endSeconds)
if (endSeconds > maxSeconds) {
  throw new ArithmeticException("integer overflow. Max offset with " +
```
ditto
You basically read my mind for the formulation!
```scala
val clock = if (useManualClock) new ManualClock else new SystemClock
```
```scala
private val maxSeconds = Long.MaxValue / tuplesPerSecond
```
This will be <= the real max allowed seconds, because it doesn't take `rampUpTimeSeconds` into consideration. I haven't found a simple way to detect overflow quickly with `rampUpTimeSeconds`. However, this should be fine because the user usually won't hit this problem. The overflow detection is just so that people aren't surprised: `range` will return an empty RDD if overflow happens (see below).

```scala
scala> sc.range(Long.MaxValue, Long.MinValue, 1).count()
res0: Long = 0
```
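The guard being discussed can be isolated into a small sketch. This is simplified (as noted above, it ignores `rampUpTimeSeconds`) and the helper name is made up for illustration:

```scala
// Simplified sketch (names illustrative): seconds * tuplesPerSecond stays
// within Long range exactly when seconds <= Long.MaxValue / tuplesPerSecond.
def safeRowCount(seconds: Long, tuplesPerSecond: Long): Long = {
  val maxSeconds = Long.MaxValue / tuplesPerSecond
  if (seconds > maxSeconds) {
    throw new ArithmeticException(
      s"Integer overflow. Max offset with $tuplesPerSecond tuplesPerSecond is $maxSeconds, " +
        s"but 'seconds' is $seconds.")
  }
  seconds * tuplesPerSecond
}
```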
```scala
val localStartTimeMs = startTimeMs + TimeUnit.SECONDS.toMillis(startSeconds)
val relativeMsPerValue =
  TimeUnit.SECONDS.toMillis(endSeconds - startSeconds) / (rangeEnd - rangeStart)
```
Integer division bug! This can easily return 0, right?
@brkyvz did you mean `rangeEnd - rangeStart == 0`? It's handled above.
No, I mean `endSeconds - startSeconds => 2000` and `rangeEnd - rangeStart => 50,000`, which gives `relativeMsPerValue = 0`.
Assume you meant `TimeUnit.SECONDS.toMillis(endSeconds - startSeconds) => 2000`. Yeah, then it's fine. It will return multiple rows with the same timestamp.
I guess it may be okay, but it won't create a uniform distribution of event timestamps in that case though. Not sure if that's a requirement.
OK. I tried to make the event timestamps uniformly distributed. The code becomes more complicated though.
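The issue under discussion is easy to reproduce in isolation: when a batch contains more rows than elapsed milliseconds, integer division collapses the per-row spacing to zero, so every row in the batch gets the same timestamp. A standalone sketch using the numbers from the thread:

```scala
import java.util.concurrent.TimeUnit

// 2000 ms of batch time spread over 50,000 rows.
val elapsedMs = TimeUnit.SECONDS.toMillis(2) // 2000
val rows = 50000L

val intMsPerRow = elapsedMs / rows             // integer division: 0, all rows collide
val doubleMsPerRow = elapsedMs.toDouble / rows // 0.04 ms per row, rows spread out
```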
Test build #77757 has finished for PR 18199 at commit
Test build #77760 has finished for PR 18199 at commit
```
@@ -199,13 +199,52 @@ class RateStreamSource(
}

val localStartTimeMs = startTimeMs + TimeUnit.SECONDS.toMillis(startSeconds)
val relativeMsPerValue =
  TimeUnit.SECONDS.toMillis(endSeconds - startSeconds) / (rangeEnd - rangeStart)
```
I thought that you would only change `TimeUnit.SECONDS.toMillis(endSeconds - startSeconds).toDouble`. Wasn't expecting all this change!
Just to avoid floating point inaccuracy :)
I guess that's acceptable. Right now the code got overcomplicated :(
Test build #77762 has finished for PR 18199 at commit
Test build #77846 has finished for PR 18199 at commit
```scala
// The following condition is the same as
// "relativeValue < (valueSizePerMs + 1) * remainderValue", just rewrite it to avoid
// overflow.
if (relativeValue - remainderValue < valueSizePerMs * remainderValue) {
```
Can we add parentheses around `relativeValue - remainderValue`?
Also around `valueSizePerMs * remainderValue` => `(relativeValue - remainderValue) < (valueSizePerMs * remainderValue)`.
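The two forms of the condition are algebraically identical: subtracting `remainderValue` from both sides of `relativeValue < (valueSizePerMs + 1) * remainderValue` yields the rewritten version. A brute-force check over small values confirms this (a sketch for illustration, not part of the PR's test suite):

```scala
// Compare the readable form of the condition against the overflow-avoiding
// rewrite on a grid of small non-negative inputs; they should always agree.
def readable(relativeValue: Long, valueSizePerMs: Long, remainderValue: Long): Boolean =
  relativeValue < (valueSizePerMs + 1) * remainderValue

def rewritten(relativeValue: Long, valueSizePerMs: Long, remainderValue: Long): Boolean =
  (relativeValue - remainderValue) < (valueSizePerMs * remainderValue)

val allAgree = (for {
  rv  <- 0L to 20L
  vs  <- 0L to 5L
  rem <- 0L to 5L
} yield readable(rv, vs, rem) == rewritten(rv, vs, rem)).forall(identity)
```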
```scala
  .as[(java.sql.Timestamp, Long)]
  .map(v => (v._1.getTime, v._2))
val expectedAnswer =
  (0 until 1000).map(v => (v / 2, v)) ++ // Two values share the same timestamp.
```
why is this?
Because there are 1000 timestamps in one second but we have 1500 values.
```scala
  val VERSION = 1
}

class RateStreamSource(
```
Should we add an `InterfaceStability.Evolving`? I don't know where we use those. Just in case we change the namings, etc.
I don't think so. The class won't appear in the public Scaladoc/Javadoc. The user cannot see this tag in any place unless they jump to this file.
@brkyvz I changed to use double to simplify the code.
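With a double-valued rate, each row's timestamp offset is computed by scaling its index and truncating to milliseconds, so rows spread across the batch window even when they outnumber the milliseconds. A standalone sketch with hypothetical numbers (1 second, 1500 rows), not the PR's exact code:

```scala
import java.util.concurrent.TimeUnit

val startMs = 0L
val rows = 1500L
// ~0.667 ms per row; integer division would have produced 0 here.
val relativeMsPerValue = TimeUnit.SECONDS.toMillis(1).toDouble / rows
val timestamps = (0L until rows).map(v => startMs + (relativeMsPerValue * v).toLong)
```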
Test build #77858 has finished for PR 18199 at commit
```scala
test("valueAtSecond") {
  import RateStreamSource._

  assert(valueAtSecond(seconds = 0, rowsPerSecond = 5, rampUpTimeSeconds = 2) === 0)
```
It would be nice to add one test where `rampUpTimeSeconds = 0`.
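For context, an implementation consistent with the assertion above might look like the following sketch. The linear ramp-up (speed grows by a fixed delta each second until it reaches `rowsPerSecond`) is an assumption inferred from the option's description, not a copy of the PR's code:

```scala
// Sketch of a ramp-up aware row counter. During ramp-up the per-second speed
// grows by a fixed delta; valueAtSecond returns the total number of rows
// emitted by the end of the given second.
def valueAtSecond(seconds: Long, rowsPerSecond: Long, rampUpTimeSeconds: Long): Long = {
  if (seconds <= rampUpTimeSeconds) {
    val speedDeltaPerSecond = rowsPerSecond / (rampUpTimeSeconds + 1)
    (0L to seconds).map(speedDeltaPerSecond * _).sum
  } else {
    // After ramp-up: everything emitted during ramp-up, plus full speed since.
    val rampUpPart = valueAtSecond(rampUpTimeSeconds, rowsPerSecond, rampUpTimeSeconds)
    rampUpPart + (seconds - rampUpTimeSeconds) * rowsPerSecond
  }
}
```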
Left one last comment. Otherwise LGTM.
Test build #77901 has finished for PR 18199 at commit
Just noticed that we don't have a nice `toString` method for this source. Can be added in a follow-up.
Let me just do it now, since it's pretty easy.
Test build #77942 has finished for PR 18199 at commit
Test build #77943 has finished for PR 18199 at commit
Thanks! Merging to master.
I also merged this to branch-2.2 since this is a separate feature.
## What changes were proposed in this pull request?

This PR adds RateSource for Structured Streaming so that the user can use it to generate data for tests and benchmarks easily.

This source generates incrementing long values with timestamps. Each generated row has two columns: a timestamp column for the generation time and an auto-incrementing long column starting with 0L.

It supports the following options:

- `rowsPerSecond` (e.g. 100, default: 1): How many rows should be generated per second.
- `rampUpTime` (e.g. 5s, default: 0s): How long to ramp up before the generating speed becomes `rowsPerSecond`. Granularities finer than seconds will be truncated to integer seconds.
- `numPartitions` (e.g. 10, default: Spark's default parallelism): The partition number for the generated rows. The source will try its best to reach `rowsPerSecond`, but the query may be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed.

Here is a simple example that prints 10 rows per second:

```scala
spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .writeStream
  .format("console")
  .start()
```

The idea came from @marmbrus and he did the initial work.

## How was this patch tested?

The added tests.