diff --git a/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java b/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java index 3746a6a3c2..4a8f2943e3 100644 --- a/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java +++ b/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java @@ -292,37 +292,39 @@ public void remove() { @Override public Deregistration removeRegistration(String registrationId) { try (Jedis j = pool.getResource()) { + return removeRegistration(j, registrationId, false); + } + } - byte[] regKey = toRegIdKey(registrationId); - - // fetch the client ep by registration ID index - byte[] ep = j.get(regKey); - if (ep == null) { - return null; - } - - byte[] data = j.get(toEndpointKey(ep)); - if (data == null) { - return null; - } + private Deregistration removeRegistration(Jedis j, String registrationId, boolean removeOnlyIfNotAlive) { + // fetch the client ep by registration ID index + byte[] ep = j.get(toRegIdKey(registrationId)); + if (ep == null) { + return null; + } - Registration r = deserializeReg(data); - deleteRegistration(j, r); - Collection obsRemoved = unsafeRemoveAllObservations(j, r.getId()); - return new Deregistration(r, obsRemoved); + // fetch the client + byte[] data = j.get(toEndpointKey(ep)); + if (data == null) { + return null; } - } - private void deleteRegistration(Jedis j, Registration r) { + Registration r = deserializeReg(data); + byte[] lockValue = null; byte[] lockKey = toLockKey(r.getEndpoint()); try { lockValue = RedisLock.acquire(j, lockKey); - // delete all entries - j.del(toRegIdKey(r.getId())); - j.del(toEndpointKey(r.getEndpoint())); - + if (!removeOnlyIfNotAlive || !r.isAlive()) { + long nbRemoved = j.del(toRegIdKey(r.getId())); + if (nbRemoved > 0) { + j.del(toEndpointKey(r.getEndpoint())); + Collection obsRemoved = unsafeRemoveAllObservations(j, r.getId()); + return new Deregistration(r, obsRemoved); + } + } + return null; } finally { RedisLock.release(j, lockKey, lockValue); } @@ -669,13 +671,14 @@ public void run() { ScanParams params = new ScanParams().match(REG_EP + "*").count(100); String cursor = "0"; do { - // TODO we probably need a lock here ScanResult res = j.scan(cursor.getBytes(), params); for (byte[] key : res.getResult()) { Registration r = deserializeReg(j.get(key)); if (!r.isAlive()) { - deleteRegistration(j, r); - expirationListener.registrationExpired(r, new ArrayList()); + Deregistration dereg = removeRegistration(j, r.getId(), true); + if (dereg != null) + expirationListener.registrationExpired(dereg.getRegistration(), + dereg.getObservations()); } } cursor = res.getStringCursor();