@@ -200,12 +200,23 @@ func NewAndStart(log *zap.Logger, config Config, beeRemoteClient *beeremote.Clie
200200
201201 allEntriesFound := 0
202202 for priority := range priorityLevels {
203- entriesFound , err := m .initScheduler (SubmissionIdPriorityRange (priority ))
203+ lastSubmissionId , nextRescheduledTime , entriesFound , err := m .initScheduler (SubmissionIdPriorityRange (priority ))
204204 if err != nil {
205205 m .log .Error ("failed to initialize scheduler" , zap .Error (err ))
206206 break
207207 }
208+
208209 allEntriesFound += entriesFound
210+ if ! nextRescheduledTime .IsZero () {
211+ m .scheduler .SetNextRescheduledTime (nextRescheduledTime , priority )
212+ }
213+ if lastSubmissionId != nil {
214+ if nextSubmissionId , _ , err := IncrementSubmissionId (* lastSubmissionId ); err != nil {
215+ m .log .Error ("failed to initialize scheduler" , zap .Error (err ))
216+ } else {
217+ m .scheduler .SetNextSubmissionId (nextSubmissionId , priority )
218+ }
219+ }
209220 }
210221 if allEntriesFound > 0 {
211222 m .log .Info ("discovered work requests from previous run" , zap .Int ("requests" , allEntriesFound ))
@@ -285,7 +296,7 @@ func (m *Manager) manage(deferredFuncs []func() error) {
285296 workJournal : m .workJournal ,
286297 jobStore : m .jobStore ,
287298 beeRemoteClient : m .beeRemoteClient ,
288- rescheduleWork : m .scheduler .AddWorkToken ,
299+ rescheduleWork : m .scheduler .RescheduleWork ,
289300 }
290301 m .workerWG .Add (1 )
291302 go w .run (m .workerCtx , m .workerWG )
@@ -299,17 +310,33 @@ func (m *Manager) manage(deferredFuncs []func() error) {
299310 case <- m .mgrCtx .Done ():
300311 return
301312 case allowedTokens := <- m .scheduler .tokensReleased :
313+ currentTime := time .Now ()
302314 for priority , ok := nextPriority (); ok ; priority , ok = nextPriority () {
303315 tokensDistributed := allowedTokens [priority ]
304316 if tokensDistributed == 0 {
305317 continue
306318 }
307319
308320 start , stop := SubmissionIdPriorityRange (priority )
309- err = m .pullInWork (start , stop , allowedTokens [priority ])
310- if err != nil {
321+ nextSubmissionId := m .scheduler .GetNextSubmissionId (priority )
322+
323+ // Add rescheduled work to the activeWork map
324+ nextRescheduledTime := m .scheduler .GetNextRescheduledTime (priority )
325+ if currentTime .After (nextRescheduledTime ) {
326+ if _ , nextRescheduledTime , err = m .pullInWork (start , nextSubmissionId , & allowedTokens [priority ]); err != nil {
327+ m .log .Error ("failed to pull in new work" , zap .Error (err ))
328+ break
329+ } else {
330+ m .scheduler .SetNextRescheduledTime (nextRescheduledTime , priority )
331+ }
332+ }
333+
334+ // Add new work to the activeWork map
335+ if next , _ , err := m .pullInWork (nextSubmissionId , stop , & allowedTokens [priority ]); err != nil {
311336 m .log .Error ("failed to pull in new work" , zap .Error (err ))
312337 break
338+ } else if next != nil {
339+ m .scheduler .SetNextSubmissionId (* next , priority )
313340 }
314341 }
315342 case completion := <- completedWork :
@@ -335,9 +362,13 @@ func (m *Manager) manage(deferredFuncs []func() error) {
335362}
336363
337364// pullInWork moves ready work from the priority range to the activeWork map.
338- func (m * Manager ) pullInWork (start string , stop string , availableTokens int ) error {
339- if availableTokens <= 0 {
340- return nil
365+ func (m * Manager ) pullInWork (start string , stop string , availableTokens * int ) (lastSubmissionId * string , nextExecuteAfter time.Time , err error ) {
366+ if availableTokens == nil {
367+ err = fmt .Errorf ("availableTokens was unexpectedly nil: this is a bug" )
368+ return
369+ }
370+ if * availableTokens <= 0 {
371+ return
341372 }
342373
343374 m .activeWorkMu .Lock ()
@@ -348,26 +379,29 @@ func (m *Manager) pullInWork(start string, stop string, availableTokens int) err
348379 kvstore .WithStopKey (stop ),
349380 )
350381 if err != nil {
351- return fmt .Errorf ("unable to get work journal entries: %w" , err )
382+ err = fmt .Errorf ("unable to get work journal entries: %w" , err )
383+ return
352384 }
353385 defer cleanupNext ()
354386
355387 item , err := nextItem ()
356388 if err != nil {
357- return fmt .Errorf ("unable to get work journal entry: %w" , err )
389+ err = fmt .Errorf ("unable to get work journal entry: %w" , err )
390+ return
358391 }
359392 if item == nil {
360- return nil
393+ return
361394 }
362395
396+ lastSubmissionId = new (string )
363397 currentTime := time .Now ()
364- for item != nil && availableTokens > 0 {
365- submissionID : = item .Key
398+ for item != nil && * availableTokens > 0 {
399+ * lastSubmissionId = item .Key
366400 entry := item .Entry .Value
367401
368402 if currentTime .After (entry .ExecuteAfter ) {
369403 workId := workIdentifier {
370- submissionID : submissionID ,
404+ submissionID : * lastSubmissionId ,
371405 jobID : entry .WorkRequest .JobId ,
372406 workRequestID : entry .WorkRequest .RequestId ,
373407 }
@@ -380,11 +414,15 @@ func (m *Manager) pullInWork(start string, stop string, availableTokens int) err
380414 activeWork := workAssignment {ctx : workCtx , workIdentifier : workId }
381415 m .activeWork [activeWork .workIdentifier ] = workContext {ctx : workCtx , cancel : workCtxCancel }
382416 m .activeWorkQueue <- activeWork
383- availableTokens -= 1
384- m .scheduler .RemoveWorkToken (submissionID )
417+ * availableTokens -= 1
418+ m .scheduler .RemoveWorkToken (* lastSubmissionId )
385419 priority := priorityIdMap [entry .WorkRequest .GetPriority ()]
386420 beeSyncActiveQueue .Add (priority , 1 )
387421 }
422+ } else {
423+ if nextExecuteAfter .IsZero () || entry .ExecuteAfter .Before (nextExecuteAfter ) {
424+ nextExecuteAfter = entry .ExecuteAfter
425+ }
388426 }
389427
390428 item , err = nextItem ()
@@ -393,41 +431,49 @@ func (m *Manager) pullInWork(start string, stop string, availableTokens int) err
393431 }
394432 }
395433
396- return err
434+ return
397435}
398436
399- // initScheduler adds tokens for unfinished work requests so they can be handled when the Sync
400- // node starts.
401- func (m * Manager ) initScheduler (start string , stop string ) (int , error ) {
437+ // initScheduler adds tokens for unfinished work requests so they can be handled when the Sync node
438+ // starts. It returns the last entry's submissionId, entries found and an error if there was one .
439+ func (m * Manager ) initScheduler (start string , stop string ) (submissionId * string , nextExecuteAfter time. Time , entriesFound int , err error ) {
402440 nextItem , cleanupNext , err := m .workJournal .GetEntries (
403441 kvstore .WithStartingKey (start ),
404442 kvstore .WithStopKey (stop ),
405443 )
406444 if err != nil {
407- return 0 , fmt .Errorf ("unable to get work journal entries: %w" , err )
445+ err = fmt .Errorf ("unable to get work journal entries: %w" , err )
446+ return
408447 }
409448 defer cleanupNext ()
410449
411450 submission , err := nextItem ()
412451 if err != nil {
413- return 0 , fmt .Errorf ("unable to get work journal entry: %w" , err )
452+ err = fmt .Errorf ("unable to get work journal entry: %w" , err )
453+ return
414454 }
415455 if submission == nil {
416- return 0 , nil
456+ return
417457 }
418458
419- entriesFound := 0
459+ submissionId = new ( string )
420460 for submission != nil {
421- m .scheduler .AddWorkToken (submission .Key )
461+ * submissionId = submission .Key
462+ entry := submission .Entry .Value
463+ m .scheduler .AddWorkToken (* submissionId )
422464 entriesFound ++
423465
466+ if nextExecuteAfter .IsZero () || entry .ExecuteAfter .Before (nextExecuteAfter ) {
467+ nextExecuteAfter = entry .ExecuteAfter
468+ }
469+
424470 submission , err = nextItem ()
425471 if err != nil {
426472 break
427473 }
428474 }
429475
430- return entriesFound , err
476+ return
431477}
432478
433479func (m * Manager ) SubmitWorkRequest (wr * flex.WorkRequest ) (* flex.Work , error ) {
0 commit comments