-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add BaseWorker
and ProcessWorker
#7996
Conversation
✅ Deploy Preview for prefect-orion ready!
To edit notification comments on pull requests, go to your Netlify site settings. |
175cc35
to
f4e66e5
Compare
self._submit_run_and_capture_errors, flow_run | ||
) | ||
|
||
if readiness_result and not isinstance(readiness_result, Exception): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if it's an exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The exception is handled in _submit_run_and_capture_errors
where the flow run is either marked as failed if submission failed, or a error is logged saying that something might have gone wrong. This check is to make sure that the worker doesn't try to update the flow run in the backend if something when wrong during submission.
This was taken from the existing agent implementation so this could be a good time to improve this logic if there's a better way to do it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sure we'll learn more as we work with this, but this LGTM
@desertaxle Please don't merge until 2.7.7 is out. |
…r-builds-release * 'main' of https://github.com/ddelange/prefect: (131 commits) Fix docker-builds.yaml templating syntax (PrefectHQ#8114) Rename Worker pools -> work pools (PrefectHQ#8107) Build multi-arch images for commits on main (PrefectHQ#7900) [Issue PrefectHQ#456] Change example of `infra_override` in docs/concepts/deployment (PrefectHQ#8101) Bump @playwright/test from 1.29.1 to 1.29.2 in /orion-ui (PrefectHQ#8105) Bump @prefecthq/orion-design from 1.1.53 to 1.1.54 in /orion-ui (PrefectHQ#8104) Update deployment docs to include tag and idempotency key (PrefectHQ#7771) Add `BaseWorker` and `ProcessWorker` (PrefectHQ#7996) Add Peyton and Serina as global code owners (PrefectHQ#8098) Add release notes for 2.7.7 (PrefectHQ#8091) Add youtube badge (PrefectHQ#8089) Adds `MAX_RRULE_LENGTH` (PrefectHQ#7762) Limit task run cache key size (PrefectHQ#7275) Add --match flag to work queues documentation (PrefectHQ#7768) Modify disable ssl setting tests to allow any for headers and timeout (PrefectHQ#8086) Add test for allow_failure and quote (PrefectHQ#8055) Adds `experimental_field` decorator (PrefectHQ#8066) add docs on migrating block documents (PrefectHQ#8085) Add Redoc documentation for REST API reference (PrefectHQ#7503) Allow disabling SSL verification (PrefectHQ#7850) ...
…refect into docker-builds-consolidation * 'docker-builds-release' of https://github.com/ddelange/prefect: (131 commits) Fix docker-builds.yaml templating syntax (PrefectHQ#8114) Rename Worker pools -> work pools (PrefectHQ#8107) Build multi-arch images for commits on main (PrefectHQ#7900) [Issue PrefectHQ#456] Change example of `infra_override` in docs/concepts/deployment (PrefectHQ#8101) Bump @playwright/test from 1.29.1 to 1.29.2 in /orion-ui (PrefectHQ#8105) Bump @prefecthq/orion-design from 1.1.53 to 1.1.54 in /orion-ui (PrefectHQ#8104) Update deployment docs to include tag and idempotency key (PrefectHQ#7771) Add `BaseWorker` and `ProcessWorker` (PrefectHQ#7996) Add Peyton and Serina as global code owners (PrefectHQ#8098) Add release notes for 2.7.7 (PrefectHQ#8091) Add youtube badge (PrefectHQ#8089) Adds `MAX_RRULE_LENGTH` (PrefectHQ#7762) Limit task run cache key size (PrefectHQ#7275) Add --match flag to work queues documentation (PrefectHQ#7768) Modify disable ssl setting tests to allow any for headers and timeout (PrefectHQ#8086) Add test for allow_failure and quote (PrefectHQ#8055) Adds `experimental_field` decorator (PrefectHQ#8066) add docs on migrating block documents (PrefectHQ#8085) Add Redoc documentation for REST API reference (PrefectHQ#7503) Allow disabling SSL verification (PrefectHQ#7850) ...
This PR adds a
BaseWorker
class to implement most of the mechanics and patterns needed for workers.BaseWorker
is an abstract base class that implements storage scanning, queue polling, and sending worker heartbeats.Workers scan a workflow storage location at a given interval and apply any discovered deployments so that scheduled flow runs can be executed by the worker pool that the deployment was submitted to. Currently, the way to submit a deployment to a worker is by placing the deployment manifest and flow code into the worker's configured workflow storage location. We plan on creating convenience methods for submitting deployments to workers so that the mechanics of where the deployment manifest and flow code need to be submitted is abstracted.
Implementations of
BaseWorker
are required to implement a.run
method which is used for executing a flow run. This run method is roughly analogous to the.run
method on an infrastructure block.BaseWorker
implementation are also required to implement a.verify_submitted_deployment
method. The.verify_sbumitted_deployment
method gives each worker implementation an opportunity to verify that the deployment was submitted to the correct worker type and the current worker will be able to execute scheduled flow runs for the submitted deployment.Workers are capable of creating a worker pool if the worker pool that they are configured with at start up does not already exist. Otherwise, workers will warn if they join a worker pool that is configured for a different worker type. This functionality can be further expanded to ensure worker compatibility with worker pools that they are joining.
This PR also adds a
ProcessWorker
which is the firstBaseWorker
implementation. It is largely a port of theProcess
infrastructure block.Features for workers that are necessary, but will be implemented in another PR:
All of this functionality is gated behind the
PREFECT_EXPERIMENTAL_ENABLE_WORKERS
setting.Example
Start a worker:
To submit a deployment:
prefect deployment build ...
worker_pool_name
to the generated deployment manifestPREFECT_WORKER_WORKFLOW_STORAGE_PATH
(defaults to~/.prefect/workflows
)Checklist
<link to issue>
"fix
,feature
,enhancement