Skip to content

Add unique constraint support for tables and automatic upserts in Supabase connector #198

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import com.powersync.connectors.PowerSyncCredentials
import com.powersync.db.crud.CrudEntry
import com.powersync.db.crud.UpdateType
import com.powersync.db.runWrappedSuspending
import com.powersync.db.schema.Schema
import com.powersync.db.schema.Table
import io.github.jan.supabase.SupabaseClient
import io.github.jan.supabase.annotations.SupabaseInternal
import io.github.jan.supabase.auth.Auth
Expand Down Expand Up @@ -37,6 +39,7 @@ public class SupabaseConnector(
private val storageBucket: String? = null,
) : PowerSyncBackendConnector() {
private var errorCode: String? = null
private var schema: Schema? = null

private object PostgresFatalCodes {
// Using Regex patterns for Postgres error codes
Expand Down Expand Up @@ -143,6 +146,15 @@ public class SupabaseConnector(
public fun session(): UserSession? = supabaseClient.auth.currentSessionOrNull()

public val sessionStatus: StateFlow<SessionStatus> = supabaseClient.auth.sessionStatus

/**
 * Sets the PowerSync [Schema] used for conflict resolution during upload.
 *
 * When a schema is set, PUT operations look up the entry's table in it and, if the
 * table declares a unique index, pass that index's column names as the `onConflict`
 * target of the Supabase upsert (duplicates are merged, not ignored). Only the FIRST
 * unique index on a table is consulted; tables without a unique index fall back to
 * the default primary-key upsert.
 *
 * NOTE(review): this mutates connector state without synchronization — confirm it is
 * only called before uploads start, or cannot race with uploadData.
 */
public fun setSchema(schema: Schema) {
this.schema = schema
}

public suspend fun loginAnonymously() {
runWrappedSuspending {
Expand Down Expand Up @@ -191,7 +203,22 @@ public class SupabaseConnector(
UpdateType.PUT -> {
val data = entry.opData?.toMutableMap() ?: mutableMapOf()
data["id"] = entry.id
table.upsert(data)

// Check if we have schema information to determine unique constraints
val tableSchema = schema?.tables?.find { it.name == entry.table }
val uniqueIndex = tableSchema?.getFirstUniqueIndex()

if (uniqueIndex != null) {
// Use unique columns for conflict resolution
val conflictColumns = uniqueIndex.columns.joinToString(",") { it.column }
table.upsert(data) {
onConflict = conflictColumns
ignoreDuplicates = false // Merge duplicates by default
}
} else {
// Default upsert behavior (conflict on primary key)
table.upsert(data)
}
}

UpdateType.PATCH -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.powersync.connector.supabase

import com.powersync.PowerSyncDatabase
import com.powersync.db.schema.Schema

/**
 * Configures this [SupabaseConnector] with the schema of the given [database], so that
 * upsert uploads can resolve conflicts on unique constraints declared in that schema.
 *
 * NOTE(review): assumes [PowerSyncDatabase] exposes its [Schema] as a public `schema`
 * property — confirm against the core API.
 *
 * Example:
 * ```
 * val connector = SupabaseConnector(supabaseUrl, supabaseKey, powerSyncUrl)
 * connector.configureWithDatabase(database)
 * ```
 */
public fun SupabaseConnector.configureWithDatabase(database: PowerSyncDatabase) {
    setSchema(database.schema)
}

/**
 * Creates a [SupabaseConnector] pre-configured with a PowerSync [Schema], enabling
 * unique-constraint-aware conflict resolution for upsert uploads.
 *
 * Kotlin does not allow adding a companion object to a class from outside it, so the
 * previous `SupabaseConnector.Companion` receiver (backed by an instance extension
 * property named `Companion`) could not compile. The factory now lives on the
 * standalone [SupabaseConnectorCompanion] namespace object.
 *
 * Example:
 * ```
 * val schema = Schema(
 *     Table(
 *         name = "users",
 *         columns = listOf(
 *             Column.text("email"),
 *             Column.text("username")
 *         ),
 *         indexes = listOf(
 *             Index.unique("idx_email", "email")
 *         )
 *     )
 * )
 *
 * val connector = SupabaseConnectorCompanion.withSchema(
 *     supabaseUrl = "https://example.supabase.co",
 *     supabaseKey = "your-key",
 *     powerSyncEndpoint = "https://example.powersync.com",
 *     schema = schema
 * )
 * ```
 *
 * @param supabaseUrl Supabase project URL.
 * @param supabaseKey Supabase anon/service key.
 * @param powerSyncEndpoint PowerSync service endpoint.
 * @param storageBucket optional storage bucket name forwarded to the connector.
 * @param schema PowerSync schema whose unique indexes drive upsert conflict targets.
 */
public fun SupabaseConnectorCompanion.withSchema(
    supabaseUrl: String,
    supabaseKey: String,
    powerSyncEndpoint: String,
    storageBucket: String? = null,
    schema: Schema,
): SupabaseConnector {
    val connector =
        SupabaseConnector(
            supabaseUrl = supabaseUrl,
            supabaseKey = supabaseKey,
            powerSyncEndpoint = powerSyncEndpoint,
            storageBucket = storageBucket,
        )
    connector.setSchema(schema)
    return connector
}

/**
 * Namespace object standing in for a companion on [SupabaseConnector], which does not
 * declare one. Extension factories such as [withSchema] hang off this object.
 */
public object SupabaseConnectorCompanion
30 changes: 30 additions & 0 deletions core/src/commonMain/kotlin/com/powersync/db/crud/UpsertOptions.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.powersync.db.crud

/**
* Options for configuring upsert behavior when handling conflicts.
*
* @property onConflict Comma-separated column name(s) to specify how duplicate rows are determined.
* Two rows are duplicates if all the onConflict columns are equal.
* If null, the primary key is used.
* @property ignoreDuplicates If true, duplicate rows are ignored. If false, duplicate rows are merged with existing rows.
*/
/**
 * Options for configuring upsert behavior when handling conflicts.
 *
 * @property onConflict Comma-separated column name(s) used to determine duplicate rows;
 *   two rows are duplicates when all onConflict columns are equal. When null, the
 *   primary key is used.
 * @property ignoreDuplicates If true, duplicate rows are ignored; if false, they are
 *   merged with existing rows.
 */
public data class UpsertOptions(
    val onConflict: String? = null,
    val ignoreDuplicates: Boolean = false
) {
    public companion object {
        /**
         * Default options: merge duplicates based on the primary key.
         */
        public val DEFAULT: UpsertOptions = UpsertOptions()

        /**
         * Builds options whose conflict target is the given [columns], comma-joined.
         *
         * @throws IllegalArgumentException if [columns] is empty — an empty onConflict
         *   string is not a valid conflict target.
         */
        public fun fromColumns(columns: List<String>, ignoreDuplicates: Boolean = false): UpsertOptions {
            require(columns.isNotEmpty()) { "columns must not be empty" }
            return UpsertOptions(
                onConflict = columns.joinToString(","),
                ignoreDuplicates = ignoreDuplicates
            )
        }
    }
}
29 changes: 26 additions & 3 deletions core/src/commonMain/kotlin/com/powersync/db/schema/Index.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ public data class Index(
* List of columns used for the index.
*/
val columns: List<IndexedColumn>,
/**
* Whether this index enforces a unique constraint.
*/
val unique: Boolean = false,
) {
/**
* @param name Descriptive name of the index.
* @param columns List of columns used for the index.
*/
public constructor(name: String, vararg columns: IndexedColumn) : this(name, columns.asList())
public constructor(name: String, vararg columns: IndexedColumn) : this(name, columns.asList(), false)

/**
* Construct a new index with the specified column names.
Expand All @@ -25,7 +29,23 @@ public data class Index(
public fun ascending(
    name: String,
    columns: List<String>,
): Index {
    // Wrap each column name in an ascending IndexedColumn; the index is non-unique.
    val ascendingColumns = columns.map { columnName -> IndexedColumn.ascending(columnName) }
    return Index(name, ascendingColumns, unique = false)
}

/**
 * Creates a unique index over [columns] (all ascending).
 * The resulting index renders as CREATE UNIQUE INDEX in the SQL definition.
 */
public fun unique(
    name: String,
    columns: List<String>,
): Index {
    val ascendingColumns = columns.map { columnName -> IndexedColumn.ascending(columnName) }
    return Index(name, ascendingColumns, unique = true)
}

/**
 * Convenience overload: creates a unique index covering a single [column].
 */
public fun unique(
    name: String,
    column: String,
): Index {
    return unique(name, listOf(column))
}
}

/**
Expand All @@ -40,20 +60,23 @@ public data class Index(
*/
internal fun toSqlDefinition(table: Table): String {
    // UNIQUE indexes enforce the constraint at the SQLite level as well.
    val keyword = if (unique) "UNIQUE INDEX" else "INDEX"
    val columnSql = columns.joinToString(", ") { indexedColumn -> indexedColumn.toSql(table) }
    return """CREATE $keyword "${fullName(table)}" ON "${table.internalName}"($columnSql)"""
}
}

// Wire representation of Index. Property order and defaults are serialization-sensitive;
// do not reorder.
@Serializable
internal data class SerializableIndex(
val name: String,
val columns: List<SerializableIndexColumn>,
// Defaults to false so payloads produced before unique-index support still deserialize.
val unique: Boolean = false,
)

/**
 * Converts this [Index] to its serializable wire form, carrying the unique flag through.
 */
internal fun Index.toSerializable(): SerializableIndex =
    // `with(this)` was redundant here: an extension function's receiver is already `this`.
    SerializableIndex(
        name,
        columns.map { it.toSerializable() },
        unique,
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.powersync.db.schema

/**
* Extension functions for working with schemas and unique constraints.
*/

/**
 * Looks up a table in this schema by [name], or returns null when no table matches.
 */
public fun Schema.getTable(name: String): Table? {
    for (candidate in tables) {
        if (candidate.name == name) return candidate
    }
    return null
}

/**
 * Returns every table in this schema that declares at least one unique index.
 */
public fun Schema.getTablesWithUniqueConstraints(): List<Table> =
    tables.filterNot { candidate -> candidate.indexes.none { it.unique } }

/**
 * True when this table declares at least one unique index.
 */
public fun Table.hasUniqueConstraints(): Boolean = indexes.firstOrNull { it.unique } != null

/**
 * Returns the unique indexes declared on this table, in declaration order.
 */
public fun Table.getUniqueIndexes(): List<Index> {
    val result = mutableListOf<Index>()
    for (index in indexes) {
        if (index.unique) result.add(index)
    }
    return result
}

/**
 * Builds a [Table] whose first index is a unique constraint over [uniqueColumns],
 * named "<name>_unique", followed by any [additionalIndexes].
 *
 * NOTE(review): this extension only compiles if [Table] declares a companion object —
 * confirm against Table.kt; otherwise make this a top-level factory function.
 *
 * Example:
 * ```
 * val userTable = Table.withUnique(
 *     name = "users",
 *     columns = listOf(
 *         Column.text("email"),
 *         Column.text("username"),
 *         Column.text("name")
 *     ),
 *     uniqueColumns = listOf("email")
 * )
 * ```
 *
 * @param name table name; also used to derive the unique index name.
 * @param columns the table's columns.
 * @param uniqueColumns column names covered by the unique constraint; must be non-empty.
 * @param additionalIndexes extra indexes appended after the unique index.
 * @throws IllegalArgumentException if [uniqueColumns] is empty — an empty unique index
 *   would be meaningless and produce invalid conflict targets downstream.
 */
public fun Table.Companion.withUnique(
    name: String,
    columns: List<Column>,
    uniqueColumns: List<String>,
    additionalIndexes: List<Index> = emptyList()
): Table {
    require(uniqueColumns.isNotEmpty()) { "uniqueColumns must not be empty for table '$name'" }
    val uniqueIndex = Index.unique("${name}_unique", uniqueColumns)
    return Table(
        name = name,
        columns = columns,
        indexes = listOf(uniqueIndex) + additionalIndexes
    )
}

/**
 * True when [columnName] participates in any unique index on this table.
 * Note: for composite unique indexes this reports membership, not that the column
 * is unique on its own.
 */
public fun Table.isColumnUnique(columnName: String): Boolean {
    val uniqueIndexes = indexes.filter { it.unique }
    return uniqueIndexes.any { index -> index.columns.any { indexed -> indexed.column == columnName } }
}
19 changes: 19 additions & 0 deletions core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,25 @@ public data class Table(
*/
public val viewName: String
get() = viewNameOverride ?: name

/**
 * Names of all columns that participate in any unique index on this table,
 * de-duplicated, in first-seen order.
 */
public fun getUniqueColumns(): List<String> {
    // LinkedHashSet de-duplicates while preserving encounter order,
    // matching flatMap + distinct semantics.
    val seen = LinkedHashSet<String>()
    for (index in indexes) {
        if (!index.unique) continue
        index.columns.forEach { indexed -> seen.add(indexed.column) }
    }
    return seen.toList()
}

/**
 * The first unique index declared on this table, or null when none exists.
 * Useful for choosing conflict-resolution columns for upsert operations;
 * any later unique indexes are ignored by that logic.
 */
public fun getFirstUniqueIndex(): Index? = indexes.find { index -> index.unique }
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.powersync.db.schema

/**
 * Example demonstrating how to define tables with unique constraints
 * and use them with the Supabase connector for proper upsert behavior.
 *
 * ```kotlin
 * // Define a schema with unique constraints
 * val schema = Schema(
 *     Table(
 *         name = "users",
 *         columns = listOf(
 *             Column.text("email"),
 *             Column.text("username"),
 *             Column.text("full_name"),
 *             Column.integer("age")
 *         ),
 *         indexes = listOf(
 *             // Single-column unique constraint
 *             Index.unique("idx_email", "email"),
 *             // Another single-column unique constraint
 *             Index.unique("idx_username", "username"),
 *             // Regular non-unique index for query performance
 *             Index.ascending("idx_age", listOf("age"))
 *         )
 *     ),
 *     Table(
 *         name = "products",
 *         columns = listOf(
 *             Column.text("sku"),
 *             Column.text("name"),
 *             Column.real("price"),
 *             Column.text("category")
 *         ),
 *         indexes = listOf(
 *             // Composite unique constraint spanning multiple columns
 *             Index.unique("idx_sku_category", listOf("sku", "category"))
 *         )
 *     )
 * )
 *
 * // Initialize the PowerSync database with the schema
 * val database = PowerSyncDatabase(
 *     schema = schema,
 *     // ... other configuration
 * )
 *
 * // Configure the Supabase connector with the same schema
 * val connector = SupabaseConnector(
 *     supabaseUrl = "https://your-project.supabase.co",
 *     supabaseKey = "your-anon-key",
 *     powerSyncEndpoint = "https://your-instance.powersync.com"
 * )
 * connector.setSchema(schema)
 *
 * // When the connector uploads data with PUT operations:
 * // - For the "users" table, conflicts are resolved on the "email" column
 * //   (only the FIRST unique index found is used; "idx_username" is ignored)
 * // - For the "products" table, conflicts are resolved on "sku,category"
 * // - Tables without unique constraints fall back to the default primary-key upsert
 *
 * // The Supabase connector automatically generates the appropriate
 * // upsert query with an onConflict parameter based on your schema,
 * // merging duplicates rather than ignoring them.
 * ```
 */
internal object UniqueConstraintsExample