Skip to content

Commit

Permalink
replace signal hell with global variable
Browse files Browse the repository at this point in the history
  • Loading branch information
fnschmidt committed Oct 6, 2024
1 parent 47c0209 commit a1ac6de
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 282 deletions.
61 changes: 26 additions & 35 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 13 additions & 37 deletions src/bin/mensi-telegram-bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@ use stuwe_telegram_rs::bot_command_handlers::{
start_time_dialogue, subscribe, unsubscribe,
};
use stuwe_telegram_rs::constants::{
BACKEND, CD_DATA, DB_FILENAME, MENSI_DB, OLLAMA_HOST, OLLAMA_MODEL,
BACKEND, CD_DATA, DB_FILENAME, MENSI_DB, OLLAMA_HOST, OLLAMA_MODEL, USER_REGISTRATIONS,
};

use stuwe_telegram_rs::data_types::{
Backend, Command, DialogueState, JobHandlerTask, JobHandlerTaskType, JobType,
QueryRegistrationType, RegistrationEntry,
};
use stuwe_telegram_rs::db_operations::check_or_create_db_tables;
use stuwe_telegram_rs::shared_main::callback_handler;
use stuwe_telegram_rs::task_scheduler_funcs::{
handle_add_registration_task, handle_delete_registration_task, handle_query_registration_task,
handle_update_registration_task, load_jobs_from_db, start_mensaupd_hook_and_campusdual_job,
handle_add_registration_task, handle_delete_registration_task, handle_update_registration_task,
load_jobs_from_db, start_mensaupd_hook_and_campusdual_job,
};

use clap::Parser;
use std::env;
use std::sync::RwLock;
use teloxide::{
dispatching::{
dialogue::{self, InMemStorage},
Expand Down Expand Up @@ -98,33 +98,24 @@ async fn main() {
let bot = Bot::new(args.token);

let (jobhandler_task_tx, jobhandler_task_rx): JobHandlerTaskType = broadcast::channel(10);
let (user_registration_data_tx, _): QueryRegistrationType = broadcast::channel(10);

// every user has a mensa_id, but only users with auto send have a job_uuid inside RegistrEntry
{
let bot = bot.clone();
// there is effectively only one tx and rx, however since rx cant be passed as dptree dep (?!),
// tx has to be cloned and passed to both (inside command_handler it will be resubscribed to rx)
let jobhandler_task_tx = jobhandler_task_tx.clone();
let user_registration_data_tx = user_registration_data_tx.clone();
tokio::spawn(async move {
log::info!("Starting task scheduler...");
run_task_scheduler(
bot,
jobhandler_task_tx,
jobhandler_task_rx,
user_registration_data_tx,
)
.await;
run_task_scheduler(bot, jobhandler_task_tx, jobhandler_task_rx).await;
});
}

// passing a receiver doesnt work for some reason, so sending user_registration_data_tx and resubscribing to get rx
let command_handler_deps = dptree::deps![
InMemStorage::<DialogueState>::new(),
mensen,
jobhandler_task_tx,
user_registration_data_tx
jobhandler_task_tx
];
Dispatcher::builder(bot, schema())
.dependencies(command_handler_deps)
Expand Down Expand Up @@ -164,13 +155,15 @@ async fn run_task_scheduler(
bot: Bot,
jobhandler_task_tx: broadcast::Sender<JobHandlerTask>,
mut jobhandler_task_rx: broadcast::Receiver<JobHandlerTask>,
user_registration_data_tx: broadcast::Sender<Option<RegistrationEntry>>,
) {
let sched = JobScheduler::new().await.unwrap();

start_mensaupd_hook_and_campusdual_job(bot.clone(), &sched, jobhandler_task_tx).await;

let mut loaded_user_data = load_jobs_from_db(&bot, &sched).await;
let user_registrations = load_jobs_from_db(&bot, &sched).await;
USER_REGISTRATIONS
.set(RwLock::new(user_registrations))
.unwrap();

// start scheduler (non blocking)
sched.start().await.unwrap();
Expand All @@ -181,32 +174,15 @@ async fn run_task_scheduler(
while let Ok(job_handler_task) = jobhandler_task_rx.recv().await {
match job_handler_task.job_type {
JobType::Register => {
handle_add_registration_task(&bot, job_handler_task, &sched, &mut loaded_user_data)
.await;
handle_add_registration_task(&bot, job_handler_task, &sched).await;
}

JobType::UpdateRegistration => {
handle_update_registration_task(
&bot,
job_handler_task,
&sched,
&mut loaded_user_data,
)
.await;
handle_update_registration_task(&bot, job_handler_task, &sched).await;
}

JobType::DeleteRegistration => {
handle_delete_registration_task(job_handler_task, &sched, &mut loaded_user_data)
.await;
}

JobType::QueryRegistration => {
handle_query_registration_task(
job_handler_task,
&mut loaded_user_data,
user_registration_data_tx.clone(),
)
.await;
handle_delete_registration_task(job_handler_task, &sched).await;
}

JobType::BroadcastUpdate => {
Expand Down
51 changes: 13 additions & 38 deletions src/bin/stuwe-telegram-bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,22 @@ use stuwe_telegram_rs::bot_command_handlers::{
start_time_dialogue, subscribe, unsubscribe,
};
use stuwe_telegram_rs::constants::{
API_URL, BACKEND, CD_DATA, DB_FILENAME, OLLAMA_HOST, OLLAMA_MODEL, STUWE_DB,
API_URL, BACKEND, CD_DATA, DB_FILENAME, OLLAMA_HOST, OLLAMA_MODEL, STUWE_DB, USER_REGISTRATIONS,
};
use stuwe_telegram_rs::data_backend::stuwe_parser::get_mensen;
use stuwe_telegram_rs::data_types::{
Backend, Command, DialogueState, JobHandlerTask, JobHandlerTaskType, JobType,
QueryRegistrationType, RegistrationEntry,
};
use stuwe_telegram_rs::db_operations::check_or_create_db_tables;
use stuwe_telegram_rs::shared_main::callback_handler;
use stuwe_telegram_rs::task_scheduler_funcs::{
handle_add_registration_task, handle_broadcast_update_task, handle_delete_registration_task,
handle_query_registration_task, handle_update_registration_task, load_jobs_from_db,
start_mensaupd_hook_and_campusdual_job,
handle_update_registration_task, load_jobs_from_db, start_mensaupd_hook_and_campusdual_job,
};

use clap::Parser;
use std::env;
use std::sync::RwLock;
use teloxide::{
dispatching::{
dialogue::{self, InMemStorage},
Expand Down Expand Up @@ -107,33 +106,24 @@ async fn main() {
let bot = Bot::new(args.token);

let (jobhandler_task_tx, jobhandler_task_rx): JobHandlerTaskType = broadcast::channel(10);
let (user_registration_data_tx, _): QueryRegistrationType = broadcast::channel(10);

// every user has a mensa_id, but only users with auto send have a job_uuid inside RegistrEntry
{
let bot = bot.clone();
// there is effectively only one tx and rx, however since rx cant be passed as dptree dep (?!),
// tx has to be cloned and passed to both (inside command_handler it will be resubscribed to rx)
let jobhandler_task_tx = jobhandler_task_tx.clone();
let user_registration_data_tx = user_registration_data_tx.clone();
tokio::spawn(async move {
log::info!("Starting task scheduler...");
run_task_scheduler(
bot,
jobhandler_task_tx,
jobhandler_task_rx,
user_registration_data_tx,
)
.await;
run_task_scheduler(bot, jobhandler_task_tx, jobhandler_task_rx).await;
});
}

// passing a receiver doesnt work for some reason, so sending user_registration_data_tx and resubscribing to get rx
let command_handler_deps = dptree::deps![
InMemStorage::<DialogueState>::new(),
mensen,
jobhandler_task_tx,
user_registration_data_tx
jobhandler_task_tx
];
Dispatcher::builder(bot, schema())
.dependencies(command_handler_deps)
Expand Down Expand Up @@ -173,13 +163,15 @@ async fn run_task_scheduler(
bot: Bot,
jobhandler_task_tx: broadcast::Sender<JobHandlerTask>,
mut jobhandler_task_rx: broadcast::Receiver<JobHandlerTask>,
user_registration_data_tx: broadcast::Sender<Option<RegistrationEntry>>,
) {
let sched = JobScheduler::new().await.unwrap();

start_mensaupd_hook_and_campusdual_job(bot.clone(), &sched, jobhandler_task_tx.clone()).await;

let mut loaded_user_data = load_jobs_from_db(&bot, &sched).await;
let user_registrations = load_jobs_from_db(&bot, &sched).await;
USER_REGISTRATIONS
.set(RwLock::new(user_registrations))
.unwrap();

// start scheduler (non blocking)
sched.start().await.unwrap();
Expand All @@ -190,36 +182,19 @@ async fn run_task_scheduler(
while let Ok(job_handler_task) = jobhandler_task_rx.recv().await {
match job_handler_task.job_type {
JobType::Register => {
handle_add_registration_task(&bot, job_handler_task, &sched, &mut loaded_user_data)
.await;
handle_add_registration_task(&bot, job_handler_task, &sched).await;
}

JobType::UpdateRegistration => {
handle_update_registration_task(
&bot,
job_handler_task,
&sched,
&mut loaded_user_data,
)
.await;
handle_update_registration_task(&bot, job_handler_task, &sched).await;
}

JobType::DeleteRegistration => {
handle_delete_registration_task(job_handler_task, &sched, &mut loaded_user_data)
.await;
}

JobType::QueryRegistration => {
handle_query_registration_task(
job_handler_task,
&mut loaded_user_data,
user_registration_data_tx.clone(),
)
.await;
handle_delete_registration_task(job_handler_task, &sched).await;
}

JobType::BroadcastUpdate => {
handle_broadcast_update_task(&bot, job_handler_task, &mut loaded_user_data).await;
handle_broadcast_update_task(&bot, job_handler_task).await;
}
}
}
Expand Down
Loading

0 comments on commit a1ac6de

Please sign in to comment.