Skip to content

Commit

Permalink
Merge pull request #39 from llllllluc/feat/terminate-condition
Browse files Browse the repository at this point in the history
[Feature] add optional terminate condition for recurring job
  • Loading branch information
vladjdk authored Jul 25, 2023
2 parents b4a2669 + cc4fb6d commit 09e266d
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 97 deletions.
244 changes: 151 additions & 93 deletions contracts/warp-controller/src/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ pub fn reply(deps: DepsMut, env: Env, msg: Reply) -> Result<Response, ContractEr
labels: job.labels,
status: new_status,
condition: job.condition,
terminate_condition: job.terminate_condition,
msgs: job.msgs,
vars: job.vars,
recurring: job.recurring,
Expand Down Expand Up @@ -297,105 +298,162 @@ pub fn reply(deps: DepsMut, env: Env, msg: Reply) -> Result<Response, ContractEr
"failed_invalid_job_status",
));
} else {

let new_vars: String = deps.querier.query_wasm_smart(
config.resolver_address,
config.resolver_address.clone(),
&resolver::QueryMsg::QueryApplyVarFn(resolver::QueryApplyVarFnMsg {
vars: finished_job.vars,
status: finished_job.status.clone(),
}),
)?; //todo: TEST THIS
let new_job = PENDING_JOBS().update(
deps.storage,
state.current_job_id.u64(),
|s| match s {
None => Ok(Job {
id: state.current_job_id,
owner: finished_job.owner.clone(),
last_update_time: Uint64::from(env.block.time.seconds()),
name: finished_job.name.clone(),
description: finished_job.description,
labels: finished_job.labels,
status: JobStatus::Pending,
condition: finished_job.condition.clone(),
vars: new_vars,
requeue_on_evict: finished_job.requeue_on_evict,
recurring: finished_job.recurring,
msgs: finished_job.msgs.clone(),
reward: finished_job.reward,
assets_to_withdraw: finished_job.assets_to_withdraw,
}),
Some(_) => Err(ContractError::JobAlreadyExists {}),
},
)?;

state.current_job_id = state.current_job_id.checked_add(Uint64::new(1))?;
state.q = state.q.checked_add(Uint64::new(1))?;

msgs.push(
//send reward to controller
WasmMsg::Execute {
contract_addr: account.account.to_string(),
msg: to_binary(&account::ExecuteMsg::Generic(GenericMsg {
msgs: vec![CosmosMsg::Bank(BankMsg::Send {
to_address: config.fee_collector.to_string(),
amount: vec![Coin::new((fee).u128(), config.fee_denom.clone())],
})],
}))?,
funds: vec![],
},
);

msgs.push(
//send reward to controller
WasmMsg::Execute {
contract_addr: account.account.to_string(),
msg: to_binary(&account::ExecuteMsg::Generic(GenericMsg {
msgs: vec![CosmosMsg::Bank(BankMsg::Send {
to_address: env.contract.address.to_string(),
amount: vec![Coin::new((new_job.reward).u128(), config.fee_denom)],
})],
}))?,
funds: vec![],
},
);

msgs.push(
//withdraw all assets that are listed
WasmMsg::Execute {
contract_addr: account.account.to_string(),
msg: to_binary(&account::ExecuteMsg::WithdrawAssets(
WithdrawAssetsMsg {
asset_infos: new_job.assets_to_withdraw,
},
))?,
funds: vec![],
},
);

new_job_attrs.push(Attribute::new("action", "create_job"));
new_job_attrs.push(Attribute::new("job_id", new_job.id));
new_job_attrs.push(Attribute::new("job_owner", new_job.owner));
new_job_attrs.push(Attribute::new("job_name", new_job.name));
new_job_attrs.push(Attribute::new(
"job_status",
serde_json_wasm::to_string(&new_job.status)?,
));
new_job_attrs.push(Attribute::new(
"job_condition",
serde_json_wasm::to_string(&new_job.condition)?,
));
new_job_attrs.push(Attribute::new(
"job_msgs",
serde_json_wasm::to_string(&new_job.msgs)?,
));
new_job_attrs.push(Attribute::new("job_reward", new_job.reward));
new_job_attrs.push(Attribute::new("job_creation_fee", fee));
new_job_attrs.push(Attribute::new(
"job_last_updated_time",
new_job.last_update_time,
));
new_job_attrs.push(Attribute::new("sub_action", "recur_job"));

let should_terminate_job: bool;
match finished_job.terminate_condition.clone() {
Some(terminate_condition) => {
let resolution: StdResult<bool> = deps.querier.query_wasm_smart(
config.resolver_address,
&resolver::QueryMsg::QueryResolveCondition(
resolver::QueryResolveConditionMsg {
condition: terminate_condition,
vars: new_vars.clone(),
},
),
);
if let Err(e) = resolution {
should_terminate_job = true;
new_job_attrs.push(Attribute::new("action", "recur_job"));
new_job_attrs.push(Attribute::new(
"job_terminate_condition_status",
"invalid",
));
new_job_attrs.push(Attribute::new(
"creation_status",
format!(
"terminated_due_to_terminate_condition_resolves_to_error. {}",
e.to_string()
),
));
} else {
new_job_attrs.push(Attribute::new(
"job_terminate_condition_status",
"valid",
));
if resolution? {
should_terminate_job = true;
new_job_attrs.push(Attribute::new("action", "recur_job"));
new_job_attrs.push(Attribute::new(
"creation_status",
"terminated_due_to_terminate_condition_resolves_to_true",
));
} else {
should_terminate_job = false;
}
}
}
None => {
should_terminate_job = false;
}
}

if !should_terminate_job {
let new_job = PENDING_JOBS().update(
deps.storage,
state.current_job_id.u64(),
|s| match s {
None => Ok(Job {
id: state.current_job_id,
owner: finished_job.owner.clone(),
last_update_time: Uint64::from(env.block.time.seconds()),
name: finished_job.name.clone(),
description: finished_job.description,
labels: finished_job.labels,
status: JobStatus::Pending,
condition: finished_job.condition.clone(),
terminate_condition: finished_job.terminate_condition.clone(),
vars: new_vars,
requeue_on_evict: finished_job.requeue_on_evict,
recurring: finished_job.recurring,
msgs: finished_job.msgs.clone(),
reward: finished_job.reward,
assets_to_withdraw: finished_job.assets_to_withdraw,
}),
Some(_) => Err(ContractError::JobAlreadyExists {}),
},
)?;

state.current_job_id = state.current_job_id.checked_add(Uint64::new(1))?;
state.q = state.q.checked_add(Uint64::new(1))?;

msgs.push(
//send reward to controller
WasmMsg::Execute {
contract_addr: account.account.to_string(),
msg: to_binary(&account::ExecuteMsg::Generic(GenericMsg {
msgs: vec![CosmosMsg::Bank(BankMsg::Send {
to_address: config.fee_collector.to_string(),
amount: vec![Coin::new(
(fee).u128(),
config.fee_denom.clone(),
)],
})],
}))?,
funds: vec![],
},
);

msgs.push(
//send reward to controller
WasmMsg::Execute {
contract_addr: account.account.to_string(),
msg: to_binary(&account::ExecuteMsg::Generic(GenericMsg {
msgs: vec![CosmosMsg::Bank(BankMsg::Send {
to_address: env.contract.address.to_string(),
amount: vec![Coin::new(
(new_job.reward).u128(),
config.fee_denom.clone(),
)],
})],
}))?,
funds: vec![],
},
);

msgs.push(
//withdraw all assets that are listed
WasmMsg::Execute {
contract_addr: account.account.to_string(),
msg: to_binary(&account::ExecuteMsg::WithdrawAssets(
WithdrawAssetsMsg {
asset_infos: new_job.assets_to_withdraw,
},
))?,
funds: vec![],
},
);

new_job_attrs.push(Attribute::new("action", "create_job"));
new_job_attrs.push(Attribute::new("job_id", new_job.id));
new_job_attrs.push(Attribute::new("job_owner", new_job.owner));
new_job_attrs.push(Attribute::new("job_name", new_job.name));
new_job_attrs.push(Attribute::new(
"job_status",
serde_json_wasm::to_string(&new_job.status)?,
));
new_job_attrs.push(Attribute::new(
"job_condition",
serde_json_wasm::to_string(&new_job.condition)?,
));
new_job_attrs.push(Attribute::new(
"job_msgs",
serde_json_wasm::to_string(&new_job.msgs)?,
));
new_job_attrs.push(Attribute::new("job_reward", new_job.reward));
new_job_attrs.push(Attribute::new("job_creation_fee", fee));
new_job_attrs.push(Attribute::new(
"job_last_updated_time",
new_job.last_update_time,
));
new_job_attrs.push(Attribute::new("sub_action", "recur_job"));
}
}
}

Expand Down
7 changes: 7 additions & 0 deletions contracts/warp-controller/src/execute/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub fn create_job(
config.resolver_address,
&resolver::QueryMsg::QueryValidateJobCreation(resolver::QueryValidateJobCreationMsg {
condition: data.condition.clone(),
terminate_condition: data.terminate_condition.clone(),
vars: data.vars.clone(),
msgs: data.msgs.clone(),
}),
Expand Down Expand Up @@ -67,6 +68,7 @@ pub fn create_job(
name: data.name,
status: JobStatus::Pending,
condition: data.condition.clone(),
terminate_condition: data.terminate_condition,
recurring: data.recurring,
requeue_on_evict: data.requeue_on_evict,
vars: data.vars,
Expand Down Expand Up @@ -157,6 +159,7 @@ pub fn delete_job(
name: job.name,
status: JobStatus::Cancelled,
condition: job.condition,
terminate_condition: job.terminate_condition,
msgs: job.msgs,
vars: job.vars,
recurring: job.recurring,
Expand Down Expand Up @@ -239,6 +242,7 @@ pub fn update_job(
labels: data.labels.unwrap_or(job.labels),
status: job.status,
condition: job.condition,
terminate_condition: job.terminate_condition,
msgs: job.msgs,
vars: job.vars,
recurring: job.recurring,
Expand Down Expand Up @@ -368,6 +372,7 @@ pub fn execute_job(
labels: job.labels,
status: JobStatus::Failed,
condition: job.condition,
terminate_condition: job.terminate_condition,
msgs: job.msgs,
vars,
recurring: job.recurring,
Expand Down Expand Up @@ -495,6 +500,7 @@ pub fn evict_job(
labels: job.labels,
status: JobStatus::Pending,
condition: job.condition,
terminate_condition: job.terminate_condition,
msgs: job.msgs,
vars: job.vars,
recurring: job.recurring,
Expand All @@ -517,6 +523,7 @@ pub fn evict_job(
labels: job.labels,
status: JobStatus::Evicted,
condition: job.condition,
terminate_condition: job.terminate_condition,
msgs: job.msgs,
vars: job.vars,
recurring: job.recurring,
Expand Down
19 changes: 15 additions & 4 deletions contracts/warp-resolver/src/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ pub fn query(deps: Deps, env: Env, msg: QueryMsg) -> StdResult<Binary> {
QueryMsg::QueryValidateJobCreation(data) => {
let _condition: Condition = serde_json_wasm::from_str(&data.condition)
.map_err(|e| StdError::generic_err(format!("Condition input invalid: {}", e)))?;
let terminate_condition_str =
data.terminate_condition.clone().unwrap_or("".to_string());
if !terminate_condition_str.is_empty() {
let _terminate_condition: Condition =
serde_json_wasm::from_str(&terminate_condition_str).map_err(|e| {
StdError::generic_err(format!("Terminate condition input invalid: {}", e))
})?;
}
let vars: Vec<Variable> = serde_json_wasm::from_str(&data.vars)
.map_err(|e| StdError::generic_err(format!("Vars input invalid: {}", e)))?;
let msgs: Vec<String> = serde_json_wasm::from_str(&data.msgs)
Expand All @@ -59,16 +67,19 @@ pub fn query(deps: Deps, env: Env, msg: QueryMsg) -> StdResult<Binary> {
));
}

if !(string_vars_in_vector(&vars, &data.condition)//stringified
&& string_vars_in_vector(&vars, &data.msgs))
//stringified
if !(string_vars_in_vector(&vars, &data.condition) //stringified
&& string_vars_in_vector(&vars, &terminate_condition_str) //stringified
&& string_vars_in_vector(&vars, &data.msgs))
{
return Err(StdError::generic_err(
ContractError::VariablesMissingFromVector {}.to_string(),
));
}

if !all_vector_vars_present(&vars, format!("{}{}", data.condition, data.msgs)) {
if !all_vector_vars_present(
&vars,
format!("{}{}{}", data.condition, terminate_condition_str, data.msgs),
) {
return Err(StdError::generic_err(
ContractError::ExcessVariablesInVector {}.to_string(),
));
Expand Down
Loading

0 comments on commit 09e266d

Please sign in to comment.