From b6fcaa5acc933740ae9e896bea90e5cc552cd607 Mon Sep 17 00:00:00 2001 From: 0x009922 Date: Thu, 28 Apr 2022 08:44:39 +0300 Subject: [PATCH] [refactor] #2144: redesign client's http workflow, expose internal api (#2147) --- client/src/client.rs | 405 +++++++++++++++++++------ client/src/http.rs | 35 +++ client/src/http_client.rs | 136 --------- client/src/http_default.rs | 200 ++++++++++++ client/src/lib.rs | 4 +- client/tests/integration/pagination.rs | 3 +- client/tests/integration/tx_history.rs | 3 +- 7 files changed, 560 insertions(+), 226 deletions(-) create mode 100644 client/src/http.rs delete mode 100644 client/src/http_client.rs create mode 100644 client/src/http_default.rs diff --git a/client/src/client.rs b/client/src/client.rs index 04083795644..30c9c651c76 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -3,13 +3,14 @@ use std::{ collections::HashMap, fmt::{self, Debug, Formatter}, + marker::PhantomData, sync::mpsc, thread, time::Duration, }; use eyre::{eyre, Result, WrapErr}; -use http_client::WebSocketStream; +use http_default::WebSocketStream; use iroha_config::{GetConfiguration, PostConfiguration}; use iroha_crypto::{HashOf, KeyPair}; use iroha_data_model::prelude::*; @@ -22,9 +23,159 @@ use small::SmallStr; use crate::{ config::Configuration, - http_client::{self, StatusCode, WebSocketError, WebSocketMessage}, + http::{Headers as HttpHeaders, Method as HttpMethod, RequestBuilder, Response, StatusCode}, + http_default::{self, DefaultRequestBuilder, WebSocketError, WebSocketMessage}, }; +/// General trait for all response handlers +pub trait ResponseHandler> { + /// What is the output of the handler + type Output; + + /// Function to parse HTTP response with body `T` to output `O` + /// + /// # Errors + /// Implementation dependent. + fn handle(self, response: Response) -> Result; +} + +/// Phantom struct that handles responses of Query API. +/// Depending on input query struct, transforms a response into appropriate output. +#[derive(Clone, Copy)] +pub struct QueryResponseHandler(PhantomData); + +impl Default for QueryResponseHandler { + fn default() -> Self { + Self(PhantomData) + } +} + +impl ResponseHandler for QueryResponseHandler +where + R: Query + Into + Debug, + >::Error: Into, +{ + type Output = PaginatedQueryOutput; + + fn handle(self, resp: Response>) -> Result> { + if resp.status() != StatusCode::OK { + return Err(ResponseReport::with_msg("Failed to make query request", &resp).into()); + } + + let result = VersionedPaginatedQueryResult::decode_versioned(resp.body())?; + let VersionedPaginatedQueryResult::V1(result) = result; + PaginatedQueryOutput::try_from(result) + } +} + +/// Phantom struct that handles Transaction API HTTP response +#[derive(Clone, Copy)] +pub struct TransactionResponseHandler; + +impl ResponseHandler for TransactionResponseHandler { + type Output = (); + + fn handle(self, resp: Response>) -> Result<()> { + if resp.status() == StatusCode::OK { + Ok(()) + } else { + Err(ResponseReport::with_msg("Failed to submit instructions", &resp).into()) + } + } +} + +/// Phantom struct that handles status check HTTP response +#[derive(Clone, Copy)] +pub struct StatusResponseHandler; + +impl ResponseHandler for StatusResponseHandler { + type Output = Status; + + fn handle(self, resp: Response>) -> Result { + if resp.status() != StatusCode::OK { + return Err(ResponseReport::with_msg("Failed to get status", &resp).into()); + } + serde_json::from_slice(resp.body()).wrap_err("Failed to decode body") + } +} + +/// Private structure to incapsulate error reporting for HTTP response. +struct ResponseReport(eyre::Report); + +impl ResponseReport { + /// Constructs report with provided message + fn with_msg(msg: S, response: &Response>) -> Self + where + S: AsRef, + { + let status = response.status(); + let body = String::from_utf8_lossy(response.body()); + let msg = msg.as_ref(); + + Self(eyre!("{msg}; status: {status}; response body: {body}")) + } +} + +impl From for eyre::Report { + fn from(report: ResponseReport) -> Self { + report.0 + } +} + +/// More convenient version of [`iroha_data_model::prelude::PaginatedQueryResult`]. +/// The only difference is that this struct has `output` field extracted from the result +/// accordingly to the source query. +pub struct PaginatedQueryOutput +where + R: Query + Into + Debug, + >::Error: Into, +{ + /// Query output + pub output: R::Output, + /// See [`iroha_data_model::prelude::PaginatedQueryResult`] + pub pagination: Pagination, + /// See [`iroha_data_model::prelude::PaginatedQueryResult`] + pub total: u64, +} + +impl PaginatedQueryOutput +where + R: Query + Into + Debug, + >::Error: Into, +{ + /// Extracts output as is + pub fn only_output(self) -> R::Output { + self.output + } +} + +impl TryFrom for PaginatedQueryOutput +where + R: Query + Into + Debug, + >::Error: Into, +{ + type Error = eyre::Report; + + fn try_from( + PaginatedQueryResult { + result, + pagination, + total, + }: PaginatedQueryResult, + ) -> Result { + let QueryResult(result) = result; + let output = R::Output::try_from(result) + .map_err(Into::into) + .wrap_err("Unexpected type")?; + + Ok(Self { + output, + pagination, + total, + }) + } +} + /// Iroha client #[derive(Clone)] pub struct Client { @@ -43,7 +194,7 @@ pub struct Client { /// Current account account_id: AccountId, /// Http headers which will be appended to each request - headers: http_client::Headers, + headers: HttpHeaders, /// If `true` add nonce, which makes different hashes for /// transactions which occur repeatedly and/or simultaneously add_transaction_nonce: bool, @@ -56,7 +207,7 @@ impl Client { /// # Errors /// If configuration isn't valid (e.g public/private keys don't match) pub fn new(configuration: &Configuration) -> Result { - Self::with_headers(configuration, http_client::Headers::default()) + Self::with_headers(configuration, HttpHeaders::default()) } /// Constructor for client from configuration and headers @@ -202,31 +353,47 @@ impl Client { &self, transaction: Transaction, ) -> Result> { + let (req, hash, resp_handler) = + self.prepare_transaction_request::(transaction)?; + let response = req + .send() + .wrap_err_with(|| format!("Failed to send transaction with hash {:?}", hash))?; + resp_handler.handle(response)?; + Ok(hash) + } + + /// Lower-level Instructions API entry point. + /// + /// Returns a tuple with a provided request builder, a hash of the transaction, and a response handler. + /// Despite the fact that response handling can be implemented just by asserting that status code is 200, + /// it is better to use a response handler anyway. It allows to abstract from implementation details. + /// + /// For general usage example see [`Client::prepare_query_request`]. + /// + /// # Errors + /// Fails if transaction check fails + pub fn prepare_transaction_request( + &self, + transaction: Transaction, + ) -> Result<(B, HashOf, TransactionResponseHandler)> + where + B: RequestBuilder, + { transaction.check_limits(&self.transaction_limits)?; let transaction: VersionedTransaction = transaction.into(); let hash = transaction.hash(); let transaction_bytes: Vec = transaction.encode_versioned(); - let response = http_client::post( - format!("{}/{}", &self.torii_url, uri::TRANSACTION), - transaction_bytes, - Vec::<(String, String)>::new(), - self.headers.clone(), - ) - .wrap_err_with(|| { - format!( - "Failed to send transaction with hash {:?}", - transaction.hash() + + Ok(( + B::new( + HttpMethod::POST, + format!("{}/{}", &self.torii_url, uri::TRANSACTION), ) - })?; - if response.status() == StatusCode::OK { - Ok(hash) - } else { - Err(eyre!( - "Failed to submit instructions with HTTP status: {}. Response body: {}", - response.status(), - String::from_utf8_lossy(response.body()) - )) - } + .bytes(transaction_bytes) + .headers(self.headers.clone()), + hash, + TransactionResponseHandler, + )) } /// Submits and waits until the transaction is either rejected or committed. @@ -319,43 +486,98 @@ impl Client { ) } - /// Query API entry point. Requests queries from `Iroha` peers with pagination. + /// Lower-level Query API entry point. Prepares an http-request and returns it with an http-response handler. /// /// # Errors - /// Fails if sending request fails - #[log] - pub fn request_with_pagination( + /// Fails if query signing fails. + /// + /// # Examples + /// + /// ```rust,ignore + /// use eyre::Result; + /// use iroha_client::{ + /// client::{Client, ResponseHandler}, + /// http::{RequestBuilder, Response}, + /// }; + /// use iroha_data_model::prelude::{Account, FindAllAccounts, Pagination}; + /// + /// struct YourAsyncRequest; + /// + /// impl YourAsyncRequest { + /// async fn send(self) -> Response> { + /// // do the stuff + /// } + /// } + /// + /// // Implement builder for this request + /// impl RequestBuilder for YourAsyncRequest { + /// // ... + /// } + /// + /// async fn fetch_accounts(client: &Client) -> Result> { + /// // Put `YourAsyncRequest` as a type here + /// // It returns the request and the handler (zero-cost abstraction) for the response + /// let (req, resp_handler) = client.prepare_query_request::<_, YourAsyncRequest>( + /// FindAllAccounts::new(), + /// Pagination::default(), + /// )?; + /// + /// // Do what you need to send the request and to get the response + /// let resp = req.send().await; + /// + /// // Handle response with the handler and get typed result + /// let accounts = resp_handler.handle(resp)?; + /// + /// Ok(accounts.only_output()) + /// } + /// ``` + pub fn prepare_query_request( &self, request: R, pagination: Pagination, - ) -> Result + ) -> Result<(B, QueryResponseHandler)> where R: Query + Into + Debug, >::Error: Into, + B: RequestBuilder, { let pagination: Vec<_> = pagination.into(); let request = QueryRequest::new(request.into(), self.account_id.clone()); let request: VersionedSignedQueryRequest = self.sign_query(request)?.into(); - let response = http_client::post( - format!("{}/{}", &self.torii_url, uri::QUERY), - request.encode_versioned(), - pagination, - self.headers.clone(), - )?; - if response.status() != StatusCode::OK { - return Err(eyre!( - "Failed to make query request with HTTP status: {}, {}", - response.status(), - std::str::from_utf8(response.body()).unwrap_or(""), - )); - } - let result = VersionedPaginatedQueryResult::decode_versioned(response.body())?; - let VersionedPaginatedQueryResult::V1(PaginatedQueryResult { result, .. }) = result; - let QueryResult(result) = result; - R::Output::try_from(result) - .map_err(Into::into) - .wrap_err("Unexpected type") + Ok(( + B::new( + HttpMethod::POST, + format!("{}/{}", &self.torii_url, uri::QUERY), + ) + .bytes(request.encode_versioned()) + .params(pagination) + .headers(self.headers.clone()), + QueryResponseHandler::default(), + )) + } + + /// Query API entry point. Requests queries from `Iroha` peers with pagination. + /// + /// Uses default blocking http-client. If you need some custom integration, look at + /// [`Self::prepare_query_request()`]. + /// + /// # Errors + /// Fails if sending request fails + #[log] + pub fn request_with_pagination( + &self, + request: R, + pagination: Pagination, + ) -> Result> + where + R: Query + Into + Debug, + >::Error: Into, + { + let (req, resp_handler) = + self.prepare_query_request::(request, pagination)?; + let response = req.send()?; + resp_handler.handle(response) } /// Query API entry point. Requests queries from `Iroha` peers. @@ -369,6 +591,7 @@ impl Client { >::Error: Into, { self.request_with_pagination(request, Pagination::default()) + .map(PaginatedQueryOutput::only_output) } /// Connects through `WebSocket` to listen for `Iroha` pipeline and data events. @@ -401,12 +624,14 @@ impl Client { ) -> Result> { let pagination: Vec<_> = pagination.into(); for _ in 0..retry_count { - let response = http_client::get( + let response = DefaultRequestBuilder::new( + HttpMethod::GET, format!("{}/{}", &self.torii_url, uri::PENDING_TRANSACTIONS), - Vec::new(), - pagination.clone(), - self.headers.clone(), - )?; + ) + .params(pagination.clone()) + .headers(self.headers.clone()) + .send()?; + if response.status() == StatusCode::OK { let pending_transactions = VersionedPendingTransactions::decode_versioned(response.body())?; @@ -456,14 +681,14 @@ impl Client { let headers = [("Content-Type".to_owned(), "application/json".to_owned())] .into_iter() .collect(); - let get_cfg = serde_json::to_vec(get_config).wrap_err("Failed to serialize")?; - - let resp = http_client::get::<_, Vec<(&str, &str)>, _, _>( + let resp = DefaultRequestBuilder::new( + HttpMethod::GET, format!("{}/{}", &self.torii_url, uri::CONFIGURATION), - get_cfg, - vec![], - headers, - )?; + ) + .bytes(serde_json::to_vec(get_config).wrap_err("Failed to serialize")?) + .headers(headers) + .send()?; + if resp.status() != StatusCode::OK { return Err(eyre!( "Failed to get configuration with HTTP status: {}. {}", @@ -482,13 +707,14 @@ impl Client { let headers = [("Content-type".to_owned(), "application/json".to_owned())] .into_iter() .collect(); - let resp = http_client::post::<_, Vec<(&str, &str)>, _, _>( - &format!("{}/{}", self.torii_url, uri::CONFIGURATION), - serde_json::to_vec(&post_config) - .wrap_err(format!("Failed to serialize {:?}", post_config))?, - vec![], - headers, - )?; + let body = serde_json::to_vec(&post_config) + .wrap_err(format!("Failed to serialize {:?}", post_config))?; + let url = &format!("{}/{}", self.torii_url, uri::CONFIGURATION); + let resp = DefaultRequestBuilder::new(HttpMethod::POST, url) + .bytes(body) + .headers(headers) + .send()?; + if resp.status() != StatusCode::OK { return Err(eyre!( "Failed to post configuration with HTTP status: {}. {}", @@ -523,20 +749,29 @@ impl Client { /// # Errors /// Fails if sending request or decoding fails pub fn get_status(&self) -> Result { - let resp = http_client::get::<_, Vec<(&str, &str)>, _, _>( - format!("{}/{}", &self.telemetry_url, uri::STATUS), - Vec::new(), - vec![], - self.headers.clone(), - )?; - if resp.status() != StatusCode::OK { - return Err(eyre!( - "Failed to get status with HTTP status: {}. {}", - resp.status(), - std::str::from_utf8(resp.body()).unwrap_or(""), - )); - } - serde_json::from_slice(resp.body()).wrap_err("Failed to decode body") + let (req, resp_handler) = self.prepare_status_request::(); + let resp = req.send()?; + resp_handler.handle(resp) + } + + /// Prepares http-request to implement [`Self::get_status()`] on your own. + /// + /// For general usage example see [`Client::prepare_query_request`]. + /// + /// # Errors + /// Fails if request build fails + pub fn prepare_status_request(&self) -> (B, StatusResponseHandler) + where + B: RequestBuilder, + { + ( + B::new( + HttpMethod::GET, + format!("{}/{}", &self.telemetry_url, uri::STATUS), + ) + .headers(self.headers.clone()), + StatusResponseHandler, + ) } } @@ -551,12 +786,8 @@ impl EventIterator { /// /// # Errors /// Fails if connecting and sending subscription to web socket fails - pub fn new( - url: &str, - event_filter: EventFilter, - headers: http_client::Headers, - ) -> Result { - let mut stream = http_client::web_socket_connect(url, headers)?; + pub fn new(url: &str, event_filter: EventFilter, headers: HttpHeaders) -> Result { + let mut stream = http_default::web_socket_connect(url, headers)?; stream.write_message(WebSocketMessage::Binary( VersionedEventSubscriberMessage::from(EventSubscriberMessage::from(event_filter)) .encode_versioned(), diff --git a/client/src/http.rs b/client/src/http.rs new file mode 100644 index 00000000000..ff93f0c9bb6 --- /dev/null +++ b/client/src/http.rs @@ -0,0 +1,35 @@ +use std::{borrow::Borrow, collections::HashMap}; + +pub use http::{Method, Response, StatusCode}; + +/// Type alias for HTTP headers hash map +pub type Headers = HashMap; + +/// General trait for building http-requests. +/// +/// To use custom builder with client, you need to implement this trait for some type and pass it +/// to the client that will fill it. +pub trait RequestBuilder { + /// Constructs a new builder with provided method and URL + #[must_use] + fn new(method: Method, url: U) -> Self + where + U: AsRef; + + /// Sets request's body in bytes + #[must_use] + fn bytes(self, data: Vec) -> Self; + + /// Sets request's query params + #[must_use] + fn params(self, params: P) -> Self + where + P: IntoIterator, + P::Item: Borrow<(K, V)>, + K: AsRef, + V: ToString; + + /// Sets request's headers + #[must_use] + fn headers(self, headers: Headers) -> Self; +} diff --git a/client/src/http_client.rs b/client/src/http_client.rs deleted file mode 100644 index ffe83e5619c..00000000000 --- a/client/src/http_client.rs +++ /dev/null @@ -1,136 +0,0 @@ -use std::{borrow::Borrow, collections::HashMap, net::TcpStream}; - -use attohttpc::{body, header::HeaderName, RequestBuilder, Response as AttohttpcResponse}; -use eyre::{eyre, Error, Result, WrapErr}; -pub use http::{Response, StatusCode}; -use tungstenite::{stream::MaybeTlsStream, WebSocket}; -pub use tungstenite::{Error as WebSocketError, Message as WebSocketMessage}; - -type Bytes = Vec; -pub type Headers = HashMap; - -trait AttoHttpReqExt: Sized { - fn set_headers(self, headers: Headers) -> Result; -} - -impl AttoHttpReqExt for RequestBuilder>> { - fn set_headers(mut self, headers: Headers) -> Result { - for (h, v) in headers { - let h = HeaderName::from_bytes(h.as_ref()) - .wrap_err_with(|| format!("Failed to parse header name {}", h))?; - self = self.header(h, v); - } - Ok(self) - } -} - -trait HttpReqExt: Sized { - fn set_headers(self, headers: Headers) -> Result; -} - -impl HttpReqExt for http::request::Builder { - fn set_headers(mut self, headers: Headers) -> Result { - for (h, v) in headers { - let h = HeaderName::from_bytes(h.as_ref()) - .wrap_err_with(|| format!("Failed to parse header name {}", h))?; - self = self.header(h, v); - } - Ok(self) - } -} - -pub fn post( - url: U, - body: Bytes, - query_params: P, - headers: Headers, -) -> Result> -where - U: AsRef, - P: IntoIterator, - P::Item: Borrow<(K, V)>, - K: AsRef, - V: ToString, -{ - let url = url.as_ref(); - let response = attohttpc::post(url) - .bytes(body) - .params(query_params) - .set_headers(headers)? - .send() - .wrap_err_with(|| format!("Failed to send http post request to {}", url))?; - ClientResponse(response).try_into() -} - -pub fn get( - url: U, - body: Bytes, - query_params: P, - headers: Headers, -) -> Result> -where - U: AsRef, - P: IntoIterator, - P::Item: Borrow<(K, V)>, - K: AsRef, - V: ToString, -{ - let url = url.as_ref(); - let response = attohttpc::get(url) - .bytes(body) - .params(query_params) - .set_headers(headers)? - .send() - .wrap_err_with(|| format!("Failed to send http get request to {}", url))?; - ClientResponse(response).try_into() -} - -pub type WebSocketStream = WebSocket>; - -pub fn web_socket_connect(uri: U, headers: Headers) -> Result -where - U: AsRef, -{ - let ws_uri = if let Some(https_uri) = uri.as_ref().strip_prefix("https://") { - "wss://".to_owned() + https_uri - } else if let Some(http_uri) = uri.as_ref().strip_prefix("http://") { - "ws://".to_owned() + http_uri - } else { - return Err(eyre!("No schema in web socket uri provided")); - }; - - let req = http::Request::builder() - .uri(ws_uri) - .set_headers(headers) - .wrap_err("Failed to build web socket request")? - .body(()) - .wrap_err("Failed to build web socket request")?; - - let (stream, _) = tungstenite::connect(req)?; - Ok(stream) -} - -struct ClientResponse(AttohttpcResponse); - -impl TryFrom for Response { - type Error = Error; - - fn try_from(response: ClientResponse) -> Result { - let ClientResponse(response) = response; - let mut builder = Response::builder().status(response.status()); - let headers = builder - .headers_mut() - .ok_or_else(|| eyre!("Failed to get headers map reference."))?; - for (key, value) in response.headers() { - headers.insert(key, value.clone()); - } - response - .bytes() - .wrap_err("Failed to get response as bytes") - .and_then(|bytes| { - builder - .body(bytes) - .wrap_err("Failed to construct response bytes body") - }) - } -} diff --git a/client/src/http_default.rs b/client/src/http_default.rs new file mode 100644 index 00000000000..6ae54235c35 --- /dev/null +++ b/client/src/http_default.rs @@ -0,0 +1,200 @@ +use std::{borrow::Borrow, net::TcpStream}; + +use attohttpc::{ + body as atto_body, RequestBuilder as AttoHttpRequestBuilder, Response as AttoHttpResponse, +}; +use eyre::{eyre, Error, Result, WrapErr}; +use http::header::HeaderName; +use tungstenite::{stream::MaybeTlsStream, WebSocket}; +pub use tungstenite::{Error as WebSocketError, Message as WebSocketMessage}; + +use crate::http::{Headers, Method, RequestBuilder, Response}; + +type Bytes = Vec; +type AttoHttpRequestBuilderWithBytes = AttoHttpRequestBuilder>; + +trait SetSingleHeader { + fn header(self, key: HeaderName, value: String) -> Self; +} + +impl SetSingleHeader for AttoHttpRequestBuilderWithBytes { + #[allow(clippy::only_used_in_recursion)] // False-positive + fn header(self, key: HeaderName, value: String) -> Self { + self.header(key, value) + } +} + +impl SetSingleHeader for http::request::Builder { + fn header(self, key: HeaderName, value: String) -> Self { + self.header(key, value) + } +} + +trait SetHeadersExt: Sized + SetSingleHeader { + fn set_headers(mut self, headers: Headers) -> Result { + for (h, v) in headers { + let h = HeaderName::from_bytes(h.as_ref()) + .wrap_err_with(|| format!("Failed to parse header name {}", h))?; + self = self.header(h, v); + } + Ok(self) + } +} + +impl SetHeadersExt for AttoHttpRequestBuilderWithBytes {} + +impl SetHeadersExt for http::request::Builder {} + +/// Default request builder & sender implemented on top of `attohttpc` crate. +/// +/// Its main goal is not to be efficient, but simple. Its implementation contains +/// some intermediate allocations that could be avoided with additional complexity. +pub struct DefaultRequestBuilder { + method: Method, + url: String, + params: Option>, + headers: Option, + body: Option, +} + +impl DefaultRequestBuilder { + /// Sends prepared request and returns bytes response + /// + /// # Errors + /// Fails if request building and sending fails or response transformation fails + pub fn send(self) -> Result> { + let Self { + method, + url, + body, + params, + headers, + .. + } = self; + + let bytes_anyway = match body { + Some(bytes) => bytes, + None => Vec::new(), + }; + + // for error formatting + let method_url_cloned = (method.clone(), url.clone()); + + let mut builder = AttoHttpRequestBuilder::new(method, url).bytes(bytes_anyway); + if let Some(params) = params { + builder = builder.params(params); + } + if let Some(headers) = headers { + builder = builder.set_headers(headers)?; + } + + let response = builder.send().wrap_err_with(|| { + format!( + "Failed to send http {} request to {}", + &method_url_cloned.0, &method_url_cloned.1 + ) + })?; + + ClientResponse(response).try_into() + } +} + +impl RequestBuilder for DefaultRequestBuilder { + fn new(method: Method, url: U) -> Self + where + U: AsRef, + { + Self { + method, + url: url.as_ref().to_owned(), + headers: None, + params: None, + body: None, + } + } + + fn bytes(self, data: Vec) -> Self { + Self { + body: Some(data), + ..self + } + } + + fn headers(self, headers: Headers) -> Self { + Self { + headers: Some(headers), + ..self + } + } + + fn params(self, params: P) -> Self + where + P: IntoIterator, + P::Item: Borrow<(K, V)>, + K: AsRef, + V: ToString, + { + Self { + params: Some( + params + .into_iter() + .map(|pair| { + let (k, v) = pair.borrow(); + (k.as_ref().to_owned(), v.to_string()) + }) + .collect(), + ), + ..self + } + } +} + +pub type WebSocketStream = WebSocket>; + +pub fn web_socket_connect(uri: U, headers: Headers) -> Result +where + U: AsRef, +{ + let ws_uri = if let Some(https_uri) = uri.as_ref().strip_prefix("https://") { + "wss://".to_owned() + https_uri + } else if let Some(http_uri) = uri.as_ref().strip_prefix("http://") { + "ws://".to_owned() + http_uri + } else { + return Err(eyre!("No schema in web socket uri provided")); + }; + + let req = http::Request::builder() + .uri(ws_uri) + .set_headers(headers) + .wrap_err("Failed to build web socket request")? + .body(()) + .wrap_err("Failed to build web socket request")?; + + let (stream, _) = tungstenite::connect(req)?; + Ok(stream) +} + +struct ClientResponse(AttoHttpResponse); + +impl TryFrom for Response { + type Error = Error; + + fn try_from(response: ClientResponse) -> Result { + let ClientResponse(response) = response; + let mut builder = Response::builder().status(response.status()); + let headers = builder + .headers_mut() + .ok_or_else(|| eyre!("Failed to get headers map reference."))?; + for (key, value) in response.headers() { + headers.insert(key, value.clone()); + } + response + .bytes() + .wrap_err("Failed to get response as bytes") + .and_then(|bytes| { + builder + .body(bytes) + .wrap_err("Failed to construct response bytes body") + }) + } +} diff --git a/client/src/lib.rs b/client/src/lib.rs index bf795c06814..cc8f68be429 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -5,7 +5,9 @@ pub mod client; /// Module with iroha client config pub mod config; pub use config::Configuration; -mod http_client; +/// Module with general communication primitives like an HTTP request builder. +pub mod http; +mod http_default; /// Module containing sample configurations for tests and benchmarks. pub mod samples { diff --git a/client/tests/integration/pagination.rs b/client/tests/integration/pagination.rs index 3802a08a41c..d696a8d9411 100644 --- a/client/tests/integration/pagination.rs +++ b/client/tests/integration/pagination.rs @@ -38,6 +38,7 @@ fn client_add_asset_quantity_to_existing_asset_should_increase_asset_amount() { limit: Some(5), }, ) - .expect("Failed to get assets"); + .expect("Failed to get assets") + .only_output(); assert_eq!(vec.len(), 5); } diff --git a/client/tests/integration/tx_history.rs b/client/tests/integration/tx_history.rs index ab8af770b38..50655522b9a 100644 --- a/client/tests/integration/tx_history.rs +++ b/client/tests/integration/tx_history.rs @@ -64,7 +64,8 @@ fn client_has_rejected_and_acepted_txs_should_return_tx_history() { limit: Some(50), }, ) - .expect("Failed to get transaction history"); + .expect("Failed to get transaction history") + .only_output(); assert_eq!(transactions.len(), 50); let mut prev_creation_time = 0;