From f08488919b03dd8b4f0448d46768429585a05563 Mon Sep 17 00:00:00 2001 From: cdvallone Date: Thu, 4 Jul 2024 11:45:40 -0400 Subject: [PATCH 01/14] change python package name to 'kaspa' --- python/Cargo.toml | 2 +- python/README.md | 4 ++-- python/examples/addresses.py | 2 +- python/examples/rpc.py | 2 +- python/examples/test.py | 2 +- python/pyproject.toml | 6 +++--- python/src/lib.rs | 2 +- 7 files changed, 10 insertions(+), 10 deletions(-) diff --git a/python/Cargo.toml b/python/Cargo.toml index da7e079b65..e1c857c7f1 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -7,7 +7,7 @@ edition.workspace = true include.workspace = true [lib] -name = "kaspapy" +name = "kaspa" crate-type = ["cdylib"] [dependencies] diff --git a/python/README.md b/python/README.md index 717beb1988..dbc4534a94 100644 --- a/python/README.md +++ b/python/README.md @@ -17,6 +17,6 @@ Rusty-Kaspa/Rust bindings for Python, using [PyO3](https://pyo3.rs/v0.20.0/) and See Python files in `./python/examples`. # Project Layout -The Python package `kaspapy` is built from the `kaspa-python` crate, which is located at `./python`. +The Python package `kaspa` is built from the `kaspa-python` crate, which is located at `./python`. -As such, the `kaspapy` function in `./python/src/lib.rs` is a good starting point. This function uses PyO3 to add functionality to the package. +As such, the `kaspa` function in `./python/src/lib.rs` is a good starting point. This function uses PyO3 to add functionality to the package. diff --git a/python/examples/addresses.py b/python/examples/addresses.py index dcc01159fc..db00c2bea6 100644 --- a/python/examples/addresses.py +++ b/python/examples/addresses.py @@ -1,4 +1,4 @@ -from kaspapy import ( +from kaspa import ( PrivateKey, ) diff --git a/python/examples/rpc.py b/python/examples/rpc.py index 9b151be72d..1f490ceafb 100644 --- a/python/examples/rpc.py +++ b/python/examples/rpc.py @@ -2,7 +2,7 @@ import json import time -from kaspapy import RpcClient +from kaspa import RpcClient async def main(): diff --git a/python/examples/test.py b/python/examples/test.py index bbdb01d6bb..cc2cd78065 100644 --- a/python/examples/test.py +++ b/python/examples/test.py @@ -1,4 +1,4 @@ -from kaspapy import PrivateKeyGenerator +from kaspa import PrivateKeyGenerator if __name__ == "__main__": x = PrivateKeyGenerator('xprv9s21ZrQH143K2hP7m1bU4ZT6tWgX1Qn2cWvtLVDX6sTJVyg3XBa4p1So4s7uEvVFGyBhQWWRe8JeLPeDZ462LggxkkJpZ9z1YMzmPahnaZA', False, 1) diff --git a/python/pyproject.toml b/python/pyproject.toml index 4bcdcb9260..b41e388b64 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -3,7 +3,7 @@ requires = ["maturin>=1.0,<2.0"] build-backend = "maturin" [project] -name = "kaspapy" +name = "kaspa" description = "Kaspa Python Bindings" version = "0.1.0" requires-python = ">=3.8" @@ -23,8 +23,8 @@ dependencies = [] # changelog = "" [package.metadata.maturin] -name = "kaspapy" +name = "kaspa" description = "Kaspa Python Bindings" [tool.maturin] -name = "kaspapy" \ No newline at end of file +name = "kaspa" \ No newline at end of file diff --git a/python/src/lib.rs b/python/src/lib.rs index e1afc2e16f..ee8ce06421 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -3,7 +3,7 @@ cfg_if::cfg_if! { use pyo3::prelude::*; #[pymodule] - fn kaspapy(m: &Bound<'_, PyModule>) -> PyResult<()> { + fn kaspa(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; From 86d88286ebdfbbc1234719717666269f15236d27 Mon Sep 17 00:00:00 2001 From: cdvallone Date: Thu, 4 Jul 2024 12:06:39 -0400 Subject: [PATCH 02/14] Introduce Py wRPC client wrapping Inner struct that contains KaspaRpcClient --- rpc/macros/src/wrpc/python.rs | 2 +- rpc/wrpc/python/src/client.rs | 20 +++++++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/rpc/macros/src/wrpc/python.rs b/rpc/macros/src/wrpc/python.rs index aad60e7416..f9a95da73f 100644 --- a/rpc/macros/src/wrpc/python.rs +++ b/rpc/macros/src/wrpc/python.rs @@ -41,7 +41,7 @@ impl ToTokens for RpcTable { #[pymethods] impl RpcClient { fn #fn_call(&self, py: Python, request: Py) -> PyResult> { - let client = self.client.clone(); + let client = self.inner.client.clone(); let request : #request_type = serde_pyobject::from_pyobject(request.into_bound(py)).unwrap(); diff --git a/rpc/wrpc/python/src/client.rs b/rpc/wrpc/python/src/client.rs index 8c74cdb1ff..6b7c181d8a 100644 --- a/rpc/wrpc/python/src/client.rs +++ b/rpc/wrpc/python/src/client.rs @@ -7,11 +7,15 @@ use kaspa_wrpc_client::{ KaspaRpcClient, WrpcEncoding, }; use pyo3::{prelude::*, types::PyDict}; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; + +pub struct Inner { + client: Arc, +} #[pyclass] pub struct RpcClient { - client: KaspaRpcClient, + inner: Inner, // url: String, // encoding: Option, // verbose : Option, @@ -35,7 +39,9 @@ impl RpcClient { client.connect(Some(options)).await.map_err(|e| pyo3::exceptions::PyException::new_err(e.to_string()))?; Python::with_gil(|py| { - Py::new(py, RpcClient { client }) + Py::new(py, { + RpcClient { inner: Inner { client: client.into() } } + }) .map(|py_rpc_client| py_rpc_client.into_py(py)) .map_err(|e| pyo3::exceptions::PyException::new_err(e.to_string())) }) @@ -43,11 +49,11 @@ impl RpcClient { } fn is_connected(&self) -> bool { - self.client.is_connected() + self.inner.client.is_connected() } fn get_server_info(&self, py: Python) -> PyResult> { - let client = self.client.clone(); + let client = self.inner.client.clone(); py_async! {py, async move { let response = client.get_server_info_call(GetServerInfoRequest { }).await?; Python::with_gil(|py| { @@ -57,7 +63,7 @@ impl RpcClient { } fn get_block_dag_info(&self, py: Python) -> PyResult> { - let client = self.client.clone(); + let client = self.inner.client.clone(); py_async! {py, async move { let response = client.get_block_dag_info_call(GetBlockDagInfoRequest { }).await?; Python::with_gil(|py| { @@ -70,7 +76,7 @@ impl RpcClient { #[pymethods] impl RpcClient { fn is_connected_test(&self) -> bool { - self.client.is_connected() + self.inner.client.is_connected() } } From a1497802d13d95af0bfde9bace2531b7c5853c96 Mon Sep 17 00:00:00 2001 From: cdvallone Date: Thu, 4 Jul 2024 13:11:53 -0400 Subject: [PATCH 03/14] scaffolding for Python wRPC client Inner struct --- Cargo.lock | 1 + python/examples/rpc.py | 3 +- rpc/wrpc/python/Cargo.toml | 1 + rpc/wrpc/python/src/client.rs | 79 ++++++++++++++++++++++++++--------- 4 files changed, 63 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f3e0f9d91b..332a49783a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3781,6 +3781,7 @@ version = "0.14.1" dependencies = [ "cfg-if 1.0.0", "kaspa-consensus-core", + "kaspa-notify", "kaspa-python-macros", "kaspa-rpc-core", "kaspa-rpc-macros", diff --git a/python/examples/rpc.py b/python/examples/rpc.py index 1f490ceafb..cf3513b232 100644 --- a/python/examples/rpc.py +++ b/python/examples/rpc.py @@ -6,7 +6,8 @@ async def main(): - client = await RpcClient.connect(url = "ws://localhost:17110") + client = RpcClient(url = "ws://localhost:17110") + await client.connect() print(f'Client is connected: {client.is_connected()}') get_server_info_response = await client.get_server_info() diff --git a/rpc/wrpc/python/Cargo.toml b/rpc/wrpc/python/Cargo.toml index 6596e2f308..bae0f872e6 100644 --- a/rpc/wrpc/python/Cargo.toml +++ b/rpc/wrpc/python/Cargo.toml @@ -19,6 +19,7 @@ py-sdk = [ [dependencies] cfg-if.workspace = true kaspa-consensus-core.workspace = true +kaspa-notify.workspace = true kaspa-rpc-core.workspace = true kaspa-rpc-macros.workspace = true kaspa-wrpc-client.workspace = true diff --git a/rpc/wrpc/python/src/client.rs b/rpc/wrpc/python/src/client.rs index 6b7c181d8a..81b4aaca51 100644 --- a/rpc/wrpc/python/src/client.rs +++ b/rpc/wrpc/python/src/client.rs @@ -2,31 +2,76 @@ use kaspa_python_macros::py_async; use kaspa_rpc_core::api::rpc::RpcApi; use kaspa_rpc_core::model::*; use kaspa_rpc_macros::build_wrpc_python_interface; +use kaspa_notify::listener::ListenerId; use kaspa_wrpc_client::{ client::{ConnectOptions, ConnectStrategy}, - KaspaRpcClient, WrpcEncoding, + KaspaRpcClient, + result::Result, + WrpcEncoding, }; use pyo3::{prelude::*, types::PyDict}; -use std::{sync::Arc, time::Duration}; +use std::{ + sync::{ + atomic::{AtomicBool}, + Arc, Mutex, + }, + time::Duration, +}; +pub use workflow_core::channel::{Channel, DuplexChannel}; pub struct Inner { client: Arc, + // resolver TODO + notification_task: AtomicBool, + notification_ctl: DuplexChannel, + // callbacks TODO + listener_id: Arc>>, + notification_channel: Channel, } #[pyclass] pub struct RpcClient { - inner: Inner, + inner: Arc, // url: String, - // encoding: Option, - // verbose : Option, - // timeout: Option, + // encoding TODO + // verbose TODO + // timeout TODO +} + +impl RpcClient { + fn new(url: Option, encoding: Option) -> Result { + let encoding = encoding.unwrap_or(WrpcEncoding::Borsh); + + let client = Arc::new( + KaspaRpcClient::new(encoding, url.as_deref(), None, None, None) + .unwrap() + ); + + let rpc_client = RpcClient { + inner: Arc::new(Inner { + client, + notification_task: AtomicBool::new(false), + notification_ctl: DuplexChannel::oneshot(), + listener_id: Arc::new(Mutex::new(None)), + notification_channel: Channel::unbounded() + }) + }; + + Ok(rpc_client) + } } #[pymethods] impl RpcClient { - #[staticmethod] - fn connect(py: Python, url: Option) -> PyResult> { - let client = KaspaRpcClient::new(WrpcEncoding::Borsh, url.as_deref(), None, None, None)?; + #[new] + fn ctor(url: Option) -> PyResult { + // TODO expose args to Python similar to WASM wRPC Client IRpcConfig + + Ok(Self::new(url, None)?) + } + + fn connect(&self, py: Python) -> PyResult> { + // TODO expose args to Python similar to WASM wRPC Client IConnectOptions let options = ConnectOptions { block_async_connect: true, @@ -35,17 +80,11 @@ impl RpcClient { ..Default::default() }; - pyo3_asyncio_0_21::tokio::future_into_py(py, async move { - client.connect(Some(options)).await.map_err(|e| pyo3::exceptions::PyException::new_err(e.to_string()))?; - - Python::with_gil(|py| { - Py::new(py, { - RpcClient { inner: Inner { client: client.into() } } - }) - .map(|py_rpc_client| py_rpc_client.into_py(py)) - .map_err(|e| pyo3::exceptions::PyException::new_err(e.to_string())) - }) - }) + let client = self.inner.client.clone(); + py_async! {py, async move { + let _ = client.connect(Some(options)).await.map_err(|e| pyo3::exceptions::PyException::new_err(e.to_string())); + Ok(()) + }} } fn is_connected(&self) -> bool { From b50150dc7e48eb4ebedbc6cd521ee7ede76c8fa8 Mon Sep 17 00:00:00 2001 From: cdvallone Date: Sat, 6 Jul 2024 12:13:02 -0400 Subject: [PATCH 04/14] Python wRPC subscribtions prototype --- Cargo.lock | 3 + python/examples/rpc.py | 26 +++- rpc/wrpc/python/Cargo.toml | 3 + rpc/wrpc/python/src/client.rs | 260 +++++++++++++++++++++++++++++++--- 4 files changed, 271 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 332a49783a..0232fd7054 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3779,7 +3779,9 @@ dependencies = [ name = "kaspa-wrpc-python" version = "0.14.1" dependencies = [ + "ahash 0.8.11", "cfg-if 1.0.0", + "futures", "kaspa-consensus-core", "kaspa-notify", "kaspa-python-macros", @@ -3792,6 +3794,7 @@ dependencies = [ "serde_json", "thiserror", "workflow-core", + "workflow-log", "workflow-rpc", ] diff --git a/python/examples/rpc.py b/python/examples/rpc.py index cf3513b232..50e35333fc 100644 --- a/python/examples/rpc.py +++ b/python/examples/rpc.py @@ -5,11 +5,21 @@ from kaspa import RpcClient -async def main(): - client = RpcClient(url = "ws://localhost:17110") - await client.connect() - print(f'Client is connected: {client.is_connected()}') +def subscription_callback(event): + print(event) + +async def rpc_subscriptions(client): + client.add_event_listener('all', subscription_callback) + + await client.subscribe_daa_score() + await client.subscribe_virtual_chain_changed(True) + + await asyncio.sleep(10) + await client.unsubscribe_daa_score() + await client.unsubscribe_virtual_chain_changed(True) + +async def rpc_calls(client): get_server_info_response = await client.get_server_info() print(get_server_info_response) @@ -25,6 +35,14 @@ async def main(): get_balances_by_addresses_response = await client.get_balances_by_addresses_call(get_balances_by_addresses_request) print(get_balances_by_addresses_response) +async def main(): + client = RpcClient(url = "ws://localhost:17110") + await client.connect() + print(f'Client is connected: {client.is_connected()}') + + await rpc_calls(client) + await rpc_subscriptions(client) + if __name__ == "__main__": asyncio.run(main()) \ No newline at end of file diff --git a/rpc/wrpc/python/Cargo.toml b/rpc/wrpc/python/Cargo.toml index bae0f872e6..a7530c3089 100644 --- a/rpc/wrpc/python/Cargo.toml +++ b/rpc/wrpc/python/Cargo.toml @@ -17,7 +17,9 @@ py-sdk = [ ] [dependencies] +ahash.workspace = true cfg-if.workspace = true +futures.workspace = true kaspa-consensus-core.workspace = true kaspa-notify.workspace = true kaspa-rpc-core.workspace = true @@ -30,4 +32,5 @@ serde_json.workspace = true serde-pyobject = "0.3.0" thiserror.workspace = true workflow-core.workspace = true +workflow-log.workspace = true workflow-rpc.workspace = true \ No newline at end of file diff --git a/rpc/wrpc/python/src/client.rs b/rpc/wrpc/python/src/client.rs index 81b4aaca51..c08c77c088 100644 --- a/rpc/wrpc/python/src/client.rs +++ b/rpc/wrpc/python/src/client.rs @@ -1,60 +1,104 @@ +use ahash::AHashMap; +use futures::*; +use kaspa_notify::listener::ListenerId; +use kaspa_notify::notification::Notification; +use kaspa_notify::scope::{Scope, VirtualChainChangedScope, VirtualDaaScoreChangedScope}; +use kaspa_notify::{connection::ChannelType, events::EventType}; use kaspa_python_macros::py_async; use kaspa_rpc_core::api::rpc::RpcApi; use kaspa_rpc_core::model::*; +use kaspa_rpc_core::notify::connection::ChannelConnection; use kaspa_rpc_macros::build_wrpc_python_interface; -use kaspa_notify::listener::ListenerId; use kaspa_wrpc_client::{ client::{ConnectOptions, ConnectStrategy}, - KaspaRpcClient, + error::Error, result::Result, - WrpcEncoding, + KaspaRpcClient, WrpcEncoding, }; +use pyo3::exceptions::PyException; use pyo3::{prelude::*, types::PyDict}; +use std::str::FromStr; use std::{ sync::{ - atomic::{AtomicBool}, + atomic::{AtomicBool, Ordering}, Arc, Mutex, }, time::Duration, }; -pub use workflow_core::channel::{Channel, DuplexChannel}; +use workflow_core::channel::{Channel, DuplexChannel}; +use workflow_log::*; +use workflow_rpc::client::Ctl; + +#[derive(Clone, Debug, Eq, PartialEq, Hash)] +enum NotificationEvent { + All, + Notification(EventType), + RpcCtl(Ctl), +} + +impl FromStr for NotificationEvent { + type Err = Error; + fn from_str(s: &str) -> Result { + if s == "all" { + Ok(NotificationEvent::All) + } else if let Ok(ctl) = Ctl::from_str(s) { + Ok(NotificationEvent::RpcCtl(ctl)) + } else if let Ok(event) = EventType::from_str(s) { + Ok(NotificationEvent::Notification(event)) + } else { + Err(Error::custom(format!("Invalid notification event type: `{}`", s))) + } + } +} pub struct Inner { client: Arc, // resolver TODO notification_task: AtomicBool, notification_ctl: DuplexChannel, - // callbacks TODO + callbacks: Arc>>>, listener_id: Arc>>, notification_channel: Channel, } +impl Inner { + fn notification_callbacks(&self, event: NotificationEvent) -> Option> { + let notification_callbacks = self.callbacks.lock().unwrap(); + let all = notification_callbacks.get(&NotificationEvent::All).cloned(); + let target = notification_callbacks.get(&event).cloned(); + match (all, target) { + (Some(mut vec_all), Some(vec_target)) => { + vec_all.extend(vec_target); + Some(vec_all) + } + (Some(vec_all), None) => Some(vec_all), + (None, Some(vec_target)) => Some(vec_target), + (None, None) => None, + } + } +} + #[pyclass] +#[derive(Clone)] pub struct RpcClient { inner: Arc, - // url: String, - // encoding TODO - // verbose TODO - // timeout TODO } impl RpcClient { fn new(url: Option, encoding: Option) -> Result { let encoding = encoding.unwrap_or(WrpcEncoding::Borsh); - let client = Arc::new( - KaspaRpcClient::new(encoding, url.as_deref(), None, None, None) - .unwrap() - ); + let client = Arc::new(KaspaRpcClient::new(encoding, url.as_deref(), None, None, None).unwrap()); let rpc_client = RpcClient { inner: Arc::new(Inner { client, notification_task: AtomicBool::new(false), notification_ctl: DuplexChannel::oneshot(), + callbacks: Arc::new(Default::default()), listener_id: Arc::new(Mutex::new(None)), - notification_channel: Channel::unbounded() - }) + notification_channel: Channel::unbounded(), + }), }; Ok(rpc_client) @@ -72,7 +116,6 @@ impl RpcClient { fn connect(&self, py: Python) -> PyResult> { // TODO expose args to Python similar to WASM wRPC Client IConnectOptions - let options = ConnectOptions { block_async_connect: true, connect_timeout: Some(Duration::from_millis(5_000)), @@ -80,6 +123,8 @@ impl RpcClient { ..Default::default() }; + self.start_notification_task(py).unwrap(); + let client = self.inner.client.clone(); py_async! {py, async move { let _ = client.connect(Some(options)).await.map_err(|e| pyo3::exceptions::PyException::new_err(e.to_string())); @@ -110,6 +155,187 @@ impl RpcClient { }) }} } + + fn add_event_listener(&self, event: String, callback: PyObject) -> PyResult<()> { + let event = NotificationEvent::from_str(event.as_str()).unwrap(); + self.inner.callbacks.lock().unwrap().entry(event).or_default().push(callback.clone()); + Ok(()) + } + + // fn remove_event_listener() TODO + // fn clear_event_listener() TODO + + fn remove_all_event_listeners(&self) -> PyResult<()> { + *self.inner.callbacks.lock().unwrap() = Default::default(); + Ok(()) + } +} + +impl RpcClient { + pub fn listener_id(&self) -> Option { + *self.inner.listener_id.lock().unwrap() + } + + fn start_notification_task(&self, py: Python) -> Result<()> { + if self.inner.notification_task.load(Ordering::SeqCst) { + return Ok(()); + } + + self.inner.notification_task.store(true, Ordering::SeqCst); + + let ctl_receiver = self.inner.notification_ctl.request.receiver.clone(); + let ctl_sender = self.inner.notification_ctl.response.sender.clone(); + let notification_receiver = self.inner.notification_channel.receiver.clone(); + let ctl_multiplexer_channel = + self.inner.client.rpc_client().ctl_multiplexer().as_ref().expect("Python RpcClient ctl_multiplexer is None").channel(); + let this = self.clone(); + + let _ = pyo3_asyncio_0_21::tokio::future_into_py(py, async move { + loop { + select_biased! { + msg = ctl_multiplexer_channel.recv().fuse() => { + if let Ok(ctl) = msg { + + match ctl { + Ctl::Connect => { + let listener_id = this.inner.client.register_new_listener(ChannelConnection::new( + "kaspapy-wrpc-client-python", + this.inner.notification_channel.sender.clone(), + ChannelType::Persistent, + )); + *this.inner.listener_id.lock().unwrap() = Some(listener_id); + } + Ctl::Disconnect => { + let listener_id = this.inner.listener_id.lock().unwrap().take(); + if let Some(listener_id) = listener_id { + if let Err(err) = this.inner.client.unregister_listener(listener_id).await { + log_error!("Error in unregister_listener: {:?}",err); + } + } + } + } + + let event = NotificationEvent::RpcCtl(ctl); + if let Some(handlers) = this.inner.notification_callbacks(event) { + for handler in handlers.into_iter() { + Python::with_gil(|py| { + let object = PyDict::new_bound(py); + object.set_item("type", ctl.to_string()).unwrap(); + // objectdict.set_item("rpc", ).unwrap(); TODO + + handler.call1(py, (object,)) + .map_err(|e| { + pyo3::exceptions::PyException::new_err(format!("Error while executing RPC notification callback: {}", e)) + }) + .unwrap(); + }); + } + } + } + }, + msg = notification_receiver.recv().fuse() => { + if let Ok(notification) = &msg { + match ¬ification { + // TODO mirror wasm implementation for Notification::UtxosChanged + // kaspa_rpc_core::Notification::UtxosChanged(utxos_changed_notification) => {}, + _ => { + let event_type = notification.event_type(); + let notification_event = NotificationEvent::Notification(event_type); + if let Some(handlers) = this.inner.notification_callbacks(notification_event) { + for handler in handlers.into_iter() { + Python::with_gil(|py| { + let object = PyDict::new_bound(py); + object.set_item("type", event_type.to_string()).unwrap(); + + // TODO eliminate unnecessary nesting in `data` + // e.g. response currently looks like this: + // {'type': 'VirtualDaaScoreChanged', 'data': {'VirtualDaaScoreChanged': {'virtualDaaScore': 83965064}}} + object.set_item("data", serde_pyobject::to_pyobject(py, ¬ification).unwrap().to_object(py)).unwrap(); + + handler.call1(py, (object,)) + .map_err(|e| { + pyo3::exceptions::PyException::new_err(format!("Error while executing RPC notification callback: {}", e)) + }) + .unwrap(); + }); + } + } + } + } + } + } + _ = ctl_receiver.recv().fuse() => { + break; + }, + + } + } + + if let Some(listener_id) = this.listener_id() { + this.inner.listener_id.lock().unwrap().take(); + if let Err(err) = this.inner.client.unregister_listener(listener_id).await { + log_error!("Error in unregister_listener: {:?}", err); + } + } + + ctl_sender.send(()).await.ok(); + + Python::with_gil(|_| Ok(())) + }); + + Ok(()) + } +} + +#[pymethods] +impl RpcClient { + fn subscribe_daa_score(&self, py: Python) -> PyResult> { + if let Some(listener_id) = self.listener_id() { + let client = self.inner.client.clone(); + py_async! {py, async move { + client.start_notify(listener_id, Scope::VirtualDaaScoreChanged(VirtualDaaScoreChangedScope {})).await?; + Ok(()) + }} + } else { + Err(PyErr::new::("RPC subscribe on a closed connection")) + } + } + + fn unsubscribe_daa_score(&self, py: Python) -> PyResult> { + if let Some(listener_id) = self.listener_id() { + let client = self.inner.client.clone(); + py_async! {py, async move { + client.stop_notify(listener_id, Scope::VirtualDaaScoreChanged(VirtualDaaScoreChangedScope {})).await?; + Ok(()) + }} + } else { + Err(PyErr::new::("RPC unsubscribe on a closed connection")) + } + } + + fn subscribe_virtual_chain_changed(&self, py: Python, include_accepted_transaction_ids: bool) -> PyResult> { + if let Some(listener_id) = self.listener_id() { + let client = self.inner.client.clone(); + py_async! {py, async move { + client.start_notify(listener_id, Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids })).await?; + Ok(()) + }} + } else { + Err(PyErr::new::("RPC subscribe on a closed connection")) + } + } + + fn unsubscribe_virtual_chain_changed(&self, py: Python, include_accepted_transaction_ids: bool) -> PyResult> { + if let Some(listener_id) = self.listener_id() { + let client = self.inner.client.clone(); + py_async! {py, async move { + client.stop_notify(listener_id, Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids })).await?; + Ok(()) + }} + } else { + Err(PyErr::new::("RPC unsubscribe on a closed connection")) + } + } } #[pymethods] From 84e46202e3d5b32edbe4feb9b9958e8dc425778b Mon Sep 17 00:00:00 2001 From: cdvallone Date: Sat, 6 Jul 2024 17:54:25 -0400 Subject: [PATCH 05/14] Python wRPC subscription callback args/kwargs --- python/examples/rpc.py | 7 ++-- rpc/wrpc/python/src/client.rs | 67 +++++++++++++++++++++++++++++------ 2 files changed, 60 insertions(+), 14 deletions(-) diff --git a/python/examples/rpc.py b/python/examples/rpc.py index 50e35333fc..b9b2771eda 100644 --- a/python/examples/rpc.py +++ b/python/examples/rpc.py @@ -5,11 +5,12 @@ from kaspa import RpcClient -def subscription_callback(event): - print(event) +def subscription_callback(event, callback_id, **kwargs): + print(kwargs.get('kwarg1')) + print(f'{callback_id} | {event}') async def rpc_subscriptions(client): - client.add_event_listener('all', subscription_callback) + client.add_event_listener('all', subscription_callback, callback_id=1, kwarg1='Im a kwarg!!') await client.subscribe_daa_score() await client.subscribe_virtual_chain_changed(True) diff --git a/rpc/wrpc/python/src/client.rs b/rpc/wrpc/python/src/client.rs index c08c77c088..5c537d2aa1 100644 --- a/rpc/wrpc/python/src/client.rs +++ b/rpc/wrpc/python/src/client.rs @@ -15,8 +15,11 @@ use kaspa_wrpc_client::{ result::Result, KaspaRpcClient, WrpcEncoding, }; -use pyo3::exceptions::PyException; -use pyo3::{prelude::*, types::PyDict}; +use pyo3::{ + exceptions::PyException, + prelude::*, + types::{PyDict, PyTuple} +}; use std::str::FromStr; use std::{ sync::{ @@ -51,18 +54,41 @@ impl FromStr for NotificationEvent { } } +#[derive(Clone)] +struct PyCallback { + callback: PyObject, + args: Option>, + kwargs: Option>, +} + +impl PyCallback { + fn append_event_to_args(&self, py: Python<'_>, event: Bound<'_, PyDict>) -> PyResult> { + match &self.args { + Some(existing_args) => { + let tuple_ref = existing_args.bind(py); + + let mut new_args: Vec = tuple_ref.iter().map(|arg| arg.to_object(py)).collect(); + new_args.push(event.into()); + + Ok(Py::from(PyTuple::new_bound(py, new_args))) + }, + None => Ok(Py::from(PyTuple::new_bound(py, [event]))), + } + } +} + pub struct Inner { client: Arc, // resolver TODO notification_task: AtomicBool, notification_ctl: DuplexChannel, - callbacks: Arc>>>, + callbacks: Arc>>>, listener_id: Arc>>, notification_channel: Channel, } impl Inner { - fn notification_callbacks(&self, event: NotificationEvent) -> Option> { + fn notification_callbacks(&self, event: NotificationEvent) -> Option> { let notification_callbacks = self.callbacks.lock().unwrap(); let all = notification_callbacks.get(&NotificationEvent::All).cloned(); let target = notification_callbacks.get(&event).cloned(); @@ -156,9 +182,23 @@ impl RpcClient { }} } - fn add_event_listener(&self, event: String, callback: PyObject) -> PyResult<()> { + #[pyo3(signature = (event, callback, *args, **kwargs))] + fn add_event_listener( + &self, + py: Python, + event: String, + callback: PyObject, + args: &Bound<'_, PyTuple>, + kwargs: Option<&Bound<'_, PyDict>> + ) -> PyResult<()> { let event = NotificationEvent::from_str(event.as_str()).unwrap(); - self.inner.callbacks.lock().unwrap().entry(event).or_default().push(callback.clone()); + + let args = args.to_object(py).extract::>(py).unwrap(); + let kwargs = kwargs.unwrap().to_object(py).extract::>(py).unwrap(); + + let py_callback = PyCallback { callback, args: Some(args), kwargs: Some(kwargs) }; + + self.inner.callbacks.lock().unwrap().entry(event).or_default().push(py_callback); Ok(()) } @@ -223,7 +263,10 @@ impl RpcClient { object.set_item("type", ctl.to_string()).unwrap(); // objectdict.set_item("rpc", ).unwrap(); TODO - handler.call1(py, (object,)) + let args = handler.append_event_to_args(py, object).unwrap(); + let kwargs = handler.kwargs.as_ref().map(|kw| kw.bind(py)); + + handler.callback.call_bound(py, args.bind(py), kwargs) .map_err(|e| { pyo3::exceptions::PyException::new_err(format!("Error while executing RPC notification callback: {}", e)) }) @@ -244,15 +287,17 @@ impl RpcClient { if let Some(handlers) = this.inner.notification_callbacks(notification_event) { for handler in handlers.into_iter() { Python::with_gil(|py| { - let object = PyDict::new_bound(py); - object.set_item("type", event_type.to_string()).unwrap(); - // TODO eliminate unnecessary nesting in `data` // e.g. response currently looks like this: // {'type': 'VirtualDaaScoreChanged', 'data': {'VirtualDaaScoreChanged': {'virtualDaaScore': 83965064}}} + let object = PyDict::new_bound(py); + object.set_item("type", event_type.to_string()).unwrap(); object.set_item("data", serde_pyobject::to_pyobject(py, ¬ification).unwrap().to_object(py)).unwrap(); - handler.call1(py, (object,)) + let args = handler.append_event_to_args(py, object).unwrap(); + let kwargs = handler.kwargs.as_ref().map(|kw| kw.bind(py)); + + handler.callback.call_bound(py, args.bind(py), kwargs) .map_err(|e| { pyo3::exceptions::PyException::new_err(format!("Error while executing RPC notification callback: {}", e)) }) From a876623a80fadde37fd67fa3fa14c750d83d8c9d Mon Sep 17 00:00:00 2001 From: cdvallone Date: Sat, 6 Jul 2024 18:12:47 -0400 Subject: [PATCH 06/14] lint --- rpc/wrpc/python/src/client.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/rpc/wrpc/python/src/client.rs b/rpc/wrpc/python/src/client.rs index 5c537d2aa1..987cca5eb9 100644 --- a/rpc/wrpc/python/src/client.rs +++ b/rpc/wrpc/python/src/client.rs @@ -17,8 +17,8 @@ use kaspa_wrpc_client::{ }; use pyo3::{ exceptions::PyException, - prelude::*, - types::{PyDict, PyTuple} + prelude::*, + types::{PyDict, PyTuple}, }; use std::str::FromStr; use std::{ @@ -71,7 +71,7 @@ impl PyCallback { new_args.push(event.into()); Ok(Py::from(PyTuple::new_bound(py, new_args))) - }, + } None => Ok(Py::from(PyTuple::new_bound(py, [event]))), } } @@ -184,12 +184,12 @@ impl RpcClient { #[pyo3(signature = (event, callback, *args, **kwargs))] fn add_event_listener( - &self, - py: Python, - event: String, - callback: PyObject, - args: &Bound<'_, PyTuple>, - kwargs: Option<&Bound<'_, PyDict>> + &self, + py: Python, + event: String, + callback: PyObject, + args: &Bound<'_, PyTuple>, + kwargs: Option<&Bound<'_, PyDict>>, ) -> PyResult<()> { let event = NotificationEvent::from_str(event.as_str()).unwrap(); From b338ea6806af601f4ad25ce54f09c7d8995c4b5a Mon Sep 17 00:00:00 2001 From: cdvallone Date: Sun, 7 Jul 2024 09:50:37 -0400 Subject: [PATCH 07/14] minor refactor, handling of UTXO change notification --- Cargo.lock | 1 + Cargo.toml | 1 + rpc/core/Cargo.toml | 3 +- rpc/core/src/api/notifications.rs | 18 ++++++++ rpc/wrpc/python/src/client.rs | 71 +++++++++++++++++++------------ 5 files changed, 65 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0232fd7054..1086040391 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3289,6 +3289,7 @@ dependencies = [ "paste", "pyo3", "serde", + "serde-pyobject", "serde-wasm-bindgen", "serde_json", "smallvec", diff --git a/Cargo.toml b/Cargo.toml index 2fb05684f6..3362a0f42d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -238,6 +238,7 @@ seqlock = "0.2.0" serde = { version = "1.0.190", features = ["derive", "rc"] } serde_bytes = "0.11.12" serde_json = "1.0.107" +serde-pyobject = "0.3.0" serde_repr = "0.1.18" serde-value = "0.7.0" serde-wasm-bindgen = "0.6.1" diff --git a/rpc/core/Cargo.toml b/rpc/core/Cargo.toml index a5d595cafa..c5f2080a90 100644 --- a/rpc/core/Cargo.toml +++ b/rpc/core/Cargo.toml @@ -14,7 +14,7 @@ wasm32-sdk = [ "kaspa-consensus-client/wasm32-sdk", "kaspa-consensus-wasm/wasm32-sdk" ] -py-sdk = ["pyo3"] +py-sdk = ["pyo3", "serde-pyobject"] [dependencies] kaspa-addresses.workspace = true @@ -45,6 +45,7 @@ log.workspace = true paste.workspace = true pyo3 = { workspace = true, optional = true } serde-wasm-bindgen.workspace = true +serde-pyobject = { workspace = true, optional = true } serde.workspace = true smallvec.workspace = true thiserror.workspace = true diff --git a/rpc/core/src/api/notifications.rs b/rpc/core/src/api/notifications.rs index 6449f25c02..444a28b88e 100644 --- a/rpc/core/src/api/notifications.rs +++ b/rpc/core/src/api/notifications.rs @@ -10,7 +10,9 @@ use kaspa_notify::{ Subscription, }, }; +use pyo3::prelude::*; use serde::{Deserialize, Serialize}; +use serde_pyobject::to_pyobject; use std::sync::Arc; use wasm_bindgen::JsValue; use workflow_wasm::serde::to_value; @@ -62,6 +64,22 @@ impl Notification { Notification::VirtualChainChanged(v) => to_value(&v), } } + + pub fn to_pyobject(&self, py: Python) -> PyResult { + let bound_obj = match self { + Notification::BlockAdded(v) => to_pyobject(py, &v), + Notification::FinalityConflict(v) => to_pyobject(py, &v), + Notification::FinalityConflictResolved(v) => to_pyobject(py, &v), + Notification::NewBlockTemplate(v) => to_pyobject(py, &v), + Notification::PruningPointUtxoSetOverride(v) => to_pyobject(py, &v), + Notification::UtxosChanged(v) => to_pyobject(py, &v), + Notification::VirtualDaaScoreChanged(v) => to_pyobject(py, &v), + Notification::SinkBlueScoreChanged(v) => to_pyobject(py, &v), + Notification::VirtualChainChanged(v) => to_pyobject(py, &v), + }; + + Ok(bound_obj.unwrap().to_object(py)) + } } impl NotificationTrait for Notification { diff --git a/rpc/wrpc/python/src/client.rs b/rpc/wrpc/python/src/client.rs index 987cca5eb9..1875d6a419 100644 --- a/rpc/wrpc/python/src/client.rs +++ b/rpc/wrpc/python/src/client.rs @@ -62,7 +62,7 @@ struct PyCallback { } impl PyCallback { - fn append_event_to_args(&self, py: Python<'_>, event: Bound<'_, PyDict>) -> PyResult> { + fn append_to_args(&self, py: Python, event: Bound) -> PyResult> { match &self.args { Some(existing_args) => { let tuple_ref = existing_args.bind(py); @@ -75,6 +75,19 @@ impl PyCallback { None => Ok(Py::from(PyTuple::new_bound(py, [event]))), } } + + fn execute(&self, py: Python, event: Bound) -> PyResult { + let args = self.append_to_args(py, event).unwrap(); + let kwargs = self.kwargs.as_ref().map(|kw| kw.bind(py)); + + let result = self + .callback + .call_bound(py, args.bind(py), kwargs) + .map_err(|e| pyo3::exceptions::PyException::new_err(format!("Error while executing RPC notification callback: {}", e))) + .unwrap(); + + Ok(result) + } } pub struct Inner { @@ -259,18 +272,11 @@ impl RpcClient { if let Some(handlers) = this.inner.notification_callbacks(event) { for handler in handlers.into_iter() { Python::with_gil(|py| { - let object = PyDict::new_bound(py); - object.set_item("type", ctl.to_string()).unwrap(); + let event = PyDict::new_bound(py); + event.set_item("type", ctl.to_string()).unwrap(); // objectdict.set_item("rpc", ).unwrap(); TODO - let args = handler.append_event_to_args(py, object).unwrap(); - let kwargs = handler.kwargs.as_ref().map(|kw| kw.bind(py)); - - handler.callback.call_bound(py, args.bind(py), kwargs) - .map_err(|e| { - pyo3::exceptions::PyException::new_err(format!("Error while executing RPC notification callback: {}", e)) - }) - .unwrap(); + handler.execute(py, event).unwrap(); }); } } @@ -279,29 +285,38 @@ impl RpcClient { msg = notification_receiver.recv().fuse() => { if let Ok(notification) = &msg { match ¬ification { - // TODO mirror wasm implementation for Notification::UtxosChanged - // kaspa_rpc_core::Notification::UtxosChanged(utxos_changed_notification) => {}, + kaspa_rpc_core::Notification::UtxosChanged(utxos_changed_notification) => { + let event_type = notification.event_type(); + let notification_event = NotificationEvent::Notification(event_type); + if let Some(handlers) = this.inner.notification_callbacks(notification_event) { + let UtxosChangedNotification { added, removed } = utxos_changed_notification; + + for handler in handlers.into_iter() { + Python::with_gil(|py| { + let added = serde_pyobject::to_pyobject(py, added).unwrap(); + let removed = serde_pyobject::to_pyobject(py, removed).unwrap(); + + let event = PyDict::new_bound(py); + event.set_item("type", event_type.to_string()).unwrap(); + event.set_item("added", &added.to_object(py)).unwrap(); + event.set_item("removed", &removed.to_object(py)).unwrap(); + + handler.execute(py, event).unwrap(); + }) + } + } + }, _ => { let event_type = notification.event_type(); let notification_event = NotificationEvent::Notification(event_type); if let Some(handlers) = this.inner.notification_callbacks(notification_event) { for handler in handlers.into_iter() { Python::with_gil(|py| { - // TODO eliminate unnecessary nesting in `data` - // e.g. response currently looks like this: - // {'type': 'VirtualDaaScoreChanged', 'data': {'VirtualDaaScoreChanged': {'virtualDaaScore': 83965064}}} - let object = PyDict::new_bound(py); - object.set_item("type", event_type.to_string()).unwrap(); - object.set_item("data", serde_pyobject::to_pyobject(py, ¬ification).unwrap().to_object(py)).unwrap(); - - let args = handler.append_event_to_args(py, object).unwrap(); - let kwargs = handler.kwargs.as_ref().map(|kw| kw.bind(py)); - - handler.callback.call_bound(py, args.bind(py), kwargs) - .map_err(|e| { - pyo3::exceptions::PyException::new_err(format!("Error while executing RPC notification callback: {}", e)) - }) - .unwrap(); + let event = PyDict::new_bound(py); + event.set_item("type", event_type.to_string()).unwrap(); + event.set_item("data", ¬ification.to_pyobject(py).unwrap()).unwrap(); + + handler.execute(py, event).unwrap(); }); } } From 3b981288fbdbfbe18ee4f3d273e296a106b09d1e Mon Sep 17 00:00:00 2001 From: cdvallone Date: Sun, 7 Jul 2024 10:08:23 -0400 Subject: [PATCH 08/14] properly gate python code in kaspa-rpc-core --- rpc/core/Cargo.toml | 5 ++++- rpc/core/src/api/notifications.rs | 3 +++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/rpc/core/Cargo.toml b/rpc/core/Cargo.toml index c5f2080a90..3303313370 100644 --- a/rpc/core/Cargo.toml +++ b/rpc/core/Cargo.toml @@ -14,7 +14,10 @@ wasm32-sdk = [ "kaspa-consensus-client/wasm32-sdk", "kaspa-consensus-wasm/wasm32-sdk" ] -py-sdk = ["pyo3", "serde-pyobject"] +py-sdk = [ + "pyo3", + "serde-pyobject" +] [dependencies] kaspa-addresses.workspace = true diff --git a/rpc/core/src/api/notifications.rs b/rpc/core/src/api/notifications.rs index 444a28b88e..4698f36431 100644 --- a/rpc/core/src/api/notifications.rs +++ b/rpc/core/src/api/notifications.rs @@ -10,8 +10,10 @@ use kaspa_notify::{ Subscription, }, }; +#[cfg(feature = "py-sdk")] use pyo3::prelude::*; use serde::{Deserialize, Serialize}; +#[cfg(feature = "py-sdk")] use serde_pyobject::to_pyobject; use std::sync::Arc; use wasm_bindgen::JsValue; @@ -65,6 +67,7 @@ impl Notification { } } + #[cfg(feature = "py-sdk")] pub fn to_pyobject(&self, py: Python) -> PyResult { let bound_obj = match self { Notification::BlockAdded(v) => to_pyobject(py, &v), From 4889ffdfb52ac7609c5dcc79d188d588eff3a9db Mon Sep 17 00:00:00 2001 From: cdvallone Date: Sun, 7 Jul 2024 13:44:31 -0400 Subject: [PATCH 09/14] Attempt to fix test suite CI failure --- Cargo.toml | 5 ++--- python/Cargo.toml | 1 + rpc/wrpc/python/Cargo.toml | 5 +++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3362a0f42d..2cf8f4a70e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -217,7 +217,7 @@ paste = "1.0.14" pbkdf2 = "0.12.2" portable-atomic = { version = "1.5.1", features = ["float"] } prost = "0.12.1" -pyo3 = { version = "0.21.0", features = ["extension-module", "multiple-pymethods"] } +pyo3 = { version = "0.21.0", features = ["multiple-pymethods"] } pyo3-asyncio-0-21 = { version = "0.21", features = ["attributes", "tokio-runtime"] } rand = "0.8.5" rand_chacha = "0.3.1" @@ -333,5 +333,4 @@ overflow-checks = true [profile.heap] inherits = "release" debug = true -strip = false - +strip = false \ No newline at end of file diff --git a/python/Cargo.toml b/python/Cargo.toml index e1c857c7f1..38bc00132b 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -20,6 +20,7 @@ pyo3.workspace = true [features] default = [] py-sdk = [ + "pyo3/extension-module", "kaspa-addresses/py-sdk", "kaspa-wallet-keys/py-sdk", "kaspa-wrpc-python/py-sdk", diff --git a/rpc/wrpc/python/Cargo.toml b/rpc/wrpc/python/Cargo.toml index a7530c3089..a18cf43dc0 100644 --- a/rpc/wrpc/python/Cargo.toml +++ b/rpc/wrpc/python/Cargo.toml @@ -10,8 +10,9 @@ license.workspace = true repository.workspace = true [features] -default = ["py-sdk"] +default = [] py-sdk = [ + "pyo3/extension-module", "kaspa-rpc-core/py-sdk", "kaspa-wrpc-client/py-sdk", ] @@ -29,7 +30,7 @@ kaspa-python-macros.workspace = true pyo3.workspace = true pyo3-asyncio-0-21.workspace = true serde_json.workspace = true -serde-pyobject = "0.3.0" +serde-pyobject.workspace = true thiserror.workspace = true workflow-core.workspace = true workflow-log.workspace = true From f851599a6ded3ea45fdcd0d93f486897d41d24e9 Mon Sep 17 00:00:00 2001 From: cdvallone Date: Tue, 9 Jul 2024 17:04:02 -0400 Subject: [PATCH 10/14] Subscribe UTXOs Changed --- Cargo.lock | 1 + rpc/wrpc/python/Cargo.toml | 4 +++- rpc/wrpc/python/src/client.rs | 27 ++++++++++++++++++++++++++- 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1086040391..1581728f0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3783,6 +3783,7 @@ dependencies = [ "ahash 0.8.11", "cfg-if 1.0.0", "futures", + "kaspa-addresses", "kaspa-consensus-core", "kaspa-notify", "kaspa-python-macros", diff --git a/rpc/wrpc/python/Cargo.toml b/rpc/wrpc/python/Cargo.toml index a18cf43dc0..ac1069d949 100644 --- a/rpc/wrpc/python/Cargo.toml +++ b/rpc/wrpc/python/Cargo.toml @@ -10,9 +10,10 @@ license.workspace = true repository.workspace = true [features] -default = [] +default = ["py-sdk"] py-sdk = [ "pyo3/extension-module", + "kaspa-addresses/py-sdk", "kaspa-rpc-core/py-sdk", "kaspa-wrpc-client/py-sdk", ] @@ -21,6 +22,7 @@ py-sdk = [ ahash.workspace = true cfg-if.workspace = true futures.workspace = true +kaspa-addresses.workspace = true kaspa-consensus-core.workspace = true kaspa-notify.workspace = true kaspa-rpc-core.workspace = true diff --git a/rpc/wrpc/python/src/client.rs b/rpc/wrpc/python/src/client.rs index 1875d6a419..6329280358 100644 --- a/rpc/wrpc/python/src/client.rs +++ b/rpc/wrpc/python/src/client.rs @@ -1,8 +1,9 @@ use ahash::AHashMap; use futures::*; +use kaspa_addresses::Address; use kaspa_notify::listener::ListenerId; use kaspa_notify::notification::Notification; -use kaspa_notify::scope::{Scope, VirtualChainChangedScope, VirtualDaaScoreChangedScope}; +use kaspa_notify::scope::{Scope, UtxosChangedScope, VirtualChainChangedScope, VirtualDaaScoreChangedScope}; use kaspa_notify::{connection::ChannelType, events::EventType}; use kaspa_python_macros::py_async; use kaspa_rpc_core::api::rpc::RpcApi; @@ -373,6 +374,30 @@ impl RpcClient { } } + fn subscribe_utxos_changed(&self, py: Python, addresses: Vec
) -> PyResult> { + if let Some(listener_id) = self.listener_id() { + let client = self.inner.client.clone(); + py_async! {py, async move { + client.start_notify(listener_id, Scope::UtxosChanged(UtxosChangedScope { addresses })).await?; + Ok(()) + }} + } else { + Err(PyErr::new::("RPC subscribe on a closed connection")) + } + } + + fn unsubscribe_utxos_changed(&self, py: Python, addresses: Vec
) -> PyResult> { + if let Some(listener_id) = self.listener_id() { + let client = self.inner.client.clone(); + py_async! {py, async move { + client.stop_notify(listener_id, Scope::UtxosChanged(UtxosChangedScope { addresses })).await?; + Ok(()) + }} + } else { + Err(PyErr::new::("RPC unsubscribe on a closed connection")) + } + } + fn subscribe_virtual_chain_changed(&self, py: Python, include_accepted_transaction_ids: bool) -> PyResult> { if let Some(listener_id) = self.listener_id() { let client = self.inner.client.clone(); From dcee8929be1617ce3c6205117120fb77e8cea2ea Mon Sep 17 00:00:00 2001 From: cdvallone Date: Wed, 10 Jul 2024 16:35:20 -0400 Subject: [PATCH 11/14] subscriptions --- python/examples/rpc.py | 19 +++++--- rpc/macros/src/lib.rs | 6 +++ rpc/macros/src/wrpc/python.rs | 91 ++++++++++++++++++++++++++++++++++- rpc/wrpc/python/src/client.rs | 61 +++++++++++------------ 4 files changed, 140 insertions(+), 37 deletions(-) diff --git a/python/examples/rpc.py b/python/examples/rpc.py index b9b2771eda..c46888bd5b 100644 --- a/python/examples/rpc.py +++ b/python/examples/rpc.py @@ -1,24 +1,30 @@ import asyncio import json import time +import os from kaspa import RpcClient -def subscription_callback(event, callback_id, **kwargs): +def subscription_callback(event, name, **kwargs): print(kwargs.get('kwarg1')) - print(f'{callback_id} | {event}') + print(f'{name} | {event}') async def rpc_subscriptions(client): - client.add_event_listener('all', subscription_callback, callback_id=1, kwarg1='Im a kwarg!!') + # client.add_event_listener('all', subscription_callback, callback_id=1, kwarg1='Im a kwarg!!') + client.add_event_listener('all', subscription_callback, name="all") - await client.subscribe_daa_score() + await client.subscribe_virtual_daa_score_changed() await client.subscribe_virtual_chain_changed(True) + await client.subscribe_block_added() + await client.subscribe_new_block_template() await asyncio.sleep(10) - await client.unsubscribe_daa_score() + await client.unsubscribe_virtual_daa_score_changed() await client.unsubscribe_virtual_chain_changed(True) + await client.ubsubscribe_block_added() + await client.ubsubscribe_new_block_template() async def rpc_calls(client): get_server_info_response = await client.get_server_info() @@ -37,7 +43,8 @@ async def rpc_calls(client): print(get_balances_by_addresses_response) async def main(): - client = RpcClient(url = "ws://localhost:17110") + rpc_host = os.environ.get("KASPA_RPC_HOST") + client = RpcClient(url = f"ws://{rpc_host}:17210") await client.connect() print(f'Client is connected: {client.is_connected()}') diff --git a/rpc/macros/src/lib.rs b/rpc/macros/src/lib.rs index 6a5c9d63b2..24ecf8239b 100644 --- a/rpc/macros/src/lib.rs +++ b/rpc/macros/src/lib.rs @@ -16,6 +16,12 @@ pub fn build_wrpc_python_interface(input: TokenStream) -> TokenStream { wrpc::python::build_wrpc_python_interface(input) } +#[proc_macro] +#[proc_macro_error] +pub fn build_wrpc_python_subscriptions(input: TokenStream) -> TokenStream { + wrpc::python::build_wrpc_python_subscriptions(input) +} + #[proc_macro] #[proc_macro_error] pub fn declare_typescript_wasm_interface(input: TokenStream) -> TokenStream { diff --git a/rpc/macros/src/wrpc/python.rs b/rpc/macros/src/wrpc/python.rs index f9a95da73f..0b535cd4ba 100644 --- a/rpc/macros/src/wrpc/python.rs +++ b/rpc/macros/src/wrpc/python.rs @@ -1,6 +1,8 @@ use crate::handler::*; -use proc_macro2::TokenStream; +use convert_case::{Case, Casing}; +use proc_macro2::{Ident, Span, TokenStream}; use quote::{quote, ToTokens}; +use regex::Regex; use std::convert::Into; use syn::{ parse::{Parse, ParseStream}, @@ -71,3 +73,90 @@ pub fn build_wrpc_python_interface(input: proc_macro::TokenStream) -> proc_macro // println!("MACRO: {}", ts.to_string()); ts.into() } + +#[derive(Debug)] +struct RpcSubscriptions { + handlers: ExprArray, +} + +impl Parse for RpcSubscriptions { + fn parse(input: ParseStream) -> Result { + let parsed = Punctuated::::parse_terminated(input).unwrap(); + if parsed.len() != 1 { + return Err(Error::new_spanned(parsed, "usage: build_wrpc_python_!([getInfo, ..])".to_string())); + } + + let mut iter = parsed.iter(); + // Intake enum variants as an array + let handlers = get_handlers(iter.next().unwrap().clone())?; + + Ok(RpcSubscriptions { handlers }) + } +} + +impl ToTokens for RpcSubscriptions { + fn to_tokens(&self, tokens: &mut TokenStream) { + let mut targets = Vec::new(); + + for handler in self.handlers.elems.iter() { + // TODO docs (name, docs) + let (name, _) = match handler { + syn::Expr::Path(expr_path) => (expr_path.path.to_token_stream().to_string(), &expr_path.attrs), + _ => { + continue; + } + }; + + let name = format!("Notify{}", name.as_str()); + let regex = Regex::new(r"^Notify").unwrap(); + let blank = regex.replace(&name, ""); + let subscribe = regex.replace(&name, "Subscribe"); + let unsubscribe = regex.replace(&name, "Unsubscribe"); + let scope = Ident::new(&blank, Span::call_site()); + let sub_scope = Ident::new(format!("{blank}Scope").as_str(), Span::call_site()); + let fn_subscribe_snake = Ident::new(&subscribe.to_case(Case::Snake), Span::call_site()); + let fn_unsubscribe_snake = Ident::new(&unsubscribe.to_case(Case::Snake), Span::call_site()); + + targets.push(quote! { + #[pymethods] + impl RpcClient { + fn #fn_subscribe_snake(&self, py: Python) -> PyResult> { + if let Some(listener_id) = self.listener_id() { + let client = self.inner.client.clone(); + py_async! {py, async move { + client.start_notify(listener_id, Scope::#scope(#sub_scope {})).await?; + Ok(()) + }} + } else { + Err(PyErr::new::("RPC subscribe on a closed connection")) + } + } + + fn #fn_unsubscribe_snake(&self, py: Python) -> PyResult> { + if let Some(listener_id) = self.listener_id() { + let client = self.inner.client.clone(); + py_async! {py, async move { + client.stop_notify(listener_id, Scope::#scope(#sub_scope {})).await?; + Ok(()) + }} + } else { + Err(PyErr::new::("RPC unsubscribe on a closed connection")) + } + } + } + }); + } + + quote! { + #(#targets)* + } + .to_tokens(tokens); + } +} + +pub fn build_wrpc_python_subscriptions(input: proc_macro::TokenStream) -> proc_macro::TokenStream { + let rpc_table = parse_macro_input!(input as RpcSubscriptions); + let ts = rpc_table.to_token_stream(); + // println!("MACRO: {}", ts.to_string()); + ts.into() +} diff --git a/rpc/wrpc/python/src/client.rs b/rpc/wrpc/python/src/client.rs index 6329280358..672bc3e5a1 100644 --- a/rpc/wrpc/python/src/client.rs +++ b/rpc/wrpc/python/src/client.rs @@ -9,10 +9,11 @@ use kaspa_python_macros::py_async; use kaspa_rpc_core::api::rpc::RpcApi; use kaspa_rpc_core::model::*; use kaspa_rpc_core::notify::connection::ChannelConnection; -use kaspa_rpc_macros::build_wrpc_python_interface; +use kaspa_rpc_macros::{build_wrpc_python_interface, build_wrpc_python_subscriptions}; use kaspa_wrpc_client::{ client::{ConnectOptions, ConnectStrategy}, error::Error, + prelude::*, result::Result, KaspaRpcClient, WrpcEncoding, }; @@ -154,6 +155,18 @@ impl RpcClient { Ok(Self::new(url, None)?) } + fn url(&self) -> Option { + self.inner.client.url() + } + + fn is_connected(&self) -> bool { + self.inner.client.is_connected() + } + + fn encoding(&self) -> String { + self.inner.client.encoding().to_string() + } + fn connect(&self, py: Python) -> PyResult> { // TODO expose args to Python similar to WASM wRPC Client IConnectOptions let options = ConnectOptions { @@ -172,9 +185,7 @@ impl RpcClient { }} } - fn is_connected(&self) -> bool { - self.inner.client.is_connected() - } + // fn disconnect() TODO fn get_server_info(&self, py: Python) -> PyResult> { let client = self.inner.client.clone(); @@ -230,6 +241,8 @@ impl RpcClient { *self.inner.listener_id.lock().unwrap() } + // fn stop_notification_task() TODO + fn start_notification_task(&self, py: Python) -> Result<()> { if self.inner.notification_task.load(Ordering::SeqCst) { return Ok(()); @@ -350,30 +363,6 @@ impl RpcClient { #[pymethods] impl RpcClient { - fn subscribe_daa_score(&self, py: Python) -> PyResult> { - if let Some(listener_id) = self.listener_id() { - let client = self.inner.client.clone(); - py_async! {py, async move { - client.start_notify(listener_id, Scope::VirtualDaaScoreChanged(VirtualDaaScoreChangedScope {})).await?; - Ok(()) - }} - } else { - Err(PyErr::new::("RPC subscribe on a closed connection")) - } - } - - fn unsubscribe_daa_score(&self, py: Python) -> PyResult> { - if let Some(listener_id) = self.listener_id() { - let client = self.inner.client.clone(); - py_async! {py, async move { - client.stop_notify(listener_id, Scope::VirtualDaaScoreChanged(VirtualDaaScoreChangedScope {})).await?; - Ok(()) - }} - } else { - Err(PyErr::new::("RPC unsubscribe on a closed connection")) - } - } - fn subscribe_utxos_changed(&self, py: Python, addresses: Vec
) -> PyResult> { if let Some(listener_id) = self.listener_id() { let client = self.inner.client.clone(); @@ -381,7 +370,7 @@ impl RpcClient { client.start_notify(listener_id, Scope::UtxosChanged(UtxosChangedScope { addresses })).await?; Ok(()) }} - } else { + } else { Err(PyErr::new::("RPC subscribe on a closed connection")) } } @@ -392,7 +381,7 @@ impl RpcClient { py_async! {py, async move { client.stop_notify(listener_id, Scope::UtxosChanged(UtxosChangedScope { addresses })).await?; Ok(()) - }} + }} } else { Err(PyErr::new::("RPC unsubscribe on a closed connection")) } @@ -430,6 +419,18 @@ impl RpcClient { } } +build_wrpc_python_subscriptions!([ + // UtxosChanged - added above due to parameter `addresses: Vec
`` + // VirtualChainChanged - added above due to paramter `include_accepted_transaction_ids: bool` + BlockAdded, + FinalityConflict, + FinalityConflictResolved, + NewBlockTemplate, + PruningPointUtxoSetOverride, + SinkBlueScoreChanged, + VirtualDaaScoreChanged, +]); + build_wrpc_python_interface!([ AddPeer, Ban, From 5c2b0192abb3bb6f70cdd15a926143fbfaf4de7e Mon Sep 17 00:00:00 2001 From: cdvallone Date: Wed, 10 Jul 2024 16:57:32 -0400 Subject: [PATCH 12/14] wRPC client disconnect --- python/examples/rpc.py | 7 ++++--- rpc/wrpc/python/src/client.rs | 22 ++++++++++++++++++---- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/python/examples/rpc.py b/python/examples/rpc.py index c46888bd5b..2ee5530967 100644 --- a/python/examples/rpc.py +++ b/python/examples/rpc.py @@ -7,7 +7,6 @@ def subscription_callback(event, name, **kwargs): - print(kwargs.get('kwarg1')) print(f'{name} | {event}') async def rpc_subscriptions(client): @@ -23,8 +22,8 @@ async def rpc_subscriptions(client): await client.unsubscribe_virtual_daa_score_changed() await client.unsubscribe_virtual_chain_changed(True) - await client.ubsubscribe_block_added() - await client.ubsubscribe_new_block_template() + await client.unsubscribe_block_added() + await client.unsubscribe_new_block_template() async def rpc_calls(client): get_server_info_response = await client.get_server_info() @@ -51,6 +50,8 @@ async def main(): await rpc_calls(client) await rpc_subscriptions(client) + await client.disconnect() + if __name__ == "__main__": asyncio.run(main()) \ No newline at end of file diff --git a/rpc/wrpc/python/src/client.rs b/rpc/wrpc/python/src/client.rs index 672bc3e5a1..ddfc6d908a 100644 --- a/rpc/wrpc/python/src/client.rs +++ b/rpc/wrpc/python/src/client.rs @@ -95,7 +95,7 @@ impl PyCallback { pub struct Inner { client: Arc, // resolver TODO - notification_task: AtomicBool, + notification_task: Arc, notification_ctl: DuplexChannel, callbacks: Arc>>>, listener_id: Arc>>, @@ -134,7 +134,7 @@ impl RpcClient { let rpc_client = RpcClient { inner: Arc::new(Inner { client, - notification_task: AtomicBool::new(false), + notification_task: Arc::new(AtomicBool::new(false)), notification_ctl: DuplexChannel::oneshot(), callbacks: Arc::new(Default::default()), listener_id: Arc::new(Mutex::new(None)), @@ -185,7 +185,15 @@ impl RpcClient { }} } - // fn disconnect() TODO + fn disconnect(&self, py: Python) -> PyResult> { + let client = self.clone(); + + py_async! {py, async move { + client.inner.client.disconnect().await?; + client.stop_notification_task().await?; + Ok(()) + }} + } fn get_server_info(&self, py: Python) -> PyResult> { let client = self.inner.client.clone(); @@ -241,7 +249,13 @@ impl RpcClient { *self.inner.listener_id.lock().unwrap() } - // fn stop_notification_task() TODO + async fn stop_notification_task(&self) -> Result<()> { + if self.inner.notification_task.load(Ordering::SeqCst) { + self.inner.notification_ctl.signal(()).await?; + self.inner.notification_task.store(false, Ordering::SeqCst); + } + Ok(()) + } fn start_notification_task(&self, py: Python) -> Result<()> { if self.inner.notification_task.load(Ordering::SeqCst) { From c5fc8763b31267a7d6a44c8f5530a900167f0089 Mon Sep 17 00:00:00 2001 From: cdvallone Date: Wed, 10 Jul 2024 17:50:23 -0400 Subject: [PATCH 13/14] unregister callbacks --- python/examples/rpc.py | 8 ++++++- rpc/wrpc/python/src/client.rs | 40 +++++++++++++++++++++++++++++++++-- 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/python/examples/rpc.py b/python/examples/rpc.py index 2ee5530967..a7fa8c3c62 100644 --- a/python/examples/rpc.py +++ b/python/examples/rpc.py @@ -18,13 +18,19 @@ async def rpc_subscriptions(client): await client.subscribe_block_added() await client.subscribe_new_block_template() - await asyncio.sleep(10) + await asyncio.sleep(5) + + client.remove_event_listener('all') + print('Removed all event listeners. Sleeping for 5 seconds before unsubscribing. Should see nothing print.') + + await asyncio.sleep(5) await client.unsubscribe_virtual_daa_score_changed() await client.unsubscribe_virtual_chain_changed(True) await client.unsubscribe_block_added() await client.unsubscribe_new_block_template() + async def rpc_calls(client): get_server_info_response = await client.get_server_info() print(get_server_info_response) diff --git a/rpc/wrpc/python/src/client.rs b/rpc/wrpc/python/src/client.rs index ddfc6d908a..a48cfb9270 100644 --- a/rpc/wrpc/python/src/client.rs +++ b/rpc/wrpc/python/src/client.rs @@ -187,7 +187,7 @@ impl RpcClient { fn disconnect(&self, py: Python) -> PyResult> { let client = self.clone(); - + py_async! {py, async move { client.inner.client.disconnect().await?; client.stop_notification_task().await?; @@ -235,7 +235,43 @@ impl RpcClient { Ok(()) } - // fn remove_event_listener() TODO + fn remove_event_listener(&self, py: Python, event: String, callback: Option) -> PyResult<()> { + let event = NotificationEvent::from_str(event.as_str()).unwrap(); + let mut callbacks = self.inner.callbacks.lock().unwrap(); + + match (&event, callback) { + (NotificationEvent::All, None) => { + // Remove all callbacks from "all" events + callbacks.clear(); + } + (NotificationEvent::All, Some(callback)) => { + // Remove given callback from "all" events + for callbacks in callbacks.values_mut() { + callbacks.retain(|c| { + let cb_ref = c.callback.bind(py); + let callback_ref = callback.bind(py); + cb_ref.as_ref().ne(callback_ref.as_ref()).unwrap_or(true) + }); + } + } + (_, None) => { + // Remove all callbacks from given event + callbacks.remove(&event); + } + (_, Some(callback)) => { + // Remove given callback from given event + if let Some(callbacks) = callbacks.get_mut(&event) { + callbacks.retain(|c| { + let cb_ref = c.callback.bind(py); + let callback_ref = callback.bind(py); + cb_ref.as_ref().ne(callback_ref.as_ref()).unwrap_or(true) + }); + } + } + } + Ok(()) + } + // fn clear_event_listener() TODO fn remove_all_event_listeners(&self) -> PyResult<()> { From 2845f839275a9b1f73598138c71aef41c7f8ff86 Mon Sep 17 00:00:00 2001 From: cdvallone Date: Wed, 10 Jul 2024 18:06:23 -0400 Subject: [PATCH 14/14] fix failing kaspad build --- rpc/wrpc/python/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/wrpc/python/Cargo.toml b/rpc/wrpc/python/Cargo.toml index ac1069d949..d0ba90bfe3 100644 --- a/rpc/wrpc/python/Cargo.toml +++ b/rpc/wrpc/python/Cargo.toml @@ -10,7 +10,7 @@ license.workspace = true repository.workspace = true [features] -default = ["py-sdk"] +default = [] py-sdk = [ "pyo3/extension-module", "kaspa-addresses/py-sdk",