Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
xyuanlu committed Aug 29, 2024
1 parent 69e69ec commit 418e57f
Show file tree
Hide file tree
Showing 3 changed files with 242 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.helix.common.caches.CurrentStateCache;
import org.apache.helix.gateway.api.constant.GatewayServiceEventType;
import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
import org.apache.helix.gateway.channel.HelixGatewayServiceChannelFactory;
import org.apache.helix.gateway.participant.HelixGatewayParticipant;
import org.apache.helix.gateway.util.GatewayCurrentStateCache;
import org.apache.helix.gateway.util.PerKeyBlockingExecutor;


Expand Down Expand Up @@ -60,6 +62,8 @@ public class GatewayServiceManager {

private final GatewayServiceChannelConfig _gatewayServiceChannelConfig;

private final Map<String, GatewayCurrentStateCache> _currentStateCacheMap;

public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig gatewayServiceChannelConfig) {
_helixGatewayParticipantMap = new ConcurrentHashMap<>();
_zkAddress = zkAddress;
Expand All @@ -68,6 +72,7 @@ public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig gatew
_connectionEventProcessor =
new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); // todo: make it configurable
_gatewayServiceChannelConfig = gatewayServiceChannelConfig;
_currentStateCacheMap = new ConcurrentHashMap<>();
}

/**
Expand Down Expand Up @@ -143,6 +148,10 @@ public void stopService() {
_helixGatewayParticipantMap.clear();
}

public GatewayCurrentStateCache getCurrentStateCache(String clusterName) {
return _currentStateCacheMap.computeIfAbsent(clusterName, k -> new GatewayCurrentStateCache(clusterName));
}

private void createHelixGatewayParticipant(String clusterName, String instanceName,
Map<String, Map<String, String>> initialShardStateMap) {
// Create and add the participant to the participant map
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package org.apache.helix.gateway.util;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


public class GatewayCurrentStateCache {
String _clusterName;

// A cache of current state. It should be updated by the HelixGatewayServiceChannel
// instance -> instances assignments
Map<String, Map<String, String>> _currentStateMap;

// A cache of target state.
// instance -> assignments
Map<String, Map<String, String>> _targetStateMap;

boolean _targetStateChanged = false;

public GatewayCurrentStateCache(String clusterName) {
_clusterName = clusterName;
_currentStateMap = new ConcurrentHashMap<>();
_targetStateMap = new ConcurrentHashMap<>();
}

public String getCurrentState(String instance, String shard) {
return _currentStateMap.get(instance).get(shard);
}

/**
* Update the cached current state of instances in a cluster, and return the diff of the change.
* @param newCurrentStateMap The new current state map of instances in the cluster
* @return
*/
public Map<String, Map<String, String>> updateCacheWithNewCurrentStateAndGetDiff(
Map<String, Map<String, String>> newCurrentStateMap) {
Map<String, Map<String, String>> diff = null;
for (Map.Entry<String, Map<String, String>> entry : newCurrentStateMap.entrySet()) {
String instance = entry.getKey();
Map<String, String> newCurrentState = entry.getValue();
Map<String, String> oldCurrentState = _currentStateMap.get(instance);
if (oldCurrentState == null || !oldCurrentState.equals(newCurrentState)) {
if (diff == null) {
diff = new HashMap<>();
}
if (oldCurrentState == null) {
diff.put(instance, newCurrentState);
continue;
}
for (String shard : newCurrentState.keySet()) {
if (oldCurrentState.get(shard) == null || !oldCurrentState.get(shard).equals(newCurrentState.get(shard))) {
diff.computeIfAbsent(instance, k -> new HashMap<>()).put(shard, newCurrentState.get(shard));
}
}
}
}
_currentStateMap = newCurrentStateMap;
return diff;
}

public void updateCacheWithCurrentStateDiff(Map<String, Map<String, String>> currentStateDiff) {
updateShardStateMapWithDiff(currentStateDiff, _currentStateMap);
}

/**
* Udate the target state with the changed target state maps.
* All existing target states remains the same
* @param targetStateChangeMap
*/
public void updateTargetStateWithDiff(Map<String, Map<String, String>> targetStateChangeMap) {
_targetStateChanged = updateShardStateMapWithDiff(targetStateChangeMap, _targetStateMap);
}

public boolean isTargetStateChanged() {
return _targetStateChanged;
}

public void resetTargetStateChanged() {
_targetStateChanged = false;
}

public Map<String, Map<String, String>> getTargetStateMap() {
return _targetStateMap;
}

public String serializeTargetAssignments() {
ObjectMapper mapper = new ObjectMapper();
ObjectNode root = mapper.createObjectNode();
for (Map.Entry<String, Map<String, String>> entry : _targetStateMap.entrySet()) {
String instance = entry.getKey();
Map<String, String> assignments = entry.getValue();
ObjectNode instanceNode = mapper.createObjectNode();
for (Map.Entry<String, String> assignment : assignments.entrySet()) {
instanceNode.put(assignment.getKey(), assignment.getValue());
}
root.set(instance, instanceNode);
}
return root.toString();
}

private boolean updateShardStateMapWithDiff(Map<String, Map<String, String>> diffMap,
Map<String, Map<String, String>> currentMap) {
if (diffMap == null || diffMap.isEmpty()) {
return false;
}
for (Map.Entry<String, Map<String, String>> entry : diffMap.entrySet()) {
String instance = entry.getKey();
Map<String, String> currentState = entry.getValue();
if (currentMap.get(instance) == null) {
currentMap.put(instance, currentState);
} else {
currentMap.get(instance).entrySet().stream().forEach(e -> {
if (currentState.get(e.getKey()) != null) {
e.setValue(currentState.get(e.getKey()));
}
});
}
}
return true;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package org.apache.helix.gateway.utils;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.HashMap;
import java.util.Map;
import org.apache.helix.gateway.util.GatewayCurrentStateCache;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class TestGatewayCurrentStateCache {
private GatewayCurrentStateCache cache = new GatewayCurrentStateCache("TestCluster");;

@Test
public void testUpdateCacheWithNewCurrentStateAndGetDiff() {
Map<String, Map<String, String>> newState = new HashMap<>();
Map<String, String> instanceState = new HashMap<>();
instanceState.put("shard1", "ONLINE");
instanceState.put("shard2", "OFFLINE");
newState.put("instance1", instanceState);

Map<String, Map<String, String>> diff = cache.updateCacheWithNewCurrentStateAndGetDiff(newState);

Assert.assertNotNull(diff);
Assert.assertEquals(diff.size(), 1);
Assert.assertEquals(diff.get("instance1").size(), 2);
Assert.assertEquals(diff.get("instance1").get("shard1"), "ONLINE");
Assert.assertEquals(diff.get("instance1").get("shard2"), "OFFLINE");
Assert.assertEquals(cache.getCurrentState("instance1", "shard1"), "ONLINE");
Assert.assertEquals(cache.getCurrentState("instance1", "shard2"), "OFFLINE");
}

@Test(dependsOnMethods = "testUpdateCacheWithNewCurrentStateAndGetDiff")
public void testUpdateCacheWithCurrentStateDiff() {
Map<String, Map<String, String>> diff = new HashMap<>();
Map<String, String> instanceState = new HashMap<>();
instanceState.put("shard1", "OFFLINE");
diff.put("instance1", instanceState);

cache.updateCacheWithCurrentStateDiff(diff);
Assert.assertEquals(cache.getCurrentState("instance1", "shard1"), "OFFLINE");
Assert.assertEquals(cache.getCurrentState("instance1", "shard2"), "OFFLINE");
}

@Test(dependsOnMethods = "testUpdateCacheWithCurrentStateDiff")
public void testUpdateTargetStateWithDiff() {
Map<String, Map<String, String>> targetStateChange = new HashMap<>();
Map<String, String> instanceState = new HashMap<>();
instanceState.put("shard1", "ONLINE");
targetStateChange.put("instance1", instanceState);

cache.updateTargetStateWithDiff(targetStateChange);

Assert.assertTrue(cache.isTargetStateChanged());
Assert.assertEquals(cache.getTargetStateMap().get("instance1").get("shard1"), "ONLINE");
}

@Test(dependsOnMethods = "testUpdateTargetStateWithDiff")
public void testSerializeTargetAssignments() {
Map<String, Map<String, String>> targetState = new HashMap<>();
Map<String, String> instanceState = new HashMap<>();
instanceState.put("shard1", "OFFLINE");
targetState.put("instance1", instanceState);

cache.updateTargetStateWithDiff(targetState);

String serialized = cache.serializeTargetAssignments();
Assert.assertTrue(serialized.contains("\"instance1\":{\"shard1\":\"OFFLINE\"}"));
}
}

0 comments on commit 418e57f

Please sign in to comment.