Skip to content

Commit

Permalink
feat(fetch): add selector (ReactiveX#5306)
Browse files Browse the repository at this point in the history
* feat(fetch): add selector
  • Loading branch information
cartant committed May 17, 2020
1 parent 796c889 commit a3af68c
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 7 deletions.
18 changes: 18 additions & 0 deletions spec-dtslint/observables/dom/fetch-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { fromFetch } from 'rxjs/fetch';
import { a$ } from '../../helpers';

it('should emit the fetch Response by default', () => {
const a = fromFetch("a"); // $ExpectType Observable<Response>
});

it('should support a selector that returns a Response promise', () => {
const a = fromFetch("a", { selector: response => response.text() }); // $ExpectType Observable<string>
});

it('should support a selector that returns an arbitrary type', () => {
const a = fromFetch("a", { selector: response => a$ }); // $ExpectType Observable<A>
});

it('should error for selectors that don\'t return an ObservableInput', () => {
const a = fromFetch("a", { selector: response => 42 }); // $ExpectError
});
6 changes: 5 additions & 1 deletion spec-dtslint/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
"noImplicitThis": true,
"paths": {
"rxjs": ["../dist/typings"],
"rxjs/operators": ["../dist/typings/operators"]
"rxjs/ajax": ["../dist/typings/ajax"],
"rxjs/fetch": ["../dist/typings/fetch"],
"rxjs/operators": ["../dist/typings/operators"],
"rxjs/testing": ["../dist/typings/testing"],
"rxjs/webSocket": ["../dist/typings/webSocket"]
},
"skipLibCheck": true,
"strictFunctionTypes": true,
Expand Down
53 changes: 53 additions & 0 deletions spec/observables/dom/fetch-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,4 +262,57 @@ describe('fromFetch', () => {
}
});
});

it('should support a selector', done => {
mockFetch.respondWith = {
...OK_RESPONSE,
text: () => Promise.resolve('bar')
};
const fetch$ = fromFetch('/foo', {
selector: response => response.text()
});
expect(mockFetch.calls.length).to.equal(0);
expect(MockAbortController.created).to.equal(0);

fetch$.subscribe({
next: text => {
expect(text).to.equal('bar');
},
error: done,
complete: () => {
// Wait until the complete and the subsequent unsubscribe are finished
// before testing these expectations:
setTimeout(() => {
expect(MockAbortController.created).to.equal(1);
expect(mockFetch.calls.length).to.equal(1);
expect(mockFetch.calls[0].input).to.equal('/foo');
expect(mockFetch.calls[0].init!.signal).not.to.be.undefined;
expect(mockFetch.calls[0].init!.signal!.aborted).to.be.false;
done();
}, 0);
}
});
});

it('should abort when unsubscribed and a selector is specified', () => {
mockFetch.respondWith = {
...OK_RESPONSE,
text: () => Promise.resolve('bar')
};
const fetch$ = fromFetch('/foo', {
selector: response => response.text()
});
expect(mockFetch.calls.length).to.equal(0);
expect(MockAbortController.created).to.equal(0);
const subscription = fetch$.subscribe();

expect(MockAbortController.created).to.equal(1);
expect(mockFetch.calls.length).to.equal(1);
expect(mockFetch.calls[0].input).to.equal('/foo');
expect(mockFetch.calls[0].init!.signal).not.to.be.undefined;
expect(mockFetch.calls[0].init!.signal!.aborted).to.be.false;

subscription.unsubscribe();
expect(mockFetch.calls[0].init!.signal!.aborted).to.be.true;
});
});
78 changes: 72 additions & 6 deletions src/internal/observable/dom/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
import { Observable } from '../../Observable';
import { Subscription } from '../../Subscription';
import { from } from '../../observable/from';
import { ObservableInput } from '../../types';

export function fromFetch<T>(
input: string | Request,
init: RequestInit & {
selector: (response: Response) => ObservableInput<T>
}
): Observable<T>;

export function fromFetch(
input: string | Request,
init?: RequestInit
): Observable<Response>;

/**
* Uses [the Fetch API](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) to
Expand Down Expand Up @@ -42,7 +56,36 @@ import { Subscription } from '../../Subscription';
* data$.subscribe({
* next: result => console.log(result),
* complete: () => console.log('done')
* })
* });
* ```
*
* ### Use with Chunked Transfer Encoding
*
* With HTTP responses that use [chunked transfer encoding](https://tools.ietf.org/html/rfc7230#section-3.3.1),
* the promise returned by `fetch` will resolve as soon as the response's headers are
* received.
*
* That means the `fromFetch` observable will emit a `Response` - and will
* then complete - before the body is received. When one of the methods on the
* `Response` - like `text()` or `json()` - is called, the returned promise will not
* resolve until the entire body has been received. Unsubscribing from any observable
* that uses the promise as an observable input will not abort the request.
*
* To facilitate aborting the retrieval of responses that use chunked transfer encoding,
* a `selector` can be specified via the `init` parameter:
*
* ```ts
* import { of } from 'rxjs';
* import { fromFetch } from 'rxjs/fetch';
*
* const data$ = fromFetch('https://api.github.com/users?per_page=5', {
* selector: response => response.json()
* });
*
* data$.subscribe({
* next: result => console.log(result),
* complete: () => console.log('done')
* });
* ```
*
* @param input The resource you would like to fetch. Can be a url or a request object.
Expand All @@ -51,8 +94,14 @@ import { Subscription } from '../../Subscription';
* @returns An Observable, that when subscribed to performs an HTTP request using the native `fetch`
* function. The {@link Subscription} is tied to an `AbortController` for the the fetch.
*/
export function fromFetch(input: string | Request, init?: RequestInit): Observable<Response> {
return new Observable<Response>(subscriber => {
export function fromFetch<T>(
input: string | Request,
initWithSelector: RequestInit & {
selector?: (response: Response) => ObservableInput<T>
} = {}
): Observable<Response | T> {
const { selector, ...init } = initWithSelector;
return new Observable<Response | T>(subscriber => {
const controller = new AbortController();
const signal = controller.signal;
let abortable = true;
Expand Down Expand Up @@ -91,9 +140,26 @@ export function fromFetch(input: string | Request, init?: RequestInit): Observab
}

fetch(input, perSubscriberInit).then(response => {
abortable = false;
subscriber.next(response);
subscriber.complete();
if (selector) {
subscription.add(from(selector(response)).subscribe(
value => subscriber.next(value),
err => {
abortable = false;
if (!unsubscribed) {
// Only forward the error if it wasn't an abort.
subscriber.error(err);
}
},
() => {
abortable = false;
subscriber.complete();
}
));
} else {
abortable = false;
subscriber.next(response);
subscriber.complete();
}
}).catch(err => {
abortable = false;
if (!unsubscribed) {
Expand Down

0 comments on commit a3af68c

Please sign in to comment.