Add enqueue_multi #38

Open
kirillsalykin opened this issue Oct 27, 2024 · 14 comments

Comments

@kirillsalykin

Sometimes you want to enqueue a lot of jobs, say 10_000 or more; inserting tasks one by one is not very performant.
Something like this can speed things up a lot:

    insert into tasks
    values (...), (...), (...)

Postgres limits the number of bind parameters per statement to 65535, so if my calculations are correct, around 5,461 tasks can be inserted at a time; let's round that down to 5000.

So if you need to enqueue 10_000 tasks, it can be done in two batches of 5_000.

What do you think?
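
The batching arithmetic above can be sketched as follows. This is a minimal illustration, not code from the project: `max_rows_per_batch` and `batches` are hypothetical helper names, and the figure of 12 bind parameters per row is an assumption inferred from the quoted estimate (65535 / 12 = 5461.25).

```rust
/// Limit on bind parameters in a single Postgres statement,
/// as quoted in the comment above.
const MAX_BIND_PARAMS: usize = 65_535;

/// Largest number of rows that fit in one multi-row INSERT when every
/// row binds `params_per_row` parameters (hypothetical helper).
fn max_rows_per_batch(params_per_row: usize) -> usize {
    assert!(params_per_row > 0, "a row must bind at least one parameter");
    // Integer division rounds down, so the result never exceeds the limit.
    MAX_BIND_PARAMS / params_per_row
}

/// Split `tasks` into slices of at most `batch_size` items
/// (hypothetical helper; panics if `batch_size` is zero).
fn batches<T>(tasks: &[T], batch_size: usize) -> Vec<&[T]> {
    tasks.chunks(batch_size).collect()
}
```

With 12 parameters per row the upper bound is 5461 rows, so rounding down to 5000 leaves some headroom, and 10_000 tasks split into exactly two batches.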

@maxcountryman
Owner

Yes, some sort of batched enqueue and dequeue makes sense to me.

@kirillsalykin
Author

Could this one also be for the 0.1.0 release?
I may try to work on this.

@maxcountryman
Owner

Certainly!

@maxcountryman
Owner

@kirillsalykin are you planning to take this on?

@kirillsalykin
Author

kirillsalykin commented Nov 10, 2024

I plan to work on this next week. Let's not postpone the release because of this; I don't want to be a blocker.
I mean, cut the 0.1 release without enqueue_multi.

@kirillsalykin
Author

hi @maxcountryman
I've started working on the feature and have some thoughts I'd like to share/validate.

I see this implemented as:

    async fn enqueue_multi<'a, E>(&self, executor: E, tasks: &[NewTask]) -> Result<Vec<TaskId>>
    where
        E: PgExecutor<'a>,
    {
        // Split `tasks` into batches (e.g. 5000 tasks per batch, exact number TBD)
        // and insert each batch into the db within a tx.
        // Consider this operation atomic.
        ...
    }

where NewTask is:

    pub struct NewTask {
        pub(crate) input: Value,
        pub(crate) scheduled_for: Option<Instant>, // <- instead of `delay`
        pub(crate) retry_policy: RetryPolicy,
    }

    impl NewTask {
        // use a builder instead?
        pub fn new<T: Serialize>(input: T, scheduled_for: Option<Instant>, retry_policy: RetryPolicy) -> Self {
            let input = serde_json::to_value(input).expect("Failed to serialize input");
            Self {
                input,
                scheduled_for,
                retry_policy,
            }
        }
    }

This approach lets the user schedule different tasks in one go (as long as they go into the same queue).

Questions:

  1. It seems like NewTask could be used with enqueue, making enqueue_with_delay obsolete.
  2. Do we need Result<Vec<TaskId>>? Is there a case where you'd want to cancel all scheduled tasks? If not, maybe Result<()> is sufficient?
  3. Thoughts on using scheduled_for instead of delay? (I asked a question about this in Discussions.)
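
To make question 3 concrete, here is a rough sketch of how a relative delay and an absolute scheduled_for could be reconciled into one value. The `Schedule` enum and the `scheduled_for` function are hypothetical names used only for illustration, not part of the crate. The sketch also assumes `std::time::SystemTime` rather than `Instant`, since an `Instant` is an opaque monotonic reading with no calendar meaning and so is hard to persist.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical: the ways a caller might say when a task should run.
enum Schedule {
    /// Run as soon as possible.
    Now,
    /// Run after a relative delay (the `delay` style).
    After(Duration),
    /// Run at an absolute wall-clock time (the `scheduled_for` style).
    At(SystemTime),
}

/// Resolve any `Schedule` to an absolute time, given the current time.
/// Passing `now` in explicitly keeps the function deterministic.
fn scheduled_for(schedule: &Schedule, now: SystemTime) -> SystemTime {
    match schedule {
        Schedule::Now => now,
        Schedule::After(delay) => now + *delay,
        Schedule::At(at) => *at,
    }
}
```

Under this assumption a delay is just a special case of an absolute time, so only one representation would need to be stored.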

What do you think? I'd love your feedback.

P.S. I’m new to Rust, so feel free to correct any mistakes.

@kirillsalykin
Author

Which makes me think: maybe Queue should only have enqueue(tasks: &[NewTask]), and Job would provide the extra utility functions, like enqueue, enqueue_multi, enqueue_at, etc.
What do you think?

@maxcountryman
Owner

maxcountryman commented Nov 11, 2024

I'm a little bit confused re the purpose of the NewTask type: a queue is generic over T: Task so I think enqueue_multi should take &[T] right?

> Do we need Result<Vec<TaskId>>? Is there a case where you'd want to cancel all scheduled tasks? If not, maybe Result<()> is sufficient?

I think it's important to return Vec<TaskId> otherwise the caller has no way to reference enqueued tasks later. This is especially true if we do make enqueue a more specific case of &[T].

> which makes me think, maybe Queue should only have enqueue(tasks: &[NewTask])

I suspect it's true we may want to implement enqueue in terms of the more general input, yes--I think that's also what pg-boss does.
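
As a rough illustration of the two points above (returning the ids, and treating a single enqueue as a special case of the slice version), here is an in-memory sketch. Everything in it is a stand-in assumed for the example: this `Queue`, `TaskId`, and `cancel` are not the crate's real types or signatures, and no database is involved.

```rust
/// Hypothetical stand-in for the crate's task identifier.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct TaskId(u64);

/// In-memory stand-in for a queue that is generic over its task input `T`.
struct Queue<T> {
    next_id: u64,
    tasks: Vec<(TaskId, T)>,
}

impl<T: Clone> Queue<T> {
    fn new() -> Self {
        Queue { next_id: 1, tasks: Vec::new() }
    }

    /// General case: enqueue a slice and return one id per input, in order.
    fn enqueue_multi(&mut self, inputs: &[T]) -> Vec<TaskId> {
        let mut ids = Vec::with_capacity(inputs.len());
        for input in inputs {
            let id = TaskId(self.next_id);
            self.next_id += 1;
            self.tasks.push((id, input.clone()));
            ids.push(id);
        }
        ids
    }

    /// Specific case: a single enqueue is a one-element `enqueue_multi`.
    fn enqueue(&mut self, input: T) -> TaskId {
        self.enqueue_multi(std::slice::from_ref(&input))[0]
    }

    /// The returned ids are what let a caller refer to a task later,
    /// for example to cancel it. Returns whether a task was removed.
    fn cancel(&mut self, id: TaskId) -> bool {
        let before = self.tasks.len();
        self.tasks.retain(|(task_id, _)| *task_id != id);
        self.tasks.len() != before
    }
}
```

If enqueue_multi returned `()` instead, the `cancel` call would have nothing to take as an argument, which is the concern raised above.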

@kirillsalykin
Author

I just checked the Task trait and indeed it provides delay, ttl, retry policy.

Thank you for the pointer.

@kirillsalykin
Author

Hi @maxcountryman,
Unfortunately, I just received another project that is taking up all of my time (which was already limited).
I’d like to work on this issue after the New Year, but if you’d prefer to take it on yourself, I completely understand.

@maxcountryman
Owner

Hey no worries at all! I don't think there's a big rush here, so let's wait for the New Year. If someone comes along and needs it before then we can reassess and see if they'd like to contribute, etc. Very much appreciate your contributions and please don't feel any pressure whatsoever. :)

@kirillsalykin
Author

thanks!

@maxcountryman
Owner

As an aside here, it looks like this might be somewhat of a challenge in sqlx currently: launchbadge/sqlx#294

The query builder approach may be the best option but it's not perfect.
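
For context, a query-builder approach ultimately has to produce a statement with one numbered placeholder per bound value. The sketch below shows that statement shape using only the standard library; it is not sqlx's API. The function name `multi_row_insert` and the table and column names used with it are assumptions for illustration.

```rust
/// Build the text of a multi-row INSERT with numbered Postgres
/// placeholders ($1, $2, ...). Hypothetical helper: identifiers are
/// interpolated as-is, so they must come from trusted code, never
/// from user input.
fn multi_row_insert(table: &str, columns: &[&str], rows: usize) -> String {
    assert!(rows > 0, "an INSERT needs at least one row");
    assert!(!columns.is_empty(), "an INSERT needs at least one column");

    let mut sql = format!("INSERT INTO {} ({}) VALUES ", table, columns.join(", "));
    let mut next_param = 1;
    for row in 0..rows {
        if row > 0 {
            sql.push_str(", ");
        }
        // One placeholder per column, numbered consecutively across rows.
        let placeholders: Vec<String> = (0..columns.len())
            .map(|offset| format!("${}", next_param + offset))
            .collect();
        sql.push_str(&format!("({})", placeholders.join(", ")));
        next_param += columns.len();
    }
    sql
}
```

The highest placeholder number is rows times columns, which is the quantity that has to stay within the 65535 parameter limit mentioned at the top of the thread.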

@kirillsalykin
Author

indeed
