Skip to content

Commit

Permalink
fix malformed sql error handling,added more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Your Name committed Mar 27, 2024
1 parent c426a95 commit 375bd04
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 9 deletions.
23 changes: 21 additions & 2 deletions src/network/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::Node;
use crate::TypeConfig;
use openraft::LeaderId;
use openraft::LogId;
use rxqlite_common::Message;
use rxqlite_common::{Message,MessageResponse};

pub async fn sql_consistent_or_fast(
message: Message,
Expand All @@ -17,7 +17,26 @@ pub async fn sql_consistent_or_fast(
let sql = message.sql();

let is_write = rxqlite_sqlx_common::is_query_write(sql);

if let Err(err) = &is_write {
let response_message = MessageResponse::Error(format!("{}",err));
let client_write_response = openraft::raft::ClientWriteResponse::<TypeConfig> {
log_id: LogId {
leader_id: LeaderId {
term: u64::MAX,
node_id: u64::MAX,
},
index: u64::MAX,
},
data: Some(response_message),
membership: None,
};
let res = Result::<
openraft::raft::ClientWriteResponse<TypeConfig>,
openraft::error::RaftError<u64, openraft::error::ClientWriteError<u64, Node>>,
>::Ok(client_write_response);
return Ok(reply::json(&res));
}
let is_write=is_write.unwrap();
if is_write {
let res: Result<openraft::raft::ClientWriteResponse<TypeConfig>, _> =
app.raft.client_write(message).await;
Expand Down
4 changes: 3 additions & 1 deletion src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,12 @@ impl TestManager {

loop {
if let Ok(metrics) = self.get_metrics(node_id).await {
if metrics.current_leader.is_some() {
let voter_ids = metrics.membership_config.voter_ids();
if voter_ids.count() == self.node_count() {
return Ok(());
return Ok(());
}
}
}
reattempts -= 1;
if reattempts == 0 {
Expand Down
174 changes: 168 additions & 6 deletions src/tests/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ use rxqlite_common::{Message, MessageResponse};
use sqlx::types::chrono::{/*DateTime,*/ Utc};
use sqlx_sqlite_cipher::notifications::{Action, Notification};

fn do_notifications(test_name: &str, tls_config: Option<TestTlsConfig>) {
fn do_notifications(test_name: &str,
tls_config: Option<TestTlsConfig>) {
let rt = Runtime::new().unwrap();

let _ = rt.block_on(async {
//const QUERY: &str ="SELECT name,birth_date from _test_user_ where name = ?";
let mut tm = TestManager::new(test_name, 3, tls_config);
//tm.keep_temp_directories=true;
//tm.keep_temp_directories=keep_temp_directories;
tm.wait_for_cluster_established(1, 60).await.unwrap();
let notifications_addr = tm.instances.get(&1).unwrap().notifications_addr.clone();
let client = tm.clients.get_mut(&1).unwrap();
Expand Down Expand Up @@ -60,16 +61,161 @@ fn do_notifications(test_name: &str, tls_config: Option<TestTlsConfig>) {
.await
.unwrap();
assert!(message.is_some());
match message.unwrap() {
let insert_row_id = match message.unwrap() {
NotificationEvent::Notification(Notification::Update {
action,
database: _,
database,
table,
row_id: _,
row_id,
}) => {
assert_eq!(action, Action::SQLITE_INSERT);
assert_eq!(&database, "main");
assert_eq!(&table, "_test_user_");
row_id
}
};
let message = Message::Execute(
"DELETE FROM _test_user_ WHERE name = ?".into(),
vec![name.into()],
);
let response = client.sql(&message).await.unwrap();
let message = response.data.unwrap();
match message {
MessageResponse::Rows(rows) => assert!(rows.len() == 0),
MessageResponse::Error(err) => panic!("{}", err),
}
let notification_stream = client.notification_stream.as_mut().unwrap();
let message = notification_stream
.read_timeout(tokio::time::Duration::from_secs(60))
.await
.unwrap();
assert!(message.is_some());
match message.unwrap() {
NotificationEvent::Notification(Notification::Update {
action,
database,
table,
row_id,
}) => {
assert_eq!(action, Action::SQLITE_DELETE);
assert_eq!(&database, "main");
assert_eq!(&table, "_test_user_");
assert_eq!(insert_row_id,row_id);
}
}

});
}


fn do_notifications2(test_name: &str,
tls_config: Option<TestTlsConfig>) {
let rt = Runtime::new().unwrap();

let _ = rt.block_on(async {
//const QUERY: &str ="SELECT name,birth_date from _test_user_ where name = ?";
let mut tm = TestManager::new(test_name, 3, tls_config);
//tm.keep_temp_directories=keep_temp_directories;
const MAX_ITER:usize = 5;
for i in 0..MAX_ITER{
tm.wait_for_cluster_established(1, 60).await.unwrap();
let notifications_addr = tm.instances.get(&1).unwrap().notifications_addr.clone();
let client = tm.clients.get_mut(&1).unwrap();

client
.start_listening_for_notifications(&notifications_addr)
.await
.unwrap();

let message = Message::Execute(
"CREATE TABLE IF NOT EXISTS _test_user_ (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
birth_date DATETIME NOT NULL
)"
.into(),
vec![],
);
let response = client.sql(&message).await.unwrap();

let message = response.data.unwrap();
match message {
MessageResponse::Rows(rows) => assert!(rows.len() == 0),
MessageResponse::Error(err) => panic!("{}", err),
}
let name = "Ha";
let birth_date = Utc::now();
let message = Message::Execute(
"INSERT INTO _test_user_ (name,birth_date) VALUES (?,?)".into(),
vec![name.into(), birth_date.into()],
);
let response = client.sql(&message).await.unwrap();
let message = response.data.unwrap();
match message {
MessageResponse::Rows(rows) => assert!(rows.len() == 0),
MessageResponse::Error(err) => panic!("{}", err),
}

//tm.wait_for_last_applied_log(response.log_id,60).await.unwrap();

// now we check for notification, will do with this:

let notification_stream = client.notification_stream.as_mut().unwrap();
let message = notification_stream
.read_timeout(tokio::time::Duration::from_secs(60))
.await
.unwrap();
assert!(message.is_some());
let insert_row_id = match message.unwrap() {
NotificationEvent::Notification(Notification::Update {
action,
database,
table,
row_id,
}) => {
assert_eq!(action, Action::SQLITE_INSERT);
assert_eq!(&database, "main");
assert_eq!(&table, "_test_user_");
row_id
}
};
let message = Message::Execute(
"DELETE FROM _test_user_ WHERE name = ?".into(),
vec![name.into()],
);
let response = client.sql(&message).await.unwrap();
let message = response.data.unwrap();
match message {
MessageResponse::Rows(rows) => assert!(rows.len() == 0),
MessageResponse::Error(err) => panic!("{}", err),
}
let notification_stream = client.notification_stream.as_mut().unwrap();
let message = notification_stream
.read_timeout(tokio::time::Duration::from_secs(60))
.await
.unwrap();
assert!(message.is_some());
match message.unwrap() {
NotificationEvent::Notification(Notification::Update {
action,
database,
table,
row_id,
}) => {
assert_eq!(action, Action::SQLITE_DELETE);
assert_eq!(&database, "main");
assert_eq!(&table, "_test_user_");
assert_eq!(insert_row_id,row_id);
}
}
client
.stop_listening_for_notifications()
.await
.unwrap();
if i < MAX_ITER - 1 {
tm.kill_all().unwrap();
tm.start().unwrap();
}
}
});
}
Expand All @@ -82,7 +228,23 @@ fn notifications() {
#[test]
fn notifications_insecure_ssl() {
do_notifications(
"notifications_insecure_ssl",
"notifications_insecure_ssl",
Some(TestTlsConfig::default().accept_invalid_certificates(true)),
);
}

#[test]
fn notifications2_no_ssl() {
do_notifications2("notifications2_no_ssl", None);

}

#[test]
fn notifications2_insecure_ssl() {
do_notifications2(
"notifications2_insecure_ssl",
Some(TestTlsConfig::default().accept_invalid_certificates(true)),
);


}

0 comments on commit 375bd04

Please sign in to comment.