Commit 0a05f2c

fix many bugs and now start to add tests

JackTan25 committed Jun 24, 2023
1 parent 479f5f0 commit 0a05f2c
Showing 4 changed files with 95 additions and 25 deletions.
@@ -167,6 +167,9 @@ impl InputFormatTextBase for InputFormatCSV {
n_end: 0,
num_fields: ctx.schema.num_fields(),
projection,
need_skip_first_line: split_info.offset > 0,
has_read_one_more_line: false,
received_bytes_offset: split_info.offset,
})
}

@@ -222,6 +225,10 @@ pub struct CsvReaderState {

num_fields: usize,
projection: Option<Vec<usize>>,
// set when this split does not start at the beginning of the file: its first
// (possibly partial) line belongs to the previous split and must be skipped
need_skip_first_line: bool,
// set once one extra line past the end of this split has been read
has_read_one_more_line: bool,
// absolute file offset of all bytes received so far (initialized to split_info.offset)
received_bytes_offset: usize,
}
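
For context, the three new fields drive the split-boundary handling: every split except the first skips its first (possibly partial) line, and every split that ends before the end of the file reads one extra line past its boundary, so the line straddling a boundary is produced exactly once, by the split that owns its first byte. A minimal self-contained sketch of that ownership rule (Split and lines_owned_by_split are illustrative names, not part of this codebase):

// Illustrative only: which lines a byte-range split [offset, offset + size)
// should emit, assuming a line belongs to the split that contains its first byte.
struct Split {
    offset: usize,
    size: usize,
}

fn lines_owned_by_split(file: &[u8], split: &Split) -> Vec<String> {
    let owns = |start: usize| start >= split.offset && start < split.offset + split.size;
    let mut lines = Vec::new();
    let mut line_start = 0usize;
    for (i, b) in file.iter().enumerate() {
        if *b == b'\n' {
            if owns(line_start) {
                lines.push(String::from_utf8_lossy(&file[line_start..i]).into_owned());
            }
            line_start = i + 1;
        }
    }
    // a trailing line without a newline terminator
    if line_start < file.len() && owns(line_start) {
        lines.push(String::from_utf8_lossy(&file[line_start..]).into_owned());
    }
    lines
}

fn main() {
    let file = b"a,1\nb,2\nc,3\n";
    // two 6-byte splits; the line "b,2" starts at byte 4, inside the first split,
    // so the first split reads past byte 6 to finish it and the second split
    // skips the partial first line it sees.
    assert_eq!(
        lines_owned_by_split(file, &Split { offset: 0, size: 6 }),
        vec!["a,1", "b,2"]
    );
    assert_eq!(
        lines_owned_by_split(file, &Split { offset: 6, size: 6 }),
        vec!["c,3"]
    );
}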

impl CsvReaderState {
@@ -247,13 +254,16 @@ impl CsvReaderState {
ReadRecordResult::OutputEndsFull => Err(self.error_output_ends_full()),
ReadRecordResult::Record => {
if self.projection.is_none() {
if let Err(e) = self.check_num_field() {
self.ctx.on_error(e, None, None)?;
self.common.rows += 1;
self.common.offset += n_in;
self.n_end = 0;
return Ok((Some(0), n_in, n_out));
if !self.need_skip_first_line && !self.has_read_one_more_line {
if let Err(e) = self.check_num_field() {
self.ctx.on_error(e, None, None)?;
self.common.rows += 1;
self.common.offset += n_in;
self.n_end = 0;
return Ok((Some(0), n_in, n_out));
}
}
self.need_skip_first_line = false;
}

self.common.rows += 1;
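
The field-count check is bypassed for the skipped first line and for the extra boundary line because those records may be fragments of a line that was cut at a split boundary, so their field count is not meaningful and would raise a false error. A tiny illustration (plain slice handling here, not the real csv reader):

// Illustrative only: why the field-count check must not fire on the first
// record of a non-first split. The split starting at byte 6 of "a,1\nb,2\nc,3\n"
// begins in the middle of the line "b,2", so the first "record" it sees is just
// the fragment "2".
fn main() {
    let file = b"a,1\nb,2\nc,3\n";
    let split = &file[6..]; // "2\nc,3\n"
    let first_record = split.split(|b| *b == b'\n').next().unwrap();
    let field_count = first_record.split(|b| *b == b',').count();
    assert_eq!(field_count, 1); // not the expected 2: a false schema error if checked
}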
Expand All @@ -277,7 +287,7 @@ impl AligningStateTextBased for CsvReaderState {
fn align(&mut self, buf_in: &[u8]) -> Result<Vec<RowBatch>> {
let mut out_tmp = vec![0u8; buf_in.len()];
let mut buf = buf_in;

self.received_bytes_offset += buf.len();
while self.common.rows_to_skip > 0 {
let (_, n_in, _) = self.read_record(buf, &mut out_tmp)?;
buf = &buf[n_in..];
@@ -327,13 +337,14 @@
buf_in.len(),
);
self.out.extend_from_slice(&out_tmp);
self.judge_read_one_more_line();
Ok(vec![])
} else {
let last_remain = mem::take(&mut self.out);

self.common.batch_id += 1;
self.out.extend_from_slice(&out_tmp[row_batch_end..]);

self.judge_read_one_more_line();
tracing::debug!(
"csv aligner: {} + {} bytes => {} rows + {} bytes remain",
last_remain.len(),
@@ -390,6 +401,14 @@ impl AligningStateTextBased for CsvReaderState {
}

impl CsvReaderState {
fn judge_read_one_more_line(&mut self) {
if self.received_bytes_offset - self.out.len()
> self.split_info.offset + self.split_info.size
{
self.has_read_one_more_line = true;
}
}
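
A test-style sketch of the condition above with concrete numbers; read_past_split_end is a hypothetical helper that only mirrors the arithmetic and is not the real CsvReaderState:

// received_bytes_offset - out.len() is the absolute file offset up to which
// complete rows have been consumed; once it passes the split end, the extra
// boundary line has been fully read.
fn read_past_split_end(
    received_bytes_offset: usize, // absolute offset of all bytes received so far
    unconsumed: usize,            // bytes still buffered in `out`, not yet emitted as rows
    split_offset: usize,
    split_size: usize,
) -> bool {
    received_bytes_offset - unconsumed > split_offset + split_size
}

fn main() {
    // split covers bytes [100, 200); we have received up to byte 230 and 20 bytes
    // of an unfinished row remain buffered, so complete rows reach byte 210 > 200.
    assert!(read_past_split_end(230, 20, 100, 100));
    // complete rows only reach byte 195 < 200: keep reading.
    assert!(!read_past_split_end(230, 35, 100, 100));
}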

fn check_num_field(&self) -> Result<()> {
let expect = self.num_fields;
let actual = self.n_end;
44 changes: 41 additions & 3 deletions src/query/pipeline/sources/src/input_formats/input_format_text.rs
@@ -296,9 +296,18 @@ impl<T: InputFormatTextBase> InputFormat for T {

let mut split_size = stage_info.copy_options.split_size;
let mut splittable = is_splittable(&info);
// user doesn't specify the split
if splittable && split_size == 0 {
split_size = TEXT_FILE_SPLIT_CHUNK_SIZE;

if splittable {
// the user didn't specify a split size
if split_size == 0 {
split_size = TEXT_FILE_SPLIT_CHUNK_SIZE;
} else if split_size > 0 && split_size < TEXT_FILE_SPLIT_CHUNK_SIZE {
// a single line should never exceed 50MB, so make sure each split chunk is at
// least 50MB; otherwise one line could be cut across more than two splits. In
// that case we would get intermediate splits that contain only fragments of a
// line, which we cannot process, and the same line would be inserted multiple times.
split_size = 0;
}
}
splittable = splittable || T::is_splittable();
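
The effective split size therefore follows three branches: an unspecified value gets the default chunk size, a value smaller than the chunk size disables splitting entirely (to avoid a line spanning more than two splits), and a larger value is kept. A sketch of that decision as a standalone function; the constant's value below is assumed for the example, not taken from the codebase:

// Illustrative decision function mirroring the branch above.
const TEXT_FILE_SPLIT_CHUNK_SIZE: usize = 64 * 1024 * 1024; // assumed value

fn effective_split_size(user_split_size: usize) -> usize {
    if user_split_size == 0 {
        // user did not specify: use the default chunk size
        TEXT_FILE_SPLIT_CHUNK_SIZE
    } else if user_split_size < TEXT_FILE_SPLIT_CHUNK_SIZE {
        // too small: a line could span more than two splits, so fall back to 0
        // (no splitting) rather than risk duplicating a line
        0
    } else {
        user_split_size
    }
}

fn main() {
    assert_eq!(effective_split_size(0), TEXT_FILE_SPLIT_CHUNK_SIZE);
    assert_eq!(effective_split_size(1024), 0);
    assert_eq!(effective_split_size(TEXT_FILE_SPLIT_CHUNK_SIZE), TEXT_FILE_SPLIT_CHUNK_SIZE);
}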

@@ -398,6 +407,35 @@ impl RowBatchTrait for RowBatch {
fn rows(&self) -> usize {
self.row_ends.len()
}

    fn just_save_first_line(&mut self) {
        if !self.row_ends.is_empty() {
            self.row_ends.truncate(1);
            self.data.truncate(self.row_ends[0]);
            self.num_fields.truncate(1);
            self.field_ends.truncate(self.num_fields[0]);
        }
    }

    fn remove_first_line(&mut self) {
        if !self.row_ends.is_empty() {
            let first_row_size = self.row_ends[0];

            // drop the field ends and field count belonging to the first row
            self.field_ends.drain(0..self.num_fields[0]);
            self.num_fields.remove(0);

            // drop the bytes of the first row
            self.data.drain(0..first_row_size);

            // drop the first row end and shift the remaining ones back
            self.row_ends.remove(0);
            for end in self.row_ends.iter_mut() {
                *end -= first_row_size;
            }
        }
    }
}
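
A test-style sketch of the two new helpers on a stripped-down stand-in struct; MiniRowBatch and its field layout (row_ends as cumulative byte offsets into data, one flattened field_ends entry per field, num_fields per row) are assumptions made for illustration only:

struct MiniRowBatch {
    data: Vec<u8>,
    row_ends: Vec<usize>,
    field_ends: Vec<usize>,
    num_fields: Vec<usize>,
}

impl MiniRowBatch {
    // keep only the first row (used for the extra line read past a split end)
    fn just_save_first_line(&mut self) {
        if !self.row_ends.is_empty() {
            self.row_ends.truncate(1);
            self.data.truncate(self.row_ends[0]);
            self.num_fields.truncate(1);
            self.field_ends.truncate(self.num_fields[0]);
        }
    }

    // drop the first row (used to skip the partial first line of a non-first split)
    fn remove_first_line(&mut self) {
        if !self.row_ends.is_empty() {
            let first_row_size = self.row_ends[0];
            self.field_ends.drain(0..self.num_fields[0]);
            self.num_fields.remove(0);
            self.data.drain(0..first_row_size);
            self.row_ends.remove(0);
            for end in self.row_ends.iter_mut() {
                *end -= first_row_size;
            }
        }
    }
}

fn main() {
    // two rows: "a,1\n" (4 bytes, 2 fields) and "b,22\n" (5 bytes, 2 fields)
    let make = || MiniRowBatch {
        data: b"a,1\nb,22\n".to_vec(),
        row_ends: vec![4, 9],
        field_ends: vec![1, 3, 5, 8],
        num_fields: vec![2, 2],
    };

    let mut head = make();
    head.just_save_first_line();
    assert_eq!(head.data, b"a,1\n");
    assert_eq!(head.row_ends, vec![4]);

    let mut tail = make();
    tail.remove_first_line();
    assert_eq!(tail.data, b"b,22\n");
    assert_eq!(tail.row_ends, vec![5]);
    assert_eq!(tail.num_fields, vec![2]);
}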

pub struct AligningStateMaybeCompressed<T: InputFormatTextBase> {
@@ -88,6 +88,8 @@ impl ReadBatchTrait for Vec<u8> {
pub trait RowBatchTrait: Send {
fn size(&self) -> usize;
fn rows(&self) -> usize;
fn remove_first_line(&mut self) {}
fn just_save_first_line(&mut self) {}
}

#[async_trait::async_trait]
39 changes: 25 additions & 14 deletions src/query/pipeline/sources/src/input_formats/source_aligner.rs
@@ -134,23 +134,31 @@ impl<I: InputFormatPipe> Processor for Aligner<I> {
let eof = read_batch.is_none();
let mut row_batches = state.align(read_batch)?;
let split_info = self.current_split_info.as_ref().expect("can't be none");

if self.in_read_one_more_line && !row_batches.is_empty() {
row_batches.truncate(1);
row_batches.get_mut(0).unwrap().just_save_first_line();

if !self.has_read_one_more_line {
self.complete
.as_mut()
.expect("complete channel can't be none")
.send(true)
.expect("sync message error");
} else {
row_batches.truncate(0);
}

self.has_read_one_more_line = true;
}

// this is the first time we send data and it is not the first split of the file,
// so we need to skip the first line
if !self.has_sent_data && split_info.offset > 0 && !row_batches.is_empty() {
row_batches.remove(0);
row_batches.get_mut(0).unwrap().remove_first_line();
self.has_sent_data = true;
}
if self.in_read_one_more_line && !row_batches.is_empty() {
row_batches.truncate(1);
self.has_read_one_more_line = true;
// if the send fails, the channel is closed, which means the reader
// has already reached the end of the file and there is no more data.
self.complete
.as_mut()
.expect("complete channel can't be none")
.send(true)
.expect("sync message error");
}

for b in row_batches.into_iter() {
if b.size() > 0 {
process_values.rows += b.rows();
@@ -167,8 +175,10 @@ impl<I: InputFormatPipe> Processor for Aligner<I> {
// there are more data we can read
if split_info.offset + split_info.size < split_info.file.size
&& !self.in_read_one_more_line
|| self.in_read_one_more_line && !self.has_read_one_more_line
{
self.in_read_one_more_line = true;

self.complete
.as_mut()
.expect("complete channel can't be none")
@@ -183,8 +193,9 @@ impl<I: InputFormatPipe> Processor for Aligner<I> {
if self.in_read_one_more_line {
if self.has_read_one_more_line {
row_batches.truncate(0);
} else {
} else if !row_batches.is_empty() {
row_batches.truncate(1);
row_batches.get_mut(0).unwrap().just_save_first_line();
}
}
for b in row_batches.into_iter() {
@@ -232,7 +243,7 @@ impl<I: InputFormatPipe> Processor for Aligner<I> {
match rx.recv().await {
Some(Ok(batch)) => {
tracing::debug!("aligner recv new batch");
self.read_batch = Some(batch)
self.read_batch = Some(batch);
}
Some(Err(e)) => {
return Err(e);
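
Taken together, the aligner's new flags encode a small per-split state machine: skip the first partial line of a non-first split, emit the rows the split owns, then read one more line past the split end and emit only that line. A self-contained sketch of those phases; the enum and function below are illustrative, roughly mapping SkipFirstLine to has_sent_data, ReadOneMoreLine to in_read_one_more_line, and Done to has_read_one_more_line:

#[derive(Debug, PartialEq)]
enum SplitPhase {
    SkipFirstLine,   // non-first split: drop the first (partial) line
    EmitOwnRows,     // emit rows that start inside [offset, offset + size)
    ReadOneMoreLine, // keep reading to finish the line crossing the boundary
    Done,            // the extra line has been emitted (or end of file reached)
}

fn next_phase(phase: SplitPhase, reached_split_end: bool) -> SplitPhase {
    match phase {
        SplitPhase::SkipFirstLine => SplitPhase::EmitOwnRows,
        SplitPhase::EmitOwnRows if reached_split_end => SplitPhase::ReadOneMoreLine,
        SplitPhase::EmitOwnRows => SplitPhase::EmitOwnRows,
        SplitPhase::ReadOneMoreLine => SplitPhase::Done,
        SplitPhase::Done => SplitPhase::Done,
    }
}

fn main() {
    // a non-first split walks: skip first line -> emit rows -> read one more line -> done
    let mut phase = SplitPhase::SkipFirstLine;
    phase = next_phase(phase, false);
    assert_eq!(phase, SplitPhase::EmitOwnRows);
    phase = next_phase(phase, true);
    assert_eq!(phase, SplitPhase::ReadOneMoreLine);
    phase = next_phase(phase, false);
    assert_eq!(phase, SplitPhase::Done);
}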
