Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
Signed-off-by: xinyi-xs <lihj@emqx.io>
  • Loading branch information
xinyi-xs committed Jun 6, 2024
1 parent d58bd7f commit fbc34bd
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 44 deletions.
2 changes: 1 addition & 1 deletion include/nng/supplemental/nanolib/parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ parquet_data_packet *parquet_find_data_packet(conf_parquet *conf, char *filename

parquet_data_packet **parquet_find_data_packets(conf_parquet *conf, char **filenames, uint64_t *keys, uint32_t len);

parquet_data_packet **parquet_find_data_span_packets(conf_parquet *conf, char **filenames, uint32_t len, uint64_t start_key, uint64_t end_key);
parquet_data_packet **parquet_find_data_span_packets(conf_parquet *conf, uint64_t start_key, uint64_t end_key, uint32_t *size);

#ifdef __cplusplus
}
Expand Down
89 changes: 46 additions & 43 deletions src/supplemental/nanolib/parquet/parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -944,9 +944,8 @@ get_keys_indexes(
int16_t definition_level;
int16_t repetition_level;

// FIXME:
int index = 0;
for (const auto &key : keys) {
int i = 0;
bool found = false;
while (int64_reader->HasNext()) {
int64_t value;
Expand All @@ -955,16 +954,12 @@ get_keys_indexes(
&repetition_level, &value, &values_read);
if (1 == rows_read && 1 == values_read) {
if (((uint64_t) value) == key) {
if (index_vector.empty()) {
index_vector.push_back(i);
} else {
index_vector.push_back(i);
}
index_vector.push_back(index++);
found = true;
break;
}
}
i++;
index++;
}
if (!found) {
index_vector.push_back(-1);
Expand Down Expand Up @@ -1178,7 +1173,7 @@ parquet_find_data_packets(
}

static vector<parquet_data_packet *>
parquet_read_span(conf_parquet *conf, char *filename, uint64_t keys[2])
parquet_read_span(conf_parquet *conf, const char *filename, uint64_t keys[2])
{
conf = g_conf;
vector<parquet_data_packet *> ret_vec;
Expand All @@ -1192,7 +1187,7 @@ parquet_read_span(conf_parquet *conf, char *filename, uint64_t keys[2])

// Create a ParquetReader instance
std::string exception_msg = "";
try {
// try {
std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
parquet::ParquetFileReader::OpenFile(
filename, false, reader_properties);
Expand All @@ -1206,13 +1201,14 @@ parquet_read_span(conf_parquet *conf, char *filename, uint64_t keys[2])
->num_row_groups(); // Get the number of RowGroups
int num_columns =
file_metadata->num_columns(); // Get the number of Columns
assert(num_row_groups == 1);
assert(num_columns == 2);

for (int r = 0; r < num_row_groups; ++r) {
// for (int r = 0; r < num_row_groups; ++r) {

std::shared_ptr<parquet::RowGroupReader>
row_group_reader = parquet_reader->RowGroup(
r); // Get the RowGroup Reader
0); // Get the RowGroup Reader
int64_t values_read = 0;
int64_t rows_read = 0;
int16_t definition_level;
Expand Down Expand Up @@ -1240,41 +1236,44 @@ parquet_read_span(conf_parquet *conf, char *filename, uint64_t keys[2])
}

if (ba_reader->HasNext()) {

int64_t values_read = 0;
int64_t rows_read = 0;
int16_t definition_level;
int64_t batch_size =
index_vector[1] - index_vector[0];
index_vector[1] - index_vector[0] + 1;
std::vector<parquet::ByteArray> values(
batch_size);
parquet::ByteArray value;
rows_read = ba_reader->ReadBatch(batch_size,
&definition_level, nullptr, values.data(),
&values_read);
if (batch_size == rows_read &&
batch_size == values_read) {
for (int64_t b = 0; b < batch_size;
b++) {
parquet_data_packet *pack =
(parquet_data_packet *)
malloc(sizeof(
parquet_data_packet));
pack->data =
(uint8_t *) malloc(
values[b].len *
sizeof(uint8_t));
memcpy(pack->data,
values[b].ptr,
values[b].len);
pack->size = values[b].len;
ret_vec.push_back(pack);
}
}
// if (batch_size == rows_read &&
// batch_size == values_read) {
// for (int64_t b = 0; b < batch_size;
// b++) {
// parquet_data_packet *pack =
// (parquet_data_packet *)
// malloc(sizeof(
// parquet_data_packet));
// pack->data =
// (uint8_t *) malloc(
// values[b].len *
// sizeof(uint8_t));
// memcpy(pack->data,
// values[b].ptr,
// values[b].len);
// pack->size = values[b].len;
// ret_vec.push_back(pack);
// }
// }
}
// }

}

} catch (const std::exception &e) {
exception_msg = e.what();
log_error("exception_msg=[%s]", exception_msg.c_str());
}
// } catch (const std::exception &e) {
// exception_msg = e.what();
// log_error("exception_msg=[%s]", exception_msg.c_str());
// }

return ret_vec;
}
Expand All @@ -1285,7 +1284,7 @@ typedef enum {
} key_type;

static uint64_t
get_key(char *filename, key_type type)
get_key(const char *filename, key_type type)
{
uint64_t range[2] = { 0 };
uint64_t res = 0;
Expand All @@ -1304,11 +1303,13 @@ get_key(char *filename, key_type type)
}

parquet_data_packet **
parquet_find_data_span_packets(conf_parquet *conf, char **filenames,
uint32_t len, uint64_t start_key, uint64_t end_key)
parquet_find_data_span_packets(conf_parquet *conf, uint64_t start_key, uint64_t end_key, uint32_t *size)
{
vector<parquet_data_packet *> ret_vec;
parquet_data_packet **packets = NULL;
uint32_t len = 0;

const char **filenames = parquet_find_span(start_key, end_key, &len);

for (uint32_t i = 0; i < len; i++) {
end_key = i == 0 ? end_key : get_key(filenames[i], END_KEY);
Expand All @@ -1317,14 +1318,16 @@ parquet_find_data_span_packets(conf_parquet *conf, char **filenames,
uint64_t keys[2];
keys[0] = start_key;
keys[1] = end_key;
auto tmp = parquet_read_span(conf, filenames[i], keys);
ret_vec.insert(ret_vec.end(), tmp.begin(), tmp.end());
// auto tmp =
parquet_read_span(conf, filenames[i], keys);
// ret_vec.insert(ret_vec.end(), tmp.begin(), tmp.end());
}

if (!ret_vec.empty()) {
packets = (parquet_data_packet **) malloc(
sizeof(parquet_data_packet *) * len);
copy(ret_vec.begin(), ret_vec.end(), packets);
*size = ret_vec.size();
}

return packets;
Expand Down

0 comments on commit fbc34bd

Please sign in to comment.