feat(c/driver/postgresql): Use COPY for writes #1093
Conversation
c/driver/postgresql/statement.cc (outdated)

```cpp
PGresult* result = PQprepare(conn, /*stmtName=*/"", query.c_str(),
                             /*nParams=*/bind_schema->n_children, param_types.data());
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
const char* temp = "COPY bulk_ingest FROM STDIN WITH (FORMAT binary);";
```
I placed this in `Prepare` because it represented a minor diff, but I'm not sure the Prepare/Execute pattern is the same concept as a COPY. Maybe COPY should have a dedicated ADBC method?
c/driver/postgresql/statement.cc (outdated)

```diff
@@ -353,10 +360,30 @@ struct BindStream {
   CHECK_NA(INTERNAL, ArrowArrayViewSetArray(&array_view.value, &array.value, nullptr),
            error);

+  struct ArrowBuffer buf;
+  ArrowBufferInit(&buf);
+  ArrowBufferAppend(&buf, kPgCopyBinarySignature, sizeof(kPgCopyBinarySignature));
```
We could probably reserve the number of bytes needed up front
c/driver/postgresql/statement.cc (outdated)

```cpp
if (PQputCopyData(conn, reinterpret_cast<char*>(buf.data), buf.size_bytes) <= 0) {
```
I am not sure if the reinterpret_cast here is appropriate, or if there was a way to construct an ArrowBufferView from the ArrowBuffer and use the union approach
I think the remaining test failures are just a matter of the COPY semantics not aligning well with expectations around the Prepare/Execute method currently in use. Do we need to add an option for …

Sorry, what exactly is the problem? Presumably we'd only activate COPY for bulk ingest?

That helps clarify; I wasn't sure if we wanted to totally replace the prepared statement with COPY, but it sounds like no. I'll have to dig through the code a little more to see how we should be dispatching to COPY. I don't think any driver does that right now.

When I'm looking at …

So …

Ah OK, great. Thanks for all that info; very helpful, and it cleared up some misconceptions I had about how this should work. So for now, set up a dedicated …
This is a precursor to #1093; figured it would be easier to work piecewise rather than all at once. This does not try to actually connect the statement.cc code to use this; it just gets the test case / general structure set up.
```diff
@@ -2112,7 +2112,8 @@ void StatementTest::TestSqlIngestErrors() {
                          {"coltwo", NANOARROW_TYPE_INT64}}),
              IsOkErrno());
   ASSERT_THAT(
-      (MakeBatch<int64_t, int64_t>(&schema.value, &array.value, &na_error, {}, {})),
+      (MakeBatch<int64_t, int64_t>(&schema.value, &array.value, &na_error,
```
Looks like Postgres COPY only raises an error if there is data:

```
postgres=# COPY (SELECT int64s, int64s FROM bulk_ingest WHERE 1=0) TO '/tmp/pgout.data' WITH (FORMAT BINARY);
postgres=# COPY bulk_ingest FROM '/tmp/pgout.data' WITH (FORMAT BINARY);
COPY 0
postgres=# COPY (SELECT int64s, int64s FROM bulk_ingest) TO '/tmp/pgout.data' WITH (FORMAT BINARY);
postgres=# COPY bulk_ingest FROM '/tmp/pgout.data' WITH (FORMAT BINARY);
ERROR:  row field count is 2, expected 1
CONTEXT:  COPY bulk_ingest, line 1
```

This makes sense given that the binary format sends the expected number of columns row by row; in the empty-result case no such communication ever occurs.
OK, after implementing the CopyWriter separately this should work; I need to figure out the CI build failure, which I'm not getting locally, but I think it will be green after that. From basic benchmarking in #1189 I don't know that this is any faster than what we have now, but this is also a pretty inefficient implementation. I assume we can speed things up a lot by pre-allocating the write buffer or reserving space, rather than leaving it to individual BufferAppend calls to do that.
Must have been misreading the benchmarks before; they do show this as a pretty sizable perf boost. Here is main:

(benchmark output not captured in this extract)

versus this PR:

(benchmark output not captured in this extract)
```diff
@@ -24,47 +24,56 @@

 static void BM_PostgresqlExecute(benchmark::State& state) {
   const char* uri = std::getenv("ADBC_POSTGRESQL_TEST_URI");
-  if (!uri) {
+  if (!uri || !strcmp(uri, "")) {
```
These are orthogonal, but I figured it was OK to lump them in here given this is the first PR using these benchmarks. Happy to split them off if you prefer.

I noticed LSAN is not happy if we actually return now, for example if you provide an invalid test URI. Am I missing something extra I should be releasing before these returns?
```
==96786==ERROR: LeakSanitizer: detected memory leaks
Direct leak of 1024 byte(s) in 1 object(s) allocated from:
    #0 0x7f0f1a0defef in __interceptor_malloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:69
    #1 0x7f0f19b28aaa in SetErrorVariadic /home/willayd/clones/arrow-adbc/c/driver/common/utils.c:141
    #2 0x7f0f19b283ac in SetError /home/willayd/clones/arrow-adbc/c/driver/common/utils.c:110
    #3 0x7f0f19a7ffcc in adbcpq::PostgresDatabase::Connect(pg_conn**, AdbcError*) /home/willayd/clones/arrow-adbc/c/driver/postgresql/database.cc:107
    #4 0x7f0f19a803d6 in adbcpq::PostgresDatabase::RebuildTypeResolver(AdbcError*) /home/willayd/clones/arrow-adbc/c/driver/postgresql/database.cc:135
    #5 0x7f0f19a7fb23 in adbcpq::PostgresDatabase::Init(AdbcError*) /home/willayd/clones/arrow-adbc/c/driver/postgresql/database.cc:58
    #6 0x7f0f19ab59b1 in PostgresDatabaseInit /home/willayd/clones/arrow-adbc/c/driver/postgresql/postgresql.cc:82
    #7 0x7f0f19ab6766 in AdbcDatabaseInit /home/willayd/clones/arrow-adbc/c/driver/postgresql/postgresql.cc:197
    #8 0x560878af3271 in BM_PostgresqlExecute /home/willayd/clones/arrow-adbc/c/driver/postgresql/postgresql_benchmark.cc:44
    #9 0x560878b898bf in benchmark::internal::BenchmarkInstance::Run(long, int, benchmark::internal::ThreadTimer*, benchmark::internal::ThreadManager*, benchmark::internal::PerfCountersMeasurement*) const (/home/willayd/clones/arrow-adbc/build/driver/postgresql/postgresql-benchmark+0x75a8bf) (BuildId: 9c2b1c1be992ad8c4106282527e34c09b72aeb27)
    #10 0x560878b673e3 in benchmark::internal::(anonymous namespace)::RunInThread(benchmark::internal::BenchmarkInstance const*, long, int, benchmark::internal::ThreadManager*, benchmark::internal::PerfCountersMeasurement*) (/home/willayd/clones/arrow-adbc/build/driver/postgresql/postgresql-benchmark+0x7383e3) (BuildId: 9c2b1c1be992ad8c4106282527e34c09b72aeb27)
SUMMARY: AddressSanitizer: 1024 byte(s) leaked in 1 allocation(s).
```
That's because the error needs to be released. If we're not going to use the error, we could just not have it in the first place (it should always be OK to pass nullptr to an error argument).
Ah OK. Probably makes sense to just use the error message instead. I've done that simplistically for now; can make it into a macro in a follow-up.
Yeah, Dewey's original suggestion of factoring it out into a single function would also work.
What is performance like before/after?
Oh, you posted above.
That is very impressive (I assume Postgres was running locally?). So ~4.5 seconds to ~5.5 milliseconds? It might be good to figure out how to do this test with multiple iterations (I think you can tell Google Benchmark, inside the benchmark loop, "this is setup, don't measure it").
The current limitation with multiple iterations is that the call to …
Thanks for working through this!
closes #1037