Skip to content

Commit

Permalink
#464: Fix registrationByAddr (use index and handle conflict)
Browse files Browse the repository at this point in the history
  • Loading branch information
sbernard31 committed Jun 7, 2018
1 parent 5231f67 commit 947e801
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -65,6 +66,7 @@ public class InMemoryRegistrationStore implements CaliforniumRegistrationStore,

// Data structure
private final Map<String /* end-point */, Registration> regsByEp = new HashMap<>();
private final Map<InetSocketAddress, Registration> regsByAddr = new HashMap<>();
private Map<Token, org.eclipse.californium.core.observe.Observation> obsByToken = new HashMap<>();
private Map<String, Set<Token>> tokensByRegId = new HashMap<>();

Expand Down Expand Up @@ -99,8 +101,12 @@ public Deregistration addRegistration(Registration registration) {
lock.writeLock().lock();

Registration registrationRemoved = regsByEp.put(registration.getEndpoint(), registration);
// If a registration is already associated to this address we don't care as we only want to keep the most
// recent binding.
regsByAddr.put(registration.getSocketAddress(), registration);
if (registrationRemoved != null) {
Collection<Observation> observationsRemoved = unsafeRemoveAllObservations(registrationRemoved.getId());
removeFromMap(regsByAddr, registrationRemoved.getSocketAddress(), registrationRemoved);
return new Deregistration(registrationRemoved, observationsRemoved);
}
} finally {
Expand All @@ -120,6 +126,10 @@ public UpdatedRegistration updateRegistration(RegistrationUpdate update) {
} else {
Registration updatedRegistration = update.update(registration);
regsByEp.put(updatedRegistration.getEndpoint(), updatedRegistration);
// If registration is already associated to this address we don't care as we only want to keep the most
// recent binding.
regsByAddr.put(updatedRegistration.getSocketAddress(), updatedRegistration);
removeFromMap(regsByAddr, registration.getSocketAddress(), registration);
return new UpdatedRegistration(registration, updatedRegistration);
}
} finally {
Expand Down Expand Up @@ -159,15 +169,7 @@ public Registration getRegistrationByEndpoint(String endpoint) {
public Registration getRegistrationByAdress(InetSocketAddress address) {
try {
lock.readLock().lock();
// TODO we should create an index instead of iterate all over the collection
if (address != null) {
for (Registration r : regsByEp.values()) {
if (address.getPort() == r.getPort() && address.getAddress().equals(r.getAddress())) {
return r;
}
}
}
return null;
return regsByAddr.get(address);
} finally {
lock.readLock().unlock();
}
Expand All @@ -192,6 +194,7 @@ public Deregistration removeRegistration(String registrationId) {
if (registration != null) {
Collection<Observation> observationsRemoved = unsafeRemoveAllObservations(registration.getId());
regsByEp.remove(registration.getEndpoint());
removeFromMap(regsByAddr, registration.getSocketAddress(), registration);
return new Deregistration(registration, observationsRemoved);
}
return null;
Expand Down Expand Up @@ -280,16 +283,19 @@ public Collection<Observation> removeObservations(String registrationId) {
/* *************** Californium ObservationStore API **************** */

@Override
public org.eclipse.californium.core.observe.Observation putIfAbsent(Token token, org.eclipse.californium.core.observe.Observation obs) {
public org.eclipse.californium.core.observe.Observation putIfAbsent(Token token,
org.eclipse.californium.core.observe.Observation obs) {
return add(token, obs, true);
}

@Override
public org.eclipse.californium.core.observe.Observation put(Token token, org.eclipse.californium.core.observe.Observation obs) {
public org.eclipse.californium.core.observe.Observation put(Token token,
org.eclipse.californium.core.observe.Observation obs) {
return add(token, obs, false);
}

private org.eclipse.californium.core.observe.Observation add(Token token, org.eclipse.californium.core.observe.Observation obs, boolean ifAbsent) {
private org.eclipse.californium.core.observe.Observation add(Token token,
org.eclipse.californium.core.observe.Observation obs, boolean ifAbsent) {
org.eclipse.californium.core.observe.Observation previousObservation = null;
if (obs != null) {
try {
Expand Down Expand Up @@ -476,4 +482,14 @@ public void run() {
}
}
}

// boolean remove(Object key, Object value) exist only since java8
// So this method is here only while we want to support java 7
protected <K, V> boolean removeFromMap(Map<K, V> map, K key, V value) {
if (map.containsKey(key) && Objects.equals(map.get(key), value)) {
map.remove(key);
return true;
} else
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ public class RedisRegistrationStore implements CaliforniumRegistrationStore, Sta
private static final Logger LOG = LoggerFactory.getLogger(RedisRegistrationStore.class);

// Redis key prefixes
private static final String REG_EP = "REG:EP:";
private static final String REG_EP_REGID_IDX = "EP:REGID:"; // secondary index key (registration)
private static final String REG_EP = "REG:EP:"; // (Endpoint => Registration)
private static final String REG_EP_REGID_IDX = "EP:REGID:"; // secondary index key (Registration ID => Endpoint)
private static final String REG_EP_ADDR_IDX = "EP:ADDR:"; // secondary index key (Socket Address => Endpoint)
private static final String LOCK_EP = "LOCK:EP:";
private static final byte[] OBS_TKN = "OBS:TKN:".getBytes(UTF_8);
private static final String OBS_TKNS_REGID_IDX = "TKNS:REGID:"; // secondary index (token list by registration)
Expand Down Expand Up @@ -139,15 +140,20 @@ public Deregistration addRegistration(Registration registration) {
byte[] k = toEndpointKey(registration.getEndpoint());
byte[] old = j.getSet(k, serializeReg(registration));

// add registration: secondary index
byte[] idx = toRegIdKey(registration.getId());
j.set(idx, registration.getEndpoint().getBytes(UTF_8));
// add registration: secondary indexes
byte[] regid_idx = toRegIdKey(registration.getId());
j.set(regid_idx, registration.getEndpoint().getBytes(UTF_8));
byte[] addr_idx = toRegAddrKey(registration.getSocketAddress());
j.set(addr_idx, registration.getEndpoint().getBytes(UTF_8));

if (old != null) {
Registration oldRegistration = deserializeReg(old);
// remove old secondary index
if (registration.getId() != oldRegistration.getId())
j.del(toRegIdKey(oldRegistration.getId()));
if (!oldRegistration.getSocketAddress().equals(registration.getSocketAddress())) {
removeAddrIndex(j, oldRegistration);
}
// remove old observation
Collection<Observation> obsRemoved = unsafeRemoveAllObservations(j, oldRegistration.getId());

Expand Down Expand Up @@ -189,6 +195,15 @@ public UpdatedRegistration updateRegistration(RegistrationUpdate update) {
// store the new client
j.set(toEndpointKey(updatedRegistration.getEndpoint()), serializeReg(updatedRegistration));

// Update secondary index :
// If registration is already associated to this address we don't care as we only want to keep the most
// recent binding.
byte[] addr_idx = toRegAddrKey(updatedRegistration.getSocketAddress());
j.set(addr_idx, updatedRegistration.getEndpoint().getBytes(UTF_8));
if (!r.getSocketAddress().equals(updatedRegistration.getSocketAddress())) {
removeAddrIndex(j, r);
}

return new UpdatedRegistration(r, updatedRegistration);

} finally {
Expand Down Expand Up @@ -218,14 +233,18 @@ public Registration getRegistrationByEndpoint(String endpoint) {

@Override
public Registration getRegistrationByAdress(InetSocketAddress address) {
// TODO we should create an index instead of iterate all over the collection
for (Iterator<Registration> iterator = getAllRegistrations(); iterator.hasNext();) {
Registration r = iterator.next();
if (address.getPort() == r.getPort() && address.getAddress().equals(r.getAddress())) {
return r;
Validate.notNull(address);
try (Jedis j = pool.getResource()) {
byte[] ep = j.get(toRegAddrKey(address));
if (ep == null) {
return null;
}
byte[] data = j.get(toEndpointKey(ep));
if (data == null) {
return null;
}
return deserializeReg(data);
}
return null;
}

@Override
Expand Down Expand Up @@ -327,6 +346,7 @@ private Deregistration removeRegistration(Jedis j, String registrationId, boolea
if (nbRemoved > 0) {
j.del(toEndpointKey(r.getEndpoint()));
Collection<Observation> obsRemoved = unsafeRemoveAllObservations(j, r.getId());
removeAddrIndex(j, r);
return new Deregistration(r, obsRemoved);
}
}
Expand All @@ -336,10 +356,22 @@ private Deregistration removeRegistration(Jedis j, String registrationId, boolea
}
}

private void removeAddrIndex(Jedis j, Registration registration) {
byte[] regAddrKey = toRegAddrKey(registration.getSocketAddress());
byte[] epFromAddr = j.get(regAddrKey);
if (Arrays.equals(epFromAddr, registration.getEndpoint().getBytes(UTF_8))) {
j.del(regAddrKey);
}
}

private byte[] toRegIdKey(String registrationId) {
return toKey(REG_EP_REGID_IDX, registrationId);
}

private byte[] toRegAddrKey(InetSocketAddress addr) {
return toKey(REG_EP_ADDR_IDX, addr.getAddress().toString() + ":" + addr.getPort());
}

private byte[] toEndpointKey(String endpoint) {
return toKey(REG_EP, endpoint);
}
Expand Down Expand Up @@ -485,7 +517,7 @@ public org.eclipse.californium.core.observe.Observation put(Token token, org.ecl
private org.eclipse.californium.core.observe.Observation add(Token token, org.eclipse.californium.core.observe.Observation obs, boolean ifAbsent) {
String endpoint = ObserveUtil.validateCoapObservation(obs);
org.eclipse.californium.core.observe.Observation previousObservation = null;

try (Jedis j = pool.getResource()) {
byte[] lockValue = null;
byte[] lockKey = toKey(LOCK_EP, endpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,15 @@ public Identity getIdentity() {
return identity;
}

/**
* Gets the client's network socket address.
*
* @return the source address from the client's most recent CoAP message.
*/
public InetSocketAddress getSocketAddress() {
return identity.getPeerAddress();
}

/**
* Gets the client's network address.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public interface RegistrationStore {
Registration getRegistrationByEndpoint(String endpoint);

/**
* Get the registration by socket address.
* Get the registration by socket address. If there are 2 Registrations linked to the same address, the most recent
* one should be returned. Generally this happened when devices are behind NAT and so address could be reused.
*
* @param address of the client registered.
* @return the registration or null if there is no client registered with this socket address.
Expand Down

0 comments on commit 947e801

Please sign in to comment.