Skip to content

Commit

Permalink
TEXT support
Browse files Browse the repository at this point in the history
  • Loading branch information
WillAyd committed Jul 3, 2024
1 parent 2ba7484 commit 9272269
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 44 deletions.
73 changes: 69 additions & 4 deletions c/driver/postgresql/copy/postgres_copy_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <gtest/gtest.h>
#include <nanoarrow/nanoarrow.hpp>

#include "nanoarrow_types.h"
#include "postgres_copy_test_common.h"
#include "postgresql/copy/writer.h"
#include "validation/adbc_validation_util.h"
Expand Down Expand Up @@ -936,9 +935,6 @@ TEST_P(PostgresCopyListTest, PostgresCopyWriteListInteger) {
}
}

INSTANTIATE_TEST_SUITE_P(ArrowListTypes, PostgresCopyListTest,
testing::Values(NANOARROW_TYPE_LIST, NANOARROW_TYPE_LARGE_LIST));

// COPY (SELECT CAST("col" AS BIGINT ARRAY) AS "col" FROM ( VALUES ('{-123, -1}'), ('{0,
// 1, 123}'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary);
static const uint8_t kTestPgCopyBigIntegerArray[] = {
Expand Down Expand Up @@ -999,6 +995,75 @@ TEST_P(PostgresCopyListTest, PostgresCopyWriteListBigInt) {
}
}

// COPY (SELECT CAST("col" AS TEXT ARRAY) AS "col" FROM ( VALUES ('{"foo", "bar"}'),
// ('{"baz", "qux", "quux"}'), (NULL)) AS drvd("col")) TO '/tmp/pgout.data' WITH (FORMAT
// binary);
static const uint8_t kTestPgCopyTextArray[] = {
0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x22, 0x00,
0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x19, 0x00, 0x00,
0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03, 0x66, 0x6f, 0x6f,
0x00, 0x00, 0x00, 0x03, 0x62, 0x61, 0x72, 0x00, 0x01, 0x00, 0x00, 0x00, 0x2a,
0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x19, 0x00,
0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03, 0x62, 0x61,
0x7a, 0x00, 0x00, 0x00, 0x03, 0x71, 0x75, 0x78, 0x00, 0x00, 0x00, 0x04, 0x71,
0x75, 0x75, 0x78, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};

TEST_P(PostgresCopyListTest, PostgresCopyWriteListVarchar) {
adbc_validation::Handle<struct ArrowSchema> schema;
adbc_validation::Handle<struct ArrowArray> array;
struct ArrowError na_error;

ASSERT_EQ(ArrowSchemaInitFromType(&schema.value, NANOARROW_TYPE_STRUCT), NANOARROW_OK);
ASSERT_EQ(ArrowSchemaAllocateChildren(&schema.value, 1), NANOARROW_OK);

ASSERT_EQ(ArrowSchemaInitFromType(schema->children[0], GetParam()), NANOARROW_OK);
ASSERT_EQ(ArrowSchemaSetName(schema->children[0], "col"), NANOARROW_OK);
ASSERT_EQ(ArrowSchemaSetType(schema->children[0]->children[0], NANOARROW_TYPE_STRING),
NANOARROW_OK);

ASSERT_EQ(ArrowArrayInitFromSchema(&array.value, &schema.value, nullptr), NANOARROW_OK);
ASSERT_EQ(ArrowArrayStartAppending(&array.value), NANOARROW_OK);

ASSERT_EQ(ArrowArrayAppendString(array->children[0]->children[0], ArrowCharView("foo")),
NANOARROW_OK);
ASSERT_EQ(ArrowArrayAppendString(array->children[0]->children[0], ArrowCharView("bar")),
NANOARROW_OK);
ASSERT_EQ(ArrowArrayFinishElement(array->children[0]), NANOARROW_OK);
ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);

ASSERT_EQ(ArrowArrayAppendString(array->children[0]->children[0], ArrowCharView("baz")),
NANOARROW_OK);
ASSERT_EQ(ArrowArrayAppendString(array->children[0]->children[0], ArrowCharView("qux")),
NANOARROW_OK);
ASSERT_EQ(
ArrowArrayAppendString(array->children[0]->children[0], ArrowCharView("quux")),
NANOARROW_OK);
ASSERT_EQ(ArrowArrayFinishElement(array->children[0]), NANOARROW_OK);
ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);

ASSERT_EQ(ArrowArrayAppendNull(array->children[0], 1), NANOARROW_OK);
ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);

ASSERT_EQ(ArrowArrayFinishBuildingDefault(&array.value, &na_error), NANOARROW_OK);

PostgresCopyStreamWriteTester tester;
ASSERT_EQ(tester.Init(&connection_, &schema.value, &array.value), NANOARROW_OK);
ASSERT_EQ(tester.WriteAll(nullptr), ENODATA);

const struct ArrowBuffer buf = tester.WriteBuffer();
// The last 2 bytes of a message can be transmitted via PQputCopyData
// so no need to test those bytes from the Writer
constexpr size_t buf_size = sizeof(kTestPgCopyTextArray) - 2;
ASSERT_EQ(buf.size_bytes, buf_size);
for (size_t i = 0; i < buf_size; i++) {
ASSERT_EQ(buf.data[i], kTestPgCopyTextArray[i]) << "failure at index " << i;
}
}

INSTANTIATE_TEST_SUITE_P(ArrowListTypes, PostgresCopyListTest,
testing::Values(NANOARROW_TYPE_LIST, NANOARROW_TYPE_LARGE_LIST));

TEST_F(PostgresCopyTest, PostgresCopyWriteMultiBatch) {
// Regression test for https://github.com/apache/arrow-adbc/issues/1310
adbc_validation::Handle<struct ArrowSchema> schema;
Expand Down
74 changes: 34 additions & 40 deletions c/driver/postgresql/copy/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -457,15 +457,23 @@ class PostgresCopyListFieldWriter : public PostgresCopyFieldWriter {
const int32_t dim = end - start;
constexpr int32_t lb = 1;

// TODO: this works for primitive types where we can calculate the buffer size
// in advance for varying types we likely need to create a separate buffer first
const int32_t child_record_size =
array_view_->children[0]->layout.element_size_bits[1] / 8;
const int32_t field_size_bytes =
sizeof(ndim) + sizeof(has_null_flags) + sizeof(child_oid_) + sizeof(dim) * ndim +
sizeof(lb) * ndim
// for each primitive record we send int32_t nbytes + the value itself
+ sizeof(int32_t) * dim + child_record_size * dim;
// for fixed size fields where we know the size of each record we would write to
// postgres T, we could avoid the use of a temporary buffer and just write
//
// const int32_t field_size_bytes =
// sizeof(ndim) + sizeof(has_null_flags) + sizeof(child_oid_) + sizeof(dim) * ndim
// + sizeof(lb) * ndim
// + sizeof(int32_t) * dim + T * dim;
//
// directly to our buffer
nanoarrow::UniqueBuffer tmp;
ArrowBufferInit(tmp.get());
for (auto i = start; i < end; ++i) {
NANOARROW_RETURN_NOT_OK(child_->Write(tmp.get(), i, error));
}
const int32_t field_size_bytes = sizeof(ndim) + sizeof(has_null_flags) +
sizeof(child_oid_) + sizeof(dim) * ndim +
sizeof(lb) * ndim + tmp->size_bytes;

NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, ndim, error));
Expand All @@ -476,9 +484,7 @@ class PostgresCopyListFieldWriter : public PostgresCopyFieldWriter {
NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, lb, error));
}

for (auto i = start; i < end; ++i) {
NANOARROW_RETURN_NOT_OK(child_->Write(buffer, i, error));
}
ArrowBufferAppend(buffer, tmp->data, tmp->size_bytes);

return ADBC_STATUS_OK;
}
Expand Down Expand Up @@ -698,34 +704,22 @@ static inline ArrowErrorCode MakeCopyFieldWriter(
struct ArrowSchemaView child_schema_view;
NANOARROW_RETURN_NOT_OK(
ArrowSchemaViewInit(&child_schema_view, schema->children[0], error));
switch (child_schema_view.type) {
case NANOARROW_TYPE_INT16:
case NANOARROW_TYPE_INT32:
case NANOARROW_TYPE_INT64: {
const auto resolver = conn->type_resolver();
PostgresType child_type;
NANOARROW_RETURN_NOT_OK(PostgresType::FromSchema(*resolver, schema->children[0],
&child_type, error));

auto list_writer =
std::make_unique<PostgresCopyListFieldWriter>(child_type.oid());
list_writer->Init(array_view);

std::unique_ptr<PostgresCopyFieldWriter> child_writer;
NANOARROW_RETURN_NOT_OK(MakeCopyFieldWriter(
conn, schema->children[0], array_view->children[0], &child_writer, error));

list_writer->InitChild(std::move(child_writer));

*out = std::move(list_writer);
return NANOARROW_OK;
}
default:
ArrowErrorSet(
error, "COPY Writer not implemented for list types with child type of %d",
child_schema_view.type);
return EINVAL;
}
const auto resolver = conn->type_resolver();
PostgresType child_type;
NANOARROW_RETURN_NOT_OK(
PostgresType::FromSchema(*resolver, schema->children[0], &child_type, error));

auto list_writer = std::make_unique<PostgresCopyListFieldWriter>(child_type.oid());
list_writer->Init(array_view);

std::unique_ptr<PostgresCopyFieldWriter> child_writer;
NANOARROW_RETURN_NOT_OK(MakeCopyFieldWriter(
conn, schema->children[0], array_view->children[0], &child_writer, error));

list_writer->InitChild(std::move(child_writer));

*out = std::move(list_writer);
return NANOARROW_OK;
}
default:
break;
Expand Down

0 comments on commit 9272269

Please sign in to comment.