Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[gh-2406] support rpc requests via proxies. #2462

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 165 additions & 71 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ tonic = { version = "0.8", features = ["gzip"] }
tracing = "0.1.37"
tracing-appender = "0.2.2"
tracing-subscriber = { version = "0.3.15" }
fast-socks5 = { version = "0.9.1" }
pin-project = { version = "1.1.5" }

codespan-reporting = "0.11.1"
codespan = "0.11.1"
Expand Down
3 changes: 3 additions & 0 deletions crates/rooch-rpc-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ jsonrpsee = { workspace = true }
serde_json = { workspace = true }
log = { workspace = true }
hex = { workspace = true }
axum = { workspace = true }
fast-socks5 = { workspace = true }
pin-project = { workspace = true }

move-core-types = { workspace = true }

Expand Down
7 changes: 7 additions & 0 deletions crates/rooch-rpc-client/src/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub struct Env {
pub alias: String,
pub rpc: String,
pub ws: Option<String>,
pub proxy: Option<String>,
}

impl Env {
Expand All @@ -83,6 +84,9 @@ impl Env {
if let Some(ws_url) = &self.ws {
builder = builder.ws_url(ws_url);
}
if let Some(proxy_url) = &self.proxy {
builder = builder.proxy_url(proxy_url);
}

builder.build(&self.rpc).await
}
Expand All @@ -92,6 +96,7 @@ impl Env {
alias: BuiltinChainID::Dev.chain_name(),
rpc: ROOCH_DEV_NET_URL.into(),
ws: None,
proxy: None,
}
}

Expand All @@ -100,6 +105,7 @@ impl Env {
alias: BuiltinChainID::Test.chain_name(),
rpc: ROOCH_TEST_NET_URL.into(),
ws: None,
proxy: None,
}
}

Expand All @@ -121,6 +127,7 @@ impl Default for Env {
alias: BuiltinChainID::Local.chain_name(),
rpc: ServerConfig::default().url(false),
ws: None,
proxy: None,
}
}
}
Expand Down
90 changes: 90 additions & 0 deletions crates/rooch-rpc-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use fast_socks5::client::Socks5Stream;
use fast_socks5::server::{Config, Socks5Socket};
use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
use move_core_types::language_storage::ModuleId;
use move_core_types::metadata::Metadata;
use move_core_types::resolver::ModuleResolver;
Expand All @@ -15,9 +18,13 @@ use moveos_types::{
function_return_value::FunctionResult, module_binding::MoveFunctionCaller,
moveos_std::tx_context::TxContext, transaction::FunctionCall,
};
use pin_project::pin_project;
use rooch_client::RoochRpcClient;
use std::env;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::Handle;

pub mod client_config;
Expand All @@ -27,6 +34,7 @@ pub mod wallet_context;
pub struct ClientBuilder {
request_timeout: Duration,
ws_url: Option<String>,
proxy_url: Option<String>,
}

impl ClientBuilder {
Expand All @@ -40,6 +48,11 @@ impl ClientBuilder {
self
}

pub fn proxy_url(mut self, url: impl AsRef<str>) -> Self {
self.proxy_url = Some(url.as_ref().to_string());
self
}

pub async fn build(self, http: impl AsRef<str>) -> Result<Client> {
// TODO: add verison info

Expand All @@ -50,25 +63,83 @@ impl ClientBuilder {
.build(http)?,
);

let server_addr = self.socks_server_no_auth().await;
let server_url = format!("ws://{}", server_addr);
// TODO: build_with_stream
let ws_client = Arc::new(WsClientBuilder::default().build(&server_url).await.unwrap());

Ok(Client {
http: http_client.clone(),
ws: ws_client.clone(),
rooch: RoochRpcClient::new(http_client.clone()),
})
}

pub async fn socks_server_no_auth(self) -> SocketAddr {
let mut config = Config::default();
config.set_dns_resolve(false);
let config = Arc::new(config);

let proxy_url = if self.proxy_url.is_some() {
self.proxy_url.clone().unwrap()
} else {
env::var("ALL_PROXY").unwrap()
};

let listener = TcpListener::bind(proxy_url).await.unwrap();
let proxy_addr = listener.local_addr().unwrap();
self.spawn_socks_server(listener, config).await;

proxy_addr
}

pub async fn spawn_socks_server(self, listener: TcpListener, config: Arc<Config>) {
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
loop {
let (stream, _) = listener.accept().await.unwrap();
let mut socks5_socket = Socks5Socket::new(stream, config.clone());
socks5_socket.set_reply_ip(addr.ip());

socks5_socket.upgrade_to_socks5().await.unwrap();
}
});
}

pub async fn connect_over_socks_stream(
self,
server_addr: SocketAddr,
) -> Socks5Stream<TcpStream> {
let target_addr = server_addr.ip().to_string();
let target_port = server_addr.port();

let socks_server = self.socks_server_no_auth().await;

Socks5Stream::connect(
socks_server,
target_addr,
target_port,
fast_socks5::client::Config::default(),
)
.await
.unwrap()
}
}

impl Default for ClientBuilder {
fn default() -> Self {
Self {
request_timeout: Duration::from_secs(60),
ws_url: None,
proxy_url: None,
}
}
}

#[derive(Clone)]
pub struct Client {
http: Arc<HttpClient>,
ws: Arc<WsClient>,
pub rooch: RoochRpcClient,
}

Expand All @@ -86,6 +157,14 @@ impl Client {
) -> Result<serde_json::Value> {
Ok(self.http.request(method, params).await?)
}

pub async fn request_with_ws(
&self,
method: &str,
params: Vec<serde_json::Value>,
) -> Result<serde_json::Value> {
Ok(self.ws.request(method, params).await?)
}
}

impl MoveFunctionCaller for Client {
Expand Down Expand Up @@ -122,3 +201,14 @@ impl ModuleResolver for &Client {
})
}
}

#[pin_project]
pub struct DataStream<T: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Unpin>(
#[pin] Socks5Stream<T>,
);

impl<T: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Unpin> DataStream<T> {
pub fn new(t: Socks5Stream<T>) -> Self {
Self(t)
}
}
11 changes: 10 additions & 1 deletion crates/rooch/src/commands/env/commands/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,23 @@ pub struct AddCommand {
pub rpc: String,
#[clap(long, value_hint = ValueHint::Url)]
pub ws: Option<String>,
#[clap(long, value_hint = ValueHint::Url)]
pub proxy: Option<String>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The proxy is global, not a configuration option for Rooch Env.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The proxy is global, not a configuration option for Rooch Env.

I think it should and shall be distinguished from the global proxy for each specific application. Global proxy in the system and the proxy in the application are two different elements and they should not be deemed as one.

}

impl AddCommand {
pub async fn execute(self) -> RoochResult<()> {
let mut context = self.context_options.build()?;
let AddCommand { alias, rpc, ws, .. } = self;
let AddCommand {
alias,
rpc,
ws,
proxy,
..
} = self;
let env = Env {
ws,
proxy,
rpc,
alias: alias.clone(),
};
Expand Down
12 changes: 7 additions & 5 deletions crates/rooch/src/commands/env/commands/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ impl ListCommand {
let context = self.context_options.build()?;

println!(
"{:^24} | {:^48} | {:^48} | {:^12}",
"Env Alias", "RPC URL", "Websocket URL", "Active Env"
"{:^24} | {:^48} | {:^48} | {:^48} | {:^12}",
"Env Alias", "RPC URL", "Websocket URL", "Proxy", "Active Env"
);
println!("{}", ["-"; 153].join(""));
println!("{}", ["-"; 203].join(""));

for env in context.client_config.envs.iter() {
let mut active = "";
Expand All @@ -29,9 +29,11 @@ impl ListCommand {
}

let ws = env.ws.clone().unwrap_or("Null".to_owned());
let proxy = env.proxy.clone().unwrap_or("Null".to_owned());

println!(
"{:^24} | {:^48} | {:^48} | {:^12}",
env.alias, env.rpc, ws, active
"{:^24} | {:^48} | {:^48} | {:^48} | {:^12}",
env.alias, env.rpc, ws, proxy, active
)
}

Expand Down
2 changes: 2 additions & 0 deletions crates/rooch/src/commands/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ impl CommandAction<()> for Init {
alias: "custom".to_string(),
rpc: chain_url[1].to_owned(),
ws: None,
proxy: None,
})
}

Expand Down Expand Up @@ -112,6 +113,7 @@ impl CommandAction<()> for Init {
alias,
rpc: url,
ws: None,
proxy: None,
}
})
}
Expand Down
7 changes: 4 additions & 3 deletions crates/rooch/src/commands/rpc/commands/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ pub struct RequestCommand {

#[clap(flatten)]
pub(crate) context_options: WalletContextOptions,

/// Return command outputs in json format
#[clap(long, default_value = "false")]
json: bool,
Expand All @@ -36,7 +35,8 @@ pub struct RequestCommand {
#[async_trait]
impl CommandAction<serde_json::Value> for RequestCommand {
async fn execute(self) -> RoochResult<serde_json::Value> {
let client = self.context_options.build()?.get_client().await?;
let context = self.context_options.build()?;
let client = context.get_client().await?;
let params = match self.params {
Some(serde_json::Value::Array(array)) => array,
Some(value) => {
Expand All @@ -51,7 +51,8 @@ impl CommandAction<serde_json::Value> for RequestCommand {
}
None => vec![],
};
Ok(client.request(self.method.as_str(), params).await?)

Ok(client.request_with_ws(&self.method, params).await?)
}

/// Executes the command, and serializes it to the common JSON output type
Expand Down
Loading