[query] Call-Cache CollectDistributedArray (rfc-0000) #12954
Conversation
Force-pushed from 6affd38 to b0aee67:
Return `(Option[Throwable], IndexedSeq[(Int, Array[Byte])])`, where the `Option[Throwable]` is an exception that was raised while computing partitions, and the `IndexedSeq[(Int, Array[Byte])]` maps partition index -> result.
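The return shape described in this commit message could be sketched as follows; this is a minimal illustration with hypothetical names, not Hail's actual `BackendUtils` code. The idea is to stop at the first partition failure and hand back the error together with whatever results completed:

```scala
// Hypothetical sketch: run partitions in order, stop at the first
// failure, and return the error (if any) alongside the successfully
// computed (partitionIndex, resultBytes) pairs.
object PartitionRunner {
  def runPartitions(
    nPartitions: Int,
    compute: Int => Array[Byte]
  ): (Option[Throwable], IndexedSeq[(Int, Array[Byte])]) = {
    val results = IndexedSeq.newBuilder[(Int, Array[Byte])]
    var error: Option[Throwable] = None
    var i = 0
    while (i < nPartitions && error.isEmpty) {
      try results += (i -> compute(i))
      catch { case t: Throwable => error = Some(t) }
      i += 1
    }
    (error, results.result())
  }
}
```

Returning partial results this way is what lets the execution cache keep the partitions that succeeded even when the overall job fails.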
Force-pushed from b0aee67 to e332df7:
Read type of execution cache from flags
def getFileHash(fs: FS)(path: String): Array[Byte] =
  fs.eTag(path) match {
    case Some(etag) =>
      etag.getBytes
    case None =>
      path.getBytes ++ Bytes.fromLong(fs.fileStatus(path).getModificationTime)
  }
Something to call out here:
I'm using the file's etag if the filesystem supports etags, and NOT the path.
I think the etag is unique on Azure (their docs are quite hard to navigate, so I'm finding it hard to know for sure; help welcome!).
GCS has the nice property that copying a file preserves the etag, and it only changes after modification.
Overall, this is really great work! I have some small requests, some of which we discussed last week. I haven't finished looking at all the hash cases yet, or the tests. With the hash cases, in general I think by default anything that we print, we should include in the hash. In some cases it might not be necessary, but I'd rather be conservative.
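The conservative rule suggested here (include in the hash anything that gets printed) might look like the following sketch. The node/field names and the simple 31-based fold are illustrative assumptions, not the real `SemanticHash` implementation:

```scala
// Illustrative only: fold every field that appears in an IR node's
// printed form into its hash, erring on the side of including too
// much rather than too little.
object HashSketch {
  def extend(hash: Int, bytes: Array[Byte]): Int =
    bytes.foldLeft(hash)((h, b) => h * 31 + b)

  // Combine the node's name with every printed field.
  def hashNode(nodeName: String, printedFields: Seq[String]): Int =
    printedFields.foldLeft(extend(0, nodeName.getBytes))(
      (h, field) => extend(h, field.getBytes)
    )
}
```

Being conservative here trades some cache hits for correctness: two queries that print differently never share a key, even if the difference happens to be semantically irrelevant.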
jobs(i) = JObject(
  "always_run" -> JBool(false),
  "job_id" -> JInt(i + 1),
@danking This is using `job_id`s from the original job, while `n` is the number of partitions currently being retried. So it's possible to have `job_id >= n`. Could that cause any issues?
object CodeGenSupport {
  def lift(hash: SemanticHash.Type): Option[SemanticHash.Type] =
    Some(hash)
}
I was having a horrible time trying to generate code to lift an `Int` into an `Option` and kept getting `NoSuchMethodError`s when trying to call the constructor of `Some` or call `Option.apply`, presumably because `Option` is parameterised and thus needs a reference type in its JVM implementation. I rage-quit and wrote this instead.
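For context, this is a JVM boxing issue: `Option` is generic, so `Some.<init>` and `Option.apply` both take `Object` at the bytecode level, and a raw `Int` cannot be passed to them from generated code without boxing. A monomorphic helper like `CodeGenSupport.lift` above sidesteps this because scalac emits the boxing inside the helper. A minimal sketch (hypothetical name, same shape as the PR's helper):

```scala
// Why generated bytecode can't construct Some(int) directly: the JVM
// signature of Some.<init> is (Ljava/lang/Object;)V, so an Int argument
// must first be boxed to java.lang.Integer. A helper whose signature
// mentions Int lets the Scala compiler emit that boxing, so generated
// code can call one concrete method instead.
object LiftDemo {
  def lift(hash: Int): Option[Int] = Some(hash)
}
```

Generated code can then invoke `LiftDemo.lift` with a primitive int and never touch `Some`'s constructor itself.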
I'm feeling good about merging this as an experimental flag, disabled by default. Just one last comment.
Adds the ability to rerun/retry queries from the nearest `CollectDistributedArray` (`CDA`) IR site.

Computes a "Semantic Hash" of the top-level IR, which is used to generate a key for each of the constituent `CDA` calls in a query. The implementation for CDA, `BackendUtils.collectDArray`, uses that key to look up the results of each partition for that call in the execution cache, and updates the cache with successful partition computations.

The nature of the staged lower-and-execute model means we don't know ahead of time how many `CDA` calls will be generated. Thus we treat the "Semantic Hash" in a similar way to an RNG state variable and generate a key from it every time we encounter a `CDA`.

The execution cache is implemented on top of a local or remote filesystem (configurable via the `HAIL_CACHE_DIR` environment variable), defaulting to `{tmpdir}/hail/{pip-version}`.
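The "RNG state variable" idea for key generation could be sketched like this; the class name, occurrence counter, and use of SHA-256 are illustrative assumptions, not Hail's implementation. Each `CDA` encountered during lowering draws the next key deterministically from the semantic hash:

```scala
import java.security.MessageDigest

// Sketch: treat the semantic hash like an RNG state, deriving a fresh
// deterministic cache key each time a CollectDistributedArray is
// encountered. Re-lowering the same query replays the same key sequence,
// so each CDA finds its previous results in the cache.
final class CdaKeyGen(semanticHash: Array[Byte]) {
  private var occurrence = 0

  def nextKey(): String = {
    val md = MessageDigest.getInstance("SHA-256")
    md.update(semanticHash)
    md.update(BigInt(occurrence).toByteArray) // mix in which CDA this is
    occurrence += 1
    md.digest().map("%02x".format(_)).mkString
  }
}
```

Each key could then name a directory under the cache root holding one file per partition index, e.g. `{cacheDir}/{key}/{partitionIndex}` (an illustrative layout, not necessarily the PR's).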