Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: added support for aggregation queries #33

Merged
merged 1 commit into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 82 additions & 47 deletions crates/emulator-grpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{mem, sync::Arc};

use firestore_database::{
event::DatabaseEvent,
Expand All @@ -12,14 +12,13 @@ use googleapis::google::{
firestore::v1::{
firestore_server::FirestoreServer,
structured_query::{CollectionSelector, FieldReference},
transaction_options::ReadWrite,
*,
},
protobuf::{Empty, Timestamp},
};
use itertools::Itertools;
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tokio_stream::{once, wrappers::ReceiverStream, StreamExt};
use tonic::{async_trait, codec::CompressionEncoding, Code, Request, Response, Result, Status};
use tracing::{info, info_span, instrument, Instrument};

Expand Down Expand Up @@ -103,17 +102,17 @@ impl firestore_server::Firestore for FirestoreEmulator {
.map(|name| name.parse::<DocumentRef>())
.try_collect()?;

// Only used for new transactions.
let (mut new_transaction, read_consistency) = match consistency_selector {
Some(batch_get_documents_request::ConsistencySelector::NewTransaction(
transaction_options,
)) => {
unimplemented_option!(transaction_options.mode);
let id = database.new_txn().await?;
let (
// Only used for new transactions.
mut new_transaction,
read_consistency,
) = match consistency_selector {
Some(batch_get_documents_request::ConsistencySelector::NewTransaction(txn_opts)) => {
let id = database.new_txn(Some(txn_opts)).await?;
info!("started new transaction");
(Some(id.into()), ReadConsistency::Transaction(id))
(id.into(), ReadConsistency::Transaction(id))
}
s => (None, s.try_into()?),
s => (vec![], s.try_into()?),
};
info!(?read_consistency);

Expand All @@ -129,7 +128,7 @@ impl firestore_server::Firestore for FirestoreEmulator {
Some(doc) => Found(doc),
}),
read_time: Some(Timestamp::now()),
transaction: new_transaction.take().unwrap_or_default(),
transaction: mem::take(&mut new_transaction),
}),
Err(err) => Err(Status::from(err)),
};
Expand Down Expand Up @@ -201,7 +200,6 @@ impl firestore_server::Firestore for FirestoreEmulator {
unimplemented_bool!(show_missing);
unimplemented_collection!(order_by);
unimplemented_collection!(page_token);
unimplemented_option!(mask);
if page_size > 0 {
unimplemented!("page_size");
}
Expand Down Expand Up @@ -267,25 +265,12 @@ impl firestore_server::Firestore for FirestoreEmulator {
) -> Result<Response<BeginTransactionResponse>> {
let BeginTransactionRequest { database, options } = request.into_inner();

let database = self.project.database(&database.parse()?).await;

use transaction_options::Mode;

let txn_id = match options {
None => database.new_txn().await?,
Some(TransactionOptions {
mode: None | Some(Mode::ReadOnly(_)),
}) => {
unimplemented!("read-only transactions")
}
Some(TransactionOptions {
mode: Some(Mode::ReadWrite(ReadWrite { retry_transaction })),
}) => {
let id = retry_transaction.try_into()?;
database.new_txn_with_id(id).await?;
id
}
};
let txn_id = self
.project
.database(&database.parse()?)
.await
.new_txn(options)
.await?;

info!(?txn_id);
Ok(Response::new(BeginTransactionResponse {
Expand Down Expand Up @@ -321,22 +306,29 @@ impl firestore_server::Firestore for FirestoreEmulator {
consistency_selector,
} = request.into_inner();
unimplemented_option!(explain_options);
let Some(run_query_request::QueryType::StructuredQuery(query)) = query_type else {
unimplemented!("query without query")
};
let run_query_request::QueryType::StructuredQuery(query) = mandatory!(query_type);

let parent: Ref = parent.parse()?;

let docs = self
.project
.database(parent.root())
.await
.run_query(parent, query, consistency_selector.try_into()?)
.await?;
let database = self.project.database(parent.root()).await;

let stream = tokio_stream::iter(docs).map(|doc| {
let (
// Only used for new transactions.
mut new_transaction,
read_consistency,
) = match consistency_selector {
Some(run_query_request::ConsistencySelector::NewTransaction(txn_opts)) => {
let id = database.new_txn(Some(txn_opts)).await?;
info!("started new transaction");
(id.into(), ReadConsistency::Transaction(id))
}
s => (vec![], s.try_into()?),
};
let docs = database.run_query(parent, query, read_consistency).await?;

let stream = tokio_stream::iter(docs).map(move |doc| {
Ok(RunQueryResponse {
transaction: vec![],
transaction: mem::take(&mut new_transaction),
document: Some(doc),
read_time: Some(Timestamp::now()),
skipped_results: 0,
Expand All @@ -349,7 +341,7 @@ impl firestore_server::Firestore for FirestoreEmulator {
}

/// Server streaming response type for the RunAggregationQuery method.
type RunAggregationQueryStream = tonic::Streaming<RunAggregationQueryResponse>;
type RunAggregationQueryStream = tokio_stream::Once<Result<RunAggregationQueryResponse>>;

/// Runs an aggregation query.
///
Expand All @@ -367,9 +359,52 @@ impl firestore_server::Firestore for FirestoreEmulator {
#[instrument(skip_all, err)]
async fn run_aggregation_query(
&self,
_request: Request<RunAggregationQueryRequest>,
request: Request<RunAggregationQueryRequest>,
) -> Result<Response<Self::RunAggregationQueryStream>> {
unimplemented!("run_aggregation_query")
let RunAggregationQueryRequest {
parent,
explain_options,
query_type,
consistency_selector,
} = request.into_inner();
unimplemented_option!(explain_options);

let run_aggregation_query_request::QueryType::StructuredAggregationQuery(agg_query) =
mandatory!(query_type);

let structured_aggregation_query::QueryType::StructuredQuery(query) =
mandatory!(agg_query.query_type);

let parent: Ref = parent.parse()?;

let database = self.project.database(parent.root()).await;

let (
// Only used for new transactions.
new_transaction,
read_consistency,
) = match consistency_selector {
Some(run_aggregation_query_request::ConsistencySelector::NewTransaction(txn_opts)) => {
let id = database.new_txn(Some(txn_opts)).await?;
info!("started new transaction");
(id.into(), ReadConsistency::Transaction(id))
}
s => (vec![], s.try_into()?),
};

info!(?read_consistency);
let aggregate_fields = database
.run_aggregation_query(parent, query, agg_query.aggregations, read_consistency)
.await?;

let response = RunAggregationQueryResponse {
result: Some(AggregationResult { aggregate_fields }),
explain_metrics: None,
transaction: new_transaction,
read_time: Some(Timestamp::now()),
};

Ok(Response::new(once(Ok(response))))
}

/// Partitions a query by returning partition cursors that can be used to run
Expand Down
12 changes: 9 additions & 3 deletions crates/emulator-grpc/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,29 @@ macro_rules! unimplemented {
macro_rules! unimplemented_option {
($val:expr) => {
if $val.is_some() {
unimplemented!(stringify!($val))
unimplemented!(stringify!($val));
}
};
}

macro_rules! unimplemented_collection {
($val:expr) => {
if !$val.is_empty() {
unimplemented!(stringify!($val))
unimplemented!(stringify!($val));
}
};
}

macro_rules! unimplemented_bool {
($val:expr) => {
if $val {
unimplemented!(stringify!($val))
unimplemented!(stringify!($val));
}
};
}

macro_rules! mandatory {
($val:expr) => {
$val.ok_or_else(|| Status::invalid_argument(concat!("missing ", stringify!($val))))?
};
}
Loading
Loading