Skip to content

Commit

Permalink
Stream files line by line when parsing to avoid memory limits
Browse files Browse the repository at this point in the history
  • Loading branch information
eliot-akira committed Aug 11, 2021
1 parent a8ef348 commit a67622d
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 69 deletions.
53 changes: 32 additions & 21 deletions lib/persistence.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
const path = require('path')
const async = require('async')
const byline = require('byline')
const customUtils = require('./customUtils.js')
const Index = require('./indexes.js')
const model = require('./model.js')
Expand Down Expand Up @@ -149,19 +150,23 @@ class Persistence {
}

/**
* From a database's raw data, return the corresponding
* From a database's raw stream, return the corresponding
* machine understandable collection
*/
treatRawData (rawData) {
const data = rawData.split('\n')
treatRawStream (rawStream, cb) {
const dataById = {}
const tdata = []
const indexes = {}
let corruptItems = -1
let corruptItems = 0

const lineStream = byline(rawStream)
const that = this
let length = 0

lineStream.on('data', function(line) {

for (const datum of data) {
try {
const doc = model.deserialize(this.beforeDeserialization(datum))
const doc = model.deserialize(that.beforeDeserialization(line))
if (doc._id) {
if (doc.$$deleted === true) delete dataById[doc._id]
else dataById[doc._id] = doc
Expand All @@ -170,17 +175,27 @@ class Persistence {
} catch (e) {
corruptItems += 1
}
}

// A bit lenient on corruption
if (
data.length > 0 &&
corruptItems / data.length > this.corruptAlertThreshold
) throw new Error(`More than ${Math.floor(100 * this.corruptAlertThreshold)}% of the data file is corrupt, the wrong beforeDeserialization hook may be used. Cautiously refusing to start NeDB to prevent dataloss`)
length++
})

lineStream.on('end', function() {
// A bit lenient on corruption
let err
if (length > 0 && corruptItems / length > that.corruptAlertThreshold) {
err = new Error("More than " + Math.floor(100 * that.corruptAlertThreshold) + "% of the data file is corrupt, the wrong beforeDeserialization hook may be used. Cautiously refusing to start NeDB to prevent dataloss")
}

tdata.push(...Object.values(dataById))
Object.keys(dataById).forEach(function (k) {
tdata.push(dataById[k])
})

return { data: tdata, indexes: indexes }
cb(err, { data: tdata, indexes: indexes })
})

lineStream.on('error', function(err) {
cb(err)
})
}

/**
Expand All @@ -207,14 +222,10 @@ class Persistence {
// eslint-disable-next-line node/handle-callback-err
storage.ensureDatafileIntegrity(this.filename, err => {
// TODO: handle error
storage.readFile(this.filename, 'utf8', (err, rawData) => {
const fileStream = storage.readFileStream(this.filename, { encoding : 'utf8' })
this.treatRawStream(fileStream, (err, treatedData) => {

if (err) return cb(err)
let treatedData
try {
treatedData = this.treatRawData(rawData)
} catch (e) {
return cb(e)
}

// Recreate all indexes in the datafile
Object.keys(treatedData.indexes).forEach(key => {
Expand Down
1 change: 1 addition & 0 deletions lib/storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ storage.writeFile = fs.writeFile
storage.unlink = fs.unlink
storage.appendFile = fs.appendFile
storage.readFile = fs.readFile
storage.readFileStream = fs.createReadStream
storage.mkdir = fs.mkdir

/**
Expand Down
14 changes: 14 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"dependencies": {
"@seald-io/binary-search-tree": "^1.0.2",
"async": "0.2.10",
"byline": "^5.0.0",
"localforage": "^1.9.0"
},
"devDependencies": {
Expand Down
147 changes: 99 additions & 48 deletions test/persistence.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const Datastore = require('../lib/datastore')
const Persistence = require('../lib/persistence')
const storage = require('../lib/storage')
const { execFile, fork } = require('child_process')
const Readable = require('stream').Readable

const { assert } = chai
chai.should()
Expand Down Expand Up @@ -41,101 +42,151 @@ describe('Persistence', function () {
], done)
})

it('Every line represents a document', function () {
it('Every line represents a document', function (done) {
const now = new Date()
const rawData = model.serialize({ _id: '1', a: 2, ages: [1, 5, 12] }) + '\n' +
model.serialize({ _id: '2', hello: 'world' }) + '\n' +
model.serialize({ _id: '3', nested: { today: now } })
const treatedData = d.persistence.treatRawData(rawData).data

treatedData.sort(function (a, b) { return a._id - b._id })
treatedData.length.should.equal(3)
assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] })
assert.deepStrictEqual(treatedData[1], { _id: '2', hello: 'world' })
assert.deepStrictEqual(treatedData[2], { _id: '3', nested: { today: now } })
const stream = new Readable()

stream.push(rawData)
stream.push(null)

d.persistence.treatRawStream(stream, function (err, result) {
const treatedData = result.data
treatedData.sort(function (a, b) { return a._id - b._id })
treatedData.length.should.equal(3)
assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] })
assert.deepStrictEqual(treatedData[1], { _id: '2', hello: 'world' })
assert.deepStrictEqual(treatedData[2], { _id: '3', nested: { today: now } })
done()
})
})

it('Badly formatted lines have no impact on the treated data', function () {
it('Badly formatted lines have no impact on the treated data', function (done) {
const now = new Date()
const rawData = model.serialize({ _id: '1', a: 2, ages: [1, 5, 12] }) + '\n' +
'garbage\n' +
model.serialize({ _id: '3', nested: { today: now } })
const treatedData = d.persistence.treatRawData(rawData).data

treatedData.sort(function (a, b) { return a._id - b._id })
treatedData.length.should.equal(2)
assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] })
assert.deepStrictEqual(treatedData[1], { _id: '3', nested: { today: now } })
const stream = new Readable()

stream.push(rawData)
stream.push(null)

d.persistence.treatRawStream(stream, function (err, result) {
console.log(err)
var treatedData = result.data
treatedData.sort(function (a, b) { return a._id - b._id })
treatedData.length.should.equal(2)
assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] })
assert.deepStrictEqual(treatedData[1], { _id: '3', nested: { today: now } })
done()
})
})

it('Well formatted lines that have no _id are not included in the data', function () {
it('Well formatted lines that have no _id are not included in the data', function (done) {
const now = new Date()
const rawData = model.serialize({ _id: '1', a: 2, ages: [1, 5, 12] }) + '\n' +
model.serialize({ _id: '2', hello: 'world' }) + '\n' +
model.serialize({ nested: { today: now } })
const treatedData = d.persistence.treatRawData(rawData).data
const stream = new Readable()

treatedData.sort(function (a, b) { return a._id - b._id })
treatedData.length.should.equal(2)
assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] })
assert.deepStrictEqual(treatedData[1], { _id: '2', hello: 'world' })
stream.push(rawData)
stream.push(null)

d.persistence.treatRawStream(stream, function (err, result) {
var treatedData = result.data
treatedData.sort(function (a, b) { return a._id - b._id })
treatedData.length.should.equal(2)
assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] })
assert.deepStrictEqual(treatedData[1], { _id: '2', hello: 'world' })
done()
})
})

it('If two lines concern the same doc (= same _id), the last one is the good version', function () {
it('If two lines concern the same doc (= same _id), the last one is the good version', function (done) {
const now = new Date()
const rawData = model.serialize({ _id: '1', a: 2, ages: [1, 5, 12] }) + '\n' +
model.serialize({ _id: '2', hello: 'world' }) + '\n' +
model.serialize({ _id: '1', nested: { today: now } })
const treatedData = d.persistence.treatRawData(rawData).data
const stream = new Readable()

treatedData.sort(function (a, b) { return a._id - b._id })
treatedData.length.should.equal(2)
assert.deepStrictEqual(treatedData[0], { _id: '1', nested: { today: now } })
assert.deepStrictEqual(treatedData[1], { _id: '2', hello: 'world' })
stream.push(rawData)
stream.push(null)

d.persistence.treatRawStream(stream, function (err, result) {
var treatedData = result.data
treatedData.sort(function (a, b) { return a._id - b._id })
treatedData.length.should.equal(2)
assert.deepStrictEqual(treatedData[0], { _id: '1', nested: { today: now } })
assert.deepStrictEqual(treatedData[1], { _id: '2', hello: 'world' })
done()
})
})

it('If a doc contains $$deleted: true, that means we need to remove it from the data', function () {
it('If a doc contains $$deleted: true, that means we need to remove it from the data', function (done) {
const now = new Date()
const rawData = model.serialize({ _id: '1', a: 2, ages: [1, 5, 12] }) + '\n' +
model.serialize({ _id: '2', hello: 'world' }) + '\n' +
model.serialize({ _id: '1', $$deleted: true }) + '\n' +
model.serialize({ _id: '3', today: now })
const treatedData = d.persistence.treatRawData(rawData).data
const stream = new Readable()

stream.push(rawData)
stream.push(null)

treatedData.sort(function (a, b) { return a._id - b._id })
treatedData.length.should.equal(2)
assert.deepStrictEqual(treatedData[0], { _id: '2', hello: 'world' })
assert.deepStrictEqual(treatedData[1], { _id: '3', today: now })
d.persistence.treatRawStream(stream, function (err, result) {
var treatedData = result.data
treatedData.sort(function (a, b) { return a._id - b._id })
treatedData.length.should.equal(2)
assert.deepStrictEqual(treatedData[0], { _id: '2', hello: 'world' })
assert.deepStrictEqual(treatedData[1], { _id: '3', today: now })
done()
})
})

it('If a doc contains $$deleted: true, no error is thrown if the doc wasnt in the list before', function () {
it('If a doc contains $$deleted: true, no error is thrown if the doc wasnt in the list before', function (done) {
const now = new Date()
const rawData = model.serialize({ _id: '1', a: 2, ages: [1, 5, 12] }) + '\n' +
model.serialize({ _id: '2', $$deleted: true }) + '\n' +
model.serialize({ _id: '3', today: now })
const treatedData = d.persistence.treatRawData(rawData).data
const stream = new Readable()

stream.push(rawData)
stream.push(null)

treatedData.sort(function (a, b) { return a._id - b._id })
treatedData.length.should.equal(2)
assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] })
assert.deepStrictEqual(treatedData[1], { _id: '3', today: now })
d.persistence.treatRawStream(stream, function (err, result) {
var treatedData = result.data
treatedData.sort(function (a, b) { return a._id - b._id })
treatedData.length.should.equal(2)
assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] })
assert.deepStrictEqual(treatedData[1], { _id: '3', today: now })
done()
})
})

it('If a doc contains $$indexCreated, no error is thrown during treatRawData and we can get the index options', function () {
it('If a doc contains $$indexCreated, no error is thrown during treatRawData and we can get the index options', function (done) {
const now = new Date()
const rawData = model.serialize({ _id: '1', a: 2, ages: [1, 5, 12] }) + '\n' +
model.serialize({ $$indexCreated: { fieldName: 'test', unique: true } }) + '\n' +
model.serialize({ _id: '3', today: now })
const treatedData = d.persistence.treatRawData(rawData).data
const indexes = d.persistence.treatRawData(rawData).indexes
const stream = new Readable()

stream.push(rawData)
stream.push(null)

Object.keys(indexes).length.should.equal(1)
assert.deepStrictEqual(indexes.test, { fieldName: 'test', unique: true })
d.persistence.treatRawStream(stream, function (err, result) {
var treatedData = result.data
var indexes = result.indexes
Object.keys(indexes).length.should.equal(1)
assert.deepEqual(indexes.test, { fieldName: "test", unique: true })

treatedData.sort(function (a, b) { return a._id - b._id })
treatedData.length.should.equal(2)
assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] })
assert.deepStrictEqual(treatedData[1], { _id: '3', today: now })
treatedData.sort(function (a, b) { return a._id - b._id })
treatedData.length.should.equal(2)
assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] })
assert.deepStrictEqual(treatedData[1], { _id: '3', today: now })
done()
})
})

it('Compact database on load', function (done) {
Expand Down

0 comments on commit a67622d

Please sign in to comment.