diff --git a/fuel-client/assets/schema.sdl b/fuel-client/assets/schema.sdl index 25cb40c84f7..fce1aee2bd0 100644 --- a/fuel-client/assets/schema.sdl +++ b/fuel-client/assets/schema.sdl @@ -243,6 +243,46 @@ type ContractOutput { stateRoot: Bytes32! } +type DaMessage { + amount: U64! + sender: Address! + recipient: Address! + owner: Address! + nonce: U64! + data: [Int!]! + daHeight: U64! + fuelBlockSpend: U64 +} + +type DaMessageConnection { + """ + Information to aid in pagination. + """ + pageInfo: PageInfo! + """ + A list of edges. + """ + edges: [DaMessageEdge!]! + """ + A list of nodes. + """ + nodes: [DaMessage!]! +} + +""" +An edge in a connection. +""" +type DaMessageEdge { + """ + A cursor for use in pagination + """ + cursor: String! + """ + The item at the end of the edge + """ + node: DaMessage! +} + """ Implement the DateTime scalar @@ -399,6 +439,7 @@ type Query { contractBalance(contract: ContractId!, asset: AssetId!): ContractBalance! contractBalances(filter: ContractBalanceFilterInput!, first: Int, after: String, last: Int, before: String): ContractBalanceConnection! nodeInfo: NodeInfo! + messages(owner: Address, first: Int, after: String, last: Int, before: String): DaMessageConnection! } type Receipt { diff --git a/fuel-client/src/client.rs b/fuel-client/src/client.rs index 3879b29ebb5..043c1a6cc8f 100644 --- a/fuel-client/src/client.rs +++ b/fuel-client/src/client.rs @@ -446,6 +446,19 @@ impl FuelClient { Ok(balances) } + + pub async fn messages( + &self, + owner: Option<&str>, + request: PaginationRequest, + ) -> io::Result> { + let owner: Option = owner.map(|owner| owner.parse()).transpose()?; + let query = schema::message::OwnedDaMessageQuery::build(&(owner, request).into()); + + let messages = self.query(query).await?.messages.into(); + + Ok(messages) + } } #[cfg(any(test, feature = "test-helpers"))] diff --git a/fuel-client/src/client/schema.rs b/fuel-client/src/client/schema.rs index 7793b6907dc..92b7c9b6567 100644 --- a/fuel-client/src/client/schema.rs +++ b/fuel-client/src/client/schema.rs @@ -18,6 +18,7 @@ pub mod block; pub mod chain; pub mod coin; pub mod contract; +pub mod message; pub mod node_info; pub mod primitives; pub mod tx; @@ -221,7 +222,7 @@ pub struct OutputBreakpoint { } /// Generic graphql pagination query args -#[derive(cynic::FragmentArguments, Debug)] +#[derive(cynic::FragmentArguments, Debug, Default)] pub struct ConnectionArgs { /// Skip until cursor (forward pagination) pub after: Option, diff --git a/fuel-client/src/client/schema/message.rs b/fuel-client/src/client/schema/message.rs new file mode 100644 index 00000000000..447089e5959 --- /dev/null +++ b/fuel-client/src/client/schema/message.rs @@ -0,0 +1,107 @@ +use super::{PageDirection, PageInfo, PaginatedResult, PaginationRequest}; +use crate::client::schema::{schema, Address, U64}; + +#[derive(cynic::QueryFragment, Debug)] +#[cynic(schema_path = "./assets/schema.sdl")] +pub struct DaMessage { + pub amount: U64, + pub sender: Address, + pub recipient: Address, + pub owner: Address, + pub nonce: U64, + pub data: Vec, + pub da_height: U64, + pub fuel_block_spend: Option, +} + +#[derive(cynic::QueryFragment, Debug)] +#[cynic( + schema_path = "./assets/schema.sdl", + graphql_type = "Query", + argument_struct = "OwnedMessagesConnectionArgs" +)] +pub struct OwnedDaMessageQuery { + #[arguments(owner = &args.owner, after = &args.after, before = &args.before, first = &args.first, last = &args.last)] + pub messages: DaMessageConnection, +} + +#[derive(cynic::QueryFragment, Debug)] +#[cynic(schema_path = "./assets/schema.sdl")] +pub struct DaMessageConnection { + pub edges: Vec, + pub page_info: PageInfo, +} + +#[derive(cynic::QueryFragment, Debug)] +#[cynic(schema_path = "./assets/schema.sdl")] +pub struct DaMessageEdge { + pub cursor: String, + pub node: DaMessage, +} + +#[derive(cynic::FragmentArguments, Debug)] +pub struct OwnedMessagesConnectionArgs { + /// Filter messages based on an owner + pub owner: Option
, + /// Skip until coin id (forward pagination) + pub after: Option, + /// Skip until coin id (backward pagination) + pub before: Option, + /// Retrieve the first n coins in order (forward pagination) + pub first: Option, + /// Retrieve the last n coins in order (backward pagination). + /// Can't be used at the same time as `first`. + pub last: Option, +} + +impl From<(Option
, PaginationRequest)> for OwnedMessagesConnectionArgs { + fn from(r: (Option
, PaginationRequest)) -> Self { + match r.1.direction { + PageDirection::Forward => OwnedMessagesConnectionArgs { + owner: r.0, + after: r.1.cursor, + before: None, + first: Some(r.1.results as i32), + last: None, + }, + PageDirection::Backward => OwnedMessagesConnectionArgs { + owner: r.0, + after: None, + before: r.1.cursor, + first: None, + last: Some(r.1.results as i32), + }, + } + } +} + +impl From for PaginatedResult { + fn from(conn: DaMessageConnection) -> Self { + PaginatedResult { + cursor: conn.page_info.end_cursor, + has_next_page: conn.page_info.has_next_page, + has_previous_page: conn.page_info.has_previous_page, + results: conn.edges.into_iter().map(|e| e.node).collect(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn owned_message_query_gql_output() { + use cynic::QueryBuilder; + + let operation = OwnedDaMessageQuery::build(OwnedMessagesConnectionArgs { + owner: Some(Address::default()), + after: None, + before: None, + first: None, + last: None, + }); + + insta::assert_snapshot!(operation.query) + } +} diff --git a/fuel-client/src/client/schema/snapshots/fuel_gql_client__client__schema__message__tests__owned_message_query_gql_output.snap b/fuel-client/src/client/schema/snapshots/fuel_gql_client__client__schema__message__tests__owned_message_query_gql_output.snap new file mode 100644 index 00000000000..68f889c82b0 --- /dev/null +++ b/fuel-client/src/client/schema/snapshots/fuel_gql_client__client__schema__message__tests__owned_message_query_gql_output.snap @@ -0,0 +1,29 @@ +--- +source: fuel-client/src/client/schema/message.rs +assertion_line: 105 +expression: operation.query +--- +query Query($_0: Address, $_1: Int, $_2: String, $_3: Int, $_4: String) { + messages(owner: $_0, first: $_1, after: $_2, last: $_3, before: $_4) { + edges { + cursor + node { + amount + sender + recipient + owner + nonce + data + daHeight + fuelBlockSpend + } + } + pageInfo { + endCursor + hasNextPage + hasPreviousPage + startCursor + } + } +} + diff --git a/fuel-core/src/database/message.rs b/fuel-core/src/database/message.rs index edb6a65c5be..84c841e41c3 100644 --- a/fuel-core/src/database/message.rs +++ b/fuel-core/src/database/message.rs @@ -10,6 +10,7 @@ use fuel_core_interfaces::{ model::DaMessage, }; use std::borrow::Cow; +use std::ops::Deref; impl Storage for Database { type Error = KvStoreError; @@ -76,6 +77,16 @@ impl Database { }) }) } + + pub fn all_messages( + &self, + start: Option, + direction: Option, + ) -> impl Iterator> + '_ { + let start = start.map(|v| v.deref().to_vec()); + self.iter_all::, DaMessage>(columns::DA_MESSAGES, None, start, direction) + .map(|res| res.map(|(_, message)| message)) + } } /// Get a Key by chaining Owner + MessageId diff --git a/fuel-core/src/schema.rs b/fuel-core/src/schema.rs index 8aba43b9b39..6c5726aa62d 100644 --- a/fuel-core/src/schema.rs +++ b/fuel-core/src/schema.rs @@ -7,6 +7,7 @@ pub mod coin; pub mod contract; pub mod dap; pub mod health; +pub mod message; pub mod node_info; pub mod scalars; pub mod tx; @@ -23,6 +24,7 @@ pub struct Query( contract::ContractQuery, contract::ContractBalanceQuery, node_info::NodeQuery, + message::MessageQuery, ); #[derive(MergedObject, Default)] @@ -39,6 +41,6 @@ pub fn build_schema() -> SchemaBuilder { Query::default(), Mutation::default(), EmptySubscription::default(), - ["TransactionConnection"], + ["TransactionConnection", "DaMessageConnection"], ) } diff --git a/fuel-core/src/schema/message.rs b/fuel-core/src/schema/message.rs new file mode 100644 index 00000000000..64d918a6cdd --- /dev/null +++ b/fuel-core/src/schema/message.rs @@ -0,0 +1,151 @@ +use super::scalars::{Address, MessageId, U64}; +use crate::{database::Database, state::IterDirection}; +use anyhow::anyhow; +use async_graphql::{ + connection::{self, Connection, Edge, EmptyFields}, + Context, Object, +}; +use fuel_core_interfaces::{ + common::{fuel_storage::Storage, fuel_types}, + db::KvStoreError, + model, +}; +use itertools::Itertools; + +pub struct DaMessage(pub(crate) model::DaMessage); + +#[Object] +impl DaMessage { + async fn amount(&self) -> U64 { + self.0.amount.into() + } + + async fn sender(&self) -> Address { + self.0.sender.into() + } + + async fn recipient(&self) -> Address { + self.0.recipient.into() + } + + async fn owner(&self) -> Address { + self.0.owner.into() + } + + async fn nonce(&self) -> U64 { + self.0.nonce.into() + } + + async fn data(&self) -> &Vec { + &self.0.data + } + + async fn da_height(&self) -> U64 { + self.0.da_height.into() + } + + async fn fuel_block_spend(&self) -> Option { + self.0.fuel_block_spend.map(|v| v.into()) + } +} + +#[derive(Default)] +pub struct MessageQuery {} + +#[Object] +impl MessageQuery { + async fn messages( + &self, + ctx: &Context<'_>, + #[graphql(desc = "address of the owner")] owner: Option
, + first: Option, + after: Option, + last: Option, + before: Option, + ) -> async_graphql::Result> { + let db = ctx.data_unchecked::().clone(); + + connection::query( + after, + before, + first, + last, + |after: Option, before: Option, first, last| async move { + let (records_to_fetch, direction) = if let Some(first) = first { + (first, IterDirection::Forward) + } else if let Some(last) = last { + (last, IterDirection::Reverse) + } else { + (0, IterDirection::Forward) + }; + + if (first.is_some() && before.is_some()) + || (after.is_some() && before.is_some()) + || (last.is_some() && after.is_some()) + { + return Err(anyhow!("Wrong argument combination")); + } + + let start = if direction == IterDirection::Forward { + after + } else { + before + }; + + let (mut messages, has_next_page, has_previous_page) = if let Some(owner) = owner { + let mut message_ids = + db.owned_message_ids(owner.into(), start.map(Into::into), Some(direction)); + let mut started = None; + if start.is_some() { + // skip initial result + started = message_ids.next(); + } + let message_ids = message_ids.take(records_to_fetch + 1); + let message_ids: Vec = message_ids.try_collect()?; + let has_next_page = message_ids.len() > records_to_fetch; + + let messages: Vec = message_ids + .iter() + .take(records_to_fetch) + .map(|msg_id| { + Storage::::get(&db, msg_id) + .transpose() + .ok_or(KvStoreError::NotFound)? + .map(|f| f.into_owned()) + }) + .try_collect()?; + (messages, has_next_page, started.is_some()) + } else { + let mut messages = db.all_messages(start.map(Into::into), Some(direction)); + let mut started = None; + if start.is_some() { + // skip initial result + started = messages.next(); + } + let messages: Vec = + messages.take(records_to_fetch + 1).try_collect()?; + let has_next_page = messages.len() > records_to_fetch; + let messages = messages.into_iter().take(records_to_fetch).collect(); + (messages, has_next_page, started.is_some()) + }; + + // reverse after filtering next page test record to maintain consistent ordering + // in the response regardless of whether first or last was used. + if direction == IterDirection::Forward { + messages.reverse(); + } + + let mut connection = Connection::new(has_previous_page, has_next_page); + + connection.edges.extend( + messages + .into_iter() + .map(|message| Edge::new(message.id().into(), DaMessage(message))), + ); + + Ok::, anyhow::Error>(connection) + }, + ) + .await + } +} diff --git a/fuel-tests/tests/lib.rs b/fuel-tests/tests/lib.rs index f05bef1889b..ae464ff9fa1 100644 --- a/fuel-tests/tests/lib.rs +++ b/fuel-tests/tests/lib.rs @@ -7,6 +7,7 @@ mod dap; mod debugger; mod health; mod helpers; +mod messages; mod node_info; mod snapshot; mod tx; diff --git a/fuel-tests/tests/messages.rs b/fuel-tests/tests/messages.rs new file mode 100644 index 00000000000..2f3a176509e --- /dev/null +++ b/fuel-tests/tests/messages.rs @@ -0,0 +1,143 @@ +use fuel_core::{config::Config, database::Database, service::FuelService}; +use fuel_core_interfaces::common::fuel_storage::Storage; +use fuel_core_interfaces::model::DaMessage; +use fuel_crypto::fuel_types::{Address, MessageId}; +use fuel_gql_client::client::{FuelClient, PageDirection, PaginationRequest}; + +#[tokio::test] +async fn messages_returns_messages_for_all_owners() { + // setup server & client + let mut db = Database::default(); + let srv = FuelService::from_database(db.clone(), Config::local_node()) + .await + .unwrap(); + let client = FuelClient::from(srv.bound_address); + + // create some owners + let owner_a = Address::new([1; 32]); + let owner_b = Address::new([2; 32]); + + // create some messages for owner A + let first_msg = DaMessage { + owner: owner_a, + ..Default::default() + }; + let second_msg = DaMessage { + owner: owner_a, + ..Default::default() + }; + + // create a message for owner B + let third_msg = DaMessage { + owner: owner_b, + ..Default::default() + }; + + // store the messages + let first_id = MessageId::new([1; 32]); + let _ = Storage::::insert(&mut db, &first_id, &first_msg).unwrap(); + + let second_id = MessageId::new([2; 32]); + let _ = Storage::::insert(&mut db, &second_id, &second_msg).unwrap(); + + let third_id = MessageId::new([3; 32]); + let _ = Storage::::insert(&mut db, &third_id, &third_msg).unwrap(); + + // get the messages + let request = PaginationRequest { + cursor: None, + results: 5, + direction: PageDirection::Forward, + }; + let result = client.messages(None, request).await.unwrap(); + + // verify that there are 3 messages stored in total + assert_eq!(result.results.len(), 3); +} + +#[tokio::test] +async fn messages_by_owner_returns_messages_for_the_given_owner() { + // setup server & client + let mut db = Database::default(); + let srv = FuelService::from_database(db.clone(), Config::local_node()) + .await + .unwrap(); + let client = FuelClient::from(srv.bound_address); + + // create some owners + let owner_a = Address::new([1; 32]); + let owner_b = Address::new([2; 32]); + + // create some messages for owner A + let first_msg = DaMessage { + owner: owner_a, + ..Default::default() + }; + let second_msg = DaMessage { + owner: owner_a, + ..Default::default() + }; + + // create a message for owner B + let third_msg = DaMessage { + owner: owner_b, + ..Default::default() + }; + + // store the messages + let first_id = MessageId::new([1; 32]); + let _ = Storage::::insert(&mut db, &first_id, &first_msg).unwrap(); + + let second_id = MessageId::new([2; 32]); + let _ = Storage::::insert(&mut db, &second_id, &second_msg).unwrap(); + + let third_id = MessageId::new([3; 32]); + let _ = Storage::::insert(&mut db, &third_id, &third_msg).unwrap(); + + let request = PaginationRequest { + cursor: None, + results: 5, + direction: PageDirection::Forward, + }; + + // get the messages from Owner A + let result = client + .messages(Some(&owner_a.to_string()), request.clone()) + .await + .unwrap(); + + // verify that Owner A has 2 messages + assert_eq!(result.results.len(), 2); + + // get the messages from Owner B + let result = client + .messages(Some(&owner_b.to_string()), request.clone()) + .await + .unwrap(); + + // verify that Owner B has 1 message + assert_eq!(result.results.len(), 1); +} + +#[tokio::test] +async fn messages_empty_results_for_owner_with_no_messages() { + let db = Database::default(); + let srv = FuelService::from_database(db.clone(), Config::local_node()) + .await + .unwrap(); + let client = FuelClient::from(srv.bound_address); + + let owner = Address::new([1; 32]); + let request = PaginationRequest { + cursor: None, + results: 5, + direction: PageDirection::Forward, + }; + + let result = client + .messages(Some(&owner.to_string()), request) + .await + .unwrap(); + + assert_eq!(result.results.len(), 0); +}