Skip to content

Commit

Permalink
Merge pull request #1836 from ljacomet/issue-1814-1
Browse files Browse the repository at this point in the history
[release/3.1] Issue #1814 StateRepository data format change
  • Loading branch information
albinsuresh authored Feb 10, 2017
2 parents 23a15dc + 5540d61 commit c61b5e3
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ class ClusteredStateRepository implements StateRepository {

@Override
public <K extends Serializable, V extends Serializable> ConcurrentMap<K, V> getPersistentConcurrentMap(String name, Class<K> keyClass, Class<V> valueClass) {
return new ConcurrentClusteredMap<K, V>(clusterCacheIdentifier.getId(), composedId + "-" + name, clientEntity);
return new ConcurrentClusteredMap<K, V>(clusterCacheIdentifier.getId(), composedId + "-" + name, clientEntity, keyClass, valueClass);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,28 @@
import org.ehcache.clustered.common.internal.messages.StateRepositoryMessageFactory;
import org.ehcache.clustered.common.internal.messages.StateRepositoryOpMessage;

import java.util.AbstractMap;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;

import static org.ehcache.clustered.client.internal.service.ValueCodecFactory.getCodecForClass;

public class ConcurrentClusteredMap<K, V> implements ConcurrentMap<K, V> {

private final StateRepositoryMessageFactory messageFactory;
private final EhcacheClientEntity entity;

public ConcurrentClusteredMap(final String cacheId, final String mapId, final EhcacheClientEntity entity) {
private final Class<K> keyClass;
private final ValueCodec<K> keyCodec;
private final ValueCodec<V> valueCodec;

public ConcurrentClusteredMap(final String cacheId, final String mapId, final EhcacheClientEntity entity, Class<K> keyClass, Class<V> valueClass) {
this.keyClass = keyClass;
this.keyCodec = getCodecForClass(keyClass);
this.valueCodec = getCodecForClass(valueClass);
this.messageFactory = new StateRepositoryMessageFactory(cacheId, mapId);
this.entity = entity;
}
Expand All @@ -60,7 +70,12 @@ public boolean containsValue(final Object value) {

@Override
public V get(final Object key) {
return (V) getResponse(messageFactory.getMessage(key));
if (!keyClass.isAssignableFrom(key.getClass())) {
return null;
}
@SuppressWarnings("unchecked")
Object response = getResponse(messageFactory.getMessage(keyCodec.encode((K) key)));
return valueCodec.decode(response);
}

private Object getResponse(StateRepositoryOpMessage message) {
Expand Down Expand Up @@ -106,12 +121,20 @@ public Collection<V> values() {

@Override
public Set<Entry<K, V>> entrySet() {
return (Set<Entry<K, V>>) getResponse(messageFactory.entrySetMessage());
@SuppressWarnings("unchecked")
Set<Entry<Object, Object>> response = (Set<Entry<Object, Object>>) getResponse(messageFactory.entrySetMessage());
Set<Entry<K, V>> entries = new HashSet<Entry<K, V>>();
for (Entry<Object, Object> objectEntry : response) {
entries.add(new AbstractMap.SimpleEntry<K, V>(keyCodec.decode(objectEntry.getKey()),
valueCodec.decode(objectEntry.getValue())));
}
return entries;
}

@Override
public V putIfAbsent(final K key, final V value) {
return (V) getResponse(messageFactory.putIfAbsentMessage(key, value));
Object response = getResponse(messageFactory.putIfAbsentMessage(keyCodec.encode(key), valueCodec.encode(value)));
return valueCodec.decode(response);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Terracotta, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.ehcache.clustered.client.internal.service;

/**
* ValueCodec
*/
interface ValueCodec<T> {

Object encode(T input);

T decode(Object input);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright Terracotta, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.ehcache.clustered.client.internal.service;

import org.ehcache.clustered.common.internal.store.ValueWrapper;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/**
* ValueCodecFactory
*/
class ValueCodecFactory {
static <T> ValueCodec<T> getCodecForClass(Class<T> clazz) {
if (!Serializable.class.isAssignableFrom(clazz)) {
throw new IllegalArgumentException("The provided type is invalid as it is not Serializable " + clazz);
}
if (Integer.class.equals(clazz) || Long.class.equals(clazz)
|| Float.class.equals(clazz) || Double.class.equals(clazz)
|| Byte.class.equals(clazz) || Character.class.equals(clazz)
|| clazz.isPrimitive() || String.class.equals(clazz)) {
return new IdentityCodec<T>();
} else {
return new SerializationWrapperCodec<T>();
}
}

private static class IdentityCodec<T> implements ValueCodec<T> {
@Override
public Object encode(T input) {
return input;
}

@Override
@SuppressWarnings("unchecked")
public T decode(Object input) {
return (T) input;
}
}

private static class SerializationWrapperCodec<T> implements ValueCodec<T> {
@Override
public Object encode(T input) {
if (input == null) {
return null;
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
ObjectOutputStream oos = new ObjectOutputStream(baos);
try {
oos.writeObject(input);
} catch(IOException e) {
throw new RuntimeException("Object cannot be serialized", e);
} finally {
oos.close();
}
} catch(IOException e) {
// ignore
}
return new ValueWrapper(input.hashCode(), baos.toByteArray());
}

@Override
public T decode(Object input) {
if (input == null) {
return null;
}
ValueWrapper data = (ValueWrapper) input;
ByteArrayInputStream bais = new ByteArrayInputStream(data.getValue());
try {
ObjectInputStream ois = new ObjectInputStream(bais);
try {
@SuppressWarnings("unchecked")
T result = (T) ois.readObject();
return result;
} catch (ClassNotFoundException e) {
throw new RuntimeException("Could not load class", e);
} finally {
ois.close();
}
} catch(IOException e) {
// ignore
}
throw new AssertionError("Cannot reach here!");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@
import org.terracotta.passthrough.PassthroughServer;
import org.terracotta.passthrough.PassthroughTestHelpers;

import java.io.Serializable;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.concurrent.ConcurrentMap;

import static org.ehcache.clustered.client.internal.UnitTestConnectionService.getOffheapResourcesType;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

Expand Down Expand Up @@ -115,10 +117,74 @@ public Class<ClusteringService> getServiceType() {
service.stop();
}

@Test
public void testClusteredStateRepositoryReplicationWithSerializableKV() throws Exception {
ClusteringServiceConfiguration configuration =
ClusteringServiceConfigurationBuilder.cluster(URI.create(STRIPE_URI))
.autoCreate()
.build();

ClusteringService service = new ClusteringServiceFactory().create(configuration);

service.start(null);

EhcacheClientEntity clientEntity = getEntity(service);

ClusteredStateRepository stateRepository = new ClusteredStateRepository(new ClusteringService.ClusteredCacheIdentifier() {
@Override
public String getId() {
return "testStateRepo";
}

@Override
public Class<ClusteringService> getServiceType() {
return ClusteringService.class;
}
}, "test", clientEntity);

ConcurrentMap<TestVal, TestVal> testMap = stateRepository.getPersistentConcurrentMap("testMap", TestVal.class, TestVal.class);
testMap.putIfAbsent(new TestVal("One"), new TestVal("One"));
testMap.putIfAbsent(new TestVal("Two"), new TestVal("Two"));

clusterControl.terminateActive();
clusterControl.waitForActive();

assertThat(testMap.get(new TestVal("One")), is(new TestVal("One")));
assertThat(testMap.get(new TestVal("Two")), is(new TestVal("Two")));

assertThat(testMap.entrySet(), hasSize(2));

service.stop();
}

private static EhcacheClientEntity getEntity(ClusteringService clusteringService) throws NoSuchFieldException, IllegalAccessException {
Field entity = clusteringService.getClass().getDeclaredField("entity");
entity.setAccessible(true);
return (EhcacheClientEntity)entity.get(clusteringService);
}

private static class TestVal implements Serializable {
final String val;


private TestVal(String val) {
this.val = val;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

TestVal testVal = (TestVal) o;

return val != null ? val.equals(testVal.val) : testVal.val == null;
}

@Override
public int hashCode() {
return val != null ? val.hashCode() : 0;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Terracotta, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.ehcache.clustered.common.internal.store;

import java.io.Serializable;
import java.util.Arrays;

/**
* ValueWrapper
*/
public class ValueWrapper implements Serializable {

private static final long serialVersionUID = -4794738044295644587L;

private final int hashCode;
private final byte[] value;

public ValueWrapper(int hashCode, byte[] value) {
this.hashCode = hashCode;
this.value = value;
}

public byte[] getValue() {
return value;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ValueWrapper that = (ValueWrapper) o;

if (hashCode != that.hashCode) return false;
return Arrays.equals(value, that.value);
}

@Override
public int hashCode() {
return hashCode;
}
}

0 comments on commit c61b5e3

Please sign in to comment.