Skip to content

Commit

Permalink
Merge pull request #10494 from emberjs/lazy-stream
Browse files Browse the repository at this point in the history
Make it easier to write lazy streams
  • Loading branch information
mmun committed Feb 20, 2015
2 parents baa2c22 + 01a0198 commit 8714aba
Show file tree
Hide file tree
Showing 7 changed files with 260 additions and 105 deletions.
16 changes: 14 additions & 2 deletions packages/ember-htmlbars/tests/helpers/collection_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -420,15 +420,27 @@ QUnit.test("should unsubscribe stream bindings", function() {

var barStreamBinding = view._streamBindings['view.baz'];

equal(barStreamBinding.subscribers.length, 3*2, "adds 3 subscribers");
equal(countSubscribers(barStreamBinding), 3, "adds 3 subscribers");

run(function() {
view.get('content').popObject();
});

equal(barStreamBinding.subscribers.length, 2*2, "removes 1 subscriber");
equal(countSubscribers(barStreamBinding), 2, "removes 1 subscriber");
});

function countSubscribers(stream) {
var count = 0;
var subscriber = stream.subscriberHead;

while (subscriber) {
count++;
subscriber = subscriber.next;
}

return count;
}

QUnit.test("should work inside a bound {{#if}}", function() {
var testData = A([EmberObject.create({ isBaz: false }), EmberObject.create({ isBaz: true }), EmberObject.create({ isBaz: true })]);
var IfTestCollectionView = CollectionView.extend({
Expand Down
21 changes: 2 additions & 19 deletions packages/ember-metal/lib/streams/simple.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ import { read, isStream } from "ember-metal/streams/utils";
function SimpleStream(source) {
this.init();
this.source = source;

if (isStream(source)) {
source.subscribe(this._didChange, this);
}
this.dependency = this.addDependency(this.source);
}

SimpleStream.prototype = create(Stream.prototype);
Expand All @@ -30,30 +27,16 @@ merge(SimpleStream.prototype, {
setSource: function(nextSource) {
var prevSource = this.source;
if (nextSource !== prevSource) {
if (isStream(prevSource)) {
prevSource.unsubscribe(this._didChange, this);
}

if (isStream(nextSource)) {
nextSource.subscribe(this._didChange, this);
}

this.dependency.replace(nextSource);
this.source = nextSource;
this.notify();
}
},

_didChange: function() {
this.notify();
},

_super$destroy: Stream.prototype.destroy,

destroy: function() {
if (this._super$destroy()) {
if (isStream(this.source)) {
this.source.unsubscribe(this._didChange, this);
}
this.source = undefined;
return true;
}
Expand Down
238 changes: 208 additions & 30 deletions packages/ember-metal/lib/streams/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,83 @@ import {
@module ember-metal
*/

function Subscriber(callback, context) {
this.next = null;
this.prev = null;
this.callback = callback;
this.context = context;
}

Subscriber.prototype.removeFrom = function(stream) {
var next = this.next;
var prev = this.prev;

if (prev) {
prev.next = next;
} else {
stream.subscriberHead = next;
}

if (next) {
next.prev = prev;
} else {
stream.subscriberTail = prev;
}

stream.maybeDeactivate();
};

function Dependency(dependent, stream, callback, context) {
this.next = null;
this.prev = null;
this.dependent = dependent;
this.stream = stream;
this.callback = callback;
this.context = context;
this.unsubscription = null;
}

Dependency.prototype.subscribe = function() {
this.unsubscribe = this.stream.subscribe(this.callback, this.context);
};

Dependency.prototype.unsubscribe = function() {
this.unsubscription();
this.unsubscription = null;
};

Dependency.prototype.removeFrom = function(stream) {
var next = this.next;
var prev = this.prev;

if (prev) {
prev.next = next;
} else {
stream.dependencyHead = next;
}

if (next) {
next.prev = prev;
} else {
stream.dependencyTail = prev;
}

if (this.unsubscription) {
this.unsubscribe();
}
};

Dependency.prototype.replace = function(stream, callback, context) {
this.stream = stream;
this.callback = callback;
this.context = context;

if (this.unsubscription) {
this.unsubscribe();
this.subscribe();
}
};

/**
@public
@class Stream
Expand All @@ -25,9 +102,15 @@ Stream.prototype = {
init: function() {
this.state = 'dirty';
this.cache = undefined;
this.subscribers = undefined;
this.subscriberHead = null;
this.subscriberTail = null;
this.dependencyHead = null;
this.dependencyTail = null;
this.dependency = null;
this.children = undefined;
this._label = undefined;
this.isActive = false;
this.gotValueWhileInactive = false;
},

get: function(path) {
Expand All @@ -53,18 +136,99 @@ Stream.prototype = {
},

value: function() {
if (!this.isActive) {
this.gotValueWhileInactive = true;
this.revalidate();
return this.valueFn();
}

if (this.state === 'clean') {
return this.cache;
} else if (this.state === 'dirty') {
this.revalidate();
var value = this.valueFn();
this.state = 'clean';
return this.cache = this.valueFn();
this.cache = value;
return value;
}
// TODO: Ensure value is never called on a destroyed stream
// so that we can uncomment this assertion.
//
// Ember.assert("Stream error: value was called in an invalid state: " + this.state);
},

addDependency: function(stream, callback, context) {
if (!stream || !stream.isStream) {
return;
}

if (callback === undefined) {
callback = this.notify;
context = this;
}

var dependency = new Dependency(this, stream, callback, context);

if (this.isActive) {
dependency.subscribe();
}

if (this.dependencyHead === null) {
this.dependencyHead = this.dependencyTail = dependency;
} else {
var tail = this.dependencyTail;
tail.next = dependency;
dependency.prev = tail;
this.dependencyTail = dependency;
}

return dependency;
},

subscribeDependencies: function() {
var dependency = this.dependencyHead;
while (dependency) {
var next = dependency.next;
dependency.subscribe();
dependency = next;
}
},

unsubscribeDependencies: function() {
var dependency = this.dependencyHead;
while (dependency) {
var next = dependency.next;
dependency.unsubscribe();
dependency = next;
}
},

becameActive: function() {},
becameInactive: function() {},

// This method is invoked when the value function is called and when
// a stream becomes active. This allows changes to be made to a stream's
// input, and only do any work in response if the stream has subscribers
// or if someone actually gets the stream's value.
revalidate: function() {},

maybeActivate: function() {
if (this.subscriberHead && !this.isActive) {
this.isActive = true;
this.subscribeDependencies();
this.revalidate();
this.becameActive();
}
},

maybeDeactivate: function() {
if (!this.subscriberHead && this.isActive) {
this.isActive = false;
this.unsubscribeDependencies();
this.becameInactive();
}
},

valueFn: function() {
throw new Error("Stream error: valueFn not implemented");
},
Expand All @@ -78,50 +242,60 @@ Stream.prototype = {
},

notifyExcept: function(callbackToSkip, contextToSkip) {
if (this.state === 'clean') {
if (this.state === 'clean' || this.gotValueWhileInactive) {
this.gotValueWhileInactive = false;
this.state = 'dirty';
this._notifySubscribers(callbackToSkip, contextToSkip);
}
},

subscribe: function(callback, context) {
if (this.subscribers === undefined) {
this.subscribers = [callback, context];
var subscriber = new Subscriber(callback, context, this);
if (this.subscriberHead === null) {
this.subscriberHead = this.subscriberTail = subscriber;
this.maybeActivate();
} else {
this.subscribers.push(callback, context);
var tail = this.subscriberTail;
tail.next = subscriber;
subscriber.prev = tail;
this.subscriberTail = subscriber;
}

var stream = this;
return function() { subscriber.removeFrom(stream); };
},

unsubscribe: function(callback, context) {
var subscribers = this.subscribers;

if (subscribers !== undefined) {
for (var i = 0, l = subscribers.length; i < l; i += 2) {
if (subscribers[i] === callback && subscribers[i+1] === context) {
subscribers.splice(i, 2);
return;
}
var subscriber = this.subscriberHead;

while (subscriber) {
var next = subscriber.next;
if (subscriber.callback === callback && subscriber.context === context) {
subscriber.removeFrom(this);
}
subscriber = next;
}
},

_notifySubscribers: function(callbackToSkip, contextToSkip) {
var subscribers = this.subscribers;

if (subscribers !== undefined) {
for (var i = 0, l = subscribers.length; i < l; i += 2) {
var callback = subscribers[i];
var context = subscribers[i+1];

if (callback === callbackToSkip && context === contextToSkip) {
continue;
}

if (context === undefined) {
callback(this);
} else {
callback.call(context, this);
}
var subscriber = this.subscriberHead;

while (subscriber) {
var next = subscriber.next;

var callback = subscriber.callback;
var context = subscriber.context;

subscriber = next;

if (callback === callbackToSkip && context === contextToSkip) {
continue;
}

if (context === undefined) {
callback(this);
} else {
callback.call(context, this);
}
}
},
Expand All @@ -135,6 +309,10 @@ Stream.prototype = {
children[key].destroy();
}

this.subscriberHead = this.subscriberTail = null;
this.maybeDeactivate();
this.dependencies = null;

return true;
}
},
Expand Down
Loading

0 comments on commit 8714aba

Please sign in to comment.