Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Proposal] BufferAggregator support for growable sketches. #8126

Open
himanshug opened this issue Jul 22, 2019 · 18 comments
Open

[Proposal] BufferAggregator support for growable sketches. #8126

himanshug opened this issue Jul 22, 2019 · 18 comments

Comments

@himanshug
Copy link
Contributor

himanshug commented Jul 22, 2019

Motivation

  1. From IncrementalIndex generally overestimates theta sketch size #6743
    "Theta sketches have a very large max size by default, relative to typical row sizes (about 250KB with "size" set to the default of 16384). The ingestion-time row size estimator (getMaxBytesPerRowForAggregators in OnheapIncrementalIndex) uses this figure to estimate row sizes when theta sketches are used at ingestion time, leading to way more spills than is reasonable. It would be better to use an estimate based more on actual current size. I'm not sure how to get this, though."

  2. From [Proposal] Resizable Buffer in BufferAggregator #2963
    "In case of complex aggregators like thetaSketch, more than 80% of the time sketches don't grow to full capacity but query processing still reserves the full max size. The idea is to support use of a resizable aggregator by BufferAggregator so that maximum space is not reserved but BufferAggregator should be able to re-allocate the buffer on demand."

  3. Some sketches e.g. doublesSketch do not have an upper bound on size and can't provide a correct number for AggregatorFactory.getMaxIntermediateSize() . Current workaround, they use, is to use some number and then fall back to on-heap objects if sketch grows bigger than that.

Proposed changes

Add following methods to AggregatorFactory

  /**
   * Does BufferAggregator support handling of varying ByteBuffer sizes by overriding
   * {@link BufferAggregator#aggregate(ByteBuffer, int, int)}
   * @return
   */
  public boolean isDynamicallyResizable()
  {
    return getMinIntermediateSize() < getMaxIntermediateSize();
  }
  
  /**
   * Start size of ByteBuffer to be used with BufferAggregator.
   * @return
   */
  public int getMinIntermediateSize()
  {
    return getMaxIntermediateSize();
  }

Add following methods to BufferAggregator

  /**
   * Returns false if aggregation requires a bigger buffer than capacity arg or true.
   * Return status must be used exclusively to signal "low memory in buffer" condition and
   * nothing else.
   */
  default boolean aggregate(ByteBuffer buff, int position, int capacity)
  {
    aggregate(buff, position);
    return true;
  }

  // Following methods are equivalent of old methods with same name except they provide access to capacity
  // of ByteBuffer which is assumed to be AggregatorFactory.getMaxIntermediateSize() by older methods.

  default void init(ByteBuffer buff, int position, int capacity)
  {
    init(buff, position);
  }

  default void relocate(ByteBuffer oldBuff, int oldPosition, int oldCapacity, ByteBuffer newwBuff, int newwPosition, int newwCapacity)
  {
    relocate(oldPosition, newwPosition, oldBuff, newwBuff);
  }

  default Object get(ByteBuffer buff, int position, int capacity)
  {
    return get(buff, position);
  }

  default float getFloat(ByteBuffer buff, int position, int capacity)
  {
    return getFloat(buff, position);
  }

  default double getDouble(ByteBuffer buff, int position, int capacity)
  {
    return getDouble(buff, position);
  }

  default long getLong(ByteBuffer buff, int position, int capacity)
  {
    return getLong(buff, position);
  }

  default boolean isNull(ByteBuffer buff, int position, int capacity)
  {
    return isNull(buff, position);
  }

Update all of Druid core code to remove usage of Aggregator and use BufferAggregator in all places using the newly introduced methods. For example, see the changes made to OnheapIncrementalIndex in #8127 . Once that is done in all places, Aggregator interface and AggregatorFactory.factorize(ColumnSelectorFactory) and AggregatorFactory.getMaxIntermediateSize() can be removed.

At least BufferAggregator that work on top of sketches of variable sizes should be updated to implement newly introduced methods. In those extensions, AggregatorFactory.getMinIntermediateSize() should return a value that can be overridden by user per query/indexing to allow fine tuning.

Following interfaces and classes are introduced to aid usage of new methods in aggregator extension.

public interface MemoryAllocator
{
  BufferHolder allocate(int capacity);
  void free(BufferHolder bh);
}

public interface BufferHolder
{
  int position();
  int capacity();
  ByteBuffer get();
}

public class SimpleOnHeapMemoryAllocator implements MemoryAllocator
{
  @Override
  public BufferHolder allocate(int capacity)
  {
    return new SimplerBufferHolder(ByteBuffer.allocate(capacity));
  }

  @Override
  public void free(BufferHolder ignored)
  {

  }
  
  // This would be used in OnHeapIncrementalIndex
  private static class SimplerBufferHolder implements BufferHolder
  {
    private final ByteBuffer bb;

    public SimplerBufferHolder(ByteBuffer bb)
    {
      this.bb = bb;
    }

    @Override
    public int position()
    {
      return 0;
    }

    @Override
    public int capacity()
    {
      return bb.capacity();
    }

    @Override
    public ByteBuffer get()
    {
      return bb;
    }
  }
}

Rationale

#2963

Operational impact

None

Test plan (optional)

Unit Tests should cover the changes.

Future Work

I think MemoryAllocator interface might change a bit when we write code to use it in off-heap use cases in querying.
With these changes we can possibly use off-heap memory for aggregators in IncrementalIndex too.
This would also enable removal of v1 groupBy implementation that is kept around due to its usage of Aggregator that allowed onheap growable sketches.

@leventov
Copy link
Member

Related: #5335 (comment) and #5335 (comment)

@Eshcar
Copy link

Eshcar commented Jul 30, 2019

As part of the work we are doing towards integrating Oak (off-heap based incremental index) integration into druid #5698, we invested some thought on how to bridge the gap between Oak--with its internal memory management, off-heap sketches--based on WritableMemory, and druid aggregators.
I can share our thoughts, they aim to handle the same problems raised in this issue however the solution is different, hence it might be better to introduce it in a different issue.
In a nutshell,
(1) Oak manages its memory and needs all allocations of buffers to go through the internal memory manager and only be exposed through Oak's API.
(2) Off-heap sketches are based on WritableMemory, and can work with external memory manager that can allocate new WritableMemory when the sketch needs to grow
(3) Druid aggregators access sketches through the aggregator API (init, aggregate, get) and a mapping from bytebuffer,position -> sketch

What we suggest is to have oak manage the memory, including re-allocation of space when needed. Oak will implement its own WritebleMemory and MemoryRequestServer that are needed for a correct behaviour of the sketches wrt Oak index. Finally, we suggest to have a new Aggregator type - WritableMemoryAggregator that maps WritableMemory to sketch and can work the same way as buffer aggregators are working, and it does not need to worry about growing size of sketches.

There might be other alternatives for closing this loop; let's discuss them.

Does all this make sense @leventov @himanshug ?

@leventov
Copy link
Member

The "WritableMemoryAggregator" proposal is the new take on #3892. I support it.

@Eshcar
Copy link

Eshcar commented Jul 31, 2019

Thanks Roman.
#3892 is a very long issue that splits into multiple discussions covering many different things, so I am not sure what is the bottom line. Also it has not been discussed over the last year.

Can you summarize any progress made wrt Memory aggregator if any.
If it is blocked then what is the reason - is it community rejection due to backward compatibility? fear of performance degradation? or simply lack of working hands?

@Eshcar
Copy link

Eshcar commented Jul 31, 2019

2 naive question -

  1. Does buffer aggregators used by on-heap incremental index or only by off-heap incremental index?
  2. If the answer to (1) is only off-heap incremental index, then does off-heap incremental index being used in production anywhere? does it perform well enough?

@himanshug
Copy link
Contributor Author

@Eshcar
regarding memory management: I would like it to be an interface inside druid core and later be a pluggable entity that can be changed inside extensions e.g. say a oak based extension or users who know their data and can write a more appropriate one, they should be able to do so. that also allows ppl to experiment with different memory management strategies/impls.

regarding growth strategy: current strategy I described was to double the size every time, but I also thought about having following(notice that return type is "int" here not boolean)

  // Returns 0, if success
  // Returns positive value, indicating what should be the size of bigger allocated BufferSize
  // Returns negative value, indicating bigger buffer is needed and do something default
  default int aggregate(ByteBuffer buff, int position, int capacity)
  {
    aggregate(buff, position);
    return true;
  }

that could let aggregator impl control the growth strategy as it has best information about what it needs.

later if needed, "default" grown strategy could be made pluggable so that extension writers could control that. That said, It doesn't matter for OakIncrementalIndex as that would choose its own default growth strategy.

regarding using Memory instead of ByteBuffer : I am sure that is good idea for performance, so instead of adding more methods to BufferAggregator , we can introduce WritableMemoryAggregator. Only wrinkle is around maintaining backward compatibility, I haven't thought it through but will definitely try it. That said, this is somewhat orthogonal and could be done separately but I agree it is good to do that from the get go if possible.

@Eshcar
Copy link

Eshcar commented Aug 1, 2019

@himanshug - What is the context of the current proposal? Does it refers to aggregators that are used only in the context of queries, when querying immutable segments that are cached off-heap? Does it also cover off-heap incremental index roll-up aggregation? If it is the first then it is reasonable to define an API for external memory allocator, if it is the second then what I am suggesting is more relevant.
Note that Oak is considered as a core contrib and not extension, and is proposed as an efficient alternative for the existing off-heap incremental index. Which brings me back to my previous question - does the current off-heap incremental index considered operational, with reasonable performance?

I think that by introducing a new writable memory aggregator we avoid backward compatibility issues, do we not?

@leventov
Copy link
Member

leventov commented Aug 2, 2019

#3892 is a very long issue that splits into multiple discussions covering many different things, so I am not sure what is the bottom line

There is no bottom line, but there is kind of a "top line" that we want to move to Memory from ByteBuffers. This still stands.

If it is blocked then what is the reason - is it community rejection due to backward compatibility? fear of performance degradation? or simply lack of working hands?

No working hands.

Does buffer aggregators used by on-heap incremental index or only by off-heap incremental index?

They are used by off-heap incremental index, and in the implementations of queries such as groupBy and topN.

If the answer to (1) is only off-heap incremental index, then does off-heap incremental index being used in production anywhere? does it perform well enough?

It's used in the implementation of groupBy queries. It's not used more widely, including during the data ingestion specifically because of the unsolved problem with growable complex aggregations - this is what this proposal is all about.

@Eshcar
Copy link

Eshcar commented Aug 4, 2019

[off-heap incremental index] It's not used more widely, including during the data ingestion specifically because of the unsolved problem with growable complex aggregations - this is what this proposal is all about.

Then presenting a solution to this problem using Oak to manage the off-heap index, handling growable sketches using Memory, and showing that it performs as good as or better than the existing implementation would be a win-win-win solution, correct?
But I understand that it may not cover the entire scope of this issue, namely queries aggregation then I think the best thing to do would be to open a new issue for it.

BTW, if the current off-heap solution is operational we can compare against it in the current system (cluster) benchmarks that we are running, and compare performance (without sketches at this point). From what I know last time we tried to evaluate the off-heap incremental index through component level test it crashed and we were told it is not properly maintained.
Nevertheless, we will try running it in cluster mode. Any documentation on how we should configure the cluster to allow it running in off-heap mode?

@leventov
Copy link
Member

leventov commented Aug 5, 2019

Then presenting a solution to this problem using Oak to manage the off-heap index, handling growable sketches using Memory, and showing that it performs as good as or better than the existing implementation would be a win-win-win solution, correct?

Yes. I just want to warn you about trying to bring all of that at once - the extent of the change may become unbearable. So even if it incurs more work, it's better to make those changes separately: e. g. first, the transition to Memory (in the given subsystem), then - Oak. Doing it in the opposite order may be harder because Oak is already based on Memory, despite there is a method unsafeByteBufferView.

Any documentation on how we should configure the cluster to allow it running in off-heap mode?

Not sure what do you mean by that. It's not possible to use the off-heap incremental index for indexing even when all aggregations are constant-sized - it's not implemented.

@leventov
Copy link
Member

leventov commented Aug 5, 2019

Currently, the off-heap incremental index is used only in GroupBy V1, which is itself deprecated and may not work at all. So I don't really even see a point in running benchmarks against the current off-heap incremental index to prove that Oak is better.

@Eshcar
Copy link

Eshcar commented Aug 5, 2019

Thanks, that was our impression - that off-heap incremental index is not operational (however it does exist in the code). So, indeed there is no way to compare to it.

I also agree that doing the oak-sketches-druid integration in one step might be too complicated.
We already have an open issue #5698 and a PR #7676 for getting Oak incremental index into Druid and we hope to get progress there soon.
Oak is not based on Memory , yet :).
The context of my suggestion is how to support growable sketches off-heap while ingesting data in druid - namely, in the context of Oak. So the order should be first integrating Oak, then having Oak support growable sketches.

If I now understand correctly the purpose of the current proposal is to handle the queries aggregation problem. Oak might be a solution also for this problem but this is something we haven't looked at yet.

@himanshug
Copy link
Contributor Author

If I now understand correctly the purpose of the current proposal is to handle the queries aggregation problem. Oak might be a solution also for this problem but this is something we haven't looked at yet.

yes, the context here applies to every place aggregation happens and also let it not be dependent on something specific but allow extensions to eventually add optimal behavior.

@jon-wei
Copy link
Contributor

jon-wei commented Sep 26, 2019

The proposal LGTM in general, there are a couple of areas I think need adjustment:

  • With vectorization, the growable aggs would need to handle batch adds
  • When the agg grows, the new size should be reflected in the Appenderator's memory tracking so that maxBytesInMemory would function correctly

@himanshug
Copy link
Contributor Author

@jon-wei thanks for taking a look, sounds like there is consensus in general.

@gianm
Copy link
Contributor

gianm commented Apr 14, 2020

@himanshug, I'm wondering if you are still interested in pursuing this?

@himanshug
Copy link
Contributor Author

@gianm this comes back :) . not sure if you noticed #8127 , does that look close to what you imagined , I wanted to have early feedback on that before making further changes ?

@himanshug
Copy link
Contributor Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

5 participants