Skip to content

Commit

Permalink
Merge pull request #76 from RagibHasin/master
Browse files Browse the repository at this point in the history
Added mapReduce on Model and supporting tests, decorators and types.
  • Loading branch information
spartan563 authored May 18, 2017
2 parents 1adb2ef + 4cc024c commit 60e0db8
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 0 deletions.
1 change: 1 addition & 0 deletions iridium.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ export * from "./lib/caches/NoOpCache";
export * from "./lib/cacheControllers/IDDirector";

export * from "./lib/utils/ObjectID";
export * from "./lib/MapReduce";
22 changes: 22 additions & 0 deletions lib/Decorators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {Index, IndexSpecification} from "./Index";
import {Schema} from "./Schema";
import {InstanceImplementation} from "./InstanceInterface";
import {Transforms, DefaultTransforms} from "./Transforms";
import * as MapReduceDef from './MapReduce';

/**
* Specifies the name of the collection to which this instance's documents should be sent.
Expand Down Expand Up @@ -162,4 +163,25 @@ export function Binary(target: Instance<any, any>, name: string) {
DefaultTransforms.Binary.fromDB,
DefaultTransforms.Binary.toDB
)(target, name);
}

/**
* Specifies that the instance is a result of a mapReduce operation and functions of that operation.
*
* @param TDocument Interface of the document on which the operation will run
* @param Key Type of the mapped keys
* @param Value Type of the mapped values
*
* @param {MapReduce.MapFunction<TDocument>} map A function which maps documents.
* @param {MapReduce.ReduceFunction<Key, Value>} reduce A function which reduces mapped pairs.
*
* This decorator replaces the use of the static mapReduce property on instance implementation
* classes. If your transpiler does not support decorators then you are free to make use of the
* property instead.
*/
export function MapReduce<TDocument, Key, Value>(map: MapReduceDef.MapFunction<TDocument>,
reduce: MapReduceDef.ReduceFunction<Key, Value>) {
return function (target: InstanceImplementation<any, any>) {
target.mapReduceOptions = { map: map, reduce: reduce };
};
}
6 changes: 6 additions & 0 deletions lib/Instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {Transforms} from "./Transforms";
import {DefaultValidators} from "./Validators";
import {Conditions} from "./Conditions";
import {Changes} from "./Changes";
import * as MapReduce from "./MapReduce";

import * as _ from "lodash";
import * as MongoDB from "mongodb";
Expand Down Expand Up @@ -129,6 +130,11 @@ export class Instance<TDocument extends { _id?: any }, TInstance> {
*/
static indexes: (Index.Index | Index.IndexSpecification)[] = [];

/**
* mapReduce Options
*/
static mapReduceOptions?: MapReduce.MapReduceFunctions<any,any,any>;

/**
* Saves any changes to this instance, using the built in diff algorithm to write the update query.
* @param {function} callback A callback which is triggered when the save operation completes
Expand Down
6 changes: 6 additions & 0 deletions lib/InstanceInterface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import * as Index from "./Index";
import {CacheDirector} from "./CacheDirector";
import {Transforms} from "./Transforms";
import {Changes} from "./Changes";
import {MapFunction, ReduceFunction, MapReduceFunctions} from "./MapReduce"

/**
* This interface dictates the format of an instance class which wraps documents received
Expand Down Expand Up @@ -49,6 +50,11 @@ export interface InstanceImplementation<TDocument extends { _id ?: any }, TInsta
* to automatically generate all specified indexes.
*/
indexes?: (Index.Index | Index.IndexSpecification)[];

/**
* mapReduce Options
*/
mapReduceOptions?: MapReduceFunctions<any, any, any>;

/**
* An optional method which will be called whenever a document is about to be inserted into the database,
Expand Down
68 changes: 68 additions & 0 deletions lib/MapReduce.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import * as MongoDB from "mongodb"

import { Instance } from "./Instance"

declare global {
/**
* Emits a key-value pair for mapping.
* This is NOT a global function but is a MongoDB srever internal function.
* Only use in map function and nowhere else.
*
* @param {any} key Key to emit
* @param {any} value Value to emit
*/
function emit(key: any, value: any): void
}

/**
* A function to map values.
* Values are mapped by calling a MongoDB server internal function `emit`.
*
* @param TDocument Interface of the document it works on.
* @return {void} Nothing
*/
export interface MapFunction<TDocument> {
(this: TDocument): void
}

/**
* A function to reduce mapped values.
*
* @param Key Type of the key
* @param Value Type of the value
*
* @param {Key} key The key to reduce
* @param {Value[]} values The values to reduce
*/
export interface ReduceFunction<Key, Value> {
(key: Key, values: Value[]): Value
}

/**
* Interface of the document returned as result of a mapReduce operation.
*
* @param Key Type of the key
* @param Value Type of the value
*/
export interface MapReducedDocument<Key, Value> {
_id: Key
value: Value
}

/**
* MapReduce Options.
*
* @param TDocument Interface of the document it works on.
* @param Key Type of the key to reduce
* @param Value Type of the values to reduce
*
* Extends `MongoDB.MapReduceOptions` with additional map and reduce field.
*/
export interface MapReduceFunctions<TDocument, Key, Value> {
map: MapFunction<TDocument>
reduce: ReduceFunction<Key, Value>
}

export interface MapReduceOptions extends MongoDB.MapReduceOptions {
out?: "inline" | "replace" | "merge" | "reduce"
}
49 changes: 49 additions & 0 deletions lib/Model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {ModelSpecificInstance} from "./ModelSpecificInstance";
import {InstanceImplementation} from "./InstanceInterface";
import {Transforms, DefaultTransforms} from "./Transforms";
import * as AggregationPipeline from "./Aggregate";
import {MapFunction, ReduceFunction, MapReducedDocument, MapReduceFunctions, MapReduceOptions} from "./MapReduce";

/**
* An Iridium Model which represents a structured MongoDB collection.
Expand Down Expand Up @@ -692,6 +693,54 @@ export class Model<TDocument extends { _id?: any }, TInstance> {
});
}

/**
* Runs a mapReduce operation in MongoDB and returns the contents of the resulting collection.
* @param functions The mapReduce functions which will be passed to MongoDB to complete the operation.
* @param options Options used to configure how MongoDB runs the mapReduce operation on your collection.
* @return A promise which completes when the mapReduce operation has written its results to the provided collection.
*/
mapReduce<Key, Value>(functions: MapReduceFunctions<TDocument, Key, Value>, options: MapReduceOptions): Bluebird<MapReducedDocument<Key, Value>[]>;
/**
* Runs a mapReduce operation in MongoDB and writes the results to a collection.
* @param instanceType An Iridium.Instance type whichThe mapReduce functions which will be passed to MongoDB to complete the operation.
* @param options Options used to configure how MongoDB runs the mapReduce operation on your collection.
* @return A promise which completes when the mapReduce operation has written its results to the provided collection.
*/
mapReduce<Key, Value>(instanceType: InstanceImplementation<MapReducedDocument<Key, Value>, any> & { mapReduceOptions: MapReduceFunctions<TDocument, Key, Value> },
options: MapReduceOptions): Bluebird<void>;
mapReduce<Key, Value>(functions: (InstanceImplementation<MapReducedDocument<Key, Value>, any> & { mapReduceOptions: MapReduceFunctions<TDocument, Key, Value> }) |
MapReduceFunctions<TDocument, Key, Value>, options: MapReduceOptions) {
type fn = MapReduceFunctions<TDocument, Key, Value>;

if ((<fn>functions).map) {
return new Bluebird<MapReducedDocument<Key, Value>[]>((resolve, reject) => {
if (options.out && options.out != "inline")
return reject(new Error("Use inline output option"));
let opts = <MongoDB.MapReduceOptions>options
opts.out = { inline: 1 }
this.collection.mapReduce((<fn>functions).map, (<fn>functions).reduce, opts, function (err, data) {
if (err) return reject(err);
return resolve(data);
})
})
}
else {
let instanceType = <InstanceImplementation<MapReducedDocument<Key, Value>, any> & { mapReduceOptions: MapReduceFunctions<TDocument, Key, Value> }>functions
return new Bluebird<void>((resolve, reject) => {
if (options.out && options.out == "inline")
return reject(new Error("Output cannot be inline"));
let opts = <MongoDB.MapReduceOptions>options
let out : {[op: string]: string} = {}
out[(<string>options.out)] = instanceType.collection
opts.out = out
this.collection.mapReduce(instanceType.mapReduceOptions.map, instanceType.mapReduceOptions.reduce, opts, (err, data) => {
if (err) return reject(err)
return resolve()
})
})
}
}

/**
* Ensures that the given index is created for the collection
* @param {Object} specification The index specification object used by MongoDB
Expand Down
100 changes: 100 additions & 0 deletions test/MapReduce.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import * as Iridium from "../iridium";
import * as MongoDB from "mongodb";
import { Cursor } from "../lib/Cursor";
import * as Promise from "bluebird";
import * as _ from "lodash";
import * as chai from "chai";

// This test folows the example depicted on https://docs.mongodb.com/manual/core/map-reduce/

interface TestDocument {
_id?: string;
cust_id: string;
amount: number;
status: string;
}

class Test extends Iridium.Instance<TestDocument, Test> implements TestDocument {
static collection = "test";
static schema: Iridium.Schema = {
_id: MongoDB.ObjectID,
cust_id: String,
amount: Number,
status: String,
};

_id: string;
cust_id: string;
amount: number;
status: string;
}

class MapReducedInstance extends Iridium.Instance<Iridium.MapReducedDocument<string, number>, MapReducedInstance>{
static collection = "mapReduced";
static mapReduceOptions = {
map: function (this: TestDocument) {
emit(this.cust_id, this.amount);
},
reduce: function (key: string, values: number[]) {
return values.reduce((sum, val) => sum + val, 0)
}
}
_id: string
value: number
}

describe("Model", () => {
let core = new Iridium.Core({ database: "test" });

before(() => core.connect());

describe("mapReduce()", () => {
let model = new Iridium.Model<TestDocument, Test>(core, Test);

beforeEach(() => {
return core.connect().then(() => model.remove()).then(() => model.insert([
{ cust_id: "A123", amount: 500, status: "A" },
{ cust_id: "A123", amount: 250, status: "A" },
{ cust_id: "A123", amount: 300, status: "B" },
{ cust_id: "B212", amount: 200, status: "A" }
]));
});

it("should correctly map and reduce with model", () => {
let reducedModel = new Iridium.Model<Iridium.MapReducedDocument<string, number>, MapReducedInstance>(core, MapReducedInstance);
let t = reducedModel.remove().then(() => model.mapReduce(MapReducedInstance, {
out: "replace", query: { status: "A" }
}).then(() => reducedModel.find().toArray()))
return chai.expect(t).to.eventually.exist.and.have.length(2);
});

it("should correctly map and reduce inline without specifying out option", () => {
let t = model.mapReduce({
map: function (this: TestDocument) {
emit(this.cust_id, this.amount);
}, reduce: function (k: string, v: number[]) {
return v.reduce((sum, val) => sum + val, 0)
}
}, { query: { status: "A" } })
return chai.expect(t).to.eventually.exist.and.have.length(2);
});

it("should reject with wrong out option for inline", () => {
let t = model.mapReduce({
map: function (this: TestDocument) {
emit(this.cust_id, this.amount);
}, reduce: function (k: string, v: number[]) {
return v.reduce((sum, val) => sum + val, 0)
}
}, { out: "replace", query: { status: "A" } })
return chai.expect(t).to.eventually.be.rejected
});

it("should reject with wrong out option for model", () => {
let t = model.mapReduce(MapReducedInstance, {
out: "inline", query: { status: "A" }
})
return chai.expect(t).to.eventually.be.rejected
});
});
});

0 comments on commit 60e0db8

Please sign in to comment.