Skip to content

Commit

Permalink
HBASE-28683 Only allow one TableProcedureInterface for a single table…
Browse files Browse the repository at this point in the history
… to run at the same time for some special procedure types
  • Loading branch information
Apache9 committed Jul 4, 2024
1 parent 3d4bf2e commit 7f9a413
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,7 @@ protected synchronized void setExecuted() {
this.wasExecuted = true;
}

protected synchronized boolean wasExecuted() {
public synchronized boolean wasExecuted() {
return wasExecuted;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,28 @@ private void processWaitingTimeoutProcedures(List<Procedure<TEnvironment>> waiti
private void pushProceduresAfterLoad(List<Procedure<TEnvironment>> runnableList,
List<Procedure<TEnvironment>> failedList) {
failedList.forEach(scheduler::addBack);
// Put the procedures which have been executed first
// For table procedures, to prevent concurrent modifications, we only allow one procedure to run
// for a single table at the same time, this is done via inserting a waiting queue before
// actually add the procedure to run queue. So when loading here, we should add the procedures
// which have been executed first, otherwise another procedure which was in the waiting queue
// before restarting may be added to run queue first and still cause concurrent modifications.
// See HBASE-28263 for the reason why we need this
runnableList.sort((p1, p2) -> {
if (p1.wasExecuted()) {
if (p2.wasExecuted()) {
return Long.compare(p1.getProcId(), p2.getProcId());
} else {
return -1;
}
} else {
if (p2.wasExecuted()) {
return 1;
} else {
return Long.compare(p1.getProcId(), p2.getProcId());
}
}
});
runnableList.forEach(p -> {
p.afterReplay(getEnvironment());
if (!p.hasParent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.lang3.builder.ToStringBuilder;
Expand Down Expand Up @@ -115,6 +117,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {

private final SchemaLocking locking;

// To prevent multiple Create/Modify/Disable/Enable table procedure run at the same time, we will
// keep table procedure in this queue first before actually enqueuing it to tableQueue
// Seee HBASE-28683 for more details
private final Map<TableName, TableProcedureWaitingQueue> tableProcsWaitingEnqueue =
new HashMap<>();

public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) {
locking = new SchemaLocking(procedureRetriever);
}
Expand All @@ -124,11 +132,26 @@ public void yield(final Procedure proc) {
push(proc, false, true);
}

private boolean shouldWaitBeforeEnqueuing(TableProcedureInterface proc) {
return TableQueue.requireTableExclusiveLock(proc);
}

@Override
protected void enqueue(final Procedure proc, final boolean addFront) {
if (isMetaProcedure(proc)) {
doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
} else if (isTableProcedure(proc)) {
TableProcedureInterface tableProc = (TableProcedureInterface) proc;
if (shouldWaitBeforeEnqueuing(tableProc)) {
TableProcedureWaitingQueue waitingQueue = tableProcsWaitingEnqueue
.computeIfAbsent(tableProc.getTableName(), k -> new TableProcedureWaitingQueue());
if (!waitingQueue.procedureSubmitted(proc)) {
// there is a table procedure for this table already enqueued, waiting
LOG.debug("There is already a procedure running for table {}, add {} to waiting queue",
tableProc.getTableName(), proc);
return;
}
}
doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
} else if (isServerProcedure(proc)) {
ServerProcedureInterface spi = (ServerProcedureInterface) proc;
Expand Down Expand Up @@ -328,12 +351,29 @@ protected int queueSize() {

@Override
public void completionCleanup(final Procedure proc) {
if (proc instanceof TableProcedureInterface) {
TableProcedureInterface iProcTable = (TableProcedureInterface) proc;
if (isTableProcedure(proc)) {
TableProcedureInterface tableProc = (TableProcedureInterface) proc;
if (shouldWaitBeforeEnqueuing(tableProc)) {
TableProcedureWaitingQueue waitingQueue =
tableProcsWaitingEnqueue.get(tableProc.getTableName());
if (waitingQueue != null) {
waitingQueue.procedureCompleted(proc).ifPresentOrElse(next -> {
// enqueue it
LOG.debug("{} completed, enqueue a new procedure {}", proc, next);
doAdd(tableRunQueue, getTableQueue(tableProc.getTableName()), next, false);
}, () -> {
// there is no waiting procedures in it, remove
tableProcsWaitingEnqueue.remove(tableProc.getTableName());
});
} else {
// this should not happen normally, warn it
LOG.warn("no waiting queue while completing {}, which should not happen", proc);
}
}
boolean tableDeleted;
if (proc.hasException()) {
Exception procEx = proc.getException().unwrapRemoteException();
if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
if (tableProc.getTableOperationType() == TableOperationType.CREATE) {
// create failed because the table already exist
tableDeleted = !(procEx instanceof TableExistsException);
} else {
Expand All @@ -342,11 +382,10 @@ public void completionCleanup(final Procedure proc) {
}
} else {
// the table was deleted
tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
tableDeleted = (tableProc.getTableOperationType() == TableOperationType.DELETE);
}
if (tableDeleted) {
markTableAsDeleted(iProcTable.getTableName(), proc);
return;
markTableAsDeleted(tableProc.getTableName(), proc);
}
} else if (proc instanceof PeerProcedureInterface) {
tryCleanupPeerQueue(getPeerId(proc), proc);
Expand Down Expand Up @@ -1149,6 +1188,7 @@ public String toString() {
serverBucketToString(builder, "serverBuckets[" + i + "]", serverBuckets[i]);
}
builder.append("tableMap", tableMap);
builder.append("tableWaitingMap", tableProcsWaitingEnqueue);
builder.append("peerMap", peerMap);
builder.append("metaMap", metaMap);
builder.append("globalMap", globalMap);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;

import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.yetus.audience.InterfaceAudience;

/**
* To prevent multiple Create/Modify/Disable/Enable table procedures run at the same time, we will
* keep table procedure in this queue first before actually enqueuing it to
* MasterProcedureScheduler's tableQueue. See HBASE-28683 for more details
*/
@InterfaceAudience.Private
class TableProcedureWaitingQueue {

// whether there is already a table procedure enqueued in ProcedureScheduler.
private Procedure<?> enqueuedProc;

private final Queue<Procedure<?>> queue = new ArrayDeque<>();

/**
* Return whether we can enqueue this procedure to ProcedureScheduler.
*/
boolean procedureSubmitted(Procedure<?> proc) {
if (enqueuedProc == null) {
// no procedure enqueued yet, record it and return
enqueuedProc = proc;
return true;
}
if (proc == enqueuedProc) {
// the same procedure is enqueued again, this usually because the procedure comes back from
// WAITING state, such as all child procedures are finished
return true;
}
queue.add(proc);
return false;
}

/**
* Return the next procedure which can be enqueued to ProcedureScheduler.
*/
Optional<Procedure<?>> procedureCompleted(Procedure<?> proc) {
assert proc == enqueuedProc;
if (!queue.isEmpty()) {
enqueuedProc = queue.poll();
return Optional.of(enqueuedProc);
} else {
enqueuedProc = null;
return Optional.empty();
}
}

@Override
public String toString() {
return "TableProcedureWaitingQueue [enqueuedProc=" + enqueuedProc + ", queue=" + queue + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public boolean requireExclusiveLock(Procedure<?> proc) {
/**
* @param proc must not be null
*/
private static boolean requireTableExclusiveLock(TableProcedureInterface proc) {
static boolean requireTableExclusiveLock(TableProcedureInterface proc) {
switch (proc.getTableOperationType()) {
case CREATE:
case DELETE:
Expand Down

0 comments on commit 7f9a413

Please sign in to comment.