Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add insert and delete APIs to orderbook WS #2418

Merged
merged 1 commit into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion coordinator/src/orderbook/db/orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ pub fn get_all_matched_market_orders_by_order_reason(
pub fn insert_limit_order(
conn: &mut PgConnection,
order: NewLimitOrder,
// TODO: All limit orders are "manual".
order_reason: OrderBookOrderReason,
) -> QueryResult<OrderbookOrder> {
let new_order = NewOrder {
Expand Down Expand Up @@ -388,7 +389,22 @@ pub fn set_is_taken(
}
}

/// Updates the order state to `Deleted`
/// Mark an order as [`OrderState::Deleted`], if it belongs to the given `trader_id`.
pub fn delete_trader_order(
conn: &mut PgConnection,
id: Uuid,
trader_id: PublicKey,
) -> QueryResult<OrderbookOrder> {
let order: Order = diesel::update(orders::table)
.filter(orders::trader_order_id.eq(id))
.filter(orders::trader_id.eq(trader_id.to_string()))
.set(orders::order_state.eq(OrderState::Deleted))
.get_result(conn)?;

Ok(OrderbookOrder::from(order))
}

/// Mark an order as [`OrderState::Deleted`].
pub fn delete(conn: &mut PgConnection, id: Uuid) -> QueryResult<OrderbookOrder> {
set_order_state(conn, id, commons::OrderState::Deleted)
}
Expand Down
4 changes: 1 addition & 3 deletions coordinator/src/orderbook/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ pub async fn post_order(
NewOrder::Market(o) => {
orders::insert_market_order(&mut conn, o.clone(), OrderReason::Manual)
}
NewOrder::Limit(o) => {
orders::insert_limit_order(&mut conn, o.clone(), OrderReason::Manual)
}
NewOrder::Limit(o) => orders::insert_limit_order(&mut conn, o, OrderReason::Manual),
}
.map_err(|e| anyhow!(e))
.context("Failed to insert new order into DB")?;
Expand Down
121 changes: 121 additions & 0 deletions coordinator/src/orderbook/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@ use crate::db;
use crate::db::user;
use crate::message::NewUserMessage;
use crate::orderbook::db::orders;
use crate::orderbook::trading::NewOrderMessage;
use crate::referrals;
use crate::routes::AppState;
use anyhow::bail;
use anyhow::Result;
use axum::extract::ws::Message as WebsocketMessage;
use axum::extract::ws::WebSocket;
use bitcoin::secp256k1::PublicKey;
use commons::create_sign_message;
use commons::Message;
use commons::NewLimitOrder;
use commons::OrderReason;
use commons::OrderbookRequest;
use commons::ReferralStatus;
use commons::TenTenOneConfig;
Expand All @@ -18,9 +24,66 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc;
use tokio::task::spawn_blocking;
use uuid::Uuid;

const WEBSOCKET_SEND_TIMEOUT: Duration = Duration::from_secs(5);

async fn handle_insert_order(
state: Arc<AppState>,
trader_id: PublicKey,
order: NewLimitOrder,
) -> Result<()> {
if order.trader_id != trader_id {
bail!("Maker {trader_id} tried to trade on behalf of someone else: {order:?}");
}

tracing::trace!(?order, "Inserting order");

let order = spawn_blocking({
let mut conn = state.pool.clone().get()?;
move || {
let order = orders::insert_limit_order(&mut conn, order, OrderReason::Manual)?;

anyhow::Ok(order)
}
})
.await??;

let _ = state
.trading_sender
.send(NewOrderMessage {
order,
channel_opening_params: None,
order_reason: OrderReason::Manual,
})
.await;

Ok(())
}

async fn handle_delete_order(
state: Arc<AppState>,
trader_id: PublicKey,
order_id: Uuid,
) -> Result<()> {
tracing::trace!(%order_id, "Deleting order");

spawn_blocking({
let mut conn = state.pool.clone().get()?;
move || {
orders::delete_trader_order(&mut conn, order_id, trader_id)?;

anyhow::Ok(())
}
})
.await??;

let _ = state.tx_orderbook_feed.send(Message::DeleteOrder(order_id));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

order_id is the trader_order_id right? If so, the traders receiving the DeleteOrder msg might not know what todo with this.

The user connected to the websocket shouldn't know about the trader_order_id, so did this ever work? 🙈

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I made a mistake. I'll test it again locally.

Copy link
Contributor

@bonomat bonomat Apr 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, you didn't it's been like that before. Maybe this was a reason why we saw outdated orders on the client side...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I've been thinking about it and I don't see the problem with those warnings we get on the app.

As far as I can tell, what happens is:

  1. The maker generates an order.
  2. The maker deletes the order.
  3. The app starts and connects to the coordinator's WS, getting an AllOrders message that does not include the deleted order.
  4. The WS still sends the app the corresponding DeleteOrder message from step 2.

I don't understand why the "outdated" message is delivered to the app, but I think we should demote the WARN to DEBUG because I think this is harmless.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked into it and we are actually sending the trader_order_id to all users 🙈 It's probably not an issue, but I think it wasn't meant to be used like that:

Here are we mapping the trader_order_id from the db to order.id for the orders we are returning to the users.

id: value.trader_order_id,

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why the "outdated" message is delivered to the app, but I think we should demote the WARN to DEBUG because I think this is harmless.

Imho this happens because we are here also clinsing outdated orders here and we might have a race condition where the maker deletes an order and this is run

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked into it and we are actually sending the trader_order_id to all users 🙈 It's probably not an issue, but I think it wasn't meant to be used like that:

Here are we mapping the trader_order_id from the db to order.id for the orders we are returning to the users.

id: value.trader_order_id,

Yeah, that makes sense.

Did we ever have an order ID that wasn't the trader order ID though? I can't find another one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, on DB level we have a different one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, but that one has only ever been used internally:

    // this id is only internally but needs to be here or diesel complains
    #[allow(dead_code)]
    pub id: i32,


Ok(())
}

// This function deals with a single websocket connection, i.e., a single
// connected client / user, for which we will spawn two independent tasks (for
// receiving / sending messages).
Expand Down Expand Up @@ -83,8 +146,53 @@ pub async fn websocket_connection(stream: WebSocket, state: Arc<AppState>) {
// Spawn a task that takes messages from the websocket
let local_sender = local_sender.clone();
let mut recv_task = tokio::spawn(async move {
let mut whitelisted_maker = Option::<PublicKey>::None;

while let Some(Ok(WebsocketMessage::Text(text))) = receiver.next().await {
match serde_json::from_str(text.as_str()) {
Ok(OrderbookRequest::InsertOrder(order)) => {
let order_id = order.id;

match whitelisted_maker {
Some(authenticated_trader_id) => {
if let Err(e) =
handle_insert_order(state.clone(), authenticated_trader_id, order)
.await
{
tracing::error!(%order_id, "Failed to insert order: {e:#}");
// TODO: Send error to peer.
}
}
None => {
tracing::error!(
?order,
"Failed to insert order: maker not yet authenticated"
);
}
}
}
Ok(OrderbookRequest::DeleteOrder(order_id)) => {
match whitelisted_maker {
Some(authenticated_trader_id) => {
if let Err(e) = handle_delete_order(
state.clone(),
authenticated_trader_id,
order_id,
)
.await
{
tracing::error!(%order_id, "Failed to delete order: {e:#}");
// TODO: Send error to peer.
}
}
None => {
tracing::error!(
%order_id,
"Failed to delete order: maker not yet authenticated"
);
}
}
}
Ok(OrderbookRequest::Authenticate {
fcm_token,
version,
Expand Down Expand Up @@ -149,7 +257,20 @@ pub async fn websocket_connection(stream: WebSocket, state: Arc<AppState>) {
new_user: trader_id,
sender: local_sender.clone(),
};

tracing::debug!(%trader_id, "New login");

// Check if the trader is a whitelisted maker.
{
let settings = state.settings.read().await;

if !settings.whitelist_enabled
|| settings.whitelisted_makers.contains(&trader_id)
{
whitelisted_maker = Some(trader_id);
}
}

if let Err(e) = state.tx_user_feed.send(message) {
tracing::error!(%trader_id, "Could not send new user message. Error: {e:#}");
}
Expand Down
1 change: 1 addition & 0 deletions coordinator/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ pub async fn get_polls(
})?;
Ok(Json(polls))
}

pub async fn post_poll_answer(
State(state): State<Arc<AppState>>,
poll_answer: Json<PollAnswers>,
Expand Down
3 changes: 3 additions & 0 deletions crates/commons/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::order::Order;
use crate::signature::Signature;
use crate::trade::FilledWith;
use crate::LiquidityOption;
use crate::NewLimitOrder;
use crate::ReferralStatus;
use anyhow::Result;
use bitcoin::address::NetworkUnchecked;
Expand Down Expand Up @@ -82,6 +83,8 @@ pub enum OrderbookRequest {
version: Option<String>,
signature: Signature,
},
InsertOrder(NewLimitOrder),
DeleteOrder(Uuid),
}

impl TryFrom<OrderbookRequest> for tungstenite::Message {
Expand Down
6 changes: 3 additions & 3 deletions crates/commons/src/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl NewOrderRequest {
}
}

#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum NewOrder {
Market(NewMarketOrder),
Limit(NewLimitOrder),
Expand Down Expand Up @@ -82,7 +82,7 @@ impl NewOrder {
}
}

#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct NewMarketOrder {
pub id: Uuid,
pub contract_symbol: ContractSymbol,
Expand All @@ -97,7 +97,7 @@ pub struct NewMarketOrder {
pub stable: bool,
}

#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
pub struct NewLimitOrder {
pub id: Uuid,
pub contract_symbol: ContractSymbol,
Expand Down
Loading