diff --git a/Cargo.lock b/Cargo.lock index 4205ddb6..e1df2e56 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1630,6 +1630,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.14" @@ -1735,7 +1744,9 @@ dependencies = [ "clients", "futures", "hubbub", + "itertools 0.13.0", "lbo", + "reqwest 0.12.9", "serde", "serde_json", "sqlx", @@ -2243,7 +2254,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.90", diff --git a/live-elo/Cargo.toml b/live-elo/Cargo.toml index d2a548ea..f1f9eab6 100644 --- a/live-elo/Cargo.toml +++ b/live-elo/Cargo.toml @@ -25,3 +25,5 @@ bililive = { git = "https://github.com/vanorsigma/bililive-rs.git", rev = "e15f7 futures = { workspace = true } hubbub = "0.10.1" sqlx = { version = "0.8.2", features = ["sqlite", "runtime-tokio"] } +reqwest.workspace = true +itertools = "0.13.0" diff --git a/live-elo/src/config.rs b/live-elo/src/config.rs index 9bfd2ddd..aa16eb08 100644 --- a/live-elo/src/config.rs +++ b/live-elo/src/config.rs @@ -28,6 +28,10 @@ pub struct Config { #[serde(default = "default_score_weight")] pub discord_score: f32, + #[serde(default = "default_false")] + pub adventures_enabled: bool, + pub adventures_poll_hrs: Option, + pub database_filename: Option, } diff --git a/live-elo/src/main.rs b/live-elo/src/main.rs index 002d6e8f..b10a20c8 100644 --- a/live-elo/src/main.rs +++ b/live-elo/src/main.rs @@ -19,6 +19,7 @@ use live_elo::{ raid::RaidScoring, subs::SubCountScoring, text::{DiscordTextScoring, TwitchTextScoring}, + adventures::AdventuresScoring, MessageCountScoring, MultiScorer }, @@ -27,6 +28,7 @@ use live_elo::{ discord::{DiscordHandleOptions, DiscordMessageSourceHandle}, emote::EmoteExtractingSource, twitch::TwitchMessageSourceHandle, + adventures::AdventuresMessageSourceHandle, CancellableSource, Message, TokioTaskSource @@ -83,6 +85,7 @@ async fn main() { "overall", "non-vips", "partner", + "adventures" ]; let mut loaded_leaderboards = db_manager @@ -97,7 +100,7 @@ async fn main() { .or_insert_with(|| Arc::new(LeaderboardElos::new(Vec::new()))); } - let shared_handle = SharedHandle::new( + let mut shared_handle = SharedHandle::new( Arc::new(loaded_leaderboards), Duration::from_secs( GLOBAL_CONFIG @@ -334,6 +337,31 @@ async fn main() { )); } + if GLOBAL_CONFIG.adventures_enabled { + + tokio_task_builder = tokio_task_builder.add_source(AdventuresMessageSourceHandle::spawn( + Duration::from_secs(GLOBAL_CONFIG.adventures_poll_hrs.unwrap_or(24)) * 60 * 60 + )); + + fanout_performances = fanout_performances.add_performance_processor(StandardLeaderboard::new( + AdventuresScoring::new(), + MultiExporter::pair( + DummyExporter::new(), + shared_handle.create_consumer_for_leaderboard( + LeaderboardName::new("adventures".to_string()) + )).append(db_manager.create_consumer_for_leaderboard( + LeaderboardName::new("adventures".to_string()), + shared_handle.clone(), + )), + app_state.clone(), + )); + + shared_handle.set_passthrough_for( + LeaderboardName::new("adventures".to_string()), + true + ).await; + } + let pipeline = Pipeline::builder() .source(CancellableSource::new( tokio_task_builder.build(), diff --git a/live-elo/src/scoring.rs b/live-elo/src/scoring.rs index 6b7836e5..d70ae9d2 100644 --- a/live-elo/src/scoring.rs +++ b/live-elo/src/scoring.rs @@ -3,6 +3,7 @@ pub mod emote; pub mod raid; pub mod subs; pub mod text; +pub mod adventures; use std::{ops::Add, sync::Arc}; diff --git a/live-elo/src/scoring/adventures.rs b/live-elo/src/scoring/adventures.rs new file mode 100644 index 00000000..6799eb93 --- /dev/null +++ b/live-elo/src/scoring/adventures.rs @@ -0,0 +1,30 @@ +use std::sync::Arc; + +use lbo::scoring::ScoringSystem; + +pub struct AdventuresScoring {} + +impl AdventuresScoring { + pub fn new() -> Self { + Self {} + } +} + +impl ScoringSystem for AdventuresScoring { + type Message = Arc; + type Performance = Option; + type Closed = (); + + fn score_message(&self, message: Self::Message) -> Self::Performance { + match message.as_ref() { + crate::sources::Message::Adventures(message) => { + Some(crate::exporter::websocket::PerformancePoints::new( + message.score as f32, + )) + } + _ => None, + } + } + + async fn close(self) -> Self::Closed {} +} diff --git a/live-elo/src/sources.rs b/live-elo/src/sources.rs index 7a685143..3db89251 100644 --- a/live-elo/src/sources.rs +++ b/live-elo/src/sources.rs @@ -2,7 +2,9 @@ pub mod bilibili; pub mod discord; pub mod emote; pub mod twitch; +pub mod adventures; +use adventures::AdventuresMessage; use bilibili::B2Message; use discord::DiscordMessage; use emote::EmoteMessage; @@ -19,6 +21,7 @@ pub enum Message { Discord(DiscordMessage), B2(B2Message), Emote(EmoteMessage), + Adventures(AdventuresMessage), } impl AuthoredMesasge for Message { @@ -32,6 +35,7 @@ impl AuthoredMesasge for Message { } Message::B2(message) => AuthorId::B2(B2Id::new(message.user_id.clone())), Message::Emote(message) => AuthorId::Emote(EmoteId::new(message.emote_id.clone())), + Message::Adventures(message) => AuthorId::Discord(DiscordId::new(message.user.clone())), } } } diff --git a/live-elo/src/sources/adventures.rs b/live-elo/src/sources/adventures.rs new file mode 100644 index 00000000..c5133d0b --- /dev/null +++ b/live-elo/src/sources/adventures.rs @@ -0,0 +1,191 @@ +use std::{collections::HashMap, sync::Arc, time::Duration, str::FromStr}; +use itertools::Itertools; +use lbo::sources::Source; +use reqwest; +use serde::{de, Deserialize, Deserializer, Serialize}; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; + +use super::Message; +const ADVENTURES_LEADERBOARD_URL: &str = "https://neuros.click:8727/leaderboard"; + +#[derive(Serialize, Default, Debug)] +pub enum AdventuresMetadataType { + #[default] + #[serde(rename = "get_metadata")] + GetMetadata, +} + +#[derive(Serialize, Default, Debug)] +pub enum AdventuresGetLeaderboardType { + #[default] + #[serde(rename = "get_leaderboard")] + GetLeaderboard, +} + +#[derive(Serialize, Default, Debug)] +pub struct AdventuresMetadataRequest { + #[serde(rename = "type")] + ty: AdventuresMetadataType, +} + +#[derive(Deserialize, Debug)] +pub struct AdventuresMetadataVersion { + #[serde(rename = "versionNumber")] + pub version_number: String, + pub maps: Vec, +} + +#[derive(Deserialize, Debug)] +pub struct AdventuresMetadataResponse { + pub versions: Vec, +} + +#[derive(Serialize, Debug)] +pub struct AdventuresGetLeaderboardData { + pub version: String, + pub version_maps: Vec, + pub map: String, +} + +#[derive(Serialize, Debug)] +pub struct AdventuresGetLeaderboardRequest { + #[serde(rename = "type")] + pub ty: AdventuresGetLeaderboardType, + pub data: AdventuresGetLeaderboardData, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct AdventuresMessage { + pub user: String, + #[serde(deserialize_with = "string_to_u64")] + pub score: u64, + #[serde(rename = "uID")] + pub uid: String, + pub ai: String, + #[serde(rename = "collabPartner")] + pub collab_partner: String, +} + +fn string_to_u64<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let s = String::deserialize(deserializer)?; + u64::from_str(&s).map_err(de::Error::custom) +} + +pub struct AdventuresMessageSourceHandle { + mpsc_recv: mpsc::Receiver, + task_join: tokio::task::JoinHandle<()>, + cancellation_token: CancellationToken, +} + +impl AdventuresMessageSourceHandle { + pub fn spawn(refresh_duration: Duration) -> Self { + let cancellation_token = CancellationToken::new(); + let (mpsc_send, mpsc_recv) = mpsc::channel(1000); + let task_join = tokio::task::spawn(adventures_source_inner( + mpsc_send, + refresh_duration, + cancellation_token.clone(), + )); + Self { + mpsc_recv, + task_join, + cancellation_token, + } + } +} + +fn get_highest_score_rank_items(mut rank_items: Vec) -> Vec { + rank_items.sort_by_key(|rank_item| rank_item.score); + rank_items.reverse(); + rank_items + .into_iter() + .unique_by(|rank_item| rank_item.uid.clone()) + .collect() +} + +pub async fn get_ranks() -> Result, anyhow::Error> { + let metadata_response = reqwest::Client::new() + .post(ADVENTURES_LEADERBOARD_URL) + .json(&AdventuresMetadataRequest::default()) + .send() + .await? + .json::() + .await?; + + let version = metadata_response + .versions + .first() + .ok_or(anyhow::anyhow!("cannot get metadata versions"))?; + + let responses = + futures::future::try_join_all(version.maps.clone().into_iter().map(|game_map| async { + reqwest::Client::new() + .post(ADVENTURES_LEADERBOARD_URL) + .json(&AdventuresGetLeaderboardRequest { + ty: AdventuresGetLeaderboardType::GetLeaderboard, + data: AdventuresGetLeaderboardData { + version: version.version_number.clone(), + version_maps: version.maps.clone(), + map: game_map.to_string(), + }, + }) + .send() + .await + .map(|response| (game_map, response)) + })) + .await?; + + let rank_items = futures::future::try_join_all( + responses.into_iter().map(|(game_map, response)| async move { + let mut rank_items = response.json::>().await?; + rank_items.iter_mut().for_each(|rank_item| { + rank_item.ai = game_map.clone(); + }); + Ok::<_, reqwest::Error>(rank_items) + })).await? + .into_iter() + .flatten() + .collect::>(); + + Ok(get_highest_score_rank_items(rank_items)) + +} + +async fn adventures_source_inner( + mpsc_send: mpsc::Sender, + refresh_duration: Duration, + cancellation_token: CancellationToken, +) { + let mut interval = tokio::time::interval(refresh_duration); + loop { + tokio::select! { + _ = cancellation_token.cancelled() => { + break; + } + _ = interval.tick() => { + let ranks = get_ranks().await.unwrap(); + for rank in ranks { + mpsc_send.send(rank).await.unwrap(); + } + } + } + } +} + +impl Source for AdventuresMessageSourceHandle { + type Message = Message; + type Closed = (); + + async fn next_message(&mut self) -> Option { + self.mpsc_recv.recv().await.map(Message::Adventures) + } + + async fn close(self) -> Self::Closed { + self.cancellation_token.cancel(); + self.task_join.await.unwrap(); + } +} \ No newline at end of file