3/ hops aggregation + sawtooth windows #4
Let's set aside some time for this one in the walkthrough :)
// daily aggregates (headStart(minTs) - 180days, maxTs),
class HopsAggregator(minQueryTs: Long,
                     aggregations: Seq[Aggregation],
                     inputSchema: Seq[(String, DataType)],
Seq[StructType] instead?
import ai.zipline.api.Extensions._
import scala.collection.JavaConverters._

// generate hops per spec, (NOT per window) for the given hop sizes in the resolution
What is `spec` here?
}

object HopsAggregator {
  type OutputArrayType = Array[Array[Array[Any]]]
Why triple nested?
Ah, is the inner one a `Tuple(ts, IR)`? If so, case class?
The inner array is [IR1, IR2, IR3, ..., IRN, ts], one per hop (start to end). TODO: add a comment.
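To make the nesting concrete, here is a minimal sketch of one possible reading of the alias: hop size, then hops, then `[IR1, ..., IRN, ts]`. The outer "one entry per hop size" level, the idea that `ts` is the hop start, and the helpers `hopStart` and `irs` are assumptions for illustration; none of them is confirmed by the diff.

```scala
object HopLayoutSketch {
  // Same alias as in the diff; assumed reading: hop size -> hops -> [IR1, ..., IRN, ts].
  type OutputArrayType = Array[Array[Array[Any]]]

  val fiveMinutes: Long = 5 * 60 * 1000L

  // Hypothetical helper: the timestamp is assumed to sit in the last slot.
  def hopStart(hop: Array[Any]): Long = hop.last.asInstanceOf[Long]

  // Hypothetical helper: everything before the last slot is treated as an IR.
  def irs(hop: Array[Any]): Array[Any] = hop.dropRight(1)

  // One hop size (5 minutes) holding two hops, each with a count IR and a sum IR.
  val example: OutputArrayType = Array(
    Array(
      Array[Any](3L, 42.0, 0L),
      Array[Any](1L, 7.5, fiveMinutes)
    )
  )
}
```

A case class per hop would make the `ts` slot explicit, at the cost of an extra allocation per hop.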
case _ =>
  throw new IllegalArgumentException(
    s"Invalid request for window $window for daily aggregation. " +
      s"Window can only be multiples of 1d or the operation needs to be un-windowed."
I wonder if this is the right place to be surfacing this exception... Seems a bit too deep in the stack.
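One way to surface this earlier would be to validate the windows once, before any aggregation starts. The sketch below is only an illustration: `validateDailyWindows` is a hypothetical helper, and windows are modeled as `Option[Long]` millisecond lengths (with `None` meaning un-windowed) rather than the project's own window type.

```scala
object WindowValidationSketch {
  val DayMillis: Long = 24L * 60 * 60 * 1000

  // Hypothetical up-front check: None is an un-windowed aggregation,
  // Some(millis) is a windowed one that must be a positive multiple of one day.
  def validateDailyWindows(windowsMillis: Seq[Option[Long]]): Either[String, Unit] = {
    val invalid = windowsMillis.flatten.filter(w => w <= 0 || w % DayMillis != 0)
    if (invalid.isEmpty) Right(())
    else
      Left(
        s"Invalid windows for daily aggregation: ${invalid.mkString(", ")}. " +
          "Window can only be multiples of 1d or the operation needs to be un-windowed.")
  }
}
```

Returning an `Either` lets the caller decide where to throw, for example at config parsing time.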
.zip(readableLeftBounds)
.map { case (hop, left) => s"$hop->$left" }
.mkString(", ")
println(s"""Left bounds: $readableHopsToBoundsMap
Use a proper logger here instead of println?
This block seems to be meant for debugging. Should we comment it out in the prod version?
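A level-guarded logger would address both comments: the message stays in the code but is only built when debug output is enabled. This is a sketch, not the project's logging setup; `java.util.logging` is used only to stay dependency-free, and `describeBounds` and `logBounds` are hypothetical names.

```scala
import java.util.logging.{Level, Logger}

object BoundsLoggingSketch {
  private val logger = Logger.getLogger("HopsAggregator")

  // Same formatting as the println in the diff: "hop->leftBound" pairs, comma separated.
  def describeBounds(hops: Seq[String], leftBounds: Seq[String]): String =
    hops.zip(leftBounds).map { case (hop, left) => s"$hop->$left" }.mkString(", ")

  // The string is only built when FINE (debug-level) logging is enabled.
  def logBounds(hops: Seq[String], leftBounds: Seq[String]): Unit =
    if (logger.isLoggable(Level.FINE))
      logger.fine(s"Left bounds: ${describeBounds(hops, leftBounds)}")
}
```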
object HopsAggregator {
  type OutputArrayType = Array[Array[Array[Any]]]
  type IrMapType = Array[java.util.HashMap[Long, Array[Any]]]
Would `TreeMap` make the `merge` method faster?
// Zero-copy merging
// NOTE: inputs will be mutated in the process, use "clone" if you want re-use references
def merge(leftHops: IrMapType, rightHops: IrMapType): IrMapType = {
  if (leftHops == null) return rightHops
I asked below whether `TreeMap` would make the merge faster. That might actually increase the insert time complexity.
Or, if we convert both to TreeMap in this method, `nlogn + n` should still be better than `n^2`? It may also add unnecessary memory cost in practice, though.
Ignore this comment.
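For reference, a hash-map merge does not need an n^2 scan: each entry on the right side costs one expected constant-time lookup on the left. The sketch below assumes that shape; the body of the real `merge` is not shown in this hunk, and `addCounts` is a hypothetical stand-in for whatever combines two IR arrays.

```scala
import java.util.{HashMap => JHashMap}

object HopMergeSketch {
  type IrMapType = Array[JHashMap[Long, Array[Any]]]

  // Hypothetical IR combiner: element-wise sum of Long counters, mutating `left`.
  def addCounts(left: Array[Any], right: Array[Any]): Array[Any] = {
    var i = 0
    while (i < left.length) {
      left(i) = left(i).asInstanceOf[Long] + right(i).asInstanceOf[Long]
      i += 1
    }
    left
  }

  // Folds rightHops into leftHops in place, one hash lookup per right entry.
  def merge(leftHops: IrMapType, rightHops: IrMapType): IrMapType = {
    if (leftHops == null) return rightHops
    if (rightHops == null) return leftHops
    var i = 0
    while (i < leftHops.length) {
      val leftMap = leftHops(i)
      val it = rightHops(i).entrySet().iterator()
      while (it.hasNext) {
        val entry = it.next()
        val existing = leftMap.get(entry.getKey)
        if (existing == null) leftMap.put(entry.getKey, entry.getValue)
        else addCounts(existing, entry.getValue)
      }
      i += 1
    }
    leftHops
  }
}
```

A `TreeMap` would only help if the merged result had to be iterated in timestamp order.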
val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
formatter.setTimeZone(TimeZone.getTimeZone("UTC"))

// in millisecond precision
I remember we discussed millisecond vs. nanosecond precision before. Is there any need to support nanoseconds? Probably not.
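As a small illustration of the millisecond convention, the sketch below formats an epoch-millisecond timestamp with the same pattern and time zone as the diff. `toUtcString` is a hypothetical helper; it builds a new formatter on each call because `SimpleDateFormat` is not thread-safe.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

object TsFormatSketch {
  // Formats epoch milliseconds as a UTC string; sub-second digits are dropped by the pattern.
  def toUtcString(millis: Long): String = {
    val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    formatter.setTimeZone(TimeZone.getTimeZone("UTC"))
    formatter.format(new Date(millis))
  }
}
```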
@transient private lazy val baseAggregator = new RowAggregator(inputSchema, aggregations.map(_.unWindowed))
@transient private lazy val baseIrIndices = windowMappings.map(_.baseIrIndex)

def computeWindows(hops: HopsAggregator.OutputArrayType, endTimes: Array[Long]): Array[Array[Any]] = {
hops + [10:31, 10:42, 11:40] => [(10:31, ir1), (10:42, ir2)]
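The example above can be sketched with a single hop size and a plain count as the IR. This is a simplified illustration, not the method in the diff: `windowCounts` is a hypothetical name, the window tail is assumed to be rounded down to a hop boundary, and only complete hops are summed (the partial head hop is left to a separate step).

```scala
object WindowFromHopsSketch {
  // hops: (hopStartMillis, count). Returns one (endTime, count) pair per end time.
  def windowCounts(hops: Array[(Long, Long)],
                   hopMillis: Long,
                   windowMillis: Long,
                   endTimes: Array[Long]): Array[(Long, Long)] =
    endTimes.map { end =>
      // Assumed sawtooth behaviour: the tail snaps down to the nearest hop boundary.
      val tail = ((end - windowMillis) / hopMillis) * hopMillis
      // Only hops that lie fully inside [tail, end] contribute.
      val total = hops.iterator
        .filter { case (start, _) => start >= tail && start + hopMillis <= end }
        .map(_._2)
        .sum
      (end, total)
    }
}
```

With 5-minute hops and a 10-minute window, an end time of 10:31 picks up the hops starting at 10:20 and 10:25, while 10:42 picks up those starting at 10:30 and 10:35.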
// method is used to generate head-realtime ness on top of hops
// But without the requirement that the input be sorted
def cumulateUnsorted(
5-minute hop, 10:30 to 10:35, end times [10:31, 10:33], ir1;
inputs: i1 at 10:30, i2 at 10:32, i3 at 10:34
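One reading of this example: starting from a base IR built from complete hops, each end time also needs the inputs in the head hop that arrived before it, and the inputs may come in any order. The sketch below follows that reading with a running sum as the IR. The signature, the exclusive end time, and the `firstEndAfter` helper are assumptions, not the signature in the diff.

```scala
object CumulateUnsortedSketch {
  // Index of the first end time strictly greater than ts (endTimes must be sorted).
  private def firstEndAfter(endTimes: Array[Long], ts: Long): Int = {
    var lo = 0
    var hi = endTimes.length
    while (lo < hi) {
      val mid = (lo + hi) >>> 1
      if (endTimes(mid) > ts) hi = mid else lo = mid + 1
    }
    lo
  }

  // inputs: (timestampMillis, value) in arbitrary order.
  // Returns, per end time, baseSum plus every input with ts < endTime.
  def cumulateUnsorted(inputs: Iterator[(Long, Long)],
                       endTimes: Array[Long],
                       baseSum: Long): Array[Long] = {
    val deltas = new Array[Long](endTimes.length)
    inputs.foreach { case (ts, value) =>
      val idx = firstEndAfter(endTimes, ts)
      // Inputs at or after the last end time affect no result.
      if (idx < endTimes.length) deltas(idx) += value
    }
    // Prefix-sum the per-end-time deltas on top of the base.
    var running = baseSum
    deltas.map { d => running += d; running }
  }
}
```

In the reviewer's example, i1 (10:30) counts toward both end times, i2 (10:32) only toward 10:33, and i3 (10:34) toward neither.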
@transient private lazy val baseAggregator = new RowAggregator(inputSchema, aggregations.map(_.unWindowed))
@transient private lazy val baseIrIndices = windowMappings.map(_.baseIrIndex)

def computeWindows(hops: HopsAggregator.OutputArrayType, endTimes: Array[Long]): Array[Array[Any]] = {
TODO: type alias Array[Any] to IRWithRowTs
Also explain why we chose this structure:
HopsAggregator converts raw data into IRs, bucketed by hops (parts of a window).
SawToothWindow has the logic to convert multiple hops into the final windowed IRs.
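The first half of that split, bucketing raw data into per-hop IRs, can be sketched as below. It is a simplified illustration with a count as the only IR and a single hop size; `bucketCounts` is a hypothetical name and not part of the diff.

```scala
import scala.collection.mutable

object HopBucketingSketch {
  // Groups raw event timestamps into hops keyed by hop start, counting events per hop.
  def bucketCounts(timestamps: Iterator[Long], hopMillis: Long): mutable.Map[Long, Long] = {
    val hops = mutable.HashMap.empty[Long, Long]
    timestamps.foreach { ts =>
      // Round down to the start of the hop that contains ts.
      val hopStart = (ts / hopMillis) * hopMillis
      hops(hopStart) = hops.getOrElse(hopStart, 0L) + 1L
    }
    hops
  }
}
```

Because each hop is computed once and reused by every window that covers it, the per-window work reduces to combining a handful of hop IRs.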