diff --git a/src/simperium/channel.js b/src/simperium/channel.js index 4f59d15..07f79db 100644 --- a/src/simperium/channel.js +++ b/src/simperium/channel.js @@ -342,7 +342,6 @@ internal.indexingComplete = function() { * @returns {Promise} - resolves once the change version is saved */ - /** * Maintains syncing state for a Simperium bucket. * @@ -451,15 +450,16 @@ Channel.prototype.remove = function( id ) { * @returns {Promise>} list of known object versions */ Channel.prototype.getRevisions = function( id ) { - return new Promise( ( resolve, reject ) => { - collectionRevisions( this, id, ( error, revisions ) => { - if ( error ) { - reject( error ); - return; - } - resolve( revisions ); - } ); - } ); + /** + * Since revision data is basically immutable we can prevent the + * need to refetch it after it has been loaded once. + * + * E.g. key could be `${ entityId }.${ versionNumber }` + * + * @type {Map} stores specific revisions as a cache + */ + const revisionCache = new Map(); + return collectionRevisions( this, id, revisionCache ); } /** @@ -855,16 +855,6 @@ LocalQueue.prototype.resendSentChanges = function() { } } -/** - * Since revision data is basically immutable we can prevent the - * need to refetch it after it has been loaded once. - * - * E.g. key could be `${ entityId }.${ versionNumber }` - * - * @type {Map} stores specific revisions as a cache - */ -export const revisionCache = new Map(); - /** * Attempts to fetch an entity's revisions * @@ -883,9 +873,10 @@ export const revisionCache = new Map(); * * @param {Object} channel used to send messages to the Simperium server * @param {String} id entity id for which to fetch revisions - * @param {Function} callback called on error or when finished + * @param {Map} cache for storing already requeted revisions + * @returns {Promise>} resolves to the fetched revisions */ -function collectionRevisions( channel, id, callback ) { +function collectionRevisions( channel, id, cache ) { /** @type {Number} ms delay arbitrarily chosen to give up on fetch */ const TIMEOUT = 200; @@ -901,96 +892,98 @@ function collectionRevisions( channel, id, callback ) { /** @type {Number} handle for "start finishing" timeout */ let timeout; - /** - * Receive a version update from the server and - * dispatch the next fetch or finish the fetching - * - * @param {String} id entity id - * @param {Number} version version of returned entity - * @param {Object} data value of entity at revision - */ - function onVersion( id, version, data ) { - revisionCache.set( `${ id }.${ version }`, data ); - versions.push( { id, version, data } ); - - // if we have every possible revision already, finish it! - // this bypasses any mandatory delay - if ( versions.length === latestVersion ) { - finish(); - return; - } - - fetchNextVersion( version ); + return new Promise( ( resolve, reject ) => { + /** + * Receive a version update from the server and + * dispatch the next fetch or finish the fetching + * + * @param {String} id entity id + * @param {Number} version version of returned entity + * @param {Object} data value of entity at revision + */ + function onVersion( id, version, data ) { + cache.set( `${ id }.${ version }`, data ); + versions.push( { id, version, data } ); + + // if we have every possible revision already, finish it! + // this bypasses any mandatory delay + if ( versions.length === latestVersion ) { + finish(); + return; + } - // defer the final response to the application - clearTimeout( timeout ); - timeout = setTimeout( finish, TIMEOUT ); - } + fetchNextVersion( version ); - /** - * Stop listening for versions and stop fetching them - * and pass accumulated data back to application - */ - function finish() { - clearTimeout( timeout ); - channel.removeListener( `version.${ id }`, onVersion ); + // defer the final response to the application + clearTimeout( timeout ); + timeout = setTimeout( finish, TIMEOUT ); + } - // sort newest first - callback( null, versions.sort( ( a, b ) => b.version - a.version ) ); - } + /** + * Stop listening for versions and stop fetching them + * and pass accumulated data back to application + */ + function finish() { + clearTimeout( timeout ); + channel.removeListener( `version.${ id }`, onVersion ); - /** - * Find the next version which isn't around and issue - * a fetch if possible - * - * @param {Number} prevVersion starting point for finding next version - */ - function fetchNextVersion( prevVersion ) { - let version = prevVersion; - - // find the next version to request - // some could have come back already - // or been requested already - while ( version > 0 && requestedVersions.has( version ) ) { - version -= 1; + // sort newest first + resolve( versions.sort( ( a, b ) => b.version - a.version ) ); } - // we have them all - if ( ! version ) { - return; - } + /** + * Find the next version which isn't around and issue + * a fetch if possible + * + * @param {Number} prevVersion starting point for finding next version + */ + function fetchNextVersion( prevVersion ) { + let version = prevVersion; + + // find the next version to request + // some could have come back already + // or been requested already + while ( version > 0 && requestedVersions.has( version ) ) { + version -= 1; + } - requestedVersions.add( version ); + // we have them all + if ( ! version ) { + return; + } + + requestedVersions.add( version ); - // fetch from server or local cache - if ( revisionCache.has( `${ id }.${ version }` ) ) { - onVersion( id, version, revisionCache.get( `${ id }.${ version }` ) ); - } else { - channel.send( `e:${ id }.${ version }` ); + // fetch from server or local cache + if ( cache.has( `${ id }.${ version }` ) ) { + onVersion( id, version, cache.get( `${ id }.${ version }` ) ); + } else { + channel.send( `e:${ id }.${ version }` ); + } } - } - // start listening for the responses - channel.on( `version.${ id }`, onVersion ); + // start listening for the responses + channel.on( `version.${ id }`, onVersion ); - // request the first revision and start the sequence - // pre-emptively fetch as many as could exist by default - channel.store.get( id ).then( ( { version } ) => { - latestVersion = version; + // request the first revision and start the sequence + // pre-emptively fetch as many as could exist by default + channel.store.get( id ).then( ( { version } ) => { + latestVersion = version; - // grab latest change revisions - for ( let i = 0; i < 60 && ( version - i ) > 0; i++ ) { - fetchNextVersion( version - i ); - } + // grab latest change revisions + for ( let i = 0; i < 60 && ( version - i ) > 0; i++ ) { + fetchNextVersion( version - i ); + } - // grab archive revisions - // these are like 1, 11, 21, 31, …, 41, normal revisions [42, 43, 44, 45, …] - const firstArchive = Math.round( ( version - 60 ) / 10 ) * 10 + 1; // 127 -> 67 -> 6 -> 60 -> 61 - for ( let i = 0; i < 100 && ( firstArchive - 10 * i ) > 0; i++ ) { - fetchNextVersion( firstArchive - 10 * i ); - } - }, callback ); + // grab archive revisions + // these are like 1, 11, 21, 31, …, 41, normal revisions [42, 43, 44, 45, …] + const firstArchive = Math.round( ( version - 60 ) / 10 ) * 10 + 1; // 127 -> 67 -> 6 -> 60 -> 61 + for ( let i = 0; i < 100 && ( firstArchive - 10 * i ) > 0; i++ ) { + fetchNextVersion( firstArchive - 10 * i ); + } + }, reject ); - // and set an initial timeout for failed connections - timeout = setTimeout( finish, TIMEOUT * 4 ); + // and set an initial timeout for failed connections + timeout = setTimeout( finish, TIMEOUT * 4 ); + } ); }