SnapshotTransaction stream refactor proof-of-concept #7003
base: develop
Conversation
.thenApply(remainingRawResults -> getWithPostFilteringInternal(
        tableRef,
        remainingRawResults,
        asyncKeyValueService,
        asyncTransactionService,
        iterations + 1))
recursion
There's a max iterations precondition already, so no danger of stack overflow. The values on the stack are just references to the result maps, which need to be kept alive while collecting postfiltered results in any case.
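For reference, a minimal sketch of the bounded-recursion pattern being discussed, assuming a simple iteration guard; the constant, the helper method, and the generic types are illustrative stand-ins rather than the actual SnapshotTransaction internals:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class BoundedPostFilterSketch<K, V> {
    // Illustrative bound; the real precondition and its limit live in SnapshotTransaction.
    private static final int MAX_ITERATIONS = 200;

    CompletableFuture<Map<K, V>> getWithPostFilteringInternal(Map<K, V> rawResults, int iterations) {
        if (iterations >= MAX_ITERATIONS) {
            throw new IllegalStateException("Exceeded maximum post-filtering iterations");
        }
        // One async round of filtering; returns only the cells that still need another pass.
        // (The real code also accumulates successfully filtered results, omitted here.)
        return filterOneRound(rawResults).thenCompose(remaining -> remaining.isEmpty()
                ? CompletableFuture.completedFuture(remaining)
                : getWithPostFilteringInternal(remaining, iterations + 1));
    }

    private CompletableFuture<Map<K, V>> filterOneRound(Map<K, V> rawResults) {
        // Placeholder for the real reload and commit-timestamp checks.
        Map<K, V> empty = Map.of();
        return CompletableFuture.completedFuture(empty);
    }
}
```

Because the recursive call sits inside a future callback with a hard iteration bound, the stack depth is capped at that bound even when the callbacks complete synchronously.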
Map<Cell, Value> filteredResults = EntryStream.of(rawResults)
        .removeKeys(keysToDelete::containsKey)
        .removeKeys(keysToReload::containsKey)
        .removeValues(value -> value.getContents().length == 0)
        // The value has a commit timestamp less than our start timestamp, and is visible and valid.
        .collect(MoreCollectors.entriesToCustomMap(LinkedHashMap::new));
Extra allocation should be avoided
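One option along these lines (the "map view" alternative mentioned further down the thread) would be a lazy Guava view over the existing map instead of collecting into a new LinkedHashMap. This is a sketch reusing the identifiers from the diff above; the predicates are simplified stand-ins for the real conditions:

```java
import com.google.common.collect.Maps;
import java.util.Map;

// Lazy view over rawResults: no extra map allocation, at the cost of re-evaluating
// the predicates on each access.
Map<Cell, Value> filteredResults = Maps.filterValues(
        Maps.filterKeys(
                rawResults,
                cell -> !keysToDelete.containsKey(cell) && !keysToReload.containsKey(cell)),
        value -> value.getContents().length > 0);
```

The trade-off is that a view re-runs the predicates on every access and keeps the backing map alive, so whether it is cheaper than a one-off copy depends on how the result is consumed.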
Map<Cell, Long> keysToDelete = EntryStream.of(rawResults)
        .mapValues(Value::getTimestamp)
        .filterKeyValue((key, ts) -> commitTimestamps.getIfAbsent(ts, TransactionConstants.FAILED_COMMIT_TS)
                == TransactionConstants.FAILED_COMMIT_TS)
        .toImmutableMap();
Map<Cell, Long> keysToReload = EntryStream.of(rawResults)
        .removeKeys(keysToDelete::containsKey)
        .mapValues(Value::getTimestamp)
Double iteration should be avoided
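For instance, a single pass over rawResults could classify each entry once instead of streaming the map twice. A sketch under the same names as the diff; needsReload is a hypothetical placeholder, since the actual reload condition is cut off in the hunk above:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Single pass over rawResults instead of two EntryStream traversals.
Map<Cell, Long> keysToDelete = new LinkedHashMap<>();
Map<Cell, Long> keysToReload = new LinkedHashMap<>();
rawResults.forEach((cell, value) -> {
    long startTs = value.getTimestamp();
    long commitTs = commitTimestamps.getIfAbsent(startTs, TransactionConstants.FAILED_COMMIT_TS);
    if (commitTs == TransactionConstants.FAILED_COMMIT_TS) {
        keysToDelete.put(cell, startTs);
    } else if (needsReload(cell, value, commitTs)) { // hypothetical predicate
        keysToReload.put(cell, startTs);
    }
});
```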
As an exercise I think this is fine, but to actually merge this, we'd probably need more than just "streams are easier to read" as a justification, especially if there's additional allocation cost in the hot read path.
I was talking to Will about this refactor in general, and how some of the control flow (specifically the use of a
Yup, the motivation was just a proof-of-concept: Raymond and I were discussing this code in the context of db upgrading, how difficult it was to plug in the kvs migrator, and how the code was a bit hard to follow for the uninitiated because of the various future chaining. I suggested streams, and either Raymond or Jakub said it wouldn't be possible to use them async, so this is a proof of concept using streams while keeping the async behavior.

The extra allocation of keysToDeleteOrReload could be removed with a map view -- we were already allocating for the result map, just renamed it to

I think this version is easier to reason about since there's no bouncing back and forth between getWithPostFilteringIterate/getWithPostFilteringInternal, I believe anyway.

Not suggesting we merge this right now, but the Future Spliterator approach could be something to keep in mind/reference if this code is going to be refactored anyway.
Could probably also separate out all the postfiltering logic into its own PostFilter class.

Birthday present for @rhuffy
General
Before this PR:
After this PR:
==COMMIT_MSG==
==COMMIT_MSG==
Priority:
Concerns / possible downsides (what feedback would you like?):
Is documentation needed?:
Compatibility
Does this PR create any API breaks (e.g. at the Java or HTTP layers) - if so, do we have compatibility?:
Does this PR change the persisted format of any data - if so, do we have forward and backward compatibility?:
The code in this PR may be part of a blue-green deploy. Can upgrades from previous versions safely coexist? (Consider restarts of blue or green nodes.):
Does this PR rely on statements being true about other products at a deployment - if so, do we have correct product dependencies on these products (or other ways of verifying that these statements are true)?:
Does this PR need a schema migration?
Testing and Correctness
What, if any, assumptions are made about the current state of the world? If they change over time, how will we find out?:
What was existing testing like? What have you done to improve it?:
If this PR contains complex concurrent or asynchronous code, is it correct? The onus is on the PR writer to demonstrate this.:
If this PR involves acquiring locks or other shared resources, how do we ensure that these are always released?:
Execution
How would I tell this PR works in production? (Metrics, logs, etc.):
Has the safety of all log arguments been decided correctly?:
Will this change significantly affect our spending on metrics or logs?:
How would I tell that this PR does not work in production? (monitors, etc.):
If this PR does not work as expected, how do I fix that state? Would rollback be straightforward?:
If the above plan is more complex than “recall and rollback”, please tag the support PoC here (if it is the end of the week, tag both the current and next PoC):
Scale
Would this PR be expected to pose a risk at scale? Think of the shopping product at our largest stack.:
Would this PR be expected to perform a large number of database calls, and/or expensive database calls (e.g., row range scans, concurrent CAS)?:
Would this PR ever, with time and scale, become the wrong thing to do - and if so, how would we know that we need to do something differently?:
Development Process
Where should we start reviewing?:
If this PR is in excess of 500 lines excluding versions lock-files, why does it not make sense to split it?:
Please tag any other people who should be aware of this PR: