From bee66456a8ffd272a731548722912550ecc066ed Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 21 Mar 2024 14:38:16 +0100 Subject: [PATCH] fix conn fails to upgrade lock --- .../src/connection/connection_manager.rs | 40 ++++---- libsql-server/src/connection/libsql.rs | 94 ++++++++++++++++++- 2 files changed, 117 insertions(+), 17 deletions(-) diff --git a/libsql-server/src/connection/connection_manager.rs b/libsql-server/src/connection/connection_manager.rs index 598f67d25e..f4710ebf92 100644 --- a/libsql-server/src/connection/connection_manager.rs +++ b/libsql-server/src/connection/connection_manager.rs @@ -1,5 +1,6 @@ use std::ops::Deref; use std::sync::Arc; +use std::thread::current; use std::time::{Duration, Instant}; use crossbeam::deque::Steal; @@ -147,23 +148,30 @@ impl ManagedConnectionWalWrapper { break; } else { // not us, maybe we need to steal the lock? - drop(current); if started_at.elapsed() >= self.manager.inner.txn_timeout_duration { - let handle = { - self.manager - .inner - .abort_handle - .lock() - .get(&id) - .unwrap() - .clone() - }; - // the guard must be dropped before rolling back, or end write txn will - // deadlock - tracing::debug!("forcing rollback of {id}"); - handle.abort(); - parker.park(); - tracing::debug!(line = line!(), "unparked"); + if acquired { + drop(current); + let handle = { + self.manager + .inner + .abort_handle + .lock() + .get(&id) + .unwrap() + .clone() + }; + // the guard must be dropped before rolling back, or end write txn will + // deadlock + tracing::debug!("forcing rollback of {id}"); + handle.abort(); + parker.park(); + tracing::debug!(line = line!(), "unparked"); + } else { + tracing::debug!( + "connection {id} failed to acquire lock after timeout" + ); + *current = None; + } } else { // otherwise we wait for the txn to timeout, or to be unparked by it let before = Instant::now(); diff --git a/libsql-server/src/connection/libsql.rs b/libsql-server/src/connection/libsql.rs index 38892957dc..10a37dd7cb 100644 --- a/libsql-server/src/connection/libsql.rs +++ b/libsql-server/src/connection/libsql.rs @@ -984,7 +984,6 @@ mod test { } #[tokio::test] - /// verify that releasing a txn before the timeout async fn force_rollback_reset() { let tmp = tempdir().unwrap(); let make_conn = MakeLibSqlConn::new( @@ -1069,4 +1068,97 @@ mod test { .await .unwrap(); } + + #[tokio::test] + async fn connection_fails_to_upgrade_lock() { + let tmp = tempdir().unwrap(); + let make_conn = MakeLibSqlConn::new( + tmp.path().into(), + PassthroughWalWrapper, + Default::default(), + MetaStoreHandle::load(tmp.path()).unwrap(), + Arc::new([]), + 100000000, + 100000000, + DEFAULT_AUTO_CHECKPOINT, + watch::channel(None).1, + None, + Default::default(), + Arc::new(|_| unreachable!()), + ) + .await + .unwrap(); + + let conn1 = make_conn.make_connection().await.unwrap(); + tokio::task::spawn_blocking({ + let conn = conn1.clone(); + move || { + let builder = Connection::run( + conn.inner.clone(), + Program::seq(&["CREATE TABLE TEST (x)", "BEGIN", "SELECT * FROM test"]), + TestBuilder::default(), + ) + .unwrap(); + assert!(!conn.inner.lock().is_autocommit()); + assert!(builder.into_ret()[0].is_ok()); + } + }) + .await + .unwrap(); + + let conn2 = make_conn.make_connection().await.unwrap(); + tokio::task::spawn_blocking({ + let conn = conn2.clone(); + move || { + let before = Instant::now(); + let builder = Connection::run( + conn.inner.clone(), + Program::seq(&["INSERT INTO test VALUES (12)"]), + TestBuilder::default(), + ) + .unwrap(); + assert!(!conn.inner.lock().is_autocommit()); + assert!(builder.into_ret()[0].is_ok()); + before.elapsed() + } + }) + .await + .unwrap(); + + tokio::task::spawn_blocking({ + let conn = conn1.clone(); + move || { + let builder = Connection::run( + conn.inner.clone(), + Program::seq(&["INSERT INTO test VALUES (12)"]), + TestBuilder::default(), + ) + .unwrap(); + assert!(conn.inner.lock().is_autocommit()); + // fails to acquire lock + assert!(builder.into_ret()[0].is_err()); + } + }) + .await + .unwrap(); + + tokio::time::sleep(TXN_TIMEOUT * 2).await; + + tokio::task::spawn_blocking({ + let conn = conn2.clone(); + move || { + let builder = Connection::run( + conn.inner.clone(), + Program::seq(&["INSERT INTO test VALUES (12)"]), + TestBuilder::default(), + ) + .unwrap(); + assert!(conn.inner.lock().is_autocommit()); + // not blocking + assert!(builder.into_ret()[0].is_ok()); + } + }) + .await + .unwrap(); + } }