Skip to content

Commit

Permalink
HBASE-23181 Blocked WAL archive: "LogRoller: Failed to schedule flush…
Browse files Browse the repository at this point in the history
… of XXXX, because it is not online on us" (#753)

Signed-off-by: Lijin Bin <binlijin@apache.org>
Signed-off-by: stack <stack@apache.org>
  • Loading branch information
Apache9 committed Oct 26, 2019
1 parent 33e8156 commit 3dba799
Show file tree
Hide file tree
Showing 33 changed files with 399 additions and 227 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static ImmutableByteArray wrap(byte[] b) {
return new ImmutableByteArray(b);
}

public String toStringUtf8() {
return Bytes.toString(b);
public String toString() {
return Bytes.toStringBinary(b);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,10 @@ public void testPartialRead() throws Exception {
long ts = System.currentTimeMillis();
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
log.append(info, getWalKeyImpl(ts, scopes), edit, true);
log.appendData(info, getWalKeyImpl(ts, scopes), edit);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
log.append(info, getWalKeyImpl(ts+1, scopes), edit, true);
log.appendData(info, getWalKeyImpl(ts+1, scopes), edit);
log.sync();
LOG.info("Before 1st WAL roll " + log.toString());
log.rollWriter();
Expand All @@ -148,10 +148,10 @@ public void testPartialRead() throws Exception {

edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
log.append(info, getWalKeyImpl(ts1+1, scopes), edit, true);
log.appendData(info, getWalKeyImpl(ts1+1, scopes), edit);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
log.append(info, getWalKeyImpl(ts1+2, scopes), edit, true);
log.appendData(info, getWalKeyImpl(ts1+2, scopes), edit);
log.sync();
log.shutdown();
walfactory.shutdown();
Expand Down Expand Up @@ -192,17 +192,16 @@ public void testWALRecordReader() throws Exception {
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
System.currentTimeMillis(), value));
long txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
long txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit);
log.sync(txid);

Thread.sleep(1); // make sure 2nd log gets a later timestamp
long secondTs = System.currentTimeMillis();
log.rollWriter();

edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
System.currentTimeMillis(), value));
txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value));
txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit);
log.sync(txid);
log.shutdown();
walfactory.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7940,7 +7940,7 @@ private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID
}
WriteEntry writeEntry = null;
try {
long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
long txid = this.wal.appendData(this.getRegionInfo(), walKey, walEdit);
// Call sync on our edit.
if (txid != 0) {
sync(txid, durability);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,7 @@ protected void atHeadOfRingBufferEventHandlerAppend() {
// Noop
}

protected final boolean append(W writer, FSWALEntry entry) throws IOException {
protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException {
// TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
atHeadOfRingBufferEventHandlerAppend();
long start = EnvironmentEdgeManager.currentTime();
Expand All @@ -959,8 +959,13 @@ protected final boolean append(W writer, FSWALEntry entry) throws IOException {
doAppend(writer, entry);
assert highestUnsyncedTxid < entry.getTxid();
highestUnsyncedTxid = entry.getTxid();
sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
entry.isInMemStore());
if (entry.isCloseRegion()) {
// let's clean all the records of this region
sequenceIdAccounting.onRegionClose(encodedRegionName);
} else {
sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
entry.isInMemStore());
}
coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
// Update metrics.
postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
Expand Down Expand Up @@ -1010,11 +1015,11 @@ protected final void postSync(final long timeInNanos, final int handlerSyncs) {
}

protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
throws IOException {
WALEdit edits, boolean inMemstore, boolean closeRegion, RingBuffer<RingBufferTruck> ringBuffer)
throws IOException {
if (this.closed) {
throw new IOException(
"Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
"Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
}
MutableLong txidHolder = new MutableLong();
MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
Expand All @@ -1024,7 +1029,7 @@ protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKe
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, closeRegion, rpcCall);
entry.stampRegionSequenceId(we);
ringBuffer.get(txid).load(entry);
} finally {
Expand Down Expand Up @@ -1060,7 +1065,24 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
}
}

@Override
public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
return append(info, key, edits, true, false);
}

@Override
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion)
throws IOException {
return append(info, key, edits, false, closeRegion);
}

/**
* Append a set of edits to the WAL.
* <p/>
* The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
* have its region edit/sequence id assigned else it messes up our unification of mvcc and
* sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
* <p/>
* NOTE: This append, at a time that is usually after this call returns, starts an mvcc
* transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment
* time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must
Expand All @@ -1071,10 +1093,21 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
* passed in WALKey <code>walKey</code> parameter. Be warned that the WriteEntry is not
* immediately available on return from this method. It WILL be available subsequent to a sync of
* this append; otherwise, you will just have to wait on the WriteEntry to get filled in.
* @param info the regioninfo associated with append
* @param key Modified by this call; we add to it this edits region edit/sequence id.
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
* sequence id that is after all currently appended edits.
* @param inMemstore Always true except for case where we are writing a region event marker, for
* example, a compaction completion record into the WAL; in this case the entry is just
* so we can finish an unfinished compaction -- it is not an edit for memstore.
* @param closeRegion Whether this is a region close marker, i.e, the last wal edit for this
* region on this region server. The WAL implementation should remove all the related
* stuff, for example, the sequence id accounting.
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
* in it.
*/
@Override
public abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
throws IOException;
protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore,
boolean closeRegion) throws IOException;

protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ private void appendAndSync() {
FSWALEntry entry = iter.next();
boolean appended;
try {
appended = append(writer, entry);
appended = appendEntry(writer, entry);
} catch (IOException e) {
throw new AssertionError("should not happen", e);
}
Expand Down Expand Up @@ -564,10 +564,10 @@ private boolean shouldScheduleConsumer() {
}

@Override
public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
throws IOException {
long txid =
stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore,
boolean closeRegion) throws IOException {
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
waitingConsumePayloads);
if (shouldScheduleConsumer()) {
consumeExecutor.execute(consumer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,12 +434,10 @@ protected void doShutdown() throws IOException {
}
}

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH_EXCEPTION",
justification = "Will never be null")
@Override
public long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
final boolean inMemstore) throws IOException {
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
protected long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
final boolean inMemstore, boolean closeRegion) throws IOException {
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
disruptor.getRingBuffer());
}

Expand Down Expand Up @@ -1096,7 +1094,7 @@ private void attainSafePoint(final long currentSequence) {
*/
void append(final FSWALEntry entry) throws Exception {
try {
FSHLog.this.append(writer, entry);
FSHLog.this.appendEntry(writer, entry);
} catch (Exception e) {
String msg = "Append sequenceId=" + entry.getKey().getSequenceId()
+ ", requesting roll of WAL";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,16 @@ class FSWALEntry extends Entry {
// they are only in memory and held here while passing over the ring buffer.
private final transient long txid;
private final transient boolean inMemstore;
private final transient boolean closeRegion;
private final transient RegionInfo regionInfo;
private final transient Set<byte[]> familyNames;
private final transient ServerCall<?> rpcCall;

FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
final boolean inMemstore, ServerCall<?> rpcCall) {
final boolean inMemstore, boolean closeRegion, ServerCall<?> rpcCall) {
super(key, edit);
this.inMemstore = inMemstore;
this.closeRegion = closeRegion;
this.regionInfo = regionInfo;
this.txid = txid;
if (inMemstore) {
Expand Down Expand Up @@ -98,6 +100,10 @@ boolean isInMemStore() {
return this.inMemstore;
}

boolean isCloseRegion() {
return closeRegion;
}

RegionInfo getRegionInfo() {
return this.regionInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ImmutableByteArray;
Expand Down Expand Up @@ -183,6 +184,30 @@ void update(byte[] encodedRegionName, Set<byte[]> families, long sequenceid,
}
}

/**
* Clear all the records of the given region as it is going to be closed.
* <p/>
* We will call this once we get the region close marker. We need this because that, if we use
* Durability.ASYNC_WAL, after calling startCacheFlush, we may still get some ongoing wal entries
* that has not been processed yet, this will lead to orphan records in the
* lowestUnflushedSequenceIds and then cause too many WAL files.
* <p/>
* See HBASE-23157 for more details.
*/
void onRegionClose(byte[] encodedRegionName) {
synchronized (tieLock) {
this.lowestUnflushedSequenceIds.remove(encodedRegionName);
Map<ImmutableByteArray, Long> flushing = this.flushingSequenceIds.remove(encodedRegionName);
if (flushing != null) {
LOG.warn("Still have flushing records when closing {}, {}",
Bytes.toString(encodedRegionName),
flushing.entrySet().stream().map(e -> e.getKey().toString() + "->" + e.getValue())
.collect(Collectors.joining(",", "{", "}")));
}
}
this.highestSequenceIds.remove(encodedRegionName);
}

/**
* Update the store sequence id, e.g., upon executing in-memory compaction
*/
Expand Down Expand Up @@ -363,7 +388,7 @@ void abortCacheFlush(final byte[] encodedRegionName) {
Long currentId = tmpMap.get(e.getKey());
if (currentId != null && currentId.longValue() < e.getValue().longValue()) {
String errorStr = Bytes.toString(encodedRegionName) + " family "
+ e.getKey().toStringUtf8() + " acquired edits out of order current memstore seq="
+ e.getKey().toString() + " acquired edits out of order current memstore seq="
+ currentId + ", previous oldest unflushed id=" + e.getValue();
LOG.error(errorStr);
Runtime.getRuntime().halt(1);
Expand Down
Loading

0 comments on commit 3dba799

Please sign in to comment.