diff --git a/Packages/automerge-repo/Sources/AutomergeRepo/Storage/InMemoryDocuments.swift b/Packages/automerge-repo/Sources/AutomergeRepo/Storage/InMemoryStorage.swift similarity index 63% rename from Packages/automerge-repo/Sources/AutomergeRepo/Storage/InMemoryDocuments.swift rename to Packages/automerge-repo/Sources/AutomergeRepo/Storage/InMemoryStorage.swift index a9a5390f..8e62eed5 100644 --- a/Packages/automerge-repo/Sources/AutomergeRepo/Storage/InMemoryDocuments.swift +++ b/Packages/automerge-repo/Sources/AutomergeRepo/Storage/InMemoryStorage.swift @@ -1,6 +1,6 @@ import struct Foundation.Data -actor InMemoryDocuments: StorageProvider { +public actor InMemoryStorage: StorageProvider { var _storage: [DocumentId: Data] = [:] var _incrementalChunks: [CombinedKey: [Data]] = [:] @@ -9,31 +9,31 @@ actor InMemoryDocuments: StorageProvider { let prefix: String } - func load(key: DocumentId) async -> Data? { + public func load(key: DocumentId) async -> Data? { _storage[key] } - func save(key: DocumentId, data: Data) async { + public func save(key: DocumentId, data: Data) async { _storage[key] = data } - func remove(key: DocumentId) async { + public func remove(key: DocumentId) async { _storage.removeValue(forKey: key) } // MARK: Incremental Load Support - func addToRange(key: DocumentId, prefix: String, data: Data) async { + public func addToRange(key: DocumentId, prefix: String, data: Data) async { var dataArray: [Data] = _incrementalChunks[CombinedKey(id: key, prefix: prefix)] ?? [] dataArray.append(data) _incrementalChunks[CombinedKey(id: key, prefix: prefix)] = dataArray } - func loadRange(key: DocumentId, prefix: String) async -> [Data] { + public func loadRange(key: DocumentId, prefix: String) async -> [Data] { _incrementalChunks[CombinedKey(id: key, prefix: prefix)] ?? [] } - func removeRange(key: DocumentId, prefix: String) async { + public func removeRange(key: DocumentId, prefix: String) async { _incrementalChunks.removeValue(forKey: CombinedKey(id: key, prefix: prefix)) } } diff --git a/Packages/automerge-repo/Sources/AutomergeRepo/Storage/StorageProvider.swift b/Packages/automerge-repo/Sources/AutomergeRepo/Storage/StorageProvider.swift index 6ec8f266..8bb28c0f 100644 --- a/Packages/automerge-repo/Sources/AutomergeRepo/Storage/StorageProvider.swift +++ b/Packages/automerge-repo/Sources/AutomergeRepo/Storage/StorageProvider.swift @@ -3,25 +3,13 @@ import struct Foundation.Data // loose adaptation from automerge-repo storage interface // https://github.com/automerge/automerge-repo/blob/main/packages/automerge-repo/src/storage/StorageAdapter.ts public protocol StorageProvider { - // TODO: update type from Data to 'Chunk' - // to represent the encoded changes of an Automerge document func load(key: DocumentId) async -> Data? - - // TODO: update type from Data to 'Chunk' - // to represent the encoded changes of an Automerge document func save(key: DocumentId, data: Data) async - func remove(key: DocumentId) async // MARK: Incremental Load Support - // TODO: update type from Data to 'Chunk' - // to represent an encoded partial set of changes to an Automerge document func addToRange(key: DocumentId, prefix: String, data: Data) async - - // TODO: update type from Data to 'Chunk' - // to represent an encoded partial set of changes to an Automerge document func loadRange(key: DocumentId, prefix: String) async -> [Data] - func removeRange(key: DocumentId, prefix: String) async } diff --git a/Packages/automerge-repo/Sources/AutomergeRepo/Storage/StorageSubsystem.swift b/Packages/automerge-repo/Sources/AutomergeRepo/Storage/StorageSubsystem.swift index 012dfc3f..db51ad80 100644 --- a/Packages/automerge-repo/Sources/AutomergeRepo/Storage/StorageSubsystem.swift +++ b/Packages/automerge-repo/Sources/AutomergeRepo/Storage/StorageSubsystem.swift @@ -1,23 +1,101 @@ -import class Automerge.Document -import struct Automerge.SyncState -import struct Foundation.Data -import struct Foundation.UUID +import Automerge +import Foundation +import OSLog // replicating main structure from automerge-repo // https://github.com/automerge/automerge-repo/blob/main/packages/automerge-repo/src/storage/StorageSubsystem.ts -public struct StorageSubsystem { - public func loadDoc(id _: DocumentId) async -> Document { - Document() +public actor StorageSubsystem { + let chunkNamespace = "incChanges" + var compacting: Bool + let _storage: StorageProvider + var latestHeads: [DocumentId: Set] + var chunks: [DocumentId: [Data]] + + public init(_ storage: some StorageProvider) { + compacting = false + _storage = storage + latestHeads = [:] + chunks = [:] + } + + public func loadDoc(id: DocumentId) async throws -> Document { + var combined: Data + let storageChunks = await _storage.loadRange(key: id, prefix: chunkNamespace) + let inMemChunks: [Data] = chunks[id] ?? [] + + if let baseData = await _storage.load(key: id) { + // loading all the changes from the base document and any incremental saves available + combined = baseData + } else { + // loading only incremental saves available, the base document doesn't exist in storage + combined = Data() + } + for chunk in inMemChunks { + combined.append(chunk) + } + for chunk in storageChunks { + combined.append(chunk) + } + return try Document(combined) + } + + private func shouldCompact(for key: DocumentId) async -> Bool { + if compacting { + return false + } + let baseSize = await _storage.load(key: key)?.count ?? 0 + let chunkSize = await _storage.loadRange(key: key, prefix: chunkNamespace).reduce(0) { incrSize, data in + incrSize + data.count + } + return chunkSize > baseSize } - public func saveDoc(id _: DocumentId, doc _: Document) async {} + private func shouldSave(for key: DocumentId, doc: Document) -> Bool { + guard let storedHeads = self.latestHeads[key] else { + return true + } + let newHeads = doc.heads() + if newHeads == storedHeads { + return false + } + return true + } + + public func saveDoc(id: DocumentId, doc: Document) async throws { + if shouldSave(for: id, doc: doc) { + if await shouldCompact(for: id) { + try await compact(id: id, doc: doc, chunks: chunks[id] ?? []) + self.chunks[id] = [] + } else { + try await self.saveIncremental(id: id, doc: doc) + } + } + } - public func compact(id _: DocumentId, doc _: Document, chunks _: [Data]) async {} - public func saveIncremental(id _: DocumentId, doc _: Document) async {} + // TODO: update data type from Data to Chunk when validating a byte array as a partial set of changes is available from Automerge core + public func compact(id: DocumentId, doc _: Document, chunks _: [Data]) async throws { + compacting = true + let compacted = try await self.loadDoc(id: id) + // only remove the chunks AFTER the save is complete + await _storage.save(key: id, data: compacted.save()) + await _storage.removeRange(key: id, prefix: chunkNamespace) + latestHeads[id] = compacted.heads() + compacting = false + } + + public func saveIncremental(id: DocumentId, doc: Document) async throws { + var chunkCollection = chunks[id] ?? [] + let oldHeads = latestHeads[id] ?? Set() + let incrementals = try doc.encodeChangesSince(heads: oldHeads) + chunkCollection.append(incrementals) + chunks[id] = chunkCollection + await _storage.addToRange(key: id, prefix: chunkNamespace, data: incrementals) + latestHeads[id] = doc.heads() + } - public func loadSyncState(id _: DocumentId, storageId _: UUID) async -> SyncState { + public func loadSyncState(id _: DocumentId, storageId _: SyncV1.STORAGE_ID) async -> SyncState { SyncState() } - public func saveSyncState(id _: DocumentId, storageId _: UUID, state _: SyncState) async {} + public func saveSyncState(id _: DocumentId, storageId _: SyncV1.STORAGE_ID, state _: SyncState) async {} } diff --git a/Packages/automerge-repo/Sources/AutomergeRepo/extensions/OSLog+extensions.swift b/Packages/automerge-repo/Sources/AutomergeRepo/extensions/OSLog+extensions.swift index 54ef603d..ceed12b7 100644 --- a/Packages/automerge-repo/Sources/AutomergeRepo/extensions/OSLog+extensions.swift +++ b/Packages/automerge-repo/Sources/AutomergeRepo/extensions/OSLog+extensions.swift @@ -12,4 +12,7 @@ extension Logger { /// Logs updates and interaction related to the process of synchronization over the network. static let webSocket = Logger(subsystem: subsystem, category: "WebSocket") + + /// Logs updates and interaction related to the process of synchronization over the network. + static let storage = Logger(subsystem: subsystem, category: "storageSubsystem") }