Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(network): support sse in network fetch #2482

Merged
merged 5 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/network/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,7 @@ npm install @univerjs/network
# Using pnpm
pnpm add @univerjs/network
```

## Credit

This package is inspired by [Angular's implementation](https://github.com/angular/angular/tree/main/packages/common/http). Removed zone.js dependency and features that are not necessary for Univer.
2 changes: 1 addition & 1 deletion packages/network/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export { FetchHTTPImplementation } from './services/http/implementations/fetch';
export { XHRHTTPImplementation } from './services/http/implementations/xhr';
export { HTTPRequest, type HTTPRequestMethod } from './services/http/request';
export { type HTTPResponseType, HTTPStatusCode } from './services/http/http';
export { HTTPResponse, type HTTPEvent, HTTPResponseError } from './services/http/response';
export { HTTPResponse, type HTTPEvent, HTTPResponseError, HTTPEventType, HTTPProgress, ResponseHeader, type HTTPResponseBody } from './services/http/response';
export {
type ISocket,
ISocketService,
Expand Down
58 changes: 27 additions & 31 deletions packages/network/src/services/http/http.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { Disposable, remove, toDisposable } from '@univerjs/core';
import type { IDisposable } from '@wendellhu/redi';
import type { Observable } from 'rxjs';
import { firstValueFrom, of } from 'rxjs';
import { concatMap, map } from 'rxjs/operators';
import { concatMap } from 'rxjs/operators';

import { HTTPHeaders } from './headers';
import type { HTTPResponseType } from './http';
Expand All @@ -28,7 +28,6 @@ import { HTTPParams } from './params';
import type { HTTPRequestMethod } from './request';
import { HTTPRequest } from './request';
import type { HTTPEvent } from './response';
import { HTTPResponse, HTTPResponseError } from './response';
import type { HTTPHandlerFn, HTTPInterceptorFn, RequestPipe } from './interceptor';

export interface IRequestParams {
Expand All @@ -37,9 +36,16 @@ export interface IRequestParams {

/** Query headers. */
headers?: { [key: string]: string | number | boolean };

/** Expected types of the response data. */
responseType?: HTTPResponseType;

withCredentials?: boolean;

/**
* Should report progress.
*/
reportProgress?: boolean;
}

export interface IPostRequestParams extends IRequestParams {
Expand All @@ -52,6 +58,7 @@ export interface IPostRequestParams extends IRequestParams {
export interface IHTTPInterceptor {
/** The priority of the interceptor. The higher the value, the earlier the interceptor is called. */
priority?: number;

/** The interceptor function. */
interceptor: HTTPInterceptorFn;
}
Expand All @@ -61,8 +68,9 @@ export interface IHTTPInterceptor {
*
* You can use interceptors to:
*
* 1. modify requests (headers included) before they are sent, or modify responses before they are returned to the caller.
* 2. thresholding, logging, caching, etc.
* 1. modify requests (headers included) before they are sent, or modify responses
* before they are returned to the caller.
* 2. threshold, logging, caching, etc.
* 3. authentication, authorization, etc.
*/
export class HTTPService extends Disposable {
Expand Down Expand Up @@ -94,31 +102,31 @@ export class HTTPService extends Disposable {
return toDisposable(() => remove(this._interceptors, interceptor));
}

get<T>(url: string, options?: IRequestParams): Promise<HTTPResponse<T>> {
return this._request<T>('GET', url, options);
get<T>(url: string, params?: IRequestParams): Promise<HTTPEvent<T>> {
return this._request<T>('GET', url, params);
}

post<T>(url: string, options?: IPostRequestParams): Promise<HTTPResponse<T>> {
return this._request<T>('POST', url, options);
post<T>(url: string, params?: IPostRequestParams): Promise<HTTPEvent<T>> {
return this._request<T>('POST', url, params);
}

put<T>(url: string, options?: IPostRequestParams): Promise<HTTPResponse<T>> {
return this._request<T>('PUT', url, options);
put<T>(url: string, params?: IPostRequestParams): Promise<HTTPEvent<T>> {
return this._request<T>('PUT', url, params);
}

delete<T>(url: string, options?: IRequestParams): Promise<HTTPResponse<T>> {
return this._request<T>('DELETE', url, options);
delete<T>(url: string, params?: IRequestParams): Promise<HTTPEvent<T>> {
return this._request<T>('DELETE', url, params);
}

patch<T>(url: string, options?: IPostRequestParams): Promise<HTTPResponse<T>> {
patch<T>(url: string, options?: IPostRequestParams): Promise<HTTPEvent<T>> {
return this._request<T>('PATCH', url, options);
}

getSSE<T>(
method: HTTPRequestMethod,
url: string,
options?: IPostRequestParams
): Observable<HTTPResponse<T>> {
): Observable<HTTPEvent<T>> {
// Things to do when sending a HTTP request:
// 1. Generate HTTPRequest/HTTPHeader object
// 2. Call interceptors and finally the HTTP implementation.
Expand All @@ -127,29 +135,21 @@ export class HTTPService extends Disposable {
const request = new HTTPRequest(method, url, {
headers,
params,
withCredentials: options?.withCredentials ?? false, // default value for withCredentials is false by MDN
withCredentials: options?.withCredentials ?? false,
reportProgress: true,
responseType: options?.responseType ?? 'json',
body: (['GET', 'DELETE'].includes(method)) ? undefined : (options as IPostRequestParams)?.body,
});

return of(request).pipe(
concatMap((request) => this._runInterceptorsAndImplementation(request)),
map((response) => {
if (response instanceof HTTPResponseError) {
throw response;
}

return response;
})
);
return of(request).pipe(concatMap((request) => this._runInterceptorsAndImplementation(request)));
}

/** The HTTP request implementations */
private async _request<T>(
method: HTTPRequestMethod,
url: string,
options?: IRequestParams
): Promise<HTTPResponse<T>> {
): Promise<HTTPEvent<T>> {
// Things to do when sending a HTTP request:
// 1. Generate HTTPRequest/HTTPHeader object
// 2. Call interceptors and finally the HTTP implementation.
Expand All @@ -171,11 +171,7 @@ export class HTTPService extends Disposable {
// The event$ may emit multiple values, but we only care about the first one.
// We may need to care about other events (especially progress events) in the future.
const result = await firstValueFrom(events$);
if (result instanceof HTTPResponse) {
return result;
}

throw new Error(`${(result as HTTPResponseError).error}`);
return result;
}

// eslint-disable-next-line ts/no-explicit-any
Expand Down
20 changes: 17 additions & 3 deletions packages/network/src/services/http/implementations/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@ import type { Subscriber } from 'rxjs';
import { Observable } from 'rxjs';
import type { HTTPRequest } from '../request';
import type { HTTPEvent, HTTPResponseBody } from '../response';
import { HTTPResponse, HTTPResponseError } from '../response';
import { HTTPProgress, HTTPResponse, HTTPResponseError } from '../response';
import { HTTPHeaders } from '../headers';
import { HTTPStatusCode } from '../http';
import type { IHTTPImplementation } from './implementation';

// CREDIT: This implementation is inspired by (and uses lots of code from) Angular's HttpClient implementation.

/**
* An HTTP implementation using Fetch API. This implementation can both run in browser and Node.js.
*
Expand Down Expand Up @@ -106,14 +104,30 @@ export class FetchHTTPImplementation implements IHTTPImplementation {
): Promise<HTTPResponseBody> {
const chunks: Uint8Array[] = [];
const reader = response.body!.getReader();
const contentLength = response.headers.get('content-length');

let receivedLength = 0;

const reportProgress = request.requestParams?.reportProgress;
const responseType = request.responseType;
let partialText: string | undefined;
let decoder: TextDecoder;

while (true) {
const { done, value } = await reader.read();
if (done) break;

chunks.push(value);
receivedLength += value.length;

if (reportProgress && responseType === 'text') {
partialText = (partialText ?? '') + (decoder ??= new TextDecoder()).decode(value, { stream: true });
subscriber.next(new HTTPProgress(
contentLength ? Number.parseInt(contentLength, 10) : undefined,
receivedLength,
partialText
));
}
}

const all = mergeChunks(chunks, receivedLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ describe('test "HTTPRetryInterceptor"', () => {

const request = httpService.get('http://example.com');
request.then((response) => {
expect(response.body).toEqual({ text: 'Succeeded' });
expect((response as HTTPResponse<{ text: string }>).body).toEqual({ text: 'Succeeded' });
done();
});

Expand Down
4 changes: 4 additions & 0 deletions packages/network/src/services/http/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ import type { HTTPParams } from './params';

export type HTTPRequestMethod = 'GET' | 'POST' | 'PUT' | 'DELETE' | 'PATCH';

/**
* @internal
*/
export interface IHTTPRequestParams {
// eslint-disable-next-line ts/no-explicit-any
body?: any;
headers: HTTPHeaders;
params?: HTTPParams;
responseType: HTTPResponseType;
withCredentials: boolean;
reportProgress?: boolean;
}

let HTTPRequestUID = 0;
Expand Down
71 changes: 56 additions & 15 deletions packages/network/src/services/http/response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,24 @@

import type { HTTPHeaders } from './headers';

/**
* There are multiple events could be resolved from the HTTP server.
*/
export type HTTPEvent<T> = HTTPResponse<T> | HTTPResponseError;
export type HTTPEvent<T> = HTTPResponse<T> | HTTPProgress;
export enum HTTPEventType {
DownloadProgress,
Response,
}

interface IHTTPEvent {
type: HTTPEventType;
}

export type HTTPResponseBody = string | ArrayBuffer | Blob | object | null;

/** Wraps (success) response info. */
export class HTTPResponse<T> {
/**
* Wraps success response info.
*/
export class HTTPResponse<T> implements IHTTPEvent {
readonly type = HTTPEventType.Response;

readonly body: T;
readonly headers: HTTPHeaders;
readonly status: number;
Expand All @@ -48,6 +57,46 @@ export class HTTPResponse<T> {
}
}

/**
* Progress event for HTTP request. Usually used for reporting download/upload progress or SSE streaming.
*/
export class HTTPProgress implements IHTTPEvent {
readonly type = HTTPEventType.DownloadProgress;

constructor(
/**
* Total number of bytes to download. Depending on the request or
* response, this may not be computable and thus may not be present.
*/
public readonly total: number | undefined,

/**
* Number of bytes downloaded.
*/
public readonly loaded: number,
/**
* The partial response body as downloaded so far.
*
* Only present if the responseType was `text`.
*/
public readonly partialText?: string | undefined
) {
// empty
}
}

export class ResponseHeader {
constructor(
readonly headers: HTTPHeaders,
readonly status: number,
readonly statusText: string
) {
// empty
}
}

// #region error

export class HTTPResponseError {
readonly headers?: HTTPHeaders;
readonly status?: number;
Expand All @@ -72,12 +121,4 @@ export class HTTPResponseError {
}
}

export class ResponseHeader {
constructor(
readonly headers: HTTPHeaders,
readonly status: number,
readonly statusText: string
) {
// empty
}
}
// #endregion
Loading
Loading