From f12d6fd1101a0da6fc6905f4c16bf34ec7b70bed Mon Sep 17 00:00:00 2001 From: lyming99 <44185539@qq.com> Date: Sun, 3 Mar 2024 23:35:32 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=8C=E6=AD=A5=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../home/mobile_home_page_controller.dart | 71 ----------- .../view/doc/mobile_doc_page_controller.dart | 8 +- .../mobile/view/edit/doc_edit_controller.dart | 7 +- .../view/today/mobile_today_controller.dart | 7 +- .../controller/home/win_home_controller.dart | 4 +- .../today/win_today_controller.dart | 6 +- .../service/doc/win_doc_list_service.dart | 13 +- .../service/today/win_today_service.dart | 5 - .../windows/view/doc/win_note_edit_tab.dart | 57 +++------ lib/commons/service/document_manager.dart | 19 ++- lib/service/doc/doc_service.dart | 16 ++- lib/service/edit/doc_edit_service.dart | 120 +++++++++--------- lib/service/sync/doc_snapshot_service.dart | 33 +++-- lib/service/sync/p2p_service.dart | 61 ++++++++- lib/service/sync/record_sync_service.dart | 6 +- lib/service/sync/upload_task_service.dart | 12 +- test/yjs_update_test.dart | 19 +++ 17 files changed, 216 insertions(+), 248 deletions(-) delete mode 100644 lib/app/mobile/controller/home/mobile_home_page_controller.dart create mode 100644 test/yjs_update_test.dart diff --git a/lib/app/mobile/controller/home/mobile_home_page_controller.dart b/lib/app/mobile/controller/home/mobile_home_page_controller.dart deleted file mode 100644 index ead6b30..0000000 --- a/lib/app/mobile/controller/home/mobile_home_page_controller.dart +++ /dev/null @@ -1,71 +0,0 @@ -import 'package:flutter_crdt/flutter_crdt.dart'; -import 'package:get/get.dart'; -import 'package:wenznote/app/mobile/view/doc/mobile_doc_page_controller.dart'; -import 'package:wenznote/app/mobile/view/today/mobile_today_controller.dart'; -import 'package:wenznote/editor/crdt/YsText.dart'; -import 'package:wenznote/model/note/po/doc_po.dart'; -import 'package:wenznote/service/service_manager.dart'; -import 'package:uuid/uuid.dart'; - -class MobileHomePageController extends ServiceManagerController { - var navIndex = 0.obs; - String? currentDocDir; - var showBottomNav = true.obs; - var todayController = MobileTodayController(); - - var docListController = MobileDocPageController(); - - String getTile() { - return getLabel(navIndex.value); - } - - String getLabel(int index) { - String title = ""; - switch (index) { - case 0: - title = "今天"; - break; - case 1: - title = "笔记"; - break; - case 2: - title = "卡片"; - break; - } - return title; - } - - Future createDoc(String? pid, String text) async { - var item = DocPO( - pid: pid, - uuid: Uuid().v1(), - createTime: DateTime.now().millisecondsSinceEpoch, - updateTime: DateTime.now().millisecondsSinceEpoch, - name: text, - type: "doc", - ); - await serviceManager.docService.createDoc(item); - var docContent = Doc(); - docContent.getArray("blocks").insert(0, [createEmptyTextYMap()]); - await serviceManager.editService.writeDoc(item.uuid, docContent); - return item; - } - - Future createNote() async { - var item = DocPO( - uuid: const Uuid().v1(), - createTime: DateTime.now().millisecondsSinceEpoch, - updateTime: DateTime.now().millisecondsSinceEpoch, - type: "note", - ); - await serviceManager.docService.createDoc(item); - var docContent = Doc(); - docContent.getArray("blocks").insert(0, [createEmptyTextYMap()]); - await serviceManager.editService.writeDoc(item.uuid, docContent); - return item; - } - - Future createNoteAndOpen() async { - return null; - } -} diff --git a/lib/app/mobile/view/doc/mobile_doc_page_controller.dart b/lib/app/mobile/view/doc/mobile_doc_page_controller.dart index f094762..7437288 100644 --- a/lib/app/mobile/view/doc/mobile_doc_page_controller.dart +++ b/lib/app/mobile/view/doc/mobile_doc_page_controller.dart @@ -127,8 +127,6 @@ class MobileDocPageController extends ServiceManagerController { return false; } - void createNote() {} - void openSearchItem(SearchResultVO searchItem) {} void copySearchItem(BuildContext context, int index) {} @@ -159,11 +157,7 @@ class MobileDocPageController extends ServiceManagerController { createTime: DateTime.now().millisecondsSinceEpoch, updateTime: DateTime.now().millisecondsSinceEpoch, ); - await serviceManager.todayService.createDoc(doc); - var docContent = serviceManager.editService.createDoc(); - serviceManager.p2pService - .sendDocEditMessage(doc.uuid!, encodeStateAsUpdateV2(docContent, null)); - await serviceManager.editService.writeDoc(doc.uuid, docContent); + await serviceManager.docService.createDoc(doc,null); fetchData(); GoRouter.of(context).push("/mobile/doc/edit", extra: {"doc": doc}); } diff --git a/lib/app/mobile/view/edit/doc_edit_controller.dart b/lib/app/mobile/view/edit/doc_edit_controller.dart index 2a2b14a..93b1c04 100644 --- a/lib/app/mobile/view/edit/doc_edit_controller.dart +++ b/lib/app/mobile/view/edit/doc_edit_controller.dart @@ -122,13 +122,12 @@ class MobileDocEditController extends ServiceManagerController { editController.waitLayout(() { editController.requestFocus(); }); - doc.on("update", (args) async { + doc.on("update", (args) { var data = args[0]; - if (serviceManager.editService - .isInUpdateCache(this.doc?.uuid ?? "", data)) { + if (serviceManager.editService.isNotEditUpdate(this.doc?.uuid ?? "")) { return; } - await serviceManager.editService.writeDoc(this.doc?.uuid, doc); + serviceManager.editService.writeDoc(this.doc?.uuid, doc); serviceManager.p2pService .sendDocEditMessage(this.doc?.uuid ?? "", data); }); diff --git a/lib/app/mobile/view/today/mobile_today_controller.dart b/lib/app/mobile/view/today/mobile_today_controller.dart index bcf3cbc..8c1d9c8 100644 --- a/lib/app/mobile/view/today/mobile_today_controller.dart +++ b/lib/app/mobile/view/today/mobile_today_controller.dart @@ -2,7 +2,6 @@ import 'dart:async'; import 'package:date_format/date_format.dart'; import 'package:flutter/material.dart'; -import 'package:flutter_crdt/flutter_crdt.dart'; import 'package:fluttertoast/fluttertoast.dart'; import 'package:get/get.dart'; import 'package:go_router/go_router.dart'; @@ -219,11 +218,7 @@ class MobileTodayController extends ServiceManagerController { createTime: DateTime.now().millisecondsSinceEpoch, updateTime: DateTime.now().millisecondsSinceEpoch, ); - await serviceManager.todayService.createDoc(doc); - var docContent = serviceManager.editService.createDoc(); - serviceManager.p2pService - .sendDocEditMessage(doc.uuid!, encodeStateAsUpdateV2(docContent, null)); - await serviceManager.editService.writeDoc(doc.uuid, docContent); + await serviceManager.docService.createDoc(doc, null); await GoRouter.of(context).push("/mobile/doc/edit", extra: {"doc": doc}); fetchDoc(refreshList: true); } diff --git a/lib/app/windows/controller/home/win_home_controller.dart b/lib/app/windows/controller/home/win_home_controller.dart index bbd4049..cc52eb8 100644 --- a/lib/app/windows/controller/home/win_home_controller.dart +++ b/lib/app/windows/controller/home/win_home_controller.dart @@ -16,7 +16,6 @@ import 'package:wenznote/app/windows/widgets/win_tab_view.dart'; import 'package:wenznote/commons/mvc/controller.dart'; import 'package:wenznote/commons/util/markdown/markdown.dart'; import 'package:wenznote/editor/crdt/doc_utils.dart'; -import 'package:wenznote/editor/widget/drop_menu.dart'; import 'package:wenznote/model/note/enum/note_type.dart'; import 'package:wenznote/model/note/po/doc_po.dart'; import 'package:wenznote/service/file/file_manager.dart'; @@ -177,8 +176,7 @@ class WinHomeController extends ServiceManagerController { type: NoteType.note.name, ); var yDoc = await elementsToYDoc(elements); - await serviceManager.editService.writeDoc(doc.uuid, yDoc); - await serviceManager.docService.createDoc(doc); + await serviceManager.docService.createDoc(doc, yDoc); openDoc(doc); } diff --git a/lib/app/windows/controller/today/win_today_controller.dart b/lib/app/windows/controller/today/win_today_controller.dart index 2831165..2289f32 100644 --- a/lib/app/windows/controller/today/win_today_controller.dart +++ b/lib/app/windows/controller/today/win_today_controller.dart @@ -113,11 +113,7 @@ class WinTodayController extends ServiceManagerController { createTime: DateTime.now().millisecondsSinceEpoch, updateTime: DateTime.now().millisecondsSinceEpoch, ); - await serviceManager.todayService.createDoc(doc); - var docContent = serviceManager.editService.createDoc(); - serviceManager.p2pService - .sendDocEditMessage(doc.uuid!, encodeStateAsUpdateV2(docContent, null)); - await serviceManager.editService.writeDoc(doc.uuid, docContent); + await serviceManager.docService.createDoc(doc,null); homeController.openDoc(doc); startSearchTask(); } diff --git a/lib/app/windows/service/doc/win_doc_list_service.dart b/lib/app/windows/service/doc/win_doc_list_service.dart index b546f1c..de9faa0 100644 --- a/lib/app/windows/service/doc/win_doc_list_service.dart +++ b/lib/app/windows/service/doc/win_doc_list_service.dart @@ -1,12 +1,8 @@ -import 'package:flutter_crdt/flutter_crdt.dart'; -import 'package:get/get.dart'; -import 'package:wenznote/app/windows/controller/home/win_home_controller.dart'; +import 'package:uuid/uuid.dart'; import 'package:wenznote/app/windows/model/doc/win_doc_list_item_vo.dart'; -import 'package:wenznote/editor/crdt/YsText.dart'; import 'package:wenznote/model/note/po/doc_dir_po.dart'; import 'package:wenznote/model/note/po/doc_po.dart'; import 'package:wenznote/service/service_manager.dart'; -import 'package:uuid/uuid.dart'; class WinDocListService { ServiceManager serviceManager; @@ -46,10 +42,7 @@ class WinDocListService { name: text, type: "doc", ); - await serviceManager.docService.createDoc(item); - var docContent = Doc(); - docContent.getArray("blocks").insert(0, [createEmptyTextYMap()]); - await serviceManager.editService.writeDoc(item.uuid, docContent); + await serviceManager.docService.createDoc(item, null); return item; } @@ -83,7 +76,7 @@ class WinDocListService { item.pid = toDir.uuid; item.updateTime = DateTime.now().millisecondsSinceEpoch; await serviceManager.docService.updateDocDir(item); - } else if(value is DocPO){ + } else if (value is DocPO) { var doc = value; doc.pid = toDir.uuid; doc.updateTime = DateTime.now().millisecondsSinceEpoch; diff --git a/lib/app/windows/service/today/win_today_service.dart b/lib/app/windows/service/today/win_today_service.dart index bf1cd57..09ee19d 100644 --- a/lib/app/windows/service/today/win_today_service.dart +++ b/lib/app/windows/service/today/win_today_service.dart @@ -66,11 +66,6 @@ class WinTodayService with IsarServiceMixin { return result; } - /// 创建文档: 便签 - Future createDoc(DocPO doc) async { - await serviceManager.docService.createDoc(doc); - } - Future searchElement( YMap map, String searchContent) async { var element = createWenElementFromYMap(map); diff --git a/lib/app/windows/view/doc/win_note_edit_tab.dart b/lib/app/windows/view/doc/win_note_edit_tab.dart index 758c316..aaff344 100644 --- a/lib/app/windows/view/doc/win_note_edit_tab.dart +++ b/lib/app/windows/view/doc/win_note_edit_tab.dart @@ -25,12 +25,11 @@ import 'package:wenznote/model/note/enum/note_order_type.dart'; import 'package:wenznote/model/note/enum/note_type.dart'; import 'package:wenznote/model/note/po/doc_dir_po.dart'; import 'package:wenznote/model/note/po/doc_po.dart'; -import 'package:wenznote/service/task/task.dart'; import 'package:window_manager/window_manager.dart'; class WinNoteEditTabController extends WinEditTabController { - DocPO doc; bool isCreateMode; + DocPO doc; var title = "".obs; String firstCreatTitle = ""; Function(Doc content)? onUpdate; @@ -95,17 +94,17 @@ class WinNoteEditTabController extends WinEditTabController { yDoc: doc, ); tree!.init(); - doc.on("update", (args) async { - onContentChanged(doc); - var data = args[0]; - if (homeController.serviceManager.editService - .isInUpdateCache(this.doc.uuid ?? "", data)) { + doc.on("update", (args) { + var editService = homeController.serviceManager.editService; + // 文档更新后,如果不是本地编辑导致的更新,则无需发送 + if (editService.isNotEditUpdate(this.doc.uuid ?? "")) { return; } - await homeController.serviceManager.editService - .writeDoc(this.doc.uuid, doc); + onContentChanged(doc); + var deltaData = args[0]; + editService.writeDoc(this.doc.uuid, doc); homeController.serviceManager.p2pService - .sendDocEditMessage(this.doc.uuid ?? "", data); + .sendDocEditMessage(this.doc.uuid ?? "", deltaData); }); editController.waitLayout(() { editController.requestFocus(); @@ -114,31 +113,6 @@ class WinNoteEditTabController extends WinEditTabController { } void onContentChanged(Doc content) async { - if (isCreateMode) { - await TaskService.instance.executeTask( - taskGroup: "createModeQueue", - task: () async { - var docName = await homeController.serviceManager.docService - .getDocName(doc.id); - if (firstCreatTitle != docName) { - isCreateMode = false; - } else { - var blocks = editController.ysTree?.blocks; - if (blocks != null && blocks.length == 1) { - var text = blocks[0].yMap.get("text"); - if (text is YText) { - var name = text.toString(); - if (name.length > 20) { - name = name.substring(0, 20); - } - doc.name = name; - firstCreatTitle = name; - title.value = getDocTitle(); - } - } - } - }); - } onUpdate?.call(content); } @@ -260,10 +234,15 @@ class WinNoteEditTabController extends WinEditTabController { title.value = text; } - void sync(BuildContext ctx) { + void syncNow(BuildContext ctx) async { printLog("手动同步笔记:${doc.uuid},${doc.name}"); - homeController.serviceManager.docSnapshotService - .downloadDocFile(doc.uuid ?? ""); + var serviceManager = homeController.serviceManager; + serviceManager.docSnapshotService.downloadDocFile(doc.uuid ?? ""); + var snap = await serviceManager.editService.queryDocSnap(doc.uuid ?? ""); + if (snap == null || snap.isEmpty) { + return; + } + serviceManager.p2pService.sendQueryDocMessage(doc.uuid ?? "", snap); } } @@ -472,7 +451,7 @@ class WinNoteEditTab extends MvcView with Focusable { ), onPress: (ctx) { hideDropMenu(ctx); - controller.sync(ctx); + controller.syncNow(ctx); }, ), DropMenu( diff --git a/lib/commons/service/document_manager.dart b/lib/commons/service/document_manager.dart index 24bfe90..c9cd4e5 100644 --- a/lib/commons/service/document_manager.dart +++ b/lib/commons/service/document_manager.dart @@ -1,20 +1,19 @@ import 'dart:convert'; import 'package:isar/isar.dart'; +import 'package:uuid/uuid.dart'; import 'package:wenznote/commons/util/markdown/markdown.dart'; import 'package:wenznote/commons/util/string.dart'; import 'package:wenznote/commons/util/wdoc/wdoc.dart'; import 'package:wenznote/model/note/po/doc_dir_po.dart'; import 'package:wenznote/model/note/po/doc_po.dart'; import 'package:wenznote/service/service_manager.dart'; -import 'package:uuid/uuid.dart'; enum ConflictMode { keepNew, keepAll } class ImportService { ServiceManager serviceManager; - ImportService(this.serviceManager); Future importWdoc({ @@ -22,7 +21,7 @@ class ImportService { String toPath = "", ConflictMode conflictMode = ConflictMode.keepAll, }) async { - var docInfo = await readWdocFile(serviceManager.fileManager,file); + var docInfo = await readWdocFile(serviceManager.fileManager, file); var infoPo = docInfo.info; if (infoPo != null) { var dir = await createDocPath(toPath); @@ -31,20 +30,19 @@ class ImportService { if (infoPo.uuid != null) { oldItem = await serviceManager.docService.queryDoc(infoPo.uuid!); } + var yDoc = await serviceManager.editService.readJsonDoc(docInfo.content); if (conflictMode == ConflictMode.keepAll) { //保留旧的和新的 infoPo.id = Isar.autoIncrement; if (oldItem?.uuid == infoPo.uuid) { infoPo.uuid = const Uuid().v1(); } - await serviceManager.docService.createDoc(infoPo); - await serviceManager.editService.saveDocStringFile(infoPo.uuid, docInfo.content); + await serviceManager.docService.createDoc(infoPo, yDoc); } else { //保留新的 oldTime<=newTime if (compareDocTime(oldItem, infoPo) <= 0) { await serviceManager.docService.deleteDocReally(infoPo.uuid!); - await serviceManager.docService.createDoc(infoPo); - await serviceManager.editService.saveDocStringFile(infoPo.uuid, docInfo.content); + await serviceManager.docService.createDoc(infoPo, yDoc); } } } @@ -64,7 +62,7 @@ class ImportService { {required String file, required String toPath, required ConflictMode conflictMode}) async { - var docInfo = await readMarkdownInfo(serviceManager.fileManager,file); + var docInfo = await readMarkdownInfo(serviceManager.fileManager, file); if (docInfo == null) { return; } @@ -86,8 +84,9 @@ class ImportService { uuid: uuid, createTime: DateTime.now().millisecondsSinceEpoch, ); - await serviceManager.docService.createDoc(doc); - await serviceManager.editService.saveDocStringFile(uuid, jsonEncode(elements)); + var yDoc = + await serviceManager.editService.readJsonDoc(jsonEncode(elements)); + await serviceManager.docService.createDoc(doc, yDoc); } Future createDocPath(String path) async { diff --git a/lib/service/doc/doc_service.dart b/lib/service/doc/doc_service.dart index 4c14447..25143b3 100644 --- a/lib/service/doc/doc_service.dart +++ b/lib/service/doc/doc_service.dart @@ -1,11 +1,12 @@ import 'package:fluent_ui/fluent_ui.dart'; +import 'package:flutter_crdt/flutter_crdt.dart'; import 'package:isar/isar.dart'; +import 'package:uuid/uuid.dart'; import 'package:wenznote/model/note/enum/note_type.dart'; import 'package:wenznote/model/note/po/doc_dir_po.dart'; import 'package:wenznote/model/note/po/doc_po.dart'; import 'package:wenznote/service/isar/isar_service_mixin.dart'; import 'package:wenznote/service/service_manager.dart'; -import 'package:uuid/uuid.dart'; class DocService with IsarServiceMixin, ChangeNotifier { @override @@ -13,14 +14,17 @@ class DocService with IsarServiceMixin, ChangeNotifier { DocService(this.serviceManager); - Future createDoc(DocPO doc) async { - doc.uuid ??= const Uuid().v1(); - doc.createTime = doc.updateTime = DateTime.now().millisecondsSinceEpoch; + Future createDoc(DocPO info, Doc? content) async { + info.uuid ??= const Uuid().v1(); + info.createTime = info.updateTime = DateTime.now().millisecondsSinceEpoch; await upsertDbDelta( - dataId: doc.uuid!, dataType: "note", properties: doc.toMap()); + dataId: info.uuid!, dataType: "note", properties: info.toMap()); await documentIsar.writeTxn(() async { - await documentIsar.docPOs.put(doc); + await documentIsar.docPOs.put(info); }); + var yDoc = serviceManager.editService.createYDoc(info); + serviceManager.p2pService + .sendDocEditMessage(info.uuid!, encodeStateAsUpdateV2(yDoc, null)); } Future deleteDoc(DocPO doc) async { diff --git a/lib/service/edit/doc_edit_service.dart b/lib/service/edit/doc_edit_service.dart index 7aa9f36..ffc78bc 100644 --- a/lib/service/edit/doc_edit_service.dart +++ b/lib/service/edit/doc_edit_service.dart @@ -10,6 +10,7 @@ import 'package:isar/isar.dart'; import 'package:synchronized/extension.dart'; import 'package:wenznote/editor/crdt/YsText.dart'; import 'package:wenznote/editor/crdt/doc_utils.dart'; +import 'package:wenznote/model/note/po/doc_po.dart'; import 'package:wenznote/model/note/po/doc_state_po.dart'; import 'package:wenznote/service/service_manager.dart'; @@ -25,9 +26,9 @@ class DocEditService { final _fileCache = {}; final _docCache = {}; - final _updateCache = {}; final _updateLock = Lock(); final _openedDocList = {}; + final _docUserEditMap = {}; void openDocEditor(String id) { _openedDocList.add(id); @@ -41,7 +42,19 @@ class DocEditService { return _openedDocList.contains(id); } - Future readDocFile(String? docId) async { + bool isNotEditUpdate(String docId) { + return _docUserEditMap[docId] ?? false; + } + + void setNotEditUpdate(String docId, bool? state) { + if (state == null) { + _docUserEditMap.remove(docId); + return; + } + _docUserEditMap[docId] = state; + } + + Future readDocBytes(String? docId) async { if (docId == null || docId.isEmpty) { return null; } @@ -59,7 +72,7 @@ class DocEditService { return null; } - Future writeDocFile(String? docId, Uint8List data) async { + Future writeDocBytes(String? docId, Uint8List data) async { if (docId == null || docId.isEmpty) { return; } @@ -78,8 +91,10 @@ class DocEditService { return doc; } // 如何将读取耗时控制在一定范围内? - var bytes = await readDocFile(docId); + var bytes = await readDocBytes(docId); if (bytes == null) { + // 读取失败,应该触发1秒后从服务器下载文档数据 + serviceManager.docSnapshotService.downloadDocFile(docId); return null; } try { @@ -100,15 +115,16 @@ class DocEditService { String? docId, Doc doc, { bool needUpload = true, + bool uploadNow = false, }) async { if (docId == null) { return; } useV2Encoding(); - _updateLock.synchronized(() async { + await _updateLock.synchronized(() async { var bytes = encodeStateAsUpdate(doc, null); _docCache[docId] = doc; - await writeDocFile(docId, bytes); + await writeDocBytes(docId, bytes); // 更新doc state var isar = serviceManager.isarService.documentIsar; var clientId = serviceManager.userService.clientId; @@ -124,53 +140,52 @@ class DocEditService { }); // 增加上传快照任务 if (needUpload) { - await serviceManager.uploadTaskService.uploadDoc(docId); + if (uploadNow) { + await serviceManager.uploadTaskService.uploadDoc(docId, 0); + } else { + await serviceManager.uploadTaskService.uploadDoc(docId); + } } }); } - Future updateDoc( + /// 收到数据,或者下载数据时,会促发这个方法 + /// 需要通知界面刷新具体的数据 + Future updateDocContent( String docId, - Uint8List delta, { - bool needUpload = true, - bool checkUpload = false, - }) async { + Uint8List delta, + ) async { + if (delta.isEmpty) { + return false; + } + // 提出问题,这个方法能否在 on update 里面直接调用? return _updateLock.synchronized(() async { try { - _updateCache[docId] = delta; + // 1.更新到编辑器 var doc = _docCache[docId]; if (doc != null) { + // 将此次更新设置位不需要上传变化,也不需要写到文件 + setNotEditUpdate(docId, true); try { applyUpdateV2(doc, delta, null); } catch (e) { printLog("更新doc失败, applyUpdateV2 error: $e"); + } finally { + setNotEditUpdate(docId, null); } } - var docBytes = await readDocFile(docId); - var newBytes = mergeUpdatesV2([delta, if (docBytes != null) docBytes]); - await writeDocFile(docId, newBytes); - bool uploadNow = false; - if (checkUpload && docBytes != null) { - var pullState = decodeSnapshotV2(delta); - var localState = decodeSnapshotV2(docBytes); - var localId = localState.sv[serviceManager.userService.clientId]; - var serverId = pullState.sv[serviceManager.userService.clientId]; - if (localId != serverId) { - //数据不同,需要上传,并且需要通知同步 - needUpload = true; - uploadNow = true; - } - } - if (needUpload) { - if (uploadNow) { - await serviceManager.uploadTaskService.uploadDoc(docId, 1); - } else { - await serviceManager.uploadTaskService.uploadDoc(docId); - } + // 2.写到文件 + var newBytes = delta; + var docBytes = await readDocBytes(docId); + if (docBytes != null && docBytes.isNotEmpty) { + var mergeList = [delta, docBytes]; + newBytes = mergeUpdatesV2(mergeList); } - return !_equalsSnapShot(docBytes ?? Uint8List(0), newBytes); + await writeDocBytes(docId, newBytes); + // 3.上传到服务器(20秒后) + await serviceManager.uploadTaskService.uploadDoc(docId); + return docBytes == null || !_equalsBytes(docBytes, newBytes); } catch (e) { - // todo 并doc失败, error: RangeError (index): Index out of range: no indices are valid: 0 printLog("合并doc失败, error: $e"); return false; } @@ -194,23 +209,11 @@ class DocEditService { return jsonEncode(doc.toJSON()); } - bool isInUpdateCache(String docId, Uint8List delta) { - var cache = _updateCache[docId]; - if (cache == null) { - return false; - } - if (cache.length != delta.length) { - return false; - } - for (var i = 0; i < cache.length; i++) { - if (cache[i] != delta[i]) { - return false; - } - } - return true; + Future readJsonDoc(String? content) async { + return jsonToYDoc(serviceManager.userService.clientId, content); } - bool _equalsSnapShot(Uint8List a, Uint8List b) { + bool _equalsBytes(Uint8List a, Uint8List b) { if (a.length != b.length) { return false; } @@ -222,18 +225,19 @@ class DocEditService { return true; } - Future saveDocStringFile(String? uuid, String? content) async { - if (uuid == null) { - return; - } - var doc = await jsonToYDoc(serviceManager.userService.clientId, content); - await writeDoc(uuid, doc); + bool _equalsSnapshot(Uint8List bytes1, Uint8List bytes2) { + Doc doc1 = Doc(); + applyUpdateV2(doc1, bytes1, null); + Doc doc2 = Doc(); + applyUpdateV2(doc2, bytes2, null); + return equalSnapshots(snapshot(doc1), snapshot(doc2)); } - Doc createDoc() { + Doc createYDoc(DocPO info) { var doc = Doc(); doc.clientID = serviceManager.userService.clientId; doc.getArray("blocks").insert(0, [createEmptyTextYMap()]); + writeDoc(info.uuid, doc, uploadNow: true); return doc; } diff --git a/lib/service/sync/doc_snapshot_service.dart b/lib/service/sync/doc_snapshot_service.dart index 0dde478..2c1c80c 100644 --- a/lib/service/sync/doc_snapshot_service.dart +++ b/lib/service/sync/doc_snapshot_service.dart @@ -42,14 +42,13 @@ class DocSnapshotService { return; } var dataId = pkt.dataIdList[0]; - var updated = await serviceManager.editService - .updateDoc(dataId, Uint8List.fromList(pkt.content), needUpload: false); + var updated = await serviceManager.editService.updateDocContent( + dataId, + Uint8List.fromList(pkt.content), + ); + // 没有更新成功,数据错位导致的,发送请求完整数据 if (updated == false) { - var snap = await serviceManager.editService.queryDocSnap(dataId); - if (snap == null || snap.isEmpty) { - return; - } - serviceManager.p2pService.sendQueryDocMessage(dataId, snap); + printLog("收到编辑更新消息,但内容没有得到更新,需要修复最新数据."); } } @@ -111,8 +110,10 @@ class DocSnapshotService { await isar.writeTxn(() async { await isar.docStatePOs.put(state); }); - serviceManager.editService - .updateDoc(dataId, Uint8List.fromList(pkt.content), needUpload: false); + serviceManager.editService.updateDocContent( + dataId, + Uint8List.fromList(pkt.content), + ); } /// 查询文档状态数据 @@ -165,6 +166,16 @@ class DocSnapshotService { } } + Future verifyDoc(List docList) async { + for (var docId in docList) { + var snap = await serviceManager.editService.queryDocSnap(docId); + if (snap == null || snap.isEmpty) { + continue; + } + serviceManager.p2pService.sendQueryDocMessage(docId, snap); + } + } + /// 接收下载文档指令 Future downloadDoc(P2pPacket pkt) async { /// 1.增加8秒文档下载锁,减少下载量 @@ -258,11 +269,9 @@ class DocSnapshotService { // 写入文件,并且通知upload try { // 更新文档,并且检测文档是否需要更新 - await serviceManager.editService.updateDoc( + await serviceManager.editService.updateDocContent( docId, fileBytes, - needUpload: needUpload, - checkUpload: true, ); } catch (e) { // 可能存在yjs合并失败的bug,需要处理yjs类型转换问题 diff --git a/lib/service/sync/p2p_service.dart b/lib/service/sync/p2p_service.dart index 0a68cce..961c644 100644 --- a/lib/service/sync/p2p_service.dart +++ b/lib/service/sync/p2p_service.dart @@ -4,13 +4,14 @@ import 'dart:math'; import 'dart:typed_data'; import 'package:fixnum/fixnum.dart'; +import 'package:flutter_crdt/flutter_crdt.dart'; import 'package:get/get.dart'; import 'package:isar/isar.dart'; +import 'package:web_socket_channel/io.dart'; import 'package:wenznote/commons/util/serial_util.dart'; import 'package:wenznote/model/note/po/doc_state_po.dart'; import 'package:wenznote/service/service_manager.dart'; import 'package:wenznote/service/sync/p2p_packet.pb.dart'; -import 'package:web_socket_channel/io.dart'; class MessageType { static const int heart = -1; @@ -25,6 +26,9 @@ class MessageType { static const int downloadDoc = 5; static const int queryDocState = 6; static const int docState = 7; + + /// 校验文档,通知立即同步文档 + static const int verifyDoc = 8; } class UpdateInfo { @@ -33,9 +37,13 @@ class UpdateInfo { class P2pService { ServiceManager serviceManager; + + // 检验文档完整性延迟,单位:毫秒 + int verifyDocDuration = 5000; IOWebSocketChannel? socket; bool isUserClosed = false; Timer? heartTimer; + SendDeltaQueue? verifyDocMessageQueue; P2pService(this.serviceManager); @@ -61,14 +69,15 @@ class P2pService { return; } var token = serviceManager.userService.token; - if(token==null){ + if (token == null) { return; } + verifyDocMessageQueue = SendDeltaQueue( + resendDuration: verifyDocDuration, sender: sendVerifyDocMessage); var clientId = serviceManager.userService.clientId; var uri = Uri.parse( "ws://${noteServer.host}:${noteServer.port}/client/websocket/$clientId"); - socket = IOWebSocketChannel.connect(uri, - headers: {'token': token}); + socket = IOWebSocketChannel.connect(uri, headers: {'token': token}); socket!.stream.listen( (data) { _onReceive(data); @@ -111,6 +120,9 @@ class P2pService { // 编辑文档消息 serviceManager.docSnapshotService.receiveDocEditEvent(pkt); break; + case MessageType.verifyDoc: + serviceManager.docSnapshotService.verifyDoc(pkt.dataIdList); + break; case MessageType.queryDocDelta: // 查询文档增量消息 serviceManager.docSnapshotService.queryDocDelta(pkt); @@ -189,6 +201,14 @@ class P2pService { content: delta, dataIdList: [dataId], ); + verifyDocMessageQueue?.addSendTask(dataId); + } + + void sendVerifyDocMessage(String dataId) { + sendPkt( + messageType: MessageType.verifyDoc, + dataIdList: [dataId], + ); } void sendQueryDocMessage(String dataId, Uint8List snap) { @@ -248,3 +268,36 @@ class P2pService { content: utf8.encode(states)); } } + +class SendDeltaQueue { + final _sendMap = {}; + int resendDuration; + Function(String docId) sender; + + SendDeltaQueue({ + required this.resendDuration, + required this.sender, + }); + + void addSendTask(String docId) { + var now = DateTime.now().millisecondsSinceEpoch; + // 达到下个发送周期才会发送 + var sendTime = resendDuration + now; + var maxTime = max(_sendMap.get(docId) ?? 0, sendTime); + _sendMap[docId] = maxTime; + resendDuration.milliseconds.delay(resend); + } + + /// 连续输入时,在一定时间后补发,避免数据丢失的情况 + void resend() { + var currentTime = DateTime.now().millisecondsSinceEpoch; + for (var task in _sendMap.entries) { + var sendTime = task.value; + var docId = task.key; + if (sendTime <= currentTime) { + sender.call(docId); + } + } + _sendMap.removeWhere((key, value) => value <= currentTime); + } +} diff --git a/lib/service/sync/record_sync_service.dart b/lib/service/sync/record_sync_service.dart index 6e46392..6bbd18d 100644 --- a/lib/service/sync/record_sync_service.dart +++ b/lib/service/sync/record_sync_service.dart @@ -295,10 +295,8 @@ class RecordSyncService with IsarServiceMixin { dataIdList: dataIdList, dataType: dataType, ); - } catch (e) { - if (e is FormatException) { - print(e.source); - } + } catch (e, stack) { + print(stack); printLog("同步时下载$dataType类型数据失败,$e"); } } diff --git a/lib/service/sync/upload_task_service.dart b/lib/service/sync/upload_task_service.dart index 244fee3..5f9e326 100644 --- a/lib/service/sync/upload_task_service.dart +++ b/lib/service/sync/upload_task_service.dart @@ -44,9 +44,13 @@ class UploadTaskService { await isar.writeTxn(() async { await isar.uploadTaskPOs.put(task!); }); - Timer(Duration(seconds: seconds), () { - doUpload(); - }); + if (seconds <= 0) { + await doUpload(); + } else { + Timer(Duration(seconds: seconds), () { + doUpload(); + }); + } } Future uploadFile(String fileId, [int seconds = 1]) async { @@ -151,7 +155,7 @@ class UploadTaskService { // 1.读取文件 // 2.读取state // 3.上传文件 - var data = await serviceManager.editService.readDocFile(docId); + var data = await serviceManager.editService.readDocBytes(docId); if (data == null) { return false; } diff --git a/test/yjs_update_test.dart b/test/yjs_update_test.dart new file mode 100644 index 0000000..2aa185c --- /dev/null +++ b/test/yjs_update_test.dart @@ -0,0 +1,19 @@ +import 'package:flutter_crdt/flutter_crdt.dart'; + +void main(){ + Doc doc = Doc(); + var state = 1; + doc.on('update', (args) { + print("on update content:$state"); + }); + doc.transact((transaction) { + state=2; + doc.getText("hello").insert(0, "hello"); + print("update end..."); + }); + doc.transact((transaction) { + state=3; + doc.getText("hello").insert(0, "hello"); + print("update end..."); + }); +} \ No newline at end of file