Skip to content

Commit

Permalink
Add slow sync log rolling test in TestAsyncLogRolling.
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuyaogai committed Nov 7, 2023
1 parent 027a119 commit 4cf793d
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2245,6 +2245,14 @@ private static void split(final Configuration conf, final Path p) throws IOExcep
WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
}

W getWriter() {
return this.writer;
}

void setWriter(W writer) {
this.writer = writer;
}

private static void usage() {
System.err.println("Usage: AbstractFSWAL <ARGS>");
System.err.println("Arguments:");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,14 +603,6 @@ DatanodeInfo[] getPipeline() {
return new DatanodeInfo[0];
}

Writer getWriter() {
return this.writer;
}

void setWriter(Writer writer) {
this.writer = writer;
}

@Override
protected Writer createCombinedWriter(Writer localWriter, Writer remoteWriter) {
// put remote writer first as usually it will cost more time to finish, so we write to it first
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,29 @@
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.junit.BeforeClass;
Expand All @@ -49,6 +61,122 @@ public static void setUpBeforeClass() throws Exception {
conf.setInt(FanOutOneBlockAsyncDFSOutputHelper.ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, 100);
conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
AbstractTestLogRolling.setUpBeforeClass();

// For slow sync threshold test: roll once after a sync above this threshold
TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
}

@Test
public void testSlowSyncLogRolling() throws Exception {
// Create the test table
TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
admin.createTable(desc);
Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
int row = 1;
try {
// Get a reference to the AsyncFSWAL
server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
final AsyncFSWAL log = (AsyncFSWAL) server.getWAL(region);

// Register a WALActionsListener to observe if a SLOW_SYNC roll is requested

final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
log.registerWALActionsListener(new WALActionsListener() {
@Override
public void logRollRequested(WALActionsListener.RollRequestReason reason) {
switch (reason) {
case SLOW_SYNC:
slowSyncHookCalled.lazySet(true);
break;
default:
break;
}
}
});

// Write some data

for (int i = 0; i < 10; i++) {
writeData(table, row++);
}

assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

// Set up for test
slowSyncHookCalled.set(false);

// Wrap the current writer with the anonymous class below that adds 5000 ms of
// latency to any sync on the hlog.
// This will trip the other threshold.
final WALProvider.AsyncWriter oldWriter2 = log.getWriter();
final WALProvider.AsyncWriter newWriter2 = new WALProvider.AsyncWriter() {
@Override
public void close() throws IOException {
oldWriter2.close();
}

@Override
public CompletableFuture<Long> sync(boolean forceSync) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return oldWriter2.sync(forceSync);
}

@Override
public void append(WAL.Entry entry) {
oldWriter2.append(entry);
}

@Override
public long getLength() {
return oldWriter2.getLength();
}

@Override
public long getSyncedLength() {
return oldWriter2.getSyncedLength();
}
};
log.setWriter(newWriter2);

// Write some data. Should only take one sync.

writeData(table, row++);

// Wait for our wait injecting writer to get rolled out, as needed.

TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return log.getWriter() != newWriter2;
}

@Override
public String explainFailure() throws Exception {
return "Waited too long for our test writer to get rolled out";
}
});

assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

// Set up for test
slowSyncHookCalled.set(false);

// Write some data
for (int i = 0; i < 10; i++) {
writeData(table, row++);
}

assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());

} finally {
table.close();
}
}

@Test
Expand Down

0 comments on commit 4cf793d

Please sign in to comment.