Coherent client/server way to work with "infinite" streams #1217

Closed
dionysiusmarquis opened this issue Sep 8, 2024 · 7 comments
Labels
enhancement New feature or request

Comments

@dionysiusmarquis

Is your feature request related to a problem? Please describe.
The problem occurs when establishing an "infinite" stream server-side. I already outlined it here: #1216. There seems to be no way to break/reject the async while/yield loop after the client has closed the connection.

Describe the solution you'd like
I think it would be coherent to handle it just like the client side (following cancellation-and-timeouts#cancellation).

This works fine client-side. The for await loop breaks immediately once the abort signal is triggered:

const abort = new AbortController();
const stream = client.infiniteStream({ id: 1 }, { signal: abort.signal });
// Schedule the abort before iterating, so it fires while the loop is suspended.
setTimeout(() => abort.abort(), 3000);
try {
  for await (const response of stream) {
    console.log("got response message: ", response);
  }
} catch (error) {
  console.log(abort.signal.aborted); // true
}

The server/sending side doesn't seem to reject the generator when the client closes the connection. Trying to follow the same pattern used on the client side:

async function* infiniteStream(
  req: AccountRequest,
  context: HandlerContext,
) {
  context.signal.addEventListener("abort", (event) => {
    console.log("Client closed the connection."); // This triggers as expected on client close.
  });
  try {
    while (true) {
      if (context.signal.aborted) break; // This never seems to have any effect; the loop stays suspended on the pending promise after the abort.
      yield getDataPromise(req.id); // Returns a pending promise that resolves with the next server/data event.
    }
  } catch (error) {
    console.error(error);
  }
  console.log("Connection closed."); // This is never called.
}

Please specify whether the request is for Connect for Web or Connect for Node.js.

Describe alternatives you've considered
I got around it by writing the generator "by hand" ({ next() ..., reject() ... }) instead of using while/yield, which seems a bit overkill.
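
For illustration, a minimal sketch of such a hand-rolled iterator (makeAbortableStream is a hypothetical name, and getDataPromise is assumed to resolve with the next data event; neither comes from connect-es):

// Hypothetical sketch: a hand-rolled async iterator that finishes
// pending next() calls as soon as the signal aborts, instead of
// waiting for the next data event.
function makeAbortableStream<T>(
  getDataPromise: () => Promise<T>,
  signal: AbortSignal,
): AsyncIterable<T> {
  return {
    [Symbol.asyncIterator]() {
      return {
        next(): Promise<IteratorResult<T>> {
          if (signal.aborted) {
            return Promise.resolve({ done: true, value: undefined });
          }
          return new Promise((resolve, reject) => {
            // Settle immediately on abort instead of waiting for data.
            signal.addEventListener(
              "abort",
              () => resolve({ done: true, value: undefined }),
              { once: true },
            );
            getDataPromise().then(
              (value) => resolve({ done: false, value }),
              reject,
            );
          });
        },
      };
    },
  };
}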

@dionysiusmarquis dionysiusmarquis added the enhancement New feature or request label Sep 8, 2024
@srikrsna-buf
Member

I replied in the discussion, which should unblock you, but yield should have thrown if there was a write error. If you're not seeing that, can you put together a runnable, reproducible example and share it?

@dionysiusmarquis
Author

dionysiusmarquis commented Sep 9, 2024

I created a repository with a modified eliza example: dionysiusmarquis/connect-es-issue-1217 (Branches for v1.4 and v2.0)

I was actually wrong that the generator is never rejected, but it's still true that it isn't rejected immediately as soon as the abort signal triggers.

So the difference between client and server is:

  • Client generator gets the abort signal -> rejects immediately
  • Server triggers the abort signal -> the client-bound generator gets rejected the next time the corresponding promise resolves

Example:
/src/server.ts#L31
If you set a very high delay value and then close the client connection, the client-bound generator's while loop keeps running until the next corresponding promise resolves.

In a system with server-wide event promises that are resolved to send out data via RPC, client-bound while loops remain suspended until the next event occurs (which in the worst case could be quite some time).
It could be solved by having client-bound, instead of server-bound, promises per event, which can then be rejected on the abort signal. But that's quite an overhead that wasn't necessary in the previous non-generator environment.

@dionysiusmarquis
Author

Here is an example that would work:

async *infiniteStream(req: AccountRequest, context: HandlerContext) {
  console.log(`Connected client with id ${req.id}.`);
  const abortPromise = new Promise<void>((resolve) =>
    context.signal.addEventListener("abort", () => {
      resolve();
    }),
  );
  try {
    while (true) {
      yield await Promise.race([
        abortPromise,
        delay(30000), // stands in for the long-lived data promise
      ]);
    }
  } catch (error) {
    console.error((error as Error).message);
  }
  console.log(`Closed client connection with id ${req.id}.`);
},

Other clients can still register to wait for the event promise, and the closed client no longer leaves a while loop suspended until the next resolve. The client part was pretty intuitive, but the server part was harder to figure out when it comes to client-close signals.

@srikrsna-buf
Member

Ah! I think I see what you want: you want the operation to be cancelled based on the signal. While the example succeeds in ending the request (with a slight modification), it doesn't actually cancel the computation. Let's change your example to use fetch:

async *infiniteStream(req: AccountRequest, context: HandlerContext) {
  console.log(`Connected client with id ${req.id}.`);
  const abortPromise = new Promise<void>((resolve, reject) =>
    context.signal.addEventListener("abort", () => {
      reject(); // You need to reject in order for `Promise.race` to throw.
    }),
  );
  try {
    while (true) {
      yield await Promise.race([
        abortPromise,
        fetch('/some/url'),
      ]);
    }
  } catch (error) {
    console.error((error as Error).message);
  }
  console.log(`Closed client connection with id ${req.id}.`);
},

Now when the client aborts, yield will throw, but the fetch will still run to completion.

If we change the yield call to:

while (true) {
  yield await fetch('/some/url', { signal: context.signal });
}

Where the signal is directly passed to fetch, it will not only throw on abort but also cancel the network request.

@dionysiusmarquis
Author

That would be a "promise per client" approach. I guess fetch will eventually resolve/reject based on the signal. In my scenario there is only one promise that exists on the server level, meaning there is only one promise that any number of clients can "listen" to. Because there is only "one promise for all", it's not possible to reject the promise on the client level. Creating a client-specific abort promise could be a workaround.

Given environment:

  • There is one data promise on server level, that will be resolved once data changes
  • Before resolving the promise a new promise is created, that will be the new current data promise
  • There is a getter function (e.g. getDataPromise) that returns the one current data promise
  • There can be any number of generators or async functions to await the current data promise

Why this single-promise approach? This way there is no need to track any "listeners": the data provider resolves the promise and doesn't need to iterate over a pool of registered promises etc.
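
For clarity, a minimal sketch of how such a resolve-and-replace shared promise could look (the Data type and function names are illustrative assumptions, not taken from the example repo):

// Hypothetical sketch of the single server-wide data promise.
type Data = unknown;

let resolveData!: (data: Data) => void;
let dataPromise = new Promise<Data>((resolve) => {
  resolveData = resolve;
});

// Any number of generators/async functions can await the same promise.
function getDataPromise(): Promise<Data> {
  return dataPromise;
}

// The provider first swaps in a fresh promise, then resolves the old one,
// so consumers that re-await immediately pick up the next event.
function publish(data: Data) {
  const resolvePrevious = resolveData;
  dataPromise = new Promise<Data>((resolve) => {
    resolveData = resolve;
  });
  resolvePrevious(data);
}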

If it's still unclear, I can also expand the example repo with an example of this single-promise approach.

@srikrsna-buf
Member

Okay, now I understand. If there is one global promise, then using Promise.any is the right approach. One correction I'd make is rejecting the abort promise so that it throws instead of resolving.

Closing this for now, feel free to reach out if you need anything.

@dionysiusmarquis
Author

Yep, you're right, reject makes more sense. But Promise.any won't work, I guess, since .any only rejects if no promise resolves; Promise.race is the one that settles as soon as the abort promise rejects.

Here is my current solution: #1216
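
Putting the pieces together, a minimal sketch of the resulting handler (it reuses the hypothetical getDataPromise from the sketch above; the error message and names are illustrative, not the actual solution in #1216):

// Hypothetical sketch: race the shared data promise against a
// per-client abort promise that rejects, so yield throws promptly.
async function* infiniteStream(req: AccountRequest, context: HandlerContext) {
  // Rejects as soon as this client disconnects.
  const aborted = new Promise<never>((_, reject) =>
    context.signal.addEventListener(
      "abort",
      () => reject(new Error("client aborted")),
      { once: true },
    ),
  );
  try {
    while (true) {
      // Resolves with the next server-wide data event, or rejects on abort.
      yield await Promise.race([getDataPromise(), aborted]);
    }
  } catch {
    // Reached as soon as the client closes the connection.
  }
  console.log(`Closed client connection with id ${req.id}.`);
}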
