feat(c/driver/postgresql): Inital COPY Writer design #1110

Merged
merged 14 commits into from
Sep 28, 2023
188 changes: 188 additions & 0 deletions c/driver/postgresql/postgres_copy_reader.h
@@ -117,6 +117,53 @@ ArrowErrorCode ReadChecked(ArrowBufferView* data, T* out, ArrowError* error) {
return NANOARROW_OK;
}

// Write a value to a buffer without checking the buffer size. Advances
Contributor Author

I put all of this code into postgres_copy_reader.h because it re-uses a lot of the same patterns and constants. Maybe we should rename this postgres_copy_io.h?

Member

We could also factor out the headers into shared/nonshared parts at some point; I don't think it's a big deal either way.

// the cursor of buffer and increases its size by sizeof(T)
template <typename T>
inline void WriteUnsafe(ArrowBuffer* buffer, T in) {
const T value = SwapNetworkToHost(in);
memcpy(buffer->data, &value, sizeof(T));
buffer->data += sizeof(T);
buffer->size_bytes += sizeof(T);
}

template <>
inline void WriteUnsafe(ArrowBuffer* buffer, int8_t in) {
buffer->data[0] = in;
buffer->data += sizeof(int8_t);
buffer->size_bytes += sizeof(int8_t);
}
Comment on lines +130 to +135
Member

FWIW, I'm not sure this is necessary. Compilers understand memcpy, and I would guess that they optimize the generic above to the same as these specializations.

Contributor Author

The specializations exist because SwapNetworkToHost requires an unsigned argument, although that makes me realize these are incorrect as is.
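
One way to sidestep the unsigned-argument requirement without per-type specializations is to route every integer through its unsigned counterpart. The following is a minimal sketch, not the driver's code: `ToBigEndian` and `WriteBigEndian` are hypothetical names, the byte swap is done portably with shifts instead of SwapNetworkToHost, and the helper writes at a raw pointer rather than through ArrowBuffer.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <type_traits>

// Hypothetical helper: store an unsigned integer in network (big-endian)
// byte order, regardless of host endianness.
template <typename U>
inline void ToBigEndian(U value, uint8_t* out) {
  static_assert(std::is_unsigned<U>::value, "U must be unsigned");
  for (size_t i = 0; i < sizeof(U); i++) {
    out[i] = static_cast<uint8_t>(value >> (8 * (sizeof(U) - 1 - i)));
  }
}

// Hypothetical generic writer: copy the bits of any (non-bool) integer into
// its unsigned counterpart, write them big-endian, and return the advanced
// cursor. The caller is responsible for ensuring enough space.
template <typename T>
inline uint8_t* WriteBigEndian(uint8_t* cursor, T in) {
  static_assert(std::is_integral<T>::value, "T must be an integer type");
  using U = typename std::make_unsigned<T>::type;
  U bits;
  std::memcpy(&bits, &in, sizeof(T));
  ToBigEndian<U>(bits, cursor);
  return cursor + sizeof(T);
}
```

With this shape, `WriteBigEndian<int16_t>` and `WriteBigEndian<int32_t>` share one definition, and the signed-to-unsigned conversion happens in a single place.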


template <>
inline void WriteUnsafe(ArrowBuffer* buffer, int16_t in) {
uint16_t uval;
memcpy(&uval, &in, sizeof(int16_t));
WriteUnsafe<uint16_t>(buffer, uval);
}

template <>
inline void WriteUnsafe(ArrowBuffer* buffer, int32_t in) {
uint32_t uval;
memcpy(&uval, &in, sizeof(int32_t));
WriteUnsafe<uint32_t>(buffer, uval);
}

template <typename T>
ArrowErrorCode WriteChecked(ArrowBuffer* buffer, T in, ArrowError* error) {
// TODO: beware of overflow here
if (buffer->capacity_bytes < buffer->size_bytes + static_cast<int64_t>(sizeof(T))) {
ArrowErrorSet(error,
"Insufficient buffer capacity (expected %" PRId64
" bytes but found %" PRId64 ")",
buffer->size_bytes + static_cast<int64_t>(sizeof(T)), buffer->capacity_bytes);

return EINVAL;
}

WriteUnsafe<T>(buffer, in);
return NANOARROW_OK;
}

class PostgresCopyFieldReader {
public:
PostgresCopyFieldReader() : validity_(nullptr), offsets_(nullptr), data_(nullptr) {
@@ -1058,4 +1105,145 @@ class PostgresCopyStreamReader {
int64_t array_size_approx_bytes_;
};

class PostgresCopyFieldWriter {
public:
virtual ~PostgresCopyFieldWriter() {}

void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; };

virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) {
return ENOTSUP;
}

protected:
struct ArrowArrayView* array_view_;
std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
};

class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
public:
void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
int64_t child_i = static_cast<int64_t>(children_.size());
children_.push_back(std::move(child));
children_[child_i]->Init(array_view_->children[child_i]);
}

ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
Contributor Author

The Write method calls accept an index argument, which is a little different from the Reader setup. Instead of accessing by index, the Readers always call ArrowBufferAppend on the array they are building.

I think we could still do that here; it's just a little more complicated because there is no generator such as ArrowBufferGetNext or similar, so I figured index access was easier to start with. I could be overlooking something larger.

Member

I think indices are fine; reading data is different from writing it.

Contributor Author

Sounds good. I think it's just kind of confusing that we advance the buffer in the WriteUnsafe calls but also mix in index access here. It may be the best we can do to start.

if (index >= array_view_->length) {
return ENODATA;
}

const int16_t n_fields = children_.size();
NANOARROW_RETURN_NOT_OK(WriteChecked<int16_t>(buffer, n_fields, error));

for (int16_t i = 0; i < n_fields; i++) {
// Propagate a failing child write instead of silently dropping it
NANOARROW_RETURN_NOT_OK(children_[i]->Write(buffer, index, error));
}

return NANOARROW_OK;
}

private:
std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
};

class PostgresCopyBooleanFieldWriter : public PostgresCopyFieldWriter {
public:
ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
const int32_t field_size_bytes = is_null ? -1 : 1;
NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
if (is_null) {
return NANOARROW_OK;
}

const int8_t value =
static_cast<int8_t>(ArrowArrayViewGetIntUnsafe(array_view_, index));
NANOARROW_RETURN_NOT_OK(WriteChecked<int8_t>(buffer, value, error));

return NANOARROW_OK;
}
};

static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType arrow_type,
PostgresCopyFieldWriter** out,
ArrowError* error) {
switch (arrow_type) {
case NANOARROW_TYPE_BOOL:
*out = new PostgresCopyBooleanFieldWriter();
return NANOARROW_OK;
default:
return EINVAL;
}
return NANOARROW_OK;
}

class PostgresCopyStreamWriter {
public:
~PostgresCopyStreamWriter() { ArrowArrayViewReset(array_view_.get()); }

ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array) {
schema_ = schema;
NANOARROW_RETURN_NOT_OK(
ArrowArrayViewInitFromSchema(array_view_.get(), schema, nullptr));
NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(array_view_.get(), array, nullptr));
root_writer_.Init(array_view_.get());
return NANOARROW_OK;
}

int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }
Contributor Author

Not sure if this is necessary, just copied from the reader design

Member

Probably not? That was to let us build partial results fitting roughly within some bound.


ArrowErrorCode WriteHeader(ArrowBuffer* buffer, ArrowError* error) {
NANOARROW_RETURN_NOT_OK(
ArrowBufferAppend(buffer, kPgCopyBinarySignature, sizeof(kPgCopyBinarySignature)));
Contributor Author

I think we can move towards ArrowBufferAppendUnsafe if we ensure a proper buffer size up front. I think in the current protocol you need 19 bytes for the header, 2 bytes for the number of columns in each row, 4 bytes for each record to indicate the record length, n bytes for every non-null record to contain its actual data, and finally 4 bytes for the end message.

Something to investigate further.

Member

ah, so one pass to figure out the buffer size, then another pass to actually copy data?

Contributor Author

Have to think more about this. I don't think you need 2 passes though? I think this could be calculated up front
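
For fixed-width types the up-front calculation could look roughly like the sketch below. `CopyBinarySizeBytes` and the constants are hypothetical names, not part of the driver. The sizes follow the PostgreSQL documentation for the binary COPY format, which describes the file trailer as a single 16-bit word containing -1, so the sketch counts 2 bytes for it. The payload total is taken as an input; for a fixed-width column it would presumably be the element width times (length minus null count), which needs no extra pass over the data, while variable-width columns would need their offsets consulted.

```cpp
#include <cstdint>

// Sizes as described in the PostgreSQL binary COPY format documentation.
constexpr int64_t kHeaderBytes = 11 + 4 + 4;  // signature + flags + extension length
constexpr int64_t kFieldCountBytes = 2;       // int16 field count per tuple
constexpr int64_t kFieldLengthBytes = 4;      // int32 length per field (-1 for NULL)
constexpr int64_t kTrailerBytes = 2;          // int16 -1 file trailer

// Hypothetical helper: total bytes for a COPY binary payload with n_rows
// tuples of n_cols fields, where payload_bytes is the summed size of all
// non-null field values.
inline int64_t CopyBinarySizeBytes(int64_t n_rows, int64_t n_cols,
                                   int64_t payload_bytes) {
  return kHeaderBytes +
         n_rows * (kFieldCountBytes + n_cols * kFieldLengthBytes) +
         payload_bytes + kTrailerBytes;
}
```

For a single boolean column with three rows, one of them null, the payload is 2 bytes, so `CopyBinarySizeBytes(3, 1, 2)` gives 19 + 3 * 6 + 2 + 2 = 41.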


const uint32_t flag_fields = 0;
NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(buffer, &flag_fields, sizeof(flag_fields)));

const uint32_t extension_bytes = 0;
NANOARROW_RETURN_NOT_OK(
ArrowBufferAppend(buffer, &extension_bytes, sizeof(extension_bytes)));

const int64_t header_bytes =
sizeof(kPgCopyBinarySignature) + sizeof(flag_fields) + sizeof(extension_bytes);
buffer->data += header_bytes;
array_size_approx_bytes_ += header_bytes;

return NANOARROW_OK;
}

ArrowErrorCode WriteRecord(ArrowBuffer* buffer, ArrowError* error) {
const uint8_t* start = buffer->data;
NANOARROW_RETURN_NOT_OK(root_writer_.Write(buffer, records_written_, error));
records_written_++;
array_size_approx_bytes_ += buffer->data - start;
return NANOARROW_OK;
}

ArrowErrorCode InitFieldWriters(ArrowError* error) {
if (schema_->release == nullptr) {
return EINVAL;
}

for (int64_t i = 0; i < schema_->n_children; i++) {
struct ArrowSchemaView schema_view;
// Propagate the nanoarrow error code rather than an ADBC status
NANOARROW_RETURN_NOT_OK(
ArrowSchemaViewInit(&schema_view, schema_->children[i], error));
const ArrowType arrow_type = schema_view.type;
PostgresCopyFieldWriter* child_writer;
NANOARROW_RETURN_NOT_OK(MakeCopyFieldWriter(arrow_type, &child_writer, error));
root_writer_.AppendChild(std::unique_ptr<PostgresCopyFieldWriter>(child_writer));
}

return NANOARROW_OK;
}

private:
PostgresCopyFieldTupleWriter root_writer_;
struct ArrowSchema* schema_;
std::unique_ptr<struct ArrowArrayView> array_view_{new struct ArrowArrayView};
Contributor Author

I'm a bit iffy on C++ constructs, but I think this is the easiest way to declare a pointer that owns data as a class member with C++11 compatibility. Apologies if I'm missing something easier.

Member

I believe Handle<ArrowArrayView> should work. I should really find the time to go write that adbc++ library.

Contributor Author

Cool, that is a great idea. I think we would have to refactor the Handle to move it from statement.cc to postgres_util.h, so I will tackle that in another PR.
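
For reference, a handle of this kind can be sketched as below. This is an illustration under assumptions, not the code in statement.cc: `FakeView` and `FakeViewReset` are hypothetical stand-ins for `ArrowArrayView` and `ArrowArrayViewReset` so the example compiles without nanoarrow, and the `Releaser` trait is one possible way to pick the cleanup function per type.

```cpp
// Hypothetical stand-in for a C struct that needs an explicit reset call.
struct FakeView {
  int* reset_count;  // incremented on reset so the effect is observable
};

inline void FakeViewReset(FakeView* view) {
  if (view->reset_count != nullptr) ++*view->reset_count;
}

// Trait selecting the cleanup function for each wrapped type.
template <typename T>
struct Releaser;

template <>
struct Releaser<FakeView> {
  static void Release(FakeView* value) { FakeViewReset(value); }
};

// Owns a value of type T by value and releases it on destruction, so the
// class member needs neither a heap allocation nor a hand-written destructor.
template <typename T>
struct Handle {
  T value;

  Handle() : value{} {}
  ~Handle() { Releaser<T>::Release(&value); }

  // Non-copyable: two handles must not release the same resource.
  Handle(const Handle&) = delete;
  Handle& operator=(const Handle&) = delete;

  T* operator->() { return &value; }
};
```

A member declared as `Handle<FakeView> view_;` would then be passed to C functions as `&view_.value`, and the owning class could drop its explicit destructor.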

// Zero-initialize the counters; WriteRecord and WriteHeader read them
int64_t records_written_ = 0;
int64_t array_size_approx_bytes_ = 0;
};

} // namespace adbcpq
56 changes: 56 additions & 0 deletions c/driver/postgresql/postgres_copy_reader_test.cc
@@ -15,10 +15,13 @@
// specific language governing permissions and limitations
// under the License.

#include <optional>

#include <gtest/gtest.h>
#include <nanoarrow/nanoarrow.hpp>

#include "postgres_copy_reader.h"
#include "validation/adbc_validation_util.h"

namespace adbcpq {

@@ -52,6 +55,30 @@ class PostgresCopyStreamTester {
PostgresCopyStreamReader reader_;
};

class PostgresCopyStreamWriteTester {
public:
ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array,
ArrowError* error = nullptr) {
NANOARROW_RETURN_NOT_OK(writer_.Init(schema, array));
NANOARROW_RETURN_NOT_OK(writer_.InitFieldWriters(error));
return NANOARROW_OK;
}

ArrowErrorCode WriteAll(struct ArrowBuffer* buffer, ArrowError* error = nullptr) {
NANOARROW_RETURN_NOT_OK(writer_.WriteHeader(buffer, error));

int result;
do {
result = writer_.WriteRecord(buffer, error);
} while (result == NANOARROW_OK);

return result;
}

private:
PostgresCopyStreamWriter writer_;
};

// COPY (SELECT CAST("col" AS BOOLEAN) AS "col" FROM ( VALUES (TRUE), (FALSE), (NULL)) AS
// drvd("col")) TO STDOUT;
static uint8_t kTestPgCopyBoolean[] = {
@@ -96,6 +123,35 @@ TEST(PostgresCopyUtilsTest, PostgresCopyReadBoolean) {
ASSERT_FALSE(ArrowBitGet(data_buffer, 2));
}

TEST(PostgresCopyUtilsTest, PostgresCopyWriteBoolean) {
adbc_validation::Handle<struct ArrowSchema> schema;
adbc_validation::Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_BOOL}}),
ADBC_STATUS_OK);
ASSERT_EQ(adbc_validation::MakeBatch<bool>(&schema.value, &array.value, &na_error,
{true, false, std::nullopt}),
ADBC_STATUS_OK);

PostgresCopyStreamWriteTester tester;
ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);

struct ArrowBuffer buffer;
ArrowBufferInit(&buffer);
ASSERT_EQ(ArrowBufferReserve(&buffer, sizeof(kTestPgCopyBoolean)), NANOARROW_OK);
uint8_t* cursor = buffer.data;
ASSERT_EQ(tester.WriteAll(&buffer, nullptr), ENODATA);

// The last 4 bytes of a message can be transmitted via PQputCopyData
// so no need to test those bytes from the Writer
for (size_t i = 0; i < sizeof(kTestPgCopyBoolean) - 4; i++) {
Contributor Author

Ultimately, when we implement this in statement.cc, I imagine we will build the buffer (maybe even in chunks) and send it via PQputCopyData. When all is said and done, we would then call PQputCopyEnd to send the last 4 bytes. Maybe we should make the end message a constant in the tests so it is clear what is part of the "data" versus the sentinel signaling the end of the buffer.

ASSERT_EQ(cursor[i], kTestPgCopyBoolean[i]);
}

buffer.data = cursor;
ArrowBufferReset(&buffer);
}

// COPY (SELECT CAST("col" AS SMALLINT) AS "col" FROM ( VALUES (-123), (-1), (1), (123),
// (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary);
static uint8_t kTestPgCopySmallInt[] = {