Skip to content

Commit

Permalink
Merge pull request #1838 from ljacomet/issue-1814-1-merge
Browse files Browse the repository at this point in the history
[release/3.2] Issue #1814 Passive sync of state repository data
  • Loading branch information
albinsuresh authored Feb 10, 2017
2 parents c5ec770 + ea0dfef commit d03e9d3
Show file tree
Hide file tree
Showing 15 changed files with 456 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,39 @@
import org.ehcache.clustered.common.internal.messages.StateRepositoryOpMessage;
import org.ehcache.spi.persistence.StateHolder;

import java.util.AbstractMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;

import static org.ehcache.clustered.client.internal.service.ValueCodecFactory.getCodecForClass;

public class ClusteredStateHolder<K, V> implements StateHolder<K, V> {

private final StateRepositoryMessageFactory messageFactory;
private final EhcacheClientEntity entity;
private final Class<K> keyClass;
private final ValueCodec<K> keyCodec;
private final ValueCodec<V> valueCodec;

public ClusteredStateHolder(final String cacheId, final String mapId, final EhcacheClientEntity entity) {
public ClusteredStateHolder(final String cacheId, final String mapId, final EhcacheClientEntity entity, Class<K> keyClass, Class<V> valueClass) {
this.keyClass = keyClass;
this.keyCodec = getCodecForClass(keyClass);
this.valueCodec = getCodecForClass(valueClass);
this.messageFactory = new StateRepositoryMessageFactory(cacheId, mapId, entity.getClientId());
this.entity = entity;
}

@Override
@SuppressWarnings("unchecked")
public V get(final Object key) {
return (V) getResponse(messageFactory.getMessage(key));
if (!keyClass.isAssignableFrom(key.getClass())) {
return null;
}
@SuppressWarnings("unchecked")
Object response = getResponse(messageFactory.getMessage(keyCodec.encode((K) key)));
return valueCodec.decode(response);
}

private Object getResponse(StateRepositoryOpMessage message) {
Expand All @@ -57,13 +72,21 @@ private Object getResponse(StateRepositoryOpMessage message) {
@Override
@SuppressWarnings("unchecked")
public Set<Map.Entry<K, V>> entrySet() {
return (Set<Map.Entry<K, V>>) getResponse(messageFactory.entrySetMessage());
@SuppressWarnings("unchecked")
Set<Map.Entry<Object, Object>> response = (Set<Map.Entry<Object, Object>>) getResponse(messageFactory.entrySetMessage());
Set<Map.Entry<K, V>> entries = new HashSet<Map.Entry<K, V>>();
for (Map.Entry<Object, Object> objectEntry : response) {
entries.add(new AbstractMap.SimpleEntry<K, V>(keyCodec.decode(objectEntry.getKey()),
valueCodec.decode(objectEntry.getValue())));
}
return entries;
}

@Override
@SuppressWarnings("unchecked")
public V putIfAbsent(final K key, final V value) {
return (V) getResponse(messageFactory.putIfAbsentMessage(key, value));
Object response = getResponse(messageFactory.putIfAbsentMessage(keyCodec.encode(key), valueCodec.encode(value)));
return valueCodec.decode(response);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ class ClusteredStateRepository implements StateRepository {

@Override
public <K extends Serializable, V extends Serializable> StateHolder<K, V> getPersistentStateHolder(String name, Class<K> keyClass, Class<V> valueClass) {
return new ClusteredStateHolder<K, V>(clusterCacheIdentifier.getId(), composedId + "-" + name, clientEntity);
return new ClusteredStateHolder<K, V>(clusterCacheIdentifier.getId(), composedId + "-" + name, clientEntity, keyClass, valueClass);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Terracotta, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.ehcache.clustered.client.internal.service;

/**
* ValueCodec
*/
interface ValueCodec<T> {

Object encode(T input);

T decode(Object input);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright Terracotta, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.ehcache.clustered.client.internal.service;

import org.ehcache.clustered.common.internal.store.ValueWrapper;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/**
* ValueCodecFactory
*/
class ValueCodecFactory {
static <T> ValueCodec<T> getCodecForClass(Class<T> clazz) {
if (!Serializable.class.isAssignableFrom(clazz)) {
throw new IllegalArgumentException("The provided type is invalid as it is not Serializable " + clazz);
}
if (Integer.class.equals(clazz) || Long.class.equals(clazz)
|| Float.class.equals(clazz) || Double.class.equals(clazz)
|| Byte.class.equals(clazz) || Character.class.equals(clazz)
|| clazz.isPrimitive() || String.class.equals(clazz)) {
return new IdentityCodec<T>();
} else {
return new SerializationWrapperCodec<T>();
}
}

private static class IdentityCodec<T> implements ValueCodec<T> {
@Override
public Object encode(T input) {
return input;
}

@Override
@SuppressWarnings("unchecked")
public T decode(Object input) {
return (T) input;
}
}

private static class SerializationWrapperCodec<T> implements ValueCodec<T> {
@Override
public Object encode(T input) {
if (input == null) {
return null;
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
ObjectOutputStream oos = new ObjectOutputStream(baos);
try {
oos.writeObject(input);
} catch(IOException e) {
throw new RuntimeException("Object cannot be serialized", e);
} finally {
oos.close();
}
} catch(IOException e) {
// ignore
}
return new ValueWrapper(input.hashCode(), baos.toByteArray());
}

@Override
public T decode(Object input) {
if (input == null) {
return null;
}
ValueWrapper data = (ValueWrapper) input;
ByteArrayInputStream bais = new ByteArrayInputStream(data.getValue());
try {
ObjectInputStream ois = new ObjectInputStream(bais);
try {
@SuppressWarnings("unchecked")
T result = (T) ois.readObject();
return result;
} catch (ClassNotFoundException e) {
throw new RuntimeException("Could not load class", e);
} finally {
ois.close();
}
} catch(IOException e) {
// ignore
}
throw new AssertionError("Cannot reach here!");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@
import org.terracotta.offheapresource.OffHeapResourcesProvider;
import org.terracotta.offheapresource.config.MemoryUnit;
import org.terracotta.passthrough.PassthroughClusterControl;
import org.terracotta.passthrough.PassthroughServer;
import org.terracotta.passthrough.PassthroughTestHelpers;

import java.io.Serializable;
import java.lang.reflect.Field;
import java.net.URI;

import static org.ehcache.clustered.client.internal.UnitTestConnectionService.getOffheapResourcesType;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

Expand All @@ -51,18 +52,15 @@ public class ClusteredStateRepositoryReplicationTest {
@Before
public void setUp() throws Exception {
this.clusterControl = PassthroughTestHelpers.createActivePassive(STRIPENAME,
new PassthroughTestHelpers.ServerInitializer() {
@Override
public void registerServicesForServer(PassthroughServer server) {
server.registerServerEntityService(new EhcacheServerEntityService());
server.registerClientEntityService(new EhcacheClientEntityService());
server.registerServerEntityService(new VoltronReadWriteLockServerEntityService());
server.registerClientEntityService(new VoltronReadWriteLockEntityClientService());
server.registerExtendedConfiguration(new OffHeapResourcesProvider(getOffheapResourcesType("test", 32, MemoryUnit.MB)));

UnitTestConnectionService.addServerToStripe(STRIPENAME, server);
}
}
server -> {
server.registerServerEntityService(new EhcacheServerEntityService());
server.registerClientEntityService(new EhcacheClientEntityService());
server.registerServerEntityService(new VoltronReadWriteLockServerEntityService());
server.registerClientEntityService(new VoltronReadWriteLockEntityClientService());
server.registerExtendedConfiguration(new OffHeapResourcesProvider(getOffheapResourcesType("test", 32, MemoryUnit.MB)));

UnitTestConnectionService.addServerToStripe(STRIPENAME, server);
}
);

clusterControl.waitForActive();
Expand Down Expand Up @@ -113,10 +111,74 @@ public Class<ClusteringService> getServiceType() {
service.stop();
}

@Test
public void testClusteredStateRepositoryReplicationWithSerializableKV() throws Exception {
ClusteringServiceConfiguration configuration =
ClusteringServiceConfigurationBuilder.cluster(URI.create(STRIPE_URI))
.autoCreate()
.build();

ClusteringService service = new ClusteringServiceFactory().create(configuration);

service.start(null);

EhcacheClientEntity clientEntity = getEntity(service);

ClusteredStateRepository stateRepository = new ClusteredStateRepository(new ClusteringService.ClusteredCacheIdentifier() {
@Override
public String getId() {
return "testStateRepo";
}

@Override
public Class<ClusteringService> getServiceType() {
return ClusteringService.class;
}
}, "test", clientEntity);

StateHolder<TestVal, TestVal> testMap = stateRepository.getPersistentStateHolder("testMap", TestVal.class, TestVal.class);
testMap.putIfAbsent(new TestVal("One"), new TestVal("One"));
testMap.putIfAbsent(new TestVal("Two"), new TestVal("Two"));

clusterControl.terminateActive();
clusterControl.waitForActive();

assertThat(testMap.get(new TestVal("One")), is(new TestVal("One")));
assertThat(testMap.get(new TestVal("Two")), is(new TestVal("Two")));

assertThat(testMap.entrySet(), hasSize(2));

service.stop();
}

private static EhcacheClientEntity getEntity(ClusteringService clusteringService) throws NoSuchFieldException, IllegalAccessException {
Field entity = clusteringService.getClass().getDeclaredField("entity");
entity.setAccessible(true);
return (EhcacheClientEntity)entity.get(clusteringService);
}

private static class TestVal implements Serializable {
final String val;


private TestVal(String val) {
this.val = val;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

TestVal testVal = (TestVal) o;

return val != null ? val.equals(testVal.val) : testVal.val == null;
}

@Override
public int hashCode() {
return val != null ? val.hashCode() : 0;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Terracotta, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.ehcache.clustered.common.internal.store;

import java.io.Serializable;
import java.util.Arrays;

/**
* ValueWrapper
*/
public class ValueWrapper implements Serializable {

private static final long serialVersionUID = -4794738044295644587L;

private final int hashCode;
private final byte[] value;

public ValueWrapper(int hashCode, byte[] value) {
this.hashCode = hashCode;
this.value = value;
}

public byte[] getValue() {
return value;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ValueWrapper that = (ValueWrapper) o;

if (hashCode != that.hashCode) return false;
return Arrays.equals(value, that.value);
}

@Override
public int hashCode() {
return hashCode;
}
}
Loading

0 comments on commit d03e9d3

Please sign in to comment.