Event Listener on Client Disconnect #1770

Open
HeyITGuyFixIt opened this issue Dec 3, 2023 · 38 comments
Labels
enhancement New feature or request.

Comments

@HeyITGuyFixIt
Contributor

HeyITGuyFixIt commented Dec 3, 2023

What is the feature you are proposing?

As mentioned in Discord, could we have the ability to create event listeners for client disconnect or abort events? I would expect to be able to use them like c.addListener('abort', () => { ... }) and optionally c.on('abort', () => { ... }). Of course, we would also need the removeListener and off functions for completeness.

@HeyITGuyFixIt HeyITGuyFixIt added the enhancement New feature or request. label Dec 3, 2023
@HeyITGuyFixIt
Contributor Author

HeyITGuyFixIt commented Dec 3, 2023

Example usage:

c.stream((stream) => {
  return new Promise((resolve, reject) => {
    let callback = (data) => stream.write(data);
    request.addListener('data', callback);
    request.addListener('end', () => {
      request.removeListener('data', callback);
      resolve();
    });
    c.addListener('abort', () => {
      request.removeListener('data', callback);
      resolve();
    });
  })
});
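
A listener pair like the one proposed could be layered over a standard AbortSignal. The following is only a sketch under that assumption: `onAbort` is a hypothetical helper, not a Hono API, and a plain AbortController stands in for the client connection.

```typescript
// Hypothetical helper: subscribe to an AbortSignal and get back an
// unsubscribe function (the "removeListener"/"off" half of the proposal).
function onAbort(signal: AbortSignal, listener: () => void): () => void {
  if (signal.aborted) {
    // Already aborted: run the listener once, nothing to unsubscribe.
    listener();
    return () => {};
  }
  signal.addEventListener("abort", listener, { once: true });
  return () => signal.removeEventListener("abort", listener);
}

// A plain AbortController stands in for the client connection.
const controller = new AbortController();
const calls: string[] = [];

const offFirst = onAbort(controller.signal, () => calls.push("first"));
onAbort(controller.signal, () => calls.push("second"));

offFirst(); // the first listener is removed before the abort happens
controller.abort(); // simulates the client disconnecting
```

Returning the unsubscribe function keeps add and remove symmetric without requiring the caller to hold on to the listener reference.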

@HeyITGuyFixIt
Contributor Author

HeyITGuyFixIt commented Dec 3, 2023

It may also be useful to have something like c.canceled as a Boolean for loops. Example (credit: sven@Discord):

app.get("/all", (c) => {
  return c.streamText(async (stream) => {
    await stream.writeln("[");
    for await (let value of all()) {
      if (c.canceled) break;
      await stream.writeln(`${JSON.stringify(value)},`);
    }
    await stream.writeln("{}]");
  });
});
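
The standard `AbortSignal.aborted` flag can play the role of the proposed `c.canceled` boolean. A minimal sketch, assuming the runtime gives the handler an AbortSignal for the request; `all()` and `writeAll` here are stand-ins invented for illustration.

```typescript
// Stand-in for the `all()` async source in the comment above.
async function* all(): AsyncGenerator<number> {
  for (let i = 0; ; i++) {
    yield i;
  }
}

// Writes values until the signal reports that the client went away.
async function writeAll(
  signal: AbortSignal,
  write: (line: string) => void
): Promise<void> {
  write("[");
  for await (const value of all()) {
    if (signal.aborted) break; // plays the role of `c.canceled`
    write(`${JSON.stringify(value)},`);
  }
  write("{}]");
}

const abortController = new AbortController();
const lines: string[] = [];
const done = writeAll(abortController.signal, (line) => {
  lines.push(line);
  if (lines.length === 4) abortController.abort(); // simulate a disconnect
});
```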

@yusukebe
Member

yusukebe commented Dec 3, 2023

@sor4chi

Do you know how to handle a user's cancellation? If you run c.stream() or c.streamText() on Bun like the following, it will not stop.

app.get('/stream', (c) => {
  return c.streamText(async (stream) => {
    for (;;) {
      await stream.writeln('Hello')
      // Can I know the user's cancellation?
      await stream.sleep(1000)
    }
  })
})

@yusukebe
Member

yusukebe commented Dec 3, 2023

@HeyITGuyFixIt

What do you mean by client disconnect or abort events? For example, it might not be possible to handle closing the browser's tab. Or do you use some JavaScript on the client side?

@HeyITGuyFixIt
Contributor Author

@yusukebe I am talking about the client's connection to the server. I would expect an event to fire when the client disconnects from the server. This would apply to more than streaming, as the client could disconnect early from any request that could take time to finish the response.

@yusukebe
Member

yusukebe commented Dec 3, 2023

@HeyITGuyFixIt

How can I reproduce "disconnect"?

@HeyITGuyFixIt
Contributor Author

@yusukebe you can try using the streamText example above, run it, browse to it, and while it is still streaming, stop loading it in the browser, or if you are using curl, Ctrl+c to disconnect.

@yusukebe
Member

yusukebe commented Dec 3, 2023

@HeyITGuyFixIt

Thanks, I can reproduce it. However, I'm not certain if it's possible to detect an event like closing the browser.

@sor4chi, what are your thoughts on this?

@HeyITGuyFixIt
Contributor Author

HeyITGuyFixIt commented Dec 3, 2023

@yusukebe node's http.ClientRequest has the abort event that is supposed to fire when "the request has been aborted by the client."

@yusukebe
Member

yusukebe commented Dec 3, 2023

@HeyITGuyFixIt

I'm not sure if the abort event can handle browser closing.

I might try implementing a Node.js application that uses the abort event to see how it handles a "disconnect." However, if you have a good example, please share it.

@sor4chi
Contributor

sor4chi commented Dec 4, 2023

Hi, @HeyITGuyFixIt @yusukebe

It is not possible with the current implementation, but I think it can be done by taking the abort event from the controller when creating the writable stream and passing a handler from outside the StreamAPI. I'll try it later.

@yusukebe
Member

yusukebe commented Dec 5, 2023

I've created an example of a Node.js HTTP server that handles user disconnections. If you close the browser tab, the server stops. We should implement a similar feature in Hono.

import * as http from 'http'
import { Readable } from 'stream'

const server = http.createServer((req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/plain; charset=UTF-8',
    'Transfer-Encoding': 'chunked',
    'X-Content-Type-Options': 'nosniff'
  })

  const readable = new Readable({
    read() {}
  })

  const interval = setInterval(() => {
    const data = `${new Date().toISOString()}`
    console.log(data)
    readable.push(`${data}\n`)
  }, 1000)

  readable.pipe(res)

  req.on('close', () => {
    console.log('Connection is closed')
    clearInterval(interval)
    readable.destroy()
  })
})

const PORT = 3000
server.listen(PORT, () => {
  console.log(PORT)
})

@sor4chi
Contributor

sor4chi commented Dec 5, 2023

By the way, can we observe a response cancellation event?
I have no idea how to subscribe to the event.
I tried getting the AbortController from the event, but it didn't work well.

@yusukebe
Member

yusukebe commented Dec 5, 2023

@sor4chi

By the way, can we observe a response cancellation event?

I don't know :) So, I've made the Node.js example.

Which runtime did you try it on?

@yusukebe
Member

yusukebe commented Dec 5, 2023

This works well on Node.js, but it does not work well on Bun, possibly because of this issue: oven-sh/bun#6758

So, we have to test it on Node.js (or Deno).

app.get('/', async (c) => {
  let cancelationRequested = false

  const readable = new ReadableStream({
    start(controller) {
      const encoder = new TextEncoder()
      ;(async () => {
        for (;;) {
          if (cancelationRequested) {
            console.log(cancelationRequested)
            break
          }
          const input = encoder.encode('Hello\n')
          console.log('Hello')
          controller.enqueue(input)
          await new Promise((resolve) => setTimeout(resolve, 1000))
        }
        try {
          controller.close()
        } catch (e) {
          console.log(e)
        }
      })()
    },
    cancel(reason) {
      console.log('Stream canceled', reason)
      cancelationRequested = true
    },
  })

  return new Response(readable, {
    headers: {
      'content-type': 'text/plain; charset=UTF-8',
      'x-content-type-options': 'nosniff',
      'transfer-encoding': 'chunked',
    },
  })
})
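
The mechanism the example relies on can be seen in isolation, with no server involved: when a consumer cancels its reader, the underlying source's `cancel()` hook runs with the given reason. A self-contained sketch using only standard Web Streams APIs (the variable and function names are illustrative):

```typescript
let cancelReason: unknown = undefined;
let produced = 0;

const source = new ReadableStream<string>({
  pull(ctrl) {
    // Called whenever the consumer wants more data.
    ctrl.enqueue(`chunk ${produced++}\n`);
  },
  cancel(reason) {
    // Called when the consumer gives up on the stream.
    cancelReason = reason;
  },
});

async function readOneThenCancel(): Promise<string | undefined> {
  const reader = source.getReader();
  const first = await reader.read();
  await reader.cancel("client disconnected");
  return first.value;
}

const firstChunk = readOneThenCancel();
```

Whether a given runtime actually cancels the response body stream when the client disconnects is a separate question, and it is the part that differs between Node.js and Bun in this thread.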

@sor4chi
Contributor

sor4chi commented Dec 5, 2023

Cool, surely a stream could get it from cancel or abort.
Can't we get abort from something other than the stream, as @HeyITGuyFixIt shows in his code...?

@yusukebe
Member

yusukebe commented Dec 5, 2023

@sor4chi

Can't we get abort from something other than the stream, as @HeyITGuyFixIt shows in his code...?

I don't know. Please investigate it.

@yusukebe
Member

yusukebe commented Dec 9, 2023

After investigating, it appears that we can handle cancellation events, such as closing a browser tab, using ReadableStream because it has a cancel method. However, WritableStream, which is used in c.stream(), lacks a method similar to the cancel method of ReadableStream. Therefore, it might not be possible to handle cancellation with c.stream().

@sor4chi

If you have the time, I would appreciate it if you also could investigate this.
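
One way around the missing hook on the writable side is to wrap the readable half of a TransformStream in an outer ReadableStream and use the outer stream's `cancel()` to notify subscribers. The class below is a hypothetical minimal wrapper written for illustration under that assumption; it is not presented as Hono's actual implementation.

```typescript
// Hypothetical minimal wrapper; names are invented for this sketch.
class AbortableStream {
  readonly responseReadable: ReadableStream<Uint8Array>;
  private readonly writer: WritableStreamDefaultWriter<Uint8Array>;
  private readonly subscribers: Array<() => void> = [];
  private readonly encoder = new TextEncoder();

  constructor() {
    const { readable, writable } = new TransformStream<Uint8Array, Uint8Array>();
    this.writer = writable.getWriter();
    const reader = readable.getReader();

    // The consumer (the HTTP response) reads from this outer stream, so its
    // cancel() hook is where a disconnect would surface.
    this.responseReadable = new ReadableStream<Uint8Array>({
      async pull(ctrl) {
        const result = await reader.read();
        if (result.done) ctrl.close();
        else ctrl.enqueue(result.value);
      },
      cancel: async () => {
        this.subscribers.forEach((fn) => fn());
        await reader.cancel();
      },
    });
  }

  // Register a callback to run when the consumer cancels the response.
  onAbort(fn: () => void): void {
    this.subscribers.push(fn);
  }

  async write(text: string): Promise<void> {
    await this.writer.write(this.encoder.encode(text));
  }
}

const api = new AbortableStream();
let aborted = false;
api.onAbort(() => {
  aborted = true;
});
// Simulate the runtime cancelling the response body after a disconnect.
const cancelled = api.responseReadable.cancel("tab closed");
```

This only helps on runtimes that cancel the response body when the client goes away, so behavior may still vary by runtime.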

@sor4chi
Contributor

sor4chi commented Dec 9, 2023

@yusukebe
Even this code did not work in Cloudflare Workers.

I have code locally that implements the abort event handler on c.stream, but I have a feeling that the behavior might differ a lot depending on the runtime.

Should we proceed with this?

@sor4chi
Contributor

sor4chi commented Dec 9, 2023

2023-12-09.15.30.07.mov

@yusukebe
Member

yusukebe commented Dec 9, 2023

@sor4chi

Cool!

I don't know whether to merge or not, but can you make a PR? We can discuss it there.

@sor4chi
Contributor

sor4chi commented Dec 9, 2023

Yes, of course.
It is written in a very hacky style right now, so please give me a moment to refactor it.

@satyavh

satyavh commented Jan 20, 2024

Confirmed that Hono's stream.onAbort does not work with Bun. The following does work to detect if a stream has been disconnected:

return streamSSE(c, async (stream) => {
   c.req.signal.addEventListener("abort", () => console.log('Stream closed'))
   
   await stream.writeSSE({....})
})

Abort event is now triggered when the client triggers EventSource.close()

@sor4chi
Contributor

sor4chi commented Jan 20, 2024

Hi, @satyavh!
I would like to know which version of Hono you are using.
In Hono <= 3.12.0, onAbort on streamSSE did not work; this has been fixed since 3.12.1.

@satyavh

satyavh commented Jan 20, 2024

I'm on Hono 3.12.1. It does not work for me with Bun, but that could be Bun-related (I'm on Bun v1.0.24).

@mvolkmann

I'm running into the same issue. I want to find a way to close the server-side connection when the client side calls the close method on the EventSource. I have this working with Node and Express, but haven't been able to duplicate that with Bun and Hono. I am using Bun 1.0.24 and Hono 3.12.6.

Here is the relevant part of my code.
Note that c.req.signal is deprecated and I think we are supposed to use c.req.raw.signal now.

app.get("/sse", (c: Context) => {
  return streamSSE(c, async (stream) => {
    // This should be invoked when the client calls close on the EventSource,
    // but it is not.
    c.req.raw.signal.addEventListener("abort", () => {
      console.log("got abort event");
      // TODO: How can the connection be closed?
    });

    let count = 0;
    while (count < 10) {
      count++;

      await stream.writeSSE({
        // TODO: Why does the next line trigger an error?
        // event: "count", // optional
        id: String(crypto.randomUUID()), // optional
        data: String(count), // TODO: Is this required to be a string?
      });
    }
  });
});

@sor4chi
Contributor

sor4chi commented Jan 21, 2024

@satyavh @mvolkmann

Here is the usage of onAbort that Hono expects. Could you try this?

app.get("/on-abort", (c) => {
  return streamSSE(c, async (stream) => {
    stream.onAbort(() => {
      console.log("aborted");
    });

    for (let i = 0; i < 10; i++) {
      await stream.write(`data: ${i}\n\n`);
      await stream.sleep(1000);
    }
  });
});

@mvolkmann

mvolkmann commented Jan 21, 2024

@sor4chi I tried your suggestion. It did not work for me. The client calls eventSource.close(), but the stream.onAbort callback function is never executed.
You can see my simple client code at https://github.com/mvolkmann/server-sent-events-examples/blob/main/bun-sse/public/index.html and my simple server code at https://github.com/mvolkmann/server-sent-events-examples/blob/main/bun-sse/src/server.ts.

@satyavh

satyavh commented Jan 21, 2024

No, stream.onAbort(() => {}) is definitely never called.

@mvolkmann
c.req.raw.signal.addEventListener("abort", () => {}) works for me, and is triggered when the client triggers EventSource.close()

    c.req.raw.signal.addEventListener("abort", () => {
      console.log("got abort event");
      // TODO: How can the connection be closed?
    });

You don't need to close the connection, because when this is triggered the connection is already closed by the client.

But note that this doesn't stop the actual processing of logic in the endpoint. In other words, the loop keeps going forever if you don't stop it manually (in your case, until count is 10). Hono only closes the stream connection from the server when you return from the streamSSE callback! The same is true if you trigger stream.close().

Now you can imagine the memory and server load issues this can generate if thousands of clients connect to your SSE endpoint. All those loops will potentially keep going forever, especially if you keep the connection open with stream.sleep() as the documentation suggests.

So what you need to do is something like this

let id = 0 // event id counter; the snippet uses `id` below, so it needs a declaration
app.get('/sse', async (c) => {
  return streamSSE(c, async (stream) => {
    let isOpen = true
    
    c.req.raw.signal.addEventListener("abort", () => {
      // this will stop the loop and make sure the streamSSE callback resolves
      isOpen = false
    })
  
    // this loop makes sure the SSE stream stays open, as it makes sure the callback on line 2 never resolves (that is the weird part of Hono)
    while (isOpen) {
      const message = `It is ${new Date().toISOString()}`
      await stream.writeSSE({
        data: message,
        event: 'time-update',
        id: String(id++),
      })
      await stream.sleep(1000)
    }
  })
})
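
With the flag approach, the loop only notices the disconnect after the current sleep finishes. A sleep that also wakes up on abort shortens that delay. The helper below is a hypothetical sketch built on standard AbortSignal and timers, not a Hono API, and it still depends on the runtime actually firing the abort event.

```typescript
// Hypothetical helper: like a sleep, but it wakes up early when the signal aborts.
function abortableSleep(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise((resolve) => {
    if (signal.aborted) {
      resolve();
      return;
    }
    const onAbort = () => {
      clearTimeout(timer); // do not leave the timer pending after an abort
      resolve();
    };
    const timer = setTimeout(() => {
      signal.removeEventListener("abort", onAbort);
      resolve();
    }, ms);
    signal.addEventListener("abort", onAbort, { once: true });
  });
}

// A loop in the style of the comment above.
async function tickUntilAborted(signal: AbortSignal): Promise<number> {
  let ticks = 0;
  while (!signal.aborted) {
    ticks++;
    await abortableSleep(60000, signal); // would block for a minute without the signal
  }
  return ticks;
}

// A plain AbortController stands in for the client connection.
const client = new AbortController();
const ticksPromise = tickUntilAborted(client.signal);
setTimeout(() => client.abort(), 10); // simulate EventSource.close()
```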

@sor4chi
Contributor

sor4chi commented Jan 22, 2024

[IMO] It is possible that Bun does not support ReadableStream({ cancel }) in the first place.

Similarly, it has been confirmed that onAbort does not work on Cloudflare Workers. Currently, the only runtime that has been confirmed to work is Node.js.

@not-reed

ended up here with the same issue (bun+sse) and just wanted to share my findings after reading through this and trying out some experiments. reading the above and trying a few variations I found some interesting results that seem to solve my problem at least, and maybe helps someone else.

Using a slightly modified version of the sse example from https://hono.dev/helpers/streaming none of these events are ever called

let id = 0;
app.get("/sse", (c) => {
	c.req.raw.signal.addEventListener("abort", () => {
		console.log("never gets called");
	});

	c.req.raw.signal.onabort = () => {
		console.log("never gets called also");
	};
	return streamSSE(c, async (stream) => {
		stream.onAbort(() => {
			console.log("never gets called also 2");
		});

		while (true) {
			const message = `It is ${new Date().toISOString()}`
			await stream.writeSSE({
			  data: message,
			  event: 'time-update',
			  id: String(id++),
			})
			await stream.sleep(1000)
		}
	});
});

however if i check signal.aborted (such as in the while loop) then these events are called

let id = 0;
app.get("/sse", (c) => {
	c.req.raw.signal.addEventListener("abort", () => {
		console.log("now this gets called");
	});

	c.req.raw.signal.onabort = () => {
		console.log("now this gets called also");
	};
	return streamSSE(c, async (stream) => {
		stream.onAbort(() => {
			console.log("still doesn't get called");
		});

		while (true) {
			// add this line, and the above get called on disconnect :) 
			if (c.req.raw.signal.aborted) {
				break;
			}
			const message = `It is ${new Date().toISOString()}`;
			await stream.writeSSE({
				data: message,
				event: "time-update",
				id: String(id++),
			});
			await stream.sleep(1000);
		}
	});
});

@calebkish

calebkish commented Mar 30, 2024

Aborts don't seem to be captured until something has been written to the stream. In my case, adding await textStream.write('') at the beginning of the streamText() callback made it so that all abort signals are captured.

apiRouter.get('/llm',
  (c) => {
    return streamText(c, async (textStream) => {
      // Abort signals aren't captured if nothing is written to the stream for some reason...
      await textStream.write('');

      const prompt = 'Tell me a short story about a happy Llama that is 2 sentences long.';
      const llmStream = await model.stream(prompt);
      if (c.req.raw.signal.aborted) {
        console.log('Aborted');
        return; // do early return if request was immediately aborted
      }
      for await (const chunk of llmStream) { // llmStream will be locked when in this loop
        if (c.req.raw.signal.aborted) {
          console.log('Aborted');
          break; // will cancel the llmStream
        }
        console.log(chunk);
        await textStream.write(chunk);
      }
    });
  }
);

If I didn't have that await textStream.write('') at the beginning and an abort signal came through, it would be as though the abort never happened. Abort handlers wouldn't get triggered, and c.req.raw.signal.aborted would remain false. Only aborts after the initial write (await textStream.write(chunk)) were captured.
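
The "break will cancel the llmStream" comment relies on how for-await works: leaving the loop early calls the iterator's `return()`, which lets the producer clean up. A small sketch with an async generator standing in for the LLM stream (an assumption; the real `model.stream()` object may behave differently):

```typescript
let upstreamClosed = false;

// Stand-in for an upstream source such as an LLM token stream.
async function* upstream(): AsyncGenerator<string> {
  try {
    for (let i = 0; ; i++) {
      yield `token ${i}`;
    }
  } finally {
    // Runs when the consumer breaks out of its for-await loop.
    upstreamClosed = true;
  }
}

async function relay(
  signal: AbortSignal,
  onChunk: (chunk: string) => void
): Promise<void> {
  for await (const chunk of upstream()) {
    if (signal.aborted) break; // leaving the loop calls upstream's return()
    onChunk(chunk);
  }
}

// A plain AbortController stands in for the incoming request.
const request = new AbortController();
const received: string[] = [];
const relayed = relay(request.signal, (chunk) => {
  received.push(chunk);
  if (received.length === 2) request.abort(); // simulate the client aborting
});
```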

@satyavh

satyavh commented Jul 1, 2024

It is really a mess and seems to change with every Hono release.

Right now (Hono 4.4.9) adding this anywhere

        stream.onAbort(() => {
          console.log('>>> stream closed!')
        }) 

will not be triggered, BUT it will make sure that the following always gets triggered 🤯

      c.req.raw.signal.addEventListener("abort", () => {
          console.log('>>> stream closed')
      })

Full code

return streamSSE(c, async (stream) => {
    // this is just here to make sure that below gets triggered
    stream.onAbort(() => {}) 
   
   // this will now always get triggered - but only if above line is there 🤯 
    c.req.raw.signal.addEventListener("abort", () => {
          console.log('>>> stream closed')
   })
   
   // keep stream open
   await new Promise(() => {})
})
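
A promise that never resolves keeps the callback pending even after the client is gone. An alternative sketch is to wait on a promise that settles when the signal aborts, so the callback can return and clean up. `untilAborted` and `holdOpen` are hypothetical names, and this only helps when the runtime fires the abort event, which is the open question in this thread.

```typescript
// Hypothetical helper: a promise that settles once the signal aborts.
function untilAborted(signal: AbortSignal): Promise<void> {
  return new Promise((resolve) => {
    if (signal.aborted) resolve();
    else signal.addEventListener("abort", () => resolve(), { once: true });
  });
}

async function holdOpen(signal: AbortSignal, log: string[]): Promise<void> {
  log.push("open");
  await untilAborted(signal); // instead of `await new Promise(() => {})`
  log.push("closed"); // cleanup can run here, and the callback returns
}

// A plain AbortController stands in for the client connection.
const connection = new AbortController();
const events: string[] = [];
const held = holdOpen(connection.signal, events);
connection.abort(); // simulate the client closing the stream
```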

@2234839

2234839 commented Oct 9, 2024

I tried all of the above methods and none of them worked for me. When running on Node, I found no way to detect when the client connection was broken.

@T2Knock

T2Knock commented Nov 7, 2024

  stream.onAbort(() => {
      console.log('>>> stream closed!')
    }) 
    
  c.req.raw.signal.addEventListener("abort", () => {
      console.log('>>> stream closed')
  })

Not sure if this is still an issue. I just tested both ways on Bun v1.1.34 and Hono v4.6.8, and they work just fine.

@Gumichocopengin8

Gumichocopengin8 commented Dec 27, 2024

It is not possible with the current implementation, but I think it can be done by taking the abort event from the controller when creating the writable stream and passing a handler from outside the StreamAPI. I'll try it later.

First, thanks for making Hono🔥!
I tested hono 4.6.14 on Node.js.
The following code doesn't trigger the abort when closing the browser. Is it not implemented yet?

app.get('/on-abort', (c) => {
  return streamSSE(c, async (stream) => {
    stream.onAbort(() => {
      console.log('not called');
    });

    c.req.raw.signal.addEventListener('abort', () => {
      console.log('not called');
    });

    for (let i = 0; ; i++) {
      if (c.req.raw.signal.aborted) {
        console.log('not called');
        break;
      }
      await stream.write(`data: ${i}\n`);
      console.log('data', i, c.req.raw.signal.aborted);
      await stream.sleep(1000);
    }
  });
});

@yusukebe
Member

yusukebe commented Jan 6, 2025

Hi @Gumichocopengin8

I've tried your code on my machine. Is this correct behavior or not?

stream.mp4

@Gumichocopengin8

I've tried your code on my machine. Is this correct behavior or not?

Yes, that's the expected behavior. Somehow I don't see the "not called" console log on my machine. If the code works on your side, I think there's something wrong with my code or environment. Thanks.

10 participants