Skip to content

Commit

Permalink
HBASE-25484 Add trace support for WAL sync (#2892)
Browse files Browse the repository at this point in the history
Signed-off-by: Guanghao Zhang <zghao@apache.org>
  • Loading branch information
Apache9 committed Apr 25, 2021
1 parent 03e12bf commit 2be2c63
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public final class TraceUtil {
/** Boolean-valued span attribute key named {@code db.hbase.rowlock.readlock}. */
public static final AttributeKey<Boolean> ROW_LOCK_READ_LOCK_KEY =
AttributeKey.booleanKey("db.hbase.rowlock.readlock");

/**
 * String-valued span attribute key named {@code db.hbase.wal.impl}.
 * NOTE(review): presumably carries the name of the WAL implementation class that produced the
 * span; confirm against the callers that set it.
 */
public static final AttributeKey<String> WAL_IMPL = AttributeKey.stringKey("db.hbase.wal.impl");

/** Private no-op constructor; keeps code outside this class from instantiating it. */
private TraceUtil() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import com.lmax.disruptor.RingBuffer;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
Expand Down Expand Up @@ -571,6 +570,35 @@ public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IO
return rollWriter(false);
}

/**
 * Syncs the WAL by delegating to {@link #sync(boolean)}, passing the {@code useHsync} field as
 * the force-sync flag.
 * @throws IOException if the delegated sync fails
 */
@Override
public final void sync() throws IOException {
sync(useHsync);
}

/**
 * Syncs the WAL for the given transaction id by delegating to {@link #sync(long, boolean)},
 * passing the {@code useHsync} field as the force-sync flag.
 * @param txid transaction id handed through to the delegated sync
 * @throws IOException if the delegated sync fails
 */
@Override
public final void sync(long txid) throws IOException {
sync(txid, useHsync);
}

/**
 * Runs {@link #doSync(boolean)} through {@code TraceUtil.trace}, supplying a span named
 * {@code WAL.sync} built by {@code createSpan}.
 * <p/>
 * NOTE(review): {@code TraceUtil.trace} is defined elsewhere; presumably it executes the callable
 * within the supplied span and ends the span afterwards. Confirm against TraceUtil.
 * @param forceSync flag passed unchanged to {@link #doSync(boolean)}
 * @throws IOException if the traced sync fails
 */
@Override
public final void sync(boolean forceSync) throws IOException {
TraceUtil.trace(() -> {
doSync(forceSync);
// the traced callable yields a value; a sync has nothing to report, so return null
return null;
}, () -> createSpan("WAL.sync"));
}

/**
 * Runs {@link #doSync(long, boolean)} through {@code TraceUtil.trace}, supplying a span named
 * {@code WAL.sync} built by {@code createSpan}.
 * <p/>
 * NOTE(review): {@code TraceUtil.trace} is defined elsewhere; presumably it executes the callable
 * within the supplied span and ends the span afterwards. Confirm against TraceUtil.
 * @param txid transaction id passed unchanged to {@link #doSync(long, boolean)}
 * @param forceSync flag passed unchanged to {@link #doSync(long, boolean)}
 * @throws IOException if the traced sync fails
 */
@Override
public final void sync(long txid, boolean forceSync) throws IOException {
TraceUtil.trace(() -> {
doSync(txid, forceSync);
// the traced callable yields a value; a sync has nothing to report, so return null
return null;
}, () -> createSpan("WAL.sync"));
}

/**
 * Subclass hook that performs the actual sync; invoked by {@link #sync(boolean)} from inside the
 * traced callable.
 * @param forceSync flag forwarded from {@link #sync(boolean)}
 * @throws IOException if the sync fails
 */
protected abstract void doSync(boolean forceSync) throws IOException;

/**
 * Subclass hook that performs the actual sync for a transaction id; invoked by
 * {@link #sync(long, boolean)} from inside the traced callable.
 * @param txid transaction id forwarded from {@link #sync(long, boolean)}
 * @param forceSync flag forwarded from {@link #sync(long, boolean)}
 * @throws IOException if the sync fails
 */
protected abstract void doSync(long txid, boolean forceSync) throws IOException;
/**
* This is a convenience method that computes a new filename with a given file-number.
* @param filenum to use
Expand Down Expand Up @@ -672,7 +700,7 @@ Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
}
if (regions != null) {
List<String> listForPrint = new ArrayList();
List<String> listForPrint = new ArrayList<>();
for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) {
StringBuilder families = new StringBuilder();
for (int i = 0; i < r.getValue().size(); i++) {
Expand Down Expand Up @@ -815,6 +843,10 @@ protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long ol
}
}

/**
 * Creates a span via {@code TraceUtil.createSpan(name)} and sets its
 * {@code TraceUtil.WAL_IMPL} attribute to {@code implClassName}.
 * @param name name given to the new span
 * @return the value returned by {@code setAttribute} on the created span
 */
private Span createSpan(String name) {
return TraceUtil.createSpan(name).setAttribute(TraceUtil.WAL_IMPL, implClassName);
}

/**
* Cleans up current writer closing it and then puts in place the passed in {@code nextWriter}.
* <p/>
Expand All @@ -832,13 +864,10 @@ protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long ol
* @throws IOException if there is a problem flushing or closing the underlying FS
*/
Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("FSHFile.replaceWriter").startSpan();
try (Scope scope = span.makeCurrent()) {
return TraceUtil.trace(() -> {
doReplaceWriter(oldPath, newPath, nextWriter);
return newPath;
} finally {
span.end();
}
}, () -> createSpan("WAL.replaceWriter"));
}

protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
Expand Down Expand Up @@ -876,8 +905,7 @@ private IOException convertInterruptedExceptionToIOException(final InterruptedEx
return ioe;
}

@Override
public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
private Map<byte[], List<byte[]>> rollWriterInternal(boolean force) throws IOException {
rollWriterLock.lock();
try {
if (this.closed) {
Expand All @@ -888,8 +916,7 @@ public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
return null;
}
Map<byte[], List<byte[]>> regionsToFlush = null;
Span span = TraceUtil.getGlobalTracer().spanBuilder("FSHLog.rollWriter").startSpan();
try (Scope scope = span.makeCurrent()) {
try {
Path oldPath = getOldPath();
Path newPath = getNewPath();
// Any exception from here on is catastrophic, non-recoverable so we currently abort.
Expand All @@ -914,17 +941,20 @@ public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
// If the underlying FileSystem can't do what we ask, treat as IO failure so
// we'll abort.
throw new IOException(
"Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
exception);
} finally {
span.end();
"Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
exception);
}
return regionsToFlush;
} finally {
rollWriterLock.unlock();
}
}

/**
 * Runs {@code rollWriterInternal(force)} through {@code TraceUtil.trace}, supplying a span named
 * {@code WAL.rollWriter} built by {@code createSpan}, and returns the traced call's result.
 * <p/>
 * NOTE(review): {@code TraceUtil.trace} is defined elsewhere; presumably it executes the callable
 * within the supplied span and ends the span afterwards. Confirm against TraceUtil.
 * @param force flag passed unchanged to {@code rollWriterInternal}
 * @return whatever {@code rollWriterInternal(force)} returns
 * @throws IOException if the traced roll fails
 */
@Override
public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
return TraceUtil.trace(() -> rollWriterInternal(force), () -> createSpan("WAL.rollWriter"));
}

// public only until class moves to o.a.h.h.wal
/** @return the size of log files in use */
public long getLogFileSize() {
Expand Down Expand Up @@ -1099,7 +1129,6 @@ protected final void postSync(long timeInNanos, int handlerSyncs) {
.append(TimeUnit.NANOSECONDS.toMillis(timeInNanos))
.append(" ms, current pipeline: ")
.append(Arrays.toString(getPipeline())).toString();
Span.current().addEvent(msg);
LOG.info(msg);
if (timeInNanos > this.rollOnSyncNs) {
// A single sync took too long.
Expand All @@ -1122,8 +1151,7 @@ protected final void postSync(long timeInNanos, int handlerSyncs) {
}

protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
throws IOException {
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) throws IOException {
if (this.closed) {
throw new IOException(
"Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
Expand All @@ -1135,14 +1163,12 @@ protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKe
long txid = txidHolder.longValue();
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
Span span = TraceUtil.getGlobalTracer().spanBuilder(implClassName + ".append").startSpan();
try (Scope scope = span.makeCurrent()) {
try {
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
entry.stampRegionSequenceId(we);
ringBuffer.get(txid).load(entry);
} finally {
ringBuffer.publish(txid);
span.end();
}
return txid;
}
Expand Down Expand Up @@ -1176,13 +1202,14 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {

@Override
public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
return append(info, key, edits, true);
return TraceUtil.trace(() -> append(info, key, edits, true),
() -> createSpan("WAL.appendData"));
}

@Override
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits)
throws IOException {
return append(info, key, edits, false);
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
return TraceUtil.trace(() -> append(info, key, edits, false),
() -> createSpan("WAL.appendMarker"));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
Expand Down Expand Up @@ -53,7 +51,6 @@
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
Expand Down Expand Up @@ -345,7 +342,7 @@ private void syncCompleted(AsyncWriter writer, long processedTxid, long startTim
break;
}
}
postSync(System.nanoTime() - startTimeNs, finishSync(true));
postSync(System.nanoTime() - startTimeNs, finishSync());
if (trySetReadyForRolling()) {
// we have just finished a roll, then do not need to check for log rolling, the writer will be
// closed soon.
Expand Down Expand Up @@ -394,23 +391,14 @@ private void sync(AsyncWriter writer) {
}, consumeExecutor);
}

private void addTimeAnnotation(SyncFuture future, String annotation) {
Span.current().addEvent(annotation);
// TODO handle htrace API change, see HBASE-18895
// future.setSpan(scope.getSpan());
}

private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) {
private int finishSyncLowerThanTxid(long txid) {
int finished = 0;
for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
SyncFuture sync = iter.next();
if (sync.getTxid() <= txid) {
sync.done(txid, null);
iter.remove();
finished++;
if (addSyncTrace) {
addTimeAnnotation(sync, "writer synced");
}
} else {
break;
}
Expand All @@ -419,7 +407,7 @@ private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) {
}

// try advancing the highestSyncedTxid as much as possible
private int finishSync(boolean addSyncTrace) {
private int finishSync() {
if (unackedAppends.isEmpty()) {
// All outstanding appends have been acked.
if (toWriteAppends.isEmpty()) {
Expand All @@ -428,9 +416,6 @@ private int finishSync(boolean addSyncTrace) {
for (SyncFuture sync : syncFutures) {
maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
sync.done(maxSyncTxid, null);
if (addSyncTrace) {
addTimeAnnotation(sync, "writer synced");
}
}
highestSyncedTxid.set(maxSyncTxid);
int finished = syncFutures.size();
Expand All @@ -444,23 +429,23 @@ private int finishSync(boolean addSyncTrace) {
assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid;
long doneTxid = lowestUnprocessedAppendTxid - 1;
highestSyncedTxid.set(doneTxid);
return finishSyncLowerThanTxid(doneTxid, addSyncTrace);
return finishSyncLowerThanTxid(doneTxid);
}
} else {
// There are still unacked appends. So let's move the highestSyncedTxid to the txid of the
// first unacked append minus 1.
long lowestUnackedAppendTxid = unackedAppends.peek().getTxid();
long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get());
highestSyncedTxid.set(doneTxid);
return finishSyncLowerThanTxid(doneTxid, addSyncTrace);
return finishSyncLowerThanTxid(doneTxid);
}
}

private void appendAndSync() {
final AsyncWriter writer = this.writer;
// maybe a sync request is not queued when we issue a sync, so check here to see if we could
// finish some.
finishSync(false);
finishSync();
long newHighestProcessedAppendTxid = -1L;
for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
FSWALEntry entry = iter.next();
Expand Down Expand Up @@ -501,7 +486,7 @@ private void appendAndSync() {
// stamped some region sequence id.
if (unackedAppends.isEmpty()) {
highestSyncedTxid.set(highestProcessedAppendTxid);
finishSync(false);
finishSync();
trySetReadyForRolling();
}
return;
Expand Down Expand Up @@ -648,74 +633,54 @@ protected boolean markerEditOnly() {

@Override
protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
throws IOException {
if (markerEditOnly() && !edits.isMetaEdit()) {
throw new IOException("WAL is closing, only marker edit is allowed");
}
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
waitingConsumePayloads);
throws IOException {
if (markerEditOnly() && !edits.isMetaEdit()) {
throw new IOException("WAL is closing, only marker edit is allowed");
}
long txid =
stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
if (shouldScheduleConsumer()) {
consumeExecutor.execute(consumer);
}
return txid;
}

@Override
public void sync() throws IOException {
sync(useHsync);
}

@Override
public void sync(long txid) throws IOException {
sync(txid, useHsync);
}

@Override
public void sync(boolean forceSync) throws IOException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("AsyncFSWAL.sync").startSpan();
try (Scope scope = span.makeCurrent()) {
long txid = waitingConsumePayloads.next();
SyncFuture future;
try {
future = getSyncFuture(txid, forceSync);
RingBufferTruck truck = waitingConsumePayloads.get(txid);
truck.load(future);
} finally {
waitingConsumePayloads.publish(txid);
}
if (shouldScheduleConsumer()) {
consumeExecutor.execute(consumer);
}
blockOnSync(future);
protected void doSync(boolean forceSync) throws IOException {
long txid = waitingConsumePayloads.next();
SyncFuture future;
try {
future = getSyncFuture(txid, forceSync);
RingBufferTruck truck = waitingConsumePayloads.get(txid);
truck.load(future);
} finally {
span.end();
waitingConsumePayloads.publish(txid);
}
if (shouldScheduleConsumer()) {
consumeExecutor.execute(consumer);
}
blockOnSync(future);
}

@Override
public void sync(long txid, boolean forceSync) throws IOException {
protected void doSync(long txid, boolean forceSync) throws IOException {
if (highestSyncedTxid.get() >= txid) {
return;
}
Span span = TraceUtil.getGlobalTracer().spanBuilder("AsyncFSWAL.sync").startSpan();
try (Scope scope = span.makeCurrent()) {
// here we do not use ring buffer sequence as txid
long sequence = waitingConsumePayloads.next();
SyncFuture future;
try {
future = getSyncFuture(txid, forceSync);
RingBufferTruck truck = waitingConsumePayloads.get(sequence);
truck.load(future);
} finally {
waitingConsumePayloads.publish(sequence);
}
if (shouldScheduleConsumer()) {
consumeExecutor.execute(consumer);
}
blockOnSync(future);
// here we do not use ring buffer sequence as txid
long sequence = waitingConsumePayloads.next();
SyncFuture future;
try {
future = getSyncFuture(txid, forceSync);
RingBufferTruck truck = waitingConsumePayloads.get(sequence);
truck.load(future);
} finally {
span.end();
waitingConsumePayloads.publish(sequence);
}
if (shouldScheduleConsumer()) {
consumeExecutor.execute(consumer);
}
blockOnSync(future);
}

protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException {
Expand Down
Loading

0 comments on commit 2be2c63

Please sign in to comment.