Skip to content

Commit

Permalink
fix: stabilized stream initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
LoaderB0T committed Apr 24, 2024
1 parent d3371e5 commit de5e2bb
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 2 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "easy-network-stub",
"version": "7.0.3",
"version": "7.1.0",
"repository": "https://github.com/LoaderB0T/easy-network-stub.git",
"author": {
"name": "Janik Schumacher (LoaderB0T)",
Expand Down
30 changes: 30 additions & 0 deletions src/stream/http-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,42 @@ export class HttpStreamResponse {
private _httpServer?: http.Server;
private _port?: number;
private _res?: Res;
private _initializeState: 'none' | 'initializing' | 'initialized' = 'none';
private _clientConnected = false;
private readonly _initializedCallbacks: Array<() => void> = [];
private readonly _clientConnectedCallbacks: Array<() => void> = [];
private readonly _kind: StreamKind;

constructor(kind: StreamKind = 'eventStream') {
this._kind = kind;
}

public async waitForClientConnection() {
if (this._clientConnected) {
return;
}
await new Promise<void>(resolve => {
this._clientConnectedCallbacks.push(resolve);
});
}

public async init() {
if (this._initializeState === 'initialized') {
return;
}
if (this._initializeState === 'initializing') {
return new Promise<void>(resolve => {
this._initializedCallbacks.push(resolve);
});
}
this._initializeState = 'initializing';
const { server, port } = await this._createServer();
this._httpServer = server;
this._port = port;
setTimeout(() => {
this._initializeState = 'initialized';
this._initializedCallbacks.forEach(cb => cb());
}, 100);
}

private async _createServer() {
Expand All @@ -28,6 +54,10 @@ export class HttpStreamResponse {
const server = http.createServer(async (req, res) => {
this._sseStart(res);
this._res = res;
setTimeout(() => {
this._clientConnected = true;
this._clientConnectedCallbacks.forEach(cb => cb());
}, 100);
});

// eslint-disable-next-line no-constant-condition
Expand Down
4 changes: 3 additions & 1 deletion src/stream/stream-response-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ export class StreamResponseHandler extends CustomResponseHandler {
req.reply({ statusCode: 302, headers: { location: this._stream.url } });
}

public send<T>(fragment: T): void {
public async send<T>(fragment: T): Promise<void> {
await this._stream.init();
await this._stream.waitForClientConnection();
this._stream.send(fragment);
}

Expand Down

0 comments on commit de5e2bb

Please sign in to comment.