Skip to content

Commit 0b192a7

Browse files
committed
wip: sse single connection mode
1 parent f8cec32 commit 0b192a7

File tree

7 files changed

+535
-12
lines changed

7 files changed

+535
-12
lines changed

packages/event-target/redis-event-target/src/index.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ export function createRedisEventTarget<TEvent extends CustomEvent>(
4141
if (callbacks === undefined) {
4242
callbacks = new Set();
4343
callbacksForTopic.set(topic, callbacks);
44-
45-
subscribeClient.subscribe(topic);
44+
callbacks.add(callback);
45+
return subscribeClient.subscribe(topic).then(() => undefined);
4646
}
4747
callbacks.add(callback);
48+
return;
4849
}
4950

5051
function removeCallback(topic: string, callback: (event: TEvent) => void) {
@@ -65,7 +66,7 @@ export function createRedisEventTarget<TEvent extends CustomEvent>(
6566
if (callbackOrOptions != null) {
6667
const callback =
6768
'handleEvent' in callbackOrOptions ? callbackOrOptions.handleEvent : callbackOrOptions;
68-
addCallback(topic, callback);
69+
return addCallback(topic, callback);
6970
}
7071
},
7172
dispatchEvent(event: TEvent) {

packages/event-target/typed-event-target/src/index.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,14 @@ export type TypedEventListenerOrEventListenerObject<TEvent extends CustomEvent>
1111
| TypedEventListenerObject<TEvent>;
1212

1313
export interface TypedEventTarget<TEvent extends CustomEvent> extends EventTarget {
14+
/**
15+
* If the return value is a promise, the promise will resolve once the event listener has been set up.
16+
*/
1417
addEventListener(
1518
type: string,
1619
callback: TypedEventListenerOrEventListenerObject<TEvent> | null,
1720
options?: AddEventListenerOptions | boolean,
18-
): void;
21+
): void | Promise<void>;
1922
dispatchEvent(event: TEvent): boolean;
2023
removeEventListener(
2124
type: string,
Lines changed: 318 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,318 @@
1+
import { createSchema, createYoga } from 'graphql-yoga';
2+
import { useSSESingleConnection } from '../src/plugins/use-sse-single-connection';
3+
4+
describe('SSE Single Connection plugin', () => {
5+
it('reservation conflict', async () => {
6+
const abortController = new AbortController();
7+
const yoga = createYoga({
8+
plugins: [useSSESingleConnection()],
9+
schema: createSchema({
10+
typeDefs: /* GraphQL */ `
11+
type Query {
12+
hello: String
13+
}
14+
15+
type Subscription {
16+
hello: String
17+
}
18+
`,
19+
resolvers: {
20+
Subscription: {
21+
hello: {
22+
async *subscribe() {
23+
yield { hello: 'Hello' };
24+
},
25+
},
26+
},
27+
},
28+
}),
29+
});
30+
31+
const url = new URL('http://yoga/graphql');
32+
const eventStreamRequest = await yoga.fetch(url, {
33+
method: 'GET',
34+
headers: {
35+
'x-graphql-event-stream-token': 'my-token',
36+
accept: 'text/event-stream',
37+
},
38+
signal: abortController.signal,
39+
});
40+
41+
const iterator = eventStreamRequest.body![Symbol.asyncIterator]();
42+
const next = await iterator.next();
43+
expect(Buffer.from(next.value).toString('utf-8')).toMatchInlineSnapshot(`
44+
"event: ready
45+
46+
"
47+
`);
48+
49+
const conflictRequest = await yoga.fetch(url, {
50+
method: 'GET',
51+
headers: {
52+
'x-graphql-event-stream-token': 'my-token',
53+
accept: 'text/event-stream',
54+
},
55+
});
56+
57+
expect(conflictRequest.status).toEqual(409);
58+
expect(await conflictRequest.text()).toMatchInlineSnapshot(`""`);
59+
abortController.abort();
60+
await expect(iterator.next()).rejects.toThrow('aborted');
61+
});
62+
63+
it('execute query', async () => {
64+
const abortController = new AbortController();
65+
const yoga = createYoga({
66+
plugins: [useSSESingleConnection()],
67+
schema: createSchema({
68+
typeDefs: /* GraphQL */ `
69+
type Query {
70+
hello: String
71+
}
72+
`,
73+
}),
74+
});
75+
76+
const url = new URL('http://yoga/graphql');
77+
const eventStreamRequest = await yoga.fetch(url, {
78+
method: 'GET',
79+
headers: {
80+
'x-graphql-event-stream-token': 'my-token',
81+
accept: 'text/event-stream',
82+
},
83+
signal: abortController.signal,
84+
});
85+
86+
const iterator = eventStreamRequest.body![Symbol.asyncIterator]();
87+
let next = await iterator.next();
88+
expect(Buffer.from(next.value).toString('utf-8')).toMatchInlineSnapshot(`
89+
"event: ready
90+
91+
"
92+
`);
93+
94+
const executeRequest = await yoga.fetch(url, {
95+
method: 'POST',
96+
body: JSON.stringify({
97+
query: `query { hello }`,
98+
extensions: {
99+
operationId: 'abc',
100+
},
101+
}),
102+
headers: {
103+
'x-graphql-event-stream-token': 'my-token',
104+
'content-type': 'application/json',
105+
},
106+
});
107+
108+
expect(executeRequest.status).toEqual(202);
109+
expect(await executeRequest.text()).toMatchInlineSnapshot(`""`);
110+
next = await iterator.next();
111+
expect(Buffer.from(next.value).toString('utf-8')).toMatchInlineSnapshot(`
112+
"event: next
113+
id: abc
114+
data: {"id":"abc","payload":{"data":{"hello":null}}}
115+
116+
"
117+
`);
118+
abortController.abort();
119+
});
120+
121+
it('execute mutation', async () => {
122+
const abortController = new AbortController();
123+
const yoga = createYoga({
124+
plugins: [useSSESingleConnection()],
125+
schema: createSchema({
126+
typeDefs: /* GraphQL */ `
127+
type Query {
128+
hello: String
129+
}
130+
type Mutation {
131+
hello: String
132+
}
133+
`,
134+
}),
135+
});
136+
137+
const url = new URL('http://yoga/graphql');
138+
const eventStreamRequest = await yoga.fetch(url, {
139+
method: 'GET',
140+
headers: {
141+
'x-graphql-event-stream-token': 'my-token',
142+
accept: 'text/event-stream',
143+
},
144+
signal: abortController.signal,
145+
});
146+
147+
const iterator = eventStreamRequest.body![Symbol.asyncIterator]();
148+
let next = await iterator.next();
149+
expect(Buffer.from(next.value).toString('utf-8')).toMatchInlineSnapshot(`
150+
"event: ready
151+
152+
"
153+
`);
154+
155+
const executeRequest = await yoga.fetch(url, {
156+
method: 'POST',
157+
body: JSON.stringify({
158+
query: `mutation { hello }`,
159+
extensions: {
160+
operationId: 'abc',
161+
},
162+
}),
163+
headers: {
164+
'x-graphql-event-stream-token': 'my-token',
165+
'content-type': 'application/json',
166+
},
167+
});
168+
169+
expect(executeRequest.status).toEqual(202);
170+
expect(await executeRequest.text()).toMatchInlineSnapshot(`""`);
171+
next = await iterator.next();
172+
expect(Buffer.from(next.value).toString('utf-8')).toMatchInlineSnapshot(`
173+
"event: next
174+
id: abc
175+
data: {"id":"abc","payload":{"data":{"hello":null}}}
176+
177+
"
178+
`);
179+
abortController.abort();
180+
});
181+
182+
it('execute subscription', async () => {
183+
const abortController = new AbortController();
184+
const yoga = createYoga({
185+
plugins: [useSSESingleConnection()],
186+
schema: createSchema({
187+
typeDefs: /* GraphQL */ `
188+
type Query {
189+
hello: String
190+
}
191+
type Subscription {
192+
hello: String
193+
}
194+
`,
195+
resolvers: {
196+
Subscription: {
197+
hello: {
198+
async *subscribe() {
199+
yield { hello: 'Hello' };
200+
},
201+
},
202+
},
203+
},
204+
}),
205+
});
206+
207+
const url = new URL('http://yoga/graphql');
208+
const eventStreamRequest = await yoga.fetch(url, {
209+
method: 'GET',
210+
headers: {
211+
'x-graphql-event-stream-token': 'my-token',
212+
accept: 'text/event-stream',
213+
},
214+
signal: abortController.signal,
215+
});
216+
217+
const iterator = eventStreamRequest.body![Symbol.asyncIterator]();
218+
let next = await iterator.next();
219+
expect(Buffer.from(next.value).toString('utf-8')).toMatchInlineSnapshot(`
220+
"event: ready
221+
222+
"
223+
`);
224+
225+
const executionURL = new URL('http://yoga/graphql');
226+
executionURL.searchParams.set('query', 'subscription { hello }');
227+
executionURL.searchParams.set('extensions', JSON.stringify({ operationId: 'abc' }));
228+
229+
const executeRequest = await yoga.fetch(executionURL, {
230+
method: 'GET',
231+
headers: {
232+
'x-graphql-event-stream-token': 'my-token',
233+
},
234+
});
235+
236+
expect(executeRequest.status).toEqual(202);
237+
expect(await executeRequest.text()).toMatchInlineSnapshot(`""`);
238+
next = await iterator.next();
239+
expect(Buffer.from(next.value).toString('utf-8')).toMatchInlineSnapshot(`
240+
"event: next
241+
id: abc
242+
data: {"id":"abc","payload":{"data":{"hello":"Hello"}}}
243+
244+
"
245+
`);
246+
abortController.abort();
247+
});
248+
249+
it('concurrent operations', async () => {
250+
const d = createDeferred();
251+
const yoga = createYoga({
252+
plugins: [useSSESingleConnection()],
253+
schema: createSchema({
254+
typeDefs: /* GraphQL */ `
255+
type Query {
256+
hello: String
257+
}
258+
259+
type Subscription {
260+
hello: String
261+
}
262+
`,
263+
resolvers: {
264+
Subscription: {
265+
hello: {
266+
async *subscribe() {
267+
await d.promise;
268+
yield { hello: 'Hello' };
269+
},
270+
},
271+
},
272+
},
273+
}),
274+
});
275+
276+
const url = new URL('http://yoga/graphql');
277+
url.searchParams.set('query', 'subscription { hello }');
278+
const response1 = await yoga.fetch(url, {
279+
headers: {
280+
'X-GraphQL-Event-Stream-Token': 'my-token',
281+
},
282+
});
283+
const response1Iterator = response1.body![Symbol.asyncIterator]();
284+
const response1First = await response1Iterator.next();
285+
expect(Buffer.from(response1First.value).toString('utf-8')).toMatchInlineSnapshot(`
286+
":
287+
288+
"
289+
`);
290+
const response2 = await yoga.fetch(url, {
291+
headers: {
292+
'X-GraphQL-Event-Stream-Token': 'my-token',
293+
},
294+
});
295+
const response2Iterator = response2.body![Symbol.asyncIterator]();
296+
const response2First = await response2Iterator.next();
297+
expect(Buffer.from(response2First.value).toString('utf-8')).toMatchInlineSnapshot(`
298+
":
299+
300+
"
301+
`);
302+
});
303+
});
304+
305+
type Deferred<T = void> = {
306+
resolve: (value: T) => void;
307+
reject: (value: unknown) => void;
308+
promise: Promise<T>;
309+
};
310+
311+
function createDeferred<T = void>(): Deferred<T> {
312+
const d = {} as Deferred<T>;
313+
d.promise = new Promise<T>((resolve, reject) => {
314+
d.resolve = resolve;
315+
d.reject = reject;
316+
});
317+
return d;
318+
}

0 commit comments

Comments
 (0)