DFSConfigKeys.java
@@ -1744,6 +1744,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean
DFS_DATANODE_LOCKMANAGER_TRACE_DEFAULT = false;

public static final String DFS_DATANODE_DATASET_SUBLOCK_COUNT_KEY =
"dfs.datanode.dataset.sublock.count";
public static final long DFS_DATANODE_DATASET_SUBLOCK_COUNT_DEFAULT = 1000L;

// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
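For context, a minimal sketch of how a DataNode component could read the new key; the class name SubLockCountExample is illustrative, and only the key and default come from the patch itself:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class SubLockCountExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Falls back to 1000 (the default above) when the key is unset.
    long subLockCount = conf.getLong(
        DFSConfigKeys.DFS_DATANODE_DATASET_SUBLOCK_COUNT_KEY,
        DFSConfigKeys.DFS_DATANODE_DATASET_SUBLOCK_COUNT_DEFAULT);
    System.out.println("dataset sublock count = " + subLockCount);
  }
}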
DataNodeLockManager.java
@@ -29,7 +29,8 @@ public interface DataNodeLockManager<T extends AutoCloseDataSetLock> {
*/
enum LockLevel {
BLOCK_POOl,
VOLUME,
DIR
}

/**
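Since DIR extends the hierarchy to block pool -> volume -> sub-dir, a hedged usage sketch follows. The package imports and helper names (manager, bpid, volume, subLock) are assumptions, and we assume AutoCloseDataSetLock implements AutoCloseable, as its name and try-with-resources usage suggest:

import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;

public class DirLockUsageSketch {
  // bpid / volume / subLock mirror resources[0..2] in generateLockName below.
  static void readUnderDirLock(DataNodeLockManager<AutoCloseDataSetLock> manager,
      String bpid, String volume, String subLock) {
    try (AutoCloseDataSetLock lock =
             manager.readLock(LockLevel.DIR, bpid, volume, subLock)) {
      // Replica reads for blocks hashed to this sub lock go here.
    }
  }
}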
DataSetLockManager.java
@@ -94,6 +94,13 @@ private String generateLockName(LockLevel level, String... resources) {
+ resources[0] + "volume lock :" + resources[1]);
}
return resources[0] + resources[1];
} else if (resources.length == 3 && level == LockLevel.DIR) {
if (resources[0] == null || resources[1] == null || resources[2] == null) {
throw new IllegalArgumentException("acquire a null dataset lock : "
+ resources[0] + ",volume lock :" + resources[1]
+ ",subdir lock :" + resources[2]);
}
return resources[0] + resources[1] + resources[2];
} else {
throw new IllegalArgumentException("lock level do not match resource");
}
@@ -153,7 +160,7 @@ public DataSetLockManager() {
public AutoCloseDataSetLock readLock(LockLevel level, String... resources) {
if (level == LockLevel.BLOCK_POOl) {
return getReadLock(level, resources[0]);
} else if (level == LockLevel.VOLUME) {
AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
AutoCloseDataSetLock volLock = getReadLock(level, resources);
volLock.setParentLock(bpLock);
@@ -162,14 +169,25 @@ public AutoCloseDataSetLock readLock(LockLevel level, String... resources) {
resources[0]);
}
return volLock;
} else {
AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
AutoCloseDataSetLock volLock = getReadLock(LockLevel.VOLUME, resources[0], resources[1]);
volLock.setParentLock(bpLock);
AutoCloseDataSetLock dirLock = getReadLock(level, resources);
dirLock.setParentLock(volLock);
if (openLockTrace) {
LOG.debug("Sub lock " + resources[0] + resources[1] + resources[2] + " parent lock " +
resources[0] + resources[1]);
}
return dirLock;
}
}

@Override
public AutoCloseDataSetLock writeLock(LockLevel level, String... resources) {
if (level == LockLevel.BLOCK_POOl) {
return getWriteLock(level, resources[0]);
} else if (level == LockLevel.VOLUME) {
AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
AutoCloseDataSetLock volLock = getWriteLock(level, resources);
volLock.setParentLock(bpLock);
@@ -178,6 +196,17 @@ public AutoCloseDataSetLock writeLock(LockLevel level, String... resources) {
resources[0]);
}
return volLock;
} else {
AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
AutoCloseDataSetLock volLock = getReadLock(LockLevel.VOLUME, resources[0], resources[1]);
volLock.setParentLock(bpLock);
AutoCloseDataSetLock dirLock = getWriteLock(level, resources);
dirLock.setParentLock(volLock);
if (openLockTrace) {
LOG.debug("Sub lock " + resources[0] + resources[1] + resources[2] + " parent lock " +
resources[0] + resources[1]);
}
return dirLock;
}
}

@@ -224,8 +253,13 @@ public void addLock(LockLevel level, String... resources) {
String lockName = generateLockName(level, resources);
if (level == LockLevel.BLOCK_POOl) {
lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
} else if (level == LockLevel.VOLUME) {
lockMap.addLock(resources[0], new ReentrantReadWriteLock(isFair));
lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
} else {
lockMap.addLock(resources[0], new ReentrantReadWriteLock(isFair));
lockMap.addLock(generateLockName(LockLevel.VOLUME, resources[0], resources[1]),
new ReentrantReadWriteLock(isFair));
lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
}
}
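To make the acquisition order in readLock/writeLock above concrete, here is a self-contained toy using plain JDK locks (not DataSetLockManager itself): parents are taken shared, the leaf exclusive, and everything is released leaf-to-root, so writers under different sub locks of one volume do not block each other.

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class HierarchyDemo {
  static final ReentrantReadWriteLock bp = new ReentrantReadWriteLock(true);
  static final ReentrantReadWriteLock vol = new ReentrantReadWriteLock(true);
  static final ReentrantReadWriteLock[] dirs = {
      new ReentrantReadWriteLock(true), new ReentrantReadWriteLock(true)};

  static void writeUnderDir(int dir, Runnable task) {
    bp.readLock().lock();           // parent: block pool, shared
    vol.readLock().lock();          // parent: volume, shared
    dirs[dir].writeLock().lock();   // leaf: sub dir, exclusive
    try {
      task.run();
    } finally {                     // release leaf-to-root, mirroring close()
      dirs[dir].writeLock().unlock();
      vol.readLock().unlock();
      bp.readLock().unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Thread a = new Thread(() -> writeUnderDir(0, () -> System.out.println("dir0 write")));
    Thread b = new Thread(() -> writeUnderDir(1, () -> System.out.println("dir1 write")));
    a.start(); b.start();
    a.join(); b.join();
  }
}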
DataSetSubLockStrategy.java (new file)
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.datanode;

import java.util.List;

/**
* This interface is used to generate sub lock names for block ids.
*/
public interface DataSetSubLockStrategy {

/**
* Generate the sub lock name for the given block id.
* @param blockid the block id.
* @return the sub lock name for the given block id.
*/
String blockIdToSubLock(long blockid);

List<String> getAllSubLockName();
}
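To illustrate the contract, a deliberately trivial implementation sketch (not part of the patch): mapping every block to a single sub lock degenerates to plain volume-level locking.

package org.apache.hadoop.hdfs.server.datanode;

import java.util.Collections;
import java.util.List;

public class SingleSubLockStrategy implements DataSetSubLockStrategy {
  private static final String NAME = "SubLock0";

  @Override
  public String blockIdToSubLock(long blockid) {
    return NAME; // every block shares the single partition
  }

  @Override
  public List<String> getAllSubLockName() {
    return Collections.singletonList(NAME);
  }
}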
ModDataSetSubLockStrategy.java (new file)
@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.datanode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

public class ModDataSetSubLockStrategy implements DataSetSubLockStrategy {
public static final Logger LOG = LoggerFactory.getLogger(ModDataSetSubLockStrategy.class);

private static final String LOCK_NAME_PREFIX = "SubLock";
private long modFactor;

public ModDataSetSubLockStrategy(long mod) {
if (mod <= 0) {
mod = 1L;
}
this.modFactor = mod;
}

@Override
public String blockIdToSubLock(long blockid) {
return LOCK_NAME_PREFIX + (blockid % modFactor);
}

@Override
public List<String> getAllSubLockName() {
List<String> res = new ArrayList<>();
for (long i = 0L; i < modFactor; i++) {
res.add(LOCK_NAME_PREFIX + i);
}
return res;
}
}
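A quick demo of the mod mapping (same package assumed; the values follow directly from the code above). Note that Java's % keeps the sign of the dividend, so a negative block id would yield a name like SubLock-42; block ids are assumed non-negative here.

package org.apache.hadoop.hdfs.server.datanode;

public class ModStrategyDemo {
  public static void main(String[] args) {
    DataSetSubLockStrategy s = new ModDataSetSubLockStrategy(1000L);
    System.out.println(s.blockIdToSubLock(1073741825L)); // 1073741825 % 1000 -> "SubLock825"
    System.out.println(s.blockIdToSubLock(42L));         // -> "SubLock42"
    System.out.println(s.getAllSubLockName().size());    // -> 1000
  }
}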