Skip to content

Commit

Permalink
feat(consume): fixing a bug where consume would not connect to a new …
Browse files Browse the repository at this point in the history
…stream if another stream with the same path is currently undergoing connection.
  • Loading branch information
razshare committed May 30, 2024
1 parent 6e0fb2a commit 37f8c0e
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 19 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "sveltekit-sse",
"version": "0.13.0",
"version": "0.13.1",
"scripts": {
"dev": "vite dev",
"build": "vite build",
Expand Down
18 changes: 1 addition & 17 deletions src/lib/consume.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ import { uuid } from './uuid'
* @property {boolean} local
*/

/**
* @type {Map<string, number>}
*/
const connecting = new Map()

/**
* @typedef StreamEvents
* @property {Array<import('./types').EventListener>} onError
Expand Down Expand Up @@ -64,8 +59,6 @@ export function consume({
onClose,
onOpen,
}) {
const key = btoa(JSON.stringify({ resource, options }))

/** @type {StreamEvents} */
const events = {
onClose: [onClose],
Expand Down Expand Up @@ -157,12 +150,6 @@ export function consume({
return
}

if (connecting.has(key)) {
return
}

connecting.set(key, Date.now())

// Reset assumptions on new connections
status = 500
statusText = 'Internal Server Error'
Expand All @@ -183,7 +170,6 @@ export function consume({
status = statusLocal
statusText = statusTextLocal
headers = headersLocal
connecting.delete(key)

if (ok && headers.get('content-type') === EventStreamContentType) {
for (const onOpen of events.onOpen) {
Expand Down Expand Up @@ -255,9 +241,7 @@ export function consume({
})
}

connect().then(function connected() {
connecting.delete(key)
})
connect()

return result
}
3 changes: 2 additions & 1 deletion src/lib/source.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ const cachedConnectables = new Map()
* @returns {Connectable}
*/
function connectable({ resource, cache, options, onClose, onError, onOpen }) {
const key = btoa(JSON.stringify({ resource, options }))
let key = ''

if (cache) {
key = btoa(JSON.stringify({ resource, options }))
const cachedConnectable = cachedConnectables.get(key)
if (cachedConnectable) {
return cachedConnectable
Expand Down
10 changes: 10 additions & 0 deletions src/routes/issue-48/+page.svelte
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<script>
import { sourceSSE } from './sse'
sourceSSE('/issue-48', ['event0'])
sourceSSE('/issue-48', ['event0'])
// setTimeout(function run() {
// sourceSSE('/issue-48', ['event0'])
// }, 1000)
</script>
5 changes: 5 additions & 0 deletions src/routes/issue-48/+server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { produceSSE } from './sse'

export function POST() {
return produceSSE(['event0', 'event1'])
}
26 changes: 26 additions & 0 deletions src/routes/issue-48/sse.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { produce, source } from '$lib'

/**
*
* @param {Array<string>} eventNames
*/
export function produceSSE(eventNames) {
return produce(function start({ emit }) {
for (const eventName of eventNames) {
const data = { time: Date.now() }
emit(eventName, JSON.stringify(data))
}
})
}

/**
*
* @param {string} route
* @param {Array<string>} eventNames
*/
export function sourceSSE(route, eventNames) {
for (const eventName of eventNames) {
const sse = source(route, { cache: true }).select(eventName).json()
sse.subscribe(console.log)
}
}

0 comments on commit 37f8c0e

Please sign in to comment.