diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index debca0f4aae0..a92a2759ad27 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -579,63 +579,64 @@ private boolean isTooManyStoreFiles(Region region) { * amount of memstore consumption. */ public void reclaimMemStoreMemory() { - TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory"); - if (isAboveHighWaterMark()) { - if (Trace.isTracing()) { - scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark."); - } - long start = EnvironmentEdgeManager.currentTime(); - long nextLogTimeMs = start; - synchronized (this.blockSignal) { - boolean blocked = false; - long startTime = 0; - boolean interrupted = false; - try { - while (isAboveHighWaterMark() && !server.isStopped()) { - if (!blocked) { - startTime = EnvironmentEdgeManager.currentTime(); - LOG.info("Blocking updates on " - + server.toString() - + ": the global memstore size " - + TraditionalBinaryPrefix.long2String(server.getRegionServerAccounting() - .getGlobalMemstoreSize(), "", 1) + " is >= than blocking " - + TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1) + " size"); - } - blocked = true; - wakeupFlushThread(); - try { - // we should be able to wait forever, but we've seen a bug where - // we miss a notify, so put a 5 second bound on it at least. - blockSignal.wait(5 * 1000); - } catch (InterruptedException ie) { - LOG.warn("Interrupted while waiting"); - interrupted = true; + try (TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory")) { + if (isAboveHighWaterMark()) { + if (Trace.isTracing()) { + scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark."); + } + long start = EnvironmentEdgeManager.currentTime(); + long nextLogTimeMs = start; + synchronized (this.blockSignal) { + boolean blocked = false; + long startTime = 0; + boolean interrupted = false; + try { + while (isAboveHighWaterMark() && !server.isStopped()) { + if (!blocked) { + startTime = EnvironmentEdgeManager.currentTime(); + LOG.info("Blocking updates on " + + server.toString() + + ": the global memstore size " + + TraditionalBinaryPrefix.long2String(server.getRegionServerAccounting() + .getGlobalMemstoreSize(), "", 1) + " is >= than blocking " + + TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1) + " size"); + } + blocked = true; + wakeupFlushThread(); + try { + // we should be able to wait forever, but we've seen a bug where + // we miss a notify, so put a 5 second bound on it at least. + blockSignal.wait(5 * 1000); + } catch (InterruptedException ie) { + LOG.warn("Interrupted while waiting"); + interrupted = true; + } + long nowMs = EnvironmentEdgeManager.currentTime(); + if (nowMs >= nextLogTimeMs) { + LOG.warn("Memstore is above high water mark and block " + (nowMs - start) + " ms"); + nextLogTimeMs = nowMs + 1000; + } } - long nowMs = EnvironmentEdgeManager.currentTime(); - if (nowMs >= nextLogTimeMs) { - LOG.warn("Memstore is above high water mark and block " + (nowMs - start) + " ms"); - nextLogTimeMs = nowMs + 1000; + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); } } - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - if(blocked){ - final long totalTime = EnvironmentEdgeManager.currentTime() - startTime; - if(totalTime > 0){ - this.updatesBlockedMsHighWater.add(totalTime); + if (blocked) { + final long totalTime = EnvironmentEdgeManager.currentTime() - startTime; + if (totalTime > 0) { + this.updatesBlockedMsHighWater.add(totalTime); + } + LOG.info("Unblocking updates for server " + server.toString()); } - LOG.info("Unblocking updates for server " + server.toString()); } + } else if (isAboveLowWaterMark()) { + wakeupFlushThread(); } - } else if (isAboveLowWaterMark()) { - wakeupFlushThread(); } - scope.close(); } + @Override public String toString() { return "flush_queue="