Skip to content

Commit

Permalink
fix postcard compatibility for top_hits, add postcard test (#2346)
Browse files Browse the repository at this point in the history
* fix postcard compatibility for top_hits, add postcard test

* fix top_hits naming, delay data fetch

closes #2347

* fix import
  • Loading branch information
PSeitz authored Apr 9, 2024
1 parent b644d78 commit 92c3297
Show file tree
Hide file tree
Showing 9 changed files with 293 additions and 231 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ paste = "1.0.11"
more-asserts = "0.3.1"
rand_distr = "0.4.3"
time = { version = "0.3.10", features = ["serde-well-known", "macros"] }
postcard = { version = "1.0.4", features = [
"use-std",
], default-features = false }

[target.'cfg(not(windows))'.dev-dependencies]
criterion = { version = "0.5", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion common/src/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub type DatePrecision = DateTimePrecision;
/// All constructors and conversions are provided as explicit
/// functions and not by implementing any `From`/`Into` traits
/// to prevent unintended usage.
#[derive(Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct DateTime {
// Timestamp in nanoseconds.
pub(crate) timestamp_nanos: i64,
Expand Down
2 changes: 1 addition & 1 deletion src/aggregation/agg_req_with_accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl AggregationWithAccessor {
add_agg_with_accessor(&agg, accessor, column_type, &mut res)?;
}
TopHits(ref mut top_hits) => {
top_hits.validate_and_resolve(reader.fast_fields().columnar())?;
top_hits.validate_and_resolve_field_names(reader.fast_fields().columnar())?;
let accessors: Vec<(Column<u64>, ColumnType)> = top_hits
.field_names()
.iter()
Expand Down
27 changes: 27 additions & 0 deletions src/aggregation/agg_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::aggregation::agg_req::{Aggregation, Aggregations};
use crate::aggregation::agg_result::AggregationResults;
use crate::aggregation::buf_collector::DOC_BLOCK_SIZE;
use crate::aggregation::collector::AggregationCollector;
use crate::aggregation::intermediate_agg_result::IntermediateAggregationResults;
use crate::aggregation::segment_agg_result::AggregationLimits;
use crate::aggregation::tests::{get_test_index_2_segments, get_test_index_from_values_and_terms};
use crate::aggregation::DistributedAggregationCollector;
Expand Down Expand Up @@ -66,6 +67,22 @@ fn test_aggregation_flushing(
}
}
},
"top_hits_test":{
"terms": {
"field": "string_id"
},
"aggs": {
"bucketsL2": {
"top_hits": {
"size": 2,
"sort": [
{ "score": "asc" }
],
"docvalue_fields": ["score"]
}
}
}
},
"histogram_test":{
"histogram": {
"field": "score",
Expand Down Expand Up @@ -108,6 +125,16 @@ fn test_aggregation_flushing(

let searcher = reader.searcher();
let intermediate_agg_result = searcher.search(&AllQuery, &collector).unwrap();

// Test postcard roundtrip serialization
let intermediate_agg_result_bytes = postcard::to_allocvec(&intermediate_agg_result).expect(
"Postcard Serialization failed, flatten etc. is not supported in the intermediate \
result",
);
let intermediate_agg_result: IntermediateAggregationResults =
postcard::from_bytes(&intermediate_agg_result_bytes)
.expect("Post deserialization failed");

intermediate_agg_result
.into_final_result(agg_req, &Default::default())
.unwrap()
Expand Down
12 changes: 6 additions & 6 deletions src/aggregation/intermediate_agg_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use super::bucket::{
};
use super::metric::{
IntermediateAverage, IntermediateCount, IntermediateMax, IntermediateMin, IntermediateStats,
IntermediateSum, PercentilesCollector, TopHitsCollector,
IntermediateSum, PercentilesCollector, TopHitsTopNComputer,
};
use super::segment_agg_result::AggregationLimits;
use super::{format_date, AggregationError, Key, SerializedKey};
Expand Down Expand Up @@ -221,9 +221,9 @@ pub(crate) fn empty_from_req(req: &Aggregation) -> IntermediateAggregationResult
Percentiles(_) => IntermediateAggregationResult::Metric(
IntermediateMetricResult::Percentiles(PercentilesCollector::default()),
),
TopHits(_) => IntermediateAggregationResult::Metric(IntermediateMetricResult::TopHits(
TopHitsCollector::default(),
)),
TopHits(ref req) => IntermediateAggregationResult::Metric(
IntermediateMetricResult::TopHits(TopHitsTopNComputer::new(req.clone())),
),
}
}

Expand Down Expand Up @@ -285,7 +285,7 @@ pub enum IntermediateMetricResult {
/// Intermediate sum result.
Sum(IntermediateSum),
/// Intermediate top_hits result
TopHits(TopHitsCollector),
TopHits(TopHitsTopNComputer),
}

impl IntermediateMetricResult {
Expand Down Expand Up @@ -314,7 +314,7 @@ impl IntermediateMetricResult {
.into_final_result(req.agg.as_percentile().expect("unexpected metric type")),
),
IntermediateMetricResult::TopHits(top_hits) => {
MetricResult::TopHits(top_hits.finalize())
MetricResult::TopHits(top_hits.into_final_result())
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions src/aggregation/metric/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ mod stats;
mod sum;
mod top_hits;

use std::collections::HashMap;

pub use average::*;
pub use count::*;
pub use max::*;
Expand All @@ -36,6 +38,8 @@ pub use stats::*;
pub use sum::*;
pub use top_hits::*;

use crate::schema::OwnedValue;

/// Single-metric aggregations use this common result structure.
///
/// Main reason to wrap it in value is to match elasticsearch output structure.
Expand Down Expand Up @@ -92,8 +96,9 @@ pub struct TopHitsVecEntry {

/// Search results, for queries that include field retrieval requests
/// (`docvalue_fields`).
#[serde(flatten)]
pub search_results: FieldRetrivalResult,
#[serde(rename = "docvalue_fields")]
#[serde(skip_serializing_if = "HashMap::is_empty")]
pub doc_value_fields: HashMap<String, OwnedValue>,
}

/// The top_hits metric aggregation results a list of top hits by sort criteria.
Expand Down
Loading

0 comments on commit 92c3297

Please sign in to comment.