Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: concurrent writes in MirroringBufferedMutator #80

Merged
merged 1 commit into from
Oct 29, 2021

Conversation

mwalkiewicz
Copy link
Collaborator

@mwalkiewicz mwalkiewicz commented Oct 14, 2021

This change is Reviewable

@mwalkiewicz mwalkiewicz force-pushed the mw/concurrent-writes-buffered-mutator/1 branch from a86c9a7 to 157b0c2 Compare October 15, 2021 10:02
@mwalkiewicz mwalkiewicz force-pushed the mw/concurrent-writes-buffered-mutator/1 branch from 157b0c2 to 45e74b5 Compare October 15, 2021 10:59
@mwalkiewicz mwalkiewicz force-pushed the mw/concurrent-writes-buffered-mutator/1 branch from 45e74b5 to c0740fd Compare October 15, 2021 11:46
@mwalkiewicz mwalkiewicz force-pushed the mw/concurrent-writes-table/1 branch 2 times, most recently from 5fcff49 to 50b7d22 Compare October 19, 2021 09:37
@mwalkiewicz mwalkiewicz force-pushed the mw/concurrent-writes-buffered-mutator/1 branch from c0740fd to 23c8174 Compare October 19, 2021 09:42
@mwalkiewicz mwalkiewicz changed the base branch from mw/concurrent-writes-table/1 to mw/request-scheduling-refactor/1 October 19, 2021 09:43
Copy link

@dopiera dopiera left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 3 of 14 files at r3, all commit messages.
Reviewable status: 3 of 14 files reviewed, 7 unresolved discussions (waiting on @dopiera and @mwalkiewicz)


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/ConcurrentMirroringBufferedMutator.java, line 189 at r3 (raw file):

    ListenableFuture<Void> secondaryFlushFinished = scheduleSecondaryFlush();

    final AtomicBoolean firstFinished = new AtomicBoolean(false);

This works, but why not use a standard solution, i.e. Futures.allAsList?


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java, line 53 at r3 (raw file):

 * <p>We want to perform a secondary write only if we are certain that it was successfully applied
 * on primary database.

That is no longer true, is it?


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java, line 107 at r3 (raw file):

  protected final BufferedMutator primaryBufferedMutator;
  protected final BufferedMutator secondaryBufferedMutator;
  protected final FlowController flowController;

As we discussed, flowControler is specific the sequential version.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java, line 109 at r3 (raw file):

  protected final FlowController flowController;
  protected final ListeningExecutorService executorService;
  protected final SecondaryWriteErrorConsumer secondaryWriteErrorConsumer;

The secondaryWriteErrorConsumer should likely be different between implementations, right? In in sequential we put the errors into the faillog, in concurrent we need to wait for them. I think you only want to provide this member in sequential (async) mode, right?


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java, line 148 at r3 (raw file):

SecondaryWriteErrorConsumer secondaryWriteErrorConsumer,

As mentioned before - this is specific to the sequential version, right?


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java, line 159 at r3 (raw file):

              throws RetriesExhaustedWithDetailsException {
            handlePrimaryException(e);
            userListener.onException(e, bufferedMutator);

In concurrent (sync), this is no longer the case, right?


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java, line 342 at r3 (raw file):

  }

  protected static class FlushFutures {

Depending on the context, you're using the secondaryFlushFinished future to indicate what its name suggests or to indicate that both flushes have completed. Why not Introduce a third future?

@mwalkiewicz mwalkiewicz force-pushed the mw/request-scheduling-refactor/1 branch 2 times, most recently from 954d3f3 to 586ba4a Compare October 26, 2021 10:39
@mwalkiewicz mwalkiewicz force-pushed the mw/concurrent-writes-buffered-mutator/1 branch 4 times, most recently from 8a3851b to 06eaac1 Compare October 28, 2021 12:39
@mwalkiewicz mwalkiewicz changed the base branch from mw/request-scheduling-refactor/1 to master October 28, 2021 12:40
Copy link
Collaborator Author

@mwalkiewicz mwalkiewicz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 3 of 15 files reviewed, 7 unresolved discussions (waiting on @dopiera)


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/ConcurrentMirroringBufferedMutator.java, line 189 at r3 (raw file):

Previously, dopiera (Marek Dopiera) wrote…

This works, but why not use a standard solution, i.e. Futures.allAsList?

Futures.allAsList is @Beta, I don't know any stable method that is able to do it.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java, line 53 at r3 (raw file):

Previously, dopiera (Marek Dopiera) wrote…
 * <p>We want to perform a secondary write only if we are certain that it was successfully applied
 * on primary database.

That is no longer true, is it?

Done.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java, line 107 at r3 (raw file):

Previously, dopiera (Marek Dopiera) wrote…

As we discussed, flowControler is specific the sequential version.

Done.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java, line 109 at r3 (raw file):

Previously, dopiera (Marek Dopiera) wrote…

The secondaryWriteErrorConsumer should likely be different between implementations, right? In in sequential we put the errors into the faillog, in concurrent we need to wait for them. I think you only want to provide this member in sequential (async) mode, right?

There shouldn't be any secondaryWriteErrorConsumer in concurrent version, you are right, TBD.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java, line 148 at r3 (raw file):

Previously, dopiera (Marek Dopiera) wrote…
SecondaryWriteErrorConsumer secondaryWriteErrorConsumer,

As mentioned before - this is specific to the sequential version, right?

Done.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java, line 159 at r3 (raw file):

Previously, dopiera (Marek Dopiera) wrote…

In concurrent (sync), this is no longer the case, right?

Done.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java, line 342 at r3 (raw file):

Previously, dopiera (Marek Dopiera) wrote…

Depending on the context, you're using the secondaryFlushFinished future to indicate what its name suggests or to indicate that both flushes have completed. Why not Introduce a third future?

Done.

@mwalkiewicz mwalkiewicz force-pushed the mw/concurrent-writes-buffered-mutator/1 branch from 06eaac1 to cb42f42 Compare October 28, 2021 12:58
Copy link
Collaborator Author

@mwalkiewicz mwalkiewicz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 3 of 15 files reviewed, 7 unresolved discussions (waiting on @dopiera)


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java, line 109 at r3 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

There shouldn't be any secondaryWriteErrorConsumer in concurrent version, you are right, TBD.

Done.

Copy link

@dopiera dopiera left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 3 of 15 files reviewed, all discussions resolved (waiting on @dopiera)


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java, line 107 at r3 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

Done.

But you left it here.

@mwalkiewicz mwalkiewicz force-pushed the mw/concurrent-writes-buffered-mutator/1 branch from cb42f42 to ab721ca Compare October 29, 2021 06:28
Copy link
Collaborator Author

@mwalkiewicz mwalkiewicz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 3 of 15 files reviewed, all discussions resolved (waiting on @dopiera)


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java, line 107 at r3 (raw file):

Previously, dopiera (Marek Dopiera) wrote…

But you left it here.

Done.

Copy link

@dopiera dopiera left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 3 of 15 files reviewed, all discussions resolved (waiting on @dopiera)

dopiera added a commit that referenced this pull request May 11, 2022
… review comments (googleapis#3347)

* chore: revert review comments

* feat: add MirroringOperationException exception markers (#125)

* feat: concurrent writes in MirroringBufferedMutator (#80)

* refactor: implement multiple argument operations on MirroringAsyncTable with specific operations rather than batch() (#75)

* feat: implement MirroringAsyncTable#getName() (#132)

* feat: use Logger rather than stdout in DefaultMismatchDetector (#128)

* feat: synchronous writes (#88)

* fix: implement heapSize method for RowCell (#111)

* feat: FlowController accounts for memory usage (#137)

* refactor: remove Configuration as a base of MirroringConfiguration (#127)

* feat: MirroringAsyncBufferedMutator (#81)

* refactor: rename WRITE_MISMATCH to SECONDARY_WRITE_ERROR (#138)

* fix: BufferedMutator close() waits for all secondary flushes to finish (#110)

* feat: 2.x reads sampling (#114)

* refactor: make MirroringResultScanner synchronize on itself rather than MirroringTable (#134)

* ConcurrentBufferedMutator integration tests (#135)

* feat: add synchronous MirroringConnection to 2.x (#109)

* fix: MirroringConnection in 2.x failed to compile (#139)

* fix: fix BufferedMutator ITs (#140)

* feat: run 1.x integration tests on MirroringConnection etc. from 2.x (#108)

* feat: 2.x - rewrite Increment and Append as Put in batch (#116)

* fix: fix build (#142)

* refactor: minor fixes after review (#117)

* feat: MirroringAsyncTable#getScanner() (#58)

* test: 2.x integration tests (#112)

* feat: implement MirroringAsyncBufferedMutatorBuilder (#144)

* feat: log rows and values in DefaultMismatchDetector (#129)

* fix: ITs - add expected parameter to MismatchDetectors (#153)

* fix: force Append and Increment to return results and discard that result before returning it to user (#136)

* fix: review fixes in utils

* fix: review fixes in BufferedMutator

* fix: review fixes in Faillog

* fix: fixed reference counting

* fix: review fixes in FlowController

* fix: review fixes in metrics

* fix: review fixes in verification

* fix: Review fixes in MirroringTable

* fix: review fixes in HBase 2.x client

* fix: fixes in ITs

* feat: MirrorinAsyncTable: scan(), scanAll() (#131)

* fix: review fixes in tests

* feat: MirroringConnection: timeout in close() and abort() (#133)

* feat: better mismatch detection of scan results (#130)

* feat: quickstart (#105)

* fix: 2.x scan ITs (#158)

* fix: DefaultMismatchDetector tests (#157)

* fix: ConcurrentBufferedMutator waits for both flushes to finish before closing (#161)

* fix: additional minor fixes after review (#163)

* fix: BufferedMutator review fixes (#164)

- Simplify #flush().
- Add javadocs.
- (sequential) Fix flush() exception handling.
- (sequential) Move error handling to a separate inner class.

* fix: PR fixes

* fix: report zeroed error metrics after successful operations

* fix: prepend MismatchDetectorCounter with Test to better reflect its purpose

* feat: Client-side timestamping (#165)

* fix: reduce timeout in TestBlocking to make the tests run faster

* fix: asyncClose -> closePrimaryAndScheduleSecondaryClose

* fix: remove unused Batcher#throwBatchDataExceptionIfPresent

* fix: remove unused Comparators#compareRows

* fix: extract failedReads from MatchingSuccessfulReadsResults to reduce confusion

* feat: remove unused MirroringTracer from FailedMutationLogger

* fix: MirroringAsyncBufferedMutator - test if failed mutation is passed to secondary write error consumer

* fix: TestMirroringAsyncTableInputModification typo fix

* fix: describe user flush() in Buffered Mutator in quickstart

* fix: MirroringBufferedMutator - move flush threshold from BufferedMutations to FlushSerializer

* refactor: MirroringBufferedMutator#close() - use AccumulatedExceptions insteand of List<Exception>

* BufferedMutator - add close timeout

* AsyncBufferedMutator - add close timeout

* fix: remove stale addSecondaryMutation comment

* fix: add a comment that addSecondaryMutation handles failed writes

* fix: unify implementations of flushBufferedMutatorBeforeClosing

* fix: BufferedMutator - throw exceptions on close

* fix: BufferedMutator - add comment explaining that chain of flush operations is created

* fix: BufferedMutator - clarify  comments

* fix: Concurrent BufferedMutator - fix throwFlushExceptionIfAvailable

* fix: explain why flush is called in Sequential BufferedMutator test

* fix: TestConcurrentMirroringBufferedMutator - make waiting for calls explicit

* refactor: BufferedMutator rename scheduleFlushAll() to scheduleFlush()

* refactor: make FlushSerializer non-static

* fix: BufferedMutator - use HierarchicalReferenceCounter

* feat: Add MirroringConnection constructor taking MirroringConfiguration

* refactor: move releaseReservations to finally

* fix: use less convoluted example in lastFlushFutures description

* fix: merge small Timeestamper files into a single file

* fix: add a comment explaining which exceptions are forwarded to the user and why in SequentialMirroringBufferedMutator

* fix: use UnsupportedOperationException instead of RuntimeException when forbidden mutation type is encountered

* fix: add comment explaining why batch is complicated

* fix: add a TODO to implement point writes without batch

Co-authored-by: Mateusz Walkiewicz <mwalkiewicz@unoperate.com>
Co-authored-by: Adam Czajkowski <prawilny@unoperate.com>
Co-authored-by: Kajetan Boroszko <kajetan@unoperate.com>
mwalkiewicz added a commit that referenced this pull request May 18, 2022
… review comments (googleapis#3347)

* chore: revert review comments

* feat: add MirroringOperationException exception markers (#125)

* feat: concurrent writes in MirroringBufferedMutator (#80)

* refactor: implement multiple argument operations on MirroringAsyncTable with specific operations rather than batch() (#75)

* feat: implement MirroringAsyncTable#getName() (#132)

* feat: use Logger rather than stdout in DefaultMismatchDetector (#128)

* feat: synchronous writes (#88)

* fix: implement heapSize method for RowCell (#111)

* feat: FlowController accounts for memory usage (#137)

* refactor: remove Configuration as a base of MirroringConfiguration (#127)

* feat: MirroringAsyncBufferedMutator (#81)

* refactor: rename WRITE_MISMATCH to SECONDARY_WRITE_ERROR (#138)

* fix: BufferedMutator close() waits for all secondary flushes to finish (#110)

* feat: 2.x reads sampling (#114)

* refactor: make MirroringResultScanner synchronize on itself rather than MirroringTable (#134)

* ConcurrentBufferedMutator integration tests (#135)

* feat: add synchronous MirroringConnection to 2.x (#109)

* fix: MirroringConnection in 2.x failed to compile (#139)

* fix: fix BufferedMutator ITs (#140)

* feat: run 1.x integration tests on MirroringConnection etc. from 2.x (#108)

* feat: 2.x - rewrite Increment and Append as Put in batch (#116)

* fix: fix build (#142)

* refactor: minor fixes after review (#117)

* feat: MirroringAsyncTable#getScanner() (#58)

* test: 2.x integration tests (#112)

* feat: implement MirroringAsyncBufferedMutatorBuilder (#144)

* feat: log rows and values in DefaultMismatchDetector (#129)

* fix: ITs - add expected parameter to MismatchDetectors (#153)

* fix: force Append and Increment to return results and discard that result before returning it to user (#136)

* fix: review fixes in utils

* fix: review fixes in BufferedMutator

* fix: review fixes in Faillog

* fix: fixed reference counting

* fix: review fixes in FlowController

* fix: review fixes in metrics

* fix: review fixes in verification

* fix: Review fixes in MirroringTable

* fix: review fixes in HBase 2.x client

* fix: fixes in ITs

* feat: MirrorinAsyncTable: scan(), scanAll() (#131)

* fix: review fixes in tests

* feat: MirroringConnection: timeout in close() and abort() (#133)

* feat: better mismatch detection of scan results (#130)

* feat: quickstart (#105)

* fix: 2.x scan ITs (#158)

* fix: DefaultMismatchDetector tests (#157)

* fix: ConcurrentBufferedMutator waits for both flushes to finish before closing (#161)

* fix: additional minor fixes after review (#163)

* fix: BufferedMutator review fixes (#164)

- Simplify #flush().
- Add javadocs.
- (sequential) Fix flush() exception handling.
- (sequential) Move error handling to a separate inner class.

* fix: PR fixes

* fix: report zeroed error metrics after successful operations

* fix: prepend MismatchDetectorCounter with Test to better reflect its purpose

* feat: Client-side timestamping (#165)

* fix: reduce timeout in TestBlocking to make the tests run faster

* fix: asyncClose -> closePrimaryAndScheduleSecondaryClose

* fix: remove unused Batcher#throwBatchDataExceptionIfPresent

* fix: remove unused Comparators#compareRows

* fix: extract failedReads from MatchingSuccessfulReadsResults to reduce confusion

* feat: remove unused MirroringTracer from FailedMutationLogger

* fix: MirroringAsyncBufferedMutator - test if failed mutation is passed to secondary write error consumer

* fix: TestMirroringAsyncTableInputModification typo fix

* fix: describe user flush() in Buffered Mutator in quickstart

* fix: MirroringBufferedMutator - move flush threshold from BufferedMutations to FlushSerializer

* refactor: MirroringBufferedMutator#close() - use AccumulatedExceptions insteand of List<Exception>

* BufferedMutator - add close timeout

* AsyncBufferedMutator - add close timeout

* fix: remove stale addSecondaryMutation comment

* fix: add a comment that addSecondaryMutation handles failed writes

* fix: unify implementations of flushBufferedMutatorBeforeClosing

* fix: BufferedMutator - throw exceptions on close

* fix: BufferedMutator - add comment explaining that chain of flush operations is created

* fix: BufferedMutator - clarify  comments

* fix: Concurrent BufferedMutator - fix throwFlushExceptionIfAvailable

* fix: explain why flush is called in Sequential BufferedMutator test

* fix: TestConcurrentMirroringBufferedMutator - make waiting for calls explicit

* refactor: BufferedMutator rename scheduleFlushAll() to scheduleFlush()

* refactor: make FlushSerializer non-static

* fix: BufferedMutator - use HierarchicalReferenceCounter

* feat: Add MirroringConnection constructor taking MirroringConfiguration

* refactor: move releaseReservations to finally

* fix: use less convoluted example in lastFlushFutures description

* fix: merge small Timeestamper files into a single file

* fix: add a comment explaining which exceptions are forwarded to the user and why in SequentialMirroringBufferedMutator

* fix: use UnsupportedOperationException instead of RuntimeException when forbidden mutation type is encountered

* fix: add comment explaining why batch is complicated

* fix: add a TODO to implement point writes without batch

Co-authored-by: Mateusz Walkiewicz <mwalkiewicz@unoperate.com>
Co-authored-by: Adam Czajkowski <prawilny@unoperate.com>
Co-authored-by: Kajetan Boroszko <kajetan@unoperate.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants