From 864606616995308ff40984100a673965c2dff85a Mon Sep 17 00:00:00 2001 From: Yomguithereal Date: Wed, 29 Jan 2025 11:37:50 +0100 Subject: [PATCH] Complete rewrite of xan join Fix #198 --- Cargo.lock | 1 - Cargo.toml | 1 - docs/cmd/join.md | 95 +++-- docs/cmd/regex-join.md | 28 +- docs/xanzines/8_2025_feb.md | 3 +- src/cmd/join.rs | 831 +++++++++++++++++++++--------------- src/cmd/regex_join.rs | 28 +- tests/test_join.rs | 7 +- 8 files changed, 574 insertions(+), 420 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ab5db74..a470a9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2254,7 +2254,6 @@ dependencies = [ "base64", "bstr", "btoi", - "byteorder", "bytesize", "calamine", "colored", diff --git a/Cargo.toml b/Cargo.toml index 576992f..2104667 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,6 @@ arrayvec = "0.7.4" base64 = "0.22.1" bstr = "1.10.0" btoi = "0.4.3" -byteorder = "1" bytesize = "1.3.0" calamine = "0.24.0" colored = "2.0.0" diff --git a/docs/cmd/join.md b/docs/cmd/join.md index df1e3b5..e1f79f4 100644 --- a/docs/cmd/join.md +++ b/docs/cmd/join.md @@ -4,59 +4,74 @@ ```txt Join two sets of CSV data on the specified columns. -The default join operation is an 'inner' join. This corresponds to the -intersection of rows on the keys specified. +The default join operation is an "inner" join. This corresponds to the +intersection of rows on the keys specified. The command is also able to +perform a left outer join with --left, a right outer join with --right, +a full outer join with --full and finally a cartesian product/cross join +with --cross. By default, joins are done case sensitively, but this can be disabled using -the --ignore-case flag. +the -i, --ignore-case flag. The column arguments specify the columns to join for each input. Columns can -be selected using the same syntax as the 'xan select' command. Both selections -must return a same number of columns in proper order. +be selected using the same syntax as the "xan select" command. 
Both selections +must return a same number of columns, for the join keys to be properly aligned. Note that this command is able to consume streams such as stdin (in which case -the file name must be '-' to indicate which file will be read from stdin) and -gzipped files out of the box, but be aware that those file will be entirely -buffered into memory so the join operation can be done. +the file name must be "-" to indicate which file will be read from stdin) and +gzipped files out of the box. -Note that when performing an 'inner' join (the default), it's the second file that -will be indexed into memory. And when performing an 'outer' join, it will be the file -that is on the other side of --left/--right. +# Memory considerations + + - `inner join`: the command does not try to be clever and + always indexes the left file, while the right + file is streamed. Prefer placing the smaller file + on the left. + - `left join`: the command always indexes the right file and streams + the left file. + - `right join`: the command always indexes the left file and streams + the right file. + - `full join`: the command does not try to be clever and + always indexes the left file, while the right + file is streamed. Prefer placing the smaller file + on the left. + - `cross join`: the command does not try to be clever and + always indexes the left file, while the right + file is streamed. Prefer placing the smaller file + on the left. Usage: xan join [options] + xan join [options] --cross xan join --help join options: - -i, --ignore-case When set, joins are done case insensitively. - --left Do a 'left outer' join. This returns all rows in - first CSV data set, including rows with no - corresponding row in the second data set. When no - corresponding row exists, it is padded out with - empty fields. - --right Do a 'right outer' join. This returns all rows in - second CSV data set, including rows with no - corresponding row in the first data set. 
When no - corresponding row exists, it is padded out with - empty fields. (This is the reverse of 'outer left'.) - --full Do a 'full outer' join. This returns all rows in - both data sets with matching records joined. If - there is no match, the missing side will be padded - out with empty fields. (This is the combination of - 'outer left' and 'outer right'.) - --cross USE WITH CAUTION. - This returns the cartesian product of the CSV - data sets given. The number of rows return is - equal to N * M, where N and M correspond to the - number of rows in the given data sets, respectively. - --nulls When set, joins will work on empty fields. - Otherwise, empty fields are completely ignored. - (In fact, any row that has an empty field in the - key specified is ignored.) - --prefix-left Add a prefix to the names of the columns in the - first dataset. - --prefix-right Add a prefix to the names of the columns in the - second dataset. + --left Do an "outer left" join. This returns all rows in + first CSV data set, including rows with no + corresponding row in the second data set. When no + corresponding row exists, it is padded out with + empty fields. This is the reverse of --right. + --right Do an "outer right" join. This returns all rows in + second CSV data set, including rows with no + corresponding row in the first data set. When no + corresponding row exists, it is padded out with + empty fields. This is the reverse of --left. + --full Do a "full outer" join. This returns all rows in + both data sets with matching records joined. If + there is no match, the missing side will be padded + out with empty fields. + --cross This returns the cartesian product of the given CSV + files. The number of rows emitted will be equal to N * M, + where N and M correspond to the number of rows in the given + data sets, respectively. + -i, --ignore-case When set, joins are done case insensitively. + --nulls When set, joins will work on empty fields. 
+ Otherwise, empty keys are completely ignored, i.e. when + column selection yield only empty cells. + -L, --prefix-left Add a prefix to the names of the columns in the + first dataset. + -R, --prefix-right Add a prefix to the names of the columns in the + second dataset. Common options: -h, --help Display this message diff --git a/docs/cmd/regex-join.md b/docs/cmd/regex-join.md index 1823cb2..57c42c6 100644 --- a/docs/cmd/regex-join.md +++ b/docs/cmd/regex-join.md @@ -36,20 +36,20 @@ Usage: xan regex-join --help join options: - -i, --ignore-case Make the regex patterns case-insensitive. - --left Write every row from the first file in the output, with empty - padding cells when no regex pattern from the second file - produced a match. - -p, --parallel Whether to use parallelization to speed up computations. - Will automatically select a suitable number of threads to use - based on your number of cores. Use -t, --threads if you want to - indicate the number of threads yourself. - -t, --threads Parellize computations using this many threads. Use -p, --parallel - if you want the number of threads to be automatically chosen instead. - --prefix-left Add a prefix to the names of the columns in the - searched file. - --prefix-right Add a prefix to the names of the columns in the - patterns file. + -i, --ignore-case Make the regex patterns case-insensitive. + --left Write every row from the first file in the output, with empty + padding cells when no regex pattern from the second file + produced a match. + -p, --parallel Whether to use parallelization to speed up computations. + Will automatically select a suitable number of threads to use + based on your number of cores. Use -t, --threads if you want to + indicate the number of threads yourself. + -t, --threads Parellize computations using this many threads. Use -p, --parallel + if you want the number of threads to be automatically chosen instead. 
+ -L, --prefix-left Add a prefix to the names of the columns in the + searched file. + -R, --prefix-right Add a prefix to the names of the columns in the + patterns file. Common options: -h, --help Display this message diff --git a/docs/xanzines/8_2025_feb.md b/docs/xanzines/8_2025_feb.md index 1bca4fa..01634ba 100644 --- a/docs/xanzines/8_2025_feb.md +++ b/docs/xanzines/8_2025_feb.md @@ -25,4 +25,5 @@ cargo install xan - `xan bins --label` - `xan fmt --tabs` & `xan fmt --in-place` - Overhauling scales accross commands (`xan plot`, `xan bins --nice`, `xan heatmap`, `xan hist`) -- `xan slice --skip` alias \ No newline at end of file +- `xan slice --skip` alias +- Complete rewrite of `xan join` for better streaming performance. \ No newline at end of file diff --git a/src/cmd/join.rs b/src/cmd/join.rs index b7ec950..7fd0354 100644 --- a/src/cmd/join.rs +++ b/src/cmd/join.rs @@ -1,74 +1,265 @@ use std::collections::hash_map::{Entry, HashMap}; -use std::fmt; use std::io; -use std::iter::repeat; -use std::str; +use std::num::NonZeroUsize; use bstr::ByteSlice; -use byteorder::{BigEndian, WriteBytesExt}; +use csv::ByteRecord; -use crate::config::{Config, Delimiter, SeekRead}; -use crate::index::Indexed; +use crate::config::{Config, Delimiter}; use crate::select::{SelectColumns, Selection}; use crate::util; use crate::CliResult; +type IndexKey = Vec>; + +fn get_row_key(sel: &Selection, row: &ByteRecord, case_insensitive: bool) -> IndexKey { + sel.select(row) + .map(|v| transform(v, case_insensitive)) + .collect() +} + +fn transform(bs: &[u8], case_insensitive: bool) -> Vec { + if !case_insensitive { + bs.to_vec() + } else { + bs.to_lowercase() + } +} + +fn build_headers( + left_headers: &ByteRecord, + right_headers: &ByteRecord, + left_prefix: &Option, + right_prefix: &Option, +) -> ByteRecord { + let mut headers = ByteRecord::new(); + + for column in left_headers.iter() { + if let Some(prefix) = left_prefix { + headers.push_field(&[prefix.as_bytes(), column].concat()); + 
} else { + headers.push_field(column); + } + } + + for column in right_headers.iter() { + if let Some(prefix) = right_prefix { + headers.push_field(&[prefix.as_bytes(), column].concat()); + } else { + headers.push_field(column); + } + } + + headers +} + +fn get_padding(headers: &ByteRecord) -> ByteRecord { + (0..headers.len()).map(|_| b"").collect() +} + +#[derive(Debug)] +struct IndexNode { + record: ByteRecord, + written: bool, + next: Option, +} + +impl IndexNode { + fn new(record: ByteRecord) -> Self { + Self { + record, + written: false, + next: None, + } + } +} + +// NOTE: I keep both head & tail to keep insertion order easily +// It is possible to keep only the tail instead of course to +// save up more memory, but the output is less understandable +// for the user and not aligned with usual affordances. +#[derive(Debug)] +struct Index { + case_insensitive: bool, + nulls: bool, + map: HashMap, + nodes: Vec, +} + +impl Index { + fn new(case_insensitive: bool, nulls: bool) -> Self { + Self { + case_insensitive, + nulls, + map: HashMap::new(), + nodes: Vec::new(), + } + } + + fn from_csv_reader( + reader: &mut csv::Reader, + sel: &Selection, + case_insensitive: bool, + nulls: bool, + ) -> CliResult { + let mut index = Index::new(case_insensitive, nulls); + + for result in reader.byte_records() { + let record = result?; + + index.add(sel, record); + } + + Ok(index) + } + + fn add(&mut self, sel: &Selection, record: ByteRecord) { + let key = get_row_key(sel, &record, self.case_insensitive); + + if !self.nulls && key.iter().all(|c| c.is_empty()) { + return; + } + + let next_id = self.nodes.len() + 1; + + match self.map.entry(key) { + Entry::Occupied(mut entry) => { + let (_, tail) = entry.get_mut(); + let new_node = IndexNode::new(record); + self.nodes[*tail - 1].next = Some(NonZeroUsize::new(next_id).unwrap()); + *tail = next_id; + self.nodes.push(new_node); + } + Entry::Vacant(entry) => { + entry.insert((next_id, next_id)); + 
self.nodes.push(IndexNode::new(record)); + } + }; + } + + fn for_each_node_mut( + &mut self, + sel: &Selection, + record: &ByteRecord, + mut callback: F, + ) -> Result<(), E> + where + F: FnMut(&mut IndexNode) -> Result<(), E>, + { + let key = get_row_key(sel, record, self.case_insensitive); + + if !self.nulls && key.iter().all(|c| c.is_empty()) { + return Ok(()); + } + + if let Some((i, _)) = self.map.get(&key) { + let mut current_node = &mut self.nodes[i - 1]; + + callback(current_node)?; + + while let Some(previous_index) = current_node.next { + current_node = &mut self.nodes[previous_index.get() - 1]; + callback(current_node)?; + } + } + + Ok(()) + } + + fn for_each_record( + &mut self, + sel: &Selection, + record: &ByteRecord, + mut callback: F, + ) -> Result<(), E> + where + F: FnMut(&ByteRecord) -> Result<(), E>, + { + self.for_each_node_mut(sel, record, |node| callback(&node.record)) + } + + fn records_not_written(&self) -> impl Iterator { + self.nodes.iter().filter_map(|node| { + if !node.written { + Some(&node.record) + } else { + None + } + }) + } +} + static USAGE: &str = " Join two sets of CSV data on the specified columns. -The default join operation is an 'inner' join. This corresponds to the -intersection of rows on the keys specified. +The default join operation is an \"inner\" join. This corresponds to the +intersection of rows on the keys specified. The command is also able to +perform a left outer join with --left, a right outer join with --right, +a full outer join with --full and finally a cartesian product/cross join +with --cross. By default, joins are done case sensitively, but this can be disabled using -the --ignore-case flag. +the -i, --ignore-case flag. The column arguments specify the columns to join for each input. Columns can -be selected using the same syntax as the 'xan select' command. Both selections -must return a same number of columns in proper order. +be selected using the same syntax as the \"xan select\" command. 
Both selections +must return a same number of columns, for the join keys to be properly aligned. Note that this command is able to consume streams such as stdin (in which case -the file name must be '-' to indicate which file will be read from stdin) and -gzipped files out of the box, but be aware that those file will be entirely -buffered into memory so the join operation can be done. - -Note that when performing an 'inner' join (the default), it's the second file that -will be indexed into memory. And when performing an 'outer' join, it will be the file -that is on the other side of --left/--right. +the file name must be \"-\" to indicate which file will be read from stdin) and +gzipped files out of the box. + +# Memory considerations + + - `inner join`: the command does not try to be clever and + always indexes the left file, while the right + file is streamed. Prefer placing the smaller file + on the left. + - `left join`: the command always indexes the right file and streams + the left file. + - `right join`: the command always indexes the left file and streams + the right file. + - `full join`: the command does not try to be clever and + always indexes the left file, while the right + file is streamed. Prefer placing the smaller file + on the left. + - `cross join`: the command does not try to be clever and + always indexes the left file, while the right + file is streamed. Prefer placing the smaller file + on the left. Usage: xan join [options] + xan join [options] --cross xan join --help join options: - -i, --ignore-case When set, joins are done case insensitively. - --left Do a 'left outer' join. This returns all rows in - first CSV data set, including rows with no - corresponding row in the second data set. When no - corresponding row exists, it is padded out with - empty fields. - --right Do a 'right outer' join. This returns all rows in - second CSV data set, including rows with no - corresponding row in the first data set. 
When no - corresponding row exists, it is padded out with - empty fields. (This is the reverse of 'outer left'.) - --full Do a 'full outer' join. This returns all rows in - both data sets with matching records joined. If - there is no match, the missing side will be padded - out with empty fields. (This is the combination of - 'outer left' and 'outer right'.) - --cross USE WITH CAUTION. - This returns the cartesian product of the CSV - data sets given. The number of rows return is - equal to N * M, where N and M correspond to the - number of rows in the given data sets, respectively. - --nulls When set, joins will work on empty fields. - Otherwise, empty fields are completely ignored. - (In fact, any row that has an empty field in the - key specified is ignored.) - --prefix-left Add a prefix to the names of the columns in the - first dataset. - --prefix-right Add a prefix to the names of the columns in the - second dataset. + --left Do an \"outer left\" join. This returns all rows in + first CSV data set, including rows with no + corresponding row in the second data set. When no + corresponding row exists, it is padded out with + empty fields. This is the reverse of --right. + --right Do an \"outer right\" join. This returns all rows in + second CSV data set, including rows with no + corresponding row in the first data set. When no + corresponding row exists, it is padded out with + empty fields. This is the reverse of --left. + --full Do a \"full outer\" join. This returns all rows in + both data sets with matching records joined. If + there is no match, the missing side will be padded + out with empty fields. + --cross This returns the cartesian product of the given CSV + files. The number of rows emitted will be equal to N * M, + where N and M correspond to the number of rows in the given + data sets, respectively. + -i, --ignore-case When set, joins are done case insensitively. + --nulls When set, joins will work on empty fields. 
+ Otherwise, empty keys are completely ignored, i.e. when + column selection yield only empty cells. + -L, --prefix-left Add a prefix to the names of the columns in the + first dataset. + -R, --prefix-right Add a prefix to the names of the columns in the + second dataset. Common options: -h, --help Display this message @@ -80,8 +271,6 @@ Common options: Must be a single character. "; -type ByteString = Vec; - #[derive(Deserialize)] struct Args { arg_columns1: SelectColumns, @@ -101,351 +290,303 @@ struct Args { flag_prefix_right: Option, } -pub fn run(argv: &[&str]) -> CliResult<()> { - let args: Args = util::get_args(USAGE, argv)?; - let mut state = args.new_io_state()?; +type BoxedReader = csv::Reader>; - if [ - args.flag_left, - args.flag_right, - args.flag_full, - args.flag_cross, - ] - .iter() - .filter(|flag| **flag) - .count() - > 1 - { - Err("Please pick exactly one join operation.")?; +impl Args { + fn readers_and_selections( + &self, + ) -> CliResult<((BoxedReader, Selection), (BoxedReader, Selection))> { + let left = Config::new(&Some(self.arg_input1.clone())) + .delimiter(self.flag_delimiter) + .no_headers(self.flag_no_headers) + .select(self.arg_columns1.clone()); + + let right = Config::new(&Some(self.arg_input2.clone())) + .delimiter(self.flag_delimiter) + .no_headers(self.flag_no_headers) + .select(self.arg_columns2.clone()); + + let mut left_reader = left.reader()?; + let mut right_reader = right.reader()?; + + let left_sel = left.selection(left_reader.byte_headers()?)?; + let right_sel = right.selection(right_reader.byte_headers()?)?; + + if left_sel.len() != right_sel.len() { + Err("not the same number of columns selected on left & right!")?; + } + + Ok(((left_reader, left_sel), (right_reader, right_sel))) } - state.write_headers()?; + fn wconf(&self) -> Config { + Config::new(&self.flag_output) + } - if args.flag_left { - state.outer_join(false) - } else if args.flag_right { - state.outer_join(true) - } else if args.flag_full { - 
state.full_outer_join() - } else if args.flag_cross { - state.cross_join() - } else { - state.inner_join() + fn index(&self, reader: &mut BoxedReader, sel: &Selection) -> CliResult { + Index::from_csv_reader(reader, sel, self.flag_ignore_case, self.flag_nulls) } -} -fn prefix_header(headers: &csv::ByteRecord, prefix: &String) -> csv::ByteRecord { - let mut prefixed_headers = csv::ByteRecord::new(); + fn write_headers( + &self, + writer: &mut csv::Writer, + left_headers: &ByteRecord, + right_headers: &ByteRecord, + ) -> CliResult<()> { + if !self.flag_no_headers { + writer.write_byte_record(&build_headers( + left_headers, + right_headers, + &self.flag_prefix_left, + &self.flag_prefix_right, + ))?; + } - for column in headers.iter() { - prefixed_headers.push_field(&[prefix.as_bytes(), column].concat()); + Ok(()) } - prefixed_headers -} + fn inner_join(self) -> CliResult<()> { + let ((mut left_reader, left_sel), (mut right_reader, right_sel)) = + self.readers_and_selections()?; -struct IoState { - wtr: csv::Writer, - rdr1: csv::Reader, - sel1: Selection, - rdr2: csv::Reader, - sel2: Selection, - no_headers: bool, - case_insensitive: bool, - nulls: bool, - pfx_left: Option, - pfx_right: Option, -} + let mut writer = self.wconf().writer()?; -impl IoState { - fn write_headers(&mut self) -> CliResult<()> { - if !self.no_headers { - let mut headers = self.rdr1.byte_headers()?.clone(); - if let Some(prefix) = &self.pfx_left { - headers = prefix_header(&headers, prefix); - } + self.write_headers( + &mut writer, + left_reader.byte_headers()?, + right_reader.byte_headers()?, + )?; - if let Some(prefix) = &self.pfx_right { - headers.extend(prefix_header(&self.rdr2.byte_headers()?.clone(), prefix).iter()) - } else { - headers.extend(self.rdr2.byte_headers()?.iter()); - } - self.wtr.write_record(&headers)?; + let mut index = self.index(&mut left_reader, &left_sel)?; + + let mut right_record = csv::ByteRecord::new(); + + while right_reader.read_byte_record(&mut right_record)? 
{ + index.for_each_record(&right_sel, &right_record, |left_record| { + writer.write_record(left_record.iter().chain(right_record.iter())) + })?; } - Ok(()) + + Ok(writer.flush()?) } - fn inner_join(mut self) -> CliResult<()> { - let mut scratch = csv::ByteRecord::new(); - let mut validx = ValueIndex::new(self.rdr2, &self.sel2, self.case_insensitive, self.nulls)?; - for row in self.rdr1.byte_records() { - let row = row?; - let key = get_row_key(&self.sel1, &row, self.case_insensitive); - match validx.values.get(&key) { - None => continue, - Some(rows) => { - for &rowi in rows.iter() { - validx.idx.seek(rowi as u64)?; - - validx.idx.read_byte_record(&mut scratch)?; - let combined = row.iter().chain(scratch.iter()); - self.wtr.write_record(combined)?; - } - } + fn full_outer_join(self) -> CliResult<()> { + let ((mut left_reader, left_sel), (mut right_reader, right_sel)) = + self.readers_and_selections()?; + + let mut writer = self.wconf().writer()?; + + let left_headers = left_reader.byte_headers()?.clone(); + let right_headers = right_reader.byte_headers()?.clone(); + + let left_padding = get_padding(&left_headers); + let right_padding = get_padding(&right_headers); + + self.write_headers(&mut writer, &left_headers, &right_headers)?; + + let mut index = self.index(&mut left_reader, &left_sel)?; + + let mut right_record = csv::ByteRecord::new(); + + while right_reader.read_byte_record(&mut right_record)? 
{ + let mut something_was_written: bool = false; + + index.for_each_node_mut(&right_sel, &right_record, |left_node| { + something_was_written = true; + left_node.written = true; + writer.write_record(left_node.record.iter().chain(right_record.iter())) + })?; + + if !something_was_written { + writer.write_record(left_padding.iter().chain(right_record.iter()))?; } } - Ok(()) - } - fn outer_join(mut self, right: bool) -> CliResult<()> { - if right { - ::std::mem::swap(&mut self.rdr1, &mut self.rdr2); - ::std::mem::swap(&mut self.sel1, &mut self.sel2); + for left_record in index.records_not_written() { + writer.write_record(left_record.iter().chain(right_padding.iter()))?; } - let mut scratch = csv::ByteRecord::new(); - let (_, pad2) = self.get_padding()?; - let mut validx = ValueIndex::new(self.rdr2, &self.sel2, self.case_insensitive, self.nulls)?; - for row in self.rdr1.byte_records() { - let row = row?; - let key = get_row_key(&self.sel1, &row, self.case_insensitive); - match validx.values.get(&key) { - None => { - if right { - self.wtr.write_record(pad2.iter().chain(&row))?; - } else { - self.wtr.write_record(row.iter().chain(&pad2))?; - } - } - Some(rows) => { - for &rowi in rows.iter() { - validx.idx.seek(rowi as u64)?; - let row1 = row.iter(); - validx.idx.read_byte_record(&mut scratch)?; - if right { - self.wtr.write_record(scratch.iter().chain(row1))?; - } else { - self.wtr.write_record(row1.chain(&scratch))?; - } - } - } - } - } - Ok(()) + Ok(writer.flush()?) } - fn full_outer_join(mut self) -> CliResult<()> { - let mut scratch = csv::ByteRecord::new(); - let (pad1, pad2) = self.get_padding()?; - let mut validx = ValueIndex::new(self.rdr2, &self.sel2, self.case_insensitive, self.nulls)?; - - // Keep track of which rows we've written from rdr2. 
- let mut rdr2_written: Vec<_> = repeat(false).take(validx.num_rows).collect(); - for row1 in self.rdr1.byte_records() { - let row1 = row1?; - let key = get_row_key(&self.sel1, &row1, self.case_insensitive); - match validx.values.get(&key) { - None => { - self.wtr.write_record(row1.iter().chain(&pad2))?; - } - Some(rows) => { - for &rowi in rows.iter() { - rdr2_written[rowi] = true; - - validx.idx.seek(rowi as u64)?; - validx.idx.read_byte_record(&mut scratch)?; - self.wtr.write_record(row1.iter().chain(&scratch))?; - } - } + fn left_join(self) -> CliResult<()> { + let ((mut left_reader, left_sel), (mut right_reader, right_sel)) = + self.readers_and_selections()?; + + let mut writer = self.wconf().writer()?; + + let left_headers = left_reader.byte_headers()?.clone(); + let right_headers = right_reader.byte_headers()?.clone(); + + let right_padding = get_padding(&right_headers); + + self.write_headers(&mut writer, &left_headers, &right_headers)?; + + let mut index = self.index(&mut right_reader, &right_sel)?; + + let mut left_record = csv::ByteRecord::new(); + + while left_reader.read_byte_record(&mut left_record)? { + let mut something_was_written: bool = false; + + index.for_each_record(&left_sel, &left_record, |right_record| { + something_was_written = true; + writer.write_record(left_record.iter().chain(right_record.iter())) + })?; + + if !something_was_written { + writer.write_record(left_record.iter().chain(right_padding.iter()))?; } } - // OK, now write any row from rdr2 that didn't get joined with a row - // from rdr1. - for (i, &written) in rdr2_written.iter().enumerate() { - if !written { - validx.idx.seek(i as u64)?; - validx.idx.read_byte_record(&mut scratch)?; - self.wtr.write_record(pad1.iter().chain(&scratch))?; + Ok(writer.flush()?) 
+ } + + fn right_join(self) -> CliResult<()> { + let ((mut left_reader, left_sel), (mut right_reader, right_sel)) = + self.readers_and_selections()?; + + let mut writer = self.wconf().writer()?; + + let left_headers = left_reader.byte_headers()?.clone(); + let right_headers = right_reader.byte_headers()?.clone(); + + let left_padding = get_padding(&left_headers); + + self.write_headers(&mut writer, &left_headers, &right_headers)?; + + let mut index = self.index(&mut left_reader, &left_sel)?; + + let mut right_record = csv::ByteRecord::new(); + + while right_reader.read_byte_record(&mut right_record)? { + let mut something_was_written: bool = false; + + index.for_each_record(&right_sel, &right_record, |left_record| { + something_was_written = true; + writer.write_record(left_record.iter().chain(right_record.iter())) + })?; + + if !something_was_written { + writer.write_record(left_padding.iter().chain(right_record.iter()))?; } } - Ok(()) + + Ok(writer.flush()?) } - fn cross_join(mut self) -> CliResult<()> { - let mut pos = csv::Position::new(); - pos.set_byte(0); - let mut row2 = csv::ByteRecord::new(); - for row1 in self.rdr1.byte_records() { - let row1 = row1?; - self.rdr2.seek(pos.clone())?; - if self.rdr2.has_headers() { - // Read and skip the header row, since CSV readers disable - // the header skipping logic after being seeked. - self.rdr2.read_byte_record(&mut row2)?; - } - while self.rdr2.read_byte_record(&mut row2)? { - self.wtr.write_record(row1.iter().chain(&row2))?; + fn cross_join(self) -> CliResult<()> { + let ((mut left_reader, _), (mut right_reader, _)) = self.readers_and_selections()?; + + let mut writer = self.wconf().writer()?; + + self.write_headers( + &mut writer, + left_reader.byte_headers()?, + right_reader.byte_headers()?, + )?; + + let index = right_reader + .into_byte_records() + .collect::, _>>()?; + + let mut left_record = csv::ByteRecord::new(); + + while left_reader.read_byte_record(&mut left_record)? 
{ + for right_record in index.iter() { + writer.write_record(left_record.iter().chain(right_record.iter()))?; } } - Ok(()) - } - fn get_padding(&mut self) -> CliResult<(csv::ByteRecord, csv::ByteRecord)> { - let len1 = self.rdr1.byte_headers()?.len(); - let len2 = self.rdr2.byte_headers()?.len(); - Ok(( - repeat(b"").take(len1).collect(), - repeat(b"").take(len2).collect(), - )) + Ok(writer.flush()?) } } -impl Args { - fn new_io_state( - &self, - ) -> CliResult, Box>> { - let rconf1 = Config::new(&Some(self.arg_input1.clone())) - .delimiter(self.flag_delimiter) - .no_headers(self.flag_no_headers) - .select(self.arg_columns1.clone()); - let rconf2 = Config::new(&Some(self.arg_input2.clone())) - .delimiter(self.flag_delimiter) - .no_headers(self.flag_no_headers) - .select(self.arg_columns2.clone()); +pub fn run(argv: &[&str]) -> CliResult<()> { + let args: Args = util::get_args(USAGE, argv)?; - let mut rdr1 = rconf1.reader_file()?; - let mut rdr2 = rconf2.reader_file()?; - let (sel1, sel2) = self.get_selections(&rconf1, &mut rdr1, &rconf2, &mut rdr2)?; - Ok(IoState { - wtr: Config::new(&self.flag_output).writer()?, - rdr1, - sel1, - rdr2, - sel2, - no_headers: rconf1.no_headers, - case_insensitive: self.flag_ignore_case, - nulls: self.flag_nulls, - pfx_left: self.flag_prefix_left.clone(), - pfx_right: self.flag_prefix_right.clone(), - }) + if [ + args.flag_left, + args.flag_right, + args.flag_full, + args.flag_cross, + ] + .iter() + .filter(|flag| **flag) + .count() + > 1 + { + Err("Please pick exactly one join operation.")?; } - fn get_selections( - &self, - rconf1: &Config, - rdr1: &mut csv::Reader, - rconf2: &Config, - rdr2: &mut csv::Reader, - ) -> CliResult<(Selection, Selection)> { - let headers1 = rdr1.byte_headers()?; - let headers2 = rdr2.byte_headers()?; - let select1 = rconf1.selection(headers1)?; - let select2 = rconf2.selection(headers2)?; - - Ok((select1, select2)) + if args.flag_left { + args.left_join() + } else if args.flag_right { + 
args.right_join() + } else if args.flag_full { + args.full_outer_join() + } else if args.flag_cross { + args.cross_join() + } else { + args.inner_join() } } -struct ValueIndex { - // This maps tuples of values to corresponding rows. - values: HashMap, Vec>, - idx: Indexed>>, - num_rows: usize, -} +#[cfg(test)] +mod tests { + use super::*; -impl ValueIndex { - fn new( - mut rdr: csv::Reader, - sel: &Selection, - case_insensitive: bool, - nulls: bool, - ) -> CliResult> { - let mut val_idx = HashMap::with_capacity(10000); - let mut row_idx = io::Cursor::new(Vec::with_capacity(8 * 10000)); - let (mut rowi, mut count) = (0usize, 0usize); - - // This logic is kind of tricky. Basically, we want to include - // the header row in the line index (because that's what csv::index - // does), but we don't want to include header values in the ValueIndex. - if !rdr.has_headers() { - // ... so if there are no headers, we seek to the beginning and - // index everything. - let mut pos = csv::Position::new(); - pos.set_byte(0); - rdr.seek(pos)?; - } else { - // ... and if there are headers, we make sure that we've parsed - // them, and write the offset of the header row to the index. - rdr.byte_headers()?; - row_idx.write_u64::(0)?; - count += 1; - } + fn rec(values: &[&str]) -> ByteRecord { + let mut record = ByteRecord::new(); - let mut row = csv::ByteRecord::new(); - while rdr.read_byte_record(&mut row)? { - // This is a bit hokey. We're doing this manually instead of using - // the `csv-index` crate directly so that we can create both - // indexes in one pass. 
- row_idx.write_u64::(row.position().unwrap().byte())?; - - let fields: Vec<_> = sel - .select(&row) - .map(|v| transform(v, case_insensitive)) - .collect(); - if nulls || !fields.iter().any(|f| f.is_empty()) { - match val_idx.entry(fields) { - Entry::Vacant(v) => { - let mut rows = Vec::with_capacity(4); - rows.push(rowi); - v.insert(rows); - } - Entry::Occupied(mut v) => { - v.get_mut().push(rowi); - } - } - } - rowi += 1; - count += 1; + for v in values.iter() { + record.push_field(v.as_bytes()); } - row_idx.write_u64::(count as u64)?; - let idx = Indexed::open(rdr, io::Cursor::new(row_idx.into_inner()))?; - Ok(ValueIndex { - values: val_idx, - idx, - num_rows: rowi, - }) + record } -} -impl fmt::Debug for ValueIndex { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - // Sort the values by order of first appearance. - let mut kvs = self.values.iter().collect::>(); - kvs.sort_by(|&(_, v1), &(_, v2)| v1[0].cmp(&v2[0])); - for (keys, rows) in kvs.into_iter() { - // This is just for debugging, so assume Unicode for now. - let keys = keys - .iter() - .map(|k| String::from_utf8(k.to_vec()).unwrap()) - .collect::>(); - writeln!(f, "({}) => {:?}", keys.join(", "), rows)? 
+ impl Index { + fn test_vec(&mut self, sel: &Selection, record: &ByteRecord) -> Vec { + let mut v = Vec::new(); + + self.for_each_node_mut(sel, record, |node| -> Result<(), ()> { + v.push(node.record.clone()); + Ok(()) + }) + .unwrap(); + + v } - Ok(()) } -} - -fn get_row_key(sel: &Selection, row: &csv::ByteRecord, case_insensitive: bool) -> Vec { - sel.select(row) - .map(|v| transform(v, case_insensitive)) - .collect() -} -fn transform(bs: &[u8], case_insensitive: bool) -> ByteString { - if !case_insensitive { - bs.to_vec() - } else { - bs.to_lowercase() + #[test] + fn test_index_linked_lists() { + let mut index = Index::new(false, false); + let sel = Selection::full(1); + + index.add(&sel, ByteRecord::from(rec(&["a", "one"]))); + index.add(&sel, ByteRecord::from(rec(&["b", "one"]))); + index.add(&sel, ByteRecord::from(rec(&["a", "two"]))); + index.add(&sel, ByteRecord::from(rec(&["a", "three"]))); + index.add(&sel, ByteRecord::from(rec(&["b", "two"]))); + index.add(&sel, ByteRecord::from(rec(&["c", "one"]))); + + assert_eq!( + index.test_vec(&sel, &rec(&["d", "one"])), + Vec::::new() + ); + assert_eq!( + index.test_vec(&sel, &rec(&["a", "one"])), + vec![rec(&["a", "one"]), rec(&["a", "two"]), rec(&["a", "three"])] + ); + assert_eq!( + index.test_vec(&sel, &rec(&["b", "one"])), + vec![rec(&["b", "one"]), rec(&["b", "two"])] + ); + assert_eq!( + index.test_vec(&sel, &rec(&["c", "one"])), + vec![rec(&["c", "one"])] + ); } } diff --git a/src/cmd/regex_join.rs b/src/cmd/regex_join.rs index 3b1b244..c506113 100644 --- a/src/cmd/regex_join.rs +++ b/src/cmd/regex_join.rs @@ -96,20 +96,20 @@ Usage: xan regex-join --help join options: - -i, --ignore-case Make the regex patterns case-insensitive. - --left Write every row from the first file in the output, with empty - padding cells when no regex pattern from the second file - produced a match. - -p, --parallel Whether to use parallelization to speed up computations. 
- Will automatically select a suitable number of threads to use
- based on your number of cores. Use -t, --threads if you want to
- indicate the number of threads yourself.
- -t, --threads Parellize computations using this many threads. Use -p, --parallel
- if you want the number of threads to be automatically chosen instead.
- --prefix-left Add a prefix to the names of the columns in the
- searched file.
- --prefix-right Add a prefix to the names of the columns in the
- patterns file.
+ -i, --ignore-case Make the regex patterns case-insensitive.
+ --left Write every row from the first file in the output, with empty
+ padding cells when no regex pattern from the second file
+ produced a match.
+ -p, --parallel Whether to use parallelization to speed up computations.
+ Will automatically select a suitable number of threads to use
+ based on your number of cores. Use -t, --threads if you want to
+ indicate the number of threads yourself.
+ -t, --threads Parallelize computations using this many threads. Use -p, --parallel
+ if you want the number of threads to be automatically chosen instead.
+ -L, --prefix-left Add a prefix to the names of the columns in the
+ searched file.
+ -R, --prefix-right Add a prefix to the names of the columns in the
+ patterns file.
Common options: -h, --help Display this message diff --git a/tests/test_join.rs b/tests/test_join.rs index 78bf997..8c78c70 100644 --- a/tests/test_join.rs +++ b/tests/test_join.rs @@ -128,10 +128,10 @@ join_test!( vec![ svec!["Boston", "MA", "Boston", "Logan Airport"], svec!["Boston", "MA", "Boston", "Boston Garden"], - svec!["New York", "NY", "", ""], - svec!["San Francisco", "CA", "", ""], svec!["Buffalo", "NY", "Buffalo", "Ralph Wilson Stadium"], svec!["", "", "Orlando", "Disney World"], + svec!["New York", "NY", "", ""], + svec!["San Francisco", "CA", "", ""], ], ); assert_eq!(got, expected); @@ -172,8 +172,7 @@ fn join_cross() { ); let mut cmd = wrk.command("join"); - cmd.arg("--cross") - .args(["", "letters.csv", "", "numbers.csv"]); + cmd.arg("--cross").args(["letters.csv", "numbers.csv"]); let got: Vec> = wrk.read_stdout(&mut cmd); let expected = vec![ svec!["h1", "h2", "h3", "h4"],