Skip to content

Commit

Permalink
feat: log rows and values in DefaultMismatchDetector (#129)
Browse files Browse the repository at this point in the history
  • Loading branch information
prawilny committed May 11, 2022
1 parent 9955f7b commit ab79483
Show file tree
Hide file tree
Showing 10 changed files with 263 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ public MirroringConnection(Configuration conf, boolean managed, ExecutorService
this.configuration.mirroringOptions));
this.mismatchDetector =
ReflectionConstructor.construct(
this.configuration.mirroringOptions.mismatchDetectorClass, this.mirroringTracer);
this.configuration.mirroringOptions.mismatchDetectorClass,
this.mirroringTracer,
configuration.mirroringOptions.maxLoggedBinaryValueLength);

this.failedWritesLogger =
new Logger(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_SYNCHRONOUS_WRITES;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_WRITE_ERROR_CONSUMER_CLASS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_WRITE_ERROR_LOG_APPENDER_CLASS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_WRITE_ERROR_LOG_MAX_BINARY_VALUE_LENGTH;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_WRITE_ERROR_LOG_SERIALIZER_CLASS;

import com.google.api.core.InternalApi;
Expand All @@ -45,6 +46,7 @@ public class MirroringOptions {
public final int flowControllerMaxUsedBytes;
public final long bufferedMutatorBytesToFlush;
public final String writeErrorConsumerClass;
public final int maxLoggedBinaryValueLength;
public final int readSamplingRate;

public final String writeErrorLogAppenderClass;
Expand Down Expand Up @@ -85,6 +87,8 @@ public MirroringOptions(Configuration configuration) {
this.writeErrorLogSerializerClass =
configuration.get(
MIRRORING_WRITE_ERROR_LOG_SERIALIZER_CLASS, DefaultSerializer.class.getCanonicalName());
this.maxLoggedBinaryValueLength =
configuration.getInt(MIRRORING_WRITE_ERROR_LOG_MAX_BINARY_VALUE_LENGTH, 32);
this.performWritesConcurrently = configuration.getBoolean(MIRRORING_CONCURRENT_WRITES, false);
this.waitForSecondaryWrites = configuration.getBoolean(MIRRORING_SYNCHRONOUS_WRITES, false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ public class MirroringConfigurationHelper {
public static final String MIRRORING_WRITE_ERROR_LOG_SERIALIZER_CLASS =
"google.bigtable.mirroring.write-error-log.serializer.impl";

/**
* Integer value representing how many first bytes of binary values (such as row) should be
* converted to hex and then logged in case of error. Defaults to 32.
*/
public static final String MIRRORING_WRITE_ERROR_LOG_MAX_BINARY_VALUE_LENGTH =
"google.bigtable.mirroring.write-error-log.max-binary-value-bytes-logged";

/**
* Integer value representing percentage of read operations performed on primary database that
* should be verified against secondary. Each call to {@link Table#get(Get)}, {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics;

import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.OPERATION_KEY;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.READ_MATCHES;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.READ_MISMATCHES;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.SECONDARY_WRITE_ERRORS;

Expand Down Expand Up @@ -78,4 +79,11 @@ public void recordSecondaryWriteErrors(HBaseOperation operation, int numberOfErr
map.put(SECONDARY_WRITE_ERRORS, numberOfErrors);
map.record(tagContext);
}

public void recordReadMatches(HBaseOperation operation, int numberOfMatches) {
TagContext tagContext = getTagContext(operation);
MeasureMap map = statsRecorder.newMeasureMap();
map.put(READ_MATCHES, numberOfMatches);
map.record(tagContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.OPERATION_KEY;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.PRIMARY_ERRORS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.PRIMARY_LATENCY;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.READ_MATCHES;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.READ_MISMATCHES;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.SECONDARY_ERRORS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.SECONDARY_LATENCY;
Expand Down Expand Up @@ -116,13 +117,23 @@ public class MirroringMetricsViews {
SUM,
ImmutableList.of(OPERATION_KEY));

/** {@link View} for Mirroring client's secondary read mismatches. */
private static final View READ_MATCH_VIEW =
View.create(
View.Name.create("cloud.google.com/java/mirroring/read_match"),
"Detected read matches count.",
READ_MATCHES,
SUM,
ImmutableList.of(OPERATION_KEY));

private static final ImmutableSet<View> MIRRORING_CLIENT_VIEWS_SET =
ImmutableSet.of(
PRIMARY_OPERATION_LATENCY_VIEW,
PRIMARY_OPERATION_ERROR_VIEW,
SECONDARY_OPERATION_LATENCY_VIEW,
SECONDARY_OPERATION_ERROR_VIEW,
MIRRORING_OPERATION_LATENCY_VIEW,
READ_MATCH_VIEW,
READ_MISMATCH_VIEW,
SECONDARY_WRITE_ERROR_VIEW);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,15 @@ public class MirroringSpanConstants {
"Count of errors on secondary database.",
"1");

public static final MeasureLong READ_MATCHES =
MeasureLong.create(
"com/google/cloud/bigtable/mirroring/read_verification/matches",
"Count of successfully verified reads.",
"1");

public static final MeasureLong READ_MISMATCHES =
MeasureLong.create(
"com/google/cloud/bigtable/mirroring/mismatch/read",
"com/google/cloud/bigtable/mirroring/read_verification/mismatches",
"Count of read mismatches detected.",
"1");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
*/
package com.google.cloud.bigtable.mirroring.hbase1_x.verification;

import static com.google.cloud.bigtable.mirroring.hbase1_x.verification.DefaultMismatchDetector.LazyBytesHexlifier.listOfHexRows;

import com.google.api.core.InternalApi;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.Comparators;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.Logger;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringMetricsRecorder;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.client.Get;
Expand All @@ -29,107 +32,232 @@

@InternalApi("For internal usage only")
public class DefaultMismatchDetector implements MismatchDetector {
private final int maxValueBytesLogged;
private static final Logger Log = new Logger(DefaultMismatchDetector.class);
private final MirroringMetricsRecorder metricsRecorder;

public DefaultMismatchDetector(MirroringTracer mirroringTracer) {
public DefaultMismatchDetector(MirroringTracer mirroringTracer, Integer maxValueBytesLogged) {
this.metricsRecorder = mirroringTracer.metricsRecorder;
this.maxValueBytesLogged = maxValueBytesLogged;
}

public void exists(Get request, boolean primary, boolean secondary) {
if (primary != secondary) {
Log.debug("exists mismatch");
if (primary == secondary) {
this.metricsRecorder.recordReadMatches(HBaseOperation.EXISTS, 1);
} else {
Log.debug(
"exists(row=%s) mismatch: (%b, %b)",
new LazyBytesHexlifier(request.getRow(), maxValueBytesLogged), primary, secondary);
this.metricsRecorder.recordReadMismatches(HBaseOperation.EXISTS, 1);
}
}

@Override
public void exists(Get request, Throwable throwable) {
Log.debug("exists failed");
Log.debug(
"exists(row=%s) failed: (throwable=%s)",
new LazyBytesHexlifier(request.getRow(), maxValueBytesLogged), throwable);
}

@Override
public void existsAll(List<Get> request, boolean[] primary, boolean[] secondary) {
if (!Arrays.equals(primary, secondary)) {
Log.debug("existsAll mismatch");
this.metricsRecorder.recordReadMismatches(HBaseOperation.EXISTS, primary.length);
int mismatches = 0;
for (int i = 0; i < primary.length; i++) {
if (primary[i] != secondary[i]) {
Log.debug(
"existsAll(row=%s) mismatch: (%b, %b)",
new LazyBytesHexlifier(request.get(i).getRow(), maxValueBytesLogged),
primary[i],
secondary[i]);
mismatches++;
}
}
if (mismatches != primary.length) {
this.metricsRecorder.recordReadMatches(
HBaseOperation.EXISTS_ALL, primary.length - mismatches);
}
this.metricsRecorder.recordReadMismatches(HBaseOperation.EXISTS_ALL, mismatches);
}
}

@Override
public void existsAll(List<Get> request, Throwable throwable) {
Log.debug("existsAll failed");
Log.debug(
"existsAll(rows=%s) failed: (throwable=%s)",
listOfHexRows(request, maxValueBytesLogged), throwable);
}

public void get(Get request, Result primary, Result secondary) {
if (!Comparators.resultsEqual(primary, secondary)) {
Log.debug("get mismatch");
if (Comparators.resultsEqual(primary, secondary)) {
this.metricsRecorder.recordReadMatches(HBaseOperation.GET, 1);
} else {
Log.debug(
"get(row=%s) mismatch: (%s, %s)",
new LazyBytesHexlifier(request.getRow(), maxValueBytesLogged),
new LazyBytesHexlifier(primary.value(), maxValueBytesLogged),
new LazyBytesHexlifier(secondary.value(), maxValueBytesLogged));
this.metricsRecorder.recordReadMismatches(HBaseOperation.GET, 1);
}
}

@Override
public void get(Get request, Throwable throwable) {
Log.debug("get failed");
Log.debug(
"get(row=%s) failed: (throwable=%s)",
new LazyBytesHexlifier(request.getRow(), maxValueBytesLogged), throwable);
}

@Override
public void get(List<Get> request, Result[] primary, Result[] secondary) {
verifyResults(primary, secondary, "getAll mismatch", HBaseOperation.GET_LIST);
verifyResults(primary, secondary, "get", HBaseOperation.GET_LIST);
}

@Override
public void get(List<Get> request, Throwable throwable) {
Log.debug("getAll failed");
Log.debug(
"get(rows=%s) failed: (throwable=%s)",
listOfHexRows(request, maxValueBytesLogged), throwable);
}

@Override
public void scannerNext(Scan request, int entriesAlreadyRead, Result primary, Result secondary) {
if (!Comparators.resultsEqual(primary, secondary)) {
Log.debug("scan() mismatch");
if (Comparators.resultsEqual(primary, secondary)) {
this.metricsRecorder.recordReadMatches(HBaseOperation.NEXT, 1);
} else {
Log.debug(
"scan[id=%s, entriesRead=%d] mismatch: (%s, %s)",
request.getId(),
entriesAlreadyRead,
new LazyBytesHexlifier(primary.value(), maxValueBytesLogged),
new LazyBytesHexlifier(secondary.value(), maxValueBytesLogged));
this.metricsRecorder.recordReadMismatches(HBaseOperation.NEXT, 1);
}
}

@Override
public void scannerNext(Scan request, int entriesAlreadyRead, Throwable throwable) {
Log.debug("scan() failed");
Log.debug(
"scan[id=%s, entriesRead=%d] failed: (throwable=%s)",
request.getId(), entriesAlreadyRead, throwable);
}

@Override
public void scannerNext(
Scan request, int entriesAlreadyRead, Result[] primary, Result[] secondary) {
verifyResults(primary, secondary, "scan(i) mismatch", HBaseOperation.NEXT_MULTIPLE);
verifyResults(
primary,
secondary,
String.format("scan[id=%s]", request.getId()),
HBaseOperation.NEXT_MULTIPLE);
}

@Override
public void scannerNext(
Scan request, int entriesAlreadyRead, int entriesRequested, Throwable throwable) {
Log.debug("scan(i) failed");
Log.debug(
"scan[id=%s, entriesRead=%d, entriesRequested=%d] failed: (throwable=%s)",
request.getId(), entriesAlreadyRead, entriesRequested, throwable);
}

@Override
public void batch(List<Get> request, Result[] primary, Result[] secondary) {
verifyResults(primary, secondary, "batch() mismatch", HBaseOperation.BATCH);
verifyResults(primary, secondary, "batch", HBaseOperation.BATCH);
}

@Override
public void batch(List<Get> request, Throwable throwable) {
Log.debug("batch() failed");
Log.debug(
"batch(rows=%s) failed: (throwable=%s)",
listOfHexRows(request, maxValueBytesLogged), throwable);
}

private void verifyResults(
Result[] primary, Result[] secondary, String errorMessage, HBaseOperation operation) {
Result[] primary, Result[] secondary, String operationName, HBaseOperation operation) {
int minLength = Math.min(primary.length, secondary.length);
int errors = Math.max(primary.length, secondary.length) - minLength;
int maxLength = Math.max(primary.length, secondary.length);
int errors = maxLength - minLength;
int matches = 0;
for (int i = 0; i < minLength; i++) {
if (Comparators.resultsEqual(primary[i], secondary[i])) {
Log.debug(errorMessage);
matches++;
} else {
Log.debug(
"%s(row=%s) mismatch: (%s, %s)",
operationName,
new LazyBytesHexlifier(primary[i].getRow(), maxValueBytesLogged),
new LazyBytesHexlifier(primary[i].value(), maxValueBytesLogged),
new LazyBytesHexlifier(secondary[i].value(), maxValueBytesLogged));
errors++;
}
}
if (matches > 0) {
this.metricsRecorder.recordReadMatches(operation, matches);
}
if (errors > 0) {
this.metricsRecorder.recordReadMismatches(operation, errors);
}
}

// Used for logging. Overrides toString() in order to be as lazy as possible.
// Adapted from Apache Common Codec's Hex.
public static class LazyBytesHexlifier {
private static final char[] DIGITS = {
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
};

public static List<LazyBytesHexlifier> listOfHexRows(List<Get> gets, int maxBytesPrinted) {
List<LazyBytesHexlifier> out = new ArrayList<>(gets.size());
for (Get get : gets) {
out.add(new LazyBytesHexlifier(get.getRow(), maxBytesPrinted));
}
return out;
}

private final byte[] bytes;
private final int maxBytesPrinted;

public LazyBytesHexlifier(byte[] bytes, int maxBytesPrinted) {
this.bytes = bytes;
this.maxBytesPrinted = maxBytesPrinted;
}

private void bytesToHex(
final char[] out, final int outOffset, final int bytesOffset, final int bytesLength) {
for (int i = bytesOffset, j = outOffset; i < bytesOffset + bytesLength; i++) {
out[j++] = DIGITS[(0xF0 & this.bytes[i]) >>> 4];
out[j++] = DIGITS[0x0F & this.bytes[i]];
}
}

@Override
public String toString() {
int bytesToPrint = Math.min(this.bytes.length, maxBytesPrinted);
if (bytesToPrint <= 0) {
return "";
}
boolean skipSomeBytes = bytesToPrint != this.bytes.length;
char[] out;
if (skipSomeBytes) {
int numEndBytes = bytesToPrint / 2;
int numStartBytes = bytesToPrint - numEndBytes;
int numDots = 3;

int startDotsIdx = 2 * numStartBytes;
int endDotsIdx = 2 * numStartBytes + numDots;

out = new char[numDots + (bytesToPrint << 1)];

bytesToHex(out, 0, 0, numStartBytes);
for (int i = startDotsIdx; i < endDotsIdx; i++) {
out[i] = '.';
}
bytesToHex(out, endDotsIdx, this.bytes.length - numEndBytes, numEndBytes);
} else {
out = new char[bytesToPrint << 1];
bytesToHex(out, 0, 0, bytesToPrint);
}
return new String(out);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void setUp() {
primaryTable,
secondaryTable,
this.executorServiceRule.executorService,
new DefaultMismatchDetector(tracer),
new DefaultMismatchDetector(tracer, 32),
flowController,
new SecondaryWriteErrorConsumerWithMetrics(
tracer, mock(SecondaryWriteErrorConsumer.class)),
Expand Down
Loading

0 comments on commit ab79483

Please sign in to comment.