diff --git a/spec/observables/dom/fetch-spec.ts b/spec/observables/dom/fetch-spec.ts index 17017c566a7..c7c598b0bff 100644 --- a/spec/observables/dom/fetch-spec.ts +++ b/spec/observables/dom/fetch-spec.ts @@ -249,4 +249,53 @@ describe('fromFetch', () => { // The subscription will not be closed until the error fires when the promise resolves. expect(subscription.closed).to.be.false; }); + + it('should support a selector', done => { + mockFetch.respondWith = { + ...OK_RESPONSE, + text: () => Promise.resolve('bar') + }; + const fetch$ = fromFetch('/foo', undefined, response => response.text()); + expect(mockFetch.calls.length).to.equal(0); + expect(MockAbortController.created).to.equal(0); + + fetch$.subscribe({ + next: text => { + expect(text).to.equal('bar'); + }, + error: done, + complete: () => { + // Wait until the complete and the subsequent unsubscribe are finished + // before testing these expectations: + setTimeout(() => { + expect(MockAbortController.created).to.equal(1); + expect(mockFetch.calls.length).to.equal(1); + expect(mockFetch.calls[0].input).to.equal('/foo'); + expect(mockFetch.calls[0].init!.signal).not.to.be.undefined; + expect(mockFetch.calls[0].init!.signal!.aborted).to.be.false; + done(); + }, 0); + } + }); + }); + + it('should abort when unsubscribed and a selector is specified', () => { + mockFetch.respondWith = { + ...OK_RESPONSE, + text: () => Promise.resolve('bar') + }; + const fetch$ = fromFetch('/foo', undefined, response => response.text()); + expect(mockFetch.calls.length).to.equal(0); + expect(MockAbortController.created).to.equal(0); + const subscription = fetch$.subscribe(); + + expect(MockAbortController.created).to.equal(1); + expect(mockFetch.calls.length).to.equal(1); + expect(mockFetch.calls[0].input).to.equal('/foo'); + expect(mockFetch.calls[0].init!.signal).not.to.be.undefined; + expect(mockFetch.calls[0].init!.signal!.aborted).to.be.false; + + subscription.unsubscribe(); + expect(mockFetch.calls[0].init!.signal!.aborted).to.be.true; + }); }); diff --git a/src/internal/observable/dom/fetch.ts b/src/internal/observable/dom/fetch.ts index 50c9df010d3..bca6efa89e6 100644 --- a/src/internal/observable/dom/fetch.ts +++ b/src/internal/observable/dom/fetch.ts @@ -1,4 +1,18 @@ import { Observable } from '../../Observable'; +import { Subscription } from '../../Subscription'; +import { from } from '../../observable/from'; +import { ObservableInput } from '../../types'; + +export function fromFetch( + input: string | Request, + init?: RequestInit +): Observable; + +export function fromFetch( + input: string | Request, + init: RequestInit | undefined, + selector: (response: Response) => ObservableInput +): Observable; /** * Uses [the Fetch API](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) to @@ -50,14 +64,25 @@ import { Observable } from '../../Observable'; * @returns An Observable, that when subscribed to performs an HTTP request using the native `fetch` * function. The {@link Subscription} is tied to an `AbortController` for the the fetch. */ -export function fromFetch(input: string | Request, init?: RequestInit): Observable { - return new Observable(subscriber => { +export function fromFetch( + input: string | Request, + init?: RequestInit, + selector?: (response: Response) => ObservableInput +): Observable { + return new Observable(subscriber => { const controller = new AbortController(); const signal = controller.signal; - let outerSignalHandler: () => void; let abortable = true; let unsubscribed = false; + const subscription = new Subscription(); + subscription.add(() => { + unsubscribed = true; + if (abortable) { + controller.abort(); + } + }); + let perSubscriberInit: RequestInit; if (init) { // If a signal is provided, just have it teardown. It's a cancellation token, basically. @@ -65,12 +90,14 @@ export function fromFetch(input: string | Request, init?: RequestInit): Observab if (init.signal.aborted) { controller.abort(); } else { - outerSignalHandler = () => { + const outerSignal = init.signal; + const outerSignalHandler = () => { if (!signal.aborted) { controller.abort(); } }; - init.signal.addEventListener('abort', outerSignalHandler); + outerSignal.addEventListener('abort', outerSignalHandler); + subscription.add(() => outerSignal.removeEventListener('abort', outerSignalHandler)); } } // init cannot be mutated or reassigned as it's closed over by the @@ -81,9 +108,26 @@ export function fromFetch(input: string | Request, init?: RequestInit): Observab } fetch(input, perSubscriberInit).then(response => { - abortable = false; - subscriber.next(response); - subscriber.complete(); + if (selector) { + subscription.add(from(selector(response)).subscribe( + value => subscriber.next(value), + err => { + abortable = false; + if (!unsubscribed) { + // Only forward the error if it wasn't an abort. + subscriber.error(err); + } + }, + () => { + abortable = false; + subscriber.complete(); + } + )); + } else { + abortable = false; + subscriber.next(response); + subscriber.complete(); + } }).catch(err => { abortable = false; if (!unsubscribed) { @@ -92,11 +136,6 @@ export function fromFetch(input: string | Request, init?: RequestInit): Observab } }); - return () => { - unsubscribed = true; - if (abortable) { - controller.abort(); - } - }; + return subscription; }); }