Skip to content

Commit

Permalink
feat(core): experimental impl of rxResource()
Browse files Browse the repository at this point in the history
Implementations of two rxjs-interop APIs which produce `Resource`s from
RxJS Observables. `rxResource()` is a flavor of `resource()` which uses a
projection to an `Observable` as its loader (like `switchMap`).
  • Loading branch information
alxhub committed Oct 21, 2024
1 parent 96965d5 commit 456ea9c
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 0 deletions.
12 changes: 12 additions & 0 deletions goldens/public-api/core/rxjs-interop/index.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import { MonoTypeOperatorFunction } from 'rxjs';
import { Observable } from 'rxjs';
import { OutputOptions } from '@angular/core';
import { OutputRef } from '@angular/core';
import { ResourceLoaderParams } from '@angular/core';
import { ResourceOptions } from '@angular/core';
import { ResourceRef } from '@angular/core';
import { Signal } from '@angular/core';
import { Subscribable } from 'rxjs';
import { ValueEqualityFn } from '@angular/core/primitives/signals';
Expand All @@ -20,6 +23,15 @@ export function outputFromObservable<T>(observable: Observable<T>, opts?: Output
// @public
export function outputToObservable<T>(ref: OutputRef<T>): Observable<T>;

// @public
export function rxResource<T, R>(opts: RxResourceOptions<T, R>): ResourceRef<T>;

// @public
export interface RxResourceOptions<T, R> extends Omit<ResourceOptions<T, R>, 'loader'> {
// (undocumented)
loader: (params: ResourceLoaderParams<R>) => Observable<T>;
}

// @public
export function takeUntilDestroyed<T>(destroyRef?: DestroyRef): MonoTypeOperatorFunction<T>;

Expand Down
1 change: 1 addition & 0 deletions packages/core/rxjs-interop/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ export {
toObservableMicrotask as ɵtoObservableMicrotask,
} from './to_observable';
export {toSignal, ToSignalOptions} from './to_signal';
export {RxResourceOptions, rxResource} from './rx_resource';
44 changes: 44 additions & 0 deletions packages/core/rxjs-interop/src/rx_resource.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* @license
* Copyright Google LLC All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.dev/license
*/

import {
assertInInjectionContext,
ResourceOptions,
resource,
ResourceLoaderParams,
ResourceRef,
} from '@angular/core';
import {firstValueFrom, Observable, Subject} from 'rxjs';
import {takeUntil} from 'rxjs/operators';

/**
* Like `ResourceOptions` but uses an RxJS-based `loader`.
*
* @experimental
*/
export interface RxResourceOptions<T, R> extends Omit<ResourceOptions<T, R>, 'loader'> {
loader: (params: ResourceLoaderParams<R>) => Observable<T>;
}

/**
* Like `resource` but uses an RxJS based `loader` which maps the request to an `Observable` of the
* resource's value. Like `firstValueFrom`, only the first emission of the Observable is considered.
*
* @experimental
*/
export function rxResource<T, R>(opts: RxResourceOptions<T, R>): ResourceRef<T> {
opts?.injector || assertInInjectionContext(rxResource);
return resource<T, R>({
...opts,
loader: (params) => {
const cancelled = new Subject<void>();
params.abortSignal.addEventListener('abort', () => cancelled.next());
return firstValueFrom(opts.loader(params).pipe(takeUntil(cancelled)));
},
});
}
61 changes: 61 additions & 0 deletions packages/core/rxjs-interop/test/rx_resource_spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* @license
* Copyright Google LLC All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.dev/license
*/

import {of, Observable} from 'rxjs';
import {TestBed} from '@angular/core/testing';
import {ApplicationRef, Injector, signal} from '@angular/core';
import {rxResource} from '@angular/core/rxjs-interop';

describe('rxResource()', () => {
it('should fetch data using an observable loader', async () => {
const injector = TestBed.inject(Injector);
const appRef = TestBed.inject(ApplicationRef);
const res = rxResource({
loader: () => of(1),
injector,
});
await appRef.whenStable();
expect(res.value()).toBe(1);
});

it('should cancel the fetch when a new request comes in', async () => {
const injector = TestBed.inject(Injector);
const appRef = TestBed.inject(ApplicationRef);
let unsub = false;
const request = signal(1);
const res = rxResource({
request,
loader: ({request}) =>
new Observable((sub) => {
if (request === 2) {
sub.next(true);
}
return () => {
if (request === 1) {
unsub = true;
}
};
}),
injector,
});

// Wait for the resource to reach loading state.
await waitFor(() => res.isLoading());

// Setting request = 2 should cancel request = 1
request.set(2);
await appRef.whenStable();
expect(unsub).toBe(true);
});
});

async function waitFor(fn: () => boolean): Promise<void> {
while (!fn()) {
await new Promise((resolve) => setTimeout(resolve, 1));
}
}

0 comments on commit 456ea9c

Please sign in to comment.