diff --git a/clustered/server/src/main/java/org/ehcache/clustered/server/EhcacheActiveEntity.java b/clustered/server/src/main/java/org/ehcache/clustered/server/EhcacheActiveEntity.java index 2a65ec7d52..07925d9785 100644 --- a/clustered/server/src/main/java/org/ehcache/clustered/server/EhcacheActiveEntity.java +++ b/clustered/server/src/main/java/org/ehcache/clustered/server/EhcacheActiveEntity.java @@ -67,6 +67,7 @@ import org.ehcache.clustered.server.internal.messages.EhcacheDataSyncMessage; import org.ehcache.clustered.server.internal.messages.EhcacheStateSyncMessage; import org.ehcache.clustered.server.management.Management; +import org.ehcache.clustered.server.repo.StateRepositoryManager; import org.ehcache.clustered.server.state.EhcacheStateService; import org.ehcache.clustered.server.state.InvalidationTracker; import org.ehcache.clustered.server.state.config.EhcacheStateServiceConfig; @@ -372,6 +373,10 @@ public void synchronizeKeyToPassive(PassiveSynchronizationChannel + stateRepositoryManager.syncMessageFor(storeName).forEach(syncChannel::synchronizeToPassive)); } else { Long dataSizeThreshold = Long.getLong(SYNC_DATA_SIZE_PROP, DEFAULT_SYNC_DATA_SIZE_THRESHOLD); AtomicLong size = new AtomicLong(0); diff --git a/clustered/server/src/main/java/org/ehcache/clustered/server/EhcachePassiveEntity.java b/clustered/server/src/main/java/org/ehcache/clustered/server/EhcachePassiveEntity.java index d91468c8e4..c8605302ac 100644 --- a/clustered/server/src/main/java/org/ehcache/clustered/server/EhcachePassiveEntity.java +++ b/clustered/server/src/main/java/org/ehcache/clustered/server/EhcachePassiveEntity.java @@ -21,15 +21,14 @@ import org.ehcache.clustered.common.internal.ClusteredEhcacheIdentity; import org.ehcache.clustered.common.internal.ServerStoreConfiguration; import org.ehcache.clustered.common.internal.exceptions.ClusterException; -import org.ehcache.clustered.common.internal.exceptions.IllegalMessageException; import org.ehcache.clustered.common.internal.exceptions.LifecycleException; -import org.ehcache.clustered.common.internal.exceptions.ServerMisconfigurationException; import org.ehcache.clustered.common.internal.messages.EhcacheEntityMessage; import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse; import org.ehcache.clustered.common.internal.messages.EhcacheMessageType; import org.ehcache.clustered.common.internal.messages.EhcacheOperationMessage; import org.ehcache.clustered.common.internal.messages.LifecycleMessage; import org.ehcache.clustered.common.internal.messages.LifecycleMessage.ConfigureStoreManager; +import org.ehcache.clustered.server.internal.messages.EhcacheStateRepoSyncMessage; import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessage; import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessage.ChainReplicationMessage; import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessage.ClearInvalidationCompleteMessage; @@ -46,14 +45,10 @@ import org.ehcache.clustered.server.state.config.EhcacheStateServiceConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.terracotta.entity.BasicServiceConfiguration; import org.terracotta.entity.PassiveServerEntity; import org.terracotta.entity.ServiceRegistry; -import org.terracotta.offheapresource.OffHeapResources; -import java.util.Collections; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeoutException; @@ -237,6 +232,10 @@ private void invokeSyncOperation(EhcacheSyncMessage message) throws ClusterExcep }); break; + case STATE_REPO: + EhcacheStateRepoSyncMessage stateRepoSyncMessage = (EhcacheStateRepoSyncMessage) message; + ehcacheStateService.getStateRepositoryManager().processSyncMessage(stateRepoSyncMessage); + break; default: throw new AssertionError("Unsupported Sync operation " + message.getMessageType()); } diff --git a/clustered/server/src/main/java/org/ehcache/clustered/server/EhcacheStateServiceImpl.java b/clustered/server/src/main/java/org/ehcache/clustered/server/EhcacheStateServiceImpl.java index 8b3789f505..5bd171f609 100644 --- a/clustered/server/src/main/java/org/ehcache/clustered/server/EhcacheStateServiceImpl.java +++ b/clustered/server/src/main/java/org/ehcache/clustered/server/EhcacheStateServiceImpl.java @@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory; import org.terracotta.context.TreeNode; import org.terracotta.offheapresource.OffHeapResource; -import org.terracotta.offheapresource.OffHeapResourceIdentifier; import org.terracotta.offheapresource.OffHeapResources; import org.terracotta.offheapstore.paging.PageSource; import org.terracotta.statistics.StatisticsManager; @@ -472,7 +471,7 @@ public boolean isConfigured() { } @Override - public StateRepositoryManager getStateRepositoryManager() throws ClusterException { + public StateRepositoryManager getStateRepositoryManager() { return this.stateRepositoryManager; } diff --git a/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/EhcacheStateRepoSyncMessage.java b/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/EhcacheStateRepoSyncMessage.java new file mode 100644 index 0000000000..ddaed046c0 --- /dev/null +++ b/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/EhcacheStateRepoSyncMessage.java @@ -0,0 +1,52 @@ +/* + * Copyright Terracotta, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.ehcache.clustered.server.internal.messages; + +import java.util.concurrent.ConcurrentMap; + +/** + * EhcacheStateRepoSyncMessage + */ +public class EhcacheStateRepoSyncMessage extends EhcacheSyncMessage { + + private final String cacheId; + private final String mapId; + private final ConcurrentMap mappings; + + public EhcacheStateRepoSyncMessage(String cacheId, String mapId, ConcurrentMap mappings) { + this.cacheId = cacheId; + this.mapId = mapId; + this.mappings = mappings; + } + + @Override + public SyncMessageType getMessageType() { + return SyncMessageType.STATE_REPO; + } + + public ConcurrentMap getMappings() { + return mappings; + } + + public String getCacheId() { + return cacheId; + } + + public String getMapId() { + return mapId; + } +} diff --git a/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/EhcacheSyncMessageCodec.java b/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/EhcacheSyncMessageCodec.java index a54a7b0e4c..9864c9d1ac 100644 --- a/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/EhcacheSyncMessageCodec.java +++ b/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/EhcacheSyncMessageCodec.java @@ -36,12 +36,18 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import static java.nio.ByteBuffer.wrap; import static org.ehcache.clustered.common.internal.messages.ChainCodec.CHAIN_STRUCT; import static org.ehcache.clustered.common.internal.messages.MessageCodecUtils.KEY_FIELD; import static org.ehcache.clustered.common.internal.messages.MessageCodecUtils.SERVER_STORE_NAME_FIELD; +import static org.ehcache.clustered.common.internal.store.Util.marshall; +import static org.ehcache.clustered.common.internal.store.Util.unmarshall; import static org.ehcache.clustered.server.internal.messages.SyncMessageType.DATA; import static org.ehcache.clustered.server.internal.messages.SyncMessageType.STATE; +import static org.ehcache.clustered.server.internal.messages.SyncMessageType.STATE_REPO; import static org.ehcache.clustered.server.internal.messages.SyncMessageType.SYNC_MESSAGE_TYPE_FIELD_INDEX; import static org.ehcache.clustered.server.internal.messages.SyncMessageType.SYNC_MESSAGE_TYPE_FIELD_NAME; import static org.ehcache.clustered.server.internal.messages.SyncMessageType.SYNC_MESSAGE_TYPE_MAPPING; @@ -54,6 +60,9 @@ public class EhcacheSyncMessageCodec implements SyncMessageCodec entryEncoder.byteBuffer(KEY_FIELD, wrap(marshall(entry.getKey()))) + .byteBuffer(STATE_REPO_VALUE_FIELD, wrap(marshall(entry.getValue())))); + return encoder.encode().array(); + } default: throw new IllegalArgumentException("Sync message codec can not encode " + syncMessage.getMessageType()); } @@ -126,7 +158,7 @@ public byte[] encode(final int concurrencyKey, final EhcacheEntityMessage messag @Override public EhcacheSyncMessage decode(final int concurrencyKey, final byte[] payload) throws MessageCodecException { - ByteBuffer message = ByteBuffer.wrap(payload); + ByteBuffer message = wrap(payload); StructDecoder decoder = stateSyncStruct.decoder(message); Enm enm = decoder.enm(SYNC_MESSAGE_TYPE_FIELD_NAME); if (!enm.isFound()) { @@ -148,6 +180,22 @@ public EhcacheSyncMessage decode(final int concurrencyKey, final byte[] payload) String storeName = decoder.string(SERVER_STORE_NAME_FIELD); Map chainMap = decodeChainMapEntries(decoder); return new EhcacheDataSyncMessage(storeName, chainMap); + case STATE_REPO: + message.rewind(); + decoder = STATE_REPO_SYNC_STRUCT.decoder(message); + String storeId = decoder.string(SERVER_STORE_NAME_FIELD); + String mapId = decoder.string(STATE_REPO_MAP_NAME_FIELD); + ConcurrentMap mappings = new ConcurrentHashMap<>(); + StructArrayDecoder> structsDecoder = decoder.structs(STATE_REPO_ENTRIES_SUB_STRUCT); + if (structsDecoder != null) { + for (int i = 0; i < structsDecoder.length(); i++) { + Object key = unmarshall(structsDecoder.byteBuffer(KEY_FIELD)); + Object value = unmarshall(structsDecoder.byteBuffer(STATE_REPO_VALUE_FIELD)); + mappings.put(key, value); + structsDecoder.next(); + } + } + return new EhcacheStateRepoSyncMessage(storeId, mapId, mappings); default: throw new AssertionError("Cannot happen given earlier checks"); } diff --git a/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/SyncMessageType.java b/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/SyncMessageType.java index 73ac2f2dbd..0636d98417 100644 --- a/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/SyncMessageType.java +++ b/clustered/server/src/main/java/org/ehcache/clustered/server/internal/messages/SyncMessageType.java @@ -28,12 +28,14 @@ @CommonComponent public enum SyncMessageType { STATE, + STATE_REPO, DATA; public static final String SYNC_MESSAGE_TYPE_FIELD_NAME = "msgType"; public static final int SYNC_MESSAGE_TYPE_FIELD_INDEX = 10; public static final EnumMapping SYNC_MESSAGE_TYPE_MAPPING = newEnumMappingBuilder(SyncMessageType.class) .mapping(STATE, 1) + .mapping(STATE_REPO, 5) .mapping(DATA, 10) .build(); } diff --git a/clustered/server/src/main/java/org/ehcache/clustered/server/repo/ServerStateRepository.java b/clustered/server/src/main/java/org/ehcache/clustered/server/repo/ServerStateRepository.java index f4f878d92f..e6bc05876e 100644 --- a/clustered/server/src/main/java/org/ehcache/clustered/server/repo/ServerStateRepository.java +++ b/clustered/server/src/main/java/org/ehcache/clustered/server/repo/ServerStateRepository.java @@ -17,11 +17,14 @@ package org.ehcache.clustered.server.repo; import org.ehcache.clustered.common.internal.exceptions.ClusterException; -import org.ehcache.clustered.common.internal.exceptions.IllegalMessageException; import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse; import org.ehcache.clustered.common.internal.messages.StateRepositoryOpMessage; +import org.ehcache.clustered.server.internal.messages.EhcacheStateRepoSyncMessage; import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; @@ -32,14 +35,7 @@ class ServerStateRepository { EhcacheEntityResponse invoke(StateRepositoryOpMessage message) throws ClusterException { String mapId = message.getMapId(); - ConcurrentMap map = concurrentMapRepo.get(mapId); - if (map == null) { - ConcurrentHashMap newMap = new ConcurrentHashMap<>(); - map = concurrentMapRepo.putIfAbsent(mapId, newMap); - if (map == null) { - map = newMap; - } - } + ConcurrentMap map = getStateMap(mapId); Object result; switch (message.getMessageType()) { @@ -62,4 +58,28 @@ EhcacheEntityResponse invoke(StateRepositoryOpMessage message) throws ClusterExc } return EhcacheEntityResponse.mapValue(result); } + + private ConcurrentMap getStateMap(String mapId) { + ConcurrentMap map = concurrentMapRepo.get(mapId); + if (map == null) { + ConcurrentHashMap newMap = new ConcurrentHashMap<>(); + map = concurrentMapRepo.putIfAbsent(mapId, newMap); + if (map == null) { + map = newMap; + } + } + return map; + } + + List syncMessage(String cacheId) { + ArrayList result = new ArrayList<>(); + for (Map.Entry> entry : concurrentMapRepo.entrySet()) { + result.add(new EhcacheStateRepoSyncMessage(cacheId, entry.getKey(), entry.getValue())); + } + return result; + } + + void processSyncMessage(EhcacheStateRepoSyncMessage stateRepoSyncMessage) { + concurrentMapRepo.put(stateRepoSyncMessage.getMapId(), stateRepoSyncMessage.getMappings()); + } } diff --git a/clustered/server/src/main/java/org/ehcache/clustered/server/repo/StateRepositoryManager.java b/clustered/server/src/main/java/org/ehcache/clustered/server/repo/StateRepositoryManager.java index c795010b69..58c43aeecb 100644 --- a/clustered/server/src/main/java/org/ehcache/clustered/server/repo/StateRepositoryManager.java +++ b/clustered/server/src/main/java/org/ehcache/clustered/server/repo/StateRepositoryManager.java @@ -17,19 +17,18 @@ package org.ehcache.clustered.server.repo; import org.ehcache.clustered.common.internal.exceptions.ClusterException; -import org.ehcache.clustered.common.internal.exceptions.LifecycleException; import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse; import org.ehcache.clustered.common.internal.messages.StateRepositoryOpMessage; +import org.ehcache.clustered.server.internal.messages.EhcacheStateRepoSyncMessage; import com.tc.classloader.CommonComponent; -import java.util.AbstractMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import static java.util.Collections.emptyList; + @CommonComponent public class StateRepositoryManager { @@ -41,6 +40,11 @@ public void destroyStateRepository(String cacheId) throws ClusterException { public EhcacheEntityResponse invoke(StateRepositoryOpMessage message) throws ClusterException { String cacheId = message.getCacheId(); + ServerStateRepository currentRepo = getServerStateRepository(cacheId); + return currentRepo.invoke(message); + } + + private ServerStateRepository getServerStateRepository(String cacheId) { ServerStateRepository currentRepo = mapRepositoryMap.get(cacheId); if (currentRepo == null) { ServerStateRepository newRepo = new ServerStateRepository(); @@ -49,7 +53,18 @@ public EhcacheEntityResponse invoke(StateRepositoryOpMessage message) throws Clu currentRepo = newRepo; } } - return currentRepo.invoke(message); + return currentRepo; } + public List syncMessageFor(String cacheId) { + ServerStateRepository repository = mapRepositoryMap.get(cacheId); + if (repository != null) { + return repository.syncMessage(cacheId); + } + return emptyList(); + } + + public void processSyncMessage(EhcacheStateRepoSyncMessage stateRepoSyncMessage) { + getServerStateRepository(stateRepoSyncMessage.getCacheId()).processSyncMessage(stateRepoSyncMessage); + } } diff --git a/clustered/server/src/main/java/org/ehcache/clustered/server/state/EhcacheStateService.java b/clustered/server/src/main/java/org/ehcache/clustered/server/state/EhcacheStateService.java index bcbba9d402..c27811155f 100644 --- a/clustered/server/src/main/java/org/ehcache/clustered/server/state/EhcacheStateService.java +++ b/clustered/server/src/main/java/org/ehcache/clustered/server/state/EhcacheStateService.java @@ -56,7 +56,7 @@ public interface EhcacheStateService { boolean isConfigured(); - StateRepositoryManager getStateRepositoryManager() throws ClusterException; + StateRepositoryManager getStateRepositoryManager(); ClientMessageTracker getClientMessageTracker();