sql: better distribute distinct processors #52098
It's unclear to me whether hashing the output of the first stage is necessarily an improvement with higher inter-node latencies. This is probably a decision that should be left up to the optimizer. I'm not opposed to merging this though if it makes it easier for the new spec planning work to implement distinct construction.
Reviewed 4 of 6 files at r1, 2 of 2 files at r2, 3 of 3 files at r3.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @yuzefovich)
pkg/sql/distsql_physical_planner.go, line 2914 at r1 (raw file):
// Add distinct processors local to each existing current result processor.
// TODO(arjun): This is potentially memory inefficient if we don't have any
s/arjun/yuzefovich
😄?
pkg/sql/physicalplan/physical_plan.go, line 1171 at r2 (raw file):
// INTERSECT and EXCEPT plans.
//
// TODO(abhimadan): If there's a strong key on the left or right side, we
ditto
The first commit is not necessary for the new factory, but I disagree that it is not an improvement. The thing is that in the old factory, if one stage is local, then all subsequent stages will also be local, and those later stages might contain joins or aggregations.
For example, for this query we can now run distributed sort and use ordered synchronizer to merge the streams (before https://cockroachdb.github.io/distsqlplan/decode.html#eJy8lVGL2kAUhd_7Ky73ySUjcSZx1YFC2jVLA1a3UWjLkocxGbaBbGInI7SI_73EFHeVOsmi9fHOzJnv3JPLZIPlzww5zv2Jf7eAtcrgPpx9hkf_28PkQzCFzjiYL-ZfJjfw90hVB9O7BQgCy_pwIrSAr5_80IdOR4AFyxuwIOY8mC6GN_Ae2ABm4dgP4eN3WBIQERLMi0ROxbMskT8iRYIMCTpI0EWCfYwIrlQRy7IsVHVksxMEyS_kPYJpvlrrajkiGBdKIt-gTnUmkeNCLDMZSpFIZfeQYCK1SLMdpjLqrVT6LNRvJDhfibzk0LUpiDwBCoX-IRUSvE8zLRWHTsejYIHHqn4853VDvC6Q4GytOXiUeAyjLcFirV-8lVo8SeR0S9r7H6elTvNY2_1D8zWC4EwlUsmkiclOMl9QRX3VMcciHrMw2prM0d5Z7py3JDIvlJbKpvQIySziUeskwz1g0PZTQ9tMjU27Nrvq3DR0sP80txecG9Y-NdYqNda1naum1tDBPrXBBVNz2qfmtErN6druVVNr6GCf2vCCqbntU3NbpeZ27f5VU2voYJ_a6D-97P9ghrJcFXkpD4inbu5VT75MnmT9myiLtYrlgyriHaYuZzvdbiGRpa53aV0Eeb1VGXwtpkYxOxDTYzEzih0z2THbZma1a1T3zeL-OU3fGsUDM3lwDnloFI_M5NE5ZNowY01D9rYpi7bv_gQAAP__AKVrVw==,
after https://cockroachdb.github.io/distsqlplan/decode.html#eJy8ltFr4koUxt_vX3E4TxZH4kxitYELube1rNDVrgq7S_FhNEMrWONORthS_N8Xzao16pykhjxG882c830_PvKO8a8Z-jhoP7Rvh7DUM7jv977CU_vH48N_nS5U7jqD4eDbwxX8fWX93OneDkEyGCcvh9JI-P6l3W9DpSKhCuMrqMLE9zvdYesK_gXRhF7_rt2H_3_CmIEcIcN5FKqufFUx-k_IkaFAhi4y9JBhA0cMFzqaqDiO9PqV942gE_5Gv85wOl8szfrnEcNJpBX672imZqbQx6Ecz1RfyVBpp44MQ2XkdLa5Zj1osNDTV6nfkOFgIeexDzWHg5yHwCEyL0ojw_vpzCjtQ6UScKhCINb7BO7HhfzkARn2lsaHgLNA4GjFMFqa_Wyxkc8Kfb5i2ee_m8ZmOp8Yp3E4fHIFw54OlVbh6Tv3x4zf4EXGL6fOGK32o4mzo-2PipIb00dVWSCqyWFnd-D1Tyyxn87NY9wg0kZph6d9E1UW8OrZO7xPO7A7-MQw3agWLRxxuP65ERoHI_DsfPMsfDu85ohSCSc22NFxXT7hvHDCeYGEE8ZtCU_7lotwkR0vkQkvUXPcUvEiNthF0ywfL1E4XqJAvAjjtnilfcuFl5sdLzcTXm7N8UrFi9hgF02rfLzcwvFyC8SLMG6LV9q3XHh52fHyMuHl1ZxGqXgRG-yiuSkfL69wvLwC8SKM2-KV9u3Tn38n7uireBHNY5Xpq66-tkaFzyqxM46WeqIedTTZXJM89ja6zQ-hik3yL08eOvPkr_WAH8U8LeYfxeJAzPOJm5eIOb9I3bhIfWNXC6vhrt1w1yr27Dd7VrEgsm5Y1dd28fUloNjFBCh2MQUKoSZAIdQEKE2r4S274a1LQLmxd0KdKIWjSsnVCnY1VQt2NdkLhJwqBkJOBM6PiuXQd0H4bm8WInNurxbuEZcflUuu0O1qKnS7mgydkFOhE3IqdHuvcqJY-VHH5Ard3jGcKBl-1DK5QrerqdDtajJ0Qk6FTsip0O0NK4iGFfaPtnToo9U_fwIAAP__UdBziQ==).
I agree that in some cases the first commit might not be beneficial, but I think in most cases it is.
TFTR!
bors r+
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @asubiotto)
pkg/sql/distsql_physical_planner.go, line 2914 at r1 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
s/arjun/yuzefovich
😄?
:)
I think this particular TODO doesn't make sense, removed.
pkg/sql/physicalplan/physical_plan.go, line 1171 at r2 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
ditto
Done.
Merge skew. bors r-
Canceled.
It's not a merge skew but rather a bug in the new factory. Hey @RaduBerinde, there is a bug with this block that got exposed in this PR: cockroach/pkg/sql/distsql_spec_exec_factory.go Lines 360 to 366 in 1d12a54
Here is the expected plan and the actual plan. Because we have a projection on top of the hash joiner which has |
Hm. Yes, I hate that hack. I tried moving the top-level projection in |
Thanks! I would say fixing this is not super urgent since we've decided to temporarily postpone the work on the new factory until the feature cut off time (for a milestone or so) in order to get some vectorized features in (because we won't be able to finish distsql spec work before the cut off anyway), but I appreciate your willingness to jump in.
The distinct processors are planned in two stages - first, a "local" stage is planned on the same nodes as the previous stage, then, a "global" stage is added. Previously, the global stage would be planned on the gateway, and this commit changes that to make it distributed - by planning "global" distinct processors on the same nodes as the "local" ones and connecting them via a hash router hashing the distinct columns. Release note: None
Release note: None
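To illustrate the two-stage idea described above, here is a minimal, self-contained Go sketch. It is a toy model, not the planner code from this PR: the `row` type and the `distinct`, `hashRoute`, and `distributedDistinct` functions are hypothetical names invented for the example, and FNV hashing is an arbitrary choice. Each "node" first deduplicates its own rows (the local stage), then routes every surviving row to a node chosen by hashing the distinct columns, and each node deduplicates what it receives (the distributed global stage).

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// row is a hypothetical stand-in for a SQL row of integer columns.
type row []int

// distinctKey encodes the values of the distinct columns as a string key.
func distinctKey(r row, cols []int) string {
	key := ""
	for _, c := range cols {
		key += fmt.Sprintf("%d,", r[c])
	}
	return key
}

// distinct keeps the first row seen for each value of the distinct columns.
func distinct(rows []row, cols []int) []row {
	seen := make(map[string]struct{})
	var out []row
	for _, r := range rows {
		k := distinctKey(r, cols)
		if _, ok := seen[k]; !ok {
			seen[k] = struct{}{}
			out = append(out, r)
		}
	}
	return out
}

// hashRoute picks the target node for a row by hashing its distinct columns,
// so rows that are equal on those columns always land on the same node.
func hashRoute(r row, cols []int, numNodes int) int {
	h := fnv.New32a()
	h.Write([]byte(distinctKey(r, cols)))
	return int(h.Sum32() % uint32(numNodes))
}

// distributedDistinct runs a local distinct on every node, hash-routes the
// survivors, and then runs a "global" distinct on every node.
func distributedDistinct(perNode [][]row, cols []int) [][]row {
	n := len(perNode)
	routed := make([][]row, n)
	for _, rows := range perNode {
		for _, r := range distinct(rows, cols) { // local stage
			t := hashRoute(r, cols, n)
			routed[t] = append(routed[t], r)
		}
	}
	out := make([][]row, n)
	for i := range routed {
		out[i] = distinct(routed[i], cols) // global stage, one per node
	}
	return out
}

func main() {
	input := [][]row{
		{{1, 10}, {1, 11}, {2, 20}},
		{{1, 12}, {3, 30}},
		{{2, 21}, {3, 31}},
	}
	for node, rows := range distributedDistinct(input, []int{0}) {
		fmt.Printf("node %d: %v\n", node, rows)
	}
}
```

Because equal rows always hash to the same node, the union of the per-node outputs contains no duplicates, and no single node (such as the gateway) has to see every row.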
Radu fixed the bug, and now the build should be green. bors r+
This PR was included in a batch that was canceled, it will be automatically retried
Build succeeded.
sql: better distribute distinct processors
The distinct processors are planned in two stages - first, a "local"
stage is planned on the same nodes as the previous stage, then,
a "global" stage is added. Previously, the global stage would be planned
on the gateway, and this commit changes that to make it distributed - by
planning "global" distinct processors on the same nodes as the "local"
ones and connecting them via a hash router hashing the distinct columns.
Release note: None
sql: implement ConstructDistinct in the new factory
Addresses: #47473.
Release note: None