Skip to content

Commit

Permalink
Address PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
whitevegagabriel committed Aug 30, 2023
1 parent 10b8c8a commit a7eddb5
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 33 deletions.
54 changes: 27 additions & 27 deletions rust/examples/l2cap_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async fn main() -> PyResult<()> {
let cli = Cli::parse();

println!("<<< connecting to HCI...");
let transport = Transport::open(cli.hci_transport).await?;
let transport = Transport::open(cli.transport).await?;
println!("<<< connected");

let mut device = Device::from_config_file_with_hci(
Expand Down Expand Up @@ -130,15 +130,14 @@ mod server_bridge {
device.register_l2cap_channel_server(
args.psm,
move |_py, l2cap_channel| {
let channel_info = match l2cap_channel.debug_string() {
Ok(info_string) => info_string,
Err(py_err) => format!("failed to get l2cap channel info ({})", py_err),
};
let channel_info = l2cap_channel
.debug_string()
.unwrap_or_else(|e| format!("failed to get l2cap channel info ({e})"));
println!("{} {channel_info}", "*** L2CAP channel:".cyan());

let host = host.clone();
// Ensure Python event loop is available to l2cap `disconnect`
let _ = run_future_with_current_task_locals(handle_connection_oriented_channel(
let _ = run_future_with_current_task_locals(proxy_data_between_l2cap_and_tcp(
l2cap_channel,
host,
port,
Expand All @@ -156,10 +155,9 @@ mod server_bridge {
);

device.on_connection(|_py, mut connection| {
let connection_info = match connection.debug_string() {
Ok(info_string) => info_string,
Err(py_err) => format!("failed to get connection info ({})", py_err),
};
let connection_info = connection
.debug_string()
.unwrap_or_else(|e| format!("failed to get connection info ({e})"));
println!(
"{} {}",
"@@@ Bluetooth connection: ".green(),
Expand All @@ -185,7 +183,7 @@ mod server_bridge {
Ok(())
}

async fn handle_connection_oriented_channel(
async fn proxy_data_between_l2cap_and_tcp(
mut l2cap_channel: LeConnectionOrientedChannel,
tcp_host: String,
tcp_port: u16,
Expand Down Expand Up @@ -225,7 +223,7 @@ mod server_bridge {
}
Err(err) => {
println!("{}", format!("!!! Connection failed: {err}").red());
if let Some(channel) = l2cap_channel.lock().await.take() {
if let Some(mut channel) = l2cap_channel.lock().await.take() {
// Bumble might enter an invalid state if disconnection request is received from
// l2cap client before receiving a disconnection response from the same client,
// blocking this async call from returning.
Expand Down Expand Up @@ -360,12 +358,12 @@ mod client_bridge {
let max_credits = args.max_credits;
let mtu = args.mtu;
let mps = args.mps;
let ble_connection = Arc::new(ble_connection);
let ble_connection = Arc::new(Mutex::new(ble_connection));
// Ensure Python event loop is available to l2cap `disconnect`
let _ = run_future_with_current_task_locals(async move {
while let Ok((tcp_stream, addr)) = listener.accept().await {
let ble_connection = ble_connection.clone();
let _ = run_future_with_current_task_locals(handle_tcp_connection(
let _ = run_future_with_current_task_locals(proxy_data_between_tcp_and_l2cap(
ble_connection,
tcp_stream,
addr,
Expand All @@ -380,8 +378,8 @@ mod client_bridge {
Ok(())
}

async fn handle_tcp_connection(
ble_connection: Arc<Connection>,
async fn proxy_data_between_tcp_and_l2cap(
ble_connection: Arc<Mutex<Connection>>,
tcp_stream: TcpStream,
addr: SocketAddr,
psm: u16,
Expand All @@ -396,6 +394,8 @@ mod client_bridge {
);

let mut l2cap_channel = match ble_connection
.lock()
.await
.open_l2cap_channel(psm, Some(max_credits), Some(mtu), Some(mps))
.await
{
Expand All @@ -406,10 +406,9 @@ mod client_bridge {
return Err(e);
}
};
let channel_info = match l2cap_channel.debug_string() {
Ok(info_string) => info_string,
Err(py_err) => format!("failed to get l2cap channel info ({})", py_err),
};
let channel_info = l2cap_channel
.debug_string()
.unwrap_or_else(|e| format!("failed to get l2cap channel info ({e})"));

println!("{}{}", "*** L2CAP channel: ".cyan(), channel_info);

Expand Down Expand Up @@ -516,7 +515,7 @@ async fn proxy_tcp_rx_to_l2cap_tx(
if len == 0 {
println!("{}", "!!! End of stream".yellow());

if let Some(channel) = l2cap_channel.lock().await.take() {
if let Some(mut channel) = l2cap_channel.lock().await.take() {
channel.disconnect().await.map_err(|e| {
eprintln!("Failed to call disconnect on l2cap channel: {e}");
e
Expand All @@ -541,7 +540,7 @@ async fn proxy_tcp_rx_to_l2cap_tx(
}
Err(e) => {
println!("{}", format!("!!! TCP connection lost: {}", e).red());
if let Some(channel) = l2cap_channel.lock().await.take() {
if let Some(mut channel) = l2cap_channel.lock().await.take() {
let _ = channel.disconnect().await.map_err(|e| {
eprintln!("Failed to call disconnect on l2cap channel: {e}");
});
Expand All @@ -552,10 +551,11 @@ async fn proxy_tcp_rx_to_l2cap_tx(
}
}

/// Copies the current thread's task locals into a Python "awaitable" and encapsulates it in a Rust
/// Copies the current thread's TaskLocals into a Python "awaitable" and encapsulates it in a Rust
/// future, running it as a Python Task.
///
/// If the calling thread has a Python event loop, then the Python Task will too.
/// `TaskLocals` stores the current event loop, and allows the user to copy the current Python
/// context if necessary. In this case, the python event loop is used when calling `disconnect` on
/// an l2cap connection, or else the call will fail.
pub fn run_future_with_current_task_locals<F>(
fut: F,
) -> PyResult<impl Future<Output = PyResult<PyObject>> + Send>
Expand Down Expand Up @@ -585,7 +585,7 @@ struct Cli {
///
/// <https://google.github.io/bumble/transports/index.html>
#[arg(long)]
hci_transport: String,
transport: String,

/// PSM for L2CAP Connection-oriented Channel.
///
Expand Down Expand Up @@ -633,7 +633,7 @@ enum Subcommand {
bluetooth_address: String,
/// TCP host that the l2cap client will bind to and listen for incoming TCP connections.
/// Data is bridged like so:
/// TCP client <-> (TCP server / **L2CAP client**) <-> (L2CAP server / TCP client) <-> TCP Client
/// TCP client <-> (TCP server / **L2CAP client**) <-> (L2CAP server / TCP client) <-> TCP server
#[arg(long, default_value = "localhost")]
tcp_host: String,
/// TCP port that the client will connect to.
Expand Down
8 changes: 4 additions & 4 deletions rust/src/wrapper/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ impl Device {
}

/// Registers an L2CAP connection oriented channel server. When a client connects to the server,
/// the `server` callback returns a handle to the established channel. When optional arguments
/// the `server` callback is passed a handle to the established channel. When optional arguments
/// are not specified, the Python module specifies the defaults.
pub fn register_l2cap_channel_server(
&self,
&mut self,
psm: u16,
server: impl Fn(Python, LeConnectionOrientedChannel) -> PyResult<()> + Send + 'static,
max_credits: Option<u16>,
Expand Down Expand Up @@ -211,7 +211,7 @@ impl Connection {
/// Open an L2CAP channel using this connection. When optional arguments are not specified, the
/// Python module specifies the defaults.
pub async fn open_l2cap_channel(
&self,
&mut self,
psm: u16,
max_credits: Option<u16>,
mtu: Option<u16>,
Expand All @@ -233,7 +233,7 @@ impl Connection {

/// Disconnect from device with provided reason. When optional arguments are not specified, the
/// Python module specifies the defaults.
pub async fn disconnect(self, reason: Option<HciErrorCode>) -> PyResult<()> {
pub async fn disconnect(&mut self, reason: Option<HciErrorCode>) -> PyResult<()> {
Python::with_gil(|py| {
let kwargs = PyDict::new(py);
kwargs.set_opt_item("reason", reason)?;
Expand Down
4 changes: 2 additions & 2 deletions rust/src/wrapper/l2cap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl LeConnectionOrientedChannel {
}

/// Wait for queued data to be sent on this channel.
pub async fn drain(&self) -> PyResult<()> {
pub async fn drain(&mut self) -> PyResult<()> {
Python::with_gil(|py| {
self.0
.call_method0(py, intern!(py, "drain"))
Expand Down Expand Up @@ -72,7 +72,7 @@ impl LeConnectionOrientedChannel {
/// `tokio::main` and `async_std::main`.
///
/// For more info, see https://awestlake87.github.io/pyo3-asyncio/master/doc/pyo3_asyncio/#event-loop-references-and-contextvars.
pub async fn disconnect(self) -> PyResult<()> {
pub async fn disconnect(&mut self) -> PyResult<()> {
Python::with_gil(|py| {
self.0
.call_method0(py, intern!(py, "disconnect"))
Expand Down

0 comments on commit a7eddb5

Please sign in to comment.