Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: DH-18539: Fix incorrect snapshot results on historical sorted rollups. #6642

Merged
merged 16 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions engine/api/src/main/java/io/deephaven/engine/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,16 @@ public interface Table extends
* implementation.
*/
String AGGREGATION_ROW_LOOKUP_ATTRIBUTE = "AggregationRowLookup";

/**
* Attribute on sort results used for hierarchical table construction. Specification is left to the implementation.
*/
String SORT_REVERSE_LOOKUP_ATTRIBUTE = "SortReverseLookup";
/**
* Attribute on sort results used for hierarchical table construction. Specification is left to the implementation.
cpwright marked this conversation as resolved.
Show resolved Hide resolved
*/
String SORT_ROW_REDIRECTION_ATTRIBUTE = "SortRowRedirection";

String SNAPSHOT_VIEWPORT_TYPE = "Snapshot";
/**
* This attribute is used internally by TableTools.merge to detect successive merges. Its presence indicates that it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,21 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.*;
import java.util.function.LongUnaryOperator;

import static io.deephaven.engine.table.Table.SORT_REVERSE_LOOKUP_ATTRIBUTE;
import static io.deephaven.engine.table.Table.SORT_ROW_REDIRECTION_ATTRIBUTE;

public class SortOperation implements QueryTable.MemoizableOperation<QueryTable> {
static final Map<String, Object> IDENTITY_REDIRECTION_ATTRIBUTES;
// The "+" sign is not valid in a column name, so this sentinel value can never collide with a real column name.
static final String IDENTITY_REDIRECTION_VALUE = "+IDENTITY_REDIRECTION";
cpwright marked this conversation as resolved.
Show resolved Hide resolved
static {
    // Single-entry, unmodifiable attribute map: the sort-row-redirection attribute mapped to the identity sentinel.
    IDENTITY_REDIRECTION_ATTRIBUTES =
            Collections.singletonMap(SORT_ROW_REDIRECTION_ATTRIBUTE, IDENTITY_REDIRECTION_VALUE);
}

private final QueryTable parent;
private QueryTable resultTable;
Expand Down Expand Up @@ -131,18 +138,17 @@ private QueryTable historicalSort(SortHelpers.SortMapping sortedKeys) {
return withSorted(parent);
}


cpwright marked this conversation as resolved.
Show resolved Hide resolved
final WritableRowRedirection sortMapping = sortedKeys.makeHistoricalRowRedirection();
final TrackingRowSet resultRowSet = RowSetFactory.flat(sortedKeys.size()).toTracking();

final Map<String, ColumnSource<?>> resultMap = new LinkedHashMap<>();
for (Map.Entry<String, ColumnSource<?>> stringColumnSourceEntry : parent.getColumnSourceMap().entrySet()) {
resultMap.put(stringColumnSourceEntry.getKey(),
RedirectedColumnSource.maybeRedirect(sortMapping, stringColumnSourceEntry.getValue()));
}
final String sortMappingColumnName = populateRedirectedColumns(resultMap, sortMapping);

resultTable = new QueryTable(resultRowSet, resultMap);
parent.copyAttributes(resultTable, BaseTable.CopyAttributeOperation.Sort);
resultTable.setFlat();
resultTable.setAttribute(SORT_ROW_REDIRECTION_ATTRIBUTE, sortMappingColumnName);
setSorted(resultTable);
return resultTable;
}
Expand Down Expand Up @@ -228,7 +234,8 @@ private void setSorted(QueryTable table) {
}

private QueryTable withSorted(QueryTable table) {
return (QueryTable) SortedColumnsAttribute.withOrderForColumn(table, sortColumnNames[0], sortOrder[0]);
return (QueryTable) SortedColumnsAttribute.withOrderForColumn(table, sortColumnNames[0], sortOrder[0],
IDENTITY_REDIRECTION_ATTRIBUTES);
}

@Override
Expand Down Expand Up @@ -280,10 +287,7 @@ public Result<QueryTable> initialize(boolean usePrev, long beforeClock) {
sortMapping.writableCast().fillFromChunk(fillFromContext, LongChunk.chunkWrap(sortedKeys),
closer.add(resultRowSet.copy()));

for (Map.Entry<String, ColumnSource<?>> stringColumnSourceEntry : parent.getColumnSourceMap().entrySet()) {
resultMap.put(stringColumnSourceEntry.getKey(),
RedirectedColumnSource.maybeRedirect(sortMapping, stringColumnSourceEntry.getValue()));
}
String sortMappingColumnName = populateRedirectedColumns(resultMap, sortMapping);

// noinspection unchecked
final ColumnSource<Comparable<?>>[] sortedColumnsToSortBy =
Expand All @@ -298,6 +302,7 @@ public Result<QueryTable> initialize(boolean usePrev, long beforeClock) {

resultTable = new QueryTable(resultRowSet, resultMap);
parent.copyAttributes(resultTable, BaseTable.CopyAttributeOperation.Sort);
resultTable.setAttribute(SORT_ROW_REDIRECTION_ATTRIBUTE, sortMappingColumnName);
setReverseLookup(resultTable, (final long innerRowKey) -> {
final long outerRowKey = reverseLookup.get(innerRowKey);
return outerRowKey == reverseLookup.getNoEntryValue() ? RowSequence.NULL_ROW_KEY : outerRowKey;
Expand All @@ -320,19 +325,34 @@ public Result<QueryTable> initialize(boolean usePrev, long beforeClock) {
}
}

/**
 * Fill {@code resultMap} with one entry per column of {@code parent}, passing each source through
 * {@link RedirectedColumnSource#maybeRedirect} with {@code sortMapping}.
 *
 * @param resultMap the map to populate; receives an entry for every parent column
 * @param sortMapping the row redirection handed to {@code maybeRedirect} for each column source
 * @return the name of the last column (in parent iteration order) whose source was replaced by
 *         {@code maybeRedirect}, or {@link #IDENTITY_REDIRECTION_VALUE} if no source was replaced
 */
private String populateRedirectedColumns(Map<String, ColumnSource<?>> resultMap, RowRedirection sortMapping) {
    // Start from the sentinel; it only changes if some column comes back as a different source.
    String redirectedColumnName = IDENTITY_REDIRECTION_VALUE;

    for (final Map.Entry<String, ColumnSource<?>> entry : parent.getColumnSourceMap().entrySet()) {
        final String name = entry.getKey();
        final ColumnSource<?> original = entry.getValue();
        final ColumnSource<?> maybeRedirected = RedirectedColumnSource.maybeRedirect(sortMapping, original);
        resultMap.put(name, maybeRedirected);
        // Reference comparison on purpose: a different instance means maybeRedirect replaced the source.
        redirectedColumnName = (maybeRedirected == original) ? redirectedColumnName : name;
    }
    return redirectedColumnName;
}

/**
* Get the row redirection for a sort result.
*
* @param sortResult The sort result table; <em>must</em> be the direct result of a sort.
* @return The row redirection if at least one column required redirection, otherwise {@code null}
* @return The row redirection for this table if at least one column required redirection, otherwise {@code null}
*/
public static RowRedirection getRowRedirection(@NotNull final Table sortResult) {
for (final ColumnSource<?> columnSource : sortResult.getColumnSources()) {
if (columnSource instanceof RedirectedColumnSource) {
return ((RedirectedColumnSource<?>) columnSource).getRowRedirection();
}
final String columnName = (String) sortResult.getAttribute(SORT_ROW_REDIRECTION_ATTRIBUTE);
if (columnName == null || columnName.equals(IDENTITY_REDIRECTION_VALUE)) {
return null;
}
return null;

return ((RedirectedColumnSource<?>) sortResult.getColumnSource(columnName)).getRowRedirection();
}

/**
Expand All @@ -351,7 +371,7 @@ public static RowRedirection getRowRedirection(@NotNull final Table sortResult)
*
* @param parent The sort input table; must have been sorted in order to produce {@code sortResult}
* @param sortResult The sort result table; <em>must</em> be the direct result of a sort on {@code parent}
* @return The reverse lookup
* @return The reverse lookup, or null if no redirection is performed.
*/
public static LongUnaryOperator getReverseLookup(@NotNull final Table parent, @NotNull final Table sortResult) {
if (BlinkTableTools.isBlink(parent)) {
Expand All @@ -365,9 +385,8 @@ public static LongUnaryOperator getReverseLookup(@NotNull final Table parent, @N
return (LongUnaryOperator) value;
}
final RowRedirection sortRedirection = getRowRedirection(sortResult);
if (sortRedirection == null || sortRedirection == getRowRedirection(parent)) {
// Static table was already sorted
return LongUnaryOperator.identity();
if (sortRedirection == null) {
return null;
}
final HashMapK4V4 reverseLookup = new HashMapLockFreeK4V4(sortResult.intSize(), .75f, RowSequence.NULL_ROW_KEY);
try (final LongColumnIterator innerRowKeys =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,28 @@ public static Table withOrderForColumn(Table table, String columnName, SortingOr
return table.withAttributes(Map.of(Table.SORTED_COLUMNS_ATTRIBUTE, newAttribute));
}

/**
 * Ensure that the result table is marked as sorted by the given column, setting any additional attributes in the
 * same {@code withAttributes} call.
 *
 * @param table the table to update
 * @param columnName the column to update
 * @param order the order that the column is sorted in
 * @param additionalAttributes further attributes to set together with the sorted-columns attribute; may be empty.
 *        An entry keyed by {@link Table#SORTED_COLUMNS_ATTRIBUTE} is overwritten by the value computed here.
 * @return {@code table}, or a copy of it with the necessary attributes set
 */
public static Table withOrderForColumn(Table table, String columnName, SortingOrder order,
        Map<String, ?> additionalAttributes) {
    final String sortedColumns = setOrderForColumn(
            (String) table.getAttribute(Table.SORTED_COLUMNS_ATTRIBUTE), columnName, order);

    if (additionalAttributes.isEmpty()) {
        // Nothing extra to merge; only the sorted-columns attribute is applied.
        return table.withAttributes(Map.of(Table.SORTED_COLUMNS_ATTRIBUTE, sortedColumns));
    }

    // Copy the caller's attributes first so the sorted-columns value computed above wins on a key clash.
    final Map<String, Object> combined = new LinkedHashMap<>(additionalAttributes);
    combined.put(Table.SORTED_COLUMNS_ATTRIBUTE, sortedColumns);
    return table.withAttributes(combined);
}

/**
* Get the columns a {@link Table} is sorted by.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ private void doFillChunk(@NotNull final ColumnSource.FillContext context,

if (ascendingMapping) {
effectiveContext.doOrderedFillAscending(innerSource, usePrev, destination);
} else if (innerSource instanceof FillUnordered) {
} else if (FillUnordered.providesFillUnordered(innerSource)) {
// noinspection unchecked
effectiveContext.doUnorderedFill((FillUnordered<Values>) innerSource, usePrev, destination);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@
package io.deephaven.engine.table.impl;

import io.deephaven.api.ColumnName;
import io.deephaven.api.agg.Aggregation;
import io.deephaven.api.agg.spec.AggSpec;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.csv.CsvTools;
import io.deephaven.csv.util.CsvReaderException;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSequenceFactory;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.table.*;
Expand All @@ -20,14 +25,17 @@
import io.deephaven.engine.table.impl.sources.chunkcolumnsource.ChunkColumnSource;
import io.deephaven.engine.testutil.ControlledUpdateGraph;
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import io.deephaven.engine.util.TableTools;
import io.deephaven.test.types.OutOfBandTest;

import junit.framework.TestCase;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.io.ByteArrayInputStream;
import java.time.Instant;
import java.util.Arrays;
import java.util.BitSet;
Expand All @@ -44,13 +52,8 @@
import static io.deephaven.engine.table.impl.sources.ReinterpretUtils.byteToBooleanSource;
import static io.deephaven.engine.table.impl.sources.ReinterpretUtils.longToInstantSource;
import static io.deephaven.engine.table.impl.sources.ReinterpretUtils.maybeConvertToPrimitiveChunkType;
import static io.deephaven.engine.testutil.TstUtils.addToTable;
import static io.deephaven.engine.testutil.TstUtils.assertTableEquals;
import static io.deephaven.engine.testutil.TstUtils.testRefreshingTable;
import static io.deephaven.engine.util.TableTools.booleanCol;
import static io.deephaven.engine.util.TableTools.byteCol;
import static io.deephaven.engine.util.TableTools.intCol;
import static io.deephaven.engine.util.TableTools.newTable;
import static io.deephaven.engine.testutil.TstUtils.*;
import static io.deephaven.engine.util.TableTools.*;
import static io.deephaven.util.QueryConstants.NULL_INT;
import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -201,6 +204,62 @@ public void testTreeSnapshotSatisfaction() throws ExecutionException, Interrupte
concurrentExecutor.shutdown();
}

/**
 * Snapshot a static rollup with every node expanded, once without node operations and once with a descending sort
 * on {@code N} applied to the aggregated nodes, and check that the sorted snapshot contains the same rows in the
 * order the sort implies.
 */
@Test
public void testSortedExpandAll() throws CsvReaderException {
    final String data = "A,B,C,N\n" +
            "Apple,One,Alpha,1\n" +
            "Apple,One,Alpha,2\n" +
            "Apple,One,Bravo,3\n" +
            "Apple,One,Bravo,4\n" +
            "Apple,One,Bravo,5\n" +
            "Apple,One,Bravo,6\n" +
            "Banana,Two,Alpha,7\n" +
            "Banana,Two,Alpha,8\n" +
            "Banana,Two,Bravo,3\n" +
            "Banana,Two,Bravo,4\n" +
            "Banana,Three,Bravo,1\n" +
            "Banana,Three,Bravo,1\n";

    // Encode with an explicit charset so the test input does not depend on the platform default encoding.
    final Table source = CsvTools.readCsv(
            new ByteArrayInputStream(data.getBytes(java.nio.charset.StandardCharsets.UTF_8)));

    TableTools.show(source);
    final RollupTable rollupTable = source.rollup(List.of(Aggregation.of(AggSpec.sum(), "N")), "A", "B", "C");
    final RollupTable sortedRollup = rollupTable.withNodeOperations(
            rollupTable.makeNodeOperationsRecorder(RollupTable.NodeType.Aggregated).sortDescending("N"));

    // A single key-table row with null keys at depth 0 and the expand-all action.
    final String[] arrayWithNull = new String[1];
    final Table keyTable = newTable(
            intCol(rollupTable.getRowDepthColumn().name(), 0),
            stringCol("A", arrayWithNull),
            stringCol("B", arrayWithNull),
            stringCol("C", arrayWithNull),
            byteCol("Action", HierarchicalTable.KEY_TABLE_ACTION_EXPAND_ALL));

    final SnapshotState ss = rollupTable.makeSnapshotState();
    final Table snapshot =
            snapshotToTable(rollupTable, ss, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30));
    TableTools.showWithRowSet(snapshot);

    final SnapshotState ssSort = sortedRollup.makeSnapshotState();

    final Table snapshotSort =
            snapshotToTable(sortedRollup, ssSort, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30));
    TableTools.showWithRowSet(snapshotSort);

    // first we know that the size of the tables must be the same
    TestCase.assertEquals(snapshot.size(), snapshotSort.size());
    // and the first row must be the same, because it is the parent
    assertTableEquals(snapshot.head(1), snapshotSort.head(1));
    // then we have six rows of banana, and that should be identical; sorting descending by N moves them from
    // positions [5, 11) of the unsorted snapshot to positions [1, 7) of the sorted one
    assertTableEquals(snapshot.slice(5, 11), snapshotSort.slice(1, 7));
    // then we need to check on the apple rows, but those are not actually identical because of sorting
    final Table appleExpected = snapshot.where("A=`Apple`").sortDescending("N");
    assertTableEquals(appleExpected, snapshotSort.slice(7, 11));

    freeSnapshotTableChunks(snapshot);
    freeSnapshotTableChunks(snapshotSort);
}

@SuppressWarnings("SameParameterValue")
private static Table snapshotToTable(
@NotNull final HierarchicalTable<?> hierarchicalTable,
Expand Down
2 changes: 1 addition & 1 deletion go/pkg/client/example_import_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func Example_importTable() {
// metadata: ["deephaven:isDateFormat": "false", "deephaven:isNumberFormat": "false", "deephaven:isPartitioning": "false", "deephaven:isRowStyle": "false", "deephaven:isSortable": "true", "deephaven:isStyle": "false", "deephaven:type": "float"]
// - Volume: type=int32, nullable
// metadata: ["deephaven:isDateFormat": "false", "deephaven:isNumberFormat": "false", "deephaven:isPartitioning": "false", "deephaven:isRowStyle": "false", "deephaven:isSortable": "true", "deephaven:isStyle": "false", "deephaven:type": "int"]
// metadata: ["deephaven:attribute.AddOnly": "true", "deephaven:attribute.AppendOnly": "true", "deephaven:attribute.SortedColumns": "Close=Ascending", "deephaven:attribute_type.AddOnly": "java.lang.Boolean", "deephaven:attribute_type.AppendOnly": "java.lang.Boolean", "deephaven:attribute_type.SortedColumns": "java.lang.String"]
// metadata: ["deephaven:attribute.AddOnly": "true", "deephaven:attribute.AppendOnly": "true", "deephaven:attribute.SortRowRedirection": "Volume", "deephaven:attribute.SortedColumns": "Close=Ascending", "deephaven:attribute_type.AddOnly": "java.lang.Boolean", "deephaven:attribute_type.AppendOnly": "java.lang.Boolean", "deephaven:attribute_type.SortRowRedirection": "java.lang.String", "deephaven:attribute_type.SortedColumns": "java.lang.String"]
// rows: 5
// col[0][Ticker]: ["IBM" "XRX" "XYZZY" "GME" "ZNGA"]
// col[1][Close]: [38.7 53.8 88.5 453 544.9]
Expand Down
Loading