Skip to content

Commit

Permalink
apply optimizations to other services
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Sep 19, 2024
1 parent 26575cd commit 5cd1fa5
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 19 deletions.
41 changes: 36 additions & 5 deletions crates/torii/grpc/src/server/subscriptions/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use futures::Stream;
use futures_util::StreamExt;
use rand::Rng;
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock;
use tokio::time::interval;
use torii_core::error::{Error, ParseError};
use torii_core::simple_broker::SimpleBroker;
use torii_core::sql::FELT_DELIMITER;
Expand Down Expand Up @@ -62,13 +64,42 @@ impl EventManager {
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
subs_manager: Arc<EventManager>,
simple_broker: Pin<Box<dyn Stream<Item = Event> + Send>>,
update_sender: Sender<Event>,
}

impl Service {
pub fn new(subs_manager: Arc<EventManager>) -> Self {
Self { subs_manager, simple_broker: Box::pin(SimpleBroker::<Event>::subscribe()) }
let (update_sender, update_receiver) = channel(100);
let service =
Self { simple_broker: Box::pin(SimpleBroker::<Event>::subscribe()), update_sender };

// Spawn a task to process event updates
tokio::spawn(Self::process_updates(Arc::clone(&subs_manager), update_receiver));

service
}

async fn process_updates(subs: Arc<EventManager>, mut update_receiver: Receiver<Event>) {
let mut interval = interval(Duration::from_millis(100));
let mut pending_updates = Vec::new();

loop {
tokio::select! {
_ = interval.tick() => {
if !pending_updates.is_empty() {
for event in pending_updates.drain(..) {
if let Err(e) = Self::publish_updates(Arc::clone(&subs), &event).await {
error!(target = LOG_TARGET, error = %e, "Publishing events update.");
}
}
}
}
Some(event) = update_receiver.recv() => {
pending_updates.push(event);
}
}
}
}

async fn publish_updates(subs: Arc<EventManager>, event: &Event) -> Result<(), Error> {
Expand Down Expand Up @@ -151,10 +182,10 @@ impl Future for Service {
let pin = self.get_mut();

while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
let sender = pin.update_sender.clone();

Check warning on line 185 in crates/torii/grpc/src/server/subscriptions/event.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/event.rs#L185

Added line #L185 was not covered by tests
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, &event).await {
error!(target = LOG_TARGET, error = %e, "Publishing events update.");
if let Err(e) = sender.send(event).await {
error!(target = LOG_TARGET, error = %e, "Sending event update to channel.");

Check warning on line 188 in crates/torii/grpc/src/server/subscriptions/event.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/event.rs#L187-L188

Added lines #L187 - L188 were not covered by tests
}
});
}
Expand Down
62 changes: 48 additions & 14 deletions crates/torii/grpc/src/server/subscriptions/event_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use futures::Stream;
use futures_util::StreamExt;
use rand::Rng;
use sqlx::{Pool, Sqlite};
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock;
use tokio::time::interval;
use torii_core::cache::ModelCache;
use torii_core::error::{Error, ParseError};
use torii_core::model::build_sql_query;
Expand Down Expand Up @@ -75,10 +77,8 @@ impl EventMessageManager {
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
pool: Pool<Sqlite>,
subs_manager: Arc<EventMessageManager>,
model_cache: Arc<ModelCache>,
simple_broker: Pin<Box<dyn Stream<Item = EventMessage> + Send>>,
update_sender: Sender<EventMessage>,
}

impl Service {
Expand All @@ -87,11 +87,47 @@ impl Service {
subs_manager: Arc<EventMessageManager>,
model_cache: Arc<ModelCache>,
) -> Self {
Self {
pool,
subs_manager,
model_cache,
let (update_sender, update_receiver) = channel(100);
let service = Self {
simple_broker: Box::pin(SimpleBroker::<EventMessage>::subscribe()),
update_sender,
};

// Spawn a task to process event message updates
tokio::spawn(Self::process_updates(
Arc::clone(&subs_manager),
Arc::clone(&model_cache),
pool,
update_receiver,
));

service
}

async fn process_updates(
subs: Arc<EventMessageManager>,
cache: Arc<ModelCache>,
pool: Pool<Sqlite>,
mut update_receiver: Receiver<EventMessage>,
) {
let mut interval = interval(Duration::from_millis(100));
let mut pending_updates = Vec::new();

loop {
tokio::select! {
_ = interval.tick() => {
if !pending_updates.is_empty() {
for event_message in pending_updates.drain(..) {
if let Err(e) = Self::publish_updates(Arc::clone(&subs), Arc::clone(&cache), pool.clone(), &event_message).await {
error!(target = LOG_TARGET, error = %e, "Publishing event message update.");
}
}
}
}
Some(event_message) = update_receiver.recv() => {
pending_updates.push(event_message);
}
}
}
}

Expand Down Expand Up @@ -241,13 +277,11 @@ impl Future for Service {
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
let pin = self.get_mut();

while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
let cache = Arc::clone(&pin.model_cache);
let pool = pin.pool.clone();
while let Poll::Ready(Some(event_message)) = pin.simple_broker.poll_next_unpin(cx) {
let sender = pin.update_sender.clone();

Check warning on line 281 in crates/torii/grpc/src/server/subscriptions/event_message.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/event_message.rs#L281

Added line #L281 was not covered by tests
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await {
error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
if let Err(e) = sender.send(event_message).await {
error!(target = LOG_TARGET, error = %e, "Sending event message update to channel.");

Check warning on line 284 in crates/torii/grpc/src/server/subscriptions/event_message.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/event_message.rs#L283-L284

Added lines #L283 - L284 were not covered by tests
}
});
}
Expand Down

0 comments on commit 5cd1fa5

Please sign in to comment.