Skip to content

Commit

Permalink
feat(csv): CSVArrowLoader (#3135)
Browse files Browse the repository at this point in the history
  • Loading branch information
ibgreen authored Oct 17, 2024
1 parent e4946a5 commit 264e413
Show file tree
Hide file tree
Showing 23 changed files with 496 additions and 160 deletions.
41 changes: 41 additions & 0 deletions modules/csv/src/csv-arrow-loader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// loaders.gl
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

import type {LoaderWithParser, LoaderOptions} from '@loaders.gl/loader-utils';
import type {ArrowTable, ArrowTableBatch} from '@loaders.gl/schema';
import {convertTable, convertBatches} from '@loaders.gl/schema-utils';

import type {CSVLoaderOptions} from './csv-loader';
import {CSVLoader} from './csv-loader';

export type CSVArrowLoaderOptions = LoaderOptions & {
csv?: Omit<CSVLoaderOptions['csv'], 'shape'>;
};

export const CSVArrowLoader = {
...CSVLoader,

dataType: null as unknown as ArrowTable,
batchType: null as unknown as ArrowTableBatch,

parse: async (arrayBuffer: ArrayBuffer, options?: CSVLoaderOptions) =>
parseCSVToArrow(new TextDecoder().decode(arrayBuffer), options),
parseText: (text: string, options?: CSVLoaderOptions) => parseCSVToArrow(text, options),
parseInBatches: parseCSVToArrowBatches
} as const satisfies LoaderWithParser<ArrowTable, ArrowTableBatch, CSVArrowLoaderOptions>;

async function parseCSVToArrow(csvText: string, options?: CSVLoaderOptions): Promise<ArrowTable> {
// Apps can call the parse method directly, we so apply default options here
// const csvOptions = {...CSVArrowLoader.options.csv, ...options?.csv};
const table = await CSVLoader.parseText(csvText, options);
return convertTable(table, 'arrow-table');
}

function parseCSVToArrowBatches(
asyncIterator: AsyncIterable<ArrayBuffer> | Iterable<ArrayBuffer>,
options?: CSVArrowLoaderOptions
): AsyncIterable<ArrowTableBatch> {
const tableIterator = CSVLoader.parseInBatches(asyncIterator, options);
return convertBatches(tableIterator, 'arrow-table');
}
71 changes: 54 additions & 17 deletions modules/csv/src/csv-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,19 @@
// Copyright (c) vis.gl contributors

import type {LoaderWithParser, LoaderOptions} from '@loaders.gl/loader-utils';
import type {ArrayRowTable, ObjectRowTable, TableBatch} from '@loaders.gl/schema';
import type {Schema, ArrayRowTable, ObjectRowTable, TableBatch} from '@loaders.gl/schema';

import {log} from '@loaders.gl/loader-utils';
import {
AsyncQueue,
deduceTableSchema,
TableBatchBuilder,
convertToArrayRow,
convertToObjectRow
} from '@loaders.gl/schema-utils';
import Papa from './papaparse/papaparse';
import AsyncIteratorStreamer from './papaparse/async-iterator-streamer';

type ObjectField = {name: string; index: number; type: any};
type ObjectSchema = {[key: string]: ObjectField} | ObjectField[];

// __VERSION__ is injected by babel-plugin-version-inline
// @ts-ignore TS2304: Cannot find name '__VERSION__'.
const VERSION = typeof __VERSION__ !== 'undefined' ? __VERSION__ : 'latest';
Expand Down Expand Up @@ -89,7 +88,7 @@ async function parseCSV(
csvText: string,
options?: CSVLoaderOptions
): Promise<ObjectRowTable | ArrayRowTable> {
// Apps can call the parse method directly, we so apply default options here
// Apps can call the parse method directly, so we apply default options here
const csvOptions = {...CSVLoader.options.csv, ...options?.csv};

const firstRow = readFirstRow(csvText);
Expand All @@ -115,20 +114,25 @@ async function parseCSV(
const headerRow = result.meta.fields || generateHeader(csvOptions.columnPrefix, firstRow.length);

const shape = csvOptions.shape || DEFAULT_CSV_SHAPE;
let table: ArrayRowTable | ObjectRowTable;
switch (shape) {
case 'object-row-table':
return {
table = {
shape: 'object-row-table',
data: rows.map((row) => (Array.isArray(row) ? convertToObjectRow(row, headerRow) : row))
};
break;
case 'array-row-table':
return {
table = {
shape: 'array-row-table',
data: rows.map((row) => (Array.isArray(row) ? row : convertToArrayRow(row, headerRow)))
};
break;
default:
throw new Error(shape);
}
table.schema = deduceTableSchema(table!);
return table;
}

// TODO - support batch size 0 = no batching/single batch?
Expand All @@ -151,7 +155,7 @@ function parseCSVInBatches(
let isFirstRow: boolean = true;
let headerRow: string[] | null = null;
let tableBatchBuilder: TableBatchBuilder | null = null;
let schema: ObjectSchema | null = null;
let schema: Schema | null = null;

const config = {
// dynamicTyping: true, // Convert numbers and boolean values in rows from strings,
Expand Down Expand Up @@ -199,7 +203,7 @@ function parseCSVInBatches(
if (!headerRow) {
headerRow = generateHeader(csvOptions.columnPrefix, row.length);
}
schema = deduceSchema(row, headerRow);
schema = deduceCSVSchema(row, headerRow);
}

if (csvOptions.optimizeMemoryUsage) {
Expand Down Expand Up @@ -314,23 +318,56 @@ function generateHeader(columnPrefix: string, count: number = 0): string[] {
return headers;
}

function deduceSchema(row, headerRow): ObjectSchema {
const schema: ObjectSchema = headerRow ? {} : [];
function deduceCSVSchema(row, headerRow): Schema {
const fields: Schema['fields'] = [];
for (let i = 0; i < row.length; i++) {
const columnName = (headerRow && headerRow[i]) || i;
const value = row[i];
switch (typeof value) {
case 'number':
fields.push({name: String(columnName), type: 'float64', nullable: true});
break;
case 'boolean':
// TODO - booleans could be handled differently...
schema[columnName] = {name: String(columnName), index: i, type: Float32Array};
fields.push({name: String(columnName), type: 'bool', nullable: true});
break;
case 'string':
fields.push({name: String(columnName), type: 'utf8', nullable: true});
break;
default:
schema[columnName] = {name: String(columnName), index: i, type: Array};
// We currently only handle numeric rows
// TODO we could offer a function to map strings to numbers?
log.warn(`CSV: Unknown column type: ${typeof value}`)();
fields.push({name: String(columnName), type: 'utf8', nullable: true});
}
}
return schema;
return {
fields,
metadata: {
'loaders.gl#format': 'csv',
'loaders.gl#loader': 'CSVLoader'
}
};
}

// TODO - remove
// type ObjectField = {name: string; index: number; type: any};
// type ObjectSchema = {[key: string]: ObjectField} | ObjectField[];

// function deduceObjectSchema(row, headerRow): ObjectSchema {
// const schema: ObjectSchema = headerRow ? {} : [];
// for (let i = 0; i < row.length; i++) {
// const columnName = (headerRow && headerRow[i]) || i;
// const value = row[i];
// switch (typeof value) {
// case 'number':
// case 'boolean':
// // TODO - booleans could be handled differently...
// schema[columnName] = {name: String(columnName), index: i, type: Float32Array};
// break;
// case 'string':
// default:
// schema[columnName] = {name: String(columnName), index: i, type: Array};
// // We currently only handle numeric rows
// // TODO we could offer a function to map strings to numbers?
// }
// }
// return schema;
// }
3 changes: 3 additions & 0 deletions modules/csv/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ export {CSVLoader} from './csv-loader';

export type {CSVWriterOptions} from './csv-writer';
export {CSVWriter} from './csv-writer';

export type {CSVArrowLoaderOptions} from './csv-arrow-loader';
export {CSVArrowLoader} from './csv-arrow-loader';
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
// loaders.gl
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

import test from 'tape-promise/tape';
import {loadInBatches, isIterator, isAsyncIterable} from '@loaders.gl/core';
import {CSVLoader} from '../src/csv-loader'; // from '@loaders.gl/csv';
import {CSVArrowLoader} from '@loaders.gl/csv';
import * as arrow from 'apache-arrow';

// Small CSV Sample Files
const CSV_NUMBERS_100_URL = '@loaders.gl/csv/test/data/numbers-100.csv';
const CSV_NUMBERS_10000_URL = '@loaders.gl/csv/test/data/numbers-10000.csv';
const CSV_INCIDENTS_URL_QUOTES = '@loaders.gl/csv/test/data/sf_incidents-small.csv';

// TODO -restore
test.skip('CSVLoader#loadInBatches(numbers-100.csv, arrow)', async (t) => {
const iterator = await loadInBatches(CSV_NUMBERS_100_URL, CSVLoader, {
csv: {
shape: 'arrow-table'
},
test('CSVArrowLoader#loadInBatches(numbers-100.csv)', async (t) => {
const iterator = await loadInBatches(CSV_NUMBERS_100_URL, CSVArrowLoader, {
batchSize: 40
});

Expand All @@ -29,12 +30,8 @@ test.skip('CSVLoader#loadInBatches(numbers-100.csv, arrow)', async (t) => {
t.end();
});

// TODO - restore
test.skip('CSVLoader#loadInBatches(numbers-10000.csv, arrow)', async (t) => {
const iterator = await loadInBatches(CSV_NUMBERS_10000_URL, CSVLoader, {
csv: {
shape: 'arrow-table'
},
test('CSVArrowLoader#loadInBatches(numbers-10000.csv)', async (t) => {
const iterator = await loadInBatches(CSV_NUMBERS_10000_URL, CSVArrowLoader, {
batchSize: 2000
});
t.ok(isIterator(iterator) || isAsyncIterable(iterator), 'loadInBatches returned iterator');
Expand All @@ -49,3 +46,18 @@ test.skip('CSVLoader#loadInBatches(numbers-10000.csv, arrow)', async (t) => {

t.end();
});

test('CSVArrowLoader#loadInBatches(incidents.csv)', async (t) => {
const iterator = await loadInBatches(CSV_INCIDENTS_URL_QUOTES, CSVArrowLoader);
t.ok(isIterator(iterator) || isAsyncIterable(iterator), 'loadInBatches returned iterator');

let batchCount = 0;
for await (const batch of iterator) {
t.ok(batch.data instanceof arrow.Table, 'returns arrow RecordBatch');
// t.comment(`BATCH: ${batch.length}`);
batchCount++;
}
t.equal(batchCount, 1, 'Correct number of batches received');

t.end();
});
4 changes: 4 additions & 0 deletions modules/csv/test/csv-loader.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// loaders.gl
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

import test from 'tape-promise/tape';
import {validateLoader} from 'test/common/conformance';

Expand Down
4 changes: 4 additions & 0 deletions modules/csv/test/csv-writer-papaparse.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// loaders.gl
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

// This is a fork of papaparse under MIT License
// https://github.com/mholt/PapaParse
/* eslint-disable */
Expand Down
4 changes: 2 additions & 2 deletions modules/csv/test/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ import './papaparse/papaparse.spec';
// import './csv-writer-papaparse.spec';

import './csv-loader.spec';
import './csv-loader-arrow.spec';

import './csv-writer.spec';

import './csv-arrow-loader.spec';
42 changes: 31 additions & 11 deletions modules/schema-utils/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,33 @@ export {
} from './lib/schema/convert-arrow-schema';
export {getDataTypeFromArray} from './lib/schema/data-type';

// Table utils
// TABLE CATEGORY UTILS

export {deduceTableSchema} from './lib/schema/deduce-table-schema';
export {makeTableFromData} from './lib/table/tables/make-table';
export {makeTableFromBatches} from './lib/table/batches/make-table-from-batches';

export {convertTable} from './lib/table/tables/convert-table';
export {convertToObjectRow, convertToArrayRow} from './lib/table/tables/row-utils';
export {convertArrowToTable, convertTableToArrow} from './lib/table/tables/convert-arrow-table';

// TABLE CATEGORY UTILS
export {TableBatchBuilder} from './lib/table/batches/table-batch-builder';
export type {TableBatchAggregator} from './lib/table/batches/table-batch-aggregator';
export {RowTableBatchAggregator} from './lib/table/batches/row-table-batch-aggregator';
export {ColumnarTableBatchAggregator} from './lib/table/batches/columnar-table-batch-aggregator';
export {
makeTableBatchIterator,
makeBatchFromTable
} from './lib/table/batches/make-table-batch-iterator';
export {
makeArrowTableBatchIterator,
makeArrowRecordBatchIterator
} from './lib/table/batches/make-arrow-batch-iterator';
export {convertBatch, convertBatches} from './lib/table/batches/convert-batches';

export {
isArrayRowTable,
isObjectRowTable,
isColumnarTable,
isGeoJSONTable,
isArrowTable
} from './lib/table/tables/table-types';

export {
isTable,
Expand All @@ -46,11 +65,12 @@ export {
makeObjectRowIterator
} from './lib/table/tables/table-accessors';

export {makeTableFromData} from './lib/table/tables/make-table';
export {makeTableFromBatches, makeBatchFromTable} from './lib/table/tables/make-table-from-batches';
export {convertTable} from './lib/table/tables/convert-table';
export {deduceTableSchema} from './lib/schema/deduce-table-schema';
export {convertToObjectRow, convertToArrayRow} from './lib/table/tables/row-utils';
// Table batch builders

export {TableBatchBuilder} from './lib/table/batch-builder/table-batch-builder';
export type {TableBatchAggregator} from './lib/table/batch-builder/table-batch-aggregator';
export {RowTableBatchAggregator} from './lib/table/batch-builder/row-table-batch-aggregator';
export {ColumnarTableBatchAggregator} from './lib/table/batch-builder/columnar-table-batch-aggregator';

export {ArrowLikeTable} from './lib/table/arrow-api/arrow-like-table';

Expand Down
Loading

0 comments on commit 264e413

Please sign in to comment.