Skip to content

Commit

Permalink
PR switches remtran implementation to run over cdb2api instead of leg…
Browse files Browse the repository at this point in the history
…acy remtran plugin.

Currently it supports non-2pc transactions and socksql transaction isolation.
It provides protocol versioning and solves the stale schema change issue, which the previous implementation did not support.

PR adds an fdb compatibility test case that checks the ability to execute cross-cluster queries and writes with both older and newer protocol
versions.

The new implementation eliminates some of the fragile synchronization between read and write remote cursors in the previous implementation,
by running the write statements remotely (push mode extended to writes).

Signed-off-by: Dorin Hogea <dhogea@bloomberg.net>
  • Loading branch information
dorinhogea committed Dec 16, 2024
1 parent 49b1486 commit e907f7c
Show file tree
Hide file tree
Showing 46 changed files with 1,676 additions and 307 deletions.
2 changes: 2 additions & 0 deletions db/comdb2.h
Original file line number Diff line number Diff line change
Expand Up @@ -3696,6 +3696,8 @@ extern int gbl_server_admin_mode;
void csc2_free_all(void);

int fdb_default_ver_set(int val);
/* Called by fdb_push_write_update() and fdb_push_update() in db_tunables.c
 * with the requested tunable value; a non-zero return makes the caller
 * reject the change and leave the tunable's variable untouched. */
int fdb_push_write_set(int val);
int fdb_push_set(int val);

/* hack to temporary allow bools on production stage */
void csc2_allow_bools(void);
Expand Down
20 changes: 20 additions & 0 deletions db/db_tunables.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ extern int gbl_fdb_allow_cross_classes;
extern int gbl_fdb_resolve_local;
extern int gbl_fdb_push_redirect_foreign;
extern int gbl_fdb_push_remote;
extern int gbl_fdb_push_remote_write;
extern int gbl_fdb_remsql_cdb2api;
extern int gbl_goslow;
extern int gbl_heartbeat_send;
Expand Down Expand Up @@ -1097,6 +1098,25 @@ static int fdb_default_ver_update(void *context, void *value)
return 0;
}

/*
 * Callback registered with the "foreign_db_push_remote_writes" tunable.
 *
 * context: the comdb2_tunable being changed.
 * value:   points to the requested int setting.
 *
 * The requested setting is first handed to fdb_push_write_set(). If that call
 * returns non-zero, the tunable's variable is left as is and -1 is returned.
 * Otherwise the setting is stored through tunable->var and 0 is returned.
 */
static int fdb_push_write_update(void *context, void *value)
{
    const int requested = *(int *)value;
    comdb2_tunable *t = (comdb2_tunable *)context;

    if (fdb_push_write_set(requested) != 0) {
        return -1;
    }

    *(int *)t->var = requested;
    return 0;
}

/*
 * Callback registered with the "foreign_db_push_remote" tunable.
 *
 * context: the comdb2_tunable being changed.
 * value:   points to the requested int setting.
 *
 * Returns -1, without touching the tunable's variable, when fdb_push_set()
 * returns non-zero for the requested setting. Otherwise writes the setting
 * through tunable->var and returns 0.
 */
static int fdb_push_update(void *context, void *value)
{
    comdb2_tunable *entry = (comdb2_tunable *)context;
    int *dest = (int *)entry->var;
    int wanted = *(int *)value;

    int rejected = fdb_push_set(wanted);
    if (rejected)
        return -1;

    *dest = wanted;
    return 0;
}

/* Forward declaration */
int ctrace_set_rollat(void *unused, void *value);
Expand Down
5 changes: 4 additions & 1 deletion db/db_tunables.h
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,10 @@ REGISTER_TUNABLE("foreign_db_allow_cross_class", NULL, TUNABLE_BOOLEAN, &gbl_fdb
READONLY | NOARG | READEARLY, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("foreign_db_resolve_local", NULL, TUNABLE_BOOLEAN, &gbl_fdb_resolve_local,
READONLY | NOARG | READEARLY, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("foreign_db_push_remote", NULL, TUNABLE_BOOLEAN, &gbl_fdb_push_remote, NOARG, NULL, NULL, NULL, NULL);
/* Proxy-mode switch for reads; fdb_push_update is the callback supplied for
 * this tunable, and it passes the requested value to fdb_push_set(). */
REGISTER_TUNABLE("foreign_db_push_remote", "Fdb proxy mode for reads (OFF turns off writes as well). (Default: on)", TUNABLE_BOOLEAN, &gbl_fdb_push_remote, NOARG, NULL, NULL,
fdb_push_update, NULL);
/* Proxy-mode switch for writes; fdb_push_write_update is the callback
 * supplied for this tunable. */
REGISTER_TUNABLE("foreign_db_push_remote_writes",
                 "Fdb proxy mode for writes (ON turns on reads as well). (Default: on)",
                 TUNABLE_BOOLEAN, &gbl_fdb_push_remote_write, NOARG,
                 NULL, NULL, fdb_push_write_update, NULL);
REGISTER_TUNABLE("foreign_db_push_redirect",
"Redirect fdb query to run via client instead of on server. (Default: off)", TUNABLE_BOOLEAN,
&gbl_fdb_push_redirect_foreign, NOARG, NULL, NULL, NULL, NULL);
Expand Down
61 changes: 46 additions & 15 deletions db/dohast.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "ast.h"
#include "dohsql.h"
#include "sql.h"
#include "fdb_fend.h"

int gbl_dohast_disable = 0;
int gbl_dohast_verbose = 0;
Expand Down Expand Up @@ -947,36 +948,66 @@ int comdb2_check_parallel(Parse *pParse)
return 0;
}

int comdb2_check_push_remote(Parse *pParse)
/*
 * Returns pParse->ast->stack (the first AST stack entry), or NULL when
 * comdb2IsPrepareOnly(pParse) is true, pParse->explain is set, the AST is
 * flagged unsupported, or stack slot 0 carries no object.
 *
 * NOTE(review): this text is a diff rendering without +/- markers; lines that
 * look like leftovers of the previous comdb2_check_push_remote() body
 * (`return 0;`, `dohsql_node_t *node;`, the GET_CLNT block) appear interleaved
 * with the new body. Confirm against the repository before relying on the
 * exact statement sequence.
 */
ast_node_t *_get_ast_node(Parse *pParse)
{
if (comdb2IsPrepareOnly(pParse))
return 0;
return NULL;

if (pParse->explain)
return NULL;

ast_t *ast = pParse->ast;
dohsql_node_t *node;

GET_CLNT;
if (!gbl_fdb_push_remote && !clnt->force_fdb_push_remote && !clnt->force_fdb_push_redirect)
if (ast && ast->unsupported)
return NULL;
/* NOTE(review): the test above tolerates ast == NULL, but the next line
 * dereferences ast unconditionally. Either the NULL test is redundant or
 * this can fault; `!ast || ast->unsupported` would cover both. */
if (!ast->stack[0].obj)
return NULL;

return ast->stack;
}

/*
 * Decides whether a statement is pushed to a remote db for reading.
 * Returns 1 only when has_parallel_sql(NULL) is non-zero, _get_ast_node()
 * yields a node whose op is AST_TYPE_SELECT, the AST uses a single stack
 * entry (nused <= 1), the node's remotedb is greater than 1, and
 * fdb_push_setup() returns 0 for it. Returns 0 in every other case.
 *
 * NOTE(review): this text is a diff rendering without +/- markers; checks on
 * a local `ast` variable that this body never declares appear interleaved
 * with the anode-based checks and look like removed lines. Confirm against
 * the repository.
 */
int comdb2_check_push_remote(Parse *pParse)
{
if (has_parallel_sql(NULL) == 0)
return 0;

if (ast && ast->unsupported)
ast_node_t * anode = _get_ast_node(pParse);
if (!anode)
return 0;
if (has_parallel_sql(NULL) == 0)

if (anode->op != AST_TYPE_SELECT)
return 0;
if (ast->nused > 1)

if (pParse->ast->nused > 1)
return 0;
if (!ast->stack[0].obj)

/* anode->obj was checked for non-NULL by _get_ast_node() */
dohsql_node_t *node = (dohsql_node_t*)anode->obj;

if (node->remotedb > 1)
if (!fdb_push_setup(pParse, node))
return 1;

return 0;
}

/*
 * Write-side counterpart of comdb2_check_push_remote().
 * Returns 1 only when _get_ast_node() yields a node whose op is
 * AST_TYPE_INSERT, AST_TYPE_DELETE or AST_TYPE_UPDATE, the node's object
 * (used as a Table *) is non-NULL with iDb greater than 1, and
 * fdb_push_write_setup() returns 0 for it. Returns 0 in every other case.
 *
 * NOTE(review): this text is a diff rendering without +/- markers; the
 * statements using `node`, `ast` and fdb_push_run() reference names this body
 * never declares and look like removed lines of the old
 * comdb2_check_push_remote(). Confirm against the repository.
 */
int comdb2_check_push_remote_write(Parse *pParse)
{
ast_node_t *anode = _get_ast_node(pParse);
if (!anode)
return 0;

node = (dohsql_node_t *)ast->stack[0].obj;
/* only data-modifying statements qualify */
if (anode->op != AST_TYPE_INSERT &&
anode->op != AST_TYPE_DELETE &&
anode->op != AST_TYPE_UPDATE)
return 0;

if (node->type != AST_TYPE_SELECT)
Table *pTab = (Table*)anode->obj;
if (!pTab)
return 0;

if (!pParse->explain)
if (node->remotedb > 1)
if (!fdb_push_run(pParse, node))
return 1;
if (pTab->iDb > 1)
if (!fdb_push_write_setup(pParse, anode->op, pTab))
return 1;
return 0;
}

Expand Down
46 changes: 26 additions & 20 deletions db/fdb_bend_sql.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ extern int gbl_fdb_track;
extern int blockproc2sql_error(int rc, const char *func, int line);


int fdb_appsock_work(const char *cid, struct sqlclntstate *clnt, int version,
int fdb_appsock_work(const char *cid, sqlclntstate *clnt, int version,
enum run_sql_flags flags, char *sql, int sqllen,
char *trim_key, int trim_keylen, SBUF2 *sb)
{
Expand Down Expand Up @@ -72,11 +72,11 @@ int fdb_appsock_work(const char *cid, struct sqlclntstate *clnt, int version,
*
*/
int fdb_svc_cursor_open_sql(char *tid, char *cid, int code_release, int version,
int flags, struct sqlclntstate **pclnt)
int flags, sqlclntstate **pclnt)
{
struct sqlclntstate *clnt = NULL;
sqlclntstate *clnt = NULL;
/* we need to create a private clnt state */
clnt = (struct sqlclntstate *)calloc(1, sizeof(struct sqlclntstate));
clnt = (sqlclntstate *)calloc(1, sizeof(sqlclntstate));
if (!clnt) {
return -1;
}
Expand Down Expand Up @@ -117,8 +117,7 @@ int fdb_svc_sql_row(SBUF2 *sb, char *cid, char *row, int rowlen, int ret)
* masks every index as covered index
*
*/
int fdb_svc_alter_schema(struct sqlclntstate *clnt, sqlite3_stmt *stmt,
UnpackedRecord *upr)
int fdb_svc_alter_schema(sqlclntstate *clnt, sqlite3_stmt *stmt, UnpackedRecord *upr)
{
int rootpage;
int ixnum;
Expand Down Expand Up @@ -284,7 +283,7 @@ int fdb_svc_alter_schema(struct sqlclntstate *clnt, sqlite3_stmt *stmt,
return 0;
}

void init_sqlclntstate(struct sqlclntstate *clnt, char *tid)
void init_sqlclntstate(sqlclntstate *clnt, char *tid)
{
start_internal_sql_clnt(clnt);
clnt->dbtran.mode = TRANLEVEL_SOSQL;
Expand All @@ -301,9 +300,9 @@ void init_sqlclntstate(struct sqlclntstate *clnt, char *tid)
*/
int fdb_svc_trans_begin(char *tid, enum transaction_level lvl, int flags, int seq, struct sql_thread *thd,
char *dist_txnid, char *coordinator_dbname, char *coordinator_tier, int64_t timestamp,
struct sqlclntstate **pclnt)
sqlclntstate **pclnt)
{
struct sqlclntstate *clnt = NULL;
sqlclntstate *clnt = NULL;
int rc = 0;

*pclnt = NULL;
Expand All @@ -320,7 +319,7 @@ int fdb_svc_trans_begin(char *tid, enum transaction_level lvl, int flags, int se
return -1;
}

*pclnt = clnt = calloc(1, sizeof(struct sqlclntstate));
*pclnt = clnt = calloc(1, sizeof(sqlclntstate));
if (!clnt) {
return -1;
}
Expand Down Expand Up @@ -372,7 +371,7 @@ int fdb_svc_trans_begin(char *tid, enum transaction_level lvl, int flags, int se
*
*/
int fdb_svc_trans_commit(char *tid, enum transaction_level lvl,
struct sqlclntstate *clnt, int seq)
sqlclntstate *clnt, int seq)
{
int rc = 0, irc = 0;
int bdberr = 0;
Expand Down Expand Up @@ -475,7 +474,7 @@ int fdb_svc_trans_commit(char *tid, enum transaction_level lvl,
*
*/
int fdb_svc_trans_rollback(char *tid, enum transaction_level lvl,
struct sqlclntstate *clnt, int seq)
sqlclntstate *clnt, int seq)
{
int rc;
uuidstr_t us;
Expand Down Expand Up @@ -538,7 +537,7 @@ int fdb_svc_trans_rollback(char *tid, enum transaction_level lvl,
}

static struct sql_thread *
_fdb_svc_cursor_start(BtCursor *pCur, struct sqlclntstate *clnt, char *tblname,
_fdb_svc_cursor_start(BtCursor *pCur, sqlclntstate *clnt, char *tblname,
int rootpage, unsigned long long genid,
int need_bdbcursor, int *standalone)
{
Expand Down Expand Up @@ -638,7 +637,7 @@ _fdb_svc_cursor_start(BtCursor *pCur, struct sqlclntstate *clnt, char *tblname,
return thd;
}

static int _fdb_svc_cursor_end(BtCursor *pCur, struct sqlclntstate *clnt,
static int _fdb_svc_cursor_end(BtCursor *pCur, sqlclntstate *clnt,
int standalone)
{
int bdberr = 0;
Expand Down Expand Up @@ -737,7 +736,7 @@ static int _fdb_svc_indexes_to_ondisk(unsigned char **pIndexes, struct dbtable *
* Insert a sqlite row in the local transaction
*
*/
int fdb_svc_cursor_insert(struct sqlclntstate *clnt, char *tblname,
int fdb_svc_cursor_insert(sqlclntstate *clnt, char *tblname,
int rootpage, int version, unsigned long long genid,
char *data, int datalen, int seq)
{
Expand Down Expand Up @@ -819,7 +818,7 @@ int fdb_svc_cursor_insert(struct sqlclntstate *clnt, char *tblname,
* Delete a sqlite row in the local transaction
*
*/
int fdb_svc_cursor_delete(struct sqlclntstate *clnt, char *tblname,
int fdb_svc_cursor_delete(sqlclntstate *clnt, char *tblname,
int rootpage, int version, unsigned long long genid,
int seq)
{
Expand Down Expand Up @@ -866,7 +865,7 @@ int fdb_svc_cursor_delete(struct sqlclntstate *clnt, char *tblname,
* Update a sqlite row in the local transaction
*
*/
int fdb_svc_cursor_update(struct sqlclntstate *clnt, char *tblname,
int fdb_svc_cursor_update(sqlclntstate *clnt, char *tblname,
int rootpage, int version,
unsigned long long oldgenid, unsigned long long genid,
char *data, int datalen, int seq)
Expand Down Expand Up @@ -958,9 +957,9 @@ int fdb_svc_cursor_update(struct sqlclntstate *clnt, char *tblname,
* Return the sqlclntstate storing the shared transaction, if any
*
*/
struct sqlclntstate *fdb_svc_trans_get(char *tid)
sqlclntstate *fdb_svc_trans_get(char *tid)
{
struct sqlclntstate *clnt;
sqlclntstate *clnt;
int rc = 0;

int deadline = 0;
Expand Down Expand Up @@ -1001,7 +1000,7 @@ struct sqlclntstate *fdb_svc_trans_get(char *tid)
* Pack an sqlite result to be send to a remote db
*
*/
void fdb_sqlite_row(sqlite3_stmt *stmt, Mem *res)
void fdb_sqlite_row(sqlclntstate *clnt, sqlite3_stmt *stmt, Mem *res)
{
UnpackedRecord upr;
int nField;
Expand All @@ -1011,6 +1010,13 @@ void fdb_sqlite_row(sqlite3_stmt *stmt, Mem *res)
assert(upr.aMem);
upr.nField = nField;

/* special treatment for sqlite_master */
if (clnt->remsql_set.is_schema) {
int rc = fdb_svc_alter_schema(clnt, stmt, &upr);
if (rc) {
/* break; Ignore for now, this will run less optimized */
}
}
bzero(res, sizeof(*res));
sqlite3VdbeRecordPack(&upr, res);
}
Expand Down
1 change: 1 addition & 0 deletions db/fdb_comm.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <sbuf2.h>
#include "comdb2.h"
#include "sql.h"
#include "fdb_fend.h"

enum {
/* all previous versions 0-4 are legacy and reserved */
Expand Down
Loading

0 comments on commit e907f7c

Please sign in to comment.