5
5
// Copyright 2023 Oxide Computer Company
6
6
7
7
use std:: {
8
- borrow:: Cow ,
9
- fmt,
10
- ops:: ControlFlow ,
11
- pin:: Pin ,
12
- sync:: {
13
- atomic:: { AtomicUsize , Ordering } ,
14
- Mutex ,
15
- } ,
16
- task:: Poll ,
8
+ borrow:: Cow , fmt, ops:: ControlFlow , pin:: Pin , sync:: Mutex , task:: Poll ,
17
9
} ;
18
10
19
11
use cancel_safe_futures:: coop_cancel;
@@ -184,14 +176,12 @@ impl<'a, S: StepSpec + 'a> UpdateEngine<'a, S> {
184
176
async fn execute_impl (
185
177
mut self ,
186
178
) -> Result < CompletionContext < S > , ExecutionError < S > > {
187
- // TODO: this absolutely does not need to be an atomic! However it is
188
- // currently so because of a bug in rustc, fixed in Rust 1.70. Fix this
189
- // once omicron is on Rust 1.70.
190
- //
191
- // https://github.com/rust-lang/rust/pull/107844
192
- let event_index = AtomicUsize :: new ( 0 ) ;
193
- let next_event_index = || event_index. fetch_add ( 1 , Ordering :: SeqCst ) ;
194
- let exec_cx = ExecutionContext :: new (
179
+ let mut event_index = 0 ;
180
+ let next_event_index = || {
181
+ event_index += 1 ;
182
+ event_index - 1
183
+ } ;
184
+ let mut exec_cx = ExecutionContext :: new (
195
185
self . execution_id ,
196
186
next_event_index,
197
187
self . sender . clone ( ) ,
@@ -243,7 +233,7 @@ impl<'a, S: StepSpec + 'a> UpdateEngine<'a, S> {
243
233
self . sender . send ( Event :: Step ( StepEvent {
244
234
spec : S :: schema_name ( ) ,
245
235
execution_id : self . execution_id ,
246
- event_index : next_event_index ( ) ,
236
+ event_index : ( exec_cx . next_event_index ) ( ) ,
247
237
total_elapsed : exec_cx. total_start . elapsed ( ) ,
248
238
kind : StepEventKind :: NoStepsDefined ,
249
239
} ) ) . await ?;
@@ -264,7 +254,7 @@ impl<'a, S: StepSpec + 'a> UpdateEngine<'a, S> {
264
254
let event = Event :: Step ( StepEvent {
265
255
spec : S :: schema_name ( ) ,
266
256
execution_id : self . execution_id ,
267
- event_index : next_event_index ( ) ,
257
+ event_index : ( exec_cx . next_event_index ) ( ) ,
268
258
total_elapsed : exec_cx. total_start . elapsed ( ) ,
269
259
kind : StepEventKind :: ExecutionStarted {
270
260
steps : step_infos,
@@ -771,7 +761,7 @@ struct StepExec<'a, S: StepSpec> {
771
761
}
772
762
773
763
impl < ' a , S : StepSpec > StepExec < ' a , S > {
774
- async fn execute < F : Fn ( ) -> usize > (
764
+ async fn execute < F : FnMut ( ) -> usize > (
775
765
self ,
776
766
log : & slog:: Logger ,
777
767
step_exec_cx : StepExecutionContext < S , F > ,
@@ -884,12 +874,12 @@ impl<S: StepSpec, F> ExecutionContext<S, F> {
884
874
}
885
875
886
876
fn create (
887
- & self ,
877
+ & mut self ,
888
878
step_info : StepInfoWithMetadata < S > ,
889
- ) -> StepExecutionContext < S , & F > {
879
+ ) -> StepExecutionContext < S , & mut F > {
890
880
StepExecutionContext {
891
881
execution_id : self . execution_id ,
892
- next_event_index : DebugIgnore ( & self . next_event_index . 0 ) ,
882
+ next_event_index : DebugIgnore ( & mut self . next_event_index . 0 ) ,
893
883
total_start : self . total_start ,
894
884
step_info,
895
885
sender : self . sender . clone ( ) ,
@@ -941,7 +931,7 @@ struct StepProgressReporter<S: StepSpec, F> {
941
931
sender : mpsc:: Sender < Event < S > > ,
942
932
}
943
933
944
- impl < S : StepSpec , F : Fn ( ) -> usize > StepProgressReporter < S , F > {
934
+ impl < S : StepSpec , F : FnMut ( ) -> usize > StepProgressReporter < S , F > {
945
935
fn new ( step_exec_cx : StepExecutionContext < S , F > ) -> Self {
946
936
let step_start = Instant :: now ( ) ;
947
937
Self {
@@ -1069,7 +1059,7 @@ impl<S: StepSpec, F: Fn() -> usize> StepProgressReporter<S, F> {
1069
1059
}
1070
1060
}
1071
1061
1072
- async fn handle_abort ( self , message : String ) -> ExecutionError < S > {
1062
+ async fn handle_abort ( mut self , message : String ) -> ExecutionError < S > {
1073
1063
// Send the abort message over the channel.
1074
1064
//
1075
1065
// The only way this can fail is if the event receiver is closed or
@@ -1104,7 +1094,7 @@ impl<S: StepSpec, F: Fn() -> usize> StepProgressReporter<S, F> {
1104
1094
}
1105
1095
1106
1096
async fn next_step (
1107
- self ,
1097
+ mut self ,
1108
1098
step_res : Result < StepOutcome < S > , S :: Error > ,
1109
1099
next_step_info : & StepInfoWithMetadata < S > ,
1110
1100
) -> Result < ( ) , ExecutionError < S > > {
@@ -1144,7 +1134,7 @@ impl<S: StepSpec, F: Fn() -> usize> StepProgressReporter<S, F> {
1144
1134
}
1145
1135
1146
1136
async fn last_step (
1147
- self ,
1137
+ mut self ,
1148
1138
step_res : Result < StepOutcome < S > , S :: Error > ,
1149
1139
) -> Result < ( ) , ExecutionError < S > > {
1150
1140
match step_res {
@@ -1182,7 +1172,7 @@ impl<S: StepSpec, F: Fn() -> usize> StepProgressReporter<S, F> {
1182
1172
}
1183
1173
1184
1174
async fn send_error (
1185
- self ,
1175
+ mut self ,
1186
1176
error : & S :: Error ,
1187
1177
) -> Result < ( ) , mpsc:: error:: SendError < Event < S > > > {
1188
1178
// Stringify `error` into a message + list causes; this is written the
0 commit comments