
feat(c/driver/postgresql): Add support for bulk ingestion of list types #1882

Closed
judahrand opened this issue May 24, 2024 · 7 comments · Fixed by #1962
Labels
Type: enhancement New feature or request

Comments

@judahrand
Contributor

What feature or improvement would you like to see?

These types would include List, LargeList, and FixedSizeList, and could also include FixedShapeTensor.

@cpcloud

cpcloud commented Jun 8, 2024

+1 here. This blocks us from using the ADBC Postgres driver in Ibis (which would be great: it would remove a lot of very slow ingest hacks that we have).

@paleolimbot
Member

Just spotted this and thought I'd leave some thoughts on how to do it (for me or the next person who gets here!):

We have support for the other direction (Postgres array to Arrow list), and I don't think it is too hard to reverse-engineer that:

class PostgresCopyArrayFieldReader : public PostgresCopyFieldReader {
 public:
  void InitChild(std::unique_ptr<PostgresCopyFieldReader> child) {
    child_ = std::move(child);
    child_->Init(pg_type_.child(0));
  }

  ArrowErrorCode InitSchema(ArrowSchema* schema) override {
    NANOARROW_RETURN_NOT_OK(PostgresCopyFieldReader::InitSchema(schema));
    NANOARROW_RETURN_NOT_OK(child_->InitSchema(schema->children[0]));
    return NANOARROW_OK;
  }

  ArrowErrorCode InitArray(ArrowArray* array) override {
    NANOARROW_RETURN_NOT_OK(PostgresCopyFieldReader::InitArray(array));
    NANOARROW_RETURN_NOT_OK(child_->InitArray(array->children[0]));
    return NANOARROW_OK;
  }

  ArrowErrorCode Read(ArrowBufferView* data, int32_t field_size_bytes, ArrowArray* array,
                      ArrowError* error) override {
    if (field_size_bytes <= 0) {
      return ArrowArrayAppendNull(array, 1);
    }

    // Keep the cursor where we start to parse the array so we can check
    // the number of bytes read against the field size when finished
    const uint8_t* data0 = data->data.as_uint8;

    int32_t n_dim;
    NANOARROW_RETURN_NOT_OK(ReadChecked<int32_t>(data, &n_dim, error));
    int32_t flags;
    NANOARROW_RETURN_NOT_OK(ReadChecked<int32_t>(data, &flags, error));
    uint32_t element_type_oid;
    NANOARROW_RETURN_NOT_OK(ReadChecked<uint32_t>(data, &element_type_oid, error));
    // We could validate the OID here, but this is a poor fit for all cases
    // (e.g. testing) since the OID can be specific to each database

    if (n_dim < 0) {
      ArrowErrorSet(error, "Expected array n_dim > 0 but got %d",
                    static_cast<int>(n_dim));  // NOLINT(runtime/int)
      return EINVAL;
    }

    // This is apparently allowed
    if (n_dim == 0) {
      NANOARROW_RETURN_NOT_OK(ArrowArrayFinishElement(array));
      return NANOARROW_OK;
    }

    int64_t n_items = 1;
    for (int32_t i = 0; i < n_dim; i++) {
      int32_t dim_size;
      NANOARROW_RETURN_NOT_OK(ReadChecked<int32_t>(data, &dim_size, error));
      n_items *= dim_size;

      int32_t lower_bound;
      NANOARROW_RETURN_NOT_OK(ReadChecked<int32_t>(data, &lower_bound, error));
      if (lower_bound != 1) {
        ArrowErrorSet(error, "Array value with lower bound != 1 is not supported");
        return EINVAL;
      }
    }

    for (int64_t i = 0; i < n_items; i++) {
      int32_t child_field_size_bytes;
      NANOARROW_RETURN_NOT_OK(ReadChecked<int32_t>(data, &child_field_size_bytes, error));
      NANOARROW_RETURN_NOT_OK(
          child_->Read(data, child_field_size_bytes, array->children[0], error));
    }

    int64_t bytes_read = data->data.as_uint8 - data0;
    if (bytes_read != field_size_bytes) {
      ArrowErrorSet(error, "Expected to read %d bytes from array field but read %d bytes",
                    static_cast<int>(field_size_bytes),
                    static_cast<int>(bytes_read));  // NOLINT(runtime/int)
      return EINVAL;
    }

    NANOARROW_RETURN_NOT_OK(ArrowArrayFinishElement(array));
    return NANOARROW_OK;
  }

 private:
  std::unique_ptr<PostgresCopyFieldReader> child_;
};

...some test data that could be used to write a test:

// COPY (SELECT CAST("col" AS INTEGER ARRAY) AS "col" FROM ( VALUES ('{-123, -1}'), ('{0,
// 1, 123}'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary);
static const uint8_t kTestPgCopyIntegerArray[] = {
0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x24, 0x00, 0x00, 0x00,
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x17, 0x00, 0x00, 0x00, 0x02, 0x00,
0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff, 0x85, 0x00, 0x00, 0x00,
0x04, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0x00, 0x00, 0x00, 0x2c, 0x00, 0x00, 0x00,
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x17, 0x00, 0x00, 0x00, 0x03, 0x00,
0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x04, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x7b, 0x00,
0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};


@lidavidm
Member

I think the one thing that tripped me up when I took a look is that the array binary format includes the OID of the array elements. On read, we just ignore it, but we need to figure out the right OID to use on write (and I think none of the other writers need to care about OID).

It's possible Postgres ignores it too, though! Didn't test.

@paleolimbot
Member

I just did a quick browse and I'm wondering if this method will help:

ArrowErrorCode FindArray(uint32_t child_oid, PostgresType* type_out,
                         ArrowError* error) const {
  auto array_oid_lookup = array_mapping_.find(child_oid);
  if (array_oid_lookup == array_mapping_.end()) {
    ArrowErrorSet(error, "Postgres array type with child oid %ld not found",
                  static_cast<long>(child_oid));  // NOLINT(runtime/int)
    return EINVAL;
  }
  return Find(array_oid_lookup->second, type_out, error);
}

@lidavidm
Member

I think we need the reverse? Given the array OID find the child OID. (And in this case the array OID may not yet exist, if we're creating the table...maybe this is more complicated than it seems.)

@paleolimbot
Member

With an up-to-date PostgresTypeResolver I think that's:

ArrowErrorCode Find(uint32_t oid, PostgresType* type_out, ArrowError* error) const {

...plus:

const PostgresType& child(int64_t i) const { return children_[i]; }

...but perhaps getting an up-to-date PostgresTypeResolver is difficult here.

@lidavidm
Member

Aha. Thanks!

We can always plumb through the resolver but it seems like we should have enough info.
