Skip to content

Commit 9762b24

Browse files
alxhubAndrewKushnir
authored andcommitted
feat(core): experimental impl of rxResource() (angular#58255)
Implementations of two rxjs-interop APIs which produce `Resource`s from RxJS Observables. `rxResource()` is a flavor of `resource()` which uses a projection to an `Observable` as its loader (like `switchMap`). PR Close angular#58255
1 parent 18d8d44 commit 9762b24

File tree

4 files changed

+118
-0
lines changed

4 files changed

+118
-0
lines changed

goldens/public-api/core/rxjs-interop/index.api.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ import { MonoTypeOperatorFunction } from 'rxjs';
1010
import { Observable } from 'rxjs';
1111
import { OutputOptions } from '@angular/core';
1212
import { OutputRef } from '@angular/core';
13+
import { ResourceLoaderParams } from '@angular/core';
14+
import { ResourceOptions } from '@angular/core';
15+
import { ResourceRef } from '@angular/core';
1316
import { Signal } from '@angular/core';
1417
import { Subscribable } from 'rxjs';
1518
import { ValueEqualityFn } from '@angular/core/primitives/signals';
@@ -20,6 +23,15 @@ export function outputFromObservable<T>(observable: Observable<T>, opts?: Output
2023
// @public
2124
export function outputToObservable<T>(ref: OutputRef<T>): Observable<T>;
2225

26+
// @public
27+
export function rxResource<T, R>(opts: RxResourceOptions<T, R>): ResourceRef<T>;
28+
29+
// @public
30+
export interface RxResourceOptions<T, R> extends Omit<ResourceOptions<T, R>, 'loader'> {
31+
// (undocumented)
32+
loader: (params: ResourceLoaderParams<R>) => Observable<T>;
33+
}
34+
2335
// @public
2436
export function takeUntilDestroyed<T>(destroyRef?: DestroyRef): MonoTypeOperatorFunction<T>;
2537

packages/core/rxjs-interop/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ export {
1515
toObservableMicrotask as ɵtoObservableMicrotask,
1616
} from './to_observable';
1717
export {toSignal, ToSignalOptions} from './to_signal';
18+
export {RxResourceOptions, rxResource} from './rx_resource';
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/**
2+
* @license
3+
* Copyright Google LLC All Rights Reserved.
4+
*
5+
* Use of this source code is governed by an MIT-style license that can be
6+
* found in the LICENSE file at https://angular.dev/license
7+
*/
8+
9+
import {
10+
assertInInjectionContext,
11+
ResourceOptions,
12+
resource,
13+
ResourceLoaderParams,
14+
ResourceRef,
15+
} from '@angular/core';
16+
import {firstValueFrom, Observable, Subject} from 'rxjs';
17+
import {takeUntil} from 'rxjs/operators';
18+
19+
/**
20+
* Like `ResourceOptions` but uses an RxJS-based `loader`.
21+
*
22+
* @experimental
23+
*/
24+
export interface RxResourceOptions<T, R> extends Omit<ResourceOptions<T, R>, 'loader'> {
25+
loader: (params: ResourceLoaderParams<R>) => Observable<T>;
26+
}
27+
28+
/**
29+
* Like `resource` but uses an RxJS based `loader` which maps the request to an `Observable` of the
30+
* resource's value. Like `firstValueFrom`, only the first emission of the Observable is considered.
31+
*
32+
* @experimental
33+
*/
34+
export function rxResource<T, R>(opts: RxResourceOptions<T, R>): ResourceRef<T> {
35+
opts?.injector || assertInInjectionContext(rxResource);
36+
return resource<T, R>({
37+
...opts,
38+
loader: (params) => {
39+
const cancelled = new Subject<void>();
40+
params.abortSignal.addEventListener('abort', () => cancelled.next());
41+
return firstValueFrom(opts.loader(params).pipe(takeUntil(cancelled)));
42+
},
43+
});
44+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/**
2+
* @license
3+
* Copyright Google LLC All Rights Reserved.
4+
*
5+
* Use of this source code is governed by an MIT-style license that can be
6+
* found in the LICENSE file at https://angular.dev/license
7+
*/
8+
9+
import {of, Observable} from 'rxjs';
10+
import {TestBed} from '@angular/core/testing';
11+
import {ApplicationRef, Injector, signal} from '@angular/core';
12+
import {rxResource} from '@angular/core/rxjs-interop';
13+
14+
describe('rxResource()', () => {
15+
it('should fetch data using an observable loader', async () => {
16+
const injector = TestBed.inject(Injector);
17+
const appRef = TestBed.inject(ApplicationRef);
18+
const res = rxResource({
19+
loader: () => of(1),
20+
injector,
21+
});
22+
await appRef.whenStable();
23+
expect(res.value()).toBe(1);
24+
});
25+
26+
it('should cancel the fetch when a new request comes in', async () => {
27+
const injector = TestBed.inject(Injector);
28+
const appRef = TestBed.inject(ApplicationRef);
29+
let unsub = false;
30+
const request = signal(1);
31+
const res = rxResource({
32+
request,
33+
loader: ({request}) =>
34+
new Observable((sub) => {
35+
if (request === 2) {
36+
sub.next(true);
37+
}
38+
return () => {
39+
if (request === 1) {
40+
unsub = true;
41+
}
42+
};
43+
}),
44+
injector,
45+
});
46+
47+
// Wait for the resource to reach loading state.
48+
await waitFor(() => res.isLoading());
49+
50+
// Setting request = 2 should cancel request = 1
51+
request.set(2);
52+
await appRef.whenStable();
53+
expect(unsub).toBe(true);
54+
});
55+
});
56+
57+
async function waitFor(fn: () => boolean): Promise<void> {
58+
while (!fn()) {
59+
await new Promise((resolve) => setTimeout(resolve, 1));
60+
}
61+
}

0 commit comments

Comments
 (0)