diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteredStateHolder.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteredStateHolder.java index 8e18d37626..d0693a7458 100644 --- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteredStateHolder.java +++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteredStateHolder.java @@ -23,16 +23,26 @@ import org.ehcache.clustered.common.internal.messages.StateRepositoryOpMessage; import org.ehcache.spi.persistence.StateHolder; +import java.util.AbstractMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeoutException; +import static org.ehcache.clustered.client.internal.service.ValueCodecFactory.getCodecForClass; + public class ClusteredStateHolder implements StateHolder { private final StateRepositoryMessageFactory messageFactory; private final EhcacheClientEntity entity; + private final Class keyClass; + private final ValueCodec keyCodec; + private final ValueCodec valueCodec; - public ClusteredStateHolder(final String cacheId, final String mapId, final EhcacheClientEntity entity) { + public ClusteredStateHolder(final String cacheId, final String mapId, final EhcacheClientEntity entity, Class keyClass, Class valueClass) { + this.keyClass = keyClass; + this.keyCodec = getCodecForClass(keyClass); + this.valueCodec = getCodecForClass(valueClass); this.messageFactory = new StateRepositoryMessageFactory(cacheId, mapId, entity.getClientId()); this.entity = entity; } @@ -40,7 +50,12 @@ public ClusteredStateHolder(final String cacheId, final String mapId, final Ehca @Override @SuppressWarnings("unchecked") public V get(final Object key) { - return (V) getResponse(messageFactory.getMessage(key)); + if (!keyClass.isAssignableFrom(key.getClass())) { + return null; + } + @SuppressWarnings("unchecked") + Object response = getResponse(messageFactory.getMessage(keyCodec.encode((K) key))); + return valueCodec.decode(response); } private Object getResponse(StateRepositoryOpMessage message) { @@ -57,13 +72,21 @@ private Object getResponse(StateRepositoryOpMessage message) { @Override @SuppressWarnings("unchecked") public Set> entrySet() { - return (Set>) getResponse(messageFactory.entrySetMessage()); + @SuppressWarnings("unchecked") + Set> response = (Set>) getResponse(messageFactory.entrySetMessage()); + Set> entries = new HashSet>(); + for (Map.Entry objectEntry : response) { + entries.add(new AbstractMap.SimpleEntry(keyCodec.decode(objectEntry.getKey()), + valueCodec.decode(objectEntry.getValue()))); + } + return entries; } @Override @SuppressWarnings("unchecked") public V putIfAbsent(final K key, final V value) { - return (V) getResponse(messageFactory.putIfAbsentMessage(key, value)); + Object response = getResponse(messageFactory.putIfAbsentMessage(keyCodec.encode(key), valueCodec.encode(value))); + return valueCodec.decode(response); } } diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteredStateRepository.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteredStateRepository.java index ca70c4cef3..d4205d028e 100644 --- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteredStateRepository.java +++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteredStateRepository.java @@ -40,6 +40,6 @@ class ClusteredStateRepository implements StateRepository { @Override public StateHolder getPersistentStateHolder(String name, Class keyClass, Class valueClass) { - return new ClusteredStateHolder(clusterCacheIdentifier.getId(), composedId + "-" + name, clientEntity); + return new ClusteredStateHolder(clusterCacheIdentifier.getId(), composedId + "-" + name, clientEntity, keyClass, valueClass); } } diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ValueCodec.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ValueCodec.java new file mode 100644 index 0000000000..58440889ea --- /dev/null +++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ValueCodec.java @@ -0,0 +1,27 @@ +/* + * Copyright Terracotta, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.ehcache.clustered.client.internal.service; + +/** + * ValueCodec + */ +interface ValueCodec { + + Object encode(T input); + + T decode(Object input); +} diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ValueCodecFactory.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ValueCodecFactory.java new file mode 100644 index 0000000000..a19450e27d --- /dev/null +++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ValueCodecFactory.java @@ -0,0 +1,105 @@ +/* + * Copyright Terracotta, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.ehcache.clustered.client.internal.service; + +import org.ehcache.clustered.common.internal.store.ValueWrapper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +/** + * ValueCodecFactory + */ +class ValueCodecFactory { + static ValueCodec getCodecForClass(Class clazz) { + if (!Serializable.class.isAssignableFrom(clazz)) { + throw new IllegalArgumentException("The provided type is invalid as it is not Serializable " + clazz); + } + if (Integer.class.equals(clazz) || Long.class.equals(clazz) + || Float.class.equals(clazz) || Double.class.equals(clazz) + || Byte.class.equals(clazz) || Character.class.equals(clazz) + || clazz.isPrimitive() || String.class.equals(clazz)) { + return new IdentityCodec(); + } else { + return new SerializationWrapperCodec(); + } + } + + private static class IdentityCodec implements ValueCodec { + @Override + public Object encode(T input) { + return input; + } + + @Override + @SuppressWarnings("unchecked") + public T decode(Object input) { + return (T) input; + } + } + + private static class SerializationWrapperCodec implements ValueCodec { + @Override + public Object encode(T input) { + if (input == null) { + return null; + } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + ObjectOutputStream oos = new ObjectOutputStream(baos); + try { + oos.writeObject(input); + } catch(IOException e) { + throw new RuntimeException("Object cannot be serialized", e); + } finally { + oos.close(); + } + } catch(IOException e) { + // ignore + } + return new ValueWrapper(input.hashCode(), baos.toByteArray()); + } + + @Override + public T decode(Object input) { + if (input == null) { + return null; + } + ValueWrapper data = (ValueWrapper) input; + ByteArrayInputStream bais = new ByteArrayInputStream(data.getValue()); + try { + ObjectInputStream ois = new ObjectInputStream(bais); + try { + @SuppressWarnings("unchecked") + T result = (T) ois.readObject(); + return result; + } catch (ClassNotFoundException e) { + throw new RuntimeException("Could not load class", e); + } finally { + ois.close(); + } + } catch(IOException e) { + // ignore + } + throw new AssertionError("Cannot reach here!"); + } + } +} diff --git a/clustered/client/src/test/java/org/ehcache/clustered/client/internal/service/ClusteredStateRepositoryReplicationTest.java b/clustered/client/src/test/java/org/ehcache/clustered/client/internal/service/ClusteredStateRepositoryReplicationTest.java index a13b0534e0..c44abf73a1 100644 --- a/clustered/client/src/test/java/org/ehcache/clustered/client/internal/service/ClusteredStateRepositoryReplicationTest.java +++ b/clustered/client/src/test/java/org/ehcache/clustered/client/internal/service/ClusteredStateRepositoryReplicationTest.java @@ -32,13 +32,14 @@ import org.terracotta.offheapresource.OffHeapResourcesProvider; import org.terracotta.offheapresource.config.MemoryUnit; import org.terracotta.passthrough.PassthroughClusterControl; -import org.terracotta.passthrough.PassthroughServer; import org.terracotta.passthrough.PassthroughTestHelpers; +import java.io.Serializable; import java.lang.reflect.Field; import java.net.URI; import static org.ehcache.clustered.client.internal.UnitTestConnectionService.getOffheapResourcesType; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; @@ -51,18 +52,15 @@ public class ClusteredStateRepositoryReplicationTest { @Before public void setUp() throws Exception { this.clusterControl = PassthroughTestHelpers.createActivePassive(STRIPENAME, - new PassthroughTestHelpers.ServerInitializer() { - @Override - public void registerServicesForServer(PassthroughServer server) { - server.registerServerEntityService(new EhcacheServerEntityService()); - server.registerClientEntityService(new EhcacheClientEntityService()); - server.registerServerEntityService(new VoltronReadWriteLockServerEntityService()); - server.registerClientEntityService(new VoltronReadWriteLockEntityClientService()); - server.registerExtendedConfiguration(new OffHeapResourcesProvider(getOffheapResourcesType("test", 32, MemoryUnit.MB))); - - UnitTestConnectionService.addServerToStripe(STRIPENAME, server); - } - } + server -> { + server.registerServerEntityService(new EhcacheServerEntityService()); + server.registerClientEntityService(new EhcacheClientEntityService()); + server.registerServerEntityService(new VoltronReadWriteLockServerEntityService()); + server.registerClientEntityService(new VoltronReadWriteLockEntityClientService()); + server.registerExtendedConfiguration(new OffHeapResourcesProvider(getOffheapResourcesType("test", 32, MemoryUnit.MB))); + + UnitTestConnectionService.addServerToStripe(STRIPENAME, server); + } ); clusterControl.waitForActive(); @@ -113,10 +111,74 @@ public Class getServiceType() { service.stop(); } + @Test + public void testClusteredStateRepositoryReplicationWithSerializableKV() throws Exception { + ClusteringServiceConfiguration configuration = + ClusteringServiceConfigurationBuilder.cluster(URI.create(STRIPE_URI)) + .autoCreate() + .build(); + + ClusteringService service = new ClusteringServiceFactory().create(configuration); + + service.start(null); + + EhcacheClientEntity clientEntity = getEntity(service); + + ClusteredStateRepository stateRepository = new ClusteredStateRepository(new ClusteringService.ClusteredCacheIdentifier() { + @Override + public String getId() { + return "testStateRepo"; + } + + @Override + public Class getServiceType() { + return ClusteringService.class; + } + }, "test", clientEntity); + + StateHolder testMap = stateRepository.getPersistentStateHolder("testMap", TestVal.class, TestVal.class); + testMap.putIfAbsent(new TestVal("One"), new TestVal("One")); + testMap.putIfAbsent(new TestVal("Two"), new TestVal("Two")); + + clusterControl.terminateActive(); + clusterControl.waitForActive(); + + assertThat(testMap.get(new TestVal("One")), is(new TestVal("One"))); + assertThat(testMap.get(new TestVal("Two")), is(new TestVal("Two"))); + + assertThat(testMap.entrySet(), hasSize(2)); + + service.stop(); + } + private static EhcacheClientEntity getEntity(ClusteringService clusteringService) throws NoSuchFieldException, IllegalAccessException { Field entity = clusteringService.getClass().getDeclaredField("entity"); entity.setAccessible(true); return (EhcacheClientEntity)entity.get(clusteringService); } + private static class TestVal implements Serializable { + final String val; + + + private TestVal(String val) { + this.val = val; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TestVal testVal = (TestVal) o; + + return val != null ? val.equals(testVal.val) : testVal.val == null; + } + + @Override + public int hashCode() { + return val != null ? val.hashCode() : 0; + } + } + } diff --git a/clustered/common/src/main/java/org/ehcache/clustered/common/internal/store/ValueWrapper.java b/clustered/common/src/main/java/org/ehcache/clustered/common/internal/store/ValueWrapper.java new file mode 100644 index 0000000000..87fb91bf3d --- /dev/null +++ b/clustered/common/src/main/java/org/ehcache/clustered/common/internal/store/ValueWrapper.java @@ -0,0 +1,56 @@ +/* + * Copyright Terracotta, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.ehcache.clustered.common.internal.store; + +import java.io.Serializable; +import java.util.Arrays; + +/** + * ValueWrapper + */ +public class ValueWrapper implements Serializable { + + private static final long serialVersionUID = -4794738044295644587L; + + private final int hashCode; + private final byte[] value; + + public ValueWrapper(int hashCode, byte[] value) { + this.hashCode = hashCode; + this.value = value; + } + + public byte[] getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ValueWrapper that = (ValueWrapper) o; + + if (hashCode != that.hashCode) return false; + return Arrays.equals(value, that.value); + } + + @Override + public int hashCode() { + return hashCode; + } +} diff --git a/clustered/server/src/main/java/org/ehcache/clustered/server/EhcacheActiveEntity.java b/clustered/server/src/main/java/org/ehcache/clustered/server/EhcacheActiveEntity.java index 2a65ec7d52..07925d9785 100644 --- a/clustered/server/src/main/java/org/ehcache/clustered/server/EhcacheActiveEntity.java +++ b/clustered/server/src/main/java/org/ehcache/clustered/server/EhcacheActiveEntity.java @@ -67,6 +67,7 @@ import org.ehcache.clustered.server.internal.messages.EhcacheDataSyncMessage; import org.ehcache.clustered.server.internal.messages.EhcacheStateSyncMessage; import org.ehcache.clustered.server.management.Management; +import org.ehcache.clustered.server.repo.StateRepositoryManager; import org.ehcache.clustered.server.state.EhcacheStateService; import org.ehcache.clustered.server.state.InvalidationTracker; import org.ehcache.clustered.server.state.config.EhcacheStateServiceConfig; @@ -372,6 +373,10 @@ public void synchronizeKeyToPassive(PassiveSynchronizationChannel + stateRepositoryManager.syncMessageFor(storeName).forEach(syncChannel::synchronizeToPassive)); } else { Long dataSizeThreshold = Long.getLong(SYNC_DATA_SIZE_PROP, DEFAULT_SYNC_DATA_SIZE_THRESHOLD); AtomicLong size = new AtomicLong(0); diff --git a/clustered/server/src/main/java/org/ehcache/clustered/server/EhcachePassiveEntity.java b/clustered/server/src/main/java/org/ehcache/clustered/server/EhcachePassiveEntity.java index d91468c8e4..c8605302ac 100644 --- a/clustered/server/src/main/java/org/ehcache/clustered/server/EhcachePassiveEntity.java +++ b/clustered/server/src/main/java/org/ehcache/clustered/server/EhcachePassiveEntity.java @@ -21,15 +21,14 @@ import org.ehcache.clustered.common.internal.ClusteredEhcacheIdentity; import org.ehcache.clustered.common.internal.ServerStoreConfiguration; import org.ehcache.clustered.common.internal.exceptions.ClusterException; -import org.ehcache.clustered.common.internal.exceptions.IllegalMessageException; import org.ehcache.clustered.common.internal.exceptions.LifecycleException; -import org.ehcache.clustered.common.internal.exceptions.ServerMisconfigurationException; import org.ehcache.clustered.common.internal.messages.EhcacheEntityMessage; import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse; import org.ehcache.clustered.common.internal.messages.EhcacheMessageType; import org.ehcache.clustered.common.internal.messages.EhcacheOperationMessage; import org.ehcache.clustered.common.internal.messages.LifecycleMessage; import org.ehcache.clustered.common.internal.messages.LifecycleMessage.ConfigureStoreManager; +import org.ehcache.clustered.server.internal.messages.EhcacheStateRepoSyncMessage; import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessage; import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessage.ChainReplicationMessage; import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessage.ClearInvalidationCompleteMessage; @@ -46,14 +45,10 @@ import org.ehcache.clustered.server.state.config.EhcacheStateServiceConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.terracotta.entity.BasicServiceConfiguration; import org.terracotta.entity.PassiveServerEntity; import org.terracotta.entity.ServiceRegistry; -import org.terracotta.offheapresource.OffHeapResources; -import java.util.Collections; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeoutException; @@ -237,6 +232,10 @@ private void invokeSyncOperation(EhcacheSyncMessage message) throws ClusterExcep }); break; + case STATE_REPO: + EhcacheStateRepoSyncMessage stateRepoSyncMessage = (EhcacheStateRepoSyncMessage) message; + ehcacheStateService.getStateRepositoryManager().processSyncMessage(stateRepoSyncMessage); + break; default: throw new AssertionError("Unsupported Sync operation " + message.getMessageType()); } diff --git a/clustered/server/src/main/java/org/ehcache/clustered/server/EhcacheStateServiceImpl.java b/clustered/server/src/main/java/org/ehcache/clustered/server/EhcacheStateServiceImpl.java index 8b3789f505..5bd171f609 100644 --- a/clustered/server/src/main/java/org/ehcache/clustered/server/EhcacheStateServiceImpl.java +++ b/clustered/server/src/main/java/org/ehcache/clustered/server/EhcacheStateServiceImpl.java @@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory; import org.terracotta.context.TreeNode; import org.terracotta.offheapresource.OffHeapResource; -import org.terracotta.offheapresource.OffHeapResourceIdentifier; import org.terracotta.offheapresource.OffHeapResources; import org.terracotta.offheapstore.paging.PageSource; import org.terracotta.statistics.StatisticsManager; @@ -472,7 +471,7 @@ public boolean isConfigured() { } @Override - public StateRepositoryManager getStateRepositoryManager() throws ClusterException { + public StateRepositoryManager getStateRepositoryManager() { return this.stateRepositoryManager; } diff --git a/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/EhcacheStateRepoSyncMessage.java b/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/EhcacheStateRepoSyncMessage.java new file mode 100644 index 0000000000..ddaed046c0 --- /dev/null +++ b/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/EhcacheStateRepoSyncMessage.java @@ -0,0 +1,52 @@ +/* + * Copyright Terracotta, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.ehcache.clustered.server.internal.messages; + +import java.util.concurrent.ConcurrentMap; + +/** + * EhcacheStateRepoSyncMessage + */ +public class EhcacheStateRepoSyncMessage extends EhcacheSyncMessage { + + private final String cacheId; + private final String mapId; + private final ConcurrentMap mappings; + + public EhcacheStateRepoSyncMessage(String cacheId, String mapId, ConcurrentMap mappings) { + this.cacheId = cacheId; + this.mapId = mapId; + this.mappings = mappings; + } + + @Override + public SyncMessageType getMessageType() { + return SyncMessageType.STATE_REPO; + } + + public ConcurrentMap getMappings() { + return mappings; + } + + public String getCacheId() { + return cacheId; + } + + public String getMapId() { + return mapId; + } +} diff --git a/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/EhcacheSyncMessageCodec.java b/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/EhcacheSyncMessageCodec.java index a54a7b0e4c..9864c9d1ac 100644 --- a/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/EhcacheSyncMessageCodec.java +++ b/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/EhcacheSyncMessageCodec.java @@ -36,12 +36,18 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import static java.nio.ByteBuffer.wrap; import static org.ehcache.clustered.common.internal.messages.ChainCodec.CHAIN_STRUCT; import static org.ehcache.clustered.common.internal.messages.MessageCodecUtils.KEY_FIELD; import static org.ehcache.clustered.common.internal.messages.MessageCodecUtils.SERVER_STORE_NAME_FIELD; +import static org.ehcache.clustered.common.internal.store.Util.marshall; +import static org.ehcache.clustered.common.internal.store.Util.unmarshall; import static org.ehcache.clustered.server.internal.messages.SyncMessageType.DATA; import static org.ehcache.clustered.server.internal.messages.SyncMessageType.STATE; +import static org.ehcache.clustered.server.internal.messages.SyncMessageType.STATE_REPO; import static org.ehcache.clustered.server.internal.messages.SyncMessageType.SYNC_MESSAGE_TYPE_FIELD_INDEX; import static org.ehcache.clustered.server.internal.messages.SyncMessageType.SYNC_MESSAGE_TYPE_FIELD_NAME; import static org.ehcache.clustered.server.internal.messages.SyncMessageType.SYNC_MESSAGE_TYPE_MAPPING; @@ -54,6 +60,9 @@ public class EhcacheSyncMessageCodec implements SyncMessageCodec entryEncoder.byteBuffer(KEY_FIELD, wrap(marshall(entry.getKey()))) + .byteBuffer(STATE_REPO_VALUE_FIELD, wrap(marshall(entry.getValue())))); + return encoder.encode().array(); + } default: throw new IllegalArgumentException("Sync message codec can not encode " + syncMessage.getMessageType()); } @@ -126,7 +158,7 @@ public byte[] encode(final int concurrencyKey, final EhcacheEntityMessage messag @Override public EhcacheSyncMessage decode(final int concurrencyKey, final byte[] payload) throws MessageCodecException { - ByteBuffer message = ByteBuffer.wrap(payload); + ByteBuffer message = wrap(payload); StructDecoder decoder = stateSyncStruct.decoder(message); Enm enm = decoder.enm(SYNC_MESSAGE_TYPE_FIELD_NAME); if (!enm.isFound()) { @@ -148,6 +180,22 @@ public EhcacheSyncMessage decode(final int concurrencyKey, final byte[] payload) String storeName = decoder.string(SERVER_STORE_NAME_FIELD); Map chainMap = decodeChainMapEntries(decoder); return new EhcacheDataSyncMessage(storeName, chainMap); + case STATE_REPO: + message.rewind(); + decoder = STATE_REPO_SYNC_STRUCT.decoder(message); + String storeId = decoder.string(SERVER_STORE_NAME_FIELD); + String mapId = decoder.string(STATE_REPO_MAP_NAME_FIELD); + ConcurrentMap mappings = new ConcurrentHashMap<>(); + StructArrayDecoder> structsDecoder = decoder.structs(STATE_REPO_ENTRIES_SUB_STRUCT); + if (structsDecoder != null) { + for (int i = 0; i < structsDecoder.length(); i++) { + Object key = unmarshall(structsDecoder.byteBuffer(KEY_FIELD)); + Object value = unmarshall(structsDecoder.byteBuffer(STATE_REPO_VALUE_FIELD)); + mappings.put(key, value); + structsDecoder.next(); + } + } + return new EhcacheStateRepoSyncMessage(storeId, mapId, mappings); default: throw new AssertionError("Cannot happen given earlier checks"); } diff --git a/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/SyncMessageType.java b/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/SyncMessageType.java index 73ac2f2dbd..0636d98417 100644 --- a/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/SyncMessageType.java +++ b/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/SyncMessageType.java @@ -28,12 +28,14 @@ @CommonComponent public enum SyncMessageType { STATE, + STATE_REPO, DATA; public static final String SYNC_MESSAGE_TYPE_FIELD_NAME = "msgType"; public static final int SYNC_MESSAGE_TYPE_FIELD_INDEX = 10; public static final EnumMapping SYNC_MESSAGE_TYPE_MAPPING = newEnumMappingBuilder(SyncMessageType.class) .mapping(STATE, 1) + .mapping(STATE_REPO, 5) .mapping(DATA, 10) .build(); } diff --git a/clustered/server/src/main/java/org/ehcache/clustered/server/repo/ServerStateRepository.java b/clustered/server/src/main/java/org/ehcache/clustered/server/repo/ServerStateRepository.java index f4f878d92f..e6bc05876e 100644 --- a/clustered/server/src/main/java/org/ehcache/clustered/server/repo/ServerStateRepository.java +++ b/clustered/server/src/main/java/org/ehcache/clustered/server/repo/ServerStateRepository.java @@ -17,11 +17,14 @@ package org.ehcache.clustered.server.repo; import org.ehcache.clustered.common.internal.exceptions.ClusterException; -import org.ehcache.clustered.common.internal.exceptions.IllegalMessageException; import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse; import org.ehcache.clustered.common.internal.messages.StateRepositoryOpMessage; +import org.ehcache.clustered.server.internal.messages.EhcacheStateRepoSyncMessage; import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; @@ -32,14 +35,7 @@ class ServerStateRepository { EhcacheEntityResponse invoke(StateRepositoryOpMessage message) throws ClusterException { String mapId = message.getMapId(); - ConcurrentMap map = concurrentMapRepo.get(mapId); - if (map == null) { - ConcurrentHashMap newMap = new ConcurrentHashMap<>(); - map = concurrentMapRepo.putIfAbsent(mapId, newMap); - if (map == null) { - map = newMap; - } - } + ConcurrentMap map = getStateMap(mapId); Object result; switch (message.getMessageType()) { @@ -62,4 +58,28 @@ EhcacheEntityResponse invoke(StateRepositoryOpMessage message) throws ClusterExc } return EhcacheEntityResponse.mapValue(result); } + + private ConcurrentMap getStateMap(String mapId) { + ConcurrentMap map = concurrentMapRepo.get(mapId); + if (map == null) { + ConcurrentHashMap newMap = new ConcurrentHashMap<>(); + map = concurrentMapRepo.putIfAbsent(mapId, newMap); + if (map == null) { + map = newMap; + } + } + return map; + } + + List syncMessage(String cacheId) { + ArrayList result = new ArrayList<>(); + for (Map.Entry> entry : concurrentMapRepo.entrySet()) { + result.add(new EhcacheStateRepoSyncMessage(cacheId, entry.getKey(), entry.getValue())); + } + return result; + } + + void processSyncMessage(EhcacheStateRepoSyncMessage stateRepoSyncMessage) { + concurrentMapRepo.put(stateRepoSyncMessage.getMapId(), stateRepoSyncMessage.getMappings()); + } } diff --git a/clustered/server/src/main/java/org/ehcache/clustered/server/repo/StateRepositoryManager.java b/clustered/server/src/main/java/org/ehcache/clustered/server/repo/StateRepositoryManager.java index c795010b69..58c43aeecb 100644 --- a/clustered/server/src/main/java/org/ehcache/clustered/server/repo/StateRepositoryManager.java +++ b/clustered/server/src/main/java/org/ehcache/clustered/server/repo/StateRepositoryManager.java @@ -17,19 +17,18 @@ package org.ehcache.clustered.server.repo; import org.ehcache.clustered.common.internal.exceptions.ClusterException; -import org.ehcache.clustered.common.internal.exceptions.LifecycleException; import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse; import org.ehcache.clustered.common.internal.messages.StateRepositoryOpMessage; +import org.ehcache.clustered.server.internal.messages.EhcacheStateRepoSyncMessage; import com.tc.classloader.CommonComponent; -import java.util.AbstractMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import static java.util.Collections.emptyList; + @CommonComponent public class StateRepositoryManager { @@ -41,6 +40,11 @@ public void destroyStateRepository(String cacheId) throws ClusterException { public EhcacheEntityResponse invoke(StateRepositoryOpMessage message) throws ClusterException { String cacheId = message.getCacheId(); + ServerStateRepository currentRepo = getServerStateRepository(cacheId); + return currentRepo.invoke(message); + } + + private ServerStateRepository getServerStateRepository(String cacheId) { ServerStateRepository currentRepo = mapRepositoryMap.get(cacheId); if (currentRepo == null) { ServerStateRepository newRepo = new ServerStateRepository(); @@ -49,7 +53,18 @@ public EhcacheEntityResponse invoke(StateRepositoryOpMessage message) throws Clu currentRepo = newRepo; } } - return currentRepo.invoke(message); + return currentRepo; } + public List syncMessageFor(String cacheId) { + ServerStateRepository repository = mapRepositoryMap.get(cacheId); + if (repository != null) { + return repository.syncMessage(cacheId); + } + return emptyList(); + } + + public void processSyncMessage(EhcacheStateRepoSyncMessage stateRepoSyncMessage) { + getServerStateRepository(stateRepoSyncMessage.getCacheId()).processSyncMessage(stateRepoSyncMessage); + } } diff --git a/clustered/server/src/main/java/org/ehcache/clustered/server/state/EhcacheStateService.java b/clustered/server/src/main/java/org/ehcache/clustered/server/state/EhcacheStateService.java index bcbba9d402..c27811155f 100644 --- a/clustered/server/src/main/java/org/ehcache/clustered/server/state/EhcacheStateService.java +++ b/clustered/server/src/main/java/org/ehcache/clustered/server/state/EhcacheStateService.java @@ -56,7 +56,7 @@ public interface EhcacheStateService { boolean isConfigured(); - StateRepositoryManager getStateRepositoryManager() throws ClusterException; + StateRepositoryManager getStateRepositoryManager(); ClientMessageTracker getClientMessageTracker();