Skip to content

Commit

Permalink
feat(c/driver/postgresql): FIXED_SIZED_LIST Writer support (#1975)
Browse files Browse the repository at this point in the history
Another enhancement for #1882
  • Loading branch information
WillAyd committed Jul 6, 2024
1 parent a946842 commit 22c5696
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 11 deletions.
60 changes: 60 additions & 0 deletions c/driver/postgresql/copy/postgres_copy_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,66 @@ TEST_P(PostgresCopyListTest, PostgresCopyWriteListVarchar) {
INSTANTIATE_TEST_SUITE_P(ArrowListTypes, PostgresCopyListTest,
testing::Values(NANOARROW_TYPE_LIST, NANOARROW_TYPE_LARGE_LIST));

// COPY (SELECT CAST("col" AS INTEGER ARRAY) AS "col" FROM ( VALUES ('{1, 2}'),
// ('{-1, -2}'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT BINARY);
static const uint8_t kTestPgCopyFixedSizeIntegerArray[] = {
0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x24, 0x00, 0x00, 0x00,
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x17, 0x00, 0x00, 0x00, 0x02, 0x00,
0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
0x04, 0x00, 0x00, 0x00, 0x02, 0x00, 0x01, 0x00, 0x00, 0x00, 0x24, 0x00, 0x00, 0x00,
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x17, 0x00, 0x00, 0x00, 0x02, 0x00,
0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00,
0x04, 0xff, 0xff, 0xff, 0xfe, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
TEST_F(PostgresCopyTest, PostgresCopyWriteFixedSizeListInteger) {
adbc_validation::Handle<struct ArrowSchema> schema;
adbc_validation::Handle<struct ArrowArray> array;
struct ArrowError na_error;

ASSERT_EQ(ArrowSchemaInitFromType(&schema.value, NANOARROW_TYPE_STRUCT), NANOARROW_OK);
ASSERT_EQ(ArrowSchemaAllocateChildren(&schema.value, 1), NANOARROW_OK);

ArrowSchemaInit(schema->children[0]);
ASSERT_EQ(
ArrowSchemaSetTypeFixedSize(schema->children[0], NANOARROW_TYPE_FIXED_SIZE_LIST, 2),
NANOARROW_OK);
ASSERT_EQ(ArrowSchemaSetName(schema->children[0], "col"), NANOARROW_OK);
ASSERT_EQ(ArrowSchemaSetType(schema->children[0]->children[0], NANOARROW_TYPE_INT32),
NANOARROW_OK);

ASSERT_EQ(ArrowArrayInitFromSchema(&array.value, &schema.value, nullptr), NANOARROW_OK);
ASSERT_EQ(ArrowArrayStartAppending(&array.value), NANOARROW_OK);

ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], 1), NANOARROW_OK);
ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], 2), NANOARROW_OK);
ASSERT_EQ(ArrowArrayFinishElement(array->children[0]), NANOARROW_OK);
ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);

ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], -1), NANOARROW_OK);
ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], -2), NANOARROW_OK);
ASSERT_EQ(ArrowArrayFinishElement(array->children[0]), NANOARROW_OK);
ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);

ASSERT_EQ(ArrowArrayAppendNull(array->children[0], 1), NANOARROW_OK);
ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);

ASSERT_EQ(ArrowArrayFinishBuildingDefault(&array.value, &na_error), NANOARROW_OK);

PostgresCopyStreamWriteTester tester;
ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK);
ASSERT_EQ(tester.WriteAll(nullptr), ENODATA);

const struct ArrowBuffer buf = tester.WriteBuffer();
// The last 2 bytes of a message can be transmitted via PQputCopyData
// so no need to test those bytes from the Writer
constexpr size_t buf_size = sizeof(kTestPgCopyFixedSizeIntegerArray) - 2;
ASSERT_EQ(buf.size_bytes, buf_size);
for (size_t i = 0; i < buf_size; i++) {
ASSERT_EQ(buf.data[i], kTestPgCopyFixedSizeIntegerArray[i])
<< "failure at index " << i;
}
}

TEST_F(PostgresCopyTest, PostgresCopyWriteMultiBatch) {
// Regression test for https://github.com/apache/arrow-adbc/issues/1310
adbc_validation::Handle<struct ArrowSchema> schema;
Expand Down
39 changes: 28 additions & 11 deletions c/driver/postgresql/copy/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ class PostgresCopyBinaryDictFieldWriter : public PostgresCopyFieldWriter {
}
};

template <bool IsFixedSize>
class PostgresCopyListFieldWriter : public PostgresCopyFieldWriter {
public:
explicit PostgresCopyListFieldWriter(uint32_t child_oid) : child_oid_{child_oid} {}
Expand All @@ -452,13 +453,21 @@ class PostgresCopyListFieldWriter : public PostgresCopyFieldWriter {
constexpr int32_t ndim = 1;
constexpr int32_t has_null_flags = 0;

const int32_t start = ArrowArrayViewListChildOffset(array_view_, index);
const int32_t end = ArrowArrayViewListChildOffset(array_view_, index + 1);
// TODO: the LARGE_LIST should use 64 bit indexes
int32_t start, end;
if constexpr (IsFixedSize) {
start = index * array_view_->layout.child_size_elements;
end = start + array_view_->layout.child_size_elements;
} else {
start = ArrowArrayViewListChildOffset(array_view_, index);
end = ArrowArrayViewListChildOffset(array_view_, index + 1);
}

const int32_t dim = end - start;
constexpr int32_t lb = 1;

// for fixed size fields where we know the size of each record we would write to
// postgres T, we could avoid the use of a temporary buffer and just write
// for children of a fixed size T we could avoid the use of a temporary buffer
/// and theoretically just write
//
// const int32_t field_size_bytes =
// sizeof(ndim) + sizeof(has_null_flags) + sizeof(child_oid_) + sizeof(dim) * ndim
Expand Down Expand Up @@ -698,7 +707,8 @@ static inline ArrowErrorCode MakeCopyFieldWriter(
break;
}
case NANOARROW_TYPE_LIST:
case NANOARROW_TYPE_LARGE_LIST: {
case NANOARROW_TYPE_LARGE_LIST:
case NANOARROW_TYPE_FIXED_SIZE_LIST: {
// For now our implementation only supports primitive children types
// See PostgresCopyListFieldWriter::Write for limtiations
struct ArrowSchemaView child_schema_view;
Expand All @@ -708,17 +718,24 @@ static inline ArrowErrorCode MakeCopyFieldWriter(
NANOARROW_RETURN_NOT_OK(PostgresType::FromSchema(type_resolver, schema->children[0],
&child_type, error));

auto list_writer = std::make_unique<PostgresCopyListFieldWriter>(child_type.oid());
list_writer->Init(array_view);

std::unique_ptr<PostgresCopyFieldWriter> child_writer;
NANOARROW_RETURN_NOT_OK(MakeCopyFieldWriter(schema->children[0],
array_view->children[0], type_resolver,
&child_writer, error));

list_writer->InitChild(std::move(child_writer));

*out = std::move(list_writer);
if (schema_view.type == NANOARROW_TYPE_FIXED_SIZE_LIST) {
auto list_writer =
std::make_unique<PostgresCopyListFieldWriter<true>>(child_type.oid());
list_writer->Init(array_view);
list_writer->InitChild(std::move(child_writer));
*out = std::move(list_writer);
} else {
auto list_writer =
std::make_unique<PostgresCopyListFieldWriter<false>>(child_type.oid());
list_writer->Init(array_view);
list_writer->InitChild(std::move(child_writer));
*out = std::move(list_writer);
}
return NANOARROW_OK;
}
default:
Expand Down

0 comments on commit 22c5696

Please sign in to comment.