Skip to content

Commit

Permalink
Simplify some code where Send is not needed anymore
Browse files Browse the repository at this point in the history
  • Loading branch information
jonnius committed Aug 18, 2023
1 parent 2774548 commit 4dcc1da
Showing 1 changed file with 77 additions and 139 deletions.
216 changes: 77 additions & 139 deletions src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ use presage_store_sled::SledStore;
use tokio::sync::mpsc::{self, UnboundedReceiver};

pub struct Handler {
pub provisioning_link_rx: Option<Arc<Mutex<Receiver<Url>>>>,
pub provisioning_link_rx: Option<Receiver<Url>>,
pub provisioning_link: Option<Url>,
pub error_rx: Option<Arc<Mutex<Receiver<presage::Error<SledStoreError>>>>>,
pub error_rx: Option<Receiver<presage::Error<SledStoreError>>>,
pub receive_error: mpsc::Receiver<ApplicationError>,
pub sender: Option<Arc<Mutex<SplitSink<WebSocket, Message>>>>,
pub receiver: Option<Arc<Mutex<SplitStream<WebSocket>>>>,
Expand All @@ -60,7 +60,7 @@ pub struct Handler {
pub is_registered: Option<bool>,
pub captcha: Option<String>,
pub phone_number: Option<PhoneNumber>,
pub registration_manager: Option<Arc<Mutex<Manager<SledStore, Confirmation>>>>,
pub registration_manager: Option<Manager<SledStore, Confirmation>>,
}

impl Handler {
Expand Down Expand Up @@ -104,9 +104,9 @@ impl Handler {
}

Ok(Self {
provisioning_link_rx: Some(Arc::new(Mutex::new(provisioning_link_rx))),
provisioning_link_rx: Some(provisioning_link_rx),
provisioning_link: None,
error_rx: Some(Arc::new(Mutex::new(error_rx))),
error_rx: Some(error_rx),
receive_error,
sender: None,
receiver: None,
Expand Down Expand Up @@ -362,7 +362,6 @@ impl Handler {
self.send_provisioning_link().await;
log::debug!("Provisioning link sent successfully to client");
let error_reciever = self.error_rx.as_mut().unwrap();
let mut error_reciever = error_reciever.lock().await;
while let Ok(e) = error_reciever.try_recv() {
match e {
Some(u) => {
Expand Down Expand Up @@ -525,31 +524,29 @@ impl Handler {
}

let (provisioning_link_tx, provisioning_link_rx) = oneshot::channel();
self.provisioning_link_rx = Some(Arc::new(Mutex::new(provisioning_link_rx)));
self.provisioning_link_rx = Some(provisioning_link_rx);
let (error_tx, error_rx) = oneshot::channel();
self.error_rx = Some(Arc::new(Mutex::new(error_rx)));
self.error_rx = Some(error_rx);
let (send_content, receive_content) = mpsc::unbounded_channel();
self.receive_content = Arc::new(Mutex::new(Some(receive_content)));
let current_chat: Option<Thread> = None;
let current_chat_mutex = Arc::new(Mutex::new(current_chat));
let (send_error, receive_error) = mpsc::channel(MESSAGE_BOUND);
self.receive_error = receive_error;
log::debug!("Creating runtime");
thread::spawn(|| {
Runtime::new().unwrap().spawn(async move {
log::debug!("Spawning manager thread");
ManagerThread::new(
config_store.clone(),
"axolotl".to_string(),
provisioning_link_tx,
error_tx,
send_content,
current_chat_mutex,
send_error,
)
.await;
log::info!("Manager thread started, ready to receive messages from the client2");
});
tokio::task::spawn(async move {
log::debug!("Spawning manager thread");
ManagerThread::new(
config_store.clone(),
"axolotl".to_string(),
provisioning_link_tx,
error_tx,
send_content,
current_chat_mutex,
send_error,
)
.await;
log::info!("Manager thread started, ready to receive messages from the client2");
});
log::debug!("runtime created");

Expand Down Expand Up @@ -594,85 +591,57 @@ impl Handler {
}
let p = self.phone_number.clone().unwrap();
let c = self.captcha.clone().unwrap();
// create new error receiver
let (error_tx, error_rx) = oneshot::channel();

std::thread::spawn(move || {
let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
tokio::runtime::Runtime::new()
.expect("Failed to setup runtime")
.block_on(async move {
let config_store = match Handler::get_config_store().await {
Ok(c) => c,
Err(e) => {
log::error!("Error getting config store: {}", e);
return Err(ApplicationError::RegistrationError(
"Error getting config store".to_string(),
));
}
};
log::debug!("Creating manager for registration");
let manager = match Manager::register(
config_store,
RegistrationOptions {
signal_servers: presage::prelude::SignalServers::Production,
phone_number: p,
use_voice_call: false,
captcha: Some(c.as_str()),
force: true,
},
)
.await
{
Ok(m) => m,
Err(e) => {
log::error!("Error requesting pin: {}", e);
// todo send error to client
error_tx.send(Some(e)).unwrap();
return Err(ApplicationError::RegistrationError(
"Error requesting pin".to_string(),
));
}
};
log::debug!("check code_rx for code {:?}", code_rx);
let code = match code_rx.recv().await {
Some(c) => c,
None => {
log::error!("No code provided");
error_tx.send(None).unwrap();
return Err(ApplicationError::RegistrationError(
"No code provided".to_string(),
));
}
};
match manager.confirm_verification_code(code).await {
Ok(_) => (),
Err(e) => {
log::error!("Error confirming pin: {}", e);
error_tx.send(Some(e)).unwrap();
return Err(ApplicationError::RegistrationError(
"Error confirming pin".to_string(),
));
}
}
drop(error_tx);
Ok(())
})
.unwrap();
}));
if panic.is_err() {
log::error!("Error/panic getting verification code: {:?}", panic);
let config_store = match Handler::get_config_store().await {
Ok(c) => c,
Err(e) => {
log::error!("Error getting config store: {}", e);
return Err(ApplicationError::RegistrationError(
"Error getting config store".to_string(),
));
}
};
log::debug!("Creating manager for registration");
let manager = match Manager::register(
config_store,
RegistrationOptions {
signal_servers: presage::prelude::SignalServers::Production,
phone_number: p,
use_voice_call: false,
captcha: Some(c.as_str()),
force: true,
},
)
.await
{
Ok(m) => m,
Err(e) => {
log::error!("Error requesting pin: {}", e);
return Err(ApplicationError::RegistrationError(
"Error requesting pin".to_string(),
));
}
};
log::debug!("check code_rx for code {:?}", code_rx);
let code = match code_rx.recv().await {
Some(c) => c,
None => {
log::error!("No code provided");
return Err(ApplicationError::RegistrationError(
"No code provided".to_string(),
));
}
};
match manager.confirm_verification_code(code).await {
Ok(_) => (),
Err(e) => {
log::error!("Error confirming pin: {}", e);
return Err(ApplicationError::RegistrationError(
"Error confirming pin".to_string(),
));
}
});
log::debug!("Awaiting for error");
let error = error_rx.await.unwrap();
if let Some(error) = error {
log::debug!("Got error: {:?}", error);
self.send_error(ApplicationError::Presage(error)).await;
return Err(ApplicationError::RegistrationError(
"Error getting verification code".to_string(),
));
}

log::debug!("Getting verification code done");

Ok(())
Expand All @@ -690,24 +659,6 @@ impl Handler {
std::mem::drop(ws_sender);
}

async fn send_error(&self, error: ApplicationError) {
let mut error_string = error.to_string();
error_string.pop();
let message = format!(
"{{\"response_type\":\"registration_error\",\"data\":\"{}\"}}",
error_string
);
log::debug!("Sending error to client: {}", message);
let mut ws_sender = self.sender.as_ref().unwrap().lock().await;
match ws_sender.send(Message::text(message)).await {
Ok(_) => (),
Err(e) => {
log::error!("Error sending error to client: {}", e);
}
}
std::mem::drop(ws_sender);
}

async fn get_phone_pin(&self) {
let message = "{\"response_type\":\"pin\",\"data\":\"\"}".to_string();
let mut ws_sender = self.sender.as_ref().unwrap().lock().await;
Expand Down Expand Up @@ -776,29 +727,16 @@ impl Handler {

async fn handle_provisoning(&mut self) {
log::info!("Awaiting for provisioning link");
if self.provisioning_link_rx.is_none() {
log::error!("Provisioning link receiver not initialized");
return;
}
let mut p = self.provisioning_link_rx.as_ref().unwrap().lock().await;

while let Ok(mut url) = p.try_recv() {
url = match url {
Some(u) => Some(u),
None => {
thread::sleep(time::Duration::from_secs(1));
match p.try_recv() {
Ok(u) => u,
Err(_) => {
log::error!("Error getting provisioning link");
return;
}
}
}
};
if let Some(rx) = self.provisioning_link_rx.take() {
self.provisioning_link = rx.await.ok();

log::debug!("Got provisioning link: {:?}", url);
self.provisioning_link = url;
if let Some(url) = &self.provisioning_link {
log::debug!("Got provisioning link: {:?}", url);
} else {
log::error!("Error getting provisioning link");
}
} else {
log::error!("Provisioning link receiver not initialized");
}
}

Expand Down

0 comments on commit 4dcc1da

Please sign in to comment.