Skip to content

Commit

Permalink
🐛 Fix #1814 Passive sync of state repositories
Browse files Browse the repository at this point in the history
  • Loading branch information
ljacomet committed Feb 9, 2017
1 parent 964e94c commit ea0dfef
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.ehcache.clustered.server.internal.messages.EhcacheDataSyncMessage;
import org.ehcache.clustered.server.internal.messages.EhcacheStateSyncMessage;
import org.ehcache.clustered.server.management.Management;
import org.ehcache.clustered.server.repo.StateRepositoryManager;
import org.ehcache.clustered.server.state.EhcacheStateService;
import org.ehcache.clustered.server.state.InvalidationTracker;
import org.ehcache.clustered.server.state.config.EhcacheStateServiceConfig;
Expand Down Expand Up @@ -372,6 +373,10 @@ public void synchronizeKeyToPassive(PassiveSynchronizationChannel<EhcacheEntityM
}

syncChannel.synchronizeToPassive(new EhcacheStateSyncMessage(configuration, storeConfigs));

StateRepositoryManager stateRepositoryManager = ehcacheStateService.getStateRepositoryManager();
ehcacheStateService.getStores().forEach(storeName ->
stateRepositoryManager.syncMessageFor(storeName).forEach(syncChannel::synchronizeToPassive));
} else {
Long dataSizeThreshold = Long.getLong(SYNC_DATA_SIZE_PROP, DEFAULT_SYNC_DATA_SIZE_THRESHOLD);
AtomicLong size = new AtomicLong(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@
import org.ehcache.clustered.common.internal.ClusteredEhcacheIdentity;
import org.ehcache.clustered.common.internal.ServerStoreConfiguration;
import org.ehcache.clustered.common.internal.exceptions.ClusterException;
import org.ehcache.clustered.common.internal.exceptions.IllegalMessageException;
import org.ehcache.clustered.common.internal.exceptions.LifecycleException;
import org.ehcache.clustered.common.internal.exceptions.ServerMisconfigurationException;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityMessage;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse;
import org.ehcache.clustered.common.internal.messages.EhcacheMessageType;
import org.ehcache.clustered.common.internal.messages.EhcacheOperationMessage;
import org.ehcache.clustered.common.internal.messages.LifecycleMessage;
import org.ehcache.clustered.common.internal.messages.LifecycleMessage.ConfigureStoreManager;
import org.ehcache.clustered.server.internal.messages.EhcacheStateRepoSyncMessage;
import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessage;
import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessage.ChainReplicationMessage;
import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessage.ClearInvalidationCompleteMessage;
Expand All @@ -46,14 +45,10 @@
import org.ehcache.clustered.server.state.config.EhcacheStateServiceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.terracotta.entity.BasicServiceConfiguration;
import org.terracotta.entity.PassiveServerEntity;
import org.terracotta.entity.ServiceRegistry;
import org.terracotta.offheapresource.OffHeapResources;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

Expand Down Expand Up @@ -237,6 +232,10 @@ private void invokeSyncOperation(EhcacheSyncMessage message) throws ClusterExcep

});
break;
case STATE_REPO:
EhcacheStateRepoSyncMessage stateRepoSyncMessage = (EhcacheStateRepoSyncMessage) message;
ehcacheStateService.getStateRepositoryManager().processSyncMessage(stateRepoSyncMessage);
break;
default:
throw new AssertionError("Unsupported Sync operation " + message.getMessageType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.slf4j.LoggerFactory;
import org.terracotta.context.TreeNode;
import org.terracotta.offheapresource.OffHeapResource;
import org.terracotta.offheapresource.OffHeapResourceIdentifier;
import org.terracotta.offheapresource.OffHeapResources;
import org.terracotta.offheapstore.paging.PageSource;
import org.terracotta.statistics.StatisticsManager;
Expand Down Expand Up @@ -472,7 +471,7 @@ public boolean isConfigured() {
}

@Override
public StateRepositoryManager getStateRepositoryManager() throws ClusterException {
public StateRepositoryManager getStateRepositoryManager() {
return this.stateRepositoryManager;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright Terracotta, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.ehcache.clustered.server.internal.messages;

import java.util.concurrent.ConcurrentMap;

/**
* EhcacheStateRepoSyncMessage
*/
public class EhcacheStateRepoSyncMessage extends EhcacheSyncMessage {

private final String cacheId;
private final String mapId;
private final ConcurrentMap<Object, Object> mappings;

public EhcacheStateRepoSyncMessage(String cacheId, String mapId, ConcurrentMap<Object, Object> mappings) {
this.cacheId = cacheId;
this.mapId = mapId;
this.mappings = mappings;
}

@Override
public SyncMessageType getMessageType() {
return SyncMessageType.STATE_REPO;
}

public ConcurrentMap<Object, Object> getMappings() {
return mappings;
}

public String getCacheId() {
return cacheId;
}

public String getMapId() {
return mapId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,18 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static java.nio.ByteBuffer.wrap;
import static org.ehcache.clustered.common.internal.messages.ChainCodec.CHAIN_STRUCT;
import static org.ehcache.clustered.common.internal.messages.MessageCodecUtils.KEY_FIELD;
import static org.ehcache.clustered.common.internal.messages.MessageCodecUtils.SERVER_STORE_NAME_FIELD;
import static org.ehcache.clustered.common.internal.store.Util.marshall;
import static org.ehcache.clustered.common.internal.store.Util.unmarshall;
import static org.ehcache.clustered.server.internal.messages.SyncMessageType.DATA;
import static org.ehcache.clustered.server.internal.messages.SyncMessageType.STATE;
import static org.ehcache.clustered.server.internal.messages.SyncMessageType.STATE_REPO;
import static org.ehcache.clustered.server.internal.messages.SyncMessageType.SYNC_MESSAGE_TYPE_FIELD_INDEX;
import static org.ehcache.clustered.server.internal.messages.SyncMessageType.SYNC_MESSAGE_TYPE_FIELD_NAME;
import static org.ehcache.clustered.server.internal.messages.SyncMessageType.SYNC_MESSAGE_TYPE_MAPPING;
Expand All @@ -54,6 +60,9 @@ public class EhcacheSyncMessageCodec implements SyncMessageCodec<EhcacheEntityMe
private static final String STORES_SUB_STRUCT = "stores";
private static final String CHAIN_FIELD = "chain";
private static final String CHAIN_MAP_ENTRIES_SUB_STRUCT = "entries";
private static final String STATE_REPO_ENTRIES_SUB_STRUCT = "mappings";
private static final String STATE_REPO_VALUE_FIELD = "value";
private static final String STATE_REPO_MAP_NAME_FIELD = "mapName";

private static final Struct CHAIN_MAP_ENTRY_STRUCT = newStructBuilder()
.int64(KEY_FIELD, 10)
Expand All @@ -66,6 +75,18 @@ public class EhcacheSyncMessageCodec implements SyncMessageCodec<EhcacheEntityMe
.structs(CHAIN_MAP_ENTRIES_SUB_STRUCT, 40, CHAIN_MAP_ENTRY_STRUCT)
.build();

private static final Struct STATE_REPO_ENTRY_STRUCT = newStructBuilder()
.byteBuffer(KEY_FIELD, 10)
.byteBuffer(STATE_REPO_VALUE_FIELD, 20)
.build();

private static final Struct STATE_REPO_SYNC_STRUCT = newStructBuilder()
.enm(SYNC_MESSAGE_TYPE_FIELD_NAME, SYNC_MESSAGE_TYPE_FIELD_INDEX, SYNC_MESSAGE_TYPE_MAPPING)
.string(SERVER_STORE_NAME_FIELD, 20)
.string(STATE_REPO_MAP_NAME_FIELD, 30)
.structs(STATE_REPO_ENTRIES_SUB_STRUCT, 40, STATE_REPO_ENTRY_STRUCT)
.build();

private final Struct stateSyncStruct;

private final ChainCodec chainCodec;
Expand Down Expand Up @@ -116,6 +137,17 @@ public byte[] encode(final int concurrencyKey, final EhcacheEntityMessage messag
});
return encoder.encode().array();
}
case STATE_REPO: {
encoder = STATE_REPO_SYNC_STRUCT.encoder();
EhcacheStateRepoSyncMessage stateRepoSyncMessage = (EhcacheStateRepoSyncMessage) syncMessage;
encoder.enm(SYNC_MESSAGE_TYPE_FIELD_NAME, STATE_REPO)
.string(SERVER_STORE_NAME_FIELD, stateRepoSyncMessage.getCacheId())
.string(STATE_REPO_MAP_NAME_FIELD, stateRepoSyncMessage.getMapId());
encoder.structs(STATE_REPO_ENTRIES_SUB_STRUCT, stateRepoSyncMessage.getMappings().entrySet(),
(entryEncoder, entry) -> entryEncoder.byteBuffer(KEY_FIELD, wrap(marshall(entry.getKey())))
.byteBuffer(STATE_REPO_VALUE_FIELD, wrap(marshall(entry.getValue()))));
return encoder.encode().array();
}
default:
throw new IllegalArgumentException("Sync message codec can not encode " + syncMessage.getMessageType());
}
Expand All @@ -126,7 +158,7 @@ public byte[] encode(final int concurrencyKey, final EhcacheEntityMessage messag

@Override
public EhcacheSyncMessage decode(final int concurrencyKey, final byte[] payload) throws MessageCodecException {
ByteBuffer message = ByteBuffer.wrap(payload);
ByteBuffer message = wrap(payload);
StructDecoder<Void> decoder = stateSyncStruct.decoder(message);
Enm<SyncMessageType> enm = decoder.enm(SYNC_MESSAGE_TYPE_FIELD_NAME);
if (!enm.isFound()) {
Expand All @@ -148,6 +180,22 @@ public EhcacheSyncMessage decode(final int concurrencyKey, final byte[] payload)
String storeName = decoder.string(SERVER_STORE_NAME_FIELD);
Map<Long, Chain> chainMap = decodeChainMapEntries(decoder);
return new EhcacheDataSyncMessage(storeName, chainMap);
case STATE_REPO:
message.rewind();
decoder = STATE_REPO_SYNC_STRUCT.decoder(message);
String storeId = decoder.string(SERVER_STORE_NAME_FIELD);
String mapId = decoder.string(STATE_REPO_MAP_NAME_FIELD);
ConcurrentMap<Object, Object> mappings = new ConcurrentHashMap<>();
StructArrayDecoder<StructDecoder<Void>> structsDecoder = decoder.structs(STATE_REPO_ENTRIES_SUB_STRUCT);
if (structsDecoder != null) {
for (int i = 0; i < structsDecoder.length(); i++) {
Object key = unmarshall(structsDecoder.byteBuffer(KEY_FIELD));
Object value = unmarshall(structsDecoder.byteBuffer(STATE_REPO_VALUE_FIELD));
mappings.put(key, value);
structsDecoder.next();
}
}
return new EhcacheStateRepoSyncMessage(storeId, mapId, mappings);
default:
throw new AssertionError("Cannot happen given earlier checks");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
@CommonComponent
public enum SyncMessageType {
STATE,
STATE_REPO,
DATA;

public static final String SYNC_MESSAGE_TYPE_FIELD_NAME = "msgType";
public static final int SYNC_MESSAGE_TYPE_FIELD_INDEX = 10;
public static final EnumMapping<SyncMessageType> SYNC_MESSAGE_TYPE_MAPPING = newEnumMappingBuilder(SyncMessageType.class)
.mapping(STATE, 1)
.mapping(STATE_REPO, 5)
.mapping(DATA, 10)
.build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
package org.ehcache.clustered.server.repo;

import org.ehcache.clustered.common.internal.exceptions.ClusterException;
import org.ehcache.clustered.common.internal.exceptions.IllegalMessageException;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse;
import org.ehcache.clustered.common.internal.messages.StateRepositoryOpMessage;
import org.ehcache.clustered.server.internal.messages.EhcacheStateRepoSyncMessage;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
Expand All @@ -32,14 +35,7 @@ class ServerStateRepository {

EhcacheEntityResponse invoke(StateRepositoryOpMessage message) throws ClusterException {
String mapId = message.getMapId();
ConcurrentMap<Object, Object> map = concurrentMapRepo.get(mapId);
if (map == null) {
ConcurrentHashMap<Object, Object> newMap = new ConcurrentHashMap<>();
map = concurrentMapRepo.putIfAbsent(mapId, newMap);
if (map == null) {
map = newMap;
}
}
ConcurrentMap<Object, Object> map = getStateMap(mapId);

Object result;
switch (message.getMessageType()) {
Expand All @@ -62,4 +58,28 @@ EhcacheEntityResponse invoke(StateRepositoryOpMessage message) throws ClusterExc
}
return EhcacheEntityResponse.mapValue(result);
}

private ConcurrentMap<Object, Object> getStateMap(String mapId) {
ConcurrentMap<Object, Object> map = concurrentMapRepo.get(mapId);
if (map == null) {
ConcurrentHashMap<Object, Object> newMap = new ConcurrentHashMap<>();
map = concurrentMapRepo.putIfAbsent(mapId, newMap);
if (map == null) {
map = newMap;
}
}
return map;
}

List<EhcacheStateRepoSyncMessage> syncMessage(String cacheId) {
ArrayList<EhcacheStateRepoSyncMessage> result = new ArrayList<>();
for (Map.Entry<String, ConcurrentMap<Object, Object>> entry : concurrentMapRepo.entrySet()) {
result.add(new EhcacheStateRepoSyncMessage(cacheId, entry.getKey(), entry.getValue()));
}
return result;
}

void processSyncMessage(EhcacheStateRepoSyncMessage stateRepoSyncMessage) {
concurrentMapRepo.put(stateRepoSyncMessage.getMapId(), stateRepoSyncMessage.getMappings());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,18 @@
package org.ehcache.clustered.server.repo;

import org.ehcache.clustered.common.internal.exceptions.ClusterException;
import org.ehcache.clustered.common.internal.exceptions.LifecycleException;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse;
import org.ehcache.clustered.common.internal.messages.StateRepositoryOpMessage;
import org.ehcache.clustered.server.internal.messages.EhcacheStateRepoSyncMessage;

import com.tc.classloader.CommonComponent;

import java.util.AbstractMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static java.util.Collections.emptyList;

@CommonComponent
public class StateRepositoryManager {

Expand All @@ -41,6 +40,11 @@ public void destroyStateRepository(String cacheId) throws ClusterException {

public EhcacheEntityResponse invoke(StateRepositoryOpMessage message) throws ClusterException {
String cacheId = message.getCacheId();
ServerStateRepository currentRepo = getServerStateRepository(cacheId);
return currentRepo.invoke(message);
}

private ServerStateRepository getServerStateRepository(String cacheId) {
ServerStateRepository currentRepo = mapRepositoryMap.get(cacheId);
if (currentRepo == null) {
ServerStateRepository newRepo = new ServerStateRepository();
Expand All @@ -49,7 +53,18 @@ public EhcacheEntityResponse invoke(StateRepositoryOpMessage message) throws Clu
currentRepo = newRepo;
}
}
return currentRepo.invoke(message);
return currentRepo;
}

public List<EhcacheStateRepoSyncMessage> syncMessageFor(String cacheId) {
ServerStateRepository repository = mapRepositoryMap.get(cacheId);
if (repository != null) {
return repository.syncMessage(cacheId);
}
return emptyList();
}

public void processSyncMessage(EhcacheStateRepoSyncMessage stateRepoSyncMessage) {
getServerStateRepository(stateRepoSyncMessage.getCacheId()).processSyncMessage(stateRepoSyncMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public interface EhcacheStateService {

boolean isConfigured();

StateRepositoryManager getStateRepositoryManager() throws ClusterException;
StateRepositoryManager getStateRepositoryManager();

ClientMessageTracker getClientMessageTracker();

Expand Down

0 comments on commit ea0dfef

Please sign in to comment.