Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26660 delayed FlushRegionEntry should be removed when we need a non-delayed one #4042

Merged
merged 3 commits into from
Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9227,6 +9227,10 @@ public void incrementFlushesQueuedCount() {
flushesQueued.incrementAndGet();
}

protected void decrementFlushesQueuedCount() {
flushesQueued.decrementAndGet();
}

/**
* Do not change this sequence id.
* @return sequenceId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class MemStoreFlusher implements FlushRequester {
// a corresponding entry in the other.
private final BlockingQueue<FlushQueueEntry> flushQueue =
new DelayQueue<FlushQueueEntry>();
private final Map<Region, FlushRegionEntry> regionsInQueue =
protected final Map<Region, FlushRegionEntry> regionsInQueue =
new HashMap<Region, FlushRegionEntry>();
private AtomicBoolean wakeupPending = new AtomicBoolean();

Expand Down Expand Up @@ -363,16 +363,28 @@ private boolean isAboveLowWaterMark() {
@Override
public boolean requestFlush(Region r, boolean forceFlushAllStores) {
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
// This entry has no delay so it will be added at the top of the flush
// queue. It'll come out near immediately.
FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
((HRegion)r).incrementFlushesQueuedCount();
return true;
FlushRegionEntry existFqe = regionsInQueue.get(r);
if (existFqe != null) {
// if a delayed one exists and not reach the time to execute, just remove it
if (existFqe.isDelay() && existFqe.whenToExpire > EnvironmentEdgeManager.currentTime()) {
LOG.info("Remove the existing delayed flush entry for " + r + ", "
+ "because we need to flush it immediately");
this.regionsInQueue.remove(r);
this.flushQueue.remove(existFqe);
((HRegion)r).decrementFlushesQueuedCount();
} else {
LOG.info("Flush already requested on " + r);
return false;
}
}
return false;

// This entry has no delay so it will be added at the top of the flush
// queue. It'll come out near immediately.
FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
((HRegion)r).incrementFlushesQueuedCount();
return true;
}
}

Expand Down Expand Up @@ -752,6 +764,13 @@ public boolean isMaximumWait(final long maximumWait) {
return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
}

/**
* @return True if the entry is a delay flush task
*/
protected boolean isDelay() {
return this.whenToExpire > this.createTime;
}

/**
* @return Count of times {@link #requeue(long)} was called; i.e this is
* number of times we've been requeued.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(SmallTests.class)
public class TestMemStoreFlusher {
private MemStoreFlusher msf;

@Before
public void setUp() throws Exception {
Configuration conf = new Configuration();
conf.set("hbase.hstore.flusher.count", "0");
msf = new MemStoreFlusher(conf, null);
}

@Test
public void testReplaceDelayedFlushEntry() {
HRegionInfo hri = new HRegionInfo(1, TableName.valueOf("TestTable"), 0);
HRegion r = mock(HRegion.class);
doReturn(hri).when(r).getRegionInfo();

// put a delayed task with 30s delay
msf.requestDelayedFlush(r, 30000, false);
assertEquals(1, msf.getFlushQueueSize());
assertTrue(msf.regionsInQueue.get(r).isDelay());

// put a non-delayed task, then the delayed one should be replaced
assertTrue(msf.requestFlush(r, false));
assertEquals(1, msf.getFlushQueueSize());
assertFalse(msf.regionsInQueue.get(r).isDelay());
}

@Test
public void testNotReplaceDelayedFlushEntryWhichExpired() {
HRegionInfo hri = new HRegionInfo(1, TableName.valueOf("TestTable"), 0);
HRegion r = mock(HRegion.class);
doReturn(hri).when(r).getRegionInfo();

// put a delayed task with 100ms delay
msf.requestDelayedFlush(r, 100, false);
assertEquals(1, msf.getFlushQueueSize());
assertTrue(msf.regionsInQueue.get(r).isDelay());

Threads.sleep(200);

// put a non-delayed task, and the delayed one is expired, so it should not be replaced
assertFalse(msf.requestFlush(r, false));
assertEquals(1, msf.getFlushQueueSize());
assertTrue(msf.regionsInQueue.get(r).isDelay());
}
}