-
Notifications
You must be signed in to change notification settings - Fork 12
feat: Combinator Steps + Atomic #58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
012c3f9 to
9023027
Compare
9023027 to
7071c25
Compare
karim-agha
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
None of those are strong opinions, just discussion starters.
src/pipelines/step/composite/mod.rs
Outdated
| let mut composite = CompositeStep::<P, _>::new(TryAllMode); | ||
| composite.append_step(AlwaysOkStep); | ||
| composite.append_step(AlwaysOkStep); | ||
| composite.append_step(AlwaysOkStep); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest that we tweak the DevEx here a little bit and enable the following syntax:
let pipeline = Pipeline::default()
.with_prologue(All((Step1, Step2, Step3))
.step(Step4)
.with_epilogue(Any((Step5, Step6)));This is how this syntax is implemented for constructing pipelines:
Lines 226 to 269 in d52ed99
| pub trait IntoPipeline<P: Platform, Marker = ()> { | |
| #[track_caller] | |
| fn into_pipeline(self) -> Pipeline<P>; | |
| } | |
| impl<P: Platform, F: FnOnce(Pipeline<P>) -> Pipeline<P>> | |
| IntoPipeline<P, Variant<0>> for F | |
| { | |
| #[track_caller] | |
| fn into_pipeline(self) -> Pipeline<P> { | |
| self(Pipeline::<P>::default()) | |
| } | |
| } | |
| impl<P: Platform> IntoPipeline<P, Variant<0>> for Pipeline<P> { | |
| #[track_caller] | |
| fn into_pipeline(self) -> Pipeline<P> { | |
| self | |
| } | |
| } | |
| impl<P: Platform, S0: Step<P>> IntoPipeline<P, Variant<0>> for (S0,) { | |
| #[track_caller] | |
| fn into_pipeline(self) -> Pipeline<P> { | |
| Pipeline::default().with_step(self.0) | |
| } | |
| } | |
| impl<P: Platform, S0: Step<P>> IntoPipeline<P, Variant<1>> for S0 { | |
| #[track_caller] | |
| fn into_pipeline(self) -> Pipeline<P> { | |
| Pipeline::default().with_step(self) | |
| } | |
| } | |
| // Generate implementations for tuples of steps up to 32 elements | |
| #[cfg(not(feature = "long-pipelines-syntax"))] | |
| impl_into_pipeline_steps!(32); | |
| // Generate implementations for tuples of steps up to 512 elements. | |
| // This is opt-in through a compile-time feature flag and in practice | |
| // should never be needed, but it's here just in case. | |
| #[cfg(feature = "long-pipelines-syntax")] | |
| impl_into_pipeline_steps!(128); |
Not a blocker, but a very nice to have to preserve consistency with the overall syntax.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also had this in mind, but to keep things scoped what about not adding it directly and just relying on combinator implementation with macro?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added the trait CombinatorStep that takes steps: impl Into<Steps<P>>, that should allows for adding this syntax
| }; | ||
| } | ||
|
|
||
| combinator!(Atomic, and); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
combinator! is a private macro that implements CombinatorStep with a struct with a vec of step instances.
One nice thing is that because CombinatorStep has Step as a supertrait, this force implementation of Step for the combinator step.
| if ctx.deadline_reached() { | ||
| return ControlFlow::Break(initial); | ||
| } | ||
|
|
||
| match step.step(current, ctx.clone()).await { | ||
| ControlFlow::Ok(next) => current = next, | ||
| _ => return ControlFlow::Ok(initial), | ||
| } | ||
| } | ||
|
|
||
| if ctx.deadline_reached() { | ||
| ControlFlow::Break(initial) | ||
| } else { | ||
| ControlFlow::Ok(current) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Following previous discussion, I changed to return Ok(initial) on Fail/Break.
What about deadline reached?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the deadline is reached before all steps complete successfully then we don't have atomic result semantics and I would return the initial payload.
| if ctx.deadline_reached() { | ||
| return ControlFlow::Break(initial); | ||
| } | ||
|
|
||
| match step.step(current, ctx.clone()).await { | ||
| ControlFlow::Ok(next) => current = next, | ||
| _ => return ControlFlow::Ok(initial), | ||
| } | ||
| } | ||
|
|
||
| if ctx.deadline_reached() { | ||
| ControlFlow::Break(initial) | ||
| } else { | ||
| ControlFlow::Ok(current) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the deadline is reached before all steps complete successfully then we don't have atomic result semantics and I would return the initial payload.
The idea is to be able to defines combinator step: a step that is combined out of multiple underlying steps.
I introduce a
CombinatorStepthat is a subtrait ofStep, that takes aimpl Into<Vec<Arc<StepInstance<P>>>>to allows for different syntaxic sugar later :)Then a
combinatormacro is defined to encapsulate base logic of creating a combinator step by:CombinatorStepappendmethod that push the given step in the steps vec, but with the option to provide a different name (likeandwith atomic,orwith any etc..)steps()function to get a slice of the step instancesThe generated struct still needs to implement manually
Step. This is enforced at compile time becauseCombinatorStepimpl requiresStepimpl.Other points:
Syncbounds on some futures, that seemed unnecessaryIncluded as well a combinator step implementation for Atomic (related work was done in #59 but moved here)
Towards #31