Skip to content

Commit

Permalink
Add batch string read (#3768)
Browse files: browse the repository at this point in the history
  • Loading branch information
bmcdonald3 authored Sep 11, 2024
1 parent e2722c0 commit 8fc7bf8
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 17 deletions.
4 changes: 2 additions & 2 deletions src/ParquetMsg.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ module ParquetMsg {
}

proc readStrFilesByName(ref A: [] ?t, filenames: [] string, sizes: [] int, dsetname: string) throws {
extern proc c_readStrColumnByName(filename, arr_chpl, colname, batchSize, errMsg): int;
extern proc c_readStrColumnByName(filename, arr_chpl, colname, numElems, batchSize, errMsg): int;
var (subdoms, length) = getSubdomains(sizes);

coforall loc in A.targetLocales() do on loc {
Expand All @@ -188,7 +188,7 @@ module ParquetMsg {
var col: [filedom] t;

if c_readStrColumnByName(filename.localize().c_str(), c_ptrTo(col),
dsetname.localize().c_str(),
dsetname.localize().c_str(), filedom.size,
batchSize, c_ptrTo(pqErr.errMsg)) == ARROWERROR {
pqErr.parquetError(getLineNumber(), getRoutineName(), getModuleName());
}
Expand Down
40 changes: 27 additions & 13 deletions src/parquet/ReadParquet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ int64_t readColumnIrregularBitWidth(void* chpl_arr, int64_t startIdx, std::share
return i;
}

int cpp_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t batchSize, char** errMsg) {
int cpp_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t numElems, int64_t batchSize, char** errMsg) {
try {
int64_t ty = cpp_getType(filename, colname, errMsg);

Expand Down Expand Up @@ -131,23 +131,37 @@ int cpp_readStrColumnByName(const char* filename, void* chpl_arr, const char* co
column_reader = row_group_reader->Column(idx);

if(ty == ARROWSTRING) {
int16_t definition_level; // nullable type and only reading single records in batch
auto chpl_ptr = (unsigned char*)chpl_arr;
parquet::ByteArrayReader* reader =
static_cast<parquet::ByteArrayReader*>(column_reader.get());

while (reader->HasNext()) {
parquet::ByteArray value;
(void)reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read);
// if values_read is 0, that means that it was a null value
if(values_read > 0) {
for(int j = 0; j < value.len; j++) {
chpl_ptr[i] = value.ptr[j];
int totalProcessed = 0;
std::vector<parquet::ByteArray> values(batchSize);
while (reader->HasNext() && totalProcessed < numElems) {
std::vector<int16_t> definition_levels(batchSize,-1);
if((numElems - totalProcessed) < batchSize) // adjust batchSize if needed
batchSize = numElems - totalProcessed;

(void)reader->ReadBatch(batchSize, definition_levels.data(), nullptr, values.data(), &values_read);
totalProcessed += values_read;
int j = 0;
int numProcessed = 0;
while(j < batchSize) {
if(definition_levels[j] == 1) {
for(int k = 0; k < values[numProcessed].len; k++) {
chpl_ptr[i] = values[numProcessed].ptr[k];
i++;
}
i++; // skip one space so the strings are null terminated with a 0
numProcessed++;
} else if(definition_levels[j] == 0) {
i++;
} else {
j = batchSize; // exit loop, not read
}
j++;
}
i++; // skip one space so the strings are null terminated with a 0
}
}
}
}
return 0;
Expand Down Expand Up @@ -744,8 +758,8 @@ int64_t cpp_getListColumnSize(const char* filename, const char* colname, void* c
}

extern "C" {
int c_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t batchSize, char** errMsg) {
return cpp_readStrColumnByName(filename, chpl_arr, colname, batchSize, errMsg);
int c_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t numElems, int64_t batchSize, char** errMsg) {
return cpp_readStrColumnByName(filename, chpl_arr, colname, numElems, batchSize, errMsg);
}

int c_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_chpl, const char* colname, int64_t numElems, int64_t startIdx, int64_t batchSize, int64_t byteLength, bool hasNonFloatNulls, char** errMsg) {
Expand Down
4 changes: 2 additions & 2 deletions src/parquet/ReadParquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
#include <queue>
extern "C" {
#endif
int c_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t batchSize, char** errMsg);
int c_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t numElems, int64_t batchSize, char** errMsg);

int cpp_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t batchSize, char** errMsg);
int cpp_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t numElems, int64_t batchSize, char** errMsg);

int c_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_chpl,
const char* colname, int64_t numElems, int64_t startIdx,
Expand Down

0 comments on commit 8fc7bf8

Please sign in to comment.