-
Notifications
You must be signed in to change notification settings - Fork 92
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add middleware system for jobs #584
Conversation
Here, experiment with a middleware-like system that adds middleware functions to job lifecycles, which results in them being invoked during specific phases of a job like as it's being inserted or worked. The most obvious unlock for this is telemetry (e.g. logging, metrics), but it also acts as a building block for features like encrypted jobs.
// | ||
// InsertBegin is *not* invoked on a batch insertion with InsertMany or | ||
// InsertManyTx. Integrations should implement InsertManyBegin separately. | ||
// InsertBegin(ctx context.Context, params *JobLifecycleInsertParams) (context.Context, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I originally tried to write this system as "hooks" instead of middleware, and you can see what the old entrypoints looked like here (left them in as gravestones for the time being). They almost worked, but I found that I was running into two major problems with them compared to middleware:
- It'd be a common thing to want to be put something in context in the "begin" part, then extract that thing in the "end" part (e.g. for metrics or whatever). I accomplished that by returning a context here as you can see, but it was fairly awkward, and I don't think users would've like it.
- The bigger issue is that it was hard to know how the "end" functions should be called (or not called) under various error conditions like a panic or error returned. We would've either need to send back parameters for all of (1) successful return, (2) possible error, (3) panic val, OR just have totally separate functions for when errors occurred, but both those options were extremely awkward. With middleware, the return values are right there, and it's up to the caller to just do whatever they want with them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, I think this is probably the cleaner & more flexible option.
@@ -1150,7 +1155,7 @@ func (c *Client[TTx]) ID() string { | |||
return c.config.ID | |||
} | |||
|
|||
func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So not 100% sure on this one yet, but the "insert" part of the middleware needs to receive a type that's not a JobRow
because we don't have a job row yet. I basically took JobInsertParams
, duplicated it, and promoted it to rivertype
. The types are different for now, but they can be type converted to one another because they have the same fields. I basically did this because I like the naming of JobInsertParams
, but they could also be the same type or even slightly different types with a couple fields dropped (CreatedAt
for example, which is only needed for time injection).
Either way, JobInsertParams
stays internal for now, so it should leave refactoring flexibility ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think about pulling arg encoding out of this helper and passing the middleware functions a type that includes raw unencoded JobArgs
? My thought is that this unlocks more dynamic behavior, because middleware then have the ability to do type assertions against JobArgs
including to assert interface implementations.
The downside is they lose the ability to directly access the encoded json bytes, but then I'm not sure I know of any cases where that's desirable. For metadata, sure, but not for args.
@bgentry Wanted to get a minimal POC out there just so I don't spend too much time on this in case there's backpressure, so didn't write tests or anything like that. Rough thoughts? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome progress. I had a few thoughts/concerns to consider whether this design may need some tweaks, but I'm generally good to move forward with it.
func (l *JobMiddlewareDefaults) Insert(ctx context.Context, params *rivertype.JobInsertParams, doInner func(ctx context.Context) (*rivertype.JobInsertResult, error)) (*rivertype.JobInsertResult, error) { | ||
return doInner(ctx) | ||
} | ||
|
||
func (l *JobMiddlewareDefaults) InsertMany(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(ctx context.Context) (int, error)) (int, error) { | ||
return doInner(ctx) | ||
} | ||
|
||
func (l *JobMiddlewareDefaults) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error { | ||
return doInner(ctx) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think for this to be the most useful it would need to be customizable on a per-job basis. I'm wondering what that looks like in practice with this design. Like what if I wanted to add a middleware that uses some aspect of the worker or args to dynamically determine what to do? (maybe some optional interface gets fulfilled by either of those types to indicate to the middleware what it should do).
The problem with trying to do that here is the args have already been encoded, so there's no longer any access to the underlying JobArgs
type. Is there any path to potentially having the middleware stack get called before the JSON encoding part? That could more easily enable dynamic behavior based on the type.
Additionally, this might be further exposing the somewhat confusing split between JobArgs
and Worker
implementations. We had some recent customer feedback about it being a little weird that i.e. the timeout must be customized on the Worker
and can't easily be tweaked at insertion time via the args. In this case though you mentioned potentially allowing for middleware to be configured at the JobArgs
level, which seems fine for insert time but IMO doesn't make any sense for the Work()
middleware. I don't want to have two separate middleware stacks/concepts, but it does feel a bit odd to have both of these on a single interface given the way this split is designed today 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ended up putting JobArgs
into the struct in my uniqueness PR #590 and I think it's a great thing to have available. Can still encode args in advance but just keep the original around for introspection.
// | ||
// InsertBegin is *not* invoked on a batch insertion with InsertMany or | ||
// InsertManyTx. Integrations should implement InsertManyBegin separately. | ||
// InsertBegin(ctx context.Context, params *JobLifecycleInsertParams) (context.Context, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, I think this is probably the cleaner & more flexible option.
I rebased this branch on top of #589 now that it's merged. I also refactored our bulk insert methods so they use the same underlying code aside from a narrow adapter function for each query. I think it can be improved further, but IMO it's a good start: https://github.com/riverqueue/river/compare/bg-lifecycle-hooks-on-insert-many-refactor?expand=1 Finally, I had the realization that given the goal of aligning our |
I know we talked about potentially wanting to have the database transaction available as part of the middleware interface, but I think I've talked myself out of that. With the driver concept it becomes pretty tough to do that cleanly, especially given we don't want the driver interfaces to be considered stable. I also don't think it's needed for anything I'm doing at the moment (#627 seems like the way for me to do database-related customizations). |
A version of this was merged in #632 and will be in the next release. |
Here, experiment with a middleware-like system that adds middleware
functions to job lifecycles, which results in them being invoked during
specific phases of a job like as it's being inserted or worked.
The most obvious unlock for this is telemetry (e.g. logging, metrics),
but it also acts as a building block for features like encrypted jobs.