diff --git a/composer/composer/autoload_classmap.php b/composer/composer/autoload_classmap.php index 5696014cfed..12a05698cd5 100644 --- a/composer/composer/autoload_classmap.php +++ b/composer/composer/autoload_classmap.php @@ -64,4 +64,5 @@ 'OCA\\Text\\Service\\SessionService' => $baseDir . '/../lib/Service/SessionService.php', 'OCA\\Text\\Service\\WorkspaceService' => $baseDir . '/../lib/Service/WorkspaceService.php', 'OCA\\Text\\TextFile' => $baseDir . '/../lib/TextFile.php', + 'OCA\\Text\\YjsMessage' => $baseDir . '/../lib/YjsMessage.php', ); diff --git a/composer/composer/autoload_static.php b/composer/composer/autoload_static.php index 0895a04e5cb..4d34f834925 100644 --- a/composer/composer/autoload_static.php +++ b/composer/composer/autoload_static.php @@ -79,6 +79,7 @@ class ComposerStaticInitText 'OCA\\Text\\Service\\SessionService' => __DIR__ . '/..' . '/../lib/Service/SessionService.php', 'OCA\\Text\\Service\\WorkspaceService' => __DIR__ . '/..' . '/../lib/Service/WorkspaceService.php', 'OCA\\Text\\TextFile' => __DIR__ . '/..' . '/../lib/TextFile.php', + 'OCA\\Text\\YjsMessage' => __DIR__ . '/..' . '/../lib/YjsMessage.php', ); public static function getInitializer(ClassLoader $loader) diff --git a/cypress/e2e/api/SessionApi.spec.js b/cypress/e2e/api/SessionApi.spec.js index 7dee798f0c2..d98c81727ec 100644 --- a/cypress/e2e/api/SessionApi.spec.js +++ b/cypress/e2e/api/SessionApi.spec.js @@ -135,7 +135,10 @@ describe('The session Api', function() { .should('eql', 0) cy.wrap(response) .its('steps.length') - .should('eql', 0) + .should('eql', 1) + cy.wrap(response) + .its('steps[0].data') + .should('eql', [messages.update]) }) }) }) diff --git a/lib/Service/DocumentService.php b/lib/Service/DocumentService.php index 4ccd2e0506d..29a86171f39 100644 --- a/lib/Service/DocumentService.php +++ b/lib/Service/DocumentService.php @@ -36,6 +36,7 @@ use OCA\Text\Db\StepMapper; use OCA\Text\Exception\DocumentHasUnsavedChangesException; use OCA\Text\Exception\DocumentSaveConflictException; +use OCA\Text\YjsMessage; use OCP\AppFramework\Db\DoesNotExistException; use OCP\Constants; use OCP\DB\Exception; @@ -217,11 +218,9 @@ public function addStep(Document $document, Session $session, array $steps, int $getStepsSinceVersion = null; $newVersion = $version; foreach ($steps as $step) { - // Steps are base64 encoded messages of the yjs protocols - // https://github.com/yjs/y-protocols - // Base64 encoded values smaller than "AAE" belong to sync step 1 messages. - // These messages query other participants for their current state. - if ($step < "AAE") { + $message = YjsMessage::fromBase64($step); + // Filter out query steps as they would just trigger clients to send their steps again + if ($message->getYjsMessageType() === YjsMessage::YJS_MESSAGE_SYNC && $message->getYjsSyncType() === YjsMessage::YJS_MESSAGE_SYNC_STEP1) { array_push($querySteps, $step); } else { array_push($stepsToInsert, $step); @@ -235,8 +234,9 @@ public function addStep(Document $document, Session $session, array $steps, int $allSteps = $this->getSteps($documentId, $getStepsSinceVersion); $stepsToReturn = []; foreach ($allSteps as $step) { - if ($step < "AAQ") { - array_push($stepsToReturn, $step); + $message = YjsMessage::fromBase64($step->getData()); + if ($message->getYjsMessageType() === YjsMessage::YJS_MESSAGE_SYNC && $message->getYjsSyncType() === YjsMessage::YJS_MESSAGE_SYNC_UPDATE) { + $stepsToReturn[] = $step; } } return [ @@ -284,6 +284,7 @@ private function insertSteps(Document $document, Session $session, array $steps, } } + /** @return Step[] */ public function getSteps(int $documentId, int $lastVersion): array { if ($lastVersion === $this->cache->get('document-version-' . $documentId)) { return []; diff --git a/lib/Service/SessionService.php b/lib/Service/SessionService.php index 3776c93cbaf..e24204e201d 100644 --- a/lib/Service/SessionService.php +++ b/lib/Service/SessionService.php @@ -28,6 +28,7 @@ use OCA\Text\Db\Session; use OCA\Text\Db\SessionMapper; +use OCA\Text\YjsMessage; use OCP\AppFramework\Db\DoesNotExistException; use OCP\AppFramework\Utility\ITimeFactory; use OCP\DirectEditing\IManager; @@ -225,6 +226,12 @@ public function updateSessionAwareness(Session $session, string $message): Sessi if (empty($message)) { return $session; } + + $decoded = YjsMessage::fromBase64($message); + if ($decoded->getYjsMessageType() !== YjsMessage::YJS_MESSAGE_AWARENESS) { + throw new \ValueError('Message passed was not an awareness message'); + } + $session->setLastAwarenessMessage($message); return $this->sessionMapper->update($session); } diff --git a/lib/YjsMessage.php b/lib/YjsMessage.php new file mode 100644 index 00000000000..913d9e91f0a --- /dev/null +++ b/lib/YjsMessage.php @@ -0,0 +1,85 @@ +data)); + $num = 0; + $mult = 1; + $len = count($bytes); + while ($this->pos < $len) { + $r = $bytes[$this->pos++]; + // num = num | ((r & binary.BITS7) << len) + $num = $num + ($r & 0b1111111) * $mult; + $mult *= 128; + if ($r <= 0b1111111) { + return $num; + } + // Number.MAX_SAFE_INTEGER in JS + if ($num > 9007199254740990) { + throw new \OutOfBoundsException(); + } + } + throw new InvalidArgumentException(); + } + + public function getYjsMessageType(): int { + $oldPos = $this->pos; + $this->pos = 0; + $messageType = $this->readVarUint(); + $this->pos = $oldPos; + return $messageType; + } + + public function getYjsSyncType(): int { + $oldPos = $this->pos; + $this->pos = 0; + $messageType = $this->readVarUint(); + if ($messageType !== self::YJS_MESSAGE_SYNC) { + throw new \ValueError('Message is not a sync message'); + } + $syncType = $this->readVarUint(); + $this->pos = $oldPos; + return $syncType; + } + +} diff --git a/src/components/Editor.vue b/src/components/Editor.vue index f271ae03ad7..5d641606df9 100644 --- a/src/components/Editor.vue +++ b/src/components/Editor.vue @@ -327,6 +327,11 @@ export default { }, created() { this.$ydoc = new Doc() + // The following can be useful for debugging ydoc updates + // this.$ydoc.on('update', function(update, origin, doc, tr) { + // console.debug('ydoc update', update, origin, doc, tr) + // Y.logUpdate(update) + // }); this.$providers = [] this.$editor = null this.$syncService = null diff --git a/src/helpers/yjs.js b/src/helpers/yjs.js index bc83c1e30a5..bbc4ee24e13 100644 --- a/src/helpers/yjs.js +++ b/src/helpers/yjs.js @@ -22,6 +22,7 @@ import { encodeArrayBuffer, decodeArrayBuffer } from '../helpers/base64.js' import { Doc, encodeStateAsUpdate, applyUpdate } from 'yjs' +import * as decoding from 'lib0/decoding.js' /** * @@ -42,3 +43,31 @@ export function applyDocumentState(ydoc, documentState, origin) { const update = decodeArrayBuffer(documentState) applyUpdate(ydoc, update, origin) } + +/** + * Log y.js messages with their type and initiator call stack + * + * @param {string} step - Y.js message + */ +export function logStep(step) { + // Create error for stack trace + const err = new Error() + + const decoder = decoding.createDecoder(step) + + const messageType = decoding.readVarUint(decoder) + const subType = decoding.readVarUint(decoder) + + const encodedStep = encodeArrayBuffer(step) + switch (messageType) { + case 0: + console.debug('y.js message sync', subType, encodedStep, err.stack) + break + case 3: + console.debug('y.js message awareness_query', encodedStep, err.stack) + break + case 1: + console.debug('y.js message awareness', encodedStep, err.stack) + break + } +} diff --git a/src/services/WebSocketPolyfill.js b/src/services/WebSocketPolyfill.js index 48ba4d67d83..082a07d836e 100644 --- a/src/services/WebSocketPolyfill.js +++ b/src/services/WebSocketPolyfill.js @@ -80,6 +80,9 @@ export default function initWebSocketPolyfill(syncService, fileId, initialSessio } send(...data) { + // Useful for debugging what steps are sent and how they were initiated + // data.forEach(logStep) + this.#queue.push(...data) let outbox = [] syncService.sendSteps(() => { diff --git a/tests/unit/YjsMessageTest.php b/tests/unit/YjsMessageTest.php new file mode 100644 index 00000000000..087aa6d5d77 --- /dev/null +++ b/tests/unit/YjsMessageTest.php @@ -0,0 +1,59 @@ +getYjsMessageType(); + self::assertEquals($type, $unpack1, 'type'); + if ($subtype !== null) { + $unpack2 = $buffer->getYjsSyncType(); + self::assertEquals($subtype, $unpack2); + } + } + + public function testV() { + self::assertEquals(0, YjsMessage::fromBase64('AA==')->readVarUint()); + self::assertEquals(127, YjsMessage::fromBase64('fw==')->readVarUint()); + self::assertEquals(128, YjsMessage::fromBase64('gAE=')->readVarUint()); + self::assertEquals(129, YjsMessage::fromBase64('gQE=')->readVarUint()); + self::assertEquals(259, YjsMessage::fromBase64('gwI=')->readVarUint()); + self::assertEquals(0, YjsMessage::fromBase64('AA==')->readVarUint()); + self::assertEquals(13372342, YjsMessage::fromBase64('tpewBg==')->readVarUint()); + self::assertEquals(1357913579, YjsMessage::fromBase64('67vAhwU=')->readVarUint()); + + $buffer = YjsMessage::fromBase64('tpewBg=='); + self::assertEquals(13372342, $buffer->readVarUint()); + } +}