Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Market - reject agreement #1035

Merged
merged 19 commits into from
Feb 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
1eacec9
Merge branch 'master' of github.com:golemfactory/yagna into 0.6.0-rc7…
nieznanysprawiciel Feb 10, 2021
064a9fd
Implement reject Agreement flow
nieznanysprawiciel Feb 10, 2021
4f40535
[Test] Test reject_agreement and fix discovered bugs
nieznanysprawiciel Feb 10, 2021
55937c1
[Test] Test rejecting Agreement in Approved and Terminated state
nieznanysprawiciel Feb 10, 2021
6eaccac
Merge branch 'master' of github.com:golemfactory/yagna into market/re…
nieznanysprawiciel Feb 11, 2021
5d9110b
[Test] Test Rejection events and termination from rejected state
nieznanysprawiciel Feb 11, 2021
0450238
[Test] Test reject Agreement in Rejected state
nieznanysprawiciel Feb 11, 2021
6ef8950
Merge branch 'master' into market/reject-agreement
nieznanysprawiciel Feb 12, 2021
ff9c49f
wait_for_approval could return rejection Reason (But spec doesn't allow)
nieznanysprawiciel Feb 12, 2021
12a155c
[Test] Add REST test for Agreement rejection
nieznanysprawiciel Feb 12, 2021
fc19a7f
Fix test_proposal_events_last to wait for broadcast long enough
nieznanysprawiciel Feb 16, 2021
9455804
Merge branch 'master' into market/reject-agreement
nieznanysprawiciel Feb 18, 2021
b067cc9
Update core/market/src/db/dao/agreement.rs
nieznanysprawiciel Feb 18, 2021
6a1912f
Merge branch 'master' of github.com:golemfactory/yagna into market/re…
nieznanysprawiciel Feb 18, 2021
6bbfc74
Test disabled cache on windows CI
jiivan Feb 24, 2021
07d3ce5
Merge branch 'master' into market/reject-agreement
nieznanysprawiciel Feb 25, 2021
55d17b5
Disable cache on market builds (github workflow)
jiivan Feb 25, 2021
41557ba
Merge branch 'master' into market/reject-agreement
jiivan Feb 25, 2021
43546a9
Merge branch 'master' into market/reject-agreement
jiivan Feb 25, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:

- name: Cache cargo registry
uses: actions/cache@v1
if: matrix.os != 'macos-latest'
if: matrix.os != 'macos-latest' && !startsWith( github.head_ref, 'market/' )
with:
path: ~/.cargo/registry
key: ${{ runner.os }}-cargo-registry-${{ hashFiles('**/Cargo.lock') }}
Expand All @@ -43,7 +43,7 @@ jobs:

- name: Cache cargo index
uses: actions/cache@v1
if: matrix.os != 'macos-latest'
if: matrix.os != 'macos-latest' && !startsWith( github.head_ref, 'market/' )
with:
path: ~/.cargo/git
key: ${{ runner.os }}-cargo-index-${{ hashFiles('**/Cargo.lock') }}
Expand All @@ -64,14 +64,14 @@ jobs:

- name: Cache cargo build
uses: actions/cache@v1
if: matrix.os != 'macos-latest'
if: matrix.os != 'macos-latest' && !startsWith( github.head_ref, 'market/' )
with:
path: target
key: ${{ runner.os }}-cargo-build-target1-${{ hashFiles('**/Cargo.lock') }}

- name: Cache vcpkg's artifacts
uses: actions/cache@v1
if: matrix.os == 'windows-latest'
if: matrix.os == 'windows-latest' && !startsWith( github.head_ref, 'market/' )
with:
path: c:/vcpkg/installed
key: vcpkg-${{ runner.os }}-v0
Expand Down
22 changes: 21 additions & 1 deletion core/market/src/db/dao/agreement.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use chrono::NaiveDateTime;
use diesel::prelude::*;

use ya_client::model::market::Reason;
use ya_client::model::NodeId;
use ya_persistence::executor::{do_with_transaction, AsDao, ConnType, PoolType};

Expand Down Expand Up @@ -240,10 +241,29 @@ impl<'c> AgreementDao<'c> {
.await
}

pub async fn reject(
&self,
id: &AgreementId,
reason: Option<Reason>,
) -> Result<Agreement, AgreementDaoError> {
let id = id.clone();
do_with_transaction(self.pool, move |conn| {
log::debug!("Rejection reason: {:?}", reason);
let mut agreement: Agreement =
market_agreement.filter(agreement::id.eq(&id)).first(conn)?;

update_state(conn, &mut agreement, AgreementState::Rejected)?;
create_event(conn, &agreement, reason, Owner::Provider)?;

Ok(agreement)
})
.await
}

pub async fn terminate(
&self,
id: &AgreementId,
reason: Option<String>,
reason: Option<Reason>,
terminator: Owner,
) -> Result<bool, AgreementDaoError> {
let id = id.clone();
Expand Down
19 changes: 17 additions & 2 deletions core/market/src/db/dao/agreement_events.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use chrono::NaiveDateTime;
use diesel::{BoolExpressionMethods, ExpressionMethods, QueryDsl, RunQueryDsl};

use ya_client::model::market::Reason;
use ya_client::model::NodeId;
use ya_persistence::executor::{readonly_transaction, ConnType};
use ya_persistence::executor::{AsDao, PoolType};

use crate::db::dao::AgreementDaoError;
use crate::db::model::{Agreement, AgreementEvent, NewAgreementEvent};
use crate::db::model::{Agreement, AgreementEvent, AgreementId, NewAgreementEvent};
use crate::db::model::{AppSessionId, Owner};
use crate::db::schema::market_agreement::dsl as agreement;
use crate::db::schema::market_agreement::dsl::market_agreement;
Expand Down Expand Up @@ -62,12 +63,26 @@ impl<'c> AgreementEventsDao<'c> {
})
.await
}

pub async fn select_for_agreement(
&self,
agreement_id: &AgreementId,
) -> DbResult<Vec<AgreementEvent>> {
let agreement_id = agreement_id.clone();
readonly_transaction(self.pool, move |conn| {
Ok(market_agreement_event
.filter(event::agreement_id.eq(agreement_id))
.order_by(event::timestamp.asc())
.load::<AgreementEvent>(conn)?)
})
.await
}
}

pub(crate) fn create_event(
conn: &ConnType,
agreement: &Agreement,
reason: Option<String>,
reason: Option<Reason>,
terminator: Owner,
) -> Result<(), AgreementDaoError> {
let event = NewAgreementEvent::new(agreement, reason, terminator)
Expand Down
5 changes: 3 additions & 2 deletions core/market/src/db/dao/negotiation_events.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use chrono::Utc;
use diesel::dsl::sql;
use diesel::{sql_types, ExpressionMethods, QueryDsl, RunQueryDsl};
use thiserror::Error;

use ya_client::model::market::Reason;
use ya_persistence::executor::ConnType;
use ya_persistence::executor::{do_with_transaction, AsDao, PoolType};

Expand All @@ -12,7 +14,6 @@ use crate::db::model::{Agreement, EventType, MarketEvent, Owner, Proposal, Subsc
use crate::db::schema::market_negotiation_event::dsl;
use crate::db::{DbError, DbResult};
use crate::market::EnvConfig;
use diesel::dsl::sql;

const EVENT_STORE_DAYS: EnvConfig<'static, u64> = EnvConfig {
name: "YAGNA_MARKET_EVENT_STORE_DAYS",
Expand Down Expand Up @@ -55,7 +56,7 @@ impl<'c> NegotiationEventsDao<'c> {
pub async fn add_proposal_rejected_event(
&self,
proposal: &Proposal,
reason: Option<String>,
reason: Option<Reason>,
) -> DbResult<()> {
let event = MarketEvent::proposal_rejected(proposal, reason);
do_with_transaction(self.pool, move |conn| {
Expand Down
53 changes: 39 additions & 14 deletions core/market/src/db/model/agreement_events.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use chrono::{DateTime, NaiveDateTime, Utc};
use diesel::sql_types::Text;
use std::fmt;
use std::fmt::Debug;

use crate::db::model::{Agreement, AgreementId, AgreementState, Owner};
use crate::db::schema::market_agreement_event;

use std::str::FromStr;
use ya_client::model::market::agreement_event::AgreementTerminator;
use ya_client::model::market::{
AgreementEventType as ClientEventType, AgreementOperationEvent as ClientEvent, Reason,
Expand All @@ -30,14 +32,18 @@ pub enum AgreementEventType {
Terminated,
}

#[derive(DbTextField, Debug, Clone, AsExpression, FromSqlRow)]
#[sql_type = "Text"]
pub struct DbReason(pub Reason);

#[derive(Clone, Debug, Queryable)]
pub struct AgreementEvent {
pub id: i32,
pub agreement_id: AgreementId,
pub event_type: AgreementEventType,
pub timestamp: NaiveDateTime,
pub issuer: Owner,
pub reason: Option<String>,
pub reason: Option<DbReason>,
pub signature: Option<String>,
}

Expand All @@ -48,7 +54,7 @@ pub struct NewAgreementEvent {
pub event_type: AgreementEventType,
pub timestamp: NaiveDateTime,
pub issuer: Owner,
pub reason: Option<String>,
pub reason: Option<DbReason>,
}

#[derive(thiserror::Error, Debug, Clone)]
Expand All @@ -58,7 +64,7 @@ pub struct EventFromAgreementError(pub String);
impl NewAgreementEvent {
pub(crate) fn new(
agreement: &Agreement,
reason: Option<String>,
reason: Option<Reason>,
terminator: Owner,
) -> Result<Self, EventFromAgreementError> {
Ok(Self {
Expand All @@ -79,7 +85,7 @@ impl NewAgreementEvent {
},
timestamp: Utc::now().naive_utc(),
issuer: terminator,
reason,
reason: reason.map(|reason| DbReason(reason)),
})
}
}
Expand All @@ -88,16 +94,7 @@ impl AgreementEvent {
pub fn into_client(self) -> ClientEvent {
let agreement_id = self.agreement_id.into_client();
let event_date = DateTime::<Utc>::from_utc(self.timestamp, Utc);
let reason = self
.reason
.map(|reason| serde_json::from_str::<Reason>(&reason))
.map(|result| result.map_err(|e| {
log::warn!(
"Agreement Event with not parsable Reason in database. Error: {}. Shouldn't happen \
because market is responsible for rejecting invalid Reasons.", e
)
}).ok())
.flatten();
let reason = self.reason.map(|reason| reason.0);

match self.event_type {
AgreementEventType::Approved => ClientEvent {
Expand Down Expand Up @@ -134,3 +131,31 @@ impl AgreementEvent {
}
}
}

impl FromStr for DbReason {
type Err = serde_json::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(DbReason(serde_json::from_str::<Reason>(s)
.map_err(|e| {
log::warn!(
"Agreement Event with not parsable Reason in database. Error: {}. Shouldn't happen \
because market is responsible for rejecting invalid Reasons.", e
)
}
).ok().unwrap_or(Reason {
message: "Invalid Reason in DB".into(),
extra: Default::default()
})))
}
}

impl fmt::Display for DbReason {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match serde_json::to_string(&self.0) {
Ok(reason) => write!(f, "{}", reason),
// It's impossible since Reason is serializable.
Err(_) => write!(f, "Serialization failed!"),
}
}
}
13 changes: 7 additions & 6 deletions core/market/src/db/model/negotiation_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use ya_persistence::executor::DbExecutor;

use super::SubscriptionId;
use crate::db::dao::{AgreementDao, ProposalDao};
use crate::db::model::agreement_events::DbReason;
use crate::db::model::{Agreement, AgreementId, Owner, Proposal, ProposalId};
use crate::db::schema::market_negotiation_event;

Expand Down Expand Up @@ -63,7 +64,7 @@ pub struct MarketEvent {
/// It can be Proposal, Agreement or structure,
/// that will represent PropertyQuery.
pub artifact_id: ProposalId,
pub reason: Option<String>,
pub reason: Option<DbReason>,
}

#[derive(Clone, Debug, Insertable)]
Expand All @@ -72,7 +73,7 @@ pub struct NewMarketEvent {
pub subscription_id: SubscriptionId,
pub event_type: EventType,
pub artifact_id: ProposalId, // TODO: typed
pub reason: Option<String>,
pub reason: Option<DbReason>,
}

impl MarketEvent {
Expand All @@ -88,15 +89,15 @@ impl MarketEvent {
}
}

pub fn proposal_rejected(proposal: &Proposal, reason: Option<String>) -> NewMarketEvent {
pub fn proposal_rejected(proposal: &Proposal, reason: Option<Reason>) -> NewMarketEvent {
NewMarketEvent {
subscription_id: proposal.negotiation.subscription_id.clone(),
event_type: match proposal.body.id.owner() {
Owner::Requestor => EventType::RequestorProposalRejected,
Owner::Provider => EventType::ProviderProposalRejected,
},
artifact_id: proposal.body.id.clone(),
reason,
reason: reason.map(|reason| DbReason(reason)),
}
}

Expand Down Expand Up @@ -124,7 +125,7 @@ impl MarketEvent {
proposal_id: self.artifact_id.to_string(),
reason: match self.reason {
None => None,
Some(r) => Some(serde_json::from_str(&r).unwrap_or_else(|_| Reason::new(r))),
Some(reason) => Some(reason.0),
},
}),
EventType::RequestorPropertyQuery => unimplemented!(),
Expand Down Expand Up @@ -176,7 +177,7 @@ impl MarketEvent {
proposal_id: self.artifact_id.to_string(),
reason: match self.reason {
None => None,
Some(r) => Some(serde_json::from_str(&r).unwrap_or_else(|_| Reason::new(r))),
Some(reason) => Some(reason.0),
},
}),
EventType::ProviderPropertyQuery => unimplemented!(),
Expand Down
20 changes: 4 additions & 16 deletions core/market/src/negotiation/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,7 @@ impl CommonBroker {
caller_role,
caller_id,
&proposal_id,
reason
.as_ref()
.map(|r| format!("with reason: {}", r))
.unwrap_or("without reason".into()),
reason.display()
);

Ok(proposal)
Expand Down Expand Up @@ -369,8 +366,7 @@ impl CommonBroker {
)
.await?;

let reason_string = CommonBroker::reason2string(&reason);
dao.terminate(&agreement.id, reason_string, agreement.id.owner())
dao.terminate(&agreement.id, reason.clone(), agreement.id.owner())
.await
.map_err(|e| AgreementError::UpdateState((&agreement.id).clone(), e))?;
}
Expand Down Expand Up @@ -406,12 +402,6 @@ impl CommonBroker {
Ok(())
}

fn reason2string(reason: &Option<Reason>) -> Option<String> {
reason.as_ref().map(|reason| {
serde_json::to_string::<Reason>(reason).unwrap_or(reason.message.to_string())
})
}

// Called remotely via GSB
pub async fn on_agreement_terminated(
self,
Expand Down Expand Up @@ -457,8 +447,7 @@ impl CommonBroker {
Err(RemoteAgreementError::NotFound(agreement_id.clone()))?
}

let reason_string = CommonBroker::reason2string(&msg.reason);
dao.terminate(&agreement_id, reason_string, caller_role)
dao.terminate(&agreement_id, msg.reason.clone(), caller_role)
.await
.map_err(|e| {
log::warn!(
Expand Down Expand Up @@ -631,10 +620,9 @@ impl CommonBroker {
// TODO: If creating Proposal succeeds, but event can't be added, provider
// TODO: will never answer to this Proposal. Solve problem when Event API will be available.
let subscription_id = proposal.negotiation.subscription_id.clone();
let reason = CommonBroker::reason2string(&msg.reason);
self.db
.as_dao::<NegotiationEventsDao>()
.add_proposal_rejected_event(&proposal, reason)
.add_proposal_rejected_event(&proposal, msg.reason.clone())
.await
.map_err(|e| {
// TODO: Don't leak our database error, but send meaningful message as response.
Expand Down
4 changes: 2 additions & 2 deletions core/market/src/negotiation/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::db::{
};
use crate::matcher::error::{DemandError, QueryOfferError};
use crate::protocol::negotiation::error::{
ApproveAgreementError, CommitAgreementError, CounterProposalError as ProtocolProposalError,
AgreementProtocolError, CommitAgreementError, CounterProposalError as ProtocolProposalError,
GsbAgreementError, NegotiationApiInitError, ProposeAgreementError, RejectProposalError,
TerminateAgreementError,
};
Expand Down Expand Up @@ -73,7 +73,7 @@ pub enum AgreementError {
#[error("Protocol error: {0}")]
ProtocolCreate(#[from] ProposeAgreementError),
#[error("Protocol error while approving: {0}")]
ProtocolApprove(#[from] ApproveAgreementError),
Protocol(#[from] AgreementProtocolError),
#[error("Protocol error while terminating: {0}")]
ProtocolTerminate(#[from] TerminateAgreementError),
#[error("Protocol error while committing: {0}")]
Expand Down
Loading