Skip to content

Commit

Permalink
wips
Browse files Browse the repository at this point in the history
  • Loading branch information
scsmithr committed Dec 24, 2024
1 parent 02ac58e commit 569aed9
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use rayexec_error::Result;

use crate::arrays::batch::Batch;
use crate::arrays::buffer_manager::BufferManager;
use crate::arrays::datatype::DataType;
use crate::execution::operators_exp::batch_collection::BatchCollectionBlock;

/// Rows collected for a hash join build, stored as a list of blocks.
///
/// Every block is allocated with the same row capacity
/// (`capacity_per_block`).
#[derive(Debug)]
pub struct BuildData<B>
where
    B: BufferManager,
{
    /// Row capacity passed to `BuildBlock::new` whenever a new block has to
    /// be allocated.
    capacity_per_block: usize,
    /// Blocks holding the pushed batches. New rows go into the last block
    /// when it still has room for them.
    blocks: Vec<BuildBlock<B>>,
}

impl<B> BuildData<B>
where
    B: BufferManager,
{
    /// Appends all rows of `batch` to the build data.
    ///
    /// The rows are written into the most recent block if it has room for
    /// them, otherwise into a newly allocated block with
    /// `capacity_per_block` rows of capacity.
    ///
    /// `batch.num_rows()` must not exceed `capacity_per_block`. This is only
    /// checked in debug builds.
    ///
    /// # Errors
    ///
    /// Returns an error if allocating a new block fails or if the batch
    /// cannot be appended. On an append error the block is still kept in
    /// `self.blocks`, so rows stored by earlier calls are not dropped.
    pub fn push_batch(
        &mut self,
        manager: &B,
        input_types: &[DataType],
        batch: &Batch<B>,
    ) -> Result<()> {
        let mut block = self.pop_or_allocate_block(manager, input_types, batch.num_rows())?;

        // TODO: Hashes

        let appended = block.block.append_batch_data(batch);

        // Hand the block back before looking at the result. Using `?`
        // directly on the append would drop a popped block (and every row
        // already stored in it) on failure.
        // NOTE(review): whether a failed append can leave partial rows in the
        // block depends on `append_batch_data` - confirm.
        self.blocks.push(block);

        appended?;

        Ok(())
    }

    /// Returns a block with room for `count` more rows.
    ///
    /// If the last block has enough remaining capacity it is removed from
    /// `self.blocks` and returned; the caller is responsible for pushing it
    /// back. Otherwise a new block is allocated and `self.blocks` is left
    /// untouched.
    fn pop_or_allocate_block(
        &mut self,
        manager: &B,
        input_types: &[DataType],
        count: usize,
    ) -> Result<BuildBlock<B>> {
        debug_assert!(count <= self.capacity_per_block);

        if let Some(last) = self.blocks.last() {
            if last.block.has_capacity_for_rows(count) {
                return Ok(self
                    .blocks
                    .pop()
                    .expect("last block was just observed to exist"));
            }
        }

        BuildBlock::new(manager, input_types, self.capacity_per_block)
    }
}

/// A single block of build data: the stored rows plus a buffer for their
/// hashes.
#[derive(Debug)]
pub struct BuildBlock<B>
where
    B: BufferManager,
{
    /// Row storage for the batches appended to this block.
    block: BatchCollectionBlock<B>,
    /// Row hashes, allocated to capacity of the batch block.
    hashes: Vec<u64>,
}

impl<B: BufferManager> BuildBlock<B> {
    /// Creates an empty block able to hold `capacity` rows of the given
    /// `input_types`.
    ///
    /// The hash buffer is zero-initialized with one entry per row of
    /// capacity.
    ///
    /// # Errors
    ///
    /// Propagates any error from `BatchCollectionBlock::new`.
    pub fn new(manager: &B, input_types: &[DataType], capacity: usize) -> Result<Self> {
        Ok(BuildBlock {
            // Fields are evaluated in order: the row storage is created
            // first, so a failure there returns before the hash buffer is
            // allocated.
            block: BatchCollectionBlock::new(manager, input_types, capacity)?,
            hashes: vec![0; capacity], // TODO: Track
        })
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
mod build_data;

use std::task::{Context, Waker};

use build_data::BuildData;
use parking_lot::Mutex;
use rayexec_error::{OptionExt, Result};

use super::{
ExecutableOperator,
ExecuteInOutState,
OperatorState,
PartitionAndOperatorStates,
PartitionState,
PollExecute,
PollFinalize,
};
use crate::arrays::buffer_manager::NopBufferManager;
use crate::arrays::datatype::DataType;
use crate::database::DatabaseContext;
use crate::explain::explainable::{ExplainConfig, ExplainEntry, Explainable};

/// Per-partition state for the build side of the hash join.
///
/// `poll_execute` appends each input batch to the build data while the
/// partition is `Building`, and returns `PollExecute::Exhausted` once it is
/// `Finished`.
#[derive(Debug)]
pub enum HashJoinBuildPartitionState {
/// Partition is building.
Building(InProgressBuildState),
/// Partition finished building.
Finished,
}

/// State held by a build partition while it is still collecting input.
#[derive(Debug)]
pub struct InProgressBuildState {
/// Batches pushed to this partition so far. Currently fixed to the
/// `NopBufferManager`.
build_data: BuildData<NopBufferManager>,
}

/// Per-partition state for the probe side of the hash join.
#[derive(Debug)]
pub enum HashJoinProbePartitionState {
/// Partition waiting for build side to complete.
///
/// The value is the index of this partition's waker slot in the operator
/// state's `build_waiting_probers`.
Waiting(usize),
/// Partition is probing.
Probing(ProbeState),
/// Left-join drain state.
Draining(DrainState),
/// Probing finished.
Finished,
}

/// State for a probe partition in the `Probing` phase.
///
/// Currently an empty placeholder.
#[derive(Debug)]
pub struct ProbeState {}

/// State for a probe partition in the `Draining` phase.
///
/// Currently an empty placeholder.
#[derive(Debug)]
pub struct DrainState {}

/// Operator-level state for the hash join.
///
/// `poll_execute` only receives this through a shared reference, so the
/// mutable parts live behind a mutex.
#[derive(Debug)]
pub struct HashJoinOperatorState {
/// Mutex-guarded state; locked by `poll_execute` when a waiting probe
/// partition registers its waker.
inner: Mutex<HashJoinOperatorStateInner>,
}

/// The mutex-protected contents of `HashJoinOperatorState`.
#[derive(Debug)]
struct HashJoinOperatorStateInner {
/// Wakers from the probe side that are waiting for the build side to
/// complete.
///
/// Keyed by probe-side partition index. A slot is overwritten with the
/// current waker every time a waiting probe partition is polled.
build_waiting_probers: Vec<Option<Waker>>,
}

/// Physical hash join operator.
///
/// Work in progress: only pushing build-side input and registering wakers
/// for waiting probe partitions is implemented so far.
#[derive(Debug)]
pub struct PhysicalHashJoin {
/// Data types from the left (build) side of the join.
left_types: Vec<DataType>,
/// Data types from the right (probe) side of the join.
right_types: Vec<DataType>,
}

impl ExecutableOperator for PhysicalHashJoin {
    /// Not yet implemented; calling this panics.
    fn create_states(
        &self,
        context: &DatabaseContext,
        batch_size: usize,
        partitions: usize,
    ) -> Result<PartitionAndOperatorStates> {
        unimplemented!()
    }

    /// Drives a single partition of the join.
    ///
    /// * Build partition, still building: requires an input batch, appends it
    ///   to the partition's build data using the left-side types, and returns
    ///   `PollExecute::NeedsMore`.
    /// * Build partition, finished: returns `PollExecute::Exhausted`.
    /// * Probe partition, waiting on the build: stores the current waker in
    ///   the operator state slot for this partition and returns
    ///   `PollExecute::Pending`.
    /// * Any other probe state: not yet implemented.
    ///
    /// # Errors
    ///
    /// Returns an error if a building partition is polled without an input
    /// batch, or if pushing the batch fails.
    ///
    /// # Panics
    ///
    /// Panics if the partition or operator state does not belong to the hash
    /// join, if the probe index is out of bounds for the registered waker
    /// slots, or if an unimplemented probe state is polled.
    fn poll_execute(
        &self,
        cx: &mut Context,
        partition_state: &mut PartitionState,
        operator_state: &OperatorState,
        inout: ExecuteInOutState,
    ) -> Result<PollExecute> {
        match partition_state {
            PartitionState::HashJoinBuild(HashJoinBuildPartitionState::Finished) => {
                // TODO: Probably should error instead.
                Ok(PollExecute::Exhausted)
            }
            PartitionState::HashJoinBuild(HashJoinBuildPartitionState::Building(build)) => {
                let batch = inout.input.required("input batch required")?;
                build
                    .build_data
                    .push_batch(&NopBufferManager, &self.left_types, batch)?;

                Ok(PollExecute::NeedsMore)
            }
            PartitionState::HashJoinProbe(HashJoinProbePartitionState::Waiting(probe_idx)) => {
                // The build side hasn't completed yet. All there is to do is
                // leave a waker behind for this partition.
                let OperatorState::HashJoin(shared) = operator_state else {
                    panic!("invalid operator state: {operator_state:?}")
                };
                let mut guard = shared.inner.lock();

                guard.build_waiting_probers[*probe_idx] = Some(cx.waker().clone());

                Ok(PollExecute::Pending)
            }
            PartitionState::HashJoinProbe(_) => unimplemented!(),
            other => panic!("invalid partition state: {other:?}"),
        }
    }

    /// Not yet implemented; calling this panics.
    fn poll_finalize(
        &self,
        cx: &mut Context,
        partition_state: &mut PartitionState,
        operator_state: &OperatorState,
    ) -> Result<PollFinalize> {
        unimplemented!()
    }
}

impl Explainable for PhysicalHashJoin {
/// Not yet implemented; calling this panics via `unimplemented!()`.
fn explain_entry(&self, conf: ExplainConfig) -> ExplainEntry {
unimplemented!()
}
}

0 comments on commit 569aed9

Please sign in to comment.