Skip to content

Commit

Permalink
Add basic DataFrame impl ...
Browse files Browse the repository at this point in the history
... and a bunch of performance tests for various scanning approaches
  • Loading branch information
Brian Hulette committed Jan 12, 2018
1 parent a1edac2 commit 30f0330
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 29 deletions.
171 changes: 143 additions & 28 deletions js/perf/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,52 @@
// under the License.

// Use the ES5 UMD target as perf baseline
// const { Table, readVectors } = require('../targets/es5/umd');
// const { Table, readVectors } = require('../targets/es5/cjs');
const { Table, readVectors } = require('../targets/es2015/umd');
// const { Table, readVectors } = require('../targets/es2015/cjs');
// const { DataFrame, Table, readVectors } = require('../targets/es5/umd');
// const { DataFrame, Table, readVectors } = require('../targets/es5/cjs');
// const { DataFrame, Table, readVectors } = require('../targets/es2015/umd');
const { DataFrame, Table, readVectors } = require('../targets/es2015/cjs');

const config = require('./config');
const Benchmark = require('benchmark');

const suites = [];

// For every { name, buffers } entry in ./config, build four Benchmark suites
// (parse, slice, iterate, get-by-index) and queue them on `suites`.
for (let { name, buffers} of config) {
const parseSuite = new Benchmark.Suite(`Parse ${name}`, { async: true });
const sliceSuite = new Benchmark.Suite(`Slice ${name} vectors`, { async: true });
const iterateSuite = new Benchmark.Suite(`Iterate ${name} vectors`, { async: true });
const getByIndexSuite = new Benchmark.Suite(`Get ${name} values by index`, { async: true });
// Two parse benchmarks over the same buffers: one via createFromTableTest,
// one via createReadVectorsTest.
parseSuite.add(createFromTableTest(name, buffers));
parseSuite.add(createReadVectorsTest(name, buffers));
// One slice / iterate / get-by-index benchmark per column of the parsed table.
for (const vector of Table.from(buffers).columns) {
sliceSuite.add(createSliceTest(vector));
iterateSuite.add(createIterateTest(vector));
getByIndexSuite.add(createGetByIndexTest(vector));
}
// Suites are queued in this order: get-by-index, iterate, slice, parse.
suites.push(getByIndexSuite, iterateSuite, sliceSuite, parseSuite);
}
//for (let { name, buffers} of config) {
// const parseSuite = new Benchmark.Suite(`Parse "${name}"`, { async: true });
// const sliceSuite = new Benchmark.Suite(`Slice "${name}" vectors`, { async: true });
// const iterateSuite = new Benchmark.Suite(`Iterate "${name}" vectors`, { async: true });
// const getByIndexSuite = new Benchmark.Suite(`Get "${name}" values by index`, { async: true });
// parseSuite.add(createFromTableTest(name, buffers));
// parseSuite.add(createReadVectorsTest(name, buffers));
// for (const vector of Table.from(buffers).columns) {
// sliceSuite.add(createSliceTest(vector));
// iterateSuite.add(createIterateTest(vector));
// getByIndexSuite.add(createGetByIndexTest(vector));
// }
// suites.push(getByIndexSuite, iterateSuite, sliceSuite, parseSuite);
//}

// For every { name, buffers, tests } entry exported by ./table_config, build the
// table / DataFrame / vector benchmark suites and queue them on `suites`.
// NOTE(review): this span looks like a rendered diff — it appears to contain both
// the pre-rename lines (tableIterateSuite, *CountBy*) and their replacements
// (tableIteratorSuite, *Count*). Confirm against the real file before editing.
for (let {name, buffers, tests} of require('./table_config')) {
const tableIterateSuite = new Benchmark.Suite(`Table Iterate ${name}`, { async: true });
const tableCountBySuite = new Benchmark.Suite(`Table Count By ${name}`, { async: true });
const vectorCountBySuite = new Benchmark.Suite(`Vector Count By ${name}`, { async: true });
const tableIteratorSuite = new Benchmark.Suite(`Table Iterator "${name}"`, { async: true });
const tableCountSuite = new Benchmark.Suite(`Table Count "${name}"`, { async: true });
const dfIteratorSuite = new Benchmark.Suite(`DataFrame Iterator "${name}"`, { async: true });
const dfIteratorCountSuite = new Benchmark.Suite(`DataFrame Iterator Count "${name}"`, { async: true });
const dfDirectCountSuite = new Benchmark.Suite(`DataFrame Direct Count "${name}"`, { async: true });
const dfScanCountSuite = new Benchmark.Suite(`DataFrame Scan Count "${name}"`, { async: true });
const vectorCountSuite = new Benchmark.Suite(`Vector Count "${name}"`, { async: true });
// The table is parsed once and shared by every benchmark built below.
const table = Table.from(buffers);

// Whole-table iteration benchmarks (one test each per dataset).
tableIterateSuite.add(createTableIterateTest(table));
tableIteratorSuite.add(createTableIteratorTest(table));
dfIteratorSuite.add(createDataFrameIteratorTest(table));
// One counting benchmark per configured { col, test, value } predicate.
// NOTE(review): no `let`/`const` declaration of `test` is visible here, so this
// loop would assign an implicit global (and throw in strict mode) — confirm.
for (test of tests) {
tableCountBySuite.add(createTableCountByTest(table, test.col, test.test, test.value))
vectorCountBySuite.add(createVectorCountByTest(table.columns[test.col], test.test, test.value))
tableCountSuite.add(createTableCountTest(table, test.col, test.test, test.value))
dfIteratorCountSuite.add(createDataFrameIteratorCountTest(table, test.col, test.test, test.value))
dfDirectCountSuite.add(createDataFrameDirectCountTest(table, test.col, test.test, test.value))
dfScanCountSuite.add(createDataFrameScanCountTest(table, test.col, test.test, test.value))
vectorCountSuite.add(createVectorCountTest(table.columns[test.col], test.test, test.value))
}

suites.push(tableIterateSuite, tableCountBySuite, vectorCountBySuite)
suites.push(tableIteratorSuite, tableCountSuite, dfIteratorSuite, dfIteratorCountSuite, dfDirectCountSuite, dfScanCountSuite, vectorCountSuite)
}

console.log('Running apache-arrow performance tests...\n');
Expand Down Expand Up @@ -125,7 +133,7 @@ function createGetByIndexTest(vector) {
};
}

function createVectorCountByTest(vector, test, value) {
function createVectorCountTest(vector, test, value) {
let op;
if (test == 'gteq') {
op = function () {
Expand All @@ -152,7 +160,7 @@ function createVectorCountByTest(vector, test, value) {
};
}

function createTableIterateTest(table) {
function createTableIteratorTest(table) {
let row;
return {
async: true,
Expand All @@ -161,7 +169,7 @@ function createTableIterateTest(table) {
};
}

function createTableCountByTest(table, column, test, value) {
function createTableCountTest(table, column, test, value) {
let op;
if (test == 'gteq') {
op = function () {
Expand All @@ -187,3 +195,110 @@ function createTableCountByTest(table, column, test, value) {
fn: op
};
}

function createDataFrameIteratorTest(table) {
let df = DataFrame.from(table);
let idx;
return {
async: true,
name: `length: ${table.length}`,
fn() { for (idx of table) {} }
};
}

function createDataFrameDirectCountTest(table, column, test, value) {
let df = DataFrame.from(table);

if (test == 'gteq') {
op = function () {
sum = 0;
for (let batch = -1; ++batch < df.lengths.length;) {
const length = df.lengths[batch];

// load batches
const columns = df.getBatch(batch);

// yield all indices
for (let idx = -1; ++idx < length;) {
sum += (columns[column].get(idx) >= value);
}
}
}
} else if (test == 'eq') {
op = function() {
sum = 0;
for (let batch = -1; ++batch < df.lengths.length;) {
const length = df.lengths[batch];

// load batches
const columns = df.getBatch(batch);

// yield all indices
for (let idx = -1; ++idx < length;) {
sum += (columns[column].get(idx) == value);
}
}
}
} else {
throw new Error(`Unrecognized test "${test}"`);
}

return {
async: true,
name: `name: '${table.columns[column].name}', length: ${table.length}, type: ${table.columns[column].type}, test: ${test}, value: ${value}`,
fn: op
};
}

function createDataFrameScanCountTest(table, column, test, value) {
let df = DataFrame.from(table);

if (test == 'gteq') {
op = function () {
sum = 0;
df.scan((idx, cols)=>{sum += cols[column].get(idx) >= value});
}
} else if (test == 'eq') {
op = function() {
sum = 0;
df.scan((idx, cols)=>{sum += cols[column].get(idx) == value});
console.log(sum);
}
} else {
throw new Error(`Unrecognized test "${test}"`);
}

return {
async: true,
name: `name: '${table.columns[column].name}', length: ${table.length}, type: ${table.columns[column].type}, test: ${test}, value: ${value}`,
fn: op
};
}

function createDataFrameIteratorCountTest(table, column, test, value) {
let df = DataFrame.from(table);

if (test == 'gteq') {
op = function () {
sum = 0;
for (idx of df) {
sum += (df.columns[column].get(idx) >= value);
}
}
} else if (test == 'eq') {
op = function() {
sum = 0;
for (idx of df) {
sum += (df.columns[column].get(idx) == value);
}
}
} else {
throw new Error(`Unrecognized test "${test}"`);
}

return {
async: true,
name: `name: '${table.columns[column].name}', length: ${table.length}, type: ${table.columns[column].type}, test: ${test}, value: ${value}`,
fn: op
};
}
2 changes: 1 addition & 1 deletion js/perf/table_config.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const filenames = glob.sync(path.resolve(__dirname, `../test/data/tables/`, `*.a
tests = [
{col: 0, test: 'gteq', value: 0 },
{col: 1, test: 'gteq', value: 0 },
{col: 2, test: 'eq', value: 'Seattle'},
//{col: 2, test: 'eq', value: 'Seattle'},
]

for (const filename of filenames) {
Expand Down
6 changes: 6 additions & 0 deletions js/src/Arrow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,15 @@ import {
TimestampVector,
} from './vector/numeric';

import { DataFrame } from './dataframe/dataframe';

// closure compiler always erases static method names:
// https://github.com/google/closure-compiler/issues/1776
// set them via string indexers to save them from the mangler
Table['from'] = Table.from;
Table['fromAsync'] = Table.fromAsync;
BoolVector['pack'] = BoolVector.pack;
DataFrame['from'] = DataFrame.from;

export { read, readAsync };
export { Table, Vector, StructRow };
Expand Down Expand Up @@ -84,6 +87,8 @@ export {
FixedSizeListVector,
};

export { DataFrame } from './dataframe/dataframe';

/* These exports are needed for the closure umd targets */
try {
const Arrow = eval('exports');
Expand All @@ -93,6 +98,7 @@ try {
Arrow['readAsync'] = readAsync;
Arrow['Table'] = Table;
Arrow['Vector'] = Vector;
Arrow['DataFrame'] = DataFrame;
Arrow['StructRow'] = StructRow;
Arrow['BoolVector'] = BoolVector;
Arrow['ListVector'] = ListVector;
Expand Down
109 changes: 109 additions & 0 deletions js/src/dataframe/dataframe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import { Vector } from "../vector/vector";
import { StructVector } from "../vector/struct";
import { VirtualVector } from "../vector/virtual";

/**
 * Row-scanning view over a set of equally long column vectors.
 *
 * Concrete frames are obtained through {@link DataFrame.from}. Consumers can
 * visit rows with `scan`, by iterating the frame (which yields indices), or by
 * walking `lengths` / `getBatch` themselves.
 */
export abstract class DataFrame {
    /** Column vectors that the indices yielded by the iterator refer to. */
    public abstract columns: Vector<any>[];
    /** Per-batch sizes; `lengths[batch]` bounds the indices valid for `getBatch(batch)`. */
    public abstract readonly lengths: Uint32Array;
    /** Returns the column vectors that make up the given batch. */
    public abstract getBatch(batch: number): Vector[];
    /** Calls `next` once per row with an index and the columns that index applies to. */
    public abstract scan(next: (idx: number, cols: Vector[])=>void): void;
    /** Yields indices to be used with `columns`. */
    public abstract [Symbol.iterator](): IterableIterator<number>;

    /**
     * Wraps a vector in the most specific DataFrame implementation.
     *
     * @param table A StructVector, a VirtualVector of StructVectors, or any other vector.
     * @returns A chunked frame when the input is made of VirtualVectors, otherwise a simple frame.
     */
    static from(table: Vector<any>): DataFrame {
        // There are two types of Vectors we might want to make into
        // a ChunkedDataFrame:
        //   1) a StructVector of all VirtualVectors
        //   2) a VirtualVector of all StructVectors
        if (table instanceof StructVector) {
            if (table.columns.every((col) => col instanceof VirtualVector)) {
                // ChunkedDataFrame case (1)
                return new ChunkedDataFrame(table.columns as VirtualVector<any>[]);
            } else {
                return new SimpleDataFrame(table.columns);
            }
        } else if (table instanceof VirtualVector &&
                   // `every` is vacuously true for an empty list, which would make
                   // `structs[0]` below undefined; such a vector falls through to
                   // the single-column case instead.
                   table.vectors.length > 0 &&
                   table.vectors.every((vec) => vec instanceof StructVector)) {
            const structs = table.vectors as StructVector<any>[];
            const rest: StructVector<any>[] = structs.slice(1);
            // Stitch column `col_idx` of every struct chunk into one vector per column.
            const virtuals: VirtualVector<any>[] = structs[0].columns.map((vec, col_idx) => {
                return vec.concat(...rest.map((vec) => vec.columns[col_idx]));
            }) as VirtualVector<any>[];
            // ChunkedDataFrame case (2)
            return new ChunkedDataFrame(virtuals);
        } else {
            // Anything else is treated as a frame with a single column.
            return new SimpleDataFrame([table]);
        }
    }
}

/**
 * DataFrame over plain (non-chunked) column vectors: every column is addressed
 * with the same index range and `getBatch` always returns the full column list.
 */
class SimpleDataFrame extends DataFrame {
    /**
     * Two entries: a leading 0 followed by the row count. `scan` and the
     * iterator read the row count from index 1.
     */
    readonly lengths: Uint32Array;

    /**
     * @param columns Column vectors; all must have the same `length`.
     * @throws Error if the columns differ in length.
     */
    constructor(public columns: Vector<any>[]) {
        super();
        // An empty column list is treated as a frame with zero rows instead of
        // failing on `columns[0]`.
        const rowCount = columns.length > 0 ? columns[0].length : 0;
        if (!columns.every((v) => v.length === rowCount)) {
            throw new Error("Attempted to create a DataFrame with un-aligned vectors");
        }
        this.lengths = new Uint32Array([0, rowCount]);
    }

    /** Returns all columns; there is no per-batch split in this implementation. */
    public getBatch() {
        return this.columns;
    }

    /** Calls `next(idx, columns)` for every index from 0 up to the row count. */
    public scan(next: (idx: number, cols: Vector[])=>void) {
        for (let idx = -1; ++idx < this.lengths[1];) {
            next(idx, this.columns);
        }
    }

    /** Yields every index from 0 up to the row count. */
    *[Symbol.iterator]() {
        for (let idx = -1; ++idx < this.lengths[1];) {
            yield idx;
        }
    }
}

/**
 * DataFrame over VirtualVectors that share the same chunk boundaries. Rows are
 * visited batch by batch, and indices are local to the current batch.
 */
class ChunkedDataFrame extends DataFrame {
    /**
     * Columns of the batch currently being iterated. Empty until iteration
     * starts; the iterator replaces it each time it moves to a new batch.
     */
    public columns: Vector<any>[] = [];
    /** Batch sizes derived from the first vector's `offsets` (see constructor). */
    readonly lengths: Uint32Array;

    /**
     * @param virtuals One VirtualVector per column; all must be aligned with the first.
     * @throws Error if any vector is not aligned with the first one.
     */
    constructor(private virtuals: VirtualVector<any>[]) {
        super();
        // No columns means no batches; avoid dereferencing `virtuals[0]`.
        if (virtuals.length === 0) {
            this.lengths = new Uint32Array(0);
            return;
        }
        const offsets = virtuals[0].offsets;
        if (!this.virtuals.slice(1).every((v) => v.aligned(virtuals[0]))) {
            throw new Error("Attempted to create a DataFrame with un-aligned vectors");
        }
        // lengths[i] is the distance between consecutive offsets. The final
        // entry reads one past the end of `offsets`, so it is stored as 0.
        // NOTE(review): whether that trailing 0 is a real batch depends on how
        // VirtualVector lays out `offsets` — confirm.
        this.lengths = new Uint32Array(offsets.length);
        offsets.forEach((offset, i) => {
            this.lengths[i] = offsets[i+1] - offset;
        });
    }

    /** Returns the `batch`-th underlying vector of every column. */
    getBatch(batch: number): Vector[] {
        return this.virtuals.map((virt) => virt.vectors[batch]);
    }

    /** Calls `next(idx, columns)` for every batch-local index of every batch. */
    scan(next: (idx: number, cols: Vector[])=>void) {
        for (let batch = -1; ++batch < this.lengths.length;) {
            const length = this.lengths[batch];

            // load batches
            const columns = this.getBatch(batch);

            // yield all indices
            for (let idx = -1; ++idx < length;) {
                next(idx, columns);
            }
        }
    }

    /**
     * Yields batch-local indices; `this.columns` is pointed at the matching
     * batch before that batch's indices are yielded.
     */
    *[Symbol.iterator]() {
        for (let batch = -1; ++batch < this.lengths.length;) {
            const length = this.lengths[batch];

            // load batches
            this.columns = this.getBatch(batch);

            // yield all indices
            for (let idx = -1; ++idx < length;) {
                yield idx;
            }
        }
    }
}
3 changes: 3 additions & 0 deletions js/src/vector/virtual.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ export class VirtualVector<T> implements Vector<T> {
}
return new ArrayType(0);
}
/**
 * Reports whether `other` has exactly the same offsets as this vector.
 *
 * The offset counts are compared first: checking only this vector's entries
 * would report a match when its offsets are a strict prefix of `other`'s.
 *
 * @param other Vector whose `offsets` are compared against this one's.
 * @returns `true` when both offset lists have the same length and equal entries.
 */
aligned(other: VirtualVector<any>): boolean {
    return this.offsets.length === other.offsets.length &&
        this.offsets.every((offset, i) => offset === other.offsets[i]);
}
}

function arraySet<T>(source: T[], target: T[], index: number) {
Expand Down

0 comments on commit 30f0330

Please sign in to comment.