Skip to content

Commit

Permalink
Log rows and values in DefaultMismatchDetector
Browse files Browse the repository at this point in the history
  • Loading branch information
prawilny committed Nov 9, 2021
1 parent f09cc3a commit bfd3c32
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_SYNCHRONOUS_WRITES;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_WRITE_ERROR_CONSUMER_CLASS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_WRITE_ERROR_LOG_APPENDER_CLASS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_WRITE_ERROR_LOG_MAX_BINARY_VALUE_LENGTH;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_WRITE_ERROR_LOG_SERIALIZER_CLASS;

import com.google.api.core.InternalApi;
Expand All @@ -45,6 +46,7 @@ public class MirroringOptions {
public final int flowControllerMaxUsedBytes;
public final long bufferedMutatorBytesToFlush;
public final String writeErrorConsumerClass;
public final int maxLoggedBinaryValueLength;
public final int readSamplingRate;

public final String writeErrorLogAppenderClass;
Expand Down Expand Up @@ -85,6 +87,8 @@ public MirroringOptions(Configuration configuration) {
this.writeErrorLogSerializerClass =
configuration.get(
MIRRORING_WRITE_ERROR_LOG_SERIALIZER_CLASS, DefaultSerializer.class.getCanonicalName());
this.maxLoggedBinaryValueLength =
configuration.getInt(MIRRORING_WRITE_ERROR_LOG_MAX_BINARY_VALUE_LENGTH, 32);
this.performWritesConcurrently = configuration.getBoolean(MIRRORING_CONCURRENT_WRITES, false);
this.waitForSecondaryWrites = configuration.getBoolean(MIRRORING_SYNCHRONOUS_WRITES, false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ public class MirroringConfigurationHelper {
public static final String MIRRORING_WRITE_ERROR_LOG_SERIALIZER_CLASS =
"google.bigtable.mirroring.write-error-log.serializer.impl";

/**
* Integer value representing how many first bytes of binary values (such as row) should be
* converted to hex and then logged in case of error. Defaults to 32.
*/
public static final String MIRRORING_WRITE_ERROR_LOG_MAX_BINARY_VALUE_LENGTH =
"google.bigtable.mirroring.write-error-log.max-binary-value-bytes-logged";

/**
* Integer value representing percentage of read operations performed on primary database that
* should be verified against secondary. Each call to {@link Table#get(Get)}, {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics;

import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.OPERATION_KEY;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.READ_MATCHES;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.READ_MISMATCHES;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.SECONDARY_WRITE_ERRORS;

Expand Down Expand Up @@ -78,4 +79,11 @@ public void recordSecondaryWriteErrors(HBaseOperation operation, int numberOfErr
map.put(SECONDARY_WRITE_ERRORS, numberOfErrors);
map.record(tagContext);
}

public void recordReadMatches(HBaseOperation operation, int numberOfMatches) {
TagContext tagContext = getTagContext(operation);
MeasureMap map = statsRecorder.newMeasureMap();
map.put(READ_MATCHES, numberOfMatches);
map.record(tagContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.OPERATION_KEY;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.PRIMARY_ERRORS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.PRIMARY_LATENCY;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.READ_MATCHES;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.READ_MISMATCHES;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.SECONDARY_ERRORS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.SECONDARY_LATENCY;
Expand Down Expand Up @@ -116,13 +117,23 @@ public class MirroringMetricsViews {
SUM,
ImmutableList.of(OPERATION_KEY));

/** {@link View} for Mirroring client's secondary read mismatches. */
private static final View READ_MATCH_VIEW =
View.create(
View.Name.create("cloud.google.com/java/mirroring/read_match"),
"Detected read matches count.",
READ_MATCHES,
SUM,
ImmutableList.of(OPERATION_KEY));

private static final ImmutableSet<View> MIRRORING_CLIENT_VIEWS_SET =
ImmutableSet.of(
PRIMARY_OPERATION_LATENCY_VIEW,
PRIMARY_OPERATION_ERROR_VIEW,
SECONDARY_OPERATION_LATENCY_VIEW,
SECONDARY_OPERATION_ERROR_VIEW,
MIRRORING_OPERATION_LATENCY_VIEW,
READ_MATCH_VIEW,
READ_MISMATCH_VIEW,
SECONDARY_WRITE_ERROR_VIEW);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,15 @@ public class MirroringSpanConstants {
"Count of errors on secondary database.",
"1");

public static final MeasureLong READ_MATCHES =
MeasureLong.create(
"com/google/cloud/bigtable/mirroring/read_verification/matches",
"Count of successfully verified reads.",
"1");

public static final MeasureLong READ_MISMATCHES =
MeasureLong.create(
"com/google/cloud/bigtable/mirroring/mismatch/read",
"com/google/cloud/bigtable/mirroring/read_verification/mismatches",
"Count of read mismatches detected.",
"1");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
*/
package com.google.cloud.bigtable.mirroring.hbase1_x.verification;

import static com.google.cloud.bigtable.mirroring.hbase1_x.verification.DefaultMismatchDetector.LazyBytesHexlifier.listOfHexRows;

import com.google.api.core.InternalApi;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.Comparators;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.Logger;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringMetricsRecorder;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.client.Get;
Expand All @@ -29,107 +32,232 @@

@InternalApi("For internal usage only")
public class DefaultMismatchDetector implements MismatchDetector {
private final int maxValueBytesLogged;
private static final Logger Log = new Logger(DefaultMismatchDetector.class);
private final MirroringMetricsRecorder metricsRecorder;

public DefaultMismatchDetector(MirroringTracer mirroringTracer) {
public DefaultMismatchDetector(MirroringTracer mirroringTracer, int maxValueBytesLogged) {
this.metricsRecorder = mirroringTracer.metricsRecorder;
this.maxValueBytesLogged = maxValueBytesLogged;
}

public void exists(Get request, boolean primary, boolean secondary) {
if (primary != secondary) {
Log.debug("exists mismatch");
if (primary == secondary) {
this.metricsRecorder.recordReadMatches(HBaseOperation.EXISTS, 1);
} else {
Log.debug(
"exists(row=%s) mismatch: (%b, %b)",
new LazyBytesHexlifier(request.getRow(), maxValueBytesLogged), primary, secondary);
this.metricsRecorder.recordReadMismatches(HBaseOperation.EXISTS, 1);
}
}

@Override
public void exists(Get request, Throwable throwable) {
Log.debug("exists failed");
Log.debug(
"exists(row=%s) failed: (throwable=%s)",
new LazyBytesHexlifier(request.getRow(), maxValueBytesLogged), throwable);
}

@Override
public void existsAll(List<Get> request, boolean[] primary, boolean[] secondary) {
if (!Arrays.equals(primary, secondary)) {
Log.debug("existsAll mismatch");
this.metricsRecorder.recordReadMismatches(HBaseOperation.EXISTS, primary.length);
int mismatches = 0;
for (int i = 0; i < primary.length; i++) {
if (primary[i] != secondary[i]) {
Log.debug(
"existsAll(row=%s) mismatch: (%b, %b)",
new LazyBytesHexlifier(request.get(i).getRow(), maxValueBytesLogged),
primary[i],
secondary[i]);
mismatches++;
}
}
if (mismatches != primary.length) {
this.metricsRecorder.recordReadMatches(
HBaseOperation.EXISTS_ALL, primary.length - mismatches);
}
this.metricsRecorder.recordReadMismatches(HBaseOperation.EXISTS_ALL, mismatches);
}
}

@Override
public void existsAll(List<Get> request, Throwable throwable) {
Log.debug("existsAll failed");
Log.debug(
"existsAll(rows=%s) failed: (throwable=%s)",
listOfHexRows(request, maxValueBytesLogged), throwable);
}

public void get(Get request, Result primary, Result secondary) {
if (!Comparators.resultsEqual(primary, secondary)) {
Log.debug("get mismatch");
if (Comparators.resultsEqual(primary, secondary)) {
this.metricsRecorder.recordReadMatches(HBaseOperation.GET, 1);
} else {
Log.debug(
"get(row=%s) mismatch: (%s, %s)",
new LazyBytesHexlifier(request.getRow(), maxValueBytesLogged),
new LazyBytesHexlifier(primary.value(), maxValueBytesLogged),
new LazyBytesHexlifier(secondary.value(), maxValueBytesLogged));
this.metricsRecorder.recordReadMismatches(HBaseOperation.GET, 1);
}
}

@Override
public void get(Get request, Throwable throwable) {
Log.debug("get failed");
Log.debug(
"get(row=%s) failed: (throwable=%s)",
new LazyBytesHexlifier(request.getRow(), maxValueBytesLogged), throwable);
}

@Override
public void get(List<Get> request, Result[] primary, Result[] secondary) {
verifyResults(primary, secondary, "getAll mismatch", HBaseOperation.GET_LIST);
verifyResults(primary, secondary, "get", HBaseOperation.GET_LIST);
}

@Override
public void get(List<Get> request, Throwable throwable) {
Log.debug("getAll failed");
Log.debug(
"get(rows=%s) failed: (throwable=%s)",
listOfHexRows(request, maxValueBytesLogged), throwable);
}

@Override
public void scannerNext(Scan request, int entriesAlreadyRead, Result primary, Result secondary) {
if (!Comparators.resultsEqual(primary, secondary)) {
Log.debug("scan() mismatch");
if (Comparators.resultsEqual(primary, secondary)) {
this.metricsRecorder.recordReadMatches(HBaseOperation.NEXT, 1);
} else {
Log.debug(
"scan[id=%s, entriesRead=%d] mismatch: (%s, %s)",
request.getId(),
entriesAlreadyRead,
new LazyBytesHexlifier(primary.value(), maxValueBytesLogged),
new LazyBytesHexlifier(secondary.value(), maxValueBytesLogged));
this.metricsRecorder.recordReadMismatches(HBaseOperation.NEXT, 1);
}
}

@Override
public void scannerNext(Scan request, int entriesAlreadyRead, Throwable throwable) {
Log.debug("scan() failed");
Log.debug(
"scan[id=%s, entriesRead=%d] failed: (throwable=%s)",
request.getId(), entriesAlreadyRead, throwable);
}

@Override
public void scannerNext(
Scan request, int entriesAlreadyRead, Result[] primary, Result[] secondary) {
verifyResults(primary, secondary, "scan(i) mismatch", HBaseOperation.NEXT_MULTIPLE);
verifyResults(
primary,
secondary,
String.format("scan[id=%s]", request.getId()),
HBaseOperation.NEXT_MULTIPLE);
}

@Override
public void scannerNext(
Scan request, int entriesAlreadyRead, int entriesRequested, Throwable throwable) {
Log.debug("scan(i) failed");
Log.debug(
"scan[id=%s, entriesRead=%d, entriesRequested=%d] failed: (throwable=%s)",
request.getId(), entriesAlreadyRead, entriesRequested, throwable);
}

@Override
public void batch(List<Get> request, Result[] primary, Result[] secondary) {
verifyResults(primary, secondary, "batch() mismatch", HBaseOperation.BATCH);
verifyResults(primary, secondary, "batch", HBaseOperation.BATCH);
}

@Override
public void batch(List<Get> request, Throwable throwable) {
Log.debug("batch() failed");
Log.debug(
"batch(rows=%s) failed: (throwable=%s)",
listOfHexRows(request, maxValueBytesLogged), throwable);
}

private void verifyResults(
Result[] primary, Result[] secondary, String errorMessage, HBaseOperation operation) {
Result[] primary, Result[] secondary, String operationName, HBaseOperation operation) {
int minLength = Math.min(primary.length, secondary.length);
int errors = Math.max(primary.length, secondary.length) - minLength;
int maxLength = Math.max(primary.length, secondary.length);
int errors = maxLength - minLength;
int matches = 0;
for (int i = 0; i < minLength; i++) {
if (Comparators.resultsEqual(primary[i], secondary[i])) {
Log.debug(errorMessage);
matches++;
} else {
Log.debug(
"%s(row=%s) mismatch: (%s, %s)",
operationName,
new LazyBytesHexlifier(primary[i].getRow(), maxValueBytesLogged),
new LazyBytesHexlifier(primary[i].value(), maxValueBytesLogged),
new LazyBytesHexlifier(secondary[i].value(), maxValueBytesLogged));
errors++;
}
}
if (matches > 0) {
this.metricsRecorder.recordReadMatches(operation, matches);
}
if (errors > 0) {
this.metricsRecorder.recordReadMismatches(operation, errors);
}
}

// Used for logging. Overrides toString() in order to be as lazy as possible.
// Adapted from Apache Common Codec's Hex.
public static class LazyBytesHexlifier {
private static final char[] DIGITS = {
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
};

public static List<LazyBytesHexlifier> listOfHexRows(List<Get> gets, int maxBytesPrinted) {
List<LazyBytesHexlifier> out = new ArrayList<>(gets.size());
for (Get get : gets) {
out.add(new LazyBytesHexlifier(get.getRow(), maxBytesPrinted));
}
return out;
}

private final byte[] bytes;
private final int maxBytesPrinted;

public LazyBytesHexlifier(byte[] bytes, int maxBytesPrinted) {
this.bytes = bytes;
this.maxBytesPrinted = maxBytesPrinted;
}

private void bytesToHex(
final char[] out, final int outOffset, final int bytesOffset, final int bytesLength) {
for (int i = bytesOffset, j = outOffset; i < bytesOffset + bytesLength; i++) {
out[j++] = DIGITS[(0xF0 & this.bytes[i]) >>> 4];
out[j++] = DIGITS[0x0F & this.bytes[i]];
}
}

@Override
public String toString() {
int bytesToPrint = Math.min(this.bytes.length, maxBytesPrinted);
if (bytesToPrint <= 0) {
return "";
}
boolean skipSomeBytes = bytesToPrint != this.bytes.length;
char[] out;
if (skipSomeBytes) {
int numEndBytes = bytesToPrint / 2;
int numStartBytes = bytesToPrint - numEndBytes;
int numDots = 3;

int startDotsIdx = 2 * numStartBytes;
int endDotsIdx = 2 * numStartBytes + numDots;

out = new char[numDots + (bytesToPrint << 1)];

bytesToHex(out, 0, 0, numStartBytes);
for (int i = startDotsIdx; i < endDotsIdx; i++) {
out[i] = '.';
}
bytesToHex(out, endDotsIdx, this.bytes.length - numEndBytes, numEndBytes);
} else {
out = new char[bytesToPrint << 1];
bytesToHex(out, 0, 0, bytesToPrint);
}
return new String(out);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void setUp() {
primaryTable,
secondaryTable,
this.executorServiceRule.executorService,
new DefaultMismatchDetector(tracer),
new DefaultMismatchDetector(tracer, 32),
flowController,
new SecondaryWriteErrorConsumerWithMetrics(
tracer, mock(SecondaryWriteErrorConsumer.class)),
Expand Down
Loading

0 comments on commit bfd3c32

Please sign in to comment.