Skip to content

Commit

Permalink
HBASE-27392 Add a new procedure type for implementing some global ope…
Browse files Browse the repository at this point in the history
…rations such as migration (#4803)

Signed-off-by: Xin Sun <ddupgs@gmail.com>
  • Loading branch information
Apache9 committed Sep 29, 2022
1 parent 8ba809c commit de2d20a
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ public enum LockedResourceType {
TABLE,
REGION,
PEER,
META
META,
GLOBAL
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;

import org.apache.yetus.audience.InterfaceAudience;

/**
* Procedure interface for global operations, such as migration.
*/
@InterfaceAudience.Private
public interface GlobalProcedureInterface {

String getGlobalId();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;

import org.apache.hadoop.hbase.procedure2.LockStatus;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class GlobalQueue extends Queue<String> {

public GlobalQueue(String globalId, LockStatus lockStatus) {
super(globalId, lockStatus);
}

@Override
boolean requireExclusiveLock(Procedure<?> proc) {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
Expand Down Expand Up @@ -95,16 +96,20 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
(n, k) -> n.compareKey((String) k);
private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR =
(n, k) -> n.compareKey((TableName) k);
private final static AvlKeyComparator<GlobalQueue> GLOBAL_QUEUE_KEY_COMPARATOR =
(n, k) -> n.compareKey((String) k);

private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
private final FairQueue<String> peerRunQueue = new FairQueue<>();
private final FairQueue<TableName> metaRunQueue = new FairQueue<>();
private final FairQueue<String> globalRunQueue = new FairQueue<>();

private final ServerQueue[] serverBuckets = new ServerQueue[128];
private TableQueue tableMap = null;
private PeerQueue peerMap = null;
private MetaQueue metaMap = null;
private GlobalQueue globalMap = null;

private final SchemaLocking locking;

Expand All @@ -128,6 +133,8 @@ protected void enqueue(final Procedure proc, final boolean addFront) {
doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
} else if (isPeerProcedure(proc)) {
doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
} else if (isGlobalProcedure(proc)) {
doAdd(globalRunQueue, getGlobalQueue(getGlobalId(proc)), proc, addFront);
} else {
// TODO: at the moment we only have Table and Server procedures
// if you are implementing a non-table/non-server procedure, you have two options: create
Expand Down Expand Up @@ -163,14 +170,19 @@ private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,

@Override
protected boolean queueHasRunnables() {
return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables()
|| serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables();
return globalRunQueue.hasRunnables() || metaRunQueue.hasRunnables()
|| tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables()
|| peerRunQueue.hasRunnables();
}

@Override
protected Procedure dequeue() {
// meta procedure is always the first priority
Procedure<?> pollResult = doPoll(metaRunQueue);
// pull global first
Procedure<?> pollResult = doPoll(globalRunQueue);
// then meta procedure
if (pollResult == null) {
pollResult = doPoll(metaRunQueue);
}
// For now, let server handling have precedence over table handling; presumption is that it
// is more important handling crashed servers than it is running the
// enabling/disabling tables, etc.
Expand Down Expand Up @@ -268,6 +280,14 @@ private void clearQueue() {
clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR);
peerMap = null;

// Remove Meta
clear(metaMap, metaRunQueue, META_QUEUE_KEY_COMPARATOR);
metaMap = null;

// Remove Global
clear(globalMap, globalRunQueue, GLOBAL_QUEUE_KEY_COMPARATOR);
globalMap = null;

assert size() == 0 : "expected queue size to be 0, got " + size();
}

Expand Down Expand Up @@ -300,6 +320,7 @@ protected int queueSize() {
count += queueSize(tableMap);
count += queueSize(peerMap);
count += queueSize(metaMap);
count += queueSize(globalMap);
return count;
}

Expand Down Expand Up @@ -502,6 +523,51 @@ private static boolean isMetaProcedure(Procedure<?> proc) {
return proc instanceof MetaProcedureInterface;
}

// ============================================================================
// Global Queue Lookup Helpers
// ============================================================================
private GlobalQueue getGlobalQueue(String globalId) {
GlobalQueue node = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
if (node != null) {
return node;
}
node = new GlobalQueue(globalId, locking.getGlobalLock(globalId));
globalMap = AvlTree.insert(globalMap, node);
return node;
}

private void removeGlobalQueue(String globalId) {
globalMap = AvlTree.remove(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
locking.removeGlobalLock(globalId);
}

private void tryCleanupGlobalQueue(String globalId, Procedure<?> procedure) {
schedLock();
try {
GlobalQueue queue = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
if (queue == null) {
return;
}

final LockAndQueue lock = locking.getGlobalLock(globalId);
if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
removeFromRunQueue(globalRunQueue, queue,
() -> "clean up global queue after " + procedure + " completed");
removeGlobalQueue(globalId);
}
} finally {
schedUnlock();
}
}

private static boolean isGlobalProcedure(Procedure<?> proc) {
return proc instanceof GlobalProcedureInterface;
}

private static String getGlobalId(Procedure<?> proc) {
return ((GlobalProcedureInterface) proc).getGlobalId();
}

// ============================================================================
// Table Locking Helpers
// ============================================================================
Expand Down Expand Up @@ -1006,6 +1072,51 @@ public void wakeMetaExclusiveLock(Procedure<?> procedure) {
}
}

// ============================================================================
// Global Locking Helpers
// ============================================================================
/**
* Try to acquire the share lock on global.
* @see #wakeGlobalExclusiveLock(Procedure, String)
* @param procedure the procedure trying to acquire the lock
* @return true if the procedure has to wait for global to be available
*/
public boolean waitGlobalExclusiveLock(Procedure<?> procedure, String globalId) {
schedLock();
try {
final LockAndQueue lock = locking.getGlobalLock(globalId);
if (lock.tryExclusiveLock(procedure)) {
removeFromRunQueue(globalRunQueue, getGlobalQueue(globalId),
() -> procedure + " held shared lock");
return false;
}
waitProcedure(lock, procedure);
logLockedResource(LockedResourceType.GLOBAL, HConstants.EMPTY_STRING);
return true;
} finally {
schedUnlock();
}
}

/**
* Wake the procedures waiting for global.
* @see #waitGlobalExclusiveLock(Procedure, String)
* @param procedure the procedure releasing the lock
*/
public void wakeGlobalExclusiveLock(Procedure<?> procedure, String globalId) {
schedLock();
try {
final LockAndQueue lock = locking.getGlobalLock(globalId);
lock.releaseExclusiveLock(procedure);
addToRunQueue(globalRunQueue, getGlobalQueue(globalId),
() -> procedure + " released shared lock");
int waitingCount = wakeWaitingProcedures(lock);
wakePollIfNeeded(waitingCount);
} finally {
schedUnlock();
}
}

/**
* For debugging. Expensive.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class SchemaLocking {
// Single map for all regions irrespective of tables. Key is encoded region name.
private final Map<String, LockAndQueue> regionLocks = new HashMap<>();
private final Map<String, LockAndQueue> peerLocks = new HashMap<>();
private final Map<String, LockAndQueue> globalLocks = new HashMap<>();
private final LockAndQueue metaLock;

public SchemaLocking(Function<Long, Procedure<?>> procedureRetriever) {
Expand Down Expand Up @@ -94,6 +95,10 @@ LockAndQueue getMetaLock() {
return metaLock;
}

LockAndQueue getGlobalLock(String globalId) {
return getLock(globalLocks, globalId);
}

LockAndQueue removeRegionLock(String encodedRegionName) {
return regionLocks.remove(encodedRegionName);
}
Expand All @@ -114,6 +119,10 @@ LockAndQueue removePeerLock(String peerId) {
return peerLocks.remove(peerId);
}

LockAndQueue removeGlobalLock(String globalId) {
return globalLocks.remove(globalId);
}

private LockedResource createLockedResource(LockedResourceType resourceType, String resourceName,
LockAndQueue queue) {
LockType lockType;
Expand Down Expand Up @@ -164,6 +173,8 @@ List<LockedResource> getLocks() {
addToLockedResources(lockedResources, peerLocks, Function.identity(), LockedResourceType.PEER);
addToLockedResources(lockedResources, ImmutableMap.of(TableName.META_TABLE_NAME, metaLock),
tn -> tn.getNameAsString(), LockedResourceType.META);
addToLockedResources(lockedResources, globalLocks, Function.identity(),
LockedResourceType.GLOBAL);
return lockedResources;
}

Expand Down Expand Up @@ -191,6 +202,10 @@ LockedResource getLockResource(LockedResourceType resourceType, String resourceN
break;
case META:
queue = metaLock;
break;
case GLOBAL:
queue = globalLocks.get(resourceName);
break;
default:
queue = null;
break;
Expand All @@ -216,7 +231,8 @@ public String toString() {
+ filterUnlocked(this.namespaceLocks) + ", tableLocks=" + filterUnlocked(this.tableLocks)
+ ", regionLocks=" + filterUnlocked(this.regionLocks) + ", peerLocks="
+ filterUnlocked(this.peerLocks) + ", metaLocks="
+ filterUnlocked(ImmutableMap.of(TableName.META_TABLE_NAME, metaLock));
+ filterUnlocked(ImmutableMap.of(TableName.META_TABLE_NAME, metaLock)) + ", globalLocks="
+ filterUnlocked(globalLocks);
}

private String filterUnlocked(Map<?, LockAndQueue> locks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,21 @@ public PeerOperationType getPeerOperationType() {
}
}

public static class TestGlobalProcedure extends TestProcedure
implements GlobalProcedureInterface {
private final String globalId;

public TestGlobalProcedure(long procId, String globalId) {
super(procId);
this.globalId = globalId;
}

@Override
public String getGlobalId() {
return globalId;
}
}

private static LockProcedure createLockProcedure(LockType lockType, long procId)
throws Exception {
LockProcedure procedure = new LockProcedure();
Expand Down Expand Up @@ -1093,6 +1108,39 @@ public void testListLocksPeer() throws Exception {
assertEquals(1, resource.getWaitingProcedures().size());
}

@Test
public void testListLocksGlobal() throws Exception {
String globalId = "1";
LockProcedure procedure = createExclusiveLockProcedure(4);
queue.waitGlobalExclusiveLock(procedure, globalId);

List<LockedResource> locks = queue.getLocks();
assertEquals(1, locks.size());

LockedResource resource = locks.get(0);
assertLockResource(resource, LockedResourceType.GLOBAL, globalId);
assertExclusiveLock(resource, procedure);
assertTrue(resource.getWaitingProcedures().isEmpty());

// Try to acquire the exclusive lock again with same procedure
assertFalse(queue.waitGlobalExclusiveLock(procedure, globalId));

// Try to acquire the exclusive lock again with new procedure
LockProcedure procedure2 = createExclusiveLockProcedure(5);
assertTrue(queue.waitGlobalExclusiveLock(procedure2, globalId));

// Same peerId, still only has 1 LockedResource
locks = queue.getLocks();
assertEquals(1, locks.size());

resource = locks.get(0);
assertLockResource(resource, LockedResourceType.GLOBAL, globalId);
// LockedResource owner still is the origin procedure
assertExclusiveLock(resource, procedure);
// The new procedure should in the waiting list
assertEquals(1, resource.getWaitingProcedures().size());
}

@Test
public void testListLocksWaiting() throws Exception {
LockProcedure procedure1 = createExclusiveLockProcedure(1);
Expand Down

0 comments on commit de2d20a

Please sign in to comment.