Skip to content

Commit

Permalink
batch execution for DML queries with arrayBindSupported=false
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-ext-simba-hx committed Dec 18, 2024
1 parent 314c16f commit d14c285
Show file tree
Hide file tree
Showing 4 changed files with 322 additions and 76 deletions.
2 changes: 2 additions & 0 deletions include/snowflake/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,8 @@ typedef struct SF_STMT {
void* multi_stmt_result_ids;
int64 multi_stmt_count;
int64 paramset_size;
sf_bool array_bind_supported;
int64 affected_rows;

/**
* User realloc function used in snowflake_fetch
Expand Down
279 changes: 204 additions & 75 deletions lib/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1476,6 +1476,9 @@ static sf_bool setup_result_with_json_resp(SF_STMT* sfstmt, cJSON* data)
} else {
sfstmt->is_dml = detect_stmt_type(stmt_type_id);
}

json_copy_bool(&sfstmt->array_bind_supported, data, "arrayBindSupported");

cJSON* rowtype = snowflake_cJSON_GetObjectItem(data, "rowtype");
if (snowflake_cJSON_IsArray(rowtype)) {
_snowflake_stmt_desc_reset(sfstmt);
Expand Down Expand Up @@ -1924,11 +1927,63 @@ size_t STDCALL _snowflake_get_binding_value_size(SF_BIND_INPUT* bind)
}
}

// Parameter-set index that selects all parameter sets (non-batch execution).
#define SF_BIND_ALL (-1)
/**
 * Converts one bound parameter value into its cJSON representation.
 *
 * @param input The parameter binding holding the value buffer, length and
 *              C type.
 * @param index The index for array binding. -1 for single binding.
 * @return single parameter binding value in cJSON: a JSON string produced by
 *         value_to_string(), or a JSON null when the length indicator of the
 *         selected element is SF_BIND_LEN_NULL.
 */
static cJSON* get_single_binding_value_json(SF_BIND_INPUT* input, int64 index)
{
    char* text = NULL;
    cJSON* json_val = NULL;

    // Single binding: the whole buffer is the value, no per-row indicator
    // is consulted.
    if (index < 0)
    {
        text = value_to_string(input->value, input->len, input->c_type);
        json_val = snowflake_cJSON_CreateString(text);
        SF_FREE(text);
        return json_val;
    }

    // Array binding: step to the element that belongs to parameter set
    // `index`.
    size_t elem_size = _snowflake_get_binding_value_size(input);
    void* elem = (char*)(input->value) + elem_size * index;

    // A per-row length indicator, when present, overrides the common length.
    int64 elem_len;
    if (input->len_ind)
    {
        elem_len = input->len_ind[index];
    }
    else
    {
        elem_len = (int64)input->len;
    }

    if (SF_BIND_LEN_NULL == elem_len)
    {
        return snowflake_cJSON_CreateNull();
    }

    // Null-terminated string: measure the actual length of this element.
    if ((SF_BIND_LEN_NTS == elem_len) &&
        (SF_C_TYPE_STRING == input->c_type))
    {
        elem_len = (int64)strlen((char*)elem);
    }

    text = value_to_string(elem, elem_len, input->c_type);
    json_val = snowflake_cJSON_CreateString(text);
    SF_FREE(text);

    return json_val;
}

/**
* @param sfstmt SNOWFLAKE_STMT context.
* @param index The parameter set index (for batch execution), -1 to return all
* parameter sets (non-batch execution)
* @return parameter bindings in cJSON.
*/
cJSON* STDCALL _snowflake_get_binding_json(SF_STMT* sfstmt)
cJSON* STDCALL _snowflake_get_binding_json(SF_STMT* sfstmt, int64 index)
{
size_t i;
SF_BIND_INPUT* input;
Expand All @@ -1945,7 +2000,9 @@ cJSON* STDCALL _snowflake_get_binding_json(SF_STMT* sfstmt)
bindings = snowflake_cJSON_CreateObject();
for (i = 0; i < sfstmt->params_len; i++)
{
cJSON* binding;
cJSON* binding = NULL;
cJSON* single_val = NULL;

input = _snowflake_get_binding_by_index(sfstmt, i, &name,
name_buf, SF_PARAM_NAME_BUF_LEN);
if (input == NULL)
Expand All @@ -1956,50 +2013,22 @@ cJSON* STDCALL _snowflake_get_binding_json(SF_STMT* sfstmt)
binding = snowflake_cJSON_CreateObject();
type = snowflake_type_to_string(
c_type_to_snowflake(input->c_type, SF_DB_TYPE_TIMESTAMP_NTZ));
if (sfstmt->paramset_size > 1)
// json array for all parameter sets
if ((sfstmt->paramset_size > 1) && (index < 0))
{
cJSON* val_array = snowflake_cJSON_CreateArray();
size_t step = _snowflake_get_binding_value_size(input);
void* val_ptr = input->value;
int64 val_len;
cJSON* single_val = NULL;
for (int64 j = 0; j < sfstmt->paramset_size; j++, val_ptr = (char*)val_ptr + step)
// get all parameter sets, use array
for (int64 j = 0; j < sfstmt->paramset_size; j++)
{
val_len = input->len;
if (input->len_ind)
{
val_len = input->len_ind[j];
}

if (SF_BIND_LEN_NULL == val_len)
{
single_val = snowflake_cJSON_CreateNull();
snowflake_cJSON_AddItemToArray(val_array, single_val);
continue;
}

if ((SF_C_TYPE_STRING == input->c_type) &&
(SF_BIND_LEN_NTS == val_len))
{
val_len = strlen((char*)val_ptr);
}

value = value_to_string(val_ptr, val_len, input->c_type);
single_val = snowflake_cJSON_CreateString(value);
single_val = get_single_binding_value_json(input, j);
snowflake_cJSON_AddItemToArray(val_array, single_val);
if (value) {
SF_FREE(value);
}
}
snowflake_cJSON_AddItemToObject(binding, "value", val_array);
}
else // paramset_size = 1, single value binding
else // single value binding
{
value = value_to_string(input->value, input->len, input->c_type);
snowflake_cJSON_AddStringToObject(binding, "value", value);
if (value) {
SF_FREE(value);
}
single_val = get_single_binding_value_json(input, index);
snowflake_cJSON_AddItemToObject(binding, "value", single_val);
}
snowflake_cJSON_AddStringToObject(binding, "type", type);
snowflake_cJSON_AddItemToObject(bindings, name, binding);
Expand All @@ -2013,7 +2042,7 @@ sf_bool STDCALL _snowflake_needs_stage_binding(SF_STMT* sfstmt)
if (!sfstmt || !sfstmt->connection ||
(_snowflake_get_current_param_style(sfstmt) == INVALID_PARAM_TYPE) ||
sfstmt->connection->stage_binding_disabled ||
sfstmt->paramset_size <= 1)
sfstmt->paramset_size <= 1 || !sfstmt->array_bind_supported)
{
return SF_BOOLEAN_FALSE;
}
Expand Down Expand Up @@ -2066,6 +2095,8 @@ static void STDCALL _snowflake_stmt_reset(SF_STMT *sfstmt) {
sfstmt->params = NULL;
sfstmt->params_len = 0;
sfstmt->name_list = NULL;
sfstmt->array_bind_supported = SF_BOOLEAN_FALSE;
sfstmt->affected_rows = -1;

_snowflake_stmt_desc_reset(sfstmt);

Expand Down Expand Up @@ -2147,6 +2178,7 @@ SF_STMT *STDCALL snowflake_stmt(SF_CONNECT *sf) {
sfstmt->multi_stmt_count = SF_MULTI_STMT_COUNT_UNSET;
// single value binding by default
sfstmt->paramset_size = 1;
sfstmt->affected_rows = -1;
}
return sfstmt;
}
Expand Down Expand Up @@ -2499,6 +2531,11 @@ int64 STDCALL snowflake_affected_rows(SF_STMT *sfstmt) {
}

if (sfstmt->is_dml == SF_BOOLEAN_TRUE) {
if (sfstmt->affected_rows >= 0)
{
// use the value initialized previously from batch execution
return sfstmt->affected_rows;
}
if (SF_STATUS_SUCCESS != snowflake_fetch(sfstmt))
{
return -1;
Expand Down Expand Up @@ -2563,22 +2600,14 @@ SF_STATUS STDCALL snowflake_execute_with_capture(SF_STMT *sfstmt, SF_QUERY_RESUL
return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), SF_BOOLEAN_TRUE, result_capture, SF_BOOLEAN_FALSE);
}

SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt,
sf_bool is_put_get_command,
sf_bool is_native_put_get,
SF_QUERY_RESULT_CAPTURE* result_capture,
sf_bool is_describe_only) {
if (!sfstmt) {
return SF_STATUS_ERROR_STATEMENT_NOT_EXIST;
}

if (is_put_get_command && is_native_put_get && !is_describe_only)
{
_snowflake_stmt_desc_reset(sfstmt);
return _snowflake_execute_put_get_native(sfstmt, NULL, 0, result_capture);
}

clear_snowflake_error(&sfstmt->error);
static SF_STATUS _snowflake_execute_with_binds_ex(SF_STMT* sfstmt,
sf_bool is_put_get_command,
sf_bool is_native_put_get,
SF_QUERY_RESULT_CAPTURE* result_capture,
sf_bool is_describe_only,
cJSON* bind_stage,
cJSON* bindings)
{
SF_STATUS ret = SF_STATUS_ERROR_GENERAL;
SF_JSON_ERROR json_error;
const char *error_msg;
Expand All @@ -2592,25 +2621,6 @@ SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt,
URL_KEY_VALUE url_params[] = {
{.key="requestId=", .value=sfstmt->request_id, .formatted_key=NULL, .formatted_value=NULL, .key_size=0, .value_size=0}
};
size_t i;
cJSON *bindings = NULL;
char* bind_stage = NULL;
SF_BIND_INPUT *input;
const char *type;
char *value;

_mutex_lock(&sfstmt->connection->mutex_sequence_counter);
sfstmt->sequence_counter = ++sfstmt->connection->sequence_counter;
_mutex_unlock(&sfstmt->connection->mutex_sequence_counter);

if (_snowflake_needs_stage_binding(sfstmt))
{
bind_stage = _snowflake_stage_bind_upload(sfstmt);
}
if (!bind_stage)
{
bindings = _snowflake_get_binding_json(sfstmt);
}

if (is_string_empty(sfstmt->connection->directURL) &&
(is_string_empty(sfstmt->connection->master_token) ||
Expand Down Expand Up @@ -2783,7 +2793,7 @@ SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt,
sfstmt->is_multi_stmt = (_SF_STMT_TYPE_MULTI_STMT == stmt_type_id);
}

if (sfstmt->is_multi_stmt)
if ((sfstmt->is_multi_stmt) && (!is_describe_only))
{
if (!setup_multi_stmt_result(sfstmt, data))
{
Expand Down Expand Up @@ -2854,6 +2864,125 @@ SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt,
return ret;
}

/**
 * Executes a DML statement once per bound parameter set and accumulates the
 * number of affected rows into sfstmt->affected_rows.
 *
 * Used as the fallback when array binding is requested (paramset_size > 1)
 * but the server reports arrayBindSupported = false.
 *
 * @param sfstmt SNOWFLAKE_STMT context. Must describe a DML statement.
 * @param result_capture Passed through to every single execution.
 * @return SF_STATUS_SUCCESS when every parameter set was executed and its
 *         affected row count was retrieved. Otherwise the status of the
 *         first failing execution, or SF_STATUS_ERROR_GENERAL when the
 *         statement is not DML or an affected row count is unavailable.
 *         On failure sfstmt->affected_rows is left at -1.
 */
static SF_STATUS _batch_dml_execute(SF_STMT* sfstmt,
                                    SF_QUERY_RESULT_CAPTURE* result_capture)
{
    SF_STATUS ret = SF_STATUS_ERROR_GENERAL;
    cJSON* bindings = NULL;
    int64 affected_rows = 0;
    int64 rows = 0;

    // impossible but just in case
    if (!sfstmt->is_dml)
    {
        SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error,
                                 SF_STATUS_ERROR_GENERAL,
                                 "Internal error. _batch_dml_execute is called for non-dml query.",
                                 SF_SQLSTATE_GENERAL_ERROR,
                                 sfstmt->sfqid);
        return SF_STATUS_ERROR_GENERAL;
    }

    // Reset the cached total so snowflake_affected_rows() reads the result
    // of each single execution instead of returning a stale batch value.
    sfstmt->affected_rows = -1;
    for (int64 i = 0; i < sfstmt->paramset_size; i++)
    {
        bindings = _snowflake_get_binding_json(sfstmt, i);
        ret = _snowflake_execute_with_binds_ex(sfstmt,
                                               SF_BOOLEAN_FALSE,
                                               SF_BOOLEAN_FALSE,
                                               result_capture,
                                               SF_BOOLEAN_FALSE,
                                               NULL,
                                               bindings);
        if (ret != SF_STATUS_SUCCESS)
        {
            return ret;
        }

        // snowflake_affected_rows() reports failure as a negative value;
        // adding it to the total would silently corrupt the batch count.
        rows = snowflake_affected_rows(sfstmt);
        if (rows < 0)
        {
            SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error,
                                     SF_STATUS_ERROR_GENERAL,
                                     "Failed to get affected rows during batch execution.",
                                     SF_SQLSTATE_GENERAL_ERROR,
                                     sfstmt->sfqid);
            return SF_STATUS_ERROR_GENERAL;
        }
        affected_rows += rows;
    }

    sfstmt->affected_rows = affected_rows;
    return SF_STATUS_SUCCESS;
}

/**
 * Executes (or only describes) the statement, choosing how bound parameters
 * are sent: stage upload, inline JSON bindings, or one execution per
 * parameter set when the server does not support array binding for a DML
 * statement.
 *
 * @param sfstmt SNOWFLAKE_STMT context.
 * @param is_put_get_command Whether the statement is a PUT/GET command.
 * @param is_native_put_get Whether PUT/GET is handled natively.
 * @param result_capture Passed through to the underlying execution calls.
 * @param is_describe_only When true only a describe request is sent and no
 *                         bindings are prepared.
 * @return SF_STATUS_ERROR_STATEMENT_NOT_EXIST when sfstmt is NULL, otherwise
 *         the status of the execution path taken.
 */
SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt,
                                        sf_bool is_put_get_command,
                                        sf_bool is_native_put_get,
                                        SF_QUERY_RESULT_CAPTURE* result_capture,
                                        sf_bool is_describe_only) {
    cJSON* bind_json = NULL;
    // NOTE(review): this is a char* while the matching parameter of
    // _snowflake_execute_with_binds_ex is declared cJSON* - confirm which
    // type is intended.
    char* stage_path = NULL;

    if (!sfstmt) {
        return SF_STATUS_ERROR_STATEMENT_NOT_EXIST;
    }

    if (is_put_get_command && is_native_put_get && !is_describe_only)
    {
        _snowflake_stmt_desc_reset(sfstmt);
        return _snowflake_execute_put_get_native(sfstmt, NULL, 0, result_capture);
    }

    clear_snowflake_error(&sfstmt->error);

    _mutex_lock(&sfstmt->connection->mutex_sequence_counter);
    sfstmt->sequence_counter = ++sfstmt->connection->sequence_counter;
    _mutex_unlock(&sfstmt->connection->mutex_sequence_counter);

    // describe request only returns metadata and doesn't need bindings
    if (!is_describe_only)
    {
        // for parameter array bindings, send describe request first to
        // check whether that's supported by server (by checking
        // arrayBindSupported)
        if (sfstmt->paramset_size > 1)
        {
            SF_STATUS describe_status =
                _snowflake_execute_with_binds_ex(sfstmt,
                                                 is_put_get_command,
                                                 is_native_put_get,
                                                 result_capture,
                                                 SF_BOOLEAN_TRUE,
                                                 NULL, NULL);
            if (describe_status != SF_STATUS_SUCCESS)
            {
                return describe_status;
            }

            // batch execution is needed when the application uses array
            // binding for a DML query and the server does not support it:
            //   paramset_size > 1 && arrayBindSupported = false
            // fall back to executing the query once per parameter set
            if ((sfstmt->is_dml) && (!sfstmt->array_bind_supported))
            {
                return _batch_dml_execute(sfstmt, result_capture);
            }
        }

        // prefer stage binding when applicable, otherwise send all
        // parameter sets inline as JSON
        if (_snowflake_needs_stage_binding(sfstmt))
        {
            stage_path = _snowflake_stage_bind_upload(sfstmt);
        }
        if (!stage_path)
        {
            bind_json = _snowflake_get_binding_json(sfstmt, SF_BIND_ALL);
        }
    }

    return _snowflake_execute_with_binds_ex(sfstmt,
                                            is_put_get_command,
                                            is_native_put_get,
                                            result_capture,
                                            is_describe_only,
                                            stage_path,
                                            bind_json);
}

SF_ERROR_STRUCT *STDCALL snowflake_error(SF_CONNECT *sf) {
if (!sf) {
return NULL;
Expand Down
4 changes: 3 additions & 1 deletion lib/client_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,11 @@ PARAM_TYPE STDCALL _snowflake_get_param_style(const SF_BIND_INPUT *input);

/**
* @param sfstmt SNOWFLAKE_STMT context.
* @param index The parameter set index (for batch execution), -1 to return all
* parameter sets (non-batch execution)
* @return parameter bindings in cJSON.
*/
cJSON* STDCALL _snowflake_get_binding_json(SF_STMT *sfstmt);
cJSON* STDCALL _snowflake_get_binding_json(SF_STMT *sfstmt, int64 index);

#ifdef __cplusplus
extern "C" {
Expand Down
Loading

0 comments on commit d14c285

Please sign in to comment.