Skip to content

Commit

Permalink
Currently sync interval stops if there is a failure, instead log fail…
Browse files Browse the repository at this point in the history
…ure and wait for the interval (#958)

This PR fixes the issue where a sync interval would stop completely if any error occurred during sync.

This will now catch the error and log it, continuing to attempt sync the next interval.
Additionally, enqueueing operations for each DID/Remote will not wait for each one to complete sequentially and instead take advantage of `Promise.allSettled`. Which also will not fail if a single peer of the array fails and instead log the error.
  • Loading branch information
LiranCohen authored Oct 21, 2024
1 parent 5120f6f commit 7072331
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 3 deletions.
8 changes: 8 additions & 0 deletions .changeset/lucky-dots-clap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@web5/agent": patch
"@web5/identity-agent": patch
"@web5/proxy-agent": patch
"@web5/user-agent": patch
---

Prevent SyncEngine from stopping completely during a sync failure, next interval will try again.
19 changes: 16 additions & 3 deletions packages/agent/src/sync-engine-level.ts
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,12 @@ export class SyncEngineLevel implements SyncEngine {

clearInterval(this._syncIntervalId);
this._syncIntervalId = undefined;
await this.sync();

try {
await this.sync();
} catch (error) {
console.error('SyncEngineLevel: Error during sync operation', error);
}

if (!this._syncIntervalId) {
this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds);
Expand Down Expand Up @@ -405,7 +410,7 @@ export class SyncEngineLevel implements SyncEngine {
syncDirection: SyncDirection,
syncPeerState: SyncState[]
}) {
for (let syncState of syncPeerState) {
const enqueueOps = await Promise.allSettled(syncPeerState.map(async (syncState) => {
// Get the event log from the remote DWN if pull sync, or local DWN if push sync.
const eventLog = await this.getDwnEventLog({
did : syncState.did,
Expand Down Expand Up @@ -435,7 +440,15 @@ export class SyncEngineLevel implements SyncEngine {
: this.getPushQueue();
await syncQueue.batch(syncOperations as any);
}
}
}));

// log any errors that occurred during the enqueuing process
enqueueOps.forEach((result, index) => {
if (result.status === 'rejected') {
const peerState = syncPeerState[index];
console.error(`SyncEngineLevel: Error enqueuing sync operation for peerState: ${JSON.stringify(peerState)}`, result.reason);
}
});
}

private static generateSyncMessageParamsKey({ did, delegateDid, dwnUrl, protocol, watermark, messageCid }:SyncMessageParams): string {
Expand Down
68 changes: 68 additions & 0 deletions packages/agent/tests/sync-engine-level.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,36 @@ describe('SyncEngineLevel', () => {

clock.restore();
});

it('sync logs failures when enqueueing sync operations', async () => {
// returns 3 DID peers to sync with
sinon.stub(syncEngine as any, 'getSyncPeerState').resolves([{
did: 'did:example:alice',
}, {
did: 'did:example:bob',
}, {
did: 'did:example:carol',
}]);

const getDwnEventLogSpy = sinon.stub(syncEngine as any, 'getDwnEventLog').resolves([]);
getDwnEventLogSpy.onCall(2).rejects(new Error('Failed to get event log'));

// spy on the console error
const consoleErrorSpy = sinon.stub(console, 'error').resolves();

await syncEngine.sync();

expect(consoleErrorSpy.callCount).to.equal(1);
expect(consoleErrorSpy.firstCall.args[0]).to.include('Error enqueuing sync operation for peerState');

// reset the error spy
consoleErrorSpy.resetHistory();

// sync again, this time no errors should be thrown
await syncEngine.sync();

expect(consoleErrorSpy.notCalled).to.be.true;
});
});

describe('pull()', () => {
Expand Down Expand Up @@ -2002,6 +2032,44 @@ describe('SyncEngineLevel', () => {
syncSpy.restore();
clock.restore();
});

it('should log sync errors, but continue syncing the next interval', async () => {
await testHarness.agent.sync.registerIdentity({
did: alice.did.uri,
});

const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true });
const syncSpy = sinon.stub(SyncEngineLevel.prototype as any, 'sync');

syncSpy.returns(new Promise<void>((resolve, reject) => {
clock.setTimeout(() => {
resolve();
}, 100);
}));

// first call is the initial sync, 2nd and onward are the intervals
// on the 2nd interval (3rd call), we reject the promise, a 4th call should be made
syncSpy.onThirdCall().rejects(new Error('Sync error'));

// spy on console.error to check if the error message is logged
const consoleErrorSpy = sinon.stub(console, 'error').resolves();

testHarness.agent.sync.startSync({ interval: '500ms' });

// three intervals
await clock.tickAsync(1_500);

// this should equal 4, once for the initial call and once for each interval call
expect(syncSpy.callCount).to.equal(4);

// check if the error message is logged
expect(consoleErrorSpy.callCount).to.equal(1);
expect(consoleErrorSpy.args[0][0]).to.include('SyncEngineLevel: Error during sync operation');

syncSpy.restore();
consoleErrorSpy.restore();
clock.restore();
});
});

describe('stopSync()', () => {
Expand Down

0 comments on commit 7072331

Please sign in to comment.