perf: reduce read page calculations
roseboy-liu committed Nov 19, 2024
1 parent 3f5d067 commit ac5c9ca
Showing 3 changed files with 77 additions and 52 deletions.
14 changes: 11 additions & 3 deletions tskv/src/reader/iterator.rs
@@ -978,9 +978,17 @@ async fn build_stream(
return Ok(Box::pin(reader.process()?));
}

Ok(Box::pin(EmptySchemableTskvRecordBatchStream::new(
factory.schema(),
)))
if query_option.aggregates.is_some() {
Ok(Box::pin(PushDownAggregateStream {
schema: factory.schema(),
num_count: 0,
is_get: false,
}))
} else {
Ok(Box::pin(EmptySchemableTskvRecordBatchStream::new(
factory.schema(),
)))
}
}

#[cfg(test)]
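In iterator.rs, build_stream no longer falls back to an empty stream unconditionally: when query_option.aggregates.is_some(), it returns a PushDownAggregateStream (initialized with num_count: 0), so a pushed-down aggregate (presumably a COUNT, given the num_count field) still produces a result row even though no pages matched. A minimal, self-contained sketch of that decision; plain iterators stand in for the async record-batch streams, and FallbackStream and its variants are illustrative names, not tskv types:

```rust
/// Illustrative stand-ins for the two fallback streams: `Empty` plays the role of
/// `EmptySchemableTskvRecordBatchStream`, `ZeroCount` the role of `PushDownAggregateStream`.
enum FallbackStream {
    /// Produces no rows at all.
    Empty,
    /// Produces exactly one row carrying the aggregate count (0, since no data matched).
    ZeroCount { emitted: bool },
}

impl Iterator for FallbackStream {
    // One COUNT value per "batch" in this toy model.
    type Item = u64;

    fn next(&mut self) -> Option<u64> {
        match self {
            FallbackStream::Empty => None,
            FallbackStream::ZeroCount { emitted } => {
                if *emitted {
                    None
                } else {
                    *emitted = true;
                    Some(0)
                }
            }
        }
    }
}

/// Mirrors the new branch in `build_stream`: a pushed-down aggregate must still
/// see one zero-count row, while a plain scan may legitimately see nothing.
fn build_fallback(aggregate_pushed_down: bool) -> FallbackStream {
    if aggregate_pushed_down {
        FallbackStream::ZeroCount { emitted: false }
    } else {
        FallbackStream::Empty
    }
}

fn main() {
    assert_eq!(build_fallback(true).collect::<Vec<_>>(), vec![0u64]);
    assert_eq!(build_fallback(false).count(), 0);
}
```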
83 changes: 45 additions & 38 deletions tskv/src/tsm/reader.rs
@@ -450,45 +450,58 @@ pub fn decode_pages(
tomb: Option<(Arc<TsmTombstone>, SeriesId)>,
) -> TskvResult<RecordBatch> {
let mut target_arrays = Vec::with_capacity(pages.len());

let fields = pages
.iter()
.map(|page| Field::from(&page.meta.column))
.collect::<Vec<_>>();
let schema = Arc::new(Schema::new_with_metadata(fields, schema_meta));

if let Some((tomb, series_id)) = tomb {
// deal time page
let time_page = pages
.iter()
.find(|f| f.meta.column.column_type.is_time())
.context(CommonSnafu {
reason: "time field not found".to_string(),
})?;
let (time_array, time_range) = get_time_page_meta(time_page)?;
let fields = pages
let (time_array, time_range, time) = get_time_page_meta(time_page)?;
let filters = tomb.get_all_fields_excluded_time_range(&time_range);
let time_null_bits = {
if filters.is_empty() {
None
} else {
let null_bitset = update_nullbits_by_time_range(&time_array, &filters, time_page)?;
Some(null_bitset)
}
};

target_arrays.push(time);

// deal field page
for page in pages
.iter()
.map(|page| Field::from(&page.meta.column))
.collect::<Vec<_>>();
let schema = Arc::new(Schema::new_with_metadata(fields, schema_meta));
let mut time_have_null = None;
for page in pages {
let null_bits =
if tomb.overlaps_column_time_range(series_id, page.meta.column.id, &time_range) {
let null_bitset = update_nullbits_by_tombstone(
&time_array,
&tomb,
series_id,
&time_range,
&page,
)?;
if page.meta.column.column_type.is_time() && !null_bitset.is_all_set() {
time_have_null = Some(null_bitset.clone());
NullBitset::Ref(page.null_bitset())
} else {
NullBitset::Own(null_bitset)
}
} else {
.filter(|p| p.meta.column.column_type.is_field())
{
let null_bits = {
let filters = tomb.get_column_overlapped_time_ranges(
series_id,
page.meta.column.id,
&time_range,
);
if filters.is_empty() {
NullBitset::Ref(page.null_bitset())
};
} else {
let null_bitset = update_nullbits_by_time_range(&time_array, &filters, page)?;
NullBitset::Own(null_bitset)
}
};
let array = page.to_arrow_array()?;
let array = updated_nullbuffer(array, null_bits)?;
target_arrays.push(array);
}
let mut record_batch = RecordBatch::try_new(schema, target_arrays).context(ArrowSnafu)?;
if let Some(time_column_have_null) = time_have_null {
if let Some(time_column_have_null) = time_null_bits {
let len = time_column_have_null.len();
let buffer = Buffer::from_vec(time_column_have_null.into_bytes());
let boolean_buffer = BooleanBuffer::new(buffer, 0, len);
@@ -498,11 +511,6 @@ pub fn decode_pages(
}
Ok(record_batch)
} else {
let fields = pages
.iter()
.map(|page| Field::from(&page.meta.column))
.collect::<Vec<_>>();
let schema = Arc::new(Schema::new_with_metadata(fields, schema_meta));
for page in pages {
let array = page.to_arrow_array()?;
target_arrays.push(array);
@@ -512,8 +520,10 @@ pub fn decode_pages_buf(
}
}

pub fn get_time_page_meta(time_page: &Page) -> TskvResult<(PrimitiveArray<Int64Type>, TimeRange)> {
let time_array_ref = data_buf_to_arrow_array(time_page)?;
pub fn get_time_page_meta(
time_page: &Page,
) -> TskvResult<(PrimitiveArray<Int64Type>, TimeRange, ArrayRef)> {
let time_array_ref = time_page.to_arrow_array()?;
let time_array = match time_array_ref.data_type() {
DataType::Timestamp(_time_unit, _) => {
let array_data = time_array_ref.to_data();
@@ -568,7 +578,7 @@ pub fn get_time_page_meta(time_page: &Page) -> TskvResult<(PrimitiveArray<Int64T
.build())
}
};
Ok((time_array, time_range))
Ok((time_array, time_range, time_array_ref))
}

pub fn decode_pages_buf(
@@ -582,14 +592,11 @@ pub fn decode_pages_buf(
Ok(data_block)
}

fn update_nullbits_by_tombstone(
fn update_nullbits_by_time_range(
time_array_ref: &PrimitiveArray<Int64Type>,
tomb: &TsmTombstone,
series_id: SeriesId,
time_range: &TimeRange,
time_ranges: &Vec<TimeRange>,
page: &Page,
) -> TskvResult<BitSet> {
let time_ranges = tomb.get_overlapped_time_ranges(series_id, page.meta.column.id, time_range);
let mut null_bitset = page.null_bitset().to_bitset();

for time_range in time_ranges {
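In reader.rs, decode_pages now builds the schema once, decodes the time page a single time (get_time_page_meta additionally returns the decoded ArrayRef, which is pushed straight into target_arrays), and splits the tombstone work: the all-fields-excluded ranges are fetched once for the time column, while each field page only asks for its own column's overlapping ranges and keeps its stored null bitset untouched when that list is empty. A simplified, dependency-free sketch of the reshaped flow; Page, Tombstone, and TimeRange here are toy stand-ins rather than the tskv/Arrow structures:

```rust
#[derive(Clone, Copy)]
struct TimeRange { min: i64, max: i64 }

impl TimeRange {
    fn overlaps(&self, other: &TimeRange) -> bool {
        self.min <= other.max && other.min <= self.max
    }
    fn contains(&self, ts: i64) -> bool {
        self.min <= ts && ts <= self.max
    }
}

/// Toy page: a column name, an "is this the time column" flag, and a stored null mask.
struct Page { column: &'static str, is_time: bool, null_bits: Vec<bool> }

/// Toy tombstone: ranges deleted for every field, plus per-column deletions.
struct Tombstone {
    all_fields_excluded: Vec<TimeRange>,
    column_excluded: Vec<(&'static str, TimeRange)>,
}

/// Recompute a null mask only for rows whose timestamp falls in a deleted range
/// (the analogue of `update_nullbits_by_time_range`).
fn mask_deleted(timestamps: &[i64], base: &[bool], deleted: &[TimeRange]) -> Vec<bool> {
    timestamps
        .iter()
        .zip(base)
        .map(|(ts, set)| *set && !deleted.iter().any(|tr| tr.contains(*ts)))
        .collect()
}

fn decode_pages_sketch(timestamps: &[i64], pages: &[Page], tomb: &Tombstone) -> Vec<Vec<bool>> {
    let range = TimeRange { min: timestamps[0], max: *timestamps.last().unwrap() };
    let mut masks = Vec::with_capacity(pages.len());

    // Time column: the all-fields tombstones are looked up once, and the mask is
    // only rebuilt when at least one of them overlaps this page's time range.
    let all_field_filters: Vec<TimeRange> = tomb
        .all_fields_excluded
        .iter()
        .copied()
        .filter(|tr| tr.overlaps(&range))
        .collect();
    let time_page = pages.iter().find(|p| p.is_time).expect("time page");
    masks.push(if all_field_filters.is_empty() {
        time_page.null_bits.clone() // reuse the stored bitset, no per-row work
    } else {
        mask_deleted(timestamps, &time_page.null_bits, &all_field_filters)
    });

    // Field columns: only the column's own overlapping tombstone ranges matter,
    // and the recomputation is skipped entirely when there are none.
    for page in pages.iter().filter(|p| !p.is_time) {
        let column_filters: Vec<TimeRange> = tomb
            .column_excluded
            .iter()
            .filter(|(col, tr)| *col == page.column && tr.overlaps(&range))
            .map(|(_, tr)| *tr)
            .collect();
        masks.push(if column_filters.is_empty() {
            page.null_bits.clone()
        } else {
            mask_deleted(timestamps, &page.null_bits, &column_filters)
        });
    }
    masks
}

fn main() {
    let pages = vec![
        Page { column: "time", is_time: true, null_bits: vec![true; 4] },
        Page { column: "temp", is_time: false, null_bits: vec![true, true, false, true] },
    ];
    let tomb = Tombstone {
        all_fields_excluded: vec![],
        column_excluded: vec![("temp", TimeRange { min: 2, max: 3 })],
    };
    let masks = decode_pages_sketch(&[1, 2, 3, 4], &pages, &tomb);
    assert_eq!(masks[0], vec![true; 4]);                  // time column bitset reused as-is
    assert_eq!(masks[1], vec![true, false, false, true]); // rows 2..=3 tombstoned
}
```

The assertions mirror the intended saving: the time column's bitset is reused because no all-fields tombstone overlaps, and only the one field column with an overlapping per-column tombstone pays for a row-by-row recomputation. Read alongside the diff, the other reductions are that the schema is now built once up front and the time page is converted to an Arrow array a single time, instead of again inside the per-page loop.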
32 changes: 21 additions & 11 deletions tskv/src/tsm/tombstone.rs
@@ -331,6 +331,7 @@ impl TsmTombstone {
Ok(())
}

#[cfg(test)]
pub fn overlaps_column_time_range(
&self,
series_id: SeriesId,
@@ -341,7 +342,7 @@ impl TsmTombstone {
.read()
.overlaps_field_time_range(series_id, column_id, time_range)
}

#[cfg(test)]
pub fn check_all_fields_excluded_time_range(&self, time_range: &TimeRange) -> bool {
self.cache
.read()
@@ -350,15 +351,21 @@ impl TsmTombstone {

/// Returns all tombstone `TimeRange`s that overlaps the given `TimeRange`.
/// Returns None if there is nothing to return, or `TimeRange`s is empty.
pub fn get_overlapped_time_ranges(
pub fn get_column_overlapped_time_ranges(
&self,
series_id: SeriesId,
column_id: ColumnId,
time_range: &TimeRange,
) -> Vec<TimeRange> {
self.cache
.read()
.get_overlapped_time_ranges(series_id, column_id, time_range)
.get_column_overlapped_time_ranges(series_id, column_id, time_range)
}

pub fn get_all_fields_excluded_time_range(&self, time_range: &TimeRange) -> Vec<TimeRange> {
self.cache
.read()
.get_all_fields_excluded_time_range(time_range)
}
}

@@ -500,9 +507,6 @@ impl TsmTombstoneCache {
column_id: ColumnId,
time_range: &TimeRange,
) -> bool {
if self.all_excluded.includes(time_range) {
return true;
}
if let Some(time_ranges) = self.column_excluded.get(&(series_id, column_id)) {
for t in time_ranges.time_ranges() {
if t.overlaps(time_range) {
@@ -513,7 +517,7 @@
false
}

pub fn get_overlapped_time_ranges(
pub fn get_column_overlapped_time_ranges(
&self,
series_id: SeriesId,
column_id: ColumnId,
@@ -527,6 +531,16 @@
}
}
}

trs
}

pub fn check_all_fields_excluded_time_range(&self, time_range: &TimeRange) -> bool {
self.all_excluded.includes(time_range)
}

pub fn get_all_fields_excluded_time_range(&self, time_range: &TimeRange) -> Vec<TimeRange> {
let mut trs = Vec::new();
for all in self.all_excluded.time_ranges() {
if all.overlaps(time_range) {
trs.push(all);
@@ -535,10 +549,6 @@
trs
}

pub fn check_all_fields_excluded_time_range(&self, time_range: &TimeRange) -> bool {
self.all_excluded.includes(time_range)
}

pub async fn load(path: impl AsRef<Path>) -> TskvResult<Option<Self>> {
let mut reader = if LocalFileSystem::try_exists(&path) {
record_file::Reader::open(&path).await?
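In tombstone.rs, the combined lookup is split into two narrower accessors: the renamed get_column_overlapped_time_ranges walks only the column_excluded entry for a (series, column) pair, and the new get_all_fields_excluded_time_range walks only all_excluded, letting decode_pages fetch the all-fields deletions once per batch instead of folding them into every per-page check (the old overlap/containment helpers are kept but now gated behind #[cfg(test)]). A rough sketch of that split, using a plain HashMap in place of the cache's internal collections; the field names follow the diff, but the container types are simplified assumptions:

```rust
use std::collections::HashMap;

type SeriesId = u32;
type ColumnId = u32;

#[derive(Clone, Copy)]
struct TimeRange { min: i64, max: i64 }

impl TimeRange {
    fn overlaps(&self, other: &TimeRange) -> bool {
        self.min <= other.max && other.min <= self.max
    }
}

/// Simplified stand-in for TsmTombstoneCache: `all_excluded` holds ranges deleted
/// for every field, `column_excluded` holds per-(series, column) deletions.
struct TombstoneCache {
    all_excluded: Vec<TimeRange>,
    column_excluded: HashMap<(SeriesId, ColumnId), Vec<TimeRange>>,
}

impl TombstoneCache {
    /// Only the (series, column) entry is consulted; the all-fields list is no
    /// longer folded into this per-column lookup.
    fn get_column_overlapped_time_ranges(
        &self,
        series_id: SeriesId,
        column_id: ColumnId,
        time_range: &TimeRange,
    ) -> Vec<TimeRange> {
        let mut trs = Vec::new();
        if let Some(ranges) = self.column_excluded.get(&(series_id, column_id)) {
            for t in ranges {
                if t.overlaps(time_range) {
                    trs.push(*t);
                }
            }
        }
        trs
    }

    /// The all-fields deletions get their own accessor so callers can fetch them
    /// once per record batch instead of once per page.
    fn get_all_fields_excluded_time_range(&self, time_range: &TimeRange) -> Vec<TimeRange> {
        self.all_excluded
            .iter()
            .copied()
            .filter(|t| t.overlaps(time_range))
            .collect()
    }
}

fn main() {
    let mut column_excluded = HashMap::new();
    column_excluded.insert((1, 7), vec![TimeRange { min: 10, max: 20 }]);
    let cache = TombstoneCache {
        all_excluded: vec![TimeRange { min: 0, max: 5 }],
        column_excluded,
    };
    let query = TimeRange { min: 0, max: 15 };
    assert_eq!(cache.get_column_overlapped_time_ranges(1, 7, &query).len(), 1);
    assert_eq!(cache.get_all_fields_excluded_time_range(&query).len(), 1);
}
```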
