Skip to content

Commit

Permalink
Add slow sync log rolling test in TestAsyncLogRolling.
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuyaogai committed Nov 7, 2023
1 parent ab4b1d8 commit 9662011
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2145,6 +2145,14 @@ private static void split(final Configuration conf, final Path p) throws IOExcep
WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
}

W getWriter() {
return this.writer;
}

void setWriter(W writer) {
this.writer = writer;
}

private static void usage() {
System.err.println("Usage: AbstractFSWAL <ARGS>");
System.err.println("Arguments:");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,13 +601,4 @@ DatanodeInfo[] getPipeline() {
}
return new DatanodeInfo[0];
}

Writer getWriter() {
return this.writer;
}

void setWriter(Writer writer) {
this.writer = writer;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,29 @@
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.junit.BeforeClass;
Expand All @@ -49,6 +61,122 @@ public static void setUpBeforeClass() throws Exception {
conf.setInt(FanOutOneBlockAsyncDFSOutputHelper.ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, 100);
conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
AbstractTestLogRolling.setUpBeforeClass();

// For slow sync threshold test: roll once after a sync above this threshold
TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
}

@Test
public void testSlowSyncLogRolling() throws Exception {
// Create the test table
TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
admin.createTable(desc);
Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
int row = 1;
try {
// Get a reference to the AsyncFSWAL
server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
final AsyncFSWAL log = (AsyncFSWAL) server.getWAL(region);

// Register a WALActionsListener to observe if a SLOW_SYNC roll is requested

final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
log.registerWALActionsListener(new WALActionsListener() {
@Override
public void logRollRequested(WALActionsListener.RollRequestReason reason) {
switch (reason) {
case SLOW_SYNC:
slowSyncHookCalled.lazySet(true);
break;
default:
break;
}
}
});

// Write some data

for (int i = 0; i < 10; i++) {
writeData(table, row++);
}

assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

// Set up for test
slowSyncHookCalled.set(false);

// Wrap the current writer with the anonymous class below that adds 5000 ms of
// latency to any sync on the hlog.
// This will trip the other threshold.
final WALProvider.AsyncWriter oldWriter2 = log.getWriter();
final WALProvider.AsyncWriter newWriter2 = new WALProvider.AsyncWriter() {
@Override
public void close() throws IOException {
oldWriter2.close();
}

@Override
public CompletableFuture<Long> sync(boolean forceSync) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return oldWriter2.sync(forceSync);
}

@Override
public void append(WAL.Entry entry) {
oldWriter2.append(entry);
}

@Override
public long getLength() {
return oldWriter2.getLength();
}

@Override
public long getSyncedLength() {
return oldWriter2.getSyncedLength();
}
};
log.setWriter(newWriter2);

// Write some data. Should only take one sync.

writeData(table, row++);

// Wait for our wait injecting writer to get rolled out, as needed.

TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return log.getWriter() != newWriter2;
}

@Override
public String explainFailure() throws Exception {
return "Waited too long for our test writer to get rolled out";
}
});

assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

// Set up for test
slowSyncHookCalled.set(false);

// Write some data
for (int i = 0; i < 10; i++) {
writeData(table, row++);
}

assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

} finally {
table.close();
}
}

@Test
Expand Down

0 comments on commit 9662011

Please sign in to comment.