Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: on_event can subscribe to multiple endpoints at once #1881

Merged
merged 2 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions dozer-api/src/generator/protoc/generator/template/proto.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,8 @@ service {{plural_pascal_name}} {
{{#if enable_on_event}}
/**
* Subscribes to the Dozer event stream, optionally applies a filter. See [Query](../query) for the filter format.
*
* This API is unstable and may change in the future.
*/
rpc on_event({{pascal_name}}EventRequest) returns (stream {{pascal_name}}Event);
rpc on_event(dozer.types.EventFilter) returns (stream {{pascal_name}}Event);
{{/if}}
{{#if enable_token}}
// Gets the authentication token.
Expand All @@ -60,13 +58,6 @@ message Query{{plural_pascal_name}}Response {
}

{{#if enable_on_event}}
// Request for `on_event`.
message {{pascal_name}}EventRequest {
// The event type to subscribe to.
dozer.types.EventType type = 1;
// JSON filter string.
optional string filter = 2;
}
// Response for `on_event`.
message {{pascal_name}}Event {
// The operation type.
Expand Down
32 changes: 17 additions & 15 deletions dozer-api/src/grpc/common/service.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::collections::HashMap;
use std::sync::Arc;

use crate::auth::Access;

use crate::grpc::shared_impl;
use crate::grpc::shared_impl::{self, EndpointFilter};
use crate::grpc::types_helper::map_record;
use crate::CacheEndpoint;
use dozer_types::grpc_types::common::common_grpc_service_server::CommonGrpcService;
Expand Down Expand Up @@ -109,24 +110,25 @@ impl CommonGrpcService for CommonService {
let extensions = parts.1;
let query_request = parts.2;
let access = extensions.get::<Access>();
let endpoint = &query_request.endpoint;
let cache_endpoint = self
.endpoint_map
.get(endpoint)
.ok_or_else(|| Status::invalid_argument(endpoint))?;

let mut endpoints = HashMap::new();
for (endpoint, filter) in query_request.endpoints {
let cache_endpoint = self
.endpoint_map
.get(&endpoint)
.ok_or_else(|| Status::invalid_argument(&endpoint))?;
let schema = cache_endpoint.cache_reader().get_schema().0.clone();
endpoints.insert(
endpoint,
EndpointFilter::new(schema, filter.filter.as_deref())?,
);
}

shared_impl::on_event(
&cache_endpoint.cache_reader(),
query_request.filter.as_deref(),
endpoints,
self.event_notifier.as_ref().map(|r| r.resubscribe()),
access.cloned(),
move |op| {
if op.endpoint_name == query_request.endpoint {
Some(Ok(op))
} else {
None
}
},
Ok,
)
}

Expand Down
16 changes: 12 additions & 4 deletions dozer-api/src/grpc/common/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use dozer_types::grpc_types::{
common_grpc_service_server::CommonGrpcService, GetEndpointsRequest, GetFieldsRequest,
OnEventRequest, QueryRequest,
},
types::{value, EventType, FieldDefinition, OperationType, RecordWithId, Type, Value},
types::{
value, EventFilter, EventType, FieldDefinition, OperationType, RecordWithId, Type, Value,
},
};
use tonic::Request;

Expand Down Expand Up @@ -136,9 +138,15 @@ async fn test_grpc_common_on_event() {
let service = setup_common_service().await;
let mut rx = service
.on_event(Request::new(OnEventRequest {
endpoint: "films".to_string(),
r#type: EventType::All as i32,
filter: Some(r#"{ "film_id": 32 }"#.to_string()),
endpoints: [(
"films".to_string(),
EventFilter {
r#type: EventType::All as i32,
filter: Some(r#"{ "film_id": 32 }"#.to_string()),
},
)]
.into_iter()
.collect(),
}))
.await
.unwrap()
Expand Down
59 changes: 37 additions & 22 deletions dozer-api/src/grpc/shared_impl/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use dozer_cache::cache::expression::{default_limit_for_query, QueryExpression};
use std::collections::HashMap;

use dozer_cache::cache::expression::{default_limit_for_query, FilterExpression, QueryExpression};
use dozer_cache::cache::CacheRecord;
use dozer_cache::CacheReader;
use dozer_types::grpc_types::types::Operation;
use dozer_types::log::warn;
use dozer_types::serde_json;
use dozer_types::types::Schema;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -58,12 +61,33 @@ pub fn query(
Ok(records)
}

#[derive(Debug)]
pub struct EndpointFilter {
schema: Schema,
filter: Option<FilterExpression>,
}

impl EndpointFilter {
pub fn new(schema: Schema, filter: Option<&str>) -> Result<Self, Status> {
let filter = filter
.and_then(|filter| {
if filter.is_empty() {
None
} else {
Some(serde_json::from_str(filter))
}
})
.transpose()
.map_err(from_error)?;
Ok(Self { schema, filter })
}
}

pub fn on_event<T: Send + 'static>(
reader: &CacheReader,
filter: Option<&str>,
endpoints: HashMap<String, EndpointFilter>,
mut broadcast_receiver: Option<Receiver<Operation>>,
_access: Option<Access>,
event_mapper: impl Fn(Operation) -> Option<T> + Send + Sync + 'static,
event_mapper: impl Fn(Operation) -> T + Send + Sync + 'static,
) -> Result<Response<ReceiverStream<T>>, Status> {
// TODO: Use access.

Expand All @@ -73,18 +97,6 @@ pub fn on_event<T: Send + 'static>(
));
}

let filter = match filter {
Some(filter) => {
if filter.is_empty() {
None
} else {
Some(serde_json::from_str(filter).map_err(from_error)?)
}
}
None => None,
};
let schema = reader.get_schema().0.clone();

let (tx, rx) = tokio::sync::mpsc::channel(1);

tokio::spawn(async move {
Expand All @@ -93,12 +105,15 @@ pub fn on_event<T: Send + 'static>(
let event = broadcast_receiver.recv().await;
match event {
Ok(op) => {
if filter::op_satisfies_filter(&op, filter.as_ref(), &schema) {
if let Some(event) = event_mapper(op) {
if (tx.send(event).await).is_err() {
// receiver dropped
break;
}
if let Some(filter) = endpoints.get(&op.endpoint_name) {
if filter::op_satisfies_filter(
&op,
filter.filter.as_ref(),
&filter.schema,
) && (tx.send(event_mapper(op)).await).is_err()
{
// receiver dropped
break;
}
}
}
Expand Down
36 changes: 16 additions & 20 deletions dozer-api/src/grpc/typed/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ use crate::{
CountResponseDesc, EventDesc, ProtoGenerator, QueryResponseDesc, ServiceDesc,
TokenResponseDesc,
},
grpc::shared_impl,
grpc::shared_impl::{self, EndpointFilter},
CacheEndpoint,
};
use dozer_cache::CacheReader;
use dozer_types::log::error;
use dozer_types::{grpc_types::types::Operation, models::api_security::ApiSecurity};
use dozer_types::{log::error, types::Schema};
use futures_util::future;
use prost_reflect::{MethodDescriptor, Value};
use std::{borrow::Cow, collections::HashMap, convert::Infallible};
Expand Down Expand Up @@ -188,8 +188,8 @@ impl TypedService {
fn call(&mut self, request: tonic::Request<DynamicMessage>) -> Self::Future {
future::ready(on_event(
request,
&self.cache_endpoint.cache_reader(),
&self.cache_endpoint.endpoint.name,
self.cache_endpoint.cache_reader().get_schema().0.clone(),
self.cache_endpoint.endpoint.name.clone(),
self.event_desc
.take()
.expect("This future shouldn't be polled twice"),
Expand Down Expand Up @@ -339,8 +339,8 @@ fn query(

fn on_event(
request: Request<DynamicMessage>,
reader: &CacheReader,
endpoint_name: &str,
schema: Schema,
endpoint_name: String,
event_desc: EventDesc,
event_notifier: Option<tokio::sync::broadcast::Receiver<Operation>>,
) -> Result<Response<ReceiverStream<Result<TypedResponse, tonic::Status>>>, Status> {
Expand All @@ -357,21 +357,17 @@ fn on_event(
.ok_or_else(|| Status::new(Code::InvalidArgument, "filter must be a string"))
})
.transpose()?;
let filter = EndpointFilter::new(schema, filter)?;

let endpoint_to_be_streamed = endpoint_name.to_string();
shared_impl::on_event(reader, filter, event_notifier, access.cloned(), move |op| {
if endpoint_to_be_streamed == op.endpoint_name {
match on_event_to_typed_response(op, event_desc.clone()) {
Ok(event) => Some(Ok(event)),
Err(e) => {
error!("On event error: {:?}", e);
None
}
}
} else {
None
}
})
shared_impl::on_event(
[(endpoint_name, filter)].into_iter().collect(),
event_notifier,
access.cloned(),
move |op| {
on_event_to_typed_response(op, event_desc.clone())
.map_err(|e| tonic::Status::internal(e.to_string()))
},
)
}

fn token(
Expand Down
Binary file modified dozer-api/src/grpc/typed/tests/generated_films.bin
Binary file not shown.
9 changes: 4 additions & 5 deletions dozer-api/src/grpc/typed/tests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ use crate::{
};
use dozer_cache::cache::expression::{FilterExpression, QueryExpression};
use dozer_types::grpc_types::{
generated::films::FilmEventRequest,
generated::films::{
films_client::FilmsClient, CountFilmsResponse, FilmEvent, QueryFilmsRequest,
QueryFilmsResponse,
},
types::{value, EventType, Operation, OperationType, Record, Value},
types::{value, EventFilter, EventType, Operation, OperationType, Record, Value},
};
use dozer_types::models::api_security::ApiSecurity;
use futures_util::FutureExt;
Expand Down Expand Up @@ -234,7 +233,7 @@ async fn test_typed_streaming1() {
let address = "http://127.0.0.1:14321".to_owned();
let mut client = FilmsClient::connect(address.to_owned()).await.unwrap();

let request = FilmEventRequest {
let request = EventFilter {
r#type: EventType::All as i32,
filter: None,
};
Expand Down Expand Up @@ -263,7 +262,7 @@ async fn test_typed_streaming2() {
});
tokio::time::sleep(Duration::from_millis(1001)).await;
let address = "http://127.0.0.1:14322".to_owned();
let request = FilmEventRequest {
let request = EventFilter {
r#type: EventType::All as i32,
filter: Some(r#"{ "film_id": 32 }"#.into()),
};
Expand Down Expand Up @@ -294,7 +293,7 @@ async fn test_typed_streaming3() {
tokio::time::sleep(Duration::from_millis(1001)).await;
let address = "http://127.0.0.1:14323".to_owned();
let mut client = FilmsClient::connect(address.to_owned()).await.unwrap();
let request = FilmEventRequest {
let request = EventFilter {
r#type: EventType::All as i32,
filter: Some(r#"{ "film_id": 0 }"#.into()),
};
Expand Down
3 changes: 2 additions & 1 deletion dozer-cli/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use dozer_cache::cache::LmdbRwCacheManager;
use dozer_cache::dozer_log::home_dir::HomeDir;
use dozer_core::app::AppPipeline;
use dozer_core::dag_schemas::DagSchemas;
use dozer_types::models::flags::default_push_events;
use tokio::select;

use crate::console_helper::get_colored_text;
Expand Down Expand Up @@ -310,7 +311,7 @@ impl SimpleOrchestrator {
.flags
.as_ref()
.map(|flags| flags.push_events)
.unwrap_or(false);
.unwrap_or_else(default_push_events);
let contract = build::Contract::new(
&dag_schemas,
&self.config.endpoints,
Expand Down
8 changes: 2 additions & 6 deletions dozer-types/protos/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,8 @@ message CountResponse {

// Request for `OnEvent`.
message OnEventRequest {
// The event type to subscribe to.
dozer.types.EventType type = 1;
// The name of the endpoint to subscribe to.
string endpoint = 2;
// JSON filter string.
optional string filter = 3;
// The endpoints to subscribe to. Key is the endpoint name, value is the filter.
map<string, dozer.types.EventFilter> endpoints = 1;
}

// Request for `getFields`.
Expand Down
12 changes: 1 addition & 11 deletions dozer-types/protos/films.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ service Films {

/**
* Subscribes to the Dozer event stream, optionally applies a filter. See [Query](../query) for the filter format.
*
* This API is unstable and may change in the future.
*/
rpc on_event(FilmEventRequest) returns (stream FilmEvent);
rpc on_event(dozer.types.EventFilter) returns (stream FilmEvent);

// Gets the authentication token.
rpc token(TokenRequest) returns (TokenResponse);
Expand All @@ -58,14 +56,6 @@ message QueryFilmsResponse {
repeated FilmWithId records = 1;
}

// Request for `on_event`.
message FilmEventRequest {
// The event type to subscribe to.
dozer.types.EventType type = 1;
// JSON filter string.
optional string filter = 2;
}

// Response for `on_event`.
message FilmEvent {
// The operation type.
Expand Down
8 changes: 8 additions & 0 deletions dozer-types/protos/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ enum EventType {
DELETE_ONLY = 3; // Only DELETE events.
}

// Event filter.
message EventFilter {
// The event type to subscribe to.
dozer.types.EventType type = 1;
// JSON filter string.
optional string filter = 3;
}

// The event types.
enum OperationType {
INSERT = 0; // INSERT operation.
Expand Down
Loading
Loading