Refactoring the data source before unnest #13085

Merged
merged 34 commits into apache:master on Oct 26, 2022

Conversation

somu-imply
Contributor

@somu-imply somu-imply commented Sep 14, 2022

Motivation

There are a number of use cases that require “flattening” records during processing. The need for this is exemplified by the existence of MVDs (multi-value dimensions), which essentially have native “flattening” built in. Other SQL systems support similar behavior: they take data of an ARRAY type, pass it through a FLATTEN or UNNEST (or similar) operator, and “explode” it into one new row for every value in the array.

We need to implement a similar operator for Druid. This dovetails with some of the recent work done for handling arrays in Druid. Essentially, the operator would take in an array and “flatten” it into N rows (N = the number of elements in the array), where each row holds one of the values from the array.
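For example, unnesting a three-element array yields three rows. Illustrative SQL only (the exact syntax varies by system, and this is not the proposed Druid syntax):

  SELECT item
  FROM UNNEST(ARRAY[1, 2, 3]) AS item
  -- produces three rows: 1, 2, 3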

Related Work

The inspiration for modeling unnest as a data source comes from BigQuery (Work with arrays | BigQuery | Google Cloud), where unnest is used as a data source. ClickHouse also offers flatten functionality (Array Functions | ClickHouse Docs), but it does not transform a dataset by adding rows. Since, for our use case, unnest can be combined with the dateExpand functionality (coming after unnest), we model unnest as a data source, similar to BigQuery.

Methodology

Refactoring

The underlying principle here is an operation on a data source that works on a segment and creates additional rows. Joins follow a similar principle, in that the output can contain more rows than the input table. The current framework supports that in the following way:

Having a join data source, alongside a factory and a specialized wrapper (JoinableFactoryWrapper.java) around it, which creates a function to transform one segment into another: the notion of a segment map function.

Having a separate implementation of segment reference, HashJoinSegment.java, which uses a custom storage adapter to access a cursor into the segment for processing.

The goal here is to move the creation of the segment map function out of the wrapper and into the individual data sources. In cases where the segment map function is not an identity function (join, and eventually unnest), each data source can create its own accordingly. This makes the abstraction generic and readily extensible to other data sources we might create in the future (e.g. flatten).
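A rough sketch of the resulting abstraction (the signature is illustrative, not the final API; Query, SegmentReference, and AtomicLong are the existing Druid/JDK types):

  public interface DataSource
  {
    // Returns a function that transforms a segment before the engine reads it.
    // Plain table data sources return the identity function; JoinDataSource
    // (and, in the follow-up, UnnestDataSource) return wrapping functions.
    Function<SegmentReference, SegmentReference> createSegmentMapFunction(
        Query query,
        AtomicLong cpuTimeAccumulator
    );
  }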

The changes here:

  1. Creation of the segment map function moved into each data source
  2. Code refactored from JoinableFactoryWrapper to JoinDataSource
  3. PlannerContext was updated with an additional JoinableFactoryWrapper object during this process
  4. Several test cases were updated for the change to JoinDataSource
  5. Some Guice injector wiring was updated as part of the process; additionally, nullables no longer need to be bound to CliRouter using no-ops
  6. New test cases for broadcast join in MSQ
  7. The getCacheKey method was moved inside each data source

New changes (will be in a follow-up)

After the new abstraction is in place, the following needs to be done for unnest:

  • Have a separate UnnestDataSource class that deals with unnest
  • This should have its own segment map function
  • A separate UnnestSegment with a custom storage adapter to gain access to a cursor in each segment
  • Design of the native query for unnest, considering we also need to pass the column(s) to unnest
  • Finally, the SQL query that maps to the native query

Each of these development phases should be backed by unit tests / integration tests, as applicable.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@somu-imply somu-imply changed the title from Unnest v1 to Refactoring the data source before unnest on Sep 14, 2022
@somu-imply
Contributor Author

Travis is failing a branch-coverage check due to the changes to BaseLeafFrameProcessor. While I work on that, I am setting this PR up for review now.

@somu-imply
Contributor Author

Guice issues in integration tests. Taking a look.

Contributor

@imply-cheddar imply-cheddar left a comment


Some comments/requests for changes.

Btw, while reviewing the PR, I came across methods in CacheUtils that are written like this:

  public static <T> boolean isUseSegmentCache(
      Query<T> query,
      @Nullable CacheStrategy<T, Object, Query<T>> cacheStrategy,
      CacheConfig cacheConfig,
      ServerType serverType
  )
  {
    return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType)
           && QueryContexts.isUseCache(query)
           && cacheConfig.isUseCache();
  }

isPopulateCache is the same. This is really unfortunate because cacheConfig.isUseCache() is the cheapest of the calls, QueryContexts.isUseCache(query) is the second cheapest, and isQueryCacheable(...) is the most expensive. AND logic short-circuits on the first false and skips the remaining calls, so you always want the cheapest checks first, yet here we do the most expensive operation first. Can you take a few moments to invert the order of the operands of the AND as well?
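That is, something like this (sketch of the reordered version):

  public static <T> boolean isUseSegmentCache(
      Query<T> query,
      @Nullable CacheStrategy<T, Object, Query<T>> cacheStrategy,
      CacheConfig cacheConfig,
      ServerType serverType
  )
  {
    // Cheapest checks first: && short-circuits on the first false, so the
    // expensive isQueryCacheable(...) call only runs when it can still matter.
    return cacheConfig.isUseCache()
           && QueryContexts.isUseCache(query)
           && isQueryCacheable(query, cacheStrategy, cacheConfig, serverType);
  }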

// was using the JoinableFactoryWrapper to create segment map function.
// After refactoring, the segment map function creation is moved to data source
// Hence for InputNumberDataSource we are setting the broadcast join helper for the data source
// and moving the segment map function creation there
Contributor

Sorry to pick on a comment, but this comment does what most comments in code do: it speaks from the context of the developer writing the code. A new developer reading the code for the first time doesn't even know that this code was refactored, let alone why it was refactored or what the code looked like before. Instead of explaining the refactor's changes, the comment should explain what the current, post-refactor code is attempting to accomplish. I.e. something like

// The InputNumberDataSource requires a BroadcastJoinHelper to be able to create its
// segment map function.  It would be a lot better if the InputNumberDataSource actually
// had a way to get that injected into it on its own, but the relationship between these objects
// was figured out during a refactor and using a setter here seemed like the least-bad way to
// make progress on the refactor without breaking functionality.  Hopefully, some future 
// developer will move this away from a setter.

Or something like that.

Contributor Author

Done

@@ -565,6 +572,8 @@ private ObjectMapper setupObjectMapper(Injector injector)
"compaction"
)
).registerSubtypes(ExternalDataSource.class));
DruidSecondaryModule.setupJackson(injector, mapper);
/*
Contributor

Delete the code instead of comment it out please.

* from materializing things needlessly. Useful for unit tests that want to compare equality of different
* InlineDataSource instances.
*/
private static boolean rowsEqual(final Iterable<Object[]> rowsA, final Iterable<Object[]> rowsB)
Contributor

These private methods seem to have moved above some public methods. Was that just your IDE doing stuff, or was there a specific reason? Generally speaking, the flow is always public methods first.

Contributor Author

This was my IDE doing stuff. I reformatted the code in the IDE, but it does not seem to push public methods to the top of the file.

@@ -127,12 +155,22 @@ public static JoinDataSource create(
final String rightPrefix,
final JoinConditionAnalysis conditionAnalysis,
final JoinType joinType,
-    final DimFilter leftFilter
+    final DimFilter leftFilter,
+    @Nullable @JacksonInject final JoinableFactoryWrapper joinableFactoryWrapper
Contributor

Having @JacksonInject on a method that isn't used by Jackson can be confusing (it will make future developers try to figure out how Jackson uses this method). I don't think it needs to be there.

Contributor Author

Excellent point! Removed.

* We should likely look into moving this functionality into the DataSource object itself so that they
* can walk and create new objects on their own. This will be necessary as we expand the set of DataSources
* that do actual work, as each of them will need to show up in this if/then waterfall.
*/
Contributor

I'm pretty sure it should be safe to move something logically equivalent to this method into a method on DataSource itself. Let's do that inside of this refactor, so that we leave it with, hopefully, no need for random methods full of instanceof checks ever again.

Contributor Author

@somu-imply somu-imply Oct 12, 2022

@imply-cheddar this was moved to the DataSource, no more instanceof checks here

Comment on lines 195 to 203
public JoinableFactory getJoinableFactory()
{
return joinableFactory;
}

/**
* Compute a cache key prefix for a join data source. This includes the data sources that participate in the RHS of a
* join as well as any query specific constructs associated with join data source such as base table filter. This key prefix
* can be used in segment level cache or result level cache. The function can return following wrapped in an
Contributor

A bunch of this code looks like it changed, but I think it just had methods re-ordered? Was that intentional?

Contributor Author

The IDE did this. I am using the Druid IntelliJ code formatter. Let me check how this can be avoided.

* @return the optional cache key to be used as part of query cache key
* @throws {@link IAE} if this operation is called on a non-join data source
*/
public Optional<byte[]> computeJoinDataSourceCacheKey(
Contributor

The existence of this method is another point where things seem a little suspect. I think the DataSource object needs a public byte[] getCacheKey() method. Most of the current implementations would just return new byte[]{}, but the join one should return this (or null if not cacheable). The unnest implementation will need to override the method as well.
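Extending the earlier DataSource sketch, the suggested default might look like this (illustrative, not the final code):

  // Non-cacheable data sources return null; data sources with nothing to
  // contribute return an empty array; JoinDataSource (and later
  // UnnestDataSource) override this with a real key.
  @Nullable
  default byte[] getCacheKey()
  {
    return new byte[]{};
  }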

Contributor Author

This has been moved. The test cases in JoinableFactoryWrapperTest have been refactored and moved inside JoinDataSourceTest.

Comment on lines 96 to 101
Function<SegmentReference, SegmentReference> segmentMapFn = analysis.getDataSource()
.createSegmentMapFunction(
query,
cpuAccumulator
);

Contributor

Now that there are a lot fewer arguments here, I'm pretty sure this formatting is no longer the best way to represent this code. Please condense things.
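e.g., condensed to something like:

  Function<SegmentReference, SegmentReference> segmentMapFn =
      analysis.getDataSource().createSegmentMapFunction(query, cpuAccumulator);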

Contributor Author

Done

Contributor

@imply-cheddar imply-cheddar left a comment

Getting very close, I think.

One thing that we need to go back and double check is that a lot of the call sites for the makeSegmentMapFn are just passing along a random AtomicLong for the cpu time tracker thingie. That's gonna mess up CPU tracking, so we need to look at all of those sites and give them the real tracker thingie instead of an object that just gets thrown away after things are done.

Comment on lines 89 to 90
@Warmup(iterations = 3)
@Measurement(iterations = 10)
Contributor

Why the changes in this file?

Contributor Author

This was reverted. Wherever the cpuAccumulators are reused downstream, we pass the real variable; where they are not used downstream, I have used a new AtomicLong().

}
catch (ConfigurationException ce) {
//check if annotation is nullable
if (forProperty.getAnnotation(Nullable.class).annotationType().isAnnotation()) {
Contributor

Are you sure that forProperty.getAnnotation() never returns null? If it does, I fear that this code is going to convert an actually useful ConfigurationException into a completely uninterpretable NullPointerException.
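A null-safe variant (sketch) would check the annotation before dereferencing, preserving the original exception when the annotation is absent:

  if (forProperty.getAnnotation(Nullable.class) != null) {
    return null;
  }
  // No @Nullable annotation: rethrow the useful ConfigurationException
  // instead of risking an NPE.
  throw ce;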

AtomicLong cpuTime
)
{
final DataSource dataSourceWithInlinedChannelData = broadcastJoinHelper.inlineChannelData(query.getDataSource());
Contributor

Probably nice to do a null check on the broadcastJoinHelper here and produce an error message that's meaningful to the developer.
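For example (sketch; the exact exception type and message are up to the implementation):

  if (broadcastJoinHelper == null) {
    throw new ISE("broadcastJoinHelper is not set; cannot create the segment map function");
  }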

Contributor Author

I added the check

/**
* Converts any join clauses to filters that can be converted, and returns the rest as-is.
*
* <p>
* See {@link #convertJoinToFilter} for details on the logic.
*/
@VisibleForTesting
Contributor

Don't really need this annotation on a public method :)

@@ -112,7 +114,8 @@ protected List<? extends Module> getModules()
binder.bind(new TypeLiteral<List<TieredBrokerSelectorStrategy>>() {})
.toProvider(TieredBrokerSelectorStrategiesProvider.class)
.in(LazySingleton.class);


binder.bind(JoinableFactory.class).to(NoopJoinableFactory.class).in(LazySingleton.class);
Contributor

This shouldn't be necessary anymore, right?

Contributor Author

removed

inputNumberDataSource.setBroadcastJoinHelper(broadcastJoinHelper);
segmentMapFn = inputNumberDataSource.createSegmentMapFunction(query, cpuAccumulator);
} else {
segmentMapFn = inlineChannelDataSource.createSegmentMapFunction(query, cpuAccumulator);
Contributor Author

@abhishekagarwal87 I have updated this part. Should address your earlier concern

Contributor

Did we find a test that was broken with the previous code? If not, then please revert back to the code that @abhishekagarwal87 initially commented on and keep that code active. That code is not broken until we can actually reproduce the breakage.

Contributor

This query would fail in MSQ

  SELECT t1.dim2, AVG(t1.m2) FROM foo
  INNER JOIN (SELECT * FROM foo LIMIT 10) AS t1
  ON t1.m1 = foo.m1
  GROUP BY t1.dim2

Contributor Author

@somu-imply somu-imply Oct 18, 2022

This example passes with the new code changes. The problems discovered were:

  1. There was no proper test case for this to be discovered by. I am adding the test suggested here.
  2. In the previous code, the identity function in the last else branch was incorrect, as it would not create the correct segment map function for this example. This has now been fixed by calling createSegmentMapFunction on the correct data source.
  3. All unit tests for MSQ, CalciteJoins, etc. now complete successfully.

@@ -195,18 +194,10 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<Seg
} else {
return new ReportTimelineMissingSegmentQueryRunner<>(Lists.newArrayList(specs));
}
Function<SegmentReference, SegmentReference> segmentMapFn = analysis.getDataSource().createSegmentMapFunction(query, cpuTimeAccumulator);
Contributor

does this need to be the analysis.getDataSource() instead of query.getDataSource()?

Contributor Author

I get confused between the use of analysis.getDataSource() and query.getDataSource(), especially when analysis is created from DataSourceAnalysis.fromDataSource(query.getDataSource()). That is the case here. I have updated it to use query.

Contributor

Yes it is super easy to get confused by that...

@somu-imply
Contributor Author

@imply-cheddar, @clintropolis I have addressed the comments and also resolved the conflicts. The remaining thing to address is the use of a set in GuiceInjectableValues so the nullable check can be avoided.

@somu-imply somu-imply requested review from clintropolis and imply-cheddar and removed request for clintropolis and imply-cheddar October 21, 2022 19:18
@somu-imply
Contributor Author

somu-imply commented Oct 21, 2022

I have introduced an atomic reference to a set in GuiceInjectableValues so the check is not done on every run. @clintropolis @imply-cheddar @abhishekagarwal87 this is ready for review. Can you please take a look?

@somu-imply
Contributor Author

somu-imply commented Oct 22, 2022

The conflicts have been resolved. Some additional refactoring was introduced due to a change in the underlying test framework in #12965. CalciteTests no longer has createDefaultJoinableFactory, and all JoinDataSource references now use CalciteTests.createJoinableFactoryWrapper().


public GuiceInjectableValues(Injector injector)
{
this.injector = injector;
this.nullables = new AtomicReference<>();
this.nullables.set(new HashSet<>());
Contributor

There's an AtomicReference constructor that takes an initial value. It's a nit, but you can remove this line by using it.
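i.e.:

  this.nullables = new AtomicReference<>(new HashSet<>());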

}
catch (ConfigurationException ce) {
// check if nullable annotation is present for this
if (nullables.get().contains((Key) valueId)) {
Contributor

The point of caching these is so that we only have Guice generate the exception once. Putting this check inside of the catch means that we don't get any benefit. This check needs to be done before we even ask the injector to create the instance.
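A sketch of the suggested ordering (consult the cache before invoking the injector at all):

  // Known-nullable keys short-circuit here, so Guice generates the
  // ConfigurationException at most once per key; first-time failures still
  // fall through to the catch block, which records the key.
  if (nullables.get().contains((Key) valueId)) {
    return null;
  }
  return injector.getInstance((Key) valueId);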

return null;
} else if (forProperty.getAnnotation(Nullable.class) != null) {
HashSet<Key> encounteredNullables = nullables.get();
encounteredNullables.add((Key) valueId);
Contributor

This is a non-thread-safe mutation of the HashSet. The goal of the AtomicReference was to make things thread-safe. You must create a brand new Set, add all of the old values, add the new value, and then set the new reference on the AtomicReference.
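i.e., a copy-on-write update along these lines (sketch):

  // Never mutate the published set: copy, add, then swap the reference so
  // concurrent readers only ever observe fully-built sets.
  HashSet<Key> updated = new HashSet<>(nullables.get());
  updated.add((Key) valueId);
  nullables.set(updated);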

Contributor Author

Addressed

@clintropolis clintropolis merged commit affc522 into apache:master Oct 26, 2022
@kfaraz kfaraz added this to the 25.0 milestone Nov 22, 2022