Skip to content

Commit

Permalink
Merge pull request #35 from apache/master
Browse files Browse the repository at this point in the history
merge master
  • Loading branch information
deepthi912 authored Oct 18, 2024
2 parents f4e4904 + 76b219b commit 5fb8a8b
Show file tree
Hide file tree
Showing 99 changed files with 3,101 additions and 1,207 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,16 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
Long timeoutMsFromQueryOption = QueryOptionsUtils.getTimeoutMs(queryOptions);
queryTimeoutMs = timeoutMsFromQueryOption != null ? timeoutMsFromQueryOption : _brokerTimeoutMs;
database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, httpHeaders);
QueryEnvironment queryEnvironment = new QueryEnvironment(database, _tableCache, _workerManager);
boolean inferPartitionHint = _config.getProperty(CommonConstants.Broker.CONFIG_OF_INFER_PARTITION_HINT,
CommonConstants.Broker.DEFAULT_INFER_PARTITION_HINT);
//@formatter:off
QueryEnvironment queryEnvironment = new QueryEnvironment(QueryEnvironment.configBuilder()
.database(database)
.tableCache(_tableCache)
.workerManager(_workerManager)
.defaultInferPartitionHint(inferPartitionHint)
.build());
//@formatter:on
switch (sqlNodeAndOptions.getSqlNode().getKind()) {
case EXPLAIN:
boolean askServers = QueryOptionsUtils.isExplainAskingServers(queryOptions)
Expand Down
9 changes: 9 additions & 0 deletions pinot-clients/pinot-java-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,13 @@
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
<profile>
<id>pinot-fastdev</id>
<properties>
<shade.phase.prop>none</shade.phase.prop>
</properties>
</profile>
</profiles>
</project>
6 changes: 6 additions & 0 deletions pinot-clients/pinot-jdbc-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,11 @@
<shade.phase.prop>package</shade.phase.prop>
</properties>
</profile>
<profile>
<id>pinot-fastdev</id>
<properties>
<shade.phase.prop>none</shade.phase.prop>
</properties>
</profile>
</profiles>
</project>
6 changes: 6 additions & 0 deletions pinot-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,12 @@
<shade.phase.prop>package</shade.phase.prop>
</properties>
</profile>
<profile>
<id>pinot-fastdev</id>
<properties>
<shade.phase.prop>none</shade.phase.prop>
</properties>
</profile>
<profile>
<!-- The fmpp-maven-plugin doesn't care about unchanged (re)sources and will always generate.
This causes the maven-compiler-plugin to detect changes and always recompile the Java sources.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ public static boolean isServerReturnFinalResultKeyUnpartitioned(Map<String, Stri
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SERVER_RETURN_FINAL_RESULT_KEY_UNPARTITIONED));
}

public static boolean isFilteredAggregationsSkipEmptyGroups(Map<String, String> queryOptions) {
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS));
}

@Nullable
public static String getOrderByAlgorithm(Map<String, String> queryOptions) {
return queryOptions.get(QueryOptionKey.ORDER_BY_ALGORITHM);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ private static class Entry {
final Function<IdealState, IdealState> _updater;
IdealState _updatedIdealState = null;
AtomicBoolean _sent = new AtomicBoolean(false);
Throwable _exception;

Entry(String resourceName, Function<IdealState, IdealState> updater) {
_resourceName = resourceName;
Expand Down Expand Up @@ -106,8 +107,8 @@ public static synchronized void setMinNumCharsInISToTurnOnCompression(int minNum
* @param updater the idealState updater to be applied
* @return IdealState if the update is successful, null if not
*/
public IdealState commit(HelixManager helixManager, String resourceName,
Function<IdealState, IdealState> updater, RetryPolicy retryPolicy, boolean noChangeOk) {
public IdealState commit(HelixManager helixManager, String resourceName, Function<IdealState, IdealState> updater,
RetryPolicy retryPolicy, boolean noChangeOk) {
Queue queue = getQueue(resourceName);
Entry entry = new Entry(resourceName, updater);

Expand All @@ -120,39 +121,41 @@ public IdealState commit(HelixManager helixManager, String resourceName,
// All pending entries have been processed, the updatedIdealState should be set.
return entry._updatedIdealState;
}
// remove from queue
Entry first = queue._pending.poll();
processed.add(first);
String mergedResourceName = first._resourceName;
HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName);
IdealState idealState = dataAccessor.getProperty(idealStateKey);

// Make a copy of the idealState above to pass it to the updater
// NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and
// list fields
IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);

/**
* If the local cache does not contain a value, need to check if there is a
* value in ZK; use it as initial value if exists
*/
IdealState updatedIdealState = first._updater.apply(idealStateCopy);
first._updatedIdealState = updatedIdealState;
Iterator<Entry> it = queue._pending.iterator();
while (it.hasNext()) {
Entry ent = it.next();
if (!ent._resourceName.equals(mergedResourceName)) {
continue;
IdealState response = updateIdealState(helixManager, resourceName, idealState -> {
IdealState updatedIdealState = idealState;
if (!processed.isEmpty()) {
queue._pending.addAll(processed);
processed.clear();
}
Iterator<Entry> it = queue._pending.iterator();
while (it.hasNext()) {
Entry ent = it.next();
if (!ent._resourceName.equals(resourceName)) {
continue;
}
processed.add(ent);
it.remove();
updatedIdealState = ent._updater.apply(updatedIdealState);
ent._updatedIdealState = updatedIdealState;
ent._exception = null;
}
processed.add(ent);
updatedIdealState = ent._updater.apply(idealStateCopy);
ent._updatedIdealState = updatedIdealState;
it.remove();
return updatedIdealState;
}, retryPolicy, noChangeOk);
if (response == null) {
RuntimeException ex = new RuntimeException("Failed to update IdealState");
for (Entry ent : processed) {
ent._exception = ex;
ent._updatedIdealState = null;
}
throw ex;
}
} catch (Throwable e) {
// If there is an exception, set the exception for all processed entries
for (Entry ent : processed) {
ent._exception = e;
ent._updatedIdealState = null;
}
IdealState finalUpdatedIdealState = updatedIdealState;
updateIdealState(helixManager, resourceName, anyIdealState -> finalUpdatedIdealState,
retryPolicy, noChangeOk);
throw e;
} finally {
queue._running.set(null);
for (Entry e : processed) {
Expand All @@ -176,6 +179,10 @@ public IdealState commit(HelixManager helixManager, String resourceName,
}
}
}
if (entry._exception != null) {
throw new RuntimeException("Caught exception while updating ideal state for resource: " + resourceName,
entry._exception);
}
return entry._updatedIdealState;
}

Expand Down Expand Up @@ -298,7 +305,7 @@ private boolean shouldCompress(IdealState is) {
controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS, 1L);
}
return idealStateWrapper._idealState;
} catch (Exception e) {
} catch (Throwable e) {
if (controllerMetrics != null) {
controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_FAILURE, 1L);
}
Expand Down
62 changes: 39 additions & 23 deletions pinot-connectors/pinot-spark-2-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,45 @@
</properties>

<profiles>
<profile>
<id>build-shaded-jar</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>${shadeBase}.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>pinot-fastdev</id>
<properties>
<shade.phase.prop>none</shade.phase.prop>
</properties>
</profile>
<profile>
<id>scala-2.12</id>
<activation>
Expand Down Expand Up @@ -91,29 +130,6 @@
<build>
<plugins>
<!-- scala build -->
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>${shadeBase}.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
Expand Down
60 changes: 37 additions & 23 deletions pinot-connectors/pinot-spark-3-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,43 @@
</properties>

<profiles>
<profile>
<id>build-shaded-jar</id>
<build>
<plugins>

<plugin>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>${shadeBase}.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>pinot-fastdev</id>
<properties>
<shade.phase.prop>none</shade.phase.prop>
</properties>
</profile>
<profile>
<id>scala-2.12</id>
<activation>
Expand Down Expand Up @@ -62,29 +99,6 @@
<build>
<plugins>
<!-- scala build -->
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>${shadeBase}.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
Expand Down
Loading

0 comments on commit 5fb8a8b

Please sign in to comment.