Skip to content

Commit

Permalink
feat: Client-side timestamping
Browse files Browse the repository at this point in the history
  • Loading branch information
mwalkiewicz committed Jan 21, 2022
1 parent 1f08c09 commit daf5f22
Show file tree
Hide file tree
Showing 38 changed files with 547 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.hbase.mirroring;

import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_CONCURRENT_WRITES;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_ENABLE_DEFAULT_CLIENT_SIDE_TIMESTAMPS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_FLOW_CONTROLLER_STRATEGY_MAX_OUTSTANDING_REQUESTS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_SYNCHRONOUS_WRITES;
import static com.google.common.truth.Truth.assertThat;
Expand Down Expand Up @@ -104,6 +105,7 @@ private Configuration createConfiguration() {
// testing the other case here anyways) and set it to true to meet the requirements otherwise.
configuration.set(MIRRORING_CONCURRENT_WRITES, String.valueOf(this.mutateConcurrently));
configuration.set(MIRRORING_SYNCHRONOUS_WRITES, String.valueOf(this.mutateConcurrently));
configuration.setBoolean(MIRRORING_ENABLE_DEFAULT_CLIENT_SIDE_TIMESTAMPS, true);
return configuration;
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.hbase.mirroring.utils;

import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_ENABLE_DEFAULT_CLIENT_SIDE_TIMESTAMPS;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -184,6 +185,12 @@ public MirroringConnection createConnection() throws IOException {
return connectionRule.createConnection(this.executorServiceRule.executorService);
}

public MirroringConnection createTimestampedConnection() throws IOException {
Configuration conf = ConfigurationHelper.newConfiguration();
conf.setBoolean(MIRRORING_ENABLE_DEFAULT_CLIENT_SIDE_TIMESTAMPS, true);
return connectionRule.createConnection(this.executorServiceRule.executorService, conf);
}

public MirroringConnection createConnection(Configuration configuration) throws IOException {
return connectionRule.createConnection(this.executorServiceRule.executorService, configuration);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static Put createPut(

public static Put createPut(int id, byte[] family, byte[] qualifier) {
byte[] rowAndValue = Longs.toByteArray(id);
return createPut(rowAndValue, family, qualifier, id, rowAndValue);
return createPut(rowAndValue, family, qualifier, rowAndValue);
}

public static Get createGet(byte[] row, byte[] family, byte[] qualifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ListenableReferenceCounter;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.TimestamperFactory;
import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -90,6 +92,7 @@ public class MirroringConnection implements Connection {
protected final boolean performWritesConcurrently;

protected final boolean waitForSecondaryWrites;
protected final Timestamper timestamper;

/**
* The constructor called from {@link
Expand Down Expand Up @@ -162,6 +165,9 @@ public MirroringConnection(Configuration conf, boolean managed, ExecutorService
this.readSampler = new ReadSampler(this.configuration.mirroringOptions.readSamplingRate);
this.performWritesConcurrently = this.configuration.mirroringOptions.performWritesConcurrently;
this.waitForSecondaryWrites = this.configuration.mirroringOptions.waitForSecondaryWrites;
this.timestamper =
TimestamperFactory.create(
this.configuration.mirroringOptions.enableDefaultClientSideTimestamps);
}

@Override
Expand Down Expand Up @@ -198,6 +204,7 @@ public Table call() throws IOException {
this.flowController,
this.secondaryWriteErrorConsumer,
this.readSampler,
this.timestamper,
this.performWritesConcurrently,
this.waitForSecondaryWrites,
this.mirroringTracer,
Expand All @@ -223,6 +230,7 @@ public BufferedMutator getBufferedMutator(BufferedMutatorParams bufferedMutatorP
flowController,
executorService,
secondaryWriteErrorConsumer,
timestamper,
mirroringTracer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_BUFFERED_MUTATOR_BYTES_TO_FLUSH;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_CONCURRENT_WRITES;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_CONNECTION_CONNECTION_TERMINATION_TIMEOUT;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_ENABLE_DEFAULT_CLIENT_SIDE_TIMESTAMPS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_FAILLOG_DROP_ON_OVERFLOW_KEY;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_FAILLOG_MAX_BUFFER_SIZE_KEY;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_FAILLOG_PREFIX_PATH_KEY;
Expand Down Expand Up @@ -106,6 +107,7 @@ public static class Faillog {
public final Faillog faillog;

public final int resultScannerBufferedMismatchedResults;
public final boolean enableDefaultClientSideTimestamps;

public MirroringOptions(Configuration configuration) {
this.mismatchDetectorFactoryClass =
Expand Down Expand Up @@ -153,5 +155,8 @@ public MirroringOptions(Configuration configuration) {
"Performing writes concurrently and not waiting for writes is forbidden. "
+ "It has no advantage over performing writes asynchronously and not waiting for them.");
this.faillog = new Faillog(configuration);

this.enableDefaultClientSideTimestamps =
configuration.getBoolean(MIRRORING_ENABLE_DEFAULT_CLIENT_SIDE_TIMESTAMPS, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.HierarchicalReferenceCounter;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ReferenceCounter;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper;
import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector;
import com.google.cloud.bigtable.mirroring.hbase1_x.verification.VerificationContinuationFactory;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -114,6 +115,7 @@ public boolean apply(@NullableDecl Object o) {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final SettableFuture<Void> closedFuture = SettableFuture.create();
private final int resultScannerBufferedMismatchedResults;
private final Timestamper timestamper;
/**
* @param executorService ExecutorService is used to perform operations on secondaryTable and
* verification tasks.
Expand All @@ -129,6 +131,7 @@ public MirroringTable(
FlowController flowController,
SecondaryWriteErrorConsumer secondaryWriteErrorConsumer,
ReadSampler readSampler,
Timestamper timestamper,
boolean performWritesConcurrently,
boolean waitForSecondaryWrites,
MirroringTracer mirroringTracer,
Expand All @@ -148,6 +151,7 @@ public MirroringTable(
this.mirroringTracer = mirroringTracer;
this.requestScheduler =
new RequestScheduler(flowController, this.mirroringTracer, this.referenceCounter);
this.timestamper = timestamper;
this.batcher =
new Batcher(
this.primaryTable,
Expand All @@ -156,6 +160,7 @@ public MirroringTable(
this.secondaryWriteErrorConsumer,
this.verificationContinuationFactory,
this.readSampler,
this.timestamper,
resultIsFaultyPredicate,
waitForSecondaryWrites,
performWritesConcurrently,
Expand Down Expand Up @@ -545,6 +550,7 @@ private boolean checkAndMutateWithSpan(
final byte[] value,
final RowMutations rowMutations)
throws IOException {
this.timestamper.fillTimestamp(rowMutations);
boolean wereMutationsApplied =
this.mirroringTracer.spanFactory.wrapPrimaryOperation(
new CallableThrowingIOException<Boolean>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestResourcesDescription;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapMaker;
import com.google.common.util.concurrent.FutureCallback;
Expand Down Expand Up @@ -78,6 +79,7 @@ public ConcurrentMirroringBufferedMutator(
BufferedMutatorParams bufferedMutatorParams,
MirroringConfiguration configuration,
ExecutorService executorService,
Timestamper timestamper,
MirroringTracer mirroringTracer)
throws IOException {
super(
Expand All @@ -86,6 +88,7 @@ public ConcurrentMirroringBufferedMutator(
bufferedMutatorParams,
configuration,
executorService,
timestamper,
mirroringTracer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ListenableReferenceCounter;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -60,6 +61,9 @@
*/
@InternalApi("For internal usage only")
public abstract class MirroringBufferedMutator<BufferEntryType> implements BufferedMutator {

private final Timestamper timestamper;

public static BufferedMutator create(
boolean concurrent,
Connection primaryConnection,
Expand All @@ -69,6 +73,7 @@ public static BufferedMutator create(
FlowController flowController,
ExecutorService executorService,
SecondaryWriteErrorConsumer secondaryWriteErrorConsumer,
Timestamper timestamper,
MirroringTracer mirroringTracer)
throws IOException {
if (concurrent) {
Expand All @@ -78,6 +83,7 @@ public static BufferedMutator create(
bufferedMutatorParams,
configuration,
executorService,
timestamper,
mirroringTracer);
} else {
return new SequentialMirroringBufferedMutator(
Expand All @@ -88,6 +94,7 @@ public static BufferedMutator create(
flowController,
executorService,
secondaryWriteErrorConsumer,
timestamper,
mirroringTracer);
}
}
Expand Down Expand Up @@ -121,6 +128,7 @@ public MirroringBufferedMutator(
BufferedMutatorParams bufferedMutatorParams,
MirroringConfiguration configuration,
ExecutorService executorService,
Timestamper timestamper,
MirroringTracer mirroringTracer)
throws IOException {
this.userListener = bufferedMutatorParams.getListener();
Expand Down Expand Up @@ -167,13 +175,16 @@ public void onException(
this.mutationsBufferFlushThresholdBytes,
this.ongoingFlushesCounter,
this.mirroringTracer);
this.timestamper = timestamper;
}

@Override
public void mutate(Mutation mutation) throws IOException {
try (Scope scope =
this.mirroringTracer.spanFactory.operationScope(HBaseOperation.BUFFERED_MUTATOR_MUTATE)) {
mutateScoped(Collections.singletonList(mutation));
List<Mutation> m = Collections.singletonList(mutation);
timestamper.fillTimestamp(m);
mutateScoped(m);
}
}

Expand All @@ -182,6 +193,7 @@ public void mutate(final List<? extends Mutation> list) throws IOException {
try (Scope scope =
this.mirroringTracer.spanFactory.operationScope(
HBaseOperation.BUFFERED_MUTATOR_MUTATE_LIST)) {
timestamper.fillTimestamp(list);
mutateScoped(list);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestResourcesDescription;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper;
import com.google.common.collect.MapMaker;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -137,6 +138,7 @@ public SequentialMirroringBufferedMutator(
FlowController flowController,
ExecutorService executorService,
SecondaryWriteErrorConsumer secondaryWriteErrorConsumer,
Timestamper timestamper,
MirroringTracer mirroringTracer)
throws IOException {
super(
Expand All @@ -145,6 +147,7 @@ public SequentialMirroringBufferedMutator(
bufferedMutatorParams,
configuration,
executorService,
timestamper,
mirroringTracer);
this.secondaryWriteErrorConsumer = secondaryWriteErrorConsumer;
this.flowController = flowController;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestResourcesDescription;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper;
import com.google.cloud.bigtable.mirroring.hbase1_x.verification.VerificationContinuationFactory;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class Batcher {
private final boolean waitForSecondaryWrites;
private final boolean performWritesConcurrently;
private final MirroringTracer mirroringTracer;
private final Timestamper timestamper;

public Batcher(
Table primaryTable,
Expand All @@ -78,6 +80,7 @@ public Batcher(
SecondaryWriteErrorConsumer secondaryWriteErrorConsumer,
VerificationContinuationFactory verificationContinuationFactory,
ReadSampler readSampler,
Timestamper timestamper,
Predicate<Object> resultIsFaultyPredicate,
boolean waitForSecondaryWrites,
boolean performWritesConcurrently,
Expand All @@ -92,6 +95,7 @@ public Batcher(
this.waitForSecondaryWrites = waitForSecondaryWrites;
this.performWritesConcurrently = performWritesConcurrently;
this.mirroringTracer = mirroringTracer;
this.timestamper = timestamper;
}

public void batchSingleWriteOperation(Row operation) throws IOException {
Expand Down Expand Up @@ -129,6 +133,7 @@ public <R> void batch(
final Object[] results,
@Nullable final Callback<R> callback)
throws IOException, InterruptedException {
timestamper.fillTimestamp(inputOperations);
final RewrittenIncrementAndAppendIndicesInfo<? extends Row> actions =
new RewrittenIncrementAndAppendIndicesInfo<>(inputOperations);
Log.trace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.Objects;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
Expand Down Expand Up @@ -280,6 +282,21 @@ public class MirroringConfigurationHelper {
public static final String MIRRORING_SCANNER_BUFFERED_MISMATCHED_READS =
"google.bigtable.mirroring.result-scanner.buffered-mismatched-reads";

/**
* If set to {@code true} the Mirroring Client will automatically add timestamps to {@link
* org.apache.hadoop.hbase.client.Put}s without timestamp set based on client's host local time.
* Client-side timestamps assigned by {@link Table}s and {@link BufferedMutator}`s created by one
* {@link Connection} are always increasing, even if system clock is moved backwards, for example
* by NTP or manually by the user. To enable client-side timestamping set this key to `true`.
*
* <p>Be aware that client-side timestamping modifies only `Put`s - `Delete`s, `Increment`s and
* `Append`s are not affected by this setting and will cause inconsistencies between databases.
*
* <p>Default value: false.
*/
public static final String MIRRORING_ENABLE_DEFAULT_CLIENT_SIDE_TIMESTAMPS =
"google.bigtable.mirroring.enable-default-client-side-timestamps";

public static void fillConnectionConfigWithClassImplementation(
Configuration connectionConfig,
Configuration config,
Expand Down
Loading

0 comments on commit daf5f22

Please sign in to comment.