Skip to content

Commit

Permalink
dump
Browse files Browse the repository at this point in the history
  • Loading branch information
johnbchron committed Jul 10, 2024
1 parent e2cbfc2 commit b73c1a6
Show file tree
Hide file tree
Showing 2 changed files with 246 additions and 2 deletions.
244 changes: 244 additions & 0 deletions content/posts/a-new-job-scheduler.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,247 @@ Alright so what are the parts involved? Obviously some kind of `Job` trait, wher
For manipulating jobs, we need some sort of `Client` type that lets you submit, update, read the status of, and cancel jobs. We need a job `Runner` which queries the store and runs jobs that can be run. The runner is also responsible for supplying the context to the job.

The runner should "claim" a job so that we can have multiple runners at the same time, and the runner should also actively tick the job in the job store periodically while the job run function is running so that it can tell when another runner has forfeited a job (the job is "claimed" but was last ticked too long ago). The final responsibility of the runner is to aggregate logs and put them in long-term storage.

Let's get started!

## The `Job` Trait

```sh
cargo new jobs --lib
```

```rust
pub trait Job {

}
```

This is boring. What did I say we need again? Oh right; let's start with the (associated) return type and error type.

```rust
pub trait Job {
type Result;
type Error;
}
```

We obviously need to enforce some bounds on these types that allow us to 1) move the job around from thread to thread, and 2) pack the job away into the database and pull it back out again. So that translates to `Send + Sync + 'static` and `Deserialize + Serialize`, respectively.

```rust
use serde::{Deserialize, Serialize};

pub trait Job: for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static {
type Result: for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static;
type Error: for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static;
}
```

The `for<'a> Deserialize<'a>` is because `Deserialize` the trait operates on a reference to the type instead of consuming it. We could use `DeserializeOwned` to consume it instead and get rid of the lifetime, but in this case there's no reason to.

These trait bounds are quite verbose though, so let's use a marker trait to be more concise.

```rust
use serde::{Deserialize, Serialize};

pub trait Portable:
for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static
{
}

impl<T: for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static> Portable
for T
{
}

pub trait Job: Portable {
type Result: Portable;
type Error: Portable;
}
```

So what we're doing here is making a trait `Portable` with all the bounds described earlier, so anything that implements `Portable` is guaranteed to also implement `for<'a> Deserialize<'a> + Ser...`, and the compiler knows this. Then in the second block we're implementing `Portable` for everything that already meets the bounds. The `Portable` trait is now effectively an alias for `for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static`.

Let's add defaults for the associated types, so that we can leave them out when implementing `Job` if we don't need them. Let's set them to the unit type, `()`.

```rust
use serde::{Deserialize, Serialize};

pub trait Portable:
for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static
{
}

impl<T: for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static> Portable
for T
{
}

pub trait Job: Portable {
type Result: Portable = ();
type Error: Portable = ();
}
```

The compiler tells us:
```sh
error[E0658]: associated type defaults are unstable
--> crates/jobs/src/lib.rs:22:3
|
22 | type Result: Portable = ();
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: see issue #29661 <https://github.com/rust-lang/rust/issues/29661> for more information
= help: add `#![feature(associated_type_defaults)]` to the crate attributes to enable
= note: this compiler was built on 2024-07-01; consider upgrading it if it is out of date

error[E0658]: associated type defaults are unstable
--> crates/jobs/src/lib.rs:23:3
|
23 | type Error: Portable = ();
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: see issue #29661 <https://github.com/rust-lang/rust/issues/29661> for more information
= help: add `#![feature(associated_type_defaults)]` to the crate attributes to enable
= note: this compiler was built on 2024-07-01; consider upgrading it if it is out of date

For more information about this error, try `rustc --explain E0658`.
error: could not compile `jobs` (lib) due to 2 previous errors
```
How clear. I guess I don't know why I assumed you could do that; this is even already on `nightly`. The "default" idea comes from generics I think? Anyways, let's check out [that issue](https://github.com/rust-lang/rust/issues/29661).
Reading that tracking issue, it seems that the most general case works (`trait Foo { type Bar = (); }`) but there's some weirdness surrounding defaults for constants defined in traits that use their associated types, and in `dyn` trait objects? Sounds good enough for me. It's just a little ergonomics thing anyways so no biggie to roll it back if it causes an ICE.
```rust
#![feature(associated_type_defaults)]
...
```
We need a method to provide some configuration options, and I'd like it to be a little flexible as to how the user provides it.
```rust
#![feature(associated_type_defaults)]
use serde::{Deserialize, Serialize};
pub struct JobConfig {}
pub trait Portable:
for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static
{
}
impl<T: for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static> Portable
for T
{
}
pub trait Job: Portable {
type Result: Portable = ();
type Error: Portable = ();
fn config(&self) -> JobConfig;
}
```
Seems simple enough. Let's add the `run()` function. We're also adding a little `JobContext` struct that we'll pass as a reference into the `run()` function.
For now we'll pass an immutable reference and we'll push responsibility onto future us for doing the interior mutability to make that happen. Probably a mutex or something; we won't need a lot of bandwidth out of the context. Whatever -- later.
```rust
#![feature(associated_type_defaults)]
use std::future::Future;
use serde::{Deserialize, Serialize};
pub struct JobConfig {}
pub struct JobContext {}
pub trait Portable:
for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static
{
}
impl<T: for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static> Portable
for T
{
}
pub trait Job: Portable {
type Result: Portable = ();
type Error: Portable = ();
fn config(&self) -> &JobConfig;
async fn run(
params: Self,
context: &JobContext,
) -> Result<Self::Result, Self::Error>;
}
```
When we use a bare `async fn ...` in a trait, we get a `cargo` warning.
```sh
warning: use of `async fn` in public traits is discouraged as auto trait bounds cannot be specified
--> crates/jobs/src/lib.rs:27:3
|
27 | async fn run(
| ^^^^^
|
= note: you can suppress this lint if you plan to use the trait only in your own code, or do not care about auto traits l
ike `Send` on the `Future`
= note: `#[warn(async_fn_in_trait)]` on by default
help: you can alternatively desugar to a normal `fn` that returns `impl Future` and add any desired bounds such as `Send`, but these cannot be relaxed without a breaking API change
|
27 ~ fn run(
28 | params: Self,
29 | context: &JobContext,
30 ~ ) -> impl std::future::Future<Output = Result<Self::Result, Self::Error>> + Send;
|
```
Once again, how helpful. "Auto trait bounds cannot be specified" could be a little unclear though.
Async functions are just functions that return `impl Future<Output = Something>`. This means that until `Job` is implemented, the return type implementing `Future` is not concrete. It's only specified by trait bounds. So essentially, we only know as much about that return type as we require with trait bounds. Why would we want to know more about that type? Work-stealing executors (like `tokio`) shuffle futures between threads, so they need futures to be `Send + 'static`.
So that lint is saying "you can't make sure that the future returned here is `Send` if you use the `async` syntax sugar, but you probably want to". Which we do, so we'll expand the syntax sugar and add the `Send` bound.
```rust
#![feature(associated_type_defaults)]
use std::future::Future;
use serde::{Deserialize, Serialize};
pub struct JobConfig {}
pub struct JobContext {}
pub trait Portable:
for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static
{
}
impl<T: for<'a> Deserialize<'a> + Serialize + Send + Sync + 'static> Portable
for T
{
}
pub trait Job: Portable {
type Result: Portable = ();
type Error: Portable = ();
fn config(&self) -> JobConfig;
fn run(
params: Self,
context: &JobContext,
) -> impl Future<Output = Result<Self::Result, Self::Error>> + Send;
}
```
I won't say it's pretty, but it's something.
4 changes: 2 additions & 2 deletions style/main.scss
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
}

.markdown pre {
@apply my-2 bg-zinc-800 p-4 w-full rounded border border-zinc-600 text-lg leading-tight whitespace-pre-wrap;
@apply my-2 bg-zinc-800 p-3 w-full rounded border border-zinc-600 text-lg leading-tight whitespace-pre-wrap;
}

.markdown code {
Expand Down Expand Up @@ -85,4 +85,4 @@

.markdown .footnote-definition p {
@apply my-0;
}
}

0 comments on commit b73c1a6

Please sign in to comment.