Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: save dependantJobs as array of stepIds rather than job instances #30

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/Models/Workflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public function addJobs(array $jobs): void
'job' => serialize($job['job']),
'name' => $job['name'],
'uuid' => $job['job']->stepId,
'edges' => collect($job['job']->dependantJobs)->pluck('stepId')->all()
'edges' => $job['job']->dependantJobs
])
->pipe(function ($jobs) {
$this->jobs()->createMany($jobs);
Expand All @@ -79,7 +79,10 @@ public function onStepFinished($job): void
return;
}

collect($job->dependantJobs)
WorkflowJob::whereIn('uuid', $job->dependantJobs)
->get('job')
->pluck('job')
->map(fn ($job) => unserialize($job))
->filter(fn ($job) => $this->canJobRun($job))
->each(function ($job) {
$this->dispatchJob($job);
Expand Down
7 changes: 4 additions & 3 deletions src/WorkflowDefinition.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ public function addJob(
}

$this->jobs[$id] = [
'job' => $job,
'job' => $job
->withJobId($id)
->withStepId(Str::orderedUuid()),
'name' => $name ?: get_class($job),
];

Expand All @@ -80,6 +82,7 @@ public function addWorkflow(AbstractWorkflow $workflow, array $dependencies = []
$this->graph->connectGraph($definition->graph, $workflowId, $dependencies);

foreach ($definition->jobs as $jobId => $job) {
$job['job'] = $job['job']->withJobId($workflowId . '.' . $jobId);
$this->jobs[$workflowId . '.' . $jobId] = $job;
}

Expand Down Expand Up @@ -121,8 +124,6 @@ public function build(?Closure $beforeCreate = null): array
foreach ($this->jobs as $id => $job) {
$job['job']
->withWorkflowId($workflow->id)
->withStepId(Str::orderedUuid())
->withJobId($id)
->withDependantJobs($this->graph->getDependantJobs($id))
->withDependencies($this->graph->getDependencies($id));
}
Expand Down
3 changes: 2 additions & 1 deletion src/WorkflowStep.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Sassnowski\Venture;

use Illuminate\Support\Arr;
use Ramsey\Uuid\UuidInterface;
use Sassnowski\Venture\Models\Workflow;
use Sassnowski\Venture\Models\WorkflowJob;
Expand Down Expand Up @@ -32,7 +33,7 @@ public function workflow(): ?Workflow

public function withDependantJobs(array $jobs): self
{
$this->dependantJobs = $jobs;
$this->dependantJobs = Arr::pluck($jobs, 'stepId');

return $this;
}
Expand Down
7 changes: 7 additions & 0 deletions tests/Helpers.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,10 @@ function createWorkflowJob(Workflow $workflow, array $attributes = []): Workflow
'finished_at' => null,
], $attributes));
}

function wrapJobsForWorkflow($jobs) {
return collect($jobs)->map(fn ($job) => [
'job' => $job->withJobId($job->jobId ?? get_class($job)),
'name' => get_class($job)
])->all();
}
22 changes: 22 additions & 0 deletions tests/Stubs/NestedWorkflow.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php


namespace Stubs;

use Sassnowski\Venture\AbstractWorkflow;
use Sassnowski\Venture\Facades\Workflow as WorkflowFacade;
use Sassnowski\Venture\WorkflowDefinition;

class NestedWorkflow extends AbstractWorkflow
{
public function __construct(public $job = null)
{
$this->job ??= new TestJob1();
}

public function definition(): WorkflowDefinition
{
return WorkflowFacade::define('::name::')->addJob($this->job);
}
}

22 changes: 22 additions & 0 deletions tests/Stubs/WorkflowWithWorkflow.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php


namespace Stubs;

use Sassnowski\Venture\AbstractWorkflow;
use Sassnowski\Venture\Facades\Workflow as WorkflowFacade;
use Sassnowski\Venture\WorkflowDefinition;

class WorkflowWithWorkflow extends AbstractWorkflow
{
public function __construct(public $workflow)
{
}

public function definition(): WorkflowDefinition
{
return WorkflowFacade::define('::name::')
->addWorkflow($this->workflow);
}
}

19 changes: 11 additions & 8 deletions tests/WorkflowDefinitionTest.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<?php declare(strict_types=1);

use Carbon\Carbon;
use Stubs\NestedWorkflow;
use Stubs\TestJob1;
use Stubs\TestJob2;
use Stubs\TestJob3;
Expand Down Expand Up @@ -121,7 +122,7 @@
->addJob($testJob2, dependencies: [TestJob1::class])
->build();

assertEquals([$testJob2], $testJob1->dependantJobs);
assertEquals([$testJob2->stepId], $testJob1->dependantJobs);
assertEquals([], $testJob2->dependantJobs);
});

Expand Down Expand Up @@ -369,6 +370,15 @@ public function definition(): WorkflowDefinition
assertTrue($definition->hasJobWithDependencies(TestJob2::class, [TestJob1::class]));
});

it('adding another workflow updates the job id on nested job instances', function () {
$definition = (new WorkflowDefinition())
->addJob(new TestJob1())
->addJob(new TestJob2(), [TestJob1::class])
->addWorkflow(new NestedWorkflow($job = new TestJob1()));

assertEquals(NestedWorkflow::class . '.' . TestJob1::class, $job->jobId);
});

it('throws an exception when trying to add a job without the ShouldQueue interface', function () {
(new WorkflowDefinition())->addJob(new NonQueueableJob());
})->expectException(NonQueueableWorkflowStepException::class);
Expand Down Expand Up @@ -418,10 +428,3 @@ public function __invoke()
}
}

class NestedWorkflow extends AbstractWorkflow
{
public function definition(): WorkflowDefinition
{
return WorkflowFacade::define('::name::')->addJob(new TestJob1());
}
}
55 changes: 47 additions & 8 deletions tests/WorkflowTest.php
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
<?php declare(strict_types=1);

use Carbon\Carbon;
use Stubs\NestedWorkflow;
use Stubs\TestJob1;
use Stubs\TestJob2;
use Stubs\TestJob3;
use Illuminate\Support\Str;
use Illuminate\Support\Facades\Bus;
use Opis\Closure\SerializableClosure;
use Stubs\WorkflowWithWorkflow;
use function PHPUnit\Framework\assertNull;
use function PHPUnit\Framework\assertTrue;
use function PHPUnit\Framework\assertCount;
Expand Down Expand Up @@ -36,6 +38,38 @@
assertEquals(1, $workflow->refresh()->jobs_processed);
});

it('stores a finished job\'s id', function ($job, string $expectedJobId) {
$workflow = createWorkflow([
'job_count' => 1,
'jobs_processed' => 0,
]);

$workflow->onStepFinished($job);

assertEquals([$expectedJobId], $workflow->refresh()->finished_jobs);
})->with([
'no job id should default to class name' => [
new TestJob1(),
TestJob1::class,
],
'use existing job id' => [
(new TestJob1())->withJobId('::job-id::'),
'::job-id::',
]
]);

it('it stores finished job id for nested workflow jobs', function () {
$workflow = new WorkflowWithWorkflow(new NestedWorkflow(
$job = new TestJob1()
));
$definition = $workflow->definition();
[$model, $initial] = $definition->build();

$model->onStepFinished($job);

assertEquals([NestedWorkflow::class . '.' . TestJob1::class], $model->refresh()->finished_jobs);
});

it('marks itself as finished if the all jobs have been processed', function () {
Carbon::setTestNow(now());

Expand Down Expand Up @@ -75,14 +109,16 @@
it('runs a finished job\'s dependency if no other dependencies exist', function () {
Bus::fake();

$job1 = new TestJob1();
$job2 = new TestJob2();
$job1 = (new TestJob1())->withStepId(Str::orderedUuid());
$job2 = (new TestJob2())->withStepId(Str::orderedUuid());
$job1->withDependantJobs([$job2]);
$job2->withDependencies([TestJob1::class]);
$workflow = createWorkflow([
'job_count' => 2,
]);

$workflow->addJobs(wrapJobsForWorkflow([$job1, $job2]));

$workflow->onStepFinished($job1);

Bus::assertDispatched(TestJob2::class);
Expand All @@ -109,16 +145,19 @@
it('runs a job if all of its dependencies have finished', function () {
Bus::fake();

$job1 = new TestJob1();
$job2 = new TestJob2();
$job3 = (new TestJob3())->withJobId('::job-3-id::');
$job1->withDependantJobs([$job2]);
$job2->withDependencies([TestJob1::class, '::job-3-id::']);
$job3->withDependantJobs([$job2]);
$workflow = createWorkflow([
'job_count' => 3,
]);

$job1 = (new TestJob1())->withStepId(Str::orderedUuid());
$job2 = (new TestJob2())->withStepId(Str::orderedUuid());
$job3 = (new TestJob3())->withJobId('::job-3-id::')->withStepId(Str::orderedUuid());
$job1->withDependantJobs([$job2]);
$job2->withDependencies([TestJob1::class, '::job-3-id::']);
$job3->withDependantJobs([$job2]);

$workflow->addJobs(wrapJobsForWorkflow([$job1, $job2, $job3]));

$workflow->onStepFinished($job1);
$workflow->onStepFinished($job3);

Expand Down