From 1861702dd32fcb801d6abea4a6a2d3ff7422833d Mon Sep 17 00:00:00 2001 From: Felipe Lalanne <1822826+pipex@users.noreply.github.com> Date: Fri, 24 May 2024 13:23:12 -0400 Subject: [PATCH] Simplify Sensor interface. This updates the sensor interface to no longer require a state reference as input, but simply return a subscribable for changes on a specific path. This makes the interface simpler and moves the responsibility of updating the state to sensor subscribers. This is however a breaking change, even though the contructor interface does not changes. Change-type: major --- lib/agent/observe.ts | 21 ++------- lib/agent/patch.ts | 33 ++++++++++++++ lib/agent/runtime.ts | 11 +++-- lib/sensor.spec.ts | 104 ++++++++++++++++++++++--------------------- lib/sensor.ts | 36 +++++++-------- 5 files changed, 114 insertions(+), 91 deletions(-) create mode 100644 lib/agent/patch.ts diff --git a/lib/agent/observe.ts b/lib/agent/observe.ts index dc8cb54..84095d4 100644 --- a/lib/agent/observe.ts +++ b/lib/agent/observe.ts @@ -1,8 +1,8 @@ import type { Ref } from '../ref'; import type { Observer, Next } from '../observable'; import type { Operation } from '../operation'; -import { View } from '../view'; import { Path } from '../path'; +import { patch } from './patch'; function isObject(value: unknown): value is object { return value !== null && typeof value === 'object'; @@ -108,21 +108,6 @@ function observeObject( return buildProxy(r, u, next, path); } -function applyChanges(r: Ref, changes: Array>) { - changes.forEach((change) => { - const view = View.from(r, change.path); - switch (change.op) { - case 'create': - case 'update': - view._ = change.target as any; - break; - case 'delete': - view.delete(); - break; - } - }); -} - /** * Communicates the changes performed by a function on a value * reference to an observer. The function is executed in a @@ -150,7 +135,9 @@ export function observe( try { const res = fn( observeObject(r, r, (change) => { - applyChanges(r, [change]); + patch(r, change); + + // TODO: return changes here instead of the full object observer.next(structuredClone(r._)); }), ); diff --git a/lib/agent/patch.ts b/lib/agent/patch.ts new file mode 100644 index 0000000..960bf51 --- /dev/null +++ b/lib/agent/patch.ts @@ -0,0 +1,33 @@ +import { Ref } from '../ref'; +import type { Operation } from '../operation'; +import { View } from '../view'; +import type { Path } from '../path'; + +export function patch( + r: Ref, + changes: Operation | Array>, +) { + changes = Array.isArray(changes) ? changes : [changes]; + changes.forEach((change) => { + const view = View.from(r, change.path); + switch (change.op) { + case 'create': + case 'update': + view._ = change.target as any; + break; + case 'delete': + view.delete(); + break; + } + }); +} + +export type Patcher = ( + changes: Operation | Array>, +) => void; + +export function Patcher(s: S): Patcher { + const r = Ref.of(s); + return (changes: Operation | Array>) => + patch(r, changes); +} diff --git a/lib/agent/runtime.ts b/lib/agent/runtime.ts index 74b25de..9f0fdde 100644 --- a/lib/agent/runtime.ts +++ b/lib/agent/runtime.ts @@ -11,6 +11,7 @@ import type { StrictTarget } from '../target'; import { Target } from '../target'; import type { Action } from '../task'; import { observe } from './observe'; +import { patch } from './patch'; import type { AgentOpts, Result } from './types'; import { Failure, NotStarted, Stopped, Timeout, UnknownError } from './types'; @@ -185,16 +186,18 @@ export class Runtime { if (p in this.subscriptions) { continue; } - this.subscriptions[p] = sensor(this.stateRef, p).subscribe((s) => { - // There is no need to update the state reference as the sensor already - // modifies the state. We don't handle concurrency as we assume sensors + this.subscriptions[p] = sensor(p).subscribe((change) => { + // Patch the state + // We don't handle concurrency as we assume sensors // do not conflict with each other (should we check?) + patch(this.stateRef, change); if (this.opts.follow) { // Trigger a re-plan to see if the state is still on target this.start(); } else { // Notify the observer of the new state - this.observer.next(s); + // TODO: we should notify changes instead of the full state + this.observer.next(structuredClone(this.stateRef._)); } }); } diff --git a/lib/sensor.spec.ts b/lib/sensor.spec.ts index 1e3352e..16e0939 100644 --- a/lib/sensor.spec.ts +++ b/lib/sensor.spec.ts @@ -1,6 +1,5 @@ import { expect } from '~/test-utils'; import { Sensor } from './sensor'; -import { Ref } from './ref'; import { stub } from 'sinon'; import { setTimeout } from 'timers/promises'; import { Observable } from './observable'; @@ -13,14 +12,12 @@ describe('Sensor', () => { yield 123; }); - const state = Ref.of(0); - // The sensor function should not be called before a subscriber is added expect(read).to.not.have.been.called; // Add a subscriber const next = stub(); - sensor(state).subscribe(next); + sensor().subscribe(next); // We need to wait a bit so the async generator // can yield a value @@ -28,8 +25,11 @@ describe('Sensor', () => { // Only now the sensor function should be called expect(read).to.have.been.called; - expect(next).to.have.been.calledWith(123); - expect(state._).to.equal(123); + expect(next).to.have.been.calledWith({ + op: 'update', + path: '/', + target: 123, + }); }); it('allows reporting on a value using lenses', async () => { @@ -42,16 +42,21 @@ describe('Sensor', () => { }, }); - const state = Ref.of({ temperature: 0, on: false }); - const next = stub(); - sensor(state).subscribe(next); + sensor().subscribe(next); await setTimeout(10); - expect(next).to.have.been.calledWith({ temperature: 20, on: false }); - expect(next).to.have.been.calledWith({ temperature: 23, on: false }); - expect(state._.temperature).to.equal(23); + expect(next).to.have.been.calledWith({ + op: 'update', + path: '/temperature', + target: 20, + }); + expect(next).to.have.been.calledWith({ + op: 'update', + path: '/temperature', + target: 23, + }); }); it('allows reporting on a value using observable', async () => { @@ -61,16 +66,21 @@ describe('Sensor', () => { sensor: () => Observable.from([20, 23]), }); - const state = Ref.of({ temperature: 0, on: false }); - const next = stub(); - sensor(state).subscribe(next); + sensor().subscribe(next); await setTimeout(10); - expect(next).to.have.been.calledWith({ temperature: 20, on: false }); - expect(next).to.have.been.calledWith({ temperature: 23, on: false }); - expect(state._.temperature).to.equal(23); + expect(next).to.have.been.calledWith({ + op: 'update', + path: '/temperature', + target: 20, + }); + expect(next).to.have.been.calledWith({ + op: 'update', + path: '/temperature', + target: 23, + }); }); it('allows reporting on a value using observables and lenses', async () => { @@ -87,35 +97,32 @@ describe('Sensor', () => { .map(({ temp }) => temp), }); - const state: Ref = Ref.of({ - temperature: { office: 0, patio: 0 }, - on: false, - }); - const next = stub(); const nextOther = stub(); - sensor(state, '/temperature/office').subscribe(next); - sensor(state, '/temperature/patio').subscribe(nextOther); + sensor('/temperature/office').subscribe(next); + sensor('/temperature/patio').subscribe(nextOther); // A sensor for an uninitialized path should not throw - expect(() => sensor(state, '/temperature/bedroom')).to.not.throw; + expect(() => sensor('/temperature/bedroom')).to.not.throw; await setTimeout(10); - expect(next.getCalls().length).to.equal(2); + expect(next).to.have.been.calledTwice; expect(next).to.have.been.calledWith({ - temperature: { office: 20, patio: 0 }, - on: false, + op: 'update', + path: '/temperature/office', + target: 20, }); expect(next).to.have.been.calledWith({ - temperature: { office: 23, patio: 30 }, - on: false, + op: 'update', + path: '/temperature/office', + target: 23, }); expect(nextOther).to.have.been.calledOnceWith({ - temperature: { office: 20, patio: 30 }, - on: false, + op: 'update', + path: '/temperature/patio', + target: 30, }); - expect(state._.temperature.office).to.equal(23); }); it('allows reporting on a value using lenses with args', async () => { @@ -137,34 +144,31 @@ describe('Sensor', () => { }, }); - const state: Ref = Ref.of({ - temperature: { office: 0, patio: 0 }, - on: false, - }); - const next = stub(); const nextOther = stub(); - sensor(state, '/temperature/office').subscribe(next); - sensor(state, '/temperature/patio').subscribe(nextOther); + sensor('/temperature/office').subscribe(next); + sensor('/temperature/patio').subscribe(nextOther); // A sensor for an uninitialized path should throw - expect(() => sensor(state, '/temperature/bedroom')).to.throw; + expect(() => sensor('/temperature/bedroom')).to.throw; await setTimeout(20); - expect(next.getCalls().length).to.equal(2); + expect(next).to.have.been.calledTwice; expect(next).to.have.been.calledWith({ - temperature: { office: 20, patio: 0 }, - on: false, + op: 'update', + path: '/temperature/office', + target: 20, }); expect(next).to.have.been.calledWith({ - temperature: { office: 23, patio: 30 }, - on: false, + op: 'update', + path: '/temperature/office', + target: 23, }); expect(nextOther).to.have.been.calledOnceWith({ - temperature: { office: 20, patio: 30 }, - on: false, + op: 'update', + path: '/temperature/patio', + target: 30, }); - expect(state._.temperature.office).to.equal(23); }); }); diff --git a/lib/sensor.ts b/lib/sensor.ts index 17a71d7..8fa9ec9 100644 --- a/lib/sensor.ts +++ b/lib/sensor.ts @@ -1,9 +1,8 @@ import { Lens } from './lens'; import type { PathType } from './path'; import { Path } from './path'; -import type { Ref } from './ref'; -import { View } from './view'; import type { LensArgs } from './lens'; +import type { UpdateOperation } from './operation'; import type { Subscribable } from './observable'; import { Observable } from './observable'; @@ -12,13 +11,17 @@ import { Observable } from './observable'; * A Sensor function for type T is a function that returns a generator * that yields values of type T */ -export type SensorFn = ( +type SensorFn = ( args: LensArgs, ) => | AsyncGenerator, never | void | Lens, void> | Generator, never | void, void | undefined> | Subscribable>; +type SensorOutput = Subscribable< + UpdateOperation +>; + /** * A sensor receives a reference to a global state and * returns a subscribable that allows to observe changes @@ -27,11 +30,11 @@ export type SensorFn = ( export type Sensor = unknown extends Lens ? // Default to the version with path if the lens cannot be resolved - { (s: Ref, path: PathType): Subscribable; lens: Path

} + { (path: PathType): SensorOutput; lens: Path

} : // Otherwise add a path if lens arguments are not empty LensArgs extends Record - ? { (s: Ref): Subscribable; lens: Path

} - : { (s: Ref, path: PathType): Subscribable; lens: Path

}; + ? { (): SensorOutput; lens: Path

} + : { (path: PathType): SensorOutput; lens: Path

}; /** * The sensor constructor properties @@ -70,25 +73,18 @@ function from( } = typeof input === 'function' ? { sensor: input } : input; const lensPath = Path.from(lens); return Object.assign( - function (s: Ref, path: PathType = lens) { + function (path: PathType = lens) { const refPath = Path.from(path); const args = Lens.args(lensPath, refPath) as LensArgs; - const view = View.from(s, refPath); - - return Observable.from(sensor(args)).map((value) => { - // For each value emmited by the sensor - // we update the view and return the updated state - // to the subscriber - view._ = value; - // We need to return a copy of the state here, otherwise - // subscribers would be able to change the behavior of the - // agent or other subscribers - return structuredClone(s._); - }); + return Observable.from(sensor(args)).map((target) => ({ + op: 'update', + path, + target, + })); }, { lens: lensPath }, - ) as Sensor; + ); } /**