Skip to content

Commit

Permalink
feat(format,c,go,python): add error details to ArrowArrayStream
Browse files Browse the repository at this point in the history
  • Loading branch information
lidavidm committed Aug 4, 2023
1 parent 669812c commit cb6b4bd
Show file tree
Hide file tree
Showing 10 changed files with 410 additions and 84 deletions.
28 changes: 24 additions & 4 deletions adbc.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ struct ADBC_EXPORT AdbcErrorDetail {
///
/// \since ADBC API revision 1.1.0
ADBC_EXPORT
int AdbcErrorGetDetailCount(struct AdbcError* error);
int AdbcErrorGetDetailCount(const struct AdbcError* error);

/// \brief Get a metadata value in an error by index.
///
Expand All @@ -369,7 +369,25 @@ int AdbcErrorGetDetailCount(struct AdbcError* error);
///
/// \since ADBC API revision 1.1.0
ADBC_EXPORT
struct AdbcErrorDetail AdbcErrorGetDetail(struct AdbcError* error, int index);
struct AdbcErrorDetail AdbcErrorGetDetail(const struct AdbcError* error, int index);

/// \brief Get an ADBC error from an ArrowArrayStream created by a driver.
///
/// This allows retrieving error details and other metadata that would
/// normally be suppressed by the Arrow C Stream Interface.
///
/// The caller MUST NOT release the error; it is managed by the release
/// callback in the stream itself.
///
/// \param[in] stream The stream to query.
/// \param[out] status The ADBC status code, or ADBC_STATUS_OK if there is no
/// error. Not written to if the stream does not contain an ADBC error or
/// if the pointer is NULL.
/// \return NULL if not supported.
/// \since ADBC API revision 1.1.0
ADBC_EXPORT
const struct AdbcError* AdbcErrorFromArrayStream(struct ArrowArrayStream* stream,
AdbcStatusCode* status);

/// @}

Expand Down Expand Up @@ -950,8 +968,10 @@ struct ADBC_EXPORT AdbcDriver {
///
/// @{

int (*ErrorGetDetailCount)(struct AdbcError* error);
struct AdbcErrorDetail (*ErrorGetDetail)(struct AdbcError* error, int index);
int (*ErrorGetDetailCount)(const struct AdbcError* error);
struct AdbcErrorDetail (*ErrorGetDetail)(const struct AdbcError* error, int index);
const struct AdbcError* (*ErrorFromArrayStream)(struct ArrowArrayStream* stream,
AdbcStatusCode* status);

AdbcStatusCode (*DatabaseGetOption)(struct AdbcDatabase*, const char*, char*, size_t*,
struct AdbcError*);
Expand Down
41 changes: 37 additions & 4 deletions c/driver/common/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,49 @@

#include "utils.h"

#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <assert.h>
#include "adbc.h"
#include <adbc.h>

static size_t kErrorBufferSize = 1024;

int AdbcStatusCodeToErrno(AdbcStatusCode code) {
switch (code) {
case ADBC_STATUS_OK:
return 0;
case ADBC_STATUS_UNKNOWN:
return EIO;
case ADBC_STATUS_NOT_IMPLEMENTED:
return ENOTSUP;
case ADBC_STATUS_NOT_FOUND:
return ENOENT;
case ADBC_STATUS_ALREADY_EXISTS:
return EEXIST;
case ADBC_STATUS_INVALID_ARGUMENT:
case ADBC_STATUS_INVALID_STATE:
return EINVAL;
case ADBC_STATUS_INVALID_DATA:
case ADBC_STATUS_INTEGRITY:
case ADBC_STATUS_INTERNAL:
case ADBC_STATUS_IO:
return EIO;
case ADBC_STATUS_CANCELLED:
return ECANCELED;
case ADBC_STATUS_TIMEOUT:
return ETIMEDOUT;
case ADBC_STATUS_UNAUTHENTICATED:
// FreeBSD/macOS have EAUTH, but not other platforms
case ADBC_STATUS_UNAUTHORIZED:
return EACCES;
default:
return EIO;
}
}

/// For ADBC 1.1.0, the structure held in private_data.
struct AdbcErrorDetails {
char* message;
Expand Down Expand Up @@ -165,15 +198,15 @@ void AppendErrorDetail(struct AdbcError* error, const char* key, const uint8_t*
details->count++;
}

int CommonErrorGetDetailCount(struct AdbcError* error) {
int CommonErrorGetDetailCount(const struct AdbcError* error) {
if (error->release != ReleaseErrorWithDetails) {
return 0;
}
struct AdbcErrorDetails* details = (struct AdbcErrorDetails*)error->private_data;
return details->count;
}

struct AdbcErrorDetail CommonErrorGetDetail(struct AdbcError* error, int index) {
struct AdbcErrorDetail CommonErrorGetDetail(const struct AdbcError* error, int index) {
if (error->release != ReleaseErrorWithDetails) {
return (struct AdbcErrorDetail){NULL, NULL, 0};
}
Expand Down
6 changes: 4 additions & 2 deletions c/driver/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
extern "C" {
#endif

int AdbcStatusCodeToErrno(AdbcStatusCode code);

// The printf checking attribute doesn't work properly on gcc 4.8
// and results in spurious compiler warnings
#if defined(__clang__) || (defined(__GNUC__) && __GNUC__ >= 5)
Expand All @@ -47,8 +49,8 @@ void SetErrorVariadic(struct AdbcError* error, const char* format, va_list args)
void AppendErrorDetail(struct AdbcError* error, const char* key, const uint8_t* detail,
size_t detail_length);

int CommonErrorGetDetailCount(struct AdbcError* error);
struct AdbcErrorDetail CommonErrorGetDetail(struct AdbcError* error, int index);
int CommonErrorGetDetailCount(const struct AdbcError* error);
struct AdbcErrorDetail CommonErrorGetDetail(const struct AdbcError* error, int index);

struct StringBuilder {
char* buffer;
Expand Down
17 changes: 15 additions & 2 deletions c/driver/postgresql/postgresql.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,27 @@ using adbcpq::PostgresStatement;
// ---------------------------------------------------------------------
// AdbcError

int AdbcErrorGetDetailCount(struct AdbcError* error) {
namespace {
const struct AdbcError* PostgresErrorFromArrayStream(struct ArrowArrayStream* stream,
AdbcStatusCode* status) {
// Currently only valid for TupleReader
return adbcpq::TupleReader::ErrorFromArrayStream(stream, status);
}
} // namespace

int AdbcErrorGetDetailCount(const struct AdbcError* error) {
return CommonErrorGetDetailCount(error);
}

struct AdbcErrorDetail AdbcErrorGetDetail(struct AdbcError* error, int index) {
struct AdbcErrorDetail AdbcErrorGetDetail(const struct AdbcError* error, int index) {
return CommonErrorGetDetail(error, index);
}

const struct AdbcError* AdbcErrorFromArrayStream(struct ArrowArrayStream* stream,
AdbcStatusCode* status) {
return PostgresErrorFromArrayStream(stream, status);
}

// ---------------------------------------------------------------------
// AdbcDatabase

Expand Down
40 changes: 40 additions & 0 deletions c/driver/postgresql/postgresql_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include <cerrno>
#include <cmath>
#include <cstdlib>
#include <cstring>
Expand Down Expand Up @@ -979,6 +980,45 @@ TEST_F(PostgresStatementTest, AdbcErrorBackwardsCompatibility) {
free(error);
}

TEST_F(PostgresStatementTest, Cancel) {
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));

for (const char* query : {
"DROP TABLE IF EXISTS test_cancel",
"CREATE TABLE test_cancel (ints INT)",
R"(INSERT INTO test_cancel (ints)
SELECT g :: INT FROM GENERATE_SERIES(1, 65536) temp(g))",
}) {
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));
}

ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT * FROM test_cancel", &error),
IsOkStatus(&error));
adbc_validation::StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));

ASSERT_THAT(AdbcStatementCancel(&statement, &error), IsOkStatus(&error));

int retcode = 0;
while (true) {
retcode = reader.MaybeNext();
if (retcode != 0 || !reader.array->release) break;
}

ASSERT_EQ(ECANCELED, retcode);
AdbcStatusCode status = ADBC_STATUS_OK;
const struct AdbcError* detail =
AdbcErrorFromArrayStream(&reader.stream.value, &status);
ASSERT_NE(nullptr, detail);
ASSERT_EQ(ADBC_STATUS_CANCELLED, status);
ASSERT_EQ("57014", std::string_view(detail->sqlstate, 5));
ASSERT_NE(0, AdbcErrorGetDetailCount(detail));
}

struct TypeTestCase {
std::string name;
std::string sql_type;
Expand Down
73 changes: 49 additions & 24 deletions c/driver/postgresql/statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -523,12 +523,13 @@ struct BindStream {
int TupleReader::GetSchema(struct ArrowSchema* out) {
int na_res = copy_reader_->GetSchema(out);
if (out->release == nullptr) {
StringBuilderAppend(&error_builder_,
"[libpq] Result set was already consumed or freed");
return EINVAL;
SetError(&error_, "[libpq] Result set was already consumed or freed");
status_ = ADBC_STATUS_INVALID_STATE;
return AdbcStatusCodeToErrno(status_);
} else if (na_res != NANOARROW_OK) {
// e.g., Can't allocate memory
StringBuilderAppend(&error_builder_, "[libpq] Error copying schema");
SetError(&error_, "[libpq] Error copying schema");
status_ = ADBC_STATUS_INTERNAL;
}

return na_res;
Expand All @@ -543,15 +544,16 @@ int TupleReader::InitQueryAndFetchFirst(struct ArrowError* error) {
data_.data.as_char = pgbuf_;

if (get_copy_res == -2) {
StringBuilderAppend(&error_builder_, "[libpq] Fetch header failed: %s",
PQerrorMessage(conn_));
return EIO;
SetError(&error_, "[libpq] Fetch header failed: %s", PQerrorMessage(conn_));
status_ = ADBC_STATUS_IO;
return AdbcStatusCodeToErrno(status_);
}

int na_res = copy_reader_->ReadHeader(&data_, error);
if (na_res != NANOARROW_OK) {
StringBuilderAppend(&error_builder_, "[libpq] ReadHeader failed: %s", error->message);
return EIO;
SetError(&error_, "[libpq] ReadHeader failed: %s", error->message);
status_ = ADBC_STATUS_IO;
return AdbcStatusCodeToErrno(status_);
}

return NANOARROW_OK;
Expand All @@ -562,9 +564,9 @@ int TupleReader::AppendRowAndFetchNext(struct ArrowError* error) {
// call to PQgetCopyData())
int na_res = copy_reader_->ReadRecord(&data_, error);
if (na_res != NANOARROW_OK && na_res != ENODATA) {
StringBuilderAppend(&error_builder_,
"[libpq] ReadRecord failed at row %" PRId64 ": %s", row_id_,
error->message);
SetError(&error_, "[libpq] ReadRecord failed at row %" PRId64 ": %s", row_id_,
error->message);
status_ = ADBC_STATUS_IO;
return na_res;
}

Expand All @@ -578,10 +580,10 @@ int TupleReader::AppendRowAndFetchNext(struct ArrowError* error) {
data_.data.as_char = pgbuf_;

if (get_copy_res == -2) {
StringBuilderAppend(&error_builder_,
"[libpq] PQgetCopyData failed at row %" PRId64 ": %s", row_id_,
PQerrorMessage(conn_));
return EIO;
SetError(&error_, "[libpq] PQgetCopyData failed at row %" PRId64 ": %s", row_id_,
PQerrorMessage(conn_));
status_ = ADBC_STATUS_IO;
return AdbcStatusCodeToErrno(status_);
} else if (get_copy_res == -1) {
// Returned when COPY has finished successfully
return ENODATA;
Expand All @@ -603,8 +605,8 @@ int TupleReader::BuildOutput(struct ArrowArray* out, struct ArrowError* error) {

int na_res = copy_reader_->GetArray(out, error);
if (na_res != NANOARROW_OK) {
StringBuilderAppend(&error_builder_, "[libpq] Failed to build result array: %s",
error->message);
SetError(&error_, "[libpq] Failed to build result array: %s", error->message);
status_ = ADBC_STATUS_INTERNAL;
return na_res;
}

Expand All @@ -625,7 +627,11 @@ void TupleReader::ResetQuery() {
}

// Clear the error builder
error_builder_.size = 0;
if (error_.release) {
error_.release(&error_);
}
error_ = ADBC_ERROR_INIT;
status_ = ADBC_STATUS_OK;

row_id_ = -1;
}
Expand Down Expand Up @@ -673,17 +679,19 @@ int TupleReader::GetNext(struct ArrowArray* out) {
const ExecStatusType pq_status = PQresultStatus(result_);
if (pq_status != PGRES_COMMAND_OK) {
const char* sqlstate = PQresultErrorField(result_, PG_DIAG_SQLSTATE);
StringBuilderAppend(&error_builder_, "[libpq] Query failed [%s]: %s",
PQresStatus(pq_status), PQresultErrorMessage(result_));
SetError(&error_, result_, "[libpq] Query failed [%s]: %s", PQresStatus(pq_status),
PQresultErrorMessage(result_));

if (tmp.release != nullptr) {
tmp.release(&tmp);
}

if (sqlstate != nullptr && std::strcmp(sqlstate, "57014") == 0) {
return ECANCELED;
status_ = ADBC_STATUS_CANCELLED;
} else {
status_ = ADBC_STATUS_IO;
}
return EIO;
return AdbcStatusCodeToErrno(status_);
}

ResetQuery();
Expand All @@ -692,7 +700,11 @@ int TupleReader::GetNext(struct ArrowArray* out) {
}

void TupleReader::Release() {
StringBuilderReset(&error_builder_);
if (error_.release) {
error_.release(&error_);
error_ = ADBC_ERROR_INIT;
}
status_ = ADBC_STATUS_OK;

if (result_) {
PQclear(result_);
Expand All @@ -713,6 +725,19 @@ void TupleReader::ExportTo(struct ArrowArrayStream* stream) {
stream->private_data = this;
}

const struct AdbcError* TupleReader::ErrorFromArrayStream(struct ArrowArrayStream* stream,
AdbcStatusCode* status) {
if (!stream->private_data || stream->release != &ReleaseTrampoline) {
return nullptr;
}

TupleReader* reader = static_cast<TupleReader*>(stream->private_data);
if (status) {
*status = reader->status_;
}
return &reader->error_;
}

int TupleReader::GetSchemaTrampoline(struct ArrowArrayStream* self,
struct ArrowSchema* out) {
if (!self || !self->private_data) return EINVAL;
Expand Down
Loading

0 comments on commit cb6b4bd

Please sign in to comment.