RUST-565 Properly report connection closures due to error #258
@@ -201,6 +201,7 @@ impl Handshaker {
let response = conn.send_command(command, None).await?;
let end_time = PreciseTime::now();

response.validate()?;
We weren't checking the initial handshake response to see if it returned an error, mistakenly returning `Ok(connection)` here even if the handshake failed.
Good catch!
@@ -348,6 +353,14 @@ impl ConnectionPoolInner {
// Establishing a pending connection failed, so that must be reflected in to total
// connection count.
self.total_connection_count.fetch_sub(1, Ordering::SeqCst);
self.emit_event(|handler| {
According to CMAP, we are supposed to be emitting `ConnectionClosed` events here. This makes sense because we emitted a `ConnectionCreatedEvent` when this pending connection was created.
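To make the pairing of events concrete, here is a minimal sketch of the behavior described in this comment. It is not the driver's code: `Pool`, `create_connection`, `take_events`, and the simplified `Event` enum are hypothetical names made up for illustration, and the real pool is async. Only `ConnectionClosedReason::Error` and the `total_connection_count` decrement come from this PR.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Mutex;

#[derive(Debug, PartialEq)]
pub enum ConnectionClosedReason {
    Error,
    PoolClosed, // included only to show that other reasons exist
}

// Simplified, hypothetical event type; the driver's events carry more fields.
#[derive(Debug, PartialEq)]
pub enum Event {
    ConnectionCreated { id: u32 },
    ConnectionClosed { id: u32, reason: ConnectionClosedReason },
}

#[derive(Default)]
pub struct Pool {
    total_connection_count: AtomicU32,
    next_id: AtomicU32,
    events: Mutex<Vec<Event>>,
}

impl Pool {
    /// Creates a pending connection and runs `establish` (the handshake).
    /// If establishment fails, the count is rolled back and a closed event
    /// with reason `Error` balances the earlier created event.
    pub fn create_connection<F>(&self, establish: F) -> Result<u32, String>
    where
        F: FnOnce() -> Result<(), String>,
    {
        let id = self.next_id.fetch_add(1, Ordering::SeqCst) + 1;
        self.total_connection_count.fetch_add(1, Ordering::SeqCst);
        self.emit(Event::ConnectionCreated { id });

        if let Err(e) = establish() {
            self.total_connection_count.fetch_sub(1, Ordering::SeqCst);
            self.emit(Event::ConnectionClosed {
                id,
                reason: ConnectionClosedReason::Error,
            });
            return Err(e);
        }
        Ok(id)
    }

    fn emit(&self, event: Event) {
        self.events.lock().unwrap().push(event);
    }

    pub fn total(&self) -> u32 {
        self.total_connection_count.load(Ordering::SeqCst)
    }

    /// Drains and returns everything emitted so far.
    pub fn take_events(&self) -> Vec<Event> {
        std::mem::take(&mut *self.events.lock().unwrap())
    }
}
```

With this shape, every `ConnectionCreated` a monitor sees is eventually matched by a `ConnectionClosed`, including for connections that never finished establishing.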
self.events.write().unwrap().push(event);
}

pub fn subscribe(&self) -> EventSubscriber {
I added a mechanism by which threads can subscribe to the event handler and wait for a particular event. This seemed like a more async-y approach than our current "sleep for a little, then check again" loop. I updated the cmap spec tests to use it too, hopefully reducing the test failures we get every now and then due to not waiting long enough. I filed RUST-572 to cover the work for introducing this to the test suite as a whole.
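For readers who want a feel for the subscribe-and-wait idea, the following is a rough, synchronous sketch built on `std::sync::mpsc`. It is an assumption-laden illustration, not the implementation in this PR: the `Event` enum, `handle`, and `wait_for_event` are hypothetical, and the driver's version is async. Only `subscribe`, `EventSubscriber`, and the `events` vector appear in the diff.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Mutex, RwLock};
use std::time::{Duration, Instant};

// Hypothetical, simplified event type.
#[derive(Clone, Debug, PartialEq)]
pub enum Event {
    ConnectionCreated(u32),
    ConnectionCheckedIn(u32),
    ConnectionClosed(u32),
}

#[derive(Default)]
pub struct EventHandler {
    events: RwLock<Vec<Event>>,
    subscribers: Mutex<Vec<Sender<Event>>>,
}

impl EventHandler {
    /// Records the event and forwards a copy to every live subscriber.
    pub fn handle(&self, event: Event) {
        self.events.write().unwrap().push(event.clone());
        // Forget subscribers whose receiving half has been dropped.
        self.subscribers
            .lock()
            .unwrap()
            .retain(|tx| tx.send(event.clone()).is_ok());
    }

    /// Registers a new subscriber; it only sees events emitted afterwards.
    pub fn subscribe(&self) -> EventSubscriber {
        let (tx, rx) = channel();
        self.subscribers.lock().unwrap().push(tx);
        EventSubscriber { rx }
    }
}

pub struct EventSubscriber {
    rx: Receiver<Event>,
}

impl EventSubscriber {
    /// Blocks until an event matching `filter` arrives or `timeout` elapses.
    pub fn wait_for_event<F>(&self, timeout: Duration, filter: F) -> Option<Event>
    where
        F: Fn(&Event) -> bool,
    {
        let deadline = Instant::now() + timeout;
        loop {
            // `None` here means the deadline has already passed.
            let remaining = deadline.checked_duration_since(Instant::now())?;
            match self.rx.recv_timeout(remaining) {
                Ok(event) if filter(&event) => return Some(event),
                Ok(_) => continue, // not the event we are waiting for
                Err(_) => return None, // timed out or handler dropped
            }
        }
    }
}
```

The important ordering detail is that a test should call `subscribe` before triggering the action it wants to observe, otherwise the event can be emitted before the subscriber exists.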
@@ -137,3 +138,73 @@ async fn concurrent_connections() {
.await
.expect("disabling fail point should succeed");
}

#[cfg_attr(feature = "tokio-runtime", tokio::test(threaded_scheduler))]
In order to reduce the boilerplate needed to enable failpoints, I introduced a `FailPoint` type (see `failpoint.rs`). Enabling this type returns a `FailPointGuard` that automatically disables the failpoint server side in its `drop`. This is especially useful because the `drop` gets called even when the thread panics, as is common in failing tests. The problem with this is that we can't use the "dispatch a task from drop" methodology we use elsewhere in the driver, since after a panic the runtime itself is usually dropped and the tasks won't get executed. To get around this, I used `RUNTIME.block_on`, but this deadlocks without using the threaded scheduler in tokio. Using the basic scheduler for the tests is nice because it ensures we're not blocking in the driver anywhere, though, so we don't want to completely eliminate it. I figured a decent compromise would be to only use the threaded scheduler in tests that require fail points and leave the rest of the tests on basic to keep verifying we're not blocking in the driver.
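The guard pattern itself can be shown without a server or a runtime. The sketch below is a simplified stand-in, not the code in `failpoint.rs`: `FakeServer` is a hypothetical in-memory substitute for the server-side fail point state, and the real guard would send a command (blocking on the runtime) instead of flipping a flag. It assumes the default `panic = "unwind"` strategy, since destructors do not run under `panic = "abort"`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical stand-in for fail point state that lives on the server.
#[derive(Clone, Default)]
pub struct FakeServer {
    failpoint_enabled: Arc<AtomicBool>,
}

impl FakeServer {
    pub fn is_failpoint_enabled(&self) -> bool {
        self.failpoint_enabled.load(Ordering::SeqCst)
    }
}

// Simplified: the real type would carry the fail point's name and options.
pub struct FailPoint;

impl FailPoint {
    /// Enables the fail point and returns a guard that disables it on drop.
    pub fn enable(self, server: &FakeServer) -> FailPointGuard {
        server.failpoint_enabled.store(true, Ordering::SeqCst);
        FailPointGuard {
            server: server.clone(),
        }
    }
}

pub struct FailPointGuard {
    server: FakeServer,
}

impl Drop for FailPointGuard {
    fn drop(&mut self) {
        // Runs on normal scope exit and also while unwinding from a panic,
        // so a failing test does not leave the fail point enabled.
        self.server.failpoint_enabled.store(false, Ordering::SeqCst);
    }
}
```

A test would bind the guard to a named variable such as `_guard` (not `_`, which drops immediately) so the fail point stays active until the end of the scope.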
I wonder if it's possible to take this a step further, e.g. automatically acquire the lock exclusively when we create a failpoint. This would presumably require us not to acquire the lock manually in the tests to avoid deadlocks, but maybe we could get around that by always acquiring a read lock and then passing that into the constructor for FailPoint (which then drops it and acquires a write lock instead). There wouldn't be any way to get the read lock back in the test function after the FailPoint is dropped, but I'm guessing there isn't really anywhere we'd need that.
In a lot of cases we need to check the server version before we enable failpoints, so we definitely need to acquire at least a read lock beforehand. Also, we may need to repeatedly enable failpoints over the duration of a test (e.g. in a spec test), so I don't think we can use the one that upgrades the lock either. I think we'll just have to rely on our diligence until we come up with a macro for automating the lock acquisition or something.
Fair enough
@@ -19,7 +19,7 @@ fn empty_address() -> StreamAddress {
}

/// Event emitted when a connection pool is created.
-#[derive(Debug, Deserialize, PartialEq)]
+#[derive(Clone, Debug, Deserialize, PartialEq)]
This is an API change driven by testing, but I figured it would be useful for both us and users so I went ahead with it.
use crate::{error::Result, operation::append_options, RUNTIME};

#[derive(Debug)]
pub struct FailPoint {
I'm currently only using this type in the new test I wrote for this work, but we should try to slowly convert all usages of failpoints in the driver to use it, including ones we deserialize in spec tests (we do this in Swift).
});

if let Err(e) = result {
println!("failed disabling failpoint: {:?}", e);
We can't really do much besides log the error here.
drop(conn);

// wait for event to be emitted to ensure check in has completed.
subscriber
The new subscriber functionality allows us to test the actual path we use to close connections rather than a test-only shortcut. Ditto for pools.
Awesome
@@ -19,6 +19,7 @@ struct Arguments {

#[function_name::named]
async fn run_aggregate_test(test_file: TestFile) {
let _guard: RwLockReadGuard<()> = LOCK.run_concurrently().await;
Because we're setting failpoints in tests, acquiring the lock needs to be done before any i/o is performed whatsoever (e.g. in `TestClient::new` or in the background threads of a client). To that end, I updated many tests that were in violation of this, since they were interfering with the failpoints set in the tests required for this work.
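The locking discipline can be sketched with a plain `std::sync::RwLock`. This is a synchronous approximation under stated assumptions: the test suite's `LOCK` is async (note the `.await` in the diff), and `TestLock` and `run_exclusively` are hypothetical names chosen here to mirror `run_concurrently`. The idea is that ordinary tests share the lock, while a test that sets a fail point takes it exclusively so no other test's i/o can trip the fail point.

```rust
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

pub struct TestLock {
    inner: RwLock<()>,
}

impl TestLock {
    pub const fn new() -> Self {
        Self {
            inner: RwLock::new(()),
        }
    }

    /// Shared access: any number of ordinary tests may hold this at once.
    pub fn run_concurrently(&self) -> RwLockReadGuard<'_, ()> {
        // A test that panicked while holding the lock poisons it; the unit
        // payload cannot be corrupted, so recover the guard and carry on.
        self.inner.read().unwrap_or_else(|e| e.into_inner())
    }

    /// Exclusive access: for tests that enable fail points on the server.
    pub fn run_exclusively(&self) -> RwLockWriteGuard<'_, ()> {
        self.inner.write().unwrap_or_else(|e| e.into_inner())
    }
}

static LOCK: TestLock = TestLock::new();

pub fn ordinary_test() {
    // Acquire first, and only then create clients or perform any i/o.
    let _guard = LOCK.run_concurrently();
    // ... create the client and run the test body here ...
}

pub fn failpoint_test() {
    // No other test can be talking to the server while this guard is held.
    let _guard = LOCK.run_exclusively();
    // ... enable the fail point and run the test body here ...
}
```

If a test created its client before taking the read guard, the client's background i/o could overlap with another test's exclusive section, which is the interference described above.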
931887a to d469eca
RUST-565
This PR fixes a bug where connection closure events emitted due to errors never contained `ConnectionClosedReason::Error` as a reason. This also fixes a bug where `ok: 0` responses from the initial handshake were ignored.
For convenience in testing, this PR also adds a `FailPoint` type that facilitates the enabling and cleanup of fail points.
In order to test this, RUST-304 and RUST-568 are also fixed in this PR.