Skip to content

Commit 3d83a08

Browse files
committedJul 12, 2022
Fix empty toplevel batch bug and add step batch description
1 parent e21cd00 commit 3d83a08

File tree

4 files changed

+38
-3
lines changed

4 files changed

+38
-3
lines changed
 

‎lib/simplekiq.rb

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
require "simplekiq/orchestration"
88
require "simplekiq/orchestration_job"
99
require "simplekiq/batching_job"
10+
require "simplekiq/batch_tracker_job"
1011

1112
module Simplekiq
1213
class << self

‎lib/simplekiq/batch_tracker_job.rb

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# frozen_string_literal: true
2+
3+
# This job serves two purposes:
4+
# * TODO: It provides a convenient way to track top-level orchestration batches
5+
# * The top-level orchestration batch would otherwise be empty (aside from
6+
# child-batches) and all sidekiq-pro batches must have at least 1 job
7+
8+
module Simplekiq
9+
class BatchTrackerJob
10+
include Sidekiq::Worker
11+
12+
def perform(klass_name, bid, args)
13+
# In the future, this will likely surface the toplevel batch to a callback method on the
14+
# orchestration job. We're holding off on this until we have time to design a comprehensive
15+
# plan for providing simplekiq-wide instrumentation, ideally while being backwards compatible
16+
# for in-flight orchestrations.
17+
18+
# For now, it's just satisfying the all-batches-must-have-jobs limitation in sidekiq-pro
19+
# described at the head of the file.
20+
end
21+
end
22+
end

‎lib/simplekiq/orchestration_executor.rb

+7-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ def self.execute(args:, job:, workflow:)
1212
orchestration_batch.description = "#{job.class.name} Simplekiq orchestration"
1313
Simplekiq.auto_define_callbacks(orchestration_batch, args: args, job: job)
1414

15+
orchestration_batch.jobs do
16+
Simplekiq::BatchTrackerJob.perform_async(job.class.name, orchestration_batch.bid, args)
17+
end
18+
1519
new.run_step(orchestration_batch, workflow, 0)
1620
end
1721

@@ -20,12 +24,14 @@ def run_step(orchestration_batch, workflow, step)
2024
# This will never be empty because Orchestration#serialized_workflow skips inserting
2125
# a new step for in_parallel if there were no inner jobs specified.
2226

27+
next_step = step + 1
2328
orchestration_batch.jobs do
2429
step_batch = Sidekiq::Batch.new
30+
step_batch.description = "Simplekiq orchestrated step #{next_step}"
2531
step_batch.on(
2632
"success",
2733
self.class,
28-
{"orchestration_workflow" => workflow, "step" => step + 1}
34+
{"orchestration_workflow" => workflow, "step" => next_step}
2935
)
3036

3137
step_batch.jobs do

‎spec/orchestration_executor_spec.rb

+8-2
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,15 @@ def execute
2323
described_class.execute(args: ["some", "args"], job: job, workflow: workflow)
2424
end
2525

26-
it "kicks off the first step with a new batch" do
27-
batch_double = instance_double(Sidekiq::Batch)
26+
it "kicks off the first step with a new batch with the empty tracking batch inside it" do
27+
batch_double = instance_double(Sidekiq::Batch, bid: 42)
2828
allow(Sidekiq::Batch).to receive(:new).and_return(batch_double)
2929
expect(batch_double).to receive(:description=).with("FakeOrchestration Simplekiq orchestration")
3030
expect(batch_double).to receive(:on).with("success", FakeOrchestration, "args" => ["some", "args"])
31+
expect(batch_double).to receive(:jobs) do |&block|
32+
expect(Simplekiq::BatchTrackerJob).to receive(:perform_async)
33+
block.call
34+
end
3135

3236
instance = instance_double(Simplekiq::OrchestrationExecutor)
3337
allow(Simplekiq::OrchestrationExecutor).to receive(:new).and_return(instance)
@@ -69,6 +73,7 @@ def execute
6973
block.call
7074
batch_stack.shift
7175
end
76+
7277
expect(step_batch).to receive(:jobs) do |&block|
7378
expect(batch_stack).to eq ["orchestration"]
7479
batch_stack.push("step")
@@ -86,6 +91,7 @@ def execute
8691
"orchestration_workflow" => workflow,
8792
"step" => 1
8893
})
94+
expect(step_batch).to receive(:description=).with("Simplekiq orchestrated step 1")
8995

9096
instance.run_step(orchestration_batch, workflow, 0)
9197
end

0 commit comments

Comments
 (0)
Please sign in to comment.