-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-33749: [Ruby] Add Arrow::RecordBatch#each_raw_record #37137
Changes from 7 commits
8dd6ff6
3318abc
d3e5f3f
93adaac
a16c4c4
b354011
92b4b76
66ea580
b0731a7
34b53e0
8305926
c0a07e2
1089b8c
97ff172
98a3599
516ef62
7050dcc
72cc0b4
4456002
e1e1cd2
c7f4135
595f0c9
3f173ff
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -144,6 +144,122 @@ namespace red_arrow { | |||||
// The number of columns. | ||||||
const int n_columns_; | ||||||
}; | ||||||
|
||||||
class RawRecordsProducer : private Converter, public arrow::ArrayVisitor { | ||||||
public: | ||||||
explicit RawRecordsProducer(int n_columns) | ||||||
: Converter(), | ||||||
record_(Qnil), | ||||||
n_columns_(n_columns) { | ||||||
} | ||||||
|
||||||
void produce(const arrow::RecordBatch& record_batch) { | ||||||
rb::protect([&] { | ||||||
const auto n_rows = record_batch.num_rows(); | ||||||
for (int64_t i = 0; i < n_rows; ++i) { | ||||||
record_ = rb_ary_new_capa(n_columns_); | ||||||
row_offset_ = i; | ||||||
|
||||||
for (int i = 0; i < n_columns_; ++i) { | ||||||
const auto array = record_batch.column(i).get(); | ||||||
column_index_ = i; | ||||||
|
||||||
check_status(array->Accept(this), | ||||||
"[record-batch][each-raw-record]"); | ||||||
} | ||||||
rb_yield(record_); | ||||||
} | ||||||
return Qnil; | ||||||
}); | ||||||
} | ||||||
|
||||||
void produce(const arrow::Table& table) { | ||||||
rb::protect([&] { | ||||||
const auto n_rows = table.num_rows(); | ||||||
for (int64_t i = 0; i < n_rows; ++i) { | ||||||
row_offset_ = i; | ||||||
record_ = rb_ary_new_capa(n_columns_); | ||||||
|
||||||
for (int i = 0; i < n_columns_; ++i) { | ||||||
const auto& chunked_array = table.column(i).get(); | ||||||
column_index_ = i; | ||||||
|
||||||
for (const auto array : chunked_array->chunks()) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, we miss
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fix: 34b53e0 |
||||||
check_status(array->Accept(this), | ||||||
"[table][each-raw-record]"); | ||||||
} | ||||||
} | ||||||
rb_yield(record_); | ||||||
} | ||||||
return Qnil; | ||||||
}); | ||||||
} | ||||||
|
||||||
#define VISIT(TYPE) \ | ||||||
arrow::Status Visit(const arrow::TYPE ## Array& array) override { \ | ||||||
convert(array); \ | ||||||
return arrow::Status::OK(); \ | ||||||
} | ||||||
|
||||||
VISIT(Null) | ||||||
VISIT(Boolean) | ||||||
VISIT(Int8) | ||||||
VISIT(Int16) | ||||||
VISIT(Int32) | ||||||
VISIT(Int64) | ||||||
VISIT(UInt8) | ||||||
VISIT(UInt16) | ||||||
VISIT(UInt32) | ||||||
VISIT(UInt64) | ||||||
VISIT(HalfFloat) | ||||||
VISIT(Float) | ||||||
VISIT(Double) | ||||||
VISIT(Binary) | ||||||
VISIT(String) | ||||||
VISIT(FixedSizeBinary) | ||||||
VISIT(Date32) | ||||||
VISIT(Date64) | ||||||
VISIT(Time32) | ||||||
VISIT(Time64) | ||||||
VISIT(Timestamp) | ||||||
VISIT(MonthInterval) | ||||||
VISIT(DayTimeInterval) | ||||||
VISIT(MonthDayNanoInterval) | ||||||
VISIT(List) | ||||||
VISIT(Struct) | ||||||
VISIT(Map) | ||||||
VISIT(SparseUnion) | ||||||
VISIT(DenseUnion) | ||||||
VISIT(Dictionary) | ||||||
VISIT(Decimal128) | ||||||
VISIT(Decimal256) | ||||||
// TODO | ||||||
// VISIT(Extension) | ||||||
|
||||||
#undef VISIT | ||||||
|
||||||
private: | ||||||
template <typename ArrayType> | ||||||
void convert(const ArrayType& array) { | ||||||
auto value = Qnil; | ||||||
if (!array.IsNull(row_offset_)) { | ||||||
value = convert_value(array, row_offset_); | ||||||
} | ||||||
rb_ary_store(record_, column_index_, value); | ||||||
} | ||||||
|
||||||
// Destination for converted record. | ||||||
VALUE record_; | ||||||
|
||||||
// The current column index. | ||||||
int column_index_; | ||||||
|
||||||
// The current row offset. | ||||||
int64_t row_offset_; | ||||||
|
||||||
// The number of columns. | ||||||
const int n_columns_; | ||||||
}; | ||||||
} | ||||||
|
||||||
VALUE | ||||||
|
@@ -181,4 +297,36 @@ namespace red_arrow { | |||||
|
||||||
return records; | ||||||
} | ||||||
|
||||||
VALUE | ||||||
record_batch_each_raw_record(VALUE rb_record_batch){ | ||||||
auto garrow_record_batch = GARROW_RECORD_BATCH(RVAL2GOBJ(rb_record_batch)); | ||||||
auto record_batch = garrow_record_batch_get_raw(garrow_record_batch).get(); | ||||||
const auto n_columns = record_batch->num_columns(); | ||||||
|
||||||
try { | ||||||
RawRecordsProducer producer(n_columns); | ||||||
producer.produce(*record_batch); | ||||||
} catch (rb::State& state) { | ||||||
state.jump(); | ||||||
} | ||||||
|
||||||
return Qnil; | ||||||
} | ||||||
|
||||||
VALUE | ||||||
table_each_raw_record(VALUE rb_table) { | ||||||
auto garrow_table = GARROW_TABLE(RVAL2GOBJ(rb_table)); | ||||||
auto table = garrow_table_get_raw(garrow_table).get(); | ||||||
const auto n_columns = table->num_columns(); | ||||||
|
||||||
try { | ||||||
RawRecordsProducer producer(n_columns); | ||||||
producer.produce(*table); | ||||||
} catch (rb::State& state) { | ||||||
state.jump(); | ||||||
} | ||||||
|
||||||
return Qnil; | ||||||
} | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we remove
n_columns
here and definen_columns
in eachproduce()
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix: b0731a7