Keep long-running tasks alive #1725

Merged · 4 commits · Dec 13, 2023
129 changes: 81 additions & 48 deletions coordinator/src/message.rs
@@ -2,6 +2,7 @@ use crate::db::user
use crate::notifications::FcmToken;
use crate::notifications::Notification;
use crate::notifications::NotificationKind;
use anyhow::Context;
use anyhow::Result;
use bitcoin::secp256k1::PublicKey;
use commons::Message;
@@ -16,8 +17,10 @@ use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio::task::spawn_blocking;

/// This value is arbitrarily set to 100 and defines the number of messages accepted in the message
/// channel buffer.
const NOTIFICATION_BUFFER_SIZE: usize = 100;

@@ -33,14 +36,14 @@ pub enum OrderbookMessage {
#[derive(Clone)]
pub struct NewUserMessage {
pub new_user: PublicKey,
pub sender: mpsc::Sender<Message>,
pub sender: Sender<Message>,
}

pub fn spawn_delivering_messages_to_authenticated_users(
pool: Pool<ConnectionManager<PgConnection>>,
notification_sender: mpsc::Sender<Notification>,
notification_sender: Sender<Notification>,
tx_user_feed: broadcast::Sender<NewUserMessage>,
) -> (RemoteHandle<Result<()>>, mpsc::Sender<OrderbookMessage>) {
) -> (RemoteHandle<()>, Sender<OrderbookMessage>) {
let (sender, mut receiver) = mpsc::channel::<OrderbookMessage>(NOTIFICATION_BUFFER_SIZE);

let authenticated_users = Arc::new(RwLock::new(HashMap::new()));
@@ -57,12 +60,12 @@ pub fn spawn_delivering_messages_to_authenticated_users(
.insert(new_user_msg.new_user, new_user_msg.sender);
}
Err(RecvError::Closed) => {
tracing::error!("New user message sender died! Channel closed.");
tracing::error!("New user message sender died! Channel closed");
break;
}
Err(RecvError::Lagged(skip)) => tracing::warn!(%skip,
"Lagging behind on new user message."
),
Err(RecvError::Lagged(skip)) => {
tracing::warn!(%skip, "Lagging behind on new user message")
}
}
}
}
@@ -71,49 +74,19 @@ pub fn spawn_delivering_messages_to_authenticated_users(
let (fut, remote_handle) = {
async move {
while let Some(notification) = receiver.recv().await {
let mut conn = pool.get()?;
match notification {
OrderbookMessage::TraderMessage { trader_id, message , notification} => {
tracing::info!(%trader_id, "Sending trader message: {message:?}");

let trader = authenticated_users.read().get(&trader_id).cloned();

match trader {
Some(sender) => {
if let Err(e) = sender.send(message).await {
tracing::warn!(%trader_id, "Connection lost to trader {e:#}");
} else {
tracing::trace!(%trader_id, "Skipping optional push notifications as the user was successfully notified via the websocket.");
continue;
}
}
None => tracing::warn!(%trader_id, "Trader is not connected."),
};

if let (Some(notification_kind),Some(user)) = (notification, user::by_id(&mut conn, trader_id.to_string())?) {
tracing::debug!(%trader_id, "Sending push notification to user");

match FcmToken::new(user.fcm_token) {
Ok(fcm_token) => {
if let Err(e) = notification_sender
.send(Notification {
user_fcm_token: fcm_token,
notification_kind,
})
.await {
tracing::error!(%trader_id, "Failed to send push notification. Error: {e:#}");
}
}
Err(error) => {
tracing::error!(%trader_id, "Could not send notification to user. Error: {error:#}");
}
}
}
}
if let Err(e) = process_orderbook_message(
pool.clone(),
&authenticated_users,
&notification_sender,
notification,
)
.await
{
tracing::error!("Failed to process orderbook message: {e:#}");
}
}

Ok(())
tracing::error!("Channel closed");
}
.remote_handle()
};
@@ -122,3 +95,63 @@ pub fn spawn_delivering_messages_to_authenticated_users(

(remote_handle, sender)
}

async fn process_orderbook_message(
pool: Pool<ConnectionManager<PgConnection>>,
authenticated_users: &RwLock<HashMap<PublicKey, Sender<Message>>>,
notification_sender: &Sender<Notification>,
notification: OrderbookMessage,
) -> Result<()> {
let mut conn = spawn_blocking(move || pool.get())
.await
.expect("task to complete")?;

match notification {
OrderbookMessage::TraderMessage {
trader_id,
message,
notification,
} => {
tracing::info!(%trader_id, ?message, "Sending trader message");

let trader = authenticated_users.read().get(&trader_id).cloned();

match trader {
Some(sender) => {
if let Err(e) = sender.send(message).await {
tracing::warn!(%trader_id, "Connection lost to trader: {e:#}");
} else {
tracing::trace!(
%trader_id,
"Skipping optional push notifications as the user was successfully \
notified via the websocket"
);
return Ok(());
}
}
None => tracing::warn!(%trader_id, "Trader is not connected"),
};

let user = user::by_id(&mut conn, trader_id.to_string())
.context("Failed to get user by ID")?;

if let (Some(notification_kind), Some(user)) = (notification, user) {
tracing::debug!(%trader_id, "Sending push notification to user");

let fcm_token = FcmToken::new(user.fcm_token)?;

notification_sender
.send(Notification {
user_fcm_token: fcm_token,
notification_kind,
})
.await
.with_context(|| {
format!("Failed to send push notification to trader {trader_id}")
})?;
}
}
}

Ok(())
}
20 changes: 13 additions & 7 deletions coordinator/src/node/rollover.rs
@@ -31,6 +31,7 @@ use time::OffsetDateTime;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc;
use tokio::task::spawn_blocking;
use trade::ContractSymbol;

#[derive(Debug, Clone)]
@@ -51,20 +52,20 @@ pub fn monitor(
notifier: mpsc::Sender<OrderbookMessage>,
network: Network,
node: Node,
) -> RemoteHandle<Result<()>> {
) -> RemoteHandle<()> {
let mut user_feed = tx_user_feed.subscribe();
let (fut, remote_handle) = async move {
loop {
match user_feed.recv().await {
Ok(new_user_msg) => {
tokio::spawn({
let mut conn = pool.get()?;
let notifier = notifier.clone();
let node = node.clone();
let pool = pool.clone();
async move {
if let Err(e) = node
.check_if_eligible_for_rollover(
&mut conn,
pool,
notifier,
new_user_msg.new_user,
network,
@@ -87,7 +88,6 @@ pub fn monitor(
),
}
}
Ok(())
}
.remote_handle();

@@ -156,14 +156,18 @@ impl Rollover {
impl Node {
async fn check_if_eligible_for_rollover(
&self,
conn: &mut PgConnection,
pool: Pool<ConnectionManager<PgConnection>>,
notifier: mpsc::Sender<OrderbookMessage>,
trader_id: PublicKey,
network: Network,
) -> Result<()> {
let mut conn = spawn_blocking(move || pool.get())
.await
.expect("task to complete")?;
Review comment on lines +164 to +166:

Contributor:

Why do we spawn_blocking here but not elsewhere? Additionally, can we not use a native async connection pool if this is a big issue?

@luckysori (Contributor, Author) — Dec 13, 2023:

> Why do we spawn_blocking here but not elsewhere?

What do you mean by "elsewhere"? In this PR I have tried to consistently use spawn_blocking when calling pool.get().

It's very possible that we aren't doing this consistently, but we should, because get blocks until the connection is retrieved or until a timeout expires.

> Additionally, can we not use a native async connection pool if this is a big issue?

Yep, we can, although we didn't prioritise it thus far. It looks like we could use https://github.com/djc/bb8.

Contributor:

> What do you mean by "elsewhere"? In this PR I have tried to consistently use spawn_blocking when calling pool.get().

My mistake, maybe I was checking the deleted and not added part 😅

tracing::debug!(%trader_id, "Checking if the users positions is eligible for rollover");
if let Some(position) = positions::Position::get_position_by_trader(
conn,
&mut conn,
trader_id,
vec![PositionState::Open, PositionState::Rollover],
)? {
@@ -172,7 +176,9 @@ impl Node {
.get_signed_channel_by_trader_id(position.trader)?;

let (retry_rollover, contract_id) = match position.position_state {
PositionState::Rollover => self.rollback_channel_if_needed(conn, signed_channel)?,
PositionState::Rollover => {
self.rollback_channel_if_needed(&mut conn, signed_channel)?
}
PositionState::Open => (false, signed_channel.get_contract_id()),
_ => bail!("Unexpected position state {:?}", position.position_state),
};
43 changes: 32 additions & 11 deletions coordinator/src/orderbook/async_match.rs
@@ -22,25 +22,37 @@ use time::OffsetDateTime;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc;
use tokio::task::spawn_blocking;

pub fn monitor(
pool: Pool<ConnectionManager<PgConnection>>,
tx_user_feed: broadcast::Sender<NewUserMessage>,
notifier: mpsc::Sender<OrderbookMessage>,
network: Network,
oracle_pk: XOnlyPublicKey,
) -> RemoteHandle<Result<()>> {
) -> RemoteHandle<()> {
let mut user_feed = tx_user_feed.subscribe();
let (fut, remote_handle) = async move {
loop {
match user_feed.recv().await {
Ok(new_user_msg) => {
tokio::spawn({
let mut conn = pool.get()?;
let notifier = notifier.clone();
let pool = pool.clone();
async move {
tracing::debug!(trader_id=%new_user_msg.new_user, "Checking if the user needs to be notified about pending matches");
if let Err(e) = process_pending_match(&mut conn, notifier, new_user_msg.new_user, network, oracle_pk).await {
tracing::debug!(
trader_id=%new_user_msg.new_user,
"Checking if the user needs to be notified about pending matches"
);
if let Err(e) = process_pending_match(
pool,
notifier,
new_user_msg.new_user,
network,
oracle_pk,
)
.await
{
tracing::error!("Failed to process pending match. Error: {e:#}");
}
}
@@ -50,13 +62,16 @@ pub fn monitor(
tracing::error!("New user message sender died! Channel closed.");
break;
}
Err(RecvError::Lagged(skip)) => tracing::warn!(%skip,
Err(RecvError::Lagged(skip)) => {
tracing::warn!(
%skip,
"Lagging behind on new user message."
),
)
}
}
}
Ok(())
}.remote_handle();
}
.remote_handle();

tokio::spawn(fut);

@@ -65,16 +80,22 @@ pub fn monitor(

/// Checks if there are any pending matches
async fn process_pending_match(
conn: &mut PgConnection,
pool: Pool<ConnectionManager<PgConnection>>,
notifier: mpsc::Sender<OrderbookMessage>,
trader_id: PublicKey,
network: Network,
oracle_pk: XOnlyPublicKey,
) -> Result<()> {
if let Some(order) = orders::get_by_trader_id_and_state(conn, trader_id, OrderState::Matched)? {
let mut conn = spawn_blocking(move || pool.get())
.await
.expect("task to complete")?;

if let Some(order) =
orders::get_by_trader_id_and_state(&mut conn, trader_id, OrderState::Matched)?
{
tracing::debug!(%trader_id, order_id=%order.id, "Notifying trader about pending match");

let matches = matches::get_matches_by_order_id(conn, order.id)?;
let matches = matches::get_matches_by_order_id(&mut conn, order.id)?;
let filled_with = get_filled_with_from_matches(matches, network, oracle_pk)?;

let message = match order.order_reason {
16 changes: 10 additions & 6 deletions coordinator/src/orderbook/collaborative_revert.rs
@@ -16,20 +16,21 @@ use rust_decimal::Decimal;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc;
use tokio::task::spawn_blocking;

pub fn monitor(
pool: Pool<ConnectionManager<PgConnection>>,
tx_user_feed: broadcast::Sender<NewUserMessage>,
notifier: mpsc::Sender<OrderbookMessage>,
) -> RemoteHandle<Result<()>> {
) -> RemoteHandle<()> {
let mut user_feed = tx_user_feed.subscribe();
let (fut, remote_handle) = async move {
loop {
match user_feed.recv().await {
Ok(new_user_msg) => {
tokio::spawn({
let mut conn = pool.get()?;
let notifier = notifier.clone();
let pool = pool.clone();
async move {
tracing::debug!(
trader_id=%new_user_msg.new_user,
@@ -38,7 +39,7 @@ pub fn monitor(
);

if let Err(e) = process_pending_collaborative_revert(
&mut conn,
pool,
notifier,
new_user_msg.new_user,
)
@@ -60,7 +61,6 @@ pub fn monitor(
),
}
}
Ok(())
}
.remote_handle();

@@ -71,11 +71,15 @@ pub fn monitor(

/// Checks if there are any pending collaborative reverts
async fn process_pending_collaborative_revert(
conn: &mut PgConnection,
pool: Pool<ConnectionManager<PgConnection>>,
notifier: mpsc::Sender<OrderbookMessage>,
trader_id: PublicKey,
) -> Result<()> {
match collaborative_reverts::by_trader_pubkey(trader_id.to_string().as_str(), conn)? {
let mut conn = spawn_blocking(move || pool.get())
.await
.expect("task to complete")?;

match collaborative_reverts::by_trader_pubkey(trader_id.to_string().as_str(), &mut conn)? {
None => {
// nothing to revert
}