Skip to content

Commit

Permalink
feat(format,c,go,java): introduce ADBC API revision 1.1.0
Browse files Browse the repository at this point in the history
Fixes apache#317.

Co-authored-by: Matt Topol <zotthewizard@gmail.com>
  • Loading branch information
lidavidm and zeroshade committed May 22, 2023
1 parent c3202e6 commit c296e17
Show file tree
Hide file tree
Showing 16 changed files with 967 additions and 32 deletions.
535 changes: 534 additions & 1 deletion adbc.h

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion c/driver/postgresql/postgresql.cc
Original file line number Diff line number Diff line change
Expand Up @@ -467,9 +467,11 @@ extern "C" {
ADBC_EXPORT
AdbcStatusCode AdbcDriverInit(int version, void* raw_driver, struct AdbcError* error) {
if (version != ADBC_VERSION_1_0_0) return ADBC_STATUS_NOT_IMPLEMENTED;
if (!raw_driver) return ADBC_STATUS_INVALID_ARGUMENT;

auto* driver = reinterpret_cast<struct AdbcDriver*>(raw_driver);
std::memset(driver, 0, sizeof(*driver));
// XXX: explicit memset here seems to be miscompiled by x64 clang on
// macOS into an unconditional call to UBSAN abort
driver->DatabaseInit = PostgresDatabaseInit;
driver->DatabaseNew = PostgresDatabaseNew;
driver->DatabaseRelease = PostgresDatabaseRelease;
Expand Down Expand Up @@ -497,6 +499,7 @@ AdbcStatusCode AdbcDriverInit(int version, void* raw_driver, struct AdbcError* e
driver->StatementRelease = PostgresStatementRelease;
driver->StatementSetOption = PostgresStatementSetOption;
driver->StatementSetSqlQuery = PostgresStatementSetSqlQuery;
driver->StatementSetSubstraitPlan = nullptr;
return ADBC_STATUS_OK;
}
}
2 changes: 1 addition & 1 deletion c/driver/sqlite/sqlite.c
Original file line number Diff line number Diff line change
Expand Up @@ -1521,7 +1521,7 @@ AdbcStatusCode SqliteDriverInit(int version, void* raw_driver, struct AdbcError*
}

struct AdbcDriver* driver = (struct AdbcDriver*)raw_driver;
memset(driver, 0, sizeof(*driver));
memset(driver, 0, ADBC_DRIVER_1_0_0_SIZE);
driver->DatabaseInit = SqliteDatabaseInit;
driver->DatabaseNew = SqliteDatabaseNew;
driver->DatabaseRelease = SqliteDatabaseRelease;
Expand Down
51 changes: 46 additions & 5 deletions c/driver_manager/adbc_driver_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <adbc.h>

#include <algorithm>
#include <array>
#include <cstring>
#include <string>
#include <unordered_map>
Expand Down Expand Up @@ -191,6 +192,12 @@ AdbcStatusCode StatementExecutePartitions(struct AdbcStatement* statement,
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode StatementExecuteSchema(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcError* error) {
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode StatementGetParameterSchema(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcError* error) {
Expand Down Expand Up @@ -540,6 +547,15 @@ AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement,
error);
}

AdbcStatusCode AdbcStatementExecuteSchema(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcError* error) {
if (!statement->private_driver) {
return ADBC_STATUS_INVALID_STATE;
}
return statement->private_driver->StatementExecuteSchema(statement, schema, error);
}

AdbcStatusCode AdbcStatementGetParameterSchema(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcError* error) {
Expand Down Expand Up @@ -640,11 +656,19 @@ AdbcStatusCode AdbcLoadDriver(const char* driver_name, const char* entrypoint,
AdbcDriverInitFunc init_func;
std::string error_message;

if (version != ADBC_VERSION_1_0_0) {
SetError(error, "Only ADBC 1.0.0 is supported");
return ADBC_STATUS_NOT_IMPLEMENTED;
switch (version) {
case ADBC_VERSION_1_0_0:
case ADBC_VERSION_1_1_0:
break;
default:
SetError(error, "Only ADBC 1.0.0 and 1.1.0 are supported");
return ADBC_STATUS_NOT_IMPLEMENTED;
}

if (!raw_driver) {
SetError(error, "Must provide non-NULL raw_driver");
return ADBC_STATUS_INVALID_ARGUMENT;
}
auto* driver = reinterpret_cast<struct AdbcDriver*>(raw_driver);

if (!entrypoint) {
Expand Down Expand Up @@ -771,6 +795,11 @@ AdbcStatusCode AdbcLoadDriver(const char* driver_name, const char* entrypoint,

AdbcStatusCode AdbcLoadDriverFromInitFunc(AdbcDriverInitFunc init_func, int version,
void* raw_driver, struct AdbcError* error) {
constexpr std::array<int, 2> kSupportedVersions = {
ADBC_VERSION_1_1_0,
ADBC_VERSION_1_0_0,
};

#define FILL_DEFAULT(DRIVER, STUB) \
if (!DRIVER->STUB) { \
DRIVER->STUB = &STUB; \
Expand All @@ -781,12 +810,17 @@ AdbcStatusCode AdbcLoadDriverFromInitFunc(AdbcDriverInitFunc init_func, int vers
return ADBC_STATUS_INTERNAL; \
}

auto result = init_func(version, raw_driver, error);
AdbcStatusCode result = ADBC_STATUS_NOT_IMPLEMENTED;
for (const int try_version : kSupportedVersions) {
if (try_version > version) continue;
result = init_func(try_version, raw_driver, error);
if (result != ADBC_STATUS_NOT_IMPLEMENTED) break;
}
if (result != ADBC_STATUS_OK) {
return result;
}

if (version == ADBC_VERSION_1_0_0) {
if (version >= ADBC_VERSION_1_0_0) {
auto* driver = reinterpret_cast<struct AdbcDriver*>(raw_driver);
CHECK_REQUIRED(driver, DatabaseNew);
CHECK_REQUIRED(driver, DatabaseInit);
Expand Down Expand Up @@ -816,6 +850,13 @@ AdbcStatusCode AdbcLoadDriverFromInitFunc(AdbcDriverInitFunc init_func, int vers
FILL_DEFAULT(driver, StatementSetSqlQuery);
FILL_DEFAULT(driver, StatementSetSubstraitPlan);
}
if (version >= ADBC_VERSION_1_1_0) {
auto* driver = reinterpret_cast<struct AdbcDriver*>(raw_driver);
FILL_DEFAULT(driver, StatementExecuteSchema);

// Zero out the padding
std::memset(driver->reserved, 0, sizeof(driver->reserved));
}

return ADBC_STATUS_OK;

Expand Down
32 changes: 32 additions & 0 deletions c/driver_manager/adbc_driver_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,38 @@ TEST_F(DriverManager, MultiDriverTest) {
error->release(&error.value);
}

class AdbcVersion : public ::testing::Test {
public:
void SetUp() override {
std::memset(&driver, 0, sizeof(driver));
std::memset(&error, 0, sizeof(error));
}

void TearDown() override {
if (error.release) {
error.release(&error);
}

if (driver.release) {
ASSERT_THAT(driver.release(&driver, &error), IsOkStatus(&error));
ASSERT_EQ(driver.private_data, nullptr);
ASSERT_EQ(driver.private_manager, nullptr);
}
}

protected:
struct AdbcDriver driver = {};
struct AdbcError error = {};
};

// TODO: set up a dummy driver to test behavior more deterministically

TEST_F(AdbcVersion, ForwardsCompatible) {
ASSERT_THAT(
AdbcLoadDriver("adbc_driver_sqlite", nullptr, ADBC_VERSION_1_1_0, &driver, &error),
IsOkStatus(&error));
}

class SqliteQuirks : public adbc_validation::DriverQuirks {
public:
AdbcStatusCode SetupDatabase(struct AdbcDatabase* database,
Expand Down
93 changes: 81 additions & 12 deletions go/adbc/adbc.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,20 +142,35 @@ const (
StatusUnauthorized // Unauthorized
)

const (
AdbcVersion1_0_0 int64 = 1_000_000
AdbcVersion1_1_0 int64 = 1_001_000
)

// Canonical option values
const (
OptionValueEnabled = "true"
OptionValueDisabled = "false"
OptionKeyAutoCommit = "adbc.connection.autocommit"
OptionKeyIngestTargetTable = "adbc.ingest.target_table"
OptionKeyIngestMode = "adbc.ingest.mode"
OptionKeyIsolationLevel = "adbc.connection.transaction.isolation_level"
OptionKeyReadOnly = "adbc.connection.readonly"
OptionValueIngestModeCreate = "adbc.ingest.mode.create"
OptionValueIngestModeAppend = "adbc.ingest.mode.append"
OptionKeyURI = "uri"
OptionKeyUsername = "username"
OptionKeyPassword = "password"
OptionValueEnabled = "true"
OptionValueDisabled = "false"
OptionKeyAutoCommit = "adbc.connection.autocommit"
// The current catalog.
OptionKeyCurrentCatalog = "adbc.connection.catalog"
// The current schema.
OptionKeyCurrentDbSchema = "adbc.connection.db_schema"
// Make ExecutePartitions nonblocking.
OptionKeyIncremental = "adbc.statement.exec.incremental"
// Get the progress
OptionKeyProgress = "adbc.statement.exec.progress"
OptionKeyIngestTargetTable = "adbc.ingest.target_table"
OptionKeyIngestMode = "adbc.ingest.mode"
OptionKeyIsolationLevel = "adbc.connection.transaction.isolation_level"
OptionKeyReadOnly = "adbc.connection.readonly"
OptionValueIngestModeCreate = "adbc.ingest.mode.create"
OptionValueIngestModeAppend = "adbc.ingest.mode.append"
OptionValueIngestModeReplace = "adbc.ingest.mode.replace"
OptionValueIngestModeCreateAppend = "adbc.ingest.mode.create_append"
OptionKeyURI = "uri"
OptionKeyUsername = "username"
OptionKeyPassword = "password"
)

type OptionIsolationLevel string
Expand All @@ -170,6 +185,11 @@ const (
LevelLinearizable OptionIsolationLevel = "adbc.connection.transaction.isolation.linearizable"
)

// Canonical property values
const (
PropertyProgress = "adbc.statement.exec.progress"
)

// Driver is the entry point for the interface. It is similar to
// database/sql.Driver taking a map of keys and values as options
// to initialize a Connection to the database. Any common connection
Expand Down Expand Up @@ -212,6 +232,8 @@ const (
InfoDriverVersion InfoCode = 101 // DriverVersion
// The driver Arrow library version (type: utf8)
InfoDriverArrowVersion InfoCode = 102 // DriverArrowVersion
// The driver ADBC API version (type: int64)
InfoDriverADBCVersion InfoCode = 103 // DriverADBCVersion
)

type ObjectDepth int
Expand Down Expand Up @@ -275,6 +297,10 @@ type Connection interface {
// codes are defined as constants. Codes [0, 10_000) are reserved
// for ADBC usage. Drivers/vendors will ignore requests for unrecognized
// codes (the row will be omitted from the result).
//
// Since ADBC 1.1.0: the range [500, 1_000) is reserved for "XDBC"
// information, which is the same metadata provided by the same info
// code range in the Arrow Flight SQL GetSqlInfo RPC.
GetInfo(ctx context.Context, infoCodes []InfoCode) (array.RecordReader, error)

// GetObjects gets a hierarchical view of all catalogs, database schemas,
Expand Down Expand Up @@ -470,6 +496,9 @@ type Statement interface {
// of rows affected if known, otherwise it will be -1.
//
// This invalidates any prior result sets on this statement.
//
// Since ADBC 1.1.0: releasing the returned RecordReader without
// consuming it fully is equivalent to calling AdbcStatementCancel.
ExecuteQuery(context.Context) (array.RecordReader, int64, error)

// ExecuteUpdate executes a statement that does not generate a result
Expand Down Expand Up @@ -534,5 +563,45 @@ type Statement interface {
//
// If the driver does not support partitioned results, this will return
// an error with a StatusNotImplemented code.
//
// When OptionKeyIncremental is set, this should be called
// repeatedly until receiving an empty Partitions.
ExecutePartitions(context.Context) (*arrow.Schema, Partitions, int64, error)
}

// StatementCancel is a Statement that also supports Cancel.
//
// Since ADBC API revision 1.1.0.
type StatementCancel interface {
// Cancel stops execution of an in-progress query.
//
// This can be called during ExecuteQuery (or similar), or while
// consuming a RecordReader returned from such. Calling this
// function should make the other functions return an error with a
// StatusCancelled code.
//
// This must always be thread-safe (other operations are not
// necessarily thread-safe).
Cancel() error
}

// StatementExecuteSchema is a Statement that also supports ExecuteSchema.
//
// Since ADBC API revision 1.1.0.
type StatementExecuteSchema interface {
// ExecuteSchema gets the schema of the result set of a query without executing it.
ExecuteSchema(context.Context) (*arrow.Schema, error)
}

// GetSetOptions is a PostInitOptions that also supports getting and setting property values of different types.
//
// Since ADBC API revision 1.1.0.
type GetSetOptions interface {
PostInitOptions

SetOption(key, value string) error
SetOptionInt(key, value int64) error
SetOptionDouble(key, value float64) error
GetOptionInt(key string) (int64, error)
GetOptionDouble(key string) (float64, error)
}
7 changes: 4 additions & 3 deletions go/adbc/infocode_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/adbc/pkg/_tmpl/driver.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ func {{.Prefix}}DriverInit(version C.int, rawDriver *C.void, err *C.struct_AdbcE
}

driver := (*C.struct_AdbcDriver)(unsafe.Pointer(rawDriver))
C.memset(unsafe.Pointer(driver), 0, C.sizeof_struct_AdbcDriver)
C.memset(unsafe.Pointer(driver), 0, C.ADBC_DRIVER_1_0_0_SIZE)
driver.DatabaseInit = (*[0]byte)(C.{{.Prefix}}DatabaseInit)
driver.DatabaseNew = (*[0]byte)(C.{{.Prefix}}DatabaseNew)
driver.DatabaseRelease = (*[0]byte)(C.{{.Prefix}}DatabaseRelease)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* <p>Connections are not required to be thread-safe, but they can be used from multiple threads so
* long as clients take care to serialize accesses to a connection.
*/
public interface AdbcConnection extends AutoCloseable {
public interface AdbcConnection extends AutoCloseable, AdbcOptions {
/** Commit the pending transaction. */
default void commit() throws AdbcException {
throw AdbcException.notImplemented("Connection does not support transactions");
Expand Down Expand Up @@ -285,6 +285,42 @@ default void setAutoCommit(boolean enableAutoCommit) throws AdbcException {
throw AdbcException.notImplemented("Connection does not support transactions");
}

/**
* Get the current catalog.
*
* @since ADBC API revision 1.1.0
*/
default String getCurrentCatalog() throws AdbcException {
throw AdbcException.notImplemented("Connection does not support current catalog");
}

/**
* Set the current catalog.
*
* @since ADBC API revision 1.1.0
*/
default void setCurrentCatalog(String catalog) throws AdbcException {
throw AdbcException.notImplemented("Connection does not support current catalog");
}

/**
* Get the current schema.
*
* @since ADBC API revision 1.1.0
*/
default String getCurrentDbSchema() throws AdbcException {
throw AdbcException.notImplemented("Connection does not support current catalog");
}

/**
* Set the current schema.
*
* @since ADBC API revision 1.1.0
*/
default void setCurrentDbSchema(String catalog) throws AdbcException {
throw AdbcException.notImplemented("Connection does not support current catalog");
}

/**
* Get whether the connection is read-only.
*
Expand Down
Loading

0 comments on commit c296e17

Please sign in to comment.