diff --git a/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt b/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt index 022df0c2..f9129b06 100644 --- a/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt +++ b/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt @@ -7,6 +7,8 @@ import com.powersync.connectors.PowerSyncCredentials import com.powersync.db.crud.CrudEntry import com.powersync.db.crud.UpdateType import com.powersync.db.runWrappedSuspending +import com.powersync.db.schema.Schema +import com.powersync.db.schema.Table import io.github.jan.supabase.SupabaseClient import io.github.jan.supabase.annotations.SupabaseInternal import io.github.jan.supabase.auth.Auth @@ -37,6 +39,7 @@ public class SupabaseConnector( private val storageBucket: String? = null, ) : PowerSyncBackendConnector() { private var errorCode: String? = null + private var schema: Schema? = null private object PostgresFatalCodes { // Using Regex patterns for Postgres error codes @@ -143,6 +146,15 @@ public class SupabaseConnector( public fun session(): UserSession? = supabaseClient.auth.currentSessionOrNull() public val sessionStatus: StateFlow = supabaseClient.auth.sessionStatus + + /** + * Set the PowerSync schema to enable automatic conflict resolution for upserts. + * When a schema is provided, the connector will use unique constraints defined in the schema + * to determine conflict resolution columns for upsert operations. + */ + public fun setSchema(schema: Schema) { + this.schema = schema + } public suspend fun loginAnonymously() { runWrappedSuspending { @@ -191,7 +203,22 @@ public class SupabaseConnector( UpdateType.PUT -> { val data = entry.opData?.toMutableMap() ?: mutableMapOf() data["id"] = entry.id - table.upsert(data) + + // Check if we have schema information to determine unique constraints + val tableSchema = schema?.tables?.find { it.name == entry.table } + val uniqueIndex = tableSchema?.getFirstUniqueIndex() + + if (uniqueIndex != null) { + // Use unique columns for conflict resolution + val conflictColumns = uniqueIndex.columns.joinToString(",") { it.column } + table.upsert(data) { + onConflict = conflictColumns + ignoreDuplicates = false // Merge duplicates by default + } + } else { + // Default upsert behavior (conflict on primary key) + table.upsert(data) + } } UpdateType.PATCH -> { diff --git a/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnectorExtensions.kt b/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnectorExtensions.kt new file mode 100644 index 00000000..3d56e187 --- /dev/null +++ b/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnectorExtensions.kt @@ -0,0 +1,69 @@ +package com.powersync.connector.supabase + +import com.powersync.PowerSyncDatabase +import com.powersync.db.schema.Schema + +/** + * Extension function to configure a SupabaseConnector with a PowerSync database schema. + * This enables automatic conflict resolution for upsert operations based on unique constraints + * defined in the schema. + * + * Example: + * ``` + * val connector = SupabaseConnector(supabaseUrl, supabaseKey, powerSyncUrl) + * connector.configureWithDatabase(database) + * ``` + */ +public fun SupabaseConnector.configureWithDatabase(database: PowerSyncDatabase) { + val schema = database.schema + this.setSchema(schema) +} + +/** + * Create a SupabaseConnector with schema configuration. + * + * Example: + * ``` + * val schema = Schema( + * Table( + * name = "users", + * columns = listOf( + * Column.text("email"), + * Column.text("username") + * ), + * indexes = listOf( + * Index.unique("idx_email", "email") + * ) + * ) + * ) + * + * val connector = SupabaseConnector.withSchema( + * supabaseUrl = "https://example.supabase.co", + * supabaseKey = "your-key", + * powerSyncEndpoint = "https://example.powersync.com", + * schema = schema + * ) + * ``` + */ +public fun SupabaseConnector.Companion.withSchema( + supabaseUrl: String, + supabaseKey: String, + powerSyncEndpoint: String, + storageBucket: String? = null, + schema: Schema +): SupabaseConnector { + val connector = SupabaseConnector( + supabaseUrl = supabaseUrl, + supabaseKey = supabaseKey, + powerSyncEndpoint = powerSyncEndpoint, + storageBucket = storageBucket + ) + connector.setSchema(schema) + return connector +} + +// Add companion object to SupabaseConnector for the extension function +public val SupabaseConnector.Companion: SupabaseConnectorCompanion + get() = SupabaseConnectorCompanion + +public object SupabaseConnectorCompanion \ No newline at end of file diff --git a/core/src/commonMain/kotlin/com/powersync/db/crud/UpsertOptions.kt b/core/src/commonMain/kotlin/com/powersync/db/crud/UpsertOptions.kt new file mode 100644 index 00000000..de3693ee --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/db/crud/UpsertOptions.kt @@ -0,0 +1,30 @@ +package com.powersync.db.crud + +/** + * Options for configuring upsert behavior when handling conflicts. + * + * @property onConflict Comma-separated column name(s) to specify how duplicate rows are determined. + * Two rows are duplicates if all the onConflict columns are equal. + * If null, the primary key is used. + * @property ignoreDuplicates If true, duplicate rows are ignored. If false, duplicate rows are merged with existing rows. + */ +public data class UpsertOptions( + val onConflict: String? = null, + val ignoreDuplicates: Boolean = false +) { + public companion object { + /** + * Default upsert options that merge duplicates based on primary key. + */ + public val DEFAULT: UpsertOptions = UpsertOptions() + + /** + * Create upsert options from a list of conflict columns. + */ + public fun fromColumns(columns: List, ignoreDuplicates: Boolean = false): UpsertOptions = + UpsertOptions( + onConflict = columns.joinToString(","), + ignoreDuplicates = ignoreDuplicates + ) + } +} \ No newline at end of file diff --git a/core/src/commonMain/kotlin/com/powersync/db/schema/Index.kt b/core/src/commonMain/kotlin/com/powersync/db/schema/Index.kt index ebcdef3c..f987088f 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/schema/Index.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/schema/Index.kt @@ -11,12 +11,16 @@ public data class Index( * List of columns used for the index. */ val columns: List, + /** + * Whether this index enforces a unique constraint. + */ + val unique: Boolean = false, ) { /** * @param name Descriptive name of the index. * @param columns List of columns used for the index. */ - public constructor(name: String, vararg columns: IndexedColumn) : this(name, columns.asList()) + public constructor(name: String, vararg columns: IndexedColumn) : this(name, columns.asList(), false) /** * Construct a new index with the specified column names. @@ -25,7 +29,23 @@ public data class Index( public fun ascending( name: String, columns: List, - ): Index = Index(name, columns.map { IndexedColumn.ascending(it) }) + ): Index = Index(name, columns.map { IndexedColumn.ascending(it) }, unique = false) + + /** + * Create a unique index with the specified column names. + */ + public fun unique( + name: String, + columns: List, + ): Index = Index(name, columns.map { IndexedColumn.ascending(it) }, unique = true) + + /** + * Create a unique index with a single column. + */ + public fun unique( + name: String, + column: String, + ): Index = unique(name, listOf(column)) } /** @@ -40,7 +60,8 @@ public data class Index( */ internal fun toSqlDefinition(table: Table): String { val fields = columns.joinToString(", ") { it.toSql(table) } - return """CREATE INDEX "${fullName(table)}" ON "${table.internalName}"($fields)""" + val indexType = if (unique) "UNIQUE INDEX" else "INDEX" + return """CREATE $indexType "${fullName(table)}" ON "${table.internalName}"($fields)""" } } @@ -48,6 +69,7 @@ public data class Index( internal data class SerializableIndex( val name: String, val columns: List, + val unique: Boolean = false, ) internal fun Index.toSerializable(): SerializableIndex = @@ -55,5 +77,6 @@ internal fun Index.toSerializable(): SerializableIndex = SerializableIndex( name, columns.map { it.toSerializable() }, + unique, ) } diff --git a/core/src/commonMain/kotlin/com/powersync/db/schema/SchemaExtensions.kt b/core/src/commonMain/kotlin/com/powersync/db/schema/SchemaExtensions.kt new file mode 100644 index 00000000..49a951a7 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/db/schema/SchemaExtensions.kt @@ -0,0 +1,67 @@ +package com.powersync.db.schema + +/** + * Extension functions for working with schemas and unique constraints. + */ + +/** + * Find a table by name in the schema. + */ +public fun Schema.getTable(name: String): Table? = tables.find { it.name == name } + +/** + * Get all tables that have at least one unique constraint. + */ +public fun Schema.getTablesWithUniqueConstraints(): List = + tables.filter { table -> table.indexes.any { it.unique } } + +/** + * Check if a table has any unique constraints. + */ +public fun Table.hasUniqueConstraints(): Boolean = indexes.any { it.unique } + +/** + * Get all unique indexes for a table. + */ +public fun Table.getUniqueIndexes(): List = indexes.filter { it.unique } + +/** + * Create a table builder with a unique constraint. + * + * Example: + * ``` + * val userTable = Table( + * name = "users", + * columns = listOf( + * Column.text("email"), + * Column.text("username"), + * Column.text("name") + * ), + * indexes = listOf( + * Index.unique("idx_email", "email"), + * Index.unique("idx_username", "username") + * ) + * ) + * ``` + */ +public fun Table.Companion.withUnique( + name: String, + columns: List, + uniqueColumns: List, + additionalIndexes: List = emptyList() +): Table { + val uniqueIndex = Index.unique("${name}_unique", uniqueColumns) + return Table( + name = name, + columns = columns, + indexes = listOf(uniqueIndex) + additionalIndexes + ) +} + +/** + * Check if a column participates in any unique constraint. + */ +public fun Table.isColumnUnique(columnName: String): Boolean = + indexes.any { index -> + index.unique && index.columns.any { it.column == columnName } + } \ No newline at end of file diff --git a/core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt b/core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt index 8e321adf..9171d69c 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt @@ -216,6 +216,25 @@ public data class Table( */ public val viewName: String get() = viewNameOverride ?: name + + /** + * Get all unique column names from unique indexes. + * This returns a list of all columns that participate in unique constraints. + */ + public fun getUniqueColumns(): List { + return indexes + .filter { it.unique } + .flatMap { index -> index.columns.map { it.column } } + .distinct() + } + + /** + * Get the first unique index if any exists. + * This is useful for determining conflict resolution columns for upsert operations. + */ + public fun getFirstUniqueIndex(): Index? { + return indexes.firstOrNull { it.unique } + } } /** diff --git a/core/src/commonMain/kotlin/com/powersync/db/schema/UniqueConstraintsExample.kt b/core/src/commonMain/kotlin/com/powersync/db/schema/UniqueConstraintsExample.kt new file mode 100644 index 00000000..501d447f --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/db/schema/UniqueConstraintsExample.kt @@ -0,0 +1,66 @@ +package com.powersync.db.schema + +/** + * Example demonstrating how to define tables with unique constraints + * and use them with the Supabase connector for proper upsert behavior. + * + * ```kotlin + * // Define a schema with unique constraints + * val schema = Schema( + * Table( + * name = "users", + * columns = listOf( + * Column.text("email"), + * Column.text("username"), + * Column.text("full_name"), + * Column.integer("age") + * ), + * indexes = listOf( + * // Single column unique constraint + * Index.unique("idx_email", "email"), + * // Another single column unique constraint + * Index.unique("idx_username", "username"), + * // Regular non-unique index for performance + * Index.ascending("idx_age", listOf("age")) + * ) + * ), + * Table( + * name = "products", + * columns = listOf( + * Column.text("sku"), + * Column.text("name"), + * Column.real("price"), + * Column.text("category") + * ), + * indexes = listOf( + * // Composite unique constraint on multiple columns + * Index.unique("idx_sku_category", listOf("sku", "category")) + * ) + * ) + * ) + * + * // Initialize PowerSync database with the schema + * val database = PowerSyncDatabase( + * schema = schema, + * // ... other configuration + * ) + * + * // Configure Supabase connector with the schema + * val connector = SupabaseConnector( + * supabaseUrl = "https://your-project.supabase.co", + * supabaseKey = "your-anon-key", + * powerSyncEndpoint = "https://your-instance.powersync.com" + * ) + * connector.setSchema(schema) + * + * // When the connector uploads data with PUT operations: + * // - For the "users" table, conflicts will be resolved on the "email" column + * // (using the first unique index found) + * // - For the "products" table, conflicts will be resolved on "sku,category" + * // - Tables without unique constraints will use the default "id" column + * + * // The Supabase connector automatically generates the appropriate + * // upsert query with onConflict parameter based on your schema + * ``` + */ +internal object UniqueConstraintsExample \ No newline at end of file