HBASE-28190 Add slow sync log rolling test in TestAsyncLogRolling. #5507

Merged · 9 commits · Dec 11, 2023
AbstractFSWAL.java
@@ -2245,6 +2245,10 @@ private static void split(final Configuration conf, final Path p) throws IOException
WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
}

W getWriter() {
return this.writer;
}

private static void usage() {
System.err.println("Usage: AbstractFSWAL <ARGS>");
System.err.println("Arguments:");
FSHLog.java
@@ -603,14 +603,6 @@ DatanodeInfo[] getPipeline() {
return new DatanodeInfo[0];
}

Writer getWriter() {
return this.writer;
}

void setWriter(Writer writer) {
this.writer = writer;
}

@Override
protected Writer createCombinedWriter(Writer localWriter, Writer remoteWriter) {
// Put the remote writer first, as it usually takes longer to finish, so we write to it first
AbstractTestLogRolling.java
@@ -20,9 +20,11 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseTestingUtil;
@@ -31,6 +33,7 @@
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
@@ -48,6 +51,7 @@
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Assert;
@@ -74,6 +78,7 @@ public abstract class AbstractTestLogRolling {
protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
@Rule
public final TestName name = new TestName();
private static int syncLatencyMillis;

public AbstractTestLogRolling() {
this.server = null;
@@ -118,6 +123,12 @@ public static void setUpBeforeClass() throws Exception {
// disable low replication check for log roller to get a more stable result
// TestWALOpenAfterDNRollingStart will test this option.
conf.setLong("hbase.regionserver.hlog.check.lowreplication.interval", 24L * 60 * 60 * 1000);

// For slow sync threshold test: roll after 5 slow syncs in 10 seconds
conf.setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
conf.setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
// For slow sync threshold test: roll once after a sync above this threshold
conf.setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
}

@Before
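
For reference, the policy these three settings configure works roughly as follows: a single sync slower than ROLL_ON_SYNC_TIME_MS requests a roll on its own, while merely "slow" syncs are counted, and a roll is requested once SLOW_SYNC_ROLL_THRESHOLD of them land inside SLOW_SYNC_ROLL_INTERVAL_MS. A minimal, self-contained sketch of that logic (illustrative names, not HBase's actual internals, which live in AbstractFSWAL):

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Sketch of the slow-sync roll policy configured above. Field and method
 * names are illustrative; this is not the real AbstractFSWAL code.
 */
class SlowSyncRollPolicy {
  private final long rollOnSyncTimeMs; // ROLL_ON_SYNC_TIME_MS (5000 in this test)
  private final long slowSyncMs;       // latency at which a sync counts as "slow"
  private final int rollThreshold;     // SLOW_SYNC_ROLL_THRESHOLD (5)
  private final long rollIntervalMs;   // SLOW_SYNC_ROLL_INTERVAL_MS (10000)
  private final AtomicInteger slowSyncCount = new AtomicInteger();
  private long windowStartMs = System.currentTimeMillis();

  SlowSyncRollPolicy(long rollOnSyncTimeMs, long slowSyncMs, int rollThreshold,
    long rollIntervalMs) {
    this.rollOnSyncTimeMs = rollOnSyncTimeMs;
    this.slowSyncMs = slowSyncMs;
    this.rollThreshold = rollThreshold;
    this.rollIntervalMs = rollIntervalMs;
  }

  /** Returns true if a SLOW_SYNC log roll should be requested after this sync. */
  boolean onSyncCompleted(long syncTimeMs) {
    long now = System.currentTimeMillis();
    if (now - windowStartMs > rollIntervalMs) {
      // Interval expired: restart the window and the slow sync count.
      windowStartMs = now;
      slowSyncCount.set(0);
    }
    if (syncTimeMs > rollOnSyncTimeMs) {
      return true; // one very slow sync forces a roll immediately
    }
    return syncTimeMs > slowSyncMs && slowSyncCount.incrementAndGet() >= rollThreshold;
  }
}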
@@ -158,6 +169,138 @@ private void startAndWriteData() throws IOException, InterruptedException {
}
}

public static void setSyncLatencyMillis(int latency) {
syncLatencyMillis = latency;
}

public static int getSyncLatencyMillis() {
return syncLatencyMillis;
}

@Test
public void testSlowSyncLogRolling() throws Exception {
// Create the test table
TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
admin.createTable(desc);
Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
int row = 1;
try {
server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
// Get a reference to the wal.
final AbstractFSWAL<?> log = (AbstractFSWAL<?>) server.getWAL(region);

final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
// Register a WALActionsListener to observe if a SLOW_SYNC roll is requested
log.registerWALActionsListener(new WALActionsListener() {
@Override
public void logRollRequested(WALActionsListener.RollRequestReason reason) {
switch (reason) {
case SLOW_SYNC:
slowSyncHookCalled.lazySet(true);
break;
default:
break;
}
}
});

// Write some data
for (int i = 0; i < 10; i++) {
writeData(table, row++);
}

assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

// Only test for FSHLog.
if ("filesystem".equals(TEST_UTIL.getConfiguration().get(WALFactory.WAL_PROVIDER))) {

// Adds 200 ms of latency to any sync on the hlog. This should be more than sufficient to
// trigger slow sync warnings.
setSyncLatencyMillis(200);
setSlowLogWriter(log.conf);
log.rollWriter(true);

// Set up for test
slowSyncHookCalled.set(false);

final WALProvider.WriterBase oldWriter1 = log.getWriter();

// Write some data.
// We need at least 5 slow syncs to cross the roll threshold, so write double
// that to be safe. Only one SLOW_SYNC roll should be requested per interval.
for (int i = 0; i < 10; i++) {
writeData(table, row++);
}

// Wait for our latency-injecting writer to be rolled out, as needed.
TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return log.getWriter() != oldWriter1;
}

@Override
public String explainFailure() throws Exception {
return "Waited too long for our test writer to get rolled out";
}
});

assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());
}

// Adds 5000 ms of latency to any sync on the hlog. This will trip the roll-on-sync-time threshold.
setSyncLatencyMillis(5000);
setSlowLogWriter(log.conf);
log.rollWriter(true);

// Set up for test
slowSyncHookCalled.set(false);

final WALProvider.WriterBase oldWriter2 = log.getWriter();

// Write some data. Should only take one sync.
writeData(table, row++);

// Wait for our latency-injecting writer to be rolled out, as needed.
TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return log.getWriter() != oldWriter2;
}

@Override
public String explainFailure() throws Exception {
return "Waited too long for our test writer to get rolled out";
}
});

assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

// Set default log writer, no additional latency to any sync on the hlog.
setDefaultLogWriter(log.conf);
log.rollWriter(true);

// Set up for test
slowSyncHookCalled.set(false);

// Write some data
for (int i = 0; i < 10; i++) {
writeData(table, row++);
}

assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

} finally {
table.close();
}
}

protected abstract void setSlowLogWriter(Configuration conf);

protected abstract void setDefaultLogWriter(Configuration conf);
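
The async subclass below implements these hooks against AsyncFSWALProvider; the sync-WAL subclass (TestLogRolling, not shown in this diff) would do the same against the FSHLog plumbing. A plausible sketch, assuming FSHLogProvider.WRITER_IMPL and a ProtobufLogWriter base class (treat the body as illustrative, not the merged code):

// Hypothetical FSHLog-side counterpart of the async SlowSyncLogWriter below;
// this diff does not include TestLogRolling, so this is only a sketch.
public static class SlowSyncLogWriter extends ProtobufLogWriter {

  @Override
  public void sync(boolean forceSync) throws IOException {
    try {
      // Inject the configured latency before performing the real sync.
      Thread.sleep(getSyncLatencyMillis());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      InterruptedIOException iioe = new InterruptedIOException();
      iioe.initCause(e);
      throw iioe;
    }
    super.sync(forceSync);
  }
}

@Override
protected void setSlowLogWriter(Configuration conf) {
  conf.set(FSHLogProvider.WRITER_IMPL, SlowSyncLogWriter.class.getName());
}

@Override
protected void setDefaultLogWriter(Configuration conf) {
  conf.set(FSHLogProvider.WRITER_IMPL, ProtobufLogWriter.class.getName());
}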

/**
* Tests that log rolling doesn't hang when no data is written.
*/
TestAsyncLogRolling.java
@@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -36,6 +37,9 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;

@Category({ VerySlowRegionServerTests.class, LargeTests.class })
public class TestAsyncLogRolling extends AbstractTestLogRolling {

@@ -51,6 +55,33 @@ public static void setUpBeforeClass() throws Exception {
AbstractTestLogRolling.setUpBeforeClass();
}

public static class SlowSyncLogWriter extends AsyncProtobufLogWriter {

public SlowSyncLogWriter(EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) {
super(eventLoopGroup, channelClass);
}

@Override
public CompletableFuture<Long> sync(boolean forceSync) {
try {
Thread.sleep(getSyncLatencyMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore interrupt status before rethrowing
throw new RuntimeException(e);
}
return super.sync(forceSync);
}
}

@Override
protected void setSlowLogWriter(Configuration conf) {
conf.set(AsyncFSWALProvider.WRITER_IMPL, SlowSyncLogWriter.class.getName());
}

@Override
protected void setDefaultLogWriter(Configuration conf) {
conf.set(AsyncFSWALProvider.WRITER_IMPL, AsyncProtobufLogWriter.class.getName());
}
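
The swap only takes effect when the WAL rolls: rollWriter(true) asks the provider for a fresh writer, and the provider instantiates whichever class the configuration names at that moment. Roughly how the lookup could go (a sketch; the real AsyncFSWALProvider.createAsyncWriter(...) takes more parameters and does more error handling):

// Sketch: resolve and construct the writer implementation on each roll,
// falling back to AsyncProtobufLogWriter when no override is configured.
static AsyncFSWALProvider.AsyncWriter newWriter(Configuration conf,
  EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws Exception {
  Class<? extends AsyncFSWALProvider.AsyncWriter> writerClass = conf.getClass(
    AsyncFSWALProvider.WRITER_IMPL, AsyncProtobufLogWriter.class,
    AsyncFSWALProvider.AsyncWriter.class);
  // Async writers expose the (EventLoopGroup, Class) constructor seen above.
  return writerClass.getConstructor(EventLoopGroup.class, Class.class)
    .newInstance(eventLoopGroup, channelClass);
}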

@Test
public void testLogRollOnDatanodeDeath() throws IOException, InterruptedException {
dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 3, true, null, null);