HBASE-27819 10k RpcServer.MAX_REQUEST_SIZE is not enough in Replicati… #5208

Merged · 1 commit · May 1, 2023
ReplicationDroppedTablesTestBase.java
@@ -21,6 +21,7 @@
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
@@ -36,9 +37,7 @@
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Assert;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -48,66 +47,40 @@
public class ReplicationDroppedTablesTestBase extends TestReplicationBase {

private static final Logger LOG = LoggerFactory.getLogger(ReplicationDroppedTablesTestBase.class);

protected static final int ROWS_COUNT = 1000;

@Before
@Override
public void setUpBase() throws Exception {
// Starting and stopping replication can make us miss new logs,
// rolling like this makes sure the most recent one gets added to the queue
for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) {
UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
}
// Initialize the peer after wal rolling, so that we will abandon the stuck WALs.
super.setUpBase();
int rowCount = UTIL1.countRows(tableName);
UTIL1.deleteTableData(tableName);
// truncating the table will send one Delete per row to the slave cluster
// in an async fashion, which is why we cannot just call deleteTableData on
// utility2 since late writes could make it to the slave in some way.
// Instead, we truncate the first table and wait for all the Deletes to
// make it to the slave.
Scan scan = new Scan();
int lastCount = 0;
for (int i = 0; i < NB_RETRIES; i++) {
if (i == NB_RETRIES - 1) {
fail("Waited too much time for truncate");
}
ResultScanner scanner = htable2.getScanner(scan);
Result[] res = scanner.next(rowCount);
scanner.close();
if (res.length != 0) {
if (res.length < lastCount) {
i--; // Don't increment timeout if we make progress
}
lastCount = res.length;
LOG.info("Still got " + res.length + " rows");
Thread.sleep(SLEEP_TIME);
} else {
break;
}
}
protected static byte[] VALUE;

private static boolean ALLOW_PROCEEDING;

protected static void setupClusters(boolean allowProceeding) throws Exception {
// Set the max request size to a tiny 10K for dividing the replication WAL entries into multiple
// batches. The default max request size is 256M, so all replication entries would go in a single
// batch; but when replicating at the sink side, edits are applied grouped by table name, so the
// WAL entries of the test table may be applied first, and then those of the test_dropped table,
// which would make us believe that the replication is not stuck (HBASE-20475).
CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024);
// We used to use 10K, but the regionServerReport is larger than that limit in this test, which
// made the test fail; increase to 64K.
CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 64 * 1024);
// set a large value size to make sure we will split the replication to several batches
VALUE = new byte[4096];
ThreadLocalRandom.current().nextBytes(VALUE);
// make sure we have a single region server only, so that all
// edits for all tables go there
NUM_SLAVES1 = 1;
NUM_SLAVES2 = 1;
ALLOW_PROCEEDING = allowProceeding;
CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
TestReplicationBase.setUpBeforeClass();
}

protected final byte[] generateRowKey(int id) {
return Bytes.toBytes(String.format("NormalPut%03d", id));
}

protected final void testEditsBehindDroppedTable(boolean allowProceeding, String tName)
throws Exception {
CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);

// make sure we have a single region server only, so that all
// edits for all tables go there
restartSourceCluster(1);

protected final void testEditsBehindDroppedTable(String tName) throws Exception {
TableName tablename = TableName.valueOf(tName);
byte[] familyName = Bytes.toBytes("fam");
byte[] row = Bytes.toBytes("row");
@@ -137,13 +110,13 @@ protected final void testEditsBehindDroppedTable(boolean allowProceeding, String
try (Table droppedTable = connection1.getTable(tablename)) {
byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
Put put = new Put(rowKey);
put.addColumn(familyName, row, row);
put.addColumn(familyName, row, VALUE);
droppedTable.put(put);
}

try (Table table1 = connection1.getTable(tableName)) {
for (int i = 0; i < ROWS_COUNT; i++) {
Put put = new Put(generateRowKey(i)).addColumn(famName, row, row);
Put put = new Put(generateRowKey(i)).addColumn(famName, row, VALUE);
table1.put(put);
}
}
@@ -161,14 +134,12 @@ protected final void testEditsBehindDroppedTable(boolean allowProceeding, String
admin1.enableReplicationPeer(PEER_ID2);
}

if (allowProceeding) {
if (ALLOW_PROCEEDING) {
// in this case we'd expect the key to make it over
verifyReplicationProceeded();
} else {
verifyReplicationStuck();
}
// just to be safe
CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
}

private boolean peerHasAllNormalRows() throws IOException {
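A quick sanity check on the sizing chosen in setupClusters above (a standalone sketch, not part of the patch; BatchSizingSketch is a hypothetical name, and hbase.ipc.max.request.size is assumed to be the property behind RpcServer.MAX_REQUEST_SIZE):

// Back-of-the-envelope check of why the values above force multiple
// replication batches: ~4 MiB of cell data against a 64 KiB request cap.
public class BatchSizingSketch {
  public static void main(String[] args) {
    int valueSize = 4 * 1024;       // length of VALUE in the test
    int rows = 1000;                // ROWS_COUNT in the test
    int maxRequestSize = 64 * 1024; // the RpcServer.MAX_REQUEST_SIZE override
    long payload = (long) valueSize * rows; // 4,096,000 bytes of values alone
    long minBatches = (payload + maxRequestSize - 1) / maxRequestSize; // 63
    System.out.println(payload + " bytes => at least " + minBatches + " batches");
  }
}

The 64 KiB cap keeps each batch small enough to exercise the multi-batch path while staying above the size of a regionServerReport, which is what overflowed the old 10 KiB limit.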
TestEditsBehindDroppedTableTiming.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint.REPLICATION_DROP_ON_DELETED_TABLE_KEY;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
@@ -33,6 +31,7 @@
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -44,14 +43,9 @@ public class TestEditsBehindDroppedTableTiming extends ReplicationDroppedTablesT
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestEditsBehindDroppedTableTiming.class);

@Override
public void setUpBase() throws Exception {
CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
super.setUpBase();
// make sure we have a single region server only, so that all
// edits for all tables go there
restartSourceCluster(1);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
setupClusters(true);
}

@Test
@@ -85,13 +79,13 @@ public void testEditsBehindDroppedTableTiming() throws Exception {
try (Table droppedTable = connection1.getTable(tablename)) {
byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
Put put = new Put(rowKey);
put.addColumn(familyName, row, row);
put.addColumn(familyName, row, VALUE);
droppedTable.put(put);
}

try (Table table1 = connection1.getTable(tableName)) {
for (int i = 0; i < ROWS_COUNT; i++) {
Put put = new Put(generateRowKey(i)).addColumn(famName, row, row);
Put put = new Put(generateRowKey(i)).addColumn(famName, row, VALUE);
table1.put(put);
}
}
TestEditsDroppedWithDroppedTable.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -31,11 +32,16 @@ public class TestEditsDroppedWithDroppedTable extends ReplicationDroppedTablesTe
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestEditsDroppedWithDroppedTable.class);

@BeforeClass
public static void setUpBeforeClass() throws Exception {
setupClusters(true);
}

@Test
public void testEditsDroppedWithDroppedTable() throws Exception {
// Make sure by default edits for dropped tables are themselves dropped when the
// table(s) in question have been deleted on both ends.
testEditsBehindDroppedTable(true, "test_dropped");
testEditsBehindDroppedTable("test_dropped");
}

}
TestEditsDroppedWithDroppedTableNS.java
@@ -21,7 +21,7 @@
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -33,17 +33,16 @@ public class TestEditsDroppedWithDroppedTableNS extends ReplicationDroppedTables
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestEditsDroppedWithDroppedTableNS.class);

@Before
@Override
public void setUpBase() throws Exception {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
setupClusters(true);
// also try with a namespace
UTIL1.getAdmin().createNamespace(NamespaceDescriptor.create("NS").build());
UTIL2.getAdmin().createNamespace(NamespaceDescriptor.create("NS").build());
super.setUpBase();
}

@Test
public void testEditsDroppedWithDroppedTableNS() throws Exception {
testEditsBehindDroppedTable(true, "NS:test_dropped");
testEditsBehindDroppedTable("NS:test_dropped");
}
}
TestEditsStuckBehindDroppedTable.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -31,11 +32,15 @@ public class TestEditsStuckBehindDroppedTable extends ReplicationDroppedTablesTe
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestEditsStuckBehindDroppedTable.class);

@BeforeClass
public static void setUpBeforeClass() throws Exception {
setupClusters(false);
}

@Test
public void testEditsStuckBehindDroppedTable() throws Exception {
// Sanity check: make sure by default edits for dropped tables stall the replication queue, even
// when the table(s) in question have been deleted on both ends.
testEditsBehindDroppedTable(false, "test_dropped");
testEditsBehindDroppedTable("test_dropped");
}

}
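
For reference, the per-subclass pattern the patch converges on, reduced to a minimal sketch (ExampleDroppedTableTest and testEditsDropped are illustrative names, not part of the patch):

import org.junit.BeforeClass;
import org.junit.Test;

// One-time static cluster setup via @BeforeClass replaces the old
// per-test setUpBase() overrides in each subclass.
public class ExampleDroppedTableTest extends ReplicationDroppedTablesTestBase {

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // true enables REPLICATION_DROP_ON_DELETED_TABLE_KEY, so replication
    // proceeds past edits for a table dropped on both ends; false leaves
    // the default behavior and the queue is expected to get stuck.
    setupClusters(true);
  }

  @Test
  public void testEditsDropped() throws Exception {
    testEditsBehindDroppedTable("test_dropped");
  }
}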