
[Feature] Support incremental backoffs in cloudflare workers #604

Open

Starttoaster opened this issue Jul 28, 2024 · 9 comments

@Starttoaster

Starttoaster commented Jul 28, 2024

Is there an existing issue for this?

  • I have searched the existing issues

Description

Since most of tokio isn't supported (it does not compile to wasm), I'm not certain there's an established pattern for a Cloudflare Worker, written in Rust using the workers-rs package, to forward a request to another server using reqwest with an incremental backoff in the case of a 429 rate limit. The goal of this feature would be to support, in the worker runtime, some method of sleeping the worker thread for an incrementally increasing period of time until either the request finally succeeds, or Cloudflare kills the thread for taking too long, which I believe is already expected behavior with workers.

@kflansburg
Contributor

We do support reqwest and async. I think this would be pretty trivial to implement with a loop:

let mut wait = 1000; // milliseconds
let res = loop {
    let res = reqwest::get(url).await?;
    if res.status() != 429 {
        break res;
    }
    sleep(wait).await;
    wait *= 2;
};

The only tricky thing is sleeping, but I think you can do something like this:

pub async fn sleep(delay: i32) {
    let mut cb = |resolve: js_sys::Function, reject: js_sys::Function| {
        web_sys::window()
            .unwrap()
            .set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, delay);
    };

    let p = js_sys::Promise::new(&mut cb);

    wasm_bindgen_futures::JsFuture::from(p).await.unwrap();
}

@Starttoaster
Author

Starttoaster commented Jul 30, 2024

Thanks! This does get me a bit closer to something that actually works, by way of this actually compiling and running on npx run dev, but I'm getting an issue that I'm maybe not experienced enough with js_sys to understand. I'm running this locally, and trying to run a simple bash script that just slams my local worker with curl requests, one after another (not concurrently).

[wrangler:inf] POST /test 500 Internal Server Error (99ms)
✘ [ERROR] Uncaught (in response) Error: The script will never generate a response.

✘ [ERROR] A hanging Promise was canceled. This happens when the worker runtime is waiting for a Promise from JavaScript to resolve, but has detected that the Promise cannot possibly ever resolve because all code and events related to the Promise's I/O context have already finished.

And I can tell that not all of the upstream requests actually completed successfully. For context this is basically forwarding a webhook's POST body data to a specific upstream webhook URL for a chat service. I believe I'm getting this every time the worker is receiving a 429 from the chat service.

If this makes sense to you, some help would be appreciated. But I'm still looking in case I can figure it out myself.

@kflansburg
Contributor

This is reported by the Workers runtime when all futures have executed but no response has been returned, so I suspect there is something slightly wrong with my hacky sleep method.

It is also seen when Rust just panics, so we may be hitting those unwraps. Can you add the panic hook and some logging to see if that is happening?

@Starttoaster
Author

Actually just figured this out after some edits to the sleep function:

async fn sleep(delay: i32) {
    let mut cb = |resolve: js_sys::Function, _reject: js_sys::Function| {
        let _ = web_sys::window()
            .unwrap()
            .set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, delay);
    };

    let p = js_sys::Promise::new(&mut cb);

    wasm_bindgen_futures::JsFuture::from(p).await.unwrap_or_else(|e| {
        eprintln!("An error occurred awaiting JS future: {:?}", e);
        Default::default()
    });
}

The issue was just the unwrap call, which sometimes panicked, as you mentioned.

@Starttoaster
Author

Starttoaster commented Jul 30, 2024

To what you just said: yeah, this does feel a little hacky. It would be excellent for thread sleeping to be smoothed out in workers-rs someday, but this seems to work for now. Thanks again!

Feel free to close the Issue if this doesn't really hold value anymore in your opinion. Like if it's unlikely for workers-rs to do anything in the near or long term to make this a bit more paved in the future. Or if it's already being tracked as a work item somewhere else.

@kflansburg
Contributor

> To what you just said: yeah, this does feel a little hacky. It would be excellent for thread sleeping to be smoothed out in workers-rs someday, but this seems to work for now. Thanks again!

I think we would end up doing something similar (albeit better tested). It's not very elegant because JS "sleeping" is quirky and involves a setTimeout with callback, and we are just leveraging JS here. But I think this is more or less the right way to implement this.

I think there may be some crates that solve this problem as well. Unfortunately tokio::time requires WASI with time functions implemented (we do support this on Workers, but not workers-rs).
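One such crate (my suggestion, not something workers-rs ships or endorses) is gloo-timers: with its futures feature enabled, it wraps setTimeout/clearTimeout behind a Future, which should work on any wasm-bindgen target. A sketch, assuming the crate is added as a dependency:

```rust
// Sketch using the third-party gloo-timers crate
// (gloo-timers = { version = "0.3", features = ["futures"] }).
// Compiles only for wasm32 targets; it wraps setTimeout behind a
// Future, so no hand-rolled js_sys::Promise is needed.
use gloo_timers::future::TimeoutFuture;

async fn sleep(delay_ms: u32) {
    TimeoutFuture::new(delay_ms).await;
}
```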

@Starttoaster
Author

Starttoaster commented Jul 30, 2024

Hm, I wasn't actually testing this properly. To produce a 429 from my chat app, I had to alter my script to actually run some of the requests concurrently. So I'm essentially running this bash script:

#!/bin/bash

counter=0
max_jobs=10

wait_for_jobs() {
    while [ $(jobs -r | wc -l) -ge $max_jobs ]; do
        sleep 1
    done
}

while true; do
  wait_for_jobs

  curl -X POST localhost:8787/test -d '{post body data here}' &

  counter=$((counter + 1))
done

This limits the number of concurrent requests to 10.

With this, I'm getting some successful POST requests, but most (the ones that are getting 429'd) are causing the worker to return HTTP 500. So the hacky function seems to have stopped panicking, but still doesn't actually sleep.

When I invoke my worker, it responds to my http client with a helpful message, in this case "Request received unexpected status code and ran out of retries", and it was slamming my shell with these messages, likely meaning no sleeping was actually occurring.

This is what the function looks like that actually calls the sleep function from above:

    pub async fn send(&self, url: &str) -> Result<()> {
        let mut retries = 10;
        let mut backoff_delay = 1;

        loop {
            let result = CLIENT
                .post(url)
                .header(
                    reqwest::header::CONTENT_TYPE,
                    "application/json;charset=UTF-8",
                )
                .body(self.to_string())
                .send()
                .await;

            match result {
                Ok(resp) if resp.status().is_success() => {
                    return Ok(());
                }
                Ok(_resp) => {
                    if retries > 0 {
                        sleep(backoff_delay).await;
                        backoff_delay *= 2;
                        retries -= 1;
                    } else {
                        return Err(anyhow::anyhow!("Request received unexpected status code and ran out of retries"));
                    }
                }
                Err(e) => return Err(anyhow::anyhow!("Failed making request to webhook: {e}")),
            }
        }
    }
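One side note on the numbers here (my observation, not from the thread): the delay ends up in setTimeout, which takes milliseconds, so starting backoff_delay at 1 and doubling gives ten retries totalling only about a second of waiting. A plain-Rust sketch of the schedule, just to make the arithmetic visible:

```rust
// Plain Rust (not worker code): the delay sequence produced by the retry
// loop above, assuming an initial delay of 1 and doubling on each retry.
fn backoff_schedule(initial_ms: u64, retries: u32) -> Vec<u64> {
    (0..retries).map(|i| initial_ms << i).collect()
}

fn main() {
    let delays = backoff_schedule(1, 10);
    assert_eq!(delays, vec![1, 2, 4, 8, 16, 32, 64, 128, 256, 512]);
    // setTimeout delays are in milliseconds, so this whole schedule waits
    // just over one second in total.
    assert_eq!(delays.iter().sum::<u64>(), 1023);
    println!("total wait: {} ms", delays.iter().sum::<u64>());
}
```

Starting at something like 500 ms instead would make the backoff more meaningful against a real rate limiter, independent of the sleep bug discussed below.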

@kflansburg
Contributor

kflansburg commented Jul 30, 2024

Ok, my bad: there is no window object in the Workers environment. But we can use wasm-bindgen to grab the setTimeout function directly; this appears to work for me:

#[wasm_bindgen]
extern "C" {
    #[wasm_bindgen(js_name="setTimeout")]
    fn set_timeout(cb: &js_sys::Function, delay: i32);
}

async fn sleep(delay: i32) {
    let mut cb = |resolve: js_sys::Function, _reject: js_sys::Function| {
        set_timeout(&resolve, delay);
    };

    let p = js_sys::Promise::new(&mut cb);

    wasm_bindgen_futures::JsFuture::from(p).await.unwrap_or_else(|e| {
        eprintln!("An error occurred awaiting JS future: {:?}", e);
        Default::default()
    });
}

@Starttoaster
Author

You're a legend. Apologies for not getting there myself, but I looked around for what felt like ages online with nobody seemingly trying to do exactly this (at least not using workers-rs). But it seems like a useful enough thing to document; I assume I'm not the only one using Workers as an intermediary between two web services that sometimes gets rate limited.

But yeah, with a sufficient number of retries (hate my chat app for this) it seems my messages are going through.
