Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIFO JoinHashMap for HashJoin #8130

Closed
korowa opened this issue Nov 10, 2023 · 5 comments · Fixed by #8658
Closed

FIFO JoinHashMap for HashJoin #8130

korowa opened this issue Nov 10, 2023 · 5 comments · Fixed by #8658
Assignees
Labels
enhancement New feature or request

Comments

@korowa
Copy link
Contributor

korowa commented Nov 10, 2023

Is your feature request related to a problem or challenge?

The problem related to #8020

At this moment, collect_left_input function in hash join implementation produces JoinHasMap which is optimized for reverse iteration -- for each hash_value the actual HashMap stores the index of the last row from the build-side, and remaining indices for same has are stored in JoinHashMap.next vector.

To keep maintaining inputs order in join output, HashJoinStream iterates over probe-batch in reverse order, and after processing the whole batch, it, again, reverts order of matched build and probe indices.

The problem is that reverse iteration doesn't allow HashJoinStream to produce partial output batch (without having all matched indices for current probe batch) -- in this case join output order will be distorted.

Describe the solution you'd like

Desired behaviour of HashJoinStream -- preserving natural order of probe + build side record, without intermediary inversion of indices -- to be ready to produce output batch in any moment.

This could be achieved by modifying update_hash to save "head" of hash chains in Map. It may also require to still track chain tails while building JoinHashMap in order to keep performance at the same level -- tails might be stored in JoinHashMap (probably not the best decision in terms of memory utilization), or in a separate data structure which will only exist during build-side collection and won't be as memory-greedy as one more integer in HashMap tuples.

Describe alternatives you've considered

No response

Additional context

No response

@korowa korowa added the enhancement New feature or request label Nov 10, 2023
@alamb alamb self-assigned this Nov 13, 2023
@alamb
Copy link
Contributor

alamb commented Nov 13, 2023

I am going to try and study this code and see if I can find some way to structure it

@korowa
Copy link
Contributor Author

korowa commented Nov 13, 2023

Another option to achieve desired behaviour and not to affect left input collection performance (almost) at all -- is updating JoinHashMap in reverse order -- seems to be the most straightforward and simple solution for the issue.

@alamb
Copy link
Contributor

alamb commented Nov 13, 2023

Tha

Another option to achieve desired behaviour and not to affect left input collection performance (almost) at all -- is updating JoinHashMap in reverse order -- seems to be the most straightforward and simple solution for the issue.

Thanks @korowa -- I am not super familiar with this code; After I have done a bit more study I'll hopefully have a proposal to share

@alamb
Copy link
Contributor

alamb commented Nov 13, 2023

@alamb
Copy link
Contributor

alamb commented Nov 13, 2023

I started hacking on this locally -- the basic structure I have in mind looks something like this (I need to sort out the OnceFuture stuff, which is awkward at the moment).

But the idea is to wrap up the state needed to build output into a separate enum like the following:

/// State machine for creating output for HashJoin
///
/// TODO Add memory reservation for intermediate rows
enum HashJoinOutput {
    /// output phase has not yet started, input is
    ReadingInput {
        /// future which builds hash table from left side
        left_fut: OnceFut<JoinLeftData>,
    },
    /// output phase has started, but have no probe batch
    Ready {
        // TODO make this into the proper state
        left_fut: OnceFut<JoinLeftData>,
    },
    /// and output is being built from probe batches
    Probing {
        data: JoinLeftData,
    },
    /// emitting any final unmatched indices, if any (depending on the join type)
    Unmatched {
        //
        data: JoinLeftData,
    },
    /// Input is complete, and output is complete
    Done,
}

Then I think adding the logic to incrementally compute the matching indices is more tractable (though as you say @korowa we'll still have to protect against pathalogical cases where each input row in the probe batch matches all the rows in the hashtable).

I think this will take me a few days to code up realistically, and #8078 is higher priority for me. However, I think we'll be able to make this work

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants