Skip to content

fix: query staging(in-mem) when concerned with the past 5 minutes #1194

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions src/query/stream_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef, SortOptions};
use bytes::Bytes;
use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
use chrono::{DateTime, NaiveDateTime, TimeDelta, Timelike, Utc};
use datafusion::catalog::Session;
use datafusion::common::stats::Precision;
use datafusion::logical_expr::utils::conjunction;
Expand Down Expand Up @@ -442,7 +442,7 @@ impl TableProvider for StandardTableProvider {
return Err(DataFusionError::Plan("potentially unbounded query on time range. Table scanning requires atleast one time bound".to_string()));
}

if include_now(filters, &time_partition) {
if is_within_staging_window(&time_filters) {
if let Ok(staging) = PARSEABLE.get_stream(&self.stream) {
let records = staging.recordbatches_cloned(&self.schema);
let reversed_mem_table = reversed_mem_table(records, self.schema.clone())?;
Expand Down Expand Up @@ -730,23 +730,21 @@ fn return_listing_time_filters(
}
}

pub fn include_now(filters: &[Expr], time_partition: &Option<String>) -> bool {
let current_minute = Utc::now()
/// We should consider data in staging for queries concerning a time period,
/// ending within 5 minutes from now. e.g. If current time is 5
pub fn is_within_staging_window(time_filters: &[PartialTimeFilter]) -> bool {
let five_minutes_back = (Utc::now() - TimeDelta::minutes(5))
.with_second(0)
.and_then(|x| x.with_nanosecond(0))
.expect("zeroed value is valid")
.naive_utc();

let time_filters = extract_primary_filter(filters, time_partition);

let upper_bound_matches = time_filters.iter().any(|filter| match filter {
if time_filters.iter().any(|filter| match filter {
PartialTimeFilter::High(Bound::Excluded(time))
| PartialTimeFilter::High(Bound::Included(time))
| PartialTimeFilter::Eq(time) => time > &current_minute,
| PartialTimeFilter::Eq(time) => time >= &five_minutes_back,
_ => false,
});

if upper_bound_matches {
}) {
return true;
}

Expand Down Expand Up @@ -828,7 +826,7 @@ pub async fn collect_manifest_files(
}

// Extract start time and end time from filter predicate
fn extract_primary_filter(
pub fn extract_primary_filter(
filters: &[Expr],
time_partition: &Option<String>,
) -> Vec<PartialTimeFilter> {
Expand Down
8 changes: 4 additions & 4 deletions src/utils/arrow/flight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::event::Event;
use crate::handlers::http::ingest::push_logs_unchecked;
use crate::handlers::http::query::Query as QueryJson;
use crate::parseable::PARSEABLE;
use crate::query::stream_schema_provider::include_now;
use crate::query::stream_schema_provider::{extract_primary_filter, is_within_staging_window};
use crate::{handlers::http::modal::IngestorMetadata, option::Mode};

use arrow_array::RecordBatch;
Expand Down Expand Up @@ -131,9 +131,9 @@ pub fn send_to_ingester(start: i64, end: i64) -> bool {
datafusion::logical_expr::Operator::Lt,
Box::new(filter_end),
);
let ex = [Expr::BinaryExpr(ex1), Expr::BinaryExpr(ex2)];

PARSEABLE.options.mode == Mode::Query && include_now(&ex, &None)
let time_filters =
extract_primary_filter(&[Expr::BinaryExpr(ex1), Expr::BinaryExpr(ex2)], &None);
PARSEABLE.options.mode == Mode::Query && is_within_staging_window(&time_filters)
}

fn lit_timestamp_milli(time: i64) -> Expr {
Expand Down
Loading