From 5540d616727aef5ff89826838ee2cf83daaefb76 Mon Sep 17 00:00:00 2001 From: Louis Jacomet Date: Thu, 9 Feb 2017 11:12:09 +0100 Subject: [PATCH] :bug: #1814 StateRepository data format change The StateRepository provided map for a clustered cache cannot store any Serializable type as otherwise the server will fail to deserialize classes not on its classpath. So keys and values are encoded to preserve usage flexibility while providing a best effort on equality contract. --- .../service/ClusteredStateRepository.java | 2 +- .../service/ConcurrentClusteredMap.java | 33 +++++- .../client/internal/service/ValueCodec.java | 27 +++++ .../internal/service/ValueCodecFactory.java | 105 ++++++++++++++++++ ...usteredStateRepositoryReplicationTest.java | 66 +++++++++++ .../common/internal/store/ValueWrapper.java | 56 ++++++++++ 6 files changed, 283 insertions(+), 6 deletions(-) create mode 100644 clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ValueCodec.java create mode 100644 clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ValueCodecFactory.java create mode 100644 clustered/common/src/main/java/org/ehcache/clustered/common/internal/store/ValueWrapper.java diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteredStateRepository.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteredStateRepository.java index 06b20fcb92..c80d91d240 100644 --- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteredStateRepository.java +++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteredStateRepository.java @@ -41,6 +41,6 @@ class ClusteredStateRepository implements StateRepository { @Override public ConcurrentMap getPersistentConcurrentMap(String name, Class keyClass, Class valueClass) { - return new ConcurrentClusteredMap(clusterCacheIdentifier.getId(), composedId + "-" + name, clientEntity); + return new ConcurrentClusteredMap(clusterCacheIdentifier.getId(), composedId + "-" + name, clientEntity, keyClass, valueClass); } } diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ConcurrentClusteredMap.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ConcurrentClusteredMap.java index d8548549ae..3d1f96569c 100644 --- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ConcurrentClusteredMap.java +++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ConcurrentClusteredMap.java @@ -22,18 +22,28 @@ import org.ehcache.clustered.common.internal.messages.StateRepositoryMessageFactory; import org.ehcache.clustered.common.internal.messages.StateRepositoryOpMessage; +import java.util.AbstractMap; import java.util.Collection; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeoutException; +import static org.ehcache.clustered.client.internal.service.ValueCodecFactory.getCodecForClass; + public class ConcurrentClusteredMap implements ConcurrentMap { private final StateRepositoryMessageFactory messageFactory; private final EhcacheClientEntity entity; - - public ConcurrentClusteredMap(final String cacheId, final String mapId, final EhcacheClientEntity entity) { + private final Class keyClass; + private final ValueCodec keyCodec; + private final ValueCodec valueCodec; + + public ConcurrentClusteredMap(final String cacheId, final String mapId, final EhcacheClientEntity entity, Class keyClass, Class valueClass) { + this.keyClass = keyClass; + this.keyCodec = getCodecForClass(keyClass); + this.valueCodec = getCodecForClass(valueClass); this.messageFactory = new StateRepositoryMessageFactory(cacheId, mapId); this.entity = entity; } @@ -60,7 +70,12 @@ public boolean containsValue(final Object value) { @Override public V get(final Object key) { - return (V) getResponse(messageFactory.getMessage(key)); + if (!keyClass.isAssignableFrom(key.getClass())) { + return null; + } + @SuppressWarnings("unchecked") + Object response = getResponse(messageFactory.getMessage(keyCodec.encode((K) key))); + return valueCodec.decode(response); } private Object getResponse(StateRepositoryOpMessage message) { @@ -106,12 +121,20 @@ public Collection values() { @Override public Set> entrySet() { - return (Set>) getResponse(messageFactory.entrySetMessage()); + @SuppressWarnings("unchecked") + Set> response = (Set>) getResponse(messageFactory.entrySetMessage()); + Set> entries = new HashSet>(); + for (Entry objectEntry : response) { + entries.add(new AbstractMap.SimpleEntry(keyCodec.decode(objectEntry.getKey()), + valueCodec.decode(objectEntry.getValue()))); + } + return entries; } @Override public V putIfAbsent(final K key, final V value) { - return (V) getResponse(messageFactory.putIfAbsentMessage(key, value)); + Object response = getResponse(messageFactory.putIfAbsentMessage(keyCodec.encode(key), valueCodec.encode(value))); + return valueCodec.decode(response); } @Override diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ValueCodec.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ValueCodec.java new file mode 100644 index 0000000000..58440889ea --- /dev/null +++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ValueCodec.java @@ -0,0 +1,27 @@ +/* + * Copyright Terracotta, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.ehcache.clustered.client.internal.service; + +/** + * ValueCodec + */ +interface ValueCodec { + + Object encode(T input); + + T decode(Object input); +} diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ValueCodecFactory.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ValueCodecFactory.java new file mode 100644 index 0000000000..a19450e27d --- /dev/null +++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ValueCodecFactory.java @@ -0,0 +1,105 @@ +/* + * Copyright Terracotta, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.ehcache.clustered.client.internal.service; + +import org.ehcache.clustered.common.internal.store.ValueWrapper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +/** + * ValueCodecFactory + */ +class ValueCodecFactory { + static ValueCodec getCodecForClass(Class clazz) { + if (!Serializable.class.isAssignableFrom(clazz)) { + throw new IllegalArgumentException("The provided type is invalid as it is not Serializable " + clazz); + } + if (Integer.class.equals(clazz) || Long.class.equals(clazz) + || Float.class.equals(clazz) || Double.class.equals(clazz) + || Byte.class.equals(clazz) || Character.class.equals(clazz) + || clazz.isPrimitive() || String.class.equals(clazz)) { + return new IdentityCodec(); + } else { + return new SerializationWrapperCodec(); + } + } + + private static class IdentityCodec implements ValueCodec { + @Override + public Object encode(T input) { + return input; + } + + @Override + @SuppressWarnings("unchecked") + public T decode(Object input) { + return (T) input; + } + } + + private static class SerializationWrapperCodec implements ValueCodec { + @Override + public Object encode(T input) { + if (input == null) { + return null; + } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + ObjectOutputStream oos = new ObjectOutputStream(baos); + try { + oos.writeObject(input); + } catch(IOException e) { + throw new RuntimeException("Object cannot be serialized", e); + } finally { + oos.close(); + } + } catch(IOException e) { + // ignore + } + return new ValueWrapper(input.hashCode(), baos.toByteArray()); + } + + @Override + public T decode(Object input) { + if (input == null) { + return null; + } + ValueWrapper data = (ValueWrapper) input; + ByteArrayInputStream bais = new ByteArrayInputStream(data.getValue()); + try { + ObjectInputStream ois = new ObjectInputStream(bais); + try { + @SuppressWarnings("unchecked") + T result = (T) ois.readObject(); + return result; + } catch (ClassNotFoundException e) { + throw new RuntimeException("Could not load class", e); + } finally { + ois.close(); + } + } catch(IOException e) { + // ignore + } + throw new AssertionError("Cannot reach here!"); + } + } +} diff --git a/clustered/client/src/test/java/org/ehcache/clustered/client/internal/service/ClusteredStateRepositoryReplicationTest.java b/clustered/client/src/test/java/org/ehcache/clustered/client/internal/service/ClusteredStateRepositoryReplicationTest.java index 01327283b7..a19f504036 100644 --- a/clustered/client/src/test/java/org/ehcache/clustered/client/internal/service/ClusteredStateRepositoryReplicationTest.java +++ b/clustered/client/src/test/java/org/ehcache/clustered/client/internal/service/ClusteredStateRepositoryReplicationTest.java @@ -35,11 +35,13 @@ import org.terracotta.passthrough.PassthroughServer; import org.terracotta.passthrough.PassthroughTestHelpers; +import java.io.Serializable; import java.lang.reflect.Field; import java.net.URI; import java.util.concurrent.ConcurrentMap; import static org.ehcache.clustered.client.internal.UnitTestConnectionService.getOffheapResourcesType; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; @@ -115,10 +117,74 @@ public Class getServiceType() { service.stop(); } + @Test + public void testClusteredStateRepositoryReplicationWithSerializableKV() throws Exception { + ClusteringServiceConfiguration configuration = + ClusteringServiceConfigurationBuilder.cluster(URI.create(STRIPE_URI)) + .autoCreate() + .build(); + + ClusteringService service = new ClusteringServiceFactory().create(configuration); + + service.start(null); + + EhcacheClientEntity clientEntity = getEntity(service); + + ClusteredStateRepository stateRepository = new ClusteredStateRepository(new ClusteringService.ClusteredCacheIdentifier() { + @Override + public String getId() { + return "testStateRepo"; + } + + @Override + public Class getServiceType() { + return ClusteringService.class; + } + }, "test", clientEntity); + + ConcurrentMap testMap = stateRepository.getPersistentConcurrentMap("testMap", TestVal.class, TestVal.class); + testMap.putIfAbsent(new TestVal("One"), new TestVal("One")); + testMap.putIfAbsent(new TestVal("Two"), new TestVal("Two")); + + clusterControl.terminateActive(); + clusterControl.waitForActive(); + + assertThat(testMap.get(new TestVal("One")), is(new TestVal("One"))); + assertThat(testMap.get(new TestVal("Two")), is(new TestVal("Two"))); + + assertThat(testMap.entrySet(), hasSize(2)); + + service.stop(); + } + private static EhcacheClientEntity getEntity(ClusteringService clusteringService) throws NoSuchFieldException, IllegalAccessException { Field entity = clusteringService.getClass().getDeclaredField("entity"); entity.setAccessible(true); return (EhcacheClientEntity)entity.get(clusteringService); } + private static class TestVal implements Serializable { + final String val; + + + private TestVal(String val) { + this.val = val; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TestVal testVal = (TestVal) o; + + return val != null ? val.equals(testVal.val) : testVal.val == null; + } + + @Override + public int hashCode() { + return val != null ? val.hashCode() : 0; + } + } + } diff --git a/clustered/common/src/main/java/org/ehcache/clustered/common/internal/store/ValueWrapper.java b/clustered/common/src/main/java/org/ehcache/clustered/common/internal/store/ValueWrapper.java new file mode 100644 index 0000000000..87fb91bf3d --- /dev/null +++ b/clustered/common/src/main/java/org/ehcache/clustered/common/internal/store/ValueWrapper.java @@ -0,0 +1,56 @@ +/* + * Copyright Terracotta, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.ehcache.clustered.common.internal.store; + +import java.io.Serializable; +import java.util.Arrays; + +/** + * ValueWrapper + */ +public class ValueWrapper implements Serializable { + + private static final long serialVersionUID = -4794738044295644587L; + + private final int hashCode; + private final byte[] value; + + public ValueWrapper(int hashCode, byte[] value) { + this.hashCode = hashCode; + this.value = value; + } + + public byte[] getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ValueWrapper that = (ValueWrapper) o; + + if (hashCode != that.hashCode) return false; + return Arrays.equals(value, that.value); + } + + @Override + public int hashCode() { + return hashCode; + } +}