Skip to content
This repository has been archived by the owner on Jan 30, 2025. It is now read-only.

Encode state as stream of updates #1

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "yjs",
"version": "13.5.41",
"name": "@datacamp/yjs",
"version": "13.5.41-datacamp.0",
"description": "Shared Editing Library",
"main": "./dist/yjs.cjs",
"module": "./dist/yjs.mjs",
Expand Down
1 change: 1 addition & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export {
readUpdate,
readUpdateV2,
encodeStateAsUpdate,
encodeStateAsStreamOfUpdates,
encodeStateAsUpdateV2,
encodeStateVector,
UndoManager,
Expand Down
190 changes: 180 additions & 10 deletions src/utils/encoding.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,39 +46,64 @@ import * as binary from 'lib0/binary'
import * as map from 'lib0/map'
import * as math from 'lib0/math'

/**
* @param {Array<GC|Item>} structs All structs by `client`
* @param {number} minClock write structs starting with `ID(client,minClock)`
* @param {number | null} maxClock write structs with clock < maxClock for client
*
* @function
*/
const getStructsToWrite = (structs, minClock, maxClock = null) => {
minClock = math.max(minClock, structs[0].id.clock) // make sure the first id exists
const startNewStructs = findIndexSS(structs, minClock)
const lastStruct = structs[structs.length - 1]
if (maxClock == null || maxClock > lastStruct.id.clock) {
return structs.slice(startNewStructs)
}
let endNewStructs = findIndexSS(structs, maxClock)
if (maxClock > structs[endNewStructs].id.clock) {
// We write the last fully, so we don't split it until maxClock
// Therefore we need to also include the last one
endNewStructs += 1
}
return structs.slice(startNewStructs, endNewStructs)
}

/**
* @param {UpdateEncoderV1 | UpdateEncoderV2} encoder
* @param {Array<GC|Item>} structs All structs by `client`
* @param {number} client
* @param {number} clock write structs starting with `ID(client,clock)`
* @param {number | null} maxClock write structs with clock < maxClock `ID(client,clock)`
* @returns {number} the last clock written
*
* @function
*/
const writeStructs = (encoder, structs, client, clock) => {
// write first id
const writeStructs = (encoder, structs, client, clock, maxClock = null) => {
clock = math.max(clock, structs[0].id.clock) // make sure the first id exists
const startNewStructs = findIndexSS(structs, clock)
const newStructs = getStructsToWrite(structs, clock, maxClock)
// write # encoded structs
encoding.writeVarUint(encoder.restEncoder, structs.length - startNewStructs)
encoding.writeVarUint(encoder.restEncoder, newStructs.length)
encoder.writeClient(client)
encoding.writeVarUint(encoder.restEncoder, clock)
const firstStruct = structs[startNewStructs]
const firstStruct = newStructs[0]
// write first struct with an offset
firstStruct.write(encoder, clock - firstStruct.id.clock)
for (let i = startNewStructs + 1; i < structs.length; i++) {
structs[i].write(encoder, 0)
for (let i = 1; i < newStructs.length; i++) {
newStructs[i].write(encoder, 0)
}
const lastStruct = newStructs[newStructs.length - 1]
return lastStruct == null ? clock : lastStruct.id.clock + lastStruct.length
}

/**
* @param {UpdateEncoderV1 | UpdateEncoderV2} encoder
* @param {StructStore} store
* @param {Map<number,number>} _sm
*
* @private
* @function
*/
export const writeClientsStructs = (encoder, store, _sm) => {
export const getStatesToWrite = (store, _sm) => {
// we filter all valid _sm entries into sm
const sm = new Map()
_sm.forEach((clock, client) => {
Expand All @@ -92,6 +117,19 @@ export const writeClientsStructs = (encoder, store, _sm) => {
sm.set(client, 0)
}
})
return sm
}

/**
* @param {UpdateEncoderV1 | UpdateEncoderV2} encoder
* @param {StructStore} store
* @param {Map<number,number>} _sm
*
* @private
* @function
*/
export const writeClientsStructs = (encoder, store, _sm) => {
const sm = getStatesToWrite(store, _sm)
// write # states that were updated
encoding.writeVarUint(encoder.restEncoder, sm.size)
// Write items with higher client ids first
Expand Down Expand Up @@ -508,7 +546,84 @@ export const writeStateAsUpdate = (encoder, doc, targetStateVector = new Map())
}

/**
* Write all the document as a single update message that can be applied on the remote document. If you specify the state of the remote client (`targetState`) it will
* @param {Array<[number, number]>} clientClocks
* @return {Array<[number, number]>}
*
* @function
*/
const sortClientsLargestToSmallest = (clientClocks) => {
return clientClocks.sort((a, z) => z[0] - a[0])
}

/**
* Write the whole document as a stream of update messages. If you specify the state of the remote client (`targetStateVector`) it will only write the operations that are missing.
*
* @param {() => UpdateEncoderV1 | UpdateEncoderV2} getEncoder
* @param {Doc} doc
* @param {object} options
* @param {(client: number, clock: number, maxClock: number) => Iterable<number> | Generator<number, void, number>} [options.clockSplits] For this client, where would you like the updates to be splitted. If a clockSplit is in the middle of an item, it might return the full item and so this function doesn't split exactly on the given clockSplits. Use the injected clock to know where the last split happened
* @param {(clientClocks: Array<[number, number]>) => Array<[number, number]>} [options.sortClients] How to sort the clients. In general, it's better to sort with higher client ids first as this heavily improves the conflict algorithm. This is also the default implementation.
* @param {Map<number,number>} [targetStateVector] The state of the target that receives the update. Leave empty to write all known structs
* @return {Iterable<UpdateEncoderV1 | UpdateEncoderV2>}
*
* @generator
*/
export const writeStateAsStreamOfUpdates = function * (getEncoder, doc, options, targetStateVector = new Map()) {
const deleteEncoder = getEncoder()
// no updates / structs to write
encoding.writeVarUint(deleteEncoder.restEncoder, 0)
writeDeleteSet(deleteEncoder, createDeleteSetFromStructStore(doc.store))
yield deleteEncoder

const sm = getStatesToWrite(doc.store, targetStateVector)
const sortClients = options.sortClients ?? sortClientsLargestToSmallest
for (let [client, clock] of sortClients(Array.from(sm.entries()))) {
const lastClockClient = getState(doc.store, client)
/** @type {Array<GC | Item> | undefined} */
const structs = doc.store.clients.get(client)
if (structs == null) {
continue
}

if (options.clockSplits != null) {
const iterator = options.clockSplits(client, clock, lastClockClient)[Symbol.iterator]()
while (true) {
// @ts-expect-error clock is number and iterator expects no argument...
const clockSplit = iterator.next(clock)
if (clockSplit.done || clockSplit.value >= lastClockClient) {
break
}
if (clockSplit.value <= clock) {
continue
}

const encoder = getEncoder()
// 1 client has structs to write
encoding.writeVarUint(encoder.restEncoder, 1)
clock = writeStructs(encoder, structs, client, clock, clockSplit.value)

// no deletes to write
encoding.writeVarUint(encoder.restEncoder, 0)

yield encoder
}
}
if (clock < lastClockClient) {
const encoder = getEncoder()
// 1 client has structs to write
encoding.writeVarUint(encoder.restEncoder, 1)
clock = writeStructs(encoder, structs, client, clock)

// no deletes to write
encoding.writeVarUint(encoder.restEncoder, 0)

yield encoder
}
}
}

/**
* Write the document as a single update message that can be applied on the remote document. If you specify the state of the remote client (`targetState`) it will
* only write the operations that are missing.
*
* Use `writeStateAsUpdate` instead if you are working with lib0/encoding.js#Encoder
Expand Down Expand Up @@ -555,6 +670,61 @@ export const encodeStateAsUpdateV2 = (doc, encodedTargetStateVector = new Uint8A
*/
export const encodeStateAsUpdate = (doc, encodedTargetStateVector) => encodeStateAsUpdateV2(doc, encodedTargetStateVector, new UpdateEncoderV1())

/**
* Write the whole document as a stream of update messages that can be applied on the remote document. If you specify the state of the remote client (`targetState`) it will only write the operations that are missing.
*
* Use `writeStateAsStreamOfUpdates` instead if you are working with lib0/encoding.js#Encoder
*
* @param {Doc} doc
* @param {object} options
* @param {(client: number, clock: number, maxClock: number) => Iterable<number> | Generator<number, void, number>} [options.clockSplits]
* @param {(clientClocks: Array<[number, number]>) => Array<[number, number]>} [options.sortClients]
* @param {Uint8Array} [encodedTargetStateVector] The state of the target that receives the update. Leave empty to write all known structs
* @param {() => UpdateEncoderV1 | UpdateEncoderV2} [getEncoder]
* @return {Iterable<Uint8Array>}
*
* @generator
*/
export const encodeStateAsStreamOfUpdatesV2 = function * (doc, options, encodedTargetStateVector = new Uint8Array([0]), getEncoder = () => new UpdateEncoderV2()) {
const targetStateVector = decodeStateVector(encodedTargetStateVector)
for (const encoder of writeStateAsStreamOfUpdates(getEncoder, doc, options, targetStateVector)) {
yield encoder.toUint8Array()
}

const updates = []
// also add the pending updates (if there are any)
if (doc.store.pendingDs) {
updates.push(doc.store.pendingDs)
}
if (doc.store.pendingStructs) {
updates.push(diffUpdateV2(doc.store.pendingStructs.update, encodedTargetStateVector))
}
if (updates.length > 0) {
const encoder = getEncoder()
if (encoder.constructor === UpdateEncoderV1) {
yield mergeUpdates(updates.map((update, i) => i === 0 ? update : convertUpdateFormatV2ToV1(update)))
} else if (encoder.constructor === UpdateEncoderV2) {
yield mergeUpdatesV2(updates)
}
}
}

/**
* Write the whole document as a stream of update messages that can be applied on the remote document. If you specify the state of the remote client (`targetState`) it will only write the operations that are missing.
*
* Use `writeStateAsStreamOfUpdates` instead if you are working with lib0/encoding.js#Encoder
*
* @param {Doc} doc
* @param {object} options
* @param {(client: number, clock: number, maxClock: number) => Iterable<number> | Generator<number, void, number>} [options.clockSplits]
* @param {(clientClocks: Array<[number, number]>) => Array<[number, number]>} [options.sortClients]
* @param {Uint8Array} [encodedTargetStateVector] The state of the target that receives the update. Leave empty to write all known structs
* @return {Iterable<Uint8Array>}
*
* @function
*/
export const encodeStateAsStreamOfUpdates = (doc, options, encodedTargetStateVector) => encodeStateAsStreamOfUpdatesV2(doc, options, encodedTargetStateVector, () => new UpdateEncoderV1())

/**
* Read state vector from Decoder and return as Map
*
Expand Down
Loading