Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Support for Multiple Update Graph Processors #3506

Merged
merged 87 commits into from
Jun 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
d535d15
Enable Support for Multiple Update Graph Processors
nbauernfeind Apr 24, 2023
4039012
Ryan's feedback checkpoint
nbauernfeind May 31, 2023
cad49e6
It starts ^_^
nbauernfeind May 31, 2023
1b0329e
Test Compiles
nbauernfeind Jun 1, 2023
aa4cdf8
Compiling again after rebase
nbauernfeind Jun 1, 2023
bd509ea
1st patch manual review fixes
nbauernfeind Jun 1, 2023
9832eb0
temporarily commit the extra controlled update graph for compilation
nbauernfeind Jun 1, 2023
141a33c
Fix UG.start() inline mistakes
nbauernfeind Jun 1, 2023
db5413c
spotless
nbauernfeind Jun 1, 2023
dbd5e91
Cleanup Round
nbauernfeind Jun 1, 2023
4df0066
spotless + replicate
nbauernfeind Jun 1, 2023
5190aee
replication fix
nbauernfeind Jun 1, 2023
590dd3d
remove extra imports
nbauernfeind Jun 1, 2023
2c4b919
compiles and starts again
nbauernfeind Jun 1, 2023
b4b3e14
dependency update graph validation + few bug fixes
nbauernfeind Jun 1, 2023
a031adb
Fix two type errors in benchmark code
rcaudy Jun 1, 2023
4365dbe
Couple of bug fixes
nbauernfeind Jun 1, 2023
a794193
Fix a handful of issues around UpdateGraph access
nbauernfeind Jun 1, 2023
41b3f2d
Some more fixes
nbauernfeind Jun 1, 2023
f718f90
Refactored UpdateGraph interface somewhat
rcaudy Jun 2, 2023
386851b
More personal review items
nbauernfeind Jun 2, 2023
4424eee
Rebase + Rebuild
nbauernfeind Jun 2, 2023
c69019e
Move UpdateGraphProcessor to engine-table
nbauernfeind Jun 2, 2023
5ed4804
Fix UpdateGraph installation for unit tests
rcaudy Jun 2, 2023
810ef79
Fix PartitionedTableTest cases that use transform. Update poisoned co…
rcaudy Jun 2, 2023
29ac051
Pedantic changes to TrackingWritableRowSetImpl.
rcaudy Jun 2, 2023
31c717f
Gratuitous re-work of poisoned exec context issues.
rcaudy Jun 2, 2023
fd0ef50
Fix RangeJoin and UpdateBy parallelism w.r.t. ExecutionContext
rcaudy Jun 2, 2023
26ef04a
Fix TestCsvTools
rcaudy Jun 2, 2023
52ec4c7
ColumnSources and friends to get their clocks from an instance member…
rcaudy Jun 2, 2023
691bf15
Server starts once again; can stream time table to UI
nbauernfeind Jun 2, 2023
db3e9f9
Move clock into CrossJoinShiftState
nbauernfeind Jun 2, 2023
80f8515
Spotless fixes
rcaudy Jun 2, 2023
80b643c
fix fuzzer, update graph lock, and column source test leaks
nbauernfeind Jun 2, 2023
e3d57bc
Fix DeltaAware leaks except ChunkAdapter
nbauernfeind Jun 2, 2023
6c80fc9
A few TestChunkColumnSource fixes
nbauernfeind Jun 2, 2023
3ee6e9b
Finish Nate's work to make Test*DeltaAwareColumnSource stop leaking c…
rcaudy Jun 2, 2023
f676882
Out damned spot
rcaudy Jun 2, 2023
6d4f84d
Fix UpdateGraphLock static check
nbauernfeind Jun 2, 2023
e146c5c
remove todo
nbauernfeind Jun 2, 2023
47df3db
Address documentation TODOs
rcaudy Jun 2, 2023
0dbd6dc
Merge remote-tracking branch 'upstream/main' into multi_ugp
nbauernfeind Jun 2, 2023
94fccfc
Merge related fixes
nbauernfeind Jun 2, 2023
42b02f9
Merge remote-tracking branch 'origin/multi_ugp' into multi_ugp
nbauernfeind Jun 2, 2023
2b6ab9d
Generators need not statically initialize their input classes
rcaudy Jun 2, 2023
c1056aa
Clean up model farm enum
rcaudy Jun 2, 2023
b538014
Revert inadvertent ReleaseTracker change
rcaudy Jun 2, 2023
12b6d2f
Small fixes, and rename UpdateGraphProcessor to PeriodicUpdateGraph
rcaudy Jun 2, 2023
c5d007e
More small fixes
rcaudy Jun 2, 2023
0829139
Python fixes + refactoring for multi ugp
nbauernfeind Jun 2, 2023
7d79ef6
more python fixes
nbauernfeind Jun 2, 2023
5b9bbc6
More small fixes again
rcaudy Jun 2, 2023
c1618f8
Merge remote-tracking branch 'nate/multi_ugp' into multi_ugp
rcaudy Jun 2, 2023
46149d9
fixup UGP to UG references
nbauernfeind Jun 2, 2023
86574f1
More cleanup
rcaudy Jun 2, 2023
a92f3cb
More fixes from code review
rcaudy Jun 2, 2023
36103c9
quick py fixup
nbauernfeind Jun 2, 2023
4c9fc52
spotless
nbauernfeind Jun 2, 2023
68e638a
quick test compile fix
nbauernfeind Jun 2, 2023
1ea1d2d
Merge remote-tracking branch 'upstream/main' into multi_ugp
nbauernfeind Jun 2, 2023
8243b1c
Even more fixes from code review
rcaudy Jun 3, 2023
15574ed
More spotless
rcaudy Jun 3, 2023
1b9c91d
Merge remote-tracking branch 'nate/multi_ugp' into multi_ugp
rcaudy Jun 3, 2023
c6e7aff
fix test formula column
nbauernfeind Jun 3, 2023
5c1e022
Some unit test fixes
rcaudy Jun 3, 2023
481c48a
fix RowSetShiftDataExpanderTest
nbauernfeind Jun 3, 2023
4caeff7
fix TestKeyedTableListener
nbauernfeind Jun 3, 2023
76441b3
Fix known Java unit test failures
rcaudy Jun 3, 2023
98e34df
Merge remote-tracking branch 'nate/multi_ugp' into multi_ugp
rcaudy Jun 3, 2023
54372d5
Fix more tests
rcaudy Jun 3, 2023
961d3f6
fix TestKeyedArrayBackedMutableTable
nbauernfeind Jun 3, 2023
72c7b3b
Fix more tests again
rcaudy Jun 3, 2023
73b6e19
Merge remote-tracking branch 'nate/multi_ugp' into multi_ugp
rcaudy Jun 3, 2023
8d994c3
Fix even more tests
rcaudy Jun 3, 2023
010cc76
For now re-use the ControlledUpdateGraph
nbauernfeind Jun 3, 2023
b3a46fd
Fix FlightMessageRoundTripTests
nbauernfeind Jun 3, 2023
eb6dc3b
cleaner fix on api server test base
nbauernfeind Jun 3, 2023
8adea9d
revert removal of livenessScope in ApiServerTestBase
nbauernfeind Jun 3, 2023
cb33689
Some python fixes, and experimental improvements to DeephavenApiServe…
rcaudy Jun 3, 2023
d9a193a
This might enable simple static use cases
nbauernfeind Jun 3, 2023
e983c06
Fix the remaining Python issues, I think, and fix some JavaDoc format…
rcaudy Jun 3, 2023
3766515
Merge remote-tracking branch 'nate/multi_ugp' into multi_ugp
rcaudy Jun 3, 2023
61ae914
1. Make DeephavenApiServerTestBase more resilient for subsequent test…
rcaudy Jun 3, 2023
32e6854
Clean up python session initialization for tests, and fix bug in sess…
rcaudy Jun 3, 2023
0c79352
Re-fix the Python tests
rcaudy Jun 3, 2023
f0ee72f
Must install UpdateGraph in Session ThreadFactory Threads
nbauernfeind Jun 3, 2023
29d7e81
Map#put returns old value not new value
nbauernfeind Jun 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package io.deephaven.benchmarking.impl;

import io.deephaven.engine.table.Table;
import io.deephaven.engine.util.TableTools;
import io.deephaven.engine.testutil.QueryTableTestBase;
import io.deephaven.benchmarking.BenchmarkTable;
import io.deephaven.benchmarking.BenchmarkTableBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ private GroovyStaticImportGenerator(final String[] imports, Collection<Predicate
this.skips = skips;

for (String imp : imports) {
Class<?> c = Class.forName(imp);
Class<?> c = Class.forName(imp, false, Thread.currentThread().getContextClassLoader());
log.info("Processing class: " + c);

for (Method m : c.getMethods()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private GenerateFigureImmutable(final boolean isInterface, final String[] import
this.functionNamer = functionNamer == null ? JavaFunction::getMethodName : functionNamer;

for (final String imp : interfaces) {
final Class<?> c = Class.forName(imp);
final Class<?> c = Class.forName(imp, false, Thread.currentThread().getContextClassLoader());
log.info("Processing class: " + c);

for (final Method m : c.getMethods()) {
Expand Down Expand Up @@ -892,7 +892,7 @@ private Map<String, TreeSet<GroovyStaticImportGenerator.JavaFunction>> commonSig

final Set<GroovyStaticImportGenerator.JavaFunction> functionSet = new HashSet<>();
for (String iface : interfaces) {
final Class<?> c = Class.forName(iface);
final Class<?> c = Class.forName(iface, false, Thread.currentThread().getContextClassLoader());
log.info("Processing class: " + c);

for (final java.lang.reflect.Method m : c.getMethods()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ public static void main(String[] args) throws ClassNotFoundException, IOExceptio
}

final Set<Method> skip = new HashSet<>();
skip.add(Class.forName("io.deephaven.plot.datasets.DataSeries").getMethod("pointSize", int.class));
skip.add(Class.forName("io.deephaven.plot.datasets.DataSeries").getMethod("pointSize", double.class));
skip.add(Class.forName("io.deephaven.plot.datasets.DataSeries").getMethod("pointSize", long.class));
final Class<?> dataSeriesClass = Class.forName("io.deephaven.plot.datasets.DataSeries", false,
Thread.currentThread().getContextClassLoader());
skip.add(dataSeriesClass.getMethod("pointSize", int.class));
skip.add(dataSeriesClass.getMethod("pointSize", double.class));
skip.add(dataSeriesClass.getMethod("pointSize", long.class));

new Generator("io.deephaven.plot.datasets.multiseries.MultiSeries",
"DataSeriesInternal",
Expand Down Expand Up @@ -170,7 +172,7 @@ static class Generator {
this.isTransform = isTransform;
this.isSwappable = isSwappable;
this.interfaces = interfaces;
output = Class.forName(outputClass);
output = Class.forName(outputClass, false, Thread.currentThread().getContextClassLoader());

final int mod = output.getModifiers();
isInterface = Modifier.isInterface(mod);
Expand Down Expand Up @@ -241,7 +243,7 @@ private String generateClasses(final Set<Method> skip) throws ClassNotFoundExcep
final List<GroovyStaticImportGenerator.JavaFunction> sortedMethods = new ArrayList<>();
final List<GroovyStaticImportGenerator.JavaFunction> methodsWithFunctionParameter = new ArrayList<>();
for (final String clazz : interfaces) {
final Class dataseries = Class.forName(clazz);
final Class dataseries = Class.forName(clazz, false, Thread.currentThread().getContextClassLoader());
final Method[] methods = Arrays.stream(dataseries.getMethods())
.filter(m -> !skip.contains(m))
.toArray(Method[]::new);
Expand Down Expand Up @@ -561,7 +563,8 @@ private String getFigureFunctionInput(final String returnClass,
: "getPartitionedTableHandle().getTable(), ");

if (function.getMethodName().equals("pointColorByY")) {
final Class c = Class.forName("io.deephaven.plot.datasets.multiseries." + returnClass);
final Class c = Class.forName("io.deephaven.plot.datasets.multiseries." + returnClass, false,
Thread.currentThread().getContextClassLoader());
final Method[] methods = Arrays.stream(c.getDeclaredMethods())
.filter(m -> m.getName().equals(tableMethodName))
.filter(m -> m.getParameterTypes().length > 0 && m.getParameterTypes()[0].equals(Table.class))
Expand All @@ -586,7 +589,8 @@ private String getFigureFunctionInput(final String returnClass,
return code.append(", multiSeriesKey), this").toString();
}

final Class c = Class.forName(function.getClassName());
final Class c = Class.forName(function.getClassName(), false,
Thread.currentThread().getContextClassLoader());
final Method[] methods = Arrays.stream(c.getMethods())
.filter(m -> m.getName().equals(tableMethodName))
.filter(m -> m.getParameterTypes().length > 0 && m.getParameterTypes()[0].equals(Table.class))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private GeneratePlottingConvenience(final String[] staticImports, final String[]
final int lastDot = staticImport.lastIndexOf(".");
final String classPath = staticImport.substring(0, lastDot);
final String methodName = staticImport.substring(lastDot + 1);
final Class<?> c = Class.forName(classPath);
final Class<?> c = Class.forName(classPath, false, Thread.currentThread().getContextClassLoader());
log.info("Processing static class: " + c);

final Method[] methods = Arrays.stream(c.getMethods()).filter(
Expand All @@ -64,7 +64,7 @@ private GeneratePlottingConvenience(final String[] staticImports, final String[]
}

for (final String imp : imports) {
final Class<?> c = Class.forName(imp);
final Class<?> c = Class.forName(imp, false, Thread.currentThread().getContextClassLoader());
log.info("Processing class: " + c);

for (final Method m : c.getMethods()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public static void main(String[] args) throws ClassNotFoundException, IOExceptio
* @throws ClassNotFoundException JCLASS is not found
*/
public static Map<Key, ArrayList<JavaFunction>> getMethodSignatures() throws ClassNotFoundException {
final Class<?> c = Class.forName(JCLASS);
final Class<?> c = Class.forName(JCLASS, false, Thread.currentThread().getContextClassLoader());
final Map<Key, ArrayList<JavaFunction>> signatures = new TreeMap<>();

for (final Method m : c.getMethods()) {
Expand Down
2 changes: 2 additions & 0 deletions Integrations/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ dependencies {
implementation project(':Configuration')
implementation project(':log-factory')

testImplementation project(':engine-test-utils')

testRuntimeOnly project(':log-to-slf4j')
// add configs, and some runtime dependencies to test classpaths
testRuntimeOnly project(':configs')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
import io.deephaven.base.FileUtils;
import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.exceptions.CancellationException;
import io.deephaven.engine.context.QueryScope;
import io.deephaven.engine.updategraph.UpdateGraphProcessor;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.util.AbstractScriptSession;
import io.deephaven.engine.util.PythonEvaluator;
import io.deephaven.engine.util.PythonEvaluatorJpy;
Expand Down Expand Up @@ -69,17 +70,20 @@ public class PythonDeephavenSession extends AbstractScriptSession<PythonSnapshot
/**
* Create a Python ScriptSession.
*
* @param updateGraph the default update graph to install for the repl
* @param objectTypeLookup the object type lookup
* @param listener an optional listener that will be notified whenever the query scope changes
* @param runInitScripts if init scripts should be executed
* @param pythonEvaluator
* @throws IOException if an IO error occurs running initialization scripts
*/
public PythonDeephavenSession(
ObjectTypeLookup objectTypeLookup, @Nullable final Listener listener, boolean runInitScripts,
PythonEvaluatorJpy pythonEvaluator)
throws IOException {
super(objectTypeLookup, listener);
final UpdateGraph updateGraph,
final ObjectTypeLookup objectTypeLookup,
@Nullable final Listener listener,
final boolean runInitScripts,
final PythonEvaluatorJpy pythonEvaluator) throws IOException {
super(updateGraph, objectTypeLookup, listener);

evaluator = pythonEvaluator;
scope = pythonEvaluator.getScope();
Expand Down Expand Up @@ -108,8 +112,9 @@ public PythonDeephavenSession(
* Creates a Python "{@link ScriptSession}", for use where we should only be reading from the scope, such as an
* IPython kernel session.
*/
public PythonDeephavenSession(PythonScope<?> scope) {
super(NoOp.INSTANCE, null);
public PythonDeephavenSession(
final UpdateGraph updateGraph, final PythonScope<?> scope) {
super(updateGraph, NoOp.INSTANCE, null);
this.scope = (PythonScope<PyObject>) scope;
try (final SafeCloseable ignored = executionContext.open()) {
this.module = (PythonScriptSessionModule) PyModule.importModule("deephaven.server.script_session")
Expand Down Expand Up @@ -186,9 +191,8 @@ public void popScope() {
protected void evaluate(String command, String scriptName) {
log.info().append("Evaluating command: " + command).endl();
try {
UpdateGraphProcessor.DEFAULT.exclusiveLock().doLockedInterruptibly(() -> {
evaluator.evalScript(command);
});
ExecutionContext.getContext().getUpdateGraph().exclusiveLock()
.doLockedInterruptibly(() -> evaluator.evalScript(command));
} catch (InterruptedException e) {
throw new CancellationException(e.getMessage() != null ? e.getMessage() : "Query interrupted", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.impl.InMemoryTable;
import io.deephaven.engine.table.ColumnSource;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import org.junit.*;

import java.util.Objects;
import java.util.function.Function;

public class ComputerTest {

private static InMemoryTable table;
@Rule
public final EngineCleanup framework = new EngineCleanup();

@BeforeClass
public static void createTable() {
private InMemoryTable table;

@Before
public void createTable() {
table = new InMemoryTable(
new String[] {"Column1", "Column2", "Column3"},
new Object[] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.impl.InMemoryTable;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import org.junit.*;

import java.util.Objects;
import java.util.function.Function;

public class FutureTest {

private static InMemoryTable table;
@Rule
public final EngineCleanup framework = new EngineCleanup();

@BeforeClass
public static void createTable() {
private InMemoryTable table;

@Before
public void createTable() {
table = new InMemoryTable(
new String[] {"Column1", "Column2", "Column3"},
new Object[] {
Expand All @@ -41,7 +43,7 @@ private static Input[] createInputs(Function<Object[], Object> gatherFunc) {
return createInputs(gatherFunc, gatherFunc);
}

private static Future createFuture(Function<Object[], Object> modelFunc, Input[] inputs, int batchSize) {
private Future createFuture(Function<Object[], Object> modelFunc, Input[] inputs, int batchSize) {
return new Future(modelFunc, inputs,
new ColumnSource[][] {
table.view("Column1", "Column2").getColumnSources()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,21 @@
import io.deephaven.engine.table.impl.InMemoryTable;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.api.util.NameValidator;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import org.junit.*;

import java.util.Arrays;
import java.util.function.Function;

public class InputTest {

private static InMemoryTable table;
@Rule
public final EngineCleanup framework = new EngineCleanup();

@BeforeClass
public static void createTable() {
private InMemoryTable table;

@Before
public void createTable() {
table = new InMemoryTable(
new String[] {"Column1", "Column2", "Column3"},
new Object[] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@
package io.deephaven.integrations.learn;

import io.deephaven.engine.table.impl.InMemoryTable;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import org.junit.*;

import java.util.function.Function;

public class ScattererTest {

private static InMemoryTable table;
@Rule
public final EngineCleanup framework = new EngineCleanup();

private InMemoryTable table;

@BeforeClass
public static void setup() {
@Before
public void setup() {
table = new InMemoryTable(
new String[] {"Column1", "Column2", "Column3"},
new Object[] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.impl.InMemoryTable;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import org.junit.*;

import java.util.function.BiFunction;
import java.util.function.Function;

public class NumPyTest {

private static InMemoryTable table;
@Rule
public final EngineCleanup framework = new EngineCleanup();

private static final String[] boolColNames = {"bool1", "bool2"};
private static final boolean[][] boolData = {
new boolean[] {true, true, false, false},
Expand Down Expand Up @@ -69,12 +71,14 @@ public class NumPyTest {
doubleData[0], doubleData[1]
};

@BeforeClass
public static void setup() {
private InMemoryTable table;

@Before
public void setup() {
table = new InMemoryTable(columnNames, columnData);
}

public static ColumnSource<?>[] getColSet(final String[] colNames) {
public ColumnSource<?>[] getColSet(final String[] colNames) {
ColumnSource<?>[] rst = new ColumnSource[2];

for (int i = 0; i < 2; i++) {
Expand Down
Loading