Porting network layer to Rust #1098
The first implementation aims to port the HTTP client. The client is backed by a hyper client that is configured for HTTPS. The custom response returned by the client has the body, status, and the headers whose keys are in `header_keys`.

```rust
#[pyclass]
pub struct HttpClient {
    client: Client<HttpsConnector<hyper::client::HttpConnector>>,
    header_keys: Vec<String>,
}

#[pyclass]
pub struct HttpResponse {
    #[pyo3(get)]
    pub status: u16,
    #[pyo3(get)]
    headers: HashMap<String, String>,
    #[pyo3(get)]
    body: Vec<u8>,
}
```

The live trading setup has many async tasks running. Many of them will continue to be implemented in Python for the time being, so the Rust and Python async runtimes will have to co-exist. Here we use the pyo3-asyncio library to help interface the two runtimes. At a high level it wraps a Rust async task in a Python async task, and the result returned by the Rust task is converted into a Python object. Note: this means that for each request two async tasks are created and executed - the underlying Rust task and the wrapping Python task. Currently this test passes.
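As a rough illustration of what the hyper-backed request path can look like, here is a minimal sketch assuming hyper 0.14 + hyper-tls style APIs; the `*Sketch` names, error handling, and exact conversions are illustrative assumptions, not the actual NautilusTrader source.

```rust
use std::collections::HashMap;

use hyper::{Body, Client, Method, Request, Uri};
use hyper_tls::HttpsConnector;

// Plain Rust mirrors of the pyo3 structs above, renamed to avoid confusion.
pub struct HttpResponseSketch {
    pub status: u16,
    pub headers: HashMap<String, String>,
    pub body: Vec<u8>,
}

pub struct HttpClientSketch {
    client: Client<HttpsConnector<hyper::client::HttpConnector>>,
    header_keys: Vec<String>,
}

impl HttpClientSketch {
    pub fn new(header_keys: Vec<String>) -> Self {
        let https = HttpsConnector::new();
        Self {
            client: Client::builder().build::<_, Body>(https),
            header_keys,
        }
    }

    pub async fn send_request(
        &self,
        method: Method,
        url: String,
        body: Option<Vec<u8>>,
    ) -> Result<HttpResponseSketch, hyper::Error> {
        let uri: Uri = url.parse().expect("invalid url");
        let req = Request::builder()
            .method(method)
            .uri(uri)
            .body(body.map(Body::from).unwrap_or_else(Body::empty))
            .expect("failed to build request");

        let res = self.client.request(req).await?;
        let status = res.status().as_u16();

        // Keep only the headers whose keys the client was configured with.
        let headers = self
            .header_keys
            .iter()
            .filter_map(|key| {
                res.headers()
                    .get(key.as_str())
                    .and_then(|v| v.to_str().ok())
                    .map(|v| (key.clone(), v.to_string()))
            })
            .collect();

        let bytes = hyper::body::to_bytes(res.into_body()).await?;
        Ok(HttpResponseSketch {
            status,
            headers,
            body: bytes.to_vec(),
        })
    }
}
```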
Hi @twitu @cjdsellers

@rsmb7z This only concerns the layer which is currently implemented in Cython, so the intent is that we continue to build adapters in Python on top of the above.
With #1100 and 250e199, a hyper based HttpClient has been added and tested. The core HTTP client and response are the same as previously described:

```rust
#[pyclass]
pub struct HttpClient {
    client: Client<HttpsConnector<hyper::client::HttpConnector>>,
    header_keys: Vec<String>,
}

#[pyclass]
pub struct HttpResponse {
    #[pyo3(get)]
    pub status: u16,
    #[pyo3(get)]
    headers: HashMap<String, String>,
    #[pyo3(get)]
    body: Vec<u8>,
}
```

They are also exposed as Python classes by pyo3, and the Python class exposes the request method, along with specializations for the individual HTTP methods:

```rust
#[pymethods]
impl HttpClient {
    #[new]
    #[pyo3(signature=(header_keys=[].to_vec()))]
    #[must_use]
    pub fn new(header_keys: Vec<String>) -> Self

    pub fn request<'py>(
        slf: PyRef<'_, Self>,
        method_str: String,
        url: String,
        headers: HashMap<String, String>,
        body: Option<&'py PyBytes>,
        py: Python<'py>,
    ) -> PyResult<&'py PyAny>
}
```

Internally all of them have this logic, which calls the async function to send a request and returns a Rust future converted into a Python awaitable:

```rust
pyo3_asyncio::tokio::future_into_py(py, async move {
    match client.send_request(method, url, headers, body_vec).await {
        Ok(res) => Ok(res),
        Err(e) => Err(PyErr::new::<PyException, _>(format!(
            "Error handling response: {e}"
        ))),
    }
})
```
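For context, a hedged sketch of how one such pymethod could glue these pieces together is shown below. It is meant to sit inside the `#[pymethods] impl HttpClient` block above with the obvious pyo3/hyper imports; the `Method::from_bytes` parsing, the `slf.inner` clone, and the error strings are illustrative assumptions rather than the actual source. The key pattern is that everything moved into the `async move` block must be owned (`'static`) before the future is handed to `pyo3_asyncio::tokio::future_into_py`.

```rust
// Hypothetical glue for the `request` pymethod. Assumes `inner` is a cheaply
// cloneable handle (e.g. Arc around the hyper client + header keys) exposing
// `async fn send_request(...) -> Result<HttpResponse, hyper::Error>`.
pub fn request<'py>(
    slf: PyRef<'_, Self>,
    method_str: String,
    url: String,
    headers: HashMap<String, String>,
    body: Option<&'py PyBytes>,
    py: Python<'py>,
) -> PyResult<&'py PyAny> {
    // Parse the method string and copy GIL-bound data into owned values,
    // because the future must not hold references into Python objects.
    let method = Method::from_bytes(method_str.as_bytes())
        .map_err(|e| PyErr::new::<PyException, _>(format!("Invalid method: {e}")))?;
    let body_vec = body.map(|b| b.as_bytes().to_vec());
    let client = slf.inner.clone(); // hypothetical cloneable inner handle

    pyo3_asyncio::tokio::future_into_py(py, async move {
        match client.send_request(method, url, headers, body_vec).await {
            Ok(res) => Ok(res),
            Err(e) => Err(PyErr::new::<PyException, _>(format!(
                "Error handling response: {e}"
            ))),
        }
    })
}
```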
A key point to note here is that there are in total three event loops in the program, and two of them are managed by the pyo3-asyncio library. The direct Python event loop is created by the main control flow logic. It is used for scheduling all other Python tasks, i.e. anything that does not directly await a Rust task, like tasks related to other clients, engines, cache etc. This diagram shows how an HTTP send request is scheduled and then awaited by the main control flow in Python.
The library creates and maintains a single event loop throughout the lifetime of the program. It does not create a new event loop for each request, so having three event loops (2 Python and 1 Rust) does not impose significant overhead based on initial benchmarks.

```rust
static GET_RUNNING_LOOP: OnceCell<PyObject> = OnceCell::new();

/// Get a reference to the Python Event Loop from Rust
///
/// Equivalent to `asyncio.get_running_loop()` in Python 3.7+.
pub fn get_running_loop(py: Python) -> PyResult<&PyAny> {
    // Ideally should call get_running_loop, but calls get_event_loop for compatibility when
    // get_running_loop is not available.
    GET_RUNNING_LOOP
        .get_or_try_init(|| -> PyResult<PyObject> {
            let asyncio = asyncio(py)?;
            Ok(asyncio.getattr("get_running_loop")?.into())
        })?
        .as_ref(py)
        .call0()
}
```

The benchmarks introduced in e8d249e roughly measure the client throughput in terms of reqs/s. The requests are sent to a server running on the same machine.

*Cython makes a connection with each request, so the connections are disconnected after each concurrent batch.

The general conclusion is that the Rust/pyo3 and Cython clients have similar throughput (< order of magnitude in difference), indicating that IO/system resources are the main constraint. However, logs from full functional tests, where the server is a real world URL, show about a 3x improvement in latency, from 2-4 ms (Cython) to 0.7 ms (pyo3).
Here's context around the design of the websocket client. The websocket client is async and can do the following:

- primarily, receive data from the server in a continuous loop
- occasionally send messages and heartbeats to the server
- expose hooks to check the status of the connection (disconnected, closed etc.)

This is the expected client usage. It will keep receiving messages in a loop and each message will be passed to a handler. But messages can be sent from one or more scopes/tasks.

```python
async def run(client):
    while client.can_read():
        message = await client.recv()
        client.handler(message)

async def do_something_else(client):
    client.send(data)
```

**Using Arc Mutex**

While this works well in Python, implementing this in Rust is challenging, since a websocket has a single underlying TCP stream: receiving from it or sending to it requires a mutable reference. Since receiving and sending can be done by multiple callers in different tasks, the client needs to be wrapped in an Arc Mutex. The Arc is so that the client can be cloned and passed to different async tasks - for receiving and sending. The Mutex locks the client and only allows one task to get mutable access to it.

```rust
#[pyclass]
struct WebSocketClient {
    stream: Arc<Mutex<WebSocketStream>>,
}
```

This is the most straightforward implementation. The assumption here is that messages are received far more often than they are sent, so execution will rarely block on acquiring the lock.
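To make the contention concrete, here is a minimal sketch (assuming tokio, tokio-tungstenite, and `tokio::sync::Mutex`; the URL is a placeholder) of sharing one locked stream between a receive loop and a send call:

```rust
use std::sync::Arc;

use futures_util::{SinkExt, StreamExt};
use tokio::sync::Mutex;
use tokio_tungstenite::{connect_async, tungstenite::Message};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder URL for illustration only.
    let (stream, _response) = connect_async("wss://example.com/ws").await?;
    let shared = Arc::new(Mutex::new(stream));

    // Receive loop: must lock the stream to read from it. Note the lock is
    // held across the `next().await`, so a send can only slip in between
    // received messages - acceptable if receives vastly outnumber sends.
    let reader = {
        let shared = shared.clone();
        tokio::spawn(async move {
            loop {
                let mut guard = shared.lock().await;
                match guard.next().await {
                    Some(Ok(msg)) => println!("received: {msg}"),
                    _ => break,
                }
            }
        })
    };

    // Occasional send from another scope/task: also needs the lock.
    shared
        .lock()
        .await
        .send(Message::Text("heartbeat".into()))
        .await?;

    reader.await?;
    Ok(())
}
```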
**Using channels**

We can avoid the Arc Mutex by separating the Python control flow and the Rust client using channels. Here we utilize our previous assumption that the client will only receive messages in one scope/task - the one running the loop. Here we have a Rust-native client and a thin Python-facing wrapper that communicate over a pair of channels:

```rust
use std::sync::mpsc::{Sender, Receiver};
use pyo3::prelude::*;

struct WebSocketClient { /* stuff */ }

#[pyclass]
struct WebSocketClientPy {
    sender: Sender<CustomMessage>,
    recv: Receiver<Vec<u8>>,
}

#[pymethods]
impl WebSocketClientPy {
    pub fn send(slf: PyRef<'_, Self>, data: CustomMessage) {
        slf.sender.send(data).unwrap();
    }

    pub fn recv(slf: PyRef<'_, Self>) -> Vec<u8> {
        slf.recv.recv().unwrap()
    }
}
```

This implementation can be simplified further by strictly enforcing the assumption that there is only one reader (the Python loop) and one sender (the Rust client) for received messages. Then we need only one channel to pass control messages from the Python wrapper to the Rust client.
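For completeness, here is a hypothetical sketch of the Rust-native side under this design: a task that exclusively owns the stream, forwards received payloads over one channel, and drains control messages from the other. The `CustomMessage` variants, the channel layout, and the use of tokio-tungstenite are assumptions for illustration.

```rust
use std::sync::mpsc::{Receiver, Sender, TryRecvError};

use futures_util::{SinkExt, StreamExt};
use tokio_tungstenite::tungstenite::Message;

enum CustomMessage {
    Send(Vec<u8>),
    Close,
}

type WsStream = tokio_tungstenite::WebSocketStream<
    tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>;

async fn run_client(
    mut stream: WsStream,
    data_tx: Sender<Vec<u8>>,             // received payloads -> Python wrapper
    control_rx: Receiver<CustomMessage>,  // control messages <- Python wrapper
) {
    loop {
        // Drain a pending control message without blocking the task. Note: a
        // real implementation would use async channels and `tokio::select!`
        // so sends are not held up by a quiet connection.
        match control_rx.try_recv() {
            Ok(CustomMessage::Send(data)) => {
                if stream.send(Message::Binary(data)).await.is_err() {
                    break;
                }
            }
            Ok(CustomMessage::Close) | Err(TryRecvError::Disconnected) => break,
            Err(TryRecvError::Empty) => {}
        }

        // Forward the next received message to the Python side.
        match stream.next().await {
            Some(Ok(Message::Binary(data))) => {
                let _ = data_tx.send(data);
            }
            Some(Ok(Message::Text(text))) => {
                let _ = data_tx.send(text.into_bytes());
            }
            Some(Ok(_)) => {}             // ignore pings/pongs here
            Some(Err(_)) | None => break, // connection error or closed
        }
    }
}
```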
The final design for sockets (websocket and raw) uses a different approach in #1118. It assumes and enforces a single reader, multiple writer model for the client. All methods require mutable references (exclusive access) and pyo3-asyncio requires ownership (Clone), which means the connection has to be wrapped in an Arc Mutex. However, because it is a stream we can cleverly split it into reading and writing ends. This way only the writer needs to be wrapped in an Arc Mutex; the reader can remain lockless. This is a great advantage because it is expected that the connection will be read from more than written to.

```rust
#[pyclass]
pub struct WebSocketClient {
    pub read_task: Option<task::JoinHandle<io::Result<()>>>,
    pub heartbeat_task: Option<task::JoinHandle<()>>,
    write_mutex: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
}

impl WebSocketClient {
    pub async fn connect(
        url: &str,
        handler: PyObject,
        heartbeat: Option<u64>,
    ) -> Result<Self, Error>
}
```

The read end of the connection is moved into an async task that continuously polls the connection for new messages and passes them on to a handler. Another task is created to heartbeat the server at a fixed interval. The websocket client is highly stateful and can transition between several connection states.
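A hedged sketch of what the two spawned tasks could look like after splitting the stream; the type aliases, the ping payload, and the handler calling convention are assumptions, not the exact #1118 code.

```rust
use std::{sync::Arc, time::Duration};

use futures_util::{
    stream::{SplitSink, SplitStream},
    SinkExt, StreamExt,
};
use pyo3::{prelude::*, types::PyBytes};
use tokio::{net::TcpStream, sync::Mutex, task};
use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream};

type Reader = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
type SharedWriter = Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>;

/// Read task: owns the reader half exclusively, so no lock is needed.
fn spawn_read_task(mut reader: Reader, handler: PyObject) -> task::JoinHandle<()> {
    task::spawn(async move {
        while let Some(Ok(msg)) = reader.next().await {
            if let Message::Binary(data) = msg {
                // Pass the raw bytes to the Python handler under the GIL.
                Python::with_gil(|py| {
                    if let Err(e) = handler.call1(py, (PyBytes::new(py, &data),)) {
                        e.print(py);
                    }
                });
            }
        }
    })
}

/// Heartbeat task: only the writer half is shared, so only it takes the lock.
fn spawn_heartbeat_task(writer: SharedWriter, interval_secs: u64) -> task::JoinHandle<()> {
    task::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_secs(interval_secs)).await;
            if writer.lock().await.send(Message::Ping(vec![])).await.is_err() {
                break; // connection gone, stop heartbeating
            }
        }
    })
}
```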
In #1129 we change the underlying crate used for websockets to
Since
The latest design of the websockets (#1138) and sockets (#1146) is very similar. The websockets design is illustrated here since it's more featureful.

To enforce single reader, multiple writer we split the underlying stream into reader and writer ends. The reader gets moved to a reader task while the writer end gets wrapped in an Arc Mutex. Auto-reconnect is more involved and requires a two-layered client. Auto-reconnect can only be done by a continuously running task that monitors the inner client. If the inner client has disconnected, the task must reconnect and rebuild the state of the client. The state of the inner client consists of the following:
```rust
type MessageWriter = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
type SharedMessageWriter =
    Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>;
type MessageReader = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;

struct WebSocketClientInner {
    read_task: task::JoinHandle<()>,
    heartbeat_task: Option<task::JoinHandle<()>>,
    writer: SharedMessageWriter,
    url: String,
    handler: PyObject,
    heartbeat: Option<u64>,
}

#[pyclass]
pub struct WebSocketClient {
    writer: SharedMessageWriter,
    controller_task: task::JoinHandle<()>,
    disconnect_mode: Arc<Mutex<bool>>,
}
```

To mutably change the state of the inner client, the reconnect task needs to own the client. However, since we want to expose write/send as a Python interface, we need a clone of the writer in the outer client as well. This avoids the complexity of channels/message passing. However, this means the inner task, after it reconnects, needs to share the new writer with the outer task. The reconnect task locks the mutex and updates the writer shared with the outer client. All subsequent send calls use the updated writer.

```rust
let (new_writer, reader) = WebSocketClientInner::connect_with_server(&self.url).await?;
let mut guard = self.writer.lock().await;
*guard = new_writer;
drop(guard);
```
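A hypothetical sketch of the controller task driving auto-reconnect is below. The `is_alive`, `reconnect`, and `disconnect` methods on the inner client and the polling interval are assumptions; the real implementation may differ.

```rust
use std::{sync::Arc, time::Duration};

use tokio::{sync::Mutex, task};

fn spawn_controller_task(
    mut inner: WebSocketClientInner,       // owned by the controller task
    disconnect_mode: Arc<Mutex<bool>>,     // shared flag set from Python
) -> task::JoinHandle<()> {
    task::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_millis(100)).await;

            // A disconnect requested from Python ends the loop.
            if *disconnect_mode.lock().await {
                inner.disconnect().await; // assumed method
                break;
            }

            // If the connection has dropped, rebuild the inner client's state
            // (read task, heartbeat task, shared writer) by reconnecting.
            if !inner.is_alive() {
                if let Err(e) = inner.reconnect().await {
                    eprintln!("reconnect failed: {e}");
                }
            }
        }
    })
}
```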
This issue tracks the effort and discussion for porting the network layer to Rust.
The network module is relevant to the live trading functionality. Exchange adapters use network module clients to make requests to exchanges and pass received data to the engine. It has clients for 3 protocols - HTTP, websockets and raw socket.
The end goal is to port all three client implementations to Rust and expose them as pyo3 modules to be directly consumed by the adapters. Further along, it is expected that the adapters will also be ported and the intermediate pyo3 layer will no longer be necessary. Since the network module is IO heavy, it is currently implemented as async and will be async in the Rust version as well.
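As a rough sketch of how the ported clients could be surfaced to the adapters, the pyo3 layer boils down to registering the client classes in a submodule; the module name and registration details below are assumptions for illustration.

```rust
use pyo3::prelude::*;

/// Hypothetical module registration; the actual crate layout may differ.
#[pymodule]
fn network(_py: Python, m: &PyModule) -> PyResult<()> {
    m.add_class::<HttpClient>()?;
    m.add_class::<HttpResponse>()?;
    m.add_class::<WebSocketClient>()?;
    Ok(())
}
```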