Skip to content

Commit

Permalink
fix(coordinator): Keep rollover task alive
Browse files Browse the repository at this point in the history
Before we would end up bubbling up the error on `pool.get()`, which
would silently break out of the loop, so we would stop processin new
`NewUserMessage`s.
  • Loading branch information
luckysori committed Dec 12, 2023
1 parent 97db65e commit 7b236da
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions coordinator/src/node/rollover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use time::OffsetDateTime;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc;
use tokio::task::spawn_blocking;
use trade::ContractSymbol;

#[derive(Debug, Clone)]
Expand All @@ -51,20 +52,20 @@ pub fn monitor(
notifier: mpsc::Sender<OrderbookMessage>,
network: Network,
node: Node,
) -> RemoteHandle<Result<()>> {
) -> RemoteHandle<()> {
let mut user_feed = tx_user_feed.subscribe();
let (fut, remote_handle) = async move {
loop {
match user_feed.recv().await {
Ok(new_user_msg) => {
tokio::spawn({
let mut conn = pool.get()?;
let notifier = notifier.clone();
let node = node.clone();
let pool = pool.clone();
async move {
if let Err(e) = node
.check_if_eligible_for_rollover(
&mut conn,
pool,
notifier,
new_user_msg.new_user,
network,
Expand All @@ -87,7 +88,6 @@ pub fn monitor(
),
}
}
Ok(())
}
.remote_handle();

Expand Down Expand Up @@ -156,14 +156,18 @@ impl Rollover {
impl Node {
async fn check_if_eligible_for_rollover(
&self,
conn: &mut PgConnection,
pool: Pool<ConnectionManager<PgConnection>>,
notifier: mpsc::Sender<OrderbookMessage>,
trader_id: PublicKey,
network: Network,
) -> Result<()> {
let mut conn = spawn_blocking(move || pool.get())
.await
.expect("task to complete")?;

tracing::debug!(%trader_id, "Checking if the users positions is eligible for rollover");
if let Some(position) = positions::Position::get_position_by_trader(
conn,
&mut conn,
trader_id,
vec![PositionState::Open, PositionState::Rollover],
)? {
Expand All @@ -172,7 +176,9 @@ impl Node {
.get_signed_channel_by_trader_id(position.trader)?;

let (retry_rollover, contract_id) = match position.position_state {
PositionState::Rollover => self.rollback_channel_if_needed(conn, signed_channel)?,
PositionState::Rollover => {
self.rollback_channel_if_needed(&mut conn, signed_channel)?
}
PositionState::Open => (false, signed_channel.get_contract_id()),
_ => bail!("Unexpected position state {:?}", position.position_state),
};
Expand Down

0 comments on commit 7b236da

Please sign in to comment.