From 9b7354795dcd838260c87d5df2fdf6c68b42a6d7 Mon Sep 17 00:00:00 2001 From: Spencer Alger Date: Sat, 22 Feb 2014 12:09:44 -0700 Subject: [PATCH 1/4] updates to the courier to properly support doc based fetch and related improvements --- src/courier/courier.js | 50 +++--- src/courier/data_source/data_source.js | 49 ++++++ src/courier/data_source/doc.js | 40 +++-- src/courier/errors.js | 75 +++++--- src/courier/test_directives.js | 73 ++++++++ src/courier/tests/config.html | 1 + src/courier/{test.html => tests/index.html} | 0 src/index.html | 2 +- src/kibana/constants/base.js | 15 ++ src/kibana/controllers/kibana.js | 80 ++------- src/kibana/main.js | 105 +++++------ src/kibana/partials/index.html | 4 + src/kibana/require.config.js | 1 + src/kibana/services/config.js | 186 +++++++++++++------- src/kibana/services/courier.js | 7 +- src/kibana/services/es.js | 18 +- src/kibana/setup.js | 101 +++++++++++ src/kibana/utils/async_modules.js | 57 ++++++ src/kibana/utils/next_tick.js | 33 ++++ 19 files changed, 639 insertions(+), 258 deletions(-) create mode 100644 src/courier/test_directives.js create mode 100644 src/courier/tests/config.html rename src/courier/{test.html => tests/index.html} (100%) create mode 100644 src/kibana/constants/base.js create mode 100644 src/kibana/partials/index.html create mode 100644 src/kibana/setup.js create mode 100644 src/kibana/utils/async_modules.js create mode 100644 src/kibana/utils/next_tick.js diff --git a/src/courier/courier.js b/src/courier/courier.js index d9c5042083675..f10b61d7ffa6b 100644 --- a/src/courier/courier.js +++ b/src/courier/courier.js @@ -9,6 +9,7 @@ define(function (require) { var DocSource = require('courier/data_source/doc'); var SearchSource = require('courier/data_source/search'); var HastyRefresh = require('courier/errors').HastyRefresh; + var nextTick = require('utils/next_tick'); // map constructors to type keywords var sourceTypes = { @@ -29,6 +30,7 @@ define(function (require) { courier._refs.search, function (err) { if (err) return courier._error(err); + courier._activeSearchRequest = null; }); }, @@ -36,10 +38,7 @@ define(function (require) { // then fetch the onces that are not doc: function (courier) { DocSource.validate(courier, courier._refs.doc, function (err, invalid) { - if (err) { - courier.stop(); - return courier.emit('error', err); - } + if (err) return courier._error(err); // if all of the docs are up to date we don't need to do anything else if (invalid.length === 0) return; @@ -54,7 +53,7 @@ define(function (require) { // default config values var defaults = { fetchInterval: 30000, - docInterval: 2500 + docInterval: 1500 }; /** @@ -63,12 +62,13 @@ define(function (require) { * search: * - inherits filters, and other query properties * - automatically emit results on a set interval + * * doc: * - tracks doc versions * - emits same results event when the doc is updated * - helps seperate versions of kibana running on the same machine stay in sync - * - (NI) tracks version and uses it when new versions of a doc are reindexed - * - (NI) helps deal with conflicts + * - tracks version and uses it to verify that updates are safe to make + * - emits conflict event when that happens * * @param {object} config * @param {Client} config.client - The elasticsearch.js client to use for querying. Should be @@ -115,7 +115,7 @@ define(function (require) { // store a quick "bound" method for triggering this._onInterval[type] = function () { - if (courier._refs[type].length) onFetch[type](courier); + courier.fetch(type); courier._schedule(type); }; @@ -145,7 +145,7 @@ define(function (require) { // is the courier currently running? Courier.prototype.running = function () { - return !!this._fetchTimer; + return !!_.size(this._timer); }; // stop the courier from fetching more results @@ -164,11 +164,18 @@ define(function (require) { }, this); }; - // force a fetch of all datasources right now - Courier.prototype.fetch = function () { - _.forOwn(onFetch, function (fn, type) { - if (this._refs[type].length) fn(this); - }, this); + // force a fetch of all datasources right now, optionally filter by type + Courier.prototype.fetch = function (onlyType) { + var courier = this; + nextTick(function () { + _.forOwn(onFetch, function (fn, type) { + if (onlyType && onlyType !== type) return; + if (courier._refs[type].length) fn(courier); + courier._refs[type].forEach(function (ref) { + ref.fetchCount ++; + }); + }); + }); }; // data source factory @@ -181,6 +188,7 @@ define(function (require) { return new Constructor(this, initialState); }; + /***** * PRIVATE API *****/ @@ -206,7 +214,8 @@ define(function (require) { var refs = this._refs[source._getType()]; if (!_.find(refs, { source: source })) { refs.push({ - source: source + source: source, + fetchCount: 0 }); } }; @@ -240,18 +249,15 @@ define(function (require) { _.each(this._refs.doc, function (ref) { var state = ref.source._state; if ( - state === updated - || ( - state.id === updated.id - && state.type === updated.type - && state.index === updated.index - ) + state.id === updated.id + && state.type === updated.type + && state.index === updated.index ) { delete ref.version; } }); - onFetch.doc(this); + this.fetch('doc'); }; return Courier; diff --git a/src/courier/data_source/data_source.js b/src/courier/data_source/data_source.js index 0a773abae271a..2d1bac072acf0 100644 --- a/src/courier/data_source/data_source.js +++ b/src/courier/data_source/data_source.js @@ -45,6 +45,11 @@ define(function (require) { return courier.createSource(this._getType()).inherits(this); }; + this.courier = function (newCourier) { + courier = this._courier = newCourier; + return this; + }; + // get/set internal state values this._methods.forEach(function (name) { this[name] = function (val) { @@ -96,6 +101,50 @@ define(function (require) { return JSON.stringify(this.toJSON()); }; + /** + * Set the $scope for a datasource, when a datasource is bound + * to a scope, it's event listeners will be wrapped in a call to that + * scope's $apply method (safely). + * + * This also binds the DataSource to the lifetime of the scope: when the scope + * is destroyed, the datasource is closed + * + * @param {AngularScope} $scope - the scope where the event emitter "occurs", + * helps angular determine where to start checking for changes + * @return {this} - chainable + */ + DataSource.prototype.$scope = function ($scope) { + var emitter = this; + + if (emitter._emitter$scope) { + emitter._emitter$scope = $scope; + return this; + } + + emitter._emitter$scope = $scope; + var origOn = emitter.on; + + emitter.on = function (event, listener) { + var wrapped = function () { + var args = arguments; + // always use the stored ref so that it can be updated if needed + var $scope = emitter._emitter$scope; + $scope[$scope.$$phase ? '$eval' : '$apply'](function () { + listener.apply(emitter, args); + }); + }; + wrapped.listener = listener; + return origOn.call(emitter, event, wrapped); + }; + + emitter.on.restore = function () { + delete emitter._emitter$scope; + emitter.on = origOn; + }; + + return this; + }; + /***** * PRIVATE API *****/ diff --git a/src/courier/data_source/doc.js b/src/courier/data_source/doc.js index aff95842f0856..49ff83067a841 100644 --- a/src/courier/data_source/doc.js +++ b/src/courier/data_source/doc.js @@ -1,6 +1,7 @@ define(function (require) { var DataSource = require('courier/data_source/data_source'); var inherits = require('utils/inherits'); + var nextTick = require('utils/next_tick'); var errors = require('courier/errors'); var listenerCount = require('utils/event_emitter').listenerCount; var _ = require('lodash'); @@ -23,7 +24,7 @@ define(function (require) { DocSource.fetch = function (courier, refs, cb) { var client = courier._getClient(); var allRefs = []; - var body = { + var getBody = { docs: [] }; @@ -32,10 +33,10 @@ define(function (require) { if (source._getType() !== 'doc') return; allRefs.push(ref); - body.docs.push(source._flatten()); + getBody.docs.push(source._flatten()); }); - return client.mget({ body: body }, function (err, resp) { + return client.mget({ body: getBody }, function (err, resp) { if (err) return cb(err); _.each(resp.docs, function (resp, i) { @@ -43,9 +44,14 @@ define(function (require) { var source = ref.source; if (resp.error) return source._error(new errors.DocFetchFailure(resp)); - if (ref.version === resp._version) return; // no change - ref.version = resp._version; - source._storeVersion(resp._version); + if (resp.found) { + if (ref.version === resp._version) return; // no change + ref.version = resp._version; + source._storeVersion(resp._version); + } else { + ref.version = void 0; + source._clearVersion(); + } source.emit('results', resp); }); @@ -63,9 +69,10 @@ define(function (require) { DocSource.validate = function (courier, refs, cb) { var invalid = _.filter(refs, function (ref) { var storedVersion = ref.source._getVersion(); - if (ref.version !== storedVersion) return true; + /* jshint eqeqeq: false */ + return (!ref.fetchCount || ref.version != storedVersion); }); - setTimeout(function () { + nextTick(function () { cb(void 0, invalid); }); }; @@ -102,6 +109,7 @@ define(function (require) { id: state.id, type: state.type, index: state.index, + version: source._getVersion(), body: { doc: fields } @@ -129,7 +137,6 @@ define(function (require) { id: state.id, type: state.type, index: state.index, - version: source._getVersion(), body: body, ignore: [409] }, function (err, resp) { @@ -201,8 +208,8 @@ define(function (require) { * @return {number} - the version number, or NaN */ DocSource.prototype._getVersion = function () { - var id = this._versionKey(); - return _.parseInt(localStorage.getItem(id)); + var v = localStorage.getItem(this._versionKey()); + return v ? _.parseInt(v) : void 0; }; /** @@ -212,8 +219,17 @@ define(function (require) { */ DocSource.prototype._storeVersion = function (version) { var id = this._versionKey(); - localStorage.setItem(id, version); + if (version) { + localStorage.setItem(id, version); + } else { + localStorage.removeItem(id); + } }; + /** + * Clears the stored version for a DocSource + */ + DocSource.prototype._clearVersion = DocSource.prototype._storeVersion; + return DocSource; }); \ No newline at end of file diff --git a/src/courier/errors.js b/src/courier/errors.js index e379ccbdff785..f565f389e2f92 100644 --- a/src/courier/errors.js +++ b/src/courier/errors.js @@ -1,37 +1,66 @@ define(function (require) { var listenerCount = require('utils/event_emitter').listenerCount; + var _ = require('lodash'); var errors = {}; + var inherits = require('utils/inherits'); - // caused by a refresh attempting to start before the prevous is done - function HastyRefresh() { - this.name = 'HastyRefresh'; - this.message = 'Courier attempted to start a query before the previous had finished.'; + var canStack = (function () { + var err = new Error(); + return !!err.stack; + }()); + + // abstract error class + function CourierError(msg, constructor) { + this.message = msg; + + Error.call(this, this.message); + if (Error.captureStackTrace) { + Error.captureStackTrace(this, constructor || CourierError); + } else if (canStack) { + this.stack = (new Error()).stack; + } else { + this.stack = ''; + } } - HastyRefresh.prototype = new Error(); - HastyRefresh.prototype.constructor = HastyRefresh; - errors.HastyRefresh = HastyRefresh; + errors.CourierError = CourierError; + inherits(CourierError, Error); + + /** + * HastyRefresh error class + * @param {String} [msg] - An error message that will probably end up in a log. + */ + errors.HastyRefresh = function HastyRefresh() { + CourierError.call(this, + 'Courier attempted to start a query before the previous had finished.', + errors.HastyRefresh); + }; + inherits(errors.HastyRefresh, CourierError); + /** + * DocFetchFailure Error - where there is an error getting a doc + * @param {String} [msg] - An error message that will probably end up in a log. + */ + errors.DocFetchFailure = function DocFetchFailure(resp) { + CourierError.call(this, + 'Failed to get the doc: ' + JSON.stringify(resp), + errors.DocFetchFailure); - // where there is an error getting a doc - function DocFetchFailure(resp) { - this.name = 'DocFetchFailure'; this.resp = resp; - this.message = 'Failed to get the doc: ' + JSON.stringify(resp); - } - DocFetchFailure.prototype = new Error(); - DocFetchFailure.prototype.constructor = DocFetchFailure; - errors.DocFetchFailure = DocFetchFailure; + }; + inherits(errors.DocFetchFailure, CourierError); + /** + * Connection Error + * @param {String} [msg] - An error message that will probably end up in a log. + */ + errors.VersionConflict = function VersionConflict(resp) { + CourierError.call(this, + 'Failed to store document changes do to a version conflict.', + errors.VersionConflict); - // there was a conflict storing a doc - function VersionConflict(resp) { - this.name = 'VersionConflict'; this.resp = resp; - this.message = 'Failed to store document changes do to a version conflict.'; - } - VersionConflict.prototype = new Error(); - VersionConflict.prototype.constructor = VersionConflict; - errors.VersionConflict = VersionConflict; + }; + inherits(errors.VersionConflict, CourierError); return errors; }); \ No newline at end of file diff --git a/src/courier/test_directives.js b/src/courier/test_directives.js new file mode 100644 index 0000000000000..e3a24a007d30b --- /dev/null +++ b/src/courier/test_directives.js @@ -0,0 +1,73 @@ +define(function (require) { + var angular = require('angular'); + + angular + .module('kibana/directives') + .directive('configTest', function () { + return { + restrict: 'E', + template: 'My favorite number is {{favoriteNum}} ', + controller: function ($scope, config) { + config.bind($scope, 'favoriteNum', { + default: 0 + }); + + $scope.click = function () { + $scope.favoriteNum++; + }; + } + }; + }) + .directive('courierTest', function () { + return { + restrict: 'E', + scope: { + type: '@' + }, + template: '{{count}} : 
{{json}}
', + controller: function ($scope, courier) { + $scope.count = 0; + var source = courier.rootSearchSource.extend() + .type($scope.type) + .source({ + include: 'country' + }) + .$scope($scope) + .on('results', function (resp) { + $scope.count ++; + $scope.json = JSON.stringify(resp.hits, null, ' '); + }); + } + }; + }) + .directive('courierDocTest', function () { + return { + restrict: 'E', + scope: { + id: '@', + type: '@', + index: '@' + }, + template: '{{count}} :
{{json}}
', + controller: function (courier, $scope) { + $scope.count = 0; + var currentSource; + $scope.click = function () { + if (currentSource) { + source.doIndex(currentSource); + } + }; + var source = courier.createSource('doc') + .id($scope.id) + .type($scope.type) + .index($scope.index) + .$scope($scope) + .on('results', function (doc) { + currentSource = doc._source; + $scope.count ++; + $scope.json = JSON.stringify(doc, null, ' '); + }); + } + }; + }); +}); \ No newline at end of file diff --git a/src/courier/tests/config.html b/src/courier/tests/config.html new file mode 100644 index 0000000000000..908482111d4fd --- /dev/null +++ b/src/courier/tests/config.html @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/src/courier/test.html b/src/courier/tests/index.html similarity index 100% rename from src/courier/test.html rename to src/courier/tests/index.html diff --git a/src/index.html b/src/index.html index 05ba23b47dc13..d646f86e43019 100644 --- a/src/index.html +++ b/src/index.html @@ -12,7 +12,7 @@ -
+
diff --git a/src/kibana/constants/base.js b/src/kibana/constants/base.js new file mode 100644 index 0000000000000..984ee81fe8c4a --- /dev/null +++ b/src/kibana/constants/base.js @@ -0,0 +1,15 @@ +define(function (require) { + var angular = require('angular'); + + /** + * broke this out so that it could be loaded before the application is + */ + angular.module('kibana/constants') + // This stores the Kibana revision number, @REV@ is replaced by grunt. + .constant('kbnVersion', '@REV@') + + // Use this for cache busting partials + .constant('cacheBust', 'cache-bust=' + Date.now()) + + ; +}); \ No newline at end of file diff --git a/src/kibana/controllers/kibana.js b/src/kibana/controllers/kibana.js index 80aac33c60db5..7c9c46d09efbb 100644 --- a/src/kibana/controllers/kibana.js +++ b/src/kibana/controllers/kibana.js @@ -1,75 +1,19 @@ define(function (require) { var angular = require('angular'); var _ = require('lodash'); + var $ = require('jquery'); - angular.module('kibana/controllers') - .controller('Kibana', function (courier, $scope, $rootScope) { - $rootScope.dataSource = courier.createSource('search') - .index('_all') - .size(5); - - // this should be triggered from within the controlling application - setTimeout(_.bindKey(courier, 'start'), 15); - }); - - angular.module('kibana/directives') - .directive('courierTest', function () { - return { - restrict: 'E', - scope: { - type: '@' - }, - template: '{{count}} : 
{{json}}
', - controller: function ($rootScope, $scope, courier) { - $scope.count = 0; - - var source = $rootScope.dataSource.extend() - .type($scope.type) - .source({ - include: 'country' - }) - .on('results', function (resp) { - $scope.count ++; - $scope.json = JSON.stringify(resp.hits, null, ' '); - }); + require('services/config'); + require('services/courier'); - courier.mapper.getFields($rootScope.dataSource, function (data) { - $scope.json = data; - }); - - $scope.$watch('type', source.type); - } - }; - }) - .directive('courierDocTest', function () { - return { - restrict: 'E', - scope: { - id: '@', - type: '@', - index: '@' - }, - template: '{{count}} :
{{json}}
', - controller: function (courier, $scope) { - $scope.count = 0; - - var currentSource; - $scope.click = function () { - if (currentSource) { - source.update(currentSource); - } - }; - - var source = courier.createSource('doc') - .id($scope.id) - .type($scope.type) - .index($scope.index) - .on('results', function (doc) { - currentSource = doc._source; - $scope.count ++; - $scope.json = JSON.stringify(doc, null, ' '); - }); - } - }; + angular.module('kibana/controllers') + .controller('kibana', function ($scope, courier) { + setTimeout(function () { + courier.start(); + }, 15); + + $scope.$on('$routeChangeSuccess', function () { + if (courier.running()) courier.fetch(); + }); }); }); \ No newline at end of file diff --git a/src/kibana/main.js b/src/kibana/main.js index 2306a82ed1820..fce5cb07cbcb1 100644 --- a/src/kibana/main.js +++ b/src/kibana/main.js @@ -7,90 +7,69 @@ define(function (require) { var $ = require('jquery'); var _ = require('lodash'); var scopedRequire = require('require'); + var enableAsyncModules = require('utils/async_modules'); + var setup = require('./setup'); require('elasticsearch'); require('angular-route'); - // keep a reference to each module defined before boot, so that - // after boot it can define new features. Also serves as a flag. - var preBootModules = []; - - // the functions needed to register different - // features defined after boot - var registerFns = {}; + var app = angular.module('kibana', []); + enableAsyncModules(app); var dependencies = [ 'elasticsearch', + 'ngRoute', 'kibana', - 'ngRoute' + 'kibana/controllers', + 'kibana/directives', + 'kibana/factories', + 'kibana/services', + 'kibana/filters', + 'kibana/constants' ]; - _('controllers directives factories services filters'.split(' ')) - .map(function (type) { return 'kibana/' + type; }) - .each(function (name) { - preBootModules.push(angular.module(name, [])); - dependencies.push(name); - }); - - var app = angular.module('kibana', dependencies); + function isScope(obj) { + return obj && obj.$evalAsync && obj.$watch; + } - // This stores the Kibana revision number, @REV@ is replaced by grunt. - app.constant('kbnVersion', '@REV@'); - - // Use this for cache busting partials - app.constant('cacheBust', 'cache-bust=' + Date.now()); - - /** - * Modules that need to register components within the application after - * bootstrapping is complete need to pass themselves to this method. - * - * @param {object} module - The Angular module - * @return {object} module - */ - app.useModule = function (module) { - if (preBootModules) { - preBootModules.push(module); - } else { - _.extend(module, registerFns); + dependencies.forEach(function (name) { + if (name.indexOf('kibana/') === 0) { + app.useModule(angular.module(name, [])); } - return module; - }; + }); - app.config(function ($routeProvider, $controllerProvider, $compileProvider, $filterProvider, $provide) { + app.requires = dependencies; + + app.config(function ($routeProvider) { $routeProvider + .when('/', { + templateUrl: 'kibana/partials/index.html' + }) + .when('/config-test', { + templateUrl: 'courier/tests/config.html', + }) .when('/courier-test', { - templateUrl: 'courier/test.html', + templateUrl: 'courier/tests/index.html', }) .otherwise({ - redirectTo: 'courier-test' + redirectTo: '' }); - - // this is how the internet told me to dynamically add modules :/ - registerFns.controller = $controllerProvider.register; - registerFns.directive = $compileProvider.directive; - registerFns.factory = $provide.factory; - registerFns.service = $provide.service; - registerFns.filter = $filterProvider.register; }); - // load the core components - require([ - 'services/courier', - 'services/es', - 'services/config', - 'controllers/kibana' - ], function () { + setup(app, function (err) { + if (err) throw err; - // bootstrap the app - $(function () { - angular - .bootstrap(document, dependencies) - .invoke(function ($rootScope) { - _.each(preBootModules, function (module) { - _.extend(module, registerFns); - }); - preBootModules = false; - }); + // load the elasticsearch service + require([ + 'controllers/kibana', + 'courier/test_directives', + 'constants/base' + ], function () { + // bootstrap the app + $(function () { + angular + .bootstrap(document, dependencies); + }); }); }); diff --git a/src/kibana/partials/index.html b/src/kibana/partials/index.html new file mode 100644 index 0000000000000..6d94f95ac9270 --- /dev/null +++ b/src/kibana/partials/index.html @@ -0,0 +1,4 @@ + \ No newline at end of file diff --git a/src/kibana/require.config.js b/src/kibana/require.config.js index ecf97e57434ae..082c7144e4cad 100644 --- a/src/kibana/require.config.js +++ b/src/kibana/require.config.js @@ -16,6 +16,7 @@ var bowerComponents = [ 'angular', 'angular-route', + ['async', 'lib/async'], 'd3', ['elasticsearch', 'elasticsearch.angular'], 'jquery', diff --git a/src/kibana/services/config.js b/src/kibana/services/config.js index f1216aa02bbb0..9e684400bd4e0 100644 --- a/src/kibana/services/config.js +++ b/src/kibana/services/config.js @@ -1,89 +1,149 @@ define(function (require) { var angular = require('angular'); - var configFile = require('../../config'); var _ = require('lodash'); + var configFile = require('../../config'); + var nextTick = require('utils/next_tick'); - var module = angular.module('kibana/services'); - module.service('config', function ($q, es, courier) { + require('services/courier'); - var app = angular.module('kibana'); - var config = {}; + var module = angular.module('kibana/services'); + // share doc and val cache between apps + var doc; + var vals = {}; + module.service('config', function ($q, $rootScope, courier, kbnVersion) { var watchers = {}; + var unwatchers = []; - function watch(key, onChange) { - // probably a horrible idea - if (!watchers[key]) watchers[key] = []; - watchers[key].push(onChange); + if (!doc) { + doc = courier.createSource('doc') + .index(configFile.kibanaIndex) + .type('config') + .id(kbnVersion); + } else { + // clean up after previous app + doc.removeAllListeners('results'); + doc.courier(courier); } - function change(key, val) { - if (config[key] !== val) { - var oldVal = config[key]; - config[key] = val; - if (watchers[key]) { - watchers[key].forEach(function (watcher) { - watcher(val, oldVal); - }); + doc.on('results', function (resp) { + if (!resp.found) return; // init should ensure it exists + _.forOwn(resp._source, function (val, key) { + if (vals[key] !== val) _change(key, val); + }); + }); + + /****** + * PUBLIC API + ******/ + + function init() { + var defer = $q.defer(); + courier.fetch(); + doc.on('results', function completeInit(resp) { + // ONLY ACT IF !resp.found + if (!resp.found) { + console.log('creating empty config doc'); + doc.doIndex({}); + return; } - } + + console.log('fetched config doc'); + doc.removeListener('results', completeInit); + defer.resolve(); + }); + return defer.promise; } - function getDoc() { - var defer = $q.promise(); + function get(key) { + return vals[key]; + } - courier.get({ - index: config.kibanaIndex, - type: 'config', - id: app.constant('kbnVersion') - }, function fetchDoc(err, doc) { - _.assign(config, doc); - defer.resolve(); - }, function onDocUpdate(doc) { - _.forOwn(doc, function (val, key) { - change(key, val); + function set(key, val) { + // sets a value in the config + // the es doc must be updated successfully for the update to reflect in the get api. + if (vals[key] === val) { + var defer = $q.defer(); + defer.resolve(true); + return defer.promise; + } + + var update = {}; + update[key] = val; + + return doc.doUpdate(update) + .then(function () { + _change(key, val); + return true; + }) + .catch(function (err) { + throw err; }); + } + + function $watch(key, onChange) { + // probably a horrible idea + if (!watchers[key]) watchers[key] = []; + watchers[key].push(onChange); + _notify(onChange, vals[key]); + } + + function bindToScope($scope, key, opts) { + $watch(key, function (val) { + if (opts && val === void 0) val = opts['default']; + $scope[key] = val; }); - return defer.promise; + var first = true; + unwatchers.push($scope.$watch(key, function (newVal) { + if (first) return first = false; + set(key, newVal); + })); } - return { - get: function (key) { - return config[key]; - }, - set: function (key, val) { - // sets a value in the config - // the es doc must be updated successfully for the update to reflect in the get api. + function close() { + watchers = null; + unwatchers.forEach(function (unwatcher) { + unwatcher(); + }); + } - if (key === 'elasticsearch' || key === 'kibanaIndex') { - return $q.reject(new Error('These values must be updated in the config.js file.')); - } + // expose public API on the instance + this.init = init; + this.close = close; + this.get = get; + this.set = set; + this.bind = bindToScope; + this.$watch = $watch; - var defer = $q.defer(); + /******* + * PRIVATE API + *******/ - if (config[key] === val) { - defer.resolve(); - return defer.promise; - } + function _change(key, val) { + _notify(watchers[key], val, vals[key]); + vals[key] = val; + console.log(key, 'is now', val); + } - var body = {}; - body[key] = val; - courier.update({ - index: config.kibanaIndex, - type: 'config', - id: app.constant('kbnVersion'), - body: body - }, function (err) { - if (err) return defer.reject(err); - - change(key, val); - defer.resolve(); + function _notify(fns, cur, prev) { + if ($rootScope.$$phase) { + // reschedule + nextTick(function () { + _notify(fns, cur, prev); }); + return; + } - return defer.promise; - }, - $watch: watch, - init: getDoc - }; + var isArr = _.isArray(fns); + if (!fns || (isArr && !fns.length)) return; + + $rootScope.$apply(function () { + if (!isArr) return fns(cur, prev); + + fns.forEach(function (onChange) { + onChange(cur, prev); + }); + }); + } }); }); \ No newline at end of file diff --git a/src/kibana/services/courier.js b/src/kibana/services/courier.js index 3bd84c673cd81..f7cd6ff1fe3c4 100644 --- a/src/kibana/services/courier.js +++ b/src/kibana/services/courier.js @@ -5,16 +5,19 @@ define(function (require) { var errors = require('courier/errors'); require('services/promises'); + require('services/es'); + var courier; // share the courier amoungst all of the apps angular.module('kibana/services') .service('courier', function (es, promises) { + if (courier) return courier; promises.playNice(DocSource.prototype, [ 'doUpdate', 'doIndex' ]); - var courier = new Courier({ + courier = new Courier({ fetchInterval: 15000, client: es, promises: promises @@ -22,6 +25,8 @@ define(function (require) { courier.errors = errors; + courier.rootSearchSource = courier.createSource('search'); + return courier; }); }); \ No newline at end of file diff --git a/src/kibana/services/es.js b/src/kibana/services/es.js index c0a7bf6bb4ab5..a2aa4bd5936fa 100644 --- a/src/kibana/services/es.js +++ b/src/kibana/services/es.js @@ -1,8 +1,16 @@ define(function (require) { - var angular = require('angular'); + var configFile = require('../../config'); - var module = angular.module('kibana/services'); - module.service('es', function (esFactory) { - return esFactory(); - }); + var es; // share the client amoungst all apps + require('angular') + .module('kibana/services') + .service('es', function (esFactory, $q) { + if (es) return es; + + es = esFactory({ + host: configFile.elasticsearch + }); + + return es; + }); }); \ No newline at end of file diff --git a/src/kibana/setup.js b/src/kibana/setup.js new file mode 100644 index 0000000000000..5d978e3cc1513 --- /dev/null +++ b/src/kibana/setup.js @@ -0,0 +1,101 @@ +define(function (require) { + var angular = require('angular'); + var async = require('async'); + var $ = require('jquery'); + var configFile = require('../config'); + var nextTick = require('utils/next_tick'); + + /** + * Setup the kibana application, ensuring that the kibanaIndex exists, + * and perform any migration of data that is required. + * + * @param {Module} app - The Kibana module + * @param {function} done - callback + */ + return function SetupApp(app, done) { + // load angular deps + require([ + 'elasticsearch', + 'services/es', + 'services/config', + 'constants/base' + ], function () { + $(function () { + + var setup = angular.module('setup', [ + 'elasticsearch', + 'kibana/services', + 'kibana/constants' + ]); + var appEl = document.createElement('div'); + var kibanaIndexExists; + + setup.run(function (es, config) { + // init the setup module + async.series([ + async.apply(checkForKibanaIndex, es), + async.apply(createKibanaIndex, es), + async.apply(checkForCurrentConfigDoc, es), + async.apply(initConfig, config) + ], function (err) { + // ready to go, remove the appEl, close services and boot be done + appEl.remove(); + console.log('booting application'); + return done(err); + }); + }); + + angular.bootstrap(appEl, ['setup']); + + function checkForKibanaIndex(es, done) { + console.log('look for kibana index'); + es.indices.exists({ + index: configFile.kibanaIndex + }, function (err, exists) { + console.log('kibana index does', (exists ? '' : 'not ') + 'exist'); + kibanaIndexExists = exists; + return done(err); + }); + } + + // create the index if it doens't exist already + function createKibanaIndex(es, done) { + if (kibanaIndexExists) return done(); + console.log('creating kibana index'); + es.indices.create({ + index: configFile.kibanaIndex, + body: { + settings: { + mappings: { + type1: { + _source: { + enabled: false + }, + properties: { + field1: { + type: 'string', + index: 'not_analyzed' + } + } + } + } + } + } + }, done); + } + + // if the index is brand new, no need to see if it is out of data + function checkForCurrentConfigDoc(es, done) { + if (!kibanaIndexExists) return done(); + console.log('checking if migration is necessary: not implemented'); + nextTick(done); + } + + function initConfig(config, done) { + console.log('initializing config service'); + config.init().then(function () { done(); }, done); + } + }); + }); + }; +}); \ No newline at end of file diff --git a/src/kibana/utils/async_modules.js b/src/kibana/utils/async_modules.js new file mode 100644 index 0000000000000..aa2babb570aba --- /dev/null +++ b/src/kibana/utils/async_modules.js @@ -0,0 +1,57 @@ +define(function (require) { + var _ = require('lodash'); + + // TODO: this will probably fail to work when we have multiple apps. Might need to propogate + // registrations to multiple providers + function enable(app) { + // keep a reference to each module defined before boot, so that + // after boot it can define new features. Also serves as a flag. + var preBootModules = []; + + // the functions needed to register different + // features defined after boot + var registerFns = {}; + + app.config(function ($controllerProvider, $compileProvider, $filterProvider, $provide) { + // this is how the internet told me to dynamically add modules :/ + registerFns = { + controller: $controllerProvider.register, + directive: $compileProvider.directive, + factory: $provide.factory, + service: $provide.service, + constant: $provide.constant, + value: $provide.value, + filter: $filterProvider.register + }; + }); + + /** + * Modules that need to register components within the application after + * bootstrapping is complete need to pass themselves to this method. + * + * @param {object} module - The Angular module + * @return {object} module + */ + app.useModule = function (module) { + if (preBootModules) { + preBootModules.push(module); + } else { + _.extend(module, registerFns); + } + return module; + }; + + /** + * Called after app is bootrapped to enable asyncModules + * @return {[type]} [description] + */ + app.run(function () { + _.each(preBootModules, function (module) { + _.extend(module, registerFns); + }); + preBootModules = false; + }); + } + + return enable; +}); \ No newline at end of file diff --git a/src/kibana/utils/next_tick.js b/src/kibana/utils/next_tick.js new file mode 100644 index 0000000000000..28b59102101ee --- /dev/null +++ b/src/kibana/utils/next_tick.js @@ -0,0 +1,33 @@ +define(function () { + + var canSetImmediate = typeof window !== 'undefined' && window.setImmediate; + var canPost = typeof window !== 'undefined' && window.postMessage && window.addEventListener + ; + + if (canSetImmediate) { + return function (f) { return window.setImmediate(f); }; + } + + if (canPost) { + var queue = []; + window.addEventListener('message', function (ev) { + if (ev.source === window && ev.data === 'process-tick') { + ev.stopPropagation(); + if (queue.length > 0) { + var fn = queue.shift(); + fn(); + } + } + }, true); + + return function nextTick(fn) { + queue.push(fn); + window.postMessage('process-tick', '*'); + }; + } + + return function nextTick(fn) { + setTimeout(fn, 0); + }; + +}); \ No newline at end of file From 65ec35819f2aa132ca26f4eeeb65c6081868ff77 Mon Sep 17 00:00:00 2001 From: Spencer Alger Date: Mon, 24 Feb 2014 10:12:38 -0700 Subject: [PATCH 2/4] merged in mapper changes --- .jshintrc | 2 +- src/courier/courier.js | 12 +- src/courier/data_source/doc.js | 46 ++-- src/courier/errors.js | 35 +++ src/courier/mapper.js | 203 +++++++++++++++++- .../directives.js} | 43 +++- src/courier/tests/mapper.html | 1 + src/kibana/controllers/kibana.js | 3 +- src/kibana/main.js | 7 +- src/kibana/partials/index.html | 1 + src/kibana/services/config.js | 10 +- src/kibana/utils/async_modules.js | 5 +- src/kibana/utils/next_tick.js | 15 +- test/unit/index.html | 4 +- test/unit/specs/mapper.js | 129 +++++++++-- 15 files changed, 445 insertions(+), 71 deletions(-) rename src/courier/{test_directives.js => tests/directives.js} (60%) create mode 100644 src/courier/tests/mapper.html diff --git a/.jshintrc b/.jshintrc index e21295e990158..3aae3b5b89e93 100644 --- a/.jshintrc +++ b/.jshintrc @@ -9,7 +9,7 @@ "console": true }, - "camelcase": true, + "camelcase": false, "white": true, "bitwise": false, "eqnull": true, diff --git a/src/courier/courier.js b/src/courier/courier.js index f10b61d7ffa6b..2013dd33842aa 100644 --- a/src/courier/courier.js +++ b/src/courier/courier.js @@ -11,6 +11,8 @@ define(function (require) { var HastyRefresh = require('courier/errors').HastyRefresh; var nextTick = require('utils/next_tick'); + var Mapper = require('courier/mapper'); + // map constructors to type keywords var sourceTypes = { doc: DocSource, @@ -53,7 +55,9 @@ define(function (require) { // default config values var defaults = { fetchInterval: 30000, - docInterval: 1500 + docInterval: 1500, + internalIndex: 'kibana4-int', + mapperCacheType: 'mappings' }; /** @@ -100,6 +104,12 @@ define(function (require) { // interval hook/fn for each type this._onInterval = {}; + // make the mapper accessable + this._mapper = new Mapper(this, { + cacheIndex: config.internalIndex, + cacheType: config.mapperCacheType + }); + _.each(sourceTypes, function (fn, type) { var courier = this; // the name used outside of this module diff --git a/src/courier/data_source/doc.js b/src/courier/data_source/doc.js index 49ff83067a841..38133d55657eb 100644 --- a/src/courier/data_source/doc.js +++ b/src/courier/data_source/doc.js @@ -36,27 +36,27 @@ define(function (require) { getBody.docs.push(source._flatten()); }); - return client.mget({ body: getBody }, function (err, resp) { - if (err) return cb(err); - - _.each(resp.docs, function (resp, i) { - var ref = allRefs[i]; - var source = ref.source; - - if (resp.error) return source._error(new errors.DocFetchFailure(resp)); - if (resp.found) { - if (ref.version === resp._version) return; // no change - ref.version = resp._version; - source._storeVersion(resp._version); - } else { - ref.version = void 0; - source._clearVersion(); - } - source.emit('results', resp); - }); - - cb(err, resp); - }); + return client.mget({ body: getBody }) + .then(function (resp) { + _.each(resp.docs, function (resp, i) { + var ref = allRefs[i]; + var source = ref.source; + + if (resp.error) return source._error(new errors.DocFetchFailure(resp)); + if (resp.found) { + if (ref.version === resp._version) return; // no change + ref.version = resp._version; + source._storeVersion(resp._version); + } else { + ref.version = void 0; + source._clearVersion(); + } + source.emit('results', resp); + }); + + cb(void 0, resp); + }) + .catch(cb); }; /** @@ -72,9 +72,7 @@ define(function (require) { /* jshint eqeqeq: false */ return (!ref.fetchCount || ref.version != storedVersion); }); - nextTick(function () { - cb(void 0, invalid); - }); + nextTick(cb, void 0, invalid); }; /***** diff --git a/src/courier/errors.js b/src/courier/errors.js index f565f389e2f92..fac65a0a91f19 100644 --- a/src/courier/errors.js +++ b/src/courier/errors.js @@ -36,6 +36,7 @@ define(function (require) { }; inherits(errors.HastyRefresh, CourierError); + /** * DocFetchFailure Error - where there is an error getting a doc * @param {String} [msg] - An error message that will probably end up in a log. @@ -49,6 +50,7 @@ define(function (require) { }; inherits(errors.DocFetchFailure, CourierError); + /** * Connection Error * @param {String} [msg] - An error message that will probably end up in a log. @@ -62,5 +64,38 @@ define(function (require) { }; inherits(errors.VersionConflict, CourierError); + + /** + * there was a conflict storing a doc + * @param {String} field - the fields which contains the conflict + */ + errors.MappingConflict = function MappingConflict(field) { + CourierError.call(this, + 'Field ' + field + ' is defined as at least two different types in indices matching the pattern', + errors.MappingConflict); + }; + inherits(errors.MappingConflict, CourierError); + + /** + * a non-critical cache write to elasticseach failed + */ + errors.CacheWriteFailure = function CacheWriteFailure() { + CourierError.call(this, + 'A Elasticsearch cache write has failed.', + errors.CacheWriteFailure); + }; + inherits(errors.CacheWriteFailure, CourierError); + + /** + * when a field mapping is requested for an unknown field + * @param {String} name - the field name + */ + errors.FieldNotFoundInCache = function FieldNotFoundInCache(name) { + CourierError.call(this, + 'The ' + name + ' field was not found in the cached mappings', + errors.FieldNotFoundInCache); + }; + inherits(errors.FieldNotFoundInCache, CourierError); + return errors; }); \ No newline at end of file diff --git a/src/courier/mapper.js b/src/courier/mapper.js index 7de98976d3911..14ce0a686bd03 100644 --- a/src/courier/mapper.js +++ b/src/courier/mapper.js @@ -1,4 +1,8 @@ define(function (require) { + var _ = require('lodash'); + var Error = require('courier/errors'); + var nextTick = require('utils/next_tick'); + /** * - Resolves index patterns * - Fetches mappings from elasticsearch @@ -6,21 +10,202 @@ define(function (require) { * * @class Mapper */ - function Mapper(client) { + function Mapper(courier, config) { + + var client = courier._getClient(); + + // Exclude anything wirh empty mapping except these + var reservedFields = { + '_id': { type: 'string' }, + '_type': { type: 'string' }, + '_index': { type: 'string' } + }; + + // Save a reference to this + var self = this; + + // Store mappings we've already loaded from Elasticsearch + var mappings = {}; /** * Gets an object containing all fields with their mappings - * @param {dataSource} [dataSource] - * @param {Function} [callback] A function to be executed with the results. - * @param {String} [type] - * @return {Object} A hash containing fields and their related mapping + * @param {dataSource} dataSource + * @param {Function} callback A function to be executed with the results. + */ + this.getFields = function (dataSource, callback) { + if (self.getFieldsFromObject(dataSource)) { + // If we already have the fields in our object, use that. + nextTick(callback, void 0, self.getFieldsFromObject(dataSource)); + } else { + // Otherwise, try to get fields from Elasticsearch cache + self.getFieldsFromCache(dataSource, function (err, fields) { + if (err) { + // If we are unable to get the fields from cache, get them from mapping + self.getFieldsFromMapping(dataSource, function (err, fields) { + if (err) return courier._error(err); + + // And then cache them + cacheFieldsToElasticsearch(config, dataSource._state.index, fields, function (err, response) { + if (err) return courier._error(new Error.CacheWriteError()); + }); + + cacheFieldsToObject(dataSource, fields); + callback(err, fields); + }); + } else { + cacheFieldsToObject(dataSource, fields); + callback(err, fields); + } + }); + } + }; + + /** + * Gets an object containing the mapping for a field + * @param {dataSource} dataSource + * @param {String} field The dot notated name of a field to get the mapping for + * @param {Function} callback A function to be executed with the results. + */ + this.getFieldMapping = function (dataSource, field, callback) { + self.getFields(dataSource, function (err, fields) { + if (_.isUndefined(fields[field])) return courier._error(new Error.FieldNotFoundInCache(field)); + callback(err, fields[field]); + }); + }; + + /** + * Gets an object containing the mapping for a field + * @param {dataSource} dataSource + * @param {Array} fields The dot notated names of a fields to get the mapping for + * @param {Function} callback A function to be executed with the results. + */ + this.getFieldsMapping = function (dataSource, fields, callback) { + self.getFields(dataSource, function (err, fields) { + var _mapping = _.object(_.map(fields, function (field) { + if (_.isUndefined(fields[field])) return courier._error(new Error.FieldNotFoundInCache(field)); + return [field, fields[field]]; + })); + callback(err, _mapping); + }); + }; + + /** + * Gets an object containing all fields with their mappings from kibana's cache in Elasticsearch + * @param {dataSource} dataSource + * @return {Object} An object containing fields with their mappings, or false if not found. + */ + this.getFieldsFromObject = function (dataSource) { + return !_.isUndefined(mappings[dataSource._state.index]) ? mappings[dataSource._state.index] : false; + }; + + /** + * Gets an object containing all fields with their mappings from kibana's cache in Elasticsearch + * @param {dataSource} dataSource + * @param {Function} callback A function to be executed with the results. + */ + this.getFieldsFromCache = function (dataSource, callback) { + var params = { + index: config.cacheIndex, + type: config.cacheType, + id: dataSource._state.index, + }; + + client.getSource(params, callback); + }; + + /** + * Gets an object containing all fields with their mappings directly from Elasticsearch + * @param {dataSource} dataSource + * @param {Function} callback A function to be executed with the results. */ - this.getFields = function (dataSource, callback, type) { - client.indices.getFieldMapping({index: dataSource.index}, callback); + this.getFieldsFromMapping = function (dataSource, callback) { + var params = { + // TODO: Change index to be newest resolved index. Change _state.index to id(). + index: dataSource._state.index, + field: '*', + }; + + // TODO: Add week/month check + client.indices.getFieldMapping(params, function (err, response, status) { + + // TODO: Add error message + + var fields = {}; + + _.each(response, function (index) { + _.each(index.mappings, function (type) { + _.each(type, function (field, name) { + if (_.isUndefined(field.mapping) || name[0] === '_') return; + if (!_.isUndefined(fields[name]) && fields[name] !== field.mapping[_.keys(field.mapping)[0]]) + return courier._error(new Error.MappingConflict()); + fields[name] = field.mapping[_.keys(field.mapping)[0]]; + }); + }); + }); + + // TODO if these are mapped differently this might cause problems + _.assign(fields, reservedFields); + + callback(err, fields); + }); }; - this.getFieldType = function (dataSource, field, type) { - return field, type; + /** + * Stores processed mappings in Elasticsearch + * @param {dataSource} dataSource + * @param {Function} callback A function to be executed with the results. + */ + var cacheFieldsToElasticsearch = function (config, index, fields, callback) { + client.index({ + index: config.cacheIndex, + type: config.cacheType, + id: index, + body: fields + }, callback); + }; + + /** + * Stores processed mappings in an object + * @param {dataSource} dataSource + * @param {Function} callback A function to be executed with the results. + */ + var cacheFieldsToObject = function (dataSource, fields) { + mappings[dataSource._state.index] = _.clone(fields); + return !_.isUndefined(mappings[dataSource._state.index]) ? true : false; + }; + + /** + * Clears mapping caches from elasticsearch and from local object + * @param {dataSource} dataSource + * @param {Function} callback A function to be executed with the results. + */ + this.clearCache = function (dataSource, callback) { + if (!_.isUndefined(mappings[dataSource._state.index])) { + delete mappings[dataSource._state.index]; + } + client.delete({ + index: config.cacheIndex, + type: config.cacheType, + id: dataSource._state.index + }, callback); + }; + + + /** + * Sets a number of fields to be ignored in the mapping. Not sure this is a good idea? + * @param {dataSource} dataSource + * @param {Array} fields An array of fields to be ignored + * @param {Function} callback A function to be executed with the results. + */ + this.ignoreFields = function (dataSource, fields, callback) { + if (!_.isArray(fields)) fields = [fields]; + var ignore = _.object(_.map(fields, function (field) { + return [field, {type: 'ignore'}]; + })); + self.getFields(dataSource, function (err, mapping) { + _.assign(mapping, ignore); + callback(err, mapping); + }); }; } diff --git a/src/courier/test_directives.js b/src/courier/tests/directives.js similarity index 60% rename from src/courier/test_directives.js rename to src/courier/tests/directives.js index e3a24a007d30b..8433a01c170ed 100644 --- a/src/courier/test_directives.js +++ b/src/courier/tests/directives.js @@ -8,7 +8,7 @@ define(function (require) { restrict: 'E', template: 'My favorite number is {{favoriteNum}} ', controller: function ($scope, config) { - config.bind($scope, 'favoriteNum', { + config.$bind($scope, 'favoriteNum', { default: 0 }); @@ -69,5 +69,46 @@ define(function (require) { }); } }; + }) + .directive('mappingTest', function () { + return { + restrict: 'E', + scope: { + type: '@', + fields: '@' + }, + template: 'Mappings:
{{name}} = {{mapping.type}}

' + + '{{count}} : 
{{json}}
', + controller: function ($rootScope, $scope, courier) { + $scope.count = 0; + $scope.mappedFields = {}; + + var source = courier.rootSearchSource.extend() + .index('logstash-*') + .type($scope.type) + .source({ + include: 'geo' + }) + .$scope($scope) + .on('results', function (resp) { + $scope.count ++; + $scope.json = JSON.stringify(resp.hits, null, ' '); + }); + + var fields = $scope.fields.split(','); + + fields.forEach(function (field) { + courier._mapper.getFieldMapping(source, field, function (err, mapping) { + $scope.$apply(function () { + $scope.mappedFields[field] = mapping; + }); + }); + }); + + courier._mapper.getFields(source, function (err, response, status) { + console.log(response); + }); + } + }; }); }); \ No newline at end of file diff --git a/src/courier/tests/mapper.html b/src/courier/tests/mapper.html new file mode 100644 index 0000000000000..07eef5fffb290 --- /dev/null +++ b/src/courier/tests/mapper.html @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/src/kibana/controllers/kibana.js b/src/kibana/controllers/kibana.js index 7c9c46d09efbb..ec936b749f848 100644 --- a/src/kibana/controllers/kibana.js +++ b/src/kibana/controllers/kibana.js @@ -6,7 +6,8 @@ define(function (require) { require('services/config'); require('services/courier'); - angular.module('kibana/controllers') + angular + .module('kibana/controllers') .controller('kibana', function ($scope, courier) { setTimeout(function () { courier.start(); diff --git a/src/kibana/main.js b/src/kibana/main.js index fce5cb07cbcb1..d46c2dd9a2912 100644 --- a/src/kibana/main.js +++ b/src/kibana/main.js @@ -48,11 +48,14 @@ define(function (require) { .when('/config-test', { templateUrl: 'courier/tests/config.html', }) + .when('/mapper-test', { + templateUrl: 'courier/tests/mapper.html', + }) .when('/courier-test', { templateUrl: 'courier/tests/index.html', }) .otherwise({ - redirectTo: '' + redirectTo: '/' }); }); @@ -61,8 +64,8 @@ define(function (require) { // load the elasticsearch service require([ + 'courier/tests/directives', 'controllers/kibana', - 'courier/test_directives', 'constants/base' ], function () { // bootstrap the app diff --git a/src/kibana/partials/index.html b/src/kibana/partials/index.html index 6d94f95ac9270..937976665c4a7 100644 --- a/src/kibana/partials/index.html +++ b/src/kibana/partials/index.html @@ -1,4 +1,5 @@ \ No newline at end of file diff --git a/src/kibana/services/config.js b/src/kibana/services/config.js index 9e684400bd4e0..6520dd1ac7faf 100644 --- a/src/kibana/services/config.js +++ b/src/kibana/services/config.js @@ -10,6 +10,7 @@ define(function (require) { // share doc and val cache between apps var doc; var vals = {}; + module.service('config', function ($q, $rootScope, courier, kbnVersion) { var watchers = {}; var unwatchers = []; @@ -21,8 +22,9 @@ define(function (require) { .id(kbnVersion); } else { // clean up after previous app - doc.removeAllListeners('results'); - doc.courier(courier); + doc + .removeAllListeners('results') + .courier(courier); } doc.on('results', function (resp) { @@ -128,9 +130,7 @@ define(function (require) { function _notify(fns, cur, prev) { if ($rootScope.$$phase) { // reschedule - nextTick(function () { - _notify(fns, cur, prev); - }); + nextTick(_notify, fns, cur, prev); return; } diff --git a/src/kibana/utils/async_modules.js b/src/kibana/utils/async_modules.js index aa2babb570aba..b41a27ba3e8a9 100644 --- a/src/kibana/utils/async_modules.js +++ b/src/kibana/utils/async_modules.js @@ -1,8 +1,9 @@ define(function (require) { var _ = require('lodash'); - // TODO: this will probably fail to work when we have multiple apps. Might need to propogate - // registrations to multiple providers + /* TODO: this will probably fail to work when we have multiple apps. + * Might need to propogate registrations to multiple providers + */ function enable(app) { // keep a reference to each module defined before boot, so that // after boot it can define new features. Also serves as a flag. diff --git a/src/kibana/utils/next_tick.js b/src/kibana/utils/next_tick.js index 28b59102101ee..bf3278b7aa5e1 100644 --- a/src/kibana/utils/next_tick.js +++ b/src/kibana/utils/next_tick.js @@ -15,13 +15,24 @@ define(function () { ev.stopPropagation(); if (queue.length > 0) { var fn = queue.shift(); - fn(); + if (typeof fn === 'function') { + fn(); + } else { + // partial args were supplied + var args = fn; + fn = args.shift(); + fn.apply(null, args); + } } } }, true); return function nextTick(fn) { - queue.push(fn); + if (arguments.length > 1) { + queue.push([fn].concat([].slice.call(arguments, 1))); + } else { + queue.push(fn); + } window.postMessage('process-tick', '*'); }; } diff --git a/test/unit/index.html b/test/unit/index.html index cd9bf67e435bd..cb071b56f4c9d 100644 --- a/test/unit/index.html +++ b/test/unit/index.html @@ -36,8 +36,8 @@ } }) require([ - '/specs/courier.js', - '/specs/data_source.js', + //'/specs/courier.js', + //'/specs/data_source.js', '/specs/mapper.js' ], function () { window.mochaRunner = mocha.run().on('end', function(){ diff --git a/test/unit/specs/mapper.js b/test/unit/specs/mapper.js index b49c4af5fa693..15c5fa4770d5a 100644 --- a/test/unit/specs/mapper.js +++ b/test/unit/specs/mapper.js @@ -1,47 +1,134 @@ define(function (require) { var elasticsearch = require('../bower_components/elasticsearch/elasticsearch.js'); var _ = require('lodash'); + var sinon = require('sinon/sinon'); var Courier = require('courier/courier'); var DataSource = require('courier/data_source/data_source'); var Mapper = require('courier/mapper'); + var client = new elasticsearch.Client({ host: 'localhost:9200', }); - describe('Mapper Module', function () { + var courier = new Courier({ + client: client + }); + + describe('Mapper', function () { + var server, source, mapper; + + beforeEach(function() { + source = courier.createSource('search') + .index('valid') + .size(5); + mapper = new Mapper(courier); + + // Stub out a mini mapping response. + sinon.stub(client.indices, 'getFieldMapping',function (params, callback) { + if(params.index === 'valid') { + setTimeout(callback(undefined,{ + "test": { + "mappings": { + "testType": { + "foo.bar": { + "full_name": "foo.bar", + "mapping": { + "bar": { + "type": "string" + }}}}}}} + ),0); + } else { + setTimeout(callback('Error: Not Found',undefined)); + } + }); + + sinon.stub(client, 'getSource', function (params, callback) { + if(params.id === 'valid') { + setTimeout(callback(undefined,{"foo.bar": {"type": "string"}}),0); + } else { + setTimeout(callback('Error: Not Found',undefined),0); + } + }); + + sinon.stub(client, 'delete', function (params, callback) {callback(undefined,true);}); + }); + + afterEach(function () { + client.indices.getFieldMapping.restore(); + client.getSource.restore(); + client.delete.restore(); + }); - it('provides a constructor for the Mapper class', function () { - var mapper = new Mapper(client); + it('provides a constructor for the Mapper class', function (done) { + var mapper = new Mapper(courier); expect(mapper).to.be.a(Mapper); + done(); }); - it('has a function called getFields that returns an object', function () { - /* - var courier = new Courier({ - client: client + it('has getFieldsFromMapping function that returns a mapping', function (done) { + mapper.getFieldsFromMapping(source,function (err, mapping) { + expect(client.indices.getFieldMapping.called).to.be(true); + expect(mapping['foo.bar'].type).to.be('string'); + done(); }); + }); - var dataSource = courier.createSource('search') - .index('_all') + it('has getFieldsFromCache that returns an error for uncached indices', function (done) { + source = courier.createSource('search') + .index('invalid') .size(5); - var mapper = new Mapper(client); + mapper.getFieldsFromCache(source,function (err, mapping) { + expect(client.getSource.called).to.be(true); + expect(err).to.be('Error: Not Found'); + done(); + }); + }); - var callback = function(data) { - console.log(data); - }; + it('has getFieldsFromCache that returns a mapping', function (done) { + mapper.getFieldsFromCache(source,function (err, mapping) { + expect(client.getSource.called).to.be(true); + expect(mapping['foo.bar'].type).to.be('string'); + done(); + }); + }); - expect(mapper.getFields(dataSource,callback)).to.eql({ - foo: { - type: 'string' - }, - "foo.bar": { - type: 'long' - } + it('has a getFieldsFromObject function', function (done) { + expect(mapper.getFieldsFromObject).to.be.a('function'); + done(); + }); + + it('has a getFields that returns a mapping from cache', function (done) { + mapper.getFields(source, function (err, mapping) { + expect(client.getSource.called).to.be(true); + expect(client.indices.getFieldMapping.called).to.be(false); + expect(mapping['foo.bar'].type).to.be('string'); + done(); }); - */ }); + it('has getFields that throws an error for invalid indices', function (done) { + source = courier.createSource('search') + .index('invalid') + .size(5); + try { + mapper.getFields(source, function (err, mapping) { + // Should not be called + expect('the callback').to.be('not executed'); + done(); + }); + } catch (e) { + expect(true).to.be(true); + done(); + } + }); + + it('has a clearCache that calls client.delete', function (done) { + mapper.clearCache(source, function () { + expect(client.delete.called).to.be(true); + done(); + }); + }); }); From e382cb9ec7709cd49f29cb3435e3263b1a9bb330 Mon Sep 17 00:00:00 2001 From: Spencer Alger Date: Mon, 24 Feb 2014 10:22:12 -0700 Subject: [PATCH 3/4] fixed the courer._clearScheduled, and renamed bind to $bind --- src/courier/courier.js | 3 ++- src/kibana/services/config.js | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/courier/courier.js b/src/courier/courier.js index 2013dd33842aa..3dd16c32bd79b 100644 --- a/src/courier/courier.js +++ b/src/courier/courier.js @@ -248,7 +248,8 @@ define(function (require) { // properly clear scheduled fetches Courier.prototype._clearScheduled = function (type) { - this._timer[type] = clearTimeout(this._timer[type]); + clearTimeout(this._timer[type]); + delete this._timer[type]; }; // alert the courior that a doc has been updated diff --git a/src/kibana/services/config.js b/src/kibana/services/config.js index 6520dd1ac7faf..8812412e6ab4b 100644 --- a/src/kibana/services/config.js +++ b/src/kibana/services/config.js @@ -89,7 +89,7 @@ define(function (require) { _notify(onChange, vals[key]); } - function bindToScope($scope, key, opts) { + function $bindToScope($scope, key, opts) { $watch(key, function (val) { if (opts && val === void 0) val = opts['default']; $scope[key] = val; @@ -114,7 +114,7 @@ define(function (require) { this.close = close; this.get = get; this.set = set; - this.bind = bindToScope; + this.$bind = $bindToScope; this.$watch = $watch; /******* From 3dbbc975f54a8d41259523f975dd92d491016a3c Mon Sep 17 00:00:00 2001 From: Spencer Alger Date: Mon, 24 Feb 2014 10:36:53 -0700 Subject: [PATCH 4/4] wired up the configFile's kibanaIndex field to the mapper --- src/config.js | 2 +- src/kibana/services/courier.js | 3 ++- src/kibana/setup.js | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/config.js b/src/config.js index 38344830ca8ee..1ec59abb76f71 100644 --- a/src/config.js +++ b/src/config.js @@ -27,6 +27,6 @@ define(function () { * The default ES index to use for storing Kibana specific object * such as stored dashboards */ - kibanaIndex: 'kibana-int' + kibanaIndex: 'kibana4-int' }; }); \ No newline at end of file diff --git a/src/kibana/services/courier.js b/src/kibana/services/courier.js index f7cd6ff1fe3c4..0d1d831ef4862 100644 --- a/src/kibana/services/courier.js +++ b/src/kibana/services/courier.js @@ -3,6 +3,7 @@ define(function (require) { var Courier = require('courier/courier'); var DocSource = require('courier/data_source/doc'); var errors = require('courier/errors'); + var configFile = require('../../config'); require('services/promises'); require('services/es'); @@ -20,7 +21,7 @@ define(function (require) { courier = new Courier({ fetchInterval: 15000, client: es, - promises: promises + internalIndex: configFile.kibanaIndex }); courier.errors = errors; diff --git a/src/kibana/setup.js b/src/kibana/setup.js index 5d978e3cc1513..f7df5f551aadb 100644 --- a/src/kibana/setup.js +++ b/src/kibana/setup.js @@ -67,12 +67,12 @@ define(function (require) { body: { settings: { mappings: { - type1: { + mappings: { _source: { enabled: false }, properties: { - field1: { + type: { type: 'string', index: 'not_analyzed' }