Skip to content

Commit

Permalink
Merge pull request #114 from vlcn-io/version-fixup
Browse files Browse the repository at this point in the history
🐛 - fix an issue where we would not converge in certain P2P scenarios
  • Loading branch information
tantaman authored Dec 22, 2022
2 parents 3f4d32c + bcd63bf commit c4ea0d9
Show file tree
Hide file tree
Showing 22 changed files with 370 additions and 356 deletions.
21 changes: 18 additions & 3 deletions core/src/changes-vtab-common.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,20 @@
#include "consts.h"
#include "util.h"

/**
* Extracts a where expression from the provided column names and list of `quote
* concatenated` column values.
*
* quote concated column values can be untrusted input as we validate those
* values.
*
* TODO: a future improvement would be to encode changesets into something like
* flat buffers so we can extract out individual values and bind them to the SQL
* statement. The values are currently represented on the wire in a text
* encoding that is not suitable for direct binding but rather for direct
* inclusion into the SQL string. We thus have to ensure we validate the
* provided string.
*/
char *crsql_extractWhereList(crsql_ColumnInfo *zColumnInfos, int columnInfosLen,
const char *quoteConcatedVals) {
char **zzParts = 0;
Expand Down Expand Up @@ -48,9 +62,10 @@ char *crsql_extractWhereList(crsql_ColumnInfo *zColumnInfos, int columnInfosLen,
return ret;
}

// parts must already be properly quoted and escaped for inclusion in a SQL
// statement
char *crsql_quotedValuesAsList(char **parts, int numParts) {
/**
* Should only be called by `quoteConcatedValuesAsList`
*/
static char *crsql_quotedValuesAsList(char **parts, int numParts) {
int len = 0;
for (int i = 0; i < numParts; ++i) {
len += strlen(parts[i]);
Expand Down
6 changes: 3 additions & 3 deletions core/src/changes-vtab-common.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ SQLITE_EXTENSION_INIT3
#define CHANGES_SINCE_VTAB_PK 1
#define CHANGES_SINCE_VTAB_CID 2
#define CHANGES_SINCE_VTAB_CVAL 3
#define CHANGES_SINCE_VTAB_VRSN 4
#define CHANGES_SINCE_VTAB_SITE_ID 5
#define CHANGES_SINCE_VTAB_COL_VRSN 4
#define CHANGES_SINCE_VTAB_DB_VRSN 5
#define CHANGES_SINCE_VTAB_SITE_ID 6

char *crsql_extractWhereList(crsql_ColumnInfo *zColumnInfos, int columnInfosLen,
const char *quoteConcatedVals);

char *crsql_quotedValuesAsList(char **parts, int len);
char *crsql_quoteConcatedValuesAsList(const char *quoteConcatedVals, int len);

#endif
12 changes: 7 additions & 5 deletions core/src/changes-vtab-read.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ char *crsql_changesQueryForTable(crsql_TableInfo *tableInfo, int idxNum) {
'%s' as tbl,\
%z as pks,\
__crsql_col_name as cid,\
__crsql_version as vrsn,\
__crsql_col_version as col_vrsn,\
__crsql_db_version as db_vrsn,\
__crsql_site_id as site_id\
FROM \"%s__crsql_clock\"\
WHERE\
site_id IS %s ?\
AND\
vrsn > ?",
db_vrsn > ?",
tableInfo->tblName, crsql_quoteConcat(tableInfo->pks, tableInfo->pksLen),
tableInfo->tblName, (idxNum & 8) == 8 ? "" : "NOT");

Expand Down Expand Up @@ -88,7 +89,8 @@ char *crsql_changesUnionQuery(crsql_TableInfo **tableInfos, int tableInfosLen,

// compose the final query
return sqlite3_mprintf(
"SELECT tbl, pks, cid, vrsn, site_id FROM (%z) ORDER BY vrsn, tbl ASC",
"SELECT tbl, pks, cid, col_vrsn, db_vrsn, site_id FROM (%z) ORDER BY "
"db_vrsn, tbl ASC",
unionsStr);
// %z frees unionsStr https://www.sqlite.org/printf.html#percentz
}
Expand Down Expand Up @@ -123,8 +125,8 @@ char *crsql_rowPatchDataQuery(sqlite3 *db, crsql_TableInfo *tblInfo,
return 0;
}
// TODO: should we `quote([])` so it fatals on missing columns?
// we'd need something other than `%w` to escape [ in order to prevent
// injection then.
// we'd need something other than `%w` to escape [
// %w assumes and escapes \"
char *zSql = sqlite3_mprintf("SELECT quote(\"%w\") FROM \"%w\" WHERE %z",
colName, tblInfo->tblName, pkWhereList);

Expand Down
5 changes: 3 additions & 2 deletions core/src/changes-vtab-read.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ char *crsql_changesQueryForTable(crsql_TableInfo *tableInfo, int idxNum);
#define TBL 0
#define PKS 1
#define CID 2
#define VRSN 3
#define SITE_ID 4
#define COL_VRSN 3
#define DB_VRSN 4
#define SITE_ID 5
char *crsql_changesUnionQuery(crsql_TableInfo **tableInfos, int tableInfosLen,
int idxNum);
char *crsql_rowPatchDataQuery(sqlite3 *db, crsql_TableInfo *tblInfo,
Expand Down
57 changes: 32 additions & 25 deletions core/src/changes-vtab-read.test.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,19 @@ static void testChangesQueryForTable() {

assert(strcmp(query,
"SELECT \'foo\' as tbl, quote(\"a\") as pks, "
"__crsql_col_name as cid, __crsql_version as vrsn, "
"__crsql_col_name as cid, __crsql_col_version as "
"col_vrsn, __crsql_db_version as db_vrsn, "
"__crsql_site_id as site_id FROM \"foo__crsql_clock\" "
"WHERE site_id IS NOT ? AND vrsn > ?") == 0);
"WHERE site_id IS NOT ? AND db_vrsn > ?") == 0);
sqlite3_free(query);

query = crsql_changesQueryForTable(tblInfo, 8);

assert(strcmp(query,
"SELECT \'foo\' as tbl, quote(\"a\") as pks, "
"__crsql_col_name as cid, __crsql_version as vrsn, "
"__crsql_col_name as cid, __crsql_col_version as "
"col_vrsn, __crsql_db_version as db_vrsn, "
"__crsql_site_id as site_id FROM \"foo__crsql_clock\" "
"WHERE site_id IS ? AND vrsn > ?") == 0);
"WHERE site_id IS ? AND db_vrsn > ?") == 0);
sqlite3_free(query);

printf("\t\e[0;32mSuccess\e[0m\n");
Expand Down Expand Up @@ -81,29 +82,35 @@ static void testChangesUnionQuery() {
assert(rc == SQLITE_OK);

char *query = crsql_changesUnionQuery(tblInfos, 2, 6);
assert(strcmp(query,
"SELECT tbl, pks, cid, vrsn, site_id FROM (SELECT \'foo\' "
"as tbl, quote(\"a\") as pks, __crsql_col_name as "
"cid, __crsql_version as vrsn, __crsql_site_id as "
"site_id FROM \"foo__crsql_clock\" WHERE site_id IS "
"NOT ? AND vrsn > ? UNION SELECT \'bar\' as tbl, "
" quote(\"x\") as pks, __crsql_col_name as cid, "
"__crsql_version as vrsn, __crsql_site_id as site_id "
"FROM \"bar__crsql_clock\" WHERE site_id IS NOT ? "
"AND vrsn > ?) ORDER BY vrsn, tbl ASC") == 0);
assert(
strcmp(
query,
"SELECT tbl, pks, cid, col_vrsn, db_vrsn, site_id FROM (SELECT "
"\'foo\' as tbl, quote(\"a\") as pks, __crsql_col_name as "
"cid, __crsql_col_version as col_vrsn, __crsql_db_version "
"as db_vrsn, __crsql_site_id as site_id FROM "
"\"foo__crsql_clock\" WHERE site_id IS NOT ? AND "
"db_vrsn > ? UNION SELECT \'bar\' as tbl, quote(\"x\") as "
"pks, __crsql_col_name as cid, __crsql_col_version as "
"col_vrsn, __crsql_db_version as db_vrsn, __crsql_site_id "
"as site_id FROM \"bar__crsql_clock\" WHERE site_id IS "
"NOT ? AND db_vrsn > ?) ORDER BY db_vrsn, tbl ASC") == 0);
sqlite3_free(query);

query = crsql_changesUnionQuery(tblInfos, 2, 8);
assert(strcmp(query,
"SELECT tbl, pks, cid, vrsn, site_id FROM (SELECT \'foo\' "
"as tbl, quote(\"a\") as pks, __crsql_col_name as "
"cid, __crsql_version as vrsn, __crsql_site_id as "
"site_id FROM \"foo__crsql_clock\" WHERE site_id IS "
" ? AND vrsn > ? UNION SELECT \'bar\' as tbl, "
" quote(\"x\") as pks, __crsql_col_name as cid, "
"__crsql_version as vrsn, __crsql_site_id as site_id "
"FROM \"bar__crsql_clock\" WHERE site_id IS ? "
"AND vrsn > ?) ORDER BY vrsn, tbl ASC") == 0);
assert(
strcmp(
query,
"SELECT tbl, pks, cid, col_vrsn, db_vrsn, site_id FROM (SELECT "
"\'foo\' as tbl, quote(\"a\") as pks, __crsql_col_name as "
"cid, __crsql_col_version as col_vrsn, __crsql_db_version "
"as db_vrsn, __crsql_site_id as site_id FROM "
"\"foo__crsql_clock\" WHERE site_id IS ? AND "
"db_vrsn > ? UNION SELECT \'bar\' as tbl, quote(\"x\") as "
"pks, __crsql_col_name as cid, __crsql_col_version as "
"col_vrsn, __crsql_db_version as db_vrsn, __crsql_site_id "
"as site_id FROM \"bar__crsql_clock\" WHERE site_id IS ? "
" AND db_vrsn > ?) ORDER BY db_vrsn, tbl ASC") == 0);
sqlite3_free(query);

printf("\t\e[0;32mSuccess\e[0m\n");
Expand Down
72 changes: 38 additions & 34 deletions core/src/changes-vtab-write.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
int crsql_didCidWin(sqlite3 *db, const unsigned char *localSiteId,
const char *insertTbl, const char *pkWhereList,
const char *colName, const char *sanitizedInsertVal,
sqlite3_int64 version, char **errmsg) {
sqlite3_int64 colVersion, char **errmsg) {
char *zSql = 0;

zSql = sqlite3_mprintf(
"SELECT __crsql_version FROM \"%s__crsql_clock\" WHERE %s AND %Q = "
"SELECT __crsql_col_version FROM \"%s__crsql_clock\" WHERE %s AND %Q = "
"__crsql_col_name",
insertTbl, pkWhereList, colName);

Expand Down Expand Up @@ -63,15 +63,16 @@ int crsql_didCidWin(sqlite3 *db, const unsigned char *localSiteId,
sqlite3_int64 localVersion = sqlite3_column_int64(pStmt, 0);
sqlite3_finalize(pStmt);

if (version > localVersion) {
if (colVersion > localVersion) {
return 1;
} else if (version < localVersion) {
} else if (colVersion < localVersion) {
return 0;
}

// else -- versions are equal
// - pull curr value
// - compare for tie break
// TODO: pull bytes and memcmp instead of strcmp?
zSql = sqlite3_mprintf("SELECT quote(\"%w\") FROM \"%w\" WHERE %s", colName,
insertTbl, pkWhereList);
rc = sqlite3_prepare_v2(db, zSql, -1, &pStmt, 0);
Expand Down Expand Up @@ -133,19 +134,22 @@ int crsql_checkForLocalDelete(sqlite3 *db, const char *tblName,

int crsql_setWinnerClock(sqlite3 *db, crsql_TableInfo *tblInfo,
const char *pkIdentifierList, const char *pkValsStr,
const char *insertColName, sqlite3_int64 insertVrsn,
const void *insertSiteId, int insertSiteIdLen) {
const char *insertColName, sqlite3_int64 insertColVrsn,
sqlite3_int64 insertDbVrsn, const void *insertSiteId,
int insertSiteIdLen) {
int rc = SQLITE_OK;
char *zSql = sqlite3_mprintf(
"INSERT OR REPLACE INTO \"%s__crsql_clock\" \
(%s, \"__crsql_col_name\", \"__crsql_version\", \"__crsql_site_id\")\
(%s, \"__crsql_col_name\", \"__crsql_col_version\", \"__crsql_db_version\", \"__crsql_site_id\")\
VALUES (\
%s,\
%Q,\
%lld,\
MAX(crsql_nextdbversion(), %lld),\
?\
)",
tblInfo->tblName, pkIdentifierList, pkValsStr, insertColName, insertVrsn);
tblInfo->tblName, pkIdentifierList, pkValsStr, insertColName,
insertColVrsn, insertDbVrsn);

sqlite3_stmt *pStmt = 0;
rc = sqlite3_prepare_v2(db, zSql, -1, &pStmt, 0);
Expand Down Expand Up @@ -175,12 +179,9 @@ int crsql_setWinnerClock(sqlite3 *db, crsql_TableInfo *tblInfo,

int crsql_mergePkOnlyInsert(sqlite3 *db, crsql_TableInfo *tblInfo,
const char *pkValsStr, const char *pkIdentifiers,
sqlite3_int64 remoteVersion,
sqlite3_int64 remoteColVersion,
sqlite3_int64 remoteDbVersion,
const void *remoteSiteId, int remoteSiteIdLen) {
// TODO: do we need to check that the row doesn't alrdy
// exist?
// if it exists we can skip the merge pks only bc we alrdy have this state

char *zSql = sqlite3_mprintf("INSERT OR IGNORE INTO \"%s\" (%s) VALUES (%s)",
tblInfo->tblName, pkIdentifiers, pkValsStr);
int rc = sqlite3_exec(db, SET_SYNC_BIT, 0, 0, 0);
Expand All @@ -198,14 +199,15 @@ int crsql_mergePkOnlyInsert(sqlite3 *db, crsql_TableInfo *tblInfo,

// TODO: if insert was ignored, no reason to change clock
return crsql_setWinnerClock(db, tblInfo, pkIdentifiers, pkValsStr,
PKS_ONLY_CID_SENTINEL, remoteVersion,
remoteSiteId, remoteSiteIdLen);
PKS_ONLY_CID_SENTINEL, remoteColVersion,
remoteDbVersion, remoteSiteId, remoteSiteIdLen);
}

int crsql_mergeDelete(sqlite3 *db, crsql_TableInfo *tblInfo,
const char *pkWhereList, const char *pkValsStr,
const char *pkIdentifiers, sqlite3_int64 remoteVersion,
const void *remoteSiteId, int remoteSiteIdLen) {
const char *pkIdentifiers, sqlite3_int64 remoteColVersion,
sqlite3_int64 remoteDbVersion, const void *remoteSiteId,
int remoteSiteIdLen) {
char *zSql = sqlite3_mprintf("DELETE FROM \"%s\" WHERE %s", tblInfo->tblName,
pkWhereList);
int rc = sqlite3_exec(db, SET_SYNC_BIT, 0, 0, 0);
Expand All @@ -222,8 +224,8 @@ int crsql_mergeDelete(sqlite3 *db, crsql_TableInfo *tblInfo,
}

return crsql_setWinnerClock(db, tblInfo, pkIdentifiers, pkValsStr,
DELETE_CID_SENTINEL, remoteVersion, remoteSiteId,
remoteSiteIdLen);
DELETE_CID_SENTINEL, remoteColVersion,
remoteDbVersion, remoteSiteId, remoteSiteIdLen);
}

int crsql_mergeInsert(sqlite3_vtab *pVTab, int argc, sqlite3_value **argv,
Expand Down Expand Up @@ -270,8 +272,10 @@ int crsql_mergeInsert(sqlite3_vtab *pVTab, int argc, sqlite3_value **argv,
// splitquoteconcat for the validation
const unsigned char *insertVal =
sqlite3_value_text(argv[2 + CHANGES_SINCE_VTAB_CVAL]);
sqlite3_int64 insertVrsn =
sqlite3_value_int64(argv[2 + CHANGES_SINCE_VTAB_VRSN]);
sqlite3_int64 insertColVrsn =
sqlite3_value_int64(argv[2 + CHANGES_SINCE_VTAB_COL_VRSN]);
sqlite3_int64 insertDbVrsn =
sqlite3_value_int64(argv[2 + CHANGES_SINCE_VTAB_DB_VRSN]);

int insertSiteIdLen =
sqlite3_value_bytes(argv[2 + CHANGES_SINCE_VTAB_SITE_ID]);
Expand Down Expand Up @@ -326,9 +330,9 @@ int crsql_mergeInsert(sqlite3_vtab *pVTab, int argc, sqlite3_value **argv,
char *pkIdentifierList =
crsql_asIdentifierList(tblInfo->pks, tblInfo->pksLen, 0);
if (isDelete) {
rc =
crsql_mergeDelete(db, tblInfo, pkWhereList, pkValsStr, pkIdentifierList,
insertVrsn, insertSiteId, insertSiteIdLen);
rc = crsql_mergeDelete(db, tblInfo, pkWhereList, pkValsStr,
pkIdentifierList, insertColVrsn, insertDbVrsn,
insertSiteId, insertSiteIdLen);

sqlite3_free(pkWhereList);
sqlite3_free(pkValsStr);
Expand All @@ -339,7 +343,8 @@ int crsql_mergeInsert(sqlite3_vtab *pVTab, int argc, sqlite3_value **argv,
if (isPkOnly ||
!crsql_columnExists(insertColName, tblInfo->nonPks, tblInfo->nonPksLen)) {
rc = crsql_mergePkOnlyInsert(db, tblInfo, pkValsStr, pkIdentifierList,
insertVrsn, insertSiteId, insertSiteIdLen);
insertColVrsn, insertDbVrsn, insertSiteId,
insertSiteIdLen);
sqlite3_free(pkWhereList);
sqlite3_free(pkValsStr);
sqlite3_free(pkIdentifierList);
Expand All @@ -356,9 +361,9 @@ int crsql_mergeInsert(sqlite3_vtab *pVTab, int argc, sqlite3_value **argv,
return SQLITE_ERROR;
}

int doesCidWin =
crsql_didCidWin(db, pTab->pExtData->siteId, tblInfo->tblName, pkWhereList,
insertColName, sanitizedInsertVal[0], insertVrsn, errmsg);
int doesCidWin = crsql_didCidWin(
db, pTab->pExtData->siteId, tblInfo->tblName, pkWhereList, insertColName,
sanitizedInsertVal[0], insertColVrsn, errmsg);
sqlite3_free(pkWhereList);
if (doesCidWin == -1 || doesCidWin == 0) {
sqlite3_free(pkValsStr);
Expand All @@ -376,11 +381,10 @@ int crsql_mergeInsert(sqlite3_vtab *pVTab, int argc, sqlite3_value **argv,
zSql = sqlite3_mprintf(
"INSERT INTO \"%w\" (%s, \"%w\")\
VALUES (%s, %s)\
ON CONFLICT (%s) DO UPDATE\
ON CONFLICT DO UPDATE\
SET \"%w\" = %s",
tblInfo->tblName, pkIdentifierList, insertColName, pkValsStr,
sanitizedInsertVal[0], pkIdentifierList, insertColName,
sanitizedInsertVal[0]);
sanitizedInsertVal[0], insertColName, sanitizedInsertVal[0]);

sqlite3_free(sanitizedInsertVal[0]);
sqlite3_free(sanitizedInsertVal);
Expand All @@ -406,8 +410,8 @@ int crsql_mergeInsert(sqlite3_vtab *pVTab, int argc, sqlite3_value **argv,
}

rc = crsql_setWinnerClock(db, tblInfo, pkIdentifierList, pkValsStr,
insertColName, insertVrsn, insertSiteId,
insertSiteIdLen);
insertColName, insertColVrsn, insertDbVrsn,
insertSiteId, insertSiteIdLen);
sqlite3_free(pkIdentifierList);
sqlite3_free(pkValsStr);

Expand All @@ -419,6 +423,6 @@ int crsql_mergeInsert(sqlite3_vtab *pVTab, int argc, sqlite3_value **argv,
// the table.
// Is it fine if we prevent anyone from using `rowid` on a vtab?
// or must we convert to `without rowid`?
*pRowid = insertVrsn;
*pRowid = insertDbVrsn;
return rc;
}
2 changes: 1 addition & 1 deletion core/src/changes-vtab-write.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ int crsql_mergeInsert(sqlite3_vtab *pVTab, int argc, sqlite3_value **argv,
int crsql_didCidWin(sqlite3 *db, const unsigned char *localSiteId,
const char *insertTbl, const char *pkWhereList,
const char *colName, const char *sanitizedInsertVal,
sqlite3_int64 version, char **errmsg);
sqlite3_int64 dbVersion, char **errmsg);

#endif
Loading

0 comments on commit c4ea0d9

Please sign in to comment.