Skip to content

Commit

Permalink
#508: Enhance registration expiration of RedisRegistrationStore
Browse files Browse the repository at this point in the history
  • Loading branch information
sbernard31 committed Jun 7, 2018
1 parent 4b7a6e2 commit ae823f6
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -64,6 +65,7 @@ public class RedisRegistrationStore implements CaliforniumRegistrationStore, Sta

/** Default time in seconds between 2 cleaning tasks (used to remove expired registration). */
public static final long DEFAULT_CLEAN_PERIOD = 60;
public static final int DEFAULT_CLEAN_LIMIT = 500;
/** Defaut Extra time for registration lifetime in seconds */
public static final long DEFAULT_GRACE_PERIOD = 0;

Expand All @@ -76,6 +78,8 @@ public class RedisRegistrationStore implements CaliforniumRegistrationStore, Sta
private static final String LOCK_EP = "LOCK:EP:";
private static final byte[] OBS_TKN = "OBS:TKN:".getBytes(UTF_8);
private static final String OBS_TKNS_REGID_IDX = "TKNS:REGID:"; // secondary index (token list by registration)
private static final byte[] EXP_EP = "EXP:EP".getBytes(UTF_8); // a sorted set used for registration expiration
// (expiration date, Endpoint)

private final Pool<Jedis> pool;

Expand All @@ -84,23 +88,25 @@ public class RedisRegistrationStore implements CaliforniumRegistrationStore, Sta

private final ScheduledExecutorService schedExecutor;
private final long cleanPeriod; // in seconds
private final int cleanLimit; // maximum number to clean in a clean period
private final long gracePeriod; // in seconds

public RedisRegistrationStore(Pool<Jedis> p) {
this(p, DEFAULT_CLEAN_PERIOD, DEFAULT_GRACE_PERIOD); // default clean period 60s
this(p, DEFAULT_CLEAN_PERIOD, DEFAULT_GRACE_PERIOD, DEFAULT_CLEAN_LIMIT); // default clean period 60s
}

public RedisRegistrationStore(Pool<Jedis> p, long cleanPeriodInSec, long lifetimeGracePeriodInSec) {
public RedisRegistrationStore(Pool<Jedis> p, long cleanPeriodInSec, long lifetimeGracePeriodInSec, int cleanLimit) {
this(p, Executors.newScheduledThreadPool(1,
new NamedThreadFactory(String.format("RedisRegistrationStore Cleaner (%ds)", cleanPeriodInSec))),
cleanPeriodInSec, lifetimeGracePeriodInSec);
cleanPeriodInSec, lifetimeGracePeriodInSec, cleanLimit);
}

public RedisRegistrationStore(Pool<Jedis> p, ScheduledExecutorService schedExecutor, long cleanPeriodInSec,
long lifetimeGracePeriodInSec) {
long lifetimeGracePeriodInSec, int cleanLimit) {
this.pool = p;
this.schedExecutor = schedExecutor;
this.cleanPeriod = cleanPeriodInSec;
this.cleanLimit = cleanLimit;
this.gracePeriod = lifetimeGracePeriodInSec;
}

Expand Down Expand Up @@ -146,6 +152,9 @@ public Deregistration addRegistration(Registration registration) {
byte[] addr_idx = toRegAddrKey(registration.getSocketAddress());
j.set(addr_idx, registration.getEndpoint().getBytes(UTF_8));

// Add or update expiration
addOrUpdateExpiration(j, registration);

if (old != null) {
Registration oldRegistration = deserializeReg(old);
// remove old secondary index
Expand All @@ -171,7 +180,7 @@ public Deregistration addRegistration(Registration registration) {
public UpdatedRegistration updateRegistration(RegistrationUpdate update) {
try (Jedis j = pool.getResource()) {

// fetch the client ep by registration ID index
// Fetch the registration ep by registration ID index
byte[] ep = j.get(toRegIdKey(update.getRegistrationId()));
if (ep == null) {
return null;
Expand All @@ -182,7 +191,7 @@ public UpdatedRegistration updateRegistration(RegistrationUpdate update) {
try {
lockValue = RedisLock.acquire(j, lockKey);

// fetch the client
// Fetch the registration
byte[] data = j.get(toEndpointKey(ep));
if (data == null) {
return null;
Expand All @@ -192,9 +201,12 @@ public UpdatedRegistration updateRegistration(RegistrationUpdate update) {

Registration updatedRegistration = update.update(r);

// store the new client
// Store the new registration
j.set(toEndpointKey(updatedRegistration.getEndpoint()), serializeReg(updatedRegistration));

// Add or update expiration
addOrUpdateExpiration(j, updatedRegistration);

// Update secondary index :
// If registration is already associated to this address we don't care as we only want to keep the most
// recent binding.
Expand Down Expand Up @@ -347,6 +359,7 @@ private Deregistration removeRegistration(Jedis j, String registrationId, boolea
j.del(toEndpointKey(r.getEndpoint()));
Collection<Observation> obsRemoved = unsafeRemoveAllObservations(j, r.getId());
removeAddrIndex(j, r);
removeExpiration(j, r);
return new Deregistration(r, obsRemoved);
}
}
Expand All @@ -364,6 +377,14 @@ private void removeAddrIndex(Jedis j, Registration registration) {
}
}

private void addOrUpdateExpiration(Jedis j, Registration registration) {
j.zadd(EXP_EP, registration.getExpirationTimeStamp(gracePeriod), registration.getEndpoint().getBytes(UTF_8));
}

private void removeExpiration(Jedis j, Registration registration) {
j.zrem(EXP_EP, registration.getEndpoint().getBytes(UTF_8));
}

private byte[] toRegIdKey(String registrationId) {
return toKey(REG_EP_REGID_IDX, registrationId);
}
Expand Down Expand Up @@ -505,16 +526,19 @@ public Collection<Observation> removeObservations(String registrationId) {
/* *************** Californium ObservationStore API **************** */

@Override
public org.eclipse.californium.core.observe.Observation putIfAbsent(Token token, org.eclipse.californium.core.observe.Observation obs) {
public org.eclipse.californium.core.observe.Observation putIfAbsent(Token token,
org.eclipse.californium.core.observe.Observation obs) {
return add(token, obs, true);
}

@Override
public org.eclipse.californium.core.observe.Observation put(Token token, org.eclipse.californium.core.observe.Observation obs) {
public org.eclipse.californium.core.observe.Observation put(Token token,
org.eclipse.californium.core.observe.Observation obs) {
return add(token, obs, false);
}

private org.eclipse.californium.core.observe.Observation add(Token token, org.eclipse.californium.core.observe.Observation obs, boolean ifAbsent) {
private org.eclipse.californium.core.observe.Observation add(Token token,
org.eclipse.californium.core.observe.Observation obs, boolean ifAbsent) {
String endpoint = ObserveUtil.validateCoapObservation(obs);
org.eclipse.californium.core.observe.Observation previousObservation = null;

Expand Down Expand Up @@ -690,21 +714,17 @@ private class Cleaner implements Runnable {
public void run() {

try (Jedis j = pool.getResource()) {
ScanParams params = new ScanParams().match(REG_EP + "*").count(100);
String cursor = "0";
do {
ScanResult<byte[]> res = j.scan(cursor.getBytes(), params);
for (byte[] key : res.getResult()) {
Registration r = deserializeReg(j.get(key));
if (!r.isAlive(gracePeriod)) {
Deregistration dereg = removeRegistration(j, r.getId(), true);
if (dereg != null)
expirationListener.registrationExpired(dereg.getRegistration(),
dereg.getObservations());
}
Set<byte[]> endpointsExpired = j.zrangeByScore(EXP_EP, Double.NEGATIVE_INFINITY,
System.currentTimeMillis(), 0, cleanLimit);

for (byte[] endpoint : endpointsExpired) {
Registration r = deserializeReg(j.get(toEndpointKey(endpoint)));
if (!r.isAlive(gracePeriod)) {
Deregistration dereg = removeRegistration(j, r.getId(), true);
if (dereg != null)
expirationListener.registrationExpired(dereg.getRegistration(), dereg.getObservations());
}
cursor = res.getStringCursor();
} while (!"0".equals(cursor));
}
} catch (Exception e) {
LOG.warn("Unexpected Exception while registration cleaning", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,14 @@ public Date getLastUpdate() {
return lastUpdate;
}

public long getExpirationTimeStamp() {
return getExpirationTimeStamp(0L);
}

public long getExpirationTimeStamp(long gracePeriodInSec) {
return lastUpdate.getTime() + lifeTimeInSec * 1000 + gracePeriodInSec * 1000;
}

/**
* @return true if the last registration update was done less than lifetime seconds ago.
*/
Expand All @@ -286,7 +294,7 @@ public boolean isAlive() {
* @return true if the last registration update was done less than lifetime+gracePeriod seconds ago.
*/
public boolean isAlive(long gracePeriodInSec) {
return lastUpdate.getTime() + lifeTimeInSec * 1000 + gracePeriodInSec * 1000 > System.currentTimeMillis();
return getExpirationTimeStamp(gracePeriodInSec) > System.currentTimeMillis();
}

public Map<String, String> getAdditionalRegistrationAttributes() {
Expand Down

0 comments on commit ae823f6

Please sign in to comment.