Skip to content

Commit

Permalink
HBASE-24665 MultiWAL : Avoid rolling of ALL WALs when one of the WAL …
Browse files Browse the repository at this point in the history
…needs a roll #2155

Co-authored-by: wen_yi <liu.wenwen@immomo.com>
Signed-off-by: Anoop <anoopsamjohn@apache.org>
Signed-off-by: Ramkrishna <ramkrishna@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
  • Loading branch information
WenFeiYi and wen_yi authored Jul 28, 2020
1 parent ece0792 commit 7ef1aca
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected void scheduleFlush(String encodedRegionName) {
}

@VisibleForTesting
Map<WAL, Boolean> getWalNeedsRoll() {
return this.walNeedsRoll;
Map<WAL, RollController> getWalNeedsRoll() {
return this.wals;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
Expand Down Expand Up @@ -56,31 +55,31 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread

protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period";

protected final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
protected final ConcurrentMap<WAL, RollController> wals = new ConcurrentHashMap<>();
protected final T abortable;
private volatile long lastRollTime = System.currentTimeMillis();
// Period to roll log.
private final long rollPeriod;
private final int threadWakeFrequency;
// The interval to check low replication on hlog's pipeline
private long checkLowReplicationInterval;
private final long checkLowReplicationInterval;

private volatile boolean running = true;

public void addWAL(WAL wal) {
// check without lock first
if (walNeedsRoll.containsKey(wal)) {
if (wals.containsKey(wal)) {
return;
}
// this is to avoid race between addWAL and requestRollAll.
synchronized (this) {
if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) {
if (wals.putIfAbsent(wal, new RollController(wal)) == null) {
wal.registerWALActionsListener(new WALActionsListener() {
@Override
public void logRollRequested(WALActionsListener.RollRequestReason reason) {
// TODO logs will contend with each other here, replace with e.g. DelayedQueue
synchronized (AbstractWALRoller.this) {
walNeedsRoll.put(wal, Boolean.TRUE);
RollController controller = wals.computeIfAbsent(wal, rc -> new RollController(wal));
controller.requestRoll();
AbstractWALRoller.this.notifyAll();
}
}
Expand All @@ -91,9 +90,8 @@ public void logRollRequested(WALActionsListener.RollRequestReason reason) {

public void requestRollAll() {
synchronized (this) {
List<WAL> wals = new ArrayList<WAL>(walNeedsRoll.keySet());
for (WAL wal : wals) {
walNeedsRoll.put(wal, Boolean.TRUE);
for (RollController controller : wals.values()) {
controller.requestRoll();
}
notifyAll();
}
Expand All @@ -113,9 +111,9 @@ protected AbstractWALRoller(String name, Configuration conf, T abortable) {
*/
private void checkLowReplication(long now) {
try {
for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
for (Entry<WAL, RollController> entry : wals.entrySet()) {
WAL wal = entry.getKey();
boolean needRollAlready = entry.getValue();
boolean needRollAlready = entry.getValue().needsRoll(now);
if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
continue;
}
Expand All @@ -131,7 +129,7 @@ private void abort(String reason, Throwable cause) {
// This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we
// failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it
// is already broken.
for (WAL wal : walNeedsRoll.keySet()) {
for (WAL wal : wals.keySet()) {
// shutdown rather than close here since we are going to abort the RS and the wals need to be
// split when recovery
try {
Expand All @@ -146,42 +144,39 @@ private void abort(String reason, Throwable cause) {
@Override
public void run() {
while (running) {
boolean periodic = false;
long now = System.currentTimeMillis();
checkLowReplication(now);
periodic = (now - this.lastRollTime) > this.rollPeriod;
if (periodic) {
// Time for periodic roll, fall through
LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod);
} else {
synchronized (this) {
if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
// WAL roll requested, fall through
LOG.debug("WAL roll requested");
} else {
try {
wait(this.threadWakeFrequency);
} catch (InterruptedException e) {
// restore the interrupt state
Thread.currentThread().interrupt();
}
// goto the beginning to check whether again whether we should fall through to roll
// several WALs, and also check whether we should quit.
continue;
synchronized (this) {
if (wals.values().stream().noneMatch(rc -> rc.needsRoll(now))) {
try {
wait(this.threadWakeFrequency);
} catch (InterruptedException e) {
// restore the interrupt state
Thread.currentThread().interrupt();
}
// goto the beginning to check whether again whether we should fall through to roll
// several WALs, and also check whether we should quit.
continue;
}
}
try {
this.lastRollTime = System.currentTimeMillis();
for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter
.hasNext();) {
Entry<WAL, Boolean> entry = iter.next();
for (Iterator<Entry<WAL, RollController>> iter = wals.entrySet().iterator();
iter.hasNext();) {
Entry<WAL, RollController> entry = iter.next();
WAL wal = entry.getKey();
// reset the flag in front to avoid missing roll request before we return from rollWriter.
walNeedsRoll.put(wal, Boolean.FALSE);
RollController controller = entry.getValue();
if (controller.isRollRequested()) {
// WAL roll requested, fall through
LOG.debug("WAL {} roll requested", wal);
} else if (controller.needsPeriodicRoll(now)){
// Time for periodic roll, fall through
LOG.debug("WAL {} roll period {} ms elapsed", wal, this.rollPeriod);
} else {
continue;
}
// Force the roll if the logroll.period is elapsed or if a roll was requested.
// The returned value is an array of actual region names.
byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
byte[][] regionsToFlush = controller.rollWal(now);
if (regionsToFlush != null) {
for (byte[] r : regionsToFlush) {
scheduleFlush(Bytes.toString(r));
Expand Down Expand Up @@ -223,7 +218,8 @@ private boolean isWaiting() {
* @return true if all WAL roll finished
*/
public boolean walRollFinished() {
return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll) && isWaiting();
return wals.values().stream().noneMatch(rc -> rc.needsRoll(System.currentTimeMillis()))
&& isWaiting();
}

/**
Expand All @@ -240,4 +236,43 @@ public void close() {
running = false;
interrupt();
}

/**
* Independently control the roll of each wal. When use multiwal,
* can avoid all wal roll together. see HBASE-24665 for detail
*/
protected class RollController {
private final WAL wal;
private final AtomicBoolean rollRequest;
private long lastRollTime;

RollController(WAL wal) {
this.wal = wal;
this.rollRequest = new AtomicBoolean(false);
this.lastRollTime = System.currentTimeMillis();
}

public void requestRoll() {
this.rollRequest.set(true);
}

public byte[][] rollWal(long now) throws IOException {
this.lastRollTime = now;
// reset the flag in front to avoid missing roll request before we return from rollWriter.
this.rollRequest.set(false);
return wal.rollWriter(true);
}

public boolean isRollRequested() {
return rollRequest.get();
}

public boolean needsPeriodicRoll(long now) {
return (now - this.lastRollTime) > rollPeriod;
}

public boolean needsRoll(long now) {
return isRollRequested() || needsPeriodicRoll(now);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

@Category({RegionServerTests.class, MediumTests.class})
public class TestLogRoller {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestLogRoller.class);

private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

private static final int LOG_ROLL_PERIOD = 20 * 1000;
private static final String LOG_DIR = "WALs";
private static final String ARCHIVE_DIR = "archiveWALs";
private static final String WAL_PREFIX = "test-log-roller";
private static Configuration CONF;
private static LogRoller ROLLER;
private static Path ROOT_DIR;
private static FileSystem FS;

@Before
public void setup() throws Exception {
CONF = TEST_UTIL.getConfiguration();
CONF.setInt("hbase.regionserver.logroll.period", LOG_ROLL_PERIOD);
CONF.setInt(HConstants.THREAD_WAKE_FREQUENCY, 300);
ROOT_DIR = TEST_UTIL.getRandomDir();
FS = FileSystem.get(CONF);
RegionServerServices services = Mockito.mock(RegionServerServices.class);
Mockito.when(services.getConfiguration()).thenReturn(CONF);
ROLLER = new LogRoller(services);
ROLLER.start();
}

@After
public void tearDown() throws Exception {
ROLLER.close();
FS.close();
TEST_UTIL.shutdownMiniCluster();
}

/**
* verify that each wal roll separately
*/
@Test
public void testRequestRollWithMultiWal() throws Exception {
// add multiple wal
Map<FSHLog, Path> wals = new HashMap<>();
for (int i = 1; i <= 3; i++) {
FSHLog wal = new FSHLog(FS, ROOT_DIR, LOG_DIR, ARCHIVE_DIR, CONF, null,
true, WAL_PREFIX, "." + i);
wal.init();
wals.put(wal, wal.getCurrentFileName());
ROLLER.addWAL(wal);
Thread.sleep(1000);
}

// request roll
Iterator<Map.Entry<FSHLog, Path>> it = wals.entrySet().iterator();
Map.Entry<FSHLog, Path> walEntry = it.next();
walEntry.getKey().requestLogRoll();
Thread.sleep(5000);

assertNotEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName());
walEntry.setValue(walEntry.getKey().getCurrentFileName());
while (it.hasNext()) {
walEntry = it.next();
assertEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName());
}

// period roll
Thread.sleep(LOG_ROLL_PERIOD + 5000);
for (Map.Entry<FSHLog, Path> entry : wals.entrySet()) {
assertNotEquals(entry.getValue(), entry.getKey().getCurrentFileName());
entry.getKey().close();
}
}
}

0 comments on commit 7ef1aca

Please sign in to comment.