Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v5] Remove batching #2869

Merged
merged 19 commits into from
Jan 10, 2022
Merged
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 40 additions & 20 deletions packages/core/src/Mailbox.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,56 @@
interface MailboxItem<T> {
value: T;
next: MailboxItem<T> | null;
}

export class Mailbox<T> {
private events: T[] = [];
private index: number = 0;
private _active: boolean = false;
private _current: MailboxItem<T> | null = null;
private _last: MailboxItem<T> | null = null;

public status: 'deferred' | 'idle' | 'processing' = 'deferred';
constructor(private _process: (ev: T) => void) {}

public get size(): number {
return this.events.length - this.index;
public start() {
this._active = true;
this.flush();
}

public clear(): void {
this.events.length = 0;
this.index = 0;
// we can't set _current to null because we might be currently processing
// and enqueue following clear shouldnt start processing the enqueued item immediately
if (this._current) {
this._current.next = null;
this._last = this._current;
}
}

public enqueue(event: T): void {
this.events.push(event);
}

public dequeue(): T | undefined {
const event = this.events[this.index];

if (!event) {
return undefined;
const enqueued = {
value: event,
next: null
};

if (this._current) {
this._last!.next = enqueued;
this._last = enqueued;
return;
}

this.index++;
this._current = enqueued;
this._last = enqueued;

if (this.index > this.events.length - 1) {
this.events.length = 0;
this.index = 0;
if (this._active) {
this.flush();
}
}

return event;
private flush() {
while (this._current) {
// atm the given _process is responsible for implementing proper try/catch handling
// we assume here that this won't throw in a way that can affect this mailbox
this._process(this._current.value);
this._current = this._current.next;
}
this._last = null;
}
}
6 changes: 2 additions & 4 deletions packages/core/src/ObservableActorRef.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { EventObject, ActorRef, Behavior, SCXML, ActorContext } from './types';
import { startSignal, stopSignal } from './behaviors';
import { Actor } from './actor';
import { CapturedState } from './capturedState';
import { symbolObservable, toSCXMLEvent } from './utils';
@@ -26,13 +25,12 @@ export class ObservableActorRef<TEvent extends EventObject, TEmitted>
}
public start() {
this.deferred = false;
this.actor.receive(startSignal);
this.actor.start();

return this;
}
public stop() {
this.actor.receive(stopSignal);

this.actor.stop();
return this;
}
public subscribe(observer) {
60 changes: 24 additions & 36 deletions packages/core/src/actor.ts
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@ import { registry } from './registry';
import * as capturedState from './capturedState';
import { ObservableActorRef } from './ObservableActorRef';
import { interopSymbols, toSCXMLEvent } from './utils';
import { Mailbox } from './Mailbox';

const nullSubscription = {
unsubscribe: () => void 0
@@ -138,17 +139,14 @@ export function spawnFrom(entity: any): ObservableActorRef<any, any> {
return spawn(createBehaviorFrom(entity)) as ObservableActorRef<any, any>; // TODO: fix
}

enum ProcessingStatus {
NotProcessing,
Processing
}

export class Actor<TEvent extends EventObject, TEmitted> {
public current: TEmitted;
private context: ActorContext<TEvent, TEmitted>;
private behavior: Behavior<TEvent, TEmitted>;
private mailbox: Array<TEvent | LifecycleSignal> = [];
private processingStatus: ProcessingStatus = ProcessingStatus.NotProcessing;
private mailbox: Mailbox<TEvent | LifecycleSignal> = new Mailbox(
this._process.bind(this)
);

public name: string;

constructor(
@@ -160,46 +158,36 @@ export class Actor<TEvent extends EventObject, TEmitted> {
this.name = name;
this.context = actorContext;
this.current = behavior.initialState;
this.mailbox.enqueue(startSignal);
}
public start() {
this.current = this.behavior.transition(
this.current,
startSignal,
this.context
);
this.mailbox.start();
return this;
}
public stop() {
this.mailbox.length = 0; // TODO: test this behavior
this.current = this.behavior.transition(
this.current,
stopSignal,
this.context
);
// TODO: test this behavior
this.mailbox.clear();
this.mailbox.enqueue(stopSignal);
}
public subscribe(observer) {
return this.behavior.subscribe?.(observer) || nullSubscription;
}
public receive(event: TEvent | LifecycleSignal) {
this.mailbox.push(event);
if (this.processingStatus === ProcessingStatus.NotProcessing) {
this.flush();
}
this.mailbox.enqueue(event);
}
private flush() {
this.processingStatus = ProcessingStatus.Processing;
while (this.mailbox.length) {
const event = this.mailbox.shift()!;
// @ts-ignore
this.context._event = toSCXMLEvent(event);

this.current = this.behavior.transition(
this.current,
this.context._event.data as TEvent,
this.context
);
}
this.processingStatus = ProcessingStatus.NotProcessing;
private _process(event: TEvent | LifecycleSignal) {
this.context._event =
typeof event.type !== 'string'
? (event as LifecycleSignal)
: toSCXMLEvent(event as TEvent);

this.current = this.behavior.transition(
this.current,
typeof this.context._event.type !== 'string'
? (this.context._event as LifecycleSignal)
: (this.context._event as SCXML.Event<TEvent>).data,
this.context
);
}
}

37 changes: 16 additions & 21 deletions packages/core/src/behaviors.ts
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@ import { State } from './State';
import { CapturedState } from './capturedState';
import { toActorRef } from './actor';
import { toObserver } from './utils';
import { Mailbox } from './Mailbox';

/**
* Returns an actor behavior from a reducer and its initial state.
@@ -138,27 +139,15 @@ export function spawnBehavior<TEvent extends EventObject, TEmitted>(
): ActorRef<TEvent, TEmitted> {
let state = behavior.initialState;
const observers = new Set<Observer<TEmitted>>();
const mailbox: TEvent[] = [];
let flushing = false;

const flush = () => {
if (flushing) {
return;
}
flushing = true;
while (mailbox.length > 0) {
const event = mailbox.shift()!;
state = behavior.transition(state, event, actorCtx);
observers.forEach((observer) => observer.next?.(state));
}
flushing = false;
};
const mailbox = new Mailbox<TEvent>((event) => {
state = behavior.transition(state, event, actorCtx);
observers.forEach((observer) => observer.next?.(state));
});

const actor = toActorRef({
id: options.id,
send: (event: TEvent) => {
mailbox.push(event);
flush();
mailbox.enqueue(event);
},
getSnapshot: () => state,
subscribe: (next, handleError?, complete?) => {
@@ -172,6 +161,12 @@ export function spawnBehavior<TEvent extends EventObject, TEmitted>(
}
};
},
start() {
mailbox.start();
},
stop() {
mailbox.clear();
},
...interopSymbols
});

@@ -183,8 +178,6 @@ export function spawnBehavior<TEvent extends EventObject, TEmitted>(
_event: null as any
};

state = behavior.start ? behavior.start(actorCtx) : state;

return actor;
}

@@ -455,12 +448,14 @@ export function createMachineBehavior<
return state;
}

if (isSignal(event)) {
const _event = actorContext._event;

if (isSignal(_event)) {
// TODO: unrecognized signal
return state;
}

service?.send(actorContext._event);
service?.send(_event);
return state;
},
subscribe: (observer) => {
27 changes: 9 additions & 18 deletions packages/core/src/interpreter.ts
Original file line number Diff line number Diff line change
@@ -108,7 +108,9 @@ export class Interpreter<
public options: Readonly<InterpreterOptions>;

public id: string;
private mailbox: Mailbox<SCXML.Event<TEvent>> = new Mailbox();
private mailbox: Mailbox<SCXML.Event<TEvent>> = new Mailbox(
this._process.bind(this)
);
private delayedEventsMap: Record<string, number> = {};
private listeners: Set<StateListener<TContext, TEvent>> = new Set();
private stopListeners: Set<Listener> = new Set();
@@ -403,25 +405,18 @@ export class Interpreter<
this.attachDevTools();
}

this.flush();
this.mailbox.start();

return this;
}

private flush() {
this.mailbox.status = 'processing';
let event = this.mailbox.dequeue();
while (event) {
// TODO: handle errors
this.forward(event);
private _process(event: SCXML.Event<TEvent>) {
// TODO: handle errors
this.forward(event);

const nextState = this.nextState(event);
const nextState = this.nextState(event);

this.update(nextState);

event = this.mailbox.dequeue();
}
this.mailbox.status = 'idle';
this.update(nextState);
}

/**
@@ -511,10 +506,6 @@ export class Interpreter<
}

this.mailbox.enqueue(_event);

if (this.mailbox.status === 'idle') {
this.flush();
}
};

private sendTo(
2 changes: 1 addition & 1 deletion packages/core/src/types.ts
Original file line number Diff line number Diff line change
@@ -1550,7 +1550,7 @@ export interface ActorContext<TEvent extends EventObject, TEmitted> {
self: ActorRef<TEvent, TEmitted>;
name: string;
observers: Set<Observer<TEmitted>>;
_event: SCXML.Event<TEvent>;
_event: SCXML.Event<TEvent> | LifecycleSignal;
}

export interface Behavior<TEvent extends EventObject, TEmitted = any> {
6 changes: 6 additions & 0 deletions packages/xstate-react/src/useSpawn.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { useEffect } from 'react';
import { ActorRef, Behavior, EventObject } from 'xstate';
import { spawnBehavior } from 'xstate/behaviors';
import useConstant from './useConstant';
@@ -16,5 +17,10 @@ export function useSpawn<TState, TEvent extends EventObject>(
return spawnBehavior(behavior);
});

useEffect(() => {
actorRef.start!();
return () => actorRef!.stop!();
});

return actorRef;
}
6 changes: 6 additions & 0 deletions packages/xstate-vue/src/useSpawn.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ActorRef, Behavior, EventObject } from 'xstate';
import { spawnBehavior } from 'xstate/behaviors';
import { onBeforeUnmount } from 'vue';

/**
* Vue composable that spawns an `ActorRef` with the specified `behavior`.
@@ -13,5 +14,10 @@ export function useSpawn<TState, TEvent extends EventObject>(
): ActorRef<TEvent, TState> {
const actorRef = spawnBehavior(behavior);

actorRef.start!();
onBeforeUnmount(() => {
actorRef.stop!();
});

return actorRef;
}