
Enable subfield pushdown for map_subset function #25394


Merged
merged 1 commit into from
Jun 26, 2025

Conversation

feilong-liu
Contributor

@feilong-liu feilong-liu commented Jun 20, 2025

Description

The MAP_SUBSET function is commonly used in ML workloads to extract a subset of features. For example, map_subset(feature, array[301, 205]) extracts the features with keys 301 and 205.
If this is the only place where feature is accessed, we only need to read two entries of the map. However, PushDownSubfields currently does not recognize this pattern. In this PR, I enable subfield pushdown for map_subset when the input array is a constant array.
In one of the motivating queries we found, this enhancement reduces resource usage by 17% (orig vs. improved).

Without change:

presto:tpch> explain (type distributed) select map_subset(map2, array[1]) from t1;
                                                                                                                                             Query Plan                                                            >
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
 Fragment 0 [SINGLE]                                                                                                                                                                                               >
     Output layout: [map_subset]                                                                                                                                                                                   >
     Output partitioning: SINGLE []                                                                                                                                                                                >
     Output encoding: COLUMNAR                                                                                                                                                                                     >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                 >
     - Output[PlanNodeId 5][_col0] => [map_subset:map(integer, integer)]                                                                                                                                           >
             Estimates: {source: CostBasedSourceInfo, rows: 2 (102B), cpu: 128.00, memory: 0.00, network: 102.00}                                                                                                  >
             _col0 := map_subset (1:35)                                                                                                                                                                            >
         - RemoteSource[1] => [map_subset:map(integer, integer)]                                                                                                                                                   >
                                                                                                                                                                                                                   >
 Fragment 1 [SOURCE]                                                                                                                                                                                               >
     Output layout: [map_subset]                                                                                                                                                                                   >
     Output partitioning: SINGLE []                                                                                                                                                                                >
     Output encoding: COLUMNAR                                                                                                                                                                                     >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                 >
     - ScanProject[PlanNodeId 0,1][table = TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=t1, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.t1{}]'},>
             Estimates: {source: CostBasedSourceInfo, rows: 2 (102B), cpu: 26.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: 2 (102B), cpu: 128.00, memory: 0.00, network: 0.00}             >
             map_subset := map_subset(map2, [Block: position count: 1; size: 60 bytes]) (1:68)                                                                                                                     >
             LAYOUT: tpch.t1{}                                                                                                                                                                                     >
             map2 := map2:map<int,int>:1:REGULAR (1:67)                                                                                                                                                            >
                                                                                                                                                                                                                   >
                                                                                                                                                                                                                   >
(1 row)
(END)

With change:

presto:tpch> explain (type distributed) select map_subset(map2, array[1]) from t1;
                                                                                                                                             Query Plan                                                            >
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
 Fragment 0 [SINGLE]                                                                                                                                                                                               >
     Output layout: [map_subset]                                                                                                                                                                                   >
     Output partitioning: SINGLE []                                                                                                                                                                                >
     Output encoding: COLUMNAR                                                                                                                                                                                     >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                 >
     - Output[PlanNodeId 5][_col0] => [map_subset:map(integer, integer)]                                                                                                                                           >
             Estimates: {source: CostBasedSourceInfo, rows: 2 (102B), cpu: 128.00, memory: 0.00, network: 102.00}                                                                                                  >
             _col0 := map_subset (1:35)                                                                                                                                                                            >
         - RemoteSource[1] => [map_subset:map(integer, integer)]                                                                                                                                                   >
                                                                                                                                                                                                                   >
 Fragment 1 [SOURCE]                                                                                                                                                                                               >
     Output layout: [map_subset]                                                                                                                                                                                   >
     Output partitioning: SINGLE []                                                                                                                                                                                >
     Output encoding: COLUMNAR                                                                                                                                                                                     >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                 >
     - ScanProject[PlanNodeId 0,1][table = TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=t1, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.t1{}]'},>
             Estimates: {source: CostBasedSourceInfo, rows: 2 (102B), cpu: 26.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: 2 (102B), cpu: 128.00, memory: 0.00, network: 0.00}             >
             map_subset := map_subset(map2, [Block: position count: 1; size: 60 bytes]) (1:68)                                                                                                                     >
             LAYOUT: tpch.t1{}                                                                                                                                                                                     >
             map2 := map2:map<int,int>:1:REGULAR:[map2[1]] (1:67)                                                                                                                                                  >
                                                                                                                                                                                                                   >
                                                                                                                                                                                                                   >
(1 row)
(END)

map2 := map2:map<int,int>:1:REGULAR (1:67) vs. map2 := map2:map<int,int>:1:REGULAR:[map2[1]] (1:67)
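
The reasoning behind the extra `[map2[1]]` subfield can be illustrated outside of Presto. The following is a minimal plain-Java sketch, not Presto code: the class `MapSubsetPruningSketch` and both methods are hypothetical stand-ins, one for `map_subset` and one for a reader that materializes only the requested keys. It shows that applying the subset to the full map and to the pruned map gives the same result, which is why the scan does not need the remaining entries when this is the only access.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Presto.
final class MapSubsetPruningSketch {
    // Stand-in for map_subset(map, keys): keep only entries whose key is requested.
    static <K, V> Map<K, V> mapSubset(Map<K, V> map, List<K> keys) {
        Map<K, V> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : map.entrySet()) {
            if (keys.contains(entry.getKey())) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    // Stand-in for a reader that was handed subfields such as feature[301], feature[205]
    // and therefore materializes only those keys.
    static <K, V> Map<K, V> readOnlyKeys(Map<K, V> stored, List<K> requiredKeys) {
        Map<K, V> pruned = new LinkedHashMap<>();
        for (K key : requiredKeys) {
            if (stored.containsKey(key)) {
                pruned.put(key, stored.get(key));
            }
        }
        return pruned;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> feature = new LinkedHashMap<>();
        feature.put(301, 10);
        feature.put(205, 20);
        feature.put(999, 30);
        List<Integer> keys = Arrays.asList(301, 205);

        Map<Integer, Integer> fromFullMap = mapSubset(feature, keys);
        Map<Integer, Integer> fromPrunedMap = mapSubset(readOnlyKeys(feature, keys), keys);
        System.out.println(fromFullMap.equals(fromPrunedMap)); // prints "true"
    }
}
```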

Motivation and Context

As in description

Impact

In one of the motivating queries we found, this enhancement reduces resource usage by 17%.

Test Plan

Unit tests, plus local end-to-end tests.

Contributor checklist

  • Please make sure your submission complies with our contributing guide, in particular code style and commit standards.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==

General Changes
* Improve query resource usage by enabling subfield pushdown for :func:`map_subset` when the input array is a constant array. 

@feilong-liu feilong-liu requested a review from a team as a code owner June 20, 2025 17:40
@prestodb-ci prestodb-ci added the from:Meta PR from Meta label Jun 20, 2025
@feilong-liu feilong-liu marked this pull request as draft June 20, 2025 17:40
@feilong-liu feilong-liu force-pushed the map_subset_pushdown branch from f894a58 to 63eace6 Compare June 20, 2025 17:55
@feilong-liu feilong-liu marked this pull request as ready for review June 21, 2025 03:47
@feilong-liu feilong-liu force-pushed the map_subset_pushdown branch from 63eace6 to fb0de3c Compare June 23, 2025 16:32
@feilong-liu feilong-liu requested a review from hantangwangd June 23, 2025 16:35
@@ -704,7 +713,23 @@ public Void visitCall(CallExpression call, Context context)
}
if (!isPushDownSubfieldsFromLambdasEnabled) {
context.setLambdaSubfields(Context.ALL_SUBFIELDS_OF_ARRAY_ELEMENT_OR_MAP_VALUE);
call.getArguments().forEach(argument -> argument.accept(this, context));
// map_subset(feature, constant_array) is only accessing fields specified in feature map.
Member

why does isPushDownSubfieldsFromLambdasEnabled have to be false?

Contributor Author

It does not need to be; the lambda handling part is just so complex that I haven't figured out how to make this work with it. Since isPushDownSubfieldsFromLambdasEnabled defaults to false, and what is added here is more like a new feature, I chose to handle only the case where isPushDownSubfieldsFromLambdasEnabled is false for now. We can create a new issue for it once this PR is merged.

Contributor Author

I just followed @kevintang2022's suggestion, and the fix works for lambdas too (I also added a test).

call.getArguments().forEach(argument -> argument.accept(this, context));
// map_subset(feature, constant_array) is only accessing fields specified in feature map.
// For example map_subset(feature, array[1, 2]) is equivalent to calling element_at(feature, 1) and element_at(feature, 2) for subfield extraction
if (isPushdownSubfieldsForMapSubsetEnabled && functionResolution.isMapSubSetFunction(call.getFunctionHandle()) && call.getArguments().get(0) instanceof VariableReferenceExpression && call.getArguments().get(1) instanceof ConstantExpression) {
Member

I am wondering if PushdownSubfields is the right place for this optimization because here only the context is changed and the name of this visitor also is SubfieldExtractor

Contributor Author

Yeah, I have been thinking about adding a new optimizer that translates every map_subset(feature, array[1, 2, 3]) with constant array elements into something like map(array[1, 2, 3], array[element_at(feature, 1), element_at(feature, 2), element_at(feature, 3)]).
However, I then realized that this rewrite is not useful if there are other references to the feature map. To make it work, we would need to check for other non-subfield accesses of feature, which is nontrivial and basically duplicates most of the logic here.
I also think it makes sense to put it here, since this optimizer extracts subfield accesses and pushes them down, and that is what map_subset does: it accesses only a part of a map.
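
The point about other references can be sketched as follows. This is a hypothetical plain-Java illustration, not the PushdownSubfields implementation: `FeatureAccessSketch`, `Access`, and `requiredKeys` are invented names. Each reference to the map column is modeled either as a set of constant keys (as with map_subset over a constant array, or element_at) or as a whole-map access; pruning is only possible when every reference is a keyed access.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical illustration only; none of these names exist in Presto.
final class FeatureAccessSketch {
    // One reference to the map column in the query.
    static final class Access {
        final Set<Long> keys; // null means the expression needs the entire map

        private Access(Set<Long> keys) {
            this.keys = keys;
        }

        static Access ofKeys(Long... keys) {
            return new Access(new LinkedHashSet<>(Arrays.asList(keys)));
        }

        static Access wholeMap() {
            return new Access(null);
        }
    }

    // Returns the union of keys to read, or Optional.empty() when any reference
    // needs the whole map and pruning therefore has to be skipped.
    static Optional<Set<Long>> requiredKeys(List<Access> accesses) {
        Set<Long> union = new LinkedHashSet<>();
        for (Access access : accesses) {
            if (access.keys == null) {
                return Optional.empty();
            }
            union.addAll(access.keys);
        }
        return Optional.of(union);
    }
}
```

Under this model, `requiredKeys(Arrays.asList(Access.ofKeys(1L, 2L), Access.ofKeys(3L)))` yields the keys 1, 2, and 3, while adding a single `Access.wholeMap()` (for example a hypothetical `cardinality(feature)` elsewhere in the query) yields an empty Optional, so the whole column is read.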

@steveburnett
Contributor

Thanks for the release note entry! Suggest adding a link to the doc. See Formatting in the Release Notes Guidelines.

Also, can you describe what user-visible effect this might have? Reading the PR description I came up with this. Let me know what you think!

== RELEASE NOTES ==

General Changes
* Improve query resource usage by enabling subfield pushdown for :func:`map_subset` when the input array is a constant array. 

@kevintang2022
Contributor

I think there's a place in the same file PushdownSubfields.java that is doing something similar. Is there a way to add this change in the same place? It's in the toSubfield method

                if (expression instanceof CallExpression &&
                        isSubscriptOrElementAtFunction((CallExpression) expression, functionResolution, functionAndTypeManager)) {
                    List<RowExpression> arguments = ((CallExpression) expression).getArguments();
                    RowExpression indexExpression = expressionOptimizer.optimize(

@feilong-liu feilong-liu force-pushed the map_subset_pushdown branch 2 times, most recently from 16f3445 to 750a2e7 Compare June 24, 2025 22:30
@@ -570,17 +574,18 @@ private static String getColumnName(Session session, Metadata metadata, TableHan
return metadata.getColumnMetadata(session, tableHandle, columnHandle).getName();
}

- private static Optional<Subfield> toSubfield(
+ private static Optional<List<Subfield>> toSubfield(
Contributor Author

map_subset can return a list of subfields

@feilong-liu
Contributor Author

feilong-liu commented Jun 24, 2025

I think there's a place in the same file PushdownSubfields.java that is doing something similar. Is there a way to add this change in the same place? It's in the toSubfield method

                if (expression instanceof CallExpression &&
                        isSubscriptOrElementAtFunction((CallExpression) expression, functionResolution, functionAndTypeManager)) {
                    List<RowExpression> arguments = ((CallExpression) expression).getArguments();
                    RowExpression indexExpression = expressionOptimizer.optimize(

@kevintang2022 Thanks for the suggestion, updated the PR to move the change here.

kevintang2022
kevintang2022 previously approved these changes Jun 24, 2025
jaystarshot
jaystarshot previously approved these changes Jun 25, 2025
hantangwangd
hantangwangd previously approved these changes Jun 25, 2025
Member

@hantangwangd hantangwangd left a comment

Thanks for the optimization, LGTM! There is an unused import in TestHiveLogicalPlanner that causes the checkstyle failure.

hantangwangd
hantangwangd previously approved these changes Jun 25, 2025
if (index == null) {
return Optional.empty();
}
if (index instanceof Number) {
Contributor

i thought map_subset specifies map keys and not indexes? why can't they be negative? And why do they need to be numeric?

Member

If that is the case we cannot translate to elementAt function i think

Contributor

element_at for maps also takes a key and not an index (see docs here: https://prestodb.io/docs/current/functions/map.html). element_at returns the value for a given key. map_subset returns a subset of the map that includes only the entries with keys in the specified array.

Member

Nice catch, I have tried removing the restriction on negative values and adding support for varchar type here, and found that both can be successfully pushed down and got the expected results.

Contributor Author

i thought map_subset specifies map keys and not indexes?

Rename to mapKey

And why do they need to be numeric?

Add case for varchar keys

why can't they be negative?

You are right, the associated issue is for array, not for map

Contributor Author

By the way, since subfield pushdown of negative keys works for maps, I drafted another PR, #25445, to fix the current implementation for the element_at and subscript functions.
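
The outcome of this thread (map keys rather than indexes, negative keys allowed, varchar keys supported, and one call yielding several subfields) can be summarized in a small sketch. This is a hypothetical plain-Java illustration, not the code merged in this PR: `MapSubsetSubfieldSketch` and `toSubfields` are invented names, subfields are modeled as strings, and the `["a"]` rendering for varchar keys is an assumption modeled on the `map2[1]` shape seen in the plan output above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration only; none of these names exist in Presto.
final class MapSubsetSubfieldSketch {
    // Turns map_subset(column, constantKeys) into one subfield path per key.
    // Returns Optional.empty() when any key cannot be expressed as a subfield.
    static Optional<List<String>> toSubfields(String column, List<?> constantKeys) {
        List<String> subfields = new ArrayList<>();
        for (Object mapKey : constantKeys) {
            if (mapKey == null) {
                return Optional.empty();
            }
            if (mapKey instanceof Long || mapKey instanceof Integer) {
                // Keys are map keys, not positions, so negative values are fine.
                subfields.add(column + "[" + ((Number) mapKey).longValue() + "]");
            }
            else if (mapKey instanceof String) {
                // Assumed rendering for varchar keys.
                subfields.add(column + "[\"" + ((String) mapKey).replace("\"", "\\\"") + "\"]");
            }
            else {
                return Optional.empty();
            }
        }
        return Optional.of(subfields);
    }
}
```

For instance, `toSubfields("map2", Arrays.asList(1L, -2L))` yields the two paths `map2[1]` and `map2[-2]`, and a list containing a null key yields an empty Optional so that no pushdown is attempted.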

@@ -1461,6 +1462,31 @@ public void testPushdownSubfields()
assertUpdate("DROP TABLE test_pushdown_struct_subfields");
}

@Test
public void testPushdownSubfieldsForMapSubset()
Contributor

can you add a test where the specified keys to subset are not also the index of that key? Also for non-numeric map keys.

Contributor Author

@feilong-liu feilong-liu Jun 26, 2025

a test where the specified keys to subset are not also the index of that key

What do you mean by not also the index of that key? Do you mean the type of index in array is different from map key?
If the map key is of integer type and the array elements are of bigint type, the map will be wrapped in a cast and the subfield pushdown will not work (this needs to be resolved with PR #25395).
If the element type of the array is not compatible with the map key type, for example integer vs. varchar, the query will fail during parsing.

Also for non-numeric map keys.

Added tests for varchar keys too
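
Why a cast around the map defeats the pushdown can be seen from the shape check quoted earlier in the review (first argument a variable reference, second a constant). The following is a hypothetical plain-Java sketch with a toy expression model, not Presto's RowExpression classes: `MapSubsetMatchSketch` and its nested types are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; this toy expression model is not Presto's.
final class MapSubsetMatchSketch {
    interface Expr {}

    static final class Variable implements Expr {
        final String name;

        Variable(String name) {
            this.name = name;
        }
    }

    static final class Constant implements Expr {
        final Object value;

        Constant(Object value) {
            this.value = value;
        }
    }

    static final class Cast implements Expr {
        final Expr input;
        final String targetType;

        Cast(Expr input, String targetType) {
            this.input = input;
            this.targetType = targetType;
        }
    }

    static final class Call implements Expr {
        final String function;
        final List<Expr> arguments;

        Call(String function, Expr... arguments) {
            this.function = function;
            this.arguments = Arrays.asList(arguments);
        }
    }

    // Mirrors the shape of the check in the diff: map_subset(<variable>, <constant>).
    static boolean isPushdownCandidate(Call call) {
        return call.function.equals("map_subset")
                && call.arguments.size() == 2
                && call.arguments.get(0) instanceof Variable
                && call.arguments.get(1) instanceof Constant;
    }
}
```

In this model, `map_subset(map2, <constant>)` matches, while `map_subset(CAST(map2 AS map(bigint, integer)), <constant>)` does not, because the first argument is a `Cast` rather than a `Variable`.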

@feilong-liu feilong-liu merged commit a05bc68 into prestodb:master Jun 26, 2025
106 checks passed
@feilong-liu feilong-liu deleted the map_subset_pushdown branch June 26, 2025 16:12
@prestodb-ci prestodb-ci mentioned this pull request Jul 28, 2025
6 tasks

7 participants