
feat(c/driver/postgresql): Use COPY for writes #1093

Merged
merged 17 commits
Oct 13, 2023
4 changes: 2 additions & 2 deletions c/driver/postgresql/postgres_copy_reader.h
@@ -1406,9 +1406,9 @@ static inline ArrowErrorCode MakeCopyFieldWriter(
return NANOARROW_OK;
}
default:
ArrowErrorSet(error, "COPY Writer not implemented for type %d", schema_view.type);
return EINVAL;
}
return NANOARROW_OK;
}

class PostgresCopyStreamWriter {
@@ -1455,7 +1455,7 @@ class PostgresCopyStreamWriter {
NANOARROW_OK) {
return ADBC_STATUS_INTERNAL;
}
PostgresCopyFieldWriter* child_writer;
PostgresCopyFieldWriter* child_writer = nullptr;
NANOARROW_RETURN_NOT_OK(MakeCopyFieldWriter(schema_view, &child_writer, error));
root_writer_.AppendChild(std::unique_ptr<PostgresCopyFieldWriter>(child_writer));
}
84 changes: 61 additions & 23 deletions c/driver/postgresql/postgresql_benchmark.cc
@@ -22,49 +22,67 @@
#include "adbc.h"
#include "validation/adbc_validation_util.h"


static void BM_PostgresqlExecute(benchmark::State& state) {
const char* uri = std::getenv("ADBC_POSTGRESQL_TEST_URI");
if (!uri) {
if (!uri || !strcmp(uri, "")) {
@WillAyd (Contributor Author) · Oct 13, 2023

These are orthogonal, but I figured it's OK to lump them in here given this is the first PR using these benchmarks. Also happy to split them off if you prefer.

I noticed LSAN is not happy if we actually return now, for example if you provide an invalid test_uri. Am I missing something extra I should be releasing before these returns?

==96786==ERROR: LeakSanitizer: detected memory leaks

Direct leak of 1024 byte(s) in 1 object(s) allocated from:
    #0 0x7f0f1a0defef in __interceptor_malloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:69
    #1 0x7f0f19b28aaa in SetErrorVariadic /home/willayd/clones/arrow-adbc/c/driver/common/utils.c:141
    #2 0x7f0f19b283ac in SetError /home/willayd/clones/arrow-adbc/c/driver/common/utils.c:110
    #3 0x7f0f19a7ffcc in adbcpq::PostgresDatabase::Connect(pg_conn**, AdbcError*) /home/willayd/clones/arrow-adbc/c/driver/postgresql/database.cc:107
    #4 0x7f0f19a803d6 in adbcpq::PostgresDatabase::RebuildTypeResolver(AdbcError*) /home/willayd/clones/arrow-adbc/c/driver/postgresql/database.cc:135
    #5 0x7f0f19a7fb23 in adbcpq::PostgresDatabase::Init(AdbcError*) /home/willayd/clones/arrow-adbc/c/driver/postgresql/database.cc:58
    #6 0x7f0f19ab59b1 in PostgresDatabaseInit /home/willayd/clones/arrow-adbc/c/driver/postgresql/postgresql.cc:82
    #7 0x7f0f19ab6766 in AdbcDatabaseInit /home/willayd/clones/arrow-adbc/c/driver/postgresql/postgresql.cc:197
    #8 0x560878af3271 in BM_PostgresqlExecute /home/willayd/clones/arrow-adbc/c/driver/postgresql/postgresql_benchmark.cc:44
    #9 0x560878b898bf in benchmark::internal::BenchmarkInstance::Run(long, int, benchmark::internal::ThreadTimer*, benchmark::internal::ThreadManager*, benchmark::internal::PerfCountersMeasurement*) const (/home/willayd/clones/arrow-adbc/build/driver/postgresql/postgresql-benchmark+0x75a8bf) (BuildId: 9c2b1c1be992ad8c4106282527e34c09b72aeb27)
    #10 0x560878b673e3 in benchmark::internal::(anonymous namespace)::RunInThread(benchmark::internal::BenchmarkInstance const*, long, int, benchmark::internal::ThreadManager*, benchmark::internal::PerfCountersMeasurement*) (/home/willayd/clones/arrow-adbc/build/driver/postgresql/postgresql-benchmark+0x7383e3) (BuildId: 9c2b1c1be992ad8c4106282527e34c09b72aeb27)

SUMMARY: AddressSanitizer: 1024 byte(s) leaked in 1 allocation(s).

Member

That's because the error needs to be released.

If we're not going to use the error, we could just not have it in the first place (it should always be OK to pass nullptr to an error argument)

@WillAyd (Contributor Author)

Ah OK. Probably makes sense to just use the error message instead. Done that simplistically for now - can make it into a macro in a follow-up

Member

Yeah, Dewey's original suggestion of factoring it out into a single function would also work.
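
A rough sketch of what that factored-out helper could look like, assuming google/benchmark's state.SkipWithError and the standard AdbcError release contract; the name SkipWithAdbcError is hypothetical and not part of this PR:

#include <benchmark/benchmark.h>

#include "adbc.h"

// Hypothetical helper (not in this PR): report the driver's message to the
// benchmark framework, then release the AdbcError so LSAN stays quiet.
static void SkipWithAdbcError(benchmark::State& state, struct AdbcError* error,
                              const char* fallback) {
  if (error->release != nullptr) {
    state.SkipWithError(error->message != nullptr ? error->message : fallback);
    error->release(error);
  } else {
    state.SkipWithError(fallback);
  }
}

// Usage at each early-return site, e.g.:
//   if (AdbcDatabaseInit(&database.value, &error) != ADBC_STATUS_OK) {
//     SkipWithAdbcError(state, &error, "AdbcDatabaseInit failed");
//     return;
//   }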

state.SkipWithError("ADBC_POSTGRESQL_TEST_URI not set!");
return;
}
adbc_validation::Handle<struct AdbcDatabase> database;
struct AdbcError error;

if (AdbcDatabaseNew(&database.value, &error) != ADBC_STATUS_OK) {
state.SkipWithError("AdbcDatabaseNew call failed");
state.SkipWithError(error.message);
error.release(&error);
return;
}

if (AdbcDatabaseSetOption(&database.value, "uri", uri, &error) != ADBC_STATUS_OK) {
state.SkipWithError("Could not set database uri option");
state.SkipWithError(error.message);
error.release(&error);
return;
}

if (AdbcDatabaseInit(&database.value, &error) != ADBC_STATUS_OK) {
state.SkipWithError("AdbcDatabaseInit failed");
state.SkipWithError(error.message);
error.release(&error);
return;
}

adbc_validation::Handle<struct AdbcConnection> connection;
if (AdbcConnectionNew(&connection.value, &error) != ADBC_STATUS_OK) {
state.SkipWithError("Could not create connection object");
state.SkipWithError(error.message);
error.release(&error);
return;
}

if (AdbcConnectionInit(&connection.value, &database.value, &error) != ADBC_STATUS_OK) {
state.SkipWithError("Could not connect to database");
state.SkipWithError(error.message);
error.release(&error);
return;
}

adbc_validation::Handle<struct AdbcStatement> statement;
if (AdbcStatementNew(&connection.value, &statement.value, &error) != ADBC_STATUS_OK) {
state.SkipWithError("Could not create statement object");
state.SkipWithError(error.message);
error.release(&error);
return;
}

const char* drop_query = "DROP TABLE IF EXISTS adbc_postgresql_ingest_benchmark";
if (AdbcStatementSetSqlQuery(&statement.value, drop_query, &error)
!= ADBC_STATUS_OK) {
state.SkipWithError("Could not set DROP TABLE SQL query");
state.SkipWithError(error.message);
error.release(&error);
return;
}

if (AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error)
!= ADBC_STATUS_OK) {
state.SkipWithError("Could not execute DROP TABLE SQL query");
state.SkipWithError(error.message);
error.release(&error);
return;
}

adbc_validation::Handle<struct ArrowSchema> schema;
@@ -79,15 +97,21 @@ static void BM_PostgresqlExecute(benchmark::State& state) {
{"floats", NANOARROW_TYPE_FLOAT},
{"doubles", NANOARROW_TYPE_DOUBLE},
}) != ADBC_STATUS_OK) {
state.SkipWithError("Could not create benchmark schema");
state.SkipWithError(error.message);
error.release(&error);
return;
}

if (ArrowArrayInitFromSchema(&array.value, &schema.value, &na_error) != NANOARROW_OK) {
state.SkipWithError("Could not init array from schema");
state.SkipWithError(error.message);
error.release(&error);
return;
}

if (ArrowArrayStartAppending(&array.value) != NANOARROW_OK) {
state.SkipWithError("Could not start appending to array");
state.SkipWithError(error.message);
error.release(&error);
return;
}

const size_t n_zeros = 1000;
@@ -118,7 +142,9 @@ static void BM_PostgresqlExecute(benchmark::State& state) {
array.value.length = n_zeros + n_ones;

if (ArrowArrayFinishBuildingDefault(&array.value, &na_error) != NANOARROW_OK) {
state.SkipWithError("Could not finish array");
state.SkipWithError(error.message);
error.release(&error);
return;
}

const char* create_query =
@@ -127,50 +153,62 @@

if (AdbcStatementSetSqlQuery(&statement.value, create_query, &error)
!= ADBC_STATUS_OK) {
state.SkipWithError("Could not set CREATE TABLE SQL query");
state.SkipWithError(error.message);
error.release(&error);
return;
}

if (AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error)
!= ADBC_STATUS_OK) {
state.SkipWithError("Could not execute CREATE TABLE SQL query");
state.SkipWithError(error.message);
error.release(&error);
return;
}

adbc_validation::Handle<struct AdbcStatement> insert_stmt;
if (AdbcStatementNew(&connection.value, &insert_stmt.value, &error) != ADBC_STATUS_OK) {
state.SkipWithError("Could not create INSERT statement object");
state.SkipWithError(error.message);
error.release(&error);
return;
}

if (AdbcStatementSetOption(&insert_stmt.value,
ADBC_INGEST_OPTION_TARGET_TABLE,
"adbc_postgresql_ingest_benchmark",
&error) != ADBC_STATUS_OK) {
state.SkipWithError("Could not set bulk_ingest statement option");
state.SkipWithError(error.message);
error.release(&error);
return;
}

if (AdbcStatementSetOption(&insert_stmt.value,
ADBC_INGEST_OPTION_MODE,
ADBC_INGEST_OPTION_MODE_APPEND,
&error) != ADBC_STATUS_OK) {
state.SkipWithError("Could not set bulk_ingest append option");
state.SkipWithError(error.message);
error.release(&error);
return;
}

for (auto _ : state) {
// Bind releases the array, so if this actually loops more than once you will
// get errors and memory leaks
AdbcStatementBind(&insert_stmt.value, &array.value, &schema.value, &error);
AdbcStatementExecuteQuery(&insert_stmt.value, nullptr, nullptr, &error);
}

if (AdbcStatementSetSqlQuery(&statement.value, drop_query, &error)
!= ADBC_STATUS_OK) {
state.SkipWithError("Could not set DROP TABLE SQL query");
state.SkipWithError(error.message);
error.release(&error);
return;
}

if (AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error)
!= ADBC_STATUS_OK) {
state.SkipWithError("Could not execute DROP TABLE SQL query");
state.SkipWithError(error.message);
error.release(&error);
return;
}
}

BENCHMARK(BM_PostgresqlExecute);
BENCHMARK(BM_PostgresqlExecute)->Iterations(1);
BENCHMARK_MAIN();
92 changes: 81 additions & 11 deletions c/driver/postgresql/statement.cc
@@ -523,6 +523,77 @@ struct BindStream {
}
return ADBC_STATUS_OK;
}

AdbcStatusCode ExecuteCopy(PGconn* conn, int64_t* rows_affected,
struct AdbcError* error) {
if (rows_affected) *rows_affected = 0;
PGresult* result = nullptr;

while (true) {
Handle<struct ArrowArray> array;
int res = bind->get_next(&bind.value, &array.value);
if (res != 0) {
SetError(error,
"[libpq] Failed to read next batch from stream of bind parameters: "
"(%d) %s %s",
res, std::strerror(res), bind->get_last_error(&bind.value));
return ADBC_STATUS_IO;
}
if (!array->release) break;

Handle<struct ArrowArrayView> array_view;
CHECK_NA(
INTERNAL,
ArrowArrayViewInitFromSchema(&array_view.value, &bind_schema.value, nullptr),
error);
CHECK_NA(INTERNAL, ArrowArrayViewSetArray(&array_view.value, &array.value, nullptr),
error);

PostgresCopyStreamWriter writer;
CHECK_NA(INTERNAL, writer.Init(&bind_schema.value, &array.value), error);
CHECK_NA(INTERNAL, writer.InitFieldWriters(nullptr), error);

// build writer buffer
CHECK_NA(INTERNAL, writer.WriteHeader(nullptr), error);
int write_result;
do {
write_result = writer.WriteRecord(nullptr);
} while (write_result == NANOARROW_OK);

// ENODATA signals the end of the record stream; anything else is an error
if (write_result != ENODATA) {
SetError(error, "Error occurred writing COPY data: %s", PQerrorMessage(conn));
return ADBC_STATUS_IO;
}

ArrowBuffer buffer = writer.WriteBuffer();
if (PQputCopyData(conn, reinterpret_cast<char*>(buffer.data),
buffer.size_bytes) <= 0) {
SetError(error, "Error writing tuple field data: %s", PQerrorMessage(conn));
return ADBC_STATUS_IO;
}

if (PQputCopyEnd(conn, NULL) <= 0) {
SetError(error, "Error message returned by PQputCopyEnd: %s",
PQerrorMessage(conn));
return ADBC_STATUS_IO;
}

result = PQgetResult(conn);
ExecStatusType pg_status = PQresultStatus(result);
if (pg_status != PGRES_COMMAND_OK) {
AdbcStatusCode code =
SetError(error, result, "[libpq] Failed to execute COPY statement: %s %s",
PQresStatus(pg_status), PQerrorMessage(conn));
PQclear(result);
return code;
}

PQclear(result);
if (rows_affected) *rows_affected += array->length;
}
return ADBC_STATUS_OK;
}
};
} // namespace

@@ -1140,19 +1211,18 @@ AdbcStatusCode PostgresStatement::ExecuteUpdateBulk(int64_t* rows_affected,
error));
RAISE_ADBC(bind_stream.SetParamTypes(*type_resolver_, error));

std::string insert = "INSERT INTO ";
insert += escaped_table;
insert += " VALUES (";
for (size_t i = 0; i < bind_stream.bind_schema_fields.size(); i++) {
if (i > 0) insert += ", ";
insert += "$";
insert += std::to_string(i + 1);
std::string query = "COPY " + escaped_table + " FROM STDIN WITH (FORMAT binary)";
PGresult* result = PQexec(connection_->conn(), query.c_str());
if (PQresultStatus(result) != PGRES_COPY_IN) {
AdbcStatusCode code =
SetError(error, result, "[libpq] COPY query failed: %s\nQuery was:%s",
PQerrorMessage(connection_->conn()), query.c_str());
PQclear(result);
return code;
}
insert += ")";

RAISE_ADBC(
bind_stream.Prepare(connection_->conn(), insert, error, connection_->autocommit()));
RAISE_ADBC(bind_stream.Execute(connection_->conn(), rows_affected, error));
PQclear(result);
RAISE_ADBC(bind_stream.ExecuteCopy(connection_->conn(), rows_affected, error));
return ADBC_STATUS_OK;
}

3 changes: 2 additions & 1 deletion c/validation/adbc_validation.cc
@@ -2112,7 +2112,8 @@ void StatementTest::TestSqlIngestErrors() {
{"coltwo", NANOARROW_TYPE_INT64}}),
IsOkErrno());
ASSERT_THAT(
(MakeBatch<int64_t, int64_t>(&schema.value, &array.value, &na_error, {}, {})),
(MakeBatch<int64_t, int64_t>(&schema.value, &array.value, &na_error,
@WillAyd (Contributor Author)

Looks like postgres COPY only raises an error if there is data

postgres=# COPY (SELECT int64s, int64s FROM bulk_ingest WHERE 1=0) TO '/tmp/pgout.data' WITH (FORMAT BINARY);
postgres=# COPY bulk_ingest FROM '/tmp/pgout.data' WITH (FORMAT BINARY);
COPY 0
postgres=# COPY (SELECT int64s, int64s FROM bulk_ingest) TO '/tmp/pgout.data' WITH (FORMAT BINARY);
postgres=# COPY bulk_ingest FROM '/tmp/pgout.data' WITH (FORMAT BINARY);
ERROR:  row field count is 2, expected 1
CONTEXT:  COPY bulk_ingest, line 1

Given that the format itself sends the expected number of columns row by row, no such communication occurs in the empty-result case (see the framing sketch below).

{-42}, {-42})),
IsOkErrno(&na_error));
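
To make the framing concrete: in binary COPY each tuple begins with a 16-bit field count, followed by a 32-bit byte length (-1 for NULL) and the data for each field, and the stream ends with a 16-bit trailer of -1. A minimal, illustrative sketch (not this PR's writer code) of how a single one-column int64 row would be framed:

#include <cstdint>
#include <vector>

// Append an integer in network (big-endian) byte order.
static void AppendBigEndian(std::vector<uint8_t>& out, uint64_t v, int nbytes) {
  for (int i = nbytes - 1; i >= 0; --i) {
    out.push_back(static_cast<uint8_t>((v >> (8 * i)) & 0xFF));
  }
}

// Frame a single one-column int64 tuple for binary COPY.
static void AppendInt64Tuple(std::vector<uint8_t>& out, int64_t value) {
  AppendBigEndian(out, 1, 2);                             // per-row field count
  AppendBigEndian(out, 8, 4);                             // field length in bytes
  AppendBigEndian(out, static_cast<uint64_t>(value), 8);  // field data
}

// In the empty-table case above no tuples are written at all, so the server
// never receives a per-row field count to validate against the target table.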

ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),