From 22c5696b850ca5fb4d3d92bac3e6a4fda9e0df5f Mon Sep 17 00:00:00 2001 From: William Ayd Date: Fri, 5 Jul 2024 21:24:06 -0400 Subject: [PATCH] feat(c/driver/postgresql): FIXED_SIZE_LIST Writer support (#1975) Another enhancement for https://github.com/apache/arrow-adbc/issues/1882 --- .../copy/postgres_copy_writer_test.cc | 60 +++++++++++++++++++ c/driver/postgresql/copy/writer.h | 39 ++++++++---- 2 files changed, 88 insertions(+), 11 deletions(-) diff --git a/c/driver/postgresql/copy/postgres_copy_writer_test.cc b/c/driver/postgresql/copy/postgres_copy_writer_test.cc index b0124600dd..618f27cf13 100644 --- a/c/driver/postgresql/copy/postgres_copy_writer_test.cc +++ b/c/driver/postgresql/copy/postgres_copy_writer_test.cc @@ -1052,6 +1052,66 @@ TEST_P(PostgresCopyListTest, PostgresCopyWriteListVarchar) { INSTANTIATE_TEST_SUITE_P(ArrowListTypes, PostgresCopyListTest, testing::Values(NANOARROW_TYPE_LIST, NANOARROW_TYPE_LARGE_LIST)); +// COPY (SELECT CAST("col" AS INTEGER ARRAY) AS "col" FROM ( VALUES ('{1, 2}'), +// ('{-1, -2}'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT BINARY); +static const uint8_t kTestPgCopyFixedSizeIntegerArray[] = { + 0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x24, 0x00, 0x00, 0x00, + 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x17, 0x00, 0x00, 0x00, 0x02, 0x00, + 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, + 0x04, 0x00, 0x00, 0x00, 0x02, 0x00, 0x01, 0x00, 0x00, 0x00, 0x24, 0x00, 0x00, 0x00, + 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x17, 0x00, 0x00, 0x00, 0x02, 0x00, + 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, + 0x04, 0xff, 0xff, 0xff, 0xfe, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}; +TEST_F(PostgresCopyTest, PostgresCopyWriteFixedSizeListInteger) { + adbc_validation::Handle<struct ArrowSchema> schema; + adbc_validation::Handle<struct ArrowArray> array; + struct ArrowError 
na_error; + + ASSERT_EQ(ArrowSchemaInitFromType(&schema.value, NANOARROW_TYPE_STRUCT), NANOARROW_OK); + ASSERT_EQ(ArrowSchemaAllocateChildren(&schema.value, 1), NANOARROW_OK); + + ArrowSchemaInit(schema->children[0]); + ASSERT_EQ( + ArrowSchemaSetTypeFixedSize(schema->children[0], NANOARROW_TYPE_FIXED_SIZE_LIST, 2), + NANOARROW_OK); + ASSERT_EQ(ArrowSchemaSetName(schema->children[0], "col"), NANOARROW_OK); + ASSERT_EQ(ArrowSchemaSetType(schema->children[0]->children[0], NANOARROW_TYPE_INT32), + NANOARROW_OK); + + ASSERT_EQ(ArrowArrayInitFromSchema(&array.value, &schema.value, nullptr), NANOARROW_OK); + ASSERT_EQ(ArrowArrayStartAppending(&array.value), NANOARROW_OK); + + ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], 1), NANOARROW_OK); + ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], 2), NANOARROW_OK); + ASSERT_EQ(ArrowArrayFinishElement(array->children[0]), NANOARROW_OK); + ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK); + + ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], -1), NANOARROW_OK); + ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], -2), NANOARROW_OK); + ASSERT_EQ(ArrowArrayFinishElement(array->children[0]), NANOARROW_OK); + ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK); + + ASSERT_EQ(ArrowArrayAppendNull(array->children[0], 1), NANOARROW_OK); + ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK); + + ASSERT_EQ(ArrowArrayFinishBuildingDefault(&array.value, &na_error), NANOARROW_OK); + + PostgresCopyStreamWriteTester tester; + ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); + ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); + + const struct ArrowBuffer buf = tester.WriteBuffer(); + // The last 2 bytes of a message can be transmitted via PQputCopyData + // so no need to test those bytes from the Writer + constexpr size_t buf_size = sizeof(kTestPgCopyFixedSizeIntegerArray) - 2; + ASSERT_EQ(buf.size_bytes, buf_size); + for 
(size_t i = 0; i < buf_size; i++) { + ASSERT_EQ(buf.data[i], kTestPgCopyFixedSizeIntegerArray[i]) + << "failure at index " << i; + } +} + TEST_F(PostgresCopyTest, PostgresCopyWriteMultiBatch) { // Regression test for https://github.com/apache/arrow-adbc/issues/1310 adbc_validation::Handle<struct ArrowSchema> schema; diff --git a/c/driver/postgresql/copy/writer.h b/c/driver/postgresql/copy/writer.h index 0730db1104..528e1f375e 100644 --- a/c/driver/postgresql/copy/writer.h +++ b/c/driver/postgresql/copy/writer.h @@ -436,6 +436,7 @@ class PostgresCopyBinaryDictFieldWriter : public PostgresCopyFieldWriter { } }; +template <bool IsFixedSize> class PostgresCopyListFieldWriter : public PostgresCopyFieldWriter { public: explicit PostgresCopyListFieldWriter(uint32_t child_oid) : child_oid_{child_oid} {} @@ -452,13 +453,21 @@ class PostgresCopyListFieldWriter : public PostgresCopyFieldWriter { constexpr int32_t ndim = 1; constexpr int32_t has_null_flags = 0; - const int32_t start = ArrowArrayViewListChildOffset(array_view_, index); - const int32_t end = ArrowArrayViewListChildOffset(array_view_, index + 1); + // TODO: the LARGE_LIST should use 64 bit indexes + int32_t start, end; + if constexpr (IsFixedSize) { + start = index * array_view_->layout.child_size_elements; + end = start + array_view_->layout.child_size_elements; + } else { + start = ArrowArrayViewListChildOffset(array_view_, index); + end = ArrowArrayViewListChildOffset(array_view_, index + 1); + } + const int32_t dim = end - start; constexpr int32_t lb = 1; - // for fixed size fields where we know the size of each record we would write to - // postgres T, we could avoid the use of a temporary buffer and just write + // for children of a fixed size T we could avoid the use of a temporary buffer + // and theoretically just write // // const int32_t field_size_bytes = // sizeof(ndim) + sizeof(has_null_flags) + sizeof(child_oid_) + sizeof(dim) * ndim @@ -698,7 +707,8 @@ static inline ArrowErrorCode MakeCopyFieldWriter( break; } case 
NANOARROW_TYPE_LIST: - case NANOARROW_TYPE_LARGE_LIST: { + case NANOARROW_TYPE_LARGE_LIST: + case NANOARROW_TYPE_FIXED_SIZE_LIST: { // For now our implementation only supports primitive children types // See PostgresCopyListFieldWriter::Write for limtiations struct ArrowSchemaView child_schema_view; @@ -708,17 +718,24 @@ static inline ArrowErrorCode MakeCopyFieldWriter( NANOARROW_RETURN_NOT_OK(PostgresType::FromSchema(type_resolver, schema->children[0], &child_type, error)); - auto list_writer = std::make_unique<PostgresCopyListFieldWriter>(child_type.oid()); - list_writer->Init(array_view); - std::unique_ptr<PostgresCopyFieldWriter> child_writer; NANOARROW_RETURN_NOT_OK(MakeCopyFieldWriter(schema->children[0], array_view->children[0], type_resolver, &child_writer, error)); - list_writer->InitChild(std::move(child_writer)); - - *out = std::move(list_writer); + if (schema_view.type == NANOARROW_TYPE_FIXED_SIZE_LIST) { + auto list_writer = + std::make_unique<PostgresCopyListFieldWriter<true>>(child_type.oid()); + list_writer->Init(array_view); + list_writer->InitChild(std::move(child_writer)); + *out = std::move(list_writer); + } else { + auto list_writer = + std::make_unique<PostgresCopyListFieldWriter<false>>(child_type.oid()); + list_writer->Init(array_view); + list_writer->InitChild(std::move(child_writer)); + *out = std::move(list_writer); + } return NANOARROW_OK; } default: