
wrapping up the storage subsystem to use in the synchronizer
heckj committed Mar 5, 2024
1 parent a5011df commit 7aa0a5f
Showing 4 changed files with 100 additions and 31 deletions.
@@ -1,6 +1,6 @@
 import struct Foundation.Data
 
-actor InMemoryDocuments: StorageProvider {
+public actor InMemoryStorage: StorageProvider {
     var _storage: [DocumentId: Data] = [:]
     var _incrementalChunks: [CombinedKey: [Data]] = [:]
 
@@ -9,31 +9,31 @@ actor InMemoryDocuments: StorageProvider {
         let prefix: String
     }
 
-    func load(key: DocumentId) async -> Data? {
+    public func load(key: DocumentId) async -> Data? {
         _storage[key]
     }
 
-    func save(key: DocumentId, data: Data) async {
+    public func save(key: DocumentId, data: Data) async {
         _storage[key] = data
     }
 
-    func remove(key: DocumentId) async {
+    public func remove(key: DocumentId) async {
         _storage.removeValue(forKey: key)
     }
 
     // MARK: Incremental Load Support
 
-    func addToRange(key: DocumentId, prefix: String, data: Data) async {
+    public func addToRange(key: DocumentId, prefix: String, data: Data) async {
         var dataArray: [Data] = _incrementalChunks[CombinedKey(id: key, prefix: prefix)] ?? []
         dataArray.append(data)
         _incrementalChunks[CombinedKey(id: key, prefix: prefix)] = dataArray
     }
 
-    func loadRange(key: DocumentId, prefix: String) async -> [Data] {
+    public func loadRange(key: DocumentId, prefix: String) async -> [Data] {
         _incrementalChunks[CombinedKey(id: key, prefix: prefix)] ?? []
     }
 
-    func removeRange(key: DocumentId, prefix: String) async {
+    public func removeRange(key: DocumentId, prefix: String) async {
         _incrementalChunks.removeValue(forKey: CombinedKey(id: key, prefix: prefix))
     }
 }
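The first file renames the in-memory provider from InMemoryDocuments to InMemoryStorage and opens the actor and its methods up as public. A minimal sketch of exercising it from an async context follows; it is not part of the commit, and DocumentId() producing a fresh identifier from a default initializer is an assumption:

import Foundation

// Sketch: exercising the now-public InMemoryStorage actor.
// Assumes DocumentId() generates a fresh identifier; that initializer
// is an assumption, not something this commit shows.
func exerciseStorage() async {
    let storage = InMemoryStorage()
    let id = DocumentId()

    // Save a full snapshot, then append two incremental chunks under a prefix.
    await storage.save(key: id, data: Data([0x01]))
    await storage.addToRange(key: id, prefix: "incChanges", data: Data([0x02]))
    await storage.addToRange(key: id, prefix: "incChanges", data: Data([0x03]))

    let snapshot = await storage.load(key: id) // Data([0x01])
    let chunks = await storage.loadRange(key: id, prefix: "incChanges") // two chunks, insertion order preserved

    // Clearing the range removes only the incremental chunks, not the snapshot.
    await storage.removeRange(key: id, prefix: "incChanges")
    assert(snapshot != nil && chunks.count == 2)
}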
@@ -3,25 +3,13 @@ import struct Foundation.Data
 // loose adaptation from automerge-repo storage interface
 // https://github.com/automerge/automerge-repo/blob/main/packages/automerge-repo/src/storage/StorageAdapter.ts
 public protocol StorageProvider {
-    // TODO: update type from Data to 'Chunk'
-    // to represent the encoded changes of an Automerge document
     func load(key: DocumentId) async -> Data?
-
-    // TODO: update type from Data to 'Chunk'
-    // to represent the encoded changes of an Automerge document
     func save(key: DocumentId, data: Data) async
-
     func remove(key: DocumentId) async
 
     // MARK: Incremental Load Support
 
-    // TODO: update type from Data to 'Chunk'
-    // to represent an encoded partial set of changes to an Automerge document
     func addToRange(key: DocumentId, prefix: String, data: Data) async
-
-    // TODO: update type from Data to 'Chunk'
-    // to represent an encoded partial set of changes to an Automerge document
     func loadRange(key: DocumentId, prefix: String) async -> [Data]
-
     func removeRange(key: DocumentId, prefix: String) async
 }
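The second file strips the TODO markers from the StorageProvider protocol, leaving six async requirements that any backing store can implement. As a hypothetical illustration of another conformance, a disk-backed provider might look like the sketch below; the type, directory layout, and file-naming scheme are assumptions for illustration, not part of this commit, and it assumes DocumentId's string form is filename-safe:

import Foundation

// Hypothetical disk-backed provider (an illustration, not part of this
// commit). Chunks for a (key, prefix) pair are stored as zero-padded,
// numbered files next to the full snapshot so that a lexicographic sort
// preserves append order.
actor FileStorage: StorageProvider {
    let root: URL

    init(root: URL) {
        self.root = root
    }

    private func snapshotURL(_ key: DocumentId) -> URL {
        root.appendingPathComponent("\(key)")
    }

    private func chunkURLs(_ key: DocumentId, _ prefix: String) -> [URL] {
        let names = (try? FileManager.default.contentsOfDirectory(atPath: root.path)) ?? []
        return names
            .filter { $0.hasPrefix("\(key).\(prefix).") }
            .sorted()
            .map { root.appendingPathComponent($0) }
    }

    func load(key: DocumentId) async -> Data? {
        try? Data(contentsOf: snapshotURL(key))
    }

    func save(key: DocumentId, data: Data) async {
        try? data.write(to: snapshotURL(key), options: .atomic)
    }

    func remove(key: DocumentId) async {
        try? FileManager.default.removeItem(at: snapshotURL(key))
    }

    func addToRange(key: DocumentId, prefix: String, data: Data) async {
        let index = chunkURLs(key, prefix).count
        let name = "\(key).\(prefix).\(String(format: "%06d", index))"
        try? data.write(to: root.appendingPathComponent(name), options: .atomic)
    }

    func loadRange(key: DocumentId, prefix: String) async -> [Data] {
        chunkURLs(key, prefix).compactMap { try? Data(contentsOf: $0) }
    }

    func removeRange(key: DocumentId, prefix: String) async {
        for url in chunkURLs(key, prefix) {
            try? FileManager.default.removeItem(at: url)
        }
    }
}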
@@ -1,23 +1,101 @@
-import class Automerge.Document
-import struct Automerge.SyncState
-import struct Foundation.Data
-import struct Foundation.UUID
+import Automerge
+import Foundation
+import OSLog
 
 // replicating main structure from automerge-repo
 // https://github.com/automerge/automerge-repo/blob/main/packages/automerge-repo/src/storage/StorageSubsystem.ts
-public struct StorageSubsystem {
-    public func loadDoc(id _: DocumentId) async -> Document {
-        Document()
+public actor StorageSubsystem<S: StorageProvider> {
+    let chunkNamespace = "incChanges"
+    var compacting: Bool
+    let _storage: StorageProvider
+    var latestHeads: [DocumentId: Set<ChangeHash>]
+    var chunks: [DocumentId: [Data]]
+
+    public init(_ storage: some StorageProvider) {
+        compacting = false
+        _storage = storage
+        latestHeads = [:]
+        chunks = [:]
+    }
+
+    public func loadDoc(id: DocumentId) async throws -> Document {
+        var combined: Data
+        let storageChunks = await _storage.loadRange(key: id, prefix: chunkNamespace)
+        let inMemChunks: [Data] = chunks[id] ?? []
+
+        if let baseData = await _storage.load(key: id) {
+            // loading all the changes from the base document and any incremental saves available
+            combined = baseData
+        } else {
+            // loading only incremental saves available, the base document doesn't exist in storage
+            combined = Data()
+        }
+        for chunk in inMemChunks {
+            combined.append(chunk)
+        }
+        for chunk in storageChunks {
+            combined.append(chunk)
+        }
+        return try Document(combined)
+    }
+
+    private func shouldCompact(for key: DocumentId) async -> Bool {
+        if compacting {
+            return false
+        }
+        let baseSize = await _storage.load(key: key)?.count ?? 0
+        let chunkSize = await _storage.loadRange(key: key, prefix: chunkNamespace).reduce(0) { incrSize, data in
+            incrSize + data.count
+        }
+        return chunkSize > baseSize
     }
 
-    public func saveDoc(id _: DocumentId, doc _: Document) async {}
+    private func shouldSave(for key: DocumentId, doc: Document) -> Bool {
+        guard let storedHeads = self.latestHeads[key] else {
+            return true
+        }
+        let newHeads = doc.heads()
+        if newHeads == storedHeads {
+            return false
+        }
+        return true
+    }
+
+    public func saveDoc(id: DocumentId, doc: Document) async throws {
+        if shouldSave(for: id, doc: doc) {
+            if await shouldCompact(for: id) {
+                try await compact(id: id, doc: doc, chunks: chunks[id] ?? [])
+                self.chunks[id] = []
+            } else {
+                try await self.saveIncremental(id: id, doc: doc)
+            }
+        }
+    }
 
-    public func compact(id _: DocumentId, doc _: Document, chunks _: [Data]) async {}
-    public func saveIncremental(id _: DocumentId, doc _: Document) async {}
+    // TODO: update data type from Data to Chunk when validating a byte array as a partial set of changes is available from Automerge core
+    public func compact(id: DocumentId, doc _: Document, chunks _: [Data]) async throws {
+        compacting = true
+        let compacted = try await self.loadDoc(id: id)
+        // only remove the chunks AFTER the save is complete
+        await _storage.save(key: id, data: compacted.save())
+        await _storage.removeRange(key: id, prefix: chunkNamespace)
+        latestHeads[id] = compacted.heads()
+        compacting = false
+    }
+
+    public func saveIncremental(id: DocumentId, doc: Document) async throws {
+        var chunkCollection = chunks[id] ?? []
+        let oldHeads = latestHeads[id] ?? Set<ChangeHash>()
+        let incrementals = try doc.encodeChangesSince(heads: oldHeads)
+        chunkCollection.append(incrementals)
+        chunks[id] = chunkCollection
+        await _storage.addToRange(key: id, prefix: chunkNamespace, data: incrementals)
+        latestHeads[id] = doc.heads()
+    }
 
-    public func loadSyncState(id _: DocumentId, storageId _: UUID) async -> SyncState {
+    public func loadSyncState(id _: DocumentId, storageId _: SyncV1.STORAGE_ID) async -> SyncState {
         SyncState()
     }
 
-    public func saveSyncState(id _: DocumentId, storageId _: UUID, state _: SyncState) async {}
+    public func saveSyncState(id _: DocumentId, storageId _: SyncV1.STORAGE_ID, state _: SyncState) async {}
 }
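The third file carries the bulk of the change: StorageSubsystem becomes an actor that tracks in-memory chunks and the latest known heads per document, appends incremental changes through saveIncremental, and compacts to a full snapshot once the accumulated chunks outgrow the stored base document. A rough round-trip sketch follows; it is not from the commit, DocumentId() is the same assumption as above, and the generic argument is spelled out explicitly because the initializer's `some StorageProvider` parameter doesn't bind S:

import Automerge

// Sketch of a save/load round trip through the subsystem (not from the commit).
func roundTrip() async throws {
    // S isn't inferred from the initializer, so it is supplied explicitly here.
    let subsystem = StorageSubsystem<InMemoryStorage>(InMemoryStorage())
    let id = DocumentId() // assumed initializer, as above

    let doc = Document()
    // ... mutate doc through the Automerge API ...

    // First save: no stored heads for this id, so shouldSave returns true;
    // base and chunk sizes are both 0, shouldCompact's `chunkSize > baseSize`
    // is false, and the changes land as an incremental chunk.
    try await subsystem.saveDoc(id: id, doc: doc)

    // loadDoc stitches the base snapshot (if any) together with in-memory
    // and stored chunks, then decodes the combined bytes into a Document.
    let restored = try await subsystem.loadDoc(id: id)
    print(restored.heads() == doc.heads()) // true if every change was captured
}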
@@ -12,4 +12,7 @@ extension Logger {
 
     /// Logs updates and interaction related to the process of synchronization over the network.
     static let webSocket = Logger(subsystem: subsystem, category: "WebSocket")
+
+    /// Logs updates and interaction related to the local storage subsystem.
+    static let storage = Logger(subsystem: subsystem, category: "storageSubsystem")
 }
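The last file adds a dedicated log category for the storage subsystem, so call sites can tag their output separately from the WebSocket logger. An illustrative line, not from the commit:

import OSLog

// Sketch: emitting storage-related diagnostics via the new category.
Logger.storage.debug("compaction complete")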
