diff --git a/Cargo.lock b/Cargo.lock index 3de79aaa..f1912ff4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -252,6 +252,7 @@ dependencies = [ "serde", "serde_json", "sqlite_nostd", + "thiserror", "uuid", ] @@ -439,6 +440,26 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "thiserror" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "unicode-ident" version = "1.0.11" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index e39d2b72..af3f3307 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -22,6 +22,7 @@ serde = { version = "1.0", default-features = false, features = ["alloc", "deriv const_format = "0.2.34" futures-lite = { version = "2.6.0", default-features = false, features = ["alloc"] } rustc-hash = { version = "2.1", default-features = false } +thiserror = { version = "2", default-features = false } [dependencies.uuid] version = "1.4.1" diff --git a/crates/core/src/bson/error.rs b/crates/core/src/bson/error.rs index dafeff3a..57e49cd2 100644 --- a/crates/core/src/bson/error.rs +++ b/crates/core/src/bson/error.rs @@ -47,6 +47,8 @@ impl BsonError { } } +impl core::error::Error for BsonError {} + impl Display for BsonError { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { self.err.fmt(f) diff --git a/crates/core/src/checkpoint.rs b/crates/core/src/checkpoint.rs index 7e962726..e7f0621b 100644 --- a/crates/core/src/checkpoint.rs +++ b/crates/core/src/checkpoint.rs @@ -11,7 +11,7 @@ use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, Context, Value}; use crate::create_sqlite_text_fn; -use crate::error::SQLiteError; +use crate::error::PowerSyncError; use crate::sync::checkpoint::{validate_checkpoint, OwnedBucketChecksum}; use crate::sync::line::Checkpoint; @@ -24,9 +24,10 @@ struct CheckpointResult { fn powersync_validate_checkpoint_impl( ctx: *mut sqlite::context, args: &[*mut sqlite::value], -) -> Result { +) -> Result { let data = args[0].text(); - let checkpoint: Checkpoint = serde_json::from_str(data)?; + let checkpoint: Checkpoint = + serde_json::from_str(data).map_err(PowerSyncError::as_argument_error)?; let db = ctx.db_handle(); let buckets: Vec = checkpoint .buckets @@ -45,7 +46,7 @@ fn powersync_validate_checkpoint_impl( failed_buckets: failed_buckets, }; - Ok(json::to_string(&result)?) + Ok(json::to_string(&result).map_err(PowerSyncError::internal)?) } create_sqlite_text_fn!( diff --git a/crates/core/src/crud_vtab.rs b/crates/core/src/crud_vtab.rs index 1cb0f934..ddffbfaf 100644 --- a/crates/core/src/crud_vtab.rs +++ b/crates/core/src/crud_vtab.rs @@ -12,7 +12,7 @@ use sqlite::{Connection, ResultCode, Value}; use sqlite_nostd::ManagedStmt; use sqlite_nostd::{self as sqlite, ColumnType}; -use crate::error::SQLiteError; +use crate::error::PowerSyncError; use crate::ext::SafeManagedStmt; use crate::schema::TableInfoFlags; use crate::state::DatabaseState; @@ -81,11 +81,11 @@ impl VirtualTable { } } - fn handle_insert(&mut self, args: &[*mut sqlite::value]) -> Result<(), SQLiteError> { + fn handle_insert(&mut self, args: &[*mut sqlite::value]) -> Result<(), PowerSyncError> { let current_tx = self .current_tx .as_mut() - .ok_or_else(|| SQLiteError::misuse("No tx_id"))?; + .ok_or_else(|| PowerSyncError::state_error("Not in tx"))?; let db = self.db; if self.state.is_in_sync_local.load(Ordering::Relaxed) { @@ -162,7 +162,8 @@ impl VirtualTable { } else { None }, - })?; + }) + .map_err(PowerSyncError::internal)?; stmt.bind_text(2, &serialized, sqlite::Destructor::STATIC)?; stmt.exec()?; @@ -178,7 +179,7 @@ impl VirtualTable { Ok(()) } - fn begin(&mut self) -> Result<(), SQLiteError> { + fn begin(&mut self) -> Result<(), PowerSyncError> { let db = self.db; // language=SQLite @@ -187,7 +188,7 @@ impl VirtualTable { let tx_id = if statement.step()? == ResultCode::ROW { statement.column_int64(0) - 1 } else { - return Err(SQLiteError::from(ResultCode::ABORT)); + return Err(PowerSyncError::unknown_internal()); }; self.current_tx = Some(ActiveCrudTransaction { diff --git a/crates/core/src/diff.rs b/crates/core/src/diff.rs index fd37a059..a32accfd 100644 --- a/crates/core/src/diff.rs +++ b/crates/core/src/diff.rs @@ -10,21 +10,21 @@ use sqlite_nostd::{Connection, Context, Value}; use serde_json as json; use crate::create_sqlite_text_fn; -use crate::error::SQLiteError; +use crate::error::PowerSyncError; fn powersync_diff_impl( _ctx: *mut sqlite::context, args: &[*mut sqlite::value], -) -> Result { +) -> Result { let data_old = args[0].text(); let data_new = args[1].text(); diff_objects(data_old, data_new) } -pub fn diff_objects(data_old: &str, data_new: &str) -> Result { - let v_new: json::Value = json::from_str(data_new)?; - let v_old: json::Value = json::from_str(data_old)?; +pub fn diff_objects(data_old: &str, data_new: &str) -> Result { + let v_new: json::Value = json::from_str(data_new).map_err(PowerSyncError::as_argument_error)?; + let v_old: json::Value = json::from_str(data_old).map_err(PowerSyncError::as_argument_error)?; if let (json::Value::Object(mut left), json::Value::Object(mut right)) = (v_new, v_old) { // Remove all null values @@ -56,7 +56,7 @@ pub fn diff_objects(data_old: &str, data_new: &str) -> Result>); +pub struct PowerSyncError { + inner: Box, +} -impl SQLiteError { - pub fn with_description(code: ResultCode, message: impl Into>) -> Self { - Self(code, Some(message.into())) +impl PowerSyncError { + fn errstr(db: *mut sqlite3) -> Option { + let message = db.errmsg().unwrap_or(String::from("Conversion error")); + if message != "not an error" { + Some(message) + } else { + None + } } - pub fn misuse(message: impl Into>) -> Self { - Self::with_description(ResultCode::MISUSE, message) + pub fn from_sqlite( + db: *mut sqlite3, + code: ResultCode, + context: impl Into>, + ) -> Self { + RawPowerSyncError::Sqlite(SqliteError { + code, + errstr: Self::errstr(db), + context: Some(context.into()), + }) + .into() } -} -impl core::fmt::Display for SQLiteError { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - write!(f, "SQLiteError: {:?}", self.0)?; - if let Some(desc) = &self.1 { - write!(f, ", desc: {}", desc)?; + pub fn argument_error(desc: impl Into>) -> Self { + RawPowerSyncError::ArgumentError { + desc: desc.into(), + cause: PowerSyncErrorCause::Unknown, } - Ok(()) + .into() + } + + /// Converts something that can be a [PowerSyncErrorCause] into an argument error. + /// + /// This can be used to represent e.g. JSON parsing errors as argument errors, e.g. with + /// ` serde_json::from_str(payload.text()).map_err(PowerSyncError::as_argument_error)`. + pub fn as_argument_error(cause: impl Into) -> Self { + RawPowerSyncError::ArgumentError { + desc: "".into(), + cause: cause.into(), + } + .into() + } + + pub fn json_local_error(cause: serde_json::Error) -> Self { + RawPowerSyncError::LocalDataError { + cause: PowerSyncErrorCause::Json(cause), + } + .into() + } + + pub fn state_error(desc: &'static str) -> Self { + RawPowerSyncError::StateError { desc }.into() + } + + pub fn sync_protocol_error(desc: &'static str, cause: impl Into) -> Self { + RawPowerSyncError::SyncProtocolError { + desc, + cause: cause.into(), + } + .into() + } + + /// A generic internal error. + /// + /// This should only be used rarely since this error provides no further details. + pub fn unknown_internal() -> Self { + Self::internal(PowerSyncErrorCause::Unknown) + } + + /// A generic internal error with an associated cause. + pub fn internal(cause: impl Into) -> Self { + RawPowerSyncError::Internal { + cause: cause.into(), + } + .into() + } + + pub fn missing_client_id() -> Self { + RawPowerSyncError::MissingClientId.into() } -} -impl SQLiteError { + pub fn down_migration_did_not_update_version(current_version: i32) -> Self { + return RawPowerSyncError::DownMigrationDidNotUpdateVersion { current_version }.into(); + } + + /// Applies this error to a function result context, setting the error code and a descriptive + /// text. pub fn apply_to_ctx(self, description: &str, ctx: *mut context) { - let SQLiteError(code, message) = self; + let mut desc = self.to_string(); + desc.insert_str(0, description); + desc.insert_str(description.len(), ": "); - if let Some(msg) = message { - ctx.result_error(&format!("{:} {:}", description, msg)); - } else { - let error = ctx.db_handle().errmsg().unwrap(); - if error == "not an error" { - ctx.result_error(&format!("{:}", description)); - } else { - ctx.result_error(&format!("{:} {:}", description, error)); - } + ctx.result_error(&desc); + ctx.result_error_code(self.sqlite_error_code()); + } + + pub fn sqlite_error_code(&self) -> ResultCode { + use RawPowerSyncError::*; + + match self.inner.as_ref() { + Sqlite(desc) => desc.code, + ArgumentError { .. } => ResultCode::CONSTRAINT_DATATYPE, + StateError { .. } => ResultCode::MISUSE, + MissingClientId + | SyncProtocolError { .. } + | DownMigrationDidNotUpdateVersion { .. } => ResultCode::ABORT, + LocalDataError { .. } => ResultCode::CORRUPT, + Internal { .. } => ResultCode::INTERNAL, } - ctx.result_error_code(code); } } -impl Error for SQLiteError {} +impl Display for PowerSyncError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + self.inner.fmt(f) + } +} -pub trait PSResult { - fn into_db_result(self, db: *mut sqlite3) -> Result; +impl Error for PowerSyncError {} + +impl From for PowerSyncError { + fn from(value: RawPowerSyncError) -> Self { + return PowerSyncError { + inner: Box::new(value), + }; + } } -impl PSResult for Result { - fn into_db_result(self, db: *mut sqlite3) -> Result { - if let Err(code) = self { - let message = db.errmsg().unwrap_or(String::from("Conversion error")); - if message == "not an error" { - Err(SQLiteError(code, None)) - } else { - Err(SQLiteError(code, Some(message.into()))) - } - } else if let Ok(r) = self { - Ok(r) - } else { - Err(SQLiteError(ResultCode::ABORT, None)) +impl From for PowerSyncError { + fn from(value: ResultCode) -> Self { + return RawPowerSyncError::Sqlite(SqliteError { + code: value, + errstr: None, + context: None, + }) + .into(); + } +} + +/// A structured enumeration of possible errors that can occur in the core extension. +#[derive(Error, Debug)] +enum RawPowerSyncError { + /// An internal call to SQLite made by the core extension has failed. We store the original + /// result code and an optional context describing what the core extension was trying to do when + /// the error occurred. + /// + /// We don't call `sqlite3_errstr` at the time the error is created. Instead, we stop using the + /// database, bubble the error up to the outermost function/vtab definition and then use + /// [PowerSyncError::description] to create a detailed error message. + /// + /// This error should _never_ be created for anything but rethrowing underlying SQLite errors. + #[error("{0}")] + Sqlite(SqliteError), + /// A user (e.g. the one calling a PowerSync function, likely an SDK) has provided invalid + /// arguments. + /// + /// This always indicates an error in how the core extension is used. + #[error("invalid argument: {desc}. {cause}")] + ArgumentError { + desc: Cow<'static, str>, + cause: PowerSyncErrorCause, + }, + /// A PowerSync function or vtab was used in a state where it's unavailable. + /// + /// This always indicates an error in how the core extension is used. + #[error("invalid state: {desc}")] + StateError { desc: &'static str }, + /// We've received a sync line we couldn't parse, or in a state where it doesn't make sense + /// (e.g. a checkpoint diff before we've ever received a checkpoint). + /// + /// This interrupts a sync iteration as we cannot reasonably continue afterwards (the client and + /// server are necessarily in diverged states). + #[error("Sync protocol error: {desc}. {cause}")] + SyncProtocolError { + desc: &'static str, + cause: PowerSyncErrorCause, + }, + /// There's invalid local data in the database (like malformed JSON in the oplog table). + #[error("invalid local data: {cause}")] + LocalDataError { cause: PowerSyncErrorCause }, + #[error("No client_id found in ps_kv")] + MissingClientId, + #[error("Down migration failed - version not updated from {current_version}")] + DownMigrationDidNotUpdateVersion { current_version: i32 }, + /// A catch-all for remaining internal errors that are very unlikely to happen. + #[error("Internal PowerSync error. {cause}")] + Internal { cause: PowerSyncErrorCause }, +} + +#[derive(Debug)] +struct SqliteError { + code: ResultCode, + errstr: Option, + context: Option>, +} + +impl Display for SqliteError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + if let Some(context) = &self.context { + write!(f, "{}: ", context)?; } + + write!(f, "internal SQLite call returned {}", self.code)?; + if let Some(desc) = &self.errstr { + write!(f, ": {}", desc)? + } + + Ok(()) } } -impl From for SQLiteError { - fn from(value: ResultCode) -> Self { - SQLiteError(value, None) +pub trait PSResult { + fn into_db_result(self, db: *mut sqlite3) -> Result; +} + +impl PSResult for Result { + fn into_db_result(self, db: *mut sqlite3) -> Result { + self.map_err(|code| { + RawPowerSyncError::Sqlite(SqliteError { + code, + errstr: PowerSyncError::errstr(db), + context: None, + }) + .into() + }) } } -impl From for SQLiteError { +#[derive(Debug)] +pub enum PowerSyncErrorCause { + Json(serde_json::Error), + Bson(BsonError), + Unknown, +} + +impl From for PowerSyncErrorCause { fn from(value: serde_json::Error) -> Self { - SQLiteError::with_description(ResultCode::ABORT, value.to_string()) + return PowerSyncErrorCause::Json(value); } } -impl From for SQLiteError { - fn from(value: core::fmt::Error) -> Self { - SQLiteError::with_description(ResultCode::INTERNAL, format!("{}", value)) +impl From for PowerSyncErrorCause { + fn from(value: BsonError) -> Self { + return PowerSyncErrorCause::Bson(value); } } -impl From for SQLiteError { - fn from(value: BsonError) -> Self { - SQLiteError::with_description(ResultCode::ERROR, value.to_string()) +impl Display for PowerSyncErrorCause { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "cause: ")?; + + match self { + PowerSyncErrorCause::Json(error) => error.fmt(f), + PowerSyncErrorCause::Bson(error) => error.fmt(f), + PowerSyncErrorCause::Unknown => write!(f, "unknown"), + } } } diff --git a/crates/core/src/fix_data.rs b/crates/core/src/fix_data.rs index 8dcab1b6..98f4a735 100644 --- a/crates/core/src/fix_data.rs +++ b/crates/core/src/fix_data.rs @@ -4,7 +4,7 @@ use alloc::format; use alloc::string::String; use crate::create_sqlite_optional_text_fn; -use crate::error::{PSResult, SQLiteError}; +use crate::error::{PSResult, PowerSyncError}; use sqlite_nostd::{self as sqlite, ColumnType, Value}; use sqlite_nostd::{Connection, Context, ResultCode}; @@ -20,7 +20,7 @@ use crate::util::quote_identifier; // // The fix here is to find these dangling rows, and add them to ps_updated_rows. // The next time the sync_local operation is run, these rows will be removed. -pub fn apply_v035_fix(db: *mut sqlite::sqlite3) -> Result { +pub fn apply_v035_fix(db: *mut sqlite::sqlite3) -> Result { // language=SQLite let statement = db .prepare_v2("SELECT name, powersync_external_table_name(name) FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data__*'") @@ -119,7 +119,7 @@ fn remove_duplicate_key_encoding(key: &str) -> Option { fn powersync_remove_duplicate_key_encoding_impl( _ctx: *mut sqlite::context, args: &[*mut sqlite::value], -) -> Result, SQLiteError> { +) -> Result, PowerSyncError> { let arg = args.get(0).ok_or(ResultCode::MISUSE)?; if arg.value_type() != ColumnType::Text { diff --git a/crates/core/src/json_merge.rs b/crates/core/src/json_merge.rs index cb31479d..273c88c4 100644 --- a/crates/core/src/json_merge.rs +++ b/crates/core/src/json_merge.rs @@ -8,7 +8,7 @@ use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, Context, Value}; use crate::create_sqlite_text_fn; -use crate::error::SQLiteError; +use crate::error::PowerSyncError; /// Given any number of JSON TEXT arguments, merge them into a single JSON object. /// @@ -17,7 +17,7 @@ use crate::error::SQLiteError; fn powersync_json_merge_impl( _ctx: *mut sqlite::context, args: &[*mut sqlite::value], -) -> Result { +) -> Result { if args.is_empty() { return Ok("{}".to_string()); } @@ -25,7 +25,7 @@ fn powersync_json_merge_impl( for arg in args { let chunk = arg.text(); if chunk.is_empty() || !chunk.starts_with('{') || !chunk.ends_with('}') { - return Err(SQLiteError::from(ResultCode::MISMATCH)); + return Err(PowerSyncError::argument_error("Expected json object")); } // Strip outer braces diff --git a/crates/core/src/kv.rs b/crates/core/src/kv.rs index 06980bd8..82fc4ce7 100644 --- a/crates/core/src/kv.rs +++ b/crates/core/src/kv.rs @@ -9,19 +9,19 @@ use sqlite_nostd::{Connection, Context}; use crate::create_sqlite_optional_text_fn; use crate::create_sqlite_text_fn; -use crate::error::SQLiteError; +use crate::error::PowerSyncError; use crate::sync::BucketPriority; fn powersync_client_id_impl( ctx: *mut sqlite::context, _args: &[*mut sqlite::value], -) -> Result { +) -> Result { let db = ctx.db_handle(); client_id(db) } -pub fn client_id(db: *mut sqlite::sqlite3) -> Result { +pub fn client_id(db: *mut sqlite::sqlite3) -> Result { // language=SQLite let statement = db.prepare_v2("select value from ps_kv where key = 'client_id'")?; @@ -29,10 +29,7 @@ pub fn client_id(db: *mut sqlite::sqlite3) -> Result { let client_id = statement.column_text(0)?; Ok(client_id.to_string()) } else { - Err(SQLiteError::with_description( - ResultCode::ABORT, - "No client_id found in ps_kv", - )) + Err(PowerSyncError::missing_client_id()) } } @@ -45,7 +42,7 @@ create_sqlite_text_fn!( fn powersync_last_synced_at_impl( ctx: *mut sqlite::context, _args: &[*mut sqlite::value], -) -> Result, SQLiteError> { +) -> Result, ResultCode> { let db = ctx.db_handle(); // language=SQLite diff --git a/crates/core/src/macros.rs b/crates/core/src/macros.rs index 459a5ab8..6d7d25ae 100644 --- a/crates/core/src/macros.rs +++ b/crates/core/src/macros.rs @@ -11,7 +11,7 @@ macro_rules! create_sqlite_text_fn { let result = $fn_impl_name(ctx, args); if let Err(err) = result { - SQLiteError::from(err).apply_to_ctx($description, ctx); + PowerSyncError::from(err).apply_to_ctx($description, ctx); } else if let Ok(r) = result { ctx.result_text_transient(&r); } @@ -32,7 +32,7 @@ macro_rules! create_sqlite_optional_text_fn { let result = $fn_impl_name(ctx, args); if let Err(err) = result { - SQLiteError::from(err).apply_to_ctx($description, ctx); + PowerSyncError::from(err).apply_to_ctx($description, ctx); } else if let Ok(r) = result { if let Some(s) = r { ctx.result_text_transient(&s); @@ -54,7 +54,7 @@ macro_rules! create_auto_tx_function { fn $fn_name( ctx: *mut sqlite::context, args: &[*mut sqlite::value], - ) -> Result { + ) -> Result { let db = ctx.db_handle(); // Auto-start a transaction if we're not in a transaction diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index 81a94634..440548d4 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -8,7 +8,7 @@ use sqlite::ResultCode; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, Context}; -use crate::error::{PSResult, SQLiteError}; +use crate::error::{PSResult, PowerSyncError}; use crate::fix_data::apply_v035_fix; use crate::sync::BucketPriority; @@ -17,7 +17,7 @@ pub const LATEST_VERSION: i32 = 10; pub fn powersync_migrate( ctx: *mut sqlite::context, target_version: i32, -) -> Result<(), SQLiteError> { +) -> Result<(), PowerSyncError> { let local_db = ctx.db_handle(); // language=SQLite @@ -31,7 +31,7 @@ CREATE TABLE IF NOT EXISTS ps_migration(id INTEGER PRIMARY KEY, down_migrations local_db.prepare_v2("SELECT ifnull(max(id), 0) as version FROM ps_migration")?; let rc = current_version_stmt.step()?; if rc != ResultCode::ROW { - return Err(SQLiteError::from(ResultCode::ABORT)); + return Err(PowerSyncError::unknown_internal()); } let mut current_version = current_version_stmt.column_int(0); @@ -55,7 +55,8 @@ CREATE TABLE IF NOT EXISTS ps_migration(id INTEGER PRIMARY KEY, down_migrations for sql in down_sql { let rs = local_db.exec_safe(&sql); if let Err(code) = rs { - return Err(SQLiteError::with_description( + return Err(PowerSyncError::from_sqlite( + local_db, code, format!( "Down migration failed for {:} {:} {:}", @@ -73,20 +74,17 @@ CREATE TABLE IF NOT EXISTS ps_migration(id INTEGER PRIMARY KEY, down_migrations current_version_stmt.reset()?; let rc = current_version_stmt.step()?; if rc != ResultCode::ROW { - return Err(SQLiteError::with_description( + return Err(PowerSyncError::from_sqlite( + local_db, rc, "Down migration failed - could not get version", )); } let new_version = current_version_stmt.column_int(0); if new_version >= current_version { - // Database down from version $currentVersion to $version failed - version not updated after dow migration - return Err(SQLiteError::with_description( - ResultCode::ABORT, - format!( - "Down migration failed - version not updated from {:}", - current_version - ), + // Database down from version $currentVersion to $version failed - version not updated after down migration + return Err(PowerSyncError::down_migration_did_not_update_version( + current_version, )); } current_version = new_version; diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index 0c031b94..fe265a6f 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -1,4 +1,4 @@ -use crate::error::SQLiteError; +use crate::error::PowerSyncError; use crate::sync::line::DataLine; use crate::sync::operations::insert_bucket_operations; use crate::sync::storage_adapter::StorageAdapter; @@ -10,14 +10,15 @@ use sqlite_nostd::{Connection, ResultCode}; use crate::ext::SafeManagedStmt; // Run inside a transaction -pub fn insert_operation(db: *mut sqlite::sqlite3, data: &str) -> Result<(), SQLiteError> { +pub fn insert_operation(db: *mut sqlite::sqlite3, data: &str) -> Result<(), PowerSyncError> { #[derive(Deserialize)] struct BucketBatch<'a> { #[serde(borrow)] buckets: Vec>, } - let batch: BucketBatch = serde_json::from_str(data)?; + let batch: BucketBatch = + serde_json::from_str(data).map_err(PowerSyncError::as_argument_error)?; let adapter = StorageAdapter::new(db)?; for line in &batch.buckets { @@ -27,19 +28,19 @@ pub fn insert_operation(db: *mut sqlite::sqlite3, data: &str) -> Result<(), SQLi Ok(()) } -pub fn clear_remove_ops(_db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQLiteError> { +pub fn clear_remove_ops(_db: *mut sqlite::sqlite3, _data: &str) -> Result<(), ResultCode> { // No-op Ok(()) } -pub fn delete_pending_buckets(_db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQLiteError> { +pub fn delete_pending_buckets(_db: *mut sqlite::sqlite3, _data: &str) -> Result<(), ResultCode> { // No-op Ok(()) } -pub fn delete_bucket(db: *mut sqlite::sqlite3, name: &str) -> Result<(), SQLiteError> { +pub fn delete_bucket(db: *mut sqlite::sqlite3, name: &str) -> Result<(), ResultCode> { // language=SQLite let statement = db.prepare_v2("DELETE FROM ps_buckets WHERE name = ?1 RETURNING id")?; statement.bind_text(1, name, sqlite::Destructor::STATIC)?; diff --git a/crates/core/src/schema/management.rs b/crates/core/src/schema/management.rs index bccba3f0..b8d0fd2f 100644 --- a/crates/core/src/schema/management.rs +++ b/crates/core/src/schema/management.rs @@ -9,14 +9,14 @@ use sqlite::{Connection, ResultCode, Value}; use sqlite_nostd as sqlite; use sqlite_nostd::Context; -use crate::error::{PSResult, SQLiteError}; +use crate::error::{PSResult, PowerSyncError}; use crate::ext::ExtendedDatabase; use crate::util::{quote_identifier, quote_json_path}; use crate::{create_auto_tx_function, create_sqlite_text_fn}; use super::Schema; -fn update_tables(db: *mut sqlite::sqlite3, schema: &str) -> Result<(), SQLiteError> { +fn update_tables(db: *mut sqlite::sqlite3, schema: &str) -> Result<(), PowerSyncError> { { // In a block so that the statement is finalized before dropping tables // language=SQLite @@ -138,9 +138,10 @@ SELECT name, internal_name, local_only FROM powersync_tables WHERE name NOT IN ( Ok(()) } -fn update_indexes(db: *mut sqlite::sqlite3, schema: &str) -> Result<(), SQLiteError> { +fn update_indexes(db: *mut sqlite::sqlite3, schema: &str) -> Result<(), PowerSyncError> { let mut statements: Vec = alloc::vec![]; - let schema = serde_json::from_str::(schema)?; + let schema = + serde_json::from_str::(schema).map_err(PowerSyncError::as_argument_error)?; let mut expected_index_names: Vec = vec![]; { @@ -215,7 +216,8 @@ SELECT ", ) .into_db_result(db)?; - let json_names = serde_json::to_string(&expected_index_names)?; + let json_names = serde_json::to_string(&expected_index_names) + .map_err(PowerSyncError::as_argument_error)?; statement.bind_text(1, &json_names, sqlite::Destructor::STATIC)?; while statement.step()? == ResultCode::ROW { @@ -234,7 +236,7 @@ SELECT Ok(()) } -fn update_views(db: *mut sqlite::sqlite3, schema: &str) -> Result<(), SQLiteError> { +fn update_views(db: *mut sqlite::sqlite3, schema: &str) -> Result<(), PowerSyncError> { // Update existing views if modified // language=SQLite db.exec_text("\ @@ -294,7 +296,7 @@ DELETE FROM powersync_views WHERE name NOT IN ( fn powersync_replace_schema_impl( ctx: *mut sqlite::context, args: &[*mut sqlite::value], -) -> Result { +) -> Result { let schema = args[0].text(); let db = ctx.db_handle(); diff --git a/crates/core/src/sync/bucket_priority.rs b/crates/core/src/sync/bucket_priority.rs index a69f2a60..9e715be3 100644 --- a/crates/core/src/sync/bucket_priority.rs +++ b/crates/core/src/sync/bucket_priority.rs @@ -1,7 +1,6 @@ use serde::{de::Visitor, Deserialize, Serialize}; -use sqlite_nostd::ResultCode; -use crate::error::SQLiteError; +use crate::error::PowerSyncError; #[repr(transparent)] #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] @@ -23,13 +22,12 @@ impl BucketPriority { } impl TryFrom for BucketPriority { - type Error = SQLiteError; + type Error = PowerSyncError; fn try_from(value: i32) -> Result { if value < BucketPriority::HIGHEST.number || value == Self::SENTINEL.number { - return Err(SQLiteError( - ResultCode::MISUSE, - Some("Invalid bucket priority".into()), + return Err(PowerSyncError::argument_error( + "Invalid bucket priority value", )); } @@ -72,7 +70,7 @@ impl<'de> Deserialize<'de> for BucketPriority { where E: serde::de::Error, { - BucketPriority::try_from(v).map_err(|e| E::custom(e.1.unwrap_or_default())) + BucketPriority::try_from(v).map_err(|e| E::custom(e)) } fn visit_i64(self, v: i64) -> Result diff --git a/crates/core/src/sync/checkpoint.rs b/crates/core/src/sync/checkpoint.rs index 57c9b7ce..0c105a99 100644 --- a/crates/core/src/sync/checkpoint.rs +++ b/crates/core/src/sync/checkpoint.rs @@ -1,10 +1,7 @@ use alloc::{string::String, vec::Vec}; use num_traits::Zero; -use crate::{ - error::SQLiteError, - sync::{line::BucketChecksum, BucketPriority, Checksum}, -}; +use crate::sync::{line::BucketChecksum, BucketPriority, Checksum}; use sqlite_nostd::{self as sqlite, Connection, ResultCode}; /// A structure cloned from [BucketChecksum]s with an owned bucket name instead of one borrowed from @@ -48,7 +45,7 @@ pub fn validate_checkpoint<'a>( buckets: impl Iterator, priority: Option, db: *mut sqlite::sqlite3, -) -> Result, SQLiteError> { +) -> Result, ResultCode> { // language=SQLite let statement = db.prepare_v2( " diff --git a/crates/core/src/sync/interface.rs b/crates/core/src/sync/interface.rs index b34dc01d..65245cfc 100644 --- a/crates/core/src/sync/interface.rs +++ b/crates/core/src/sync/interface.rs @@ -11,7 +11,7 @@ use sqlite::{ResultCode, Value}; use sqlite_nostd::{self as sqlite, ColumnType}; use sqlite_nostd::{Connection, Context}; -use crate::error::SQLiteError; +use crate::error::PowerSyncError; use crate::schema::Schema; use crate::state::DatabaseState; @@ -128,26 +128,30 @@ pub fn register(db: *mut sqlite::sqlite3, state: Arc) -> Result<( argc: c_int, argv: *mut *mut sqlite::value, ) -> () { - let result = (|| -> Result<(), SQLiteError> { + let result = (|| -> Result<(), PowerSyncError> { debug_assert!(!ctx.db_handle().get_autocommit()); let controller = unsafe { ctx.user_data().cast::().as_mut() } - .ok_or_else(|| SQLiteError::from(ResultCode::INTERNAL))?; + .ok_or_else(|| PowerSyncError::unknown_internal())?; let args = sqlite::args!(argc, argv); let [op, payload] = args else { - return Err(ResultCode::MISUSE.into()); + // This should be unreachable, we register the function with two arguments. + return Err(PowerSyncError::unknown_internal()); }; if op.value_type() != ColumnType::Text { - return Err(SQLiteError::misuse("First argument must be a string")); + return Err(PowerSyncError::argument_error( + "First argument must be a string", + )); } let op = op.text(); let event = match op { "start" => SyncControlRequest::StartSyncStream({ if payload.value_type() == ColumnType::Text { - serde_json::from_str(payload.text())? + serde_json::from_str(payload.text()) + .map_err(PowerSyncError::as_argument_error)? } else { StartSyncStream::default() } @@ -157,25 +161,30 @@ pub fn register(db: *mut sqlite::sqlite3, state: Arc) -> Result<( data: if payload.value_type() == ColumnType::Text { payload.text() } else { - return Err(SQLiteError::misuse("Second argument must be a string")); + return Err(PowerSyncError::argument_error( + "Second argument must be a string", + )); }, }), "line_binary" => SyncControlRequest::SyncEvent(SyncEvent::BinaryLine { data: if payload.value_type() == ColumnType::Blob { payload.blob() } else { - return Err(SQLiteError::misuse("Second argument must be a byte array")); + return Err(PowerSyncError::argument_error( + "Second argument must be a byte array", + )); }, }), "refreshed_token" => SyncControlRequest::SyncEvent(SyncEvent::DidRefreshToken), "completed_upload" => SyncControlRequest::SyncEvent(SyncEvent::UploadFinished), _ => { - return Err(SQLiteError::misuse("Unknown operation")); + return Err(PowerSyncError::argument_error("Unknown operation")); } }; let instructions = controller.client.push_event(event)?; - let formatted = serde_json::to_string(&instructions)?; + let formatted = + serde_json::to_string(&instructions).map_err(PowerSyncError::internal)?; ctx.result_text_transient(&formatted); Ok(()) diff --git a/crates/core/src/sync/operations.rs b/crates/core/src/sync/operations.rs index 29c23f3d..5e4ee011 100644 --- a/crates/core/src/sync/operations.rs +++ b/crates/core/src/sync/operations.rs @@ -5,7 +5,7 @@ use sqlite_nostd::Connection; use sqlite_nostd::{self as sqlite, ResultCode}; use crate::{ - error::{PSResult, SQLiteError}, + error::{PSResult, PowerSyncError}, ext::SafeManagedStmt, }; @@ -19,7 +19,7 @@ use super::{ pub fn insert_bucket_operations( adapter: &StorageAdapter, data: &DataLine, -) -> Result<(), SQLiteError> { +) -> Result<(), PowerSyncError> { let db = adapter.db; let BucketInfo { id: bucket_id, diff --git a/crates/core/src/sync/storage_adapter.rs b/crates/core/src/sync/storage_adapter.rs index a7c736ac..a68d891a 100644 --- a/crates/core/src/sync/storage_adapter.rs +++ b/crates/core/src/sync/storage_adapter.rs @@ -5,7 +5,7 @@ use serde::Serialize; use sqlite_nostd::{self as sqlite, Connection, ManagedStmt, ResultCode}; use crate::{ - error::SQLiteError, + error::{PSResult, PowerSyncError}, ext::SafeManagedStmt, operations::delete_bucket, schema::Schema, @@ -31,10 +31,11 @@ pub struct StorageAdapter { } impl StorageAdapter { - pub fn new(db: *mut sqlite::sqlite3) -> Result { + pub fn new(db: *mut sqlite::sqlite3) -> Result { // language=SQLite - let progress = - db.prepare_v2("SELECT name, count_at_last, count_since_last FROM ps_buckets")?; + let progress = db + .prepare_v2("SELECT name, count_at_last, count_since_last FROM ps_buckets") + .into_db_result(db)?; // language=SQLite let time = db.prepare_v2("SELECT unixepoch()")?; @@ -46,11 +47,11 @@ impl StorageAdapter { }) } - pub fn collect_bucket_requests(&self) -> Result, SQLiteError> { + pub fn collect_bucket_requests(&self) -> Result, PowerSyncError> { // language=SQLite let statement = self.db.prepare_v2( "SELECT name, last_op FROM ps_buckets WHERE pending_delete = 0 AND name != '$local'", - )?; + ).into_db_result(self.db)?; let mut requests = Vec::::new(); @@ -70,7 +71,7 @@ impl StorageAdapter { pub fn delete_buckets<'a>( &self, buckets: impl IntoIterator, - ) -> Result<(), SQLiteError> { + ) -> Result<(), ResultCode> { for bucket in buckets { // TODO: This is a neat opportunity to create the statements here and cache them delete_bucket(self.db, bucket)?; @@ -97,23 +98,27 @@ impl StorageAdapter { } } - pub fn reset_progress(&self) -> Result<(), ResultCode> { + pub fn reset_progress(&self) -> Result<(), PowerSyncError> { self.db - .exec_safe("UPDATE ps_buckets SET count_since_last = 0, count_at_last = 0;")?; + .exec_safe("UPDATE ps_buckets SET count_since_last = 0, count_at_last = 0;") + .into_db_result(self.db)?; Ok(()) } - pub fn lookup_bucket(&self, bucket: &str) -> Result { + pub fn lookup_bucket(&self, bucket: &str) -> Result { // We do an ON CONFLICT UPDATE simply so that the RETURNING bit works for existing rows. // We can consider splitting this into separate SELECT and INSERT statements. // language=SQLite - let bucket_statement = self.db.prepare_v2( - "INSERT INTO ps_buckets(name) + let bucket_statement = self + .db + .prepare_v2( + "INSERT INTO ps_buckets(name) VALUES(?) ON CONFLICT DO UPDATE SET last_applied_op = last_applied_op RETURNING id, last_applied_op", - )?; + ) + .into_db_result(self.db)?; bucket_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; let res = bucket_statement.step()?; debug_assert_matches!(res, ResultCode::ROW); @@ -133,7 +138,7 @@ impl StorageAdapter { checkpoint: &OwnedCheckpoint, priority: Option, schema: &Schema, - ) -> Result { + ) -> Result { let mismatched_checksums = validate_checkpoint(checkpoint.buckets.values(), priority, self.db)?; @@ -147,7 +152,8 @@ impl StorageAdapter { let update_bucket = self .db - .prepare_v2("UPDATE ps_buckets SET last_op = ? WHERE name = ?")?; + .prepare_v2("UPDATE ps_buckets SET last_op = ? WHERE name = ?") + .into_db_result(self.db)?; for bucket in checkpoint.buckets.values() { if bucket.is_in_priority(priority) { @@ -192,7 +198,8 @@ impl StorageAdapter { }; // TODO: Avoid this serialization, it's currently used to bind JSON SQL parameters. - let serialized_args = serde_json::to_string(&args)?; + let serialized_args = + serde_json::to_string(&args).map_err(PowerSyncError::internal)?; let mut sync = SyncOperation::new( state, self.db, @@ -213,7 +220,7 @@ impl StorageAdapter { // partial completions. let update = self.db.prepare_v2( "UPDATE ps_buckets SET count_since_last = 0, count_at_last = ? WHERE name = ?", - )?; + ).into_db_result(self.db)?; for bucket in checkpoint.buckets.values() { if let Some(count) = bucket.count { diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index 872673e1..3d6209de 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -17,7 +17,7 @@ use futures_lite::FutureExt; use crate::{ bson, - error::SQLiteError, + error::{PowerSyncError, PowerSyncErrorCause}, kv::client_id, state::DatabaseState, sync::{checkpoint::OwnedBucketChecksum, interface::StartSyncStream}, @@ -56,7 +56,7 @@ impl SyncClient { pub fn push_event<'a>( &mut self, event: SyncControlRequest<'a>, - ) -> Result, SQLiteError> { + ) -> Result, PowerSyncError> { match event { SyncControlRequest::StartSyncStream(options) => { self.state.tear_down()?; @@ -71,7 +71,7 @@ impl SyncClient { let mut active = ActiveEvent::new(sync_event); let ClientState::IterationActive(handle) = &mut self.state else { - return Err(SQLiteError::misuse("No iteration is active")); + return Err(PowerSyncError::state_error("No iteration is active")); }; match handle.run(&mut active) { @@ -101,7 +101,7 @@ enum ClientState { } impl ClientState { - fn tear_down(&mut self) -> Result, SQLiteError> { + fn tear_down(&mut self) -> Result, PowerSyncError> { let mut event = ActiveEvent::new(SyncEvent::TearDown); if let ClientState::IterationActive(old) = self { @@ -120,7 +120,7 @@ impl ClientState { /// At each invocation, the future is polled once (and gets access to context that allows it to /// render [Instruction]s to return from the function). struct SyncIterationHandle { - future: Pin>>>, + future: Pin>>>, } impl SyncIterationHandle { @@ -130,7 +130,7 @@ impl SyncIterationHandle { db: *mut sqlite::sqlite3, options: StartSyncStream, state: Arc, - ) -> Result { + ) -> Result { let runner = StreamingSyncIteration { db, options, @@ -145,7 +145,7 @@ impl SyncIterationHandle { /// Forwards a [SyncEvent::Initialize] to the current sync iteration, returning the initial /// instructions generated. - fn initialize(&mut self) -> Result, SQLiteError> { + fn initialize(&mut self) -> Result, PowerSyncError> { let mut event = ActiveEvent::new(SyncEvent::Initialize); let result = self.run(&mut event)?; assert!(!result, "Stream client aborted initialization"); @@ -153,7 +153,7 @@ impl SyncIterationHandle { Ok(event.instructions) } - fn run(&mut self, active: &mut ActiveEvent) -> Result { + fn run(&mut self, active: &mut ActiveEvent) -> Result { // Using a noop waker because the only event thing StreamingSyncIteration::run polls on is // the next incoming sync event. let waker = unsafe { @@ -229,7 +229,7 @@ impl StreamingSyncIteration { Wait { a: PhantomData } } - async fn run(mut self) -> Result<(), SQLiteError> { + async fn run(mut self) -> Result<(), PowerSyncError> { let mut target = SyncTarget::BeforeCheckpoint(self.prepare_request().await?); // A checkpoint that has been fully received and validated, but couldn't be applied due to @@ -249,8 +249,10 @@ impl StreamingSyncIteration { .update(|s| s.disconnect(), &mut event.instructions); break; } - SyncEvent::TextLine { data } => serde_json::from_str(data)?, - SyncEvent::BinaryLine { data } => bson::from_bytes(data)?, + SyncEvent::TextLine { data } => serde_json::from_str(data) + .map_err(|e| PowerSyncError::sync_protocol_error("invalid text line", e))?, + SyncEvent::BinaryLine { data } => bson::from_bytes(data) + .map_err(|e| PowerSyncError::sync_protocol_error("invalid binary line", e))?, SyncEvent::UploadFinished => { if let Some(checkpoint) = validated_but_not_applied.take() { let result = self.adapter.sync_local( @@ -305,9 +307,9 @@ impl StreamingSyncIteration { } SyncLine::CheckpointDiff(diff) => { let Some(target) = target.target_checkpoint_mut() else { - return Err(SQLiteError::with_description( - ResultCode::ABORT, + return Err(PowerSyncError::sync_protocol_error( "Received checkpoint_diff without previous checkpoint", + PowerSyncErrorCause::Unknown, )); }; @@ -324,9 +326,9 @@ impl StreamingSyncIteration { } SyncLine::CheckpointComplete(_) => { let Some(target) = target.target_checkpoint_mut() else { - return Err(SQLiteError::with_description( - ResultCode::ABORT, + return Err(PowerSyncError::sync_protocol_error( "Received checkpoint complete without previous checkpoint", + PowerSyncErrorCause::Unknown, )); }; let result = @@ -366,8 +368,7 @@ impl StreamingSyncIteration { SyncLine::CheckpointPartiallyComplete(complete) => { let priority = complete.priority; let Some(target) = target.target_checkpoint_mut() else { - return Err(SQLiteError::with_description( - ResultCode::ABORT, + return Err(PowerSyncError::state_error( "Received checkpoint complete without previous checkpoint", )); }; @@ -437,7 +438,7 @@ impl StreamingSyncIteration { fn load_progress( &self, checkpoint: &OwnedCheckpoint, - ) -> Result { + ) -> Result { let SyncProgressFromCheckpoint { progress, needs_counter_reset, @@ -454,10 +455,12 @@ impl StreamingSyncIteration { /// /// This prepares a [StreamingSyncRequest] by fetching local sync state and the requested bucket /// parameters. - async fn prepare_request(&mut self) -> Result, SQLiteError> { + async fn prepare_request(&mut self) -> Result, PowerSyncError> { let event = Self::receive_event().await; let SyncEvent::Initialize = event.event else { - return Err(SQLiteError::from(ResultCode::MISUSE)); + return Err(PowerSyncError::argument_error( + "first event must initialize", + )); }; self.status diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs index 69c3764f..ec4a8869 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync_local.rs @@ -4,7 +4,7 @@ use alloc::string::String; use alloc::vec::Vec; use serde::Deserialize; -use crate::error::{PSResult, SQLiteError}; +use crate::error::{PSResult, PowerSyncError}; use crate::schema::{PendingStatement, PendingStatementValue, RawTable, Schema}; use crate::state::DatabaseState; use crate::sync::BucketPriority; @@ -18,8 +18,9 @@ pub fn sync_local( state: &DatabaseState, db: *mut sqlite::sqlite3, data: &V, -) -> Result { - let mut operation: SyncOperation<'_> = SyncOperation::from_args(state, db, data)?; +) -> Result { + let mut operation: SyncOperation<'_> = + SyncOperation::from_args(state, db, data).map_err(PowerSyncError::as_argument_error)?; operation.apply() } @@ -43,7 +44,7 @@ impl<'a> SyncOperation<'a> { state: &'a DatabaseState, db: *mut sqlite::sqlite3, data: &'a V, - ) -> Result { + ) -> Result { Ok(Self::new( state, db, @@ -89,7 +90,7 @@ impl<'a> SyncOperation<'a> { self.schema.add_from_schema(schema); } - fn can_apply_sync_changes(&self) -> Result { + fn can_apply_sync_changes(&self) -> Result { // Don't publish downloaded data until the upload queue is empty (except for downloaded data // in priority 0, which is published earlier). @@ -108,7 +109,7 @@ impl<'a> SyncOperation<'a> { )?; if statement.step()? != ResultCode::ROW { - return Err(SQLiteError::from(ResultCode::ABORT)); + return Err(PowerSyncError::unknown_internal()); } if statement.column_type(0)? == ColumnType::Text { @@ -124,7 +125,7 @@ impl<'a> SyncOperation<'a> { Ok(true) } - pub fn apply(&mut self) -> Result { + pub fn apply(&mut self) -> Result { let guard = self.state.sync_local_guard(); if !self.can_apply_sync_changes()? { @@ -154,7 +155,8 @@ impl<'a> SyncOperation<'a> { match data { Ok(data) => { let stmt = raw.put_statement(self.db)?; - let parsed: serde_json::Value = serde_json::from_str(data)?; + let parsed: serde_json::Value = serde_json::from_str(data) + .map_err(PowerSyncError::json_local_error)?; stmt.bind_for_put(id, &parsed)?; stmt.stmt.exec()?; } @@ -251,11 +253,11 @@ impl<'a> SyncOperation<'a> { Ok(1) } - fn collect_tables(&mut self) -> Result<(), SQLiteError> { + fn collect_tables(&mut self) -> Result<(), PowerSyncError> { self.schema.add_from_db(self.db) } - fn collect_full_operations(&self) -> Result { + fn collect_full_operations(&self) -> Result { Ok(match &self.partial { None => { // Complete sync @@ -332,7 +334,7 @@ SELECT }) } - fn set_last_applied_op(&self) -> Result<(), SQLiteError> { + fn set_last_applied_op(&self) -> Result<(), PowerSyncError> { match &self.partial { Some(partial) => { // language=SQLite @@ -343,8 +345,7 @@ SELECT SET last_applied_op = last_op WHERE last_applied_op != last_op AND name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets')))", - ) - .into_db_result(self.db)?; + ) .into_db_result(self.db)?; updated.bind_text(1, partial.args, Destructor::STATIC)?; updated.exec()?; } @@ -363,7 +364,7 @@ SELECT Ok(()) } - fn mark_completed(&self) -> Result<(), SQLiteError> { + fn mark_completed(&self) -> Result<(), PowerSyncError> { let priority_code: i32 = match &self.partial { None => { // language=SQLite @@ -390,8 +391,7 @@ SELECT // language=SQLite let stmt = self .db - .prepare_v2("INSERT OR REPLACE INTO ps_sync_state (priority, last_synced_at) VALUES (?, datetime());") - .into_db_result(self.db)?; + .prepare_v2("INSERT OR REPLACE INTO ps_sync_state (priority, last_synced_at) VALUES (?, datetime());") .into_db_result(self.db)?; stmt.bind_int(1, priority_code)?; stmt.exec()?; @@ -417,7 +417,7 @@ impl<'a> ParsedDatabaseSchema<'a> { } } - fn add_from_db(&mut self, db: *mut sqlite::sqlite3) -> Result<(), SQLiteError> { + fn add_from_db(&mut self, db: *mut sqlite::sqlite3) -> Result<(), PowerSyncError> { // language=SQLite let statement = db .prepare_v2( @@ -452,7 +452,7 @@ impl<'a> RawTableWithCachedStatements<'a> { fn put_statement( &mut self, db: *mut sqlite::sqlite3, - ) -> Result<&PreparedPendingStatement, SQLiteError> { + ) -> Result<&PreparedPendingStatement, PowerSyncError> { let cache_slot = &mut self.cached_put; if let None = cache_slot { let stmt = PreparedPendingStatement::prepare(db, &self.definition.put)?; @@ -465,7 +465,7 @@ impl<'a> RawTableWithCachedStatements<'a> { fn delete_statement( &mut self, db: *mut sqlite::sqlite3, - ) -> Result<&PreparedPendingStatement, SQLiteError> { + ) -> Result<&PreparedPendingStatement, PowerSyncError> { let cache_slot = &mut self.cached_delete; if let None = cache_slot { let stmt = PreparedPendingStatement::prepare(db, &self.definition.delete)?; @@ -501,10 +501,10 @@ impl<'a> PreparedPendingStatement<'a> { pub fn prepare( db: *mut sqlite::sqlite3, pending: &'a PendingStatement, - ) -> Result { - let stmt = db.prepare_v2(&pending.sql)?; + ) -> Result { + let stmt = db.prepare_v2(&pending.sql).into_db_result(db)?; if stmt.bind_parameter_count() as usize != pending.params.len() { - return Err(SQLiteError::misuse(format!( + return Err(PowerSyncError::argument_error(format!( "Statement {} has {} parameters, but {} values were provided as sources.", &pending.sql, stmt.bind_parameter_count(), @@ -512,7 +512,7 @@ impl<'a> PreparedPendingStatement<'a> { ))); } - // TODO: Compare number of variables / other validity checks? + // TODO: other validity checks? Ok(Self { stmt, @@ -520,7 +520,11 @@ impl<'a> PreparedPendingStatement<'a> { }) } - pub fn bind_for_put(&self, id: &str, json_data: &serde_json::Value) -> Result<(), SQLiteError> { + pub fn bind_for_put( + &self, + id: &str, + json_data: &serde_json::Value, + ) -> Result<(), PowerSyncError> { use serde_json::Value; for (i, source) in self.params.iter().enumerate() { let i = (i + 1) as i32; @@ -531,10 +535,7 @@ impl<'a> PreparedPendingStatement<'a> { } PendingStatementValue::Column(column) => { let parsed = json_data.as_object().ok_or_else(|| { - SQLiteError::with_description( - ResultCode::CONSTRAINT_DATATYPE, - "expected oplog data to be an object", - ) + PowerSyncError::argument_error("expected oplog data to be an object") })?; match parsed.get(column) { @@ -562,13 +563,13 @@ impl<'a> PreparedPendingStatement<'a> { Ok(()) } - pub fn bind_for_delete(&self, id: &str) -> Result<(), SQLiteError> { + pub fn bind_for_delete(&self, id: &str) -> Result<(), PowerSyncError> { for (i, source) in self.params.iter().enumerate() { if let PendingStatementValue::Id = source { self.stmt .bind_text((i + 1) as i32, id, Destructor::STATIC)?; } else { - return Err(SQLiteError::misuse( + return Err(PowerSyncError::argument_error( "Raw delete statement parameters must only reference id", )); } diff --git a/crates/core/src/uuid.rs b/crates/core/src/uuid.rs index 82d90460..a50cc856 100644 --- a/crates/core/src/uuid.rs +++ b/crates/core/src/uuid.rs @@ -9,7 +9,7 @@ use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, Context}; use crate::create_sqlite_text_fn; -use crate::error::SQLiteError; +use crate::error::PowerSyncError; use crate::util::*; fn uuid_v4_impl( diff --git a/crates/core/src/version.rs b/crates/core/src/version.rs index 6a39ad39..e53de5e0 100644 --- a/crates/core/src/version.rs +++ b/crates/core/src/version.rs @@ -9,7 +9,7 @@ use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, Context}; use crate::create_sqlite_text_fn; -use crate::error::SQLiteError; +use crate::error::PowerSyncError; fn powersync_rs_version_impl( _ctx: *mut sqlite::context, diff --git a/crates/core/src/view_admin.rs b/crates/core/src/view_admin.rs index 19c12ed3..46922203 100644 --- a/crates/core/src/view_admin.rs +++ b/crates/core/src/view_admin.rs @@ -9,7 +9,7 @@ use sqlite::{ResultCode, Value}; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, Context}; -use crate::error::SQLiteError; +use crate::error::PowerSyncError; use crate::migrations::{powersync_migrate, LATEST_VERSION}; use crate::util::quote_identifier; use crate::{create_auto_tx_function, create_sqlite_text_fn}; @@ -92,7 +92,7 @@ create_sqlite_text_fn!( fn powersync_external_table_name_impl( _ctx: *mut sqlite::context, args: &[*mut sqlite::value], -) -> Result { +) -> Result { // name: full table name let name = args[0].text(); @@ -101,7 +101,7 @@ fn powersync_external_table_name_impl( } else if name.starts_with("ps_data__") { Ok(String::from(&name[9..])) } else { - Err(SQLiteError::from(ResultCode::CONSTRAINT_DATATYPE)) + Err(PowerSyncError::argument_error("not a powersync table")) } } @@ -114,7 +114,7 @@ create_sqlite_text_fn!( fn powersync_init_impl( ctx: *mut sqlite::context, _args: &[*mut sqlite::value], -) -> Result { +) -> Result { let local_db = ctx.db_handle(); setup_internal_views(local_db)?; @@ -130,7 +130,7 @@ create_sqlite_text_fn!(powersync_init, powersync_init_tx, "powersync_init"); fn powersync_test_migration_impl( ctx: *mut sqlite::context, args: &[*mut sqlite::value], -) -> Result { +) -> Result { let target_version = args[0].int(); powersync_migrate(ctx, target_version)?; @@ -147,7 +147,7 @@ create_sqlite_text_fn!( fn powersync_clear_impl( ctx: *mut sqlite::context, args: &[*mut sqlite::value], -) -> Result { +) -> Result { let local_db = ctx.db_handle(); let clear_local = args[0].int(); diff --git a/crates/core/src/views.rs b/crates/core/src/views.rs index feed3da7..b79c98b9 100644 --- a/crates/core/src/views.rs +++ b/crates/core/src/views.rs @@ -11,15 +11,15 @@ use sqlite::{Connection, Context, ResultCode, Value}; use sqlite_nostd::{self as sqlite}; use crate::create_sqlite_text_fn; -use crate::error::SQLiteError; +use crate::error::PowerSyncError; use crate::schema::{DiffIncludeOld, Table}; use crate::util::*; fn powersync_view_sql_impl( _ctx: *mut sqlite::context, args: &[*mut sqlite::value], -) -> Result { - let table_info = Table::from_json(args[0].text())?; +) -> Result { + let table_info = Table::from_json(args[0].text()).map_err(PowerSyncError::as_argument_error)?; let name = &table_info.name; let view_name = &table_info.view_name(); @@ -71,8 +71,8 @@ create_sqlite_text_fn!( fn powersync_trigger_delete_sql_impl( _ctx: *mut sqlite::context, args: &[*mut sqlite::value], -) -> Result { - let table_info = Table::from_json(args[0].text())?; +) -> Result { + let table_info = Table::from_json(args[0].text()).map_err(PowerSyncError::as_argument_error)?; let name = &table_info.name; let view_name = &table_info.view_name(); @@ -148,7 +148,7 @@ END", } else if insert_only { Ok(String::from("")) } else { - Err(SQLiteError::from(ResultCode::MISUSE)) + Err(PowerSyncError::argument_error("invalid flags for table")) }; } @@ -161,8 +161,8 @@ create_sqlite_text_fn!( fn powersync_trigger_insert_sql_impl( _ctx: *mut sqlite::context, args: &[*mut sqlite::value], -) -> Result { - let table_info = Table::from_json(args[0].text())?; +) -> Result { + let table_info = Table::from_json(args[0].text()).map_err(PowerSyncError::as_argument_error)?; let name = &table_info.name; let view_name = &table_info.view_name(); @@ -221,7 +221,7 @@ fn powersync_trigger_insert_sql_impl( END", type_string, json_fragment); Ok(trigger) } else { - Err(SQLiteError::from(ResultCode::MISUSE)) + Err(PowerSyncError::argument_error("invalid flags for table")) }; } @@ -234,8 +234,8 @@ create_sqlite_text_fn!( fn powersync_trigger_update_sql_impl( _ctx: *mut sqlite::context, args: &[*mut sqlite::value], -) -> Result { - let table_info = Table::from_json(args[0].text())?; +) -> Result { + let table_info = Table::from_json(args[0].text()).map_err(PowerSyncError::as_argument_error)?; let name = &table_info.name; let view_name = &table_info.view_name(); @@ -338,7 +338,7 @@ END" } else if insert_only { Ok(String::from("")) } else { - Err(SQLiteError::from(ResultCode::MISUSE)) + Err(PowerSyncError::argument_error("invalid flags for table")) }; } @@ -402,7 +402,7 @@ pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { fn json_object_fragment<'a>( prefix: &str, name_results: &mut dyn Iterator, -) -> Result { +) -> Result { // floor(SQLITE_MAX_FUNCTION_ARG / 2). // To keep databases portable, we use the default limit of 100 args for this, // and don't try to query the limit dynamically. @@ -421,7 +421,9 @@ fn json_object_fragment<'a>( // SQLITE_MAX_COLUMN - 1 (because of the id column) if column_names_quoted.len() > 1999 { - return Err(SQLiteError::from(ResultCode::TOOBIG)); + return Err(PowerSyncError::argument_error( + "too many parameters to json_object_fragment", + )); } else if column_names_quoted.len() <= MAX_ARG_COUNT { // Small number of columns - use json_object() directly. let json_fragment = column_names_quoted.join(", "); diff --git a/crates/core/src/vtab_util.rs b/crates/core/src/vtab_util.rs index e462f559..8053ff99 100644 --- a/crates/core/src/vtab_util.rs +++ b/crates/core/src/vtab_util.rs @@ -1,12 +1,13 @@ extern crate alloc; +use alloc::string::ToString; use core::ffi::{c_char, c_int}; -use sqlite::{ResultCode}; +use sqlite::ResultCode; use sqlite_nostd as sqlite; use sqlite_nostd::VTab; -use crate::error::SQLiteError; +use crate::error::PowerSyncError; // For insert-only virtual tables, there are many functions that have to be defined, even if they're // not intended to be used. We return MISUSE for each. @@ -21,7 +22,6 @@ pub extern "C" fn vtab_no_filter( ResultCode::MISUSE as c_int } - pub extern "C" fn vtab_no_next(_cursor: *mut sqlite::vtab_cursor) -> c_int { ResultCode::MISUSE as c_int } @@ -38,15 +38,24 @@ pub extern "C" fn vtab_no_column( ResultCode::MISUSE as c_int } -pub extern "C" fn vtab_no_rowid(_cursor: *mut sqlite::vtab_cursor, _row_id: *mut sqlite::int64) -> c_int { +pub extern "C" fn vtab_no_rowid( + _cursor: *mut sqlite::vtab_cursor, + _row_id: *mut sqlite::int64, +) -> c_int { ResultCode::MISUSE as c_int } -pub extern "C" fn vtab_no_best_index(_vtab: *mut sqlite::vtab, _index_info: *mut sqlite::index_info) -> c_int { +pub extern "C" fn vtab_no_best_index( + _vtab: *mut sqlite::vtab, + _index_info: *mut sqlite::index_info, +) -> c_int { return ResultCode::MISUSE as c_int; } -pub extern "C" fn vtab_no_open(_vtab: *mut sqlite::vtab, _cursor: *mut *mut sqlite::vtab_cursor) -> c_int { +pub extern "C" fn vtab_no_open( + _vtab: *mut sqlite::vtab, + _cursor: *mut *mut sqlite::vtab_cursor, +) -> c_int { ResultCode::MISUSE as c_int } @@ -55,12 +64,15 @@ pub extern "C" fn vtab_no_close(_cursor: *mut sqlite::vtab_cursor) -> c_int { ResultCode::MISUSE as c_int } -pub fn vtab_result(vtab: *mut sqlite::vtab, result: Result) -> c_int { - if let Err(SQLiteError(code, message)) = result { - if message.is_some() { - vtab.set_err(&message.unwrap()); - } - code as c_int +pub fn vtab_result>( + vtab: *mut sqlite::vtab, + result: Result, +) -> c_int { + if let Err(error) = result { + let error = error.into(); + + vtab.set_err(&error.to_string()); + error.sqlite_error_code() as c_int } else { ResultCode::OK as c_int } diff --git a/dart/test/error_test.dart b/dart/test/error_test.dart new file mode 100644 index 00000000..ccada8e0 --- /dev/null +++ b/dart/test/error_test.dart @@ -0,0 +1,75 @@ +import 'dart:convert'; + +import 'package:sqlite3/common.dart'; +import 'package:test/test.dart'; + +import 'utils/native_test_utils.dart'; + +void main() { + group('error reporting', () { + late CommonDatabase db; + + setUp(() async { + db = openTestDatabase(); + }); + + tearDown(() { + db.dispose(); + }); + + test('contain inner SQLite descriptions', () { + expect( + () => db.execute('SELECT powersync_replace_schema(?)', [ + json.encode({ + // This fails because we're trying to json_extract from the string + // in e.g. update_tables. + 'tables': ['invalid entry'], + }) + ]), + throwsA(isSqliteException( + 1, + 'powersync_replace_schema: internal SQLite call returned ERROR: malformed JSON', + )), + ); + }); + + test('missing client id', () { + db + ..execute('SELECT powersync_init()') + ..execute('DELETE FROM ps_kv;'); + + expect( + () => db.execute('SELECT powersync_client_id()'), + throwsA(isSqliteException( + 4, + 'powersync_client_id: No client_id found in ps_kv', + )), + ); + }); + + group('sync protocol', () { + setUp(() => db.execute('SELECT powersync_init()')); + + test('invalid json', () { + const stmt = 'SELECT powersync_control(?,?)'; + db.execute('BEGIN'); + final control = db.prepare(stmt); + + control.execute(['start', null]); + expect( + () => control.execute(['line_text', 'invalid sync line']), + throwsA(isSqliteException( + 4, + 'powersync_control: Sync protocol error: invalid text line. cause: expected value at line 1 column 1', + )), + ); + }); + }); + }); +} + +Matcher isSqliteException(int code, dynamic message) { + return isA() + .having((e) => e.extendedResultCode, 'extendedResultCode', code) + .having((e) => e.message, 'message', message); +}