Skip to content

Commit

Permalink
Simplify Sensor interface.
Browse files Browse the repository at this point in the history
This updates the sensor interface to no longer require a state reference
as input, but simply return a subscribable for changes on a specific
path. This makes the interface simpler and moves the responsibility of
updating the state to sensor subscribers.

This is however a breaking change, even though the contructor interface
does not changes.

Change-type: major
  • Loading branch information
pipex committed May 24, 2024
1 parent f99ba5f commit 1861702
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 91 deletions.
21 changes: 4 additions & 17 deletions lib/agent/observe.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { Ref } from '../ref';
import type { Observer, Next } from '../observable';
import type { Operation } from '../operation';
import { View } from '../view';
import { Path } from '../path';
import { patch } from './patch';

function isObject(value: unknown): value is object {
return value !== null && typeof value === 'object';
Expand Down Expand Up @@ -108,21 +108,6 @@ function observeObject<T, U extends object>(
return buildProxy(r, u, next, path);
}

function applyChanges<S>(r: Ref<S>, changes: Array<Operation<S, string>>) {
changes.forEach((change) => {
const view = View.from(r, change.path);
switch (change.op) {
case 'create':
case 'update':
view._ = change.target as any;
break;
case 'delete':
view.delete();
break;
}
});
}

/**
* Communicates the changes performed by a function on a value
* reference to an observer. The function is executed in a
Expand Down Expand Up @@ -150,7 +135,9 @@ export function observe<T, U = void>(
try {
const res = fn(
observeObject(r, r, (change) => {
applyChanges(r, [change]);
patch(r, change);

// TODO: return changes here instead of the full object
observer.next(structuredClone(r._));
}),
);
Expand Down
33 changes: 33 additions & 0 deletions lib/agent/patch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { Ref } from '../ref';
import type { Operation } from '../operation';
import { View } from '../view';
import type { Path } from '../path';

export function patch<S>(
r: Ref<S>,
changes: Operation<S, Path> | Array<Operation<S, Path>>,
) {
changes = Array.isArray(changes) ? changes : [changes];
changes.forEach((change) => {
const view = View.from(r, change.path);
switch (change.op) {
case 'create':
case 'update':
view._ = change.target as any;
break;
case 'delete':
view.delete();
break;
}
});
}

export type Patcher<S> = (
changes: Operation<S, Path> | Array<Operation<S, Path>>,
) => void;

export function Patcher<S>(s: S): Patcher<S> {
const r = Ref.of(s);
return (changes: Operation<S, Path> | Array<Operation<S, Path>>) =>
patch(r, changes);
}
11 changes: 7 additions & 4 deletions lib/agent/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { StrictTarget } from '../target';
import { Target } from '../target';
import type { Action } from '../task';
import { observe } from './observe';
import { patch } from './patch';

import type { AgentOpts, Result } from './types';
import { Failure, NotStarted, Stopped, Timeout, UnknownError } from './types';
Expand Down Expand Up @@ -185,16 +186,18 @@ export class Runtime<TState> {
if (p in this.subscriptions) {
continue;
}
this.subscriptions[p] = sensor(this.stateRef, p).subscribe((s) => {
// There is no need to update the state reference as the sensor already
// modifies the state. We don't handle concurrency as we assume sensors
this.subscriptions[p] = sensor(p).subscribe((change) => {
// Patch the state
// We don't handle concurrency as we assume sensors
// do not conflict with each other (should we check?)
patch(this.stateRef, change);
if (this.opts.follow) {
// Trigger a re-plan to see if the state is still on target
this.start();
} else {
// Notify the observer of the new state
this.observer.next(s);
// TODO: we should notify changes instead of the full state
this.observer.next(structuredClone(this.stateRef._));
}
});
}
Expand Down
104 changes: 54 additions & 50 deletions lib/sensor.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { expect } from '~/test-utils';
import { Sensor } from './sensor';
import { Ref } from './ref';
import { stub } from 'sinon';
import { setTimeout } from 'timers/promises';
import { Observable } from './observable';
Expand All @@ -13,23 +12,24 @@ describe('Sensor', () => {
yield 123;
});

const state = Ref.of(0);

// The sensor function should not be called before a subscriber is added
expect(read).to.not.have.been.called;

// Add a subscriber
const next = stub();
sensor(state).subscribe(next);
sensor().subscribe(next);

// We need to wait a bit so the async generator
// can yield a value
await setTimeout(10);

// Only now the sensor function should be called
expect(read).to.have.been.called;
expect(next).to.have.been.calledWith(123);
expect(state._).to.equal(123);
expect(next).to.have.been.calledWith({
op: 'update',
path: '/',
target: 123,
});
});

it('allows reporting on a value using lenses', async () => {
Expand All @@ -42,16 +42,21 @@ describe('Sensor', () => {
},
});

const state = Ref.of({ temperature: 0, on: false });

const next = stub();
sensor(state).subscribe(next);
sensor().subscribe(next);

await setTimeout(10);

expect(next).to.have.been.calledWith({ temperature: 20, on: false });
expect(next).to.have.been.calledWith({ temperature: 23, on: false });
expect(state._.temperature).to.equal(23);
expect(next).to.have.been.calledWith({
op: 'update',
path: '/temperature',
target: 20,
});
expect(next).to.have.been.calledWith({
op: 'update',
path: '/temperature',
target: 23,
});
});

it('allows reporting on a value using observable', async () => {
Expand All @@ -61,16 +66,21 @@ describe('Sensor', () => {
sensor: () => Observable.from([20, 23]),
});

const state = Ref.of({ temperature: 0, on: false });

const next = stub();
sensor(state).subscribe(next);
sensor().subscribe(next);

await setTimeout(10);

expect(next).to.have.been.calledWith({ temperature: 20, on: false });
expect(next).to.have.been.calledWith({ temperature: 23, on: false });
expect(state._.temperature).to.equal(23);
expect(next).to.have.been.calledWith({
op: 'update',
path: '/temperature',
target: 20,
});
expect(next).to.have.been.calledWith({
op: 'update',
path: '/temperature',
target: 23,
});
});

it('allows reporting on a value using observables and lenses', async () => {
Expand All @@ -87,35 +97,32 @@ describe('Sensor', () => {
.map(({ temp }) => temp),
});

const state: Ref<Heater> = Ref.of({
temperature: { office: 0, patio: 0 },
on: false,
});

const next = stub();
const nextOther = stub();
sensor(state, '/temperature/office').subscribe(next);
sensor(state, '/temperature/patio').subscribe(nextOther);
sensor('/temperature/office').subscribe(next);
sensor('/temperature/patio').subscribe(nextOther);

// A sensor for an uninitialized path should not throw
expect(() => sensor(state, '/temperature/bedroom')).to.not.throw;
expect(() => sensor('/temperature/bedroom')).to.not.throw;

await setTimeout(10);

expect(next.getCalls().length).to.equal(2);
expect(next).to.have.been.calledTwice;
expect(next).to.have.been.calledWith({
temperature: { office: 20, patio: 0 },
on: false,
op: 'update',
path: '/temperature/office',
target: 20,
});
expect(next).to.have.been.calledWith({
temperature: { office: 23, patio: 30 },
on: false,
op: 'update',
path: '/temperature/office',
target: 23,
});
expect(nextOther).to.have.been.calledOnceWith({
temperature: { office: 20, patio: 30 },
on: false,
op: 'update',
path: '/temperature/patio',
target: 30,
});
expect(state._.temperature.office).to.equal(23);
});

it('allows reporting on a value using lenses with args', async () => {
Expand All @@ -137,34 +144,31 @@ describe('Sensor', () => {
},
});

const state: Ref<Heater> = Ref.of({
temperature: { office: 0, patio: 0 },
on: false,
});

const next = stub();
const nextOther = stub();
sensor(state, '/temperature/office').subscribe(next);
sensor(state, '/temperature/patio').subscribe(nextOther);
sensor('/temperature/office').subscribe(next);
sensor('/temperature/patio').subscribe(nextOther);

// A sensor for an uninitialized path should throw
expect(() => sensor(state, '/temperature/bedroom')).to.throw;
expect(() => sensor('/temperature/bedroom')).to.throw;

await setTimeout(20);

expect(next.getCalls().length).to.equal(2);
expect(next).to.have.been.calledTwice;
expect(next).to.have.been.calledWith({
temperature: { office: 20, patio: 0 },
on: false,
op: 'update',
path: '/temperature/office',
target: 20,
});
expect(next).to.have.been.calledWith({
temperature: { office: 23, patio: 30 },
on: false,
op: 'update',
path: '/temperature/office',
target: 23,
});
expect(nextOther).to.have.been.calledOnceWith({
temperature: { office: 20, patio: 30 },
on: false,
op: 'update',
path: '/temperature/patio',
target: 30,
});
expect(state._.temperature.office).to.equal(23);
});
});
36 changes: 16 additions & 20 deletions lib/sensor.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { Lens } from './lens';
import type { PathType } from './path';
import { Path } from './path';
import type { Ref } from './ref';
import { View } from './view';
import type { LensArgs } from './lens';
import type { UpdateOperation } from './operation';

import type { Subscribable } from './observable';
import { Observable } from './observable';
Expand All @@ -12,13 +11,17 @@ import { Observable } from './observable';
* A Sensor function for type T is a function that returns a generator
* that yields values of type T
*/
export type SensorFn<T, P extends PathType = '/'> = (
type SensorFn<T, P extends PathType = '/'> = (
args: LensArgs<T, P>,
) =>
| AsyncGenerator<Lens<T, P>, never | void | Lens<T, P>, void>
| Generator<Lens<T, P>, never | void, void | undefined>
| Subscribable<Lens<T, P>>;

type SensorOutput<T, P extends PathType = '/'> = Subscribable<
UpdateOperation<T, P>
>;

/**
* A sensor receives a reference to a global state and
* returns a subscribable that allows to observe changes
Expand All @@ -27,11 +30,11 @@ export type SensorFn<T, P extends PathType = '/'> = (
export type Sensor<T, P extends PathType = '/'> =
unknown extends Lens<T, P>
? // Default to the version with path if the lens cannot be resolved
{ (s: Ref<T>, path: PathType): Subscribable<T>; lens: Path<P> }
{ (path: PathType): SensorOutput<T, P>; lens: Path<P> }
: // Otherwise add a path if lens arguments are not empty
LensArgs<T, P> extends Record<string, never>
? { (s: Ref<T>): Subscribable<T>; lens: Path<P> }
: { (s: Ref<T>, path: PathType): Subscribable<T>; lens: Path<P> };
? { (): SensorOutput<T, P>; lens: Path<P> }
: { (path: PathType): SensorOutput<T, P>; lens: Path<P> };

/**
* The sensor constructor properties
Expand Down Expand Up @@ -70,25 +73,18 @@ function from<TState, TPath extends PathType = '/'>(
} = typeof input === 'function' ? { sensor: input } : input;
const lensPath = Path.from(lens);
return Object.assign(
function (s: Ref<TState>, path: PathType = lens) {
function (path: PathType = lens) {
const refPath = Path.from(path);
const args = Lens.args(lensPath, refPath) as LensArgs<TState, TPath>;
const view = View.from(s, refPath);

return Observable.from(sensor(args)).map((value) => {
// For each value emmited by the sensor
// we update the view and return the updated state
// to the subscriber
view._ = value;

// We need to return a copy of the state here, otherwise
// subscribers would be able to change the behavior of the
// agent or other subscribers
return structuredClone(s._);
});
return Observable.from(sensor(args)).map((target) => ({
op: 'update',
path,
target,
}));
},
{ lens: lensPath },
) as Sensor<TState, TPath>;
);
}

/**
Expand Down

0 comments on commit 1861702

Please sign in to comment.