Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added WorkerRegistry and ability to add additional worker implementat… #98

Merged
merged 1 commit into from
Mar 7, 2023

Conversation

iborysenko
Copy link
Contributor

…ions (grpc, jobs, ...)

@iborysenko
Copy link
Contributor Author

Hi @Baldinof
This is preparation PR for future roadrunner/jobs integration

Now it will be possible to add something like

final class JobsWorker implements WorkerInterface
{
    public function __construct(
        private EventDispatcherInterface $eventDispatcher,
        private ConsumerInterface $consumer,
        private ProcessorRegistry $registry,
        private ChainExceptionHandler $exceptionHandler
    ) {
    }

    public function start(): void
    {
        $this->eventDispatcher->dispatch(new WorkerStartEvent());

        while ($task = $this->consumer->waitTask()) {
            $this->eventDispatcher->dispatch(new TaskReceivedEvent($task));
            $exception = null;
            try {
                $processor = $this->registry->getProcessor($task->getName());
                if (!$processor instanceof ProcessorInterface) {
                    throw new JobsException(\sprintf('missing processor for task "%s"', $task->getName()));
                }

                $task = $processor->process($task);
            } catch (\Throwable $e) {
                $exception = $e;
                $task = $this->exceptionHandler->handle($exception, $task);
            } finally {
                if (false === $task->isCompleted()) {
                    $task->complete();
                }
                $this->eventDispatcher->dispatch(new TaskCompletedEvent($task, $exception));
            }
        }

        $this->eventDispatcher->dispatch(new WorkerStopEvent());
    }
}

@Baldinof
Copy link
Owner

Hi!

Nice, I'll try to review it this week :)

Copy link
Owner

@Baldinof Baldinof left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some minor comments. Great work :)

src/Worker/GrpcWorkerInterface.php Show resolved Hide resolved
src/Worker/WorkerRegistry.php Outdated Show resolved Hide resolved
@iborysenko iborysenko requested a review from Baldinof February 27, 2023 16:46
…ions (grpc, jobs, ...)

stan fixes

cs fix

revert

BC fix, deprecated GrpcWorkerInterface

CS fix

CS fix
@FluffyDiscord
Copy link
Contributor

@iborysenko would you mind adding full usage example to README ?

@iborysenko
Copy link
Contributor Author

FluffyDiscord

Hi, you meant full roadrunner-jobs configuration, or symfony flex setup ?
My implementation of roadrunner-jobs will be in the next MR, this MR is just structure preparation

@FluffyDiscord
Copy link
Contributor

FluffyDiscord

Hi, you meant full roadrunner-jobs configuration, or symfony flex setup ? My implementation of roadrunner-jobs will be in the next MR, this MR is just structure preparation

Aha, then I am looking forward to the next MR 😊

@Baldinof Baldinof merged commit adf9501 into Baldinof:2.x Mar 7, 2023
@Baldinof
Copy link
Owner

Baldinof commented Mar 7, 2023

Thank you @iborysenko!

@koekaverna
Copy link

It's awesome!
Waiting for release!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants