Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(store): avoid delayed updates from state stream #1981

Merged
merged 3 commits into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- Feature: Devtools Plugin - Add trace options to `NgxsDevtoolsOptions` [#1968](https://github.com/ngxs/store/pull/1968)
- Performance: Tree-shake patch errors [#1955](https://github.com/ngxs/store/pull/1955)
- Fix: Get descriptor explicitly when it's considered as a class property [#1961](https://github.com/ngxs/store/pull/1961)
- Fix: Avoid delayed updates from state stream [#1981](https://github.com/ngxs/store/pull/1981)

### To become next patch version

Expand Down
37 changes: 2 additions & 35 deletions packages/store/src/actions-stream.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { Injectable, OnDestroy } from '@angular/core';
import { Subject, Observable } from 'rxjs';
import { Observable } from 'rxjs';
import { share } from 'rxjs/operators';

import { leaveNgxs } from './operators/leave-ngxs';
import { InternalNgxsExecutionStrategy } from './execution/internal-ngxs-execution-strategy';
import { OrderedSubject } from './internal/custom-rxjs-subjects';

/**
* Status of a dispatched action
Expand All @@ -21,40 +22,6 @@ export interface ActionContext<T = any> {
error?: Error;
}

/**
* Custom Subject that ensures that subscribers are notified of values in the order that they arrived.
* A standard Subject does not have this guarantee.
* For example, given the following code:
* ```typescript
* const subject = new Subject<string>();
subject.subscribe(value => {
if (value === 'start') subject.next('end');
});
subject.subscribe(value => { });
subject.next('start');
* ```
* When `subject` is a standard `Subject<T>` the second subscriber would recieve `end` and then `start`.
* When `subject` is a `OrderedSubject<T>` the second subscriber would recieve `start` and then `end`.
*/
export class OrderedSubject<T> extends Subject<T> {
private _itemQueue: T[] = [];
private _busyPushingNext = false;

next(value?: T): void {
if (this._busyPushingNext) {
this._itemQueue.unshift(value!);
return;
}
this._busyPushingNext = true;
super.next(value);
while (this._itemQueue.length > 0) {
const nextValue = this._itemQueue.pop();
super.next(nextValue);
}
this._busyPushingNext = false;
}
}

/**
* Internal Action stream that is emitted anytime an action is dispatched.
*/
Expand Down
77 changes: 77 additions & 0 deletions packages/store/src/internal/custom-rxjs-subjects.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { Subject, BehaviorSubject } from 'rxjs';

/**
* This wraps the provided function, and will enforce the following:
* - The calls will execute in the order that they are made
* - A call will only be initiated when the previous call has completed
* - If there is a call currently executing then the new call will be added
* to the queue and the function will return immediately
*
* NOTE: The following assumptions about the operation must hold true:
* - The operation is synchronous in nature
* - If any asynchronous side effects of the call exist, it should not
* have any bearing on the correctness of the next call in the queue
* - The operation has a void return
* - The caller should not assume that the call has completed upon
* return of the function
* - The caller can assume that all the queued calls will complete
* within the current microtask
* - The only way that a call will encounter another call in the queue
* would be if the call at the front of the queue initiated this call
* as part of its synchronous execution
*/
function orderedQueueOperation<TArgs extends any[]>(operation: (...args: TArgs) => void) {
const callsQueue: TArgs[] = [];
let busyPushingNext = false;
return function callOperation(...args: TArgs) {
if (busyPushingNext) {
callsQueue.unshift(args);
return;
}
busyPushingNext = true;
operation(...args);
while (callsQueue.length > 0) {
const nextCallArgs = callsQueue.pop();
nextCallArgs && operation(...nextCallArgs);
}
busyPushingNext = false;
};
}

/**
* Custom Subject that ensures that subscribers are notified of values in the order that they arrived.
* A standard Subject does not have this guarantee.
* For example, given the following code:
* ```typescript
* const subject = new Subject<string>();
subject.subscribe(value => {
if (value === 'start') subject.next('end');
});
subject.subscribe(value => { });
subject.next('start');
* ```
* When `subject` is a standard `Subject<T>` the second subscriber would recieve `end` and then `start`.
* When `subject` is a `OrderedSubject<T>` the second subscriber would recieve `start` and then `end`.
*/
export class OrderedSubject<T> extends Subject<T> {
next = orderedQueueOperation((value?: T) => super.next(value));
}

/**
* Custom BehaviorSubject that ensures that subscribers are notified of values in the order that they arrived.
* A standard BehaviorSubject does not have this guarantee.
* For example, given the following code:
* ```typescript
* const subject = new BehaviorSubject<string>();
subject.subscribe(value => {
if (value === 'start') subject.next('end');
});
subject.subscribe(value => { });
subject.next('start');
* ```
* When `subject` is a standard `BehaviorSubject<T>` the second subscriber would recieve `end` and then `start`.
* When `subject` is a `OrderedBehaviorSubject<T>` the second subscriber would recieve `start` and then `end`.
*/
export class OrderedBehaviorSubject<T> extends BehaviorSubject<T> {
next = orderedQueueOperation((value: T) => super.next(value));
}
5 changes: 3 additions & 2 deletions packages/store/src/internal/state-stream.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { Injectable, OnDestroy } from '@angular/core';
import { BehaviorSubject } from 'rxjs';

import { PlainObject } from '@ngxs/store/internals';

import { OrderedBehaviorSubject } from './custom-rxjs-subjects';

/**
* BehaviorSubject of the entire state.
* @ignore
*/
@Injectable()
export class StateStream extends BehaviorSubject<PlainObject> implements OnDestroy {
export class StateStream extends OrderedBehaviorSubject<PlainObject> implements OnDestroy {
constructor() {
super({});
}
Expand Down
12 changes: 2 additions & 10 deletions packages/store/src/store.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
// tslint:disable:unified-signatures
import { Inject, Injectable, Optional, Type } from '@angular/core';
import { Observable, of, Subscription, throwError, queueScheduler } from 'rxjs';
import {
catchError,
distinctUntilChanged,
map,
shareReplay,
take,
observeOn
} from 'rxjs/operators';
import { Observable, of, Subscription, throwError } from 'rxjs';
import { catchError, distinctUntilChanged, map, shareReplay, take } from 'rxjs/operators';
import { INITIAL_STATE_TOKEN, PlainObject } from '@ngxs/store/internals';

import { InternalNgxsExecutionStrategy } from './execution/internal-ngxs-execution-strategy';
Expand All @@ -28,7 +21,6 @@ export class Store {
* All selects would use this stream, and it would call leave only once for any state change across all active selectors.
*/
private _selectableStateStream = this._stateStream.pipe(
observeOn(queueScheduler),
leaveNgxs(this._internalExecutionStrategy),
shareReplay({ bufferSize: 1, refCount: true })
);
Expand Down
3 changes: 2 additions & 1 deletion packages/store/tests/actions-stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { TestBed } from '@angular/core/testing';
import { Subject } from 'rxjs';

import { NgxsModule } from '../src/module';
import { InternalActions, OrderedSubject, ActionStatus, Actions } from '../src/actions-stream';
import { OrderedSubject } from '../src/internal/custom-rxjs-subjects';
import { InternalActions, ActionStatus, Actions } from '../src/actions-stream';

describe('The Actions stream', () => {
it('should not use Subject because of the following issue (note that 3rd subscriber receives the events out of order)', () => {
Expand Down
2 changes: 2 additions & 0 deletions packages/store/tests/issues/issue-1568-return-empty.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Injectable } from '@angular/core';
import { TestBed } from '@angular/core/testing';
import { Observable, of } from 'rxjs';
import { NgxsModule, State, Action, Store, Actions } from '@ngxs/store';
Expand All @@ -11,6 +12,7 @@ describe('https://github.com/ngxs/store/issues/1568', () => {
name: 'myState',
defaults: 'STATE_VALUE'
})
@Injectable()
class MyState {
@Action(MyAction)
handleAction(): Observable<string> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Injectable } from '@angular/core';
import { TestBed } from '@angular/core/testing';
import { NgxsModule, State, Action, Store, Actions, ofActionDispatched } from '@ngxs/store';

Expand All @@ -12,6 +13,7 @@ describe('Throw error when actions do not have a type property (https://github.c
name: 'myState',
defaults: 'STATE_VALUE'
})
@Injectable()
class MyState {
@Action(MyAction)
handleAction(): void {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ describe('Allow to inject the Store class into the ErrorHandler (https://github.
name: 'animals',
defaults: []
})
@Injectable()
class AnimalsState {
@Action(ProduceError)
produceError() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ describe('Error handling (https://github.com/ngxs/store/issues/1691)', () => {
@State({
name: 'app'
})
@Injectable()
class AppState {
@Action(ProduceErrorSynchronously)
produceErrorSynchronously() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Injectable } from '@angular/core';
import { TestBed } from '@angular/core/testing';
import { NgxsModule, State, Store } from '@ngxs/store';

Expand All @@ -6,6 +7,7 @@ describe('Last select value (https://github.com/ngxs/store/issues/1880)', () =>
name: 'counter',
defaults: 0
})
@Injectable()
class CounterState {}

it('should receive the latest value (previously it was a bug because of refCount() which made observable cold)', async () => {
Expand All @@ -18,10 +20,7 @@ describe('Last select value (https://github.com/ngxs/store/issues/1880)', () =>

// Act
// This is done explicitly to make stream cold.
store
.select(CounterState)
.subscribe()
.unsubscribe();
store.select(CounterState).subscribe().unsubscribe();

store.reset({ counter: 3 });

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { Component, Injectable } from '@angular/core';
import { TestBed } from '@angular/core/testing';
import { Action, NgxsModule, Selector, State, StateContext, Store } from '@ngxs/store';
import { switchMap, tap } from 'rxjs/operators';

describe('Select once after dispatch (https://github.com/ngxs/store/issues/1976)', () => {
class Add {
static readonly type = 'Add';
}

@State<number>({
name: 'counter',
defaults: 0
})
@Injectable()
class CounterState {
@Selector()
static magicNumber(): number {
return 42;
}

@Action(Add)
add(ctx: StateContext<number>) {
const state = ctx.getState();
ctx.setState(state + 1);
}
}

@Component({
template: `
<h1>{{ counter$ | async }}</h1>
<button (click)="dispatch()">Click me</button>
`
})
class TestComponent {
selectSnapshotValue: number | null = null;
selectOnceValue: number | null = null;

constructor(private store: Store) {}

dispatch(): void {
this.store
.selectOnce(CounterState.magicNumber)
.pipe(
switchMap(() => this.store.dispatch(new Add())),
tap(() => {
this.selectSnapshotValue = this.store.selectSnapshot(CounterState);
}),
switchMap(() => this.store.selectOnce(CounterState))
)
.subscribe(selectOnceValue => {
this.selectOnceValue = selectOnceValue;
});
}
}

it('should receive the latest value (previously it was a bug because of refCount() which made observable cold)', async () => {
// Arrange
TestBed.configureTestingModule({
declarations: [TestComponent],
imports: [NgxsModule.forRoot([CounterState])]
});

const fixture = TestBed.createComponent(TestComponent);
fixture.detectChanges();

// Act
document.querySelector('button')!.click();

// Assert
expect(fixture.componentInstance.selectSnapshotValue).toEqual(1);
expect(fixture.componentInstance.selectOnceValue).toEqual(1);
});
});
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Injectable } from '@angular/core';
import { Action, NgxsModule, State, StateContext, Store } from '@ngxs/store';
import { TestBed } from '@angular/core/testing';
import { Subscription, throwError } from 'rxjs';
Expand All @@ -24,6 +25,7 @@ describe('Dispatching an empty array with errors (https://github.com/ngxs/store/
name: 'app',
defaults: {}
})
@Injectable()
class AppState {
@Action(ActionError)
actionError() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Component, NgModule, ApplicationRef } from '@angular/core';
import { Component, NgModule, ApplicationRef, Injectable } from '@angular/core';
import { BrowserModule } from '@angular/platform-browser';
import { platformBrowserDynamic } from '@angular/platform-browser-dynamic';
import { Action, NgxsModule, State, StateContext, Store } from '@ngxs/store';
Expand All @@ -15,6 +15,7 @@ describe('Selectors within templates causing ticks (https://github.com/ngxs/stor
name: 'countries',
defaults: []
})
@Injectable()
class CountriesState {
@Action(SetCountries)
async setCountries(ctx: StateContext<string[]>, action: SetCountries) {
Expand All @@ -25,9 +26,7 @@ describe('Selectors within templates causing ticks (https://github.com/ngxs/stor

@Component({
selector: 'app-child',
template: `
{{ countries$ | async }}
`
template: ` {{ countries$ | async }} `
})
class TestChildComponent {
countries$ = this.store.select(CountriesState);
Expand All @@ -37,9 +36,7 @@ describe('Selectors within templates causing ticks (https://github.com/ngxs/stor

@Component({
selector: 'app-root',
template: `
<app-child *ngFor="let item of items"></app-child>
`
template: ` <app-child *ngFor="let item of items"></app-child> `
})
class TestComponent {
items = new Array(10);
Expand Down