Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26703 Allow configuration of IPC queue balancer #4063

Merged
merged 1 commit into from
Feb 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package org.apache.hadoop.hbase.ipc;

import java.util.concurrent.BlockingQueue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

Expand All @@ -47,18 +47,27 @@ public BalancedQueueRpcExecutor(final String name, final int handlerCount,
final String callQueueType, final int maxQueueLength, final PriorityFunction priority,
final Configuration conf, final Abortable abortable) {
super(name, handlerCount, callQueueType, maxQueueLength, priority, conf, abortable);
this.balancer = getBalancer(this.numCallQueues);
initializeQueues(this.numCallQueues);
this.balancer = getBalancer(name, conf, getQueues());
}

@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
int queueIndex = balancer.getNextQueue();
int queueIndex = balancer.getNextQueue(callTask);
BlockingQueue<CallRunner> queue = queues.get(queueIndex);
// that means we can overflow by at most <num reader> size (5), that's ok
if (queue.size() >= currentQueueLimit) {
return false;
}
return queue.offer(callTask);
}

@Override
public void onConfigurationChange(Configuration conf) {
super.onConfigurationChange(conf);

if (balancer instanceof ConfigurationObserver) {
((ConfigurationObserver) balancer).onConfigurationChange(conf);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;

import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

/**
* Interface for balancing requests across IPC queues
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
@InterfaceStability.Stable
public interface QueueBalancer {
/**
* @return the index of the next queue to which a request should be inserted
*/
int getNextQueue(CallRunner callRunner);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,22 @@
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;

/**
* RPC Executor that uses different queues for reads and writes.
Expand Down Expand Up @@ -97,14 +97,17 @@ public RWQueueRpcExecutor(final String name, final int handlerCount, final int m
numScanQueues = scanQueues;
scanHandlersCount = scanHandlers;

this.writeBalancer = getBalancer(numWriteQueues);
this.readBalancer = getBalancer(numReadQueues);
this.scanBalancer = numScanQueues > 0 ? getBalancer(numScanQueues) : null;

initializeQueues(numWriteQueues);
initializeQueues(numReadQueues);
initializeQueues(numScanQueues);

this.writeBalancer = getBalancer(name, conf, queues.subList(0, numWriteQueues));
this.readBalancer = getBalancer(name, conf, queues.subList(numWriteQueues, numWriteQueues + numReadQueues));
this.scanBalancer = numScanQueues > 0 ?
getBalancer(name, conf, queues.subList(numWriteQueues + numReadQueues,
numWriteQueues + numReadQueues + numScanQueues)) :
null;

LOG.info(getName() + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount
+ " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount + " scanQueues="
+ numScanQueues + " scanHandlers=" + scanHandlersCount);
Expand Down Expand Up @@ -139,11 +142,11 @@ protected boolean dispatchTo(boolean toWriteQueue, boolean toScanQueue,
final CallRunner callTask) {
int queueIndex;
if (toWriteQueue) {
queueIndex = writeBalancer.getNextQueue();
queueIndex = writeBalancer.getNextQueue(callTask);
} else if (toScanQueue) {
queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue(callTask);
} else {
queueIndex = numWriteQueues + readBalancer.getNextQueue();
queueIndex = numWriteQueues + readBalancer.getNextQueue(callTask);
}
Queue<CallRunner> queue = queues.get(queueIndex);
if (queue.size() >= currentQueueLimit) {
Expand Down Expand Up @@ -234,6 +237,18 @@ protected boolean isWriteRequest(final RequestHeader header, final Message param
return false;
}

QueueBalancer getWriteBalancer() {
return writeBalancer;
}

QueueBalancer getReadBalancer() {
return readBalancer;
}

QueueBalancer getScanBalancer() {
return scanBalancer;
}

private boolean isScanRequest(final RequestHeader header, final Message param) {
return param instanceof ScanRequest;
}
Expand Down Expand Up @@ -266,4 +281,18 @@ private static int calcNumWriters(final int count, final float readShare) {
private static int calcNumReaders(final int count, final float readShare) {
return count - calcNumWriters(count, readShare);
}

@Override
public void onConfigurationChange(Configuration conf) {
super.onConfigurationChange(conf);
propagateBalancerConfigChange(writeBalancer, conf);
propagateBalancerConfigChange(readBalancer, conf);
propagateBalancerConfigChange(scanBalancer, conf);
}

private void propagateBalancerConfigChange(QueueBalancer balancer, Configuration conf) {
if (balancer instanceof ConfigurationObserver) {
((ConfigurationObserver) balancer).onConfigurationChange(conf);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**

* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.ipc;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

/**
* Queue balancer that just randomly selects a queue in the range [0, num queues).
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public class RandomQueueBalancer implements QueueBalancer {
private final int queueSize;
private final List<BlockingQueue<CallRunner>> queues;

public RandomQueueBalancer(Configuration conf, String executorName, List<BlockingQueue<CallRunner>> queues) {
this.queueSize = queues.size();
this.queues = queues;
}

@Override
public int getNextQueue(CallRunner callRunner) {
return ThreadLocalRandom.current().nextInt(queueSize);
}

/**
* Exposed for use in tests
*/
List<BlockingQueue<CallRunner>> getQueues() {
return queues;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,29 @@

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.Map;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;

/**
* Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular
Expand Down Expand Up @@ -73,6 +70,10 @@ public abstract class RpcExecutor {
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;

public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS = "hbase.ipc.server.callqueue.balancer.class";
public static final Class<?> CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT = RandomQueueBalancer.class;


// These 3 are only used by Codel executor
public static final String CALL_QUEUE_CODEL_TARGET_DELAY = "hbase.ipc.server.callqueue.codel.target.delay";
public static final String CALL_QUEUE_CODEL_INTERVAL = "hbase.ipc.server.callqueue.codel.interval";
Expand Down Expand Up @@ -295,19 +296,13 @@ protected void startHandlers(final String nameSuffix, final int numHandlers,
handlers.size(), threadPrefix, qsize, port);
}

public static abstract class QueueBalancer {
/**
* @return the index of the next queue to which a request should be inserted
*/
public abstract int getNextQueue();
}

public static QueueBalancer getBalancer(int queueSize) {
Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1");
if (queueSize == 1) {
public static QueueBalancer getBalancer(String executorName, Configuration conf, List<BlockingQueue<CallRunner>> queues) {
Preconditions.checkArgument(queues.size() > 0, "Queue size is <= 0, must be at least 1");
if (queues.size() == 1) {
return ONE_QUEUE;
} else {
return new RandomQueueBalancer(queueSize);
Class<?> balancerClass = conf.getClass(CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT);
return (QueueBalancer) ReflectionUtils.newInstance(balancerClass, conf, executorName, queues);
}
}

Expand All @@ -316,27 +311,11 @@ public static QueueBalancer getBalancer(int queueSize) {
*/
private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
@Override
public int getNextQueue() {
public int getNextQueue(CallRunner callRunner) {
return 0;
}
};

/**
* Queue balancer that just randomly selects a queue in the range [0, num queues).
*/
private static class RandomQueueBalancer extends QueueBalancer {
private final int queueSize;

public RandomQueueBalancer(int queueSize) {
this.queueSize = queueSize;
}

@Override
public int getNextQueue() {
return ThreadLocalRandom.current().nextInt(queueSize);
}
}

/**
* Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. It
* uses the calculated "deadline" e.g. to deprioritize long-running job If multiple requests have
Expand Down
Loading