Skip to content

Commit

Permalink
Handle Falcor style observables from unhandled path datasource.
Browse files Browse the repository at this point in the history
  • Loading branch information
lrowe committed Jun 1, 2017
1 parent ca2c06a commit 07ce5ff
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 18 deletions.
6 changes: 4 additions & 2 deletions src/router/call.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ var collapse = pathUtils.collapse;
var Observable = require('../RouterRx.js').Observable;
var MaxPathsExceededError = require('../errors/MaxPathsExceededError');
var getPathsCount = require('./getPathsCount');
var outputToObservable = require('../run/conversion/outputToObservable');
var rxNewToRxNewAndOld = require('../run/conversion/rxNewToRxNewAndOld');

/**
Expand Down Expand Up @@ -88,8 +89,9 @@ module.exports = function routerCall(callPath, args,
// we will try the next dataSource in the line.
catch(function catchException(e) {
if (e instanceof CallNotFoundError && router._unhandled) {
return router._unhandled.
call(callPath, args, refPaths, thisPaths);
return outputToObservable(
router._unhandled.
call(callPath, args, refPaths, thisPaths));
}
throw e;
});
Expand Down
5 changes: 3 additions & 2 deletions src/router/get.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ var Observable = require('../RouterRx.js').Observable;
var mCGRI = require('./../run/mergeCacheAndGatherRefsAndInvalidations');
var MaxPathsExceededError = require('../errors/MaxPathsExceededError');
var getPathsCount = require('./getPathsCount');
var outputToObservable = require('../run/conversion/outputToObservable');
var rxNewToRxNewAndOld = require('../run/conversion/rxNewToRxNewAndOld');

/**
Expand Down Expand Up @@ -56,8 +57,8 @@ module.exports = function routerGet(paths) {
// The 3rd argument is the beginning of the actions
// arguments, which for get is the same as the
// unhandledPaths.
return router._unhandled.
get(unhandledPaths).
return outputToObservable(
router._unhandled.get(unhandledPaths)).

// Merge the solution back into the overall message.
map(function(jsonGraphFragment) {
Expand Down
5 changes: 3 additions & 2 deletions src/router/set.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var collapse = pathUtils.collapse;
var mCGRI = require('./../run/mergeCacheAndGatherRefsAndInvalidations');
var MaxPathsExceededError = require('../errors/MaxPathsExceededError');
var getPathsCount = require('./getPathsCount');
var outputToObservable = require('../run/conversion/outputToObservable');
var rxNewToRxNewAndOld = require('../run/conversion/rxNewToRxNewAndOld');

/**
Expand Down Expand Up @@ -141,8 +142,8 @@ module.exports = function routerSet(jsonGraph) {
return pV.path;
}));

return router._unhandled.
set(jsonGraphEnvelope).
return outputToObservable(
router._unhandled.set(jsonGraphEnvelope)).

// Merge the solution back into the overall message.
map(function(unhandledJsonGraphEnv) {
Expand Down
78 changes: 78 additions & 0 deletions test/FalcorObservable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"use strict";

function noop() {
// do nothing
}

var disposeNoop = { dispose: noop };

function disposable(dispose) {
if (typeof dispose === "function") {
return { dispose: dispose };
}
if (typeof dispose === "undefined" || dispose === null) {
return disposeNoop;
}
return dispose;
}

function functionsObserver(onNext, onError, onCompleted) {
return {
onNext: typeof onNext === "function" ? onNext : noop,
onError: typeof onError === "function" ? onError : noop,
onCompleted: typeof onCompleted === "function" ? onCompleted : noop
};
}

function partialObserver(partial) {
return {
onNext: typeof partial.onNext === "function"
? partial.onNext.bind(partial)
: noop,
onError: function onError(e) {
if (typeof partial.onError === "function") {
partial.onError(e);
}
},
onCompleted: function onCompleted() {
if (typeof partial.onCompleted === "function") {
partial.onCompleted();
}
}
};
}

function observer(observerOrOnNext, onError, onCompleted) {
return typeof observerOrOnNext === "object" && observerOrOnNext !== null
? partialObserver(observerOrOnNext)
: functionsObserver(observerOrOnNext, onError, onCompleted);
}

function create(subscribe) {
return {
subscribe: function(observerOrOnNext, onError, onCompleted) {
return disposable(
subscribe(observer(observerOrOnNext, onError, onCompleted))
);
}
};
}

function empty() {
return create(function(obs) {
obs.onCompleted();
});
}

function of(value) {
return create(function(obs) {
obs.onNext(value);
obs.onCompleted();
});
}

module.exports = {
create: create,
empty: empty,
of: of
};
6 changes: 3 additions & 3 deletions test/unit/functional/unhandled.call.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ var noOp = function() {};
var chai = require('chai');
var expect = chai.expect;
var sinon = require('sinon');
var Observable = require('../../../src/RouterRx').Observable;
var FalcorObservable = require('../../FalcorObservable');

describe('#call', function() {
it('should ensure a missing function gets chained.', function(done) {
var router = new R([]);
var onCall = sinon.spy(function() {
return Observable.of({
return FalcorObservable.of({
jsonGraph: {
videos: {
summary: 5
Expand Down Expand Up @@ -45,7 +45,7 @@ describe('#call', function() {
it('should ensure a missing function gets chained and will not materialize properly.', function(done) {
var router = new R([]);
var onCall = sinon.spy(function() {
return Observable.of({
return FalcorObservable.of({
jsonGraph: { },
paths: [
['videos', 'summary']
Expand Down
8 changes: 4 additions & 4 deletions test/unit/functional/unhandled.get.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ var chai = require('chai');
var expect = chai.expect;
var sinon = require('sinon');
var pathValueMerge = require('./../../../src/cache/pathValueMerge');
var Observable = require('../../../src/RouterRx').Observable;
var FalcorObservable = require('../../FalcorObservable');
var $atom = require('./../../../src/support/types').$atom;

describe('#get', function() {
it('should return an empty Observable and just materialize values.', function(done) {
var router = new R([]);
var onUnhandledPaths = sinon.spy(function convert(paths) {
return Observable.empty();
return FalcorObservable.empty();
});
router.routeUnhandledPathsTo({get: onUnhandledPaths});

Expand Down Expand Up @@ -45,7 +45,7 @@ describe('#get', function() {
});
return jsonGraph;
}, {jsonGraph: {}});
return Observable.of(returnValue);
return FalcorObservable.of(returnValue);
});
router.routeUnhandledPathsTo({get: onUnhandledPaths});

Expand Down Expand Up @@ -88,7 +88,7 @@ describe('#get', function() {
});
return jsonGraph;
}, {jsonGraph: {}});
return Observable.of(returnValue);
return FalcorObservable.of(returnValue);
});
router.routeUnhandledPathsTo({get: onUnhandledPaths});

Expand Down
10 changes: 5 additions & 5 deletions test/unit/functional/unhandled.set.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ var chai = require('chai');
var expect = chai.expect;
var sinon = require('sinon');
var pathValueMerge = require('./../../../src/cache/pathValueMerge');
var Observable = require('../../../src/RouterRx').Observable;
var FalcorObservable = require('../../FalcorObservable');
var $atom = require('./../../../src/support/types').$atom;
var $ref = require('./../../../src/support/types').$ref;

describe('#set', function() {
it('should return an empty Observable and just materialize values.', function(done) {
var router = new R([]);
var onUnhandledPaths = sinon.spy(function convert(paths) {
return Observable.empty();
return FalcorObservable.empty();
});
router.routeUnhandledPathsTo({set: onUnhandledPaths});

Expand Down Expand Up @@ -63,7 +63,7 @@ describe('#set', function() {
return jsonGraph;
}, {jsonGraph: {}});

return Observable.of(returnValue);
return FalcorObservable.of(returnValue);
});
router.routeUnhandledPathsTo({set: onUnhandledPaths});

Expand Down Expand Up @@ -122,7 +122,7 @@ describe('#set', function() {
});
return jsonGraph;
}, {jsonGraph: {}});
return Observable.of(returnValue);
return FalcorObservable.of(returnValue);
});
router.routeUnhandledPathsTo({set: onUnhandledPaths});

Expand Down Expand Up @@ -202,7 +202,7 @@ describe('#set', function() {
path: unicorn,
value: 'missing'
});
return Observable.of(next);
return FalcorObservable.of(next);
});
router.routeUnhandledPathsTo({set: onUnhandledPaths});

Expand Down

0 comments on commit 07ce5ff

Please sign in to comment.