Commit
Revert "Update from master-for-merge-to-release (apache#343)" (apache…
Browse files Browse the repository at this point in the history
…#347)

This reverts commit 3e456ac.
suneet-s authored Apr 18, 2021
1 parent 3e456ac commit 2007ab5
Showing 57 changed files with 8,685 additions and 15,235 deletions.
6 changes: 2 additions & 4 deletions .travis.yml
@@ -319,11 +319,9 @@ jobs:
- name: "web console end-to-end test"
before_install: *setup_generate_license
install: web-console/script/druid build
-      before_script:
-        - nvm install 10.24.0
-        - web-console/script/druid start
+      before_script: web-console/script/druid start
       script: (cd web-console && npm run test-e2e)
-      after_script: web-console/script/druid stop
+      after_script: web-console/script/druid stop

- name: "docs"
install: (cd website && npm install)
31 changes: 7 additions & 24 deletions docs/querying/datasource.md

@@ -113,45 +113,28 @@ use table datasources.
### `union`

<!--DOCUSAURUS_CODE_TABS-->
-<!--SQL-->
-```sql
-SELECT column1, column2
-FROM (
-  SELECT column1, column2 FROM table1
-  UNION ALL
-  SELECT column1, column2 FROM table2
-  UNION ALL
-  SELECT column1, column2 FROM table3
-)
-```
<!--Native-->
```json
{
"queryType": "scan",
"dataSource": {
"type": "union",
"dataSources": ["table1", "table2", "table3"]
"dataSources": ["<tableDataSourceName1>", "<tableDataSourceName2>", "<tableDataSourceName3>"]
},
"columns": ["column1", "column2"],
"intervals": ["0000/3000"]
}
```
<!--END_DOCUSAURUS_CODE_TABS-->

-Unions allow you to treat two or more tables as a single datasource. In SQL, this is done with the UNION ALL operator
-applied directly to tables, called a ["table-level union"](sql.md#table-level). In native queries, this is done with a
-"union" datasource.

-With SQL [table-level unions](sql.md#table-level) the same columns must be selected from each table in the same order,
-and those columns must either have the same types, or types that can be implicitly cast to each other (such as different
-numeric types). For this reason, it is more robust to write your queries to select specific columns.
+Union datasources allow you to treat two or more table datasources as a single datasource. The datasources being unioned
+do not need to have identical schemas. If they do not fully match up, then columns that exist in one table but not
+another will be treated as if they contained all null values in the tables where they do not exist.

-With the native union datasource, the tables being unioned do not need to have identical schemas. If they do not fully
-match up, then columns that exist in one table but not another will be treated as if they contained all null values in
-the tables where they do not exist.
+The list of "dataSources" must be nonempty. If you want to query an empty dataset, use an [`inline` datasource](#inline)
+instead.

-In either case, features like expressions, column aliasing, JOIN, GROUP BY, ORDER BY, and so on cannot be used with
-table unions.
+Union datasources are not available in Druid SQL.

Refer to the [Query execution](query-execution.md#union) page for more details on how queries are executed when you
use union datasources.
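
The [`inline` datasource](#inline) mentioned above can stand in for an empty dataset because it specifies its rows
literally. A minimal sketch, assuming the inline datasource's `columnNames` and `rows` fields, with `rows` left empty:

```json
{
  "type": "inline",
  "columnNames": ["column1", "column2"],
  "rows": []
}
```
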
15 changes: 7 additions & 8 deletions docs/querying/sql.md

@@ -153,8 +153,8 @@ top-level and table-level. Queries that use UNION ALL in any other way will not
#### Top-level

UNION ALL can be used at the very top outer layer of a SQL query (not in a subquery, and not in the FROM clause). In
-this case, the underlying queries will be run separately, back to back. Their results will be concatenated together
-and appear one after the other.
+this case, the underlying queries will be run separately, back to back, and their results will all be returned in
+one result set.

For example:

@@ -164,15 +164,14 @@ UNION ALL
SELECT COUNT(*) FROM tbl WHERE my_column = 'value2'
```

-With top-level UNION ALL, no further processing can be done after the UNION ALL. For example, the results of the
-UNION ALL cannot have GROUP BY, ORDER BY, or any other operators applied to them.
+When UNION ALL occurs at the top level of a query like this, the results from the unioned queries are concatenated
+together and appear one after the other.

#### Table-level

UNION ALL can be used to query multiple tables at the same time. In this case, it must appear in a subquery in the
-FROM clause, and the lower-level subqueries that are inputs to the UNION ALL operator must be simple table SELECTs.
-Features like expressions, column aliasing, JOIN, GROUP BY, ORDER BY, and so on cannot be used. The query will run
-natively using a [union datasource](datasource.md#union).
+FROM clause, and the lower-level subqueries that are inputs to the UNION ALL operator must be simple table SELECTs
+(no expressions, column aliasing, etc). The query will run natively using a [union datasource](datasource.md#union).

The same columns must be selected from each table in the same order, and those columns must either have the same types,
or types that can be implicitly cast to each other (such as different numeric types). For this reason, it is generally
@@ -191,7 +190,7 @@ FROM (
GROUP BY col1
```

-With table-level UNION ALL, the rows from the unioned tables are not guaranteed to be processed in
+When UNION ALL occurs at the table level, the rows from the unioned tables are not guaranteed to be processed in
any particular order. They may be processed in an interleaved fashion. If you need a particular result ordering,
use [ORDER BY](#order-by) on the outer query.
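
For example, a sketch of a table-level union with a deterministic result ordering, assuming tables `tbl1` and `tbl2`
that both contain a `col1` column:

```sql
SELECT col1, COUNT(*)
FROM (
  SELECT col1 FROM tbl1
  UNION ALL
  SELECT col1 FROM tbl2
)
GROUP BY col1
ORDER BY col1
```

The subqueries stay simple table SELECTs, as required for table-level unions, while GROUP BY and ORDER BY are applied
only in the outer query.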

HllSketchBuildAggregatorFactory.java

@@ -26,11 +26,8 @@
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
-import org.apache.druid.query.aggregation.VectorAggregator;
-import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
-import org.apache.druid.segment.vector.VectorColumnSelectorFactory;

import javax.annotation.Nullable;

@@ -84,24 +81,6 @@ public BufferAggregator factorizeBuffered(final ColumnSelectorFactory columnSele
);
}

-  @Override
-  public boolean canVectorize(ColumnInspector columnInspector)
-  {
-    return true;
-  }
-
-  @Override
-  public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
-  {
-    return new HllSketchBuildVectorAggregator(
-        selectorFactory,
-        getFieldName(),
-        getLgK(),
-        TgtHllType.valueOf(getTgtHllType()),
-        getMaxIntermediateSize()
-    );
-  }

/**
* For the HLL_4 sketch type, this value can be exceeded slightly in extremely rare cases.
* The sketch will request on-heap memory and move there. It is handled in HllSketchBuildBufferAggregator.
HllSketchBuildBufferAggregator.java

@@ -19,21 +19,49 @@

package org.apache.druid.query.aggregation.datasketches.hll;

+import com.google.common.util.concurrent.Striped;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.datasketches.hll.Union;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;

import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.IdentityHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;

/**
* This aggregator builds sketches from raw data.
* The input column can contain identifiers of type string, char[], byte[] or any numeric type.
*/
public class HllSketchBuildBufferAggregator implements BufferAggregator
{

+  /**
+   * for locking per buffer position (power of 2 to make index computation faster)
+   */
+  private static final int NUM_STRIPES = 64;
+
   private final ColumnValueSelector<Object> selector;
-  private final HllSketchBuildBufferAggregatorHelper helper;
+  private final int lgK;
+  private final TgtHllType tgtHllType;
+  private final int size;
+  private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
+  private final IdentityHashMap<ByteBuffer, Int2ObjectMap<HllSketch>> sketchCache = new IdentityHashMap<>();
+  private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(NUM_STRIPES);
+
+  /**
+   * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty HllSketch image.
+   * {@link HllSketchMergeBufferAggregator} does something similar, but different enough that we don't share code. The
+   * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects.
+   */
+  private final byte[] emptySketch;

public HllSketchBuildBufferAggregator(
final ColumnValueSelector<Object> selector,
@@ -43,36 +71,80 @@ public HllSketchBuildBufferAggregator(
)
{
this.selector = selector;
-    this.helper = new HllSketchBuildBufferAggregatorHelper(lgK, tgtHllType, size);
+    this.lgK = lgK;
+    this.tgtHllType = tgtHllType;
+    this.size = size;
+    this.emptySketch = new byte[size];
+
+    //noinspection ResultOfObjectAllocationIgnored (HllSketch writes to "emptySketch" as a side effect of construction)
+    new HllSketch(lgK, tgtHllType, WritableMemory.wrap(emptySketch));
}

@Override
public void init(final ByteBuffer buf, final int position)
{
-    helper.init(buf, position);
+    // Copy prebuilt empty sketch object.
+    final int oldPosition = buf.position();
+    try {
+      buf.position(position);
+      buf.put(emptySketch);
+    }
+    finally {
+      buf.position(oldPosition);
+    }
+
+    // Add an HllSketch for this chunk to our sketchCache.
+    final WritableMemory mem = getMemory(buf).writableRegion(position, size);
+    putSketchIntoCache(buf, position, HllSketch.writableWrap(mem));
}

+  /**
+   * This method uses locks because it can be used during indexing,
+   * and Druid can call aggregate() and get() concurrently
+   * See https://github.com/druid-io/druid/pull/3956
+   */
@Override
public void aggregate(final ByteBuffer buf, final int position)
{
final Object value = selector.getObject();
if (value == null) {
return;
}

-    HllSketchBuildAggregator.updateSketch(helper.getSketchAtPosition(buf, position), value);
+    final Lock lock = stripedLock.getAt(lockIndex(position)).writeLock();
+    lock.lock();
+    try {
+      final HllSketch sketch = sketchCache.get(buf).get(position);
+      HllSketchBuildAggregator.updateSketch(sketch, value);
+    }
+    finally {
+      lock.unlock();
+    }
}

+  /**
+   * This method uses locks because it can be used during indexing,
+   * and Druid can call aggregate() and get() concurrently
+   * See https://github.com/druid-io/druid/pull/3956
+   */
@Override
public Object get(final ByteBuffer buf, final int position)
{
-    return helper.get(buf, position);
+    final Lock lock = stripedLock.getAt(lockIndex(position)).readLock();
+    lock.lock();
+    try {
+      return sketchCache.get(buf).get(position).copy();
+    }
+    finally {
+      lock.unlock();
+    }
}

@Override
public void close()
{
-    helper.clear();
+    memCache.clear();
+    sketchCache.clear();
}

@Override
@@ -87,14 +159,56 @@ public long getLong(final ByteBuffer buf, final int position)
throw new UnsupportedOperationException("Not implemented");
}

+  private WritableMemory getMemory(final ByteBuffer buf)
+  {
+    return memCache.computeIfAbsent(buf, b -> WritableMemory.wrap(b, ByteOrder.LITTLE_ENDIAN));
+  }

+  /**
+   * In very rare cases sketches can exceed given memory, request on-heap memory and move there.
+   * We need to identify such sketches and reuse the same objects as opposed to wrapping new memory regions.
+   */
@Override
public void relocate(final int oldPosition, final int newPosition, final ByteBuffer oldBuf, final ByteBuffer newBuf)
{
-    helper.relocate(oldPosition, newPosition, oldBuf, newBuf);
+    HllSketch sketch = sketchCache.get(oldBuf).get(oldPosition);
+    final WritableMemory oldMem = getMemory(oldBuf).writableRegion(oldPosition, size);
+    if (sketch.isSameResource(oldMem)) { // sketch has not moved
+      final WritableMemory newMem = getMemory(newBuf).writableRegion(newPosition, size);
+      sketch = HllSketch.writableWrap(newMem);
+    }
+    putSketchIntoCache(newBuf, newPosition, sketch);
}

+  private void putSketchIntoCache(final ByteBuffer buf, final int position, final HllSketch sketch)
+  {
+    final Int2ObjectMap<HllSketch> map = sketchCache.computeIfAbsent(buf, b -> new Int2ObjectOpenHashMap<>());
+    map.put(position, sketch);
+  }

+  /**
+   * compute lock index to avoid boxing in Striped.get() call
+   *
+   * @param position
+   *
+   * @return index
+   */
+  static int lockIndex(final int position)
+  {
+    return smear(position) % NUM_STRIPES;
+  }
+
+  /**
+   * see https://github.com/google/guava/blob/master/guava/src/com/google/common/util/concurrent/Striped.java#L536-L548
+   *
+   * @param hashCode
+   *
+   * @return smeared hashCode
+   */
+  private static int smear(int hashCode)
+  {
+    hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
+    return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
+  }

@Override
@@ -104,6 +218,6 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
// lgK should be inspected because different execution paths exist in HllSketch.update() that is called from
// @CalledFromHotLoop-annotated aggregate() depending on the lgK.
// See https://github.com/apache/druid/pull/6893#discussion_r250726028
inspector.visit("lgK", helper.getLgK());
inspector.visit("lgK", lgK);
}
}
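
The locking scheme restored above smears each buffer position into a fixed pool of read/write locks: writers take the
stripe's write lock in aggregate(), readers take its read lock in get(). A standalone sketch of that pattern, assuming
only Guava on the classpath and a plain long[] standing in for the sketch buffer:

```java
import com.google.common.util.concurrent.Striped;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;

/**
 * Illustration of per-position striped locking, as in HllSketchBuildBufferAggregator.
 * Positions that hash to the same stripe share a lock, so lock memory stays bounded
 * by NUM_STRIPES no matter how many positions the buffer holds.
 */
public class StripedPositionLockExample
{
  private static final int NUM_STRIPES = 64; // power of 2, matching the aggregator

  private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(NUM_STRIPES);
  private final long[] counts = new long[1024]; // stand-in for per-position sketch state

  public void aggregate(final int position)
  {
    final Lock lock = stripedLock.getAt(lockIndex(position)).writeLock();
    lock.lock();
    try {
      counts[position]++; // mutate shared state under the write lock
    }
    finally {
      lock.unlock();
    }
  }

  public long get(final int position)
  {
    final Lock lock = stripedLock.getAt(lockIndex(position)).readLock();
    lock.lock();
    try {
      return counts[position]; // concurrent readers can share the read lock
    }
    finally {
      lock.unlock();
    }
  }

  // Same index computation as the aggregator: smear, then reduce modulo the stripe count.
  static int lockIndex(final int position)
  {
    return smear(position) % NUM_STRIPES;
  }

  // Bit-mixing borrowed from Guava's Striped, so adjacent positions spread across stripes.
  private static int smear(int hashCode)
  {
    hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
    return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
  }
}
```

Sharing a small fixed pool of locks trades occasional stripe collisions between distinct positions for bounded memory,
which matters when a query allocates many aggregation slots.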