Skip to content

Commit

Permalink
Merge pull request #4310 from nextcloud/chore/yjs
Browse files Browse the repository at this point in the history
Y.js backend enhancements and debug helpers
  • Loading branch information
max-nextcloud authored Sep 13, 2023
2 parents e6119e0 + 2341059 commit 439e7e5
Show file tree
Hide file tree
Showing 10 changed files with 202 additions and 8 deletions.
1 change: 1 addition & 0 deletions composer/composer/autoload_classmap.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,5 @@
'OCA\\Text\\Service\\SessionService' => $baseDir . '/../lib/Service/SessionService.php',
'OCA\\Text\\Service\\WorkspaceService' => $baseDir . '/../lib/Service/WorkspaceService.php',
'OCA\\Text\\TextFile' => $baseDir . '/../lib/TextFile.php',
'OCA\\Text\\YjsMessage' => $baseDir . '/../lib/YjsMessage.php',
);
1 change: 1 addition & 0 deletions composer/composer/autoload_static.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class ComposerStaticInitText
'OCA\\Text\\Service\\SessionService' => __DIR__ . '/..' . '/../lib/Service/SessionService.php',
'OCA\\Text\\Service\\WorkspaceService' => __DIR__ . '/..' . '/../lib/Service/WorkspaceService.php',
'OCA\\Text\\TextFile' => __DIR__ . '/..' . '/../lib/TextFile.php',
'OCA\\Text\\YjsMessage' => __DIR__ . '/..' . '/../lib/YjsMessage.php',
);

public static function getInitializer(ClassLoader $loader)
Expand Down
5 changes: 4 additions & 1 deletion cypress/e2e/api/SessionApi.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ describe('The session Api', function() {
.should('eql', 0)
cy.wrap(response)
.its('steps.length')
.should('eql', 0)
.should('eql', 1)
cy.wrap(response)
.its('steps[0].data')
.should('eql', [messages.update])
})
})
})
Expand Down
15 changes: 8 additions & 7 deletions lib/Service/DocumentService.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
use OCA\Text\Db\StepMapper;
use OCA\Text\Exception\DocumentHasUnsavedChangesException;
use OCA\Text\Exception\DocumentSaveConflictException;
use OCA\Text\YjsMessage;
use OCP\AppFramework\Db\DoesNotExistException;
use OCP\Constants;
use OCP\DB\Exception;
Expand Down Expand Up @@ -217,11 +218,9 @@ public function addStep(Document $document, Session $session, array $steps, int
$getStepsSinceVersion = null;
$newVersion = $version;
foreach ($steps as $step) {
// Steps are base64 encoded messages of the yjs protocols
// https://github.com/yjs/y-protocols
// Base64 encoded values smaller than "AAE" belong to sync step 1 messages.
// These messages query other participants for their current state.
if ($step < "AAE") {
$message = YjsMessage::fromBase64($step);
// Filter out query steps as they would just trigger clients to send their steps again
if ($message->getYjsMessageType() === YjsMessage::YJS_MESSAGE_SYNC && $message->getYjsSyncType() === YjsMessage::YJS_MESSAGE_SYNC_STEP1) {
array_push($querySteps, $step);
} else {
array_push($stepsToInsert, $step);
Expand All @@ -235,8 +234,9 @@ public function addStep(Document $document, Session $session, array $steps, int
$allSteps = $this->getSteps($documentId, $getStepsSinceVersion);
$stepsToReturn = [];
foreach ($allSteps as $step) {
if ($step < "AAQ") {
array_push($stepsToReturn, $step);
$message = YjsMessage::fromBase64($step->getData());
if ($message->getYjsMessageType() === YjsMessage::YJS_MESSAGE_SYNC && $message->getYjsSyncType() === YjsMessage::YJS_MESSAGE_SYNC_UPDATE) {
$stepsToReturn[] = $step;
}
}
return [
Expand Down Expand Up @@ -284,6 +284,7 @@ private function insertSteps(Document $document, Session $session, array $steps,
}
}

/** @return Step[] */
public function getSteps(int $documentId, int $lastVersion): array {
if ($lastVersion === $this->cache->get('document-version-' . $documentId)) {
return [];
Expand Down
7 changes: 7 additions & 0 deletions lib/Service/SessionService.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

use OCA\Text\Db\Session;
use OCA\Text\Db\SessionMapper;
use OCA\Text\YjsMessage;
use OCP\AppFramework\Db\DoesNotExistException;
use OCP\AppFramework\Utility\ITimeFactory;
use OCP\DirectEditing\IManager;
Expand Down Expand Up @@ -225,6 +226,12 @@ public function updateSessionAwareness(Session $session, string $message): Sessi
if (empty($message)) {
return $session;
}

$decoded = YjsMessage::fromBase64($message);
if ($decoded->getYjsMessageType() !== YjsMessage::YJS_MESSAGE_AWARENESS) {
throw new \ValueError('Message passed was not an awareness message');
}

$session->setLastAwarenessMessage($message);
return $this->sessionMapper->update($session);
}
Expand Down
85 changes: 85 additions & 0 deletions lib/YjsMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
<?php

namespace OCA\Text;

use InvalidArgumentException;

/**
* Steps are base64 encoded messages of the yjs protocols
* https://github.com/yjs/y-protocols
*
* This class is a simple representation of a message containing some methods
* to decode parts of it for what we need on the backend
*
* Relevant resources:
* https://github.com/yjs/y-protocols/blob/master/PROTOCOL.md
* https://github.com/yjs/y-websocket/blob/master/src/y-websocket.js#L19-L22
* https://github.com/yjs/y-protocols/blob/master/sync.js#L38-L40
* https://github.com/dmonad/lib0/blob/master/decoding.js
*/
class YjsMessage {

public const YJS_MESSAGE_SYNC = 0;
public const YJS_MESSAGE_AWARENESS = 1;
public const YJS_MESSAGE_AWARENESS_QUERY = 3;

public const YJS_MESSAGE_SYNC_STEP1 = 0;
public const YJS_MESSAGE_SYNC_STEP2 = 1;
public const YJS_MESSAGE_SYNC_UPDATE = 2;

private int $pos = 0;

public function __construct(
private string $data = ''
) {
}

public static function fromBase64(string $data = ''): self {
return new self(base64_decode($data));
}

/**
* https://github.com/dmonad/lib0/blob/bd69ab4dc701d77e808f2bab08d96d63acd297da/decoding.js#L242
*/
public function readVarUint(): int {
$bytes = array_values(unpack('C*', $this->data));
$num = 0;
$mult = 1;
$len = count($bytes);
while ($this->pos < $len) {
$r = $bytes[$this->pos++];
// num = num | ((r & binary.BITS7) << len)
$num = $num + ($r & 0b1111111) * $mult;
$mult *= 128;
if ($r <= 0b1111111) {
return $num;
}
// Number.MAX_SAFE_INTEGER in JS
if ($num > 9007199254740990) {
throw new \OutOfBoundsException();
}
}
throw new InvalidArgumentException();
}

public function getYjsMessageType(): int {
$oldPos = $this->pos;
$this->pos = 0;
$messageType = $this->readVarUint();
$this->pos = $oldPos;
return $messageType;
}

public function getYjsSyncType(): int {
$oldPos = $this->pos;
$this->pos = 0;
$messageType = $this->readVarUint();
if ($messageType !== self::YJS_MESSAGE_SYNC) {
throw new \ValueError('Message is not a sync message');
}
$syncType = $this->readVarUint();
$this->pos = $oldPos;
return $syncType;
}

}
5 changes: 5 additions & 0 deletions src/components/Editor.vue
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ export default {
},
created() {
this.$ydoc = new Doc()
// The following can be useful for debugging ydoc updates
// this.$ydoc.on('update', function(update, origin, doc, tr) {
// console.debug('ydoc update', update, origin, doc, tr)
// Y.logUpdate(update)
// });
this.$providers = []
this.$editor = null
this.$syncService = null
Expand Down
29 changes: 29 additions & 0 deletions src/helpers/yjs.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import { encodeArrayBuffer, decodeArrayBuffer } from '../helpers/base64.js'
import { Doc, encodeStateAsUpdate, applyUpdate } from 'yjs'
import * as decoding from 'lib0/decoding.js'

/**
*
Expand All @@ -42,3 +43,31 @@ export function applyDocumentState(ydoc, documentState, origin) {
const update = decodeArrayBuffer(documentState)
applyUpdate(ydoc, update, origin)
}

/**
* Log y.js messages with their type and initiator call stack
*
* @param {string} step - Y.js message
*/
export function logStep(step) {
// Create error for stack trace
const err = new Error()

const decoder = decoding.createDecoder(step)

const messageType = decoding.readVarUint(decoder)
const subType = decoding.readVarUint(decoder)

const encodedStep = encodeArrayBuffer(step)
switch (messageType) {
case 0:
console.debug('y.js message sync', subType, encodedStep, err.stack)
break
case 3:
console.debug('y.js message awareness_query', encodedStep, err.stack)
break
case 1:
console.debug('y.js message awareness', encodedStep, err.stack)
break
}
}
3 changes: 3 additions & 0 deletions src/services/WebSocketPolyfill.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ export default function initWebSocketPolyfill(syncService, fileId, initialSessio
}

send(...data) {
// Useful for debugging what steps are sent and how they were initiated
// data.forEach(logStep)

this.#queue.push(...data)
let outbox = []
syncService.sendSteps(() => {
Expand Down
59 changes: 59 additions & 0 deletions tests/unit/YjsMessageTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?php

namespace OCA\Text;

use Test\TestCase;

class YjsMessageTest extends TestCase {
protected function setUp(): void {
parent::setUp();
}

// https://github.com/yjs/y-dat/blob/745d25f9690fceae5901d1225575fe8b6bcafdd7/src/y-dat.js#LL207C59-L210C1
public function dataMessageTypes() {
return [
//
['AAABAA==', 0, 0],
// messageSync messageYjsSyncStep1
['AAAyCIqghaQNBvrS2LoMB8L10I8KrQPD+t3RB2DcrYL4A40Ema2O4AMHz9bk8AIOtbm0PAE=', 0, 0],
// messageAwareness
['AQoBkZWK7gMAAnt9', 1, null],
['AUIBwvXQjwqWAzl7InVzZXIiOnsibmFtZSI6ImFkbWluIiwiY29sb3IiOiIjZDA5ZTZkIn0sImN1cnNvciI6bnVsbH0=', 1, null],
['AegBAcP63dEHtwHeAXsidXNlciI6eyJuYW1lIjoiR3Vlc3QiLCJjb2xvciI6IiM5M2IyN2IifSwiY3Vyc29yIjp7ImFuY2hvciI6eyJ0eXBlIjp7ImNsaWVudCI6MjcxNzEzNzYwMiwiY2xvY2siOjl9LCJ0bmFtZSI6bnVsbCwiaXRlbSI6bnVsbCwiYXNzb2MiOjB9LCJoZWFkIjp7InR5cGUiOnsiY2xpZW50IjoyNzE3MTM3NjAyLCJjbG9jayI6OX0sInRuYW1lIjpudWxsLCJpdGVtIjpudWxsLCJhc3NvYyI6MH19fQ==', 1, null],
['AbsBAZGViu4DArIBeyJ1c2VyIjp7Im5hbWUiOiJHdWVzdCIsImNvbG9yIjoiI2I4YmU2OCJ9LCJjdXJzb3IiOnsiYW5jaG9yIjp7InR5cGUiOm51bGwsInRuYW1lIjoiZGVmYXVsdCIsIml0ZW0iOm51bGwsImFzc29jIjowfSwiaGVhZCI6eyJ0eXBlIjpudWxsLCJ0bmFtZSI6ImRlZmF1bHQiLCJpdGVtIjpudWxsLCJhc3NvYyI6MH19fQ==', 1, null],
// messageSync messageYjsUpdate
['AAISAQHD+t3RB2CEwvXQjwpHAWEA', 0, 2],
['AAI0AQOKoIWkDQAHAQdkZWZhdWx0AwlwYXJhZ3JhcGgHAIqghaQNAAYEAIqghaQNAQR0ZXN0AA==', 0, 2],
['AAIdAQGRlYruAx2okZWK7gMbAXcCaC0BkZWK7gMBGwE=', 0, 2],
['AAIKAAGRlYruAwEVBA==', 0, 2],
// query, response
['AAABAA==', YjsMessage::YJS_MESSAGE_SYNC, YjsMessage::YJS_MESSAGE_SYNC_STEP1],
['AAECAAA=', YjsMessage::YJS_MESSAGE_SYNC, YjsMessage::YJS_MESSAGE_SYNC_STEP2],
];
}

/** @dataProvider dataMessageTypes */
public function testMessageTypes($data, $type, $subtype) {
$buffer = YjsMessage::fromBase64($data);
$unpack1 = $buffer->getYjsMessageType();
self::assertEquals($type, $unpack1, 'type');
if ($subtype !== null) {
$unpack2 = $buffer->getYjsSyncType();
self::assertEquals($subtype, $unpack2);
}
}

public function testV() {
self::assertEquals(0, YjsMessage::fromBase64('AA==')->readVarUint());
self::assertEquals(127, YjsMessage::fromBase64('fw==')->readVarUint());
self::assertEquals(128, YjsMessage::fromBase64('gAE=')->readVarUint());
self::assertEquals(129, YjsMessage::fromBase64('gQE=')->readVarUint());
self::assertEquals(259, YjsMessage::fromBase64('gwI=')->readVarUint());
self::assertEquals(0, YjsMessage::fromBase64('AA==')->readVarUint());
self::assertEquals(13372342, YjsMessage::fromBase64('tpewBg==')->readVarUint());
self::assertEquals(1357913579, YjsMessage::fromBase64('67vAhwU=')->readVarUint());

$buffer = YjsMessage::fromBase64('tpewBg==');
self::assertEquals(13372342, $buffer->readVarUint());
}
}

0 comments on commit 439e7e5

Please sign in to comment.