Skip to content

Commit

Permalink
CBOR support for result sets (#172)
Browse files Browse the repository at this point in the history
* cbor: add source files individually to the project

- this ends up simplifying the spec file a bit and removes some
unnecessary code in the library.

* add CBOR support for result sets

This commit extends the CBOR support with the ability to read the
CBOR-encapsulated result sets.

The commit also makes fetching data with SQLGetData() more efficient
(this being the alternative to the generally more efficient SQLBindCol()).
The implementation still uses punctual column binding and unbinding, but
SQLFetch() will now cache the source JSON/CBOR object into IRD's records
and will no longer walk the entire list of instantiated ARD records all
the way to the ad-hoc bound column.

Counting the total number of rows returned for a query has been changed
to cope with ES's use of indefinite-size arrays, to avoid iterating
twice over the rows in a page.

* addressing PR review comments

- slight code simplification

(cherry picked from commit 4fc9197)
  • Loading branch information
bpintea committed Aug 28, 2019
1 parent 17d54a7 commit c5cb9cb
Show file tree
Hide file tree
Showing 10 changed files with 781 additions and 277 deletions.
14 changes: 8 additions & 6 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,8 @@ add_custom_target(curlclean
#
set(TINYCBOR_PATH_SRC ${CMAKE_SOURCE_DIR}/libs/tinycbor CACHE PATH
"Lib tinycbor source path")
aux_source_directory(${TINYCBOR_PATH_SRC}/src DRV_SRC)
list(FILTER DRV_SRC EXCLUDE REGEX .*open_memstream.c$) # Win-unsupported
list(FILTER DRV_SRC EXCLUDE REGEX .*cborparser.c$) # to be patched
file(COPY ${TINYCBOR_PATH_SRC}/src/cborparser.c DESTINATION ${CMAKE_BINARY_DIR})
file(COPY ${TINYCBOR_PATH_SRC}/src/cborparser.c DESTINATION
${CMAKE_BINARY_DIR})
# tinycbor doesn't expose (yet? #125) the text/binary string pointer, since the
# string can span multiple stream chunks. However, in our case the CBOR object
# is available entirely, so access to it can safely be had; this saves a
Expand All @@ -308,9 +306,13 @@ CborError cbor_value_get_string_chunk(CborValue *it,
CborError err = get_string_chunk(it, bufferptr, len);
return err != CborNoError ? err : preparse_next_value(it);
}")
aux_source_directory(${CMAKE_BINARY_DIR} DRV_SRC)
list(APPEND DRV_SRC ${CMAKE_BINARY_DIR}/cborparser.c)
list(APPEND DRV_SRC ${TINYCBOR_PATH_SRC}/src/cborvalidation.c)
list(APPEND DRV_SRC ${TINYCBOR_PATH_SRC}/src/cborerrorstrings.c)
list(APPEND DRV_SRC ${TINYCBOR_PATH_SRC}/src/cborencoder.c)
list(APPEND DRV_SRC
${TINYCBOR_PATH_SRC}/src/cborencoder_close_container_checked.c)
set(TINYCBOR_INC ${TINYCBOR_PATH_SRC}/src)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} /DWITHOUT_OPEN_MEMSTREAM")
# limit how deep the parser will recurse (current need: 3)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} /DCBOR_PARSER_MAX_RECURSIONS=16")

Expand Down
20 changes: 17 additions & 3 deletions driver/connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,20 @@ static int debug_callback(CURL *handle, curl_infotype type, char *data,
/*
* "ptr points to the delivered data, and the size of that data is size
* multiplied with nmemb."
*
* Note: Elasticsearch supports (atm.) no streaming API and ES/SQL doesn't
* either. This function will keep realloc'ing (if needed) until the entire
* page sent by ES/SQL is received. The alternative is to stream-parse.
* However, with text & binary data, the stream parsing libraries will ask the
* client to provide a buffer to copy the data into, out of potentially
* multiple received data chunks in the stream. Which could require an extra
* allocation and will always involve an extra copy (or more, for UTF-8
* decoding). With current design (= reply object in contiguous chunk) at
* least the copy is skipped, since the text/binary data is contiguous and
* ready to be read from the receive buffer directly.
*
* TODO: initial chunk size and incremental sizes for the reallocation should
* be better "calibrated" (/ follow some max/hysteretic curve).
*/
static size_t write_callback(char *ptr, size_t size, size_t nmemb,
void *userdata)
Expand Down Expand Up @@ -740,10 +754,10 @@ SQLRETURN curl_post(esodbc_stmt_st *stmt, int url_type,
BOOL is_json;

if (dbc->pack_json) {
DBGH(stmt, "POSTing JSON type %d: [%zu] `" LCPDL "`.", url_type,
DBGH(stmt, "POSTing JSON to URL type %d: [%zu] `" LCPDL "`.", url_type,
req_body->cnt, LCSTR(req_body));
} else {
DBGH(stmt, "POSTing CBOR type %d: [%zu] `%s`.", url_type,
DBGH(stmt, "POSTing CBOR to URL type %d: [%zu] `%s`.", url_type,
req_body->cnt, cstr_hex_dump(req_body));
}

Expand Down Expand Up @@ -1514,7 +1528,7 @@ static BOOL parse_es_version_cbor(esodbc_dbc_st *dbc, cstr_st *rsp_body,
/* the _init() doesn't actually validate the object */
res = cbor_value_validate(&top_obj, ES_CBOR_PARSE_FLAGS);
CHK_RES(stmt, "failed to validate CBOR object: [%zu] `%s`",
stmt->rset.body.cnt, cstr_hex_dump(&stmt->rset.body));
rsp_body->cnt, cstr_hex_dump(rsp_body));
# endif /*0*/
# endif /* !NDEBUG */

Expand Down
12 changes: 7 additions & 5 deletions driver/convert.c
Original file line number Diff line number Diff line change
Expand Up @@ -3495,12 +3495,12 @@ static inline BOOL conv_implemented(SQLSMALLINT sqltype, SQLSMALLINT ctype)
}


/* Check if data types in returned columns are compatible with buffer types
* bound for those columns OR if parameter data conversion is allowed.
/* Check (1) if data types in returned columns are compatible with buffer types
* bound for those columns OR (2) if parameter data conversion is allowed.
* idx:
* if > 0: parameter number for parameter binding;
* if < 0: indicator for bound columns check.
* */
* if < 0: negated column number to check OR indicator to check all bound
* columns (CONV_CHECK_ALL_COLS). */
SQLRETURN convertability_check(esodbc_stmt_st *stmt, SQLINTEGER idx,
int *conv_code)
{
Expand All @@ -3520,7 +3520,9 @@ SQLRETURN convertability_check(esodbc_stmt_st *stmt, SQLINTEGER idx,
axd = stmt->ard;
ixd = stmt->ird;

start = 0;
/* if this is a SQLGetData() call, only check the one bound column */
assert(idx == CONV_CHECK_ALL_COLS || STMT_GD_CALLING(stmt));
start = (idx == CONV_CHECK_ALL_COLS) ? 0 : -idx - 1;
stop = axd->count < ixd->count ? axd->count : ixd->count;
} else {
/*
Expand Down
9 changes: 9 additions & 0 deletions driver/convert.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ SQLULEN get_param_size(esodbc_rec_st *irec);
inline void *deferred_address(SQLSMALLINT field_id, size_t pos,
esodbc_rec_st *rec);


/* Columns and parameters are all numbered with SQLUSMALLINT (unsigned short)
 * values, so a negated column number is never smaller than -USHRT_MAX. The
 * value below lies just outside that range, which lets it act as the
 * "check all bound columns" indicator without clashing with any negated
 * column number. */
#define CONV_CHECK_ALL_COLS (- ((SQLINTEGER)USHRT_MAX + 1))
/* Check (1) if data types in returned columns are compatible with buffer types
* bound for those columns OR (2) if parameter data conversion is allowed.
* idx:
* if > 0: parameter number for parameter binding;
* if < 0: negated column number to check OR indicator to check all bound
* columns (CONV_CHECK_ALL_COLS). */
SQLRETURN convertability_check(esodbc_stmt_st *stmt, SQLINTEGER idx,
int *conv_code);
BOOL update_crr_date(struct tm *now);
Expand Down
15 changes: 6 additions & 9 deletions driver/handles.c
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,7 @@ SQLRETURN EsSQLGetStmtAttrW(

/* "determine the number of the current row in the result set" */
case SQL_ATTR_ROW_NUMBER:
*(SQLULEN *)ValuePtr = (SQLULEN)STMT_CRR_ROW_NUMBER(stmt);
*(SQLULEN *)ValuePtr = (SQLULEN)stmt->tv_rows;
DBGH(stmt, "getting row number: %llu", *(SQLULEN *)ValuePtr);
break;

Expand Down Expand Up @@ -1499,7 +1499,6 @@ esodbc_desc_st *getdata_set_ard(esodbc_stmt_st *stmt, esodbc_desc_st *gd_ard,
SQLUSMALLINT colno, esodbc_rec_st *recs, SQLUSMALLINT count)
{
SQLRETURN ret;
SQLUSMALLINT i;
esodbc_desc_st *ard = stmt->ard;

init_desc(gd_ard, stmt, DESC_TYPE_ARD, SQL_DESC_ALLOC_USER);
Expand All @@ -1511,13 +1510,10 @@ esodbc_desc_st *getdata_set_ard(esodbc_stmt_st *stmt, esodbc_desc_st *gd_ard,
}

if (colno < count) { /* can the static recs be used? */
/* need to init all records, not only the single one that will be
* bound, since data covert. check will run against all bound recs. */
for (i = 0; i < count; i ++) {
init_rec(&recs[i], gd_ard);
}
assert(0 < colno);
init_rec(&recs[colno - 1], gd_ard);

gd_ard->count = count;
gd_ard->count = colno;
gd_ard->recs = recs;
}
/* else: recs will be alloc'd later when binding the column */
Expand Down Expand Up @@ -1996,7 +1992,8 @@ static void set_defaults_from_meta_type(esodbc_rec_st *rec)
rec->concise_type == SQL_C_DEFAULT) ||
(DESC_TYPE_IS_IMPLEMENTATION(rec->desc->type) &&
rec->concise_type == ESODBC_SQL_NULL));
WARNH(rec->desc, "max meta type: can't set defaults");
DBGH(rec->desc, "max meta type (C default / SQL NULL): "
"can't set defaults");
break;
}
}
Expand Down
28 changes: 16 additions & 12 deletions driver/handles.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,12 @@ typedef struct desc_rec {
* need to be set for records in IxD descriptors */
esodbc_estype_st *es_type;

/* IRD reference copy of respective protocol value */
union {
UJObject json;
CborValue cbor;
} i_val;

/*
* record fields
*/
Expand Down Expand Up @@ -302,13 +308,15 @@ typedef struct struct_desc {

/* Result set state used when the answer is CBOR-encoded (the CBOR member of
 * the result set's `pack` union). */
struct resultset_cbor {
cstr_st curs; /* ES'es cursor; refs req's body */
/* NOTE(review): "req's body" here and below presumably means the received
 * answer body the CBOR values point into -- confirm. */
CborValue rows_obj; /* top object rows container (EsSQLRowCount()) */
CborValue rows_iter; /* iterator over received rows; refs req's body */
wstr_st cols_buff /* columns descriptions; refs allocated chunk */;
};

/* Result set state used when the answer is JSON-encoded (the JSON member of
 * the result set's `pack` union); all members refer to UJSON4C decoder
 * objects. */
struct resultset_json {
wstr_st curs; /* ES'es cursor; refs UJSON4C 'state' */
void *state; /* top UJSON decoder state */
UJObject rows_obj; /* top object rows container (EsSQLRowCount()) */
void *rows_iter; /* UJSON iterator with the rows in result set */
UJObject row_array; /* UJSON object for current row */
};
Expand All @@ -317,17 +325,17 @@ typedef struct struct_resultset {
long code; /* HTTP code of last response */
cstr_st body; /* HTTP body of last answer to a statement */

BOOL pack_json; /* the server could send a JSON answer for a CBOR req. */
union {
struct resultset_cbor cbor;
struct resultset_json json;
} pack;

size_t nrows; /* (count of) rows in current result set */
size_t vrows; /* (count of) visited rows in current result set */
} resultset_st;

#define STMT_HAS_CURSOR(_stmt) \
(HDRH(_stmt)->dbc->pack_json ? \
((_stmt)->rset.pack_json ? \
(_stmt)->rset.pack.json.curs.cnt : \
(_stmt)->rset.pack.cbor.curs.cnt)

Expand Down Expand Up @@ -370,8 +378,8 @@ typedef struct struct_stmt {
resultset_st rset;
/* count of result sets fetched */
size_t nset;
/* total count of fetched rows for one statement (sum(resultset.nrows)) */
size_t tf_rows;
/* total visited rows (SUM(resultset.vrows)) <=> SQL_ATTR_ROW_NUMBER */
size_t tv_rows;
/* SQL data types conversion to SQL C compatibility (IRD.SQL -> ARD.C) */
enum {
CONVERSION_VIOLATION = -2, /* specs disallowed */
Expand All @@ -388,17 +396,13 @@ typedef struct struct_stmt {

} esodbc_stmt_st;

/* reset total number of fetched rows for a statement */
#define STMT_TFROWS_RESET(_stmt) \
/* reset statement's result set count and number of visited rows */
#define STMT_ROW_CNT_RESET(_stmt) \
do { \
(_stmt)->tf_rows = 0; \
(_stmt)->nset = 0; \
(_stmt)->tv_rows = 0; \
} while (0)

/* 1-based current row number */
#define STMT_CRR_ROW_NUMBER(_stmt) \
((_stmt)->tf_rows - (_stmt)->rset.nrows + \
(_stmt)->rset.vrows + /*1-based*/1)

/* SQLGetData() state reset */
#define STMT_GD_RESET(_stmt) \
do { \
Expand Down
Loading

0 comments on commit c5cb9cb

Please sign in to comment.