Druid with Oak supporting also plain mode v04 #6235
@Override
public Row deserialize(ByteBuffer serializedValue)
{
  throw new UnsupportedOperationException(); // cannot be deserialized without the IncrementalIndexRow
Shouldn't this be implemented? Iterators may use it.
public class OakValueSerializer implements Serializer<Row>
{

  private List<DimensionDesc> dimensionDescsList;
Never used here. Can be removed.
// This is modified on add() in a critical section.
private final ThreadLocal<InputRow> in = new ThreadLocal<>();
private final Supplier<InputRow> rowSupplier = in::get;
protected final ThreadLocal<InputRow> in = new ThreadLocal<>();
Better to have private data members and protected accessors
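A minimal sketch of what this suggestion could look like. The class and method names are hypothetical, and `String` stands in for Druid's `InputRow` so the example compiles on its own; it is not the PR's actual code.

```java
import java.util.function.Supplier;

// Hypothetical base class: the ThreadLocal stays private, subclasses get accessors only.
class BaseIndexSketch
{
  // String is a stand-in for InputRow (assumption made to keep the sketch self-contained).
  private final ThreadLocal<String> in = new ThreadLocal<>();
  private final Supplier<String> rowSupplier = in::get;

  protected final void setCurrentRow(String row)
  {
    in.set(row);
  }

  protected final void clearCurrentRow()
  {
    in.remove();
  }

  protected final Supplier<String> getRowSupplier()
  {
    return rowSupplier;
  }
}

// Hypothetical subclass that only touches the field through the accessors.
final class OakIndexSketch extends BaseIndexSketch
{
  String addAndRead(String row)
  {
    setCurrentRow(row);
    try {
      return getRowSupplier().get();
    }
    finally {
      clearCurrentRow();
    }
  }

  String currentRow()
  {
    return getRowSupplier().get();
  }
}
```

With this shape the subclass cannot replace or bypass the `ThreadLocal`, and the base class keeps control over how the current row is set and cleared.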
  memoryCapacity = (int) maxDirectMemory;
}

OakMapBuilder builder = new OakMapBuilder()
If there is a builder for IncrementalIndex, maybe it should have its own permanent OakMapBuilder. What is the rationale for creating the builder ad hoc?
  }
}

oak = builder.build();
Ideally, the builder should be a singleton, and this line should be the only one in the constructor, no?
  }
};

OakMap tmpOakMap = descending ? oak.descendingMap() : oak;
You could keep using "oak"
@Override
public Iterable<Row> iterableWithPostAggregations(List<PostAggregator> postAggs, boolean descending)
{
  Function<Map.Entry<ByteBuffer, ByteBuffer>, Row> transformer = new Function<Map.Entry<ByteBuffer, ByteBuffer>, Row>()
Maybe this transformer should be extracted into a separate function.
final Object lhsIdxs = lhs.getDims()[index];
final Object rhsIdxs = rhs.getDims()[index];

if (lhsIdxs == null) {
lhsIdxs == null && rhsIdxs == null
int index = 0;
while (retVal == 0 && index < numComparisons) {
  final Object lhsIdxs = lhs.getDims()[index];
Hoist lhs.getDims() and rhs.getDims() into local variables before the loop (not sure the compiler does that for us).
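The two comparator suggestions (handling the case where both sides are null, and reading the dims arrays once rather than on every iteration) could be combined roughly as follows. This is only a sketch: `DimsComparatorSketch` and `compareDims` are hypothetical names, the dims are assumed to be `Comparable`, and sorting null before non-null and breaking ties by length are assumptions rather than something the PR states.

```java
// Hypothetical helper, not part of the PR.
final class DimsComparatorSketch
{
  private DimsComparatorSketch()
  {
  }

  /**
   * Compares two dims arrays element by element.
   * The caller passes the result of getDims() once, so the loop never re-invokes the accessor.
   */
  @SuppressWarnings({"unchecked", "rawtypes"})
  static int compareDims(Object[] lhsDims, Object[] rhsDims)
  {
    final int numComparisons = Math.min(lhsDims.length, rhsDims.length);
    int retVal = 0;
    int index = 0;
    while (retVal == 0 && index < numComparisons) {
      final Object lhsIdxs = lhsDims[index];
      final Object rhsIdxs = rhsDims[index];
      if (lhsIdxs == null && rhsIdxs == null) {
        retVal = 0; // both missing: equal at this position
      } else if (lhsIdxs == null) {
        retVal = -1; // assumption: null sorts first
      } else if (rhsIdxs == null) {
        retVal = 1;
      } else {
        retVal = ((Comparable) lhsIdxs).compareTo(rhsIdxs);
      }
      index++;
    }
    // Assumption: when the shared prefix is equal, the shorter array sorts first.
    return retVal != 0 ? retVal : Integer.compare(lhsDims.length, rhsDims.length);
  }
}
```

A caller would write `compareDims(lhs.getDims(), rhs.getDims())`, which makes the hoisting explicit instead of relying on the JIT.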
}

@Override
public int compareKeys(IncrementalIndexRow lhs, IncrementalIndexRow rhs)
Is this code a copy-and-paste from somewhere? Any chance for code reuse?
import java.nio.ByteBuffer;
import java.util.List;

public class OakKeysComparator implements OakComparator<IncrementalIndexRow>
There is a lot of similar code further down. Can it be restructured into smaller methods that are used in a modular way?
  aggregate(reportParseExceptions, row, rowContainer, byteBuffer);
}

public void aggregate(
This looks like a copy-and-paste from OakIncrementalIndex; please reuse that code.
  return dimObject;
}

static boolean checkDimsAllNull(ByteBuffer buff, int numComparisons)
Consider moving it to an external DimsUtils class. It is not related to the internals of the incremental index.
static ValueType getDimValueType(int dimIndex, List<DimensionDesc> dimensionDescsList)
{
  DimensionDesc dimensionDesc = dimensionDescsList.get(dimIndex);
Consider moving it to an external DimsUtils class. It is not related to the internals of the incremental index.
private Integer addToOak(
    InputRow row,
    AtomicInteger numEntries,
It may be clearer to let the method use the numEntries field instead.
    InputRow row,
    AtomicInteger numEntries,
    IncrementalIndexRow incrementalIndexRow,
    ThreadLocal<InputRow> rowContainer,
Consider using the in field instead. It will shorten the method signature and make the functionality easier to understand.
if (numEntries.get() < maxRowCount || skipMaxRowsInMemoryCheck) {
  oak.putIfAbsentComputeIfPresent(incrementalIndexRow, row, computer);

  int currSize = oak.entries();
The code tries to sync the numEntries field with the oak.entries() value. Consider overriding the methods that query numEntries so that they return oak.entries(), and ignoring numEntries in OakIncrementalIndex.
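One possible shape for this, as a hedged sketch: the subclass overrides the size query and delegates to the map, so there is no second counter to keep in sync. All class names here are hypothetical, and `ConcurrentSkipListMap.size()` stands in for `oak.entries()` because the Oak API is not reproduced here.

```java
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the base index, which tracks its own counter.
class CountingIndexSketch
{
  private final AtomicInteger numEntries = new AtomicInteger();

  protected AtomicInteger getNumEntries()
  {
    return numEntries;
  }

  public int size()
  {
    return numEntries.get();
  }

  // Every size-dependent check goes through size(), so overriding size() is enough.
  public boolean canAppendRow(int maxRowCount)
  {
    return size() < maxRowCount;
  }
}

// Hypothetical stand-in for OakIncrementalIndex: the map is the single source of truth.
final class MapBackedIndexSketch extends CountingIndexSketch
{
  // Assumption: a JDK skip-list map plays the role of the OakMap.
  private final ConcurrentSkipListMap<String, Integer> map = new ConcurrentSkipListMap<>();

  void add(String key)
  {
    // Insert if absent, otherwise aggregate into the existing value.
    map.merge(key, 1, Integer::sum);
  }

  @Override
  public int size()
  {
    return map.size(); // plays the role of oak.entries()
  }
}
```

The inherited counter is simply never written in the subclass, which removes the window in which the two values can disagree.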
  return aggsManager.getMetricAggs();
}

public static void aggregate(
It seems that this method is never used.
OakMap tmpOakMap = descending ? oak.descendingMap() : oak;
OakTransformView transformView = tmpOakMap.createTransformView(transformer);
CloseableIterator<Row> valuesIterator = transformView.valuesIterator();
return new Iterable<Row>()
The closeable iterator's close() cannot be invoked here, which may block memory GC.
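One way to address this, sketched under assumptions: open the closeable iterator inside `iterator()` rather than before the `Iterable` is built, and close it as soon as it is exhausted. `CloseableIter` and `AutoClosingIterable` are hypothetical names; `CloseableIter` only stands in for Oak's `CloseableIterator`, whose exact signature is not reproduced here.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

// Hypothetical stand-in for a closeable iterator type.
interface CloseableIter<T> extends Iterator<T>, AutoCloseable
{
  @Override
  void close();
}

// Opens a fresh iterator per traversal and closes it once the traversal is drained.
final class AutoClosingIterable<T> implements Iterable<T>
{
  private final Supplier<CloseableIter<T>> source;

  AutoClosingIterable(Supplier<CloseableIter<T>> source)
  {
    this.source = source;
  }

  @Override
  public Iterator<T> iterator()
  {
    final CloseableIter<T> delegate = source.get();
    return new Iterator<T>()
    {
      private boolean closed = false;

      @Override
      public boolean hasNext()
      {
        if (closed) {
          return false;
        }
        if (delegate.hasNext()) {
          return true;
        }
        // Exhausted: release the underlying resources exactly once.
        delegate.close();
        closed = true;
        return false;
      }

      @Override
      public T next()
      {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return delegate.next();
      }
    };
  }
}
```

This closes the iterator only when the caller drains it. A caller that abandons the loop early would still leak, so an API that hands the `Closeable` back to the caller (for use in try-with-resources) may be the more robust choice.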
{
  CloseableIterator<IncrementalIndexRow> keysIterator = oak.keysIterator();

  return new Iterable<IncrementalIndexRow>() {
keysIterator is never closed here, which may block GC.
if (descending == true) {
  subMap = subMap.descendingMap();
}
CloseableIterator<IncrementalIndexRow> keysIterator = subMap.keysIterator();
There is no close() call for keysIterator here, which may block GC.
this.versionCounter = new AtomicInteger(0);
IncrementalIndexRow minIncrementalIndexRow = getMinIncrementalIndexRow();

long maxDirectMemory = VM.maxDirectMemory();
It may be better to take the capacity as a constructor argument, e.g. instead of rowsCount.
@sanastas do you mind fixing merge conflicts?
Sorry for chiming in early, but is there any way to use this in a real (not just tests/benchmarks) scenario? I've tried "brute force" by modifying realtime/plumber/Sink to use OffheapOak instead of the current Onheap default. But all that happens is that the index Task spins the CPU for a long time and eventually goes OOM.
@fjy, thanks for your comment! As code changes keep landing, we will try to resolve the merge conflicts once a week or so.
@KenjiTakahashi, thanks for giving it a try! Unfortunately, Oak is indeed not yet connected for end-to-end testing (in a real scenario). Integrating an off-heap index (especially when the keys are also off-heap) requires some baby steps to be done efficiently. So we are moving inside-out, checking how the Oak off-heap index can be used and introducing some Druid changes so that the usage is (at least) more or less efficient, since an off-heap (buffer-based) index and an on-heap (object-based) index should be used differently. However, we clearly understand the need and the urgency to allow real usage of Oak, so we hope to make it work soon. In the meantime, as this pull request is growing bigger and bigger, we would like to hear your comments about the code so far. Thanking all reviewers in advance!
@sanastas Thanks for the explanations. Well, I'll leave the CR here to the ones more familiar with this code :-).
Thank you, @KenjiTakahashi! We are working to connect Oak end-to-end soon!
As requested, we have fixed the merge conflicts. The result is PR#6327. I am closing this PR; the comments made here will all be addressed. The discussion is moved to PR#6327.
Hi Everybody,
Please take a look at the following Oak Incremental Index pull request. Below please find a list of supporting material. Although we will soon also present the performance results, we would like to hear your thoughts and comments from the correctness and software engineering points of view.
Thanks,
Anastasia
Following up on our great talk yesterday (where we agreed on publishing a PR for Oak Incremental Index for Druid), please find below some reading material about Oak. The pull request itself will be ready tomorrow. I would strongly suggest reading some of the documentation before looking at the code, as it may make the latter task easier and clearer. As I said, Oak is now an open-source library (https://github.com/yahoo/Oak), and its README might help you understand Oak's interface and the new Druid code.
Recall that we had issue #5698 about Oak Incremental Index, where you can find other useful documents to start from. We will publish the latest single-thread ingestion benchmark results tomorrow, together with the pull request. Feel free to ask questions!
Files from the issue:
Here one can see initial ingestion benchmark results:
IndexIngestionBenchmarkWithOak.pdf