From ed8a9e463f12f894ce03c7d313aaadeefa9a3007 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Wed, 14 Aug 2024 09:34:19 -0400 Subject: [PATCH] Sync by specific protocols and by delegate did --- packages/agent/src/sync-api.ts | 8 +- packages/agent/src/sync-engine-level.ts | 229 ++++-- packages/agent/src/types/sync.ts | 7 +- .../agent/tests/sync-engine-level.spec.ts | 702 +++++++++++++++++- packages/api/src/web5.ts | 2 +- 5 files changed, 859 insertions(+), 89 deletions(-) diff --git a/packages/agent/src/sync-api.ts b/packages/agent/src/sync-api.ts index f68f45737..3c7ea1089 100644 --- a/packages/agent/src/sync-api.ts +++ b/packages/agent/src/sync-api.ts @@ -1,4 +1,4 @@ -import type { SyncEngine } from './types/sync.js'; +import type { SyncEngine, SyncIdentityOptions } from './types/sync.js'; import type { Web5PlatformAgent } from './types/agent.js'; export type SyncApiParams = { @@ -41,10 +41,14 @@ export class AgentSyncApi implements SyncEngine { this._syncEngine.agent = agent; } - public async registerIdentity(params: { did: string; }): Promise { + public async registerIdentity(params: { did: string; options: SyncIdentityOptions }): Promise { await this._syncEngine.registerIdentity(params); } + public sync(direction?: 'push' | 'pull'): Promise { + return this._syncEngine.sync(direction); + } + public startSync(params: { interval: string; }): Promise { return this._syncEngine.startSync(params); } diff --git a/packages/agent/src/sync-engine-level.ts b/packages/agent/src/sync-engine-level.ts index 833a60cce..446d2ac9a 100644 --- a/packages/agent/src/sync-engine-level.ts +++ b/packages/agent/src/sync-engine-level.ts @@ -17,11 +17,12 @@ import { DwnMethodName, } from '@tbd54566975/dwn-sdk-js'; -import type { SyncEngine } from './types/sync.js'; +import type { SyncEngine, SyncIdentityOptions } from './types/sync.js'; import type { Web5PlatformAgent } from './types/agent.js'; -import { DwnInterface } from './types/dwn.js'; +import { DwnInterface, DwnMessagesPermissionScope } from './types/dwn.js'; import { getDwnServiceEndpointUrls, isRecordsWrite } from './utils.js'; +import { AgentPermissionsApi } from './permissions-api.js'; export type SyncEngineLevelParams = { agent?: Web5PlatformAgent; @@ -35,8 +36,20 @@ type SyncDirection = 'push' | 'pull'; type SyncState = { did: string; + delegateDid?: string; dwnUrl: string; cursor?: PaginationCursor, + protocol?: string; +} + +type SyncMessageParams = { + did: string; + messageCid: string; + watermark: string; + dwnUrl: string; + delegateDid?: string; + cursor?: PaginationCursor, + protocol?: string; } export class SyncEngineLevel implements SyncEngine { @@ -48,12 +61,18 @@ export class SyncEngineLevel implements SyncEngine { */ private _agent?: Web5PlatformAgent; + /** + * An instance of the `AgentPermissionsApi` that is used to interact with permissions grants used during sync + */ + private _permissionsApi: AgentPermissionsApi; + private _db: AbstractLevel; private _syncIntervalId?: ReturnType; private _ulidFactory: ULIDFactory; constructor({ agent, dataPath, db }: SyncEngineLevelParams) { this._agent = agent; + this._permissionsApi = new AgentPermissionsApi({ agent }); this._db = (db) ? db : new Level(dataPath ?? 'DATA/AGENT/SYNC_STORE'); this._ulidFactory = monotonicFactory(); } @@ -74,6 +93,15 @@ export class SyncEngineLevel implements SyncEngine { set agent(agent: Web5PlatformAgent) { this._agent = agent; + this._permissionsApi = new AgentPermissionsApi({ agent }); + } + + private get permissionsApi() { + if (!this._permissionsApi) { + throw new Error('SyncEngineLevel: Permissions API not initialized.'); + } + + return this._permissionsApi; } public async clear(): Promise { @@ -96,8 +124,7 @@ export class SyncEngineLevel implements SyncEngine { for (let job of pullJobs) { const [key] = job; - const [did, dwnUrl, _, messageCid] = key.split('~'); - + const { did, dwnUrl, messageCid, delegateDid, protocol } = SyncEngineLevel.parseSyncMessageParamsKey(key); // If a particular DWN service endpoint is unreachable, skip subsequent pull operations. if (errored.has(dwnUrl)) { continue; @@ -109,13 +136,33 @@ export class SyncEngineLevel implements SyncEngine { continue; } + let permissionGrantId: string | undefined; + let granteeDid: string | undefined; + if (delegateDid) { + const grants = await this.permissionsApi.fetchGrants({ + author : delegateDid, + target : delegateDid, + grantor : did, + protocol + }); + + const messagesReadGrant = grants + .filter(({ grant }) => grant.scope.interface === DwnInterfaceName.Messages && grant.scope.method === DwnMethodName.Read) + .find(({ grant: { scope } }) => (scope as DwnMessagesPermissionScope).protocol === protocol); + + permissionGrantId = messagesReadGrant?.grant.id; + granteeDid = delegateDid; + } + const messagesRead = await this.agent.processDwnRequest({ store : false, author : did, target : did, messageType : DwnInterface.MessagesRead, + granteeDid, messageParams : { - messageCid: messageCid + messageCid, + permissionGrantId } }); @@ -123,8 +170,7 @@ export class SyncEngineLevel implements SyncEngine { try { reply = await this.agent.rpc.sendDwnRequest({ - dwnUrl, - targetDid : did, + dwnUrl, targetDid : did, message : messagesRead.message, }) as MessagesReadReply; } catch(e) { @@ -169,15 +215,14 @@ export class SyncEngineLevel implements SyncEngine { for (let job of pushJobs) { const [key] = job; - const [did, dwnUrl, _, messageCid] = key.split('~'); - + const { did, delegateDid, protocol, dwnUrl, messageCid } = SyncEngineLevel.parseSyncMessageParamsKey(key); // If a particular DWN service endpoint is unreachable, skip subsequent push operations. if (errored.has(dwnUrl)) { continue; } // Attempt to retrieve the message from the local DWN. - const dwnMessage = await this.getDwnMessage({ author: did, messageCid }); + const dwnMessage = await this.getDwnMessage({ author: did, messageCid, delegateDid, protocol }); // If the message does not exist on the local DWN, remove the sync operation from the // push queue, update the push watermark for this DID/DWN endpoint combination, add the @@ -214,12 +259,25 @@ export class SyncEngineLevel implements SyncEngine { await pushQueue.batch(deleteOperations as any); } - public async registerIdentity({ did }: { did: string; }): Promise { + public async registerIdentity({ did, options }: { did: string; options: SyncIdentityOptions }): Promise { // Get a reference to the `registeredIdentities` sublevel. const registeredIdentities = this._db.sublevel('registeredIdentities'); // Add (or overwrite, if present) the Identity's DID as a registered identity. - await registeredIdentities.put(did, ''); + await registeredIdentities.put(did, JSON.stringify(options)); + } + + public async sync(direction?: 'push' | 'pull'): Promise { + if (this._syncIntervalId) { + throw new Error('SyncEngineLevel: Cannot call sync while a sync interval is active. Call `stopSync()` first.'); + } + + if (!direction || direction === 'push') { + await this.push(); + } + if (!direction || direction === 'pull') { + await this.pull(); + } } public startSync({ interval }: { @@ -236,8 +294,7 @@ export class SyncEngineLevel implements SyncEngine { } try { - await this.push(); - await this.pull(); + await this.sync(); } catch (error: any) { this.stopSync(); reject(error); @@ -276,9 +333,11 @@ export class SyncEngineLevel implements SyncEngine { for (let syncState of syncPeerState) { // Get the event log from the remote DWN if pull sync, or local DWN if push sync. const eventLog = await this.getDwnEventLog({ - did : syncState.did, - dwnUrl : syncState.dwnUrl, - cursor : syncState.cursor, + did : syncState.did, + delegateDid : syncState.delegateDid, + dwnUrl : syncState.dwnUrl, + cursor : syncState.cursor, + protocol : syncState.protocol, syncDirection }); @@ -286,15 +345,11 @@ export class SyncEngineLevel implements SyncEngine { for (let messageCid of eventLog) { const watermark = this._ulidFactory(); - // Use "did~dwnUrl~watermark~messageCid" as the key in the sync queue. - // Note: It is critical that `watermark` precedes `messageCid` to ensure that when the sync - // jobs are pulled off the queue, they are lexographically sorted oldest to newest. - const operationKey = [ - syncState.did, - syncState.dwnUrl, + const operationKey = SyncEngineLevel.generateSyncMessageParamsKey({ + ...syncState, watermark, messageCid - ].join('~'); + }); syncOperations.push({ type: 'put', key: operationKey, value: '' }); } @@ -308,68 +363,136 @@ export class SyncEngineLevel implements SyncEngine { } } - private async getDwnEventLog({ did, dwnUrl, syncDirection, cursor }: { + private static generateSyncMessageParamsKey({ did, delegateDid, dwnUrl, protocol, watermark, messageCid }:SyncMessageParams): string { + // Use "did~dwnUrl~watermark~messageCid" as the key in the sync queue. + // Note: It is critical that `watermark` precedes `messageCid` to ensure that when the sync + // jobs are pulled off the queue, they are lexographically sorted oldest to newest. + // + // `protocol` and `delegateDid` may be undefined, which is fine, its part of the key will be stored as the string undefined. + // Later, when parsing the key, we will handle this case and return an actual undefined. + // This is information useful for subset and delegated sync. + return [did, delegateDid, dwnUrl, protocol, watermark, messageCid ].join('~'); + } + + private static parseSyncMessageParamsKey(key: string): SyncMessageParams { + // The order is import here, see `generateKey` for more information. + const [did, delegateDidString, dwnUrl, protocolString, watermark, messageCid] = key.split('~'); + + // `protocol` or `delegateDid` may be parsed as an undefined string, so we need to handle that case and returned an actual undefined. + const protocol = protocolString === 'undefined' ? undefined : protocolString; + const delegateDid = delegateDidString === 'undefined' ? undefined : delegateDidString; + return { did, delegateDid, dwnUrl, watermark, messageCid, protocol }; + } + + private async getDwnEventLog({ did, delegateDid, dwnUrl, syncDirection, cursor, protocol }: { did: string, + delegateDid?: string, dwnUrl: string, syncDirection: SyncDirection, cursor?: PaginationCursor + protocol?: string }) { let messagesReply = {} as MessagesQueryReply; + let permissionGrantId: string | undefined; + let granteeDid: string | undefined; + if (delegateDid) { + // fetch the grants for the delegate DID + const grants = await this.permissionsApi.fetchGrants({ + author : delegateDid, + target : delegateDid, + grantor : did, + protocol + }); + + // find the grant for the MessagesQuery and the protocol + const messagesReadGrant = grants + .filter(({ grant }) => grant.scope.interface === DwnInterfaceName.Messages && grant.scope.method === DwnMethodName.Query) + .find(({ grant: { scope } }) => (scope as DwnMessagesPermissionScope).protocol === protocol); + + permissionGrantId = messagesReadGrant?.grant.id; + granteeDid = delegateDid; + } + if (syncDirection === 'pull') { + // filter for a specific protocol if one is provided + const filters = protocol ? [{ protocol }] : []; // When sync is a pull, get the event log from the remote DWN. - const messagesReadMessage = await this.agent.dwn.processRequest({ + const messagesQueryMessage = await this.agent.dwn.processRequest({ store : false, target : did, author : did, messageType : DwnInterface.MessagesQuery, - messageParams : { filters: [], cursor } + granteeDid, + messageParams : { filters, cursor, permissionGrantId } }); try { messagesReply = await this.agent.rpc.sendDwnRequest({ dwnUrl : dwnUrl, targetDid : did, - message : messagesReadMessage.message + message : messagesQueryMessage.message, }) as MessagesQueryReply; } catch { // If a particular DWN service endpoint is unreachable, silently ignore. } } else if (syncDirection === 'push') { + const filters = protocol ? [{ protocol }] : []; // When sync is a push, get the event log from the local DWN. - const messagesReadDwnResponse = await this.agent.dwn.processRequest({ + const messagesQueryDwnResponse = await this.agent.dwn.processRequest({ author : did, target : did, messageType : DwnInterface.MessagesQuery, - messageParams : { filters: [], cursor } + granteeDid, + messageParams : { filters, cursor, permissionGrantId } }); - messagesReply = messagesReadDwnResponse.reply as MessagesQueryReply; + messagesReply = messagesQueryDwnResponse.reply as MessagesQueryReply; } const eventLog = messagesReply.entries ?? []; if (messagesReply.cursor) { - this.setCursor(did, dwnUrl, syncDirection, messagesReply.cursor); + this.setCursor(did, dwnUrl, syncDirection, messagesReply.cursor, protocol); } return eventLog; } - private async getDwnMessage({ author, messageCid }: { + private async getDwnMessage({ author, delegateDid, protocol, messageCid }: { author: string; + delegateDid?: string; + protocol?: string; messageCid: string; }): Promise<{ message: GenericMessage, data?: Blob } | undefined> { + + let permissionGrantId: string | undefined; + let granteeDid: string | undefined; + + if (delegateDid) { + const grants = await this.permissionsApi.fetchGrants({ + author : delegateDid, + target : delegateDid, + grantor : author, + protocol : protocol + }); + + const messagesReadGrant = grants + .filter(({ grant}) => grant.scope.interface === DwnInterfaceName.Messages && grant.scope.method === DwnMethodName.Read) + .find(({ grant: { scope } }) => (scope as DwnMessagesPermissionScope).protocol === protocol); + + permissionGrantId = messagesReadGrant?.grant.id; + granteeDid = delegateDid; + } + let { reply } = await this.agent.dwn.processRequest({ author : author, target : author, messageType : DwnInterface.MessagesRead, - messageParams : { - messageCid: messageCid - } + granteeDid, + messageParams : { messageCid, permissionGrantId } }); - // Absence of a messageEntry or message within messageEntry can happen because updating a // Record creates another RecordsWrite with the same recordId. Only the first and // most recent RecordsWrite messages are kept for a given recordId. Any RecordsWrite messages @@ -392,15 +515,15 @@ export class SyncEngineLevel implements SyncEngine { } private async getSyncPeerState({ syncDirection }: { - syncDirection: SyncDirection + syncDirection: SyncDirection; }): Promise { - // Get a list of the DIDs of all registered identities. - const registeredIdentities = await this._db.sublevel('registeredIdentities').keys().all(); // Array to accumulate the list of sync peers for each DID. const syncPeerState: SyncState[] = []; - for (let did of registeredIdentities) { + // iterate over all registered identities + for await (const [ did, options ] of this._db.sublevel('registeredIdentities').iterator()) { + const { protocols, delegateDid } = JSON.parse(options) as SyncIdentityOptions; // First, confirm the DID can be resolved and extract the DWN service endpoint URLs. const dwnEndpointUrls = await getDwnServiceEndpointUrls(did, this.agent.did); if (dwnEndpointUrls.length === 0) { @@ -412,16 +535,27 @@ export class SyncEngineLevel implements SyncEngine { // Get the cursor (or undefined) for each (DID, DWN service endpoint, sync direction) // combination and add it to the sync peer state array. for (let dwnUrl of dwnEndpointUrls) { - const cursor = await this.getCursor(did, dwnUrl, syncDirection); - syncPeerState.push({ did, dwnUrl, cursor}); + if (protocols.length === 0) { + const cursor = await this.getCursor(did, dwnUrl, syncDirection); + syncPeerState.push({ did, delegateDid, dwnUrl, cursor }); + } else { + for (const protocol of protocols) { + const cursor = await this.getCursor(did, dwnUrl, syncDirection, protocol); + syncPeerState.push({ did, delegateDid, dwnUrl, cursor, protocol }); + } + } } } return syncPeerState; } - private async getCursor(did: string, dwnUrl: string, direction: SyncDirection): Promise { - const cursorKey = `${did}~${dwnUrl}~${direction}`; + private async getCursor(did: string, dwnUrl: string, direction: SyncDirection, protocol?: string): Promise { + + // if a protocol is provided, we append it to the key + const cursorKey = protocol ? `${did}~${dwnUrl}~${direction}-${protocol}` : + `${did}~${dwnUrl}~${direction}`; + const cursorsStore = this.getCursorStore(); try { const cursorValue = await cursorsStore.get(cursorKey); @@ -436,8 +570,9 @@ export class SyncEngineLevel implements SyncEngine { } } - private async setCursor(did: string, dwnUrl: string, direction: SyncDirection, cursor: PaginationCursor) { - const cursorKey = `${did}~${dwnUrl}~${direction}`; + private async setCursor(did: string, dwnUrl: string, direction: SyncDirection, cursor: PaginationCursor, protocol?: string) { + const cursorKey = protocol ? `${did}~${dwnUrl}~${direction}-${protocol}` : + `${did}~${dwnUrl}~${direction}`; const cursorsStore = this.getCursorStore(); await cursorsStore.put(cursorKey, JSON.stringify(cursor)); } diff --git a/packages/agent/src/types/sync.ts b/packages/agent/src/types/sync.ts index a1903be0f..ee4dd3182 100644 --- a/packages/agent/src/types/sync.ts +++ b/packages/agent/src/types/sync.ts @@ -1,8 +1,13 @@ import type { Web5PlatformAgent } from './agent.js'; +export type SyncIdentityOptions = { + delegateDid?: string; + protocols: string[] +} export interface SyncEngine { agent: Web5PlatformAgent; - registerIdentity(params: { did: string }): Promise; + registerIdentity(params: { did: string, options: SyncIdentityOptions }): Promise; + sync(direction?: 'push' | 'pull'): Promise; startSync(params: { interval: string }): Promise; stopSync(): void; } \ No newline at end of file diff --git a/packages/agent/tests/sync-engine-level.spec.ts b/packages/agent/tests/sync-engine-level.spec.ts index 22010c6f6..380590b68 100644 --- a/packages/agent/tests/sync-engine-level.spec.ts +++ b/packages/agent/tests/sync-engine-level.spec.ts @@ -1,7 +1,7 @@ import sinon from 'sinon'; import { expect } from 'chai'; import { utils as cryptoUtils } from '@web5/crypto'; -import { DwnConstant, ProtocolDefinition } from '@tbd54566975/dwn-sdk-js'; +import { DwnConstant, DwnInterfaceName, DwnMethodName, Jws, ProtocolDefinition, Time } from '@tbd54566975/dwn-sdk-js'; import type { BearerIdentity } from '../src/bearer-identity.js'; @@ -11,6 +11,7 @@ import { DwnInterface } from '../src/types/dwn.js'; import { testDwnUrl } from './utils/test-config.js'; import { SyncEngineLevel } from '../src/sync-engine-level.js'; import { PlatformAgentTestHarness } from '../src/test-harness.js'; +import { Convert } from '@web5/common'; let testDwnUrls: string[] = [testDwnUrl]; @@ -25,6 +26,7 @@ describe('SyncEngineLevel', () => { }); after(async () => { + sinon.restore(); await testHarness.closeStorage(); }); @@ -271,7 +273,10 @@ describe('SyncEngineLevel', () => { // Register Alice's DID to be synchronized. await testHarness.agent.sync.registerIdentity({ - did: alice.did.uri + did : alice.did.uri, + options : { + protocols: [] + } }); // Execute Sync to pull all records from Alice's remote DWN to Alice's local DWN. @@ -339,6 +344,595 @@ describe('SyncEngineLevel', () => { expect(remoteRecordsFromQuery).to.have.members([...localRecords, ...remoteRecords]); }).slow(1000); // Yellow at 500ms, Red at 1000ms. + describe('sync()', () => { + it('syncs only specified direction, or if non specified syncs both directions', async () => { + // spy on push and pull and stub their response + const pushSpy = sinon.stub(syncEngine, 'push').resolves(); + const pullSpy = sinon.stub(syncEngine, 'pull').resolves(); + + // Register Alice's DID to be synchronized. + await testHarness.agent.sync.registerIdentity({ + did : alice.did.uri, + options : { + protocols: [] + } + }); + + // Execute Sync to push and pull all records from Alice's remote DWN to Alice's local DWN. + await syncEngine.sync(); + + // Verify push and pull were called once + expect(pushSpy.calledOnce).to.be.true; + expect(pullSpy.calledOnce).to.be.true; + + + // reset counts + pushSpy.reset(); + pullSpy.reset(); + + // Execute only push sync + await syncEngine.sync('push'); + + // Verify push was called once and pull was not called + expect(pushSpy.calledOnce).to.be.true; + expect(pullSpy.notCalled).to.be.true; + + // reset counts + pushSpy.reset(); + pullSpy.reset(); + + // Execute only pull sync + await syncEngine.sync('pull'); + + // Verify pull was called once and push was not called + expect(pushSpy.notCalled).to.be.true; + expect(pullSpy.calledOnce).to.be.true; + }); + }); + + describe('registerIdentity()', () => { + it('syncs only specified protocols', async () => { + // create new identity to not conflict the previous tests's remote records + const aliceSync = await testHarness.createIdentity({ name: 'Alice', testDwnUrls }); + + // create 3 local protocols + const protocolFoo: ProtocolDefinition = { + published : true, + protocol : 'https://protocol.xyz/foo', + types : { + foo: { + schema : 'https://schemas.xyz/foo', + dataFormats : ['text/plain', 'application/json'] + } + }, + structure: { + foo: {} + } + }; + + const protocolBar: ProtocolDefinition = { + published : true, + protocol : 'https://protocol.xyz/bar', + types : { + bar: { + schema : 'https://schemas.xyz/bar', + dataFormats : ['text/plain', 'application/json'] + } + }, + structure: { + bar: {} + } + }; + + const protocolBaz: ProtocolDefinition = { + published : true, + protocol : 'https://protocol.xyz/baz', + types : { + baz: { + schema : 'https://schemas.xyz/baz', + dataFormats : ['text/plain', 'application/json'] + } + }, + structure: { + baz: {} + } + }; + + const protocolsFoo = await testHarness.agent.processDwnRequest({ + author : aliceSync.did.uri, + target : aliceSync.did.uri, + messageType : DwnInterface.ProtocolsConfigure, + messageParams : { + definition: protocolFoo + } + }); + expect(protocolsFoo.reply.status.code).to.equal(202); + + const protocolsBar = await testHarness.agent.processDwnRequest({ + author : aliceSync.did.uri, + target : aliceSync.did.uri, + messageType : DwnInterface.ProtocolsConfigure, + messageParams : { + definition: protocolBar + } + }); + expect(protocolsBar.reply.status.code).to.equal(202); + + const protocolsBaz = await testHarness.agent.processDwnRequest({ + author : aliceSync.did.uri, + target : aliceSync.did.uri, + messageType : DwnInterface.ProtocolsConfigure, + messageParams : { + definition: protocolBaz + } + }); + expect(protocolsBaz.reply.status.code).to.equal(202); + + // write a record for each protocol + const recordFoo = await testHarness.agent.processDwnRequest({ + author : aliceSync.did.uri, + target : aliceSync.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + protocol : protocolFoo.protocol, + protocolPath : 'foo', + schema : protocolFoo.types.foo.schema + }, + dataStream: new Blob(['Hello, foo!']) + }); + expect(recordFoo.reply.status.code).to.equal(202); + + const recordBar = await testHarness.agent.processDwnRequest({ + author : aliceSync.did.uri, + target : aliceSync.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + protocol : protocolBar.protocol, + protocolPath : 'bar', + schema : protocolBar.types.bar.schema + }, + dataStream: new Blob(['Hello, bar!']) + }); + expect(recordBar.reply.status.code).to.equal(202); + + const recordBaz = await testHarness.agent.processDwnRequest({ + author : aliceSync.did.uri, + target : aliceSync.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + protocol : protocolBaz.protocol, + protocolPath : 'baz', + schema : protocolBaz.types.baz.schema + }, + dataStream: new Blob(['Hello, baz!']) + }); + expect(recordBaz.reply.status.code).to.equal(202); + + // Register Alice's DID to be synchronized with only foo and bar protocols + await testHarness.agent.sync.registerIdentity({ + did : aliceSync.did.uri, + options : { + protocols: [ 'https://protocol.xyz/foo', 'https://protocol.xyz/bar' ] + } + }); + + // Execute Sync to push sync, only foo protocol should be synced + await syncEngine.sync('push'); + + // query remote to see foo protocol + const remoteProtocolsQueryResponse = await testHarness.agent.dwn.sendRequest({ + author : aliceSync.did.uri, + target : aliceSync.did.uri, + messageType : DwnInterface.ProtocolsQuery, + messageParams : {} + }); + const remoteProtocolsQueryReply = remoteProtocolsQueryResponse.reply; + expect(remoteProtocolsQueryReply.status.code).to.equal(200); + expect(remoteProtocolsQueryReply.entries?.length).to.equal(2); + expect(remoteProtocolsQueryReply.entries).to.have.deep.equal([ protocolsFoo.message, protocolsBar.message ]); + + // query remote to see foo record + let remoteFooRecordsResponse = await testHarness.agent.dwn.sendRequest({ + author : aliceSync.did.uri, + target : aliceSync.did.uri, + messageType : DwnInterface.RecordsQuery, + messageParams : { + filter: { + protocol: protocolFoo.protocol, + } + } + }); + let remoteFooRecordsReply = remoteFooRecordsResponse.reply; + expect(remoteFooRecordsReply.status.code).to.equal(200); + expect(remoteFooRecordsReply.entries).to.have.length(1); + let remoteFooRecordIds = remoteFooRecordsReply.entries?.map(entry => entry.recordId); + expect(remoteFooRecordIds).to.have.members([ recordFoo.message!.recordId ]); + + // query remote to see bar record + let remoteBarRecordsResponse = await testHarness.agent.dwn.sendRequest({ + author : aliceSync.did.uri, + target : aliceSync.did.uri, + messageType : DwnInterface.RecordsQuery, + messageParams : { + filter: { + protocol: protocolBar.protocol, + } + } + }); + let remoteBarRecordsReply = remoteBarRecordsResponse.reply; + expect(remoteBarRecordsReply.status.code).to.equal(200); + expect(remoteBarRecordsReply.entries).to.have.length(1); + let remoteBarRecordIds = remoteBarRecordsReply.entries?.map(entry => entry.recordId); + expect(remoteBarRecordIds).to.have.members([ recordBar.message!.recordId ]); + + // query remote to see baz record, none should be returned + let remoteBazRecordsResponse = await testHarness.agent.dwn.sendRequest({ + author : aliceSync.did.uri, + target : aliceSync.did.uri, + messageType : DwnInterface.RecordsQuery, + messageParams : { + filter: { + protocol: protocolBaz.protocol, + } + } + }); + let remoteBazRecordsReply = remoteBazRecordsResponse.reply; + expect(remoteBazRecordsReply.status.code).to.equal(200); + expect(remoteBazRecordsReply.entries).to.have.length(0); + + + // now write a foo record remotely, and a bar record locally + // initiate a sync to both push and pull the records respectively + + // write a record to the remote for the foo protocol + const recordFoo2 = await testHarness.agent.sendDwnRequest({ + author : aliceSync.did.uri, + target : aliceSync.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + protocol : protocolFoo.protocol, + protocolPath : 'foo', + schema : protocolFoo.types.foo.schema + }, + dataStream: new Blob(['Hello, foo 2!']) + }); + expect(recordFoo2.reply.status.code).to.equal(202); + + // write a local record to the bar protocol + const recordBar2 = await testHarness.agent.processDwnRequest({ + author : aliceSync.did.uri, + target : aliceSync.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + protocol : protocolBar.protocol, + protocolPath : 'bar', + schema : protocolBar.types.bar.schema + }, + dataStream: new Blob(['Hello, bar 2!']) + }); + expect(recordBar2.reply.status.code).to.equal(202); + + // confirm that the foo record is not yet in the local DWN + let localFooRecordsResponse = await testHarness.agent.dwn.processRequest({ + author : aliceSync.did.uri, + target : aliceSync.did.uri, + messageType : DwnInterface.RecordsQuery, + messageParams : { + filter: { + protocol: protocolFoo.protocol, + } + } + }); + let localFooRecordsReply = localFooRecordsResponse.reply; + expect(localFooRecordsReply.status.code).to.equal(200); + expect(localFooRecordsReply.entries).to.have.length(1); + let localFooRecordIds = localFooRecordsReply.entries?.map(entry => entry.recordId); + expect(localFooRecordIds).to.not.include(recordFoo2.message!.recordId); + + + // confirm that the bar record is not yet in the remote DWN + remoteBarRecordsResponse = await testHarness.agent.dwn.sendRequest({ + author : aliceSync.did.uri, + target : aliceSync.did.uri, + messageType : DwnInterface.RecordsQuery, + messageParams : { + filter: { + protocol: protocolBar.protocol, + } + } + }); + remoteBarRecordsReply = remoteBarRecordsResponse.reply; + expect(remoteBarRecordsReply.status.code).to.equal(200); + expect(remoteBarRecordsReply.entries).to.have.length(1); + remoteBarRecordIds = remoteBarRecordsReply.entries?.map(entry => entry.recordId); + expect(remoteBarRecordIds).to.not.include(recordBar2.message!.recordId); + + // preform a pull and push sync + await syncEngine.sync(); + + // query local to see foo records with the new record + localFooRecordsResponse = await testHarness.agent.dwn.processRequest({ + author : aliceSync.did.uri, + target : aliceSync.did.uri, + messageType : DwnInterface.RecordsQuery, + messageParams : { + filter: { + protocol: protocolFoo.protocol, + } + } + }); + localFooRecordsReply = localFooRecordsResponse.reply; + expect(localFooRecordsReply.status.code).to.equal(200); + expect(localFooRecordsReply.entries).to.have.length(2); + localFooRecordIds = localFooRecordsReply.entries?.map(entry => entry.recordId); + expect(localFooRecordIds).to.have.members([ recordFoo.message!.recordId, recordFoo2.message!.recordId ]); + + // query remote to see bar records with the new record + remoteBarRecordsResponse = await testHarness.agent.dwn.sendRequest({ + author : aliceSync.did.uri, + target : aliceSync.did.uri, + messageType : DwnInterface.RecordsQuery, + messageParams : { + filter: { + protocol: protocolBar.protocol, + } + } + }); + remoteBarRecordsReply = remoteBarRecordsResponse.reply; + expect(remoteBarRecordsReply.status.code).to.equal(200); + expect(remoteBarRecordsReply.entries).to.have.length(2); + remoteBarRecordIds = remoteBarRecordsReply.entries?.map(entry => entry.recordId); + expect(remoteBarRecordIds).to.have.members([ recordBar.message!.recordId, recordBar2.message!.recordId ]); + + // confirm that still no baz records exist remotely + remoteBazRecordsResponse = await testHarness.agent.dwn.sendRequest({ + author : aliceSync.did.uri, + target : aliceSync.did.uri, + messageType : DwnInterface.RecordsQuery, + messageParams : { + filter: { + protocol: protocolBaz.protocol, + } + } + }); + remoteBazRecordsReply = remoteBazRecordsResponse.reply; + expect(remoteBazRecordsReply.status.code).to.equal(200); + expect(remoteBazRecordsReply.entries).to.have.length(0); + }); + + it('syncs only specified protocols and delegates', async () => { + const alice = await testHarness.createIdentity({ name: 'Alice', testDwnUrls }); + + const aliceDeviceXHarness = await PlatformAgentTestHarness.setup({ + agentClass : TestAgent, + agentStores : 'memory', + testDataLocation : '__TESTDATA__/alice-device', + }); + await aliceDeviceXHarness.clearStorage(); + await aliceDeviceXHarness.createAgentDid(); + + // create a connected DID + const aliceDeviceX = await aliceDeviceXHarness.agent.identity.create({ + store : true, + didMethod : 'jwk', + metadata : { name: 'Alice Device X', connectedDid: alice.did.uri } + }); + + // Alice create 2 protocols on alice's remote DWN + const protocolFoo: ProtocolDefinition = { + published : true, + protocol : 'https://protocol.xyz/foo', + types : { + foo: { + schema : 'https://schemas.xyz/foo', + dataFormats : ['text/plain', 'application/json'] + } + }, + structure: { + foo: {} + } + }; + + const protocolBar: ProtocolDefinition = { + published : true, + protocol : 'https://protocol.xyz/bar', + types : { + bar: { + schema : 'https://schemas.xyz/bar', + dataFormats : ['text/plain', 'application/json'] + } + }, + structure: { + bar: {} + } + }; + + // configure the protocols + const protocolsFoo = await testHarness.agent.sendDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.ProtocolsConfigure, + messageParams : { + definition: protocolFoo + } + }); + expect(protocolsFoo.reply.status.code).to.equal(202); + + const protocolsBar = await testHarness.agent.sendDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.ProtocolsConfigure, + messageParams : { + definition: protocolBar + } + }); + expect(protocolsBar.reply.status.code).to.equal(202); + + // create grants for foo protocol, granted to aliceDeviceX + const messagesReadGrant = await testHarness.agent.permissions.createGrant({ + store : true, + author : alice.did.uri, + grantedTo : aliceDeviceX.did.uri, + dateExpires : Time.createOffsetTimestamp({ seconds: 60 }), + scope : { protocol: protocolFoo.protocol, interface: DwnInterfaceName.Messages, method: DwnMethodName.Read } + }); + + const messagesQueryGrant = await testHarness.agent.permissions.createGrant({ + store : true, + author : alice.did.uri, + grantedTo : aliceDeviceX.did.uri, + dateExpires : Time.createOffsetTimestamp({ seconds: 60 }), + scope : { protocol: protocolFoo.protocol, interface: DwnInterfaceName.Messages, method: DwnMethodName.Query } + }); + + const recordsQueryGrant = await testHarness.agent.permissions.createGrant({ + store : true, + author : alice.did.uri, + grantedTo : aliceDeviceX.did.uri, + dateExpires : Time.createOffsetTimestamp({ seconds: 60 }), + delegated : true, + scope : { protocol: protocolFoo.protocol, interface: DwnInterfaceName.Records, method: DwnMethodName.Query } + }); + + const { encodedData: readGrantData, ... messagesReadGrantMessage } = messagesReadGrant.message; + const processMessagesReadGrantAsOwner = await aliceDeviceXHarness.agent.processDwnRequest({ + author : aliceDeviceX.did.uri, + target : aliceDeviceX.did.uri, + messageType : DwnInterface.RecordsWrite, + rawMessage : messagesReadGrantMessage, + dataStream : new Blob([ Convert.base64Url(readGrantData).toUint8Array() ]), + signAsOwner : true + }); + expect(processMessagesReadGrantAsOwner.reply.status.code).to.equal(202); + + const processMessagesReadGrant = await aliceDeviceXHarness.agent.processDwnRequest({ + author : aliceDeviceX.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + rawMessage : messagesReadGrantMessage, + dataStream : new Blob([ Convert.base64Url(readGrantData).toUint8Array() ]) + }); + expect(processMessagesReadGrant.reply.status.code).to.equal(202); + + const { encodedData: queryGrantData, ... messagesQueryGrantMessage } = messagesQueryGrant.message; + const processMessagesQueryGrantAsOwner = await aliceDeviceXHarness.agent.processDwnRequest({ + author : aliceDeviceX.did.uri, + target : aliceDeviceX.did.uri, + messageType : DwnInterface.RecordsWrite, + rawMessage : messagesQueryGrantMessage, + dataStream : new Blob([ Convert.base64Url(queryGrantData).toUint8Array() ]), + signAsOwner : true + }); + expect(processMessagesQueryGrantAsOwner.reply.status.code).to.equal(202); + + const processMessagesQueryGrant = await aliceDeviceXHarness.agent.processDwnRequest({ + author : aliceDeviceX.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + rawMessage : messagesQueryGrantMessage, + dataStream : new Blob([ Convert.base64Url(queryGrantData).toUint8Array() ]), + }); + expect(processMessagesQueryGrant.reply.status.code).to.equal(202); + + // send the grants to the remote DWN + const remoteMessagesQueryGrant = await testHarness.agent.sendDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + rawMessage : messagesQueryGrantMessage, + dataStream : new Blob([ Convert.base64Url(queryGrantData).toUint8Array() ]), + }); + expect(remoteMessagesQueryGrant.reply.status.code).to.equal(202); + + const remoteMessagesReadGrant = await testHarness.agent.sendDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + rawMessage : messagesReadGrantMessage, + dataStream : new Blob([ Convert.base64Url(readGrantData).toUint8Array() ]), + }); + expect(remoteMessagesReadGrant.reply.status.code).to.equal(202); + + const { encodedData: recordsQueryGrantData, ... recordsQueryGrantMessage } = recordsQueryGrant.message; + const processRecordsQueryGrant = await testHarness.agent.sendDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + rawMessage : recordsQueryGrantMessage, + dataStream : new Blob([ Convert.base64Url(recordsQueryGrantData).toUint8Array() ]), + }); + expect(processRecordsQueryGrant.reply.status.code).to.equal(202); + + + // create a record for each protocol + const recordFoo = await testHarness.agent.sendDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + protocol : protocolFoo.protocol, + protocolPath : 'foo', + schema : protocolFoo.types.foo.schema + }, + dataStream: new Blob(['Hello, foo!']) + }); + expect(recordFoo.reply.status.code).to.equal(202); + + const recordBar = await testHarness.agent.sendDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + protocol : protocolBar.protocol, + protocolPath : 'bar', + schema : protocolBar.types.bar.schema + }, + dataStream: new Blob(['Hello, bar!']) + }); + expect(recordBar.reply.status.code).to.equal(202); + + // Register Alice's DID to be synchronized with only foo protocol + await aliceDeviceXHarness.agent.sync.registerIdentity({ + did : alice.did.uri, + options : { + protocols : [ protocolFoo.protocol ], + delegateDid : aliceDeviceX.did.uri + } + }); + + // Execute Sync to push sync, only foo protocol should be synced + await aliceDeviceXHarness.agent.sync.sync(); + + // query aliceDeviceX to see foo records + const localFooRecords = await aliceDeviceXHarness.agent.dwn.processRequest({ + author : alice.did.uri, + target : alice.did.uri, + granteeDid : aliceDeviceX.did.uri, + messageType : DwnInterface.RecordsQuery, + messageParams : { + delegatedGrant : recordsQueryGrant.message, + filter : { + protocol: protocolFoo.protocol, + } + } + }); + const didAuthor = Jws.getSignerDid(localFooRecords.message!.authorization?.signature.signatures[0]!); + expect(didAuthor).to.equal(aliceDeviceX.did.uri); + expect(localFooRecords.reply.status.code).to.equal(200); + expect(localFooRecords.reply.entries).to.have.length(1); + expect(localFooRecords.reply.entries?.map(entry => entry.recordId)).to.have.deep.equal([ recordFoo.message?.recordId ]); + + await aliceDeviceXHarness.createAgentDid(); + }); + }); + describe('pull()', () => { it('silently ignores sendDwnRequest for a messageCid that does not exist on a remote DWN', async () => { // scenario: The messageCids returned from the remote eventLog contains a Cid that is not found in the remote DWN @@ -410,7 +1004,10 @@ describe('SyncEngineLevel', () => { // Register Alice's DID to be synchronized. await testHarness.agent.sync.registerIdentity({ - did: alice.did.uri + did : alice.did.uri, + options : { + protocols: [] + } }); // Execute Sync to pull all records from Alice's remote DWNs @@ -536,7 +1133,10 @@ describe('SyncEngineLevel', () => { // Register Alice's DID to be synchronized. await testHarness.agent.sync.registerIdentity({ - did: alice.did.uri + did : alice.did.uri, + options : { + protocols: [] + } }); // spy on sendDwnRequest to the remote DWN @@ -627,7 +1227,10 @@ describe('SyncEngineLevel', () => { // Register Alice's DID to be synchronized. await testHarness.agent.sync.registerIdentity({ - did: alice.did.uri + did : alice.did.uri, + options : { + protocols: [] + } }); // Execute Sync to pull all records from Alice's remote DWN to Alice's local DWN. @@ -691,7 +1294,10 @@ describe('SyncEngineLevel', () => { // register alice await testHarness.agent.sync.registerIdentity({ - did: alice.did.uri + did : alice.did.uri, + options : { + protocols: [] + } }); // create a remote record @@ -782,12 +1388,18 @@ describe('SyncEngineLevel', () => { // Register Alice's DID to be synchronized. await testHarness.agent.sync.registerIdentity({ - did: alice.did.uri + did : alice.did.uri, + options : { + protocols: [] + } }); // Register Bob's DID to be synchronized. await testHarness.agent.sync.registerIdentity({ - did: bob.did.uri + did : bob.did.uri, + options : { + protocols: [] + } }); // Execute Sync to pull all records from Alice's and Bob's remove DWNs to their local DWNs. @@ -895,7 +1507,10 @@ describe('SyncEngineLevel', () => { // Register Alice's DID to be synchronized. await testHarness.agent.sync.registerIdentity({ - did: alice.did.uri + did : alice.did.uri, + options : { + protocols: [] + } }); // Execute Sync to pull all records from Alice's remote DWNs @@ -931,7 +1546,10 @@ describe('SyncEngineLevel', () => { // Register Alice's DID to be synchronized. await testHarness.agent.sync.registerIdentity({ - did: alice.did.uri + did : alice.did.uri, + options : { + protocols: [] + } }); // scenario: The messageCids returned from the local eventLog contains a Cid that already exists in the remote DWN. @@ -1103,7 +1721,10 @@ describe('SyncEngineLevel', () => { // Register Alice's DID to be synchronized. await testHarness.agent.sync.registerIdentity({ - did: alice.did.uri + did : alice.did.uri, + options : { + protocols: [] + } }); // Execute Sync to push all records from Alice's local DWN to Alice's remote DWN. @@ -1164,7 +1785,10 @@ describe('SyncEngineLevel', () => { //register alice await testHarness.agent.sync.registerIdentity({ - did: alice.did.uri + did : alice.did.uri, + options : { + protocols: [] + } }); // create a local record @@ -1253,12 +1877,18 @@ describe('SyncEngineLevel', () => { // Register Alice's DID to be synchronized. await testHarness.agent.sync.registerIdentity({ - did: alice.did.uri + did : alice.did.uri, + options : { + protocols: [] + } }); // Register Bob's DID to be synchronized. await testHarness.agent.sync.registerIdentity({ - did: bob.did.uri + did : bob.did.uri, + options : { + protocols: [] + } }); // Execute Sync to push all records from Alice's and Bob's local DWNs to their remote DWNs. @@ -1289,60 +1919,56 @@ describe('SyncEngineLevel', () => { }); describe('startSync()', () => { - it('calls push/pull in each interval', async () => { + it('calls sync() in each interval', async () => { await testHarness.agent.sync.registerIdentity({ - did: alice.did.uri + did : alice.did.uri, + options : { + protocols: [] + } }); - const pushSpy = sinon.stub(SyncEngineLevel.prototype, 'push'); - pushSpy.resolves(); - - const pullSpy = sinon.stub(SyncEngineLevel.prototype, 'pull'); - pullSpy.resolves(); + const syncSpy = sinon.stub(SyncEngineLevel.prototype, 'sync'); + syncSpy.resolves(); const clock = sinon.useFakeTimers(); testHarness.agent.sync.startSync({ interval: '500ms' }); await clock.tickAsync(1_400); // just under 3 intervals - pushSpy.restore(); - pullSpy.restore(); + syncSpy.restore(); clock.restore(); - expect(pushSpy.callCount).to.equal(2, 'push'); - expect(pullSpy.callCount).to.equal(2, 'pull'); + expect(syncSpy.callCount).to.equal(2, 'push'); }); - it('does not call push/pull again until a push/pull finishes', async () => { + it('does not call sync() again until a sync round finishes', async () => { await testHarness.agent.sync.registerIdentity({ - did: alice.did.uri + did : alice.did.uri, + options : { + protocols: [] + } }); const clock = sinon.useFakeTimers(); - const pushSpy = sinon.stub(SyncEngineLevel.prototype, 'push'); - pushSpy.returns(new Promise((resolve) => { + const syncSpy = sinon.stub(SyncEngineLevel.prototype, 'sync'); + syncSpy.returns(new Promise((resolve) => { clock.setTimeout(() => { resolve(); }, 1_500); // more than the interval })); - const pullSpy = sinon.stub(SyncEngineLevel.prototype, 'pull'); - pullSpy.resolves(); - testHarness.agent.sync.startSync({ interval: '500ms' }); await clock.tickAsync(1_400); // less time than the push - expect(pushSpy.callCount).to.equal(1, 'push'); - expect(pullSpy.callCount).to.equal(0, 'pull'); // not called yet + expect(syncSpy.callCount).to.equal(1, 'sync'); - await clock.tickAsync(100); //remaining time for pull to be called + await clock.tickAsync(600); //remaining time for a 2nd sync - expect(pullSpy.callCount).to.equal(1, 'pull'); + expect(syncSpy.callCount).to.equal(2, 'sync'); - pushSpy.restore(); - pullSpy.restore(); + syncSpy.restore(); clock.restore(); }); }); diff --git a/packages/api/src/web5.ts b/packages/api/src/web5.ts index e905332c5..7f22c5916 100644 --- a/packages/api/src/web5.ts +++ b/packages/api/src/web5.ts @@ -370,7 +370,7 @@ export class Web5 { // Enable sync, unless explicitly disabled. if (sync !== 'off') { // First, register the user identity for sync. - await userAgent.sync.registerIdentity({ did: connectedDid }); + await userAgent.sync.registerIdentity({ did: connectedDid, options: { protocols: [] } }); // Enable sync using the specified interval or default. sync ??= '2m';