Skip to content

Commit

Permalink
HBASE-28683 Only allow one TableProcedureInterface for a single table…
Browse files Browse the repository at this point in the history
… to run at the same time for some special procedure types (#6046) (#6084)

Signed-off-by: Viraj Jasani <vjasani@apache.org>
(cherry picked from commit 50a495f)
(cherry picked from commit 855410f)
  • Loading branch information
Apache9 committed Jul 17, 2024
1 parent 855851a commit e1bd632
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ protected synchronized void setExecuted() {
this.wasExecuted = true;
}

protected synchronized boolean wasExecuted() {
public synchronized boolean wasExecuted() {
return wasExecuted;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,28 @@ private void processWaitingTimeoutProcedures(List<Procedure<TEnvironment>> waiti
private void pushProceduresAfterLoad(List<Procedure<TEnvironment>> runnableList,
List<Procedure<TEnvironment>> failedList) {
failedList.forEach(scheduler::addBack);
// Put the procedures which have been executed first.
// For table procedures, to prevent concurrent modifications, we only allow one procedure to run
// for a single table at a time. This is done by placing procedures in a waiting queue before
// actually adding them to the run queue. So when loading here, we should add the procedures
// which have already been executed first; otherwise another procedure which was in the waiting
// queue before the restart may be added to the run queue first and still cause concurrent
// modifications. See HBASE-28263 for the reason why we need this.
runnableList.sort((p1, p2) -> {
if (p1.wasExecuted()) {
if (p2.wasExecuted()) {
return Long.compare(p1.getProcId(), p2.getProcId());
} else {
return -1;
}
} else {
if (p2.wasExecuted()) {
return 1;
} else {
return Long.compare(p1.getProcId(), p2.getProcId());
}
}
});
runnableList.forEach(p -> {
p.afterReplay(getEnvironment());
if (!p.hasParent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.lang3.builder.ToStringBuilder;
Expand Down Expand Up @@ -109,9 +112,17 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
private PeerQueue peerMap = null;
private MetaQueue metaMap = null;

private final Function<Long, Procedure<?>> procedureRetriever;
private final SchemaLocking locking;

// To prevent multiple Create/Modify/Disable/Enable table procedures from running at the same
// time, we keep each table procedure in this queue first before actually enqueuing it to
// tableQueue. See HBASE-28683 for more details.
private final Map<TableName, TableProcedureWaitingQueue> tableProcsWaitingEnqueue =
new HashMap<>();

/**
 * Creates a scheduler backed by the given procedure lookup function.
 * @param procedureRetriever maps a procedure id to its procedure; it is kept by this scheduler
 *                           and also passed to the {@link SchemaLocking} it creates
 */
public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) {
  this.locking = new SchemaLocking(procedureRetriever);
  this.procedureRetriever = procedureRetriever;
}

Expand All @@ -120,11 +131,26 @@ public void yield(final Procedure proc) {
push(proc, false, true);
}

/**
 * Decides whether the given table procedure has to go through the per-table waiting queue
 * before it may be added to the table run queue.
 * @param proc the table procedure about to be enqueued
 * @return the result of {@link TableQueue#requireTableExclusiveLock(TableProcedureInterface)}
 *         for {@code proc}
 */
private boolean shouldWaitBeforeEnqueuing(TableProcedureInterface proc) {
  final boolean needsTableExclusiveLock = TableQueue.requireTableExclusiveLock(proc);
  return needsTableExclusiveLock;
}

@Override
protected void enqueue(final Procedure proc, final boolean addFront) {
if (isMetaProcedure(proc)) {
doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
} else if (isTableProcedure(proc)) {
TableProcedureInterface tableProc = (TableProcedureInterface) proc;
if (shouldWaitBeforeEnqueuing(tableProc)) {
TableProcedureWaitingQueue waitingQueue = tableProcsWaitingEnqueue.computeIfAbsent(
tableProc.getTableName(), k -> new TableProcedureWaitingQueue(procedureRetriever));
if (!waitingQueue.procedureSubmitted(proc)) {
// there is a table procedure for this table already enqueued, waiting
LOG.debug("There is already a procedure running for table {}, added {} to waiting queue",
tableProc.getTableName(), proc);
return;
}
}
doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
} else if (isServerProcedure(proc)) {
ServerProcedureInterface spi = (ServerProcedureInterface) proc;
Expand Down Expand Up @@ -266,6 +292,7 @@ private void clearQueue() {
// Remove Tables
clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
tableMap = null;
tableProcsWaitingEnqueue.clear();

// Remove Peers
clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR);
Expand Down Expand Up @@ -303,17 +330,46 @@ protected int queueSize() {
count += queueSize(tableMap);
count += queueSize(peerMap);
count += queueSize(metaMap);
for (TableProcedureWaitingQueue waitingQ : tableProcsWaitingEnqueue.values()) {
count += waitingQ.waitingSize();
}
return count;
}

@Override
public void completionCleanup(final Procedure proc) {
if (proc instanceof TableProcedureInterface) {
TableProcedureInterface iProcTable = (TableProcedureInterface) proc;
if (isTableProcedure(proc)) {
TableProcedureInterface tableProc = (TableProcedureInterface) proc;
if (shouldWaitBeforeEnqueuing(tableProc)) {
schedLock();
try {
TableProcedureWaitingQueue waitingQueue =
tableProcsWaitingEnqueue.get(tableProc.getTableName());
if (waitingQueue != null) {
Optional<Procedure<?>> nextProc = waitingQueue.procedureCompleted(proc);
if (nextProc.isPresent()) {
// enqueue it
Procedure<?> next = nextProc.get();
LOG.debug("{} completed, enqueue a new procedure {}", proc, next);
doAdd(tableRunQueue, getTableQueue(tableProc.getTableName()), next, false);
} else {
if (waitingQueue.isEmpty()) {
// there is no waiting procedures in it, remove
tableProcsWaitingEnqueue.remove(tableProc.getTableName());
}
}
} else {
// this should not happen normally, warn it
LOG.warn("no waiting queue while completing {}, which should not happen", proc);
}
} finally {
schedUnlock();
}
}
boolean tableDeleted;
if (proc.hasException()) {
Exception procEx = proc.getException().unwrapRemoteException();
if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
if (tableProc.getTableOperationType() == TableOperationType.CREATE) {
// create failed because the table already exist
tableDeleted = !(procEx instanceof TableExistsException);
} else {
Expand All @@ -322,11 +378,10 @@ public void completionCleanup(final Procedure proc) {
}
} else {
// the table was deleted
tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
tableDeleted = (tableProc.getTableOperationType() == TableOperationType.DELETE);
}
if (tableDeleted) {
markTableAsDeleted(iProcTable.getTableName(), proc);
return;
markTableAsDeleted(tableProc.getTableName(), proc);
}
} else if (proc instanceof PeerProcedureInterface) {
tryCleanupPeerQueue(getPeerId(proc), proc);
Expand Down Expand Up @@ -657,7 +712,9 @@ boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure)
try {
final TableQueue queue = getTableQueue(table);
final LockAndQueue tableLock = locking.getTableLock(table);
if (queue == null) return true;
if (queue == null) {
return true;
}

if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) {
// remove the table from the run-queue and the map
Expand Down Expand Up @@ -1042,6 +1099,7 @@ public String toString() {
serverBucketToString(builder, "serverBuckets[" + i + "]", serverBuckets[i]);
}
builder.append("tableMap", tableMap);
builder.append("tableWaitingMap", tableProcsWaitingEnqueue);
builder.append("peerMap", peerMap);
builder.append("metaMap", metaMap);
} finally {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;

import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
import java.util.function.Function;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * Holding area for the table procedures of a single table. To keep multiple
 * Create/Modify/Disable/Enable table procedures from running at the same time, a procedure is
 * registered here first, and only one of them (plus its sub procedures) is let through to
 * MasterProcedureScheduler's tableQueue at a time. See HBASE-28683 for more details.
 */
@InterfaceAudience.Private
class TableProcedureWaitingQueue {

  // Looks up a procedure by its pid; used to walk up a procedure's chain of parents.
  private final Function<Long, Procedure<?>> procedureRetriever;

  // The procedure currently let through to the ProcedureScheduler, or null if there is none.
  private Procedure<?> enqueuedProc;

  // Procedures held back until the enqueued procedure completes, in submission order.
  private final Queue<Procedure<?>> queue = new ArrayDeque<>();

  TableProcedureWaitingQueue(Function<Long, Procedure<?>> procedureRetriever) {
    this.procedureRetriever = procedureRetriever;
  }

  /**
   * Walks up the parent chain of {@code proc} and reports whether the enqueued procedure is one
   * of its ancestors. Fails if a parent on the chain can not be retrieved.
   */
  private boolean isSubProcedure(Procedure<?> proc) {
    Procedure<?> current = proc;
    while (current.hasParent()) {
      long parentPid = current.getParentProcId();
      if (parentPid == enqueuedProc.getProcId()) {
        return true;
      }
      current = Preconditions.checkNotNull(procedureRetriever.apply(parentPid),
        "can not find parent procedure pid=%s", parentPid);
    }
    return false;
  }

  /**
   * Register a submitted procedure and tell the caller whether it may be enqueued to
   * ProcedureScheduler right away.
   * <p>
   * When {@code true} is returned, the caller should enqueue the procedure. When {@code false} is
   * returned, the caller has nothing more to do: the procedure is parked in this waiting queue and
   * will be handed back later by a call to {@link #procedureCompleted(Procedure)}.
   */
  boolean procedureSubmitted(Procedure<?> proc) {
    if (enqueuedProc == null) {
      // nothing is enqueued for this table yet, so this procedure takes the slot
      enqueuedProc = proc;
      return true;
    }
    // Either the enqueued procedure itself is coming back (usually from WAITING state, e.g. all
    // of its child procedures are finished), or this is one of its sub procedures; let it pass.
    if (proc == enqueuedProc || isSubProcedure(proc)) {
      return true;
    }
    // an unrelated procedure for the same table, park it
    queue.add(proc);
    return false;
  }

  /**
   * Record the completion of a procedure and return the next procedure which can be enqueued to
   * ProcedureScheduler, if there is one.
   */
  Optional<Procedure<?>> procedureCompleted(Procedure<?> proc) {
    Preconditions.checkState(enqueuedProc != null, "enqueued procedure should not be null");
    if (enqueuedProc != proc) {
      // only sub procedures of the enqueued procedure may complete while it holds the slot
      Preconditions.checkState(isSubProcedure(proc),
        "procedure %s is not a sub procedure of enqueued procedure %s", proc, enqueuedProc);
      return Optional.empty();
    }
    // poll() yields null when nothing is waiting, which also clears the slot
    enqueuedProc = queue.poll();
    return Optional.ofNullable(enqueuedProc);
  }

  /** Returns {@code true} when no procedure is currently recorded as enqueued. */
  boolean isEmpty() {
    return enqueuedProc == null;
  }

  /** Returns the number of procedures parked in the waiting queue. */
  int waitingSize() {
    return queue.size();
  }

  @Override
  public String toString() {
    return new StringBuilder("TableProcedureWaitingQueue [enqueuedProc=").append(enqueuedProc)
      .append(", queue=").append(queue).append("]").toString();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public boolean requireExclusiveLock(Procedure<?> proc) {
/**
* @param proc must not be null
*/
private static boolean requireTableExclusiveLock(TableProcedureInterface proc) {
static boolean requireTableExclusiveLock(TableProcedureInterface proc) {
switch (proc.getTableOperationType()) {
case CREATE:
case DELETE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public LockState acquireLock(Void env) {
/**
 * Wakes the exclusive lock on this procedure's table and then runs the scheduler's completion
 * cleanup for this procedure.
 * @param env unused
 */
@Override
public void releaseLock(Void env) {
procedureScheduler.wakeTableExclusiveLock(this, getTableName());
// NOTE(review): presumably needed so the scheduler can release this procedure's slot in the
// per-table waiting queue and enqueue the next waiting procedure; confirm against
// MasterProcedureScheduler#completionCleanup.
procedureScheduler.completionCleanup(this);
}
}

Expand Down
Loading

0 comments on commit e1bd632

Please sign in to comment.