[FEA] CUDF, SQL support parquet, orc files (#321)
* Attempt to parse parquet file

* Add toParquet, fix readParquet

* Cleanup

* Fix compression param

* Write/Read ORC files

* Add remaining read_orc params, update cmake files

* Finish up providing options to orc parquet

* Add doc strings to new sql createTable methods, update io tests

* Fix up metadata issue with orc and parquet

* Move common columns metadata method, add in query to sql context io tests

* Add doc strings to new file table methods

* Fix SQL cluster server demo

* Add Cluster io create table methods

* Revert indexjs change

* General cleanup

* Update BINDINGS.md

* Update TODO comment

* Partial PR feedback

* Use binary instead of string for orc/parquet
matekdev authored Oct 26, 2021
1 parent 8d94864 commit 36ff27e
Showing 33 changed files with 1,164 additions and 213 deletions.
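Taken together, the changes add ORC and Parquet read/write entry points to `DataFrame` and plumb them through `Table` and the native addon. A minimal round-trip sketch based on the doc strings in this diff — file paths are hypothetical, and the empty writer options simply take the defaults:

```typescript
import {DataFrame} from '@rapidsai/cudf';

// Read an ORC file from disk (options shape from DataFrame.readORC's doc string).
const df = DataFrame.readORC({
  sourceType: 'files',
  sources: ['input.orc'],
});

// Write the same data back out in both formats; the DataFrame's own column
// names are forwarded into the file metadata by toORC/toParquet.
df.toORC('roundtrip.orc');
df.toParquet('roundtrip.parquet');
```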
4 changes: 2 additions & 2 deletions BINDINGS.md
@@ -358,9 +358,9 @@ The tables below show the bindings that have been implemented in `node-rapids`:
| `to_feather` | |
| `to_hdf` | |
| `to_json` | |
- | `to_orc` | |
+ | `to_orc` | ✅ (`toORC`) |
| `to_pandas` | |
- | `to_parquet` | |
+ | `to_parquet` | ✅ (`toParquet`) |
| `to_records` | |
| `to_string` | |
| `transpose` | |
48 changes: 45 additions & 3 deletions modules/cudf/src/data_frame.ts
@@ -42,7 +42,8 @@ import {
} from './types/dtypes';
import {DuplicateKeepOption, NullOrder} from './types/enums';
import {ColumnsMap, CommonType, TypeMap} from './types/mappings';
-import {ReadParquetOptions} from './types/parquet';
+import {ReadORCOptions, WriteORCOptions} from './types/orc';
+import {ReadParquetOptions, WriteParquetOptions} from './types/parquet';

export type SeriesMap<T extends TypeMap> = {
[P in keyof T]: AbstractSeries<T[P]>
@@ -124,14 +125,33 @@ export class DataFrame<T extends TypeMap = any> {
{} as ColumnsMap<{[P in keyof T]: T[P]}>)));
}

/**
* Read an Apache ORC file from disk and create a cudf.DataFrame
*
* @example
* ```typescript
* import {DataFrame} from '@rapidsai/cudf';
* const df = DataFrame.readORC({
* sourceType: 'files',
* sources: ['test.orc'],
* })
* ```
*/
public static readORC(options: ReadORCOptions) {
const {names, table} = Table.readORC(options);
return new DataFrame(new ColumnAccessor(
names.reduce((map, name, i) => ({...map, [name]: table.getColumnByIndex(i)}), {})));
}

/**
* Read an Apache Parquet file from disk and create a cudf.DataFrame
*
* @example
* ```typescript
- * import {DataFrame, Series} from '@rapidsai/cudf';
+ * import {DataFrame} from '@rapidsai/cudf';
  * const df = DataFrame.readParquet({
- *   sources: ['test'],
+ *   sourceType: 'files',
+ *   sources: ['test.parquet'],
* })
* ```
*/
@@ -818,6 +838,28 @@ export class DataFrame<T extends TypeMap = any> {
return readable as AsyncIterable<string>;
}

/**
* Write a DataFrame to ORC format.
*
* @param filePath File path or root directory path.
* @param options Options controlling ORC writing behavior.
*
*/
toORC(filePath: string, options: WriteORCOptions = {}) {
this.asTable().writeORC(filePath, {...options, columnNames: this.names as string[]});
}

/**
* Write a DataFrame to Parquet format.
*
* @param filePath File path or root directory path.
* @param options Options controlling Parquet writing behavior.
*
*/
toParquet(filePath: string, options: WriteParquetOptions = {}) {
this.asTable().writeParquet(filePath, {...options, columnNames: this.names as string[]});
}

/**
* Copy a Series to an Arrow vector in host memory
*
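Both writers splice `columnNames: this.names` into the options before delegating to the `Table` methods, so the on-disk schema always follows the frame's column names. A short sketch, assuming the package's usual series-map `DataFrame` constructor and `Series.new`:

```typescript
import {DataFrame, Series} from '@rapidsai/cudf';

const df = new DataFrame({
  id: Series.new([0, 1, 2]),
  score: Series.new([0.25, 0.5, 0.75]),
});

// 'id' and 'score' travel into the ORC writer metadata automatically.
df.toORC('scores.orc');  // hypothetical path
```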
5 changes: 5 additions & 0 deletions modules/cudf/src/node_cudf/table.hpp
@@ -237,6 +237,11 @@ struct Table : public EnvLocalObjectWrap<Table> {
void write_csv(Napi::CallbackInfo const& info);

static Napi::Value read_parquet(Napi::CallbackInfo const& info);
void write_parquet(Napi::CallbackInfo const& info);

static Napi::Value read_orc(Napi::CallbackInfo const& info);
void write_orc(Napi::CallbackInfo const& info);

static Napi::Value from_arrow(Napi::CallbackInfo const& info);

Napi::Value to_arrow(Napi::CallbackInfo const& info);
3 changes: 3 additions & 0 deletions modules/cudf/src/node_cudf/utilities/metadata.hpp
@@ -20,6 +20,9 @@

namespace nv {

cudf::io::table_input_metadata make_writer_columns_metadata(Napi::Object const& options,
cudf::table_view const& table);

Napi::Array get_output_names_from_metadata(Napi::Env const& env,
cudf::io::table_with_metadata const& result);

3 changes: 3 additions & 0 deletions modules/cudf/src/table.cpp
@@ -44,6 +44,9 @@ Napi::Function Table::Init(Napi::Env const& env, Napi::Object exports) {
StaticMethod<&Table::read_csv>("readCSV"),
InstanceMethod<&Table::write_csv>("writeCSV"),
StaticMethod<&Table::read_parquet>("readParquet"),
InstanceMethod<&Table::write_parquet>("writeParquet"),
StaticMethod<&Table::read_orc>("readORC"),
InstanceMethod<&Table::write_orc>("writeORC"),
StaticMethod<&Table::from_arrow>("fromArrow"),
InstanceMethod<&Table::drop_nans>("dropNans"),
InstanceMethod<&Table::drop_nulls>("dropNulls"),
37 changes: 26 additions & 11 deletions modules/cudf/src/table.ts
@@ -18,23 +18,16 @@ import {DeviceBuffer, MemoryResource} from '@rapidsai/rmm';
import CUDF from './addon';
import {Column} from './column';
import {Scalar} from './scalar';
-import {CSVTypeMap, ReadCSVOptions, WriteCSVOptions} from './types/csv';
+import {CSVTypeMap, ReadCSVOptions} from './types/csv';
+import {TableWriteCSVOptions} from './types/csv';
import {Bool8, DataType, IndexType, Int32} from './types/dtypes';
import {DuplicateKeepOption, NullOrder} from './types/enums';
import {TypeMap} from './types/mappings';
-import {ReadParquetOptions} from './types/parquet';
+import {ReadORCOptions, TableWriteORCOptions} from './types/orc';
+import {ReadParquetOptions, TableWriteParquetOptions} from './types/parquet';

export type ToArrowMetadata = [string | number, ToArrowMetadata[]?];

-interface TableWriteCSVOptions extends WriteCSVOptions {
-  /** Callback invoked for each CSV chunk. */
-  next: (chunk: Buffer) => void;
-  /** Callback invoked when writing is finished. */
-  complete: () => void;
-  /** Column names to write in the header. */
-  columnNames?: string[];
-}

interface TableConstructor {
readonly prototype: Table;
new(props: {columns?: ReadonlyArray<Column>|null}): Table;
@@ -48,6 +41,14 @@ interface TableConstructor {
readCSV<T extends CSVTypeMap = any>(options: ReadCSVOptions<T>):
{names: (keyof T)[], table: Table};

/**
* Reads an ORC dataset into a set of columns.
*
* @param options Settings for controlling reading behavior.
* @return The ORC data as a Table and a list of column names.
*/
readORC(options: ReadORCOptions): {names: string[], table: Table};

/**
* Reads an Apache Parquet dataset into a set of columns.
*
@@ -209,6 +210,20 @@ export interface Table {
*/
writeCSV(options: TableWriteCSVOptions): void;

/**
* Write a Table to Apache ORC file format.
* @param filePath File path or root directory path.
* @param options Options controlling ORC writing behavior.
*/
writeORC(filePath: string, options: TableWriteORCOptions): void;

/**
* Write a Table to Apache Parquet file format.
* @param filePath File path or root directory path.
* @param options Options controlling Parquet writing behavior.
*/
writeParquet(filePath: string, options: TableWriteParquetOptions): void;

dropNans(keys: number[], threshold: number): Table;
dropNulls(keys: number[], threshold: number): Table;
dropDuplicates(keys: number[],
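`DataFrame.readORC` (shown earlier) is a thin wrapper over this pair: `Table.readORC` returns the raw `Table` plus its column names, and the DataFrame layer zips them back into a column accessor. Using the low-level API directly looks roughly like this — a sketch, assuming `Table` is exported from the package root:

```typescript
import {Table} from '@rapidsai/cudf';

const {names, table} = Table.readORC({
  sourceType: 'files',
  sources: ['input.orc'],  // hypothetical path
});

// Columns come back positionally; names[i] labels table.getColumnByIndex(i).
names.forEach((name, i) => {
  console.log(name, table.getColumnByIndex(i).type);
});
```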
115 changes: 115 additions & 0 deletions modules/cudf/src/table/read_orc.cpp
@@ -0,0 +1,115 @@
// Copyright (c) 2021, NVIDIA CORPORATION.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <cudf/io/orc.hpp>
#include <node_cudf/table.hpp>
#include <node_cudf/utilities/metadata.hpp>

namespace nv {

namespace {

cudf::io::orc_reader_options make_reader_options(Napi::Object const& options,
cudf::io::source_info const& source) {
auto env = options.Env();
auto is_null = [](Napi::Value const& val) {
return val.IsNull() || val.IsEmpty() || val.IsUndefined();
};
auto has_opt = [&](std::string const& key) { return options.Has(key); };
auto napi_opt = [&](std::string const& key) -> Napi::Value {
return has_opt(key) ? options.Get(key) : env.Undefined();
};
auto long_opt = [&](std::string const& key) {
return has_opt(key) ? options.Get(key).ToNumber().Int32Value() : -1;
};
auto bool_opt = [&](std::string const& key, bool default_val) {
return has_opt(key) ? options.Get(key).ToBoolean() == true : default_val;
};

auto napi_stripes = napi_opt("stripes");
std::vector<std::vector<cudf::size_type>> stripes;
if (!is_null(napi_stripes) && napi_stripes.IsArray()) {
auto arr = napi_stripes.As<Napi::Array>();
for (size_t i = 0; i < arr.Length(); ++i) { stripes.push_back(NapiToCPP{arr.Get(i)}); }
}

auto opts = std::move(cudf::io::orc_reader_options::builder(source)
.num_rows(long_opt("numRows"))
.use_index(bool_opt("useIndex", true))
.build());

// These cannot both be set on the same read (cudf throws), so each is applied
// only when the options object defines it.
if (!stripes.empty()) { opts.set_stripes(stripes); }
if (has_opt("skipRows")) { opts.set_skip_rows(long_opt("skipRows")); }

auto decimal_cols_as_floats = napi_opt("decimalColsAsFloats");
if (!is_null(decimal_cols_as_floats) && decimal_cols_as_floats.IsArray()) {
opts.set_decimal_cols_as_float(NapiToCPP{decimal_cols_as_floats});
}

auto columns = napi_opt("columns");
if (!is_null(columns) && columns.IsArray()) { opts.set_columns(NapiToCPP{columns}); }

return opts;
}

Napi::Value read_orc_files(Napi::Object const& options, std::vector<std::string> const& sources) {
auto env = options.Env();
auto result = cudf::io::read_orc(make_reader_options(options, cudf::io::source_info{sources}));
auto output = Napi::Object::New(env);
output.Set("names", get_output_names_from_metadata(env, result));
output.Set("table", Table::New(env, get_output_cols_from_metadata(env, result)));
return output;
}

std::vector<cudf::io::host_buffer> get_host_buffers(std::vector<Span<uint8_t>> const& sources) {
std::vector<cudf::io::host_buffer> buffers;
buffers.reserve(sources.size());
std::transform(sources.begin(), sources.end(), std::back_inserter(buffers), [&](auto const& buf) {
return cudf::io::host_buffer{static_cast<Span<char>>(buf), buf.size()};
});
return buffers;
}

Napi::Value read_orc_sources(Napi::Object const& options,
std::vector<Span<uint8_t>> const& sources) {
auto env = options.Env();
auto result = cudf::io::read_orc(
make_reader_options(options, cudf::io::source_info{get_host_buffers(sources)}));
auto output = Napi::Object::New(env);
output.Set("names", get_output_names_from_metadata(env, result));
output.Set("table", Table::New(env, get_output_cols_from_metadata(env, result)));
return output;
}

} // namespace

Napi::Value Table::read_orc(Napi::CallbackInfo const& info) {
auto env = info.Env();

NODE_CUDF_EXPECT(info[0].IsObject(), "readORC expects an Object of ReadORCOptions", env);

auto options = info[0].As<Napi::Object>();
auto sources = options.Get("sources");

NODE_CUDF_EXPECT(sources.IsArray(), "readORC expects an Array of paths or buffers", env);
try {
return (options.Get("sourceType").ToString().Utf8Value() == "files")
? read_orc_files(options, NapiToCPP{sources})
: read_orc_sources(options, NapiToCPP{sources});
} catch (cudf::logic_error const& err) { NAPI_THROW(Napi::Error::New(env, err.what())); }
}

} // namespace nv
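One behavior worth calling out: `stripes` and `skipRows` are applied only when the caller actually supplies them, because cudf raises if both are set on the same read. The two mutually exclusive call shapes, sketched with option names taken from the reader above (paths hypothetical, assuming the keys pass through `ReadORCOptions` unchanged):

```typescript
import {DataFrame} from '@rapidsai/cudf';

// Either select whole stripes (an array of stripe-index lists)...
const byStripes = DataFrame.readORC({
  sourceType: 'files',
  sources: ['input.orc'],
  stripes: [[0, 2]],
});

// ...or skip/limit by row count -- but never both on one read.
const byRows = DataFrame.readORC({
  sourceType: 'files',
  sources: ['input.orc'],
  skipRows: 1000,
  numRows: 500,
});
```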
40 changes: 37 additions & 3 deletions modules/cudf/src/table/read_parquet.cpp
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <cudf/io/datasource.hpp>
#include <cudf/io/parquet.hpp>
#include <node_cudf/table.hpp>
#include <node_cudf/utilities/metadata.hpp>
@@ -37,13 +38,24 @@ cudf::io::parquet_reader_options make_reader_options(Napi::Object const& options
return has_opt(key) ? options.Get(key).ToBoolean() == true : default_val;
};

auto napi_row_groups = napi_opt("rowGroups");
std::vector<std::vector<cudf::size_type>> row_groups;
if (!is_null(napi_row_groups) && napi_row_groups.IsArray()) {
auto arr = napi_row_groups.As<Napi::Array>();
for (size_t i = 0; i < arr.Length(); ++i) { row_groups.push_back(NapiToCPP{arr.Get(i)}); }
}

auto opts = std::move(cudf::io::parquet_reader_options::builder(source)
.skip_rows(long_opt("skipRows"))
.num_rows(long_opt("numRows"))
.convert_strings_to_categories(bool_opt("stringsToCategorical", false))
.use_pandas_metadata(bool_opt("usePandasMetadata", true))
.build());

// These cannot both be set on the same read (cudf throws), so each is applied
// only when the options object defines it.
if (!row_groups.empty()) { opts.set_row_groups(row_groups); }
if (has_opt("skipRows")) { opts.set_skip_rows(long_opt("skipRows")); }

auto columns = napi_opt("columns");

if (!is_null(columns) && columns.IsArray()) { opts.set_columns(NapiToCPP{columns}); }
Expand All @@ -62,6 +74,26 @@ Napi::Value read_parquet_files(Napi::Object const& options,
return output;
}

std::vector<cudf::io::host_buffer> get_host_buffers(std::vector<Span<uint8_t>> const& sources) {
std::vector<cudf::io::host_buffer> buffers;
buffers.reserve(sources.size());
std::transform(sources.begin(), sources.end(), std::back_inserter(buffers), [&](auto const& buf) {
return cudf::io::host_buffer{static_cast<Span<char>>(buf), buf.size()};
});
return buffers;
}

Napi::Value read_parquet_sources(Napi::Object const& options,
std::vector<Span<uint8_t>> const& sources) {
auto env = options.Env();
auto result = cudf::io::read_parquet(
make_reader_options(options, cudf::io::source_info{get_host_buffers(sources)}));
auto output = Napi::Object::New(env);
output.Set("names", get_output_names_from_metadata(env, result));
output.Set("table", Table::New(env, get_output_cols_from_metadata(env, result)));
return output;
}

} // namespace

Napi::Value Table::read_parquet(Napi::CallbackInfo const& info) {
@@ -72,9 +104,11 @@
auto options = info[0].As<Napi::Object>();
auto sources = options.Get("sources");

NODE_CUDF_EXPECT(sources.IsArray(), "readParquet expects an Array of paths", env);
NODE_CUDF_EXPECT(sources.IsArray(), "readParquet expects an Array of paths or buffers", env);
try {
-    return read_parquet_files(options, NapiToCPP{sources});
+    return (options.Get("sourceType").ToString().Utf8Value() == "files")
+             ? read_parquet_files(options, NapiToCPP{sources})
+             : read_parquet_sources(options, NapiToCPP{sources});
} catch (cudf::logic_error const& err) { NAPI_THROW(Napi::Error::New(env, err.what())); }
}

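The new `sourceType` dispatch means Parquet (like ORC) can now be read from in-memory bytes as well as files: any non-`'files'` value routes through `read_parquet_sources`, which wraps each source in a cudf `host_buffer`. A sketch of the buffer path — the `'buffers'` value and `Buffer` inputs are assumptions, since `ReadParquetOptions` itself isn't shown in this diff:

```typescript
import {readFileSync} from 'fs';
import {DataFrame} from '@rapidsai/cudf';

// Bytes already in host memory, e.g. fetched over the network.
const bytes = readFileSync('input.parquet');  // hypothetical path

const df = DataFrame.readParquet({
  sourceType: 'buffers',  // assumed value; anything but 'files' takes the buffer path
  sources: [bytes],
});
```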
6 changes: 3 additions & 3 deletions modules/cudf/src/table/write_csv.cpp
@@ -23,8 +23,8 @@ namespace nv {

namespace {

-cudf::io::table_metadata make_writer_metadata(Napi::Object const& options,
-                                              cudf::table_view const& table) {
+cudf::io::table_metadata make_csv_writer_metadata(Napi::Object const& options,
+                                                  cudf::table_view const& table) {
auto env = options.Env();
auto has_opt = [&](std::string const& key) { return options.Has(key); };
auto napi_opt = [&](std::string const& key) -> Napi::Value {
@@ -119,7 +119,7 @@ void Table::write_csv(Napi::CallbackInfo const& info) {

cudf::table_view table = *this;
callback_sink sink{next.As<Napi::Function>()};
-  auto metadata = make_writer_metadata(options, table);
+  auto metadata  = make_csv_writer_metadata(options, table);
cudf::io::write_csv(make_writer_options(options, cudf::io::sink_info{&sink}, table, &metadata));

complete.As<Napi::Function>()({});