From 064a9fdf781016bab196a19ae600ca1fdc2c9f3e Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Wed, 10 Feb 2021 14:49:55 +0100 Subject: [PATCH 01/11] Implement reject Agreement flow --- core/market/src/db/dao/agreement.rs | 19 +++++ core/market/src/negotiation/common.rs | 7 +- core/market/src/negotiation/error.rs | 4 +- core/market/src/negotiation/provider.rs | 44 ++++++++++- core/market/src/negotiation/requestor.rs | 78 +++++++++++++++---- core/market/src/protocol/negotiation/error.rs | 12 +-- .../src/protocol/negotiation/messages.rs | 6 +- .../src/protocol/negotiation/provider.rs | 29 ++++--- .../src/protocol/negotiation/requestor.rs | 6 +- core/market/src/rest_api/provider.rs | 17 ++-- core/market/src/testing/mock_node.rs | 4 +- 11 files changed, 166 insertions(+), 60 deletions(-) diff --git a/core/market/src/db/dao/agreement.rs b/core/market/src/db/dao/agreement.rs index faa2c03f76..3a642e12ff 100644 --- a/core/market/src/db/dao/agreement.rs +++ b/core/market/src/db/dao/agreement.rs @@ -240,6 +240,25 @@ impl<'c> AgreementDao<'c> { .await } + pub async fn reject( + &self, + id: &AgreementId, + reason: Option, + ) -> Result { + let id = id.clone(); + do_with_transaction(self.pool, move |conn| { + log::debug!("Termination reason: {:?}", reason); + let mut agreement: Agreement = + market_agreement.filter(agreement::id.eq(&id)).first(conn)?; + + update_state(conn, &mut agreement, AgreementState::Rejected)?; + create_event(conn, &agreement, reason, Owner::Provider)?; + + Ok(agreement) + }) + .await + } + pub async fn terminate( &self, id: &AgreementId, diff --git a/core/market/src/negotiation/common.rs b/core/market/src/negotiation/common.rs index dd705a7a63..e7e657ca64 100644 --- a/core/market/src/negotiation/common.rs +++ b/core/market/src/negotiation/common.rs @@ -160,10 +160,7 @@ impl CommonBroker { caller_role, caller_id, &proposal_id, - reason - .as_ref() - .map(|r| format!("with reason: {}", r)) - .unwrap_or("without reason".into()), + reason.display() ); Ok(proposal) @@ -406,7 +403,7 @@ impl CommonBroker { Ok(()) } - fn reason2string(reason: &Option) -> Option { + pub(crate) fn reason2string(reason: &Option) -> Option { reason.as_ref().map(|reason| { serde_json::to_string::(reason).unwrap_or(reason.message.to_string()) }) diff --git a/core/market/src/negotiation/error.rs b/core/market/src/negotiation/error.rs index d3d28d7483..de68e7e9c1 100644 --- a/core/market/src/negotiation/error.rs +++ b/core/market/src/negotiation/error.rs @@ -15,7 +15,7 @@ use crate::db::{ }; use crate::matcher::error::{DemandError, QueryOfferError}; use crate::protocol::negotiation::error::{ - ApproveAgreementError, CommitAgreementError, CounterProposalError as ProtocolProposalError, + AgreementProtocolError, CommitAgreementError, CounterProposalError as ProtocolProposalError, GsbAgreementError, NegotiationApiInitError, ProposeAgreementError, RejectProposalError, TerminateAgreementError, }; @@ -74,7 +74,7 @@ pub enum AgreementError { #[error("Protocol error: {0}")] ProtocolCreate(#[from] ProposeAgreementError), #[error("Protocol error while approving: {0}")] - ProtocolApprove(#[from] ApproveAgreementError), + ProtocolApprove(#[from] AgreementProtocolError), #[error("Protocol error while terminating: {0}")] ProtocolTerminate(#[from] TerminateAgreementError), #[error("Protocol error while committing: {0}")] diff --git a/core/market/src/negotiation/provider.rs b/core/market/src/negotiation/provider.rs index d1870c5df9..44d27bbcf3 100644 --- a/core/market/src/negotiation/provider.rs +++ b/core/market/src/negotiation/provider.rs @@ -95,6 +95,7 @@ impl ProviderBroker { counter!("market.agreements.provider.terminated.reason", 0, "reason" => "Success"); counter!("market.agreements.provider.approving", 0); counter!("market.agreements.provider.committing", 0); + counter!("market.agreements.provider.rejected", 0); counter!("market.events.provider.queried", 0); counter!("market.proposals.provider.countered", 0); counter!("market.proposals.provider.init-negotiation", 0); @@ -183,7 +184,6 @@ impl ProviderBroker { .await?; counter!("market.proposals.provider.rejected.by-us", 1); - Ok(()) } @@ -275,7 +275,7 @@ impl ProviderBroker { // It can turn out, that we are in `Cancelled` state since we weren't under // lock during `self.api.approve_agreement` execution. In such a case, // we shouldn't return error from here. - Err(ApproveAgreementError::Remote(RemoteAgreementError::InvalidState( + Err(AgreementProtocolError::Remote(RemoteAgreementError::InvalidState( _, AgreementState::Cancelled, ))) => return Ok(ApprovalResult::Cancelled), @@ -307,7 +307,7 @@ impl ProviderBroker { let _hold = self.common.agreement_lock.lock(&agreement_id).await; dao.revert_approving(agreement_id).await.log_err().ok(); - Err(ApproveAgreementError::Timeout(agreement.id.clone()).into()) + Err(AgreementProtocolError::Timeout(agreement.id.clone()).into()) } Err(error) => Err(AgreementError::Internal(format!( "Code logic error. Agreement events notifier shouldn't return: {}.", @@ -340,6 +340,44 @@ impl ProviderBroker { } } } + + pub async fn reject_agreement( + &self, + id: Identity, + agreement_id: &AgreementId, + reason: Option, + ) -> Result<(), AgreementError> { + let dao = self.common.db.as_dao::(); + let agreement = { + let _hold = self.common.agreement_lock.lock(&agreement_id).await; + + let agreement = dao + .select(agreement_id, Some(id.identity), Utc::now().naive_utc()) + .await + .map_err(|e| AgreementError::Get(agreement_id.to_string(), e))? + .ok_or(AgreementError::NotFound(agreement_id.to_string()))?; + + validate_transition(&agreement, AgreementState::Rejected)?; + + self.api + .reject_agreement(&agreement, reason.clone()) + .await?; + + let reason_string = CommonBroker::reason2string(&reason); + dao.reject(&agreement.id, reason_string) + .await + .map_err(|e| AgreementError::UpdateState((&agreement.id).clone(), e))? + }; + + counter!("market.agreements.provider.rejected", 1); + log::info!( + "Provider {} rejected Agreement [{}]. Reason: {}", + id.display(), + &agreement.id, + reason.display(), + ); + Ok(()) + } } async fn on_agreement_committed( diff --git a/core/market/src/negotiation/requestor.rs b/core/market/src/negotiation/requestor.rs index f7dc5fb5e2..dd6644284d 100644 --- a/core/market/src/negotiation/requestor.rs +++ b/core/market/src/negotiation/requestor.rs @@ -6,7 +6,7 @@ use std::time::Duration; use tokio::sync::mpsc::UnboundedReceiver; use ya_client::model::market::{event::RequestorEvent, NewProposal, Reason}; -use ya_client::model::{node_id::ParseError, NodeId}; +use ya_client::model::NodeId; use ya_persistence::executor::DbExecutor; use ya_service_api_web::middleware::Identity; use ya_std_utils::LogErr; @@ -51,6 +51,7 @@ impl RequestorBroker { let broker1 = broker.clone(); let broker2 = broker.clone(); + let broker3 = broker.clone(); let broker_proposal_reject = broker.clone(); let broker_terminated = broker.clone(); @@ -68,9 +69,8 @@ impl RequestorBroker { move |caller: String, msg: AgreementApproved| { on_agreement_approved(broker2.clone(), caller, msg) }, - move |_caller: String, _msg: AgreementRejected| async move { - counter!("market.agreements.requestor.rejected", 1); - unimplemented!() + move |caller: String, msg: AgreementRejected| { + on_agreement_rejected(broker3.clone(), caller, msg) }, move |caller: String, msg: AgreementTerminated| { broker_terminated @@ -407,18 +407,11 @@ async fn on_agreement_approved( broker: CommonBroker, caller: String, msg: AgreementApproved, -) -> Result<(), ApproveAgreementError> { - let caller: NodeId = - caller - .parse() - .map_err(|e: ParseError| ApproveAgreementError::CallerParseError { - e: e.to_string(), - caller, - id: msg.agreement_id.clone(), - })?; +) -> Result<(), AgreementProtocolError> { + let caller: NodeId = CommonBroker::parse_caller(&caller)?; Ok(agreement_approved(broker, caller, msg) .await - .map_err(|e| ApproveAgreementError::Remote(e))?) + .map_err(|e| AgreementProtocolError::Remote(e))?) } async fn agreement_approved( @@ -570,6 +563,63 @@ async fn commit_agreement(broker: CommonBroker, agreement_id: AgreementId) { ); } +async fn on_agreement_rejected( + broker: CommonBroker, + caller: String, + msg: AgreementRejected, +) -> Result<(), AgreementProtocolError> { + let caller: NodeId = CommonBroker::parse_caller(&caller)?; + Ok(agreement_rejected(broker, caller, msg) + .await + .map_err(|e| AgreementProtocolError::Remote(e))?) +} + +async fn agreement_rejected( + broker: CommonBroker, + caller: NodeId, + msg: AgreementRejected, +) -> Result<(), RemoteAgreementError> { + let dao = broker.db.as_dao::(); + let agreement = { + let _hold = broker.agreement_lock.lock(&msg.agreement_id).await; + + let agreement = dao + .select(&msg.agreement_id, None, Utc::now().naive_utc()) + .await + .map_err(|_e| RemoteAgreementError::NotFound(msg.agreement_id.clone()))? + .ok_or(RemoteAgreementError::NotFound(msg.agreement_id.clone()))?; + + if agreement.provider_id != caller { + // Don't reveal, that we know this Agreement id. + Err(RemoteAgreementError::NotFound(msg.agreement_id.clone()))? + } + + validate_transition(&agreement, AgreementState::Rejected).map_err(|_| { + RemoteAgreementError::InvalidState(agreement.id.clone(), agreement.state.clone()) + })?; + + let reason_string = CommonBroker::reason2string(&msg.reason); + dao.reject(&agreement.id, reason_string) + .await + .log_err() + .map_err(|e| match e { + AgreementDaoError::InvalidTransition { from, .. } => { + RemoteAgreementError::InvalidState(agreement.id.clone(), from) + } + _ => RemoteAgreementError::InternalError(agreement.id.clone()), + })? + }; + + counter!("market.agreements.requestor.rejected", 1); + log::info!( + "Agreement [{}] rejected by [{}]. Reason: {}", + &agreement.id, + caller, + msg.reason.display(), + ); + Ok(()) +} + pub async fn proposal_receiver_thread( broker: CommonBroker, mut proposal_receiver: UnboundedReceiver, diff --git a/core/market/src/protocol/negotiation/error.rs b/core/market/src/protocol/negotiation/error.rs index 5c1775c5d7..b15a0feba2 100644 --- a/core/market/src/protocol/negotiation/error.rs +++ b/core/market/src/protocol/negotiation/error.rs @@ -101,17 +101,13 @@ pub enum RemoteProposeAgreementError { } #[derive(Error, Debug, Serialize, Deserialize)] -pub enum ApproveAgreementError { +pub enum AgreementProtocolError { #[error("Approve {0}.")] Gsb(#[from] GsbAgreementError), #[error("Remote failed to approve. Error: {0}")] Remote(RemoteAgreementError), - #[error("Can't parse {caller} for Agreement [{id}]: {e}")] - CallerParseError { - e: String, - caller: String, - id: AgreementId, - }, + #[error(transparent)] + CallerParse(#[from] CallerParseError), #[error("Timeout while sending approval of Agreement [{0}]")] Timeout(AgreementId), #[error("Agreement [{0}] doesn't contain approval timestamp.")] @@ -146,7 +142,7 @@ pub enum RemoteAgreementError { Expired(AgreementId), #[error("Agreement [{0}] in state {1}, can't be approved.")] InvalidState(AgreementId, AgreementState), - #[error("Can't approve Agreement [{0}] due to internal error.")] + #[error("Can't finish operation on Agreement [{0}] due to internal error.")] InternalError(AgreementId), } diff --git a/core/market/src/protocol/negotiation/messages.rs b/core/market/src/protocol/negotiation/messages.rs index e5521c6c8e..9d0ee0fea2 100644 --- a/core/market/src/protocol/negotiation/messages.rs +++ b/core/market/src/protocol/negotiation/messages.rs @@ -11,7 +11,7 @@ use crate::protocol::negotiation::error::{ use super::super::callback::CallbackMessage; use super::error::{ - ApproveAgreementError, CounterProposalError, GsbAgreementError, TerminateAgreementError, + AgreementProtocolError, CounterProposalError, GsbAgreementError, TerminateAgreementError, }; pub mod provider { @@ -145,7 +145,7 @@ pub struct AgreementApproved { impl RpcMessage for AgreementApproved { const ID: &'static str = "AgreementApproved"; type Item = (); - type Error = ApproveAgreementError; + type Error = AgreementProtocolError; } #[derive(Clone, Serialize, Deserialize)] @@ -158,7 +158,7 @@ pub struct AgreementRejected { impl RpcMessage for AgreementRejected { const ID: &'static str = "AgreementRejected"; type Item = (); - type Error = GsbAgreementError; + type Error = AgreementProtocolError; } #[derive(Clone, Serialize, Deserialize)] diff --git a/core/market/src/protocol/negotiation/provider.rs b/core/market/src/protocol/negotiation/provider.rs index 66e54ede5c..ea2f050d22 100644 --- a/core/market/src/protocol/negotiation/provider.rs +++ b/core/market/src/protocol/negotiation/provider.rs @@ -7,11 +7,11 @@ use ya_core_model::market::BUS_ID; use ya_net::{self as net, RemoteEndpoint}; use ya_service_bus::{typed::ServiceBinder, RpcEndpoint}; -use crate::db::model::{Agreement, AgreementId, Owner, Proposal}; +use crate::db::model::{Agreement, Owner, Proposal}; use super::super::callback::{CallbackHandler, HandlerSlot}; use super::error::{ - ApproveAgreementError, CounterProposalError, GsbAgreementError, GsbProposalError, + AgreementProtocolError, CounterProposalError, GsbAgreementError, GsbProposalError, NegotiationApiInitError, TerminateAgreementError, }; use super::messages::{ @@ -113,7 +113,7 @@ impl NegotiationApi { &self, agreement: &Agreement, timeout: f32, - ) -> Result<(), ApproveAgreementError> { + ) -> Result<(), AgreementProtocolError> { let timeout = Duration::from_secs_f32(timeout.max(0.0)); let id = agreement.id.clone(); @@ -122,10 +122,10 @@ impl NegotiationApi { signature: agreement .approved_signature .clone() - .ok_or(ApproveAgreementError::NotSigned(id.clone()))?, + .ok_or(AgreementProtocolError::NotSigned(id.clone()))?, approved_ts: agreement .approved_ts - .ok_or(ApproveAgreementError::NoApprovalTimestamp(id.clone()))?, + .ok_or(AgreementProtocolError::NoApprovalTimestamp(id.clone()))?, }; let net_send_fut = net::from(agreement.provider_id) .to(agreement.requestor_id) @@ -133,27 +133,26 @@ impl NegotiationApi { .send(msg); tokio::time::timeout(timeout, net_send_fut) .await - .map_err(|_| ApproveAgreementError::Timeout(id.clone()))? + .map_err(|_| AgreementProtocolError::Timeout(id.clone()))? .map_err(|e| GsbAgreementError(e.to_string(), id.clone()))??; Ok(()) } pub async fn reject_agreement( &self, - id: NodeId, - agreement_id: AgreementId, - owner: NodeId, - ) -> Result<(), GsbAgreementError> { + agreement: &Agreement, + reason: Option, + ) -> Result<(), AgreementProtocolError> { let msg = AgreementRejected { - agreement_id: agreement_id.clone(), - reason: None, + agreement_id: agreement.id.clone(), + reason, }; - net::from(id) - .to(owner) + net::from(agreement.provider_id) + .to(agreement.requestor_id) .service(&requestor::agreement_addr(BUS_ID)) .send(msg) .await - .map_err(|e| GsbAgreementError(e.to_string(), agreement_id))??; + .map_err(|e| GsbAgreementError(e.to_string(), agreement.id.clone()))??; Ok(()) } diff --git a/core/market/src/protocol/negotiation/requestor.rs b/core/market/src/protocol/negotiation/requestor.rs index 2e7db95ee6..009d9bd803 100644 --- a/core/market/src/protocol/negotiation/requestor.rs +++ b/core/market/src/protocol/negotiation/requestor.rs @@ -11,7 +11,7 @@ use crate::db::model::{Agreement, AgreementId, Owner, Proposal}; use super::super::callback::{CallbackHandler, HandlerSlot}; use super::error::{ - ApproveAgreementError, CounterProposalError, GsbAgreementError, GsbProposalError, + AgreementProtocolError, CounterProposalError, GsbAgreementError, GsbProposalError, NegotiationApiInitError, TerminateAgreementError, }; use super::messages::{ @@ -236,7 +236,7 @@ impl NegotiationApi { self, caller: String, msg: AgreementApproved, - ) -> Result<(), ApproveAgreementError> { + ) -> Result<(), AgreementProtocolError> { log::debug!( "Negotiation API: Agreement [{}] approved by [{}].", &msg.agreement_id, @@ -252,7 +252,7 @@ impl NegotiationApi { self, caller: String, msg: AgreementRejected, - ) -> Result<(), GsbAgreementError> { + ) -> Result<(), AgreementProtocolError> { log::debug!( "Negotiation API: Agreement [{}] rejected by [{}].", &msg.agreement_id, diff --git a/core/market/src/rest_api/provider.rs b/core/market/src/rest_api/provider.rs index 24f6536ec7..be9c4c978a 100644 --- a/core/market/src/rest_api/provider.rs +++ b/core/market/src/rest_api/provider.rs @@ -163,9 +163,16 @@ async fn approve_agreement( #[actix_web::post("/agreements/{agreement_id}/reject")] async fn reject_agreement( - _market: Data>, - _path: Path, - _id: Identity, -) -> HttpResponse { - HttpResponse::NotImplemented().finish() + market: Data>, + path: Path, + id: Identity, + body: Json>, +) -> impl Responder { + let agreement_id = path.into_inner().to_id(Owner::Provider)?; + market + .provider_engine + .reject_agreement(id, &agreement_id, body.into_inner()) + .await + .log_err() + .map(|_| HttpResponse::Ok().finish()) } diff --git a/core/market/src/testing/mock_node.rs b/core/market/src/testing/mock_node.rs index 5213bec522..738d4f214e 100644 --- a/core/market/src/testing/mock_node.rs +++ b/core/market/src/testing/mock_node.rs @@ -593,7 +593,7 @@ impl MarketServiceExt for MarketService { pub mod default { use super::*; use crate::protocol::negotiation::error::{ - ApproveAgreementError, CommitAgreementError, CounterProposalError, GsbAgreementError, + AgreementProtocolError, CommitAgreementError, CounterProposalError, GsbAgreementError, ProposeAgreementError, RejectProposalError, TerminateAgreementError, }; @@ -656,7 +656,7 @@ pub mod default { pub async fn empty_on_agreement_approved( _caller: String, _msg: AgreementApproved, - ) -> Result<(), ApproveAgreementError> { + ) -> Result<(), AgreementProtocolError> { Ok(()) } From 4f4053546a1a7c0d6cabebc61160df705281be27 Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Wed, 10 Feb 2021 15:23:54 +0100 Subject: [PATCH 02/11] [Test] Test reject_agreement and fix discovered bugs --- core/market/src/negotiation/error.rs | 2 +- core/market/src/negotiation/provider.rs | 2 +- core/market/src/negotiation/requestor.rs | 2 + .../src/protocol/negotiation/messages.rs | 14 ++ .../src/protocol/negotiation/provider.rs | 5 +- .../src/protocol/negotiation/requestor.rs | 5 +- core/market/src/rest_api/error.rs | 2 +- core/market/src/rest_api/provider.rs | 2 +- core/market/src/testing/mock_node.rs | 2 +- core/market/tests/test_agreement.rs | 2 +- core/market/tests/test_agreement_rejection.rs | 136 ++++++++++++++++++ 11 files changed, 166 insertions(+), 8 deletions(-) create mode 100644 core/market/tests/test_agreement_rejection.rs diff --git a/core/market/src/negotiation/error.rs b/core/market/src/negotiation/error.rs index de68e7e9c1..3939fbcb67 100644 --- a/core/market/src/negotiation/error.rs +++ b/core/market/src/negotiation/error.rs @@ -74,7 +74,7 @@ pub enum AgreementError { #[error("Protocol error: {0}")] ProtocolCreate(#[from] ProposeAgreementError), #[error("Protocol error while approving: {0}")] - ProtocolApprove(#[from] AgreementProtocolError), + Protocol(#[from] AgreementProtocolError), #[error("Protocol error while terminating: {0}")] ProtocolTerminate(#[from] TerminateAgreementError), #[error("Protocol error while committing: {0}")] diff --git a/core/market/src/negotiation/provider.rs b/core/market/src/negotiation/provider.rs index 44d27bbcf3..078b1f34e7 100644 --- a/core/market/src/negotiation/provider.rs +++ b/core/market/src/negotiation/provider.rs @@ -343,7 +343,7 @@ impl ProviderBroker { pub async fn reject_agreement( &self, - id: Identity, + id: &Identity, agreement_id: &AgreementId, reason: Option, ) -> Result<(), AgreementError> { diff --git a/core/market/src/negotiation/requestor.rs b/core/market/src/negotiation/requestor.rs index dd6644284d..142570cfb0 100644 --- a/core/market/src/negotiation/requestor.rs +++ b/core/market/src/negotiation/requestor.rs @@ -610,6 +610,8 @@ async fn agreement_rejected( })? }; + broker.notify_agreement(&agreement).await; + counter!("market.agreements.requestor.rejected", 1); log::info!( "Agreement [{}] rejected by [{}]. Reason: {}", diff --git a/core/market/src/protocol/negotiation/messages.rs b/core/market/src/protocol/negotiation/messages.rs index 9d0ee0fea2..a0f4f9c710 100644 --- a/core/market/src/protocol/negotiation/messages.rs +++ b/core/market/src/protocol/negotiation/messages.rs @@ -252,6 +252,20 @@ impl AgreementApproved { } } +impl AgreementRejected { + pub fn translate(mut self, owner: Owner) -> Self { + self.agreement_id = self.agreement_id.translate(owner); + self + } +} + +impl AgreementCancelled { + pub fn translate(mut self, owner: Owner) -> Self { + self.agreement_id = self.agreement_id.translate(owner); + self + } +} + impl AgreementReceived { pub fn translate(mut self, owner: Owner) -> Self { self.agreement_id = self.agreement_id.translate(owner); diff --git a/core/market/src/protocol/negotiation/provider.rs b/core/market/src/protocol/negotiation/provider.rs index ea2f050d22..3cec642653 100644 --- a/core/market/src/protocol/negotiation/provider.rs +++ b/core/market/src/protocol/negotiation/provider.rs @@ -239,7 +239,10 @@ impl NegotiationApi { &msg.agreement_id, &caller ); - self.inner.agreement_cancelled.call(caller, msg).await + self.inner + .agreement_cancelled + .call(caller, msg.translate(Owner::Provider)) + .await } async fn on_agreement_terminated( diff --git a/core/market/src/protocol/negotiation/requestor.rs b/core/market/src/protocol/negotiation/requestor.rs index 009d9bd803..c52fce4606 100644 --- a/core/market/src/protocol/negotiation/requestor.rs +++ b/core/market/src/protocol/negotiation/requestor.rs @@ -258,7 +258,10 @@ impl NegotiationApi { &msg.agreement_id, &caller ); - self.inner.agreement_rejected.call(caller, msg).await + self.inner + .agreement_rejected + .call(caller, msg.translate(Owner::Requestor)) + .await } async fn on_agreement_terminated( diff --git a/core/market/src/rest_api/error.rs b/core/market/src/rest_api/error.rs index 0c368553b8..41aca0ec35 100644 --- a/core/market/src/rest_api/error.rs +++ b/core/market/src/rest_api/error.rs @@ -206,7 +206,7 @@ impl ResponseError for AgreementError { | AgreementError::Get(..) | AgreementError::Gsb(_) | AgreementError::ProtocolCreate(_) - | AgreementError::ProtocolApprove(_) + | AgreementError::Protocol(_) | AgreementError::ProtocolTerminate(_) | AgreementError::ProtocolCommit(_) | AgreementError::Internal(_) => HttpResponse::InternalServerError().json(msg), diff --git a/core/market/src/rest_api/provider.rs b/core/market/src/rest_api/provider.rs index be9c4c978a..048dbbec2f 100644 --- a/core/market/src/rest_api/provider.rs +++ b/core/market/src/rest_api/provider.rs @@ -171,7 +171,7 @@ async fn reject_agreement( let agreement_id = path.into_inner().to_id(Owner::Provider)?; market .provider_engine - .reject_agreement(id, &agreement_id, body.into_inner()) + .reject_agreement(&id, &agreement_id, body.into_inner()) .await .log_err() .map(|_| HttpResponse::Ok().finish()) diff --git a/core/market/src/testing/mock_node.rs b/core/market/src/testing/mock_node.rs index 738d4f214e..7186feeee8 100644 --- a/core/market/src/testing/mock_node.rs +++ b/core/market/src/testing/mock_node.rs @@ -663,7 +663,7 @@ pub mod default { pub async fn empty_on_agreement_rejected( _caller: String, _msg: AgreementRejected, - ) -> Result<(), GsbAgreementError> { + ) -> Result<(), AgreementProtocolError> { Ok(()) } diff --git a/core/market/tests/test_agreement.rs b/core/market/tests/test_agreement.rs index 3938682cc0..a3f611d6b1 100644 --- a/core/market/tests/test_agreement.rs +++ b/core/market/tests/test_agreement.rs @@ -792,7 +792,7 @@ async fn net_err_while_approving() { .await; match result.unwrap_err() { - AgreementError::ProtocolApprove(_) => (), + AgreementError::Protocol(_) => (), e => panic!("expected protocol error, but got: {}", e), }; } diff --git a/core/market/tests/test_agreement_rejection.rs b/core/market/tests/test_agreement_rejection.rs new file mode 100644 index 0000000000..e5dce3be08 --- /dev/null +++ b/core/market/tests/test_agreement_rejection.rs @@ -0,0 +1,136 @@ +use chrono::{Duration, Utc}; + +use ya_client::model::market::agreement::State as AgreementState; + +use ya_market::assert_err_eq; +use ya_market::testing::{ + agreement_utils::{gen_reason, negotiate_agreement}, + events_helper::*, + mock_node::MarketServiceExt, + proposal_util::{exchange_draft_proposals, NegotiationHelper}, + ApprovalStatus, MarketsNetwork, Owner, WaitForApprovalError, +}; + +const REQ_NAME: &str = "Node-1"; +const PROV_NAME: &str = "Node-2"; + +/// Agreement rejection happy path. +#[cfg_attr(not(feature = "test-suite"), ignore)] +#[serial_test::serial] +async fn test_agreement_rejected() { + let network = MarketsNetwork::new(None) + .await + .add_market_instance(REQ_NAME) + .await + .add_market_instance(PROV_NAME) + .await; + + let proposal_id = exchange_draft_proposals(&network, REQ_NAME, PROV_NAME) + .await + .unwrap() + .proposal_id; + + let prov_market = network.get_market(PROV_NAME); + let req_market = network.get_market(REQ_NAME); + let req_engine = &req_market.requestor_engine; + let req_id = network.get_default_id(REQ_NAME); + let prov_id = network.get_default_id(PROV_NAME); + + let agreement_id = req_engine + .create_agreement( + req_id.clone(), + &proposal_id, + Utc::now() + Duration::milliseconds(30), + ) + .await + .unwrap(); + + req_engine + .confirm_agreement(req_id.clone(), &agreement_id, None) + .await + .unwrap(); + + prov_market + .provider_engine + .reject_agreement( + &prov_id, + &agreement_id.clone().translate(Owner::Provider), + Some(gen_reason("Not-interested")), + ) + .await + .unwrap(); + + let agreement = req_market + .get_agreement(&agreement_id, &req_id) + .await + .unwrap(); + assert_eq!(agreement.state, AgreementState::Rejected); + + let agreement = prov_market + .get_agreement(&agreement_id.clone().translate(Owner::Provider), &prov_id) + .await + .unwrap(); + assert_eq!(agreement.state, AgreementState::Rejected); +} + +/// `wait_for_approval` should wake up after rejection. +#[cfg_attr(not(feature = "test-suite"), ignore)] +#[serial_test::serial] +async fn test_agreement_rejected_wait_for_approval() { + let network = MarketsNetwork::new(None) + .await + .add_market_instance(REQ_NAME) + .await + .add_market_instance(PROV_NAME) + .await; + + let proposal_id = exchange_draft_proposals(&network, REQ_NAME, PROV_NAME) + .await + .unwrap() + .proposal_id; + + let prov_market = network.get_market(PROV_NAME); + let req_market = network.get_market(REQ_NAME); + let req_engine = &req_market.requestor_engine; + let req_id = network.get_default_id(REQ_NAME); + + let agreement_id = req_engine + .create_agreement( + req_id.clone(), + &proposal_id, + Utc::now() + Duration::milliseconds(30), + ) + .await + .unwrap(); + + req_engine + .confirm_agreement(req_id.clone(), &agreement_id, None) + .await + .unwrap(); + + let agr_id = agreement_id.clone().translate(Owner::Provider); + let reject_handle = tokio::task::spawn_local(async move { + tokio::time::delay_for(std::time::Duration::from_millis(20)).await; + prov_market + .provider_engine + .reject_agreement( + &network.get_default_id(PROV_NAME), + &agr_id.clone().translate(Owner::Provider), + Some(gen_reason("Not-interested")), + ) + .await + .unwrap(); + }); + + // wait_for_approval should wake up after rejection. + let result = req_engine + .wait_for_approval(&agreement_id, 0.3) + .await + .unwrap(); + assert_eq!(result, ApprovalStatus::Rejected); + + tokio::time::timeout(Duration::milliseconds(600).to_std().unwrap(), reject_handle) + .await + .unwrap() + .unwrap(); +} From 55937c1762950a015088977f10f6db0cd449c5f9 Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Wed, 10 Feb 2021 15:49:46 +0100 Subject: [PATCH 03/11] [Test] Test rejecting Agreement in Approved and Terminated state --- core/market/tests/test_agreement_rejection.rs | 91 +++++++++++++++++-- 1 file changed, 84 insertions(+), 7 deletions(-) diff --git a/core/market/tests/test_agreement_rejection.rs b/core/market/tests/test_agreement_rejection.rs index e5dce3be08..9fc00033b7 100644 --- a/core/market/tests/test_agreement_rejection.rs +++ b/core/market/tests/test_agreement_rejection.rs @@ -1,14 +1,12 @@ use chrono::{Duration, Utc}; -use ya_client::model::market::agreement::State as AgreementState; +use ya_client::model::market::agreement::State as ClientAgreementState; use ya_market::assert_err_eq; use ya_market::testing::{ agreement_utils::{gen_reason, negotiate_agreement}, - events_helper::*, - mock_node::MarketServiceExt, - proposal_util::{exchange_draft_proposals, NegotiationHelper}, - ApprovalStatus, MarketsNetwork, Owner, WaitForApprovalError, + proposal_util::exchange_draft_proposals, + AgreementDaoError, AgreementError, AgreementState, ApprovalStatus, MarketsNetwork, Owner, }; const REQ_NAME: &str = "Node-1"; @@ -64,13 +62,13 @@ async fn test_agreement_rejected() { .get_agreement(&agreement_id, &req_id) .await .unwrap(); - assert_eq!(agreement.state, AgreementState::Rejected); + assert_eq!(agreement.state, ClientAgreementState::Rejected); let agreement = prov_market .get_agreement(&agreement_id.clone().translate(Owner::Provider), &prov_id) .await .unwrap(); - assert_eq!(agreement.state, AgreementState::Rejected); + assert_eq!(agreement.state, ClientAgreementState::Rejected); } /// `wait_for_approval` should wake up after rejection. @@ -134,3 +132,82 @@ async fn test_agreement_rejected_wait_for_approval() { .unwrap() .unwrap(); } + +/// Rejecting `Approved` and `Terminated` Agreement is not allowed. +#[cfg_attr(not(feature = "test-suite"), ignore)] +#[serial_test::serial] +async fn test_reject_agreement_in_wrong_state() { + let network = MarketsNetwork::new(None) + .await + .add_market_instance(REQ_NAME) + .await + .add_market_instance(PROV_NAME) + .await; + + let prov_id = network.get_default_id(PROV_NAME); + let prov_market = network.get_market(PROV_NAME); + let req_market = network.get_market(REQ_NAME); + let req_id = network.get_default_id(REQ_NAME); + + let negotiation = negotiate_agreement( + &network, + REQ_NAME, + PROV_NAME, + "negotiation", + "r-session", + "p-session", + ) + .await + .unwrap(); + + let result = prov_market + .provider_engine + .reject_agreement( + &prov_id, + &negotiation.p_agreement, + Some(gen_reason("Not-interested")), + ) + .await; + + assert!(result.is_err()); + assert_err_eq!( + AgreementError::UpdateState( + negotiation.p_agreement.clone(), + AgreementDaoError::InvalidTransition { + from: AgreementState::Approved, + to: AgreementState::Rejected + } + ), + result + ); + + req_market + .terminate_agreement( + req_id.clone(), + negotiation.r_agreement.clone().into_client(), + Some(gen_reason("Failure")), + ) + .await + .unwrap(); + + let result = prov_market + .provider_engine + .reject_agreement( + &prov_id, + &negotiation.p_agreement, + Some(gen_reason("Not-interested")), + ) + .await; + + assert!(result.is_err()); + assert_err_eq!( + AgreementError::UpdateState( + negotiation.p_agreement.clone(), + AgreementDaoError::InvalidTransition { + from: AgreementState::Terminated, + to: AgreementState::Rejected + } + ), + result + ); +} From 5d9110b480560538b5d6d4754283f68d4b616b05 Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Thu, 11 Feb 2021 15:16:28 +0100 Subject: [PATCH 04/11] [Test] Test Rejection events and termination from rejected state --- core/market/tests/test_agreement.rs | 66 +++++++++++++++ core/market/tests/test_agreement_events.rs | 98 ++++++++++++++++++++++ 2 files changed, 164 insertions(+) diff --git a/core/market/tests/test_agreement.rs b/core/market/tests/test_agreement.rs index a3f611d6b1..dc2cef26e6 100644 --- a/core/market/tests/test_agreement.rs +++ b/core/market/tests/test_agreement.rs @@ -1103,6 +1103,72 @@ async fn test_terminate_from_wrong_states() { }; } +#[cfg_attr(not(feature = "test-suite"), ignore)] +#[serial_test::serial] +async fn test_terminate_rejected_agreement() { + let network = MarketsNetwork::new(None) + .await + .add_market_instance(REQ_NAME) + .await + .add_market_instance(PROV_NAME) + .await; + + let proposal_id = exchange_draft_proposals(&network, REQ_NAME, PROV_NAME) + .await + .unwrap() + .proposal_id; + + let prov_market = network.get_market(PROV_NAME); + let req_market = network.get_market(REQ_NAME); + let req_engine = &req_market.requestor_engine; + let req_id = network.get_default_id(REQ_NAME); + let prov_id = network.get_default_id(PROV_NAME); + + let agreement_id = req_engine + .create_agreement( + req_id.clone(), + &proposal_id, + Utc::now() + Duration::milliseconds(30), + ) + .await + .unwrap(); + + req_engine + .confirm_agreement(req_id.clone(), &agreement_id, None) + .await + .unwrap(); + + prov_market + .provider_engine + .reject_agreement( + &prov_id, + &agreement_id.clone().translate(Owner::Provider), + Some(gen_reason("Not-interested")), + ) + .await + .unwrap(); + + let result = req_market + .terminate_agreement( + req_id.clone(), + agreement_id.into_client(), + Some(gen_reason("Failure")), + ) + .await; + + match result { + Ok(_) => panic!("Terminate Agreement should fail."), + Err(AgreementError::UpdateState( + id, + AgreementDaoError::InvalidTransition { + from: AgreementState::Rejected, + to: AgreementState::Terminated, + }, + )) => assert_eq!(id, agreement_id), + e => panic!("Wrong error returned, got: {:?}", e), + }; +} + /// We expect, that reason string is structured and can\ /// be deserialized to `Reason` struct. #[cfg_attr(not(feature = "test-suite"), ignore)] diff --git a/core/market/tests/test_agreement_events.rs b/core/market/tests/test_agreement_events.rs index 0e11dd34ea..9a20087699 100644 --- a/core/market/tests/test_agreement_events.rs +++ b/core/market/tests/test_agreement_events.rs @@ -335,3 +335,101 @@ async fn test_waiting_for_agreement_event() { negotiation.r_agreement.into_client() ); } + +#[cfg_attr(not(feature = "test-suite"), ignore)] +#[serial_test::serial] +async fn test_agreement_rejected_event() { + let network = MarketsNetwork::new(None) + .await + .add_market_instance(REQ_NAME) + .await + .add_market_instance(PROV_NAME) + .await; + + let proposal_id = exchange_draft_proposals(&network, REQ_NAME, PROV_NAME) + .await + .unwrap() + .proposal_id; + + let req_market = network.get_market(REQ_NAME); + let req_engine = &req_market.requestor_engine; + let req_id = network.get_default_id(REQ_NAME); + let prov_id = network.get_default_id(PROV_NAME); + let prov_market = network.get_market(PROV_NAME); + + let agreement_id = req_engine + .create_agreement( + req_id.clone(), + &proposal_id, + Utc::now() + Duration::hours(1), + ) + .await + .unwrap(); + + let confirm_timestamp = Utc::now(); + req_engine + .confirm_agreement(req_id.clone(), &agreement_id, None) + .await + .unwrap(); + + // Provider will approve agreement after some delay. + let agr_id = agreement_id.clone(); + let from_timestamp = confirm_timestamp.clone(); + let query_handle = tokio::task::spawn_local(async move { + tokio::time::delay_for(std::time::Duration::from_millis(20)).await; + prov_market + .provider_engine + .reject_agreement( + &network.get_default_id(PROV_NAME), + &agr_id.clone().translate(Owner::Provider), + Some(gen_reason("Not-interested")), + ) + .await + .unwrap(); + + // We expect, that both Provider and Requestor will get event. + let events = prov_market + .query_agreement_events(&None, 0.1, Some(2), from_timestamp, &prov_id) + .await + .unwrap(); + + // Expect single event + assert_eq!(events.len(), 1); + assert_eq!(events[0].agreement_id, agr_id.into_client()); + + match &events[0].event_type { + AgreementEventType::AgreementRejectedEvent { reason } => { + assert_eq!(reason.as_ref().unwrap().message, "Not-interested"); + } + e => panic!( + "Expected AgreementEventType::AgreementRejectedEvent, got: {:?}", + e + ), + }; + }); + + let events = req_market + .query_agreement_events(&None, 0.5, Some(2), confirm_timestamp, &req_id) + .await + .unwrap(); + + // Expect single event + assert_eq!(events.len(), 1); + assert_eq!(events[0].agreement_id, agreement_id.into_client()); + + match &events[0].event_type { + AgreementEventType::AgreementRejectedEvent { reason } => { + assert_eq!(reason.as_ref().unwrap().message, "Not-interested"); + } + e => panic!( + "Expected AgreementEventType::AgreementRejectedEvent, got: {:?}", + e + ), + }; + + // Protect from eternal waiting. + tokio::time::timeout(Duration::milliseconds(600).to_std().unwrap(), query_handle) + .await + .unwrap() + .unwrap(); +} From 0450238007645763d0a359db61dffd52adc3974f Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Thu, 11 Feb 2021 16:04:12 +0100 Subject: [PATCH 05/11] [Test] Test reject Agreement in Rejected state --- core/market/tests/test_agreement_rejection.rs | 115 ++++++++++++++++++ 1 file changed, 115 insertions(+) diff --git a/core/market/tests/test_agreement_rejection.rs b/core/market/tests/test_agreement_rejection.rs index 9fc00033b7..f09ed4dc80 100644 --- a/core/market/tests/test_agreement_rejection.rs +++ b/core/market/tests/test_agreement_rejection.rs @@ -2,6 +2,7 @@ use chrono::{Duration, Utc}; use ya_client::model::market::agreement::State as ClientAgreementState; +use ya_client::model::market::AgreementEventType; use ya_market::assert_err_eq; use ya_market::testing::{ agreement_utils::{gen_reason, negotiate_agreement}, @@ -211,3 +212,117 @@ async fn test_reject_agreement_in_wrong_state() { result ); } + +#[cfg_attr(not(feature = "test-suite"), ignore)] +#[serial_test::serial] +async fn test_reject_rejected_agreement() { + let network = MarketsNetwork::new(None) + .await + .add_market_instance(REQ_NAME) + .await + .add_market_instance(PROV_NAME) + .await; + + let proposal_id = exchange_draft_proposals(&network, REQ_NAME, PROV_NAME) + .await + .unwrap() + .proposal_id; + + let prov_market = network.get_market(PROV_NAME); + let req_market = network.get_market(REQ_NAME); + let req_engine = &req_market.requestor_engine; + let req_id = network.get_default_id(REQ_NAME); + let prov_id = network.get_default_id(PROV_NAME); + + let r_agreement = req_engine + .create_agreement( + req_id.clone(), + &proposal_id, + Utc::now() + Duration::milliseconds(30), + ) + .await + .unwrap(); + + req_engine + .confirm_agreement(req_id.clone(), &r_agreement, None) + .await + .unwrap(); + + let ref_timestamp = Utc::now(); + + prov_market + .provider_engine + .reject_agreement( + &prov_id, + &r_agreement.clone().translate(Owner::Provider), + Some(gen_reason("Not-interested")), + ) + .await + .unwrap(); + + let p_agreement = r_agreement.clone().translate(Owner::Provider); + let result = prov_market + .provider_engine + .reject_agreement( + &prov_id, + &p_agreement, + Some(gen_reason("More-uninterested")), + ) + .await; + + match result { + Ok(_) => panic!("Reject Agreement should fail."), + Err(AgreementError::UpdateState( + id, + AgreementDaoError::InvalidTransition { + from: AgreementState::Rejected, + to: AgreementState::Rejected, + }, + )) => assert_eq!(id, p_agreement), + e => panic!("Wrong error returned, got: {:?}", e), + }; + + let agreement = req_market + .get_agreement(&r_agreement, &req_id) + .await + .unwrap(); + assert_eq!(agreement.state, ClientAgreementState::Rejected); + + let agreement = prov_market + .get_agreement(&p_agreement, &prov_id) + .await + .unwrap(); + assert_eq!(agreement.state, ClientAgreementState::Rejected); + + let events = req_market + .query_agreement_events(&None, 0.0, Some(3), ref_timestamp, &req_id) + .await + .unwrap(); + + assert_eq!(events.len(), 1); + match &events[0].event_type { + AgreementEventType::AgreementRejectedEvent { reason } => { + assert_eq!(reason.as_ref().unwrap().message, "Not-interested"); + } + e => panic!( + "Expected AgreementEventType::AgreementRejectedEvent, got: {:?}", + e + ), + }; + + let events = prov_market + .query_agreement_events(&None, 0.0, Some(3), ref_timestamp, &prov_id) + .await + .unwrap(); + + assert_eq!(events.len(), 1); + match &events[0].event_type { + AgreementEventType::AgreementRejectedEvent { reason } => { + assert_eq!(reason.as_ref().unwrap().message, "Not-interested"); + } + e => panic!( + "Expected AgreementEventType::AgreementRejectedEvent, got: {:?}", + e + ), + }; +} From ff9c49fffcdcc17d683979ebff571141dbe73da4 Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Fri, 12 Feb 2021 12:54:20 +0100 Subject: [PATCH 06/11] wait_for_approval could return rejection Reason (But spec doesn't allow) --- core/market/src/db/dao/agreement.rs | 5 +- core/market/src/db/dao/agreement_events.rs | 19 ++++++- core/market/src/db/dao/negotiation_events.rs | 5 +- core/market/src/db/model/agreement_events.rs | 53 ++++++++++++++----- .../market/src/db/model/negotiation_events.rs | 13 ++--- core/market/src/negotiation/common.rs | 15 ++---- core/market/src/negotiation/provider.rs | 3 +- core/market/src/negotiation/requestor.rs | 28 ++++++++-- core/market/tests/test_agreement_rejection.rs | 9 +++- 9 files changed, 104 insertions(+), 46 deletions(-) diff --git a/core/market/src/db/dao/agreement.rs b/core/market/src/db/dao/agreement.rs index 3a642e12ff..4840a42036 100644 --- a/core/market/src/db/dao/agreement.rs +++ b/core/market/src/db/dao/agreement.rs @@ -1,6 +1,7 @@ use chrono::NaiveDateTime; use diesel::prelude::*; +use ya_client::model::market::Reason; use ya_client::model::NodeId; use ya_persistence::executor::{do_with_transaction, AsDao, ConnType, PoolType}; @@ -243,7 +244,7 @@ impl<'c> AgreementDao<'c> { pub async fn reject( &self, id: &AgreementId, - reason: Option, + reason: Option, ) -> Result { let id = id.clone(); do_with_transaction(self.pool, move |conn| { @@ -262,7 +263,7 @@ impl<'c> AgreementDao<'c> { pub async fn terminate( &self, id: &AgreementId, - reason: Option, + reason: Option, terminator: Owner, ) -> Result { let id = id.clone(); diff --git a/core/market/src/db/dao/agreement_events.rs b/core/market/src/db/dao/agreement_events.rs index 76c1e4e307..d89f16b374 100644 --- a/core/market/src/db/dao/agreement_events.rs +++ b/core/market/src/db/dao/agreement_events.rs @@ -1,12 +1,13 @@ use chrono::NaiveDateTime; use diesel::{BoolExpressionMethods, ExpressionMethods, QueryDsl, RunQueryDsl}; +use ya_client::model::market::Reason; use ya_client::model::NodeId; use ya_persistence::executor::{readonly_transaction, ConnType}; use ya_persistence::executor::{AsDao, PoolType}; use crate::db::dao::AgreementDaoError; -use crate::db::model::{Agreement, AgreementEvent, NewAgreementEvent}; +use crate::db::model::{Agreement, AgreementEvent, AgreementId, NewAgreementEvent}; use crate::db::model::{AppSessionId, Owner}; use crate::db::schema::market_agreement::dsl as agreement; use crate::db::schema::market_agreement::dsl::market_agreement; @@ -62,12 +63,26 @@ impl<'c> AgreementEventsDao<'c> { }) .await } + + pub async fn select_for_agreement( + &self, + agreement_id: &AgreementId, + ) -> DbResult> { + let agreement_id = agreement_id.clone(); + readonly_transaction(self.pool, move |conn| { + Ok(market_agreement_event + .filter(event::agreement_id.eq(agreement_id)) + .order_by(event::timestamp.asc()) + .load::(conn)?) + }) + .await + } } pub(crate) fn create_event( conn: &ConnType, agreement: &Agreement, - reason: Option, + reason: Option, terminator: Owner, ) -> Result<(), AgreementDaoError> { let event = NewAgreementEvent::new(agreement, reason, terminator) diff --git a/core/market/src/db/dao/negotiation_events.rs b/core/market/src/db/dao/negotiation_events.rs index e5ae7c578f..27b965658d 100644 --- a/core/market/src/db/dao/negotiation_events.rs +++ b/core/market/src/db/dao/negotiation_events.rs @@ -1,7 +1,9 @@ use chrono::Utc; +use diesel::dsl::sql; use diesel::{sql_types, ExpressionMethods, QueryDsl, RunQueryDsl}; use thiserror::Error; +use ya_client::model::market::Reason; use ya_persistence::executor::ConnType; use ya_persistence::executor::{do_with_transaction, AsDao, PoolType}; @@ -12,7 +14,6 @@ use crate::db::model::{Agreement, EventType, MarketEvent, Owner, Proposal, Subsc use crate::db::schema::market_negotiation_event::dsl; use crate::db::{DbError, DbResult}; use crate::market::EnvConfig; -use diesel::dsl::sql; const EVENT_STORE_DAYS: EnvConfig<'static, u64> = EnvConfig { name: "YAGNA_MARKET_EVENT_STORE_DAYS", @@ -55,7 +56,7 @@ impl<'c> NegotiationEventsDao<'c> { pub async fn add_proposal_rejected_event( &self, proposal: &Proposal, - reason: Option, + reason: Option, ) -> DbResult<()> { let event = MarketEvent::proposal_rejected(proposal, reason); do_with_transaction(self.pool, move |conn| { diff --git a/core/market/src/db/model/agreement_events.rs b/core/market/src/db/model/agreement_events.rs index f88641595f..cb4e7fa280 100644 --- a/core/market/src/db/model/agreement_events.rs +++ b/core/market/src/db/model/agreement_events.rs @@ -1,10 +1,12 @@ use chrono::{DateTime, NaiveDateTime, Utc}; use diesel::sql_types::Text; +use std::fmt; use std::fmt::Debug; use crate::db::model::{Agreement, AgreementId, AgreementState, Owner}; use crate::db::schema::market_agreement_event; +use std::str::FromStr; use ya_client::model::market::agreement_event::AgreementTerminator; use ya_client::model::market::{ AgreementEventType as ClientEventType, AgreementOperationEvent as ClientEvent, Reason, @@ -30,6 +32,10 @@ pub enum AgreementEventType { Terminated, } +#[derive(DbTextField, Debug, Clone, AsExpression, FromSqlRow)] +#[sql_type = "Text"] +pub struct DbReason(pub Reason); + #[derive(Clone, Debug, Queryable)] pub struct AgreementEvent { pub id: i32, @@ -37,7 +43,7 @@ pub struct AgreementEvent { pub event_type: AgreementEventType, pub timestamp: NaiveDateTime, pub issuer: Owner, - pub reason: Option, + pub reason: Option, pub signature: Option, } @@ -48,7 +54,7 @@ pub struct NewAgreementEvent { pub event_type: AgreementEventType, pub timestamp: NaiveDateTime, pub issuer: Owner, - pub reason: Option, + pub reason: Option, } #[derive(thiserror::Error, Debug, Clone)] @@ -58,7 +64,7 @@ pub struct EventFromAgreementError(pub String); impl NewAgreementEvent { pub(crate) fn new( agreement: &Agreement, - reason: Option, + reason: Option, terminator: Owner, ) -> Result { Ok(Self { @@ -79,7 +85,7 @@ impl NewAgreementEvent { }, timestamp: Utc::now().naive_utc(), issuer: terminator, - reason, + reason: reason.map(|reason| DbReason(reason)), }) } } @@ -88,16 +94,7 @@ impl AgreementEvent { pub fn into_client(self) -> ClientEvent { let agreement_id = self.agreement_id.into_client(); let event_date = DateTime::::from_utc(self.timestamp, Utc); - let reason = self - .reason - .map(|reason| serde_json::from_str::(&reason)) - .map(|result| result.map_err(|e| { - log::warn!( - "Agreement Event with not parsable Reason in database. Error: {}. Shouldn't happen \ - because market is responsible for rejecting invalid Reasons.", e - ) - }).ok()) - .flatten(); + let reason = self.reason.map(|reason| reason.0); match self.event_type { AgreementEventType::Approved => ClientEvent { @@ -134,3 +131,31 @@ impl AgreementEvent { } } } + +impl FromStr for DbReason { + type Err = serde_json::Error; + + fn from_str(s: &str) -> Result { + Ok(DbReason(serde_json::from_str::(s) + .map_err(|e| { + log::warn!( + "Agreement Event with not parsable Reason in database. Error: {}. Shouldn't happen \ + because market is responsible for rejecting invalid Reasons.", e + ) + } + ).ok().unwrap_or(Reason { + message: "Invalid Reason in DB".into(), + extra: Default::default() + }))) + } +} + +impl fmt::Display for DbReason { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match serde_json::to_string(&self.0) { + Ok(reason) => write!(f, "{}", reason), + // It's impossible since Reason is serializable. + Err(_) => write!(f, "Serialization failed!"), + } + } +} diff --git a/core/market/src/db/model/negotiation_events.rs b/core/market/src/db/model/negotiation_events.rs index bf1fdce4b1..96994fa051 100644 --- a/core/market/src/db/model/negotiation_events.rs +++ b/core/market/src/db/model/negotiation_events.rs @@ -10,6 +10,7 @@ use ya_persistence::executor::DbExecutor; use super::SubscriptionId; use crate::db::dao::{AgreementDao, ProposalDao}; +use crate::db::model::agreement_events::DbReason; use crate::db::model::{Agreement, AgreementId, Owner, Proposal, ProposalId}; use crate::db::schema::market_negotiation_event; @@ -63,7 +64,7 @@ pub struct MarketEvent { /// It can be Proposal, Agreement or structure, /// that will represent PropertyQuery. pub artifact_id: ProposalId, - pub reason: Option, + pub reason: Option, } #[derive(Clone, Debug, Insertable)] @@ -72,7 +73,7 @@ pub struct NewMarketEvent { pub subscription_id: SubscriptionId, pub event_type: EventType, pub artifact_id: ProposalId, // TODO: typed - pub reason: Option, + pub reason: Option, } impl MarketEvent { @@ -88,7 +89,7 @@ impl MarketEvent { } } - pub fn proposal_rejected(proposal: &Proposal, reason: Option) -> NewMarketEvent { + pub fn proposal_rejected(proposal: &Proposal, reason: Option) -> NewMarketEvent { NewMarketEvent { subscription_id: proposal.negotiation.subscription_id.clone(), event_type: match proposal.body.id.owner() { @@ -96,7 +97,7 @@ impl MarketEvent { Owner::Provider => EventType::ProviderProposalRejected, }, artifact_id: proposal.body.id.clone(), - reason, + reason: reason.map(|reason| DbReason(reason)), } } @@ -124,7 +125,7 @@ impl MarketEvent { proposal_id: self.artifact_id.to_string(), reason: match self.reason { None => None, - Some(r) => Some(serde_json::from_str(&r).unwrap_or_else(|_| Reason::new(r))), + Some(reason) => Some(reason.0), }, }), EventType::RequestorPropertyQuery => unimplemented!(), @@ -176,7 +177,7 @@ impl MarketEvent { proposal_id: self.artifact_id.to_string(), reason: match self.reason { None => None, - Some(r) => Some(serde_json::from_str(&r).unwrap_or_else(|_| Reason::new(r))), + Some(reason) => Some(reason.0), }, }), EventType::ProviderPropertyQuery => unimplemented!(), diff --git a/core/market/src/negotiation/common.rs b/core/market/src/negotiation/common.rs index e7e657ca64..62ad88f410 100644 --- a/core/market/src/negotiation/common.rs +++ b/core/market/src/negotiation/common.rs @@ -366,8 +366,7 @@ impl CommonBroker { ) .await?; - let reason_string = CommonBroker::reason2string(&reason); - dao.terminate(&agreement.id, reason_string, agreement.id.owner()) + dao.terminate(&agreement.id, reason.clone(), agreement.id.owner()) .await .map_err(|e| AgreementError::UpdateState((&agreement.id).clone(), e))?; } @@ -403,12 +402,6 @@ impl CommonBroker { Ok(()) } - pub(crate) fn reason2string(reason: &Option) -> Option { - reason.as_ref().map(|reason| { - serde_json::to_string::(reason).unwrap_or(reason.message.to_string()) - }) - } - // Called remotely via GSB pub async fn on_agreement_terminated( self, @@ -454,8 +447,7 @@ impl CommonBroker { Err(RemoteAgreementError::NotFound(agreement_id.clone()))? } - let reason_string = CommonBroker::reason2string(&msg.reason); - dao.terminate(&agreement_id, reason_string, caller_role) + dao.terminate(&agreement_id, msg.reason.clone(), caller_role) .await .map_err(|e| { log::warn!( @@ -628,10 +620,9 @@ impl CommonBroker { // TODO: If creating Proposal succeeds, but event can't be added, provider // TODO: will never answer to this Proposal. Solve problem when Event API will be available. let subscription_id = proposal.negotiation.subscription_id.clone(); - let reason = CommonBroker::reason2string(&msg.reason); self.db .as_dao::() - .add_proposal_rejected_event(&proposal, reason) + .add_proposal_rejected_event(&proposal, msg.reason.clone()) .await .map_err(|e| { // TODO: Don't leak our database error, but send meaningful message as response. diff --git a/core/market/src/negotiation/provider.rs b/core/market/src/negotiation/provider.rs index 078b1f34e7..a0e0a177c8 100644 --- a/core/market/src/negotiation/provider.rs +++ b/core/market/src/negotiation/provider.rs @@ -363,8 +363,7 @@ impl ProviderBroker { .reject_agreement(&agreement, reason.clone()) .await?; - let reason_string = CommonBroker::reason2string(&reason); - dao.reject(&agreement.id, reason_string) + dao.reject(&agreement.id, reason.clone()) .await .map_err(|e| AgreementError::UpdateState((&agreement.id).clone(), e))? }; diff --git a/core/market/src/negotiation/requestor.rs b/core/market/src/negotiation/requestor.rs index 142570cfb0..a1662ab454 100644 --- a/core/market/src/negotiation/requestor.rs +++ b/core/market/src/negotiation/requestor.rs @@ -21,6 +21,8 @@ use crate::protocol::negotiation::{error::*, messages::*, requestor::Negotiation use super::{common::*, error::*, notifier::NotifierError, EventNotifier}; use crate::config::Config; +use crate::db::dao::AgreementEventsDao; +use crate::db::model::AgreementEventType; use crate::utils::display::EnableDisplay; #[derive(Clone, derive_more::Display, Debug, PartialEq)] @@ -30,7 +32,7 @@ pub enum ApprovalStatus { #[display(fmt = "Cancelled")] Cancelled, #[display(fmt = "Rejected")] - Rejected, + Rejected { reason: Option }, } /// Requestor part of negotiation logic. @@ -319,7 +321,26 @@ impl RequestorBroker { return Ok(ApprovalStatus::Approved); } AgreementState::Rejected => { - return Ok(ApprovalStatus::Rejected); + // `AgreementRejectedEvent` should be last and the only event for this + // Agreement. If it' not + return Ok(ApprovalStatus::Rejected { + reason: self + .common + .db + .as_dao::() + .select_for_agreement(&agreement.id) + .await + .map(|events| { + events.last().cloned().map(|event| { + if event.event_type != AgreementEventType::Rejected { log::error!("Expected AgreementRejected event in DB for Agreement [{}].", &agreement.id); + }; + event.reason.map(|reason| reason.0) + }) + }) + .ok() + .flatten() + .flatten(), + }); } AgreementState::Cancelled => { return Ok(ApprovalStatus::Cancelled); @@ -598,8 +619,7 @@ async fn agreement_rejected( RemoteAgreementError::InvalidState(agreement.id.clone(), agreement.state.clone()) })?; - let reason_string = CommonBroker::reason2string(&msg.reason); - dao.reject(&agreement.id, reason_string) + dao.reject(&agreement.id, msg.reason.clone()) .await .log_err() .map_err(|e| match e { diff --git a/core/market/tests/test_agreement_rejection.rs b/core/market/tests/test_agreement_rejection.rs index f09ed4dc80..3428e0c21e 100644 --- a/core/market/tests/test_agreement_rejection.rs +++ b/core/market/tests/test_agreement_rejection.rs @@ -2,7 +2,7 @@ use chrono::{Duration, Utc}; use ya_client::model::market::agreement::State as ClientAgreementState; -use ya_client::model::market::AgreementEventType; +use ya_client::model::market::{AgreementEventType, Reason}; use ya_market::assert_err_eq; use ya_market::testing::{ agreement_utils::{gen_reason, negotiate_agreement}, @@ -126,7 +126,12 @@ async fn test_agreement_rejected_wait_for_approval() { .wait_for_approval(&agreement_id, 0.3) .await .unwrap(); - assert_eq!(result, ApprovalStatus::Rejected); + assert_eq!( + result, + ApprovalStatus::Rejected { + reason: Some(Reason::new("Not-interested")) + } + ); tokio::time::timeout(Duration::milliseconds(600).to_std().unwrap(), reject_handle) .await From 12a155cc30f956b93d5f6f9c13d841ecbf60f015 Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Fri, 12 Feb 2021 13:04:49 +0100 Subject: [PATCH 07/11] [Test] Add REST test for Agreement rejection --- core/market/tests/test_agreement_rejection.rs | 10 +-- core/market/tests/test_rest_api.rs | 64 +++++++++++++++++++ 2 files changed, 69 insertions(+), 5 deletions(-) diff --git a/core/market/tests/test_agreement_rejection.rs b/core/market/tests/test_agreement_rejection.rs index 3428e0c21e..6c3a5ffc01 100644 --- a/core/market/tests/test_agreement_rejection.rs +++ b/core/market/tests/test_agreement_rejection.rs @@ -39,7 +39,7 @@ async fn test_agreement_rejected() { .create_agreement( req_id.clone(), &proposal_id, - Utc::now() + Duration::milliseconds(30), + Utc::now() + Duration::milliseconds(300), ) .await .unwrap(); @@ -97,7 +97,7 @@ async fn test_agreement_rejected_wait_for_approval() { .create_agreement( req_id.clone(), &proposal_id, - Utc::now() + Duration::milliseconds(30), + Utc::now() + Duration::milliseconds(1500), ) .await .unwrap(); @@ -109,7 +109,7 @@ async fn test_agreement_rejected_wait_for_approval() { let agr_id = agreement_id.clone().translate(Owner::Provider); let reject_handle = tokio::task::spawn_local(async move { - tokio::time::delay_for(std::time::Duration::from_millis(20)).await; + tokio::time::delay_for(std::time::Duration::from_millis(50)).await; prov_market .provider_engine .reject_agreement( @@ -123,7 +123,7 @@ async fn test_agreement_rejected_wait_for_approval() { // wait_for_approval should wake up after rejection. let result = req_engine - .wait_for_approval(&agreement_id, 0.3) + .wait_for_approval(&agreement_id, 1.4) .await .unwrap(); assert_eq!( @@ -243,7 +243,7 @@ async fn test_reject_rejected_agreement() { .create_agreement( req_id.clone(), &proposal_id, - Utc::now() + Duration::milliseconds(30), + Utc::now() + Duration::milliseconds(300), ) .await .unwrap(); diff --git a/core/market/tests/test_rest_api.rs b/core/market/tests/test_rest_api.rs index 8a7761b1fe..c1c8de8d88 100644 --- a/core/market/tests/test_rest_api.rs +++ b/core/market/tests/test_rest_api.rs @@ -5,6 +5,7 @@ use chrono::Utc; use serde::de::DeserializeOwned; use serde_json::json; +use ya_client::model::market::agreement::State as ClientAgreementState; use ya_client::model::market::{ agreement as client_agreement, Agreement, AgreementOperationEvent, Demand, NewDemand, NewOffer, Offer, Proposal, Reason, @@ -14,6 +15,7 @@ use ya_client::web::QueryParamsBuilder; use ya_market::testing::agreement_utils::negotiate_agreement; use ya_market::testing::events_helper::requestor::expect_approve; use ya_market::testing::{ + agreement_utils::gen_reason, client::{sample_demand, sample_offer}, mock_node::{wait_for_bcast, MarketServiceExt}, proposal_util::exchange_draft_proposals, @@ -546,6 +548,68 @@ async fn test_terminate_agreement_without_reason() { ); } +/// Agreement rejection happy path. +#[cfg_attr(not(feature = "test-suite"), ignore)] +#[serial_test::serial] +async fn test_rest_agreement_rejected() { + let network = MarketsNetwork::new(None) + .await + .add_market_instance(REQ_NAME) + .await + .add_market_instance(PROV_NAME) + .await; + + let proposal_id = exchange_draft_proposals(&network, REQ_NAME, PROV_NAME) + .await + .unwrap() + .proposal_id; + + let prov_market = network.get_market(PROV_NAME); + let req_market = network.get_market(REQ_NAME); + let req_engine = &req_market.requestor_engine; + let req_id = network.get_default_id(REQ_NAME); + let prov_id = network.get_default_id(PROV_NAME); + + let agreement_id = req_engine + .create_agreement( + req_id.clone(), + &proposal_id, + Utc::now() + chrono::Duration::milliseconds(300), + ) + .await + .unwrap(); + + req_engine + .confirm_agreement(req_id.clone(), &agreement_id, None) + .await + .unwrap(); + + let url = format!( + "/market-api/v1/agreements/{}/reject", + agreement_id.into_client(), + ); + let req = test::TestRequest::post() + .uri(&url) + .set_json(&Some(gen_reason("Not-interested"))) + .to_request(); + let mut app = network.get_rest_app(PROV_NAME).await; + let resp = test::call_service(&mut app, req).await; + + assert_eq!(resp.status(), StatusCode::OK); + + let agreement = req_market + .get_agreement(&agreement_id, &req_id) + .await + .unwrap(); + assert_eq!(agreement.state, ClientAgreementState::Rejected); + + let agreement = prov_market + .get_agreement(&agreement_id.clone().translate(Owner::Provider), &prov_id) + .await + .unwrap(); + assert_eq!(agreement.state, ClientAgreementState::Rejected); +} + // #[cfg_attr(not(feature = "test-suite"), ignore)] // #[actix_rt::test] // #[serial_test::serial] From fc19a7f32cc2b9a7e49c477cc80b16546d4ee66c Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Tue, 16 Feb 2021 11:28:33 +0100 Subject: [PATCH 08/11] Fix test_proposal_events_last to wait for broadcast long enough --- core/market/tests/test_negotiations.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/market/tests/test_negotiations.rs b/core/market/tests/test_negotiations.rs index 0e6923eb9c..08e4ad48ec 100644 --- a/core/market/tests/test_negotiations.rs +++ b/core/market/tests/test_negotiations.rs @@ -4,8 +4,8 @@ use ya_market::testing::{ mock_offer::client::{not_matching_demand, not_matching_offer, sample_demand, sample_offer}, negotiation::error::{CounterProposalError, RemoteProposalError}, proposal_util::{exchange_draft_proposals, NegotiationHelper}, - MarketServiceExt, MarketsNetwork, Owner, ProposalError, ProposalState, ProposalValidationError, - SaveProposalError, + wait_for_bcast, MarketServiceExt, MarketsNetwork, Owner, ProposalError, ProposalState, + ProposalValidationError, SaveProposalError, }; use ya_market_resolver::flatten::flatten_json; @@ -913,7 +913,7 @@ async fn test_proposal_events_last() { .await .unwrap(); - market3 + let sub3 = market3 .subscribe_offer(&sample_offer(), &identity3) .await .unwrap(); @@ -928,6 +928,9 @@ async fn test_proposal_events_last() { .await .unwrap(); + // Make sure, that broadcast will reach Requestor. + wait_for_bcast(1000, &market1, &sub3, true).await; + let events = market1 .requestor_engine .query_events(&demand_id, 3.0, Some(5)) From b067cc9f9a004bc8b23916f42790df7ca1241e05 Mon Sep 17 00:00:00 2001 From: nieznanysprawiciel Date: Thu, 18 Feb 2021 14:49:33 +0100 Subject: [PATCH 09/11] Update core/market/src/db/dao/agreement.rs Co-authored-by: Dariusz Rybi --- core/market/src/db/dao/agreement.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/market/src/db/dao/agreement.rs b/core/market/src/db/dao/agreement.rs index 4840a42036..db520c9a46 100644 --- a/core/market/src/db/dao/agreement.rs +++ b/core/market/src/db/dao/agreement.rs @@ -248,7 +248,7 @@ impl<'c> AgreementDao<'c> { ) -> Result { let id = id.clone(); do_with_transaction(self.pool, move |conn| { - log::debug!("Termination reason: {:?}", reason); + log::debug!("Rejection reason: {:?}", reason); let mut agreement: Agreement = market_agreement.filter(agreement::id.eq(&id)).first(conn)?; From 6bbfc746735d1408fe6bcbaa1f4fb5e2473e2973 Mon Sep 17 00:00:00 2001 From: Dariusz Rybi Date: Wed, 24 Feb 2021 14:17:44 +0100 Subject: [PATCH 10/11] Test disabled cache on windows CI --- .github/workflows/rust.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 1237c47368..69f1c96333 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -34,7 +34,7 @@ jobs: - name: Cache cargo registry uses: actions/cache@v1 - if: matrix.os != 'macos-latest' + if: matrix.os != 'macos-latest' && matrix.os != 'windows-latest' with: path: ~/.cargo/registry key: ${{ runner.os }}-cargo-registry-${{ hashFiles('**/Cargo.lock') }} @@ -43,7 +43,7 @@ jobs: - name: Cache cargo index uses: actions/cache@v1 - if: matrix.os != 'macos-latest' + if: matrix.os != 'macos-latest' && matrix.os != 'windows-latest' with: path: ~/.cargo/git key: ${{ runner.os }}-cargo-index-${{ hashFiles('**/Cargo.lock') }} From 55d17b59fc78cff1b58a89c3100cabbf5e1d74a1 Mon Sep 17 00:00:00 2001 From: Dariusz Rybi Date: Thu, 25 Feb 2021 15:14:28 +0100 Subject: [PATCH 11/11] Disable cache on market builds (github workflow) --- .github/workflows/rust.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 69f1c96333..17c134fb1d 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -34,7 +34,7 @@ jobs: - name: Cache cargo registry uses: actions/cache@v1 - if: matrix.os != 'macos-latest' && matrix.os != 'windows-latest' + if: matrix.os != 'macos-latest' && !startsWith( github.head_ref, 'market/' ) with: path: ~/.cargo/registry key: ${{ runner.os }}-cargo-registry-${{ hashFiles('**/Cargo.lock') }} @@ -43,7 +43,7 @@ jobs: - name: Cache cargo index uses: actions/cache@v1 - if: matrix.os != 'macos-latest' && matrix.os != 'windows-latest' + if: matrix.os != 'macos-latest' && !startsWith( github.head_ref, 'market/' ) with: path: ~/.cargo/git key: ${{ runner.os }}-cargo-index-${{ hashFiles('**/Cargo.lock') }} @@ -64,14 +64,14 @@ jobs: - name: Cache cargo build uses: actions/cache@v1 - if: matrix.os != 'macos-latest' + if: matrix.os != 'macos-latest' && !startsWith( github.head_ref, 'market/' ) with: path: target key: ${{ runner.os }}-cargo-build-target1-${{ hashFiles('**/Cargo.lock') }} - name: Cache vcpkg's artifacts uses: actions/cache@v1 - if: matrix.os == 'windows-latest' + if: matrix.os == 'windows-latest' && !startsWith( github.head_ref, 'market/' ) with: path: c:/vcpkg/installed key: vcpkg-${{ runner.os }}-v0