Skip to content

Commit

Permalink
HBASE-23223 Support the offsetLock of bucketCache to use strong ref (#764)

Browse files Browse the repository at this point in the history

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
  • Loading branch information
bsglz authored and wchevreuil committed Nov 21, 2019
1 parent 77b4e8c commit 4ea7922
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;
import org.apache.hadoop.hbase.util.IdReadWriteLockStrongRef;
import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool;
import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand Down Expand Up @@ -113,6 +115,10 @@ public class BucketCache implements BlockCache, HeapSize {
static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";

/** Use strong reference for offsetLock or not */
private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref";
private static final boolean STRONG_REF_DEFAULT = false;

/** Priority buckets */
@VisibleForTesting
static final float DEFAULT_SINGLE_FACTOR = 0.25f;
Expand Down Expand Up @@ -199,10 +205,9 @@ public class BucketCache implements BlockCache, HeapSize {
* A ReentrantReadWriteLock to lock on a particular block identified by offset.
* The purpose of this is to avoid freeing the block which is being read.
* <p>
* Key set of offsets in BucketCache is limited so soft reference is the best choice here.
*/
@VisibleForTesting
transient final IdReadWriteLock<Long> offsetLock = new IdReadWriteLock<>(ReferenceType.SOFT);
transient final IdReadWriteLock<Long> offsetLock;

private final NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>((a, b) -> {
int nameComparison = a.getHfileName().compareTo(b.getHfileName());
Expand Down Expand Up @@ -257,6 +262,12 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
Configuration conf) throws IOException {
boolean useStrongRef = conf.getBoolean(STRONG_REF_KEY, STRONG_REF_DEFAULT);
if (useStrongRef) {
this.offsetLock = new IdReadWriteLockStrongRef<>();
} else {
this.offsetLock = new IdReadWriteLockWithObjectPool<>(ReferenceType.SOFT);
}
this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
this.writerThreads = new WriterThread[writerThreadNum];
Expand All @@ -277,7 +288,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck

LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor + ", minFactor: " + minFactor +
", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " + singleFactor + ", multiFactor: " + multiFactor +
", memoryFactor: " + memoryFactor);
", memoryFactor: " + memoryFactor + ", useStrongRef: " + useStrongRef);

this.cacheCapacity = capacity;
this.persistencePath = persistencePath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool;
import org.apache.hadoop.hbase.util.ZKDataMigrator;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
Expand All @@ -62,7 +63,7 @@ public class TableStateManager {
private static final String MIGRATE_TABLE_STATE_FROM_ZK_KEY =
"hbase.migrate.table.state.from.zookeeper";

private final IdReadWriteLock<TableName> tnLock = new IdReadWriteLock<>();
private final IdReadWriteLock<TableName> tnLock = new IdReadWriteLockWithObjectPool<>();
protected final MasterServices master;

private final ConcurrentMap<TableName, TableState.State> tableName2State =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
*/
package org.apache.hadoop.hbase.util;

import java.lang.ref.Reference;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
Expand All @@ -42,80 +40,13 @@
* For write lock, use lock.writeLock()
*/
@InterfaceAudience.Private
public class IdReadWriteLock<T> {
// The number of lock we want to easily support. It's not a maximum.
private static final int NB_CONCURRENT_LOCKS = 1000;
/**
* The pool to get entry from, entries are mapped by {@link Reference} and will be automatically
* garbage-collected by JVM
*/
private final ObjectPool<T, ReentrantReadWriteLock> lockPool;
private final ReferenceType refType;

public IdReadWriteLock() {
this(ReferenceType.WEAK);
}

/**
* Constructor of IdReadWriteLock
* @param referenceType type of the reference used in lock pool, {@link ReferenceType#WEAK} by
* default. Use {@link ReferenceType#SOFT} if the key set is limited and the locks will
* be reused with a high frequency
*/
public IdReadWriteLock(ReferenceType referenceType) {
this.refType = referenceType;
switch (referenceType) {
case SOFT:
lockPool = new SoftObjectPool<>(new ObjectPool.ObjectFactory<T, ReentrantReadWriteLock>() {
@Override
public ReentrantReadWriteLock createObject(T id) {
return new ReentrantReadWriteLock();
}
}, NB_CONCURRENT_LOCKS);
break;
case WEAK:
default:
lockPool = new WeakObjectPool<>(new ObjectPool.ObjectFactory<T, ReentrantReadWriteLock>() {
@Override
public ReentrantReadWriteLock createObject(T id) {
return new ReentrantReadWriteLock();
}
}, NB_CONCURRENT_LOCKS);
}
}

public static enum ReferenceType {
WEAK, SOFT
}

/**
* Get the ReentrantReadWriteLock corresponding to the given id
* @param id an arbitrary number to identify the lock
*/
public ReentrantReadWriteLock getLock(T id) {
lockPool.purge();
ReentrantReadWriteLock readWriteLock = lockPool.get(id);
return readWriteLock;
}

/** For testing */
@VisibleForTesting
int purgeAndGetEntryPoolSize() {
gc();
Threads.sleep(200);
lockPool.purge();
return lockPool.size();
}

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DM_GC", justification="Intentional")
private void gc() {
System.gc();
}
public abstract class IdReadWriteLock<T> {
public abstract ReentrantReadWriteLock getLock(T id);

@VisibleForTesting
public void waitForWaiters(T id, int numWaiters) throws InterruptedException {
for (ReentrantReadWriteLock readWriteLock;;) {
readWriteLock = lockPool.get(id);
readWriteLock = getLock(id);
if (readWriteLock != null) {
synchronized (readWriteLock) {
if (readWriteLock.getQueueLength() >= numWaiters) {
Expand All @@ -126,9 +57,4 @@ public void waitForWaiters(T id, int numWaiters) throws InterruptedException {
Thread.sleep(50);
}
}

@VisibleForTesting
public ReferenceType getReferenceType() {
return this.refType;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

@InterfaceAudience.Private
public class IdReadWriteLockStrongRef<T> extends IdReadWriteLock<T> {

  /**
   * Mapping from id to its lock. Entries are strongly referenced and never evicted, so this
   * implementation is only appropriate when the key set is bounded (e.g. bucket cache offsets).
   */
  final ConcurrentHashMap<T, ReentrantReadWriteLock> map = new ConcurrentHashMap<>();

  /**
   * Get the ReentrantReadWriteLock corresponding to the given id, creating it on first use.
   * Repeated calls with the same id always return the same lock instance.
   * @param id an arbitrary identifier for the lock
   * @return the lock associated with {@code id}
   */
  @VisibleForTesting
  @Override
  public ReentrantReadWriteLock getLock(T id) {
    // Fast path: plain get avoids computeIfAbsent overhead when the lock already exists,
    // which is the common case for a hot offset.
    ReentrantReadWriteLock existing = map.get(id);
    if (existing != null) {
      return existing;
    }
    // computeIfAbsent guarantees at most one lock is created per id and, unlike the
    // get/putIfAbsent pattern, does not allocate a throwaway lock when losing a race.
    return map.computeIfAbsent(id, key -> new ReentrantReadWriteLock());
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;

import java.lang.ref.Reference;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

@InterfaceAudience.Private
public class IdReadWriteLockWithObjectPool<T> extends IdReadWriteLock<T>{
// The number of lock we want to easily support. It's not a maximum.
private static final int NB_CONCURRENT_LOCKS = 1000;
/**
* The pool to get entry from, entries are mapped by {@link Reference} and will be automatically
* garbage-collected by JVM
*/
private final ObjectPool<T, ReentrantReadWriteLock> lockPool;
private final ReferenceType refType;

public IdReadWriteLockWithObjectPool() {
this(ReferenceType.WEAK);
}

/**
* Constructor of IdReadWriteLockWithObjectPool
* @param referenceType type of the reference used in lock pool, {@link ReferenceType#WEAK} by
* default. Use {@link ReferenceType#SOFT} if the key set is limited and the locks will
* be reused with a high frequency
*/
public IdReadWriteLockWithObjectPool(ReferenceType referenceType) {
this.refType = referenceType;
switch (referenceType) {
case SOFT:
lockPool = new SoftObjectPool<>(new ObjectPool.ObjectFactory<T, ReentrantReadWriteLock>() {
@Override
public ReentrantReadWriteLock createObject(T id) {
return new ReentrantReadWriteLock();
}
}, NB_CONCURRENT_LOCKS);
break;
case WEAK:
default:
lockPool = new WeakObjectPool<>(new ObjectPool.ObjectFactory<T, ReentrantReadWriteLock>() {
@Override
public ReentrantReadWriteLock createObject(T id) {
return new ReentrantReadWriteLock();
}
}, NB_CONCURRENT_LOCKS);
}
}

public static enum ReferenceType {
WEAK, SOFT
}

/**
* Get the ReentrantReadWriteLock corresponding to the given id
* @param id an arbitrary number to identify the lock
*/
@Override
public ReentrantReadWriteLock getLock(T id) {
lockPool.purge();
ReentrantReadWriteLock readWriteLock = lockPool.get(id);
return readWriteLock;
}

/** For testing */
@VisibleForTesting
int purgeAndGetEntryPoolSize() {
gc();
Threads.sleep(200);
lockPool.purge();
return lockPool.size();
}

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DM_GC", justification="Intentional")
private void gc() {
System.gc();
}

@VisibleForTesting
public ReferenceType getReferenceType() {
return this.refType;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;

import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


@Category({ SmallTests.class })
public class TestIdReadWriteLockStrongRef {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestIdReadWriteLockStrongRef.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestIdReadWriteLockStrongRef.class);

  private IdReadWriteLockStrongRef<Long> idLock = new IdReadWriteLockStrongRef<>();

  /**
   * Verifies the strong-ref lock's identity contract: the same id always maps to the
   * exact same lock instance, and distinct ids map to distinct instances.
   */
  @Test
  public void testGetLock() throws Exception {
    Long offset_1 = 1L;
    Long offset_2 = 2L;
    ReentrantReadWriteLock offsetLock_1 = idLock.getLock(offset_1);
    ReentrantReadWriteLock offsetLock_2 = idLock.getLock(offset_1);
    // assertSame checks reference identity explicitly; assertEquals only worked here
    // because ReentrantReadWriteLock inherits Object's identity-based equals.
    Assert.assertSame(offsetLock_1, offsetLock_2);
    ReentrantReadWriteLock offsetLock_3 = idLock.getLock(offset_2);
    Assert.assertNotSame(offsetLock_1, offsetLock_3);
  }

}

Loading

0 comments on commit 4ea7922

Please sign in to comment.