From ae823f6be784819aedd3a7bc7cdb4d8e064f73f7 Mon Sep 17 00:00:00 2001 From: Simon Bernard Date: Wed, 6 Jun 2018 17:29:29 +0200 Subject: [PATCH] #508: Enhance registration expiration of RedisRegistrationStore --- .../cluster/RedisRegistrationStore.java | 68 ++++++++++++------- .../server/registration/Registration.java | 10 ++- 2 files changed, 53 insertions(+), 25 deletions(-) diff --git a/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java b/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java index 08af37dba3..2828532dca 100644 --- a/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java +++ b/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -64,6 +65,7 @@ public class RedisRegistrationStore implements CaliforniumRegistrationStore, Sta /** Default time in seconds between 2 cleaning tasks (used to remove expired registration). */ public static final long DEFAULT_CLEAN_PERIOD = 60; + public static final int DEFAULT_CLEAN_LIMIT = 500; /** Defaut Extra time for registration lifetime in seconds */ public static final long DEFAULT_GRACE_PERIOD = 0; @@ -76,6 +78,8 @@ public class RedisRegistrationStore implements CaliforniumRegistrationStore, Sta private static final String LOCK_EP = "LOCK:EP:"; private static final byte[] OBS_TKN = "OBS:TKN:".getBytes(UTF_8); private static final String OBS_TKNS_REGID_IDX = "TKNS:REGID:"; // secondary index (token list by registration) + private static final byte[] EXP_EP = "EXP:EP".getBytes(UTF_8); // a sorted set used for registration expiration + // (expiration date, Endpoint) private final Pool pool; @@ -84,23 +88,25 @@ public class RedisRegistrationStore implements CaliforniumRegistrationStore, Sta private final ScheduledExecutorService schedExecutor; private final long cleanPeriod; // in seconds + private final int cleanLimit; // maximum number to clean in a clean period private final long gracePeriod; // in seconds public RedisRegistrationStore(Pool p) { - this(p, DEFAULT_CLEAN_PERIOD, DEFAULT_GRACE_PERIOD); // default clean period 60s + this(p, DEFAULT_CLEAN_PERIOD, DEFAULT_GRACE_PERIOD, DEFAULT_CLEAN_LIMIT); // default clean period 60s } - public RedisRegistrationStore(Pool p, long cleanPeriodInSec, long lifetimeGracePeriodInSec) { + public RedisRegistrationStore(Pool p, long cleanPeriodInSec, long lifetimeGracePeriodInSec, int cleanLimit) { this(p, Executors.newScheduledThreadPool(1, new NamedThreadFactory(String.format("RedisRegistrationStore Cleaner (%ds)", cleanPeriodInSec))), - cleanPeriodInSec, lifetimeGracePeriodInSec); + cleanPeriodInSec, lifetimeGracePeriodInSec, cleanLimit); } public RedisRegistrationStore(Pool p, ScheduledExecutorService schedExecutor, long cleanPeriodInSec, - long lifetimeGracePeriodInSec) { + long lifetimeGracePeriodInSec, int cleanLimit) { this.pool = p; this.schedExecutor = schedExecutor; this.cleanPeriod = cleanPeriodInSec; + this.cleanLimit = cleanLimit; this.gracePeriod = lifetimeGracePeriodInSec; } @@ -146,6 +152,9 @@ public Deregistration addRegistration(Registration registration) { byte[] addr_idx = toRegAddrKey(registration.getSocketAddress()); j.set(addr_idx, registration.getEndpoint().getBytes(UTF_8)); + // Add or update expiration + addOrUpdateExpiration(j, registration); + if (old != null) { Registration oldRegistration = deserializeReg(old); // remove old secondary index @@ -171,7 +180,7 @@ public Deregistration addRegistration(Registration registration) { public UpdatedRegistration updateRegistration(RegistrationUpdate update) { try (Jedis j = pool.getResource()) { - // fetch the client ep by registration ID index + // Fetch the registration ep by registration ID index byte[] ep = j.get(toRegIdKey(update.getRegistrationId())); if (ep == null) { return null; @@ -182,7 +191,7 @@ public UpdatedRegistration updateRegistration(RegistrationUpdate update) { try { lockValue = RedisLock.acquire(j, lockKey); - // fetch the client + // Fetch the registration byte[] data = j.get(toEndpointKey(ep)); if (data == null) { return null; @@ -192,9 +201,12 @@ public UpdatedRegistration updateRegistration(RegistrationUpdate update) { Registration updatedRegistration = update.update(r); - // store the new client + // Store the new registration j.set(toEndpointKey(updatedRegistration.getEndpoint()), serializeReg(updatedRegistration)); + // Add or update expiration + addOrUpdateExpiration(j, updatedRegistration); + // Update secondary index : // If registration is already associated to this address we don't care as we only want to keep the most // recent binding. @@ -347,6 +359,7 @@ private Deregistration removeRegistration(Jedis j, String registrationId, boolea j.del(toEndpointKey(r.getEndpoint())); Collection obsRemoved = unsafeRemoveAllObservations(j, r.getId()); removeAddrIndex(j, r); + removeExpiration(j, r); return new Deregistration(r, obsRemoved); } } @@ -364,6 +377,14 @@ private void removeAddrIndex(Jedis j, Registration registration) { } } + private void addOrUpdateExpiration(Jedis j, Registration registration) { + j.zadd(EXP_EP, registration.getExpirationTimeStamp(gracePeriod), registration.getEndpoint().getBytes(UTF_8)); + } + + private void removeExpiration(Jedis j, Registration registration) { + j.zrem(EXP_EP, registration.getEndpoint().getBytes(UTF_8)); + } + private byte[] toRegIdKey(String registrationId) { return toKey(REG_EP_REGID_IDX, registrationId); } @@ -505,16 +526,19 @@ public Collection removeObservations(String registrationId) { /* *************** Californium ObservationStore API **************** */ @Override - public org.eclipse.californium.core.observe.Observation putIfAbsent(Token token, org.eclipse.californium.core.observe.Observation obs) { + public org.eclipse.californium.core.observe.Observation putIfAbsent(Token token, + org.eclipse.californium.core.observe.Observation obs) { return add(token, obs, true); } @Override - public org.eclipse.californium.core.observe.Observation put(Token token, org.eclipse.californium.core.observe.Observation obs) { + public org.eclipse.californium.core.observe.Observation put(Token token, + org.eclipse.californium.core.observe.Observation obs) { return add(token, obs, false); } - private org.eclipse.californium.core.observe.Observation add(Token token, org.eclipse.californium.core.observe.Observation obs, boolean ifAbsent) { + private org.eclipse.californium.core.observe.Observation add(Token token, + org.eclipse.californium.core.observe.Observation obs, boolean ifAbsent) { String endpoint = ObserveUtil.validateCoapObservation(obs); org.eclipse.californium.core.observe.Observation previousObservation = null; @@ -690,21 +714,17 @@ private class Cleaner implements Runnable { public void run() { try (Jedis j = pool.getResource()) { - ScanParams params = new ScanParams().match(REG_EP + "*").count(100); - String cursor = "0"; - do { - ScanResult res = j.scan(cursor.getBytes(), params); - for (byte[] key : res.getResult()) { - Registration r = deserializeReg(j.get(key)); - if (!r.isAlive(gracePeriod)) { - Deregistration dereg = removeRegistration(j, r.getId(), true); - if (dereg != null) - expirationListener.registrationExpired(dereg.getRegistration(), - dereg.getObservations()); - } + Set endpointsExpired = j.zrangeByScore(EXP_EP, Double.NEGATIVE_INFINITY, + System.currentTimeMillis(), 0, cleanLimit); + + for (byte[] endpoint : endpointsExpired) { + Registration r = deserializeReg(j.get(toEndpointKey(endpoint))); + if (!r.isAlive(gracePeriod)) { + Deregistration dereg = removeRegistration(j, r.getId(), true); + if (dereg != null) + expirationListener.registrationExpired(dereg.getRegistration(), dereg.getObservations()); } - cursor = res.getStringCursor(); - } while (!"0".equals(cursor)); + } } catch (Exception e) { LOG.warn("Unexpected Exception while registration cleaning", e); } diff --git a/leshan-server-core/src/main/java/org/eclipse/leshan/server/registration/Registration.java b/leshan-server-core/src/main/java/org/eclipse/leshan/server/registration/Registration.java index 8fb77320ed..9571319d18 100644 --- a/leshan-server-core/src/main/java/org/eclipse/leshan/server/registration/Registration.java +++ b/leshan-server-core/src/main/java/org/eclipse/leshan/server/registration/Registration.java @@ -273,6 +273,14 @@ public Date getLastUpdate() { return lastUpdate; } + public long getExpirationTimeStamp() { + return getExpirationTimeStamp(0L); + } + + public long getExpirationTimeStamp(long gracePeriodInSec) { + return lastUpdate.getTime() + lifeTimeInSec * 1000 + gracePeriodInSec * 1000; + } + /** * @return true if the last registration update was done less than lifetime seconds ago. */ @@ -286,7 +294,7 @@ public boolean isAlive() { * @return true if the last registration update was done less than lifetime+gracePeriod seconds ago. */ public boolean isAlive(long gracePeriodInSec) { - return lastUpdate.getTime() + lifeTimeInSec * 1000 + gracePeriodInSec * 1000 > System.currentTimeMillis(); + return getExpirationTimeStamp(gracePeriodInSec) > System.currentTimeMillis(); } public Map getAdditionalRegistrationAttributes() {