[SEDONA-655] DBSCAN #1589
Conversation
@james-willis The memory consumption of the new tests seems to be very high?
This is something I worked on with Kristin in the past. We had thought it was related to cached DataFrames holding references to broadcast relations, but this committed version of the code does not contain any caching, just checkpoints. There is some persisting inside the connected components implementation, but those DataFrames should be getting cleaned up. I'll check whether something here is leaking persisted DataFrames.
There are some DataFrames inside the connected components algorithm that don't get unpersisted. I will look into raising a PR in that repo tomorrow to fix that.
I submitted a fix to graphframes for this. I can clear the Spark catalog if the disable-broadcast fix doesn't resolve it.
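For context, a minimal sketch of how one might check for leaked persisted data between tests. The helper name is hypothetical; `getPersistentRDDs` and `Catalog.clearCache()` are standard Spark APIs:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical test-suite helper: report RDDs that are still persisted after
// a test run, then drop everything from the cache.
def reportAndClearPersisted(spark: SparkSession): Unit = {
  val leaked = spark.sparkContext.getPersistentRDDs
  leaked.foreach { case (id, rdd) =>
    println(s"Leaked persisted RDD $id: ${rdd.name} (${rdd.getStorageLevel})")
  }
  spark.catalog.clearCache() // unpersists all cached tables/DataFrames
}
```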
What are the remaining tasks for this PR?
Just documentation.
After you guys (@james-willis @jiayuasu) finish this, I'll write an article on this.
```scala
      useSpheroid: Boolean = false): DataFrame = {

    // We want to disable broadcast joins because the broadcast references
    // were using too much driver memory
    val spark = SparkSession.getActiveSession.get
```
Is it possible to have an empty session here?
Only if the session crashed in some other test. The parent class initializes the Spark session.
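For reference, a minimal sketch of one way to disable broadcast joins around a join-heavy section and restore the setting afterwards. `spark.sql.autoBroadcastJoinThreshold` is a standard Spark SQL config; the save/restore pattern is illustrative, not what this PR does:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.getActiveSession.get

// Setting spark.sql.autoBroadcastJoinThreshold to -1 turns automatic
// broadcast joins off entirely.
val previous = spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
try {
  // ... run the join-heavy clustering steps here ...
} finally {
  // Restore the caller's original setting.
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", previous)
}
```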
```scala
      .withColumnRenamed("id", ID_COLUMN)
      .withColumn("id", sha2(to_json(struct("*")), 256))
  } else {
    dataframe.withColumn("id", sha2(to_json(struct("*")), 256))
```
Should duplicated records be treated as equal and aggregated, or not? I am wondering whether the monotonically_increasing_id function would be enough here. I think sha2(to_json(struct("*"))) might be costly, and using it as a join key might not be very efficient compared to bigints.
monotonically_increasing_id is not deterministic, so it can give incorrect results in joins unless you checkpoint immediately after generating the ids. This blog does a decent job of describing the issue: https://xebia.com/blog/spark-surprises-for-the-uninitiated/ I've been bitten by this issue before, especially when an executor crashes.
> Should duplicated records be treated as equal and aggregated, or not?
This implementation does not deal with duplicated records well; there is only a note in the docstring saying not to provide duplicates. monotonically_increasing_id plus a checkpoint would handle duplicates.
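To make the trade-off concrete, a minimal sketch of the monotonically_increasing_id + checkpoint alternative discussed above. The helper name and checkpoint directory are illustrative:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.monotonically_increasing_id

// Hypothetical helper: monotonically_increasing_id derives ids from the
// partition layout, so recomputation (e.g. after an executor crash) can
// assign different ids to the same row. An eager checkpoint() materializes
// the DataFrame, pinning the ids before they are used in any join.
def withStableIds(df: DataFrame): DataFrame = {
  df.sparkSession.sparkContext.setCheckpointDir("/tmp/checkpoints") // illustrative path
  df.withColumn("id", monotonically_increasing_id()).checkpoint()
}
```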
Co-authored-by: Kelly-Ann Dolor <kellyanndolor@gmail.com>
This file is not listed in mkdocs.yml, so it will not show up in the website navigation bar.
On it. Also fixing the test failures.
Did you read the Contributor Guide?
Is this PR related to a JIRA ticket?
Yes: SEDONA-655. The PR name follows the format [SEDONA-XXX] my subject.
What changes were proposed in this PR?
This PR adds a DBSCAN function to the Scala and Python APIs of the Spark implementation of Sedona.
How was this patch tested?
Unit tests.
Did this PR include necessary documentation updates?
Yes, using the current SNAPSHOT version number in vX.Y.Z format.