distsql: optimize index/lookup joiners #34997

Closed
justinj opened this issue Feb 15, 2019 · 5 comments
Labels
A-sql-optimizer SQL logical planning and optimizations. C-enhancement Solution expected to add code/behavior + preserve backward-compat (pg compat issues are exception) T-sql-queries SQL Queries Team

Comments

justinj (Contributor) commented Feb 15, 2019

@jordanlewis and I discussed two potential optimizations for the index/lookup joiners today.

Pipeline Batches

As a reminder, the index joiner works by reading in a batch of rows from its input (today, 100 rows per batch), sending that batch out to fetch the data required for those rows, and then repeating. These two stages could be pipelined so that we prepare the next batch while waiting for the current one's lookups to finish.
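
A rough sketch of what that shape could look like. readBatch, lookup, and emit are hypothetical stand-ins for the joiner's real interfaces, not actual APIs:

```go
package sketch

type row = []byte // stand-in for a SQL row

// pipelinedJoin overlaps reading the next input batch with the lookups for
// the current one. readBatch returns the next input batch (ok=false at end
// of input), lookup fetches the joined rows for a batch, and emit hands
// results downstream.
func pipelinedJoin(
	readBatch func() (batch []row, ok bool),
	lookup func([]row) []row,
	emit func([]row),
) {
	cur, ok := readBatch()
	for ok {
		done := make(chan []row, 1)
		go func(b []row) { done <- lookup(b) }(cur) // lookups in flight
		cur, ok = readBatch()                       // meanwhile, read the next batch
		emit(<-done)                                // wait for the in-flight lookups
	}
}
```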

Bucket batches by node

Today, we unconditionally read in a batch of rows, send out the entire batch, and then block while waiting for it. To see why this might be bad in some cases, consider a three-node cluster with the index joiner living on node 1, and an input sequence that looks like this (batch size 3), where each number is the leaseholder node for the row in question:

1, 2, 3, 1, 2, 3, 1, 2, 3

Each batch requires a round trip to nodes 2 and 3, so the total latency of this index join is roughly

3 * max(latency(2), latency(3))

If we were to instead intelligently schedule the batches, perhaps by bucketing them by node, we could instead process them as a sequence that looked something more like this:

1, 1, 1, 2, 2, 2, 3, 3, 3

which would bring the cost to roughly

latency(2) + latency(3)
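
As a rough illustration of the bucketing itself (leaseholder here is a hypothetical stand-in for whatever maps a row's lookup span to its leaseholder node, not a real API):

```go
package sketch

type row = []byte // stand-in for a SQL row

// bucketByNode groups a batch by leaseholder so that each node's lookups can
// be sent together, visiting each node once per batch instead of once per
// sub-batch. Alongside each bucket we keep the original row indices, which
// an ordering-preserving joiner would need to put results back in input
// order.
func bucketByNode(batch []row, leaseholder func(row) int) (buckets map[int][]row, indices map[int][]int) {
	buckets = make(map[int][]row)
	indices = make(map[int][]int)
	for i, r := range batch {
		n := leaseholder(r)
		buckets[n] = append(buckets[n], r)
		indices[n] = append(indices[n], i)
	}
	return buckets, indices
}
```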

Obviously there are far more complications and subtleties than this discussion brings up. For instance, these operators need to maintain the ordering of their inputs, so we will need to flush batches intelligently, possibly before they would otherwise be ready.

cc @nvanbenschoten who might have some thoughts about this.

RaduBerinde (Member) commented

Figuring out how to bucket by node might be hard, but we could sort each batch according to the lookup columns and then rearrange the results. That would achieve more or less the same result transparently.
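
A minimal sketch of that sort-and-rearrange trick, with hypothetical key and lookup stand-ins, and assuming for simplicity that there is exactly one result row per input row (as in an index join):

```go
package sketch

import "sort"

type row = []byte // stand-in for a SQL row

// sortedLookup sorts the batch by lookup key (which tends to group rows by
// range, and hence by leaseholder), issues the lookups in sorted order, and
// then scatters the results back into the original input order.
func sortedLookup(batch []row, key func(row) string, lookup func([]row) []row) []row {
	// Sort a permutation of indices rather than the rows themselves, so the
	// original positions are remembered.
	order := make([]int, len(batch))
	for i := range order {
		order[i] = i
	}
	sort.SliceStable(order, func(a, b int) bool {
		return key(batch[order[a]]) < key(batch[order[b]])
	})

	sorted := make([]row, len(batch))
	for pos, idx := range order {
		sorted[pos] = batch[idx]
	}

	results := lookup(sorted) // results arrive in sorted order

	out := make([]row, len(batch))
	for pos, idx := range order {
		out[idx] = results[pos] // restore the original input order
	}
	return out
}
```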

There's also the question of whether DistSender should be doing this internally (for batches with no limits).

nvanbenschoten (Member) commented

> Figuring out how to bucket by node might be hard, but we could sort each batch according to the lookup columns and then rearrange the results. That would achieve more or less the same result transparently.

We play this trick in other areas of the code as well, for instance during intent resolution. As @RaduBerinde mentioned, it doesn't require any knowledge about range boundaries; all that matters is that you sort based on the lookup key of each row.

> There's also the question of whether DistSender should be doing this internally (for batches with no limits).

Do you mean so that we could be more confident about increasing the size of the batches?

Re: the pipelining proposal, are the two stages you're looking to pipeline comparably expensive?

> reads in a batch of rows from its input

Is the input preparing these rows concurrently as the lookup joiner sends out its lookup reads? If so, could we accomplish the same thing with a larger buffer between it and its input?

RaduBerinde (Member) commented

> Do you mean so that we could be more confident about increasing the size of the batches?

Never mind that part; I think I misunderstood part of Justin's proposal (I incorrectly thought the idea was to rearrange each batch separately).

@awoods187 awoods187 added C-enhancement Solution expected to add code/behavior + preserve backward-compat (pg compat issues are exception) A-sql-optimizer SQL logical planning and optimizations. labels Mar 6, 2019
rohany (Contributor) commented Jan 9, 2020

For bookkeeping's sake: I played around today with trying to implement batch pipelining for the index joiner, and it didn't go too well.

I tried something similar to the following:

- A "reader" goroutine that reads batches of rows from the input, constructs a batch of spans, and sends it over a buffered channel to the "scanner" goroutine.
- A "scanner" goroutine that reads span batches from the "reader" goroutine, performs scans for each batch, and sends the resulting rows on a "delivery channel".
- A Next() function that returns rows from a cached batch; when it runs out, it grabs the next batch from the delivery channel and continues.
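
A rough sketch of that shape, with hypothetical readInput and scan stand-ins rather than the processor's real interfaces:

```go
package sketch

type (
	row  = []byte // stand-in for a SQL row
	span = []byte // stand-in for a lookup span
)

// runPipeline wires up the reader/scanner shape described above. Next()
// would drain a cached batch and then receive the next one from the
// returned delivery channel.
func runPipeline(readInput func() ([]span, bool), scan func([]span) []row) <-chan []row {
	spanCh := make(chan []span, 4)  // buffered hand-off: reader -> scanner
	delivery := make(chan []row, 4) // buffered hand-off: scanner -> Next()

	// "Reader" goroutine: read from the input, build span batches, hand off.
	go func() {
		defer close(spanCh)
		for {
			spans, ok := readInput()
			if !ok {
				return
			}
			spanCh <- spans
		}
	}()

	// "Scanner" goroutine: scan each span batch and deliver the result rows.
	go func() {
		defer close(delivery)
		for spans := range spanCh {
			delivery <- scan(spans)
		}
	}()

	return delivery
}
```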

This performed pretty poorly: a profile showed a large amount of time spent in thread operations, most likely synchronization around the channel usage.

It's possible that performance would improve greatly once the cost of scans/input reads increases in a real distributed setting (I only tested on some local benchmarks).

Additionally, I feel like the "nexting" style of these processors makes it a little tricky to keep these pipelines full, which is where the real performance benefit of pipelining comes from.

jordanlewis (Member) commented

@andreimatei I think this is going to be satisfied by your RFC (#67040), so I'm going to close it. Please take a look at @justinj's nice writeup (👋), but I think you've covered it, yes?
