
Fixup TransformStream backpressure #3159

Merged · 1 commit into main · Nov 23, 2024

Conversation

jasnell (Member) commented Nov 22, 2024

The TransformStream would originally start with backpressure enabled, as it is supposed to, but we failed to reapply backpressure when items were enqueued. This meant that after the first write to the TransformStream, no backpressure signaling would occur. Doh! This fixes it.

Unfortunately, a compat flag is necessary because suddenly enforcing backpressure correctly could break existing workers that never realized it was broken. I noticed this because making the change broke one of our tests.

```
export default {
  async fetch(req, env, ctx) {

    async function sleep(ms) {
      await new Promise((res) => setTimeout(res, ms));
    }

    const strategy = new ByteLengthQueuingStrategy({ highWaterMark: 2024 });

    const { writable, readable } = new TransformStream(undefined, strategy);

    let writes = 0, reads = 0;

    // Start writing chunks to the stream
    async function write(writable) {
      const writer = writable.getWriter();
      for (let n = 0; n < 10000; n++) {
        console.log('waiting on backpressure', writer.ready, writer.desiredSize);
        // To correctly await on backpressure, wait on the writer.ready promise
        await writer.ready;
        console.log('backpressure cleared. writing a chunk');
        // You can also await the promise returned by writer.write, but doing so is
        // unnecessary if you're already awaiting writer.ready, and can actually
        // hurt performance by reducing writer throughput
        writer.write(crypto.getRandomValues(new Uint8Array(1024)));
        console.log('chunk written', ++writes, writer.desiredSize);
      }
      await writer.close();
      console.log('done writing');
    }

    async function read(readable) {
      console.log('reading pending...');
      await sleep(5_000);

      console.log('reading....');
      for await (const chunk of readable) {
        console.log(`read ${chunk.length}`, ++reads);
        await sleep(100);
      }
      console.log('done reading');
    }

    await Promise.all([
      write(writable),
      read(readable)
    ]);

    return new Response("Hello World\n");
  }
};
```

@jasnell force-pushed the jsnell/fixup-transformstream-backpressure branch from 57d23df to 6b58166 on November 23, 2024 13:53
@jasnell merged commit cd2f9da into main on Nov 23, 2024 (14 checks passed)
@jasnell deleted the jsnell/fixup-transformstream-backpressure branch on November 23, 2024 14:50
byule (Collaborator) commented Nov 23, 2024

A few questions: can you elaborate on what broke in the example? What change did we need to make for the test to pass?

Also, why aren't there any updates to tests if we broke a test? Is the test not run with the compat flag?

jasnell (Member, Author) commented Nov 23, 2024

> ... Is the test not run with the compat flag?

Correct. The test doesn't use the compat flag. I'm separately working up a set of new tests to verify that backpressure is fully operational.

In the example given above, without the flag, the first write to the writable side of the transform would block as expected, waiting on the first read. Once the first read occurred, however, the internal backpressure flag would be set to false and never set back to true, so the write loop would continue unchecked until all 10k writes had occurred, and only then would the reads happen. With the fix, each write appropriately blocks on backpressure, so writes and reads proceed in step with each other: each read releases backpressure and each write reapplies it.

byule (Collaborator) commented Nov 24, 2024

Right - everything you mention here makes sense, but I still don't understand why the test would fail. Does the test check for ordering? Regardless of whether backpressure is working (data is written to the stream, and data is read out of the stream), I'd expect this test to functionally pass.

jasnell (Member, Author) commented Nov 24, 2024

The test fails with the compat flag because it relies on the write and close promises resolving before the transform stream is read from, which they should not once backpressure is properly applied.
