diff --git a/src/query/pipeline/sources/src/input_formats/impls/input_format_csv.rs b/src/query/pipeline/sources/src/input_formats/impls/input_format_csv.rs index 86c3013663804..080dd6811ce91 100644 --- a/src/query/pipeline/sources/src/input_formats/impls/input_format_csv.rs +++ b/src/query/pipeline/sources/src/input_formats/impls/input_format_csv.rs @@ -167,6 +167,9 @@ impl InputFormatTextBase for InputFormatCSV { n_end: 0, num_fields: ctx.schema.num_fields(), projection, + need_skip_first_line: split_info.offset > 0, + has_read_one_more_line: false, + recieved_bytes_offset: split_info.offset, }) } @@ -222,6 +225,10 @@ pub struct CsvReaderState { num_fields: usize, projection: Option>, + // used to skip the first line + need_skip_first_line: bool, + has_read_one_more_line: bool, + recieved_bytes_offset: usize, } impl CsvReaderState { @@ -247,13 +254,16 @@ impl CsvReaderState { ReadRecordResult::OutputEndsFull => Err(self.error_output_ends_full()), ReadRecordResult::Record => { if self.projection.is_none() { - if let Err(e) = self.check_num_field() { - self.ctx.on_error(e, None, None)?; - self.common.rows += 1; - self.common.offset += n_in; - self.n_end = 0; - return Ok((Some(0), n_in, n_out)); + if !self.need_skip_first_line && !self.has_read_one_more_line { + if let Err(e) = self.check_num_field() { + self.ctx.on_error(e, None, None)?; + self.common.rows += 1; + self.common.offset += n_in; + self.n_end = 0; + return Ok((Some(0), n_in, n_out)); + } } + self.need_skip_first_line = false; } self.common.rows += 1; @@ -277,7 +287,7 @@ impl AligningStateTextBased for CsvReaderState { fn align(&mut self, buf_in: &[u8]) -> Result> { let mut out_tmp = vec![0u8; buf_in.len()]; let mut buf = buf_in; - + self.recieved_bytes_offset += buf.len(); while self.common.rows_to_skip > 0 { let (_, n_in, _) = self.read_record(buf, &mut out_tmp)?; buf = &buf[n_in..]; @@ -327,13 +337,14 @@ impl AligningStateTextBased for CsvReaderState { buf_in.len(), ); self.out.extend_from_slice(&out_tmp); + self.judge_read_one_more_line(); Ok(vec![]) } else { let last_remain = mem::take(&mut self.out); self.common.batch_id += 1; self.out.extend_from_slice(&out_tmp[row_batch_end..]); - + self.judge_read_one_more_line(); tracing::debug!( "csv aligner: {} + {} bytes => {} rows + {} bytes remain", last_remain.len(), @@ -390,6 +401,14 @@ impl AligningStateTextBased for CsvReaderState { } impl CsvReaderState { + fn judge_read_one_more_line(&mut self) { + if self.recieved_bytes_offset - self.out.len() + > self.split_info.offset + self.split_info.size + { + self.has_read_one_more_line = true; + } + } + fn check_num_field(&self) -> Result<()> { let expect = self.num_fields; let actual = self.n_end; diff --git a/src/query/pipeline/sources/src/input_formats/input_format_text.rs b/src/query/pipeline/sources/src/input_formats/input_format_text.rs index 9410d2cf6e78c..7792819efc51d 100644 --- a/src/query/pipeline/sources/src/input_formats/input_format_text.rs +++ b/src/query/pipeline/sources/src/input_formats/input_format_text.rs @@ -296,9 +296,18 @@ impl InputFormat for T { let mut split_size = stage_info.copy_options.split_size; let mut splittable = is_splittable(&info); - // user doesn't specify the split - if splittable && split_size == 0 { - split_size = TEXT_FILE_SPLIT_CHUNK_SIZE; + + if splittable { + // user doesn't specify the split + if split_size == 0 { + split_size = TEXT_FILE_SPLIT_CHUNK_SIZE; + } else if split_size > 0 && split_size < TEXT_FILE_SPLIT_CHUNK_SIZE { + // one line's size won't over 50M, so we just make sure the split chunk should + // be over 50M,otherwise a line will be split into multi splits(greater than 2), + // in this occassion, we will get some immediate splits of one line, we can't + // process this and it will insert one and the same line multi times. + split_size = 0; + } } splittable = splittable || T::is_splittable(); @@ -398,6 +407,35 @@ impl RowBatchTrait for RowBatch { fn rows(&self) -> usize { self.row_ends.len() } + + fn just_save_first_line(&mut self) { + if self.row_ends.len() > 0 { + self.row_ends.truncate(1); + self.data.truncate(self.row_ends[0]); + self.num_fields.truncate(1); + self.field_ends.truncate(self.num_fields[0]); + } + } + + fn remove_first_line(&mut self) { + if self.row_ends.len() > 0 { + let first_row_size = self.row_ends[0]; + + for _ in 0..self.num_fields[0] { + self.field_ends.remove(0); + } + self.num_fields.remove(0); + + for _ in 0..self.row_ends[0] { + self.data.remove(0); + } + // correct the row_ends + self.row_ends.remove(0); + for i in 0..self.row_ends.len() { + self.row_ends[i] -= first_row_size; + } + } + } } pub struct AligningStateMaybeCompressed { diff --git a/src/query/pipeline/sources/src/input_formats/input_pipeline.rs b/src/query/pipeline/sources/src/input_formats/input_pipeline.rs index a040eccfd4c03..4b6e7fbe61b22 100644 --- a/src/query/pipeline/sources/src/input_formats/input_pipeline.rs +++ b/src/query/pipeline/sources/src/input_formats/input_pipeline.rs @@ -88,6 +88,8 @@ impl ReadBatchTrait for Vec { pub trait RowBatchTrait: Send { fn size(&self) -> usize; fn rows(&self) -> usize; + fn remove_first_line(&mut self) {} + fn just_save_first_line(&mut self) {} } #[async_trait::async_trait] diff --git a/src/query/pipeline/sources/src/input_formats/source_aligner.rs b/src/query/pipeline/sources/src/input_formats/source_aligner.rs index 26179f84e2cc9..5c6597adb4acd 100644 --- a/src/query/pipeline/sources/src/input_formats/source_aligner.rs +++ b/src/query/pipeline/sources/src/input_formats/source_aligner.rs @@ -134,23 +134,31 @@ impl Processor for Aligner { let eof = read_batch.is_none(); let mut row_batches = state.align(read_batch)?; let split_info = self.current_split_info.as_ref().expect("can't be none"); + + if self.in_read_one_more_line && !row_batches.is_empty() { + row_batches.truncate(1); + row_batches.get_mut(0).unwrap().just_save_first_line(); + + if !self.has_read_one_more_line { + self.complete + .as_mut() + .expect("complete channel can't be none") + .send(true) + .expect("sync message error"); + } else { + row_batches.truncate(0); + } + + self.has_read_one_more_line = true; + } + // this is the first time to send data and it's not the first split of the file // we need to skip the first line if !self.has_sent_data && split_info.offset > 0 && !row_batches.is_empty() { - row_batches.remove(0); + row_batches.get_mut(0).unwrap().remove_first_line(); self.has_sent_data = true; } - if self.in_read_one_more_line && !row_batches.is_empty() { - row_batches.truncate(1); - self.has_read_one_more_line = true; - // if get error, it means the channel is close, it says that - // the reader has read at the end of file and no more data. - self.complete - .as_mut() - .expect("complete channel can't be none") - .send(true) - .expect("sync message error"); - } + for b in row_batches.into_iter() { if b.size() > 0 { process_values.rows += b.rows(); @@ -167,8 +175,10 @@ impl Processor for Aligner { // there are more data we can read if split_info.offset + split_info.size < split_info.file.size && !self.in_read_one_more_line + || self.in_read_one_more_line && !self.has_read_one_more_line { self.in_read_one_more_line = true; + self.complete .as_mut() .expect("complete channel can't be none") @@ -183,8 +193,9 @@ impl Processor for Aligner { if self.in_read_one_more_line { if self.has_read_one_more_line { row_batches.truncate(0); - } else { + } else if !row_batches.is_empty() { row_batches.truncate(1); + row_batches.get_mut(0).unwrap().just_save_first_line(); } } for b in row_batches.into_iter() { @@ -232,7 +243,7 @@ impl Processor for Aligner { match rx.recv().await { Some(Ok(batch)) => { tracing::debug!("aligner recv new batch"); - self.read_batch = Some(batch) + self.read_batch = Some(batch); } Some(Err(e)) => { return Err(e);