Skip to content

Commit

Permalink
Experimental work to reduce number of client threads down to 1 or 2 i…
Browse files Browse the repository at this point in the history
…n minimal config
  • Loading branch information
locka99 committed Aug 3, 2021
1 parent 3462e6a commit c44a35f
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 112 deletions.
156 changes: 81 additions & 75 deletions client/src/comms/tcp_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,6 @@ pub(crate) struct TcpTransport {
connection_state: ConnectionStateMgr,
/// Message queue for requests / responses
message_queue: Arc<RwLock<MessageQueue>>,
/// Use a single-threaded executor
single_threaded_executor: bool,
}

impl Drop for TcpTransport {
Expand All @@ -249,7 +247,6 @@ impl TcpTransport {
secure_channel: Arc<RwLock<SecureChannel>>,
session_state: Arc<RwLock<SessionState>>,
message_queue: Arc<RwLock<MessageQueue>>,
single_threaded_executor: bool,
) -> TcpTransport {
let connection_state = {
let session_state = trace_read_lock_unwrap!(session_state);
Expand All @@ -260,12 +257,15 @@ impl TcpTransport {
secure_channel,
connection_state,
message_queue,
single_threaded_executor,
}
}

/// Connects the stream to the specified endpoint
pub fn connect(&mut self, endpoint_url: &str) -> Result<(), StatusCode> {
pub fn connect(
&mut self,
runtime: &tokio::runtime::Runtime,
endpoint_url: &str,
) -> Result<(), StatusCode> {
if self.is_connected() {
panic!("Should not try to connect when already connected");
}
Expand Down Expand Up @@ -298,81 +298,41 @@ impl TcpTransport {

// The connection will be serviced on its own thread. When the thread terminates, the connection
// has also terminated.
self.spawn_connection_task(runtime, addr, endpoint_url);

{
let single_threaded_executor = self.single_threaded_executor;
let result = Arc::new(RwLock::new(Ok(())));

let (connection_state, session_state, secure_channel, message_queue) = (
self.connection_state.clone(),
self.session_state.clone(),
self.secure_channel.clone(),
self.message_queue.clone(),
let result_task = result.clone();
runtime.block_on(async move {
// Poll for the state to indicate connect is ready
debug!("Waiting for a connect (or failure to connect)");
let mut timer = interval_at(
Instant::now(),
Duration::from_millis(Self::WAIT_POLLING_TIMEOUT),
);
let endpoint_url = endpoint_url.to_string();

thread::spawn(move || {
debug!("Client tokio tasks are starting for connection");

let id = format!("client-connection-thread-{:?}", thread::current().id());

let mut builder = if !single_threaded_executor {
tokio::runtime::Builder::new_multi_thread()
} else {
tokio::runtime::Builder::new_current_thread()
};

builder.enable_all().build().unwrap().block_on(async {
register_runtime_component!(&id);

// This is the connection task
Self::connection_task(
addr,
connection_state.clone(),
endpoint_url,
session_state.clone(),
secure_channel,
message_queue,
)
.await;

// Tell the session that the connection is finished.
match connection_state.state() {
ConnectionState::Finished(status_code) => {
let mut session_state = trace_write_lock_unwrap!(session_state);
session_state.on_session_closed(status_code);
}
connection_state => {
error!(
"Connect task is not in a finished state, state = {:?}",
connection_state
);
}
loop {
match self.connection_state.state() {
ConnectionState::Processing => {
debug!("Connected");
break;
}
ConnectionState::Finished(status_code) => {
error!("Connected failed with status {}", status_code);
let mut result = trace_write_lock_unwrap!(result_task);
*result = Err(StatusCode::BadConnectionClosed);
break;
}
_ => {
// Still waiting for something to happen
}
deregister_runtime_component!(&id);
});

debug!("Client tokio tasks have stopped for connection");
});
}

// Poll for the state to indicate connect is ready
debug!("Waiting for a connect (or failure to connect)");
loop {
match self.connection_state.state() {
ConnectionState::Processing => {
debug!("Connected");
return Ok(());
}
ConnectionState::Finished(status_code) => {
error!("Connected failed with status {}", status_code);
return Err(StatusCode::BadConnectionClosed);
}
_ => {
// Still waiting for something to happen
}
timer.tick().await;
}
thread::sleep(Duration::from_millis(Self::WAIT_POLLING_TIMEOUT))
}
});

// Getting result is a bit of a pain
let result = trace_read_lock_unwrap!(result);
result.clone()
}

/// Disconnects the stream from the server (if it is connected)
Expand All @@ -392,15 +352,46 @@ impl TcpTransport {
self.connection_state.is_connected()
}

fn spawn_connection_task(
&self,
runtime: &tokio::runtime::Runtime,
addr: SocketAddr,
endpoint_url: &str,
) {
let (connection_state, session_state, secure_channel, message_queue) = (
self.connection_state.clone(),
self.session_state.clone(),
self.secure_channel.clone(),
self.message_queue.clone(),
);
let endpoint_url = endpoint_url.to_string();

let id = format!("client-connection-thread-{:?}", thread::current().id());
let connection_task = Self::connection_task(
id,
addr,
connection_state,
endpoint_url,
session_state,
secure_channel,
message_queue,
);

runtime.spawn(connection_task);
}

/// This is the main connection task for a connection.
async fn connection_task(
id: String,
addr: SocketAddr,
connection_state: ConnectionStateMgr,
endpoint_url: String,
session_state: Arc<RwLock<SessionState>>,
secure_channel: Arc<RwLock<SecureChannel>>,
message_queue: Arc<RwLock<MessageQueue>>,
) {
register_runtime_component!(&id);

debug!(
"Creating a connection task to connect to {} with url {}",
addr, endpoint_url
Expand Down Expand Up @@ -447,7 +438,7 @@ impl TcpTransport {
reader,
writer,
connection_state.clone(),
session_state,
session_state.clone(),
secure_channel,
message_queue,
);
Expand All @@ -469,6 +460,21 @@ impl TcpTransport {
}
}
}

// Tell the session that the connection is finished.
match connection_state.state() {
ConnectionState::Finished(status_code) => {
let mut session_state = trace_write_lock_unwrap!(session_state);
session_state.on_session_closed(status_code);
}
connection_state => {
error!(
"Connect task is not in a finished state, state = {:?}",
connection_state
);
}
}
deregister_runtime_component!(&id);
}

async fn write_bytes_task(
Expand Down
Loading

0 comments on commit c44a35f

Please sign in to comment.