Skip to content

Commit

Permalink
feat(c/driver/postgresql): implement ADBC 1.1.0 features
Browse files Browse the repository at this point in the history
- ADBC_INFO_DRIVER_ADBC_VERSION
- StatementExecuteSchema (apache#318)
- ADBC_CONNECTION_OPTION_CURRENT_{CATALOG, DB_SCHEMA) (apache#319)
  • Loading branch information
lidavidm committed Jul 11, 2023
1 parent 60e7f31 commit a17176c
Show file tree
Hide file tree
Showing 13 changed files with 477 additions and 57 deletions.
13 changes: 13 additions & 0 deletions c/driver/common/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,19 @@ AdbcStatusCode AdbcConnectionGetInfoAppendString(struct ArrowArray* array,
return ADBC_STATUS_OK;
}

AdbcStatusCode AdbcConnectionGetInfoAppendInt(struct ArrowArray* array,
uint32_t info_code, int64_t info_value,
struct AdbcError* error) {
CHECK_NA(INTERNAL, ArrowArrayAppendUInt(array->children[0], info_code), error);
// Append to type variant
CHECK_NA(INTERNAL, ArrowArrayAppendInt(array->children[1]->children[2], info_value),
error);
// Append type code/offset
CHECK_NA(INTERNAL, ArrowArrayFinishUnionElement(array->children[1], /*type_id=*/2),
error);
return ADBC_STATUS_OK;
}

AdbcStatusCode AdbcInitConnectionObjectsSchema(struct ArrowSchema* schema,
struct AdbcError* error) {
ArrowSchemaInit(schema);
Expand Down
3 changes: 3 additions & 0 deletions c/driver/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ AdbcStatusCode AdbcConnectionGetInfoAppendString(struct ArrowArray* array,
uint32_t info_code,
const char* info_value,
struct AdbcError* error);
AdbcStatusCode AdbcConnectionGetInfoAppendInt(struct ArrowArray* array,
uint32_t info_code, int64_t info_value,
struct AdbcError* error);

AdbcStatusCode AdbcInitConnectionObjectsSchema(struct ArrowSchema* schema,
struct AdbcError* error);
Expand Down
58 changes: 56 additions & 2 deletions c/driver/postgresql/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@
namespace {

static const uint32_t kSupportedInfoCodes[] = {
ADBC_INFO_VENDOR_NAME, ADBC_INFO_VENDOR_VERSION, ADBC_INFO_DRIVER_NAME,
ADBC_INFO_DRIVER_VERSION, ADBC_INFO_DRIVER_ARROW_VERSION,
ADBC_INFO_VENDOR_NAME, ADBC_INFO_VENDOR_VERSION,
ADBC_INFO_DRIVER_NAME, ADBC_INFO_DRIVER_VERSION,
ADBC_INFO_DRIVER_ARROW_VERSION, ADBC_INFO_DRIVER_ADBC_VERSION,
};

static const std::unordered_map<std::string, std::string> kPgTableTypes = {
Expand Down Expand Up @@ -771,6 +772,10 @@ AdbcStatusCode PostgresConnectionGetInfoImpl(const uint32_t* info_codes,
RAISE_ADBC(AdbcConnectionGetInfoAppendString(array, info_codes[i],
NANOARROW_VERSION, error));
break;
case ADBC_INFO_DRIVER_ADBC_VERSION:
RAISE_ADBC(AdbcConnectionGetInfoAppendInt(array, info_codes[i],
ADBC_VERSION_1_1_0, error));
break;
default:
// Ignore
continue;
Expand Down Expand Up @@ -831,6 +836,55 @@ AdbcStatusCode PostgresConnection::GetObjects(
return BatchToArrayStream(&array, &schema, out, error);
}

AdbcStatusCode PostgresConnection::GetOption(const char* option, char* value,
size_t* length, struct AdbcError* error) {
std::string output;
if (std::strcmp(option, ADBC_CONNECTION_OPTION_CURRENT_CATALOG) == 0) {
PqResultHelper result_helper{conn_, "SELECT CURRENT_CATALOG", {}, error};
RAISE_ADBC(result_helper.Prepare());
RAISE_ADBC(result_helper.Execute());
auto it = result_helper.begin();
if (it == result_helper.end()) {
SetError(error, "[libpq] PostgreSQL returned no rows for 'SELECT CURRENT_CATALOG'");
return ADBC_STATUS_INTERNAL;
}
output = (*it)[0].data;
} else if (std::strcmp(option, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA) == 0) {
PqResultHelper result_helper{conn_, "SELECT CURRENT_SCHEMA", {}, error};
RAISE_ADBC(result_helper.Prepare());
RAISE_ADBC(result_helper.Execute());
auto it = result_helper.begin();
if (it == result_helper.end()) {
SetError(error, "[libpq] PostgreSQL returned no rows for 'SELECT CURRENT_SCHEMA'");
return ADBC_STATUS_INTERNAL;
}
output = (*it)[0].data;
} else if (std::strcmp(option, ADBC_CONNECTION_OPTION_AUTOCOMMIT) == 0) {
output = autocommit_ ? ADBC_OPTION_VALUE_ENABLED : ADBC_OPTION_VALUE_DISABLED;
} else {
return ADBC_STATUS_NOT_FOUND;
}

if (output.size() + 1 <= *length) {
std::memcpy(value, output.c_str(), output.size() + 1);
}
*length = output.size() + 1;
return ADBC_STATUS_OK;
}
AdbcStatusCode PostgresConnection::GetOptionBytes(const char* option, uint8_t* value,
size_t* length,
struct AdbcError* error) {
return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode PostgresConnection::GetOptionInt(const char* option, int64_t* value,
struct AdbcError* error) {
return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode PostgresConnection::GetOptionDouble(const char* option, double* value,
struct AdbcError* error) {
return ADBC_STATUS_NOT_FOUND;
}

AdbcStatusCode PostgresConnection::GetTableSchema(const char* catalog,
const char* db_schema,
const char* table_name,
Expand Down
12 changes: 12 additions & 0 deletions c/driver/postgresql/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ class PostgresConnection {
const char* table_name, const char** table_types,
const char* column_name, struct ArrowArrayStream* out,
struct AdbcError* error);
AdbcStatusCode GetOption(const char* option, char* value, size_t* length,
struct AdbcError* error);
AdbcStatusCode GetOptionBytes(const char* option, uint8_t* value, size_t* length,
struct AdbcError* error);
AdbcStatusCode GetOptionInt(const char* option, int64_t* value,
struct AdbcError* error);
AdbcStatusCode GetOptionDouble(const char* option, double* value,
struct AdbcError* error);
AdbcStatusCode GetTableSchema(const char* catalog, const char* db_schema,
const char* table_name, struct ArrowSchema* schema,
struct AdbcError* error);
Expand All @@ -49,6 +57,10 @@ class PostgresConnection {
AdbcStatusCode Release(struct AdbcError* error);
AdbcStatusCode Rollback(struct AdbcError* error);
AdbcStatusCode SetOption(const char* key, const char* value, struct AdbcError* error);
AdbcStatusCode SetOptionBytes(const char* key, const uint8_t* value, size_t length,
struct AdbcError* error);
AdbcStatusCode SetOptionInt(const char* key, int64_t value, struct AdbcError* error);
AdbcStatusCode SetOptionDouble(const char* key, double value, struct AdbcError* error);

PGconn* conn() const { return conn_; }
const std::shared_ptr<PostgresTypeResolver>& type_resolver() const {
Expand Down
12 changes: 12 additions & 0 deletions c/driver/postgresql/database.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,19 @@ class PostgresDatabase {

AdbcStatusCode Init(struct AdbcError* error);
AdbcStatusCode Release(struct AdbcError* error);
AdbcStatusCode GetOption(const char* option, char* value, size_t* length,
struct AdbcError* error);
AdbcStatusCode GetOptionBytes(const char* option, uint8_t* value, size_t* length,
struct AdbcError* error);
AdbcStatusCode GetOptionInt(const char* option, int64_t* value,
struct AdbcError* error);
AdbcStatusCode GetOptionDouble(const char* option, double* value,
struct AdbcError* error);
AdbcStatusCode SetOption(const char* key, const char* value, struct AdbcError* error);
AdbcStatusCode SetOptionBytes(const char* key, const uint8_t* value, size_t length,
struct AdbcError* error);
AdbcStatusCode SetOptionInt(const char* key, int64_t value, struct AdbcError* error);
AdbcStatusCode SetOptionDouble(const char* key, double value, struct AdbcError* error);

// Internal implementation

Expand Down
8 changes: 6 additions & 2 deletions c/driver/postgresql/postgres_copy_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -673,12 +673,13 @@ static inline ArrowErrorCode MakeCopyFieldReader(const PostgresType& pg_type,

class PostgresCopyStreamReader {
public:
ArrowErrorCode Init(const PostgresType& pg_type) {
ArrowErrorCode Init(PostgresType pg_type) {
if (pg_type.type_id() != PostgresTypeId::kRecord) {
return EINVAL;
}

root_reader_.Init(pg_type);
pg_type_ = std::move(pg_type);
root_reader_.Init(pg_type_);
array_size_approx_bytes_ = 0;
return NANOARROW_OK;
}
Expand Down Expand Up @@ -802,7 +803,10 @@ class PostgresCopyStreamReader {
return NANOARROW_OK;
}

const PostgresType& pg_type() const { return pg_type_; }

private:
PostgresType pg_type_;
PostgresCopyFieldTupleReader root_reader_;
nanoarrow::UniqueSchema schema_;
nanoarrow::UniqueArray array_;
Expand Down
135 changes: 134 additions & 1 deletion c/driver/postgresql/postgresql.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,42 @@ AdbcStatusCode PostgresConnectionGetObjects(
table_types, column_name, stream, error);
}

AdbcStatusCode PostgresConnectionGetOption(struct AdbcConnection* connection,
const char* key, char* value, size_t* length,
struct AdbcError* error) {
if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
auto ptr =
reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
return (*ptr)->GetOption(key, value, length, error);
}

AdbcStatusCode PostgresConnectionGetOptionBytes(struct AdbcConnection* connection,
const char* key, uint8_t* value,
size_t* length, struct AdbcError* error) {
if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
auto ptr =
reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
return (*ptr)->GetOptionBytes(key, value, length, error);
}

AdbcStatusCode PostgresConnectionGetOptionInt(struct AdbcConnection* connection,
const char* key, int64_t* value,
struct AdbcError* error) {
if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
auto ptr =
reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
return (*ptr)->GetOptionInt(key, value, error);
}

AdbcStatusCode PostgresConnectionGetOptionDouble(struct AdbcConnection* connection,
const char* key, double* value,
struct AdbcError* error) {
if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
auto ptr =
reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
return (*ptr)->GetOptionDouble(key, value, error);
}

AdbcStatusCode PostgresConnectionGetTableSchema(
struct AdbcConnection* connection, const char* catalog, const char* db_schema,
const char* table_name, struct ArrowSchema* schema, struct AdbcError* error) {
Expand Down Expand Up @@ -213,6 +249,33 @@ AdbcStatusCode PostgresConnectionSetOption(struct AdbcConnection* connection,
return (*ptr)->SetOption(key, value, error);
}

AdbcStatusCode PostgresConnectionSetOptionBytes(struct AdbcConnection* connection,
const char* key, const uint8_t* value,
size_t length, struct AdbcError* error) {
if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
auto ptr =
reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
return (*ptr)->SetOptionBytes(key, value, length, error);
}

AdbcStatusCode PostgresConnectionSetOptionInt(struct AdbcConnection* connection,
const char* key, int64_t value,
struct AdbcError* error) {
if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
auto ptr =
reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
return (*ptr)->SetOptionInt(key, value, error);
}

AdbcStatusCode PostgresConnectionSetOptionDouble(struct AdbcConnection* connection,
const char* key, double value,
struct AdbcError* error) {
if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
auto ptr =
reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
return (*ptr)->SetOptionDouble(key, value, error);
}

} // namespace
AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection,
struct AdbcError* error) {
Expand All @@ -237,6 +300,30 @@ AdbcStatusCode AdbcConnectionGetObjects(struct AdbcConnection* connection, int d
table_types, column_name, stream, error);
}

AdbcStatusCode AdbcConnectionGetOption(struct AdbcConnection* connection, const char* key,
char* value, size_t* length,
struct AdbcError* error) {
return PostgresConnectionGetOption(connection, key, value, length, error);
}

AdbcStatusCode AdbcConnectionGetOptionBytes(struct AdbcConnection* connection,
const char* key, uint8_t* value,
size_t* length, struct AdbcError* error) {
return PostgresConnectionGetOptionBytes(connection, key, value, length, error);
}

AdbcStatusCode AdbcConnectionGetOptionInt(struct AdbcConnection* connection,
const char* key, int64_t* value,
struct AdbcError* error) {
return PostgresConnectionGetOptionInt(connection, key, value, error);
}

AdbcStatusCode AdbcConnectionGetOptionDouble(struct AdbcConnection* connection,
const char* key, double* value,
struct AdbcError* error) {
return PostgresConnectionGetOptionDouble(connection, key, value, error);
}

AdbcStatusCode AdbcConnectionGetTableSchema(struct AdbcConnection* connection,
const char* catalog, const char* db_schema,
const char* table_name,
Expand Down Expand Up @@ -287,6 +374,24 @@ AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const
return PostgresConnectionSetOption(connection, key, value, error);
}

AdbcStatusCode AdbcConnectionSetOptionBytes(struct AdbcConnection* connection,
const char* key, const uint8_t* value,
size_t length, struct AdbcError* error) {
return PostgresConnectionSetOptionBytes(connection, key, value, length, error);
}

AdbcStatusCode AdbcConnectionSetOptionInt(struct AdbcConnection* connection,
const char* key, int64_t value, size_t length,
struct AdbcError* error) {
return PostgresConnectionSetOptionInt(connection, key, value, error);
}

AdbcStatusCode AdbcConnectionSetOptionDouble(struct AdbcConnection* connection,
const char* key, double value, size_t length,
struct AdbcError* error) {
return PostgresConnectionSetOptionDouble(connection, key, value, error);
}

// ---------------------------------------------------------------------
// AdbcStatement

Expand Down Expand Up @@ -329,6 +434,15 @@ AdbcStatusCode PostgresStatementExecuteQuery(struct AdbcStatement* statement,
return (*ptr)->ExecuteQuery(output, rows_affected, error);
}

AdbcStatusCode PostgresStatementExecuteSchema(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcError* error) {
if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
auto* ptr =
reinterpret_cast<std::shared_ptr<PostgresStatement>*>(statement->private_data);
return (*ptr)->ExecuteSchema(schema, error);
}

AdbcStatusCode PostgresStatementGetPartitionDesc(struct AdbcStatement* statement,
uint8_t* partition_desc,
struct AdbcError* error) {
Expand Down Expand Up @@ -423,6 +537,11 @@ AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement,
return PostgresStatementExecuteQuery(statement, output, rows_affected, error);
}

AdbcStatusCode AdbcStatementExecuteSchema(struct AdbcStatement* statement,
ArrowSchema* schema, struct AdbcError* error) {
return PostgresStatementExecuteSchema(statement, schema, error);
}

AdbcStatusCode AdbcStatementGetPartitionDesc(struct AdbcStatement* statement,
uint8_t* partition_desc,
struct AdbcError* error) {
Expand Down Expand Up @@ -474,7 +593,20 @@ AdbcStatusCode AdbcDriverInit(int version, void* raw_driver, struct AdbcError* e
if (!raw_driver) return ADBC_STATUS_INVALID_ARGUMENT;

auto* driver = reinterpret_cast<struct AdbcDriver*>(raw_driver);
std::memset(driver, 0, ADBC_DRIVER_1_0_0_SIZE);
if (version >= ADBC_VERSION_1_1_0) {
std::memset(driver, 0, ADBC_DRIVER_1_1_0_SIZE);
driver->StatementExecuteSchema = PostgresStatementExecuteSchema;
driver->ConnectionGetOption = PostgresConnectionGetOption;
driver->ConnectionGetOptionBytes = PostgresConnectionGetOptionBytes;
driver->ConnectionGetOptionInt = PostgresConnectionGetOptionInt;
driver->ConnectionGetOptionDouble = PostgresConnectionGetOptionDouble;
driver->ConnectionSetOptionBytes = PostgresConnectionSetOptionBytes;
driver->ConnectionSetOptionInt = PostgresConnectionSetOptionInt;
driver->ConnectionSetOptionDouble = PostgresConnectionSetOptionDouble;
} else {
std::memset(driver, 0, ADBC_DRIVER_1_0_0_SIZE);
}

driver->DatabaseInit = PostgresDatabaseInit;
driver->DatabaseNew = PostgresDatabaseNew;
driver->DatabaseRelease = PostgresDatabaseRelease;
Expand Down Expand Up @@ -502,6 +634,7 @@ AdbcStatusCode AdbcDriverInit(int version, void* raw_driver, struct AdbcError* e
driver->StatementRelease = PostgresStatementRelease;
driver->StatementSetOption = PostgresStatementSetOption;
driver->StatementSetSqlQuery = PostgresStatementSetSqlQuery;

return ADBC_STATUS_OK;
}
}
Loading

0 comments on commit a17176c

Please sign in to comment.