From 375bd04547bd9c9d581ea353955d3d8bb8dec472 Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 27 Mar 2024 16:51:00 +0100 Subject: [PATCH] fix malformed sql error handling,added more tests --- src/network/api.rs | 23 ++++- src/tests/mod.rs | 4 +- src/tests/notifications.rs | 174 +++++++++++++++++++++++++++++++++++-- 3 files changed, 192 insertions(+), 9 deletions(-) diff --git a/src/network/api.rs b/src/network/api.rs index 85525b7..d9a2220 100644 --- a/src/network/api.rs +++ b/src/network/api.rs @@ -7,7 +7,7 @@ use crate::Node; use crate::TypeConfig; use openraft::LeaderId; use openraft::LogId; -use rxqlite_common::Message; +use rxqlite_common::{Message,MessageResponse}; pub async fn sql_consistent_or_fast( message: Message, @@ -17,7 +17,26 @@ pub async fn sql_consistent_or_fast( let sql = message.sql(); let is_write = rxqlite_sqlx_common::is_query_write(sql); - + if let Err(err) = &is_write { + let response_message = MessageResponse::Error(format!("{}",err)); + let client_write_response = openraft::raft::ClientWriteResponse:: { + log_id: LogId { + leader_id: LeaderId { + term: u64::MAX, + node_id: u64::MAX, + }, + index: u64::MAX, + }, + data: Some(response_message), + membership: None, + }; + let res = Result::< + openraft::raft::ClientWriteResponse, + openraft::error::RaftError>, + >::Ok(client_write_response); + return Ok(reply::json(&res)); + } + let is_write=is_write.unwrap(); if is_write { let res: Result, _> = app.raft.client_write(message).await; diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 04ec3a8..c4953e7 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -117,10 +117,12 @@ impl TestManager { loop { if let Ok(metrics) = self.get_metrics(node_id).await { + if metrics.current_leader.is_some() { let voter_ids = metrics.membership_config.voter_ids(); if voter_ids.count() == self.node_count() { - return Ok(()); + return Ok(()); } + } } reattempts -= 1; if reattempts == 0 { diff --git a/src/tests/notifications.rs b/src/tests/notifications.rs index b3b756e..c7e0545 100644 --- a/src/tests/notifications.rs +++ b/src/tests/notifications.rs @@ -5,13 +5,14 @@ use rxqlite_common::{Message, MessageResponse}; use sqlx::types::chrono::{/*DateTime,*/ Utc}; use sqlx_sqlite_cipher::notifications::{Action, Notification}; -fn do_notifications(test_name: &str, tls_config: Option) { +fn do_notifications(test_name: &str, + tls_config: Option) { let rt = Runtime::new().unwrap(); let _ = rt.block_on(async { //const QUERY: &str ="SELECT name,birth_date from _test_user_ where name = ?"; let mut tm = TestManager::new(test_name, 3, tls_config); - //tm.keep_temp_directories=true; + //tm.keep_temp_directories=keep_temp_directories; tm.wait_for_cluster_established(1, 60).await.unwrap(); let notifications_addr = tm.instances.get(&1).unwrap().notifications_addr.clone(); let client = tm.clients.get_mut(&1).unwrap(); @@ -60,16 +61,161 @@ fn do_notifications(test_name: &str, tls_config: Option) { .await .unwrap(); assert!(message.is_some()); - match message.unwrap() { + let insert_row_id = match message.unwrap() { NotificationEvent::Notification(Notification::Update { action, - database: _, + database, table, - row_id: _, + row_id, }) => { assert_eq!(action, Action::SQLITE_INSERT); + assert_eq!(&database, "main"); assert_eq!(&table, "_test_user_"); + row_id } + }; + let message = Message::Execute( + "DELETE FROM _test_user_ WHERE name = ?".into(), + vec![name.into()], + ); + let response = client.sql(&message).await.unwrap(); + let message = response.data.unwrap(); + match message { + MessageResponse::Rows(rows) => assert!(rows.len() == 0), + MessageResponse::Error(err) => panic!("{}", err), + } + let notification_stream = client.notification_stream.as_mut().unwrap(); + let message = notification_stream + .read_timeout(tokio::time::Duration::from_secs(60)) + .await + .unwrap(); + assert!(message.is_some()); + match message.unwrap() { + NotificationEvent::Notification(Notification::Update { + action, + database, + table, + row_id, + }) => { + assert_eq!(action, Action::SQLITE_DELETE); + assert_eq!(&database, "main"); + assert_eq!(&table, "_test_user_"); + assert_eq!(insert_row_id,row_id); + } + } + + }); +} + + +fn do_notifications2(test_name: &str, + tls_config: Option) { + let rt = Runtime::new().unwrap(); + + let _ = rt.block_on(async { + //const QUERY: &str ="SELECT name,birth_date from _test_user_ where name = ?"; + let mut tm = TestManager::new(test_name, 3, tls_config); + //tm.keep_temp_directories=keep_temp_directories; + const MAX_ITER:usize = 5; + for i in 0..MAX_ITER{ + tm.wait_for_cluster_established(1, 60).await.unwrap(); + let notifications_addr = tm.instances.get(&1).unwrap().notifications_addr.clone(); + let client = tm.clients.get_mut(&1).unwrap(); + + client + .start_listening_for_notifications(¬ifications_addr) + .await + .unwrap(); + + let message = Message::Execute( + "CREATE TABLE IF NOT EXISTS _test_user_ ( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL UNIQUE, + birth_date DATETIME NOT NULL + )" + .into(), + vec![], + ); + let response = client.sql(&message).await.unwrap(); + + let message = response.data.unwrap(); + match message { + MessageResponse::Rows(rows) => assert!(rows.len() == 0), + MessageResponse::Error(err) => panic!("{}", err), + } + let name = "Ha"; + let birth_date = Utc::now(); + let message = Message::Execute( + "INSERT INTO _test_user_ (name,birth_date) VALUES (?,?)".into(), + vec![name.into(), birth_date.into()], + ); + let response = client.sql(&message).await.unwrap(); + let message = response.data.unwrap(); + match message { + MessageResponse::Rows(rows) => assert!(rows.len() == 0), + MessageResponse::Error(err) => panic!("{}", err), + } + + //tm.wait_for_last_applied_log(response.log_id,60).await.unwrap(); + + // now we check for notification, will do with this: + + let notification_stream = client.notification_stream.as_mut().unwrap(); + let message = notification_stream + .read_timeout(tokio::time::Duration::from_secs(60)) + .await + .unwrap(); + assert!(message.is_some()); + let insert_row_id = match message.unwrap() { + NotificationEvent::Notification(Notification::Update { + action, + database, + table, + row_id, + }) => { + assert_eq!(action, Action::SQLITE_INSERT); + assert_eq!(&database, "main"); + assert_eq!(&table, "_test_user_"); + row_id + } + }; + let message = Message::Execute( + "DELETE FROM _test_user_ WHERE name = ?".into(), + vec![name.into()], + ); + let response = client.sql(&message).await.unwrap(); + let message = response.data.unwrap(); + match message { + MessageResponse::Rows(rows) => assert!(rows.len() == 0), + MessageResponse::Error(err) => panic!("{}", err), + } + let notification_stream = client.notification_stream.as_mut().unwrap(); + let message = notification_stream + .read_timeout(tokio::time::Duration::from_secs(60)) + .await + .unwrap(); + assert!(message.is_some()); + match message.unwrap() { + NotificationEvent::Notification(Notification::Update { + action, + database, + table, + row_id, + }) => { + assert_eq!(action, Action::SQLITE_DELETE); + assert_eq!(&database, "main"); + assert_eq!(&table, "_test_user_"); + assert_eq!(insert_row_id,row_id); + } + } + client + .stop_listening_for_notifications() + .await + .unwrap(); + if i < MAX_ITER - 1 { + tm.kill_all().unwrap(); + tm.start().unwrap(); + } } }); } @@ -82,7 +228,23 @@ fn notifications() { #[test] fn notifications_insecure_ssl() { do_notifications( - "notifications_insecure_ssl", + "notifications_insecure_ssl", + Some(TestTlsConfig::default().accept_invalid_certificates(true)), + ); +} + +#[test] +fn notifications2_no_ssl() { + do_notifications2("notifications2_no_ssl", None); + +} + +#[test] +fn notifications2_insecure_ssl() { + do_notifications2( + "notifications2_insecure_ssl", Some(TestTlsConfig::default().accept_invalid_certificates(true)), ); + + }