diff --git a/ambry-api/src/main/java/com.github.ambry/clustermap/VirtualReplicatorCluster.java b/ambry-api/src/main/java/com.github.ambry/clustermap/VirtualReplicatorCluster.java index 6013e22344..d3d47d247e 100644 --- a/ambry-api/src/main/java/com.github.ambry/clustermap/VirtualReplicatorCluster.java +++ b/ambry-api/src/main/java/com.github.ambry/clustermap/VirtualReplicatorCluster.java @@ -39,4 +39,10 @@ public interface VirtualReplicatorCluster extends AutoCloseable { * @return list of PartitionId */ List getAssignedPartitionIds(); + + /** + * Add {@link VirtualReplicatorClusterListener} to listen for cluster change. + * @param listener to add. + */ + void addListener(VirtualReplicatorClusterListener listener); } diff --git a/ambry-api/src/main/java/com.github.ambry/clustermap/VirtualReplicatorClusterFactory.java b/ambry-api/src/main/java/com.github.ambry/clustermap/VirtualReplicatorClusterFactory.java index 45bf4d4695..9166031259 100644 --- a/ambry-api/src/main/java/com.github.ambry/clustermap/VirtualReplicatorClusterFactory.java +++ b/ambry-api/src/main/java/com.github.ambry/clustermap/VirtualReplicatorClusterFactory.java @@ -16,10 +16,10 @@ /** * A factory interface to get {@link VirtualReplicatorCluster}. */ -public interface VirtualReplicatorClusterFactory extends AutoCloseable { +public interface VirtualReplicatorClusterFactory { /** * @return an instance of {@link VirtualReplicatorCluster} generated by the factory. */ - VirtualReplicatorCluster getVirtualReplicatorCluster(); + VirtualReplicatorCluster getVirtualReplicatorCluster() throws Exception; } diff --git a/ambry-api/src/main/java/com.github.ambry/clustermap/VirtualReplicatorClusterListener.java b/ambry-api/src/main/java/com.github.ambry/clustermap/VirtualReplicatorClusterListener.java new file mode 100644 index 0000000000..e0ffbeac62 --- /dev/null +++ b/ambry-api/src/main/java/com.github.ambry/clustermap/VirtualReplicatorClusterListener.java @@ -0,0 +1,33 @@ +/** + * + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.clustermap; + +/** + * {@link VirtualReplicatorClusterListener} takes actions on {@link VirtualReplicatorCluster} partition add or removal. + */ +public interface VirtualReplicatorClusterListener { + + /** + * Action to take when new Partition is added. + * @param partitionId on add. + */ + void onPartitionAdded(PartitionId partitionId); + + /** + * Action to take when new Partition is removed. + * @param partitionId on remove. + */ + void onPartitionRemoved(PartitionId partitionId); +} diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrCluster.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrCluster.java new file mode 100644 index 0000000000..c155ecd98b --- /dev/null +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrCluster.java @@ -0,0 +1,161 @@ +/** + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.cloud; + +import com.github.ambry.clustermap.ClusterMap; +import com.github.ambry.clustermap.ClusterMapUtils; +import com.github.ambry.clustermap.DataNodeId; +import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.clustermap.VirtualReplicatorCluster; +import com.github.ambry.clustermap.VirtualReplicatorClusterListener; +import com.github.ambry.config.CloudConfig; +import com.github.ambry.config.ClusterMapConfig; +import com.github.ambry.utils.Utils; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.model.LeaderStandbySMD; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Helix Based VCR Cluster. + */ +public class HelixVcrCluster implements VirtualReplicatorCluster { + private static final Logger logger = LoggerFactory.getLogger(HelixVcrCluster.class); + private final DataNodeId currentDataNode; + private final String vcrClusterName; + private final String vcrInstanceName; + private final HelixManager manager; + private final HelixAdmin helixAdmin; + private final Map partitionIdMap; + private final Set assignedPartitionIds = ConcurrentHashMap.newKeySet(); + private final HelixVcrClusterMetrics metrics; + private final List listeners = new ArrayList<>(); + + /** + * Construct the helix VCR cluster. + * @param cloudConfig The cloud configuration to use. + * @param clusterMapConfig The clusterMap configuration to use. + * @param clusterMap The clusterMap to use. + */ + public HelixVcrCluster(CloudConfig cloudConfig, ClusterMapConfig clusterMapConfig, ClusterMap clusterMap) + throws Exception { + if (Utils.isNullOrEmpty(cloudConfig.VCR_CLUSTER_ZK_CONNECT_STRING)) { + throw new IllegalArgumentException("Missing value for " + CloudConfig.VCR_CLUSTER_ZK_CONNECT_STRING); + } else if (Utils.isNullOrEmpty(cloudConfig.VCR_CLUSTER_NAME)) { + throw new IllegalArgumentException("Missing value for " + CloudConfig.VCR_CLUSTER_NAME); + } + + currentDataNode = new CloudDataNode(cloudConfig, clusterMapConfig); + List allPartitions = clusterMap.getAllPartitionIds(null); + logger.trace("All partitions from clusterMap: {}.", allPartitions); + partitionIdMap = allPartitions.stream().collect(Collectors.toMap(PartitionId::toPathString, Function.identity())); + vcrClusterName = cloudConfig.vcrClusterName; + vcrInstanceName = + ClusterMapUtils.getInstanceName(clusterMapConfig.clusterMapHostName, clusterMapConfig.clusterMapPort); + + manager = HelixManagerFactory.getZKHelixManager(vcrClusterName, vcrInstanceName, InstanceType.PARTICIPANT, + cloudConfig.vcrClusterZkConnectString); + manager.getStateMachineEngine() + .registerStateModelFactory(LeaderStandbySMD.name, new HelixVcrStateModelFactory(this)); + manager.connect(); + helixAdmin = manager.getClusterManagmentTool(); + metrics = new HelixVcrClusterMetrics(clusterMap.getMetricRegistry()); + logger.info("HelixVcrCluster started successfully."); + } + + /** + * Add {@link PartitionId} to assignedPartitionIds set, if {@parm partitionIdStr} valid. + * Used in {@link HelixVcrStateModel} if current VCR becomes leader of a partition. + * @param partitionIdStr The partitionIdStr notified by Helix. + */ + public void addPartition(String partitionIdStr) { + PartitionId partitionId = partitionIdMap.get(partitionIdStr); + if (partitionId != null) { + if (assignedPartitionIds.add(partitionId)) { + for (VirtualReplicatorClusterListener listener : listeners) { + listener.onPartitionAdded(partitionId); + } + logger.info("Partition {} is added to current VCR: {}", partitionIdStr, vcrInstanceName); + } else { + logger.info("Partition {} exists on current VCR: {}", partitionIdStr, vcrInstanceName); + } + } else { + logger.error("Partition {} not in clusterMap on add.", partitionIdStr); + metrics.partitionIdNotInClusterMapOnAdd.inc(); + } + } + + /** + * Remove {@link PartitionId} from assignedPartitionIds set, if {@parm partitionIdStr} valid. + * Used in {@link HelixVcrStateModel} if current VCR becomes offline or standby a partition. + * @param partitionIdStr The partitionIdStr notified by Helix. + */ + public void removePartition(String partitionIdStr) { + PartitionId partitionId = partitionIdMap.get(partitionIdStr); + if (partitionId != null) { + if (assignedPartitionIds.remove(partitionId)) { + for (VirtualReplicatorClusterListener listener : listeners) { + listener.onPartitionRemoved(partitionId); + } + logger.info("Partition {} is removed from current VCR: {}.", partitionIdStr, vcrInstanceName); + } else { + logger.info("Partition {} not exists on current VCR: {}", partitionIdStr, vcrInstanceName); + } + } else { + logger.error("Partition {} not in clusterMap on remove.", partitionIdStr); + metrics.partitionIdNotInClusterMapOnRemove.inc(); + } + } + + @Override + public List getAllDataNodeIds() { + // TODO: return all VCR nodes for recovery. + return Collections.singletonList(currentDataNode); + } + + @Override + public DataNodeId getCurrentDataNodeId() { + return currentDataNode; + } + + @Override + public List getAssignedPartitionIds() { + return new LinkedList<>(assignedPartitionIds); + } + + @Override + public void addListener(VirtualReplicatorClusterListener listener) { + listeners.add(listener); + } + + @Override + public void close() { + manager.disconnect(); + helixAdmin.close(); + } +} + diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrClusterFactory.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrClusterFactory.java new file mode 100644 index 0000000000..d8ddd3c961 --- /dev/null +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrClusterFactory.java @@ -0,0 +1,52 @@ +/** + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.cloud; + +import com.github.ambry.clustermap.ClusterMap; +import com.github.ambry.clustermap.VirtualReplicatorCluster; +import com.github.ambry.clustermap.VirtualReplicatorClusterFactory; +import com.github.ambry.config.CloudConfig; +import com.github.ambry.config.ClusterMapConfig; + + +/** + * {@link HelixVcrClusterFactory} to generate {@link HelixVcrCluster} for dynamic partition assignment. + */ +public class HelixVcrClusterFactory implements VirtualReplicatorClusterFactory { + + private final CloudConfig cloudConfig; + private final ClusterMapConfig clusterMapConfig; + private final ClusterMap clusterMap; + private VirtualReplicatorCluster virtualReplicatorCluster; + + public HelixVcrClusterFactory(CloudConfig cloudConfig, ClusterMapConfig clusterMapConfig, ClusterMap clusterMap) { + this.cloudConfig = cloudConfig; + this.clusterMapConfig = clusterMapConfig; + this.clusterMap = clusterMap; + } + + @Override + synchronized public VirtualReplicatorCluster getVirtualReplicatorCluster() throws Exception { + if (virtualReplicatorCluster == null) { + virtualReplicatorCluster = new HelixVcrCluster(cloudConfig, clusterMapConfig, clusterMap); + } + return virtualReplicatorCluster; + } + + public void close() throws Exception { + if (virtualReplicatorCluster != null) { + virtualReplicatorCluster.close(); + } + } +} diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrClusterMetrics.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrClusterMetrics.java new file mode 100644 index 0000000000..51a2005708 --- /dev/null +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrClusterMetrics.java @@ -0,0 +1,31 @@ +/** + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.cloud; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.MetricRegistry; + + +public class HelixVcrClusterMetrics { + + public final Counter partitionIdNotInClusterMapOnRemove; + public final Counter partitionIdNotInClusterMapOnAdd; + + public HelixVcrClusterMetrics(MetricRegistry registry) { + partitionIdNotInClusterMapOnRemove = + registry.counter(MetricRegistry.name(HelixVcrCluster.class, "PartitionIdNotInClusterMapOnRemove")); + partitionIdNotInClusterMapOnAdd = + registry.counter(MetricRegistry.name(VcrServer.class, "PartitionIdNotInClusterMapOnAdd")); + } +} diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrStateModel.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrStateModel.java new file mode 100644 index 0000000000..04e80ce8b6 --- /dev/null +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrStateModel.java @@ -0,0 +1,75 @@ +/** + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.cloud; + +import org.apache.helix.NotificationContext; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.participant.statemachine.StateModelInfo; +import org.apache.helix.participant.statemachine.Transition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * {@link StateModel} to use when the VCR participants register to Helix. The methods are callbacks + * that get called within a participant whenever its state changes in Helix. + */ +@StateModelInfo(initialState = "OFFLINE", states = {"LEADER", "STANDBY"}) +public class HelixVcrStateModel extends StateModel { + private Logger logger = LoggerFactory.getLogger(getClass()); + private HelixVcrCluster helixVcrCluster; + + HelixVcrStateModel(HelixVcrCluster helixVcrCluster) { + this.helixVcrCluster = helixVcrCluster; + } + + @Transition(to = "STANDBY", from = "OFFLINE") + public void onBecomeStandbyFromOffline(Message message, NotificationContext context) { + logger.trace("{} Becoming STANDBY from OFFLINE of Partition {}", helixVcrCluster.getCurrentDataNodeId(), + message.getPartitionName()); + } + + @Transition(to = "LEADER", from = "STANDBY") + public void onBecomeLeaderFromStandby(Message message, NotificationContext context) { + logger.info("{} Becoming LEADER from STANDBY of Partition {}", helixVcrCluster.getCurrentDataNodeId(), + message.getPartitionName()); + helixVcrCluster.addPartition(message.getPartitionName()); + } + + @Transition(to = "STANDBY", from = "LEADER") + public void onBecomeStandbyFromLeader(Message message, NotificationContext context) { + logger.info("{} Becoming STANDBY from LEADER of Partition {}", helixVcrCluster.getCurrentDataNodeId(), + message.getPartitionName()); + helixVcrCluster.removePartition(message.getPartitionName()); + } + + @Transition(to = "OFFLINE", from = "STANDBY") + public void onBecomeOfflineFromStandby(Message message, NotificationContext context) { + logger.trace("{} Becoming OFFLINE from STANDBY of Partition {}", helixVcrCluster.getCurrentDataNodeId(), + message.getPartitionName()); + } + + @Transition(to = "OFFLINE", from = "LEADER") + public void onBecomeOfflineFromLeader(Message message, NotificationContext context) { + logger.info("{} Becoming OFFLINE from LEADER of Partition {}", helixVcrCluster.getCurrentDataNodeId(), + message.getPartitionName()); + helixVcrCluster.removePartition(message.getPartitionName()); + } + + @Override + public void reset() { + // no op + } +} diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrStateModelFactory.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrStateModelFactory.java new file mode 100644 index 0000000000..3f9da189c4 --- /dev/null +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrStateModelFactory.java @@ -0,0 +1,43 @@ +/** + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.cloud; + +import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.participant.statemachine.StateModelFactory; + + +/** + * A factory for creating {@link HelixVcrStateModel} + */ +public class HelixVcrStateModelFactory extends StateModelFactory { + HelixVcrCluster helixVcrCluster; + + public HelixVcrStateModelFactory(HelixVcrCluster helixVcrCluster) { + this.helixVcrCluster = helixVcrCluster; + } + + /** + * Create and return an instance of {@link HelixVcrStateModel} + * @param resourceName the resource name for which this state model is being created. + * @param partitionName the partition name for which this state model is being created. + * + * @return an instance of {@link HelixVcrStateModel}. + */ + @Override + public StateModel createNewStateModel(String resourceName, String partitionName) { + return new HelixVcrStateModel(helixVcrCluster); + } +} + + diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/StaticVcrCluster.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/StaticVcrCluster.java index 85f1ae70be..7ad045f13b 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/StaticVcrCluster.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/StaticVcrCluster.java @@ -17,6 +17,7 @@ import com.github.ambry.clustermap.DataNodeId; import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.VirtualReplicatorCluster; +import com.github.ambry.clustermap.VirtualReplicatorClusterListener; import com.github.ambry.config.CloudConfig; import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.utils.Utils; @@ -81,6 +82,10 @@ public List getAssignedPartitionIds() { return assignedPartitionIds; } + @Override + public void addListener(VirtualReplicatorClusterListener listener) { + } + @Override public void close() throws Exception { } diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/StaticVcrClusterFactory.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/StaticVcrClusterFactory.java index e55bb2dec8..01d3c31090 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/StaticVcrClusterFactory.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/StaticVcrClusterFactory.java @@ -43,8 +43,4 @@ synchronized public VirtualReplicatorCluster getVirtualReplicatorCluster() { } return virtualReplicatorCluster; } - - public void close() throws Exception { - - } } diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/HelixVcrClusterTest.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/HelixVcrClusterTest.java new file mode 100644 index 0000000000..e65cd61ce5 --- /dev/null +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/HelixVcrClusterTest.java @@ -0,0 +1,147 @@ +/** + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.cloud; + +import com.github.ambry.clustermap.MockClusterAgentsFactory; +import com.github.ambry.clustermap.MockClusterMap; +import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.clustermap.VirtualReplicatorCluster; +import com.github.ambry.clustermap.VirtualReplicatorClusterListener; +import com.github.ambry.config.CloudConfig; +import com.github.ambry.config.ClusterMapConfig; +import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.utils.HelixControllerManager; +import com.github.ambry.utils.TestUtils; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Tests of HelixVcrCluster. + */ +public class HelixVcrClusterTest { + private final static Logger logger = LoggerFactory.getLogger(HelixVcrClusterTest.class); + private static MockClusterAgentsFactory mockClusterAgentsFactory; + private static MockClusterMap mockClusterMap; + private static final String ZK_SERVER_HOSTNAME = "localhost"; + private static final int ZK_SERVER_PORT = 31900; + private static final String ZK_CONNECT_STRING = ZK_SERVER_HOSTNAME + ":" + Integer.toString(ZK_SERVER_PORT); + private static TestUtils.ZkInfo zkInfo; + private static final String VCR_CLUSTER_NAME = "vcrTestCluster"; + private static HelixControllerManager helixControllerManager; + private static final int NUM_PARTITIONS = 10; + + @BeforeClass + public static void beforeClass() throws Exception { + mockClusterAgentsFactory = new MockClusterAgentsFactory(false, 1, 1, NUM_PARTITIONS); + mockClusterMap = mockClusterAgentsFactory.getClusterMap(); + zkInfo = new TestUtils.ZkInfo(TestUtils.getTempDir("helixVcr"), "DC1", (byte) 1, ZK_SERVER_PORT, true); + helixControllerManager = + VcrTestUtil.populateZkInfoAndStartController(ZK_CONNECT_STRING, VCR_CLUSTER_NAME, mockClusterMap); + } + + @AfterClass + public static void afterClass() { + helixControllerManager.syncStop(); + zkInfo.shutdown(); + } + + /** + * Test addPartition and removePartition of {@link HelixVcrCluster} + */ + @Test + public void helixVcrClusterTest() throws Exception { + // Create helixInstance1 and join the cluster. All partitions should be assigned to helixInstance1. + VirtualReplicatorCluster helixInstance1 = createHelixInstance(8123, 10123); + List expectedPartitions = mockClusterMap.getAllPartitionIds(null); + CountDownLatch latchForAdd = new CountDownLatch(expectedPartitions.size()); + CountDownLatch latchForRemove = new CountDownLatch(expectedPartitions.size() / 2); + MockVcrListener mockVcrListener = new MockVcrListener(latchForAdd, latchForRemove); + helixInstance1.addListener(mockVcrListener); + Assert.assertTrue("Latch count is not correct.", latchForAdd.await(5, TimeUnit.SECONDS)); + Assert.assertArrayEquals("Partition assignments are not correct.", expectedPartitions.toArray(), + mockVcrListener.getPartitionSet().toArray()); + Assert.assertEquals("Partition assignment are not correct.", helixInstance1.getAssignedPartitionIds(), + expectedPartitions); + + // Create helixInstance2 and join the cluster. Half of partitions should be removed from helixInstance1. + VirtualReplicatorCluster helixInstance2 = createHelixInstance(8124, 10124); + Assert.assertTrue("Latch count is not correct.", latchForRemove.await(5, TimeUnit.SECONDS)); + Assert.assertEquals("Number of partitions removed are not correct.", expectedPartitions.size() / 2, + mockVcrListener.getPartitionSet().size()); + helixInstance1.close(); + helixInstance2.close(); + } + + /** + * Helper function to create helix instance and join helix cluster. + * @param clusterMapPort The clusterMapPort of the instance. + * @param vcrSslPort The vcrSslPort of this vcr. + */ + private VirtualReplicatorCluster createHelixInstance(int clusterMapPort, int vcrSslPort) throws Exception { + Properties props = new Properties(); + props.setProperty("clustermap.host.name", "localhost"); + props.setProperty("clustermap.resolve.hostnames", "false"); + props.setProperty("clustermap.cluster.name", "clusterName"); + props.setProperty("clustermap.datacenter.name", "DC1"); + props.setProperty("clustermap.ssl.enabled.datacenters", "DC1,DC2"); + props.setProperty("clustermap.port", Integer.toString(clusterMapPort)); + ClusterMapConfig clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props)); + + props = new Properties(); + props.setProperty("vcr.ssl.port", Integer.toString(vcrSslPort)); + props.setProperty(CloudConfig.VCR_CLUSTER_ZK_CONNECT_STRING, ZK_SERVER_HOSTNAME + ":" + ZK_SERVER_PORT); + props.setProperty(CloudConfig.VCR_CLUSTER_NAME, VCR_CLUSTER_NAME); + CloudConfig cloudConfig = new CloudConfig(new VerifiableProperties(props)); + return new HelixVcrClusterFactory(cloudConfig, clusterMapConfig, mockClusterMap).getVirtualReplicatorCluster(); + } + + private static class MockVcrListener implements VirtualReplicatorClusterListener { + + private final Set partitionSet = ConcurrentHashMap.newKeySet(); + private final CountDownLatch latchForAdd; + private final CountDownLatch latchForRemove; + + MockVcrListener(CountDownLatch latchForAdd, CountDownLatch latchForRemove) { + this.latchForAdd = latchForAdd; + this.latchForRemove = latchForRemove; + } + + @Override + public void onPartitionAdded(PartitionId partitionId) { + partitionSet.add(partitionId); + latchForAdd.countDown(); + } + + @Override + public void onPartitionRemoved(PartitionId partitionId) { + partitionSet.remove(partitionId); + latchForRemove.countDown(); + } + + public Set getPartitionSet() { + return partitionSet; + } + } +} diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/StaticVcrClusterTest.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/StaticVcrClusterTest.java index 63fc883aa0..37651f480e 100644 --- a/ambry-cloud/src/test/java/com.github.ambry.cloud/StaticVcrClusterTest.java +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/StaticVcrClusterTest.java @@ -44,7 +44,7 @@ public void setup() throws Exception { } @Test - public void staticVcrClusterFactoryTest() { + public void staticVcrClusterFactoryTest() throws Exception { Properties props = new Properties(); String hostName = "localhostTest"; int port = 12345; diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/VcrServerTest.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/VcrServerTest.java index 5caecaa963..54e5f61373 100644 --- a/ambry-cloud/src/test/java/com.github.ambry.cloud/VcrServerTest.java +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/VcrServerTest.java @@ -15,12 +15,16 @@ import com.github.ambry.clustermap.MockClusterAgentsFactory; import com.github.ambry.clustermap.MockClusterMap; +import com.github.ambry.config.CloudConfig; import com.github.ambry.config.VerifiableProperties; import com.github.ambry.notification.NotificationSystem; +import com.github.ambry.utils.HelixControllerManager; import com.github.ambry.utils.TestUtils; +import java.io.IOException; import java.util.Collections; import java.util.Properties; -import org.junit.Before; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import static org.mockito.Mockito.*; @@ -31,23 +35,28 @@ */ public class VcrServerTest { - private MockClusterAgentsFactory mockClusterAgentsFactory; - private MockClusterMap mockClusterMap; - private NotificationSystem notificationSystem; + private static MockClusterAgentsFactory mockClusterAgentsFactory; + private static MockClusterMap mockClusterMap; + private static NotificationSystem notificationSystem; - @Before - public void setup() throws Exception { + @BeforeClass + public static void setup() throws Exception { mockClusterAgentsFactory = new MockClusterAgentsFactory(false, 1, 1, 2); mockClusterMap = mockClusterAgentsFactory.getClusterMap(); notificationSystem = mock(NotificationSystem.class); } + @AfterClass + public static void cleanUp() throws IOException { + mockClusterMap.cleanup(); + } + /** - * Bring up the VCR server and then shut it down. + * Bring up the VCR server and then shut it down with {@link StaticVcrCluster}. * @throws Exception */ @Test - public void testVCRServer() throws Exception { + public void testVCRServerWithStaticCluster() throws Exception { Properties props = new Properties(); props.setProperty("host.name", mockClusterMap.getDataNodes().get(0).getHostname()); int port = mockClusterMap.getDataNodes().get(0).getPort(); @@ -60,8 +69,8 @@ public void testVCRServer() throws Exception { props.setProperty("clustermap.resolve.hostnames", "false"); props.setProperty("server.scheduler.num.of.threads", "1"); props.setProperty("num.io.threads", "1"); - props.setProperty("vcr.assigned.partitions", "0,1"); props.setProperty("kms.default.container.key", TestUtils.getRandomKey(16)); + props.setProperty(CloudConfig.VCR_ASSIGNED_PARTITIONS, "0,1"); CloudDestinationFactory cloudDestinationFactory = new LatchBasedInMemoryCloudDestinationFactory(new LatchBasedInMemoryCloudDestination(Collections.emptyList())); VerifiableProperties verifiableProperties = new VerifiableProperties(props); @@ -69,6 +78,43 @@ public void testVCRServer() throws Exception { new VcrServer(verifiableProperties, mockClusterAgentsFactory, notificationSystem, cloudDestinationFactory); vcrServer.startup(); vcrServer.shutdown(); - mockClusterMap.cleanup(); + } + + /** + * Bring up the VCR server and then shut it down with {@link HelixVcrCluster}. + * @throws Exception + */ + @Test + public void testVCRServerWithHelixCluster() throws Exception { + int zkPort = 31999; + String zkConnectString = "localhost:" + zkPort; + String vcrClusterName = "vcrTestCluster"; + TestUtils.ZkInfo zkInfo = new TestUtils.ZkInfo(TestUtils.getTempDir("helixVcr"), "DC1", (byte) 1, zkPort, true); + HelixControllerManager helixControllerManager = + VcrTestUtil.populateZkInfoAndStartController(zkConnectString, vcrClusterName, mockClusterMap); + Properties props = new Properties(); + props.setProperty("host.name", mockClusterMap.getDataNodes().get(0).getHostname()); + int port = mockClusterMap.getDataNodes().get(0).getPort(); + props.setProperty("port", Integer.toString(port)); + props.setProperty("clustermap.cluster.name", "test"); + props.setProperty("clustermap.datacenter.name", "DC1"); + props.setProperty("clustermap.host.name", "localhost"); + props.setProperty("clustermap.port", Integer.toString(port)); + props.setProperty("clustermap.default.partition.class", MockClusterMap.DEFAULT_PARTITION_CLASS); + props.setProperty("clustermap.resolve.hostnames", "false"); + props.setProperty("server.scheduler.num.of.threads", "1"); + props.setProperty("num.io.threads", "1"); + props.setProperty(CloudConfig.VIRTUAL_REPLICATOR_CLUSTER_FACTORY_CLASS, HelixVcrClusterFactory.class.getName()); + props.setProperty(CloudConfig.VCR_CLUSTER_ZK_CONNECT_STRING, zkConnectString); + props.setProperty(CloudConfig.VCR_CLUSTER_NAME, vcrClusterName); + CloudDestinationFactory cloudDestinationFactory = + new LatchBasedInMemoryCloudDestinationFactory(new LatchBasedInMemoryCloudDestination(Collections.emptyList())); + VerifiableProperties verifiableProperties = new VerifiableProperties(props); + VcrServer vcrServer = + new VcrServer(verifiableProperties, mockClusterAgentsFactory, notificationSystem, cloudDestinationFactory); + vcrServer.startup(); + vcrServer.shutdown(); + helixControllerManager.syncStop(); + zkInfo.shutdown(); } } diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/VcrTestUtil.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/VcrTestUtil.java index 9df0f78190..b371b5fb5c 100644 --- a/ambry-cloud/src/test/java/com.github.ambry.cloud/VcrTestUtil.java +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/VcrTestUtil.java @@ -14,8 +14,28 @@ package com.github.ambry.cloud; import com.github.ambry.clustermap.ClusterAgentsFactory; +import com.github.ambry.clustermap.ClusterMap; +import com.github.ambry.clustermap.HelixAdminFactory; +import com.github.ambry.clustermap.PartitionId; import com.github.ambry.config.VerifiableProperties; import com.github.ambry.notification.NotificationSystem; +import com.github.ambry.utils.HelixControllerManager; +import java.util.HashMap; +import java.util.Map; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixAdmin; +import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.manager.zk.ZNRecordSerializer; +import org.apache.helix.manager.zk.client.HelixZkClient; +import org.apache.helix.manager.zk.client.SharedZkClientFactory; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.HelixConfigScope; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.LeaderStandbySMD; +import org.apache.helix.model.builder.FullAutoModeISBuilder; +import org.apache.helix.model.builder.HelixConfigScopeBuilder; +import org.apache.helix.tools.ClusterSetup; /** @@ -35,4 +55,46 @@ public static VcrServer createVcrServer(VerifiableProperties properties, Cluster NotificationSystem notificationSystem, CloudDestinationFactory cloudDestinationFactory) { return new VcrServer(properties, clusterAgentsFactory, notificationSystem, cloudDestinationFactory); } + + /** + * Populate info on ZooKeeper server and start {@link HelixControllerManager}. + * @param zKConnectString zk connect string to zk server. + * @param vcrClusterName the vcr cluster name. + * @param clusterMap the {@link ClusterMap} to use. + * @return the created {@link HelixControllerManager}. + */ + public static HelixControllerManager populateZkInfoAndStartController(String zKConnectString, String vcrClusterName, + ClusterMap clusterMap) { + HelixZkClient zkClient = + SharedZkClientFactory.getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(zKConnectString)); + zkClient.setZkSerializer(new ZNRecordSerializer()); + ClusterSetup clusterSetup = new ClusterSetup(zkClient); + clusterSetup.addCluster(vcrClusterName, true); + HelixAdmin admin = new HelixAdminFactory().getHelixAdmin(zKConnectString); + // set ALLOW_PARTICIPANT_AUTO_JOIN + HelixConfigScope configScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER). + forCluster(vcrClusterName).build(); + Map helixClusterProperties = new HashMap<>(); + helixClusterProperties.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, String.valueOf(true)); + admin.setConfig(configScope, helixClusterProperties); + // set PersistBestPossibleAssignment + ConfigAccessor configAccessor = new ConfigAccessor(zkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(vcrClusterName); + clusterConfig.setPersistBestPossibleAssignment(true); + configAccessor.setClusterConfig(vcrClusterName, clusterConfig); + + String resourceName = "1"; + FullAutoModeISBuilder builder = new FullAutoModeISBuilder(resourceName); + builder.setStateModel(LeaderStandbySMD.name); + for (PartitionId partitionId : clusterMap.getAllPartitionIds(null)) { + builder.add(partitionId.toPathString()); + } + builder.setRebalanceStrategy(CrushRebalanceStrategy.class.getName()); + IdealState idealState = builder.build(); + admin.addResource(vcrClusterName, resourceName, idealState); + admin.rebalance(vcrClusterName, resourceName, 3, "", ""); + HelixControllerManager helixControllerManager = new HelixControllerManager(zKConnectString, vcrClusterName); + helixControllerManager.syncStart(); + return helixControllerManager; + } } diff --git a/ambry-utils/src/test/java/com.github.ambry.utils/HelixControllerManager.java b/ambry-utils/src/test/java/com.github.ambry.utils/HelixControllerManager.java new file mode 100644 index 0000000000..d7b511f342 --- /dev/null +++ b/ambry-utils/src/test/java/com.github.ambry.utils/HelixControllerManager.java @@ -0,0 +1,87 @@ +/** + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + * The code was originally created by Apache Helix under Apache License 2.0. + * https://github.com/apache/helix/blob/master/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java + * Code Changes in this copy: + * 1. Renamed MockParticipantManager to HelixControllerManager. + * 2. Renamed LOG to logger. + * 3. Removed '_' from class members. + */ +package com.github.ambry.utils; + +import java.util.concurrent.CountDownLatch; +import org.apache.helix.InstanceType; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class HelixControllerManager extends ZKHelixManager implements Runnable { + private static Logger logger = LoggerFactory.getLogger(HelixControllerManager.class); + + private final CountDownLatch startCountDown = new CountDownLatch(1); + private final CountDownLatch stopCountDown = new CountDownLatch(1); + private final CountDownLatch waitStopFinishCountDown = new CountDownLatch(1); + + private boolean started = false; + + public HelixControllerManager(String zkAddr, String clusterName) { + this(zkAddr, clusterName, "controller"); + } + + public HelixControllerManager(String zkAddr, String clusterName, String controllerName) { + super(clusterName, controllerName, InstanceType.CONTROLLER, zkAddr); + } + + public void syncStop() { + stopCountDown.countDown(); + try { + waitStopFinishCountDown.await(); + started = false; + } catch (InterruptedException e) { + logger.error("Interrupted waiting for finish", e); + } + } + + // This should not be called more than once because HelixManager.connect() should not be called more than once. + public void syncStart() { + if (started) { + throw new RuntimeException("Helix Controller already started. Do not call syncStart() more than once."); + } else { + started = true; + } + + new Thread(this).start(); + try { + startCountDown.await(); + } catch (InterruptedException e) { + logger.error("Interrupted waiting for start", e); + } + } + + @Override + public void run() { + try { + connect(); + startCountDown.countDown(); + stopCountDown.await(); + } catch (Exception e) { + logger.error("exception running controller-manager", e); + } finally { + startCountDown.countDown(); + disconnect(); + waitStopFinishCountDown.countDown(); + } + } +}