Skip to content

Commit

Permalink
Do not remove registration by endpoint, if no registration by Id.
Browse files Browse the repository at this point in the history
  • Loading branch information
sbernard31 committed Nov 10, 2017
1 parent 3610834 commit e581476
Showing 1 changed file with 28 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,37 +292,39 @@ public void remove() {
@Override
public Deregistration removeRegistration(String registrationId) {
try (Jedis j = pool.getResource()) {
return removeRegistration(j, registrationId, false);
}
}

byte[] regKey = toRegIdKey(registrationId);

// fetch the client ep by registration ID index
byte[] ep = j.get(regKey);
if (ep == null) {
return null;
}

byte[] data = j.get(toEndpointKey(ep));
if (data == null) {
return null;
}
private Deregistration removeRegistration(Jedis j, String registrationId, boolean removeOnlyIfNotAlive) {
// fetch the client ep by registration ID index
byte[] ep = j.get(toRegIdKey(registrationId));
if (ep == null) {
return null;
}

Registration r = deserializeReg(data);
deleteRegistration(j, r);
Collection<Observation> obsRemoved = unsafeRemoveAllObservations(j, r.getId());
return new Deregistration(r, obsRemoved);
// fetch the client
byte[] data = j.get(toEndpointKey(ep));
if (data == null) {
return null;
}
}

private void deleteRegistration(Jedis j, Registration r) {
Registration r = deserializeReg(data);

byte[] lockValue = null;
byte[] lockKey = toLockKey(r.getEndpoint());
try {
lockValue = RedisLock.acquire(j, lockKey);

// delete all entries
j.del(toRegIdKey(r.getId()));
j.del(toEndpointKey(r.getEndpoint()));

if (!removeOnlyIfNotAlive || !r.isAlive()) {
long nbRemoved = j.del(toRegIdKey(r.getId()));
if (nbRemoved > 0) {
j.del(toEndpointKey(r.getEndpoint()));
Collection<Observation> obsRemoved = unsafeRemoveAllObservations(j, r.getId());
return new Deregistration(r, obsRemoved);
}
}
return null;
} finally {
RedisLock.release(j, lockKey, lockValue);
}
Expand Down Expand Up @@ -669,13 +671,14 @@ public void run() {
ScanParams params = new ScanParams().match(REG_EP + "*").count(100);
String cursor = "0";
do {
// TODO we probably need a lock here
ScanResult<byte[]> res = j.scan(cursor.getBytes(), params);
for (byte[] key : res.getResult()) {
Registration r = deserializeReg(j.get(key));
if (!r.isAlive()) {
deleteRegistration(j, r);
expirationListener.registrationExpired(r, new ArrayList<Observation>());
Deregistration dereg = removeRegistration(j, r.getId(), true);
if (dereg != null)
expirationListener.registrationExpired(dereg.getRegistration(),
dereg.getObservations());
}
}
cursor = res.getStringCursor();
Expand Down

0 comments on commit e581476

Please sign in to comment.