Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

feat: split report extrinsic into two new extrinsics (report and finalize_job) #73

Merged
merged 1 commit into from
Apr 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 110 additions & 79 deletions pallets/marketplace/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ pub mod pallet {
ExecutionSuccess(JobId<T::AccountId>, ExecutionOperationHash),
/// An execution is reported to have failed.
ExecutionFailure(JobId<T::AccountId>, ExecutionFailureMessage),
/// This event is emitted when a job is finalized.
JobFinalized(JobId<T::AccountId>),
}

#[pallet::error]
Expand Down Expand Up @@ -285,6 +287,10 @@ pub mod pallet {
ReputationNotFound,
/// Job required module not available.
ModuleNotAvailableInMatch,
/// The job is not assigned to the given processor
JobNotAssigned,
/// The job cannot be finalized yet.
JobCannotBeFinalized,
}

#[pallet::hooks]
Expand Down Expand Up @@ -412,9 +418,8 @@ pub mod pallet {
#[pallet::call_index(4)]
#[pallet::weight(< T as Config >::WeightInfo::report())]
pub fn report(
origin: OriginFor<T>, // source
origin: OriginFor<T>,
job_id: JobId<T::AccountId>,
last: bool,
execution_result: ExecutionResult,
) -> DispatchResultWithPostInfo {
let who = ensure_signed(origin)?;
Expand Down Expand Up @@ -462,83 +467,6 @@ pub mod pallet {
Error::<T>::ReportOutsideSchedule
);

if last {
// update reputation since we don't expect further reports for this job
// (only update for attested devices!)
if ensure_source_verified::<T>(&who).is_ok() {
let e: <T as Config>::RegistrationExtra = registration.extra.clone().into();
let requirements: JobRequirementsFor<T> = e.into();

// parse reward into asset_id and amount
let reward_asset: <T as Config>::AssetId = requirements
.reward
.try_get_asset_id()
.map_err(|_| Error::<T>::JobRegistrationUnsupportedReward)?
.into();

T::AssetValidator::validate(&reward_asset).map_err(|e| e.into())?;

let reward_amount: <T as Config>::AssetAmount = requirements
.reward
.try_get_amount()
.map_err(|_| Error::<T>::JobRegistrationUnsupportedReward)?
.into();

// skip reputation update if reward is 0
if reward_amount > 0u8.into() {
let average_reward =
<StoredAverageReward<T>>::get(&reward_asset).unwrap_or(0);
let total_assigned =
<StoredTotalAssigned<T>>::get(&reward_asset).unwrap_or_default();

let total_reward = average_reward
.checked_mul(total_assigned - 1u128)
.ok_or(Error::<T>::CalculationOverflow)?;

let new_total_rewards = total_reward
.checked_add(reward_amount.clone().into())
.ok_or(Error::<T>::CalculationOverflow)?;

let mut beta_params = <StoredReputation<T>>::get(&who, &reward_asset)
.ok_or(Error::<T>::ReputationNotFound)?;

beta_params = BetaReputation::update(
beta_params,
assignment.sla.met,
assignment.sla.total - assignment.sla.met,
reward_amount.clone().into(),
average_reward,
)
.ok_or(Error::<T>::CalculationOverflow)?;

let new_average_reward = new_total_rewards
.checked_div(total_assigned)
.ok_or(Error::<T>::CalculationOverflow)?;

<StoredAverageReward<T>>::insert(reward_asset.clone(), new_average_reward);
<StoredReputation<T>>::insert(
&who,
&reward_asset,
BetaParameters {
r: beta_params.r,
s: beta_params.s,
},
);
}
}

// removed completed job from all storage points (completed SLA gets still deposited in event below)
<StoredMatches<T>>::remove(&who, &job_id);
<StoredJobStatus<T>>::remove(&job_id.0, &job_id.1);

// increase capacity
<StoredStorageCapacity<T>>::mutate(&who, |c| {
*c = c.unwrap_or(0).checked_add(registration.storage.into())
});

<StoredJobRegistration<T>>::remove(&job_id.0, &job_id.1);
}

// pay only after all other steps succeeded without errors because paying reward is not revertable
T::RewardManager::pay_reward(&assignment.fee_per_execution, &who)?;

Expand All @@ -554,6 +482,109 @@ pub mod pallet {
Self::deposit_event(Event::Reported(job_id, who, assignment.clone()));
Ok(().into())
}

/// Called processors when the assigned job can be finalized.
#[pallet::call_index(5)]
#[pallet::weight(<T as Config>::WeightInfo::finalize_job())]
pub fn finalize_job(
origin: OriginFor<T>,
job_id: JobId<T::AccountId>,
) -> DispatchResultWithPostInfo {
let who = ensure_signed(origin)?;

let registration = <StoredJobRegistration<T>>::get(&job_id.0, &job_id.1)
.ok_or(pallet_acurast::Error::<T>::JobRegistrationNotFound)?;

// find assignment
let assignment =
<StoredMatches<T>>::get(&who, &job_id).ok_or(Error::<T>::JobNotAssigned)?;

let now = Self::now()?
.checked_add(T::ReportTolerance::get())
.ok_or(Error::<T>::CalculationOverflow)?;
let (_actual_start, actual_end) = registration
.schedule
.range(assignment.start_delay)
.ok_or(Error::<T>::CalculationOverflow)?;
ensure!(actual_end.lt(&now), Error::<T>::JobCannotBeFinalized);

// update reputation since we don't expect further reports for this job
// (only update for attested devices!)
if ensure_source_verified::<T>(&who).is_ok() {
let extra: <T as Config>::RegistrationExtra = registration.extra.clone().into();
let requirements: JobRequirementsFor<T> = extra.into();

// parse reward into asset_id and amount
let reward_asset: <T as Config>::AssetId = requirements
.reward
.try_get_asset_id()
.map_err(|_| Error::<T>::JobRegistrationUnsupportedReward)?
.into();

T::AssetValidator::validate(&reward_asset).map_err(|e| e.into())?;

let reward_amount: <T as Config>::AssetAmount = requirements
.reward
.try_get_amount()
.map_err(|_| Error::<T>::JobRegistrationUnsupportedReward)?
.into();

// skip reputation update if reward is 0
if reward_amount > 0u8.into() {
let average_reward = <StoredAverageReward<T>>::get(&reward_asset).unwrap_or(0);
let total_assigned =
<StoredTotalAssigned<T>>::get(&reward_asset).unwrap_or_default();

let total_reward = average_reward
.checked_mul(total_assigned - 1u128)
.ok_or(Error::<T>::CalculationOverflow)?;

let new_total_rewards = total_reward
.checked_add(reward_amount.clone().into())
.ok_or(Error::<T>::CalculationOverflow)?;

let mut beta_params = <StoredReputation<T>>::get(&who, &reward_asset)
.ok_or(Error::<T>::ReputationNotFound)?;

beta_params = BetaReputation::update(
beta_params,
assignment.sla.met,
assignment.sla.total - assignment.sla.met,
reward_amount.clone().into(),
average_reward,
)
.ok_or(Error::<T>::CalculationOverflow)?;

let new_average_reward = new_total_rewards
.checked_div(total_assigned)
.ok_or(Error::<T>::CalculationOverflow)?;

<StoredAverageReward<T>>::insert(reward_asset.clone(), new_average_reward);
<StoredReputation<T>>::insert(
&who,
&reward_asset,
BetaParameters {
r: beta_params.r,
s: beta_params.s,
},
);
}
}

// removed completed job from all storage points (completed SLA gets still deposited in event below)
<StoredMatches<T>>::remove(&who, &job_id);
<StoredJobStatus<T>>::remove(&job_id.0, &job_id.1);

// increase capacity
<StoredStorageCapacity<T>>::mutate(&who, |c| {
*c = c.unwrap_or(0).checked_add(registration.storage.into())
});

<StoredJobRegistration<T>>::remove(&job_id.0, &job_id.1);

Self::deposit_event(Event::JobFinalized(job_id));
Ok(().into())
}
}

impl<T: Config> From<Error<T>> for pallet_acurast::Error<T> {
Expand Down
16 changes: 11 additions & 5 deletions pallets/marketplace/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ fn test_match() {
assert_ok!(AcurastMarketplace::report(
RuntimeOrigin::signed(processor_account_id()).into(),
job_id.clone(),
false,
ExecutionResult::Success(operation_hash())
));
// average reward only updated at end of job
Expand Down Expand Up @@ -180,9 +179,18 @@ fn test_match() {
assert_ok!(AcurastMarketplace::report(
RuntimeOrigin::signed(processor_account_id()).into(),
job_id.clone(),
true,
ExecutionResult::Success(operation_hash())
));

// pretend time moved on
later(registration.schedule.end_time + 1);
assert_eq!(4, System::block_number());

assert_ok!(AcurastMarketplace::finalize_job(
RuntimeOrigin::signed(processor_account_id()).into(),
job_id.clone()
));

assert_eq!(
None,
AcurastMarketplace::stored_matches(processor_account_id(), job_id.clone()),
Expand Down Expand Up @@ -297,6 +305,7 @@ fn test_match() {
sla: SLA { total: 2, met: 2 },
}
)),
RuntimeEvent::AcurastMarketplace(crate::Event::JobFinalized(job_id.clone(),)),
]
);
});
Expand Down Expand Up @@ -616,7 +625,6 @@ fn test_more_reports_than_expected() {
assert_ok!(AcurastMarketplace::report(
RuntimeOrigin::signed(processor_account_id()).into(),
job_id.clone(),
false,
ExecutionResult::Success(operation_hash())
));

Expand All @@ -625,7 +633,6 @@ fn test_more_reports_than_expected() {
assert_ok!(AcurastMarketplace::report(
RuntimeOrigin::signed(processor_account_id()).into(),
job_id.clone(),
false,
ExecutionResult::Success(operation_hash())
));

Expand All @@ -635,7 +642,6 @@ fn test_more_reports_than_expected() {
AcurastMarketplace::report(
RuntimeOrigin::signed(processor_account_id()).into(),
job_id.clone(),
true,
ExecutionResult::Success(operation_hash())
),
Error::<Test>::MoreReportsThanExpected
Expand Down
7 changes: 7 additions & 0 deletions pallets/marketplace/src/weights.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub trait WeightInfo {
fn propose_matching() -> Weight;
fn acknowledge_match() -> Weight;
fn report() -> Weight;
fn finalize_job() -> Weight;
}

/// Weights for pallet_acurast_marketplace using the Substrate node and recommended hardware.
Expand Down Expand Up @@ -75,4 +76,10 @@ impl<T: frame_system::Config> WeightInfo for Weights<T> {
.saturating_add(T::DbWeight::get().reads(2))
.saturating_add(T::DbWeight::get().writes(3))
}
fn finalize_job() -> Weight {
// Minimum execution time: nanoseconds.
Weight::from_ref_time(129_864_000)
.saturating_add(T::DbWeight::get().reads(6))
.saturating_add(T::DbWeight::get().writes(7))
}
}