From a54177a91aab288e0957129d7ff4efcbbb571fb3 Mon Sep 17 00:00:00 2001 From: BoondoggleLabs Date: Fri, 30 May 2025 17:30:45 +0100 Subject: [PATCH] Add unique constraint support for tables and automatic upserts in Supabase connector - Add `unique` property to Index class to define unique constraints - Update Table class with methods to retrieve unique columns and indexes - Create UpsertOptions data class for configuring upsert behavior - Enhance SupabaseConnector to automatically handle upserts based on schema - Detects unique constraints from table schema - Sets appropriate onConflict columns for Supabase upsert operations - Falls back to primary key conflict resolution when no unique indexes exist - Add extension functions and examples for working with unique constraints This allows developers to define unique constraints in their PowerSync schema and have the Supabase connector automatically handle upserts with proper conflict resolution without manual configuration. --- .../connector/supabase/SupabaseConnector.kt | 29 +++++++- .../supabase/SupabaseConnectorExtensions.kt | 69 +++++++++++++++++++ .../com/powersync/db/crud/UpsertOptions.kt | 30 ++++++++ .../kotlin/com/powersync/db/schema/Index.kt | 29 +++++++- .../powersync/db/schema/SchemaExtensions.kt | 67 ++++++++++++++++++ .../kotlin/com/powersync/db/schema/Table.kt | 19 +++++ .../db/schema/UniqueConstraintsExample.kt | 66 ++++++++++++++++++ 7 files changed, 305 insertions(+), 4 deletions(-) create mode 100644 connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnectorExtensions.kt create mode 100644 core/src/commonMain/kotlin/com/powersync/db/crud/UpsertOptions.kt create mode 100644 core/src/commonMain/kotlin/com/powersync/db/schema/SchemaExtensions.kt create mode 100644 core/src/commonMain/kotlin/com/powersync/db/schema/UniqueConstraintsExample.kt diff --git a/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt b/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt index 022df0c2..f9129b06 100644 --- a/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt +++ b/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt @@ -7,6 +7,8 @@ import com.powersync.connectors.PowerSyncCredentials import com.powersync.db.crud.CrudEntry import com.powersync.db.crud.UpdateType import com.powersync.db.runWrappedSuspending +import com.powersync.db.schema.Schema +import com.powersync.db.schema.Table import io.github.jan.supabase.SupabaseClient import io.github.jan.supabase.annotations.SupabaseInternal import io.github.jan.supabase.auth.Auth @@ -37,6 +39,7 @@ public class SupabaseConnector( private val storageBucket: String? = null, ) : PowerSyncBackendConnector() { private var errorCode: String? = null + private var schema: Schema? = null private object PostgresFatalCodes { // Using Regex patterns for Postgres error codes @@ -143,6 +146,15 @@ public class SupabaseConnector( public fun session(): UserSession? = supabaseClient.auth.currentSessionOrNull() public val sessionStatus: StateFlow = supabaseClient.auth.sessionStatus + + /** + * Set the PowerSync schema to enable automatic conflict resolution for upserts. + * When a schema is provided, the connector will use unique constraints defined in the schema + * to determine conflict resolution columns for upsert operations. + */ + public fun setSchema(schema: Schema) { + this.schema = schema + } public suspend fun loginAnonymously() { runWrappedSuspending { @@ -191,7 +203,22 @@ public class SupabaseConnector( UpdateType.PUT -> { val data = entry.opData?.toMutableMap() ?: mutableMapOf() data["id"] = entry.id - table.upsert(data) + + // Check if we have schema information to determine unique constraints + val tableSchema = schema?.tables?.find { it.name == entry.table } + val uniqueIndex = tableSchema?.getFirstUniqueIndex() + + if (uniqueIndex != null) { + // Use unique columns for conflict resolution + val conflictColumns = uniqueIndex.columns.joinToString(",") { it.column } + table.upsert(data) { + onConflict = conflictColumns + ignoreDuplicates = false // Merge duplicates by default + } + } else { + // Default upsert behavior (conflict on primary key) + table.upsert(data) + } } UpdateType.PATCH -> { diff --git a/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnectorExtensions.kt b/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnectorExtensions.kt new file mode 100644 index 00000000..3d56e187 --- /dev/null +++ b/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnectorExtensions.kt @@ -0,0 +1,69 @@ +package com.powersync.connector.supabase + +import com.powersync.PowerSyncDatabase +import com.powersync.db.schema.Schema + +/** + * Extension function to configure a SupabaseConnector with a PowerSync database schema. + * This enables automatic conflict resolution for upsert operations based on unique constraints + * defined in the schema. + * + * Example: + * ``` + * val connector = SupabaseConnector(supabaseUrl, supabaseKey, powerSyncUrl) + * connector.configureWithDatabase(database) + * ``` + */ +public fun SupabaseConnector.configureWithDatabase(database: PowerSyncDatabase) { + val schema = database.schema + this.setSchema(schema) +} + +/** + * Create a SupabaseConnector with schema configuration. + * + * Example: + * ``` + * val schema = Schema( + * Table( + * name = "users", + * columns = listOf( + * Column.text("email"), + * Column.text("username") + * ), + * indexes = listOf( + * Index.unique("idx_email", "email") + * ) + * ) + * ) + * + * val connector = SupabaseConnector.withSchema( + * supabaseUrl = "https://example.supabase.co", + * supabaseKey = "your-key", + * powerSyncEndpoint = "https://example.powersync.com", + * schema = schema + * ) + * ``` + */ +public fun SupabaseConnector.Companion.withSchema( + supabaseUrl: String, + supabaseKey: String, + powerSyncEndpoint: String, + storageBucket: String? = null, + schema: Schema +): SupabaseConnector { + val connector = SupabaseConnector( + supabaseUrl = supabaseUrl, + supabaseKey = supabaseKey, + powerSyncEndpoint = powerSyncEndpoint, + storageBucket = storageBucket + ) + connector.setSchema(schema) + return connector +} + +// Add companion object to SupabaseConnector for the extension function +public val SupabaseConnector.Companion: SupabaseConnectorCompanion + get() = SupabaseConnectorCompanion + +public object SupabaseConnectorCompanion \ No newline at end of file diff --git a/core/src/commonMain/kotlin/com/powersync/db/crud/UpsertOptions.kt b/core/src/commonMain/kotlin/com/powersync/db/crud/UpsertOptions.kt new file mode 100644 index 00000000..de3693ee --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/db/crud/UpsertOptions.kt @@ -0,0 +1,30 @@ +package com.powersync.db.crud + +/** + * Options for configuring upsert behavior when handling conflicts. + * + * @property onConflict Comma-separated column name(s) to specify how duplicate rows are determined. + * Two rows are duplicates if all the onConflict columns are equal. + * If null, the primary key is used. + * @property ignoreDuplicates If true, duplicate rows are ignored. If false, duplicate rows are merged with existing rows. + */ +public data class UpsertOptions( + val onConflict: String? = null, + val ignoreDuplicates: Boolean = false +) { + public companion object { + /** + * Default upsert options that merge duplicates based on primary key. + */ + public val DEFAULT: UpsertOptions = UpsertOptions() + + /** + * Create upsert options from a list of conflict columns. + */ + public fun fromColumns(columns: List, ignoreDuplicates: Boolean = false): UpsertOptions = + UpsertOptions( + onConflict = columns.joinToString(","), + ignoreDuplicates = ignoreDuplicates + ) + } +} \ No newline at end of file diff --git a/core/src/commonMain/kotlin/com/powersync/db/schema/Index.kt b/core/src/commonMain/kotlin/com/powersync/db/schema/Index.kt index ebcdef3c..f987088f 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/schema/Index.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/schema/Index.kt @@ -11,12 +11,16 @@ public data class Index( * List of columns used for the index. */ val columns: List, + /** + * Whether this index enforces a unique constraint. + */ + val unique: Boolean = false, ) { /** * @param name Descriptive name of the index. * @param columns List of columns used for the index. */ - public constructor(name: String, vararg columns: IndexedColumn) : this(name, columns.asList()) + public constructor(name: String, vararg columns: IndexedColumn) : this(name, columns.asList(), false) /** * Construct a new index with the specified column names. @@ -25,7 +29,23 @@ public data class Index( public fun ascending( name: String, columns: List, - ): Index = Index(name, columns.map { IndexedColumn.ascending(it) }) + ): Index = Index(name, columns.map { IndexedColumn.ascending(it) }, unique = false) + + /** + * Create a unique index with the specified column names. + */ + public fun unique( + name: String, + columns: List, + ): Index = Index(name, columns.map { IndexedColumn.ascending(it) }, unique = true) + + /** + * Create a unique index with a single column. + */ + public fun unique( + name: String, + column: String, + ): Index = unique(name, listOf(column)) } /** @@ -40,7 +60,8 @@ public data class Index( */ internal fun toSqlDefinition(table: Table): String { val fields = columns.joinToString(", ") { it.toSql(table) } - return """CREATE INDEX "${fullName(table)}" ON "${table.internalName}"($fields)""" + val indexType = if (unique) "UNIQUE INDEX" else "INDEX" + return """CREATE $indexType "${fullName(table)}" ON "${table.internalName}"($fields)""" } } @@ -48,6 +69,7 @@ public data class Index( internal data class SerializableIndex( val name: String, val columns: List, + val unique: Boolean = false, ) internal fun Index.toSerializable(): SerializableIndex = @@ -55,5 +77,6 @@ internal fun Index.toSerializable(): SerializableIndex = SerializableIndex( name, columns.map { it.toSerializable() }, + unique, ) } diff --git a/core/src/commonMain/kotlin/com/powersync/db/schema/SchemaExtensions.kt b/core/src/commonMain/kotlin/com/powersync/db/schema/SchemaExtensions.kt new file mode 100644 index 00000000..49a951a7 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/db/schema/SchemaExtensions.kt @@ -0,0 +1,67 @@ +package com.powersync.db.schema + +/** + * Extension functions for working with schemas and unique constraints. + */ + +/** + * Find a table by name in the schema. + */ +public fun Schema.getTable(name: String): Table? = tables.find { it.name == name } + +/** + * Get all tables that have at least one unique constraint. + */ +public fun Schema.getTablesWithUniqueConstraints(): List = + tables.filter { table -> table.indexes.any { it.unique } } + +/** + * Check if a table has any unique constraints. + */ +public fun Table.hasUniqueConstraints(): Boolean = indexes.any { it.unique } + +/** + * Get all unique indexes for a table. + */ +public fun Table.getUniqueIndexes(): List = indexes.filter { it.unique } + +/** + * Create a table builder with a unique constraint. + * + * Example: + * ``` + * val userTable = Table( + * name = "users", + * columns = listOf( + * Column.text("email"), + * Column.text("username"), + * Column.text("name") + * ), + * indexes = listOf( + * Index.unique("idx_email", "email"), + * Index.unique("idx_username", "username") + * ) + * ) + * ``` + */ +public fun Table.Companion.withUnique( + name: String, + columns: List, + uniqueColumns: List, + additionalIndexes: List = emptyList() +): Table { + val uniqueIndex = Index.unique("${name}_unique", uniqueColumns) + return Table( + name = name, + columns = columns, + indexes = listOf(uniqueIndex) + additionalIndexes + ) +} + +/** + * Check if a column participates in any unique constraint. + */ +public fun Table.isColumnUnique(columnName: String): Boolean = + indexes.any { index -> + index.unique && index.columns.any { it.column == columnName } + } \ No newline at end of file diff --git a/core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt b/core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt index 8e321adf..9171d69c 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt @@ -216,6 +216,25 @@ public data class Table( */ public val viewName: String get() = viewNameOverride ?: name + + /** + * Get all unique column names from unique indexes. + * This returns a list of all columns that participate in unique constraints. + */ + public fun getUniqueColumns(): List { + return indexes + .filter { it.unique } + .flatMap { index -> index.columns.map { it.column } } + .distinct() + } + + /** + * Get the first unique index if any exists. + * This is useful for determining conflict resolution columns for upsert operations. + */ + public fun getFirstUniqueIndex(): Index? { + return indexes.firstOrNull { it.unique } + } } /** diff --git a/core/src/commonMain/kotlin/com/powersync/db/schema/UniqueConstraintsExample.kt b/core/src/commonMain/kotlin/com/powersync/db/schema/UniqueConstraintsExample.kt new file mode 100644 index 00000000..501d447f --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/db/schema/UniqueConstraintsExample.kt @@ -0,0 +1,66 @@ +package com.powersync.db.schema + +/** + * Example demonstrating how to define tables with unique constraints + * and use them with the Supabase connector for proper upsert behavior. + * + * ```kotlin + * // Define a schema with unique constraints + * val schema = Schema( + * Table( + * name = "users", + * columns = listOf( + * Column.text("email"), + * Column.text("username"), + * Column.text("full_name"), + * Column.integer("age") + * ), + * indexes = listOf( + * // Single column unique constraint + * Index.unique("idx_email", "email"), + * // Another single column unique constraint + * Index.unique("idx_username", "username"), + * // Regular non-unique index for performance + * Index.ascending("idx_age", listOf("age")) + * ) + * ), + * Table( + * name = "products", + * columns = listOf( + * Column.text("sku"), + * Column.text("name"), + * Column.real("price"), + * Column.text("category") + * ), + * indexes = listOf( + * // Composite unique constraint on multiple columns + * Index.unique("idx_sku_category", listOf("sku", "category")) + * ) + * ) + * ) + * + * // Initialize PowerSync database with the schema + * val database = PowerSyncDatabase( + * schema = schema, + * // ... other configuration + * ) + * + * // Configure Supabase connector with the schema + * val connector = SupabaseConnector( + * supabaseUrl = "https://your-project.supabase.co", + * supabaseKey = "your-anon-key", + * powerSyncEndpoint = "https://your-instance.powersync.com" + * ) + * connector.setSchema(schema) + * + * // When the connector uploads data with PUT operations: + * // - For the "users" table, conflicts will be resolved on the "email" column + * // (using the first unique index found) + * // - For the "products" table, conflicts will be resolved on "sku,category" + * // - Tables without unique constraints will use the default "id" column + * + * // The Supabase connector automatically generates the appropriate + * // upsert query with onConflict parameter based on your schema + * ``` + */ +internal object UniqueConstraintsExample \ No newline at end of file