Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplified proposal #25

Open
dead-claudia opened this issue Sep 22, 2019 · 0 comments
Open

Simplified proposal #25

dead-claudia opened this issue Sep 22, 2019 · 0 comments

Comments

@dead-claudia
Copy link

I was thinking the core of this could be simplified quite a bit, with some redundant bits removed and/or moved to helper functions:

  • emitter = new Emitter(t => ({emit, resolve, reject})) - Create an emitter with an initializer callback.
    • Each returned hook accepts values received from that method. It can choose to either delegate them, ignore them, or whatever.
    • By default, missing hooks delegate, and if the return value isn't an object, only default hooks are used.
  • emitter = new Emitter() - Create a simple emitter.
    • This is just sugar for new Emitter(() => {}).
  • t.emit(value) - Emit a value from this emitter.
  • t.resolve(value) - Resolve the emitter.
  • t.reject(value) - Reject the emitter.
  • emitter.emit(value) - Send an emit to this emitter.
  • emitter.resolve(value) - Send a stream resolution and stop sending values to emitter hooks.
  • emitter.reject(value) - Reject the emitter and stop sending values to emitter hooks.
  • emitter.link({emit, resolve, reject}) - Lets you listen for values emitted, resolving when the emitter resolves and rejecting when it rejects. Returns the argument for convenience with the constructor.
  • Instead of emitter.run, you could just use the pipeline operator

Here's how some of the operators would end up implemented, assuming the F# style was selected for the pipeline operator proposal:

function awaitEnd() {
	return emitter => new Promise((resolve, reject) => {
		emitter.link({resolve, reject})
	})
}

function map(fn) {
	return emitter => new Emitter(t => emitter.link({
		emit(v) {
			try { v = fn(v) } catch (e) { t.reject(e); return }
			t.emit(v)
		},
		resolve(v) { t.resolve(v) },
		reject(v) { t.reject(v) },
	}))
}

function filter(fn) {
	return emitter => new Emitter(t => emitter.link({
		emit(v) {
			try { if (!fn(v)) return } catch (e) { t.reject(e); return }
			t.emit(v)
		},
		resolve(v) { t.resolve(v) },
		reject(v) { t.reject(v) },
	}))
}

function flat() {
	return emitter => new Emitter(t => {
		let active = new Set([emitter])
		let resolutionValue
		return emitter.link({
			emit(child) {
				active.add(child)
				child.link({
					emit(v) { t.emit(v) },
					resolve(v) {
						const open = active
						if (open == null) return
						open.delete(child)
						if (!open.size) {
							active = null
							t.resolve(resolutionValue)
						}
					},
					reject(v) {
						const open = active
						active = resolutionValue = null
						if (open != null) {
							open.delete(child)
							t.reject(v)
							for (const c of open) c.resolve()
						}
					},
				})
			},
			resolve(v) {
				const open = active
				if (open == null) return
				resolutionValue = v
				open.delete(child)
				if (!open.size) {
					active = null
					t.resolve(resolutionValue)
				}
			},
			reject(v) {
				const open = active
				active = resolutionValue = null
				if (open != null) {
					open.delete(child)
					t.reject(v)
					for (const c of open) c.resolve()
				}
			},
		})
	})
}

Composition would simply be function composition in this case.

By design, all emitters would be hot. This works like event emitters and subscriptions, but not observables. If an emitter wants to know when it's resolved, it can just invoke emitter.then on the return value.

Here's the things I removed:

  • Variadic each - just use .each repeatedly. Same effect
  • The second and third parameters to each callbacks - this aligns with iterators that lack this
  • Inheritance from Promise. It implements multicast internally and synchronously propagates errors and completion. Async closure does pose certain usability issues with observables, BTW, and I'd like that to not be forgotten.

A polyfill would be relatively straightforward:

// private
const noError = {}
class EmitterProxy {
	constructor(emitter) {
		this._ = emitter
		// A set is used so it keeps its place if a callback is removed
		// during iteration.
		this._subs = new Set()
		this._hooks = null
	}

	send(value) {
		if (this._canEmit()) {
			for (const sub of this._data._subs) {
				if (!this._canEmit()) break
				sub.send(value)
			}
		}
	}

	resolve(value) {
		this._close("resolved", "resolve")
	}

	reject(value) {
		this._close("rejected", "reject")
	}

	_canEmit() {
		return this._ != null
	}

	_close(state, method) {
		if (this._canEmit()) {
			const subs = this._subs
			const hooks = this._hooks
			const parent = this._parent
			this._._state = state
			this._._data = value
			this._ = this._subs = this._hooks = null
			let error = noError
			for (const sub of subs) {
				try { sub[method](value) } catch (e) { error = e }
			}
			if (error !== noError) throw error
		}
	}
}

// public
class Emitter {
	constructor(init) {
		this._state = "open"
		this._data = new EmitterProxy(this)
		if (init != null) {
			try {
				const result = init(this._data)
				if (this._state === "open" && result != null && (
					typeof result === "object" ||
					typeof result === "function"
				)) {
					this._data._hooks = result
				}
			} catch (e) {
				if (this._state === "open") this._data.reject(e)
			}
		}
	}

	send(value) {
		if (this._state === "open") {
			const hooks = this._data._hooks
			if (hooks != null && typeof hooks.emit === "function") {
				hooks.emit(value)
			} else {
				this._data.send(value)
			}
		}
	}

	resolve(value) {
		if (this._state === "open") {
			const hooks = this._data._hooks
			this._state = "half closed"
			this._data._hooks = null
			if (hooks != null && typeof hooks.resolve === "function") {
				hooks.resolve(value)
			} else {
				this._data.resolve(value)
			}
		}
	}

	reject(value) {
		if (this._state === "open") {
			const hooks = this._data._hooks
			this._state = "half closed"
			this._data._hooks = null
			if (hooks != null && typeof hooks.reject === "function") {
				hooks.reject(value)
			} else {
				this._data.reject(value)
			}
		}
	}

	link(sub) {
		switch (this._state) {
		case "open": case "half closed": this._data._subs.add(sub); break
		case "resolved": sub.resolve(this._data); break
		case "rejected": sub.reject(this._data); break
		}
		return sub
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant