Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY;

import java.sql.Connection;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -61,6 +62,16 @@ public class CalcitePlanContext {

@Getter public Map<String, RexLambdaRef> rexLambdaRefMap;

/**
* List of captured variables from outer scope for lambda functions. When a lambda body references
* a field that is not a lambda parameter, it gets captured and stored here. The captured
* variables are passed as additional arguments to the transform function.
*/
@Getter private List<RexNode> capturedVariables;

/** Whether we're currently inside a lambda context. */
@Getter @Setter private boolean inLambdaContext = false;

private CalcitePlanContext(FrameworkConfig config, SysLimit sysLimit, QueryType queryType) {
this.config = config;
this.sysLimit = sysLimit;
Expand All @@ -70,6 +81,24 @@ private CalcitePlanContext(FrameworkConfig config, SysLimit sysLimit, QueryType
this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder());
this.functionProperties = new FunctionProperties(QueryType.PPL);
this.rexLambdaRefMap = new HashMap<>();
this.capturedVariables = new ArrayList<>();
}

/**
 * Builds a child context on top of {@code parent}, used by clone() when a lambda is analyzed.
 *
 * <p>The planner state (config, limits, query type, connection, both builders and the function
 * properties) is shared by reference with the parent so that fields of the parent context can
 * still be resolved. The lambda reference map and the captured-variable list start out empty,
 * and the new context is flagged as being inside a lambda.
 *
 * @param parent the context whose planner state is shared with the new instance
 */
private CalcitePlanContext(CalcitePlanContext parent) {
  // Lambda-local state: fresh containers that are never shared with the parent.
  this.inLambdaContext = true;
  this.capturedVariables = new ArrayList<>();
  this.rexLambdaRefMap = new HashMap<>();

  // Builders are the parent's own instances, not copies.
  this.rexBuilder = parent.rexBuilder;
  this.relBuilder = parent.relBuilder;

  // Remaining planner state, shared by reference.
  this.functionProperties = parent.functionProperties;
  this.connection = parent.connection;
  this.queryType = parent.queryType;
  this.sysLimit = parent.sysLimit;
  this.config = parent.config;
}

public RexNode resolveJoinCondition(
Expand Down Expand Up @@ -101,8 +130,13 @@ public Optional<RexCorrelVariable> peekCorrelVar() {
}
}

/**
 * Creates a child context that shares the relBuilder (and the rest of the planner state) with
 * this context. Lambda expressions analyzed with the returned context can therefore reference
 * fields of the current row, while keeping their own lambda variable mappings and their own list
 * of captured variables.
 *
 * @return a new context backed by this one and marked as being inside a lambda
 */
public CalcitePlanContext clone() {
  // Only the parent-sharing constructor matches the documented contract. A second, earlier
  // return that built an unrelated context from (config, sysLimit, queryType) made this
  // statement unreachable and has been removed.
  return new CalcitePlanContext(this);
}

public static CalcitePlanContext create(
Expand Down Expand Up @@ -134,4 +168,42 @@ public static boolean isLegacyPreferred() {
/**
 * Merges every entry of {@code candidateMap} into this context's lambda reference map. Entries
 * whose key is already present are overwritten.
 *
 * @param candidateMap lambda variable names mapped to their references
 */
public void putRexLambdaRefMap(Map<String, RexLambdaRef> candidateMap) {
  candidateMap.forEach(this.rexLambdaRefMap::put);
}

/**
 * Registers an external (non-lambda) expression as a captured variable and hands back the
 * RexLambdaRef that stands in for it inside the lambda body.
 *
 * <p>The captured node itself is appended to capturedVariables; its stand-in is stored in the
 * lambda reference map under the key {@code "__captured_" + position}. Capturing a node that is
 * equal to one captured earlier returns the reference that was stored for that earlier capture.
 *
 * @param fieldRef the resolved node for the external field
 * @param fieldName the name of the field being captured
 * @return the RexLambdaRef to use inside the lambda in place of {@code fieldRef}
 */
public RexLambdaRef captureVariable(RexNode fieldRef, String fieldName) {
  // Reuse the stored reference when an equal node has already been captured.
  int slot = 0;
  for (RexNode alreadyCaptured : capturedVariables) {
    if (alreadyCaptured.equals(fieldRef)) {
      return rexLambdaRefMap.get("__captured_" + slot);
    }
    slot++;
  }

  // Not seen before: after the scan, slot equals the current size, i.e. the position the new
  // entry is about to occupy.
  capturedVariables.add(fieldRef);

  // Captured variables are numbered after the declared lambda parameters, so count only the
  // map entries that are not themselves captures.
  int declaredParams = 0;
  for (String key : rexLambdaRefMap.keySet()) {
    if (!key.startsWith("__captured_")) {
      declaredParams++;
    }
  }

  RexLambdaRef standIn = new RexLambdaRef(declaredParams + slot, fieldName, fieldRef.getType());

  // Remember the stand-in so repeated references to the same field resolve to it.
  rexLambdaRefMap.put("__captured_" + slot, standIn);
  return standIn;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,20 @@ public RexNode visitLambdaFunction(LambdaFunction node, CalcitePlanContext conte
TYPE_FACTORY.createSqlType(SqlTypeName.ANY))))
.collect(Collectors.toList());
RexNode body = node.getFunction().accept(this, context);

// Add captured variables as additional lambda parameters
// They are stored with keys like "__captured_0", "__captured_1", etc.
List<RexNode> capturedVars = context.getCapturedVariables();
if (capturedVars != null && !capturedVars.isEmpty()) {
args = new ArrayList<>(args);
for (int i = 0; i < capturedVars.size(); i++) {
RexLambdaRef capturedRef = context.getRexLambdaRefMap().get("__captured_" + i);
if (capturedRef != null) {
args.add(capturedRef);
}
}
}

RexNode lambdaNode = context.rexBuilder.makeLambdaCall(body, args);
return lambdaNode;
} catch (Exception e) {
Expand Down Expand Up @@ -391,6 +405,7 @@ public RexNode visitFunction(Function node, CalcitePlanContext context) {
context.setInCoalesceFunction(true);
}

List<RexNode> capturedVars = null;
try {
for (UnresolvedExpression arg : args) {
if (arg instanceof LambdaFunction) {
Expand All @@ -409,6 +424,8 @@ public RexNode visitFunction(Function node, CalcitePlanContext context) {
lambdaNode = analyze(arg, lambdaContext);
}
arguments.add(lambdaNode);
// Capture any external variables that were referenced in the lambda
capturedVars = lambdaContext.getCapturedVariables();
} else {
arguments.add(analyze(arg, context));
}
Expand All @@ -419,6 +436,15 @@ public RexNode visitFunction(Function node, CalcitePlanContext context) {
}
}

// For transform/mvmap functions with captured variables, add them as additional arguments
if (capturedVars != null && !capturedVars.isEmpty()) {
if (node.getFuncName().equalsIgnoreCase("mvmap")
|| node.getFuncName().equalsIgnoreCase("transform")) {
arguments = new ArrayList<>(arguments);
arguments.addAll(capturedVars);
}
}

if ("LIKE".equalsIgnoreCase(node.getFuncName()) && arguments.size() == 2) {
RexNode defaultCaseSensitive =
CalcitePlanContext.isLegacyPreferred()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,30 @@ private static RexNode resolveInNonJoinCondition(
QualifiedName nameNode, CalcitePlanContext context) {
log.debug("resolveInNonJoinCondition() called with nameNode={}", nameNode);

return resolveLambdaVariable(nameNode, context)
.or(() -> resolveFieldDirectly(nameNode, context, 1))
.or(() -> resolveFieldWithAlias(nameNode, context, 1))
.or(() -> resolveFieldWithoutAlias(nameNode, context, 1))
.or(() -> resolveRenamedField(nameNode, context))
.or(() -> resolveCorrelationField(nameNode, context))
// First try to resolve as lambda variable
Optional<RexNode> lambdaVar = resolveLambdaVariable(nameNode, context);
if (lambdaVar.isPresent()) {
return lambdaVar.get();
}

// Try to resolve as regular field
Optional<RexNode> fieldRef =
resolveFieldDirectly(nameNode, context, 1)
.or(() -> resolveFieldWithAlias(nameNode, context, 1))
.or(() -> resolveFieldWithoutAlias(nameNode, context, 1))
.or(() -> resolveRenamedField(nameNode, context));

if (fieldRef.isPresent()) {
// If we're in a lambda context and this is not a lambda variable,
// we need to capture it as an external variable
if (context.isInLambdaContext()) {
log.debug("Capturing external field {} in lambda context", nameNode);
return context.captureVariable(fieldRef.get(), nameNode.toString());
}
return fieldRef.get();
}

return resolveCorrelationField(nameNode, context)
.or(() -> replaceWithNullLiteralInCoalesce(context))
.orElseThrow(() -> getNotFoundException(nameNode));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public enum BuiltinFunctionName {
MVZIP(FunctionName.of("mvzip")),
SPLIT(FunctionName.of("split")),
MVDEDUP(FunctionName.of("mvdedup")),
MVMAP(FunctionName.of("mvmap")),
FORALL(FunctionName.of("forall")),
EXISTS(FunctionName.of("exists")),
FILTER(FunctionName.of("filter")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ public static Object eval(Object... args) {
List<Object> target = (List<Object>) args[0];
List<Object> results = new ArrayList<>();
SqlTypeName returnType = (SqlTypeName) args[args.length - 1];

// Check for captured variables: args structure is [array, lambda, captured1, captured2, ...,
// returnType]
// If there are more than 3 args (array, lambda, returnType), we have captured variables
boolean hasCapturedVars = args.length > 3;

if (args[1] instanceof org.apache.calcite.linq4j.function.Function1) {
org.apache.calcite.linq4j.function.Function1 lambdaFunction =
(org.apache.calcite.linq4j.function.Function1) args[1];
Expand All @@ -90,9 +96,27 @@ public static Object eval(Object... args) {
org.apache.calcite.linq4j.function.Function2 lambdaFunction =
(org.apache.calcite.linq4j.function.Function2) args[1];
try {
for (int i = 0; i < target.size(); i++) {
results.add(
transferLambdaOutputToTargetType(lambdaFunction.apply(target.get(i), i), returnType));
if (hasCapturedVars) {
// Lambda has captured variables - pass the first captured variable as second arg
// LIMITATION: Currently only the first captured variable (args[2]) is supported.
// Supporting multiple captured variables would require either:
// 1. Packing args[2..args.length-1] into an Object[] and modifying lambda generation
// to accept a container as the second parameter, or
// 2. Using higher-arity function interfaces (Function3, Function4, etc.)
// For now, lambdas that capture multiple external variables may not work correctly.
Object capturedVar = args[2];
for (Object candidate : target) {
results.add(
transferLambdaOutputToTargetType(
lambdaFunction.apply(candidate, capturedVar), returnType));
}
} else {
// Original behavior: lambda with index (x, i) -> expr
for (int i = 0; i < target.size(); i++) {
results.add(
transferLambdaOutputToTargetType(
lambdaFunction.apply(target.get(i), i), returnType));
}
}
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVDEDUP;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVINDEX;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVJOIN;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVMAP;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVZIP;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOT;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOTEQUAL;
Expand Down Expand Up @@ -1044,6 +1045,7 @@ void populate() {
registerOperator(MVAPPEND, PPLBuiltinOperators.MVAPPEND);
registerOperator(MVDEDUP, SqlLibraryOperators.ARRAY_DISTINCT);
registerOperator(MVZIP, PPLBuiltinOperators.MVZIP);
registerOperator(MVMAP, PPLBuiltinOperators.TRANSFORM);
registerOperator(MAP_APPEND, PPLBuiltinOperators.MAP_APPEND);
registerOperator(MAP_CONCAT, SqlLibraryOperators.MAP_CONCAT);
registerOperator(MAP_REMOVE, PPLBuiltinOperators.MAP_REMOVE);
Expand Down
68 changes: 68 additions & 0 deletions docs/user/ppl/functions/collection.md
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,74 @@ fetched rows / total rows = 1/1
+--------------------------+
```

## MVMAP

### Description

Usage: mvmap(array, expression) iterates over each element of a multivalue array, applies the expression to each element, and returns a multivalue array with the transformed results. Inside the expression, the name of the array field refers to the current element rather than to the whole array.
Argument type: array: ARRAY, expression: EXPRESSION
Return type: ARRAY
Example

```ppl
source=people
| eval array = array(1, 2, 3), result = mvmap(array, array * 10)
| fields result
| head 1
```

Expected output:

```text
fetched rows / total rows = 1/1
+------------+
| result |
|------------|
| [10,20,30] |
+------------+
```

```ppl
source=people
| eval array = array(1, 2, 3), result = mvmap(array, array + 5)
| fields result
| head 1
```

Expected output:

```text
fetched rows / total rows = 1/1
+---------+
| result |
|---------|
| [6,7,8] |
+---------+
```

Note: For nested expressions like ``mvmap(mvindex(arr, 1, 3), arr * 2)``, the field name (``arr``) is extracted from the first argument and must match the field referenced in the expression.

The expression can also reference other single-value fields:

```ppl
source=people
| eval array = array(1, 2, 3), multiplier = 10, result = mvmap(array, array * multiplier)
| fields result
| head 1
```

Expected output:

```text
fetched rows / total rows = 1/1
+------------+
| result |
|------------|
| [10,20,30] |
+------------+
```


## MVZIP

### Description
Expand Down
Loading
Loading