Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mongo insert deprecated, updating #107

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,9 @@ mongocollection.find({})

#### Mongo

<a name="mongoinsert" href="#mongoinsert">#</a> etl.mongo.<b>insert</b>(<i>collection</i> [,<i>options</i>])
<a name="mongoinsert" href="#mongoinsert">#</a> etl.mongo.<b>insert</b>(<i>collection</i> [,<i>options</i>]) [DEPRECATED]

Inserts incoming data into the provided mongodb collection. The supplied collection can be a promise on a collection. The options are passed on to both streamz and the mongodb insert comand. By default this object doesn't push anything downstream, but it `pushResults` is set as `true` in options, the results from mongo will be pushed downstream.
Inserts incoming data into the provided mongodb collection. The supplied collection can be a promise on a collection. The options are passed on to both streamz and the mongodb insert command. By default this object doesn't push anything downstream, but it `pushResults` is set as `true` in options, the results from mongo will be pushed downstream.

Example

Expand All @@ -305,6 +305,47 @@ etl.file('test.csv')

```

<a name="mongoinsertone" href="#mongoinsertone">#</a> etl.mongo.<b>insertOne</b>(<i>collection</i> [,<i>options</i>])

Inserts one incoming data into the provided mongodb collection. The supplied collection can be a promise on a collection. The options are passed on to both streamz and the mongodb insertOne command. By default this object doesn't push anything downstream, but it `pushResults` is set as `true` in options, the results from mongo will be pushed downstream.

Example

```js
// The following inserts data from a csv, one record at a time into a mongo collection

var db = mongo.ConnectAsync('mongodb://localhost:27017/testdb');
var collection = db.then(function(db) {
return db.collection('testcollection');
});

etl.file('test.csv')
.pipe(etl.csv())
.pipe(etl.mongo.insertOne(collection));

```

<a name="mongoinsertmany" href="#mongoinsertmany">#</a> etl.mongo.<b>insertOne</b>(<i>collection</i> [,<i>options</i>])

Inserts array of incoming data into the provided mongodb collection. The supplied collection can be a promise on a collection. The options are passed on to both streamz and the mongodb insertMany command. By default this object doesn't push anything downstream, but it `pushResults` is set as `true` in options, the results from mongo will be pushed downstream.

Example

```js
// The following inserts data from a csv, 10 records at a time into a mongo collection

var db = mongo.ConnectAsync('mongodb://localhost:27017/testdb');
var collection = db.then(function(db) {
return db.collection('testcollection');
});

etl.file('test.csv')
.pipe(etl.csv())
.pipe(etl.collect(10))
.pipe(etl.mongo.insertMany(collection));

```

<a name="mongoupdate" href="#mongoupdate">#</a> etl.mongo.<b>update</b>(<i>collection</i> [,<i>keys</i>] [,<i>options</i>])

Updates incoming data by building a `criteria` from an array of `keys` and the incoming data. Supplied collection can be a promise and results can be pushed downstream by declaring `pushResults : true`. The options are passed to mongo so defining `upsert : true` in options will ensure an upsert of the data.
Expand Down
2 changes: 2 additions & 0 deletions lib/mongo/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
module.exports = {
insert : require('./insert'),
insertOne : require('./insertOne'),
insertMany : require('./insertMany'),
update : require('./update'),
bulk: require('./bulk'),
upsert : function() {
Expand Down
1 change: 1 addition & 0 deletions lib/mongo/insert.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

const Streamz = require('streamz');
const Promise = require('bluebird');
const util = require('util');
Expand Down
31 changes: 31 additions & 0 deletions lib/mongo/insertMany.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
const Streamz = require('streamz');
const Promise = require('bluebird');
const util = require('util');

function insertMany(_c,collection,options) {
if (!(this instanceof Streamz))
return new insertMany(_c,collection);

if (isNaN(_c)) {
options = collection;
collection = _c;
_c = undefined;
}

Streamz.call(this, _c, null, options);
this.collection = Promise.resolve(collection);
this.options = options || {};
}

util.inherits(insertMany,Streamz);

insertMany.prototype._fn = function(d) {
return this.collection
.then(collection =>collection.insertMany(d,this.options))
.then(d => {
if (this.options.pushResults)
return d.result;
});
};

module.exports = insertMany;
31 changes: 31 additions & 0 deletions lib/mongo/insertOne.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
const Streamz = require('streamz');
const Promise = require('bluebird');
const util = require('util');

function insertOne(_c,collection,options) {
if (!(this instanceof Streamz))
return new insertOne(_c,collection);

if (isNaN(_c)) {
options = collection;
collection = _c;
_c = undefined;
}

Streamz.call(this, _c, null, options);
this.collection = Promise.resolve(collection);
this.options = options || {};
}

util.inherits(insertOne,Streamz);

insertOne.prototype._fn = function(d) {
return this.collection
.then(collection =>collection.insertOne(d,this.options))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work on this PR! Would this work followed by an etl.collect(10)? I am getting doc has to be object error, which I assume means that due to a collect this is passing an array of docs. I think the docs in this PR should reflect this, remove the collect(10) in the example maybe?

.then(d => {
if (this.options.pushResults)
return d.result;
});
};

module.exports = insertOne;
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "etl",
"version": "0.6.11",
"version": "0.6.12",
"description": "Collection of stream-based components that form an ETL pipeline",
"main": "index.js",
"author": "Ziggy Jonsson (http://github.com/zjonsson/)",
Expand Down
2 changes: 2 additions & 0 deletions test/lib/mongo.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ async function clear() {
await Promise.all(
[
db.collection("insert").deleteMany({}),
db.collection("insert-one").deleteMany({}),
db.collection("insert-many").deleteMany({}),
db.collection("update-empty").deleteMany({}),
db.collection("update-populated").deleteMany({}),
db.collection("upsert").deleteMany({}),
Expand Down
58 changes: 58 additions & 0 deletions test/mongo-insert-many-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
const etl = require('../index');
const data = require('./data');
const {getCollection, clear} = require('./lib/mongo');
const t = require('tap');
const Promise = require('bluebird');

t.test('mongo.insertMany', async t => {

t.teardown(() => t.end());

t.test('piping data into mongo.insertMany',async t => {
const collection = await getCollection('insert-many');
const d = await data.stream()
.pipe(etl.collect(1))
.pipe(etl.mongo.insertMany(collection,{pushResult:true}))
.promise();
d.forEach(d => t.same(d,{ok:1,n:1},'inserts each record'));
});

t.test('mongo collection',async t => {
const collection = await getCollection('insert-many');
const d = await collection.find({},{ projection: {_id:0}}).toArray();

t.same(d,data.data,'reveals data');
});

t.test('pushResults == false and collection as promise',async t => {
const collection = await getCollection('insert-many');
const d = await data.stream(etl.mongo.insertMany(collection))
.pipe(etl.collect(4))
.pipe(etl.mongo.insertMany(collection))
.promise();

t.same(d,[],'returns nothing');
});

t.test('error in collection', async t => {
const collection = Promise.reject({message: 'CONNECTION_ERROR'});
collection.suppressUnhandledRejections();
const e = await etl.toStream({test:true})
.pipe(etl.collect(1))
.pipe(etl.mongo.insertMany(collection))
.promise()
.then(() => {throw 'SHOULD_ERROR';}, Object);

t.same(e.message,'CONNECTION_ERROR','should bubble down');
});
})
.then(() => clear())
.then(() => t.end())
.catch(e => {
if (e.message.includes('ECONNREFUSED'))
console.warn('Warning: MongoDB server not available');
else
console.warn(e.message);
});


55 changes: 55 additions & 0 deletions test/mongo-insert-one-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
const etl = require('../index');
const data = require('./data');
const {getCollection, clear} = require('./lib/mongo');
const t = require('tap');
const Promise = require('bluebird');

t.test('mongo.insertOne', async t => {

t.teardown(() => t.end());

t.test('piping data into mongo.insertOne',async t => {
const collection = await getCollection('insert-one');
const d = await data.stream()
.pipe(etl.mongo.insertOne(collection,{pushResult:true}))
.promise();
d.forEach(d => t.same(d,{ok:1,n:1},'inserts each record'));
});

t.test('mongo collection',async t => {
const collection = await getCollection('insert-one');
const d = await collection.find({},{ projection: {_id:0}}).toArray();

t.same(d,data.data,'reveals data');
});

t.test('pushResults == false and collection as promise',async t => {
const collection = await getCollection('insert-one');
const d = await data.stream(etl.mongo.insertOne(collection))
.pipe(etl.mongo.insertOne(collection))
.promise();

t.same(d,[],'returns nothing');
});

t.test('error in collection', async t => {
const collection = Promise.reject({message: 'CONNECTION_ERROR'});
collection.suppressUnhandledRejections();
const e = await etl.toStream({test:true})
.pipe(etl.mongo.insertOne(collection,'_id'))
.promise()
.then(() => {throw 'SHOULD_ERROR';}, Object);

t.same(e.message,'CONNECTION_ERROR','should bubble down');
});
})
.then(() => clear())
.then(() => t.end())
.catch(e => {
if (e.message.includes('ECONNREFUSED'))
console.warn('Warning: MongoDB server not available');
else
console.warn(e.message);
});


4 changes: 1 addition & 3 deletions test/mongo-insert-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ t.test('mongo.insert', async t => {
const collection = Promise.reject({message: 'CONNECTION_ERROR'});
collection.suppressUnhandledRejections();
const e = await etl.toStream({test:true})
.pipe(etl.mongo.update(collection,'_id'))
.pipe(etl.mongo.insert(collection,'_id'))
.promise()
.then(() => {throw 'SHOULD_ERROR';}, Object);

Expand All @@ -51,5 +51,3 @@ t.test('mongo.insert', async t => {
else
console.warn(e.message);
});