Skip to content

Commit

Permalink
Switch byte calculation to batches
Browse files Browse the repository at this point in the history
  • Loading branch information
bmcdonald3 committed Sep 11, 2024
1 parent 337b2e7 commit e9cbf05
Showing 1 changed file with 25 additions and 12 deletions.
37 changes: 25 additions & 12 deletions src/parquet/ReadParquet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -440,19 +440,32 @@ int64_t cpp_getStringColumnNumBytes(const char* filename, const char* colname, v
static_cast<parquet::ByteArrayReader*>(column_reader.get());

int64_t numRead = 0;
while (ba_reader->HasNext() && numRead < numElems) {
parquet::ByteArray value;
(void)ba_reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read);
if(values_read > 0) {
offsets[i] = value.len + 1;
byteSize += value.len + 1;
numRead += values_read;
} else {
offsets[i] = 1;
byteSize+=1;
numRead+=1;

int totalProcessed = 0;
std::vector<parquet::ByteArray> values(batchSize);
while (ba_reader->HasNext() && totalProcessed < numElems) {
if((numElems - totalProcessed) < batchSize) // adjust batchSize if needed
batchSize = numElems - totalProcessed;
std::vector<int16_t> definition_levels(batchSize,-1);
(void)ba_reader->ReadBatch(batchSize, definition_levels.data(), nullptr, values.data(), &values_read);
totalProcessed += values_read;
int j = 0;
int numProcessed = 0;
while(j < batchSize) {
if(definition_levels[j] == 1 || definition_levels[j] == 3) {
offsets[i] = values[numProcessed].len + 1;
byteSize += values[numProcessed].len + 1;
numProcessed++;
i++;
} else if(definition_levels[j] == 0) {
offsets[i] = 1;
byteSize+=1;
i++;
} else {
j = batchSize; // exit condition
}
j++;
}
i++;
}
}
return byteSize;
Expand Down

0 comments on commit e9cbf05

Please sign in to comment.