diff --git a/pallets/marketplace/src/lib.rs b/pallets/marketplace/src/lib.rs index 6c2e641d..6256bfc6 100644 --- a/pallets/marketplace/src/lib.rs +++ b/pallets/marketplace/src/lib.rs @@ -189,6 +189,8 @@ pub mod pallet { ExecutionSuccess(JobId, ExecutionOperationHash), /// An execution is reported to have failed. ExecutionFailure(JobId, ExecutionFailureMessage), + /// This event is emitted when a job is finalized. + JobFinalized(JobId), } #[pallet::error] @@ -285,6 +287,10 @@ pub mod pallet { ReputationNotFound, /// Job required module not available. ModuleNotAvailableInMatch, + /// The job is not assigned to the given processor + JobNotAssigned, + /// The job cannot be finalized yet. + JobCannotBeFinalized, } #[pallet::hooks] @@ -412,9 +418,8 @@ pub mod pallet { #[pallet::call_index(4)] #[pallet::weight(< T as Config >::WeightInfo::report())] pub fn report( - origin: OriginFor, // source + origin: OriginFor, job_id: JobId, - last: bool, execution_result: ExecutionResult, ) -> DispatchResultWithPostInfo { let who = ensure_signed(origin)?; @@ -462,83 +467,6 @@ pub mod pallet { Error::::ReportOutsideSchedule ); - if last { - // update reputation since we don't expect further reports for this job - // (only update for attested devices!) - if ensure_source_verified::(&who).is_ok() { - let e: ::RegistrationExtra = registration.extra.clone().into(); - let requirements: JobRequirementsFor = e.into(); - - // parse reward into asset_id and amount - let reward_asset: ::AssetId = requirements - .reward - .try_get_asset_id() - .map_err(|_| Error::::JobRegistrationUnsupportedReward)? - .into(); - - T::AssetValidator::validate(&reward_asset).map_err(|e| e.into())?; - - let reward_amount: ::AssetAmount = requirements - .reward - .try_get_amount() - .map_err(|_| Error::::JobRegistrationUnsupportedReward)? - .into(); - - // skip reputation update if reward is 0 - if reward_amount > 0u8.into() { - let average_reward = - >::get(&reward_asset).unwrap_or(0); - let total_assigned = - >::get(&reward_asset).unwrap_or_default(); - - let total_reward = average_reward - .checked_mul(total_assigned - 1u128) - .ok_or(Error::::CalculationOverflow)?; - - let new_total_rewards = total_reward - .checked_add(reward_amount.clone().into()) - .ok_or(Error::::CalculationOverflow)?; - - let mut beta_params = >::get(&who, &reward_asset) - .ok_or(Error::::ReputationNotFound)?; - - beta_params = BetaReputation::update( - beta_params, - assignment.sla.met, - assignment.sla.total - assignment.sla.met, - reward_amount.clone().into(), - average_reward, - ) - .ok_or(Error::::CalculationOverflow)?; - - let new_average_reward = new_total_rewards - .checked_div(total_assigned) - .ok_or(Error::::CalculationOverflow)?; - - >::insert(reward_asset.clone(), new_average_reward); - >::insert( - &who, - &reward_asset, - BetaParameters { - r: beta_params.r, - s: beta_params.s, - }, - ); - } - } - - // removed completed job from all storage points (completed SLA gets still deposited in event below) - >::remove(&who, &job_id); - >::remove(&job_id.0, &job_id.1); - - // increase capacity - >::mutate(&who, |c| { - *c = c.unwrap_or(0).checked_add(registration.storage.into()) - }); - - >::remove(&job_id.0, &job_id.1); - } - // pay only after all other steps succeeded without errors because paying reward is not revertable T::RewardManager::pay_reward(&assignment.fee_per_execution, &who)?; @@ -554,6 +482,109 @@ pub mod pallet { Self::deposit_event(Event::Reported(job_id, who, assignment.clone())); Ok(().into()) } + + /// Called processors when the assigned job can be finalized. + #[pallet::call_index(5)] + #[pallet::weight(::WeightInfo::finalize_job())] + pub fn finalize_job( + origin: OriginFor, + job_id: JobId, + ) -> DispatchResultWithPostInfo { + let who = ensure_signed(origin)?; + + let registration = >::get(&job_id.0, &job_id.1) + .ok_or(pallet_acurast::Error::::JobRegistrationNotFound)?; + + // find assignment + let assignment = + >::get(&who, &job_id).ok_or(Error::::JobNotAssigned)?; + + let now = Self::now()? + .checked_add(T::ReportTolerance::get()) + .ok_or(Error::::CalculationOverflow)?; + let (_actual_start, actual_end) = registration + .schedule + .range(assignment.start_delay) + .ok_or(Error::::CalculationOverflow)?; + ensure!(actual_end.lt(&now), Error::::JobCannotBeFinalized); + + // update reputation since we don't expect further reports for this job + // (only update for attested devices!) + if ensure_source_verified::(&who).is_ok() { + let extra: ::RegistrationExtra = registration.extra.clone().into(); + let requirements: JobRequirementsFor = extra.into(); + + // parse reward into asset_id and amount + let reward_asset: ::AssetId = requirements + .reward + .try_get_asset_id() + .map_err(|_| Error::::JobRegistrationUnsupportedReward)? + .into(); + + T::AssetValidator::validate(&reward_asset).map_err(|e| e.into())?; + + let reward_amount: ::AssetAmount = requirements + .reward + .try_get_amount() + .map_err(|_| Error::::JobRegistrationUnsupportedReward)? + .into(); + + // skip reputation update if reward is 0 + if reward_amount > 0u8.into() { + let average_reward = >::get(&reward_asset).unwrap_or(0); + let total_assigned = + >::get(&reward_asset).unwrap_or_default(); + + let total_reward = average_reward + .checked_mul(total_assigned - 1u128) + .ok_or(Error::::CalculationOverflow)?; + + let new_total_rewards = total_reward + .checked_add(reward_amount.clone().into()) + .ok_or(Error::::CalculationOverflow)?; + + let mut beta_params = >::get(&who, &reward_asset) + .ok_or(Error::::ReputationNotFound)?; + + beta_params = BetaReputation::update( + beta_params, + assignment.sla.met, + assignment.sla.total - assignment.sla.met, + reward_amount.clone().into(), + average_reward, + ) + .ok_or(Error::::CalculationOverflow)?; + + let new_average_reward = new_total_rewards + .checked_div(total_assigned) + .ok_or(Error::::CalculationOverflow)?; + + >::insert(reward_asset.clone(), new_average_reward); + >::insert( + &who, + &reward_asset, + BetaParameters { + r: beta_params.r, + s: beta_params.s, + }, + ); + } + } + + // removed completed job from all storage points (completed SLA gets still deposited in event below) + >::remove(&who, &job_id); + >::remove(&job_id.0, &job_id.1); + + // increase capacity + >::mutate(&who, |c| { + *c = c.unwrap_or(0).checked_add(registration.storage.into()) + }); + + >::remove(&job_id.0, &job_id.1); + + Self::deposit_event(Event::JobFinalized(job_id)); + Ok(().into()) + } } impl From> for pallet_acurast::Error { diff --git a/pallets/marketplace/src/tests.rs b/pallets/marketplace/src/tests.rs index 2c4ae220..0d9711eb 100644 --- a/pallets/marketplace/src/tests.rs +++ b/pallets/marketplace/src/tests.rs @@ -136,7 +136,6 @@ fn test_match() { assert_ok!(AcurastMarketplace::report( RuntimeOrigin::signed(processor_account_id()).into(), job_id.clone(), - false, ExecutionResult::Success(operation_hash()) )); // average reward only updated at end of job @@ -180,9 +179,18 @@ fn test_match() { assert_ok!(AcurastMarketplace::report( RuntimeOrigin::signed(processor_account_id()).into(), job_id.clone(), - true, ExecutionResult::Success(operation_hash()) )); + + // pretend time moved on + later(registration.schedule.end_time + 1); + assert_eq!(4, System::block_number()); + + assert_ok!(AcurastMarketplace::finalize_job( + RuntimeOrigin::signed(processor_account_id()).into(), + job_id.clone() + )); + assert_eq!( None, AcurastMarketplace::stored_matches(processor_account_id(), job_id.clone()), @@ -297,6 +305,7 @@ fn test_match() { sla: SLA { total: 2, met: 2 }, } )), + RuntimeEvent::AcurastMarketplace(crate::Event::JobFinalized(job_id.clone(),)), ] ); }); @@ -616,7 +625,6 @@ fn test_more_reports_than_expected() { assert_ok!(AcurastMarketplace::report( RuntimeOrigin::signed(processor_account_id()).into(), job_id.clone(), - false, ExecutionResult::Success(operation_hash()) )); @@ -625,7 +633,6 @@ fn test_more_reports_than_expected() { assert_ok!(AcurastMarketplace::report( RuntimeOrigin::signed(processor_account_id()).into(), job_id.clone(), - false, ExecutionResult::Success(operation_hash()) )); @@ -635,7 +642,6 @@ fn test_more_reports_than_expected() { AcurastMarketplace::report( RuntimeOrigin::signed(processor_account_id()).into(), job_id.clone(), - true, ExecutionResult::Success(operation_hash()) ), Error::::MoreReportsThanExpected diff --git a/pallets/marketplace/src/weights.rs b/pallets/marketplace/src/weights.rs index 8188ff49..6a9a6a01 100644 --- a/pallets/marketplace/src/weights.rs +++ b/pallets/marketplace/src/weights.rs @@ -34,6 +34,7 @@ pub trait WeightInfo { fn propose_matching() -> Weight; fn acknowledge_match() -> Weight; fn report() -> Weight; + fn finalize_job() -> Weight; } /// Weights for pallet_acurast_marketplace using the Substrate node and recommended hardware. @@ -75,4 +76,10 @@ impl WeightInfo for Weights { .saturating_add(T::DbWeight::get().reads(2)) .saturating_add(T::DbWeight::get().writes(3)) } + fn finalize_job() -> Weight { + // Minimum execution time: nanoseconds. + Weight::from_ref_time(129_864_000) + .saturating_add(T::DbWeight::get().reads(6)) + .saturating_add(T::DbWeight::get().writes(7)) + } }