Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(c/driver/postgresql): Implement consuming a PGresult via the copy reader #2029

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
23620b4
rename method
paleolimbot Jul 22, 2024
b3a6213
sketch result helper
paleolimbot Jul 22, 2024
1c06bf9
maybe implement for prepared queries with no params
paleolimbot Jul 22, 2024
112fa8f
works!
paleolimbot Jul 22, 2024
6df519e
include what you use
paleolimbot Jul 22, 2024
74b3c3f
more formatting
paleolimbot Jul 22, 2024
4696e22
tidy cases
paleolimbot Jul 23, 2024
246e980
shuffle responsibility for preparation
paleolimbot Jul 23, 2024
87ca490
maybe use helper for describe prepared
paleolimbot Jul 23, 2024
3f595b5
maybe better separation
paleolimbot Jul 23, 2024
bce32c9
maybe use for executeschema
paleolimbot Jul 23, 2024
2fa92c3
possibly unify parameter binding for text/binary
paleolimbot Jul 23, 2024
161fd59
handle explicit OIDs in prepare
paleolimbot Jul 23, 2024
c9559de
maybe some more ways to execute
paleolimbot Jul 23, 2024
54cd7ac
maybe implement statement
paleolimbot Jul 23, 2024
8181dd3
maybe better to array stream
paleolimbot Jul 23, 2024
2ce59a7
not quite there yet
paleolimbot Jul 23, 2024
5299f44
most things working
paleolimbot Jul 24, 2024
6cb43e5
tests passing
paleolimbot Jul 24, 2024
838863c
lint
paleolimbot Jul 24, 2024
331a7bd
fix schema names
paleolimbot Jul 24, 2024
03c6167
fix location of starappending
paleolimbot Jul 24, 2024
4d1cbdf
work on Python tests
paleolimbot Jul 24, 2024
4d3befb
fix python tests back (they always call prepare)
paleolimbot Jul 24, 2024
f69b57e
format
paleolimbot Jul 25, 2024
b3e0cd3
fix copy option
paleolimbot Jul 25, 2024
06f9453
use the execute helper
paleolimbot Jul 25, 2024
28c4399
add test
paleolimbot Jul 25, 2024
390fb75
maybe fix a test
paleolimbot Jul 25, 2024
773a395
fix executeschema
paleolimbot Jul 26, 2024
c310abb
remove unused functions
paleolimbot Jul 26, 2024
ce0e972
add test for multiple statements
paleolimbot Jul 26, 2024
9c718e6
test copy path
paleolimbot Jul 26, 2024
45607b3
format
paleolimbot Jul 26, 2024
353100f
revert rust change
paleolimbot Jul 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 20 additions & 35 deletions c/driver/postgresql/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,10 @@ class PqGetObjectsHelper {
params.push_back(db_schema_);
}

auto result_helper =
PqResultHelper{conn_, std::string(query.buffer), params, error_};
auto result_helper = PqResultHelper{conn_, std::string(query.buffer)};
StringBuilderReset(&query);

RAISE_ADBC(result_helper.Prepare());
RAISE_ADBC(result_helper.Execute());
RAISE_ADBC(result_helper.Execute(error_, params));

for (PqResultRow row : result_helper) {
const char* schema_name = row[0].data;
Expand Down Expand Up @@ -188,12 +186,10 @@ class PqGetObjectsHelper {
params.push_back(catalog_);
}

PqResultHelper result_helper =
PqResultHelper{conn_, std::string(query.buffer), params, error_};
PqResultHelper result_helper = PqResultHelper{conn_, std::string(query.buffer)};
StringBuilderReset(&query);

RAISE_ADBC(result_helper.Prepare());
RAISE_ADBC(result_helper.Execute());
RAISE_ADBC(result_helper.Execute(error_, params));

for (PqResultRow row : result_helper) {
const char* db_name = row[0].data;
Expand Down Expand Up @@ -280,11 +276,10 @@ class PqGetObjectsHelper {
}
}

auto result_helper = PqResultHelper{conn_, query.buffer, params, error_};
auto result_helper = PqResultHelper{conn_, query.buffer};
StringBuilderReset(&query);

RAISE_ADBC(result_helper.Prepare());
RAISE_ADBC(result_helper.Execute());
RAISE_ADBC(result_helper.Execute(error_, params));
for (PqResultRow row : result_helper) {
const char* table_name = row[0].data;
const char* table_type = row[1].data;
Expand Down Expand Up @@ -341,11 +336,10 @@ class PqGetObjectsHelper {
params.push_back(std::string(column_name_));
}

auto result_helper = PqResultHelper{conn_, query.buffer, params, error_};
auto result_helper = PqResultHelper{conn_, query.buffer};
StringBuilderReset(&query);

RAISE_ADBC(result_helper.Prepare());
RAISE_ADBC(result_helper.Execute());
RAISE_ADBC(result_helper.Execute(error_, params));

for (PqResultRow row : result_helper) {
const char* column_name = row[0].data;
Expand Down Expand Up @@ -493,11 +487,10 @@ class PqGetObjectsHelper {
params.push_back(std::string(column_name_));
}

auto result_helper = PqResultHelper{conn_, query.buffer, params, error_};
auto result_helper = PqResultHelper{conn_, query.buffer};
StringBuilderReset(&query);

RAISE_ADBC(result_helper.Prepare());
RAISE_ADBC(result_helper.Execute());
RAISE_ADBC(result_helper.Execute(error_, params));

for (PqResultRow row : result_helper) {
const char* constraint_name = row[0].data;
Expand Down Expand Up @@ -655,9 +648,8 @@ AdbcStatusCode PostgresConnection::PostgresConnectionGetInfoImpl(
break;
case ADBC_INFO_VENDOR_VERSION: {
const char* stmt = "SHOW server_version_num";
auto result_helper = PqResultHelper{conn_, std::string(stmt), error};
RAISE_ADBC(result_helper.Prepare());
RAISE_ADBC(result_helper.Execute());
auto result_helper = PqResultHelper{conn_, std::string(stmt)};
RAISE_ADBC(result_helper.Execute(error));
auto it = result_helper.begin();
if (it == result_helper.end()) {
SetError(error, "[libpq] PostgreSQL returned no rows for '%s'", stmt);
Expand Down Expand Up @@ -760,9 +752,8 @@ AdbcStatusCode PostgresConnection::GetOption(const char* option, char* value,
if (std::strcmp(option, ADBC_CONNECTION_OPTION_CURRENT_CATALOG) == 0) {
output = PQdb(conn_);
} else if (std::strcmp(option, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA) == 0) {
PqResultHelper result_helper{conn_, "SELECT CURRENT_SCHEMA", {}, error};
RAISE_ADBC(result_helper.Prepare());
RAISE_ADBC(result_helper.Execute());
PqResultHelper result_helper{conn_, "SELECT CURRENT_SCHEMA"};
RAISE_ADBC(result_helper.Execute(error));
auto it = result_helper.begin();
if (it == result_helper.end()) {
SetError(error, "[libpq] PostgreSQL returned no rows for 'SELECT CURRENT_SCHEMA'");
Expand Down Expand Up @@ -931,10 +922,8 @@ AdbcStatusCode PostgresConnectionGetStatisticsImpl(PGconn* conn, const char* db_
std::string prev_table;

{
PqResultHelper result_helper{
conn, query, {db_schema, table_name ? table_name : "%"}, error};
RAISE_ADBC(result_helper.Prepare());
RAISE_ADBC(result_helper.Execute());
PqResultHelper result_helper{conn, query};
RAISE_ADBC(result_helper.Execute(error, {db_schema, table_name ? table_name : "%"}));

for (PqResultRow row : result_helper) {
auto reltuples = row[5].ParseDouble();
Expand Down Expand Up @@ -1166,11 +1155,9 @@ AdbcStatusCode PostgresConnection::GetTableSchema(const char* catalog,

std::vector<std::string> params = {table_name_str};

PqResultHelper result_helper =
PqResultHelper{conn_, std::string(query.c_str()), params, error};
PqResultHelper result_helper = PqResultHelper{conn_, std::string(query.c_str())};

RAISE_ADBC(result_helper.Prepare());
auto result = result_helper.Execute();
auto result = result_helper.Execute(error, params);
if (result != ADBC_STATUS_OK) {
auto error_code = std::string(error->sqlstate, 5);
if ((error_code == "42P01") || (error_code == "42602")) {
Expand Down Expand Up @@ -1337,10 +1324,8 @@ AdbcStatusCode PostgresConnection::SetOption(const char* key, const char* value,
return ADBC_STATUS_OK;
} else if (std::strcmp(key, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA) == 0) {
// PostgreSQL doesn't accept a parameter here
PqResultHelper result_helper{
conn_, std::string("SET search_path TO ") + value, {}, error};
RAISE_ADBC(result_helper.Prepare());
RAISE_ADBC(result_helper.Execute());
PqResultHelper result_helper{conn_, std::string("SET search_path TO ") + value};
RAISE_ADBC(result_helper.Execute(error));
return ADBC_STATUS_OK;
}
SetError(error, "%s%s", "[libpq] Unknown option ", key);
Expand Down
130 changes: 119 additions & 11 deletions c/driver/postgresql/postgresql_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1247,7 +1247,7 @@ TEST_F(PostgresStatementTest, UpdateInExecuteQuery) {
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
ASSERT_EQ(reader.rows_affected, -1);
ASSERT_EQ(reader.rows_affected, 2);
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_EQ(reader.array->release, nullptr);
Expand Down Expand Up @@ -1276,6 +1276,32 @@ TEST_F(PostgresStatementTest, UpdateInExecuteQuery) {
}
}

TEST_F(PostgresStatementTest, ExecuteSchemaParameterizedQuery) {
nanoarrow::UniqueSchema schema_bind;
ArrowSchemaInit(schema_bind.get());
ASSERT_THAT(ArrowSchemaSetTypeStruct(schema_bind.get(), 1),
adbc_validation::IsOkErrno());
ASSERT_THAT(ArrowSchemaSetType(schema_bind->children[0], NANOARROW_TYPE_STRING),
adbc_validation::IsOkErrno());

nanoarrow::UniqueArrayStream bind;
nanoarrow::EmptyArrayStream(schema_bind.get()).ToArrayStream(bind.get());

ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT $1", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBindStream(&statement, bind.get(), &error), IsOkStatus());

nanoarrow::UniqueSchema schema;
ASSERT_THAT(AdbcStatementExecuteSchema(&statement, schema.get(), &error),
IsOkStatus(&error));

ASSERT_EQ(1, schema->n_children);
ASSERT_STREQ("u", schema->children[0]->format);

ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}

TEST_F(PostgresStatementTest, BatchSizeHint) {
ASSERT_THAT(quirks()->EnsureSampleTable(&connection, "batch_size_hint_test", &error),
IsOkStatus(&error));
Expand Down Expand Up @@ -1345,16 +1371,13 @@ TEST_F(PostgresStatementTest, AdbcErrorBackwardsCompatibility) {
TEST_F(PostgresStatementTest, Cancel) {
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));

for (const char* query : {
"DROP TABLE IF EXISTS test_cancel",
"CREATE TABLE test_cancel (ints INT)",
R"(INSERT INTO test_cancel (ints)
SELECT g :: INT FROM GENERATE_SERIES(1, 65536) temp(g))",
}) {
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));
}
const char* query = R"(DROP TABLE IF EXISTS test_cancel;
CREATE TABLE test_cancel (ints INT);
INSERT INTO test_cancel (ints)
SELECT g :: INT FROM GENERATE_SERIES(1, 65536) temp(g);)";
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));

ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT * FROM test_cancel", &error),
IsOkStatus(&error));
Expand All @@ -1381,6 +1404,91 @@ TEST_F(PostgresStatementTest, Cancel) {
ASSERT_NE(0, AdbcErrorGetDetailCount(detail));
}

TEST_F(PostgresStatementTest, MultipleStatementsSingleQuery) {
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));

const char* query = R"(DROP TABLE IF EXISTS test_query_statements;
CREATE TABLE test_query_statements (ints INT);
INSERT INTO test_query_statements VALUES((1));
INSERT INTO test_query_statements VALUES((2));
INSERT INTO test_query_statements VALUES((3));)";
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));

ASSERT_THAT(
AdbcStatementSetSqlQuery(&statement, "SELECT * FROM test_query_statements", &error),
IsOkStatus(&error));

adbc_validation::StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
reader.GetSchema();
ASSERT_THAT(reader.MaybeNext(), adbc_validation::IsOkErrno());
ASSERT_EQ(reader.array->length, 3);
}

TEST_F(PostgresStatementTest, SetUseCopyFalse) {
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));

const char* query = R"(DROP TABLE IF EXISTS test_query_set_copy_false;
CREATE TABLE test_query_set_copy_false (ints INT);
INSERT INTO test_query_set_copy_false VALUES((1));
INSERT INTO test_query_set_copy_false VALUES((NULL));
INSERT INTO test_query_set_copy_false VALUES((3));)";
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));

// Check option setting/getting
ASSERT_EQ(
adbc_validation::StatementGetOption(&statement, "adbc.postgresql.use_copy", &error),
"true");

ASSERT_THAT(AdbcStatementSetOption(&statement, "adbc.postgresql.use_copy",
"not true or false", &error),
IsStatus(ADBC_STATUS_INVALID_ARGUMENT));

ASSERT_THAT(AdbcStatementSetOption(&statement, "adbc.postgresql.use_copy",
ADBC_OPTION_VALUE_ENABLED, &error),
IsOkStatus(&error));
ASSERT_EQ(
adbc_validation::StatementGetOption(&statement, "adbc.postgresql.use_copy", &error),
"true");

ASSERT_THAT(AdbcStatementSetOption(&statement, "adbc.postgresql.use_copy",
ADBC_OPTION_VALUE_DISABLED, &error),
IsOkStatus(&error));
ASSERT_EQ(
adbc_validation::StatementGetOption(&statement, "adbc.postgresql.use_copy", &error),
"false");

ASSERT_THAT(AdbcStatementSetSqlQuery(&statement,
"SELECT * FROM test_query_set_copy_false", &error),
IsOkStatus(&error));

adbc_validation::StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));

ASSERT_EQ(reader.rows_affected, 3);

reader.GetSchema();
ASSERT_EQ(reader.schema->n_children, 1);
ASSERT_STREQ(reader.schema->children[0]->format, "i");
ASSERT_STREQ(reader.schema->children[0]->name, "ints");

ASSERT_THAT(reader.MaybeNext(), adbc_validation::IsOkErrno());
ASSERT_EQ(reader.array->length, 3);
ASSERT_EQ(reader.array->n_children, 1);
ASSERT_EQ(reader.array->children[0]->null_count, 1);

ASSERT_THAT(reader.MaybeNext(), adbc_validation::IsOkErrno());
ASSERT_EQ(reader.array->release, nullptr);
}

struct TypeTestCase {
std::string name;
std::string sql_type;
Expand Down
Loading
Loading