diff --git a/common/src/main/java/org/opensearch/sql/common/utils/DebugUtils.java b/common/src/main/java/org/opensearch/sql/common/utils/DebugUtils.java index 9082839360d..cbb5c7f16d1 100644 --- a/common/src/main/java/org/opensearch/sql/common/utils/DebugUtils.java +++ b/common/src/main/java/org/opensearch/sql/common/utils/DebugUtils.java @@ -8,8 +8,6 @@ import java.util.Collection; import java.util.Map; import java.util.stream.Collectors; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; /** * Utility class for debugging operations. This class is only for debugging purpose, and not @@ -18,7 +16,6 @@ public class DebugUtils { // Update this to true while you are debugging. (Safe guard to avoid usage in production code. ) private static final boolean IS_DEBUG = false; - private static final Logger logger = LogManager.getLogger(DebugUtils.class); public static T debug(T obj, String message) { verifyDebug(); @@ -39,7 +36,7 @@ private static void verifyDebug() { } private static void print(String format, Object... 
args) { - logger.info(String.format(format, args)); + System.out.println(String.format(format, args)); } private static String getCalledFrom(int pos) { diff --git a/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionResult.java b/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionResult.java index 3c433dbb4a0..76d6b038f60 100644 --- a/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionResult.java +++ b/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionResult.java @@ -28,17 +28,17 @@ public class FieldResolutionResult { @NonNull private final Set regularFields; @NonNull private final Wildcard wildcard; - public FieldResolutionResult(Set regularFields) { + public FieldResolutionResult(Collection regularFields) { this.regularFields = new HashSet<>(regularFields); this.wildcard = NULL_WILDCARD; } - public FieldResolutionResult(Set regularFields, Wildcard wildcard) { + public FieldResolutionResult(Collection regularFields, Wildcard wildcard) { this.regularFields = new HashSet<>(regularFields); this.wildcard = wildcard; } - public FieldResolutionResult(Set regularFields, String wildcard) { + public FieldResolutionResult(Collection regularFields, String wildcard) { this.regularFields = new HashSet<>(regularFields); this.wildcard = getWildcard(wildcard); } @@ -53,12 +53,12 @@ private static Wildcard getWildcard(String wildcard) { } } - public FieldResolutionResult(Set regularFields, Set wildcards) { + public FieldResolutionResult(Collection regularFields, Collection wildcards) { this.regularFields = new HashSet<>(regularFields); this.wildcard = createOrWildcard(wildcards); } - private static Wildcard createOrWildcard(Set patterns) { + private static Wildcard createOrWildcard(Collection patterns) { if (patterns == null || patterns.isEmpty()) { return NULL_WILDCARD; } @@ -70,38 +70,50 @@ private static Wildcard createOrWildcard(Set patterns) { return new OrWildcard(wildcards); } + /** Returns unmodifiable view of 
regular fields. */ public Set getRegularFieldsUnmodifiable() { return Collections.unmodifiableSet(regularFields); } + /** Checks if result contains any wildcard patterns. */ public boolean hasWildcards() { return wildcard != NULL_WILDCARD; } + /** Checks if result contains partial wildcard patterns (not '*'). */ + public boolean hasPartialWildcards() { + return wildcard != NULL_WILDCARD && wildcard != ANY_WILDCARD; + } + + /** Checks if result contains regular fields. */ public boolean hasRegularFields() { return !regularFields.isEmpty(); } + /** Creates new result excluding specified fields. */ public FieldResolutionResult exclude(Collection fields) { Set combinedFields = new HashSet<>(this.regularFields); combinedFields.removeAll(fields); return new FieldResolutionResult(combinedFields, this.wildcard); } - public FieldResolutionResult or(Set fields) { + /** Creates new result combining this result with additional fields (union). */ + public FieldResolutionResult or(Collection fields) { Set combinedFields = new HashSet<>(this.regularFields); combinedFields.addAll(fields); return new FieldResolutionResult(combinedFields, this.wildcard); } - private Set and(Set fields) { + private Set and(Collection fields) { return fields.stream() .filter(field -> this.getRegularFields().contains(field) || this.wildcard.matches(field)) .collect(Collectors.toSet()); } + /** Creates new result intersecting this result with another (intersection). */ public FieldResolutionResult and(FieldResolutionResult other) { - Set combinedFields = this.and(other.regularFields); + Set combinedFields = new HashSet<>(); + combinedFields.addAll(this.and(other.regularFields)); combinedFields.addAll(other.and(this.regularFields)); Wildcard combinedWildcard = this.wildcard.and(other.wildcard); @@ -111,6 +123,7 @@ public FieldResolutionResult and(FieldResolutionResult other) { /** Interface for wildcard pattern matching. */ public interface Wildcard { + /** Checks if field name matches wildcard pattern. 
*/ boolean matches(String fieldName); default Wildcard and(Wildcard other) { diff --git a/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java b/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java index 238fb8f6dc1..68396e9e770 100644 --- a/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java +++ b/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java @@ -75,6 +75,8 @@ */ public class FieldResolutionVisitor extends AbstractNodeVisitor { + private static final String ALL_FIELDS = "*"; + /** * Analyzes PPL query plan to determine required fields at each node. * @@ -110,10 +112,10 @@ private void acceptAndVerifyNodeVisited(Node node, FieldResolutionContext contex @Override public Node visitProject(Project node, FieldResolutionContext context) { - boolean isSelectAll = - node.getProjectList().stream().anyMatch(expr -> expr instanceof AllFields); + boolean isSingleSelectAll = + node.getProjectList().size() == 1 && node.getProjectList().get(0) instanceof AllFields; - if (isSelectAll) { + if (isSingleSelectAll) { visitChildren(node, context); } else { Set projectFields = new HashSet<>(); @@ -179,15 +181,14 @@ public Node visitSpath(SPath node, FieldResolutionContext context) { return visitEval(node.rewriteAsEval(), context); } else { // set requirements for spath command; - context.setResult(node, context.getCurrentRequirements()); FieldResolutionResult requirements = context.getCurrentRequirements(); - if (requirements.hasWildcards()) { + context.setResult(node, requirements); + if (requirements.hasPartialWildcards()) { throw new IllegalArgumentException( - "Spath command cannot extract arbitrary fields. 
Please project fields explicitly by" - + " fields command without wildcard or stats command."); + "Spath command cannot be used with partial wildcard such as `prefix*`."); } - context.pushRequirements(context.getCurrentRequirements().or(Set.of(node.getInField()))); + context.pushRequirements(requirements.or(Set.of(node.getInField()))); visitChildren(node, context); context.popRequirements(); return node; @@ -238,6 +239,8 @@ private Set extractFieldsFromExpression(UnresolvedExpression expr) { if (expr instanceof Field) { Field field = (Field) expr; fields.add(field.getField().toString()); + } else if (expr instanceof AllFields) { + fields.add(ALL_FIELDS); } else if (expr instanceof QualifiedName) { QualifiedName name = (QualifiedName) expr; fields.add(name.toString()); @@ -496,43 +499,56 @@ public Node visitStreamWindow(StreamWindow node, FieldResolutionContext context) @Override public Node visitFillNull(FillNull node, FieldResolutionContext context) { + if (node.isAgainstAllFields()) { + throw new IllegalArgumentException("Fields need to be specified with fillnull command"); + } + Set fields = new HashSet<>(); + node.getFields().forEach(field -> fields.addAll(extractFieldsFromExpression(field))); + + context.pushRequirements(context.getCurrentRequirements().or(fields)); visitChildren(node, context); + context.popRequirements(); return node; } @Override public Node visitAppendCol(AppendCol node, FieldResolutionContext context) { - visitChildren(node, context); - return node; + throw new IllegalArgumentException( + "AppendCol command cannot be used together with spath command"); } @Override public Node visitAppend(Append node, FieldResolutionContext context) { + // dispatch requirements to subsearch and main + acceptAndVerifyNodeVisited(node.getSubSearch(), context); visitChildren(node, context); return node; } @Override public Node visitMultisearch(Multisearch node, FieldResolutionContext context) { - visitChildren(node, context); - return node; + throw new 
IllegalArgumentException( + "Multisearch command cannot be used together with spath command"); } @Override public Node visitLookup(Lookup node, FieldResolutionContext context) { - visitChildren(node, context); - return node; + throw new IllegalArgumentException("Lookup command cannot be used together with spath command"); } @Override public Node visitValues(Values node, FieldResolutionContext context) { - visitChildren(node, context); - return node; + throw new IllegalArgumentException("Values command cannot be used together with spath command"); } @Override public Node visitReplace(Replace node, FieldResolutionContext context) { + Set fields = new HashSet<>(); + node.getFieldList().forEach(field -> fields.addAll(extractFieldsFromExpression(field))); + + context.pushRequirements(context.getCurrentRequirements().or(fields)); visitChildren(node, context); + context.popRequirements(); return node; } @@ -627,6 +643,10 @@ private Set extractFieldsFromAggregation(UnresolvedExpression expr) { } } } - return fields; + return excludeAllFieldsWildcard(fields); + } + + private Set excludeAllFieldsWildcard(Set fields) { + return fields.stream().filter(f -> !f.equals(ALL_FIELDS)).collect(Collectors.toSet()); } } diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/FillNull.java b/core/src/main/java/org/opensearch/sql/ast/tree/FillNull.java index 4c5ab21c2bc..c81b329bde1 100644 --- a/core/src/main/java/org/opensearch/sql/ast/tree/FillNull.java +++ b/core/src/main/java/org/opensearch/sql/ast/tree/FillNull.java @@ -68,6 +68,10 @@ public List getFields() { return getReplacementPairs().stream().map(Pair::getLeft).collect(Collectors.toList()); } + public boolean isAgainstAllFields() { + return !replacementForAll.isEmpty() && getReplacementPairs().isEmpty(); + } + @Override public FillNull attach(UnresolvedPlan child) { this.child = child; diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/Join.java b/core/src/main/java/org/opensearch/sql/ast/tree/Join.java index 
e55cc67e1c5..dc27d601bab 100644 --- a/core/src/main/java/org/opensearch/sql/ast/tree/Join.java +++ b/core/src/main/java/org/opensearch/sql/ast/tree/Join.java @@ -17,6 +17,7 @@ import org.opensearch.sql.ast.AbstractNodeVisitor; import org.opensearch.sql.ast.expression.Argument; import org.opensearch.sql.ast.expression.Field; +import org.opensearch.sql.ast.expression.Literal; import org.opensearch.sql.ast.expression.UnresolvedExpression; @ToString @@ -90,6 +91,14 @@ public T accept(AbstractNodeVisitor nodeVisitor, C context) { return nodeVisitor.visitJoin(this, context); } + /** + * @return `overwrite` option value in argumentMap + */ + public boolean isOverwrite() { + return getArgumentMap().get("overwrite") == null // 'overwrite' default value is true + || getArgumentMap().get("overwrite").equals(Literal.TRUE); + } + public enum JoinType { INNER, LEFT, diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index d23d29d6db6..7774c27b096 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -14,6 +14,7 @@ import static org.opensearch.sql.ast.tree.Sort.SortOption.DEFAULT_DESC; import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC; import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC; +import static org.opensearch.sql.calcite.plan.DynamicFieldsConstants.DYNAMIC_FIELDS_MAP; import static org.opensearch.sql.calcite.plan.rule.PPLDedupConvertRule.buildDedupNotNull; import static org.opensearch.sql.calcite.plan.rule.PPLDedupConvertRule.buildDedupOrNull; import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_MAIN; @@ -151,6 +152,7 @@ import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit.SystemLimitType; import org.opensearch.sql.calcite.utils.BinUtils; import org.opensearch.sql.calcite.utils.JoinAndLookupUtils; 
+import org.opensearch.sql.calcite.utils.JoinAndLookupUtils.OverwriteMode; import org.opensearch.sql.calcite.utils.PPLHintUtils; import org.opensearch.sql.calcite.utils.PlanUtils; import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils; @@ -374,6 +376,10 @@ public RelNode visitProject(Project node, CalcitePlanContext context) { return handleAllFieldsProject(node, context); } + if (DynamicFieldsHelper.isDynamicFieldsExists(context)) { + rejectWildcardNotAtTheEnd(node); + } + List currentFields = context.relBuilder.peek().getRowType().getFieldNames(); List expandedFields = expandProjectFields(node.getProjectList(), currentFields, context); @@ -390,6 +396,20 @@ public RelNode visitProject(Project node, CalcitePlanContext context) { return context.relBuilder.peek(); } + /** + * Reject if wildcard(`*`) is specified other than at the end, this is a limitation until field + * ordering for dynamic field is implemented. + */ + private void rejectWildcardNotAtTheEnd(Project node) { + List list = node.getProjectList(); + for (int i = 0; i < list.size() - 1; i++) { + if (list.get(i) instanceof AllFields) { + throw new IllegalArgumentException( + "Wildcard can be placed only at the end of the fields list (limit of spath command)."); + } + } + } + private boolean isSingleAllFieldsProject(Project node) { return node.getProjectList().size() == 1 && node.getProjectList().get(0) instanceof AllFields; @@ -423,7 +443,7 @@ private List expandProjectFields( if (WildcardUtils.containsWildcard(fieldName)) { List matchingFields = WildcardUtils.expandWildcardPattern(fieldName, currentFields).stream() - .filter(f -> !isMetadataField(f)) + .filter(f -> !isMetadataField(f) && !f.equals(DYNAMIC_FIELDS_MAP)) .filter(addedFields::add) .collect(Collectors.toList()); if (matchingFields.isEmpty()) { @@ -478,7 +498,7 @@ private void validateWildcardPatterns( } } - private boolean isMetadataField(String fieldName) { + private static boolean isMetadataField(String fieldName) { return 
OpenSearchConstants.METADATAFIELD_TYPE_MAP.containsKey(fieldName); } @@ -712,54 +732,31 @@ private RelNode spathExtractAll(SPath node, CalcitePlanContext context) { visitChildren(node, context); FieldResolutionResult resolutionResult = context.resolveFields(node); - if (resolutionResult.hasWildcards()) { - // Logic for handling wildcards (dynamic fields) will be implemented later. - throw new IllegalArgumentException( - "spath command failed to identify fields to extract. Use fields/stats command to specify" - + " output fields."); - } // 1. Extract all fields from JSON in `inField` RexNode inField = rexVisitor.analyze(AstDSL.field(node.getInField()), context); - RexNode map = makeCall(context, BuiltinFunctionName.JSON_EXTRACT_ALL, inField); + RexNode map = context.rexBuilder.makeCall(BuiltinFunctionName.JSON_EXTRACT_ALL, inField); // 2. Project items from FieldResolutionResult - Set existingFields = - new HashSet<>(context.relBuilder.peek().getRowType().getFieldNames()); - List fieldNames = + Set existingFields = new HashSet<>(DynamicFieldsHelper.getStaticFields(context)); + List sortedRegularFieldNames = resolutionResult.getRegularFields().stream().sorted().collect(Collectors.toList()); - List fields = new ArrayList<>(); - for (String fieldName : fieldNames) { - RexNode item = itemCall(map, fieldName, context); - // Cast to string for type consistency. (This cast will be removed once functions are adopted - // to ANY type) - item = context.relBuilder.cast(item, SqlTypeName.VARCHAR); - // Append if field already exist - if (existingFields.contains(fieldName)) { - item = - makeCall( - context, - BuiltinFunctionName.INTERNAL_APPEND, - context.relBuilder.field(fieldName), - item); - } - fields.add(context.relBuilder.alias(item, fieldName)); + List fields = + DynamicFieldsHelper.buildRegularFieldProjections( + map, sortedRegularFieldNames, existingFields, context); + + // 3. 
Add _MAP field for dynamic fields when wildcards present + if (resolutionResult.hasWildcards()) { + RexNode dynamicMapField = + DynamicFieldsHelper.buildDynamicMapFieldProjection( + map, sortedRegularFieldNames, existingFields, context); + fields.add(context.relBuilder.alias(dynamicMapField, DYNAMIC_FIELDS_MAP)); } context.relBuilder.project(fields); return context.relBuilder.peek(); } - private RexNode itemCall(RexNode node, String key, CalcitePlanContext context) { - return makeCall( - context, BuiltinFunctionName.INTERNAL_ITEM, node, context.rexBuilder.makeLiteral(key)); - } - - private RexNode makeCall( - CalcitePlanContext context, BuiltinFunctionName functionName, RexNode... args) { - return PPLFuncImpTable.INSTANCE.resolve(context.rexBuilder, functionName, args); - } - @Override public RelNode visitPatterns(Patterns node, CalcitePlanContext context) { visitChildren(node, context); @@ -1319,12 +1316,19 @@ private Optional extractAliasLiteral(RexNode node) { public RelNode visitJoin(Join node, CalcitePlanContext context) { List children = node.getChildren(); children.forEach(c -> analyze(c, context)); + DynamicFieldsHelper.adjustJoinInputsForDynamicFields( + node.getLeftAlias(), node.getRightAlias(), context); if (node.getJoinCondition().isEmpty()) { // join-with-field-list grammar - List leftColumns = context.relBuilder.peek(1).getRowType().getFieldNames(); - List rightColumns = context.relBuilder.peek().getRowType().getFieldNames(); + List leftColumns = DynamicFieldsHelper.getLeftStaticFields(context); + List rightColumns = DynamicFieldsHelper.getRightStaticFields(context); List duplicatedFieldNames = - leftColumns.stream().filter(rightColumns::contains).collect(Collectors.toList()); + leftColumns.stream() + .filter( + column -> + !DynamicFieldsHelper.isDynamicFieldMap(column) + && rightColumns.contains(column)) + .collect(Collectors.toList()); RexNode joinCondition; if (node.getJoinFields().isPresent()) { joinCondition = @@ -1346,8 +1350,7 @@ public RelNode 
visitJoin(Join node, CalcitePlanContext context) { return context.relBuilder.peek(); } List toBeRemovedFields; - if (node.getArgumentMap().get("overwrite") == null // 'overwrite' default value is true - || (node.getArgumentMap().get("overwrite").equals(Literal.TRUE))) { + if (node.isOverwrite()) { toBeRemovedFields = duplicatedFieldNames.stream() .map(field -> JoinAndLookupUtils.analyzeFieldsForLookUp(field, true, context)) @@ -1382,6 +1385,8 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) { if (!toBeRemovedFields.isEmpty()) { context.relBuilder.projectExcept(toBeRemovedFields); } + JoinAndLookupUtils.mergeDynamicFieldsAsNeeded( + context, node.isOverwrite() ? OverwriteMode.RIGHT_WINS : OverwriteMode.LEFT_WINS); } else { // The join-with-criteria grammar doesn't allow empty join condition RexNode joinCondition = @@ -1398,8 +1403,8 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) { // column name with numeric suffix, e.g. ON t1.id = t2.id, the output contains `id` and `id0` // when a new project add to stack. To avoid `id0`, we will rename the `id0` to `alias.id` // or `tableIdentifier.id`: - List leftColumns = context.relBuilder.peek(1).getRowType().getFieldNames(); - List rightColumns = context.relBuilder.peek().getRowType().getFieldNames(); + List leftColumns = DynamicFieldsHelper.getLeftStaticFields(context); + List rightColumns = DynamicFieldsHelper.getRightStaticFields(context); List rightTableName = PlanUtils.findTable(context.relBuilder.peek()).getQualifiedName(); // Using `table.column` instead of `catalog.database.table.column` as column prefix because @@ -1414,7 +1419,7 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) { rightColumns.stream() .map( col -> - leftColumns.contains(col) + !DynamicFieldsHelper.isDynamicFieldMap(col) && leftColumns.contains(col) ? node.getRightAlias() .map(a -> a + "." + col) .orElse(rightTableQualifiedName + "." 
+ col) @@ -1437,6 +1442,7 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) { addSysLimitForJoinSubsearch(context); context.relBuilder.join( JoinAndLookupUtils.translateJoinType(node.getJoinType()), joinCondition); + JoinAndLookupUtils.mergeDynamicFieldsAsNeeded(context, OverwriteMode.LEFT_WINS); JoinAndLookupUtils.renameToExpectedFields( rightColumnsWithAliasIfConflict, leftColumns.size(), context); } @@ -2213,6 +2219,10 @@ public RelNode visitAppend(Append node, CalcitePlanContext context) { private RelNode mergeTableAndResolveColumnConflict( RelNode mainNode, RelNode subqueryNode, CalcitePlanContext context) { + mainNode = DynamicFieldsHelper.adjustFieldsForDynamicFields(mainNode, subqueryNode, context); + subqueryNode = + DynamicFieldsHelper.adjustFieldsForDynamicFields(subqueryNode, mainNode, context); + // Use shared schema merging logic that handles type conflicts via field renaming List nodesToMerge = Arrays.asList(mainNode, subqueryNode); List projectedNodes = diff --git a/core/src/main/java/org/opensearch/sql/calcite/DynamicFieldsHelper.java b/core/src/main/java/org/opensearch/sql/calcite/DynamicFieldsHelper.java new file mode 100644 index 00000000000..a8073772a95 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/DynamicFieldsHelper.java @@ -0,0 +1,291 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite; + +import static org.opensearch.sql.calcite.plan.DynamicFieldsConstants.DYNAMIC_FIELDS_MAP; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.type.SqlTypeName; +import 
org.opensearch.sql.calcite.plan.OpenSearchConstants; +import org.opensearch.sql.expression.function.BuiltinFunctionName; + +/** Helper class for dynamic fields operations in Calcite plan building. */ +class DynamicFieldsHelper { + + /** Check if a field name is the dynamic fields map constant. */ + static boolean isDynamicFieldMap(String field) { + return DYNAMIC_FIELDS_MAP.equals(field); + } + + /** Check if dynamic fields map exists in current relation. */ + static boolean isDynamicFieldsExists(CalcitePlanContext context) { + return context.relBuilder.peek().getRowType().getFieldNames().contains(DYNAMIC_FIELDS_MAP); + } + + /** Check if a RelNode has dynamic fields. */ + private static boolean hasDynamicFields(RelNode node) { + return node.getRowType().getFieldNames().contains(DYNAMIC_FIELDS_MAP); + } + + /** Check if a field name is a metadata field. */ + private static boolean isMetadataField(String fieldName) { + return OpenSearchConstants.METADATAFIELD_TYPE_MAP.containsKey(fieldName); + } + + /** Get static fields from the left side of a join (excludes dynamic fields map). */ + static List getLeftStaticFields(CalcitePlanContext context) { + return excludeDynamicFields(context.relBuilder.peek(1).getRowType().getFieldNames()); + } + + /** Get static fields from current relation (excludes dynamic fields map). */ + static List getStaticFields(CalcitePlanContext context) { + return excludeDynamicFields(context.relBuilder.peek().getRowType().getFieldNames()); + } + + /** Get static fields from node */ + static List getStaticFields(RelNode node) { + return excludeDynamicFields(node.getRowType().getFieldNames()); + } + + /** Get static fields from the right side of a join (excludes dynamic fields map). */ + static List getRightStaticFields(CalcitePlanContext context) { + return getStaticFields(context); + } + + /** Filter out dynamic fields map from field names collection. 
*/ + private static List excludeDynamicFields(Collection fields) { + return fields.stream().filter(field -> !isDynamicFieldMap(field)).collect(Collectors.toList()); + } + + /** Filter out metadata fields from field names collection. */ + private static List excludeMetaFields(Collection fields) { + return fields.stream().filter(field -> !isMetadataField(field)).collect(Collectors.toList()); + } + + /** Filter out dynamic fields and metadata fields from field names collection */ + private static List excludeSpecialFields(Collection fields) { + return fields.stream() + .filter(field -> !isMetadataField(field) && !isDynamicFieldMap(field)) + .collect(Collectors.toList()); + } + + /** Get remaining fields after excluding specified fields. */ + private static List getRemainingFields( + Collection existingFields, Collection excluded) { + List keys = excludeSpecialFields(existingFields); + keys.removeAll(excluded); + Collections.sort(keys); + return keys; + } + + /** Adjust fields to align the static/dynamic fields for join. */ + static void adjustJoinInputsForDynamicFields( + Optional leftAlias, Optional rightAlias, CalcitePlanContext context) { + if (hasDynamicFields(context.relBuilder.peek()) + || hasDynamicFields(context.relBuilder.peek(1))) { + // build once to modify the inputs already in the stack. 
+ RelNode right = context.relBuilder.build(); + RelNode left = context.relBuilder.build(); + left = adjustFieldsForDynamicFields(left, right, context); + right = adjustFieldsForDynamicFields(right, left, context); + context.relBuilder.push(left); + // `as(alias)` is needed since `build()` won't preserve alias + leftAlias.map(alias -> context.relBuilder.as(alias)); + context.relBuilder.push(right); + rightAlias.map(alias -> context.relBuilder.as(alias)); + } + } + + /** Adjust fields to align the static/dynamic fields in `target` to `theOtherInput` */ + static RelNode adjustFieldsForDynamicFields( + RelNode target, RelNode theOtherInput, CalcitePlanContext context) { + if (hasDynamicFields(theOtherInput) && !hasDynamicFields(target)) { + List requiredStaticFields = getStaticFields(theOtherInput); + return adjustFieldsForDynamicFields(target, requiredStaticFields, context); + } + return target; + } + + /** + * Project node's fields in `requiredFieldNames` as static field, and put other fields into `_MAP` + * (dynamic fields) This projection is needed when merging an input with dynamic fields and an + * input without dynamic fields. 
This process can be eliminated once we fully integrate + * schema-on-read (https://github.com/opensearch-project/sql/issues/4984) + */ + static RelNode adjustFieldsForDynamicFields( + RelNode node, List staticFieldNames, CalcitePlanContext context) { + context.relBuilder.push(node); + List existingFields = node.getRowType().getFieldNames(); + List project = new ArrayList<>(); + for (String existingField : existingFields) { + if (staticFieldNames.contains(existingField)) { + project.add(context.rexBuilder.makeInputRef(node, existingFields.indexOf(existingField))); + } + } + project.add( + context.relBuilder.alias( + getFieldsAsMap(existingFields, staticFieldNames, context), DYNAMIC_FIELDS_MAP)); + return context.relBuilder.project(project).build(); + } + + /** + * Build regular field projections from a map, with optional append logic for existing fields. + * + * @param map The source map made by JSON_EXTRACT_ALL + * @param regularFieldNames List of field names to extract + * @param existingFields Set of fields that already exist in the current relation + * @param context CalcitePlanContext + * @return List of RexNode projections with aliases + */ + static List buildRegularFieldProjections( + RexNode map, + List regularFieldNames, + Set existingFields, + CalcitePlanContext context) { + List fields = new ArrayList<>(); + for (String fieldName : regularFieldNames) { + RexNode item = getItemAsString(map, fieldName, context); + // Append if field already exists + if (existingFields.contains(fieldName)) { + item = + context.rexBuilder.makeCall( + BuiltinFunctionName.INTERNAL_APPEND, context.relBuilder.field(fieldName), item); + item = castToString(item, context); + } + fields.add(context.relBuilder.alias(item, fieldName)); + } + return fields; + } + + /** + * Build dynamic map field projection when wildcards are present. 
+ * + * @param map The source map RexNode + * @param sortedRegularFields Sorted list of regular fields to exclude from dynamic map + * @param existingFields Set of existing fields in the current relation + * @param context CalcitePlanContext + * @return RexNode for the dynamic map field + */ + static RexNode buildDynamicMapFieldProjection( + RexNode map, + List sortedRegularFields, + Set existingFields, + CalcitePlanContext context) { + RexNode dynamicMapField = createDynamicMapField(map, sortedRegularFields, context); + List remainingFields = getRemainingFields(existingFields, sortedRegularFields); + + if (!remainingFields.isEmpty()) { + // Add existing fields to map + RexNode existingFieldsMap = getFieldsAsMap(existingFields, sortedRegularFields, context); + dynamicMapField = + context.rexBuilder.makeCall( + BuiltinFunctionName.MAP_APPEND, existingFieldsMap, dynamicMapField); + } + + if (isDynamicFieldsExists(context)) { + RexNode existingMap = context.relBuilder.field(DYNAMIC_FIELDS_MAP); + dynamicMapField = + context.rexBuilder.makeCall(BuiltinFunctionName.MAP_APPEND, existingMap, dynamicMapField); + } + + return dynamicMapField; + } + + /** Get map item as string with type casting. */ + private static RexNode getItemAsString( + RexNode map, String fieldName, CalcitePlanContext context) { + RexNode item = context.rexBuilder.itemCall(map, fieldName); + // Cast to string for type consistency. (This cast will be removed once functions are adopted + // to ANY type) + return context.relBuilder.cast(item, SqlTypeName.VARCHAR); + } + + /** Cast a RexNode to string type. 
*/ + private static RexNode castToString(RexNode node, CalcitePlanContext context) { + return context.relBuilder.cast(node, SqlTypeName.VARCHAR); + } + + /** Create dynamic map field by removing regular fields from full map */ + private static RexNode createDynamicMapField( + RexNode fullMap, List sortedRegularFields, CalcitePlanContext context) { + if (sortedRegularFields.isEmpty()) { + return fullMap; + } + + List stringLiteralList = getStringLiteralList(sortedRegularFields, context); + + RelDataType stringArrayType = + context + .rexBuilder + .getTypeFactory() + .createArrayType( + context.rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARCHAR), -1); + RexNode keyArray = + context.rexBuilder.makeCall( + stringArrayType, SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR, stringLiteralList); + + // MAP_REMOVE(fullMap, keyArray) → filtered map with only unmapped fields + return context.rexBuilder.makeCall(BuiltinFunctionName.MAP_REMOVE, fullMap, keyArray); + } + + /** Convert fields to map representation */ + private static RexNode getFieldsAsMap( + Collection existingFields, Collection excluded, CalcitePlanContext context) { + List keys = excludeMetaFields(existingFields); + keys.removeAll(excluded); + Collections.sort(keys); + RexNode keysArray = getStringLiteralArray(keys, context); + List values = + keys.stream().map(key -> context.relBuilder.field(key)).collect(Collectors.toList()); + RexNode valuesArray = makeArray(values, context); + return context.rexBuilder.makeCall(BuiltinFunctionName.MAP_FROM_ARRAYS, keysArray, valuesArray); + } + + /** Create string literal array from collection of strings */ + private static RexNode getStringLiteralArray( + Collection keys, CalcitePlanContext context) { + List stringLiteralList = getStringLiteralList(keys, context); + + return context.rexBuilder.makeCall( + getStringArrayType(context), + SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR, + stringLiteralList); + } + + /** Create list of string literals from list of strings 
*/ + private static List getStringLiteralList( + Collection strings, CalcitePlanContext context) { + return strings.stream() + .sorted() + .map(name -> context.rexBuilder.makeLiteral(name)) + .collect(Collectors.toList()); + } + + /** Create an array from list of RexNodes */ + private static RexNode makeArray(List items, CalcitePlanContext context) { + return context.rexBuilder.makeCall( + getStringArrayType(context), SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR, items); + } + + /** Get RelDataType for string arrays */ + private static RelDataType getStringArrayType(CalcitePlanContext context) { + return context + .rexBuilder + .getTypeFactory() + .createArrayType( + context.rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARCHAR), -1); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/ExtendedRexBuilder.java b/core/src/main/java/org/opensearch/sql/calcite/ExtendedRexBuilder.java index 35db7c064fd..adefaaae22a 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/ExtendedRexBuilder.java +++ b/core/src/main/java/org/opensearch/sql/calcite/ExtendedRexBuilder.java @@ -27,7 +27,9 @@ import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.exception.ExpressionEvaluationException; import org.opensearch.sql.exception.SemanticCheckException; +import org.opensearch.sql.expression.function.BuiltinFunctionName; import org.opensearch.sql.expression.function.PPLBuiltinOperators; +import org.opensearch.sql.expression.function.PPLFuncImpTable; public class ExtendedRexBuilder extends RexBuilder { @@ -48,6 +50,17 @@ public RexNode and(RexNode left, RexNode right) { return this.makeCall(booleanType, SqlStdOperatorTable.AND, List.of(left, right)); } + /** Create an item access call for map/array access. */ + public RexNode itemCall(RexNode node, String key) { + return PPLFuncImpTable.INSTANCE.resolve( + this, BuiltinFunctionName.INTERNAL_ITEM, node, this.makeLiteral(key)); + } + + /** Create a function call using PPLFuncImpTable. 
*/ + public RexNode makeCall(BuiltinFunctionName functionName, RexNode... args) { + return PPLFuncImpTable.INSTANCE.resolve(this, functionName, args); + } + public RelDataType commonType(RexNode... nodes) { return this.getTypeFactory() .leastRestrictive(Arrays.stream(nodes).map(RexNode::getType).collect(Collectors.toList())); diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/DynamicFieldsConstants.java b/core/src/main/java/org/opensearch/sql/calcite/plan/DynamicFieldsConstants.java new file mode 100644 index 00000000000..07554e6a511 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/DynamicFieldsConstants.java @@ -0,0 +1,14 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.plan; + +import lombok.experimental.UtilityClass; + +@UtilityClass +public class DynamicFieldsConstants { + /** Special field name for the map containing dynamic fields */ + public static final String DYNAMIC_FIELDS_MAP = "_MAP"; +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/DynamicFieldsResultProcessor.java b/core/src/main/java/org/opensearch/sql/calcite/utils/DynamicFieldsResultProcessor.java new file mode 100644 index 00000000000..01eade7e18e --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/DynamicFieldsResultProcessor.java @@ -0,0 +1,159 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.utils; + +import static org.opensearch.sql.calcite.plan.DynamicFieldsConstants.DYNAMIC_FIELDS_MAP; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import lombok.experimental.UtilityClass; +import org.opensearch.sql.data.model.ExprNullValue; +import org.opensearch.sql.data.model.ExprStringValue; +import 
org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.model.ExprValueUtils; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.executor.ExecutionEngine.QueryResponse; +import org.opensearch.sql.executor.ExecutionEngine.Schema; +import org.opensearch.sql.executor.ExecutionEngine.Schema.Column; + +/** Utility class for expanding dynamic fields in QueryResponse into result columns */ +@UtilityClass +public class DynamicFieldsResultProcessor { + + /** + * Expand dynamic fields into individual columns in QueryResponse. + * + * @param response Original QueryResponse with _MAP column + * @return New QueryResponse with expanded individual columns + */ + public static QueryResponse expandDynamicFields(QueryResponse response) { + if (!hasDynamicFields(response)) { + return response; + } + + Map dynamicFieldTypes = getDynamicFieldTypes(response.getResults()); + Schema expandedSchema = createExpandedSchema(response.getSchema(), dynamicFieldTypes); + List expandedRows = expandResultRows(response.getResults(), expandedSchema); + + return new QueryResponse(expandedSchema, expandedRows, response.getCursor()); + } + + private static boolean hasDynamicFields(QueryResponse response) { + return response.getSchema().getColumns().stream() + .anyMatch(column -> DYNAMIC_FIELDS_MAP.equals(column.getName())); + } + + private static Schema createExpandedSchema( + Schema originalSchema, Map dynamicFieldTypes) { + List expandedColumns = + originalSchema.getColumns().stream() + .filter(col -> !DYNAMIC_FIELDS_MAP.equals(col.getName())) + .collect(Collectors.toList()); + Set staticFields = + expandedColumns.stream().map(col -> col.getName()).collect(Collectors.toSet()); + + List sortedDynamicFields = + dynamicFieldTypes.keySet().stream().sorted().collect(Collectors.toList()); + for (String dynamicFieldName : sortedDynamicFields) { + ExprType fieldType 
= dynamicFieldTypes.get(dynamicFieldName); + if (!staticFields.contains(dynamicFieldName)) { + expandedColumns.add(new Column(dynamicFieldName, null, fieldType)); + } + } + + return new Schema(expandedColumns); + } + + /** Expands result rows by extracting MAP field values into individual columns. */ + private static List expandResultRows( + List originalResults, Schema expandedSchema) { + List expandedResults = new ArrayList<>(); + + for (ExprValue row : originalResults) { + if (row instanceof ExprTupleValue) { + expandedResults.add(expandRow((ExprTupleValue) row, expandedSchema)); + } else { + // Non-tuple rows are passed through unchanged + expandedResults.add(row); + } + } + + return expandedResults; + } + + private static ExprTupleValue expandRow(ExprTupleValue row, Schema expandedSchema) { + Map expandedRow = new LinkedHashMap<>(); + Map originalRow = row.tupleValue(); + Map dynamicFields = getDynamicFields(row); + + for (Column column : expandedSchema.getColumns()) { + String colName = column.getName(); + expandedRow.put(colName, getColValue(originalRow, dynamicFields, colName)); + } + return ExprTupleValue.fromExprValueMap(expandedRow); + } + + private static ExprValue getColValue( + Map originalRow, Map dynamicFields, String colName) { + if (originalRow.containsKey(colName)) { + return originalRow.get(colName); + } else if (dynamicFields.containsKey(colName)) { + // All the dynamic fields are converted to STRING for type consistency. 
+ return convertToStringValue(dynamicFields.get(colName)); + } else { + return ExprValueUtils.nullValue(); + } + } + + private static ExprValue convertToStringValue(ExprValue v) { + if (v instanceof ExprStringValue || v instanceof ExprNullValue) { + return v; + } else { + return new ExprStringValue(String.valueOf(v)); + } + } + + private static Map getDynamicFieldTypes(List results) { + Set fieldNames = collectDynamicFields(results); + Map inferredTypes = new LinkedHashMap<>(); + for (String fieldName : fieldNames) { + inferredTypes.put(fieldName, ExprCoreType.STRING); + } + return inferredTypes; + } + + private Set collectDynamicFields(List results) { + Set fieldNames = new HashSet<>(); + + for (ExprValue row : results) { + if (row instanceof ExprTupleValue) { + Map dynamicFields = getDynamicFields(row); + fieldNames.addAll(dynamicFields.keySet()); + } + } + return fieldNames; + } + + private static Map getDynamicFields(ExprValue row) { + ExprValue mapValue = row.tupleValue().get(DYNAMIC_FIELDS_MAP); + if (mapValue == null || mapValue.isNull() || mapValue.isMissing()) { + return Map.of(); + } + + if (mapValue instanceof ExprTupleValue) { + return mapValue.tupleValue(); + } + + return Map.of(); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/JoinAndLookupUtils.java b/core/src/main/java/org/opensearch/sql/calcite/utils/JoinAndLookupUtils.java index e662fabb557..fc8090a9513 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/JoinAndLookupUtils.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/JoinAndLookupUtils.java @@ -5,6 +5,8 @@ package org.opensearch.sql.calcite.utils; +import static org.opensearch.sql.calcite.plan.DynamicFieldsConstants.DYNAMIC_FIELDS_MAP; + import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -12,12 +14,17 @@ import java.util.stream.Collectors; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rex.RexNode; +import 
org.apache.calcite.sql.fun.SqlLibraryOperators; import org.apache.calcite.util.Pair; import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.calcite.CalcitePlanContext; public interface JoinAndLookupUtils { + public enum OverwriteMode { + LEFT_WINS, + RIGHT_WINS + } static JoinRelType translateJoinType(Join.JoinType joinType) { switch (joinType) { @@ -96,12 +103,57 @@ static void renameToExpectedFields( int sourceFieldsCountLeft, CalcitePlanContext context) { List oldFields = context.relBuilder.peek().getRowType().getFieldNames(); - assert sourceFieldsCountLeft + expectedProvidedFieldNames.size() == oldFields.size() + boolean hasDynamicFields = oldFields.contains(DYNAMIC_FIELDS_MAP); + int dynamicFieldsCount = hasDynamicFields ? 1 : 0; + assert sourceFieldsCountLeft + expectedProvidedFieldNames.size() + dynamicFieldsCount + == oldFields.size() : "The source fields count left plus new provided fields count must equal to the output" + " fields count of current plan(i.e project-join)."; List newFields = new ArrayList<>(oldFields.size()); newFields.addAll(oldFields.subList(0, sourceFieldsCountLeft)); newFields.addAll(expectedProvidedFieldNames); + if (hasDynamicFields) { + newFields.add(DYNAMIC_FIELDS_MAP); + } context.relBuilder.rename(newFields); } + + /** + * Merge _MAP (dynamic fields map) if there are two. 
This method should be called after join when + * both inputs could contain _MAP + */ + static void mergeDynamicFieldsAsNeeded(CalcitePlanContext context, OverwriteMode overwriteMode) { + List fieldNames = context.relBuilder.peek().getRowType().getFieldNames(); + List fields = context.relBuilder.fields(); + List dynamicFieldMaps = new ArrayList<>(); + List result = new ArrayList<>(); + for (int i = 0; i < fieldNames.size(); i++) { + String fieldName = fieldNames.get(i); + RexNode field = fields.get(i); + if (fieldName.startsWith(DYNAMIC_FIELDS_MAP)) { + dynamicFieldMaps.add(field); + } else { + result.add(field); + } + } + + if (dynamicFieldMaps.size() <= 1) { + return; + } else if (dynamicFieldMaps.size() == 2) { + RexNode concat = + (overwriteMode == OverwriteMode.RIGHT_WINS + ? context.relBuilder.call( + SqlLibraryOperators.MAP_CONCAT, + dynamicFieldMaps.get(0), + dynamicFieldMaps.get(dynamicFieldMaps.size()-1)) + : context.relBuilder.call( + SqlLibraryOperators.MAP_CONCAT, + dynamicFieldMaps.get(dynamicFieldMaps.size()-1), + dynamicFieldMaps.get(0))); + result.add(context.relBuilder.alias(concat, DYNAMIC_FIELDS_MAP)); + } else if (dynamicFieldMaps.size() > 2) { + throw new IllegalStateException("More than two _MAP exist while joining dynamic fields."); + } + context.relBuilder.project(result); + } } diff --git a/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java b/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java index 58898213f1b..58af6f7d288 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java @@ -74,6 +74,7 @@ public enum BuiltinFunctionName { MAP_APPEND(FunctionName.of("map_append"), true), MAP_CONCAT(FunctionName.of("map_concat"), true), MAP_REMOVE(FunctionName.of("map_remove"), true), + MAP_FROM_ARRAYS(FunctionName.of("map_from_arrays"), true), 
MVAPPEND(FunctionName.of("mvappend")), INTERNAL_APPEND(FunctionName.of("append"), true), MVJOIN(FunctionName.of("mvjoin")), diff --git a/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java b/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java index 345bd2985c4..9f787e87978 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java @@ -128,6 +128,7 @@ import static org.opensearch.sql.expression.function.BuiltinFunctionName.MAKETIME; import static org.opensearch.sql.expression.function.BuiltinFunctionName.MAP_APPEND; import static org.opensearch.sql.expression.function.BuiltinFunctionName.MAP_CONCAT; +import static org.opensearch.sql.expression.function.BuiltinFunctionName.MAP_FROM_ARRAYS; import static org.opensearch.sql.expression.function.BuiltinFunctionName.MAP_REMOVE; import static org.opensearch.sql.expression.function.BuiltinFunctionName.MATCH; import static org.opensearch.sql.expression.function.BuiltinFunctionName.MATCH_BOOL_PREFIX; @@ -1055,6 +1056,7 @@ void populate() { registerOperator(MAP_APPEND, PPLBuiltinOperators.MAP_APPEND); registerOperator(MAP_CONCAT, SqlLibraryOperators.MAP_CONCAT); registerOperator(MAP_REMOVE, PPLBuiltinOperators.MAP_REMOVE); + registerOperator(MAP_FROM_ARRAYS, SqlLibraryOperators.MAP_FROM_ARRAYS); registerOperator(ARRAY_LENGTH, SqlLibraryOperators.ARRAY_LENGTH); registerOperator(ARRAY_SLICE, SqlLibraryOperators.ARRAY_SLICE); registerOperator(FORALL, PPLBuiltinOperators.FORALL); diff --git a/core/src/test/java/org/opensearch/sql/ast/analysis/FieldResolutionResultTest.java b/core/src/test/java/org/opensearch/sql/ast/analysis/FieldResolutionResultTest.java index 31c009f76c1..bcaafb77726 100644 --- a/core/src/test/java/org/opensearch/sql/ast/analysis/FieldResolutionResultTest.java +++ 
b/core/src/test/java/org/opensearch/sql/ast/analysis/FieldResolutionResultTest.java @@ -343,6 +343,28 @@ void testFieldResolutionResultAndWithBothNullWildcards() { assertFalse(combined.hasWildcards()); } + @Test + void testWildcardAnyAndAny() { + FieldResolutionResult result1 = new FieldResolutionResult(Set.of("a"), "*"); + FieldResolutionResult result2 = new FieldResolutionResult(Set.of("b"), "*"); + + FieldResolutionResult combined = result1.and(result2); + + assertEquals(Set.of("a", "b"), combined.getRegularFields()); + assertEquals("*", combined.getWildcard().toString()); + } + + @Test + void testWildcardAnyAndNull() { + FieldResolutionResult result1 = new FieldResolutionResult(Set.of("a"), "*"); + FieldResolutionResult result2 = new FieldResolutionResult(Set.of("b")); + + FieldResolutionResult combined = result1.and(result2); + + assertEquals(Set.of("b"), combined.getRegularFields()); + assertEquals("", combined.getWildcard().toString()); + } + @Test void testFieldResolutionResultOrOperation() { FieldResolutionResult result = new FieldResolutionResult(Set.of("field1"), "user*"); @@ -514,4 +536,22 @@ void testAndWildcardAndWithAnyWildcard() { assertEquals(and, result); assertTrue(result.matches("username")); } + + @Test + void testHasPartialWildcardsWithNoWildcard() { + FieldResolutionResult result = new FieldResolutionResult(Set.of("field1", "field2")); + assertFalse(result.hasPartialWildcards()); + } + + @Test + void testHasPartialWildcardsWithAnyWildcard() { + FieldResolutionResult result = new FieldResolutionResult(Set.of("field1"), "*"); + assertFalse(result.hasPartialWildcards()); + } + + @Test + void testHasPartialWildcardsWithSingleWildcard() { + FieldResolutionResult result = new FieldResolutionResult(Set.of("field1"), "user*"); + assertTrue(result.hasPartialWildcards()); + } } diff --git a/core/src/test/java/org/opensearch/sql/calcite/utils/DynamicFieldsResultProcessorTest.java 
b/core/src/test/java/org/opensearch/sql/calcite/utils/DynamicFieldsResultProcessorTest.java new file mode 100644 index 00000000000..cb393f00fdb --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/calcite/utils/DynamicFieldsResultProcessorTest.java @@ -0,0 +1,346 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.utils; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.opensearch.sql.calcite.plan.DynamicFieldsConstants.DYNAMIC_FIELDS_MAP; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.junit.jupiter.api.Test; +import org.opensearch.sql.data.model.ExprBooleanValue; +import org.opensearch.sql.data.model.ExprIntegerValue; +import org.opensearch.sql.data.model.ExprNullValue; +import org.opensearch.sql.data.model.ExprStringValue; +import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.executor.ExecutionEngine.QueryResponse; +import org.opensearch.sql.executor.ExecutionEngine.Schema; +import org.opensearch.sql.executor.ExecutionEngine.Schema.Column; +import org.opensearch.sql.executor.pagination.Cursor; + +class DynamicFieldsResultProcessorTest { + + @Test + void expandDynamicFields_withMultipleRowsAndFields() { + QueryResponse response = + createResponseWithDynamicFields( + List.of( + createRow(1, "Alice", Map.of("age", intVal(30), "city", strVal("NYC"))), + createRow(2, "Bob", Map.of("age", intVal(25), "country", strVal("USA"))))); + + QueryResponse expanded = DynamicFieldsResultProcessor.expandDynamicFields(response); + + assertSchemaColumns(expanded, "id", "name", "age", "city", 
"country"); + assertDynamicFieldsAreStrings(expanded, "age", "city", "country"); + + assertRowValues( + expanded, + 0, + Map.of( + "id", + intVal(1), + "name", + strVal("Alice"), + "age", + strVal("30"), + "city", + strVal("NYC"), + "country", + nullVal())); + assertRowValues( + expanded, + 1, + Map.of( + "id", + intVal(2), + "name", + strVal("Bob"), + "age", + strVal("25"), + "city", + nullVal(), + "country", + strVal("USA"))); + } + + @Test + void expandDynamicFields_returnsOriginalResponse_whenNoDynamicFieldsMap() { + Schema schema = + createSchema(List.of(col("id", ExprCoreType.INTEGER), col("name", ExprCoreType.STRING))); + List results = + List.of(createTuple(Map.of("id", intVal(1), "name", strVal("Alice")))); + QueryResponse response = new QueryResponse(schema, results, null); + + QueryResponse expanded = DynamicFieldsResultProcessor.expandDynamicFields(response); + + assertSame(response, expanded); + } + + @Test + void expandDynamicFields_handlesNullMapValue() { + QueryResponse response = createResponseWithDynamicFields(List.of(createRowWithNullMap(1))); + + QueryResponse expanded = DynamicFieldsResultProcessor.expandDynamicFields(response); + + assertSchemaColumns(expanded, "id", "name"); + assertRowValues(expanded, 0, Map.of("id", intVal(1), "name", nullVal())); + } + + @Test + void expandDynamicFields_handlesEmptyMapValue() { + QueryResponse response = + createResponseWithDynamicFields(List.of(createRow(1, "Alice", Map.of()))); + + QueryResponse expanded = DynamicFieldsResultProcessor.expandDynamicFields(response); + + assertSchemaColumns(expanded, "id", "name"); + } + + @Test + void expandDynamicFields_staticFieldsTakePrecedence_overDynamicFields() { + Schema schema = createSchemaWithStaticAge(); + Map row = new LinkedHashMap<>(); + row.put("id", intVal(1)); + row.put("age", intVal(30)); + row.put(DYNAMIC_FIELDS_MAP, createTuple(Map.of("age", intVal(99), "city", strVal("NYC")))); + + QueryResponse response = new QueryResponse(schema, 
List.of(createTuple(row)), null); + QueryResponse expanded = DynamicFieldsResultProcessor.expandDynamicFields(response); + + assertSchemaColumns(expanded, "id", "age", "city"); + assertEquals(ExprCoreType.INTEGER, getColumnType(expanded, "age")); + assertRowValues(expanded, 0, Map.of("id", intVal(1), "age", intVal(30), "city", strVal("NYC"))); + } + + @Test + void expandDynamicFields_convertsNonStringValues_toString() { + QueryResponse response = + createResponseWithDynamicFields( + List.of( + createRow( + 1, + "Alice", + Map.of( + "intField", intVal(42), + "boolField", ExprBooleanValue.of(true), + "stringField", strVal("text"), + "nullField", nullVal())))); + + QueryResponse expanded = DynamicFieldsResultProcessor.expandDynamicFields(response); + + assertRowValues( + expanded, + 0, + Map.of( + "intField", strVal("42"), + "boolField", strVal("true"), + "stringField", strVal("text"), + "nullField", nullVal())); + } + + @Test + void expandDynamicFields_sortsDynamicFields_alphabetically() { + QueryResponse response = + createResponseWithDynamicFields( + List.of( + createRow( + 1, + "Alice", + Map.of( + "zebra", strVal("z"), + "apple", strVal("a"), + "mango", strVal("m"), + "banana", strVal("b"))))); + + QueryResponse expanded = DynamicFieldsResultProcessor.expandDynamicFields(response); + + assertSchemaColumns(expanded, "id", "name", "apple", "banana", "mango", "zebra"); + } + + @Test + void expandDynamicFields_mergesDynamicFields_acrossMultipleRows() { + QueryResponse response = + createResponseWithDynamicFields( + List.of( + createRow(1, "Alice", Map.of("field1", strVal("v1"), "field2", strVal("v2"))), + createRow(2, "Bob", Map.of("field2", strVal("v2b"), "field3", strVal("v3"))), + createRowWithNullMap(3))); + + QueryResponse expanded = DynamicFieldsResultProcessor.expandDynamicFields(response); + + assertSchemaColumns(expanded, "id", "name", "field1", "field2", "field3"); + assertRowValues( + expanded, + 0, + Map.of( + "id", + intVal(1), + "name", + 
strVal("Alice"), + "field1", + strVal("v1"), + "field2", + strVal("v2"), + "field3", + nullVal())); + assertRowValues( + expanded, + 1, + Map.of( + "id", + intVal(2), + "name", + strVal("Bob"), + "field1", + nullVal(), + "field2", + strVal("v2b"), + "field3", + strVal("v3"))); + assertRowValues( + expanded, + 2, + Map.of( + "id", intVal(3), "name", nullVal(), "field1", nullVal(), "field2", nullVal(), "field3", + nullVal())); + } + + @Test + void expandDynamicFields_handlesEmptyResults() { + Schema schema = createSchemaWithDynamicFields(); + QueryResponse response = new QueryResponse(schema, List.of(), null); + + QueryResponse expanded = DynamicFieldsResultProcessor.expandDynamicFields(response); + + assertSchemaColumns(expanded, "id", "name"); + assertEquals(0, expanded.getResults().size()); + } + + @Test + void expandDynamicFields_preservesCursor() { + Cursor cursor = new Cursor("test-cursor"); + QueryResponse response = + createResponseWithDynamicFields(List.of(createRow(1, "Alice", Map.of())), cursor); + + QueryResponse expanded = DynamicFieldsResultProcessor.expandDynamicFields(response); + + assertEquals(cursor, expanded.getCursor()); + } + + private QueryResponse createResponseWithDynamicFields(List> rows) { + return createResponseWithDynamicFields(rows, null); + } + + private QueryResponse createResponseWithDynamicFields( + List> rows, Cursor cursor) { + Schema schema = createSchemaWithDynamicFields(); + List results = + rows.stream().map(this::createTuple).map(tuple -> (ExprValue) tuple).collect(Collectors.toList()); + return new QueryResponse(schema, results, cursor); + } + + private Map createRow( + int id, String name, Map dynamicFields) { + Map row = new LinkedHashMap<>(); + row.put("id", intVal(id)); + row.put("name", strVal(name)); + row.put(DYNAMIC_FIELDS_MAP, createTuple(dynamicFields)); + return row; + } + + private Map createRowWithNullMap(int id) { + Map row = new LinkedHashMap<>(); + row.put("id", intVal(id)); + row.put("name", nullVal()); + 
row.put(DYNAMIC_FIELDS_MAP, nullVal()); + return row; + } + + private Schema createSchemaWithDynamicFields() { + return createSchema( + List.of( + col("id", ExprCoreType.INTEGER), + col("name", ExprCoreType.STRING), + col(DYNAMIC_FIELDS_MAP, ExprCoreType.STRUCT))); + } + + private Schema createSchemaWithStaticAge() { + return createSchema( + List.of( + col("id", ExprCoreType.INTEGER), + col("age", ExprCoreType.INTEGER), + col(DYNAMIC_FIELDS_MAP, ExprCoreType.STRUCT))); + } + + private Schema createSchema(List columns) { + return new Schema(columns); + } + + private Column col(String name, ExprType type) { + return new Column(name, null, type); + } + + private ExprTupleValue createTuple(Map values) { + return ExprTupleValue.fromExprValueMap(values); + } + + private ExprIntegerValue intVal(int value) { + return new ExprIntegerValue(value); + } + + private ExprStringValue strVal(String value) { + return new ExprStringValue(value); + } + + private ExprNullValue nullVal() { + return ExprNullValue.of(); + } + + private void assertSchemaColumns(QueryResponse response, String... expectedColumns) { + assertNotNull(response); + List columns = response.getSchema().getColumns(); + assertEquals(expectedColumns.length, columns.size()); + for (int i = 0; i < expectedColumns.length; i++) { + assertEquals(expectedColumns[i], columns.get(i).getName()); + } + } + + private void assertDynamicFieldsAreStrings(QueryResponse response, String... 
fieldNames) { + for (String fieldName : fieldNames) { + assertEquals(ExprCoreType.STRING, getColumnType(response, fieldName)); + } + } + + private ExprType getColumnType(QueryResponse response, String columnName) { + return response.getSchema().getColumns().stream() + .filter(col -> col.getName().equals(columnName)) + .findFirst() + .map(Column::getExprType) + .orElseThrow(() -> new AssertionError("Column not found: " + columnName)); + } + + private void assertRowValues( + QueryResponse response, int rowIndex, Map expectedValues) { + ExprTupleValue row = (ExprTupleValue) response.getResults().get(rowIndex); + Map actualValues = row.tupleValue(); + + for (Map.Entry entry : expectedValues.entrySet()) { + String fieldName = entry.getKey(); + ExprValue expectedValue = entry.getValue(); + ExprValue actualValue = actualValues.get(fieldName); + + assertEquals(expectedValue, actualValue); + } + } +} diff --git a/docs/user/ppl/cmd/spath.md b/docs/user/ppl/cmd/spath.md index 3e27ab04f8f..f107b0df030 100644 --- a/docs/user/ppl/cmd/spath.md +++ b/docs/user/ppl/cmd/spath.md @@ -15,7 +15,7 @@ The `spath` command extracts fields from structured JSON data. It supports two m spath input= [output=] [path=] ``` -### Field Resolution-based Extraction (Experimental) +### Auto Extraction (Experimental) ```syntax spath input= @@ -33,13 +33,18 @@ The `spath` command supports the following parameters. For more information about path syntax, see [json_extract](../functions/json.md#json_extract). -### Field Resolution-based Extraction Notes +### Auto Extraction Notes -* Extracts only required fields based on downstream commands requirements (interim solution until full fields extraction is implemented) -* **Limitation**: It raises error if extracted fields cannot be identified by following commands (i.e. 
`fields`, or `stats` command is needed) -* **Limitation**: Cannot use wildcards (`*`) in field selection - only explicit field names are supported -* **Limitation**: All extracted fields are returned as STRING type -* **Limitation**: Filter with query (`where in/exists [...]` ) is not supported after `spath` command +* Automatically extracts all the fields from the input field. If a field with the same name already exists, extracted values will be appended to the existing field value. +* **Limitation**: When the `fields` command is used together with the `spath` command: partial wildcard (e.g. `prefix*`, `*suffix`) is not supported; and the full wildcard (`*`) needs to be placed at the end of the fields list. +* **Limitation**: All extracted fields are returned as STRING type. +* **Limitation**: Field order in the result could be inconsistent with the same query without the `spath` command, and the behavior might change in a future version. +* **Limitation**: Filter with subquery (`where in/exists [...]`) is not supported with the `spath` command. +* **Limitation**: The `fillnull` command requires specifying fields when used with the `spath` command. +* **Limitation**: The following commands cannot be used together with the `spath` command: `appendcol`, `multisearch`, `lookup`. +* **Performance**: Filter records before the `spath` command for best performance (see Example 8) + +* **Internal Implementation**: The auto extraction feature uses an internal `_MAP` system column to store dynamic fields during query processing. This column is automatically expanded into individual columns in the final results and users don't need to reference it directly. For more information, see [System Columns](../general/identifiers.md#system-columns). 
## Example 1: Basic field extraction @@ -89,7 +94,6 @@ fetched rows / total rows = 3/3 | {"list": [5, 6], "nest_out": {"nest_in": "a"}} | 5 | [5,6] | a | +------------------------------------------------------+---------------+--------------+--------+ ``` - ## Example 3: Sum of inner elements @@ -102,7 +106,7 @@ source=structured | stats sum(n) | fields `sum(n)` ``` - + The query returns the following results. The `spath` command always returns inner values as strings: ```text @@ -139,9 +143,9 @@ fetched rows / total rows = 3/3 +-------+---+ ``` -## Example 5: Field Resolution-based Extraction +## Example 5: Full Extraction -Extract multiple fields automatically based on downstream requirements. The `spath` command analyzes which fields are needed and extracts only those fields. +Extract multiple fields automatically. In case field already exists, extracted values are appended to the existing value. ```ppl source=structured @@ -154,16 +158,16 @@ Expected output: ```text fetched rows / total rows = 3/3 -+--------------------------------------+----+----+--------+ -| doc_multi | a | b | c | -|--------------------------------------+----+----+--------| -| {"a": 10, "b": 20, "c": 30, "d": 40} | 10 | 20 | [1,30] | -| {"a": 15, "b": 25, "c": 35, "d": 45} | 15 | 25 | [1,35] | -| {"a": 11, "b": 21, "c": 31, "d": 41} | 11 | 21 | [1,31] | -+--------------------------------------+----+----+--------+ ++--------------------------------------+----+----+---------+ +| doc_multi | a | b | c | +|--------------------------------------+----+----+---------| +| {"a": 10, "b": 20, "c": 30, "d": 40} | 10 | 20 | [1, 30] | +| {"a": 15, "b": 25, "c": 35, "d": 45} | 15 | 25 | [1, 35] | +| {"a": 11, "b": 21, "c": 31, "d": 41} | 11 | 21 | [1, 31] | ++--------------------------------------+----+----+---------+ ``` -This extracts only fields `a`, `b`, and `c` from the JSON in `doc_multi` field, even though the JSON contains fields `d` as well. All extracted fields are returned as STRING type. 
As `c` in the example, extracted value is appended to organize an array if an extracted field already exists. +All extracted fields are returned as STRING type. As shown with field `c` in the example, the extracted value is appended to form an array if the extracted field already exists. ## Example 6: Field Merge with Dotted Names @@ -189,76 +193,48 @@ fetched rows / total rows = 1/1 In this example, the JSON contains both `"a.b": 1` (direct field with dot) and `"a": {"b": 2}` (nested path). The `spath` command extracts both values and merges them into the array `[1, 2]`. -## Example 7: Field Resolution with Eval +## Example 7: Auto Extraction Limitations -This example shows field resolution with computed fields. The `spath` command extracts only the fields needed by downstream commands. +**Important**: It raises an error if a partial wildcard is used. ```ppl source=structured | spath input=doc_multi -| eval sum_ab = cast(a as int) + cast(b as int) -| fields doc_multi, a, b, sum_ab -``` - -Expected output: - -```text -fetched rows / total rows = 3/3 -+--------------------------------------+----+----+--------+ -| doc_multi | a | b | sum_ab | -|--------------------------------------+----+----+--------| -| {"a": 10, "b": 20, "c": 30, "d": 40} | 10 | 20 | 30 | -| {"a": 15, "b": 25, "c": 35, "d": 45} | 15 | 25 | 40 | -| {"a": 11, "b": 21, "c": 31, "d": 41} | 11 | 21 | 32 | -+--------------------------------------+----+----+--------+ +| fields a, prefix*, b # ERROR ``` -The `spath` command extracts only fields `a` and `b` (needed by the `eval` command), which are then cast to integers and summed. Fields `c` and `d` are not extracted since they're not needed. - -## Example 8: Field Resolution with Stats - -This example demonstrates field resolution with aggregation. The `spath` command extracts only the fields needed for grouping and aggregation. +**Important**: It raises an error if a wildcard is used in the middle of the field list. 
```ppl source=structured +| fields doc_multi +| head 1 | spath input=doc_multi -| stats avg(cast(a as int)) as avg_a, sum(cast(b as int)) as sum_b by c -``` - -Expected output: - -```text -fetched rows / total rows = 3/3 -+-------+-------+----+ -| avg_a | sum_b | c | -|-------+-------+----| -| 10.0 | 20 | 30 | -| 11.0 | 21 | 31 | -| 15.0 | 25 | 35 | -+-------+-------+----+ +| fields doc_multi, *, b # ERROR ``` -The `spath` command extracts fields `a`, `b`, and `c` (needed by the `stats` command for aggregation and grouping). Field `d` is not extracted since it's not used. - -## Example 9: Field Resolution Limitations - -**Important**: It raises error if extracted fields cannot be identified by following commands +It works when wildcard is placed at the end of field list. ```ppl source=structured +| fields doc_multi +| head 1 | spath input=doc_multi -| eval x = a * b # ERROR: Requires field selection (fields or stats command) +| fields doc_multi, b, * ``` -**Important**: Wildcards are not supported in field resolution mode: +Expected output: -```ppl -source=structured -| spath input=doc_multi -| fields a, b* # ERROR: Spath command cannot extract arbitrary fields +```text +fetched rows / total rows = 1/1 ++--------------------------------------+----+----+----+----+ +| doc_multi | b | a | c | d | +|--------------------------------------+----+----+----+----| +| {"a": 10, "b": 20, "c": 30, "d": 40} | 20 | 10 | 30 | 40 | ++--------------------------------------+----+----+----+----+ ``` -## Example 10: Performance Considerations +## Example 8: Performance Considerations **Important**: The `spath` command processes data on the coordinator node after retrieval from data nodes. Commands placed after `spath` cannot utilize OpenSearch index capabilities, which significantly impacts performance on large datasets. 
diff --git a/docs/user/ppl/general/identifiers.md b/docs/user/ppl/general/identifiers.md index c532e9929f3..ca08b16a421 100644 --- a/docs/user/ppl/general/identifiers.md +++ b/docs/user/ppl/general/identifiers.md @@ -185,4 +185,17 @@ fetched rows / total rows = 4/4 | Dale | Adams | accounts | -2 | +-----------+----------+----------+-------+ ``` - \ No newline at end of file + + +## System Columns + +### Description + +PPL uses special system columns for internal processing. These columns are reserved and have special behavior: + +- `_MAP`: Internal system column used to store dynamic fields. This column is automatically expanded into individual columns in the final query results. Users don't need to reference this field directly, as it's handled transparently by the query engine. + +### Notes + +- System columns like `_MAP` are excluded from wildcard field selection +- Unlike metadata fields, system columns are not meant to be explicitly selected by users diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index 8988cbc04d9..e1fe058e9d5 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -2354,6 +2354,14 @@ public void testSpathWithoutPathExplain() throws IOException { expected, explainQueryYaml(source(TEST_INDEX_LOGS, "spath input=message | fields test"))); } + @Test + public void testSpathWithDynamicFieldsExplain() throws IOException { + String expected = loadExpectedPlan("explain_spath_with_dynamic_fields.yaml"); + assertYamlEqualsIgnoreId( + expected, + explainQueryYaml(source(TEST_INDEX_LOGS, "spath input=message | where status = '200'"))); + } + @Test public void testExplainInVariousModeAndFormat() throws IOException { enabledOnlyWhenPushdownIsEnabled(); diff --git 
a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLSpathCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLSpathCommandIT.java index 6431590b096..0c2c2e94a7e 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLSpathCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLSpathCommandIT.java @@ -5,7 +5,6 @@ package org.opensearch.sql.calcite.remote; -import static org.opensearch.sql.util.MatcherUtils.array; import static org.opensearch.sql.util.MatcherUtils.rows; import static org.opensearch.sql.util.MatcherUtils.schema; import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; @@ -14,68 +13,114 @@ import java.io.IOException; import org.json.JSONObject; import org.junit.jupiter.api.Test; -import org.opensearch.client.Request; -import org.opensearch.sql.ppl.PPLIntegTestCase; - -public class CalcitePPLSpathCommandIT extends PPLIntegTestCase { - private static final String INDEX = "test_spath"; +public class CalcitePPLSpathCommandIT extends CalcitePPLSpathTestBase { @Override public void init() throws Exception { super.init(); enableCalcite(); - putItem(1, "simple", sj("{'a': 1, 'b': 2, 'c': 3}")); - putItem(2, "simple", sj("{'a': 1, 'b': 2, 'c': 3}")); - putItem(3, "nested", sj("{'nested': {'d': [1, 2, 3], 'e': 'str'}}")); - putItem(4, "join1", sj("{'key': 'k1', 'left': 'l'}")); - putItem(5, "join2", sj("{'key': 'k1', 'right': 'r1'}")); - putItem(6, "join2", sj("{'key': 'k2', 'right': 'r2'}")); - putItem(7, "overwrap", sj("{'a.b': 1, 'a': {'b': 2, 'c': 3}}")); + putJsonItem(1, "simple", sj("{'a': 1, 'b': 2, 'c': 3}")); + putJsonItem(2, "simple", sj("{'a': 1, 'b': 2, 'c': 3}")); + putJsonItem(3, "nested", sj("{'nested': {'d': [1, 2, 3], 'e': 'str'}}")); + putJsonItem(4, "overwrap", sj("{'a.b': 1, 'a': {'b': 2, 'c': 3}}")); + putJsonItem( + 5, "types", sj("{'string': 'STRING', 'boolean': true, 'number': 10.1, 'null': null}")); } - private void 
putItem(int id, String testCase, String json) throws Exception { - Request request = new Request("PUT", String.format("/%s/_doc/%d?refresh=true", INDEX, id)); - request.setJsonEntity(docWithJson(testCase, json)); - client().performRequest(request); + @Test + public void testSimpleSpath() throws IOException { + JSONObject result = + executeQuery( + "source=test_json | where category='simple' | spath input=userData output=result path=a" + + " | fields result | head 2"); + verifySchema(result, schema("result", "string")); + verifyDataRows(result, rows("1"), rows("1")); } - private String docWithJson(String testCase, String json) { - return String.format(sj("{'testCase': '%s', 'doc': '%s'}"), testCase, escape(json)); - } + private static final String EXPECTED_SPATH_WILDCARD_ERROR = + "Spath command cannot be used with partial wildcard such as `prefix*`."; - private String escape(String json) { - return json.replace("\"", "\\\""); + @Test + public void testSpathWithWildcard() throws IOException { + verifyExplainException( + "source=test_json | spath input=userData | fields a, b*", EXPECTED_SPATH_WILDCARD_ERROR); } - private String sj(String singleQuoteJson) { - return singleQuoteJson.replace("'", "\""); + @Test + public void testSpathWithoutFields() throws IOException { + JSONObject result = + executeQuery("source=test_json | where category='simple' | spath input=userData | head 1"); + verifySchema( + result, + schema("a", "string"), + schema("b", "string"), + schema("c", "string"), + schema("category", "string"), + schema("userData", "string")); + verifyDataRows(result, rows("1", "2", "3", "simple", sj("{'a': 1, 'b': 2, 'c': 3}"))); } @Test - public void testSimpleSpath() throws IOException { + public void testSpathWithOnlyWildcard() throws IOException { JSONObject result = executeQuery( - "source=test_spath | where testCase='simple' | spath input=doc output=result path=a |" - + " fields result | head 2"); - verifySchema(result, schema("result", "string")); - 
verifyDataRows(result, rows("1"), rows("1")); + "source=test_json | where category='simple' | spath input=userData | fields * | head" + + " 1"); + verifySchema( + result, + schema("a", "string"), + schema("b", "string"), + schema("c", "string"), + schema("category", "string"), + schema("userData", "string")); + verifyDataRows(result, rows("1", "2", "3", "simple", sj("{'a': 1, 'b': 2, 'c': 3}"))); } - private static final String EXPECTED_ARBITRARY_FIELD_ERROR = - "Spath command cannot extract arbitrary fields. " - + "Please project fields explicitly by fields command without wildcard or stats command."; + @Test + public void testSpathWithFieldAndWildcard() throws IOException { + JSONObject result = + executeQuery( + "source=test_json | where category='simple' | spath input=userData | fields c, * | head" + + " 1"); + verifySchema( + result, + schema("c", "string"), + schema("a", "string"), + schema("b", "string"), + schema("category", "string"), + schema("userData", "string")); + verifyDataRows(result, rows("3", "1", "2", "simple", sj("{'a': 1, 'b': 2, 'c': 3}"))); + } @Test - public void testSpathWithoutFields() throws IOException { + public void testSpathWithFieldAndWildcardAtMiddle() throws IOException { verifyExplainException( - "source=test_spath | spath input=doc | eval a = 1", EXPECTED_ARBITRARY_FIELD_ERROR); + "source=test_json | where category='simple' | spath input=userData | fields c, *, b", + "Wildcard can be placed only at the end of the fields list (limit of spath command)."); } @Test - public void testSpathWithWildcard() throws IOException { - verifyExplainException( - "source=test_spath | spath input=doc | fields a, b*", EXPECTED_ARBITRARY_FIELD_ERROR); + public void testSpathTypes() throws IOException { + JSONObject result = + executeQuery("source=test_json | where category='types' | spath input=userData | head 1"); + verifySchema( + result, + schema("boolean", "string"), + schema("category", "string"), + schema("null", "string"), + schema("number", 
"string"), + schema("string", "string"), + schema("userData", "string")); + verifyDataRows( + result, + rows( + "true", + "types", + null, + "10.1", + "STRING", + sj("{'string': 'STRING', 'boolean': true, 'number': 10.1, 'null': null}"))); } private static final String EXPECTED_SUBQUERY_ERROR = @@ -84,8 +129,8 @@ public void testSpathWithWildcard() throws IOException { @Test public void testSpathWithSubsearch() throws IOException { verifyExplainException( - "source=test_spath | spath input=doc | where b in [source=test_spath | fields a] | fields" - + " b", + "source=test_json | spath input=userData | where b in [source=test_json | fields a] |" + + " fields b", EXPECTED_SUBQUERY_ERROR); } @@ -93,8 +138,8 @@ public void testSpathWithSubsearch() throws IOException { public void testSpathWithFields() throws IOException { JSONObject result = executeQuery( - "source=test_spath | where testCase='simple' | spath input=doc | fields a, b, c | head" - + " 1"); + "source=test_json | where category='simple' | spath input=userData | fields a, b, c |" + + " head 1"); verifySchema(result, schema("a", "string"), schema("b", "string"), schema("c", "string")); verifyDataRows(result, rows("1", "2", "3")); } @@ -103,7 +148,8 @@ public void testSpathWithFields() throws IOException { public void testSpathWithAbsentField() throws IOException { JSONObject result = executeQuery( - "source=test_spath | where testCase='simple' | spath input=doc | fields a, x | head 1"); + "source=test_json | where category='simple' | spath input=userData | fields a, x | head" + + " 1"); verifySchema(result, schema("a", "string"), schema("x", "string")); verifyDataRows(result, rows("1", null)); } @@ -112,8 +158,8 @@ public void testSpathWithAbsentField() throws IOException { public void testOverwrap() throws IOException { JSONObject result = executeQuery( - "source=test_spath | where testCase='overwrap' | spath input=doc | fields a.b | head" - + " 1"); + "source=test_json | where category='overwrap' | spath 
input=userData | fields a.b |" + + " head 1"); verifySchema(result, schema("a.b", "string")); verifyDataRows(result, rows("[1, 2]")); } @@ -122,17 +168,34 @@ public void testOverwrap() throws IOException { public void testSpathTwice() throws IOException { JSONObject result = executeQuery( - "source=test_spath | where testCase='simple' | spath input=doc | spath input=doc |" - + " fields a, doc | head 1"); - verifySchema(result, schema("a", "array"), schema("doc", "string")); - verifyDataRows(result, rows(array("1", "1"), sj("{'a': 1, 'b': 2, 'c': 3}"))); + "source=test_json | where category='simple' | spath input=userData | spath" + + " input=userData | fields a, userData | head 1"); + verifySchema(result, schema("a", "string"), schema("userData", "string")); + verifyDataRows(result, rows("[1, 1]", sj("{'a': 1, 'b': 2, 'c': 3}"))); + } + + @Test + public void testSpathTwiceWithDynamicFields() throws IOException { + JSONObject result = + executeQuery( + "source=test_json | where category='simple' | spath input=userData | spath" + + " input=userData | fields b, * | head 1"); + verifySchema( + result, + schema("b", "string"), + schema("a", "string"), + schema("c", "string"), + schema("category", "string"), + schema("userData", "string")); + verifyDataRows( + result, rows("[2, 2]", "[1, 1]", "[3, 3]", "simple", sj("{'a': 1, 'b': 2, 'c': 3}"))); } @Test public void testSpathWithEval() throws IOException { JSONObject result = executeQuery( - "source=test_spath | where testCase='simple' | spath input=doc |" + "source=test_json | where category='simple' | spath input=userData |" + " eval result = a * b * c | fields result | head 1"); verifySchema(result, schema("result", "double")); verifyDataRows(result, rows(6)); @@ -142,7 +205,7 @@ public void testSpathWithEval() throws IOException { public void testSpathWithStats() throws IOException { JSONObject result = executeQuery( - "source=test_spath | where testCase='simple' | spath input=doc |" + "source=test_json | where 
category='simple' | spath input=userData |" + "stats count by a, b | head 1"); verifySchema(result, schema("count", "bigint"), schema("a", "string"), schema("b", "string")); verifyDataRows(result, rows(2, "1", "2")); @@ -152,21 +215,51 @@ public void testSpathWithStats() throws IOException { public void testSpathWithNestedFields() throws IOException { JSONObject result = executeQuery( - "source=test_spath | where testCase='nested' | spath input=doc | fields `nested.d{}`," - + " nested.e"); + "source=test_json | where category='nested' | spath input=userData | fields" + + " `nested.d{}`, nested.e"); verifySchema(result, schema("nested.d{}", "string"), schema("nested.e", "string")); verifyDataRows(result, rows("[1, 2, 3]", "str")); } @Test - public void testSpathWithJoin() throws IOException { + public void testAppendWithSpathInMainAndDynamicFields() throws IOException { + JSONObject result = + executeQuery( + "source=test_json | where category='simple' | spath input=userData | head 1 | append" + + " [source=test_json | where category='simple' | eval d = 4 | head 1 ] | fields" + + " a, c, *"); + verifySchema( + result, + schema("a", "string"), + schema("c", "string"), + schema("b", "string"), + schema("category", "string"), + schema("d", "string"), + schema("userData", "string")); + verifyDataRows( + result, + rows("1", "3", "2", "simple", null, sj("{'a': 1, 'b': 2, 'c': 3}")), + rows(null, null, null, "simple", "4", sj("{'a': 1, 'b': 2, 'c': 3}"))); + } + + @Test + public void testAppendWithSpathInSubsearchDynamicFields() throws IOException { JSONObject result = executeQuery( - "source=test_spath | where testCase='join1' | spath input=doc | fields key, left | join" - + " key [source=test_spath | where testCase='join2' | spath input=doc | fields key," - + " right ] |fields key, left, right"); + "source=test_json | where category='simple' | eval d = 4 | head 1 | append" + + " [source=test_json | where category='simple' | spath input=userData | head 1 ] |" + + " fields 
a, c, *"); verifySchema( - result, schema("key", "string"), schema("left", "string"), schema("right", "string")); - verifyDataRows(result, rows("k1", "l", "r1")); + result, + schema("a", "string"), + schema("c", "string"), + schema("b", "string"), + schema("category", "string"), + schema("d", "string"), + schema("userData", "string")); + verifyDataRows( + result, + rows(null, null, null, "simple", "4", sj("{'a': 1, 'b': 2, 'c': 3}")), + rows("1", "3", "2", "simple", null, sj("{'a': 1, 'b': 2, 'c': 3}"))); } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLSpathTestBase.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLSpathTestBase.java new file mode 100644 index 00000000000..f39746751b5 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLSpathTestBase.java @@ -0,0 +1,63 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.remote; + +import org.opensearch.client.Request; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +/** + * Base class for Calcite PPL spath command integration tests. Provides common utility methods for + * working with JSON data. 
+ */ +public abstract class CalcitePPLSpathTestBase extends PPLIntegTestCase { + protected static final String INDEX_JSON = "test_json"; + protected static final String INDEX_WITHOUT_JSON = "test_regular"; + protected static final String INDEX_JSON_CONFLICT = "test_json_conflict"; + + protected void putJsonItem(int id, String category, String json) throws Exception { + Request request = new Request("PUT", String.format("/%s/_doc/%d?refresh=true", INDEX_JSON, id)); + request.setJsonEntity(docWithJson(category, json)); + client().performRequest(request); + } + + protected void putJsonItemConflict(int id, String category, String json) throws Exception { + Request request = + new Request("PUT", String.format("/%s/_doc/%d?refresh=true", INDEX_JSON_CONFLICT, id)); + request.setJsonEntity(docWithJson(category, json)); + client().performRequest(request); + } + + protected void putRegularItem(int id, String userId, String orderId, double amount, String notes) + throws Exception { + Request request = + new Request("PUT", String.format("/%s/_doc/%d?refresh=true", INDEX_WITHOUT_JSON, id)); + request.setJsonEntity(regularDoc(userId, orderId, amount, notes)); + client().performRequest(request); + } + + protected String regularDoc(String userId, String orderId, double amount, String notes) { + return String.format( + java.util.Locale.ROOT, + sj("{'userId': '%s', 'orderId': '%s', 'amount': %.1f, 'notes': '%s'}"), + userId, + orderId, + amount, + notes); + } + + protected String docWithJson(String category, String json) { + return String.format(sj("{'category': '%s', 'userData': '%s'}"), category, escape(json)); + } + + protected String escape(String json) { + return json.replace("\"", "\\\""); + } + + /** Converts single-quote JSON to double-quote JSON. 
*/ + protected String sj(String singleQuoteJson) { + return singleQuoteJson.replace("'", "\""); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLSpathWithJoinIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLSpathWithJoinIT.java new file mode 100644 index 00000000000..30c76069ac5 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLSpathWithJoinIT.java @@ -0,0 +1,404 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.remote; + +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifySchema; + +import java.io.IOException; +import org.json.JSONObject; +import org.junit.jupiter.api.Test; + +public class CalcitePPLSpathWithJoinIT extends CalcitePPLSpathTestBase { + @Override + public void init() throws Exception { + super.init(); + enableCalcite(); + + // Index with JSON field - requires spath to extract fields + putJsonItem( + 1, + "user1", + sj("{'userId': 'u1', 'name': 'Alice', 'role': 'admin', 'notes': 'from_json_index'}")); + putJsonItem( + 2, + "user2", + sj("{'userId': 'u2', 'name': 'Bob', 'role': 'user', 'notes': 'from_json_index'}")); + putJsonItem( + 3, + "user3", + sj("{'userId': 'u3', 'name': 'Charlie', 'role': 'user', 'notes': 'from_json_index'}")); + + // Index without JSON field - regular structured data + putRegularItem(1, "u1", "order1", 100.0, "from_regular_index"); + putRegularItem(2, "u1", "order2", 150.0, "from_regular_index"); + putRegularItem(3, "u2", "order3", 200.0, "from_regular_index"); + putRegularItem(4, "u3", "order4", 75.0, "from_regular_index"); + + // Index with conflicting field values for overwrite testing + putJsonItemConflict( + 1, + "profile1", + sj("{'userId': 'u1', 
'department': 'Engineering', 'notes': 'from_conflict_index'}")); + putJsonItemConflict( + 2, + "profile2", + sj("{'userId': 'u2', 'department': 'Sales', 'notes': 'from_conflict_index'}")); + } + + @Test + public void testSpathLeftJoinRegularRight() throws IOException { + JSONObject result = + executeQuery( + "source=" + + INDEX_JSON + + " | spath input=userData | fields userId, name | join userId [source=" + + INDEX_WITHOUT_JSON + + " | fields userId, orderId, amount] | fields userId, name, orderId, amount"); + verifySchema( + result, + schema("userId", "string"), + schema("name", "string"), + schema("orderId", "string"), + schema("amount", "float")); + verifyDataRows( + result, + rows("u1", "Alice", "order1", 100.0), + rows("u1", "Alice", "order2", 150.0), + rows("u2", "Bob", "order3", 200.0), + rows("u3", "Charlie", "order4", 75.0)); + } + + @Test + public void testRegularLeftJoinSpathRight() throws IOException { + JSONObject result = + executeQuery( + "source=" + + INDEX_WITHOUT_JSON + + " | fields userId, orderId, amount | join userId [source=" + + INDEX_JSON + + " | spath input=userData | fields userId, name, role] | fields userId, orderId," + + " amount, name, role"); + verifySchema( + result, + schema("userId", "string"), + schema("orderId", "string"), + schema("amount", "float"), + schema("name", "string"), + schema("role", "string")); + verifyDataRows( + result, + rows("u1", "order1", 100.0, "Alice", "admin"), + rows("u1", "order2", 150.0, "Alice", "admin"), + rows("u2", "order3", 200.0, "Bob", "user"), + rows("u3", "order4", 75.0, "Charlie", "user")); + } + + @Test + public void testSpathBothSidesJoin() throws IOException { + JSONObject result = + executeQuery( + "source=" + + INDEX_JSON + + " | where category='user1' | spath input=userData | fields userId, name | join" + + " userId [source=" + + INDEX_JSON + + " | where category='user1' | spath input=userData | fields userId," + + " role] | fields userId, name, role"); + verifySchema( + result, 
schema("userId", "string"), schema("name", "string"), schema("role", "string")); + verifyDataRows(result, rows("u1", "Alice", "admin")); + } + + @Test + public void testSpathLeftJoinWithDynamicFields() throws IOException { + JSONObject result = + executeQuery( + "source=" + + INDEX_JSON + + " | spath input=userData | join userId [source=" + + INDEX_WITHOUT_JSON + + " | fields userId, orderId, amount] | fields userId, name, *"); + verifySchema( + result, + schema("userId", "string"), + schema("name", "string"), + schema("amount", "string"), + schema("category", "string"), + schema("notes", "string"), + schema("orderId", "string"), + schema("role", "string"), + schema("userData", "string")); + verifyDataRows( + result, + rows( + "u1", + "Alice", + "100.0", + "user1", + "from_json_index", + "order1", + "admin", + sj("{'userId': 'u1', 'name': 'Alice', 'role': 'admin', 'notes': 'from_json_index'}")), + rows( + "u1", + "Alice", + "150.0", + "user1", + "from_json_index", + "order2", + "admin", + sj("{'userId': 'u1', 'name': 'Alice', 'role': 'admin', 'notes': 'from_json_index'}")), + rows( + "u2", + "Bob", + "200.0", + "user2", + "from_json_index", + "order3", + "user", + sj("{'userId': 'u2', 'name': 'Bob', 'role': 'user', 'notes': 'from_json_index'}")), + rows( + "u3", + "Charlie", + "75.0", + "user3", + "from_json_index", + "order4", + "user", + sj("{'userId': 'u3', 'name': 'Charlie', 'role': 'user', 'notes': 'from_json_index'}"))); + } + + @Test + public void testSpathRightJoinWithDynamicFields() throws IOException { + JSONObject result = + executeQuery( + "source=" + + INDEX_WITHOUT_JSON + + " | fields userId, orderId, amount | join userId [source=" + + INDEX_JSON + + " | spath input=userData] | fields userId, name, *"); + verifySchema( + result, + schema("userId", "string"), + schema("name", "string"), + schema("amount", "string"), + schema("category", "string"), + schema("notes", "string"), + schema("orderId", "string"), + schema("role", "string"), + 
schema("userData", "string")); + verifyDataRows( + result, + rows( + "u1", + "Alice", + "100.0", + "user1", + "from_json_index", + "order1", + "admin", + sj("{'userId': 'u1', 'name': 'Alice', 'role': 'admin', 'notes': 'from_json_index'}")), + rows( + "u1", + "Alice", + "150.0", + "user1", + "from_json_index", + "order2", + "admin", + sj("{'userId': 'u1', 'name': 'Alice', 'role': 'admin', 'notes': 'from_json_index'}")), + rows( + "u2", + "Bob", + "200.0", + "user2", + "from_json_index", + "order3", + "user", + sj("{'userId': 'u2', 'name': 'Bob', 'role': 'user', 'notes': 'from_json_index'}")), + rows( + "u3", + "Charlie", + "75.0", + "user3", + "from_json_index", + "order4", + "user", + sj("{'userId': 'u3', 'name': 'Charlie', 'role': 'user', 'notes': 'from_json_index'}"))); + } + + @Test + public void testSpathBothSidesJoinWithDynamicFields() throws IOException { + JSONObject result = + executeQuery( + "source=" + + INDEX_JSON + + " | where category='user1' | spath input=userData | join userId [source=" + + INDEX_JSON + + " | where category='user1' | spath input=userData] | fields userId," + + " name, role, *"); + verifySchema( + result, + schema("userId", "string"), + schema("name", "string"), + schema("role", "string"), + schema("category", "string"), + schema("notes", "string"), + schema("userData", "string")); + verifyDataRows( + result, + rows( + "u1", + "Alice", + "admin", + "user1", + "from_json_index", + sj("{'userId': 'u1', 'name': 'Alice', 'role': 'admin', 'notes': 'from_json_index'}"))); + } + + @Test + public void testSpathJoinWithOverwriteTrue() throws IOException { + // Default behavior (overwrite=true): right side values overwrite left side + JSONObject result = + executeQuery( + "source=" + + INDEX_JSON + + " | spath input=userData | fields userId, name, role, notes | join" + + " userId [source=" + + INDEX_JSON_CONFLICT + + " | spath input=userData | fields userId, notes, department] | fields userId," + + " name, role, notes, department | head 1"); + 
verifySchema( + result, + schema("userId", "string"), + schema("name", "string"), + schema("role", "string"), + schema("notes", "string"), + schema("department", "string")); + // Right side notes value should overwrite left side - notes should be from_conflict_index + verifyDataRows(result, rows("u1", "Alice", "admin", "from_conflict_index", "Engineering")); + } + + @Test + public void testSpathJoinWithOverwriteFalse() throws IOException { + // overwrite=false: left side values are preserved when there's a conflict + JSONObject result = + executeQuery( + "source=" + + INDEX_JSON + + " | spath input=userData | fields userId, name, role, notes | join" + + " overwrite=false userId [source=" + + INDEX_JSON_CONFLICT + + " | spath input=userData | fields userId, notes, department] | fields userId," + + " name, role, notes, department | head 1"); + verifySchema( + result, + schema("userId", "string"), + schema("name", "string"), + schema("role", "string"), + schema("notes", "string"), + schema("department", "string")); + // Left side notes value should be preserved - notes should be from_json_index + verifyDataRows(result, rows("u1", "Alice", "admin", "from_json_index", "Engineering")); + } + + @Test + public void testSpathJoinWithCriteria() throws IOException { + JSONObject result = + executeQuery( + "source=" + + INDEX_JSON + + " | spath input=userData | join left=l right=r on" + + " l.userId = r.userId [source=" + + INDEX_WITHOUT_JSON + + " ] | fields userId, r.userId, * | head 1"); + verifySchema( + result, + schema("userId", "string"), + schema("r.userId", "string"), + schema("amount", "string"), + schema("category", "string"), + schema("name", "string"), + schema("notes", "string"), + schema("orderId", "string"), + schema("role", "string"), + schema("userData", "string")); + verifyDataRows( + result, + rows( + "u1", + "u1", + "100.0", + "user1", + "Alice", + "from_json_index", + "order1", + "admin", + sj("{'userId': 'u1', 'name': 'Alice', 'role': 'admin', 'notes': 
'from_json_index'}"))); + } + + @Test + public void testSpathJoinWithAggregation() throws IOException { + JSONObject result = + executeQuery( + "source=" + + INDEX_JSON + + " | spath input=userData | join userId [source=" + + INDEX_WITHOUT_JSON + + " ] | stats sum(amount) as totalAmount by userId, name"); + verifySchema( + result, + schema("totalAmount", "double"), + schema("userId", "string"), + schema("name", "string")); + verifyDataRows( + result, rows(250.0, "u1", "Alice"), rows(200.0, "u2", "Bob"), rows(75.0, "u3", "Charlie")); + } + + @Test + public void testSpathJoinWithFilter() throws IOException { + JSONObject result = + executeQuery( + "source=" + + INDEX_JSON + + " | spath input=userData | join userId [source=" + + INDEX_WITHOUT_JSON + + " | where amount > 100] | fields userId, name, *"); + verifySchema( + result, + schema("userId", "string"), + schema("name", "string"), + schema("amount", "string"), + schema("category", "string"), + schema("notes", "string"), + schema("orderId", "string"), + schema("role", "string"), + schema("userData", "string")); + verifyDataRows( + result, + rows( + "u1", + "Alice", + "150.0", + "user1", + "from_regular_index", + "order2", + "admin", + sj("{'userId': 'u1', 'name': 'Alice', 'role': 'admin', 'notes': 'from_json_index'}")), + rows( + "u2", + "Bob", + "200.0", + "user2", + "from_regular_index", + "order3", + "user", + sj("{'userId': 'u2', 'name': 'Bob', 'role': 'user', 'notes': 'from_json_index'}"))); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/util/MatcherUtils.java b/integ-test/src/test/java/org/opensearch/sql/util/MatcherUtils.java index 05bc3313dde..2240af796ac 100644 --- a/integ-test/src/test/java/org/opensearch/sql/util/MatcherUtils.java +++ b/integ-test/src/test/java/org/opensearch/sql/util/MatcherUtils.java @@ -193,7 +193,11 @@ public static void verifyNumOfRows(JSONObject response, int numOfRow) { public static void verify(JSONArray array, Matcher... 
matchers) { List objects = new ArrayList<>(); array.iterator().forEachRemaining(o -> objects.add((T) o)); - assertEquals(matchers.length, objects.size()); + assertEquals( + String.format( + "Expected %d, but %d. objects=%s", matchers.length, objects.size(), objects.toString()), + matchers.length, + objects.size()); assertThat(objects, containsInAnyOrder(matchers)); } diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_spath_with_dynamic_fields.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_spath_with_dynamic_fields.yaml new file mode 100644 index 00000000000..96d288e7fe1 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_spath_with_dynamic_fields.yaml @@ -0,0 +1,9 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalFilter(condition=[=($0, '200')]) + LogicalProject(status=[CAST(ITEM(JSON_EXTRACT_ALL($3), 'status')):VARCHAR NOT NULL], _MAP=[map_append(MAP_FROM_ARRAYS(ARRAY('@timestamp', 'created_at', 'level', 'message', 'server'), ARRAY($2, $0, $4, $3, $1)), map_remove(JSON_EXTRACT_ALL($3), ARRAY('status')))]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]]) + physical: | + EnumerableCalc(expr#0..4=[{inputs}], expr#5=[JSON_EXTRACT_ALL($t0)], expr#6=['status'], expr#7=[ITEM($t5, $t6)], expr#8=[CAST($t7):VARCHAR NOT NULL], expr#9=['@timestamp'], expr#10=['created_at'], expr#11=['level'], expr#12=['message'], expr#13=['server'], expr#14=[ARRAY($t9, $t10, $t11, $t12, $t13)], expr#15=[ARRAY($t1, $t2, $t3, $t0, $t4)], expr#16=[MAP_FROM_ARRAYS($t14, $t15)], expr#17=[ARRAY($t6)], expr#18=[map_remove($t5, $t17)], expr#19=[map_append($t16, $t18)], status=[$t8], _MAP=[$t19]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[PROJECT->[message, @timestamp, created_at, level, server], SCRIPT->=(CAST(ITEM(JSON_EXTRACT_ALL($0), 'status')):VARCHAR NOT NULL, '200'), LIMIT->10000], 
OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":10000,"timeout":"1m","query":{"script":{"script":{"source":"{\"langType\":\"calcite\",\"script\":\"rO0ABXQHkXsKICAib3AiOiB7CiAgICAibmFtZSI6ICI9IiwKICAgICJraW5kIjogIkVRVUFMUyIsCiAgICAic3ludGF4IjogIkJJTkFSWSIKICB9LAogICJvcGVyYW5kcyI6IFsKICAgIHsKICAgICAgIm9wIjogewogICAgICAgICJuYW1lIjogIkNBU1QiLAogICAgICAgICJraW5kIjogIkNBU1QiLAogICAgICAgICJzeW50YXgiOiAiU1BFQ0lBTCIKICAgICAgfSwKICAgICAgIm9wZXJhbmRzIjogWwogICAgICAgIHsKICAgICAgICAgICJvcCI6IHsKICAgICAgICAgICAgIm5hbWUiOiAiSVRFTSIsCiAgICAgICAgICAgICJraW5kIjogIklURU0iLAogICAgICAgICAgICAic3ludGF4IjogIlNQRUNJQUwiCiAgICAgICAgICB9LAogICAgICAgICAgIm9wZXJhbmRzIjogWwogICAgICAgICAgICB7CiAgICAgICAgICAgICAgIm9wIjogewogICAgICAgICAgICAgICAgIm5hbWUiOiAiSlNPTl9FWFRSQUNUX0FMTCIsCiAgICAgICAgICAgICAgICAia2luZCI6ICJPVEhFUl9GVU5DVElPTiIsCiAgICAgICAgICAgICAgICAic3ludGF4IjogIkZVTkNUSU9OIgogICAgICAgICAgICAgIH0sCiAgICAgICAgICAgICAgIm9wZXJhbmRzIjogWwogICAgICAgICAgICAgICAgewogICAgICAgICAgICAgICAgICAiZHluYW1pY1BhcmFtIjogMCwKICAgICAgICAgICAgICAgICAgInR5cGUiOiB7CiAgICAgICAgICAgICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAgICAgICAgICAgIm51bGxhYmxlIjogdHJ1ZSwKICAgICAgICAgICAgICAgICAgICAicHJlY2lzaW9uIjogLTEKICAgICAgICAgICAgICAgICAgfQogICAgICAgICAgICAgICAgfQogICAgICAgICAgICAgIF0sCiAgICAgICAgICAgICAgImNsYXNzIjogIm9yZy5vcGVuc2VhcmNoLnNxbC5leHByZXNzaW9uLmZ1bmN0aW9uLlVzZXJEZWZpbmVkRnVuY3Rpb25CdWlsZGVyJDEiLAogICAgICAgICAgICAgICJ0eXBlIjogewogICAgICAgICAgICAgICAgInR5cGUiOiAiTUFQIiwKICAgICAgICAgICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgICAgICAgICAia2V5IjogewogICAgICAgICAgICAgICAgICAidHlwZSI6ICJWQVJDSEFSIiwKICAgICAgICAgICAgICAgICAgIm51bGxhYmxlIjogZmFsc2UsCiAgICAgICAgICAgICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICAgICAgICAgICAgfSwKICAgICAgICAgICAgICAgICJ2YWx1ZSI6IHsKICAgICAgICAgICAgICAgICAgInR5cGUiOiAiQU5ZIiwKICAgICAgICAgICAgICAgICAgIm51bGxhYmxlIjogZmFsc2UsCiAgICAgICAgICAgICAgICAgICJwcmVjaXNpb24iOiAtMSwKICAgICAgICAgICAgICAgICAgInNjYWxlIjogLTIxNDc0ODM2NDgKICAgICAgICAgICAgICAgIH0KICAgICAgICAgICAgICB9LAogICAgICAgI
CAgICAgICJkZXRlcm1pbmlzdGljIjogdHJ1ZSwKICAgICAgICAgICAgICAiZHluYW1pYyI6IGZhbHNlCiAgICAgICAgICAgIH0sCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAiZHluYW1pY1BhcmFtIjogMSwKICAgICAgICAgICAgICAidHlwZSI6IHsKICAgICAgICAgICAgICAgICJ0eXBlIjogIlZBUkNIQVIiLAogICAgICAgICAgICAgICAgIm51bGxhYmxlIjogdHJ1ZSwKICAgICAgICAgICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICAgICAgICAgIH0KICAgICAgICAgICAgfQogICAgICAgICAgXQogICAgICAgIH0KICAgICAgXSwKICAgICAgInR5cGUiOiB7CiAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgIm51bGxhYmxlIjogZmFsc2UsCiAgICAgICAgInByZWNpc2lvbiI6IC0xCiAgICAgIH0KICAgIH0sCiAgICB7CiAgICAgICJkeW5hbWljUGFyYW0iOiAyLAogICAgICAidHlwZSI6IHsKICAgICAgICAidHlwZSI6ICJWQVJDSEFSIiwKICAgICAgICAibnVsbGFibGUiOiB0cnVlLAogICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICB9CiAgICB9CiAgXQp9\"}","lang":"opensearch_compounded_script","params":{"utcTimestamp": 0,"SOURCES":[0,2,2],"DIGESTS":["message.keyword","status","200"]}},"boost":1.0}},"_source":{"includes":["message","@timestamp","created_at","level","server"],"excludes":[]}}, requestedTotalSize=10000, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_spath_with_dynamic_fields.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_spath_with_dynamic_fields.yaml new file mode 100644 index 00000000000..6d9f4598b1b --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_spath_with_dynamic_fields.yaml @@ -0,0 +1,10 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalFilter(condition=[=($0, '200')]) + LogicalProject(status=[CAST(ITEM(JSON_EXTRACT_ALL($3), 'status')):VARCHAR NOT NULL], _MAP=[map_append(MAP_FROM_ARRAYS(ARRAY('@timestamp', 'created_at', 'level', 'message', 'server'), ARRAY($2, $0, $4, $3, $1)), map_remove(JSON_EXTRACT_ALL($3), ARRAY('status')))]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]]) + physical: | + 
EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..10=[{inputs}], expr#11=[JSON_EXTRACT_ALL($t3)], expr#12=['status'], expr#13=[ITEM($t11, $t12)], expr#14=[CAST($t13):VARCHAR NOT NULL], expr#15=['@timestamp'], expr#16=['created_at'], expr#17=['level'], expr#18=['message'], expr#19=['server'], expr#20=[ARRAY($t15, $t16, $t17, $t18, $t19)], expr#21=[ARRAY($t2, $t0, $t4, $t3, $t1)], expr#22=[MAP_FROM_ARRAYS($t20, $t21)], expr#23=[ARRAY($t12)], expr#24=[map_remove($t11, $t23)], expr#25=[map_append($t22, $t24)], expr#26=['200':VARCHAR], expr#27=[=($t14, $t26)], status=[$t14], _MAP=[$t25], $condition=[$t27]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]]) \ No newline at end of file diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index 415aff204ca..c591f758051 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -43,6 +43,7 @@ import org.opensearch.sql.ast.statement.ExplainMode; import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.calcite.utils.CalciteToolsHelper.OpenSearchRelRunners; +import org.opensearch.sql.calcite.utils.DynamicFieldsResultProcessor; import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory; import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils; import org.opensearch.sql.common.response.ResponseListener; @@ -319,7 +320,9 @@ private QueryResponse buildResultSet( } Schema schema = new Schema(columns); QueryResponse response = new QueryResponse(schema, values, null); - return response; + QueryResponse processedResponse = DynamicFieldsResultProcessor.expandDynamicFields(response); + + return processedResponse; } /** Registers opensearch-dependent functions */ diff 
--git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java index ab07cd9b5c1..ef7acb2f99f 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java @@ -103,6 +103,7 @@ private CalcitePlanContext createBuilderContext(UnaryOperator public RelNode getRelNode(String ppl) { CalcitePlanContext context = createBuilderContext(); Query query = (Query) plan(pplParser, ppl); + context.setRootNode(query.getPlan()); planTransformer.analyze(query.getPlan(), context); RelNode root = context.relBuilder.build(); root = mergeAdjacentFilters(root); diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLSpathTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLSpathTest.java index 57b11d83150..6eaa87f5eaa 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLSpathTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLSpathTest.java @@ -45,4 +45,19 @@ public void testEvalWithOutput() { "SELECT JSON_EXTRACT(`ENAME`, 'src.path') `custom`\n" + "FROM `scott`.`EMP`"; verifyPPLToSparkSQL(root, expectedSparkSql); } + + @Test + public void testSpathWithoutPath() { + String ppl = "source=EMP | spath input=ENAME | fields custom"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(custom=[CAST(ITEM(JSON_EXTRACT_ALL($1), 'custom')):VARCHAR NOT NULL])\n" + + // + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT CAST(JSON_EXTRACT_ALL(`ENAME`)['custom'] AS STRING) `custom`\nFROM `scott`.`EMP`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/parser/FieldResolutionVisitorTest.java 
b/ppl/src/test/java/org/opensearch/sql/ppl/parser/FieldResolutionVisitorTest.java index c7d8de43964..a4d4f4874ad 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/parser/FieldResolutionVisitorTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/FieldResolutionVisitorTest.java @@ -8,8 +8,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.when; +import java.util.HashSet; import java.util.Map; import java.util.Set; import org.junit.Before; @@ -18,6 +20,7 @@ import org.opensearch.sql.ast.analysis.FieldResolutionResult; import org.opensearch.sql.ast.analysis.FieldResolutionVisitor; import org.opensearch.sql.ast.tree.Relation; +import org.opensearch.sql.ast.tree.SPath; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.ppl.antlr.PPLSyntaxParser; @@ -34,46 +37,6 @@ public void setUp() { when(settings.getSettingValue(Settings.Key.PPL_REX_MAX_MATCH_LIMIT)).thenReturn(10); } - private UnresolvedPlan parse(String query) { - AstBuilder astBuilder = new AstBuilder(query, settings); - return astBuilder.visit(parser.parse(query)); - } - - private FieldResolutionResult getSingleRelationResult(String query) { - UnresolvedPlan plan = parse(query); - Map results = visitor.analyze(plan); - UnresolvedPlan relation = results.keySet().iterator().next(); - return results.get(relation); - } - - private void assertSingleRelationFields( - String query, Set expectedFields, String expectedWildcard) { - FieldResolutionResult result = getSingleRelationResult(query); - assertEquals(expectedFields, result.getRegularFields()); - assertEquals(expectedWildcard, result.getWildcard().toString()); - } - - private void assertJoinRelationFields( - String query, Map expectedResultsByTable) { - UnresolvedPlan plan = parse(query); - Map results = 
visitor.analyze(plan); - - assertEquals(expectedResultsByTable.size(), results.size()); - - for (Map.Entry entry : results.entrySet()) { - if (!(entry.getKey() instanceof Relation)) { - continue; - } - String tableName = ((Relation) entry.getKey()).getTableQualifiedName().toString(); - FieldResolutionResult expectedResult = expectedResultsByTable.get(tableName); - - if (expectedResult != null) { - assertEquals(expectedResult.getRegularFields(), entry.getValue().getRegularFields()); - assertEquals(expectedResult.getWildcard(), entry.getValue().getWildcard()); - } - } - } - @Test public void testSimpleRelation() { assertSingleRelationFields("source=logs", Set.of(), "*"); @@ -211,7 +174,7 @@ public void testMultiRelationResult() { @Test public void testSimpleJoin() { - assertJoinRelationFields( + assertMultiRelationFields( "source=logs1 | join left=l right=r ON l.id = r.id logs2", Map.of( "logs1", new FieldResolutionResult(Set.of("id"), "*"), @@ -220,7 +183,7 @@ public void testSimpleJoin() { @Test public void testJoinWithFilter() { - assertJoinRelationFields( + assertMultiRelationFields( "source=logs1 | where status > 200 | join left=l right=r ON l.id = r.id logs2", Map.of( "logs1", new FieldResolutionResult(Set.of("status", "id"), "*"), @@ -229,7 +192,7 @@ public void testJoinWithFilter() { @Test public void testJoinWithProject() { - assertJoinRelationFields( + assertMultiRelationFields( "source=logs1 | join left=l right=r ON l.id = r.id logs2 | fields l.name, r.value", Map.of( "logs1", new FieldResolutionResult(Set.of("name", "id")), @@ -238,7 +201,7 @@ public void testJoinWithProject() { @Test public void testJoinWithNestedFields() { - assertJoinRelationFields( + assertMultiRelationFields( "source=logs1 | join left=l right=r ON l.id = r.id logs2 | fields l.name, r.value, field," + " nested.field", Map.of( @@ -274,7 +237,7 @@ public void testSelfJoin() { @Test public void testJoinWithAggregation() { - assertJoinRelationFields( + assertMultiRelationFields( 
"source=logs1 | join left=l right=r ON l.id = r.id logs2 | stats count() by l.region", Map.of( "logs1", new FieldResolutionResult(Set.of("region", "id")), @@ -283,7 +246,7 @@ public void testJoinWithAggregation() { @Test public void testJoinWithSubsearch() { - assertJoinRelationFields( + assertMultiRelationFields( "source=idx1 | where b > 1 | join a [source=idx2 | where c > 2 ] | eval result = c * d", Map.of( "idx1", new FieldResolutionResult(Set.of("a", "b", "c", "d"), "*"), @@ -345,6 +308,49 @@ public void testRenameCommand() { assertSingleRelationFields("source=logs | rename old_name as new_name", Set.of(), "*"); } + @Test + public void testFillnull() { + assertSingleRelationFields( + "source=logs | fillnull with 'NULL' in a, b | fields c, *", Set.of("a", "b", "c"), "*"); + assertSingleRelationFields( + "source=logs | fillnull using a = 'NULL', b = 'NULL' | fields c, *", + Set.of("a", "b", "c"), + "*"); + assertSingleRelationFields( + "source=logs | fillnull value='NULL' a, b | fields c, *", Set.of("a", "b", "c"), "*"); + } + + @Test + public void testFillnullWithoutFields() { + assertThrows( + "Fields need to be specified with fillnull command", + IllegalArgumentException.class, + () -> visitor.analyze(parse("source=logs | fillnull with 'NULL'"))); + } + + @Test + public void testReplaceCommand() { + assertSingleRelationFields( + "source=logs | replace 'IL' WITH 'Illinois' IN a, b | fields c, *", + Set.of("a", "b", "c"), + "*"); + } + + @Test + public void testSpathCommand() { + String query = "source=logs | spath input=json | fields a, *"; + assertSingleRelationFields(query, Set.of("a", "json"), "*"); + assertSingleSpathFields(query, Set.of("a"), "*"); + } + + @Test + public void testSpathTwice() { + assertSingleRelationFields( + "source=logs | spath input=json | spath input=doc | fields a, *", + Set.of("a", "doc", "json"), + "*"); + } + @Test public void testUnimplementedVisitDetected() { assertThrows( @@ -352,4 +358,117 @@ public void 
testUnimplementedVisitDetected() { IllegalArgumentException.class, () -> visitor.analyze(parse("source=idx1 | kmeans centroids=3"))); } + + @Test + public void testAppend() { + String query = + "source=main | where testCase='simple' | eval c = 4 | " + + "append [source=sub | where testCase='simple' ] | fields a, c, *"; + assertMultiRelationFields( + query, + Map.of( + "main", new FieldResolutionResult(Set.of("a", "testCase"), "*"), + "sub", new FieldResolutionResult(Set.of("a", "c", "testCase"), "*"))); + } + + @Test + public void testAppendWithSpathInMain() { + String query = + "source=main | where testCase='simple' | spath input=doc | " + + "append [source=sub | where testCase='simple' | eval d = 4] | fields a, c, *"; + assertMultiRelationFields( + query, + Map.of( + "main", new FieldResolutionResult(Set.of("a", "c", "doc", "testCase"), "*"), + "sub", new FieldResolutionResult(Set.of("a", "c", "testCase"), "*"))); + assertSingleSpathFields(query, Set.of("a", "c"), "*"); + } + + @Test + public void testAppendWithSpathSubquery() { + String query = + "source=main | where testCase='simple' | append [source=sub | where testCase='simple' |" + + " spath input=doc | eval c = 4] | fields a, c, *"; + assertMultiRelationFields( + query, + Map.of( + "main", new FieldResolutionResult(Set.of("a", "c", "testCase"), "*"), + "sub", new FieldResolutionResult(Set.of("a", "doc", "testCase"), "*"))); + assertSingleSpathFields(query, Set.of("a"), "*"); + } + + private UnresolvedPlan parse(String query) { + AstBuilder astBuilder = new AstBuilder(query, settings); + return astBuilder.visit(parser.parse(query)); + } + + private FieldResolutionResult getSingleRelationResult(String query) { + UnresolvedPlan plan = parse(query); + Map results = visitor.analyze(plan); + return getRelationResult(results); + } + + private FieldResolutionResult getRelationResult( + Map results) { + for (UnresolvedPlan key : results.keySet()) { + if (key instanceof Relation) { + return results.get(key); + } + } 
+ fail("Relation result not found"); + return null; + } + + private FieldResolutionResult getSingleSpathResult(String query) { + UnresolvedPlan plan = parse(query); + Map results = visitor.analyze(plan); + return getSpathResult(results); + } + + private FieldResolutionResult getSpathResult(Map results) { + for (UnresolvedPlan key : results.keySet()) { + if (key instanceof SPath) { + return results.get(key); + } + } + fail("Spath result not found"); + return null; + } + + private void assertSingleSpathFields( + String query, Set expectedFields, String expectedWildcard) { + FieldResolutionResult result = getSingleSpathResult(query); + assertEquals(expectedFields, result.getRegularFields()); + assertEquals(expectedWildcard, result.getWildcard().toString()); + } + + private void assertSingleRelationFields( + String query, Set expectedFields, String expectedWildcard) { + FieldResolutionResult result = getSingleRelationResult(query); + assertEquals(expectedFields, result.getRegularFields()); + assertEquals(expectedWildcard, result.getWildcard().toString()); + } + + private void assertMultiRelationFields( + String query, Map expectedResultsByTable) { + UnresolvedPlan plan = parse(query); + Map results = visitor.analyze(plan); + + Set foundTables = new HashSet<>(); + for (Map.Entry entry : results.entrySet()) { + if (!(entry.getKey() instanceof Relation)) { + continue; + } + String tableName = ((Relation) entry.getKey()).getTableQualifiedName().toString(); + FieldResolutionResult expectedResult = expectedResultsByTable.get(tableName); + + if (expectedResult != null) { + assertEquals(expectedResult.getRegularFields(), entry.getValue().getRegularFields()); + assertEquals(expectedResult.getWildcard(), entry.getValue().getWildcard()); + foundTables.add(tableName); + } + } + + assertEquals(expectedResultsByTable.size(), foundTables.size()); + } }