diff --git a/.idea/code.iml b/.idea/code.iml new file mode 100644 index 000000000..70105a514 --- /dev/null +++ b/.idea/code.iml @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + + + + + diff --git a/apps/labrinth/src/routes/internal/statuses.rs b/apps/labrinth/src/routes/internal/statuses.rs index a69f39a85..7467ad6fc 100644 --- a/apps/labrinth/src/routes/internal/statuses.rs +++ b/apps/labrinth/src/routes/internal/statuses.rs @@ -33,6 +33,7 @@ pub enum ServerToClientMessage { UserOffline { id: UserId }, FriendStatuses { statuses: Vec }, FriendRequest { from: UserId }, + FriendRequestRejected { from: UserId }, } #[derive(Deserialize)] @@ -40,7 +41,7 @@ struct LauncherHeartbeatInit { code: String, } -#[get("launcher_heartbeat")] +#[get("launcher_socket")] pub async fn ws_init( req: HttpRequest, pool: Data, @@ -122,16 +123,12 @@ pub async fn ws_init( user.id, ServerToClientMessage::StatusUpdate { status }, &pool, - &redis, &db, Some(friends), ) .await?; - let mut stream = msg_stream - .aggregate_continuations() - // aggregate continuation frames up to 1MiB - .max_continuation_size(2_usize.pow(20)); + let mut stream = msg_stream.aggregate_continuations(); actix_web::rt::spawn(async move { // receive messages from websocket @@ -168,7 +165,6 @@ pub async fn ws_init( status: status.clone(), }, &pool, - &redis, &db, None, ) @@ -180,12 +176,23 @@ pub async fn ws_init( } Ok(AggregatedMessage::Close(_)) => { - let _ = close_socket(user.id, &pool, &redis, &db).await; + let _ = close_socket(user.id, &pool, &db).await; + } + + Ok(AggregatedMessage::Ping(msg)) => { + if let Some(mut socket) = + db.auth_sockets.get_mut(&user.id.into()) + { + let (_, socket) = socket.value_mut(); + let _ = socket.pong(&msg).await; + } } _ => {} } } + + let _ = close_socket(user.id, &pool, &db).await; }); Ok(res) @@ -195,7 +202,6 @@ pub async fn broadcast_friends( user_id: UserId, message: ServerToClientMessage, pool: &PgPool, - redis: &RedisPool, sockets: &ActiveSockets, friends: Option>, ) -> Result<(), crate::database::models::DatabaseError> { @@ -218,17 +224,7 @@ pub async fn broadcast_friends( { let (_, socket) = socket.value_mut(); - // TODO: bulk close sockets for better perf - if socket.text(serde_json::to_string(&message)?).await.is_err() - { - Box::pin(close_socket( - friend_id.into(), - pool, - redis, - sockets, - )) - .await?; - } + let _ = socket.text(serde_json::to_string(&message)?).await; } } } @@ -239,22 +235,20 @@ pub async fn broadcast_friends( pub async fn close_socket( id: UserId, pool: &PgPool, - redis: &RedisPool, sockets: &ActiveSockets, ) -> Result<(), crate::database::models::DatabaseError> { if let Some((_, (_, socket))) = sockets.auth_sockets.remove(&id) { let _ = socket.close(None).await; - } - broadcast_friends( - id, - ServerToClientMessage::UserOffline { id }, - pool, - redis, - sockets, - None, - ) - .await?; + broadcast_friends( + id, + ServerToClientMessage::UserOffline { id }, + pool, + sockets, + None, + ) + .await?; + } Ok(()) } diff --git a/apps/labrinth/src/routes/v3/friends.rs b/apps/labrinth/src/routes/v3/friends.rs index ec8ade22f..9541c3dd5 100644 --- a/apps/labrinth/src/routes/v3/friends.rs +++ b/apps/labrinth/src/routes/v3/friends.rs @@ -74,8 +74,6 @@ pub async fn add_friend( async fn send_friend_status( user_id: UserId, friend_id: UserId, - pool: &PgPool, - redis: &RedisPool, sockets: &ActiveSockets, ) -> Result<(), ApiError> { if let Some(pair) = sockets.auth_sockets.get(&user_id.into()) { @@ -85,45 +83,21 @@ pub async fn add_friend( { let (_, socket) = socket.value_mut(); - if socket + let _ = socket .text(serde_json::to_string( &ServerToClientMessage::StatusUpdate { status: friend_status.clone(), }, )?) - .await - .is_err() - { - close_socket( - friend_id.into(), - pool, - redis, - sockets, - ) - .await?; - } + .await; } } Ok(()) } - send_friend_status( - friend.user_id, - friend.friend_id, - &pool, - &redis, - &db, - ) - .await?; - send_friend_status( - friend.friend_id, - friend.user_id, - &pool, - &redis, - &db, - ) - .await?; + send_friend_status(friend.user_id, friend.friend_id, &db).await?; + send_friend_status(friend.friend_id, friend.user_id, &db).await?; } else { if friend.id == user.id.into() { return Err(ApiError::InvalidInput( @@ -157,7 +131,7 @@ pub async fn add_friend( .await .is_err() { - close_socket(user.id, &pool, &redis, &db).await?; + close_socket(user.id, &pool, &db).await?; } } } @@ -177,6 +151,7 @@ pub async fn remove_friend( pool: web::Data, redis: web::Data, session_queue: web::Data, + db: web::Data, ) -> Result { let user = get_user_from_headers( &req, @@ -202,6 +177,18 @@ pub async fn remove_friend( ) .await?; + if let Some(mut socket) = db.auth_sockets.get_mut(&friend.id.into()) { + let (_, socket) = socket.value_mut(); + + let _ = socket + .text(serde_json::to_string( + &ServerToClientMessage::FriendRequestRejected { + from: user.id, + }, + )?) + .await; + } + transaction.commit().await?; Ok(HttpResponse::NoContent().body("")) diff --git a/packages/app-lib/src/state/friends.rs b/packages/app-lib/src/state/friends.rs index 1cd214039..c4d0f73f4 100644 --- a/packages/app-lib/src/state/friends.rs +++ b/packages/app-lib/src/state/friends.rs @@ -16,10 +16,10 @@ use reqwest::header::HeaderValue; use reqwest::Method; use serde::{Deserialize, Serialize}; use std::sync::Arc; -use tokio::sync::Mutex; +use tokio::sync::RwLock; type WriteSocket = - Arc, Message>>>>; + Arc, Message>>>>; pub struct FriendsSocket { write: WriteSocket, @@ -67,7 +67,7 @@ impl Default for FriendsSocket { impl FriendsSocket { pub fn new() -> Self { Self { - write: Arc::new(Mutex::new(None)), + write: Arc::new(RwLock::new(None)), user_statuses: Arc::new(DashMap::new()), } } @@ -83,7 +83,7 @@ impl FriendsSocket { if let Some(credentials) = credentials { let mut request = format!( - "{MODRINTH_SOCKET_URL}_internal/launcher_heartbeat?code={}", + "{MODRINTH_SOCKET_URL}_internal/launcher_socket?code={}", credentials.session ) .into_client_request()?; @@ -105,7 +105,7 @@ impl FriendsSocket { let (write, read) = socket.split(); { - let mut write_lock = self.write.lock().await; + let mut write_lock = self.write.write().await; *write_lock = Some(write); } @@ -181,10 +181,8 @@ impl FriendsSocket { } } - let mut w = write_handle.lock().await; + let mut w = write_handle.write().await; *w = None; - - Self::reconnect_task(); }); } Err(e) => { @@ -192,8 +190,6 @@ impl FriendsSocket { "Error connecting to friends socket: {e:?}" ); - Self::reconnect_task(); - return Err(crate::Error::from(e)); } } @@ -202,40 +198,42 @@ impl FriendsSocket { Ok(()) } - fn reconnect_task() { + pub async fn reconnect_task() -> crate::Result<()> { + let state = crate::State::get().await?; + tokio::task::spawn(async move { - let res = async { - let state = crate::State::get().await?; + let mut last_connection = Utc::now(); + loop { + let connected = { + let read = state.friends_socket.write.read().await; + read.is_some() + }; + + if !connected + && Utc::now().signed_duration_since(last_connection) + > chrono::Duration::seconds(30) { - if state.friends_socket.write.lock().await.is_some() { - return Ok(()); - } + last_connection = Utc::now(); + let _ = state + .friends_socket + .connect( + &state.pool, + &state.api_semaphore, + &state.process_manager, + ) + .await; } - state - .friends_socket - .connect( - &state.pool, - &state.api_semaphore, - &state.process_manager, - ) - .await?; - - Ok::<(), crate::Error>(()) - }; - - if let Err(e) = res.await { - tracing::info!("Error reconnecting to friends socket: {e:?}"); - - tokio::time::sleep(std::time::Duration::from_secs(30)).await; - FriendsSocket::reconnect_task(); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; } }); + + Ok(()) } pub async fn disconnect(&self) -> crate::Result<()> { - let mut write_lock = self.write.lock().await; + let mut write_lock = self.write.write().await; if let Some(ref mut write_half) = *write_lock { write_half.close().await?; *write_lock = None; @@ -247,7 +245,7 @@ impl FriendsSocket { &self, profile_name: Option, ) -> crate::Result<()> { - let mut write_lock = self.write.lock().await; + let mut write_lock = self.write.write().await; if let Some(ref mut write_half) = *write_lock { write_half .send(Message::Text(serde_json::to_string( diff --git a/packages/app-lib/src/state/mod.rs b/packages/app-lib/src/state/mod.rs index 9eae019cf..ec0b95e82 100644 --- a/packages/app-lib/src/state/mod.rs +++ b/packages/app-lib/src/state/mod.rs @@ -87,6 +87,16 @@ impl State { if let Err(e) = res { tracing::error!("Error running discord RPC: {e}"); } + + let _ = state + .friends_socket + .connect( + &state.pool, + &state.api_semaphore, + &state.process_manager, + ) + .await; + let _ = FriendsSocket::reconnect_task().await; }); Ok(()) @@ -138,9 +148,6 @@ impl State { let process_manager = ProcessManager::new(); let friends_socket = FriendsSocket::new(); - friends_socket - .connect(&pool, &fetch_semaphore, &process_manager) - .await?; Ok(Arc::new(Self { directories, diff --git a/packages/ui/src/components/base/Accordion.vue b/packages/ui/src/components/base/Accordion.vue index 599940a22..333e2564b 100644 --- a/packages/ui/src/components/base/Accordion.vue +++ b/packages/ui/src/components/base/Accordion.vue @@ -41,6 +41,9 @@ const props = withDefaults( { type: 'standard', openByDefault: false, + buttonClass: null, + contentClass: null, + titleWrapperClass: null, }, ) diff --git a/packages/ui/src/components/base/AutoLink.vue b/packages/ui/src/components/base/AutoLink.vue index f6413205e..3c883fc32 100644 --- a/packages/ui/src/components/base/AutoLink.vue +++ b/packages/ui/src/components/base/AutoLink.vue @@ -12,6 +12,7 @@ diff --git a/packages/ui/src/components/project/ProjectPageVersions.vue b/packages/ui/src/components/project/ProjectPageVersions.vue index a136a7e9d..ce3a3b9e5 100644 --- a/packages/ui/src/components/project/ProjectPageVersions.vue +++ b/packages/ui/src/components/project/ProjectPageVersions.vue @@ -87,11 +87,16 @@
{{ gameVersion }} @@ -119,11 +124,11 @@ >
@@ -141,10 +146,7 @@
-
+
import { formatBytes, - formatCategory, formatNumber, + formatCategory, + formatNumber, formatVersionsForDisplay, - type GameVersionTag, type PlatformTag, type Version + type GameVersionTag, + type PlatformTag, + type Version, } from '@modrinth/utils' import { commonMessages } from '../../utils/common-messages' -import { - CalendarIcon, - DownloadIcon, - StarIcon, -} from '@modrinth/assets' +import { CalendarIcon, DownloadIcon, StarIcon } from '@modrinth/assets' import { Pagination, VersionChannelIndicator, VersionFilterControl } from '../index' import { useVIntl } from '@vintl/vintl' import { type Ref, ref, computed } from 'vue' @@ -190,24 +191,24 @@ import TagItem from '../base/TagItem.vue' const { formatMessage } = useVIntl() -type VersionWithDisplayUrlEnding = Version & { +type VersionWithDisplayUrlEnding = Version & { displayUrlEnding: string } const props = withDefaults( defineProps<{ - baseId?: string, + baseId?: string project: { project_type: string slug?: string id: string - }, - versions: VersionWithDisplayUrlEnding[], - showFiles?: boolean, - currentMember?: boolean, - loaders: PlatformTag[], - gameVersions: GameVersionTag[], - versionLink?: (version: Version) => string, + } + versions: VersionWithDisplayUrlEnding[] + showFiles?: boolean + currentMember?: boolean + loaders: PlatformTag[] + gameVersions: GameVersionTag[] + versionLink?: (version: Version) => string }>(), { baseId: undefined, @@ -217,23 +218,26 @@ const props = withDefaults( }, ) -const currentPage: Ref = ref(1); -const pageSize: Ref = ref(20); +const currentPage: Ref = ref(1) +const pageSize: Ref = ref(20) const versionFilters: Ref | null> = ref(null) -const selectedGameVersions: Ref = computed(() => versionFilters.value?.selectedGameVersions ?? []); -const selectedPlatforms: Ref = computed(() => versionFilters.value?.selectedPlatforms ?? []); -const selectedChannels: Ref = computed(() => versionFilters.value?.selectedChannels ?? []); +const selectedGameVersions: Ref = computed( + () => versionFilters.value?.selectedGameVersions ?? [], +) +const selectedPlatforms: Ref = computed( + () => versionFilters.value?.selectedPlatforms ?? [], +) +const selectedChannels: Ref = computed(() => versionFilters.value?.selectedChannels ?? []) const filteredVersions = computed(() => { return props.versions.filter( (version) => hasAnySelected(version.game_versions, selectedGameVersions.value) && hasAnySelected(version.loaders, selectedPlatforms.value) && - isAnySelected(version.version_type, selectedChannels.value) - ); -}); - + isAnySelected(version.version_type, selectedChannels.value), + ) +}) function hasAnySelected(values: string[], selected: string[]) { return selected.length === 0 || selected.some((value) => values.includes(value)) @@ -244,33 +248,37 @@ function isAnySelected(value: string, selected: string[]) { } const currentVersions = computed(() => - filteredVersions.value.slice((currentPage.value - 1) * pageSize.value, currentPage.value * pageSize.value)); + filteredVersions.value.slice( + (currentPage.value - 1) * pageSize.value, + currentPage.value * pageSize.value, + ), +) -const route = useRoute(); -const router = useRouter(); +const route = useRoute() +const router = useRouter() if (route.query.page) { - currentPage.value = Number(route.query.page) || 1; + currentPage.value = Number(route.query.page) || 1 } function switchPage(page: number) { - currentPage.value = page; + currentPage.value = page router.replace({ query: { ...route.query, page: currentPage.value !== 1 ? currentPage.value : undefined, }, - }); + }) - window.scrollTo({ top: 0, behavior: 'smooth' }); + window.scrollTo({ top: 0, behavior: 'smooth' }) } function updateQuery(newQueries: Record) { if (newQueries.page) { - currentPage.value = Number(newQueries.page); + currentPage.value = Number(newQueries.page) } else if (newQueries.page === undefined) { - currentPage.value = 1; + currentPage.value = 1 } router.replace({ @@ -278,9 +286,8 @@ function updateQuery(newQueries: Record