Improve IncrementalIndex concurrency scalability #7838
Conversation
- Shared state is encapsulated in a new class - DimensionData. This includes dimensionDescs, dimensionDescsList and columnCapabilities
- Concurrent threads share an atomic reference to an instance of DimensionData
- CoW: only when a thread needs to update the shared state does it copy the instance, update the copy, and eventually swap the reference atomically
- Consistency is maintained when the reference is updated. This simplifies row processing, removes the need for keeping an “overflow” array, and allows fast failure when a row contains duplicate dimensions
- New multi-threaded ingestion benchmark: IndexIngestionMultithreadedBenchmark
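The copy-on-write scheme described in the summary can be sketched roughly as follows. This is a hypothetical minimal sketch, not the PR's actual code: the class and method names (CowSketch, SharedState, addDimension) are illustrative stand-ins for DimensionData and its surroundings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical minimal sketch of the copy-on-write scheme (illustrative names,
// not the PR's actual DimensionData API).
public class CowSketch
{
  // Snapshot of shared dimension metadata; never mutated after construction.
  static final class SharedState
  {
    final List<String> dimensions;

    SharedState(List<String> dimensions)
    {
      this.dimensions = Collections.unmodifiableList(dimensions);
    }

    SharedState withDimension(String dim)
    {
      List<String> copy = new ArrayList<>(dimensions); // copy the instance
      copy.add(dim);                                   // update the copy
      return new SharedState(copy);
    }
  }

  private final AtomicReference<SharedState> state =
      new AtomicReference<>(new SharedState(new ArrayList<>()));

  void addDimension(String dim)
  {
    while (true) {
      SharedState prev = state.get();
      if (prev.dimensions.contains(dim)) {
        return; // no update needed; readers keep using the current snapshot
      }
      if (state.compareAndSet(prev, prev.withDimension(dim))) {
        return; // swap the reference atomically
      }
      // Lost the race: another thread installed a newer snapshot; retry.
    }
  }

  List<String> snapshot()
  {
    return state.get().dimensions;
  }
}
```

Readers only ever call snapshot() and see an internally consistent state, while writers pay the copy cost only on the (infrequent) dimension updates.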
Added missing CAS operation after dimension updates
@leventov thanks for properly tagging this. Would greatly appreciate it if you could review the changes and/or tag relevant committers.
In the benchmark results charts that you included in the PDF, it seems to me that in some cases the new version is slower. Could you please post raw numbers?
@State(Scope.Thread)
public static class ThreadState
{
  int id = 0;
I think a name like rowOffset would be more descriptive.
Renamed
if (rowDimKeys.containsKey(dimension)) {
  // If the dims map already contains a mapping at this index, it means we have seen this dimension already on this input row.
  throw new ISE("Dimension[%s] occurred more than once in InputRow", dimension);
First, it would be better to print the full faulty row.
Second, I think the behavior should still be permissive. We can ignore repeated occurrences of a key and just log them at ERROR level. Or, IncrementalIndexRowResult should be amended with extra error data.
I invite @egor-ryashin, @jihoonson, and @jon-wei to chime in if they disagree.
For context, I think this is where that check originated: #63
For the duplicate dimension case, making it permissive seems fine. Adding an entry to parseExceptionMessages in toIncrementalIndexRow when this happens will ultimately result in the row being ingested, with the processedWithError ingestion counter being incremented, which sounds appropriate to me.
I think the check mentioned at #7838 (comment) can be handled similarly.
Made permissive. Instead of throwing an exception, an error message is appended to parseExceptionMessages.
DimensionData prevDimensionData = this.dimensions.get();
DimensionData dimensionData = null;
for (String dimension : rowDimensions) {
Can this loop with new dimensionData preparation be extracted as a method?
Fixed
if (desc != null) {
  capabilities = desc.getCapabilities();
} else {
  if (dimensionData == null) {
Can this block (new dimension creation) be extracted as a separate method?
Also, this block (or a method, if extracted) should have a comment indicating that it might be processed concurrently from multiple threads. Therefore making thread-unsafe calls from the block (method) would be a concurrency bug.
Fixed
newDims[dims.length + i] = overflow.get(i);

if (dimensionData != null) {
  while (!dimensions.compareAndSet(prevDimensionData, dimensionData)) {
I think it would be simpler to do double-checked locking here than a CAS loop.
From what I gather, there's not a lot of contention in updating dimension data. The motivation for using CAS rather than locking was to avoid thread state changes, which can result in more costly context switching.
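For comparison, the double-checked-locking alternative the reviewer suggests might look like the sketch below. This is a hypothetical illustration, not Druid code; DclSketch and its names are invented. Readers stay lock-free on the fast path, and only writers serialize on a lock, re-checking under it before copying.

```java
import java.util.HashMap;
import java.util.Map;

// Invented sketch of double-checked locking for "install a dimension if absent".
// Not the PR's code; names are illustrative.
public class DclSketch
{
  private final Object updateLock = new Object();
  private volatile Map<String, Integer> dimToIndex = new HashMap<>();

  int indexFor(String dim)
  {
    Integer idx = dimToIndex.get(dim);   // first check: lock-free fast path
    if (idx != null) {
      return idx;
    }
    synchronized (updateLock) {
      idx = dimToIndex.get(dim);         // second check: another thread may have won
      if (idx != null) {
        return idx;
      }
      Map<String, Integer> copy = new HashMap<>(dimToIndex);
      int next = copy.size();
      copy.put(dim, next);
      dimToIndex = copy;                 // volatile write publishes the new map
      return next;
    }
  }
}
```

The trade-off the thread is debating: the lock serializes writers (a blocked writer may be descheduled), while the CAS loop never blocks but may redo the copy under contention.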
  dimensionData = prevDimensionData;
}
Object[] dims = new Object[dimensionData.size()];
for (String dimension : rowDimKeys.keySet()) {
A loop over entrySet() or map.forEach() would be more efficient.
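The reviewer's point is that iterating keySet() costs an extra hash lookup per key, while entrySet() yields the value in the same pass. A small illustrative comparison (not the PR's code):

```java
import java.util.Map;

// Illustrative comparison (not the PR's code): keySet() iteration pays an extra
// map lookup per key; entrySet() iteration reads key and value in one pass.
public class MapIteration
{
  static long sumViaKeySet(Map<String, Integer> m)
  {
    long sum = 0;
    for (String k : m.keySet()) {
      sum += m.get(k); // extra hash lookup for every key
    }
    return sum;
  }

  static long sumViaEntrySet(Map<String, Integer> m)
  {
    long sum = 0;
    for (Map.Entry<String, Integer> e : m.entrySet()) {
      sum += e.getValue(); // value available directly; no second lookup
    }
    return sum;
  }
}
```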
Fixed
DimensionData dimensionData = null;
for (String dimension : rowDimensions) {
  if (Strings.isNullOrEmpty(dimension)) {
    continue;
Though it's existing code, I think this condition should be logged at ERROR level. Or, IncrementalIndexRowResult should be amended with extra error data.
An error message is now appended to parseExceptionMessages
if (dimensionData != null) {
  while (!dimensions.compareAndSet(prevDimensionData, dimensionData)) {
    prevDimensionData = dimensions.get();
    dimensionData.rebase(prevDimensionData);
It seems like there is a race condition, because the result of processRowValsToUnsortedEncodedKeyComponent() for another dimensionData (hence another indexer) could be different. Please also reconcile with the "Thread Safety" section in DimensionIndexer's Javadoc.
I thought about how the problem of creeping bugs like this (if this turns out to be a bug) should be addressed more reliably (rather than just assuming that somebody will review at a sufficiently deep level when they sense the danger of concurrent non-blocking code) and came up with this checklist item: https://github.com/code-review-checklists/java-concurrency#non-blocking-warning
There was indeed a possible race condition in which one thread could use an indexer with inconsistent state. Fixed so that indexer state updates are also repeated if contention is detected.
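The retry shape described in this fix can be sketched generically: if the CAS fails, the per-row processing is redone against the fresh snapshot, so a row is never encoded against state it did not help install. This is a hedged, illustrative sketch; RetryOnContention and its signatures are invented, not the PR's API.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Invented sketch of the retry-on-contention shape (not the PR's API):
// `process` derives a possibly-updated snapshot from the current one, and
// `encode` must only use the snapshot that was actually installed.
public class RetryOnContention
{
  static <S, R> R ingest(AtomicReference<S> shared, Function<S, S> process, Function<S, R> encode)
  {
    while (true) {
      S prev = shared.get();
      S next = process.apply(prev);
      if (next == prev || shared.compareAndSet(prev, next)) {
        // Either nothing changed, or we won the race: encode against the
        // snapshot we installed, never against a stale one.
        return encode.apply(next);
      }
      // Contention: another thread swapped first. Loop and re-run processing
      // against the fresh snapshot instead of encoding inconsistent state.
    }
  }
}
```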
@@ -1154,22 +1220,23 @@ protected ColumnSelectorFactory makeColumnSelectorFactory(

protected final Comparator<IncrementalIndexRow> dimsComparator()
{
  return new IncrementalIndexRowComparator(dimensionDescsList);
It would be nice to mention in the PR description that you fixed a concurrency bug: there was unsynchronized access to a concurrently updated ArrayList.
Updated :)
For context, here is an explanation of this kind of bug, including specifically in application to ArrayList: https://github.com/code-review-checklists/java-concurrency#unsafe-concurrent-point-read
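To illustrate the bug class linked above: a plain ArrayList that one thread grows while another reads by index is a data race, even for a single get(i). The sketch below shows two common remedies; the PR itself uses a third (an immutable copy swapped through an AtomicReference). Names are illustrative, not Druid's actual fields.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustrative remedies (not Druid's actual fields) for the unsafe concurrent
// point read: reading list.get(i) while another thread appends to a plain
// ArrayList is a data race.
public class PointReadFix
{
  // Remedy 1: CopyOnWriteArrayList - each write copies the backing array, so
  // readers always observe an internally consistent snapshot.
  static final List<String> cowList = new CopyOnWriteArrayList<>();

  // Remedy 2: hand readers an explicit immutable snapshot taken under the
  // same lock that writers use.
  static List<String> snapshot(List<String> source)
  {
    synchronized (source) {
      return List.copyOf(source);
    }
  }
}
```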
Hi @eranmeir, thanks for your contribution! It looks like the PDF you attached shows we can gain some performance benefit from this PR when multiple threads write into the same incremental index. But I'm wondering whether we need that kind of performance benefit. Currently, the most common use case is serving multiple queries from different threads while building an incremental index with a single thread - so it's the single-write, multiple-read case. Also, in indexing, the most expensive part is, I think, segment merge rather than building incremental indexes. Would you elaborate more on your plan regarding this point of view? More specifically, do you see something we can do better when multiple threads write into the same incremental index?
@leventov Thanks for the review. I will soon add a commit that addresses the issues you raise. I have updated the attached PDF with raw numbers from pre- and post-fix benchmarks. @jihoonson Thanks for the feedback. We used Oak for indexing large amounts of data and saw significant performance gains from off-heap storage and multi-threaded indexing. This separate PR may be thought of as groundwork for Oak, but the multi-threaded performance gains are independent of it. As I understand your point, queries that happen before segment merge may benefit from an increase in indexing throughput. We haven't tried comparing the costs of index building and segment merging. Perhaps we can try to leverage multi-threading in the merge process. I'll be happy to discuss this.
@jihoonson, indeed thank you for raising this important question! I would like to add a bit to @eranmeir's answer. Independently of Oak, it might be interesting that other databases have found it beneficial to make inserts to the in-memory component concurrent. Here is a link to an academic paper about cLSM (concurrent LSM) showing the benefits of making the in-memory component concurrent. And here are RocksDB release notes from 2017 presenting concurrent writes to the memtable enabled as a default:
And with relevance to Oak, OakMap brings two benefits: (1) taking the key-value mapping off-heap, and (2) high multi-threading scalability. Because it stores data off-heap, an Oak-based IncrementalIndex might manage more data in memory and thus flush to disk less often. Such an IncrementalIndex might be responsible for a bigger key partition and thereby cause fewer merges. Clearly, managing a bigger map is more efficient when multi-threaded writes are allowed and efficient.
- Permissive duplicate dimension handling (tests updated)
- Additional error logging
@eranmeir and @sanastas, thank you for the explanation. For faster data ingestion, our previous work has focused on distributed data ingestion. This is already done for stream ingestion in the Kafka/Kinesis indexing service. For batch ingestion, we already support ingestion with Hadoop, and native distributed batch indexing is also in development (#5543). For stream ingestion, I think it's more important to serve as many queries as possible at the same time than to index faster. That means most threads should be assigned to query processing rather than indexing. However, in batch ingestion, there's no need to serve queries while indexing, so it makes more sense to apply multi-threaded indexing. Fewer segment merges sounds great, but it's still unclear to me exactly how this PR and Oak could improve Druid's data ingestion performance. Actually, I thought Oak was for improving the query performance of the incremental index. My feeling is that my confusion comes from the lack of a proposal. I know you opened #5698 and #7676, but many parts still seem unclear. For example, what exactly is the motivation for using Oak? Is it better memory management and better concurrent writes? I think it would be nicer if things were explained and described in one proposal. I would happily help you with the proposal if you need.
@jihoonson, thanks for all the insightful questions and comments. Let me chime in :) All in all, the motivation for our work is getting data into Druid faster and making the system's performance more predictable over time. How? By using Oak as the primary index for data ingestion, and exploiting more of its capabilities over time. Oak is an off-heap data structure, largely immune to GC, which behaves much better than the JDK concurrent skip list in microbenchmarks (and we keep improving it). The master plan is:
Technically, the Incremental Index is a complex beast that does many things around the basic fact table management. Therefore, it's nontrivial to show the gains from using Oak immediately. This patch is a step in the direction of goal #2 above - making the whole solution more concurrency-friendly in a way that is independent of the fact table implementation. Doing this will make it easier to merge the non-blocking Oak code down the road. The benchmark's goal was to show that we don't make things worse, and that we improve as the number of threads scales. Does this make more sense?
@jihoonson, your input is very valuable! Let me prepare a draft of the Oak proposal for your review (according to your template).
- Refactoring toIncrementalIndexRow
- Additional comments indicating code that should be thread-safe
@jihoonson I pasted a proposal draft on issue #5698, please take a look there. Your comments are very welcome!
private final Map<String, Object> rowDimKeys;
Please put fields above methods.
private final @Nullable DimensionData updatedDimensionData;
private final List<String> parseExceptionMessages;

private RowDimsKeyComponents(Map<String, Object> rowDimKeys, @Nullable DimensionData updatedDimensionData, List<String> parseExceptionMessages)
Line longer than 120 columns.
}
dimensionData = prevDimensionData.clone();
if (dimensionData.size() != 0) {
  throw new ISE("Cannot load dimension order when existing order[%s] is not empty.", dimensionData.getDimensionDescs().keySet());
Line longer than 120 columns. If you use IDE for developing Druid, please set up the ruler at 120 columns to avoid adding such code.
if (rowDimKeys.containsKey(dimension)) {
  // If the dims map already contains a mapping at this index, it means we have seen this dimension already on this input row.
  throw new ISE("Dimension[%s] occurred more than once in InputRow", dimension);
  parseExceptionMessages.add(StringUtils.nonStrictFormat("Dimension[%s] occurred more than once in InputRow", dimension));
Line longer than 120 columns
return new IncrementalIndexRowResult(incrementalIndexRow, dimsKeyComponents.getParseExceptionMessages());
}

// Note: This method might be called from multiple parallel threads without synchronization. Making thread-unsafe
Please turn this into a Javadoc comment.
DimensionData dimensionData = dimsKeyComponents.getUpdatedDimensionData();
while (dimensionData != null && !dimensions.compareAndSet(prevDimensionData, dimensionData)) {
  prevDimensionData = dimensions.get();
  dimsKeyComponents = getRowDimsKeyComponents(row, prevDimensionData);
As far as I understand, this line is the key to preventing a race condition. Please add a comment emphasizing this fact and explaining what race condition would otherwise be possible.
DimensionData prevDimensionData = this.dimensions.get();
RowDimsKeyComponents dimsKeyComponents = getRowDimsKeyComponents(row, prevDimensionData);
DimensionData dimensionData = dimsKeyComponents.getUpdatedDimensionData();
while (dimensionData != null && !dimensions.compareAndSet(prevDimensionData, dimensionData)) {
It's not documented what it means for dimensionData to be null here. Please add a Javadoc comment to RowDimsKeyComponents's field.
@@ -235,6 +235,8 @@ public ColumnCapabilities getColumnCapabilities(String columnName)

private final Map<String, MetricDesc> metricDescs;

// Dimension data may be updated by concurrent non-blocking threads.
Please make this a Javadoc comment.
@@ -235,6 +235,8 @@ public ColumnCapabilities getColumnCapabilities(String columnName)

private final Map<String, MetricDesc> metricDescs;

// Dimension data may be updated by concurrent non-blocking threads.
// Mutating the reference directly wouble be a concurrency bug.
Typo: "wouble"
@ebortnik thanks! Your comment makes things clearer. I have a follow-up question on the above comment though. As I mentioned earlier, our previous work has focused on distributed data ingestion, in which multiple tasks can run at the same time to ingest data into the same dataSource. Each task uses a few threads for indexing (basically a read thread, a persist thread, and a merge thread), but tasks can run in parallel on multiple middleManagers (or even the same middleManager) so that faster ingestion can be achieved. It seems that your comment assumes multi-threaded indexing will bring us benefits that are missing in distributed data ingestion. What kind of benefits can we expect from multi-threaded indexing? And how is it different from what we can get from distributed indexing?
@jihoonson Thanks for sharpening the focus. With a faster concurrent incremental index, we could introduce more reader threads (I guess those are the threads that actually build the index). In other words, this will improve vertical scalability, which is complementary to horizontal scalability (distributed indexing). All in all, we expect to run fewer tasks, merge less, and complete the overall indexing work faster. Does this make sense?
@jihoonson, in addition to multi-threaded indexing (writes), we can benefit from a faster read-write mixture: index building from streaming alongside concurrent queries. Can you recall any Druid benchmark that measures such behavior of IncrementalIndex?
@ebortnik thanks! It does make sense, especially when it comes to ingestion with perfect rollup. Distributed indexing would involve a shuffle for perfect rollup, while multi-threaded indexing can be done on a single machine or a few machines. I would expect we can benefit from using both of them, like distributed multi-threaded indexing. @sanastas that sounds good. Looks like
@jihoonson, thank you very much! We will run this benchmark; it would be interesting to see the comparison with Oak :)
@sanastas after looking further into this class, it looks more like a unit test than a benchmark. Also, it was written a long time ago and doesn't work now. I would recommend writing a similar but new benchmark.
Thank you for taking the time to discuss and review the code. It seems that this kind of performance improvement may not be of immediate use in Druid, so I'm closing this PR. It is probably better to unify discussions of multi-threaded ingestion in the Oak issue (#5698). I will open a different PR fixing just the concurrency bug and relaxing the duplicate dimension requirement, since most of the changes in this branch are unnecessary.
Background
Our work on Oak (see PR #7676) shows that there are significant performance gains from multi-threaded indexing (even when not using Oak). In our benchmarks we noticed that ingestion was not scaling as expected with multiple threads.
We traced the threads’ blocking states to two causes:
- Code in IncrementalIndex that synchronized access to dimensionDescs
- Synchronization in StringDimensionIndexer
This PR proposes a solution to the first issue. The proposed solution is based on the observation that dimension data is updated infrequently and so ongoing exclusive locking is wasteful.
Summary of changes
- Shared state is encapsulated in a new class - DimensionData. This includes dimensionDescs, dimensionDescsList and columnCapabilities
- Concurrent threads share an atomic reference to an instance of DimensionData
- New multi-threaded ingestion benchmark: IndexIngestionMultithreadedBenchmark
- Fixed a concurrency bug: unsynchronized access to the concurrently updated dimensionDescsList in IncrementalIndexRowComparator
For benchmark results see attached document:
Incremental Index Scaling.pdf