diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java index db8e93fb3a73..ad1747ba3b1f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java @@ -18,10 +18,10 @@ package org.apache.hadoop.hbase.ipc; import java.util.concurrent.BlockingQueue; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -47,13 +47,13 @@ public BalancedQueueRpcExecutor(final String name, final int handlerCount, final String callQueueType, final int maxQueueLength, final PriorityFunction priority, final Configuration conf, final Abortable abortable) { super(name, handlerCount, callQueueType, maxQueueLength, priority, conf, abortable); - this.balancer = getBalancer(this.numCallQueues); initializeQueues(this.numCallQueues); + this.balancer = getBalancer(name, conf, getQueues()); } @Override public boolean dispatch(final CallRunner callTask) throws InterruptedException { - int queueIndex = balancer.getNextQueue(); + int queueIndex = balancer.getNextQueue(callTask); BlockingQueue queue = queues.get(queueIndex); // that means we can overflow by at most size (5), that's ok if (queue.size() >= currentQueueLimit) { @@ -61,4 +61,13 @@ public boolean dispatch(final CallRunner callTask) throws InterruptedException { } return queue.offer(callTask); } + + @Override + public void onConfigurationChange(Configuration conf) { + super.onConfigurationChange(conf); + + if (balancer instanceof ConfigurationObserver) { + ((ConfigurationObserver) balancer).onConfigurationChange(conf); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/QueueBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/QueueBalancer.java new file mode 100644 index 000000000000..d1141d093edb --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/QueueBalancer.java @@ -0,0 +1,35 @@ +/** + + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +/** + * Interface for balancing requests across IPC queues + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +@InterfaceStability.Stable +public interface QueueBalancer { + /** + * @return the index of the next queue to which a request should be inserted + */ + int getNextQueue(CallRunner callRunner); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 43c6ce468d73..b2df2254130a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -22,22 +22,22 @@ import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hbase.thirdparty.com.google.protobuf.Message; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos; /** * RPC Executor that uses different queues for reads and writes. @@ -97,14 +97,17 @@ public RWQueueRpcExecutor(final String name, final int handlerCount, final int m numScanQueues = scanQueues; scanHandlersCount = scanHandlers; - this.writeBalancer = getBalancer(numWriteQueues); - this.readBalancer = getBalancer(numReadQueues); - this.scanBalancer = numScanQueues > 0 ? getBalancer(numScanQueues) : null; - initializeQueues(numWriteQueues); initializeQueues(numReadQueues); initializeQueues(numScanQueues); + this.writeBalancer = getBalancer(name, conf, queues.subList(0, numWriteQueues)); + this.readBalancer = getBalancer(name, conf, queues.subList(numWriteQueues, numWriteQueues + numReadQueues)); + this.scanBalancer = numScanQueues > 0 ? + getBalancer(name, conf, queues.subList(numWriteQueues + numReadQueues, + numWriteQueues + numReadQueues + numScanQueues)) : + null; + LOG.info(getName() + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount + " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount + " scanQueues=" + numScanQueues + " scanHandlers=" + scanHandlersCount); @@ -139,11 +142,11 @@ protected boolean dispatchTo(boolean toWriteQueue, boolean toScanQueue, final CallRunner callTask) { int queueIndex; if (toWriteQueue) { - queueIndex = writeBalancer.getNextQueue(); + queueIndex = writeBalancer.getNextQueue(callTask); } else if (toScanQueue) { - queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue(); + queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue(callTask); } else { - queueIndex = numWriteQueues + readBalancer.getNextQueue(); + queueIndex = numWriteQueues + readBalancer.getNextQueue(callTask); } Queue queue = queues.get(queueIndex); if (queue.size() >= currentQueueLimit) { @@ -234,6 +237,18 @@ protected boolean isWriteRequest(final RequestHeader header, final Message param return false; } + QueueBalancer getWriteBalancer() { + return writeBalancer; + } + + QueueBalancer getReadBalancer() { + return readBalancer; + } + + QueueBalancer getScanBalancer() { + return scanBalancer; + } + private boolean isScanRequest(final RequestHeader header, final Message param) { return param instanceof ScanRequest; } @@ -266,4 +281,18 @@ private static int calcNumWriters(final int count, final float readShare) { private static int calcNumReaders(final int count, final float readShare) { return count - calcNumWriters(count, readShare); } + + @Override + public void onConfigurationChange(Configuration conf) { + super.onConfigurationChange(conf); + propagateBalancerConfigChange(writeBalancer, conf); + propagateBalancerConfigChange(readBalancer, conf); + propagateBalancerConfigChange(scanBalancer, conf); + } + + private void propagateBalancerConfigChange(QueueBalancer balancer, Configuration conf) { + if (balancer instanceof ConfigurationObserver) { + ((ConfigurationObserver) balancer).onConfigurationChange(conf); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RandomQueueBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RandomQueueBalancer.java new file mode 100644 index 000000000000..528affc48049 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RandomQueueBalancer.java @@ -0,0 +1,54 @@ +/** + + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +/** + * Queue balancer that just randomly selects a queue in the range [0, num queues). + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class RandomQueueBalancer implements QueueBalancer { + private final int queueSize; + private final List> queues; + + public RandomQueueBalancer(Configuration conf, String executorName, List> queues) { + this.queueSize = queues.size(); + this.queues = queues; + } + + @Override + public int getNextQueue(CallRunner callRunner) { + return ThreadLocalRandom.current().nextInt(queueSize); + } + + /** + * Exposed for use in tests + */ + List> getQueues() { + return queues; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 5775fc19d0fe..aafd6984b7b2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -20,32 +20,29 @@ import java.util.ArrayList; import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; -import java.util.Map; -import java.util.HashMap; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.conf.ConfigurationObserver; -import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; - +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.base.Strings; +import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil; /** * Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular @@ -73,6 +70,10 @@ public abstract class RpcExecutor { public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type"; public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE; + public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS = "hbase.ipc.server.callqueue.balancer.class"; + public static final Class CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT = RandomQueueBalancer.class; + + // These 3 are only used by Codel executor public static final String CALL_QUEUE_CODEL_TARGET_DELAY = "hbase.ipc.server.callqueue.codel.target.delay"; public static final String CALL_QUEUE_CODEL_INTERVAL = "hbase.ipc.server.callqueue.codel.interval"; @@ -295,19 +296,13 @@ protected void startHandlers(final String nameSuffix, final int numHandlers, handlers.size(), threadPrefix, qsize, port); } - public static abstract class QueueBalancer { - /** - * @return the index of the next queue to which a request should be inserted - */ - public abstract int getNextQueue(); - } - - public static QueueBalancer getBalancer(int queueSize) { - Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1"); - if (queueSize == 1) { + public static QueueBalancer getBalancer(String executorName, Configuration conf, List> queues) { + Preconditions.checkArgument(queues.size() > 0, "Queue size is <= 0, must be at least 1"); + if (queues.size() == 1) { return ONE_QUEUE; } else { - return new RandomQueueBalancer(queueSize); + Class balancerClass = conf.getClass(CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT); + return (QueueBalancer) ReflectionUtils.newInstance(balancerClass, conf, executorName, queues); } } @@ -316,27 +311,11 @@ public static QueueBalancer getBalancer(int queueSize) { */ private static QueueBalancer ONE_QUEUE = new QueueBalancer() { @Override - public int getNextQueue() { + public int getNextQueue(CallRunner callRunner) { return 0; } }; - /** - * Queue balancer that just randomly selects a queue in the range [0, num queues). - */ - private static class RandomQueueBalancer extends QueueBalancer { - private final int queueSize; - - public RandomQueueBalancer(int queueSize) { - this.queueSize = queueSize; - } - - @Override - public int getNextQueue() { - return ThreadLocalRandom.current().nextInt(queueSize); - } - } - /** * Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. It * uses the calculated "deadline" e.g. to deprioritize long-running job If multiple requests have diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java new file mode 100644 index 000000000000..ae4fc415fd7f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java @@ -0,0 +1,109 @@ +/** + + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import static org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY; +import static org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY; +import static org.apache.hadoop.hbase.ipc.RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.*; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ RPCTests.class, MediumTests.class}) +public class TestRWQueueRpcExecutor { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRWQueueRpcExecutor.class); + + @Rule + public TestName testName = new TestName(); + + private Configuration conf; + + @Before + public void setUp() { + conf = HBaseConfiguration.create(); + conf.setFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); + conf.setFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); + conf.setFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f); + } + + @Test + public void itProvidesCorrectQueuesToBalancers() throws InterruptedException { + PriorityFunction qosFunction = mock(PriorityFunction.class); + RWQueueRpcExecutor executor = + new RWQueueRpcExecutor(testName.getMethodName(), 100, 100, qosFunction, conf, null); + + QueueBalancer readBalancer = executor.getReadBalancer(); + QueueBalancer writeBalancer = executor.getWriteBalancer(); + QueueBalancer scanBalancer = executor.getScanBalancer(); + + assertTrue(readBalancer instanceof RandomQueueBalancer); + assertTrue(writeBalancer instanceof RandomQueueBalancer); + assertTrue(scanBalancer instanceof RandomQueueBalancer); + + List> readQueues = ((RandomQueueBalancer) readBalancer).getQueues(); + List> writeQueues = ((RandomQueueBalancer) writeBalancer).getQueues(); + List> scanQueues = ((RandomQueueBalancer) scanBalancer).getQueues(); + + assertEquals(25, readQueues.size()); + assertEquals(50, writeQueues.size()); + assertEquals(25, scanQueues.size()); + + verifyDistinct(readQueues, writeQueues, scanQueues); + verifyDistinct(writeQueues, readQueues, scanQueues); + verifyDistinct(scanQueues, readQueues, writeQueues); + + } + + private void verifyDistinct(List> queues, List>... others) + throws InterruptedException { + CallRunner mock = mock(CallRunner.class); + for (BlockingQueue queue : queues) { + queue.put(mock); + assertEquals(1, queue.size()); + } + + for (List> other : others) { + for (BlockingQueue queue : other) { + assertEquals(0, queue.size()); + } + } + + // clear them for next test + for (BlockingQueue queue : queues) { + queue.clear(); + } + } +}