Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add HelixVcrCluster and HelixVcrClusterFactory. #1158

Merged
merged 2 commits into from
May 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,10 @@ public interface VirtualReplicatorCluster extends AutoCloseable {
* @return list of PartitionId
*/
List<? extends PartitionId> getAssignedPartitionIds();

/**
* Add {@link VirtualReplicatorClusterListener} to listen for cluster change.
* @param listener to add.
*/
void addListener(VirtualReplicatorClusterListener listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
/**
* A factory interface to get {@link VirtualReplicatorCluster}.
*/
public interface VirtualReplicatorClusterFactory extends AutoCloseable {
public interface VirtualReplicatorClusterFactory {

/**
* @return an instance of {@link VirtualReplicatorCluster} generated by the factory.
*/
VirtualReplicatorCluster getVirtualReplicatorCluster();
VirtualReplicatorCluster getVirtualReplicatorCluster() throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
*
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.clustermap;

/**
* {@link VirtualReplicatorClusterListener} takes actions on {@link VirtualReplicatorCluster} partition add or removal.
*/
public interface VirtualReplicatorClusterListener {

/**
* Action to take when new Partition is added.
* @param partitionId on add.
*/
void onPartitionAdded(PartitionId partitionId);

/**
* Action to take when new Partition is removed.
* @param partitionId on remove.
*/
void onPartitionRemoved(PartitionId partitionId);
}
161 changes: 161 additions & 0 deletions ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrCluster.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/**
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.cloud;

import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.ClusterMapUtils;
import com.github.ambry.clustermap.DataNodeId;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.VirtualReplicatorCluster;
import com.github.ambry.clustermap.VirtualReplicatorClusterListener;
import com.github.ambry.config.CloudConfig;
import com.github.ambry.config.ClusterMapConfig;
import com.github.ambry.utils.Utils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.model.LeaderStandbySMD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Helix Based VCR Cluster.
*/
public class HelixVcrCluster implements VirtualReplicatorCluster {
private static final Logger logger = LoggerFactory.getLogger(HelixVcrCluster.class);
private final DataNodeId currentDataNode;
private final String vcrClusterName;
private final String vcrInstanceName;
private final HelixManager manager;
private final HelixAdmin helixAdmin;
private final Map<String, PartitionId> partitionIdMap;
private final Set<PartitionId> assignedPartitionIds = ConcurrentHashMap.newKeySet();
private final HelixVcrClusterMetrics metrics;
private final List<VirtualReplicatorClusterListener> listeners = new ArrayList<>();

/**
* Construct the helix VCR cluster.
* @param cloudConfig The cloud configuration to use.
* @param clusterMapConfig The clusterMap configuration to use.
* @param clusterMap The clusterMap to use.
*/
public HelixVcrCluster(CloudConfig cloudConfig, ClusterMapConfig clusterMapConfig, ClusterMap clusterMap)
throws Exception {
if (Utils.isNullOrEmpty(cloudConfig.VCR_CLUSTER_ZK_CONNECT_STRING)) {
throw new IllegalArgumentException("Missing value for " + CloudConfig.VCR_CLUSTER_ZK_CONNECT_STRING);
} else if (Utils.isNullOrEmpty(cloudConfig.VCR_CLUSTER_NAME)) {
throw new IllegalArgumentException("Missing value for " + CloudConfig.VCR_CLUSTER_NAME);
}

currentDataNode = new CloudDataNode(cloudConfig, clusterMapConfig);
List<? extends PartitionId> allPartitions = clusterMap.getAllPartitionIds(null);
logger.trace("All partitions from clusterMap: {}.", allPartitions);
partitionIdMap = allPartitions.stream().collect(Collectors.toMap(PartitionId::toPathString, Function.identity()));
vcrClusterName = cloudConfig.vcrClusterName;
vcrInstanceName =
ClusterMapUtils.getInstanceName(clusterMapConfig.clusterMapHostName, clusterMapConfig.clusterMapPort);

manager = HelixManagerFactory.getZKHelixManager(vcrClusterName, vcrInstanceName, InstanceType.PARTICIPANT,
cloudConfig.vcrClusterZkConnectString);
manager.getStateMachineEngine()
.registerStateModelFactory(LeaderStandbySMD.name, new HelixVcrStateModelFactory(this));
manager.connect();
helixAdmin = manager.getClusterManagmentTool();
metrics = new HelixVcrClusterMetrics(clusterMap.getMetricRegistry());
logger.info("HelixVcrCluster started successfully.");
}

/**
* Add {@link PartitionId} to assignedPartitionIds set, if {@parm partitionIdStr} valid.
* Used in {@link HelixVcrStateModel} if current VCR becomes leader of a partition.
* @param partitionIdStr The partitionIdStr notified by Helix.
*/
public void addPartition(String partitionIdStr) {
PartitionId partitionId = partitionIdMap.get(partitionIdStr);
if (partitionId != null) {
if (assignedPartitionIds.add(partitionId)) {
for (VirtualReplicatorClusterListener listener : listeners) {
listener.onPartitionAdded(partitionId);
}
logger.info("Partition {} is added to current VCR: {}", partitionIdStr, vcrInstanceName);
} else {
logger.info("Partition {} exists on current VCR: {}", partitionIdStr, vcrInstanceName);
}
} else {
logger.error("Partition {} not in clusterMap on add.", partitionIdStr);
metrics.partitionIdNotInClusterMapOnAdd.inc();
}
}

/**
* Remove {@link PartitionId} from assignedPartitionIds set, if {@parm partitionIdStr} valid.
* Used in {@link HelixVcrStateModel} if current VCR becomes offline or standby a partition.
* @param partitionIdStr The partitionIdStr notified by Helix.
*/
public void removePartition(String partitionIdStr) {
lightningrob marked this conversation as resolved.
Show resolved Hide resolved
PartitionId partitionId = partitionIdMap.get(partitionIdStr);
if (partitionId != null) {
if (assignedPartitionIds.remove(partitionId)) {
for (VirtualReplicatorClusterListener listener : listeners) {
listener.onPartitionRemoved(partitionId);
}
logger.info("Partition {} is removed from current VCR: {}.", partitionIdStr, vcrInstanceName);
} else {
logger.info("Partition {} not exists on current VCR: {}", partitionIdStr, vcrInstanceName);
}
} else {
logger.error("Partition {} not in clusterMap on remove.", partitionIdStr);
metrics.partitionIdNotInClusterMapOnRemove.inc();
}
}

@Override
public List<? extends DataNodeId> getAllDataNodeIds() {
// TODO: return all VCR nodes for recovery.
lightningrob marked this conversation as resolved.
Show resolved Hide resolved
return Collections.singletonList(currentDataNode);
}

@Override
public DataNodeId getCurrentDataNodeId() {
return currentDataNode;
}

@Override
public List<? extends PartitionId> getAssignedPartitionIds() {
return new LinkedList<>(assignedPartitionIds);
}

@Override
public void addListener(VirtualReplicatorClusterListener listener) {
listeners.add(listener);
}

@Override
public void close() {
manager.disconnect();
helixAdmin.close();
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.cloud;

import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.VirtualReplicatorCluster;
import com.github.ambry.clustermap.VirtualReplicatorClusterFactory;
import com.github.ambry.config.CloudConfig;
import com.github.ambry.config.ClusterMapConfig;


/**
* {@link HelixVcrClusterFactory} to generate {@link HelixVcrCluster} for dynamic partition assignment.
*/
public class HelixVcrClusterFactory implements VirtualReplicatorClusterFactory {

private final CloudConfig cloudConfig;
private final ClusterMapConfig clusterMapConfig;
private final ClusterMap clusterMap;
private VirtualReplicatorCluster virtualReplicatorCluster;

public HelixVcrClusterFactory(CloudConfig cloudConfig, ClusterMapConfig clusterMapConfig, ClusterMap clusterMap) {
this.cloudConfig = cloudConfig;
this.clusterMapConfig = clusterMapConfig;
this.clusterMap = clusterMap;
}

@Override
synchronized public VirtualReplicatorCluster getVirtualReplicatorCluster() throws Exception {
if (virtualReplicatorCluster == null) {
virtualReplicatorCluster = new HelixVcrCluster(cloudConfig, clusterMapConfig, clusterMap);
}
return virtualReplicatorCluster;
}

public void close() throws Exception {
lightningrob marked this conversation as resolved.
Show resolved Hide resolved
if (virtualReplicatorCluster != null) {
virtualReplicatorCluster.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.cloud;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;


public class HelixVcrClusterMetrics {

public final Counter partitionIdNotInClusterMapOnRemove;
public final Counter partitionIdNotInClusterMapOnAdd;

public HelixVcrClusterMetrics(MetricRegistry registry) {
partitionIdNotInClusterMapOnRemove =
registry.counter(MetricRegistry.name(HelixVcrCluster.class, "PartitionIdNotInClusterMapOnRemove"));
partitionIdNotInClusterMapOnAdd =
registry.counter(MetricRegistry.name(VcrServer.class, "PartitionIdNotInClusterMapOnAdd"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.cloud;

import org.apache.helix.NotificationContext;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* {@link StateModel} to use when the VCR participants register to Helix. The methods are callbacks
* that get called within a participant whenever its state changes in Helix.
*/
@StateModelInfo(initialState = "OFFLINE", states = {"LEADER", "STANDBY"})
lightningrob marked this conversation as resolved.
Show resolved Hide resolved
public class HelixVcrStateModel extends StateModel {
private Logger logger = LoggerFactory.getLogger(getClass());
private HelixVcrCluster helixVcrCluster;

HelixVcrStateModel(HelixVcrCluster helixVcrCluster) {
this.helixVcrCluster = helixVcrCluster;
}

@Transition(to = "STANDBY", from = "OFFLINE")
public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
logger.trace("{} Becoming STANDBY from OFFLINE of Partition {}", helixVcrCluster.getCurrentDataNodeId(),
message.getPartitionName());
}

@Transition(to = "LEADER", from = "STANDBY")
public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
logger.info("{} Becoming LEADER from STANDBY of Partition {}", helixVcrCluster.getCurrentDataNodeId(),
message.getPartitionName());
helixVcrCluster.addPartition(message.getPartitionName());
lightningrob marked this conversation as resolved.
Show resolved Hide resolved
}

@Transition(to = "STANDBY", from = "LEADER")
public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
logger.info("{} Becoming STANDBY from LEADER of Partition {}", helixVcrCluster.getCurrentDataNodeId(),
message.getPartitionName());
helixVcrCluster.removePartition(message.getPartitionName());
}

@Transition(to = "OFFLINE", from = "STANDBY")
public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
logger.trace("{} Becoming OFFLINE from STANDBY of Partition {}", helixVcrCluster.getCurrentDataNodeId(),
message.getPartitionName());
}

@Transition(to = "OFFLINE", from = "LEADER")
public void onBecomeOfflineFromLeader(Message message, NotificationContext context) {
logger.info("{} Becoming OFFLINE from LEADER of Partition {}", helixVcrCluster.getCurrentDataNodeId(),
message.getPartitionName());
helixVcrCluster.removePartition(message.getPartitionName());
}

@Override
public void reset() {
// no op
}
}
Loading