Skip to content

Commit

Permalink
🐛 ehcache#1814 StateRepository data format change
Browse files Browse the repository at this point in the history
The StateRepository provided map for a clustered cache cannot store
any Serializable type as otherwise the server will fail to
deserialize classes not on its classpath. So keys and values are
encoded to preserve usage flexibility while providing a best effort on
equality contract.
  • Loading branch information
ljacomet committed Feb 9, 2017
1 parent 72f41aa commit f153f20
Show file tree
Hide file tree
Showing 6 changed files with 282 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ class ClusteredStateRepository implements StateRepository {

@Override
public <K extends Serializable, V extends Serializable> ConcurrentMap<K, V> getPersistentConcurrentMap(String name, Class<K> keyClass, Class<V> valueClass) {
return new ConcurrentClusteredMap<K, V>(clusterCacheIdentifier.getId(), composedId + "-" + name, clientEntity);
return new ConcurrentClusteredMap<K, V>(clusterCacheIdentifier.getId(), composedId + "-" + name, clientEntity, keyClass, valueClass);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,28 @@
import org.ehcache.clustered.common.internal.messages.StateRepositoryMessageFactory;
import org.ehcache.clustered.common.internal.messages.StateRepositoryOpMessage;

import java.util.AbstractMap;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;

import static org.ehcache.clustered.client.internal.service.ValueCodecFactory.getCodecForClass;

public class ConcurrentClusteredMap<K, V> implements ConcurrentMap<K, V> {

private final StateRepositoryMessageFactory messageFactory;
private final EhcacheClientEntity entity;

public ConcurrentClusteredMap(final String cacheId, final String mapId, final EhcacheClientEntity entity) {
private final Class<K> keyClass;
private final ValueCodec<K> keyCodec;
private final ValueCodec<V> valueCodec;

public ConcurrentClusteredMap(final String cacheId, final String mapId, final EhcacheClientEntity entity, Class<K> keyClass, Class<V> valueClass) {
this.keyClass = keyClass;
this.keyCodec = getCodecForClass(keyClass);
this.valueCodec = getCodecForClass(valueClass);
this.messageFactory = new StateRepositoryMessageFactory(cacheId, mapId);
this.entity = entity;
}
Expand All @@ -60,7 +70,12 @@ public boolean containsValue(final Object value) {

@Override
public V get(final Object key) {
return (V) getResponse(messageFactory.getMessage(key));
if (!keyClass.isAssignableFrom(key.getClass())) {
return null;
}
@SuppressWarnings("unchecked")
Object response = getResponse(messageFactory.getMessage(keyCodec.encode((K) key)));
return valueCodec.decode(response);
}

private Object getResponse(StateRepositoryOpMessage message) {
Expand Down Expand Up @@ -106,12 +121,19 @@ public Collection<V> values() {

@Override
public Set<Entry<K, V>> entrySet() {
return (Set<Entry<K, V>>) getResponse(messageFactory.entrySetMessage());
@SuppressWarnings("unchecked")
Set<Entry<Object, Object>> response = (Set<Entry<Object, Object>>) getResponse(messageFactory.entrySetMessage());
Set<Entry<K, V>> entries = new HashSet<Entry<K, V>>();
for (Entry<Object, Object> objectEntry : response) {
entries.add(new AbstractMap.SimpleEntry<K, V>(keyCodec.decode(objectEntry.getKey()),
valueCodec.decode(objectEntry.getValue())));
}
return entries;
}

@Override
public V putIfAbsent(final K key, final V value) {
return (V) getResponse(messageFactory.putIfAbsentMessage(key, value));
return (V) getResponse(messageFactory.putIfAbsentMessage(keyCodec.encode(key), valueCodec.encode(value)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Terracotta, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.ehcache.clustered.client.internal.service;

/**
* ValueCodec
*/
interface ValueCodec<T> {

Object encode(T input);

T decode(Object input);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright Terracotta, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.ehcache.clustered.client.internal.service;

import org.ehcache.clustered.common.internal.store.ValueWrapper;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/**
* ValueCodecFactory
*/
class ValueCodecFactory {
static <T> ValueCodec<T> getCodecForClass(Class<T> clazz) {
if (!Serializable.class.isAssignableFrom(clazz)) {
throw new IllegalArgumentException("The provided type is invalid as it is not Serializable " + clazz);
}
if (Integer.class.equals(clazz) || Long.class.equals(clazz)
|| Float.class.equals(clazz) || Double.class.equals(clazz)
|| Byte.class.equals(clazz) || Character.class.equals(clazz)
|| clazz.isPrimitive() || String.class.equals(clazz)) {
return new IdentityCodec<T>();
} else {
return new SerializationWrapperCodec<T>();
}
}

private static class IdentityCodec<T> implements ValueCodec<T> {
@Override
public Object encode(T input) {
return input;
}

@Override
@SuppressWarnings("unchcked")
public T decode(Object input) {
return (T) input;
}
}

private static class SerializationWrapperCodec<T> implements ValueCodec<T> {
@Override
public Object encode(T input) {
if (input == null) {
return null;
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
ObjectOutputStream oos = new ObjectOutputStream(baos);
try {
oos.writeObject(input);
} catch(IOException e) {
throw new RuntimeException("Object cannot be serialized", e);
} finally {
oos.close();
}
} catch(IOException e) {
// ignore
}
return new ValueWrapper(input.hashCode(), baos.toByteArray());
}

@Override
public T decode(Object input) {
if (input == null) {
return null;
}
ValueWrapper data = (ValueWrapper) input;
ByteArrayInputStream bais = new ByteArrayInputStream(data.getValue());
try {
ObjectInputStream ois = new ObjectInputStream(bais);
try {
@SuppressWarnings("unchecked")
T result = (T) ois.readObject();
return result;
} catch (ClassNotFoundException e) {
throw new RuntimeException("Could not load class", e);
} finally {
ois.close();
}
} catch(IOException e) {
// ignore
}
throw new AssertionError("Cannot reach here!");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@
import org.terracotta.passthrough.PassthroughServer;
import org.terracotta.passthrough.PassthroughTestHelpers;

import java.io.Serializable;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.concurrent.ConcurrentMap;

import static org.ehcache.clustered.client.internal.UnitTestConnectionService.getOffheapResourcesType;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

Expand Down Expand Up @@ -115,10 +117,74 @@ public Class<ClusteringService> getServiceType() {
service.stop();
}

@Test
public void testClusteredStateRepositoryReplicationWithSerializableKV() throws Exception {
ClusteringServiceConfiguration configuration =
ClusteringServiceConfigurationBuilder.cluster(URI.create(STRIPE_URI))
.autoCreate()
.build();

ClusteringService service = new ClusteringServiceFactory().create(configuration);

service.start(null);

EhcacheClientEntity clientEntity = getEntity(service);

ClusteredStateRepository stateRepository = new ClusteredStateRepository(new ClusteringService.ClusteredCacheIdentifier() {
@Override
public String getId() {
return "testStateRepo";
}

@Override
public Class<ClusteringService> getServiceType() {
return ClusteringService.class;
}
}, "test", clientEntity);

ConcurrentMap<TestVal, TestVal> testMap = stateRepository.getPersistentConcurrentMap("testMap", TestVal.class, TestVal.class);
testMap.putIfAbsent(new TestVal("One"), new TestVal("One"));
testMap.putIfAbsent(new TestVal("Two"), new TestVal("Two"));

clusterControl.terminateActive();
clusterControl.waitForActive();

assertThat(testMap.get(new TestVal("One")), is(new TestVal("One")));
assertThat(testMap.get(new TestVal("Two")), is(new TestVal("Two")));

assertThat(testMap.entrySet(), hasSize(2));

service.stop();
}

private static EhcacheClientEntity getEntity(ClusteringService clusteringService) throws NoSuchFieldException, IllegalAccessException {
Field entity = clusteringService.getClass().getDeclaredField("entity");
entity.setAccessible(true);
return (EhcacheClientEntity)entity.get(clusteringService);
}

private static class TestVal implements Serializable {
final String val;


private TestVal(String val) {
this.val = val;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

TestVal testVal = (TestVal) o;

return val != null ? val.equals(testVal.val) : testVal.val == null;
}

@Override
public int hashCode() {
return val != null ? val.hashCode() : 0;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Terracotta, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.ehcache.clustered.common.internal.store;

import java.io.Serializable;
import java.util.Arrays;

/**
* ValueWrapper
*/
public class ValueWrapper implements Serializable {

private static final long serialVersionUID = -4794738044295644587L;

private final int hashCode;
private final byte[] value;

public ValueWrapper(int hashCode, byte[] value) {
this.hashCode = hashCode;
this.value = value;
}

public byte[] getValue() {
return value;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ValueWrapper that = (ValueWrapper) o;

if (hashCode != that.hashCode) return false;
return Arrays.equals(value, that.value);
}

@Override
public int hashCode() {
return hashCode;
}
}

0 comments on commit f153f20

Please sign in to comment.