Skip to content

Commit

Permalink
GH-39: Added Resgate subscription counting.
Browse files Browse the repository at this point in the history
  • Loading branch information
jirenius committed May 14, 2020
1 parent 196259f commit 1e86567
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 19 deletions.
13 changes: 8 additions & 5 deletions src/class/CacheItem.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@ class CacheItem {
this.item = null;
this.direct = 0;
this.indirect = 0;
this.subscribed = false;
this.subscribed = 0; // Count of direct subscriptions towards Resgate
this.promise = null;
}

setSubscribed(isSubscribed) {
this.subscribed = isSubscribed;
if (!isSubscribed && this.unsubTimeout) {
/**
* Adds or subtracts from the subscribed counter.
* @param {number} dir Value to add. If 0, the subscribed counter will be set to 0.
*/
addSubscribed(dir) {
this.subscribed += dir ? dir : -this.subscribed;
if (!this.subscribed && this.unsubTimeout) {
clearTimeout(this.unsubTimeout);
this.unsubTimeout = null;
}
return this;
}

setPromise(promise) {
Expand Down
32 changes: 18 additions & 14 deletions src/class/ResClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ class ResClient {
.then(result => {
this._cacheResources(result);
let ci = this.cache[result.rid];
ci.setSubscribed(true);
ci.addSubscribed(1);
return ci.item;
});
}
Expand Down Expand Up @@ -493,7 +493,7 @@ class ResClient {
if (result.rid) {
this._cacheResources(result);
let ci = this.cache[result.rid];
ci.setSubscribed(true);
ci.addSubscribed(1);
return ci.item;
}
return result.payload;
Expand Down Expand Up @@ -675,7 +675,7 @@ class ResClient {
}

_handleUnsubscribeEvent(cacheItem) {
cacheItem.setSubscribed(false);
cacheItem.addSubscribed(0);
this._tryDelete(cacheItem);
this.eventBus.emit(cacheItem.item, this.namespace + '.resource.' + cacheItem.rid + '.unsubscribe', { item: cacheItem.item });
return true;
Expand Down Expand Up @@ -707,7 +707,7 @@ class ResClient {

_subscribe(ci, throwError) {
let rid = ci.rid;
ci.setSubscribed(true);
ci.addSubscribed(1);
this._removeStale(rid);
return this._send('subscribe', rid)
.then(response => this._cacheResources(response))
Expand Down Expand Up @@ -806,7 +806,7 @@ class ResClient {
for (let rid in this.cache) {
let ci = this.cache[rid];
if (ci.subscribed) {
ci.setSubscribed(false);
ci.addSubscribed(0);
this._addStale(rid);
this._tryDelete(ci);
}
Expand Down Expand Up @@ -1225,18 +1225,22 @@ class ResClient {

this._subscribeReferred(ci);

this._send('unsubscribe', ci.rid)
.then(() => {
ci.setSubscribed(false);
this._tryDelete(ci);
})
.catch(err => this._tryDelete(ci));
let i = ci.subscribed;
while (i--) {
this._send('unsubscribe', ci.rid)
.then(() => {
ci.addSubscribed(-1);
this._tryDelete(ci);
})
.catch(err => this._tryDelete(ci));
}
}

_subscribeReferred(ci) {
ci.subscribed = false;
let i = ci.subscribed;
ci.subscribed = 0;
let refs = this._getRefState(ci);
ci.subscribed = true;
ci.subscribed = i;

for (let rid in refs) {
let r = refs[rid];
Expand All @@ -1247,7 +1251,7 @@ class ResClient {
}

_handleFailedSubscribe(cacheItem) {
cacheItem.setSubscribed(false);
cacheItem.addSubscribed(-1);
this._tryDelete(cacheItem);
}

Expand Down
42 changes: 42 additions & 0 deletions src/class/ResClient.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,46 @@ describe("ResClient", () => {
}));
});
});

it("unsubscribes model with multiple direct subscriptions, when no longer listened to", () => {
return getServerResource('service.model', modelResources).then(model => {
let promise = model.call('test');

return flushRequests().then(() => {
let req = server.getNextRequest();
server.sendResponse(req, { rid: "service.model" });

return flushRequests()
.then(() => promise)
.then(m => {
expect(m).toBe(model);
// Cause unsubscribe by waiting
return waitAWhile().then(flushRequests).then(() => {
expect(server.error).toBe(null);
// Expect 2 unsubscribes
for (var i = 0; i < 2; i++) {
let req = server.getNextRequest();
expect(req).not.toBe(undefined);
expect(req.method).toBe('unsubscribe.service.model');
server.sendResponse(req, null);
}

// Wait for the unsubscribe response
return flushRequests().then(() => {
expect(server.error).toBe(null);

return getServerResource('service.model', modelResources).then(modelSecond => {
expect(model).not.toBe(modelSecond);

let req = server.getNextRequest();
expect(req).toBe(undefined);
});
});
});
});
});
});
});
});

describe("getResource collection", () => {
Expand Down Expand Up @@ -736,6 +776,8 @@ describe("ResClient", () => {
it("instantly resubscribes to a model when listening between an unsubscribe request and its response", () => {
// TODO
});


});

describe("collection.on", () => {
Expand Down

0 comments on commit 1e86567

Please sign in to comment.