-
Notifications
You must be signed in to change notification settings - Fork 2.5k
fix: Handle nested map and array columns in MDT #17694
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
2b3c18a
321278a
f6ca08c
6089540
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -59,6 +59,41 @@ public final class HoodieSchemaUtils { | |
| public static final HoodieSchema METADATA_FIELD_SCHEMA = HoodieSchema.createNullable(HoodieSchemaType.STRING); | ||
| public static final HoodieSchema RECORD_KEY_SCHEMA = initRecordKeySchema(); | ||
|
|
||
| /** | ||
| * Constants for Parquet-style accessor patterns used in nested MAP and ARRAY navigation. | ||
| * These patterns are specifically used for column stats generation and differ from | ||
| * InternalSchema constants which are used in schema evolution contexts. | ||
| */ | ||
| private static final String ARRAY_LIST = "list"; | ||
| private static final String ARRAY_ELEMENT = "element"; | ||
| private static final String ARRAY_SPARK = "array"; // Spark writer uses this | ||
| private static final String MAP_KEY_VALUE = "key_value"; | ||
| private static final String MAP_KEY = "key"; | ||
| private static final String MAP_VALUE = "value"; | ||
|
|
||
| private static final String ARRAY_LIST_ELEMENT = ARRAY_LIST + "." + ARRAY_ELEMENT; | ||
| private static final String MAP_KEY_VALUE_KEY = MAP_KEY_VALUE + "." + MAP_KEY; | ||
| private static final String MAP_KEY_VALUE_VALUE = MAP_KEY_VALUE + "." + MAP_VALUE; | ||
|
|
||
| /** | ||
| * Advances offset past a component name in the path, handling end-of-path and dot separator. | ||
| * | ||
| * @param path the full path string | ||
| * @param offset current position in path | ||
| * @param component the component name to match (e.g., "element", "key", "value") | ||
| * @return new offset after component and dot, or path.length() if at end, or -1 if no match | ||
| */ | ||
| private static int getNextOffset(String path, int offset, String component) { | ||
| if (!path.regionMatches(offset, component, 0, component.length())) { | ||
| return -1; | ||
| } | ||
| int next = offset + component.length(); | ||
| if (next == path.length()) { | ||
| return next; | ||
| } | ||
| return (path.charAt(next) == '.') ? next + 1 : -1; | ||
| } | ||
|
|
||
| // Private constructor to prevent instantiation | ||
| private HoodieSchemaUtils() { | ||
| throw new UnsupportedOperationException("Utility class cannot be instantiated"); | ||
|
|
@@ -395,44 +430,161 @@ public static BigDecimal convertBytesToBigDecimal(byte[] value, int precision, i | |
| public static Option<Pair<String, HoodieSchemaField>> getNestedField(HoodieSchema schema, String fieldName) { | ||
| ValidationUtils.checkArgument(schema != null, "Schema cannot be null"); | ||
| ValidationUtils.checkArgument(fieldName != null && !fieldName.isEmpty(), "Field name cannot be null or empty"); | ||
| return getNestedFieldInternal(schema, fieldName, ""); | ||
| return getNestedFieldInternal(schema, fieldName, 0, ""); | ||
| } | ||
|
|
||
| /** | ||
| * Internal helper method for recursively retrieving nested fields. | ||
| * Internal helper method for recursively retrieving nested fields using offset-based navigation. | ||
| * | ||
| * @param schema the current schema to search in | ||
| * @param fieldName the remaining field path | ||
| * @param prefix the accumulated field path prefix | ||
| * @return Option containing Pair of canonical field name and the HoodieSchemaField, or Option.empty() if field not found | ||
| * <p>Supports nested field access using dot notation including MAP and ARRAY types | ||
| * using Parquet-style accessor patterns:</p> | ||
| * | ||
| * <ul> | ||
| * <li><b>RECORD types:</b> Standard dot notation (e.g., {@code "user.profile.name"})</li> | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We do not have official public docs supporting this feature now. If users were to select a nested (ARRAY/MAP) field as part of the column stats index list, they'll need to follow this format to define the If we follow through with this, this will need a doc. |
||
| * <li><b>ARRAY types:</b> Use {@code ".list.element"} to access array elements | ||
| * <ul> | ||
| * <li>Example: {@code "items.list.element"} accesses element schema of array</li> | ||
| * <li>Example: {@code "items.list.element.id"} accesses nested field within array elements</li> | ||
| * </ul> | ||
| * </li> | ||
| * <li><b>MAP types:</b> Use {@code ".key_value.key"} or {@code ".key_value.value"} to access map components | ||
| * <ul> | ||
| * <li>Example: {@code "metadata.key_value.key"} accesses map keys (always STRING)</li> | ||
| * <li>Example: {@code "metadata.key_value.value"} accesses map value schema</li> | ||
| * <li>Example: {@code "nested_map.key_value.value.field"} accesses nested field within map values</li> | ||
| * </ul> | ||
| * </li> | ||
| * </ul> | ||
| * | ||
| * <p><b>Note:</b> These accessor patterns ({@code .list.element} and {@code .key_value.key/value}) | ||
| * are specifically used for column stats generation and follow Parquet's logical type structure. | ||
| * This differs from {@link org.apache.hudi.internal.schema.InternalSchema} constants | ||
| * ({@code ARRAY_ELEMENT}, {@code MAP_KEY}, {@code MAP_VALUE}) which are used in other contexts | ||
| * like schema evolution.</p> | ||
| * | ||
| * @param schema the current schema to search in | ||
| * @param fullPath the full field path string | ||
| * @param offset current position in fullPath | ||
| * @param prefix the accumulated field path prefix | ||
| * @return Option containing a pair of canonical field name and the HoodieSchemaField, or Option.empty() if field not found | ||
| */ | ||
| private static Option<Pair<String, HoodieSchemaField>> getNestedFieldInternal(HoodieSchema schema, String fieldName, String prefix) { | ||
| private static Option<Pair<String, HoodieSchemaField>> getNestedFieldInternal( | ||
| HoodieSchema schema, String fullPath, int offset, String prefix) { | ||
| HoodieSchema nonNullableSchema = getNonNullTypeFromUnion(schema); | ||
|
|
||
| if (!fieldName.contains(".")) { | ||
| // Base case: simple field name | ||
| int nextDot = fullPath.indexOf('.', offset); | ||
| // Terminal case: no more dots in this segment | ||
| if (nextDot == -1) { | ||
| if (nonNullableSchema.getType() != HoodieSchemaType.RECORD) { | ||
| return Option.empty(); | ||
| } | ||
| String fieldName = fullPath.substring(offset); | ||
| return nonNullableSchema.getField(fieldName) | ||
| .map(field -> Pair.of(prefix + fieldName, field)); | ||
| } else { | ||
| // Recursive case: nested field | ||
| if (nonNullableSchema.getType() != HoodieSchemaType.RECORD) { | ||
| return Option.empty(); | ||
| } | ||
| // Recursive case: more nesting to explore | ||
| String rootFieldName = fullPath.substring(offset, nextDot); | ||
| int nextOffset = nextDot + 1; | ||
| // Handle RECORD: standard field navigation | ||
| if (nonNullableSchema.getType() == HoodieSchemaType.RECORD) { | ||
| return nonNullableSchema.getField(rootFieldName) | ||
| .flatMap(f -> getNestedFieldInternal(f.schema(), fullPath, nextOffset, prefix + rootFieldName + ".")); | ||
| } | ||
| // Handle ARRAY: expect ".list.element" (Avro) or ".array" (Spark) | ||
| if (nonNullableSchema.getType() == HoodieSchemaType.ARRAY && ARRAY_LIST.equals(rootFieldName)) { | ||
| return handleArrayNavigationAvro(nonNullableSchema, fullPath, nextOffset, prefix); | ||
| } | ||
| if (nonNullableSchema.getType() == HoodieSchemaType.ARRAY && ARRAY_SPARK.equals(rootFieldName)) { | ||
| return handleArrayNavigationSpark(nonNullableSchema, fullPath, nextOffset, prefix); | ||
| } | ||
| // Handle MAP: expect ".key_value.key" or ".key_value.value" | ||
| if (nonNullableSchema.getType() == HoodieSchemaType.MAP && MAP_KEY_VALUE.equals(rootFieldName)) { | ||
| return handleMapNavigation(nonNullableSchema, fullPath, nextOffset, prefix); | ||
| } | ||
| return Option.empty(); | ||
| } | ||
|
|
||
| /** | ||
| * Handles navigation into ARRAY types using the Avro ".list.element" pattern. | ||
| * | ||
| * @param arraySchema the ARRAY schema to navigate into | ||
| * @param fullPath the full field path string | ||
| * @param offset current position in fullPath (should point to "element") | ||
| * @param prefix the accumulated field path prefix | ||
| * @return Option containing the nested field, or Option.empty() if invalid path | ||
| */ | ||
| private static Option<Pair<String, HoodieSchemaField>> handleArrayNavigationAvro( | ||
| HoodieSchema arraySchema, String fullPath, int offset, String prefix) { | ||
| int nextPos = getNextOffset(fullPath, offset, ARRAY_ELEMENT); | ||
| if (nextPos == -1) { | ||
| return Option.empty(); | ||
| } | ||
|
|
||
| HoodieSchema elSchema = arraySchema.getElementType(); | ||
| if (nextPos == fullPath.length()) { | ||
| return Option.of(Pair.of(prefix + ARRAY_LIST_ELEMENT, | ||
| HoodieSchemaField.of(ARRAY_ELEMENT, elSchema, null, null))); | ||
| } | ||
| return getNestedFieldInternal(elSchema, fullPath, nextPos, prefix + ARRAY_LIST_ELEMENT + "."); | ||
| } | ||
|
|
||
| /** | ||
| * Handles navigation into ARRAY types using the Spark ".array" pattern. | ||
| * | ||
| * @param arraySchema the ARRAY schema to navigate into | ||
| * @param fullPath the full field path string | ||
| * @param offset current position in fullPath (points after "array") | ||
| * @param prefix the accumulated field path prefix | ||
| * @return Option containing the nested field, or Option.empty() if invalid path | ||
| */ | ||
| private static Option<Pair<String, HoodieSchemaField>> handleArrayNavigationSpark( | ||
| HoodieSchema arraySchema, String fullPath, int offset, String prefix) { | ||
| HoodieSchema elSchema = arraySchema.getElementType(); | ||
|
|
||
| // Check if we're at the end (just accessing "array" itself) | ||
| if (offset == fullPath.length()) { | ||
| return Option.of(Pair.of(prefix + ARRAY_SPARK, | ||
| HoodieSchemaField.of(ARRAY_SPARK, elSchema, null, null))); | ||
| } | ||
|
|
||
| // The dot after "array" has already been consumed by the caller (nextDot + 1), | ||
| // so we can directly continue navigating into the element type | ||
| return getNestedFieldInternal(elSchema, fullPath, offset, prefix + ARRAY_SPARK + "."); | ||
| } | ||
|
|
||
| /** | ||
| * Handles navigation into MAP types using the Parquet-style ".key_value.key" or ".key_value.value" patterns. | ||
| * | ||
| * @param mapSchema the MAP schema to navigate into | ||
| * @param fullPath the full field path string | ||
| * @param offset current position in fullPath (should point to "key" or "value") | ||
| * @param prefix the accumulated field path prefix | ||
| * @return Option containing the nested field, or Option.empty() if invalid path | ||
| */ | ||
| private static Option<Pair<String, HoodieSchemaField>> handleMapNavigation( | ||
| HoodieSchema mapSchema, String fullPath, int offset, String prefix) { | ||
| // Check for "key" path | ||
| int keyPos = getNextOffset(fullPath, offset, MAP_KEY); | ||
| if (keyPos != -1) { | ||
| if (keyPos == fullPath.length()) { | ||
| return Option.of(Pair.of(prefix + MAP_KEY_VALUE_KEY, | ||
| HoodieSchemaField.of(MAP_KEY, mapSchema.getKeyType(), null, null))); | ||
| } | ||
| // Map keys are primitives, cannot navigate further | ||
| return Option.empty(); | ||
| } | ||
|
|
||
| int dotIndex = fieldName.indexOf("."); | ||
| String rootFieldName = fieldName.substring(0, dotIndex); | ||
| String remainingPath = fieldName.substring(dotIndex + 1); | ||
| // Check for "value" path | ||
| int valPos = getNextOffset(fullPath, offset, MAP_VALUE); | ||
| if (valPos == -1) { | ||
| return Option.empty(); | ||
| } | ||
|
|
||
| return nonNullableSchema.getField(rootFieldName) | ||
| .flatMap(rootField -> getNestedFieldInternal( | ||
| rootField.schema(), | ||
| remainingPath, | ||
| prefix + rootFieldName + "." | ||
| )); | ||
| HoodieSchema vSchema = mapSchema.getValueType(); | ||
| if (valPos == fullPath.length()) { | ||
| return Option.of(Pair.of(prefix + MAP_KEY_VALUE_VALUE, | ||
| HoodieSchemaField.of(MAP_VALUE, vSchema, null, null))); | ||
| } | ||
| return getNestedFieldInternal(vSchema, fullPath, valPos, prefix + MAP_KEY_VALUE_VALUE + "."); | ||
| } | ||
|
|
||
| /** | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can collapse this too so
HoodieSchemaUtilsin spark can use this.