Skip to content

Commit

Permalink
feat: concurrent writes in MirroringTable
Browse files Browse the repository at this point in the history
  • Loading branch information
mwalkiewicz committed Oct 14, 2021
1 parent f115ceb commit 4dfb80a
Show file tree
Hide file tree
Showing 11 changed files with 647 additions and 249 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class MirroringConnection implements Connection {
private final Connection secondaryConnection;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean aborted = new AtomicBoolean(false);
private final boolean performWritesConcurrently;

/**
* The constructor called from {@link
Expand Down Expand Up @@ -117,6 +118,7 @@ public MirroringConnection(Configuration conf, boolean managed, ExecutorService
new SecondaryWriteErrorConsumerWithMetrics(
this.mirroringTracer, secondaryWriteErrorConsumer);
this.readSampler = new ReadSampler(this.configuration.mirroringOptions.readSamplingRate);
this.performWritesConcurrently = this.configuration.mirroringOptions.performWritesConcurrently;
}

@Override
Expand Down Expand Up @@ -154,6 +156,7 @@ public Table call() throws IOException {
this.flowController,
this.secondaryWriteErrorConsumer,
this.readSampler,
this.performWritesConcurrently,
this.mirroringTracer);
this.referenceCounter.holdReferenceUntilClosing(table);
return table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.mirroring.hbase1_x;

import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_BUFFERED_MUTATOR_BYTES_TO_FLUSH;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_CONCURRENT_WRITES;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_FLOW_CONTROLLER_MAX_OUTSTANDING_REQUESTS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_FLOW_CONTROLLER_STRATEGY_CLASS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_MISMATCH_DETECTOR_CLASS;
Expand Down Expand Up @@ -46,6 +47,8 @@ public class MirroringOptions {
public final String writeErrorLogAppenderClass;
public final String writeErrorLogSerializerClass;

public final boolean performWritesConcurrently;

public MirroringOptions(Configuration configuration) {
this.mismatchDetectorClass =
configuration.get(
Expand Down Expand Up @@ -76,5 +79,6 @@ public MirroringOptions(Configuration configuration) {
this.writeErrorLogSerializerClass =
configuration.get(
MIRRORING_WRITE_ERROR_LOG_SERIALIZER_CLASS, DefaultSerializer.class.getCanonicalName());
this.performWritesConcurrently = configuration.getBoolean(MIRRORING_CONCURRENT_WRITES, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private <T> void scheduleRequest(
return;
}
this.listenableReferenceCounter.holdReferenceUntilCompletion(
RequestScheduling.scheduleVerificationAndRequestWithFlowControl(
RequestScheduling.scheduleRequestAndVerificationWithFlowControl(
requestResourcesDescription,
nextSupplier,
this.mirroringTracer.spanFactory.wrapReadVerificationCallback(scannerNext),
Expand Down
Loading

0 comments on commit 4dfb80a

Please sign in to comment.