Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: log rows and values in DefaultMismatchDetector #129

Merged
merged 1 commit into from
Nov 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ public MirroringConnection(Configuration conf, boolean managed, ExecutorService
this.configuration.mirroringOptions));
this.mismatchDetector =
ReflectionConstructor.construct(
this.configuration.mirroringOptions.mismatchDetectorClass, this.mirroringTracer);
this.configuration.mirroringOptions.mismatchDetectorClass,
this.mirroringTracer,
configuration.mirroringOptions.maxLoggedBinaryValueLength);

this.failedWritesLogger =
new Logger(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_SYNCHRONOUS_WRITES;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_WRITE_ERROR_CONSUMER_CLASS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_WRITE_ERROR_LOG_APPENDER_CLASS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_WRITE_ERROR_LOG_MAX_BINARY_VALUE_LENGTH;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_WRITE_ERROR_LOG_SERIALIZER_CLASS;

import com.google.api.core.InternalApi;
Expand All @@ -45,6 +46,7 @@ public class MirroringOptions {
public final int flowControllerMaxUsedBytes;
public final long bufferedMutatorBytesToFlush;
public final String writeErrorConsumerClass;
public final int maxLoggedBinaryValueLength;
public final int readSamplingRate;

public final String writeErrorLogAppenderClass;
Expand Down Expand Up @@ -85,6 +87,8 @@ public MirroringOptions(Configuration configuration) {
this.writeErrorLogSerializerClass =
configuration.get(
MIRRORING_WRITE_ERROR_LOG_SERIALIZER_CLASS, DefaultSerializer.class.getCanonicalName());
this.maxLoggedBinaryValueLength =
configuration.getInt(MIRRORING_WRITE_ERROR_LOG_MAX_BINARY_VALUE_LENGTH, 32);
this.performWritesConcurrently = configuration.getBoolean(MIRRORING_CONCURRENT_WRITES, false);
this.waitForSecondaryWrites = configuration.getBoolean(MIRRORING_SYNCHRONOUS_WRITES, false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ public class MirroringConfigurationHelper {
public static final String MIRRORING_WRITE_ERROR_LOG_SERIALIZER_CLASS =
"google.bigtable.mirroring.write-error-log.serializer.impl";

/**
* Integer value representing how many first bytes of binary values (such as row) should be
* converted to hex and then logged in case of error. Defaults to 32.
*/
public static final String MIRRORING_WRITE_ERROR_LOG_MAX_BINARY_VALUE_LENGTH =
"google.bigtable.mirroring.write-error-log.max-binary-value-bytes-logged";

/**
* Integer value representing percentage of read operations performed on primary database that
* should be verified against secondary. Each call to {@link Table#get(Get)}, {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics;

import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.OPERATION_KEY;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.READ_MATCHES;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.READ_MISMATCHES;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.SECONDARY_WRITE_ERRORS;

Expand Down Expand Up @@ -78,4 +79,11 @@ public void recordSecondaryWriteErrors(HBaseOperation operation, int numberOfErr
map.put(SECONDARY_WRITE_ERRORS, numberOfErrors);
map.record(tagContext);
}

public void recordReadMatches(HBaseOperation operation, int numberOfMatches) {
TagContext tagContext = getTagContext(operation);
MeasureMap map = statsRecorder.newMeasureMap();
map.put(READ_MATCHES, numberOfMatches);
map.record(tagContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.OPERATION_KEY;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.PRIMARY_ERRORS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.PRIMARY_LATENCY;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.READ_MATCHES;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.READ_MISMATCHES;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.SECONDARY_ERRORS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.SECONDARY_LATENCY;
Expand Down Expand Up @@ -116,13 +117,23 @@ public class MirroringMetricsViews {
SUM,
ImmutableList.of(OPERATION_KEY));

/** {@link View} for Mirroring client's secondary read mismatches. */
private static final View READ_MATCH_VIEW =
View.create(
View.Name.create("cloud.google.com/java/mirroring/read_match"),
"Detected read matches count.",
READ_MATCHES,
SUM,
ImmutableList.of(OPERATION_KEY));

private static final ImmutableSet<View> MIRRORING_CLIENT_VIEWS_SET =
ImmutableSet.of(
PRIMARY_OPERATION_LATENCY_VIEW,
PRIMARY_OPERATION_ERROR_VIEW,
SECONDARY_OPERATION_LATENCY_VIEW,
SECONDARY_OPERATION_ERROR_VIEW,
MIRRORING_OPERATION_LATENCY_VIEW,
READ_MATCH_VIEW,
READ_MISMATCH_VIEW,
SECONDARY_WRITE_ERROR_VIEW);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,15 @@ public class MirroringSpanConstants {
"Count of errors on secondary database.",
"1");

public static final MeasureLong READ_MATCHES =
MeasureLong.create(
"com/google/cloud/bigtable/mirroring/read_verification/matches",
"Count of successfully verified reads.",
"1");

public static final MeasureLong READ_MISMATCHES =
MeasureLong.create(
"com/google/cloud/bigtable/mirroring/mismatch/read",
"com/google/cloud/bigtable/mirroring/read_verification/mismatches",
"Count of read mismatches detected.",
"1");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
*/
package com.google.cloud.bigtable.mirroring.hbase1_x.verification;

import static com.google.cloud.bigtable.mirroring.hbase1_x.verification.DefaultMismatchDetector.LazyBytesHexlifier.listOfHexRows;

import com.google.api.core.InternalApi;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.Comparators;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.Logger;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringMetricsRecorder;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.client.Get;
Expand All @@ -29,107 +32,232 @@

@InternalApi("For internal usage only")
public class DefaultMismatchDetector implements MismatchDetector {
private final int maxValueBytesLogged;
private static final Logger Log = new Logger(DefaultMismatchDetector.class);
private final MirroringMetricsRecorder metricsRecorder;

public DefaultMismatchDetector(MirroringTracer mirroringTracer) {
public DefaultMismatchDetector(MirroringTracer mirroringTracer, Integer maxValueBytesLogged) {
this.metricsRecorder = mirroringTracer.metricsRecorder;
this.maxValueBytesLogged = maxValueBytesLogged;
}

public void exists(Get request, boolean primary, boolean secondary) {
if (primary != secondary) {
Log.debug("exists mismatch");
if (primary == secondary) {
this.metricsRecorder.recordReadMatches(HBaseOperation.EXISTS, 1);
} else {
Log.debug(
"exists(row=%s) mismatch: (%b, %b)",
new LazyBytesHexlifier(request.getRow(), maxValueBytesLogged), primary, secondary);
this.metricsRecorder.recordReadMismatches(HBaseOperation.EXISTS, 1);
}
}

@Override
public void exists(Get request, Throwable throwable) {
Log.debug("exists failed");
Log.debug(
"exists(row=%s) failed: (throwable=%s)",
new LazyBytesHexlifier(request.getRow(), maxValueBytesLogged), throwable);
}

@Override
public void existsAll(List<Get> request, boolean[] primary, boolean[] secondary) {
if (!Arrays.equals(primary, secondary)) {
Log.debug("existsAll mismatch");
this.metricsRecorder.recordReadMismatches(HBaseOperation.EXISTS, primary.length);
int mismatches = 0;
for (int i = 0; i < primary.length; i++) {
if (primary[i] != secondary[i]) {
Log.debug(
"existsAll(row=%s) mismatch: (%b, %b)",
new LazyBytesHexlifier(request.get(i).getRow(), maxValueBytesLogged),
primary[i],
secondary[i]);
mismatches++;
}
}
if (mismatches != primary.length) {
this.metricsRecorder.recordReadMatches(
HBaseOperation.EXISTS_ALL, primary.length - mismatches);
}
this.metricsRecorder.recordReadMismatches(HBaseOperation.EXISTS_ALL, mismatches);
}
}

@Override
public void existsAll(List<Get> request, Throwable throwable) {
Log.debug("existsAll failed");
Log.debug(
"existsAll(rows=%s) failed: (throwable=%s)",
listOfHexRows(request, maxValueBytesLogged), throwable);
}

public void get(Get request, Result primary, Result secondary) {
if (!Comparators.resultsEqual(primary, secondary)) {
Log.debug("get mismatch");
if (Comparators.resultsEqual(primary, secondary)) {
this.metricsRecorder.recordReadMatches(HBaseOperation.GET, 1);
} else {
Log.debug(
"get(row=%s) mismatch: (%s, %s)",
new LazyBytesHexlifier(request.getRow(), maxValueBytesLogged),
new LazyBytesHexlifier(primary.value(), maxValueBytesLogged),
new LazyBytesHexlifier(secondary.value(), maxValueBytesLogged));
this.metricsRecorder.recordReadMismatches(HBaseOperation.GET, 1);
}
}

@Override
public void get(Get request, Throwable throwable) {
Log.debug("get failed");
Log.debug(
"get(row=%s) failed: (throwable=%s)",
new LazyBytesHexlifier(request.getRow(), maxValueBytesLogged), throwable);
}

@Override
public void get(List<Get> request, Result[] primary, Result[] secondary) {
verifyResults(primary, secondary, "getAll mismatch", HBaseOperation.GET_LIST);
verifyResults(primary, secondary, "get", HBaseOperation.GET_LIST);
}

@Override
public void get(List<Get> request, Throwable throwable) {
Log.debug("getAll failed");
Log.debug(
"get(rows=%s) failed: (throwable=%s)",
listOfHexRows(request, maxValueBytesLogged), throwable);
}

@Override
public void scannerNext(Scan request, int entriesAlreadyRead, Result primary, Result secondary) {
if (!Comparators.resultsEqual(primary, secondary)) {
Log.debug("scan() mismatch");
if (Comparators.resultsEqual(primary, secondary)) {
this.metricsRecorder.recordReadMatches(HBaseOperation.NEXT, 1);
} else {
Log.debug(
"scan[id=%s, entriesRead=%d] mismatch: (%s, %s)",
request.getId(),
entriesAlreadyRead,
new LazyBytesHexlifier(primary.value(), maxValueBytesLogged),
new LazyBytesHexlifier(secondary.value(), maxValueBytesLogged));
this.metricsRecorder.recordReadMismatches(HBaseOperation.NEXT, 1);
}
}

@Override
public void scannerNext(Scan request, int entriesAlreadyRead, Throwable throwable) {
Log.debug("scan() failed");
Log.debug(
"scan[id=%s, entriesRead=%d] failed: (throwable=%s)",
request.getId(), entriesAlreadyRead, throwable);
}

@Override
public void scannerNext(
Scan request, int entriesAlreadyRead, Result[] primary, Result[] secondary) {
verifyResults(primary, secondary, "scan(i) mismatch", HBaseOperation.NEXT_MULTIPLE);
verifyResults(
primary,
secondary,
String.format("scan[id=%s]", request.getId()),
HBaseOperation.NEXT_MULTIPLE);
}

@Override
public void scannerNext(
Scan request, int entriesAlreadyRead, int entriesRequested, Throwable throwable) {
Log.debug("scan(i) failed");
Log.debug(
"scan[id=%s, entriesRead=%d, entriesRequested=%d] failed: (throwable=%s)",
request.getId(), entriesAlreadyRead, entriesRequested, throwable);
}

@Override
public void batch(List<Get> request, Result[] primary, Result[] secondary) {
verifyResults(primary, secondary, "batch() mismatch", HBaseOperation.BATCH);
verifyResults(primary, secondary, "batch", HBaseOperation.BATCH);
}

@Override
public void batch(List<Get> request, Throwable throwable) {
Log.debug("batch() failed");
Log.debug(
"batch(rows=%s) failed: (throwable=%s)",
listOfHexRows(request, maxValueBytesLogged), throwable);
}

private void verifyResults(
Result[] primary, Result[] secondary, String errorMessage, HBaseOperation operation) {
Result[] primary, Result[] secondary, String operationName, HBaseOperation operation) {
int minLength = Math.min(primary.length, secondary.length);
int errors = Math.max(primary.length, secondary.length) - minLength;
int maxLength = Math.max(primary.length, secondary.length);
int errors = maxLength - minLength;
int matches = 0;
for (int i = 0; i < minLength; i++) {
if (Comparators.resultsEqual(primary[i], secondary[i])) {
Log.debug(errorMessage);
matches++;
} else {
Log.debug(
"%s(row=%s) mismatch: (%s, %s)",
operationName,
new LazyBytesHexlifier(primary[i].getRow(), maxValueBytesLogged),
new LazyBytesHexlifier(primary[i].value(), maxValueBytesLogged),
new LazyBytesHexlifier(secondary[i].value(), maxValueBytesLogged));
errors++;
}
}
if (matches > 0) {
this.metricsRecorder.recordReadMatches(operation, matches);
}
if (errors > 0) {
this.metricsRecorder.recordReadMismatches(operation, errors);
}
}

// Used for logging. Overrides toString() in order to be as lazy as possible.
// Adapted from Apache Common Codec's Hex.
public static class LazyBytesHexlifier {
private static final char[] DIGITS = {
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
};

public static List<LazyBytesHexlifier> listOfHexRows(List<Get> gets, int maxBytesPrinted) {
List<LazyBytesHexlifier> out = new ArrayList<>(gets.size());
for (Get get : gets) {
out.add(new LazyBytesHexlifier(get.getRow(), maxBytesPrinted));
}
return out;
}

private final byte[] bytes;
private final int maxBytesPrinted;

public LazyBytesHexlifier(byte[] bytes, int maxBytesPrinted) {
this.bytes = bytes;
this.maxBytesPrinted = maxBytesPrinted;
}

private void bytesToHex(
final char[] out, final int outOffset, final int bytesOffset, final int bytesLength) {
for (int i = bytesOffset, j = outOffset; i < bytesOffset + bytesLength; i++) {
out[j++] = DIGITS[(0xF0 & this.bytes[i]) >>> 4];
out[j++] = DIGITS[0x0F & this.bytes[i]];
}
}

@Override
public String toString() {
int bytesToPrint = Math.min(this.bytes.length, maxBytesPrinted);
if (bytesToPrint <= 0) {
return "";
}
boolean skipSomeBytes = bytesToPrint != this.bytes.length;
char[] out;
if (skipSomeBytes) {
int numEndBytes = bytesToPrint / 2;
int numStartBytes = bytesToPrint - numEndBytes;
int numDots = 3;

int startDotsIdx = 2 * numStartBytes;
int endDotsIdx = 2 * numStartBytes + numDots;

out = new char[numDots + (bytesToPrint << 1)];

bytesToHex(out, 0, 0, numStartBytes);
for (int i = startDotsIdx; i < endDotsIdx; i++) {
out[i] = '.';
}
bytesToHex(out, endDotsIdx, this.bytes.length - numEndBytes, numEndBytes);
} else {
out = new char[bytesToPrint << 1];
bytesToHex(out, 0, 0, bytesToPrint);
}
return new String(out);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void setUp() {
primaryTable,
secondaryTable,
this.executorServiceRule.executorService,
new DefaultMismatchDetector(tracer),
new DefaultMismatchDetector(tracer, 32),
flowController,
new SecondaryWriteErrorConsumerWithMetrics(
tracer, mock(SecondaryWriteErrorConsumer.class)),
Expand Down
Loading