Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Client-side timestamping #165

Merged
merged 1 commit into from
Jan 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static Put createPut(

public static Put createPut(int id, byte[] family, byte[] qualifier) {
byte[] rowAndValue = Longs.toByteArray(id);
return createPut(rowAndValue, family, qualifier, id, rowAndValue);
return createPut(rowAndValue, family, qualifier, rowAndValue);
}

public static Get createGet(byte[] row, byte[] family, byte[] qualifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ListenableReferenceCounter;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.TimestamperFactory;
import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -90,6 +92,8 @@ public class MirroringConnection implements Connection {
protected final boolean performWritesConcurrently;

protected final boolean waitForSecondaryWrites;
// Depending on configuration, ensures that mutations have an explicitly set timestamp.
protected final Timestamper timestamper;

/**
* The constructor called from {@link
Expand Down Expand Up @@ -162,6 +166,9 @@ public MirroringConnection(Configuration conf, boolean managed, ExecutorService
this.readSampler = new ReadSampler(this.configuration.mirroringOptions.readSamplingRate);
this.performWritesConcurrently = this.configuration.mirroringOptions.performWritesConcurrently;
this.waitForSecondaryWrites = this.configuration.mirroringOptions.waitForSecondaryWrites;
this.timestamper =
TimestamperFactory.create(
this.configuration.mirroringOptions.enableDefaultClientSideTimestamps);
}

@Override
Expand Down Expand Up @@ -198,6 +205,7 @@ public Table call() throws IOException {
this.flowController,
this.secondaryWriteErrorConsumer,
this.readSampler,
this.timestamper,
this.performWritesConcurrently,
this.waitForSecondaryWrites,
this.mirroringTracer,
Expand All @@ -223,6 +231,7 @@ public BufferedMutator getBufferedMutator(BufferedMutatorParams bufferedMutatorP
flowController,
executorService,
secondaryWriteErrorConsumer,
timestamper,
mirroringTracer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_BUFFERED_MUTATOR_BYTES_TO_FLUSH;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_CONCURRENT_WRITES;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_CONNECTION_CONNECTION_TERMINATION_TIMEOUT;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_ENABLE_DEFAULT_CLIENT_SIDE_TIMESTAMPS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_FAILLOG_DROP_ON_OVERFLOW_KEY;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_FAILLOG_MAX_BUFFER_SIZE_KEY;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_FAILLOG_PREFIX_PATH_KEY;
Expand All @@ -42,6 +43,7 @@
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.faillog.Serializer;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowControlStrategy;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestCountingFlowControlStrategy;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.TimestampingMode;
import com.google.cloud.bigtable.mirroring.hbase1_x.verification.DefaultMismatchDetector;
import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -106,6 +108,7 @@ public static class Faillog {
public final Faillog faillog;

public final int resultScannerBufferedMismatchedResults;
public final TimestampingMode enableDefaultClientSideTimestamps;

public MirroringOptions(Configuration configuration) {
this.mismatchDetectorFactoryClass =
Expand Down Expand Up @@ -153,5 +156,9 @@ public MirroringOptions(Configuration configuration) {
"Performing writes concurrently and not waiting for writes is forbidden. "
+ "It has no advantage over performing writes asynchronously and not waiting for them.");
this.faillog = new Faillog(configuration);

this.enableDefaultClientSideTimestamps =
configuration.getEnum(
MIRRORING_ENABLE_DEFAULT_CLIENT_SIDE_TIMESTAMPS, TimestampingMode.inplace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.HierarchicalReferenceCounter;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ReferenceCounter;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper;
import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector;
import com.google.cloud.bigtable.mirroring.hbase1_x.verification.VerificationContinuationFactory;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -114,6 +115,7 @@ public boolean apply(@NullableDecl Object o) {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final SettableFuture<Void> closedFuture = SettableFuture.create();
private final int resultScannerBufferedMismatchedResults;
private final Timestamper timestamper;
/**
* @param executorService ExecutorService is used to perform operations on secondaryTable and
* verification tasks.
Expand All @@ -129,6 +131,7 @@ public MirroringTable(
FlowController flowController,
SecondaryWriteErrorConsumer secondaryWriteErrorConsumer,
ReadSampler readSampler,
Timestamper timestamper,
boolean performWritesConcurrently,
boolean waitForSecondaryWrites,
MirroringTracer mirroringTracer,
Expand All @@ -148,6 +151,7 @@ public MirroringTable(
this.mirroringTracer = mirroringTracer;
this.requestScheduler =
new RequestScheduler(flowController, this.mirroringTracer, this.referenceCounter);
this.timestamper = timestamper;
this.batcher =
new Batcher(
this.primaryTable,
Expand All @@ -156,6 +160,7 @@ public MirroringTable(
this.secondaryWriteErrorConsumer,
this.verificationContinuationFactory,
this.readSampler,
this.timestamper,
resultIsFaultyPredicate,
waitForSecondaryWrites,
performWritesConcurrently,
Expand Down Expand Up @@ -545,6 +550,7 @@ private boolean checkAndMutateWithSpan(
final byte[] value,
final RowMutations rowMutations)
throws IOException {
this.timestamper.fillTimestamp(rowMutations);
boolean wereMutationsApplied =
this.mirroringTracer.spanFactory.wrapPrimaryOperation(
new CallableThrowingIOException<Boolean>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestResourcesDescription;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapMaker;
import com.google.common.util.concurrent.FutureCallback;
Expand Down Expand Up @@ -78,6 +79,7 @@ public ConcurrentMirroringBufferedMutator(
BufferedMutatorParams bufferedMutatorParams,
MirroringConfiguration configuration,
ExecutorService executorService,
Timestamper timestamper,
MirroringTracer mirroringTracer)
throws IOException {
super(
Expand All @@ -86,6 +88,7 @@ public ConcurrentMirroringBufferedMutator(
bufferedMutatorParams,
configuration,
executorService,
timestamper,
mirroringTracer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ListenableReferenceCounter;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -60,6 +61,9 @@
*/
@InternalApi("For internal usage only")
public abstract class MirroringBufferedMutator<BufferEntryType> implements BufferedMutator {

private final Timestamper timestamper;

public static BufferedMutator create(
boolean concurrent,
Connection primaryConnection,
Expand All @@ -69,6 +73,7 @@ public static BufferedMutator create(
FlowController flowController,
ExecutorService executorService,
SecondaryWriteErrorConsumer secondaryWriteErrorConsumer,
Timestamper timestamper,
MirroringTracer mirroringTracer)
throws IOException {
if (concurrent) {
Expand All @@ -78,6 +83,7 @@ public static BufferedMutator create(
bufferedMutatorParams,
configuration,
executorService,
timestamper,
mirroringTracer);
} else {
return new SequentialMirroringBufferedMutator(
Expand All @@ -88,6 +94,7 @@ public static BufferedMutator create(
flowController,
executorService,
secondaryWriteErrorConsumer,
timestamper,
mirroringTracer);
}
}
Expand Down Expand Up @@ -121,6 +128,7 @@ public MirroringBufferedMutator(
BufferedMutatorParams bufferedMutatorParams,
MirroringConfiguration configuration,
ExecutorService executorService,
Timestamper timestamper,
MirroringTracer mirroringTracer)
throws IOException {
this.userListener = bufferedMutatorParams.getListener();
Expand Down Expand Up @@ -167,12 +175,14 @@ public void onException(
this.mutationsBufferFlushThresholdBytes,
this.ongoingFlushesCounter,
this.mirroringTracer);
this.timestamper = timestamper;
}

/**
 * Submits a single mutation to this buffered mutator.
 *
 * <p>The mutation is first passed through the {@code timestamper}; whatever object the
 * timestamper returns is the one forwarded, wrapped in a singleton list, to {@code mutateScoped}.
 * The whole call runs inside a {@code BUFFERED_MUTATOR_MUTATE} operation scope, which is closed
 * when the call finishes.
 *
 * @param mutation the mutation to submit
 * @throws IOException if thrown while timestamping or forwarding the mutation
 */
@Override
public void mutate(Mutation mutation) throws IOException {
  try (Scope scope =
      this.mirroringTracer.spanFactory.operationScope(HBaseOperation.BUFFERED_MUTATOR_MUTATE)) {
    // Use the timestamper's return value rather than the argument itself.
    final Mutation timestampedMutation = this.timestamper.fillTimestamp(mutation);
    mutateScoped(Collections.singletonList(timestampedMutation));
  }
}
Expand All @@ -182,7 +192,8 @@ public void mutate(final List<? extends Mutation> list) throws IOException {
try (Scope scope =
this.mirroringTracer.spanFactory.operationScope(
HBaseOperation.BUFFERED_MUTATOR_MUTATE_LIST)) {
mutateScoped(list);
List<? extends Mutation> timestampedList = timestamper.fillTimestamp(list);
mutateScoped(timestampedList);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestResourcesDescription;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper;
import com.google.common.collect.MapMaker;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -137,6 +138,7 @@ public SequentialMirroringBufferedMutator(
FlowController flowController,
ExecutorService executorService,
SecondaryWriteErrorConsumer secondaryWriteErrorConsumer,
Timestamper timestamper,
MirroringTracer mirroringTracer)
throws IOException {
super(
Expand All @@ -145,6 +147,7 @@ public SequentialMirroringBufferedMutator(
bufferedMutatorParams,
configuration,
executorService,
timestamper,
mirroringTracer);
this.secondaryWriteErrorConsumer = secondaryWriteErrorConsumer;
this.flowController = flowController;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestResourcesDescription;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper;
import com.google.cloud.bigtable.mirroring.hbase1_x.verification.VerificationContinuationFactory;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class Batcher {
private final boolean waitForSecondaryWrites;
private final boolean performWritesConcurrently;
private final MirroringTracer mirroringTracer;
private final Timestamper timestamper;

public Batcher(
Table primaryTable,
Expand All @@ -78,6 +80,7 @@ public Batcher(
SecondaryWriteErrorConsumer secondaryWriteErrorConsumer,
VerificationContinuationFactory verificationContinuationFactory,
ReadSampler readSampler,
Timestamper timestamper,
Predicate<Object> resultIsFaultyPredicate,
boolean waitForSecondaryWrites,
boolean performWritesConcurrently,
Expand All @@ -92,6 +95,7 @@ public Batcher(
this.waitForSecondaryWrites = waitForSecondaryWrites;
this.performWritesConcurrently = performWritesConcurrently;
this.mirroringTracer = mirroringTracer;
this.timestamper = timestamper;
}

public void batchSingleWriteOperation(Row operation) throws IOException {
Expand Down Expand Up @@ -129,8 +133,9 @@ public <R> void batch(
final Object[] results,
@Nullable final Callback<R> callback)
throws IOException, InterruptedException {
List<? extends Row> timestampedInputOperations = timestamper.fillTimestamp(inputOperations);
final RewrittenIncrementAndAppendIndicesInfo<? extends Row> actions =
new RewrittenIncrementAndAppendIndicesInfo<>(inputOperations);
new RewrittenIncrementAndAppendIndicesInfo<>(timestampedInputOperations);
Log.trace(
"[%s] batch(operations=%s, results)", this.primaryTable.getName(), actions.operations);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.Objects;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
Expand Down Expand Up @@ -280,6 +282,32 @@ public class MirroringConfigurationHelper {
public static final String MIRRORING_SCANNER_BUFFERED_MISMATCHED_READS =
"google.bigtable.mirroring.result-scanner.buffered-mismatched-reads";

/**
* Enables timestamping of {@link org.apache.hadoop.hbase.client.Put}s that have no timestamp set,
* based on the client host's local time. Client-side timestamps assigned by {@link Table}s and
* {@link BufferedMutator}s created by one {@link Connection} are always increasing, even if the
* system clock is moved backwards, for example by NTP or manually by the user.
*
* <p>There are three possible modes of client-side timestamping:
*
* <ul>
* <li>disabled - leads to inconsistencies between mirrored databases because timestamps are
* assigned separately on the databases' servers.
* <li>inplace - all mutations without a timestamp are modified in place and have timestamps
* assigned when submitted to a Table or BufferedMutator. If mutation objects are reused, then
* the copy mode should be used.
* <li>copy - timestamps are added to {@code Put}s after copying them. This increases CPU load,
* but mutations submitted to Tables and BufferedMutators can be reused. This mode, combined
* with synchronous writes, guarantees that after an HBase API call returns, the submitted
* mutation objects are no longer used and can be safely modified by the user and submitted
* again.
* </ul>
*
* <p>Default value: inplace.
*/
public static final String MIRRORING_ENABLE_DEFAULT_CLIENT_SIDE_TIMESTAMPS =
"google.bigtable.mirroring.enable-default-client-side-timestamps";

public static void fillConnectionConfigWithClassImplementation(
Configuration connectionConfig,
Configuration config,
Expand Down
Loading