From ac5c9ca807e2b9d292bdcfa4177bc5bbd66d772b Mon Sep 17 00:00:00 2001 From: roseboy Date: Fri, 15 Nov 2024 18:40:22 +0800 Subject: [PATCH] perf: reduce read page calculations --- tskv/src/reader/iterator.rs | 14 +++++-- tskv/src/tsm/reader.rs | 83 ++++++++++++++++++++----------------- tskv/src/tsm/tombstone.rs | 32 +++++++++----- 3 files changed, 77 insertions(+), 52 deletions(-) diff --git a/tskv/src/reader/iterator.rs b/tskv/src/reader/iterator.rs index 78a8a4373..3ba503206 100644 --- a/tskv/src/reader/iterator.rs +++ b/tskv/src/reader/iterator.rs @@ -978,9 +978,17 @@ async fn build_stream( return Ok(Box::pin(reader.process()?)); } - Ok(Box::pin(EmptySchemableTskvRecordBatchStream::new( - factory.schema(), - ))) + if query_option.aggregates.is_some() { + Ok(Box::pin(PushDownAggregateStream { + schema: factory.schema(), + num_count: 0, + is_get: false, + })) + } else { + Ok(Box::pin(EmptySchemableTskvRecordBatchStream::new( + factory.schema(), + ))) + } } #[cfg(test)] diff --git a/tskv/src/tsm/reader.rs b/tskv/src/tsm/reader.rs index 03b2eaf14..695bf591a 100644 --- a/tskv/src/tsm/reader.rs +++ b/tskv/src/tsm/reader.rs @@ -450,45 +450,58 @@ pub fn decode_pages( tomb: Option<(Arc, SeriesId)>, ) -> TskvResult { let mut target_arrays = Vec::with_capacity(pages.len()); + + let fields = pages + .iter() + .map(|page| Field::from(&page.meta.column)) + .collect::>(); + let schema = Arc::new(Schema::new_with_metadata(fields, schema_meta)); + if let Some((tomb, series_id)) = tomb { + // deal time page let time_page = pages .iter() .find(|f| f.meta.column.column_type.is_time()) .context(CommonSnafu { reason: "time field not found".to_string(), })?; - let (time_array, time_range) = get_time_page_meta(time_page)?; - let fields = pages + let (time_array, time_range, time) = get_time_page_meta(time_page)?; + let filters = tomb.get_all_fields_excluded_time_range(&time_range); + let time_null_bits = { + if filters.is_empty() { + None + } else { + let null_bitset = update_nullbits_by_time_range(&time_array, &filters, time_page)?; + Some(null_bitset) + } + }; + + target_arrays.push(time); + + // deal field page + for page in pages .iter() - .map(|page| Field::from(&page.meta.column)) - .collect::>(); - let schema = Arc::new(Schema::new_with_metadata(fields, schema_meta)); - let mut time_have_null = None; - for page in pages { - let null_bits = - if tomb.overlaps_column_time_range(series_id, page.meta.column.id, &time_range) { - let null_bitset = update_nullbits_by_tombstone( - &time_array, - &tomb, - series_id, - &time_range, - &page, - )?; - if page.meta.column.column_type.is_time() && !null_bitset.is_all_set() { - time_have_null = Some(null_bitset.clone()); - NullBitset::Ref(page.null_bitset()) - } else { - NullBitset::Own(null_bitset) - } - } else { + .filter(|p| p.meta.column.column_type.is_field()) + { + let null_bits = { + let filters = tomb.get_column_overlapped_time_ranges( + series_id, + page.meta.column.id, + &time_range, + ); + if filters.is_empty() { NullBitset::Ref(page.null_bitset()) - }; + } else { + let null_bitset = update_nullbits_by_time_range(&time_array, &filters, page)?; + NullBitset::Own(null_bitset) + } + }; let array = page.to_arrow_array()?; let array = updated_nullbuffer(array, null_bits)?; target_arrays.push(array); } let mut record_batch = RecordBatch::try_new(schema, target_arrays).context(ArrowSnafu)?; - if let Some(time_column_have_null) = time_have_null { + if let Some(time_column_have_null) = time_null_bits { let len = time_column_have_null.len(); let buffer = Buffer::from_vec(time_column_have_null.into_bytes()); let boolean_buffer = BooleanBuffer::new(buffer, 0, len); @@ -498,11 +511,6 @@ pub fn decode_pages( } Ok(record_batch) } else { - let fields = pages - .iter() - .map(|page| Field::from(&page.meta.column)) - .collect::>(); - let schema = Arc::new(Schema::new_with_metadata(fields, schema_meta)); for page in pages { let array = page.to_arrow_array()?; target_arrays.push(array); @@ -512,8 +520,10 @@ pub fn decode_pages( } } -pub fn get_time_page_meta(time_page: &Page) -> TskvResult<(PrimitiveArray, TimeRange)> { - let time_array_ref = data_buf_to_arrow_array(time_page)?; +pub fn get_time_page_meta( + time_page: &Page, +) -> TskvResult<(PrimitiveArray, TimeRange, ArrayRef)> { + let time_array_ref = time_page.to_arrow_array()?; let time_array = match time_array_ref.data_type() { DataType::Timestamp(_time_unit, _) => { let array_data = time_array_ref.to_data(); @@ -568,7 +578,7 @@ pub fn get_time_page_meta(time_page: &Page) -> TskvResult<(PrimitiveArray, - tomb: &TsmTombstone, - series_id: SeriesId, - time_range: &TimeRange, + time_ranges: &Vec, page: &Page, ) -> TskvResult { - let time_ranges = tomb.get_overlapped_time_ranges(series_id, page.meta.column.id, time_range); let mut null_bitset = page.null_bitset().to_bitset(); for time_range in time_ranges { diff --git a/tskv/src/tsm/tombstone.rs b/tskv/src/tsm/tombstone.rs index ee0246e0a..086edd6f3 100644 --- a/tskv/src/tsm/tombstone.rs +++ b/tskv/src/tsm/tombstone.rs @@ -331,6 +331,7 @@ impl TsmTombstone { Ok(()) } + #[cfg(test)] pub fn overlaps_column_time_range( &self, series_id: SeriesId, @@ -341,7 +342,7 @@ impl TsmTombstone { .read() .overlaps_field_time_range(series_id, column_id, time_range) } - + #[cfg(test)] pub fn check_all_fields_excluded_time_range(&self, time_range: &TimeRange) -> bool { self.cache .read() @@ -350,7 +351,7 @@ impl TsmTombstone { /// Returns all tombstone `TimeRange`s that overlaps the given `TimeRange`. /// Returns None if there is nothing to return, or `TimeRange`s is empty. - pub fn get_overlapped_time_ranges( + pub fn get_column_overlapped_time_ranges( &self, series_id: SeriesId, column_id: ColumnId, @@ -358,7 +359,13 @@ impl TsmTombstone { ) -> Vec { self.cache .read() - .get_overlapped_time_ranges(series_id, column_id, time_range) + .get_column_overlapped_time_ranges(series_id, column_id, time_range) + } + + pub fn get_all_fields_excluded_time_range(&self, time_range: &TimeRange) -> Vec { + self.cache + .read() + .get_all_fields_excluded_time_range(time_range) } } @@ -500,9 +507,6 @@ impl TsmTombstoneCache { column_id: ColumnId, time_range: &TimeRange, ) -> bool { - if self.all_excluded.includes(time_range) { - return true; - } if let Some(time_ranges) = self.column_excluded.get(&(series_id, column_id)) { for t in time_ranges.time_ranges() { if t.overlaps(time_range) { @@ -513,7 +517,7 @@ impl TsmTombstoneCache { false } - pub fn get_overlapped_time_ranges( + pub fn get_column_overlapped_time_ranges( &self, series_id: SeriesId, column_id: ColumnId, @@ -527,6 +531,16 @@ impl TsmTombstoneCache { } } } + + trs + } + + pub fn check_all_fields_excluded_time_range(&self, time_range: &TimeRange) -> bool { + self.all_excluded.includes(time_range) + } + + pub fn get_all_fields_excluded_time_range(&self, time_range: &TimeRange) -> Vec { + let mut trs = Vec::new(); for all in self.all_excluded.time_ranges() { if all.overlaps(time_range) { trs.push(all); @@ -535,10 +549,6 @@ impl TsmTombstoneCache { trs } - pub fn check_all_fields_excluded_time_range(&self, time_range: &TimeRange) -> bool { - self.all_excluded.includes(time_range) - } - pub async fn load(path: impl AsRef) -> TskvResult> { let mut reader = if LocalFileSystem::try_exists(&path) { record_file::Reader::open(&path).await?