From ec9a4a7c87ac24ffbaf6aff29da9e63894dbffe5 Mon Sep 17 00:00:00 2001 From: Matthias Larsen Date: Wed, 21 Apr 2021 14:45:10 +0200 Subject: [PATCH 1/5] feat: save dependantJobs as array of stepIds rather than job instances Signed-off-by: Matthias Larsen --- src/Models/Workflow.php | 7 +++++-- src/WorkflowDefinition.php | 6 +++--- src/WorkflowStep.php | 3 ++- tests/Helpers.php | 7 +++++++ tests/WorkflowDefinitionTest.php | 2 +- tests/WorkflowTest.php | 21 +++++++++++++-------- 6 files changed, 31 insertions(+), 15 deletions(-) diff --git a/src/Models/Workflow.php b/src/Models/Workflow.php index dd30f9d..d7c5613 100644 --- a/src/Models/Workflow.php +++ b/src/Models/Workflow.php @@ -58,7 +58,7 @@ public function addJobs(array $jobs): void 'job' => serialize($job['job']), 'name' => $job['name'], 'uuid' => $job['job']->stepId, - 'edges' => collect($job['job']->dependantJobs)->pluck('stepId')->all() + 'edges' => $job['job']->dependantJobs ]) ->pipe(function ($jobs) { $this->jobs()->createMany($jobs); @@ -79,7 +79,10 @@ public function onStepFinished($job): void return; } - collect($job->dependantJobs) + $jobs = WorkflowJob::where('uuid', $job->dependantJobs) + ->get('job') + ->pluck('job') + ->map(fn ($job) => unserialize($job)) ->filter(fn ($job) => $this->canJobRun($job)) ->each(function ($job) { $this->dispatchJob($job); diff --git a/src/WorkflowDefinition.php b/src/WorkflowDefinition.php index 8e776d2..27883d8 100644 --- a/src/WorkflowDefinition.php +++ b/src/WorkflowDefinition.php @@ -59,7 +59,9 @@ public function addJob( } $this->jobs[$id] = [ - 'job' => $job, + 'job' => $job + ->withJobId($id) + ->withStepId(Str::orderedUuid()), 'name' => $name ?: get_class($job), ]; @@ -121,8 +123,6 @@ public function build(?Closure $beforeCreate = null): array foreach ($this->jobs as $id => $job) { $job['job'] ->withWorkflowId($workflow->id) - ->withStepId(Str::orderedUuid()) - ->withJobId($id) ->withDependantJobs($this->graph->getDependantJobs($id)) ->withDependencies($this->graph->getDependencies($id)); } diff --git a/src/WorkflowStep.php b/src/WorkflowStep.php index f56b26f..82d35a7 100644 --- a/src/WorkflowStep.php +++ b/src/WorkflowStep.php @@ -2,6 +2,7 @@ namespace Sassnowski\Venture; +use Illuminate\Support\Arr; use Ramsey\Uuid\UuidInterface; use Sassnowski\Venture\Models\Workflow; use Sassnowski\Venture\Models\WorkflowJob; @@ -32,7 +33,7 @@ public function workflow(): ?Workflow public function withDependantJobs(array $jobs): self { - $this->dependantJobs = $jobs; + $this->dependantJobs = Arr::pluck($jobs, 'stepId'); return $this; } diff --git a/tests/Helpers.php b/tests/Helpers.php index 0fc2e2f..14c05e0 100644 --- a/tests/Helpers.php +++ b/tests/Helpers.php @@ -25,3 +25,10 @@ function createWorkflowJob(Workflow $workflow, array $attributes = []): Workflow 'finished_at' => null, ], $attributes)); } + +function wrapJobsForWorkflow($jobs) { + return collect($jobs)->map(fn ($job) => [ + 'job' => $job->withJobId($job->jobId ?? get_class($job)), + 'name' => get_class($job) + ])->all(); +} diff --git a/tests/WorkflowDefinitionTest.php b/tests/WorkflowDefinitionTest.php index 20b7c3b..2f078d7 100644 --- a/tests/WorkflowDefinitionTest.php +++ b/tests/WorkflowDefinitionTest.php @@ -121,7 +121,7 @@ ->addJob($testJob2, dependencies: [TestJob1::class]) ->build(); - assertEquals([$testJob2], $testJob1->dependantJobs); + assertEquals([$testJob2->stepId], $testJob1->dependantJobs); assertEquals([], $testJob2->dependantJobs); }); diff --git a/tests/WorkflowTest.php b/tests/WorkflowTest.php index 3bb645a..ad54328 100644 --- a/tests/WorkflowTest.php +++ b/tests/WorkflowTest.php @@ -75,14 +75,16 @@ it('runs a finished job\'s dependency if no other dependencies exist', function () { Bus::fake(); - $job1 = new TestJob1(); - $job2 = new TestJob2(); + $job1 = (new TestJob1())->withStepId(Str::orderedUuid()); + $job2 = (new TestJob2())->withStepId(Str::orderedUuid()); $job1->withDependantJobs([$job2]); $job2->withDependencies([TestJob1::class]); $workflow = createWorkflow([ 'job_count' => 2, ]); + $workflow->addJobs(wrapJobsForWorkflow([$job1, $job2])); + $workflow->onStepFinished($job1); Bus::assertDispatched(TestJob2::class); @@ -109,16 +111,19 @@ it('runs a job if all of its dependencies have finished', function () { Bus::fake(); - $job1 = new TestJob1(); - $job2 = new TestJob2(); - $job3 = (new TestJob3())->withJobId('::job-3-id::'); - $job1->withDependantJobs([$job2]); - $job2->withDependencies([TestJob1::class, '::job-3-id::']); - $job3->withDependantJobs([$job2]); $workflow = createWorkflow([ 'job_count' => 3, ]); + $job1 = (new TestJob1())->withStepId(Str::orderedUuid()); + $job2 = (new TestJob2())->withStepId(Str::orderedUuid()); + $job3 = (new TestJob3())->withJobId('::job-3-id::')->withStepId(Str::orderedUuid()); + $job1->withDependantJobs([$job2]); + $job2->withDependencies([TestJob1::class, '::job-3-id::']); + $job3->withDependantJobs([$job2]); + + $workflow->addJobs(wrapJobsForWorkflow([$job1, $job2, $job3])); + $workflow->onStepFinished($job1); $workflow->onStepFinished($job3); From 5ee021f420716549fc1a64ecd4365bee9413bf95 Mon Sep 17 00:00:00 2001 From: Matthias Larsen Date: Wed, 21 Apr 2021 16:50:07 +0200 Subject: [PATCH 2/5] fix: use whereIn Signed-off-by: Matthias Larsen --- src/Models/Workflow.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Models/Workflow.php b/src/Models/Workflow.php index d7c5613..6a6caa5 100644 --- a/src/Models/Workflow.php +++ b/src/Models/Workflow.php @@ -79,7 +79,7 @@ public function onStepFinished($job): void return; } - $jobs = WorkflowJob::where('uuid', $job->dependantJobs) + WorkflowJob::whereIn('uuid', $job->dependantJobs) ->get('job') ->pluck('job') ->map(fn ($job) => unserialize($job)) From 714eb31a50a97a6bb4ab6c234490a6a5869b7a88 Mon Sep 17 00:00:00 2001 From: Matthias Larsen Date: Wed, 21 Apr 2021 16:50:42 +0200 Subject: [PATCH 3/5] fix: update jobId on jobs when nesting workflows Signed-off-by: Matthias Larsen --- src/WorkflowDefinition.php | 1 + 1 file changed, 1 insertion(+) diff --git a/src/WorkflowDefinition.php b/src/WorkflowDefinition.php index 27883d8..9579bd3 100644 --- a/src/WorkflowDefinition.php +++ b/src/WorkflowDefinition.php @@ -82,6 +82,7 @@ public function addWorkflow(AbstractWorkflow $workflow, array $dependencies = [] $this->graph->connectGraph($definition->graph, $workflowId, $dependencies); foreach ($definition->jobs as $jobId => $job) { + $job['job'] = $job['job']->withJobId($workflowId . '.' . $jobId); $this->jobs[$workflowId . '.' . $jobId] = $job; } From c5cb76d58117a4125c7b2145cdfaa5c55fbb879a Mon Sep 17 00:00:00 2001 From: Matthias Larsen Date: Mon, 26 Apr 2021 10:51:01 +0200 Subject: [PATCH 4/5] test: ensure nested workflow job ids gets updated for proper referencing Signed-off-by: Matthias Larsen --- tests/Stubs/NestedWorkflow.php | 22 ++++++++++++++++ tests/Stubs/WorkflowWithWorkflow.php | 22 ++++++++++++++++ tests/WorkflowDefinitionTest.php | 17 +++++++----- tests/WorkflowTest.php | 39 ++++++++++++++++++++++++++++ 4 files changed, 93 insertions(+), 7 deletions(-) create mode 100644 tests/Stubs/NestedWorkflow.php create mode 100644 tests/Stubs/WorkflowWithWorkflow.php diff --git a/tests/Stubs/NestedWorkflow.php b/tests/Stubs/NestedWorkflow.php new file mode 100644 index 0000000..fa33c69 --- /dev/null +++ b/tests/Stubs/NestedWorkflow.php @@ -0,0 +1,22 @@ +job ??= new TestJob1(); + } + + public function definition(): WorkflowDefinition + { + return WorkflowFacade::define('::name::')->addJob($this->job); + } +} + diff --git a/tests/Stubs/WorkflowWithWorkflow.php b/tests/Stubs/WorkflowWithWorkflow.php new file mode 100644 index 0000000..50efa09 --- /dev/null +++ b/tests/Stubs/WorkflowWithWorkflow.php @@ -0,0 +1,22 @@ +addWorkflow($this->workflow); + } +} + diff --git a/tests/WorkflowDefinitionTest.php b/tests/WorkflowDefinitionTest.php index 2f078d7..bd43a13 100644 --- a/tests/WorkflowDefinitionTest.php +++ b/tests/WorkflowDefinitionTest.php @@ -1,6 +1,7 @@ hasJobWithDependencies(TestJob2::class, [TestJob1::class])); }); +it('adding another workflow updates the job id on nested job instances', function () { + $definition = (new WorkflowDefinition()) + ->addJob(new TestJob1()) + ->addJob(new TestJob2(), [TestJob1::class]) + ->addWorkflow(new NestedWorkflow($job = new TestJob1())); + + assertEquals(NestedWorkflow::class . '.' . TestJob1::class, $job->jobId); +}); + it('throws an exception when trying to add a job without the ShouldQueue interface', function () { (new WorkflowDefinition())->addJob(new NonQueueableJob()); })->expectException(NonQueueableWorkflowStepException::class); @@ -418,10 +428,3 @@ public function __invoke() } } -class NestedWorkflow extends AbstractWorkflow -{ - public function definition(): WorkflowDefinition - { - return WorkflowFacade::define('::name::')->addJob(new TestJob1()); - } -} diff --git a/tests/WorkflowTest.php b/tests/WorkflowTest.php index ad54328..1796ddd 100644 --- a/tests/WorkflowTest.php +++ b/tests/WorkflowTest.php @@ -1,12 +1,14 @@ refresh()->jobs_processed); }); +it('it stores finished job id, defaulting to class name', function () { + $job1 = new TestJob1(); + $workflow = createWorkflow([ + 'job_count' => 1, + 'jobs_processed' => 0, + ]); + + $workflow->onStepFinished($job1); + + assertEquals([$job1::class], $workflow->refresh()->finished_jobs); +}); + +it('it stores finished job id', function () { + $job1 = (new TestJob1())->withJobId('::job-id::'); + $workflow = createWorkflow([ + 'job_count' => 1, + 'jobs_processed' => 0, + ]); + + $workflow->onStepFinished($job1); + + assertEquals(['::job-id::'], $workflow->refresh()->finished_jobs); +}); + +it('it stores finished job id for nested workflow jobs', function () { + $workflow = new WorkflowWithWorkflow(new NestedWorkflow( + $job = new TestJob1() + )); + $definition = $workflow->definition(); + [$model, $initial] = $definition->build(); + + $model->onStepFinished($job); + + assertEquals(NestedWorkflow::class . '.' . TestJob1::class, $job->jobId); + assertEquals([NestedWorkflow::class . '.' . TestJob1::class], $model->refresh()->finished_jobs); +}); + it('marks itself as finished if the all jobs have been processed', function () { Carbon::setTestNow(now()); From 60d140e9db46c04cb53696ffb244482d1e68daa2 Mon Sep 17 00:00:00 2001 From: Matthias Larsen Date: Tue, 11 May 2021 09:13:39 +0200 Subject: [PATCH 5/5] feedback Signed-off-by: Matthias Larsen --- tests/WorkflowTest.php | 31 +++++++++++++------------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/tests/WorkflowTest.php b/tests/WorkflowTest.php index 1796ddd..37cfd6d 100644 --- a/tests/WorkflowTest.php +++ b/tests/WorkflowTest.php @@ -38,29 +38,25 @@ assertEquals(1, $workflow->refresh()->jobs_processed); }); -it('it stores finished job id, defaulting to class name', function () { - $job1 = new TestJob1(); +it('stores a finished job\'s id', function ($job, string $expectedJobId) { $workflow = createWorkflow([ 'job_count' => 1, 'jobs_processed' => 0, ]); - $workflow->onStepFinished($job1); - - assertEquals([$job1::class], $workflow->refresh()->finished_jobs); -}); - -it('it stores finished job id', function () { - $job1 = (new TestJob1())->withJobId('::job-id::'); - $workflow = createWorkflow([ - 'job_count' => 1, - 'jobs_processed' => 0, - ]); - - $workflow->onStepFinished($job1); + $workflow->onStepFinished($job); - assertEquals(['::job-id::'], $workflow->refresh()->finished_jobs); -}); + assertEquals([$expectedJobId], $workflow->refresh()->finished_jobs); +})->with([ + 'no job id should default to class name' => [ + new TestJob1(), + TestJob1::class, + ], + 'use existing job id' => [ + (new TestJob1())->withJobId('::job-id::'), + '::job-id::', + ] +]); it('it stores finished job id for nested workflow jobs', function () { $workflow = new WorkflowWithWorkflow(new NestedWorkflow( @@ -71,7 +67,6 @@ $model->onStepFinished($job); - assertEquals(NestedWorkflow::class . '.' . TestJob1::class, $job->jobId); assertEquals([NestedWorkflow::class . '.' . TestJob1::class], $model->refresh()->finished_jobs); });