Skip to content

Commit

Permalink
Merge pull request #253 from rlidwka/search
Browse files Browse the repository at this point in the history
complete search rewrite
  • Loading branch information
rlidwka committed May 30, 2015
2 parents 3dd6442 + cac6e89 commit d370e5a
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 171 deletions.
5 changes: 5 additions & 0 deletions lib/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ function Config(config) {
return flatten(result)
}

// add a default rule for all packages to make writing plugins easier
if (self.packages['**'] == null) {
self.packages['**'] = {}
}

for (var i in self.packages) {
assert(
typeof(self.packages[i]) === 'object' &&
Expand Down
69 changes: 46 additions & 23 deletions lib/index-api.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
var async = require('async')
var Cookies = require('cookies')
var express = require('express')
var expressJson5 = require('express-json5')
Expand Down Expand Up @@ -84,33 +83,57 @@ module.exports = function(config, auth, storage) {

// searching packages
app.get('/-/all/:anything?', function(req, res, next) {
  // Streams the whole package index as a single JSON object:
  //   {"_updated":<ts>, "<name>":{...}, ...}
  // Packages arrive one-by-one from storage.search(); each one is
  // written out only after the auth plugin allows access to it.
  var received_end      = false // storage stream emitted 'end'
  var response_finished = false // closing '}' already written
  var processing_pkgs   = 0     // auth checks still in flight

  res.status(200)
  res.write('{"_updated":' + Date.now());

  // FIXME(review): in express `req.param` is a function, so
  // `req.param.startkey` is always undefined and search always starts
  // from 0; `req.query.startkey` looks intended — confirm before changing.
  var stream = storage.search(req.param.startkey || 0, { req: req })

  stream.on('data', function each(pkg) {
    processing_pkgs++

    auth.allow_access(pkg.name, req.remote_user, function(err, allowed) {
      processing_pkgs--

      if (err) {
        if (err.status && String(err.status).match(/^4\d\d$/)) {
          // auth plugin returns 4xx user error,
          // that's equivalent of !allowed basically
          allowed = false
        } else {
          // NOTE(review): stream.abort() ignores its argument, so this
          // error is effectively swallowed apart from aborting the search
          stream.abort(err)
        }
      }

      if (allowed) {
        res.write(',\n' + JSON.stringify(pkg.name) + ':' + JSON.stringify(pkg))
      }

      check_finish()
    })
  })

  stream.on('error', function (_err) {
    // headers are already sent, so the only way to signal failure
    // to the client is to cut the connection
    res.socket.destroy()
  })

  stream.on('end', function () {
    received_end = true
    check_finish()
  })

  // close the JSON object exactly once, after the storage stream ended
  // AND every pending auth check has completed
  function check_finish() {
    if (!received_end) return
    if (processing_pkgs) return
    if (response_finished) return

    response_finished = true
    res.end('}\n')
  }
})

// placeholder 'cause npm require to be authenticated to publish
// we do not do any real authentication yet
Expand Down
12 changes: 0 additions & 12 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,6 @@ module.exports = function(config_hash) {
next()
})

/* app.get('/-/all', function(req, res) {
var https = require('https')
var JSONStream = require('JSONStream')
var request = require('request')({
url: 'https://registry.npmjs.org/-/all',
})
.pipe(JSONStream.parse('*'))
.on('data', function(d) {
console.log(d)
})
})*/

// hook for tests only
if (config._debug) {
app.get('/-/_debug', function(req, res, next) {
Expand Down
115 changes: 93 additions & 22 deletions lib/local-storage.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
var assert = require('assert')
var async = require('async')
var Crypto = require('crypto')
var fs = require('fs')
var Error = require('http-errors')
var Path = require('path')
var Stream = require('readable-stream')
var URL = require('url')
var fs_storage = require('./local-fs')
var Logger = require('./logger')
Expand Down Expand Up @@ -482,35 +484,55 @@ Storage.prototype.get_package = function(name, options, callback) {
})
}

// walks through each package and calls `on_package` on them
//
// on_package(item, cb) receives { name, path } for every valid package
// directory found in every configured storage (including scoped
// `@scope/name` packages one level deep); on_end(err) is called once
// the walk is finished or an fs error occurred.
Storage.prototype._each_package = function (on_package, on_end) {
  var self = this
  var storages = {}

  storages[self.config.storage] = true

  // register every per-package storage override as well
  if (self.config.packages) {
    Object.keys(self.config.packages).forEach(function (pkg) {
      if (self.config.packages[pkg].storage) {
        storages[self.config.packages[pkg].storage] = true
      }
    })
  }

  // storage paths are resolved relative to the config file location
  var base = Path.dirname(self.config.self_path);

  async.eachSeries(Object.keys(storages), function (storage, cb) {
    fs.readdir(Path.resolve(base, storage), function (err, files) {
      if (err) return cb(err)

      async.eachSeries(files, function (file, cb) {
        if (file.match(/^@/)) {
          // scoped packages live one directory deeper: @scope/name
          fs.readdir(Path.resolve(base, storage, file), function (err, files) {
            if (err) return cb(err)

            async.eachSeries(files, function (file2, cb) {
              if (Utils.validate_name(file2)) {
                on_package({
                  name: file + '/' + file2,
                  path: Path.resolve(base, storage, file, file2),
                }, cb)
              } else {
                cb()
              }
            }, cb)
          })
        } else if (Utils.validate_name(file)) {
          on_package({
            name: file,
            path: Path.resolve(base, storage, file)
          }, cb)
        } else {
          // skip files that aren't valid package names (e.g. dotfiles)
          cb()
        }
      }, cb)
    })
  }, on_end)
}

//
Expand Down Expand Up @@ -565,6 +587,55 @@ Storage.prototype.update_package = function(name, updateFn, _callback) {
})
}

// Streams a search-index entry (object mode) for every local package
// modified more recently than {startkey}.
//
// Returns a PassThrough stream of summary objects shaped like the
// npm `/-/all` registry endpoint; emits 'error' if the walk fails.
Storage.prototype.search = function(startkey, options) {
  var self = this

  var stream = new Stream.PassThrough({ objectMode: true })

  self._each_package(function on_package(item, cb) {
    fs.stat(item.path, function(err, stats) {
      if (err) return cb(err)

      if (stats.mtime > startkey) {
        self.get_package(item.name, options, function(err, data) {
          if (err) return cb(err)

          var versions = Utils.semver_sort(Object.keys(data.versions))
          var latest = versions[versions.length - 1]

          // skip packages with no published versions
          if (data.versions[latest]) {
            stream.push({
              name           : data.versions[latest].name,
              description    : data.versions[latest].description,
              'dist-tags'    : { latest: latest },
              maintainers    : data.versions[latest].maintainers ||
                               [ data.versions[latest]._npmUser ].filter(Boolean),
              author         : data.versions[latest].author,
              repository     : data.versions[latest].repository,
              readmeFilename : data.versions[latest].readmeFilename || '',
              homepage       : data.versions[latest].homepage,
              keywords       : data.versions[latest].keywords,
              bugs           : data.versions[latest].bugs,
              license        : data.versions[latest].license,
              // `_each_package` only provides { name, path }, so
              // `item.time` never exists; fall back to the mtime we
              // already stat'ed instead of always emitting undefined
              time           : { modified: new Date(item.time || stats.mtime).toISOString() },
              versions       : {},
            })
          }

          cb()
        })
      } else {
        cb()
      }
    })
  }, function on_end(err) {
    if (err) return stream.emit('error', err)
    stream.end()
  })

  return stream
}

Storage.prototype._normalize_package = function(pkg) {
;['versions', 'dist-tags', '_distfiles', '_attachments', '_uplinks'].forEach(function(key) {
if (!Utils.is_object(pkg[key])) pkg[key] = {}
Expand Down
100 changes: 32 additions & 68 deletions lib/storage.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
var assert = require('assert')
var async = require('async')
var Error = require('http-errors')
var Stream = require('stream')
var Local = require('./local-storage')
var Logger = require('./logger')
var MyStreams = require('./streams')
Expand Down Expand Up @@ -328,86 +329,49 @@ Storage.prototype.get_package = function(name, options, callback) {
//
// Retrieve remote and local packages more recent than {startkey}
//
// Function streams all packages from all uplinks first, and then
// local packages.
//
// Note that local packages could override registry ones just because
// they appear in JSON last. That's a trade-off we make to avoid
// memory issues.
//
// Used storages: local && uplink (proxy_access)
//
Storage.prototype.search = function(startkey, options) {
  var self = this

  var stream = new Stream.PassThrough({ objectMode: true })

  // query each uplink in turn, piping its results through without
  // closing `stream`; local results are appended at the very end
  async.eachSeries(Object.keys(self.uplinks), function(up_name, cb) {
    // shortcut: if `local=1` is supplied, don't call uplinks
    if (options.req.query.local !== undefined) return cb()

    // an uplink can emit both 'error' and 'end' (or be aborted);
    // make sure the eachSeries callback fires at most once
    function done() {
      var finish = cb
      cb = function () {}
      finish()
    }

    var lstream = self.uplinks[up_name].search(startkey, options)
    lstream.pipe(stream, { end: false })
    lstream.on('error', function (err) {
      // a broken uplink should not kill the whole search
      self.logger.error({ err: err }, 'uplink error: @{err.message}')
      done()
    })
    lstream.on('end', function () {
      done()
    })

    // NOTE(review): `stream.abort` is only defined once the first
    // uplink (or the local search) starts; an abort() call before
    // that would throw — confirm callers can't hit that window
    stream.abort = function () {
      if (lstream.abort) lstream.abort()
      done()
    }
  }, function () {
    // uplinks done (or skipped) — stream local packages last,
    // and let them close the output stream
    var lstream = self.local.search(startkey, options)
    stream.abort = function () { lstream.abort() }
    lstream.pipe(stream, { end: true })
    lstream.on('error', function (err) {
      self.logger.error({ err: err }, 'search error: @{err.message}')
      stream.end()
    })
  })

  return stream
}

Storage.prototype.get_local = function(callback) {
Expand Down
Loading

0 comments on commit d370e5a

Please sign in to comment.