Skip to content

Commit

Permalink
update count route
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed May 24, 2024
1 parent 2c38810 commit ce08868
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 15 deletions.
15 changes: 9 additions & 6 deletions server/src/api/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ use actix_web::{
HttpResponse,
};
use serde_json::json;
use smoltable::query::count::Input as CountInput;

#[post("/v1/table/{name}/count")]
pub async fn handler(
path: Path<String>,
app_state: web::Data<AppState>,
req_body: web::Json<CountInput>,
) -> CustomRouteResult<HttpResponse> {
let before = std::time::Instant::now();

Expand All @@ -40,10 +42,10 @@ pub async fn handler(
let tables = app_state.tables.read().await;

if let Some(table) = tables.get(&table_name) {
let (row_count, cell_count) = {
let result = {
let table = table.clone();

tokio::task::spawn_blocking(move || table.count())
tokio::task::spawn_blocking(move || table.scan_count(req_body.0))
.await
.expect("should join")
}?;
Expand All @@ -52,21 +54,22 @@ pub async fn handler(

let micros_total = dur.as_micros();

let micros_per_row = if row_count == 0 {
let micros_per_row = if result.row_count == 0 {
None
} else {
Some(micros_total / row_count as u128)
Some(micros_total / result.row_count as u128)
};

Ok(build_response(
dur,
StatusCode::OK,
"Count successful",
&json!({
"row_count": row_count,
"cell_count": cell_count,
"row_count": result.row_count,
"cell_count": result.cell_count,
"micros": micros_total,
"micros_per_row": micros_per_row,
"bytes_scanned": result.bytes_scanned_count,
}),
))
} else {
Expand Down
2 changes: 1 addition & 1 deletion server/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl ManifestTable {
self.tree
.insert(format!("table#{table_name}#name"), table_name)?;

self.keyspace.persist(fjall::FlushMode::SyncAll)?;
self.keyspace.persist(fjall::PersistMode::SyncAll)?;

Ok(())
}
Expand Down
28 changes: 28 additions & 0 deletions smoltable/src/query/count.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use super::scan::ScanMode;
use crate::ColumnFilter;
use serde::{Deserialize, Serialize};

/// Row-selection part of a count query.
///
/// Wraps the [`ScanMode`] that decides which rows are visited.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RowOptions {
/// Scan mode for the rows to count. Flattened by serde, so the scan
/// mode's own keys appear directly inside the `row` object rather than
/// under a nested `scan` key.
#[serde(flatten)]
pub scan: ScanMode,
}

/// Column-selection part of a count query.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ColumnOptions {
/// Optional column filter. Flattened by serde, so the filter's own keys
/// appear directly inside the `column` object; `None` means no filter
/// was supplied.
#[serde(flatten)]
pub filter: Option<ColumnFilter>,
}
/// Input of a count query: which rows to visit and, optionally, which
/// columns to restrict the count to.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Input {
/// Row selection (required).
pub row: RowOptions,
/// Column selection; `None` when the request carries no column options.
pub column: Option<ColumnOptions>,
}

/// Result of a count query.
#[derive(Debug, Deserialize, Serialize)]
pub struct Output {
/// Number of locality groups that were selected for scanning.
pub affected_locality_groups: usize,
/// Number of cells counted across all visited rows.
pub cell_count: u64,
/// Number of distinct rows counted.
pub row_count: u64,
/// Number of bytes scanned, as reported by the reader used for the query.
pub bytes_scanned_count: u64,
}
1 change: 1 addition & 0 deletions smoltable/src/query/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod count;
pub mod row;
pub mod scan;
2 changes: 1 addition & 1 deletion smoltable/src/table/merge_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl Iterator for &mut MergeReader {
.enumerate()
.filter(|(_, cell)| Option::is_some(cell))
.map(|(idx, cell)| (idx, cell.unwrap()))
.max_by(|(_, a), (_, b)| a.raw_key.cmp(&b.raw_key));
.min_by(|(_, a), (_, b)| a.raw_key.cmp(&b.raw_key));

let Some((lowest_idx, _)) = lowest_idx else {
// No more items
Expand Down
113 changes: 106 additions & 7 deletions smoltable/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod writer;
use self::row_reader::SingleRowReader;
use crate::{
query::{
count::{Input as CountInput, Output as CountOutput},
row::{
ColumnOptions as QueryRowColumnOptions, Input as QueryRowInput,
Output as QueryRowOutput, RowOptions as QueryRowInputRowOptions,
Expand Down Expand Up @@ -288,7 +289,7 @@ impl Smoltable {
}

batch.commit()?;
self.keyspace.persist(fjall::FlushMode::SyncAll)?;
self.keyspace.persist(fjall::PersistMode::SyncAll)?;

self.load_locality_groups()?;

Expand Down Expand Up @@ -357,6 +358,98 @@ impl Smoltable {
Ok((row_count, cell_count))
}

/// Counts the rows and cells selected by `input` without materializing them.
///
/// One reader is opened per affected locality group and the readers are
/// merged into a single cell stream. Cells that fall outside the scan mode or
/// do not satisfy the optional column filter are skipped; every remaining
/// cell is counted, and a row is counted whenever the row key changes.
///
/// # Errors
///
/// Returns an error if the affected locality groups cannot be resolved, if a
/// reader cannot be created, or if reading a cell fails.
pub fn scan_count(&self, input: CountInput) -> crate::Result<CountOutput> {
    use reader::Reader as TableReader;

    let column_filter = &input.column.as_ref().and_then(|x| x.filter.clone());

    let locality_groups_to_scan = get_affected_locality_groups(self, column_filter)?;
    let instant = self.keyspace.instant();

    let affected_locality_groups = locality_groups_to_scan.len();

    let readers = locality_groups_to_scan
        .into_iter()
        .map(|locality_group| match &input.row.scan {
            ScanMode::Prefix(prefix) => {
                TableReader::from_prefix(instant, locality_group, prefix)
            }
            // NOTE(review): range scans are seeded with `from_prefix(range.start)`;
            // confirm `from_prefix` only positions the reader and does not also
            // restrict it to keys sharing that prefix.
            ScanMode::Range(range) => {
                TableReader::from_prefix(instant, locality_group, &range.start)
            } // TODO: ScanMode::Ranges(ranges) => unimplemented!(),
        })
        .collect::<fjall::Result<Vec<_>>>()?
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();

    let mut reader = MergeReader::new(readers);

    // Counters are explicitly `u64`: an untyped `0` literal that is only ever
    // `as`-cast falls back to `i32`, which would overflow on large tables.
    let mut cell_count: u64 = 0; // Cell count over all aggregated rows
    let mut row_count: u64 = 0;

    let mut current_row_key: Option<String> = None;

    // Set once a cell outside the requested scan bounds has been seen.
    let mut should_be_terminated = false;

    while let Some(cell) = (&mut reader).next() {
        let cell = cell?;

        let out_of_bounds = match &input.row.scan {
            ScanMode::Prefix(prefix) => !cell.row_key.starts_with(prefix),
            ScanMode::Range(range) => {
                if range.inclusive {
                    cell.row_key > range.end
                } else {
                    cell.row_key >= range.end
                }
            }
        };

        if out_of_bounds {
            should_be_terminated = true;
            continue;
        }

        if let Some(filter) = column_filter {
            if !cell.satisfies_column_filter(filter) {
                continue;
            }
        }

        if current_row_key.as_ref() != Some(&cell.row_key) {
            current_row_key = Some(cell.row_key);

            // We are visiting a new row
            row_count += 1;

            // NOTE(review): the row is counted before this break while its cell
            // is not; this is only reachable if an in-bounds cell can follow an
            // out-of-bounds one. Kept as in the original - confirm intent.
            if should_be_terminated {
                break;
            }
        }

        cell_count += 1;
    }

    let bytes_scanned_count = reader.bytes_scanned_count();

    Ok(CountOutput {
        affected_locality_groups,
        cell_count,
        row_count,
        bytes_scanned_count,
    })
}

// TODO: GC thrashes block cache

pub fn run_version_gc(&self) -> crate::Result<u64> {
Expand Down Expand Up @@ -587,6 +680,8 @@ impl Smoltable {

let mut reader = MergeReader::new(readers);

let mut should_be_terminated = false;

loop {
// We are gonna visit another cell, if the global cell limit is reached
// we can short circuit out of the loop
Expand All @@ -600,22 +695,22 @@ impl Smoltable {

let cell = cell?;

// TODO: test with multiple partitions, can only break once ALL partitions have emitted row key that doesn't match scan mode
// TODO: Reader.stop() -> is_finished = true, short circuits next() to None?
// TODO: merge reader needs "stop predicate" or something...
match &input.row.scan {
ScanMode::Prefix(prefix) => {
if !cell.row_key.starts_with(prefix) {
break;
should_be_terminated = true;
continue;
}
}
ScanMode::Range(range) => {
if range.inclusive {
if cell.row_key > range.end {
break;
should_be_terminated = true;
continue;
}
} else if cell.row_key >= range.end {
break;
should_be_terminated = true;
continue;
}
}
}
Expand Down Expand Up @@ -649,6 +744,10 @@ impl Smoltable {
}
}
}

if should_be_terminated {
break;
}
}

// TODO: test
Expand Down
1 change: 1 addition & 0 deletions smoltable/src/table/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl Writer {

/// Consumes the writer, commits its pending batch and then asks the
/// table's keyspace to persist with `PersistMode::SyncAll`.
///
/// # Errors
///
/// Returns an error if committing the batch or persisting the keyspace fails.
pub fn finalize(self) -> crate::Result<()> {
self.batch.commit()?;
// Persist only after the commit succeeded.
// NOTE(review): presumably this makes the committed batch durable - confirm against fjall docs.
self.table.keyspace.persist(fjall::PersistMode::SyncAll)?;
Ok(())
}
}

0 comments on commit ce08868

Please sign in to comment.