From 9f86148e4022897624ae46f0fd53f98f897df80a Mon Sep 17 00:00:00 2001 From: haxiaolin Date: Mon, 5 Jul 2021 18:49:23 +0800 Subject: [PATCH 1/2] HBASE-26036 DBB released too early dirty data for some operations --- .../hadoop/hbase/io/ByteBuffAllocator.java | 22 ++- .../DeallocateRewriteByteBuffAllocator.java | 59 ++++++ .../coprocessor/MultiRowMutationEndpoint.java | 37 ++-- .../hadoop/hbase/regionserver/HRegion.java | 178 ++++++++++-------- .../visibility/VisibilityController.java | 29 +-- .../TestCheckAndMutateWithByteBuff.java | 135 +++++++++++++ ...ExpAsStringVisibilityLabelServiceImpl.java | 92 ++++----- 7 files changed, 386 insertions(+), 166 deletions(-) create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/io/DeallocateRewriteByteBuffAllocator.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutateWithByteBuff.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java index 65a9908f935b..81adedfbbec9 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,6 +71,13 @@ public class ByteBuffAllocator { public static final String MIN_ALLOCATE_SIZE_KEY = "hbase.server.allocator.minimal.allocate.size"; + /** + * Set an alternate bytebuffallocator by setting this config, + * e.g. we can config {@link DeallocateRewriteByteBuffAllocator} to find out + * prematurely release issues + */ + public static final String BYTEBUFF_ALLOCATOR_CLASS = "hbase.bytebuff.allocator.class"; + /** * @deprecated since 2.3.0 and will be removed in 4.0.0. Use * {@link ByteBuffAllocator#ALLOCATOR_POOL_ENABLED_KEY} instead. @@ -117,8 +125,8 @@ public interface Recycler { void free(); } - private final boolean reservoirEnabled; - private final int bufSize; + protected final boolean reservoirEnabled; + protected final int bufSize; private final int maxBufCount; private final AtomicInteger usedBufCount = new AtomicInteger(0); @@ -169,7 +177,9 @@ public static ByteBuffAllocator create(Configuration conf, boolean reservoirEnab conf.getInt(MAX_BUFFER_COUNT_KEY, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2); int minSizeForReservoirUse = conf.getInt(MIN_ALLOCATE_SIZE_KEY, poolBufSize / 6); - return new ByteBuffAllocator(true, maxBuffCount, poolBufSize, minSizeForReservoirUse); + Class clazz = conf.getClass(BYTEBUFF_ALLOCATOR_CLASS, ByteBuffAllocator.class); + return (ByteBuffAllocator) ReflectionUtils + .newInstance(clazz, true, maxBuffCount, poolBufSize, minSizeForReservoirUse); } else { return HEAP; } @@ -184,8 +194,8 @@ private static ByteBuffAllocator createOnHeap() { return new ByteBuffAllocator(false, 0, DEFAULT_BUFFER_SIZE, Integer.MAX_VALUE); } - ByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize, - int minSizeForReservoirUse) { + protected ByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize, + int minSizeForReservoirUse) { this.reservoirEnabled = reservoirEnabled; this.maxBufCount = maxBufCount; this.bufSize = bufSize; @@ -377,7 +387,7 @@ private ByteBuffer getBuffer() { * Return back a ByteBuffer after its use. Don't read/write the ByteBuffer after the returning. * @param buf ByteBuffer to return. */ - private void putbackBuffer(ByteBuffer buf) { + protected void putbackBuffer(ByteBuffer buf) { if (buf.capacity() != bufSize || (reservoirEnabled ^ buf.isDirect())) { LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored"); return; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DeallocateRewriteByteBuffAllocator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DeallocateRewriteByteBuffAllocator.java new file mode 100644 index 000000000000..39617e4ab27c --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DeallocateRewriteByteBuffAllocator.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import java.nio.ByteBuffer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A ByteBuffAllocator that rewrite the bytebuffers right after released. + * It can be used for test whether there are prematurely releasing backing bytebuffers. + */ +@InterfaceAudience.Private +public class DeallocateRewriteByteBuffAllocator extends ByteBuffAllocator { + private static final Logger LOG = LoggerFactory.getLogger( + DeallocateRewriteByteBuffAllocator.class); + + DeallocateRewriteByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize, + int minSizeForReservoirUse) { + super(reservoirEnabled, maxBufCount, bufSize, minSizeForReservoirUse); + } + + @Override + protected void putbackBuffer(ByteBuffer buf) { + if (buf.capacity() != bufSize || (reservoirEnabled ^ buf.isDirect())) { + LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored"); + return; + } + buf.clear(); + byte[] tmp = generateTmpBytes(buf.capacity()); + buf.put(tmp, 0, tmp.length); + super.putbackBuffer(buf); + } + + private byte[] generateTmpBytes(int length) { + StringBuilder result = new StringBuilder(); + while (result.length() < length) { + result.append("-"); + } + return Bytes.toBytes(result.substring(0, length)); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java index 271ae87f79bd..5262732a45c9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; @@ -40,6 +41,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.WrongRegionException; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; @@ -221,22 +223,27 @@ private boolean matches(Region region, ClientProtos.Condition condition) throws get.setTimeRange(timeRange.getMin(), timeRange.getMax()); } - List result = region.get(get, false); boolean matches = false; - if (filter != null) { - if (!result.isEmpty()) { - matches = true; - } - } else { - boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0; - if (result.isEmpty() && valueIsNull) { - matches = true; - } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) { - matches = true; - } else if (result.size() == 1 && !valueIsNull) { - Cell kv = result.get(0); - int compareResult = PrivateCellUtil.compareValue(kv, comparator); - matches = matches(op, compareResult); + try (RegionScanner scanner = region.getScanner(new Scan(get))) { + // NOTE: Please don't use HRegion.get() instead, + // because it will copy cells to heap. See HBASE-26036 + List result = new ArrayList<>(); + scanner.next(result); + if (filter != null) { + if (!result.isEmpty()) { + matches = true; + } + } else { + boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0; + if (result.isEmpty() && valueIsNull) { + matches = true; + } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) { + matches = true; + } else if (result.size() == 1 && !valueIsNull) { + Cell kv = result.get(0); + int compareResult = PrivateCellUtil.compareValue(kv, comparator); + matches = matches(op, compareResult); + } } } return matches; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 943312016987..697806a1b18c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -20,7 +20,6 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY; import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; - import edu.umd.cs.findbugs.annotations.Nullable; import io.opentelemetry.api.trace.Span; import java.io.EOFException; @@ -74,6 +73,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ByteBufferExtendedCell; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.CellComparator; @@ -176,7 +176,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; @@ -191,7 +190,6 @@ import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; @@ -3244,18 +3242,23 @@ private void prepareDeleteTimestamps(Mutation mutation, Map> private void updateDeleteLatestVersionTimestamp(Cell cell, Get get, int count, byte[] byteNow) throws IOException { - List result = get(get, false); - - if (result.size() < count) { - // Nothing to delete - PrivateCellUtil.updateLatestStamp(cell, byteNow); - return; - } - if (result.size() > count) { - throw new RuntimeException("Unexpected size: " + result.size()); + try (RegionScanner scanner = getScanner(new Scan(get))) { + // NOTE: Please don't use HRegion.get() instead, + // because it will copy cells to heap. See HBASE-26036 + List result = new ArrayList<>(); + scanner.next(result); + + if (result.size() < count) { + // Nothing to delete + PrivateCellUtil.updateLatestStamp(cell, byteNow); + return; + } + if (result.size() > count) { + throw new RuntimeException("Unexpected size: " + result.size()); + } + Cell getCell = result.get(count - 1); + PrivateCellUtil.setTimestamp(cell, getCell.getTimestamp()); } - Cell getCell = result.get(count - 1); - PrivateCellUtil.setTimestamp(cell, getCell.getTimestamp()); } @Override @@ -4044,60 +4047,60 @@ private List reckonDeltasByStore(HStore store, Mutation mutation, long now get.setTimeRange(tr.getMin(), tr.getMax()); } - List currentValues = region.get(get, false); - - // Iterate the input columns and update existing values if they were found, otherwise - // add new column initialized to the delta amount - int currentValuesIndex = 0; - for (int i = 0; i < deltas.size(); i++) { - Cell delta = deltas.get(i); - Cell currentValue = null; - if (currentValuesIndex < currentValues.size() && - CellUtil.matchingQualifier(currentValues.get(currentValuesIndex), delta)) { - currentValue = currentValues.get(currentValuesIndex); - if (i < (deltas.size() - 1) && !CellUtil.matchingQualifier(delta, deltas.get(i + 1))) { - currentValuesIndex++; + try (RegionScanner scanner = region.getScanner(new Scan(get))) { + // NOTE: Please don't use HRegion.get() instead, + // because it will copy cells to heap. See HBASE-26036 + List currentValues = new ArrayList<>(); + scanner.next(currentValues); + // Iterate the input columns and update existing values if they were found, otherwise + // add new column initialized to the delta amount + int currentValuesIndex = 0; + for (int i = 0; i < deltas.size(); i++) { + Cell delta = deltas.get(i); + Cell currentValue = null; + if (currentValuesIndex < currentValues.size() && CellUtil + .matchingQualifier(currentValues.get(currentValuesIndex), delta)) { + currentValue = currentValues.get(currentValuesIndex); + if (i < (deltas.size() - 1) && !CellUtil.matchingQualifier(delta, deltas.get(i + 1))) { + currentValuesIndex++; + } } - } - // Switch on whether this an increment or an append building the new Cell to apply. - Cell newCell; - if (mutation instanceof Increment) { - long deltaAmount = getLongValue(delta); - final long newValue = currentValue == null ? - deltaAmount : getLongValue(currentValue) + deltaAmount; - newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, - (oldCell) -> Bytes.toBytes(newValue)); - } else { - newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, - (oldCell) -> - ByteBuffer.wrap(new byte[delta.getValueLength() + oldCell.getValueLength()]) + // Switch on whether this an increment or an append building the new Cell to apply. + Cell newCell; + if (mutation instanceof Increment) { + long deltaAmount = getLongValue(delta); + final long newValue = currentValue == null ? deltaAmount : getLongValue(currentValue) + deltaAmount; + newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -> Bytes.toBytes(newValue)); + } else { + newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, + (oldCell) -> ByteBuffer.wrap(new byte[delta.getValueLength() + oldCell.getValueLength()]) .put(oldCell.getValueArray(), oldCell.getValueOffset(), oldCell.getValueLength()) .put(delta.getValueArray(), delta.getValueOffset(), delta.getValueLength()) - .array() - ); - } - if (region.maxCellSize > 0) { - int newCellSize = PrivateCellUtil.estimatedSerializedSizeOf(newCell); - if (newCellSize > region.maxCellSize) { - String msg = "Cell with size " + newCellSize + " exceeds limit of " - + region.maxCellSize + " bytes in region " + this; - LOG.debug(msg); - throw new DoNotRetryIOException(msg); + .array()); + } + if (region.maxCellSize > 0) { + int newCellSize = PrivateCellUtil.estimatedSerializedSizeOf(newCell); + if (newCellSize > region.maxCellSize) { + String msg = + "Cell with size " + newCellSize + " exceeds limit of " + region.maxCellSize + " bytes in region " + this; + LOG.debug(msg); + throw new DoNotRetryIOException(msg); + } + } + cellPairs.add(new Pair<>(currentValue, newCell)); + // Add to results to get returned to the Client. If null, cilent does not want results. + if (results != null) { + results.add(newCell); } } - cellPairs.add(new Pair<>(currentValue, newCell)); - // Add to results to get returned to the Client. If null, cilent does not want results. - if (results != null) { - results.add(newCell); + // Give coprocessors a chance to update the new cells before apply to WAL or memstore + if (region.coprocessorHost != null) { + // Here the operation must be increment or append. + cellPairs = mutation instanceof Increment ? + region.coprocessorHost.postIncrementBeforeWAL(mutation, cellPairs) : + region.coprocessorHost.postAppendBeforeWAL(mutation, cellPairs); } } - // Give coprocessors a chance to update the new cells before apply to WAL or memstore - if (region.coprocessorHost != null) { - // Here the operation must be increment or append. - cellPairs = mutation instanceof Increment ? - region.coprocessorHost.postIncrementBeforeWAL(mutation, cellPairs) : - region.coprocessorHost.postAppendBeforeWAL(mutation, cellPairs); - } return cellPairs.stream().map(Pair::getSecond).collect(Collectors.toList()); } @@ -4858,26 +4861,32 @@ private CheckAndMutateResult checkAndMutateInternal(CheckAndMutate checkAndMutat // NOTE: We used to wait here until mvcc caught up: mvcc.await(); // Supposition is that now all changes are done under row locks, then when we go to read, // we'll get the latest on this row. - List result = get(get, false); boolean matches = false; long cellTs = 0; - if (filter != null) { - if (!result.isEmpty()) { - matches = true; - cellTs = result.get(0).getTimestamp(); - } - } else { - boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0; - if (result.isEmpty() && valueIsNull) { - matches = true; - } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) { - matches = true; - cellTs = result.get(0).getTimestamp(); - } else if (result.size() == 1 && !valueIsNull) { - Cell kv = result.get(0); - cellTs = kv.getTimestamp(); - int compareResult = PrivateCellUtil.compareValue(kv, comparator); - matches = matches(op, compareResult); + try (RegionScanner scanner = getScanner(new Scan(get))) { + // NOTE: Please don't use HRegion.get() instead, + // because it will copy cells to heap. See HBASE-26036 + List result = new ArrayList<>(1); + scanner.next(result); + if (filter != null) { + if (!result.isEmpty()) { + matches = true; + cellTs = result.get(0).getTimestamp(); + } + } else { + boolean valueIsNull = + comparator.getValue() == null || comparator.getValue().length == 0; + if (result.isEmpty() && valueIsNull) { + matches = true; + } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) { + matches = true; + cellTs = result.get(0).getTimestamp(); + } else if (result.size() == 1 && !valueIsNull) { + Cell kv = result.get(0); + cellTs = kv.getTimestamp(); + int compareResult = PrivateCellUtil.compareValue(kv, comparator); + matches = matches(op, compareResult); + } } } @@ -7541,6 +7550,9 @@ private List get(Get get, boolean withCoprocessor, long nonceGroup, long n () -> createRegionSpan("Region.get")); } + /** + * This method will return onheap cells, for more details, please see HBASE-26036. + */ private List getInternal(Get get, boolean withCoprocessor, long nonceGroup, long nonce) throws IOException { List results = new ArrayList<>(); @@ -7558,7 +7570,13 @@ private List getInternal(Get get, boolean withCoprocessor, long nonceGroup scan.setLoadColumnFamiliesOnDemand(isLoadingCfsOnDemandDefault()); } try (RegionScanner scanner = getScanner(scan, null, nonceGroup, nonce)) { - scanner.next(results); + List tmp = new ArrayList<>(); + scanner.next(tmp); + // copy EC to heap, then close the scanner + for (Cell cell : tmp) { + results.add(cell instanceof ByteBufferExtendedCell ? + ((ByteBufferExtendedCell) cell).deepClone(): cell); + } } // post-get CP hook diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java index 630e5db82957..5833ebc92cc5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java @@ -397,20 +397,23 @@ public void prePrepareTimeStampForDeleteVersion( } get.setFilter(new DeleteVersionVisibilityExpressionFilter(visibilityTags, VisibilityConstants.SORTED_ORDINAL_SERIALIZATION_FORMAT)); - List result = ctx.getEnvironment().getRegion().get(get, false); - - if (result.size() < get.getMaxVersions()) { - // Nothing to delete - PrivateCellUtil.updateLatestStamp(cell, byteNow); - return; - } - if (result.size() > get.getMaxVersions()) { - throw new RuntimeException("Unexpected size: " + result.size() - + ". Results more than the max versions obtained."); + try (RegionScanner scanner = ctx.getEnvironment().getRegion().getScanner(new Scan(get))) { + // NOTE: Please don't use HRegion.get() instead, + // because it will copy cells to heap. See HBASE-26036 + List result = new ArrayList<>(); + scanner.next(result); + + if (result.size() < get.getMaxVersions()) { + // Nothing to delete + PrivateCellUtil.updateLatestStamp(cell, byteNow); + return; + } + if (result.size() > get.getMaxVersions()) { + throw new RuntimeException("Unexpected size: " + result.size() + ". Results more than the max versions obtained."); + } + Cell getCell = result.get(get.getMaxVersions() - 1); + PrivateCellUtil.setTimestamp(cell, getCell.getTimestamp()); } - Cell getCell = result.get(get.getMaxVersions() - 1); - PrivateCellUtil.setTimestamp(cell, getCell.getTimestamp()); - // We are bypassing here because in the HRegion.updateDeleteLatestVersionTimeStamp we would // update with the current timestamp after again doing a get. As the hook as already determined // the needed timestamp we need to bypass here. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutateWithByteBuff.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutateWithByteBuff.java new file mode 100644 index 000000000000..876259f4e817 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutateWithByteBuff.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.util.Threads.sleep; +import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; +import org.apache.hadoop.hbase.io.DeallocateRewriteByteBuffAllocator; +import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category(LargeTests.class) +public class TestCheckAndMutateWithByteBuff { + private static final Logger LOG = LoggerFactory.getLogger(TestCheckAndMutateWithByteBuff.class); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCheckAndMutateWithByteBuff.class); + + @Rule + public TestName name = new TestName(); + + private static final byte[] CF = Bytes.toBytes("CF"); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final Configuration conf = TEST_UTIL.getConfiguration(); + private static Admin admin = null; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + conf.set(HConstants.REGION_IMPL, TestCheckAndMutateRegion.class.getName()); + conf.set(ByteBuffAllocator.BYTEBUFF_ALLOCATOR_CLASS, + DeallocateRewriteByteBuffAllocator.class.getName()); + conf.setBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true); + conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 1); + conf.setInt(BlockCacheFactory.BUCKET_CACHE_WRITER_THREADS_KEY, 20); + conf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 1024); + conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); + conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, 64); + conf.setInt("hbase.client.retries.number", 1); + TEST_UTIL.startMiniCluster(); + admin = TEST_UTIL.getAdmin(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testCheckAndMutateWithByteBuff() throws Exception { + Table testTable = createTable(TableName.valueOf(name.getMethodName())); + byte[] checkRow = Bytes.toBytes("checkRow"); + byte[] checkQualifier = Bytes.toBytes("cq"); + byte[] checkValue = Bytes.toBytes("checkValue"); + + Put put = new Put(checkRow); + put.addColumn(CF, checkQualifier, checkValue); + testTable.put(put); + admin.flush(testTable.getName()); + + assertTrue(testTable.checkAndMutate(checkRow, CF).qualifier(checkQualifier). + ifEquals(checkValue) + .thenPut(new Put(checkRow).addColumn(CF, Bytes.toBytes("q1"), + Bytes.toBytes("testValue")))); + } + + private Table createTable(TableName tableName) + throws IOException { + TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF).setBlocksize(100).build()) + .build(); + return TEST_UTIL.createTable(td, null); + } + + /** + * An override of HRegion to allow sleep after get(), waiting for the release of DBB + */ + public static class TestCheckAndMutateRegion extends HRegion { + public TestCheckAndMutateRegion(Path tableDir, WAL log, FileSystem fs, Configuration confParam, + RegionInfo info, TableDescriptor htd, RegionServerServices rsServices) { + super(tableDir, log, fs, confParam, info, htd, rsServices); + } + + public TestCheckAndMutateRegion(HRegionFileSystem fs, WAL wal, Configuration confParam, + TableDescriptor htd, RegionServerServices rsServices) { + super(fs, wal, confParam, htd, rsServices); + } + + @Override + public List get(Get get, boolean withCoprocessor) throws IOException { + List cells = super.get(get, withCoprocessor); + sleep(600); + return cells; + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/ExpAsStringVisibilityLabelServiceImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/ExpAsStringVisibilityLabelServiceImpl.java index 58e8075ed493..05c65ab8c069 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/ExpAsStringVisibilityLabelServiceImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/ExpAsStringVisibilityLabelServiceImpl.java @@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_FAMILY; import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME; import static org.apache.hadoop.hbase.security.visibility.VisibilityUtils.SYSTEM_LABEL; - import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -31,7 +30,6 @@ import java.util.Iterator; import java.util.List; import java.util.Set; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ArrayBackedTag; import org.apache.hadoop.hbase.AuthUtil; @@ -49,10 +47,12 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.OperationStatus; import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.visibility.expression.ExpressionNode; @@ -164,33 +164,7 @@ public List getUserAuths(byte[] user, boolean systemCall) throws IOExcep assert (labelsRegion != null || systemCall); List auths = new ArrayList<>(); Get get = new Get(user); - List cells = null; - if (labelsRegion == null) { - Table table = null; - Connection connection = null; - try { - connection = ConnectionFactory.createConnection(conf); - table = connection.getTable(VisibilityConstants.LABELS_TABLE_NAME); - Result result = table.get(get); - cells = result.listCells(); - } finally { - if (table != null) { - table.close(); - } - if (connection != null){ - connection.close(); - } - } - } else { - cells = this.labelsRegion.get(get, false); - } - if (cells != null) { - for (Cell cell : cells) { - String auth = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), - cell.getQualifierLength()); - auths.add(auth); - } - } + getAuths(get, auths); return auths; } @@ -201,34 +175,48 @@ public List getGroupAuths(String[] groups, boolean systemCall) throws IO if (groups != null && groups.length > 0) { for (String group : groups) { Get get = new Get(Bytes.toBytes(AuthUtil.toGroupEntry(group))); - List cells = null; - if (labelsRegion == null) { - Table table = null; - Connection connection = null; - try { - connection = ConnectionFactory.createConnection(conf); - table = connection.getTable(VisibilityConstants.LABELS_TABLE_NAME); - Result result = table.get(get); - cells = result.listCells(); - } finally { - if (table != null) { - table.close(); - connection.close(); - } + getAuths(get, auths); + } + } + return auths; + } + + private void getAuths(Get get, List auths) throws IOException { + List cells = new ArrayList<>(); + RegionScanner scanner = null; + try { + if (labelsRegion == null) { + Table table = null; + Connection connection = null; + try { + connection = ConnectionFactory.createConnection(conf); + table = connection.getTable(VisibilityConstants.LABELS_TABLE_NAME); + Result result = table.get(get); + cells = result.listCells(); + } finally { + if (table != null) { + table.close(); } - } else { - cells = this.labelsRegion.get(get, false); - } - if (cells != null) { - for (Cell cell : cells) { - String auth = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), - cell.getQualifierLength()); - auths.add(auth); + if (connection != null) { + connection.close(); } } + } else { + // NOTE: Please don't use HRegion.get() instead, + // because it will copy cells to heap. See HBASE-26036 + scanner = this.labelsRegion.getScanner(new Scan(get)); + scanner.next(cells); + } + for (Cell cell : cells) { + String auth = Bytes + .toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + auths.add(auth); + } + } finally { + if (scanner != null) { + scanner.close(); } } - return auths; } @Override From 72c1055ceb15820bfefa4b17a4fd0cb139d16cd5 Mon Sep 17 00:00:00 2001 From: haxiaolin Date: Tue, 6 Jul 2021 18:40:20 +0800 Subject: [PATCH 2/2] address the checkstyle issue --- .../hadoop/hbase/regionserver/HRegion.java | 19 +++++++++++-------- .../visibility/VisibilityController.java | 3 ++- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 697806a1b18c..8c4660cda586 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -4069,11 +4069,14 @@ private List reckonDeltasByStore(HStore store, Mutation mutation, long now Cell newCell; if (mutation instanceof Increment) { long deltaAmount = getLongValue(delta); - final long newValue = currentValue == null ? deltaAmount : getLongValue(currentValue) + deltaAmount; - newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -> Bytes.toBytes(newValue)); + final long newValue = currentValue == null ? deltaAmount : + getLongValue(currentValue) + deltaAmount; + newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, + (oldCell) -> Bytes.toBytes(newValue)); } else { newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, - (oldCell) -> ByteBuffer.wrap(new byte[delta.getValueLength() + oldCell.getValueLength()]) + (oldCell) -> ByteBuffer.wrap(new byte[delta.getValueLength() + + oldCell.getValueLength()]) .put(oldCell.getValueArray(), oldCell.getValueOffset(), oldCell.getValueLength()) .put(delta.getValueArray(), delta.getValueOffset(), delta.getValueLength()) .array()); @@ -4082,7 +4085,8 @@ private List reckonDeltasByStore(HStore store, Mutation mutation, long now int newCellSize = PrivateCellUtil.estimatedSerializedSizeOf(newCell); if (newCellSize > region.maxCellSize) { String msg = - "Cell with size " + newCellSize + " exceeds limit of " + region.maxCellSize + " bytes in region " + this; + "Cell with size " + newCellSize + " exceeds limit of " + region.maxCellSize + + " bytes in region " + this; LOG.debug(msg); throw new DoNotRetryIOException(msg); } @@ -7550,9 +7554,6 @@ private List get(Get get, boolean withCoprocessor, long nonceGroup, long n () -> createRegionSpan("Region.get")); } - /** - * This method will return onheap cells, for more details, please see HBASE-26036. - */ private List getInternal(Get get, boolean withCoprocessor, long nonceGroup, long nonce) throws IOException { List results = new ArrayList<>(); @@ -7572,7 +7573,9 @@ private List getInternal(Get get, boolean withCoprocessor, long nonceGroup try (RegionScanner scanner = getScanner(scan, null, nonceGroup, nonce)) { List tmp = new ArrayList<>(); scanner.next(tmp); - // copy EC to heap, then close the scanner + // Copy EC to heap, then close the scanner. + // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap buffers. + // See more details in HBASE-26036. for (Cell cell : tmp) { results.add(cell instanceof ByteBufferExtendedCell ? ((ByteBufferExtendedCell) cell).deepClone(): cell); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java index 5833ebc92cc5..66dd862b663e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java @@ -409,7 +409,8 @@ public void prePrepareTimeStampForDeleteVersion( return; } if (result.size() > get.getMaxVersions()) { - throw new RuntimeException("Unexpected size: " + result.size() + ". Results more than the max versions obtained."); + throw new RuntimeException("Unexpected size: " + result.size() + + ". Results more than the max versions obtained."); } Cell getCell = result.get(get.getMaxVersions() - 1); PrivateCellUtil.setTimestamp(cell, getCell.getTimestamp());