Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ported adventures leaderboard #67

Open
wants to merge 2 commits into
base: live-elo
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions live-elo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ bililive = { git = "https://github.com/vanorsigma/bililive-rs.git", rev = "e15f7
futures = { workspace = true }
hubbub = "0.10.1"
sqlx = { version = "0.8.2", features = ["sqlite", "runtime-tokio"] }
reqwest.workspace = true
itertools = "0.13.0"
4 changes: 4 additions & 0 deletions live-elo/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ pub struct Config {
#[serde(default = "default_score_weight")]
pub discord_score: f32,

#[serde(default = "default_false")]
pub adventures_enabled: bool,
pub adventures_poll_hrs: Option<u64>,

pub database_filename: Option<String>,
}

Expand Down
30 changes: 29 additions & 1 deletion live-elo/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use live_elo::{
raid::RaidScoring,
subs::SubCountScoring,
text::{DiscordTextScoring, TwitchTextScoring},
adventures::AdventuresScoring,
MessageCountScoring,
MultiScorer
},
Expand All @@ -27,6 +28,7 @@ use live_elo::{
discord::{DiscordHandleOptions, DiscordMessageSourceHandle},
emote::EmoteExtractingSource,
twitch::TwitchMessageSourceHandle,
adventures::AdventuresMessageSourceHandle,
CancellableSource,
Message,
TokioTaskSource
Expand Down Expand Up @@ -83,6 +85,7 @@ async fn main() {
"overall",
"non-vips",
"partner",
"adventures"
];

let mut loaded_leaderboards = db_manager
Expand All @@ -97,7 +100,7 @@ async fn main() {
.or_insert_with(|| Arc::new(LeaderboardElos::new(Vec::new())));
}

let shared_handle = SharedHandle::new(
let mut shared_handle = SharedHandle::new(
Arc::new(loaded_leaderboards),
Duration::from_secs(
GLOBAL_CONFIG
Expand Down Expand Up @@ -334,6 +337,31 @@ async fn main() {
));
}

if GLOBAL_CONFIG.adventures_enabled {

tokio_task_builder = tokio_task_builder.add_source(AdventuresMessageSourceHandle::spawn(
Duration::from_secs(GLOBAL_CONFIG.adventures_poll_hrs.unwrap_or(24)) * 60 * 60
));

fanout_performances = fanout_performances.add_performance_processor(StandardLeaderboard::new(
AdventuresScoring::new(),
MultiExporter::pair(
DummyExporter::new(),
shared_handle.create_consumer_for_leaderboard(
LeaderboardName::new("adventures".to_string())
)).append(db_manager.create_consumer_for_leaderboard(
LeaderboardName::new("adventures".to_string()),
shared_handle.clone(),
)),
app_state.clone(),
));

shared_handle.set_passthrough_for(
LeaderboardName::new("adventures".to_string()),
true
).await;
}

let pipeline = Pipeline::builder()
.source(CancellableSource::new(
tokio_task_builder.build(),
Expand Down
1 change: 1 addition & 0 deletions live-elo/src/scoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod emote;
pub mod raid;
pub mod subs;
pub mod text;
pub mod adventures;

use std::{ops::Add, sync::Arc};

Expand Down
30 changes: 30 additions & 0 deletions live-elo/src/scoring/adventures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use std::sync::Arc;

use lbo::scoring::ScoringSystem;

pub struct AdventuresScoring {}

impl AdventuresScoring {
pub fn new() -> Self {
Self {}
}
}

impl ScoringSystem for AdventuresScoring {
type Message = Arc<crate::sources::Message>;
type Performance = Option<crate::exporter::websocket::PerformancePoints>;
type Closed = ();

fn score_message(&self, message: Self::Message) -> Self::Performance {
match message.as_ref() {
crate::sources::Message::Adventures(message) => {
Some(crate::exporter::websocket::PerformancePoints::new(
message.score as f32,
))
}
_ => None,
}
}

async fn close(self) -> Self::Closed {}
}
4 changes: 4 additions & 0 deletions live-elo/src/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ pub mod bilibili;
pub mod discord;
pub mod emote;
pub mod twitch;
pub mod adventures;

use adventures::AdventuresMessage;
use bilibili::B2Message;
use discord::DiscordMessage;
use emote::EmoteMessage;
Expand All @@ -19,6 +21,7 @@ pub enum Message {
Discord(DiscordMessage),
B2(B2Message),
Emote(EmoteMessage),
Adventures(AdventuresMessage),
}

impl AuthoredMesasge for Message {
Expand All @@ -32,6 +35,7 @@ impl AuthoredMesasge for Message {
}
Message::B2(message) => AuthorId::B2(B2Id::new(message.user_id.clone())),
Message::Emote(message) => AuthorId::Emote(EmoteId::new(message.emote_id.clone())),
Message::Adventures(message) => AuthorId::Discord(DiscordId::new(message.user.clone())),
}
}
}
Expand Down
191 changes: 191 additions & 0 deletions live-elo/src/sources/adventures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
use std::{collections::HashMap, sync::Arc, time::Duration, str::FromStr};
use itertools::Itertools;
use lbo::sources::Source;
use reqwest;
use serde::{de, Deserialize, Deserializer, Serialize};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use super::Message;
const ADVENTURES_LEADERBOARD_URL: &str = "https://neuros.click:8727/leaderboard";

#[derive(Serialize, Default, Debug)]
pub enum AdventuresMetadataType {
#[default]
#[serde(rename = "get_metadata")]
GetMetadata,
}

#[derive(Serialize, Default, Debug)]
pub enum AdventuresGetLeaderboardType {
#[default]
#[serde(rename = "get_leaderboard")]
GetLeaderboard,
}

#[derive(Serialize, Default, Debug)]
pub struct AdventuresMetadataRequest {
#[serde(rename = "type")]
ty: AdventuresMetadataType,
}

#[derive(Deserialize, Debug)]
pub struct AdventuresMetadataVersion {
#[serde(rename = "versionNumber")]
pub version_number: String,
pub maps: Vec<String>,
}

#[derive(Deserialize, Debug)]
pub struct AdventuresMetadataResponse {
pub versions: Vec<AdventuresMetadataVersion>,
}

#[derive(Serialize, Debug)]
pub struct AdventuresGetLeaderboardData {
pub version: String,
pub version_maps: Vec<String>,
pub map: String,
}

#[derive(Serialize, Debug)]
pub struct AdventuresGetLeaderboardRequest {
#[serde(rename = "type")]
pub ty: AdventuresGetLeaderboardType,
pub data: AdventuresGetLeaderboardData,
}

#[derive(Deserialize, Debug, Clone)]
pub struct AdventuresMessage {
pub user: String,
#[serde(deserialize_with = "string_to_u64")]
pub score: u64,
#[serde(rename = "uID")]
pub uid: String,
pub ai: String,
#[serde(rename = "collabPartner")]
pub collab_partner: String,
}

fn string_to_u64<'de, D>(deserializer: D) -> Result<u64, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
u64::from_str(&s).map_err(de::Error::custom)
}

pub struct AdventuresMessageSourceHandle {
mpsc_recv: mpsc::Receiver<AdventuresMessage>,
task_join: tokio::task::JoinHandle<()>,
cancellation_token: CancellationToken,
}

impl AdventuresMessageSourceHandle {
pub fn spawn(refresh_duration: Duration) -> Self {
let cancellation_token = CancellationToken::new();
let (mpsc_send, mpsc_recv) = mpsc::channel(1000);
let task_join = tokio::task::spawn(adventures_source_inner(
mpsc_send,
refresh_duration,
cancellation_token.clone(),
));
Self {
mpsc_recv,
task_join,
cancellation_token,
}
}
}

fn get_highest_score_rank_items(mut rank_items: Vec<AdventuresMessage>) -> Vec<AdventuresMessage> {
rank_items.sort_by_key(|rank_item| rank_item.score);
rank_items.reverse();
rank_items
.into_iter()
.unique_by(|rank_item| rank_item.uid.clone())
.collect()
}

pub async fn get_ranks() -> Result<Vec<AdventuresMessage>, anyhow::Error> {
let metadata_response = reqwest::Client::new()
.post(ADVENTURES_LEADERBOARD_URL)
.json(&AdventuresMetadataRequest::default())
.send()
.await?
.json::<AdventuresMetadataResponse>()
.await?;

let version = metadata_response
.versions
.first()
.ok_or(anyhow::anyhow!("cannot get metadata versions"))?;

let responses =
futures::future::try_join_all(version.maps.clone().into_iter().map(|game_map| async {
reqwest::Client::new()
.post(ADVENTURES_LEADERBOARD_URL)
.json(&AdventuresGetLeaderboardRequest {
ty: AdventuresGetLeaderboardType::GetLeaderboard,
data: AdventuresGetLeaderboardData {
version: version.version_number.clone(),
version_maps: version.maps.clone(),
map: game_map.to_string(),
},
})
.send()
.await
.map(|response| (game_map, response))
}))
.await?;

let rank_items = futures::future::try_join_all(
responses.into_iter().map(|(game_map, response)| async move {
let mut rank_items = response.json::<Vec<AdventuresMessage>>().await?;
rank_items.iter_mut().for_each(|rank_item| {
rank_item.ai = game_map.clone();
});
Ok::<_, reqwest::Error>(rank_items)
})).await?
.into_iter()
.flatten()
.collect::<Vec<AdventuresMessage>>();

Ok(get_highest_score_rank_items(rank_items))

}

async fn adventures_source_inner(
mpsc_send: mpsc::Sender<AdventuresMessage>,
refresh_duration: Duration,
cancellation_token: CancellationToken,
) {
let mut interval = tokio::time::interval(refresh_duration);
loop {
tokio::select! {
_ = cancellation_token.cancelled() => {
break;
}
_ = interval.tick() => {
let ranks = get_ranks().await.unwrap();
for rank in ranks {
mpsc_send.send(rank).await.unwrap();
}
}
}
}
}

impl Source for AdventuresMessageSourceHandle {
type Message = Message;
type Closed = ();

async fn next_message(&mut self) -> Option<Self::Message> {
self.mpsc_recv.recv().await.map(Message::Adventures)
}

async fn close(self) -> Self::Closed {
self.cancellation_token.cancel();
self.task_join.await.unwrap();
}
}