Skip to content

Commit

Permalink
feat(c/driver/postgresql): implement statistics API
Browse files Browse the repository at this point in the history
  • Loading branch information
lidavidm committed Aug 3, 2023
1 parent 6f02ab1 commit 5fad88c
Show file tree
Hide file tree
Showing 24 changed files with 741 additions and 18 deletions.
4 changes: 2 additions & 2 deletions adbc.h
Original file line number Diff line number Diff line change
Expand Up @@ -1742,14 +1742,14 @@ AdbcStatusCode AdbcConnectionGetOptionDouble(struct AdbcConnection* connection,
/// | Field Name | Field Type |
/// |--------------------------|----------------------------------|
/// | catalog_name | utf8 |
/// | catalog_db_schemas | list<DB_SCHEMA_SCHEMA> |
/// | catalog_db_schemas | list<DB_SCHEMA_SCHEMA> not null |
///
/// DB_SCHEMA_SCHEMA is a Struct with fields:
///
/// | Field Name | Field Type |
/// |--------------------------|----------------------------------|
/// | db_schema_name | utf8 |
/// | db_schema_statistics | list<STATISTICS_SCHEMA> |
/// | db_schema_statistics | list<STATISTICS_SCHEMA> not null |
///
/// STATISTICS_SCHEMA is a Struct with fields:
///
Expand Down
363 changes: 363 additions & 0 deletions c/driver/postgresql/connection.cc

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions c/driver/postgresql/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ class PostgresConnection {
struct AdbcError* error);
AdbcStatusCode GetOptionInt(const char* option, int64_t* value,
struct AdbcError* error);
AdbcStatusCode GetStatistics(const char* catalog, const char* db_schema,
const char* table_name, bool approximate,
struct ArrowArrayStream* out, struct AdbcError* error);
AdbcStatusCode GetStatisticNames(struct ArrowArrayStream* out, struct AdbcError* error);
AdbcStatusCode GetTableSchema(const char* catalog, const char* db_schema,
const char* table_name, struct ArrowSchema* schema,
struct AdbcError* error);
Expand Down
38 changes: 38 additions & 0 deletions c/driver/postgresql/postgresql.cc
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,27 @@ AdbcStatusCode PostgresConnectionGetOptionInt(struct AdbcConnection* connection,
return (*ptr)->GetOptionInt(key, value, error);
}

AdbcStatusCode PostgresConnectionGetStatistics(struct AdbcConnection* connection,
const char* catalog, const char* db_schema,
const char* table_name, char approximate,
struct ArrowArrayStream* out,
struct AdbcError* error) {
if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
auto ptr =
reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
return (*ptr)->GetStatistics(catalog, db_schema, table_name, approximate == 1, out,
error);
}

AdbcStatusCode PostgresConnectionGetStatisticNames(struct AdbcConnection* connection,
struct ArrowArrayStream* out,
struct AdbcError* error) {
if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
auto ptr =
reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
return (*ptr)->GetStatisticNames(out, error);
}

AdbcStatusCode PostgresConnectionGetTableSchema(
struct AdbcConnection* connection, const char* catalog, const char* db_schema,
const char* table_name, struct ArrowSchema* schema, struct AdbcError* error) {
Expand Down Expand Up @@ -443,6 +464,21 @@ AdbcStatusCode AdbcConnectionGetOptionDouble(struct AdbcConnection* connection,
return PostgresConnectionGetOptionDouble(connection, key, value, error);
}

AdbcStatusCode AdbcConnectionGetStatistics(struct AdbcConnection* connection,
const char* catalog, const char* db_schema,
const char* table_name, char approximate,
struct ArrowArrayStream* out,
struct AdbcError* error) {
return PostgresConnectionGetStatistics(connection, catalog, db_schema, table_name,
approximate, out, error);
}

AdbcStatusCode AdbcConnectionGetStatisticNames(struct AdbcConnection* connection,
struct ArrowArrayStream* out,
struct AdbcError* error) {
return PostgresConnectionGetStatisticNames(connection, out, error);
}

AdbcStatusCode AdbcConnectionGetTableSchema(struct AdbcConnection* connection,
const char* catalog, const char* db_schema,
const char* table_name,
Expand Down Expand Up @@ -826,6 +862,8 @@ AdbcStatusCode PostgresqlDriverInit(int version, void* raw_driver,
driver->ConnectionGetOptionBytes = PostgresConnectionGetOptionBytes;
driver->ConnectionGetOptionDouble = PostgresConnectionGetOptionDouble;
driver->ConnectionGetOptionInt = PostgresConnectionGetOptionInt;
driver->ConnectionGetStatistics = PostgresConnectionGetStatistics;
driver->ConnectionGetStatisticNames = PostgresConnectionGetStatisticNames;
driver->ConnectionSetOptionBytes = PostgresConnectionSetOptionBytes;
driver->ConnectionSetOptionDouble = PostgresConnectionSetOptionDouble;
driver->ConnectionSetOptionInt = PostgresConnectionSetOptionInt;
Expand Down
165 changes: 165 additions & 0 deletions c/driver/postgresql/postgresql_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include <cmath>
#include <cstdlib>
#include <cstring>
#include <limits>
Expand Down Expand Up @@ -127,6 +128,7 @@ class PostgresQuirks : public adbc_validation::DriverQuirks {
}
bool supports_metadata_current_catalog() const override { return true; }
bool supports_metadata_current_db_schema() const override { return true; }
bool supports_statistics() const override { return true; }
};

class PostgresDatabaseTest : public ::testing::Test,
Expand Down Expand Up @@ -637,6 +639,169 @@ TEST_F(PostgresConnectionTest, MetadataSetCurrentDbSchema) {
ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkStatus(&error));
}

TEST_F(PostgresConnectionTest, MetadataGetStatistics) {
if (!quirks()->supports_statistics()) {
GTEST_SKIP();
}

ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error));

// Create sample table
{
adbc_validation::Handle<struct AdbcStatement> statement;
ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
IsOkStatus(&error));

ASSERT_THAT(AdbcStatementSetSqlQuery(&statement.value,
"DROP TABLE IF EXISTS statstable", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));

ASSERT_THAT(
AdbcStatementSetSqlQuery(&statement.value,
"CREATE TABLE statstable (ints INT, strs TEXT)", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));

ASSERT_THAT(
AdbcStatementSetSqlQuery(
&statement.value,
"INSERT INTO statstable VALUES (1, 'a'), (NULL, 'bcd'), (-5, NULL)", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));

ASSERT_THAT(AdbcStatementSetSqlQuery(&statement.value, "ANALYZE statstable", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));

ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkStatus(&error));
}

adbc_validation::StreamReader reader;
ASSERT_THAT(
AdbcConnectionGetStatistics(&connection, nullptr, quirks()->db_schema().c_str(),
"statstable", 1, &reader.stream.value, &error),
IsOkStatus(&error));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());

ASSERT_NO_FATAL_FAILURE(adbc_validation::CompareSchema(
&reader.schema.value, {
{"catalog_name", NANOARROW_TYPE_STRING, true},
{"catalog_db_schemas", NANOARROW_TYPE_LIST, false},
}));

ASSERT_NO_FATAL_FAILURE(adbc_validation::CompareSchema(
reader.schema->children[1]->children[0],
{
{"db_schema_name", NANOARROW_TYPE_STRING, true},
{"db_schema_statistics", NANOARROW_TYPE_LIST, false},
}));

ASSERT_NO_FATAL_FAILURE(adbc_validation::CompareSchema(
reader.schema->children[1]->children[0]->children[1]->children[0],
{
{"table_name", NANOARROW_TYPE_STRING, false},
{"column_name", NANOARROW_TYPE_STRING, true},
{"statistic_key", NANOARROW_TYPE_INT16, false},
{"statistic_value", NANOARROW_TYPE_DENSE_UNION, false},
{"statistic_is_approximate", NANOARROW_TYPE_BOOL, false},
}));

ASSERT_NO_FATAL_FAILURE(adbc_validation::CompareSchema(
reader.schema->children[1]->children[0]->children[1]->children[0]->children[3],
{
{"int64", NANOARROW_TYPE_INT64, true},
{"uint64", NANOARROW_TYPE_UINT64, true},
{"float64", NANOARROW_TYPE_DOUBLE, true},
{"binary", NANOARROW_TYPE_BINARY, true},
}));

std::vector<std::tuple<std::optional<std::string>, int16_t, int64_t>> seen;
while (true) {
ASSERT_NO_FATAL_FAILURE(reader.Next());
if (!reader.array->release) break;

for (int64_t catalog_index = 0; catalog_index < reader.array->length;
catalog_index++) {
struct ArrowStringView catalog_name =
ArrowArrayViewGetStringUnsafe(reader.array_view->children[0], catalog_index);
ASSERT_EQ(quirks()->catalog(),
std::string_view(catalog_name.data,
static_cast<int64_t>(catalog_name.size_bytes)));

struct ArrowArrayView* catalog_db_schemas = reader.array_view->children[1];
struct ArrowArrayView* schema_stats = catalog_db_schemas->children[0]->children[1];
struct ArrowArrayView* stats =
catalog_db_schemas->children[0]->children[1]->children[0];
for (int64_t schema_index =
ArrowArrayViewListChildOffset(catalog_db_schemas, catalog_index);
schema_index <
ArrowArrayViewListChildOffset(catalog_db_schemas, catalog_index + 1);
schema_index++) {
struct ArrowStringView schema_name = ArrowArrayViewGetStringUnsafe(
catalog_db_schemas->children[0]->children[0], schema_index);
ASSERT_EQ(quirks()->db_schema(),
std::string_view(schema_name.data,
static_cast<int64_t>(schema_name.size_bytes)));

for (int64_t stat_index =
ArrowArrayViewListChildOffset(schema_stats, schema_index);
stat_index < ArrowArrayViewListChildOffset(schema_stats, schema_index + 1);
stat_index++) {
struct ArrowStringView table_name =
ArrowArrayViewGetStringUnsafe(stats->children[0], stat_index);
ASSERT_EQ("statstable",
std::string_view(table_name.data,
static_cast<int64_t>(table_name.size_bytes)));
std::optional<std::string> column_name;
if (!ArrowArrayViewIsNull(stats->children[1], stat_index)) {
struct ArrowStringView value =
ArrowArrayViewGetStringUnsafe(stats->children[1], stat_index);
column_name = std::string(value.data, value.size_bytes);
}
ASSERT_TRUE(ArrowArrayViewGetIntUnsafe(stats->children[4], stat_index));

const int16_t stat_key = static_cast<int16_t>(
ArrowArrayViewGetIntUnsafe(stats->children[2], stat_index));
const int32_t offset =
stats->children[3]->buffer_views[1].data.as_int32[stat_index];
int64_t stat_value;
switch (stat_key) {
case ADBC_STATISTIC_AVERAGE_BYTE_WIDTH_KEY:
case ADBC_STATISTIC_DISTINCT_COUNT_KEY:
case ADBC_STATISTIC_NULL_COUNT_KEY:
case ADBC_STATISTIC_ROW_COUNT_KEY:
stat_value = static_cast<int64_t>(
std::round(100 * ArrowArrayViewGetDoubleUnsafe(
stats->children[3]->children[2], offset)));
break;
default:
continue;
}
seen.emplace_back(std::move(column_name), stat_key, stat_value);
}
}
}
}

ASSERT_THAT(seen,
::testing::UnorderedElementsAreArray(
std::vector<std::tuple<std::optional<std::string>, int16_t, int64_t>>{
{"ints", ADBC_STATISTIC_AVERAGE_BYTE_WIDTH_KEY, 400},
{"strs", ADBC_STATISTIC_AVERAGE_BYTE_WIDTH_KEY, 300},
{"ints", ADBC_STATISTIC_NULL_COUNT_KEY, 100},
{"strs", ADBC_STATISTIC_NULL_COUNT_KEY, 100},
{"ints", ADBC_STATISTIC_DISTINCT_COUNT_KEY, 200},
{"strs", ADBC_STATISTIC_DISTINCT_COUNT_KEY, 200},
{std::nullopt, ADBC_STATISTIC_ROW_COUNT_KEY, 300},
}));
}

ADBCV_TEST_CONNECTION(PostgresConnectionTest)

class PostgresStatementTest : public ::testing::Test,
Expand Down
2 changes: 1 addition & 1 deletion c/driver/postgresql/statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@

#include <adbc.h>
#include <libpq-fe.h>
#include <postgresql/error.h>
#include <nanoarrow/nanoarrow.hpp>

#include "common/utils.h"
#include "connection.h"
#include "error.h"
#include "postgres_copy_reader.h"
#include "postgres_type.h"
#include "postgres_util.h"
Expand Down
14 changes: 14 additions & 0 deletions c/driver/sqlite/sqlite.c
Original file line number Diff line number Diff line change
Expand Up @@ -1711,6 +1711,20 @@ AdbcStatusCode AdbcConnectionGetObjects(struct AdbcConnection* connection, int d
table_type, column_name, out, error);
}

AdbcStatusCode AdbcConnectionGetStatistics(struct AdbcConnection* connection,
const char* catalog, const char* db_schema,
const char* table_name, char approximate,
struct ArrowArrayStream* out,
struct AdbcError* error) {
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode AdbcConnectionGetStatisticNames(struct AdbcConnection* connection,
struct ArrowArrayStream* out,
struct AdbcError* error) {
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode AdbcConnectionGetTableSchema(struct AdbcConnection* connection,
const char* catalog, const char* db_schema,
const char* table_name,
Expand Down
36 changes: 36 additions & 0 deletions c/driver_manager/adbc_driver_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,17 @@ AdbcStatusCode ConnectionGetOptionDouble(struct AdbcConnection* connection,
return ADBC_STATUS_NOT_FOUND;
}

AdbcStatusCode ConnectionGetStatistics(struct AdbcConnection*, const char*, const char*,
const char*, char, struct ArrowArrayStream*,
struct AdbcError*) {
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode ConnectionGetStatisticNames(struct AdbcConnection*,
struct ArrowArrayStream*, struct AdbcError*) {
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode ConnectionGetTableSchema(struct AdbcConnection*, const char*, const char*,
const char*, struct ArrowSchema*,
struct AdbcError* error) {
Expand Down Expand Up @@ -802,6 +813,29 @@ AdbcStatusCode AdbcConnectionGetOptionDouble(struct AdbcConnection* connection,
error);
}

AdbcStatusCode AdbcConnectionGetStatistics(struct AdbcConnection* connection,
const char* catalog, const char* db_schema,
const char* table_name, char approximate,
struct ArrowArrayStream* out,
struct AdbcError* error) {
if (!connection->private_driver) {
return ADBC_STATUS_INVALID_STATE;
}
INIT_ERROR(error, connection);
return connection->private_driver->ConnectionGetStatistics(
connection, catalog, db_schema, table_name, approximate == 1, out, error);
}

AdbcStatusCode AdbcConnectionGetStatisticNames(struct AdbcConnection* connection,
struct ArrowArrayStream* out,
struct AdbcError* error) {
if (!connection->private_driver) {
return ADBC_STATUS_INVALID_STATE;
}
INIT_ERROR(error, connection);
return connection->private_driver->ConnectionGetStatisticNames(connection, out, error);
}

AdbcStatusCode AdbcConnectionGetTableSchema(struct AdbcConnection* connection,
const char* catalog, const char* db_schema,
const char* table_name,
Expand Down Expand Up @@ -1464,6 +1498,8 @@ AdbcStatusCode AdbcLoadDriverFromInitFunc(AdbcDriverInitFunc init_func, int vers
FILL_DEFAULT(driver, ConnectionGetOptionBytes);
FILL_DEFAULT(driver, ConnectionGetOptionDouble);
FILL_DEFAULT(driver, ConnectionGetOptionInt);
FILL_DEFAULT(driver, ConnectionGetStatistics);
FILL_DEFAULT(driver, ConnectionGetStatisticNames);
FILL_DEFAULT(driver, ConnectionSetOptionBytes);
FILL_DEFAULT(driver, ConnectionSetOptionDouble);
FILL_DEFAULT(driver, ConnectionSetOptionInt);
Expand Down
25 changes: 25 additions & 0 deletions c/validation/adbc_validation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,31 @@ void ConnectionTest::TestMetadataGetObjectsCancel() {
}
}

void ConnectionTest::TestMetadataGetStatisticNames() {
if (!quirks()->supports_statistics()) {
GTEST_SKIP();
}

ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error));

StreamReader reader;
ASSERT_THAT(AdbcConnectionGetStatisticNames(&connection, &reader.stream.value, &error),
IsOkStatus(&error));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());

ASSERT_NO_FATAL_FAILURE(CompareSchema(
&reader.schema.value, {
{"statistic_name", NANOARROW_TYPE_STRING, NOT_NULL},
{"statistic_key", NANOARROW_TYPE_INT16, NOT_NULL},
}));

while (true) {
ASSERT_NO_FATAL_FAILURE(reader.Next());
if (!reader.array->release) break;
}
}

//------------------------------------------------------------
// Tests of AdbcStatement

Expand Down
Loading

0 comments on commit 5fad88c

Please sign in to comment.