Skip to content

Commit

Permalink
refactor-opt(torii-grpc): subscriptions no db fetch (#2455)
Browse files Browse the repository at this point in the history
* refactor-opt(torii-grpc): subscriptions no db fetch

* update grpc

* fmt

* cmt
  • Loading branch information
Larkooo authored Sep 20, 2024
1 parent 7ff6593 commit daf0b65
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 131 deletions.
7 changes: 4 additions & 3 deletions crates/torii/core/src/query_queue.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::VecDeque;

use anyhow::{Context, Result};
use dojo_types::schema::Ty;
use dojo_types::schema::{Struct, Ty};
use sqlx::{FromRow, Pool, Sqlite};
use starknet::core::types::Felt;

Expand Down Expand Up @@ -42,7 +42,7 @@ pub struct DeleteEntityQuery {
pub entity_id: String,
pub event_id: String,
pub block_timestamp: String,
pub entity: Ty,
pub ty: Ty,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -115,7 +115,8 @@ impl QueryQueue {
.fetch_one(&mut *tx)
.await?;
let mut entity_updated = EntityUpdated::from_row(&row)?;
entity_updated.updated_model = Some(entity.entity);
entity_updated.updated_model =
Some(Ty::Struct(Struct { name: entity.ty.name(), children: vec![] }));

let count = sqlx::query_scalar::<_, i64>(
"SELECT count(*) FROM entity_model WHERE entity_id = ?",
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ impl Sql {
entity_id: entity_id.clone(),
event_id: event_id.to_string(),
block_timestamp: utc_dt_string_from_timestamp(block_timestamp),
entity: entity.clone(),
ty: entity.clone(),
}),
);

Expand Down
14 changes: 4 additions & 10 deletions crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,11 @@ impl DojoWorld {
Arc::clone(&state_diff_manager),
));

tokio::task::spawn(subscriptions::entity::Service::new(
pool.clone(),
Arc::clone(&entity_manager),
Arc::clone(&model_cache),
));
tokio::task::spawn(subscriptions::entity::Service::new(Arc::clone(&entity_manager)));

tokio::task::spawn(subscriptions::event_message::Service::new(
pool.clone(),
Arc::clone(&event_message_manager),
Arc::clone(&model_cache),
));
tokio::task::spawn(subscriptions::event_message::Service::new(Arc::clone(
&event_message_manager,
)));

tokio::task::spawn(subscriptions::event::Service::new(Arc::clone(&event_manager)));

Expand Down
70 changes: 10 additions & 60 deletions crates/torii/grpc/src/server/subscriptions/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,17 @@ use std::task::{Context, Poll};
use futures::Stream;
use futures_util::StreamExt;
use rand::Rng;
use sqlx::{Pool, Sqlite};
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock;
use torii_core::cache::ModelCache;
use torii_core::error::{Error, ParseError};
use torii_core::model::build_sql_query;
use torii_core::simple_broker::SimpleBroker;
use torii_core::sql::FELT_DELIMITER;
use torii_core::types::Entity;
use tracing::{error, trace};

use crate::proto;
use crate::proto::world::SubscribeEntityResponse;
use crate::server::map_row_to_entity;
use crate::types::{EntityKeysClause, PatternMatching};

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::entity";
Expand Down Expand Up @@ -81,32 +77,16 @@ impl EntityManager {
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
pool: Pool<Sqlite>,
subs_manager: Arc<EntityManager>,
model_cache: Arc<ModelCache>,
simple_broker: Pin<Box<dyn Stream<Item = Entity> + Send>>,
}

impl Service {
pub fn new(
pool: Pool<Sqlite>,
subs_manager: Arc<EntityManager>,
model_cache: Arc<ModelCache>,
) -> Self {
Self {
pool,
subs_manager,
model_cache,
simple_broker: Box::pin(SimpleBroker::<Entity>::subscribe()),
}
pub fn new(subs_manager: Arc<EntityManager>) -> Self {
Self { subs_manager, simple_broker: Box::pin(SimpleBroker::<Entity>::subscribe()) }
}

async fn publish_updates(
subs: Arc<EntityManager>,
cache: Arc<ModelCache>,
pool: Pool<Sqlite>,
entity: &Entity,
) -> Result<(), Error> {
async fn publish_updates(subs: Arc<EntityManager>, entity: &Entity) -> Result<(), Error> {
let mut closed_stream = Vec::new();
let hashed = Felt::from_str(&entity.id).map_err(ParseError::FromStr)?;
let keys = entity
Expand Down Expand Up @@ -204,41 +184,13 @@ impl Service {
continue;
}

let models_query = r#"
SELECT group_concat(entity_model.model_id) as model_ids
FROM entities
JOIN entity_model ON entities.id = entity_model.entity_id
WHERE entities.id = ?
GROUP BY entities.id
"#;
let (model_ids,): (String,) =
sqlx::query_as(models_query).bind(&entity.id).fetch_one(&pool).await?;
let model_ids: Vec<Felt> = model_ids
.split(',')
.map(Felt::from_str)
.collect::<Result<_, _>>()
.map_err(ParseError::FromStr)?;
let schemas = cache.models(&model_ids).await?.into_iter().map(|m| m.schema).collect();

let (entity_query, arrays_queries, _) = build_sql_query(
&schemas,
"entities",
"entity_id",
Some("entities.id = ?"),
Some("entities.id = ?"),
None,
None,
)?;

let row = sqlx::query(&entity_query).bind(&entity.id).fetch_one(&pool).await?;
let mut arrays_rows = HashMap::new();
for (name, query) in arrays_queries {
let row = sqlx::query(&query).bind(&entity.id).fetch_all(&pool).await?;
arrays_rows.insert(name, row);
}

// This should NEVER be None
let model = entity.updated_model.as_ref().unwrap().as_struct().unwrap().clone();
let resp = proto::world::SubscribeEntityResponse {
entity: Some(map_row_to_entity(&row, &arrays_rows, schemas.clone())?),
entity: Some(proto::types::Entity {
hashed_keys: hashed.to_bytes_be().to_vec(),
models: vec![model.into()],
}),
subscription_id: *idx,
};

Expand All @@ -264,10 +216,8 @@ impl Future for Service {

while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
let cache = Arc::clone(&pin.model_cache);
let pool = pin.pool.clone();
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await {
if let Err(e) = Service::publish_updates(subs, &entity).await {
error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
}
});
Expand Down
66 changes: 9 additions & 57 deletions crates/torii/grpc/src/server/subscriptions/event_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@ use std::task::{Context, Poll};
use futures::Stream;
use futures_util::StreamExt;
use rand::Rng;
use sqlx::{Pool, Sqlite};
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver};
use tokio::sync::RwLock;
use torii_core::cache::ModelCache;
use torii_core::error::{Error, ParseError};
use torii_core::model::build_sql_query;
use torii_core::simple_broker::SimpleBroker;
use torii_core::sql::FELT_DELIMITER;
use torii_core::types::EventMessage;
Expand All @@ -23,7 +20,6 @@ use tracing::{error, trace};
use super::entity::EntitiesSubscriber;
use crate::proto;
use crate::proto::world::SubscribeEntityResponse;
use crate::server::map_row_to_entity;
use crate::types::{EntityKeysClause, PatternMatching};

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event_message";
Expand Down Expand Up @@ -75,30 +71,17 @@ impl EventMessageManager {
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
pool: Pool<Sqlite>,
subs_manager: Arc<EventMessageManager>,
model_cache: Arc<ModelCache>,
simple_broker: Pin<Box<dyn Stream<Item = EventMessage> + Send>>,
}

impl Service {
pub fn new(
pool: Pool<Sqlite>,
subs_manager: Arc<EventMessageManager>,
model_cache: Arc<ModelCache>,
) -> Self {
Self {
pool,
subs_manager,
model_cache,
simple_broker: Box::pin(SimpleBroker::<EventMessage>::subscribe()),
}
pub fn new(subs_manager: Arc<EventMessageManager>) -> Self {
Self { subs_manager, simple_broker: Box::pin(SimpleBroker::<EventMessage>::subscribe()) }
}

async fn publish_updates(
subs: Arc<EventMessageManager>,
cache: Arc<ModelCache>,
pool: Pool<Sqlite>,
entity: &EventMessage,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
Expand Down Expand Up @@ -182,42 +165,13 @@ impl Service {
continue;
}

// publish all updates if ids is empty or only ids that are subscribed to
let models_query = r#"
SELECT group_concat(event_model.model_id) as model_ids
FROM event_messages
JOIN event_model ON event_messages.id = event_model.entity_id
WHERE event_messages.id = ?
GROUP BY event_messages.id
"#;
let (model_ids,): (String,) =
sqlx::query_as(models_query).bind(&entity.id).fetch_one(&pool).await?;
let model_ids: Vec<Felt> = model_ids
.split(',')
.map(Felt::from_str)
.collect::<Result<_, _>>()
.map_err(ParseError::FromStr)?;
let schemas = cache.models(&model_ids).await?.into_iter().map(|m| m.schema).collect();

let (entity_query, arrays_queries, _) = build_sql_query(
&schemas,
"event_messages",
"event_message_id",
Some("event_messages.id = ?"),
Some("event_messages.id = ?"),
None,
None,
)?;

let row = sqlx::query(&entity_query).bind(&entity.id).fetch_one(&pool).await?;
let mut arrays_rows = HashMap::new();
for (name, query) in arrays_queries {
let rows = sqlx::query(&query).bind(&entity.id).fetch_all(&pool).await?;
arrays_rows.insert(name, rows);
}

// This should NEVER be None
let model = entity.updated_model.as_ref().unwrap().as_struct().unwrap().clone();
let resp = proto::world::SubscribeEntityResponse {
entity: Some(map_row_to_entity(&row, &arrays_rows, schemas.clone())?),
entity: Some(proto::types::Entity {
hashed_keys: hashed.to_bytes_be().to_vec(),
models: vec![model.into()],
}),
subscription_id: *idx,
};

Expand All @@ -243,10 +197,8 @@ impl Future for Service {

while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
let cache = Arc::clone(&pin.model_cache);
let pool = pin.pool.clone();
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await {
if let Err(e) = Service::publish_updates(subs, &entity).await {
error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
}
});
Expand Down

0 comments on commit daf0b65

Please sign in to comment.