Skip to content

Commit

Permalink
apacheGH-33749: [Ruby] Add Arrow::RecordBatch#each_raw_record (apache…
Browse files Browse the repository at this point in the history
…#37137)

### Rationale for this change

This change allows for efficient iteration over large datasets, particularly those utilizing the Apache Parquet format.

### What changes are included in this PR?

- Add the following methods, which yield raw records one at a time to a block instead of returning them all at once as `raw_records` does:
  - Arrow::RecordBatch#each_raw_record
  - Arrow::Table#each_raw_record
- Add related tests

### Are these changes tested?

Yes.

### Are there any user-facing changes?

No.

This PR is related to apache#33749.
* Closes: apache#33749

Lead-authored-by: otegami <a.s.takuya1026@gmail.com>
Co-authored-by: takuya kodama <a.s.takuya1026@gmail.com>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
  • Loading branch information
2 people authored and loicalleyne committed Nov 13, 2023
1 parent 8aab070 commit c800801
Show file tree
Hide file tree
Showing 13 changed files with 4,735 additions and 1 deletion.
6 changes: 6 additions & 0 deletions ruby/red-arrow/ext/arrow/arrow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,17 @@ extern "C" void Init_arrow() {
rb_define_method(cArrowRecordBatch, "raw_records",
reinterpret_cast<rb::RawMethod>(red_arrow::record_batch_raw_records),
0);
rb_define_method(cArrowRecordBatch, "each_raw_record",
reinterpret_cast<rb::RawMethod>(red_arrow::record_batch_each_raw_record),
0);

auto cArrowTable = rb_const_get_at(mArrow, rb_intern("Table"));
rb_define_method(cArrowTable, "raw_records",
reinterpret_cast<rb::RawMethod>(red_arrow::table_raw_records),
0);
rb_define_method(cArrowTable, "each_raw_record",
reinterpret_cast<rb::RawMethod>(red_arrow::table_each_raw_record),
0);

red_arrow::cDate = rb_const_get(rb_cObject, rb_intern("Date"));

Expand Down
152 changes: 152 additions & 0 deletions ruby/red-arrow/ext/arrow/raw-records.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,128 @@ namespace red_arrow {
// The number of columns.
const int n_columns_;
};

// Streams the rows of a record batch or a table to the current Ruby block,
// one row at a time. Each row is built as a fresh Ruby Array holding one
// converted value per column (nil for null cells) and handed to rb_yield().
//
// The class is an arrow::ArrayVisitor: the produce() methods record which
// cell is wanted in column_index_ / row_offset_ and then let
// Array::Accept() dispatch to the matching Visit() overload, which stores
// the converted cell into record_.
class RawRecordsProducer : private Converter, public arrow::ArrayVisitor {
public:
  explicit RawRecordsProducer()
    : Converter(),
      record_(Qnil),
      column_index_(0),
      row_offset_(0) {
  }

  // Yields every row of record_batch to the current Ruby block.
  //
  // The whole iteration runs inside rb::protect().
  // NOTE(review): presumably rb::protect() turns a Ruby-level jump into an
  // rb::State C++ exception (callers in this file catch rb::State) -- confirm.
  void produce(const arrow::RecordBatch& record_batch) {
    rb::protect([&] {
      const auto n_columns = record_batch.num_columns();
      const auto n_rows = record_batch.num_rows();
      for (int64_t i_row = 0; i_row < n_rows; ++i_row) {
        record_ = rb_ary_new_capa(n_columns);
        row_offset_ = i_row;
        for (int i_column = 0; i_column < n_columns; ++i_column) {
          // Hold the shared_ptr for the duration of the visit instead of a
          // raw pointer taken from a temporary.
          const auto array = record_batch.column(i_column);
          column_index_ = i_column;
          check_status(array->Accept(this),
                       "[record-batch][each-raw-record]");
        }
        rb_yield(record_);
      }
      return Qnil;
    });
  }

  // Yields every row of table to the current Ruby block.
  //
  // Table columns are chunked, and the visible code tracks a separate
  // (chunk index, offset inside that chunk) cursor per column, so columns
  // are not assumed to share chunk boundaries.
  void produce(const arrow::Table& table) {
    rb::protect([&] {
      const auto n_columns = table.num_columns();
      const auto n_rows = table.num_rows();
      // Per-column cursors; both vectors start zero-filled.
      std::vector<int> chunk_indexes(n_columns);
      std::vector<int64_t> row_offsets(n_columns);
      for (int64_t i_row = 0; i_row < n_rows; ++i_row) {
        record_ = rb_ary_new_capa(n_columns);
        for (int i_column = 0; i_column < n_columns; ++i_column) {
          column_index_ = i_column;
          // Hold the shared_ptr for the duration of the visit instead of a
          // raw pointer taken from a temporary.
          const auto chunked_array = table.column(i_column);
          auto& chunk_index = chunk_indexes[i_column];
          auto& row_offset = row_offsets[i_column];
          auto array = chunked_array->chunk(chunk_index).get();
          // Advance past exhausted chunks. A loop (not an if) so that
          // zero-length chunks are skipped as well.
          while (array->length() == row_offset) {
            ++chunk_index;
            row_offset = 0;
            array = chunked_array->chunk(chunk_index).get();
          }
          row_offset_ = row_offset;
          check_status(array->Accept(this),
                       "[table][each-raw-record]");
          ++row_offset;
        }
        rb_yield(record_);
      }

      return Qnil;
    });
  }

  // Generates one Visit() override per Arrow array type. Every override
  // forwards to convert() and reports success.
#define VISIT(TYPE)                                                     \
  arrow::Status Visit(const arrow::TYPE ## Array& array) override {     \
    convert(array);                                                     \
    return arrow::Status::OK();                                         \
  }

  VISIT(Null)
  VISIT(Boolean)
  VISIT(Int8)
  VISIT(Int16)
  VISIT(Int32)
  VISIT(Int64)
  VISIT(UInt8)
  VISIT(UInt16)
  VISIT(UInt32)
  VISIT(UInt64)
  VISIT(HalfFloat)
  VISIT(Float)
  VISIT(Double)
  VISIT(Binary)
  VISIT(String)
  VISIT(FixedSizeBinary)
  VISIT(Date32)
  VISIT(Date64)
  VISIT(Time32)
  VISIT(Time64)
  VISIT(Timestamp)
  VISIT(MonthInterval)
  VISIT(DayTimeInterval)
  VISIT(MonthDayNanoInterval)
  VISIT(List)
  VISIT(Struct)
  VISIT(Map)
  VISIT(SparseUnion)
  VISIT(DenseUnion)
  VISIT(Dictionary)
  VISIT(Decimal128)
  VISIT(Decimal256)
  // TODO
  // VISIT(Extension)

#undef VISIT

private:
  // Stores the cell at row_offset_ of array into slot column_index_ of the
  // record being built; null cells are stored as nil.
  // NOTE(review): convert_value() is not defined in this class; presumably
  // it is inherited from Converter -- confirm.
  template <typename ArrayType>
  void convert(const ArrayType& array) {
    auto value = Qnil;
    if (!array.IsNull(row_offset_)) {
      value = convert_value(array, row_offset_);
    }
    rb_ary_store(record_, column_index_, value);
  }

  // Destination for converted record.
  VALUE record_;

  // The current column index.
  int column_index_;

  // The current row offset (relative to the array being visited).
  int64_t row_offset_;
};
}

VALUE
Expand Down Expand Up @@ -181,4 +303,34 @@ namespace red_arrow {

return records;
}

// Body of Arrow::RecordBatch#each_raw_record.
//
// Yields each row of the record batch to the given block as a Ruby Array
// and returns nil. When called without a block it returns an Enumerator
// instead, following the usual Ruby each_* convention; without this guard
// the first rb_yield() would raise LocalJumpError.
VALUE
record_batch_each_raw_record(VALUE rb_record_batch){
  RETURN_ENUMERATOR(rb_record_batch, 0, nullptr);

  auto garrow_record_batch = GARROW_RECORD_BATCH(RVAL2GOBJ(rb_record_batch));
  auto record_batch = garrow_record_batch_get_raw(garrow_record_batch).get();

  try {
    RawRecordsProducer producer;
    producer.produce(*record_batch);
  } catch (rb::State& state) {
    // NOTE(review): presumably resumes the Ruby-level jump (exception,
    // break, ...) captured while producing, after C++ unwinding -- confirm.
    state.jump();
  }

  return Qnil;
}

// Body of Arrow::Table#each_raw_record.
//
// Yields each row of the table to the given block as a Ruby Array and
// returns nil. When called without a block it returns an Enumerator
// instead, following the usual Ruby each_* convention; without this guard
// the first rb_yield() would raise LocalJumpError.
VALUE
table_each_raw_record(VALUE rb_table) {
  RETURN_ENUMERATOR(rb_table, 0, nullptr);

  auto garrow_table = GARROW_TABLE(RVAL2GOBJ(rb_table));
  auto table = garrow_table_get_raw(garrow_table).get();

  try {
    RawRecordsProducer producer;
    producer.produce(*table);
  } catch (rb::State& state) {
    // NOTE(review): presumably resumes the Ruby-level jump (exception,
    // break, ...) captured while producing, after C++ unwinding -- confirm.
    state.jump();
  }

  return Qnil;
}
}
2 changes: 2 additions & 0 deletions ruby/red-arrow/ext/arrow/red-arrow.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ namespace red_arrow {

VALUE record_batch_raw_records(VALUE obj);
VALUE table_raw_records(VALUE obj);
// Method bodies bound to Arrow::RecordBatch#each_raw_record and
// Arrow::Table#each_raw_record. Each yields the rows of the receiver to
// the Ruby block one at a time, every row being a Ruby Array with one
// converted value per column.
VALUE record_batch_each_raw_record(VALUE obj);
VALUE table_each_raw_record(VALUE obj);

inline VALUE time_unit_to_scale(const arrow::TimeUnit::type unit) {
switch (unit) {
Expand Down
Loading

0 comments on commit c800801

Please sign in to comment.