Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-24877 addendum: additional checks to avoid one extra possible race control in the initialize loop #2400

Merged
merged 5 commits into from
Sep 29, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -629,23 +629,34 @@ private void initialize() {

@Override
public void startup() {
retryStartup.set(true);
do {
if(retryStartup.get()) {
// mark we are running now
this.sourceRunning = true;
startupOngoing.set(true);
retryStartup.set(false);
initThread = new Thread(this::initialize);
Threads.setDaemonThreadRunning(initThread,
Thread.currentThread().getName() + ".replicationSource," + this.queueId,
(t,e) -> {
// mark we are running now
this.sourceRunning = true;
startupOngoing.set(true);
initThread = new Thread(this::initialize);
Threads.setDaemonThreadRunning(initThread,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indentation

Thread.currentThread().getName() + ".replicationSource," + this.queueId,
(t,e) -> {
//if first initialization attempt failed, and abortOnError is false, we will
//keep looping in this thread until initialize eventually succeeds,
//while the server main startup one can go on with its work.
sourceRunning = false;
uncaughtException(t, e, null, null);
retryStartup.set(!this.abortOnError);
});
}
} while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError);
do {
if(retryStartup.get()) {
this.sourceRunning = true;
startupOngoing.set(true);
retryStartup.set(false);
try {
initialize();
} catch(Throwable error){
sourceRunning = false;
uncaughtException(t, error, null, null);
retryStartup.set(!this.abortOnError);
}
}
} while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ protected void doStop() {
/**
* Deadend Endpoint. Does nothing.
*/
public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint {
public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint {

static int count = 0;

Expand All @@ -416,6 +416,17 @@ public synchronized UUID getPeerUUID() {

}

public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bumping into this during rebase of HBASE-18070 branch. If count is not used, just remove it from this class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I guess it's a left over from previous commits on this same PR.


static int count = 0;

@Override
public synchronized UUID getPeerUUID() {
throw new RuntimeException();
}

}

/**
* Test HBASE-20497
* Moved here from TestReplicationSource because doesn't need cluster.
Expand Down Expand Up @@ -444,23 +455,16 @@ public void testRecoveredReplicationSourceShipperGetPosition() throws Exception
assertEquals(1001L, shipper.getStartPosition());
}

/**
* Test ReplicationSource retries startup once an uncaught exception happens
* during initialization and <b>eplication.source.regionserver.abort</b> is set to false.
*/
@Test
public void testAbortFalseOnError() throws IOException {
ReplicationSource rs = new ReplicationSource();
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf,
String endpointName) throws IOException {
conf.setInt("replication.source.maxretriesmultiplier", 1);
conf.setBoolean("replication.source.regionserver.abort", false);
ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
FaultyReplicationEndpoint.count = 0;
Mockito.when(peerConfig.getReplicationEndpointImpl()).
thenReturn(FaultyReplicationEndpoint.class.getName());
thenReturn(endpointName);
Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
Expand All @@ -469,6 +473,20 @@ public void testAbortFalseOnError() throws IOException {
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
p -> OptionalLong.empty(), new MetricsSource(queueId));
return rss;
}

/**
* Test ReplicationSource retries startup once an uncaught exception happens
* during initialization and <b>eplication.source.regionserver.abort</b> is set to false.
*/
@Test
public void testAbortFalseOnError() throws IOException {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setBoolean("replication.source.regionserver.abort", false);
ReplicationSource rs = new ReplicationSource();
RegionServerServices rss = setupForAbortTests(rs, conf,
FlakyReplicationEndpoint.class.getName());
try {
rs.startup();
assertTrue(rs.isSourceActive());
Expand All @@ -483,30 +501,35 @@ public void testAbortFalseOnError() throws IOException {
}
}

/**
* Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread,
* when <b>eplication.source.regionserver.abort</b> is set to false.
*/
@Test
public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
ReplicationSource rs = new ReplicationSource();
RegionServerServices rss = setupForAbortTests(rs, conf,
FaultyReplicationEndpoint.class.getName());
try {
rs.startup();
assertTrue(true);
} finally {
rs.terminate("Done");
rss.stop("Done");
}
}

/**
* Test ReplicationSource retries startup once an uncaught exception happens
* during initialization and <b>replication.source.regionserver.abort</b> is set to false.
* during initialization and <b>replication.source.regionserver.abort</b> is set to true.
*/
@Test
public void testAbortTrueOnError() throws IOException {
ReplicationSource rs = new ReplicationSource();
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt("replication.source.maxretriesmultiplier", 1);
ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
FaultyReplicationEndpoint.count = 0;
Mockito.when(peerConfig.getReplicationEndpointImpl()).
thenReturn(FaultyReplicationEndpoint.class.getName());
Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
p -> OptionalLong.empty(), new MetricsSource(queueId));
ReplicationSource rs = new ReplicationSource();
RegionServerServices rss = setupForAbortTests(rs, conf,
FlakyReplicationEndpoint.class.getName());
try {
rs.startup();
Waiter.waitFor(conf, 1000, () -> rss.isAborted());
Expand Down