Skip to content

Commit

Permalink
Fix sockets issues (#3015)
Browse files Browse the repository at this point in the history
* Fix sockets issues

* Fix app comp
  • Loading branch information
Geometrically authored Dec 12, 2024
1 parent 10ef25e commit c970e9c
Show file tree
Hide file tree
Showing 32 changed files with 571 additions and 455 deletions.
18 changes: 18 additions & 0 deletions .idea/code.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 25 additions & 31 deletions apps/labrinth/src/routes/internal/statuses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ pub enum ServerToClientMessage {
UserOffline { id: UserId },
FriendStatuses { statuses: Vec<UserStatus> },
FriendRequest { from: UserId },
FriendRequestRejected { from: UserId },
}

#[derive(Deserialize)]
struct LauncherHeartbeatInit {
code: String,
}

#[get("launcher_heartbeat")]
#[get("launcher_socket")]
pub async fn ws_init(
req: HttpRequest,
pool: Data<PgPool>,
Expand Down Expand Up @@ -122,16 +123,12 @@ pub async fn ws_init(
user.id,
ServerToClientMessage::StatusUpdate { status },
&pool,
&redis,
&db,
Some(friends),
)
.await?;

let mut stream = msg_stream
.aggregate_continuations()
// aggregate continuation frames up to 1MiB
.max_continuation_size(2_usize.pow(20));
let mut stream = msg_stream.aggregate_continuations();

actix_web::rt::spawn(async move {
// receive messages from websocket
Expand Down Expand Up @@ -168,7 +165,6 @@ pub async fn ws_init(
status: status.clone(),
},
&pool,
&redis,
&db,
None,
)
Expand All @@ -180,12 +176,23 @@ pub async fn ws_init(
}

Ok(AggregatedMessage::Close(_)) => {
let _ = close_socket(user.id, &pool, &redis, &db).await;
let _ = close_socket(user.id, &pool, &db).await;
}

Ok(AggregatedMessage::Ping(msg)) => {
if let Some(mut socket) =
db.auth_sockets.get_mut(&user.id.into())
{
let (_, socket) = socket.value_mut();
let _ = socket.pong(&msg).await;
}
}

_ => {}
}
}

let _ = close_socket(user.id, &pool, &db).await;
});

Ok(res)
Expand All @@ -195,7 +202,6 @@ pub async fn broadcast_friends(
user_id: UserId,
message: ServerToClientMessage,
pool: &PgPool,
redis: &RedisPool,
sockets: &ActiveSockets,
friends: Option<Vec<FriendItem>>,
) -> Result<(), crate::database::models::DatabaseError> {
Expand All @@ -218,17 +224,7 @@ pub async fn broadcast_friends(
{
let (_, socket) = socket.value_mut();

// TODO: bulk close sockets for better perf
if socket.text(serde_json::to_string(&message)?).await.is_err()
{
Box::pin(close_socket(
friend_id.into(),
pool,
redis,
sockets,
))
.await?;
}
let _ = socket.text(serde_json::to_string(&message)?).await;
}
}
}
Expand All @@ -239,22 +235,20 @@ pub async fn broadcast_friends(
pub async fn close_socket(
id: UserId,
pool: &PgPool,
redis: &RedisPool,
sockets: &ActiveSockets,
) -> Result<(), crate::database::models::DatabaseError> {
if let Some((_, (_, socket))) = sockets.auth_sockets.remove(&id) {
let _ = socket.close(None).await;
}

broadcast_friends(
id,
ServerToClientMessage::UserOffline { id },
pool,
redis,
sockets,
None,
)
.await?;
broadcast_friends(
id,
ServerToClientMessage::UserOffline { id },
pool,
sockets,
None,
)
.await?;
}

Ok(())
}
49 changes: 18 additions & 31 deletions apps/labrinth/src/routes/v3/friends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ pub async fn add_friend(
async fn send_friend_status(
user_id: UserId,
friend_id: UserId,
pool: &PgPool,
redis: &RedisPool,
sockets: &ActiveSockets,
) -> Result<(), ApiError> {
if let Some(pair) = sockets.auth_sockets.get(&user_id.into()) {
Expand All @@ -85,45 +83,21 @@ pub async fn add_friend(
{
let (_, socket) = socket.value_mut();

if socket
let _ = socket
.text(serde_json::to_string(
&ServerToClientMessage::StatusUpdate {
status: friend_status.clone(),
},
)?)
.await
.is_err()
{
close_socket(
friend_id.into(),
pool,
redis,
sockets,
)
.await?;
}
.await;
}
}

Ok(())
}

send_friend_status(
friend.user_id,
friend.friend_id,
&pool,
&redis,
&db,
)
.await?;
send_friend_status(
friend.friend_id,
friend.user_id,
&pool,
&redis,
&db,
)
.await?;
send_friend_status(friend.user_id, friend.friend_id, &db).await?;
send_friend_status(friend.friend_id, friend.user_id, &db).await?;
} else {
if friend.id == user.id.into() {
return Err(ApiError::InvalidInput(
Expand Down Expand Up @@ -157,7 +131,7 @@ pub async fn add_friend(
.await
.is_err()
{
close_socket(user.id, &pool, &redis, &db).await?;
close_socket(user.id, &pool, &db).await?;
}
}
}
Expand All @@ -177,6 +151,7 @@ pub async fn remove_friend(
pool: web::Data<PgPool>,
redis: web::Data<RedisPool>,
session_queue: web::Data<AuthQueue>,
db: web::Data<ActiveSockets>,
) -> Result<HttpResponse, ApiError> {
let user = get_user_from_headers(
&req,
Expand All @@ -202,6 +177,18 @@ pub async fn remove_friend(
)
.await?;

if let Some(mut socket) = db.auth_sockets.get_mut(&friend.id.into()) {
let (_, socket) = socket.value_mut();

let _ = socket
.text(serde_json::to_string(
&ServerToClientMessage::FriendRequestRejected {
from: user.id,
},
)?)
.await;
}

transaction.commit().await?;

Ok(HttpResponse::NoContent().body(""))
Expand Down
68 changes: 33 additions & 35 deletions packages/app-lib/src/state/friends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ use reqwest::header::HeaderValue;
use reqwest::Method;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::RwLock;

type WriteSocket =
Arc<Mutex<Option<SplitSink<WebSocketStream<ConnectStream>, Message>>>>;
Arc<RwLock<Option<SplitSink<WebSocketStream<ConnectStream>, Message>>>>;

pub struct FriendsSocket {
write: WriteSocket,
Expand Down Expand Up @@ -67,7 +67,7 @@ impl Default for FriendsSocket {
impl FriendsSocket {
pub fn new() -> Self {
Self {
write: Arc::new(Mutex::new(None)),
write: Arc::new(RwLock::new(None)),
user_statuses: Arc::new(DashMap::new()),
}
}
Expand All @@ -83,7 +83,7 @@ impl FriendsSocket {

if let Some(credentials) = credentials {
let mut request = format!(
"{MODRINTH_SOCKET_URL}_internal/launcher_heartbeat?code={}",
"{MODRINTH_SOCKET_URL}_internal/launcher_socket?code={}",
credentials.session
)
.into_client_request()?;
Expand All @@ -105,7 +105,7 @@ impl FriendsSocket {
let (write, read) = socket.split();

{
let mut write_lock = self.write.lock().await;
let mut write_lock = self.write.write().await;
*write_lock = Some(write);
}

Expand Down Expand Up @@ -181,19 +181,15 @@ impl FriendsSocket {
}
}

let mut w = write_handle.lock().await;
let mut w = write_handle.write().await;
*w = None;

Self::reconnect_task();
});
}
Err(e) => {
tracing::error!(
"Error connecting to friends socket: {e:?}"
);

Self::reconnect_task();

return Err(crate::Error::from(e));
}
}
Expand All @@ -202,40 +198,42 @@ impl FriendsSocket {
Ok(())
}

fn reconnect_task() {
pub async fn reconnect_task() -> crate::Result<()> {
let state = crate::State::get().await?;

tokio::task::spawn(async move {
let res = async {
let state = crate::State::get().await?;
let mut last_connection = Utc::now();

loop {
let connected = {
let read = state.friends_socket.write.read().await;
read.is_some()
};

if !connected
&& Utc::now().signed_duration_since(last_connection)
> chrono::Duration::seconds(30)
{
if state.friends_socket.write.lock().await.is_some() {
return Ok(());
}
last_connection = Utc::now();
let _ = state
.friends_socket
.connect(
&state.pool,
&state.api_semaphore,
&state.process_manager,
)
.await;
}

state
.friends_socket
.connect(
&state.pool,
&state.api_semaphore,
&state.process_manager,
)
.await?;

Ok::<(), crate::Error>(())
};

if let Err(e) = res.await {
tracing::info!("Error reconnecting to friends socket: {e:?}");

tokio::time::sleep(std::time::Duration::from_secs(30)).await;
FriendsSocket::reconnect_task();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
});

Ok(())
}

pub async fn disconnect(&self) -> crate::Result<()> {
let mut write_lock = self.write.lock().await;
let mut write_lock = self.write.write().await;
if let Some(ref mut write_half) = *write_lock {
write_half.close().await?;
*write_lock = None;
Expand All @@ -247,7 +245,7 @@ impl FriendsSocket {
&self,
profile_name: Option<String>,
) -> crate::Result<()> {
let mut write_lock = self.write.lock().await;
let mut write_lock = self.write.write().await;
if let Some(ref mut write_half) = *write_lock {
write_half
.send(Message::Text(serde_json::to_string(
Expand Down
Loading

0 comments on commit c970e9c

Please sign in to comment.