diff --git a/content/posts/a-new-job-scheduler.md b/content/posts/a-new-job-scheduler.md index 14395e3..0b33011 100644 --- a/content/posts/a-new-job-scheduler.md +++ b/content/posts/a-new-job-scheduler.md @@ -86,3 +86,247 @@ Alright so what are the parts involved? Obviously some kind of `Job` trait, wher For manipulating jobs, we need some sort of `Client` type that lets you submit, update, read the status of, and cancel jobs. We need a job `Runner` which queries the store and runs jobs that can be run. The runner is also responsible for supplying the context to the job. The runner should "claim" a job so that we can have multiple runners at the same time, and the runner should also actively tick the job in the job store periodically while the job run function is running so that it can tell when another runner has forfeited a job (the job is "claimed" but was last ticked too long ago). The final responsibility of the runner is to aggregate logs and put them in long-term storage. + +Let's get started! + +## The `Job` Trait + +```sh +cargo new jobs --lib +``` + +```rust +pub trait Job { + +} +``` + +This is boring. What did I say we need again? Oh right; let's start with the (associated) return type and error type. + +```rust +pub trait Job { + type Result; + type Error; +} +``` + +We obviously need to enforce some bounds on these types that allow us to 1) move the job around from thread to thread, and 2) pack the job away into the database and pull it back out again. So that translates to `Send + Sync + 'static` and `Deserialize + Serialize`, respectively. + +```rust +use serde::{Deserialize, Serialize}; + +pub trait Job: for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static { + type Result: for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static; + type Error: for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static; +} +``` + +The `for<'a> Deserialize<'a>` is because `Deserialize` the trait operates on a reference to the type instead of consuming it. We could use `DeserializeOwned` to consume it instead and get rid of the lifetime, but in this case there's no reason to. + +These trait bounds are quite verbose though, so let's use a marker trait to be more concise. + +```rust +use serde::{Deserialize, Serialize}; + +pub trait Portable: + for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static +{ +} + +impl Deserialize<'a> + Serialize + Send + Sync + 'static> Portable + for T +{ +} + +pub trait Job: Portable { + type Result: Portable; + type Error: Portable; +} +``` + +So what we're doing here is making a trait `Portable` with all the bounds described earlier, so anything that implements `Portable` is guaranteed to also implement `for<'a> Deserialize<'a> + Ser...`, and the compiler knows this. Then in the second block we're implementing `Portable` for everything that already meets the bounds. The `Portable` trait is now effectively an alias for `for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static`. + +Let's add defaults for the associated types, so that we can leave them out when implementing `Job` if we don't need them. Let's set them to the unit type, `()`. + +```rust +use serde::{Deserialize, Serialize}; + +pub trait Portable: + for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static +{ +} + +impl Deserialize<'a> + Serialize + Send + Sync + 'static> Portable + for T +{ +} + +pub trait Job: Portable { + type Result: Portable = (); + type Error: Portable = (); +} +``` + +The compiler tells us: +```sh +error[E0658]: associated type defaults are unstable + --> crates/jobs/src/lib.rs:22:3 + | +22 | type Result: Portable = (); + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^ + | + = note: see issue #29661 for more information + = help: add `#![feature(associated_type_defaults)]` to the crate attributes to enable + = note: this compiler was built on 2024-07-01; consider upgrading it if it is out of date + +error[E0658]: associated type defaults are unstable + --> crates/jobs/src/lib.rs:23:3 + | +23 | type Error: Portable = (); + | ^^^^^^^^^^^^^^^^^^^^^^^^^^ + | + = note: see issue #29661 for more information + = help: add `#![feature(associated_type_defaults)]` to the crate attributes to enable + = note: this compiler was built on 2024-07-01; consider upgrading it if it is out of date + +For more information about this error, try `rustc --explain E0658`. +error: could not compile `jobs` (lib) due to 2 previous errors +``` + +How clear. I guess I don't know why I assumed you could do that; this is even already on `nightly`. The "default" idea comes from generics I think? Anyways, let's check out [that issue](https://github.com/rust-lang/rust/issues/29661). + +Reading that tracking issue, it seems that the most general case works (`trait Foo { type Bar = (); }`) but there's some weirdness surrounding defaults for constants defined in traits that use their associated types, and in `dyn` trait objects? Sounds good enough for me. It's just a little ergonomics thing anyways so no biggie to roll it back if it causes an ICE. + +```rust +#![feature(associated_type_defaults)] + +... +``` + +We need a method to provide some configuration options, and I'd like it to be a little flexible as to how the user provides it. + +```rust +#![feature(associated_type_defaults)] + +use serde::{Deserialize, Serialize}; + +pub struct JobConfig {} + +pub trait Portable: + for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static +{ +} + +impl Deserialize<'a> + Serialize + Send + Sync + 'static> Portable + for T +{ +} + +pub trait Job: Portable { + type Result: Portable = (); + type Error: Portable = (); + + fn config(&self) -> JobConfig; +} +``` + +Seems simple enough. Let's add the `run()` function. We're also adding a little `JobContext` struct that we'll pass as a reference into the `run()` function. + +For now we'll pass an immutable reference and we'll push responsibility onto future us for doing the interior mutability to make that happen. Probably a mutex or something; we won't need a lot of bandwidth out of the context. Whatever -- later. + +```rust +#![feature(associated_type_defaults)] + +use std::future::Future; + +use serde::{Deserialize, Serialize}; + +pub struct JobConfig {} + +pub struct JobContext {} + +pub trait Portable: + for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static +{ +} + +impl Deserialize<'a> + Serialize + Send + Sync + 'static> Portable + for T +{ +} + +pub trait Job: Portable { + type Result: Portable = (); + type Error: Portable = (); + + fn config(&self) -> &JobConfig; + + async fn run( + params: Self, + context: &JobContext, + ) -> Result; +} +``` + +When we use a bare `async fn ...` in a trait, we get a `cargo` warning. + +```sh +warning: use of `async fn` in public traits is discouraged as auto trait bounds cannot be specified + --> crates/jobs/src/lib.rs:27:3 + | +27 | async fn run( + | ^^^^^ + | + = note: you can suppress this lint if you plan to use the trait only in your own code, or do not care about auto traits l +ike `Send` on the `Future` + = note: `#[warn(async_fn_in_trait)]` on by default +help: you can alternatively desugar to a normal `fn` that returns `impl Future` and add any desired bounds such as `Send`, but these cannot be relaxed without a breaking API change + | +27 ~ fn run( +28 | params: Self, +29 | context: &JobContext, +30 ~ ) -> impl std::future::Future> + Send; + | +``` + +Once again, how helpful. "Auto trait bounds cannot be specified" could be a little unclear though. + +Async functions are just functions that return `impl Future`. This means that until `Job` is implemented, the return type implementing `Future` is not concrete. It's only specified by trait bounds. So essentially, we only know as much about that return type as we require with trait bounds. Why would we want to know more about that type? Work-stealing executors (like `tokio`) shuffle futures between threads, so they need futures to be `Send + 'static`. + +So that lint is saying "you can't make sure that the future returned here is `Send` if you use the `async` syntax sugar, but you probably want to". Which we do, so we'll expand the syntax sugar and add the `Send` bound. + +```rust +#![feature(associated_type_defaults)] + +use std::future::Future; + +use serde::{Deserialize, Serialize}; + +pub struct JobConfig {} + +pub struct JobContext {} + +pub trait Portable: + for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static +{ +} + +impl Deserialize<'a> + Serialize + Send + Sync + 'static> Portable + for T +{ +} + +pub trait Job: Portable { + type Result: Portable = (); + type Error: Portable = (); + + fn config(&self) -> JobConfig; + + fn run( + params: Self, + context: &JobContext, + ) -> impl Future> + Send; +} +``` + +I won't say it's pretty, but it's something. diff --git a/style/main.scss b/style/main.scss index f4cee06..2757dd0 100644 --- a/style/main.scss +++ b/style/main.scss @@ -47,7 +47,7 @@ } .markdown pre { - @apply my-2 bg-zinc-800 p-4 w-full rounded border border-zinc-600 text-lg leading-tight whitespace-pre-wrap; + @apply my-2 bg-zinc-800 p-3 w-full rounded border border-zinc-600 text-lg leading-tight whitespace-pre-wrap; } .markdown code { @@ -85,4 +85,4 @@ .markdown .footnote-definition p { @apply my-0; -} \ No newline at end of file +}