Add option to use Spark executor in VegaFusion #6
Conversation
Actionable comments posted: 28
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (16)
vegafusion-python/pyproject.toml (1)
38-39: Add an optional Spark extra instead of a hard dependency.

Given `SparkSqlPlanExecutor`, expose a lightweight extra so users can opt in. This keeps the base install slim and documents the dependency.

```diff
 [project.optional-dependencies]
-embed = []
+embed = []
+spark = [
+    # pyspark ships heavy deps; keep it optional and version-gate
+    "pyspark>=3.5,<4",
+]
```

Consider documenting `pip install deepnote_vegafusion[spark]` to enable the Spark executor.

vegafusion-wasm/package.json (1)
16-25: Packaging artifacts missing – build outputs not committed

The `vegafusion-wasm` package directory contains only source and metadata (README.md, Cargo.toml, package.json, package-lock.json). None of the runtime artifacts listed in `package.json` ("vegafusion_wasm_bg.wasm", "vegafusion_wasm.js", etc.) are present, so `npm pack` will produce an empty package.

• Missing files (per the `package.json` "files" field):
  - vegafusion-wasm/vegafusion_wasm_bg.wasm
  - vegafusion-wasm/vegafusion_wasm.js
  - vegafusion-wasm/vegafusion_wasm_bg.js
  - vegafusion-wasm/vegafusion_wasm.d.ts

• Actions required:
  - Ensure the build step (e.g. `wasm-pack build` or `cargo build --release` plus bindings generation) runs before packaging, and that its outputs are placed under `vegafusion-wasm/`.
  - Either commit those generated artifacts or add a `prepare` (or `prepublishOnly`) script in `package.json` to build them on install.
  - Verify that `npm pack --dry-run` includes the `.wasm`, `.js`, and `.d.ts` files.

• Optional export map (helps Node & bundlers resolve):

```diff
   "module": "vegafusion_wasm.js",
   "types": "vegafusion_wasm.d.ts",
+  "exports": {
+    ".": {
+      "import": "./vegafusion_wasm.js",
+      "types": "./vegafusion_wasm.d.ts"
+    }
+  },
   "sideEffects": false
```

vegafusion-runtime/src/expression/compiler/call.rs (1)
160-166: Classify user-input errors as compilation errors (not internal)

These branches are user-triggered (missing dataset name or non-literal first arg). Make them `VegaFusionError::compilation` for consistency with the new non-table path.

Apply:

```diff
-    } else {
-        Err(VegaFusionError::internal(format!(
-            "No dataset named {}. Available: {:?}",
-            name,
-            config.data_scope.keys()
-        )))
-    }
+    } else {
+        Err(VegaFusionError::compilation(format!(
+            "No dataset named {}. Available: {:?}",
+            name,
+            config.data_scope.keys()
+        )))
+    }
 ...
-    _ => Err(VegaFusionError::internal(format!(
+    _ => Err(VegaFusionError::compilation(format!(
         "The first argument to the {} function must be a literal \
         string with the name of a dataset",
         &node.callee
     ))),
 ...
-    } else {
-        Err(VegaFusionError::internal(format!(
+    } else {
+        Err(VegaFusionError::compilation(format!(
             "The first argument to the {} function must be a literal \
             string with the name of a dataset",
             &node.callee
         )))
     }
```

Also applies to: 167-171, 174-178
vegafusion-core/src/task_graph/task_value.rs (3)
110-127: Proto Plan (de)serialization: consider an optional feature with datafusion-proto

You currently error out on `Data::Plan`. If gRPC transport of plans is expected soon, gate support behind a cargo feature (e.g., plan-proto) that pulls in datafusion-proto to serialize/deserialize LogicalPlan bytes with an empty SessionContext.
I can wire a feature-gated impl if you want to ship it later without impacting core deps.
223-244: Fix broken context strings: {:?} placeholders never get substituted

`with_context` closures build static strings that contain `{:?}`. Replace them with precise messages or format with captured values.

Apply:

```diff
-    let proto_value = response_value.value.with_context(|| {
-        "Unwrap failed for value of response value: {:?}".to_string()
-    })?;
+    let proto_value = response_value
+        .value
+        .with_context(|| "response_value.value is None".to_string())?;
 ...
-    let value = TaskValue::try_from(&proto_value).with_context(|| {
-        "Deserialization failed for value of response value: {:?}".to_string()
-    })?;
+    let value = TaskValue::try_from(&proto_value)
+        .with_context(|| "Failed to deserialize TaskValue".to_string())?;
```

Optionally, do the same for the variable unwrap:

```diff
-    let variable = response_value
-        .variable
-        .with_context(|| "Unwrap failed for variable of response value".to_string())?;
+    let variable = response_value
+        .variable
+        .with_context(|| "response_value.variable is None".to_string())?;
```
254-262: Avoid unwraps in `From`

This panics on malformed inputs. Prefer `TryFrom` to surface errors.

Proposed alternative (outside current hunk):

```rust
impl TryFrom<ResponseTaskValue> for NamedTaskValue {
    type Error = VegaFusionError;

    fn try_from(value: ResponseTaskValue) -> Result<Self> {
        let variable = value
            .variable
            .with_context(|| "response_value.variable is None".to_string())?;
        let proto = value
            .value
            .with_context(|| "response_value.value is None".to_string())?;
        let task_value = TaskValue::try_from(&proto)?;
        Ok(Self { variable, scope: value.scope, value: task_value })
    }
}
```

.github/workflows/build_test.yml (1)
252-256: Fix apt-get usage: missing update and -y may cause flakiness

Installing ttf-mscorefonts-installer can fail without `apt-get update` and `-y`.

Apply:

```diff
 - name: Install fonts on Linux
   if: runner.os == 'Linux'
   run: |
     echo ttf-mscorefonts-installer msttcorefonts/accepted-mscorefonts-eula select true | sudo debconf-set-selections
-    sudo apt-get install ttf-mscorefonts-installer
+    sudo apt-get update
+    sudo apt-get install -y ttf-mscorefonts-installer
```

vegafusion-runtime/tests/test_planning.rs (1)
79-93: Also assert on the signal value; avoid dropping it.

Verify the materialized scalar is present.

```diff
-    let _delay_extent = graph_runtime
+    let delay_extent = graph_runtime
         .get_node_value(
             graph.clone(),
             mapping
                 .get(&(
                     Variable::new_signal("child__column_delay_layer_1_bin_maxbins_20_delay_extent"),
                     Vec::new(),
                 ))
                 .unwrap(),
             Default::default(),
             None,
         )
         .await
         .unwrap();
+    let scalar = delay_extent.into_scalar().unwrap();
+    assert!(scalar.is_some(), "delay extent should be materialized");
```

vegafusion-wasm/Cargo.toml (1)
63-89: Duplicate getrandom declarations with conflicting features.

The top-level getrandom uses the non-standard feature "wasm_js"; the target-specific block uses "js". Keep one, target-specific, to avoid build issues.

```diff
-[dependencies.getrandom]
-workspace = true
-features = ["wasm_js"]
-
 [dependencies.chrono]
 version = "0.4.31"
 features = ["wasmbind"]

 [dependencies.console_error_panic_hook]
@@
 [lints.rust.unexpected_cfgs]
 level = "warn"
 check-cfg = ["cfg(wasm_bindgen_unstable_test_coverage)"]

 [target."cfg(target_arch = \"wasm32\")".dependencies.getrandom]
 version = "0.2"
 features = ["js"]
```

vegafusion-runtime/tests/test_image_comparison.rs (1)
1465-1488: Parallelize materialization for speed.

Materializing each value sequentially can be slow. Fan out with `join_all`.

Apply:

```diff
-    let mut init = Vec::new();
-    for var in &spec_plan.comm_plan.server_to_client {
-        let node_index = task_graph_mapping.get(var).unwrap();
-        let value = runtime
-            .get_node_value(
-                Arc::new(task_graph.clone()),
-                node_index,
-                Default::default(),
-                None,
-            )
-            .await
-            .expect("Failed to get node value");
-
-        let materialized_value = value
-            .to_materialized(runtime.plan_executor.clone())
-            .await
-            .unwrap();
-        init.push(ExportUpdateJSON {
-            namespace: ExportUpdateNamespace::try_from(var.0.namespace()).unwrap(),
-            name: var.0.name.clone(),
-            scope: var.1.clone(),
-            value: materialized_value.to_json().unwrap(),
-        });
-    }
+    use futures::future::join_all;
+    let tg = Arc::new(task_graph.clone());
+    let futures_iter = spec_plan.comm_plan.server_to_client.iter().map(|var| {
+        let node_index = task_graph_mapping.get(var).unwrap();
+        let runtime = runtime.clone();
+        let tg = tg.clone();
+        async move {
+            let value = runtime
+                .get_node_value(tg, node_index, Default::default(), None)
+                .await
+                .expect("Failed to get node value");
+            let materialized_value = value
+                .to_materialized(runtime.plan_executor.clone())
+                .await
+                .unwrap();
+            ExportUpdateJSON {
+                namespace: ExportUpdateNamespace::try_from(var.0.namespace()).unwrap(),
+                name: var.0.name.clone(),
+                scope: var.1.clone(),
+                value: materialized_value.to_json().unwrap(),
+            }
+        }
+    });
+    let init = join_all(futures_iter).await;
```

vegafusion-runtime/src/task_graph/runtime.rs (1)
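The fan-out shape suggested above (launch one future per variable, await them together, keep input order) can be illustrated outside Rust. A minimal asyncio sketch, where `fetch_value` is a hypothetical stand-in for `get_node_value`:

```python
import asyncio

async def fetch_value(name: str) -> str:
    # Stand-in for runtime.get_node_value; the sleep simulates I/O latency.
    await asyncio.sleep(0.01)
    return f"value-of-{name}"

async def materialize_all(names: list[str]) -> list[str]:
    # Launch one task per variable and await them concurrently.
    # gather preserves input order in the result list.
    return await asyncio.gather(*(fetch_value(n) for n in names))

results = asyncio.run(materialize_all(["data_0", "data_1", "data_2"]))
```

As with `join_all`, total wall time is roughly one fetch rather than the sum of all fetches, and results stay aligned with the input variables.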
161-162: Bound top-level query concurrency

For large requests, `try_join_all` can oversubscribe the executor. Use a bounded stream.

```diff
-    future::try_join_all(response_value_futures).await
+    use futures::{StreamExt, TryStreamExt};
+    futures::stream::iter(response_value_futures)
+        .buffer_unordered(16) // TODO: make configurable
+        .try_collect::<Vec<_>>()
+        .await
```

vegafusion-core/src/runtime/runtime.rs (3)
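The bounding behavior of `buffer_unordered(16)` (at most N futures in flight at once) can be sketched generically. A minimal asyncio analog using a semaphore; the `job` workload is hypothetical:

```python
import asyncio

async def run_bounded(coros, limit: int):
    # Cap in-flight coroutines at `limit`, mirroring buffer_unordered(limit).
    sem = asyncio.Semaphore(limit)

    async def guarded(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(guarded(c) for c in coros))

async def main():
    peak = 0
    active = 0

    async def job(i: int) -> int:
        nonlocal peak, active
        active += 1
        peak = max(peak, active)       # track observed concurrency
        await asyncio.sleep(0.01)
        active -= 1
        return i

    results = await run_bounded([job(i) for i in range(20)], limit=4)
    return results, peak

results, peak = asyncio.run(main())
```

The semaphore guarantees the observed concurrency never exceeds the limit, so a large request cannot oversubscribe the executor.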
101-107: Remove panics: unwraps in planning path

Propagate errors instead of unwrap to prevent server crashes on malformed specs.

```diff
-    let task_scope = plan.server_spec.to_task_scope().unwrap();
-    let tasks = plan
-        .server_spec
-        .to_tasks(&tz_config, &dataset_fingerprints)
-        .unwrap();
-    let task_graph = TaskGraph::new(tasks, &task_scope).unwrap();
+    let task_scope = plan
+        .server_spec
+        .to_task_scope()
+        .with_context(|| "Failed to create task scope for server spec")?;
+    let tasks = plan
+        .server_spec
+        .to_tasks(&tz_config, &dataset_fingerprints)
+        .with_context(|| "Failed to build server tasks")?;
+    let task_graph = TaskGraph::new(tasks, &task_scope)
+        .with_context(|| "Failed to construct TaskGraph")?;
```
112-133: Mismatch bug: zipping after filtered indices can mis-assign variables

You filter out unmapped variables when building indices, but later zip all server_to_client vars with response_values. If any var lacked a mapping, the lengths diverge and responses shift.

```diff
-    let indices: Vec<NodeValueIndex> = plan
-        .comm_plan
-        .server_to_client
-        .iter()
-        .filter_map(|var| task_graph_mapping.get(var).cloned())
-        .collect();
+    let kept: Vec<(ScopedVariable, NodeValueIndex)> = plan
+        .comm_plan
+        .server_to_client
+        .iter()
+        .filter_map(|var| task_graph_mapping.get(var).cloned().map(|idx| (var.clone(), idx)))
+        .collect();
+    let indices: Vec<NodeValueIndex> = kept.iter().map(|(_, idx)| *idx).collect();
 …
-    for (var, response_value) in plan.comm_plan.server_to_client.iter().zip(response_values) {
+    for ((var, _), response_value) in kept.into_iter().zip(response_values) {
         init.push(ExportUpdate {
             namespace: ExportUpdateNamespace::try_from(var.0.namespace()).unwrap(),
             name: var.0.name.clone(),
             scope: var.1.clone(),
             value: response_value.value,
         });
     }
```

Also applies to: 124-131
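The mis-assignment is easy to reproduce in miniature. A minimal sketch with made-up variable names and mapping (filter-then-zip-the-unfiltered-list vs keeping pairs together):

```python
# Simulated comm plan: "b" has no task-graph mapping.
server_to_client = ["a", "b", "c"]
mapping = {"a": 0, "c": 1}

# Buggy shape: filter when building indices, then zip the *unfiltered* list.
indices = [mapping[v] for v in server_to_client if v in mapping]
responses = [f"resp{i}" for i in indices]        # one response per kept index
buggy = list(zip(server_to_client, responses))   # "b" silently takes "c"'s response

# Fixed shape: keep (var, index) pairs together so responses stay aligned.
kept = [(v, mapping[v]) for v in server_to_client if v in mapping]
fixed = [(v, f"resp{i}") for v, i in kept]
```

In the buggy shape, `"b"` receives the response computed for `"c"` and `"c"` is dropped entirely; in the fixed shape, every variable pairs with its own response.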
243-246: Typo in error message

Grammatical fix.

```diff
-    "Expected Data TaskValue to be an Table",
+    "Expected Data TaskValue to be a Table",
```

vegafusion-python/src/lib.rs (2)
383-387: String literal typo: unreachable export format branch

"arro3" looks like a typo; likely "arrow3" or redundant with "pyarrow". As written, users cannot select this branch intentionally.

```diff
-    "arro3" => {
+    "arrow3" => {
         let pytable = tbl.table.to_pyo3_arrow()?;
         pytable.to_pyarrow(py)?.into()
     }
```

If "arrow3" isn't a public option, consider removing the branch entirely and keeping "pyarrow".
133-141: Nit: variable name typo

`innter_url` -> `inner_url`.

```diff
-    let innter_url = url;
+    let inner_url = url;
     let uri =
-        Uri::from_str(innter_url).map_err(|e| VegaFusionError::internal(e.to_string()))?;
+        Uri::from_str(inner_url).map_err(|e| VegaFusionError::internal(e.to_string()))?;
     GrpcVegaFusionRuntime::try_new(Channel::builder(uri).connect().await.map_err(|e| {
-        let msg = format!("Error connecting to gRPC server at {}: {}", innter_url, e);
+        let msg = format!("Error connecting to gRPC server at {}: {}", inner_url, e);
         VegaFusionError::internal(msg)
     })?)
```
♻️ Duplicate comments (37)
vegafusion-python/pyproject.toml (1)
17-22: Align Python DataFusion pin with Rust (49.x).

Rust was upgraded to DataFusion 49; the Python pin still allows <49. Expect subtle planner/expr diffs and test flakiness. Bump the lower bound to 49.

```diff
 dependencies = [
     "arro3-core",
     "packaging",
     "narwhals>=1.42",
-    "datafusion>=48.0.0,<49.0.0",
+    "datafusion>=49.0.0,<50.0.0",
 ]
```
1-26: Prevent accidental npm publish; verify upstream metadata.

CI in this PR drops the wasm job. Until a coordinated release is ready, consider marking the package private and adding an explicit repository field pointing at hex-inc.

Optional metadata tweak:

```diff
 {
   "name": "vegafusion-wasm",
   "version": "2.0.5",
+  "private": true,
+  "repository": {
+    "type": "git",
+    "url": "git+https://github.com/hex-inc/vegafusion.git"
+  },
   "author": "Jon Mease <jon@vegafusion.io> (https://jonmmease.dev)",
```

Quick checks (run from repo root):

```bash
#!/bin/bash
# Verify metadata points to hex-inc and package isn't accidentally publishable
jq -r '.homepage,.bugs.url' vegafusion-wasm/package.json
jq -r '.private // "false"' vegafusion-wasm/package.json
jq -r '.repository.url // "MISSING_REPO_URL"' vegafusion-wasm/package.json
```

vegafusion-runtime/benches/spec_benchmarks.rs (2)
55-55: Hoist VegaFusionRuntime out of the hot path (reuse a single instance).

Constructing `VegaFusionRuntime` per-iteration adds noise and rebuilds contexts unnecessarily. Thread a reference into the helpers and instantiate once per bench.

Apply these focused changes:

```diff
-async fn eval_spec_get_variable(full_spec: ChartSpec, var: &ScopedVariable) -> Vec<NamedTaskValue> {
+async fn eval_spec_get_variable(
+    full_spec: ChartSpec,
+    var: &ScopedVariable,
+    runtime: &VegaFusionRuntime,
+) -> Vec<NamedTaskValue> {
@@
-    // Initialize task graph runtime
-    let runtime = VegaFusionRuntime::default();
+    // Use caller-provided runtime
```

```diff
-async fn eval_spec_sequence(full_spec: ChartSpec, full_updates: Vec<ExportUpdateBatch>) {
+async fn eval_spec_sequence(
+    full_spec: ChartSpec,
+    full_updates: Vec<ExportUpdateBatch>,
+    runtime: &VegaFusionRuntime,
+) {
@@
-    // Initialize task graph runtime
-    let runtime = VegaFusionRuntime::default();
+    // Use caller-provided runtime
```

Instantiate once per benchmark and pass by reference, e.g.:

```diff
 pub fn flights_crossfilter(c: &mut Criterion) {
 @@
-    c.bench_function(spec_name, |b| {
-        b.to_async(&tokio_runtime)
-            .iter(|| eval_spec_sequence(full_spec.clone(), full_updates.clone()))
-    });
+    let runtime = VegaFusionRuntime::default();
+    c.bench_function(spec_name, |b| {
+        b.to_async(&tokio_runtime)
+            .iter(|| eval_spec_sequence(full_spec.clone(), full_updates.clone(), &runtime))
+    });
 }
```

The same pattern applies to `flights_crossfilter_local_time`, `stacked_bar_weather_year_local`, and `stacked_bar_weather_year_utc`, and to `load_flights_crossfilter_data_local`, `load_flights_crossfilter_data_utc`, and `load_flights_crossfilter_data_200k_utc` (via `group.bench_function`), substituting `eval_spec_get_variable(full_spec.clone(), &var, &runtime)` for the data-loading benches.

Also applies to: 106-106, 40-40, 74-74, 156-168, 170-183, 185-194, 196-205, 211-219, 221-234, 236-249
55-55: Let benches pick the executor via env (DataFusion vs Spark).

Expose a VF_EXECUTOR environment variable to compare backends in the same harness. Default to DataFusion; gate Spark behind a feature to avoid unconditional deps.

Sketch:

```rust
// at top
use std::sync::Arc;
use vegafusion_core::runtime::PlanExecutor;
use vegafusion_runtime::plan_executor::DataFusionPlanExecutor;
use vegafusion_runtime::task_graph::runtime::{make_datafusion_context, VegaFusionRuntime};
#[cfg(feature = "spark")]
use vegafusion_python::vendor::SparkSqlPlanExecutor;

fn runtime_from_env() -> VegaFusionRuntime {
    let ctx = Arc::new(make_datafusion_context());
    let exec: Arc<dyn PlanExecutor> = match std::env::var("VF_EXECUTOR").as_deref() {
        #[cfg(feature = "spark")]
        Ok("spark") => Arc::new(SparkSqlPlanExecutor::new(/* python bridge here */)),
        _ => Arc::new(DataFusionPlanExecutor::new(ctx.clone())),
    };
    VegaFusionRuntime::new(None, Some(exec))
}
```

Then in the benches, instantiate once with `let runtime = runtime_from_env();` and pass `&runtime` per the previous refactor.

Also applies to: 106-106
examples/rust-examples/examples/inline_datasets_plan.rs (1)
46-46: Clarify which executor is used by default.

Add a note so readers know how to switch to Spark.

```diff
-    let runtime = VegaFusionRuntime::default();
+    // Uses the default DataFusion executor; pass Some(executor) via
+    // VegaFusionRuntime::new(None, Some(exec)) to route plans to Spark.
+    let runtime = VegaFusionRuntime::default();
```

vegafusion-runtime/src/sql/spark.rs (2)
377-403: Fix: incorrect quoting and grouping of literal fragments in chrono_to_spark

Backslash-escaping isn't SQL-standard and will leak into the output. Also, the grouping loop currently splits consecutive literal letters into single-character quoted chunks due to the break condition.

Apply:

```diff
             // Collect consecutive literal characters that need quoting
             let mut literal = String::new();
             literal.push(c);
             // Continue collecting non-% characters that need quoting
             while let Some(&next_c) = chars.peek() {
-                if next_c == '%' || !matches!(next_c, '-' | ':' | ' ' | '/' | ',' | '.') {
-                    break;
-                }
-                literal.push(chars.next().unwrap());
+                // stop when hitting a format start (%) or a safe punctuation we don't quote
+                if next_c == '%' || matches!(next_c, '-' | ':' | ' ' | '/' | ',' | '.') {
+                    break;
+                }
+                literal.push(chars.next().unwrap());
             }
-            // Wrap the literal string in single quotes
-            out.push_str(&format!("\\'{}\\'", &literal));
+            // Wrap the literal section in single quotes; double embedded single quotes
+            out.push('\'');
+            for ch in literal.chars() {
+                if ch == '\'' {
+                    out.push_str("''");
+                } else {
+                    out.push(ch);
+                }
+            }
+            out.push('\'');
```

Also add tests covering patterns like "%Y-%m-%d'T'%H:%M" and embedded quotes.
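The quote-doubling rule the diff implements is standard SQL string-literal escaping (wrap in single quotes, double any embedded single quote). A minimal standalone sketch of the same rule:

```python
def quote_sql_literal(text: str) -> str:
    # Standard SQL string literal: wrap in single quotes and
    # escape embedded single quotes by doubling them.
    return "'" + text.replace("'", "''") + "'"

# e.g. the literal 'T' separator in a timestamp format string
quoted_t = quote_sql_literal("T")
# embedded apostrophe must be doubled, not backslash-escaped
quoted_apostrophe = quote_sql_literal("o'clock")
```

Backslash escaping (`\'`) is a dialect extension, not standard SQL, which is why doubling is the safer choice for generated Spark SQL.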
633-679: Reserved words table: consider centralizing and testing a few exemplars

Looks comprehensive. To reduce maintenance drift, move this set to a shared module and add unit tests that assert quoting behavior for a handful of reserved words and tricky identifiers.
vegafusion-core/src/task_graph/task_value.rs (1)
43-50: Resolved: clearer Plan→Materialized error guidance

The improved message points callers to to_materialized with a PlanExecutor. Exactly what was needed.
.github/workflows/build_test.yml (1)
294-295: Grammar fixed

"job that tests the wheels" reads correctly now.
vegafusion-runtime/tests/test_pre_transform_extract.rs (1)
29-29: Switch to Default is fine; consider a tiny helper to ease executor parity (dup).

LGTM. To prep for Spark/DataFusion parity, route construction through a local helper so you can later toggle executors in one place.

Apply within this file:

```diff
-    let runtime = VegaFusionRuntime::default();
+    let runtime = make_test_runtime();
```

Add inside this test module:

```rust
fn make_test_runtime() -> VegaFusionRuntime {
    VegaFusionRuntime::default()
}
```

vegafusion-runtime/tests/test_stringify_datetimes.rs (1)
89-89: Deduplicate runtime construction and leave a seam for the Spark executor (dup).

The same change is repeated 5x. Centralize it via a helper to cut noise and make it trivial to parameterize by executor later.

Recommended edit:

```diff
-    let runtime = VegaFusionRuntime::default();
+    let runtime = make_test_runtime();
```

Add once in this module:

```rust
fn make_test_runtime() -> VegaFusionRuntime {
    VegaFusionRuntime::default()
}
```

Optional follow-up: extend the rstest cases with an executor dimension when the Spark path is available.

To find any remaining ad-hoc constructions to unify:

```bash
#!/bin/bash
# Instances that directly call the constructor (to funnel through the helper)
rg -nP --glob '!**/target/**' '\bVegaFusionRuntime::(new|default)\s*\('
```

Also applies to: 142-142, 228-228, 293-293, 338-338
vegafusion-runtime/tests/test_chart_state.rs (1)
28-28: Default() is fine; add feature-gated Spark coverage mirror (dup).

Construction looks good. Mirror this test behind a Spark feature flag, or route through a helper to run both executors in CI when Spark is available.

Minimal prep:

```diff
-    let runtime = VegaFusionRuntime::default();
+    let runtime = make_test_runtime();
```

Helper:

```rust
fn make_test_runtime() -> VegaFusionRuntime {
    VegaFusionRuntime::default()
}
```

vegafusion-runtime/tests/test_destringify_selection_datasets.rs (1)
27-27: Default() usage looks good; route through a helper to simplify future executor toggling (dup).

Keeps tests consistent and ready for Spark/DataFusion parity runs.

```diff
-    let runtime = VegaFusionRuntime::default();
+    let runtime = make_test_runtime();
```

Add within this module:

```rust
fn make_test_runtime() -> VegaFusionRuntime {
    VegaFusionRuntime::default()
}
```

vegafusion-runtime/tests/test_pre_transform_values.rs (1)
42-42: DRY the repeated runtime construction with a tiny helper.

Multiple identical instantiations; use a local helper for clarity and future tweaks.

Apply at each of the listed lines:

```diff
-    let runtime = VegaFusionRuntime::default();
+    let runtime = default_runtime();
```

Add near the top of this `mod tests`:

```rust
fn default_runtime() -> VegaFusionRuntime {
    VegaFusionRuntime::default()
}
```

Also applies to: 91-91, 140-140, 198-198, 254-254, 301-301, 385-385, 444-444
vegafusion-core/src/runtime/mod.rs (1)
1-1: Add a brief module-level doc for the plan executor abstraction.

Documenting intent and default behavior (NoOp) will save readers a jump.

```diff
+//! Plan executor abstraction for runtime plans.
+//! Implementors (e.g., DataFusion, Spark) provide concrete execution of logical plans.
+//! Note: the default is `NoOpPlanExecutor`, which returns an error when asked to execute;
+//! consumers should inject a real executor where execution is required.
 mod plan_executor;
```

examples/rust-examples/examples/pre_transform_data.rs (1)
13-13: Constructor migration: default() OK; clarify how to inject alternate executors.

Consider a brief inline comment noting that Default uses DataFusionPlanExecutor and that callers can inject Spark via PlanExecutor.

```diff
-    let runtime = VegaFusionRuntime::default();
+    // Defaults to DataFusionPlanExecutor; pass a PlanExecutor to VegaFusionRuntime::new(...)
+    // to override (e.g., SparkSqlPlanExecutor).
+    let runtime = VegaFusionRuntime::default();
```

vegafusion-runtime/tests/test_planning.rs (2)
64-64: Constructor migration: OK; add a test exercising a non-default executor.

Add a parametrized test to assert that an alternate PlanExecutor is honored.

65-75: Don't discard the materialized value; assert basic invariants.

Make the test meaningful and catch regressions.

```diff
-    let _data_3 = graph_runtime
+    let data_3 = graph_runtime
         .get_node_value(
             graph.clone(),
             mapping
                 .get(&(Variable::new_data("data_3"), Vec::new()))
                 .unwrap(),
             Default::default(),
             None,
         )
         .await
         .unwrap();
+    // Ensure non-empty table
+    let table_3 = data_3.into_table().unwrap();
+    assert!(table_3.num_rows() > 0, "data_3 should have rows");
```

vegafusion-wasm/Cargo.toml (1)
28-31: Trim wasm size: drop datafusion-sql "unparser" if not required.

Reduces code size for wasm.

```diff
 [dependencies.datafusion-sql]
 version = "49.0.0"
 default-features = false
-features = ["unicode_expressions", "unparser"]
+features = ["unicode_expressions"]
```

vegafusion-runtime/tests/test_task_graph_runtime.rs (3)
83-83: Factor a helper to construct VegaFusionRuntime in tests (DRY).

Centralizes future executor tweaks (DataFusion vs Spark) and avoids duplication.

```diff
+fn make_runtime() -> VegaFusionRuntime {
+    VegaFusionRuntime::default()
+}
 @@
-    let graph_runtime = VegaFusionRuntime::default();
+    let graph_runtime = make_runtime();
 @@
-    let graph_runtime = VegaFusionRuntime::default();
+    let graph_runtime = make_runtime();
```

Also applies to: 146-146

146-153: Same here: assert and/or gate network usage.

Mirror the assertion pattern from the first test; drop the println.

```diff
     let result = graph_runtime
         .get_node_value(
             graph,
             &NodeValueIndex::new(2, Some(0)),
             Default::default(),
             None,
         )
         .await
         .unwrap();
-    println!("result: {result:?}");
+    let table = result.into_table().expect("expect table output");
+    assert!(table.num_rows() > 0, "datasetA should have rows");
```

85-93: Assert results and gate/ignore network to avoid flaky tests.

Replace println with assertions and guard remote fetches (or mark #[ignore]) so CI is deterministic.

```diff
     let result = graph_runtime
         .get_node_value(
             graph,
             &NodeValueIndex::new(2, Some(0)),
             Default::default(),
             None,
         )
         .await
         .unwrap();
-
-    println!("result: {result:?}");
+    let table = result.into_table().expect("expect table output");
+    assert!(table.num_rows() > 0, "datasetA should have rows");
+    // Optional gating if you want to keep this test enabled:
+    // if std::env::var("NETWORK_TESTS").ok().as_deref() != Some("1") { return; }
```

vegafusion-python/Cargo.toml (1)
66-67: Trim DataFusion features to reduce build time.

This crate directly uses DataFusion; disable default features and enable only the required ones (e.g., datasource). Keeps builds lean.

```diff
-[dependencies.datafusion]
-workspace = true
+[dependencies.datafusion]
+workspace = true
+default-features = false
+# Adjust features as needed; 'datasource' covers datafusion::datasource usage.
+features = ["datasource"]
```

Run to confirm the minimal feature set is sufficient (look for additional DataFusion APIs used here):

```bash
#!/bin/bash
set -euo pipefail
cd vegafusion-python
rg -n -C2 -t rust -S $'\\bdatafusion::' src || true
```

vegafusion-runtime/Cargo.toml (1)
157-160: Disable datafusion-sql default features; ensure the workspace entry exists.

Keeps DF crates lean and consistent with the other DF deps in this crate.

```diff
 [dependencies.datafusion-sql]
 workspace = true
 features = ["unparser"]
+default-features = false
```

Verify the workspace root declares datafusion-sql under [workspace.dependencies]:

```bash
#!/bin/bash
set -euo pipefail
# From repo root
rg -nP '^\[workspace\.dependencies\]' Cargo.toml -n -C2
rg -nP '^\s*datafusion-sql\s*=' Cargo.toml -n -C2
```

vegafusion-server/Cargo.toml (1)
67-69: Remove duplicate serde_json from dev-dependencies.

serde_json is already under [dependencies]; the dev-dep is redundant here.

Apply:

```diff
-[dev-dependencies.serde_json]
-workspace = true
-
```

vegafusion-python/src/executor.rs (2)
1-12: Serialize Python calls to avoid re-entrancy (parity with the vendor executor).

Guard with a Mutex so concurrent plans don't interleave on a single Python object.

Apply:

```diff
 use std::sync::Arc;
 use async_trait::async_trait;
 use pyo3::exceptions::PyValueError;
 use pyo3::prelude::*;
 use pyo3::types::PyString;
 use vegafusion_common::{data::table::VegaFusionTable, datafusion_expr::LogicalPlan};
 use vegafusion_core::runtime::PlanExecutor;
+use tokio::sync::Mutex;

 pub struct PythonPlanExecutor {
-    python_executor: PyObject,
+    python_executor: PyObject,
+    mutex: Arc<Mutex<()>>,
 }

 impl PythonPlanExecutor {
     fn new(python_executor: PyObject) -> Self {
-        Self { python_executor }
+        Self { python_executor, mutex: Arc::new(Mutex::new(())) }
     }
 }
```
20-67: Take the mutex before diving into Python.

Keep the critical section small; release before returning.

Apply:

```diff
     async fn execute_plan(
         &self,
         plan: LogicalPlan,
     ) -> vegafusion_common::error::Result<VegaFusionTable> {
+        let _lock = self.mutex.lock().await;
         let plan_str = plan.display_pg_json().to_string();
         let python_executor = &self.python_executor;
```

vegafusion-python/tests/test_spark_e2e.py (5)
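The serialization the mutex provides can be sketched with asyncio.Lock: concurrent callers take turns instead of interleaving on a shared, non-reentrant resource. `SerializedExecutor` and `fragile` below are illustrative stand-ins, not the PR's API:

```python
import asyncio

class SerializedExecutor:
    # Wraps a non-reentrant async callable so concurrent callers
    # run one at a time (analog of holding tokio::sync::Mutex).
    def __init__(self, func):
        self._func = func
        self._lock = asyncio.Lock()

    async def execute(self, arg):
        async with self._lock:
            return await self._func(arg)

async def main():
    in_flight = 0
    overlapped = False

    async def fragile(arg: int) -> int:
        # Breaks if two calls overlap; record any overlap we observe.
        nonlocal in_flight, overlapped
        in_flight += 1
        if in_flight > 1:
            overlapped = True
        await asyncio.sleep(0.005)
        in_flight -= 1
        return arg * 2

    ex = SerializedExecutor(fragile)
    results = await asyncio.gather(*(ex.execute(i) for i in range(5)))
    return results, overlapped

results, overlapped = asyncio.run(main())
```

Even though five calls are launched concurrently, the lock guarantees they never overlap, which is the property the review asks for when a single Python executor object is shared across plans.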
18-19: Avoid eager-loading 1M rows at import time.

Load within a helper/fixture to keep module import cheap.

Apply:

```diff
-SALES_DATA_DF = pd.read_parquet(SALES_DATA_PATH)
+def _load_sales_df() -> pd.DataFrame:
+    return pd.read_parquet(SALES_DATA_PATH)
```

And below (Line 91) replace:

```diff
-    inline_datasets={"sales_data_1kk": SALES_DATA_DF},
+    inline_datasets={"sales_data_1kk": _load_sales_df()},
```
21-25: Modernize Optional[int] to a PEP 604 union.

Cleaner and satisfies Ruff UP045.

Apply:

```diff
-from typing import Optional
@@
-def _discover_spec_files(limit: Optional[int] = None) -> list[Path]:
+def _discover_spec_files(limit: int | None = None) -> list[Path]:
```

If Optional is unused elsewhere, drop its import.
32-41: Make Spark memory/timezone configurable; safer defaults for CI.

Lower the defaults and honor env vars; set the session timezone at the builder.

Apply:

```diff
+import os
@@
     session: SparkSession = (
         SparkSession.builder.appName("vegafusion-e2e")
         .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")
         .config("spark.sql.legacy.parquet.nanosAsLong", "true")
         .config("spark.sql.execution.arrow.pyspark.enabled", "true")
-        .config("spark.executor.memory", "8g")
-        .config("spark.driver.memory", "8g")
+        .config("spark.executor.memory", os.getenv("SPARK_EXECUTOR_MEMORY", "2g"))
+        .config("spark.driver.memory", os.getenv("SPARK_DRIVER_MEMORY", "2g"))
+        .config("spark.sql.session.timeZone", os.getenv("SPARK_SQL_SESSION_TIMEZONE", "UTC"))
         .master("local[2]")
         .getOrCreate()
     )
```
43-46: Drop the redundant SQL SET TIME ZONE.

The builder config sets the session timezone already; the SQL statement is unnecessary noise.

Apply:

```diff
-    # TODO: this is required for properly handling temporal. We need to check if we can work around different
-    # timezone or if we should require users to setup their Spark sessions to operate in UTC
-    session.sql("SET TIME ZONE 'UTC'")
+    # Session timezone configured via spark.sql.session.timeZone
```
98-101: Avoid the toPandas() round-trip; use Arrow directly if available.

Faster and less memory on the driver.

Apply:

```diff
-    def spark_executor(sql_query: str) -> pa.Table:
-        spark_df = spark.sql(sql_query)
-        return pa.Table.from_pandas(spark_df.toPandas())
+    def spark_executor(sql_query: str) -> pa.Table:
+        spark_df = spark.sql(sql_query)
+        to_arrow = getattr(spark_df, "toArrow", None)
+        if callable(to_arrow):
+            return to_arrow()  # Spark 3.5+
+        # Fallback (older Spark): via Pandas
+        return pa.Table.from_pandas(spark_df.toPandas())
```

vegafusion-server/src/main.rs (1)
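The toArrow-with-fallback shape in the diff is plain capability detection via getattr. A minimal sketch with hypothetical stand-in classes (`NewDF`/`OldDF` are not PySpark types):

```python
def to_arrow_or_fallback(df):
    # Prefer a native toArrow() when the object provides one
    # (as PySpark DataFrames do from Spark 3.5), otherwise
    # fall back to a slower conversion path.
    to_arrow = getattr(df, "toArrow", None)
    if callable(to_arrow):
        return to_arrow()
    return df.fallback_convert()

class NewDF:
    def toArrow(self):
        return "arrow-table"

class OldDF:
    def fallback_convert(self):
        return "pandas-roundtrip"
```

This keeps one executor function working across Spark versions: new sessions skip the driver-side Pandas materialization, old ones still succeed.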
67-101: Bound materialization concurrency and avoid a per-item Arc clone

`try_join_all` spawns all materialization futures at once and clones the executor inside the iterator. Bound the concurrency and hoist the clone.

```diff
+// at top-level imports
+use futures::{StreamExt, TryStreamExt};
```

```diff
-    let materialized_futures: Vec<_> = response_values
-        .into_iter()
-        .map(|named_value| {
-            let executor = self.runtime.plan_executor.clone();
-            async move {
-                let materialized_value =
-                    named_value.value.to_materialized(executor).await?;
-                Ok::<_, VegaFusionError>(
-                    vegafusion_core::proto::gen::tasks::ResponseTaskValue {
-                        variable: Some(named_value.variable),
-                        scope: named_value.scope,
-                        value: Some(ProtoMaterializedTaskValue::try_from(
-                            &materialized_value,
-                        )?),
-                    },
-                )
-            }
-        })
-        .collect();
-    let materialized_response_values =
-        match futures::future::try_join_all(materialized_futures).await {
+    let executor = Arc::clone(&self.runtime.plan_executor);
+    let materialized_response_values = futures::stream::iter(response_values)
+        .map({
+            let executor = executor.clone();
+            move |named_value| {
+                let executor = executor.clone();
+                async move {
+                    let materialized_value =
+                        named_value.value.to_materialized(executor).await?;
+                    Ok::<_, VegaFusionError>(ResponseTaskValue {
+                        variable: Some(named_value.variable),
+                        scope: named_value.scope,
+                        value: Some(
+                            ProtoMaterializedTaskValue::try_from(
+                                &materialized_value,
+                            )?,
+                        ),
+                    })
+                }
+            }
+        })
+        .buffer_unordered(8) // TODO: make configurable
+        .try_collect::<Vec<_>>()
+        .await;
+    let materialized_response_values = match materialized_response_values {
             Ok(v) => v,
             Err(e) => {
                 let response_msg = QueryResult {
                     response: Some(query_result::Response::Error(Error {
                         errorkind: Some(Errorkind::Error(
                             TaskGraphValueError { msg: e.to_string() },
                         )),
                     })),
                 };
                 return Ok(response_msg);
             }
         };
```

vegafusion-runtime/src/task_graph/runtime.rs (2)
37-38: Restrict plan_executor visibility. Expose as pub(crate) to avoid external mutation and preserve runtime invariants.
- pub plan_executor: Arc<dyn PlanExecutor>, + pub(crate) plan_executor: Arc<dyn PlanExecutor>,
139-150: Hoist executor clone outside the iterator. Avoid calling self.plan_executor() per element; clone once and reuse.
- let response_value_futures: Vec<_> = indices + let exec = self.plan_executor(); + let response_value_futures: Vec<_> = indices .iter() .map(|node_value_index| { … - let plan_executor_clone = self.plan_executor(); + let plan_executor_clone = exec.clone(); Ok(async move { let value = task_graph_runtime .clone() .get_node_value( task_graph, node_value_index, inline_datasets.clone(), Some(plan_executor_clone), )vegafusion-python/src/lib.rs (1)
460-469: Include scope in logical-plan export dicts. Without scope, nested datasets/signals with identical names are ambiguous in Python.
for export_update in export_updates { let py_export_dict = PyDict::new(py); py_export_dict.set_item( "namespace", pythonize::pythonize(py, &export_update.namespace)?, )?; py_export_dict.set_item("name", export_update.name)?; + py_export_dict.set_item( + "scope", + pythonize::pythonize(py, &export_update.scope)?, + )?;vegafusion-python/src/utils.rs (1)
6-6: Consolidated PyValueError import — resolved. Single alias import is correct and consistent with prior feedback.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (2)
- `Cargo.lock` is excluded by `!**/*.lock`
- `vegafusion-wasm/package-lock.json` is excluded by `!**/package-lock.json`
📒 Files selected for processing (43)
- .github/workflows/build_test.yml (4 hunks)
- examples/rust-examples/examples/chart_state.rs (1 hunks)
- examples/rust-examples/examples/grpc.rs (2 hunks)
- examples/rust-examples/examples/inline_datasets.rs (1 hunks)
- examples/rust-examples/examples/inline_datasets_plan.rs (1 hunks)
- examples/rust-examples/examples/pre_transform_data.rs (3 hunks)
- examples/rust-examples/examples/pre_transform_extract.rs (1 hunks)
- examples/rust-examples/examples/pre_transform_spec.rs (1 hunks)
- vegafusion-common/Cargo.toml (2 hunks)
- vegafusion-core/Cargo.toml (3 hunks)
- vegafusion-core/src/proto/pretransform.proto (1 hunks)
- vegafusion-core/src/proto/services.proto (2 hunks)
- vegafusion-core/src/proto/tasks.proto (2 hunks)
- vegafusion-core/src/runtime/mod.rs (1 hunks)
- vegafusion-core/src/runtime/plan_executor.rs (1 hunks)
- vegafusion-core/src/runtime/runtime.rs (9 hunks)
- vegafusion-core/src/task_graph/task_value.rs (4 hunks)
- vegafusion-python/Cargo.toml (2 hunks)
- vegafusion-python/pyproject.toml (2 hunks)
- vegafusion-python/src/chart_state.rs (1 hunks)
- vegafusion-python/src/executor.rs (1 hunks)
- vegafusion-python/src/lib.rs (8 hunks)
- vegafusion-python/src/utils.rs (1 hunks)
- vegafusion-python/tests/test_spark_e2e.py (1 hunks)
- vegafusion-runtime/Cargo.toml (4 hunks)
- vegafusion-runtime/benches/spec_benchmarks.rs (2 hunks)
- vegafusion-runtime/src/expression/compiler/call.rs (1 hunks)
- vegafusion-runtime/src/sql/spark.rs (1 hunks)
- vegafusion-runtime/src/task_graph/runtime.rs (9 hunks)
- vegafusion-runtime/tests/test_chart_state.rs (1 hunks)
- vegafusion-runtime/tests/test_destringify_selection_datasets.rs (1 hunks)
- vegafusion-runtime/tests/test_image_comparison.rs (6 hunks)
- vegafusion-runtime/tests/test_planning.rs (2 hunks)
- vegafusion-runtime/tests/test_pre_transform_extract.rs (1 hunks)
- vegafusion-runtime/tests/test_pre_transform_keep_variables.rs (1 hunks)
- vegafusion-runtime/tests/test_pre_transform_values.rs (8 hunks)
- vegafusion-runtime/tests/test_stringify_datetimes.rs (5 hunks)
- vegafusion-runtime/tests/test_task_graph_runtime.rs (2 hunks)
- vegafusion-server/Cargo.toml (4 hunks)
- vegafusion-server/src/main.rs (8 hunks)
- vegafusion-wasm/Cargo.toml (4 hunks)
- vegafusion-wasm/package.json (1 hunks)
- vegafusion-wasm/src/lib.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-25T14:40:22.554Z
Learnt from: OlegWock
PR: deepnote/vegafusion#6
File: vegafusion-python/pyproject.toml:2-3
Timestamp: 2025-08-25T14:40:22.554Z
Learning: The deepnote_vegafusion package is intentionally named differently from its import path (vegafusion) to serve as a drop-in replacement for the original vegafusion package. Users install "deepnote_vegafusion" but import "vegafusion", allowing existing code to work without modification.
Applied to files:
- vegafusion-common/Cargo.toml
- vegafusion-runtime/Cargo.toml
- vegafusion-wasm/Cargo.toml
- vegafusion-python/Cargo.toml
- vegafusion-server/Cargo.toml
- vegafusion-python/pyproject.toml
🧬 Code graph analysis (29)
vegafusion-runtime/tests/test_pre_transform_keep_variables.rs (1)
vegafusion-runtime/src/task_graph/runtime.rs (1)
default(92-94)
vegafusion-runtime/tests/test_pre_transform_extract.rs (1)
vegafusion-runtime/src/task_graph/runtime.rs (1)
default(92-94)
vegafusion-python/src/chart_state.rs (1)
vegafusion-core/src/chart_state.rs (9)
try_new(58-130), inline_datasets(64-67), update(132-220), get_input_spec(222-224), get_server_spec(226-228), get_client_spec(230-232), get_transformed_spec(234-236), get_comm_plan(238-240), get_warnings(242-244)
vegafusion-runtime/tests/test_chart_state.rs (1)
vegafusion-runtime/src/task_graph/runtime.rs (1)
default(92-94)
vegafusion-runtime/tests/test_pre_transform_values.rs (1)
vegafusion-runtime/src/task_graph/runtime.rs (1)
default(92-94)
vegafusion-runtime/tests/test_destringify_selection_datasets.rs (1)
vegafusion-runtime/src/task_graph/runtime.rs (1)
default(92-94)
vegafusion-core/src/runtime/mod.rs (2)
vegafusion-core/src/runtime/runtime.rs (1)
plan_executor(44-46)vegafusion-runtime/src/task_graph/runtime.rs (1)
plan_executor(103-105)
vegafusion-runtime/benches/spec_benchmarks.rs (1)
vegafusion-runtime/src/task_graph/runtime.rs (1)
default(92-94)
examples/rust-examples/examples/pre_transform_spec.rs (1)
vegafusion-runtime/src/task_graph/runtime.rs (1)
default(92-94)
examples/rust-examples/examples/inline_datasets.rs (1)
vegafusion-runtime/src/task_graph/runtime.rs (1)
default(92-94)
vegafusion-python/src/executor.rs (6)
vegafusion-python/src/lib.rs (5)
runtime(511-511)runtime(522-522)runtime(532-532)runtime(542-542)runtime(552-552)vegafusion-python/vegafusion/runtime.py (2)
runtime(234-256)execute_plan(43-45)vegafusion-python/src/vendor.rs (2)
new(19-24)execute_plan(65-113)vegafusion-core/src/runtime/plan_executor.rs (2)
execute_plan(8-8)execute_plan(18-22)vegafusion-common/src/data/table.rs (1)
from_pyarrow(325-327)vegafusion-common/src/error.rs (1)
internal(222-224)
vegafusion-wasm/src/lib.rs (1)
vegafusion-runtime/src/task_graph/runtime.rs (2)
new(41-53)default(92-94)
examples/rust-examples/examples/pre_transform_extract.rs (1)
vegafusion-runtime/src/task_graph/runtime.rs (1)
default(92-94)
vegafusion-runtime/tests/test_task_graph_runtime.rs (1)
vegafusion-runtime/src/task_graph/runtime.rs (2)
default(92-94)new(41-53)
vegafusion-runtime/src/sql/spark.rs (2)
vegafusion-runtime/src/transform/window.rs (1)
ops(110-241)vegafusion-common/src/error.rs (16)
vendor(210-212)from(351-353)from(357-359)from(364-366)from(371-373)from(377-379)from(383-385)from(389-391)from(396-398)from(403-405)from(410-412)from(417-419)from(424-426)from(431-436)from(441-443)from(482-484)
vegafusion-runtime/tests/test_image_comparison.rs (2)
vegafusion-runtime/src/task_graph/runtime.rs (2)
default(92-94)new(41-53)vegafusion-core/src/task_graph/task_value.rs (5)
try_from(110-127)try_from(133-153)try_from(159-173)try_from(179-199)try_from(205-220)
examples/rust-examples/examples/grpc.rs (1)
vegafusion-server/src/main.rs (1)
values(246-257)
vegafusion-runtime/tests/test_planning.rs (2)
vegafusion-runtime/src/task_graph/runtime.rs (2)
default(92-94)new(41-53)vegafusion-core/src/variable/mod.rs (1)
new_data(21-23)
vegafusion-runtime/tests/test_stringify_datetimes.rs (1)
vegafusion-runtime/src/task_graph/runtime.rs (1)
default(92-94)
vegafusion-runtime/src/expression/compiler/call.rs (2)
vegafusion-runtime/src/data/tasks.rs (1)
schema(438-490)vegafusion-common/src/error.rs (1)
compilation(218-220)
vegafusion-python/tests/test_spark_e2e.py (4)
vegafusion-python/src/lib.rs (6)
runtime(511-511)runtime(522-522)runtime(532-532)runtime(542-542)runtime(552-552)pre_transform_extract(315-409)vegafusion-python/vegafusion/runtime.py (3)
runtime(234-256)pre_transform_extract(694-796)VegaFusionRuntime(204-984)vegafusion-python/src/utils.rs (1)
inline_datasets(23-67)examples/python-examples/pre_transform_spec_vendor.py (1)
spark_executor(18-29)
vegafusion-runtime/src/task_graph/runtime.rs (3)
vegafusion-core/src/runtime/runtime.rs (1)
plan_executor(44-46)vegafusion-runtime/src/datafusion/context.rs (1)
make_datafusion_context(12-49)vegafusion-runtime/src/expression/compiler/config.rs (1)
default(23-35)
vegafusion-core/src/runtime/plan_executor.rs (4)
vegafusion-python/src/executor.rs (1)
execute_plan(22-66)vegafusion-python/vegafusion/runtime.py (1)
execute_plan(43-45)vegafusion-runtime/src/plan_executor.rs (1)
execute_plan(24-27)vegafusion-common/src/error.rs (1)
internal(222-224)
vegafusion-python/src/utils.rs (2)
vegafusion-common/src/data/table.rs (2)
from_pyarrow_with_hash(331-373)from_pyarrow(325-327)vegafusion-core/src/data/dataset.rs (3)
from_table(35-38)from_plan(47-49)from_table_ipc_bytes(40-45)
vegafusion-core/src/task_graph/task_value.rs (3)
vegafusion-core/src/task_graph/memory.rs (18)
inner_size_of_logical_plan(99-106)inner_size_of_scalar(46-69)inner_size_of_table(93-97)size_of(12-12)size_of(19-19)size_of(24-24)size_of(27-27)size_of(34-34)size_of(48-48)size_of(49-49)size_of(50-50)size_of(51-51)size_of(52-52)size_of(56-56)size_of(79-79)size_of(83-83)size_of(90-90)plan(101-105)vegafusion-core/src/runtime/runtime.rs (1)
plan_executor(44-46)vegafusion-common/src/data/table.rs (1)
from_ipc_bytes(403-414)
vegafusion-core/src/runtime/runtime.rs (3)
vegafusion-core/src/planning/apply_pre_transform.rs (1)
apply_pre_transform_datasets(20-129)vegafusion-runtime/src/task_graph/runtime.rs (5)
plan_executor(103-105)new(41-53)query_request(107-162)indices(115-159)datasets(298-322)vegafusion-runtime/src/plan_executor.rs (1)
new(17-19)
examples/rust-examples/examples/pre_transform_data.rs (2)
vegafusion-runtime/src/task_graph/runtime.rs (1)
default(92-94)vegafusion-server/src/main.rs (1)
values(246-257)
vegafusion-python/src/lib.rs (4)
vegafusion-python/vegafusion/runtime.py (6)
runtime(234-256)VegaFusionRuntime(204-984)memory_limit(931-932)memory_limit(935-944)worker_threads(881-888)worker_threads(891-900)vegafusion-python/src/utils.rs (4)
serde_json(79-79)parse_json_spec(76-96)process_inline_datasets(18-73)inline_datasets(23-67)vegafusion-python/src/vendor.rs (2)
select_executor_for_vendor(27-61)new(19-24)vegafusion-runtime/src/task_graph/runtime.rs (1)
new(41-53)
vegafusion-server/src/main.rs (3)
vegafusion-core/src/runtime/runtime.rs (3)
inline_datasets(91-94)inline_datasets(330-333)pre_transform_logical_plan(427-469)vegafusion-runtime/src/task_graph/runtime.rs (2)
decode_inline_datasets(268-293)new(41-53)vegafusion-core/src/task_graph/task_value.rs (5)
try_from(110-127)try_from(133-153)try_from(159-173)try_from(179-199)try_from(205-220)
🪛 Ruff (0.12.2)
vegafusion-python/tests/test_spark_e2e.py
21-21: Use X | None for type annotations
Convert to X | None
(UP045)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
- GitHub Check: build-vegafusion-python-win-64
- GitHub Check: build-vegafusion-python-linux-arm64
- GitHub Check: build-vegafusion-python-osx-64
- GitHub Check: build-vegafusion-python-osx-arm64
- GitHub Check: build-vegafusion-python-linux-64
- GitHub Check: check-vegafusion-python
- GitHub Check: check-rust
- GitHub Check: test-rust
Actionable comments posted: 11
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
vegafusion-core/src/task_graph/task_value.rs (1)
201-213: Fix with_context messages to include actual values (placeholders currently inert). The strings contain {:?} but never interpolate anything. Include Debug output for the failing payload.
Apply:
- let variable = response_value - .variable - .with_context(|| "Unwrap failed for variable of response value".to_string())?; + let variable = response_value + .variable + .with_context(|| format!("Unwrap failed: missing variable in ResponseTaskValue {:?}", response_value))?; @@ - let proto_value = response_value.value.with_context(|| { - "Unwrap failed for value of response value: {:?}".to_string() - })?; + let proto_value = response_value + .value + .with_context(|| format!("Unwrap failed: missing value in ResponseTaskValue {:?}", response_value))?; @@ - let value = TaskValue::try_from(&proto_value).with_context(|| { - "Deserialization failed for value of response value: {:?}".to_string() - })?; + let value = TaskValue::try_from(&proto_value).with_context(|| { + format!("Deserialization failed for proto TaskValue {:?}", proto_value) + })?;vegafusion-python/vegafusion/runtime.py (3)
250-255: Resets silently drop vendor/executor; use new_embedded_vendor when vendor is set. If an instance was created via new_vendor, a subsequent reset rebuilds with DataFusion. Gate on self._vendor to preserve the backend.
Apply:
- self._runtime = PyVegaFusionRuntime.new_embedded( - self.cache_capacity, - self.memory_limit, - self.worker_threads, - self._executor, - ) + if getattr(self, "_vendor", None) is not None: + self._runtime = PyVegaFusionRuntime.new_embedded_vendor( + self.cache_capacity, + self.memory_limit, + self.worker_threads, + self._vendor, + self._executor, + ) + else: + self._runtime = PyVegaFusionRuntime.new_embedded( + self.cache_capacity, + self.memory_limit, + self.worker_threads, + self._executor, + )
660-661: Typo: “arrof3” → “arro3”. Minor comment fix.
Apply:
- # Pass through arrof3 + # Pass through arro3
81-90: Bug: _get_default_namespace returns the wrong module when pandas+pyarrow are present. The walrus expression binds pd to the result of (pandas and pyarrow), i.e., the pyarrow module, not pandas. Fix the logic and avoid shadowing.
Apply:
def _get_default_namespace() -> ModuleType: # Returns a default narwhals namespace, based on what is installed - if pd := sys.modules.get("pandas") and sys.modules.get("pyarrow"): - return pd - elif pl := sys.modules.get("polars"): - return pl - elif pa := sys.modules.get("pyarrow"): - return pa + pandas_mod = sys.modules.get("pandas") + pyarrow_mod = sys.modules.get("pyarrow") + if pandas_mod and pyarrow_mod: + return pandas_mod + elif (pl_mod := sys.modules.get("polars")): + return pl_mod + elif pyarrow_mod: + return pyarrow_mod else: raise ImportError("Could not determine default narwhals namespace")
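To make the precedence bug concrete: `:=` binds looser than `and`, so the unparenthesized form assigns the whole `A and B` expression. A self-contained demo using dummy modules (`fake_pandas`/`fake_pyarrow` are synthetic, registered only for this snippet):

```python
import sys
import types

# Register two dummy modules so the lookups below succeed.
sys.modules["fake_pandas"] = types.ModuleType("fake_pandas")
sys.modules["fake_pyarrow"] = types.ModuleType("fake_pyarrow")

# Buggy form: parsed as `pd := (A and B)`, so pd is the *second* module.
if pd := sys.modules.get("fake_pandas") and sys.modules.get("fake_pyarrow"):
    buggy = pd.__name__

# Fixed form: parentheses make pd the first module.
if (pd := sys.modules.get("fake_pandas")) and sys.modules.get("fake_pyarrow"):
    fixed = pd.__name__

print(buggy, fixed)  # fake_pyarrow fake_pandas
```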
♻️ Duplicate comments (4)
vegafusion-core/src/task_graph/task_value.rs (2)
44-51: Good: clearer guidance on the Plan materialization path. The error now points callers to to_materialized with a PlanExecutor. Matches prior feedback.
29-77: Add a non-consuming to_materialized_ref to avoid unnecessary moves. Handy when the caller wants to reuse TaskValue after materialization.
Example addition (outside current hunk):
impl TaskValue { pub async fn to_materialized_ref( &self, plan_executor: Arc<dyn PlanExecutor + Send + Sync>, ) -> Result<MaterializedTaskValue> { match self { TaskValue::Plan(plan) => { let table = plan_executor .execute_plan(plan.clone()) .await .with_context(|| "PlanExecutor.execute_plan failed in TaskValue::to_materialized_ref".to_string())?; Ok(MaterializedTaskValue::Table(table)) } TaskValue::Scalar(s) => Ok(MaterializedTaskValue::Scalar(s.clone())), TaskValue::Table(t) => Ok(MaterializedTaskValue::Table(t.clone())), } } }vegafusion-python/vegafusion/runtime.py (2)
855-862: Include “scope” in export updates (contract completeness). Export updates lack “scope” in the docstring. Client code needs scope to disambiguate nested datasets/signals. Please document it and ensure the Rust side includes it.
Apply:
* Export updates as a list of dictionaries with keys: * `"name"`: dataset name * `"namespace"`: where this dataset belongs, either `"data"` or `"signal"` + * `"scope"`: nested scope as a list of integers * `"logical_plan"`: json representation of LogicalPlan (when applicable) * `"data"`: materialized data (when applicable)Optional: define a TypedDict for these updates next to Variable/VariableUpdate for clarity:
+class ExportUpdate(TypedDict, total=False): + name: str + namespace: Literal["data", "signal"] + scope: list[int] + logical_plan: str + data: Any # usually a PyArrow table when present
258-283: new_vendor doesn’t persist vendor/executor on the instance; subsequent resets fall back to defaults. This is the same issue noted previously. Please set the instance fields and pass the executor to the constructor so the selection survives resets.
Apply:
- inst = cls(cache_capacity, memory_limit, worker_threads, executor=None) + inst = cls(cache_capacity, memory_limit, worker_threads, executor=executor) if inst.memory_limit is None: inst.memory_limit = get_virtual_memory() // 2 if inst.worker_threads is None: inst.worker_threads = get_cpu_count() + # Persist selection on the instance for future resets + inst._vendor = vendor + inst._executor = executor + inst._runtime = PyVegaFusionRuntime.new_embedded_vendor( inst.cache_capacity, inst.memory_limit, inst.worker_threads, vendor, executor, ) return inst
📒 Files selected for processing (2)
- vegafusion-core/src/task_graph/task_value.rs (3 hunks)
- vegafusion-python/vegafusion/runtime.py (14 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
vegafusion-python/vegafusion/runtime.py (4)
vegafusion-python/src/executor.rs (1)
execute_plan(22-66)
vegafusion-python/src/vendor.rs (1)
execute_plan(65-113)
vegafusion-python/src/lib.rs (7)
new_embedded_vendor(114-123), pre_transform_logical_plan(413-508), runtime(511-511), runtime(522-522), runtime(532-532), runtime(542-542), runtime(552-552)
vegafusion-python/src/utils.rs (1)
inline_datasets(23-67)
vegafusion-core/src/task_graph/task_value.rs (5)
vegafusion-core/src/task_graph/memory.rs (16)
inner_size_of_logical_plan(99-106)size_of(12-12)size_of(19-19)size_of(24-24)size_of(27-27)size_of(34-34)size_of(48-48)size_of(49-49)size_of(50-50)size_of(51-51)size_of(52-52)size_of(56-56)size_of(79-79)size_of(83-83)size_of(90-90)plan(101-105)vegafusion-common/src/error.rs (16)
internal(222-224)from(351-353)from(357-359)from(364-366)from(371-373)from(377-379)from(383-385)from(389-391)from(396-398)from(403-405)from(410-412)from(417-419)from(424-426)from(431-436)from(441-443)from(482-484)vegafusion-core/src/runtime/runtime.rs (1)
plan_executor(44-46)vegafusion-runtime/src/task_graph/runtime.rs (1)
plan_executor(103-105)vegafusion-common/src/data/table.rs (1)
from_ipc_bytes(403-414)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
- GitHub Check: build-vegafusion-python-win-64
- GitHub Check: test-rust
- GitHub Check: build-vegafusion-python-osx-64
- GitHub Check: build-vegafusion-python-linux-arm64
- GitHub Check: check-rust
- GitHub Check: build-vegafusion-python-osx-arm64
- GitHub Check: build-vegafusion-python-linux-64
- GitHub Check: check-vegafusion-python
🔇 Additional comments (9)
vegafusion-core/src/task_graph/task_value.rs (2)
26-26: API design looks sound: the Plan variant addition is cohesive. The Scalar/Table/Plan split keeps wire/materialized forms separate and clean.
227-235: Add a fallible conversion for ResponseTaskValue and audit call sites. I ran the suggested ripgrep check and didn’t find any implicit From<ResponseTaskValue> → NamedTaskValue conversions, but absence of matches isn’t proof—please manually verify that there are no hidden panics remaining.
• Implement the TryFrom<ResponseTaskValue> as proposed to avoid unwrapping protobuf fields
• Deprecate or remove the infallible From<ResponseTaskValue> (or at least add a descriptive expect)
• Manually audit all call sites that construct or convert to NamedTaskValue to ensure they switch to try_from and handle errors
vegafusion-python/vegafusion/runtime.py (7)
5-14: Typing imports look good. Brings in the Protocol/TypedDict/Union needed for the executor surface. No issues.
18-18: OK to import arro3 Schema/Table. This aligns with later usage in _import_inline_datasets.
310-310: Inline dataset import enhancements LGTM. Returning arro3 Schema/Table and supporting arrow_c_schema/arrow_c_stream keeps Rust-side ingestion simple. Nice.
Also applies to: 331-339, 399-403
441-445: Docstring update (Schemas) LGTM
519-523: Docstring update (Schemas) LGTM
572-576: Docstring update (Schemas) LGTM
659-662: Return type vs DatasetFormat "arro3". When dataset_format == "arro3", this returns arro3.Table objects, but the function is annotated to return DataFrameLike. Ensure DataFrameLike includes arro3.Table or adjust the typing.
Would you like me to update the DataFrameLike alias and add a mypy stub?
@coderabbitai resolve
✅ Actions performed: comments resolved and changes approved.
Actionable comments posted: 1
♻️ Duplicate comments (5)
vegafusion-server/Cargo.toml (1)
66-68: Duplicate serde_json in dev-dependencies; keep only the main dependency. Having serde_json in both deps and dev-deps is redundant here and risks version drift. Remove the dev-dep.
Apply:
[dev-dependencies.serde_json] -workspace = truevegafusion-python/src/chart_state.rs (4)
12-12: Consistency fix applied (proto path).
proto::gen::tasks::TzConfig looks good and aligns with prior feedback.
54-57: Unnecessary clone removed in the depythonize path — good. Passing &el.bind(py) directly avoids copies.
64-69: Variable rename to py_updates improves clarity. Matches prior guidance; readable in the Python-facing API.
97-106: Consistent use of imported pythonize — good. Matches the prior nitpick; no fully-qualified call sites remain here.
⛔ Files ignored due to path filters (1)
- `Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (2)
- vegafusion-python/src/chart_state.rs (1 hunks)
- vegafusion-server/Cargo.toml (4 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-25T14:40:22.554Z
Learnt from: OlegWock
PR: deepnote/vegafusion#6
File: vegafusion-python/pyproject.toml:2-3
Timestamp: 2025-08-25T14:40:22.554Z
Learning: The deepnote_vegafusion package is intentionally named differently from its import path (vegafusion) to serve as a drop-in replacement for the original vegafusion package. Users install "deepnote_vegafusion" but import "vegafusion", allowing existing code to work without modification.
Applied to files:
vegafusion-server/Cargo.toml
🧬 Code graph analysis (1)
vegafusion-python/src/chart_state.rs (2)
vegafusion-python/vegafusion/runtime.py (8)
runtime(234-256), ChartState(119-201), update(123-133), get_server_spec(187-192), get_client_spec(194-201), get_transformed_spec(159-167), get_comm_plan(135-144), get_warnings(169-185)
vegafusion-core/src/chart_state.rs (9)
try_new(58-130), inline_datasets(64-67), update(132-220), get_input_spec(222-224), get_server_spec(226-228), get_client_spec(230-232), get_transformed_spec(234-236), get_comm_plan(238-240), get_warnings(242-244)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: build-vegafusion-python-osx-arm64
- GitHub Check: build-vegafusion-python-osx-64
- GitHub Check: build-vegafusion-python-linux-arm64
- GitHub Check: build-vegafusion-python-win-64
- GitHub Check: build-vegafusion-python-linux-64
- GitHub Check: check-rust
- GitHub Check: check-vegafusion-python
🔇 Additional comments (8)
vegafusion-server/Cargo.toml (3)
8-8: Version bumps aligned across crates — LGTM. Server crate and dependent internal crates are consistently set to 2.0.5. No action needed.
Also applies to: 37-37, 42-42, 46-46
23-24: Workspace-managing regex — LGTM. Consolidating via workspace = true reduces drift. Good change.
32-34: Incorrect: keep the futures dependency. The rg check shows a direct call to futures::future::try_join_all in vegafusion-server/src/main.rs:88, so the crate is explicitly used and must remain. Likely an incorrect or invalid review comment.
vegafusion-python/src/chart_state.rs (5)
71-89: Getter conversions are straightforward. Direct pythonize of borrowed specs is clean and zero-copy on the Rust side.
91-95: Keep the .clone() — no From<&CommPlan> exists. A search in vegafusion-core/src/planning/watch.rs only finds impl From<CommPlan> for WatchPlan { … } (no impl From<&CommPlan> for WatchPlan), so the current code must clone. If you really need to avoid cloning, consider adding impl From<&CommPlan> for WatchPlan { … } in that file.
Likely an incorrect or invalid review comment.
25-41: GIL release confirmed. The PyChartState::try_new call is already wrapped in py.allow_threads(|| { … }) in vegafusion-python/src/lib.rs (around lines 168–172), so the GIL is released during the blocking block_on. No changes needed.
19-22: Trait bounds verified: VegaFusionRuntimeTrait is declared Send + Sync, and using Arc<dyn VegaFusionRuntimeTrait> (which defaults to a 'static trait object) is safe for GIL-free, cross-thread use.
59-63: I’ve added a script to show the ChartState struct and its fields so we can confirm which runtime is used. Once we see the definition, we can wrap up whether the GIL/block_on concern applies to Python’s multi-thread runtime only or also the static runtime config.
@coderabbitai pause
✅ Actions performed: reviews paused.
"type": "nominal",
"field": "product_category",
"scale": {
"type": "linear",
Invalid scale type for nominal field.
Lines 20-22: "type": "linear" is incompatible with a "type": "nominal" field. Linear scales are for quantitative data.
"scale": {
- "type": "linear",
+ "type": "point",
"zero": false
}
This spec is copied directly from Deepnote to ensure VF can handle specs we generate. So fix should go into main Deepnote repo, but since it works as it is currently, I don't really want to change that 😅
Same applies to other specs in the same folder
vegafusion-python/pyproject.toml
Outdated
"arro3-core",
"packaging",
"narwhals>=1.42",
"datafusion>=48.0.0,<49.0.0",
There is a version mismatch between Python and Rust, since the latter uses datafusion 49.0.0 (in Cargo.toml line 104).
- "datafusion>=48.0.0,<49.0.0",
+ "datafusion>=49.0.0,<50.0.0",
vegafusion-python/src/lib.rs
Outdated
match export_update.value {
    TaskValue::Plan(plan) => {
        // TODO: we probably want more flexible serialization format than pg_json, but protobuf
        // fails with our memtable (possibly fixed in DataFusion 49)
this comment seems to be incorrect because rust is using datafusion 49.0.0 (as defined in Cargo.toml line 104)
Unfortunately, DataFusion 49 didn't fix the issue. This is part of API we don't use for Spark integration, it's meant mostly for completeness and to be contributed back to upstream VF. I updated TODO comment, but I don't think it should block us from releasing Spark support
memory_limit: int | None = None,
worker_threads: int | None = None,
) -> VegaFusionRuntime:
    from vegafusion._vegafusion import PyVegaFusionRuntime
nit - raise if the newly added vendor value doesn't match the expected values
- from vegafusion._vegafusion import PyVegaFusionRuntime
+ if vendor not in ("datafusion", "sparksql"):
+     raise ValueError(f"Invalid vendor: {vendor}. Must be 'datafusion' or 'sparksql'")
+ from vegafusion._vegafusion import PyVegaFusionRuntime
This is already handled on Rust side
Actionable comments posted: 38
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (25)
vegafusion-core/src/expression/lexer.rs (2)
95-102: Prefer returning Self (or bind the lifetime explicitly) for clarity. Returning Tokenizer<'_> works, but -> Self is clearer and avoids repeating the type. Alternatively, bind the lifetime on the impl to make the relationship explicit. Option A (minimal):
- fn new(src: &str) -> Tokenizer<'_> { + fn new(src: &str) -> Self {
Option B (explicit lifetime binding):
-impl Tokenizer<'_> { - fn new(src: &str) -> Tokenizer<'_> { +impl<'a> Tokenizer<'a> { + fn new(src: &'a str) -> Self {
139-141: Doc typo: “tokanization” → “tokenization”
-/// function to perform tokanization +/// function to perform tokenization
.github/workflows/build_test.yml (1)
52-56: MinIO readiness race; add a wait/health-check before running tests. Backgrounding the server without a readiness gate can cause flaky Rust tests.
Apply:
- - name: start minio server in the background - run: pixi run start-minio & + - name: start minio server in the background + run: pixi run start-minio & + - name: wait for minio to be ready + run: | + for i in {1..60}; do + if curl -fsS http://127.0.0.1:9000/minio/health/ready >/dev/null; then + echo "MinIO ready"; exit 0; fi + sleep 1 + done + echo "MinIO not ready in time" >&2; exit 1vegafusion-core/src/task_graph/graph.rs (1)
211-221: Avoid panic on missing value.data; return a typed error instead. value.data.as_ref().unwrap() can panic; surface an InternalError to keep graph building robust.
Apply:
- match value.data.as_ref().unwrap() { - Data::Scalar(_) => "scalar".hash(&mut hasher), - Data::Table(_) => "data".hash(&mut hasher), - Data::Plan(_) => "plan".hash(&mut hasher), - } + match value.data.as_ref() { + Some(Data::Scalar(_)) => "scalar".hash(&mut hasher), + Some(Data::Table(_)) => "data".hash(&mut hasher), + Some(Data::Plan(_)) => "plan".hash(&mut hasher), + None => { + return Err(VegaFusionError::internal("Missing TaskValue.data in Value task")); + } + }vegafusion-python/pyproject.toml (1)
38-39: Consider an extra for Spark. Expose an optional extra to install PySpark easily.
[project.optional-dependencies] -embed = [] +embed = [] +spark = ["pyspark>=3.5"]
Want a follow-up PR wiring this into docs/README?
vegafusion-python/vegafusion/runtime.py (2)
81-90: Bug: wrong module returned due to walrus + and precedence. You return the PyArrow module when both pandas and pyarrow are present. Parenthesize to return pandas.
Apply:
- if pd := sys.modules.get("pandas") and sys.modules.get("pyarrow"): - return pd + if (pd := sys.modules.get("pandas")) and sys.modules.get("pyarrow"): + return pd
233-256: Vendor/executor lost after reset() — persist and rebuild correctly. reset() sets _runtime=None; the next access rebuilds DataFusion by default, ignoring the chosen vendor/executor.
Apply:
@@ if self._runtime is None: # Try to initialize a VegaFusion runtime from vegafusion._vegafusion import PyVegaFusionRuntime @@ - self._runtime = PyVegaFusionRuntime.new_embedded( - self.cache_capacity, - self.memory_limit, - self.worker_threads, - self._executor, - ) + if getattr(self, "_vendor", None) is not None: + self._runtime = PyVegaFusionRuntime.new_embedded_vendor( + self.cache_capacity, + self.memory_limit, + self.worker_threads, + self._vendor, # 'datafusion' or 'sparksql' + self._executor, + ) + else: + self._runtime = PyVegaFusionRuntime.new_embedded( + self.cache_capacity, + self.memory_limit, + self.worker_threads, + self._executor, + ) return self._runtime @@ def new_vendor( @@ - inst = cls(cache_capacity, memory_limit, worker_threads, executor=None) + inst = cls(cache_capacity, memory_limit, worker_threads, executor=None) if inst.memory_limit is None: inst.memory_limit = get_virtual_memory() // 2 if inst.worker_threads is None: inst.worker_threads = get_cpu_count() + # Persist selection for future resets + inst._vendor = vendor + inst._executor = executor inst._runtime = PyVegaFusionRuntime.new_embedded_vendor( inst.cache_capacity, inst.memory_limit, inst.worker_threads, vendor, executor, ) return instAnd initialize the field in init:
@@
-    self._executor = executor
+    self._executor = executor
+    self._vendor: Literal["datafusion", "sparksql"] | None = None
Also applies to: 258-283
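The intent of the fix, sketched with a hypothetical class (the real VegaFusionRuntime and PyVegaFusionRuntime constructors differ):

```python
# Hypothetical sketch: configuration (_vendor) must outlive the built
# runtime handle (_runtime) so that reset() + lazy rebuild honors it.
class RuntimeSketch:
    def __init__(self):
        self._vendor = None   # persisted selection; survives reset()
        self._runtime = None  # lazily (re)built handle

    def select_vendor(self, vendor):
        self._vendor = vendor
        self._runtime = f"runtime[{vendor}]"

    def reset(self):
        # Drop only the built runtime, never the configuration.
        self._runtime = None

    @property
    def runtime(self):
        if self._runtime is None:
            # Rebuild honoring the persisted vendor instead of the default.
            vendor = self._vendor or "datafusion"
            self._runtime = f"runtime[{vendor}]"
        return self._runtime

rt = RuntimeSketch()
rt.select_vendor("sparksql")
rt.reset()
print(rt.runtime)  # runtime[sparksql], not the datafusion default
```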
vegafusion-runtime/tests/test_pre_transform_values.rs (1)
7-15: Possibly unnecessary unsafe around set_var
std::env::set_var is only unsafe on Rust edition 2024; if this crate targets an earlier edition, the block can be dropped.
-    unsafe {
-        std::env::set_var("AWS_DEFAULT_REGION", "us-east-1");
-        std::env::set_var("AWS_ACCESS_KEY_ID", "access_key123");
-        std::env::set_var("AWS_SECRET_ACCESS_KEY", "secret_key123");
-        std::env::set_var("AWS_ENDPOINT", "http://127.0.0.1:9000");
-        std::env::set_var("AWS_ALLOW_HTTP", "true");
-    }
+    std::env::set_var("AWS_DEFAULT_REGION", "us-east-1");
+    std::env::set_var("AWS_ACCESS_KEY_ID", "access_key123");
+    std::env::set_var("AWS_SECRET_ACCESS_KEY", "secret_key123");
+    std::env::set_var("AWS_ENDPOINT", "http://127.0.0.1:9000");
+    std::env::set_var("AWS_ALLOW_HTTP", "true");
vegafusion-runtime/src/expression/compiler/call.rs (2)
159-166: Use compilation error for missing dataset
This is user input, not an internal failure.
-Err(VegaFusionError::internal(format!(
+Err(VegaFusionError::compilation(format!(
     "No dataset named {}. Available: {:?}",
     name,
     config.data_scope.keys()
 )))
167-179: Use compilation error for invalid first argument
Consistent with other user-facing validation.
-Err(VegaFusionError::internal(format!(
+Err(VegaFusionError::compilation(format!(
     "The first argument to the {} function must be a literal \
     string with the name of a dataset",
     &node.callee
 )))
vegafusion-runtime/src/transform/extent.rs (1)
52-62: Prefer fallible extraction without unwraps
Replace unwrap() with error propagation for robustness (especially around unexpected schema).
-    let min_val_scalar = ScalarValue::try_from_array(min_val_array, 0).unwrap();
-    let max_val_scalar = ScalarValue::try_from_array(max_val_array, 0).unwrap();
+    let min_val_scalar = ScalarValue::try_from_array(min_val_array, 0)
+        .with_context(|| "Failed to read __min_val at row 0".to_string())?;
+    let max_val_scalar = ScalarValue::try_from_array(max_val_array, 0)
+        .with_context(|| "Failed to read __max_val at row 0".to_string())?;
vegafusion-core/src/proto/tasks.proto (1)
8-24: Disambiguate plan encoding across executors
Add an explicit encoding enum/field so consumers can safely deserialize DataFusion vs Spark plans.
 message TaskValue {
   oneof data {
@@
-    bytes plan = 3;
+    bytes plan = 3;
   }
+  // Optional metadata describing the serialization/engine of `plan`
+  enum PlanEncoding {
+    PLAN_ENCODING_UNSPECIFIED = 0;
+    DATAFUSION_PROTOBUF = 1;
+    SPARK_SQL_JSON = 2;
+  }
+  optional PlanEncoding plan_encoding = 4;
 }
Also mirror this on InlineDatasetPlan:
 message InlineDatasetPlan {
   // Inline dataset name
   string name = 1;
   // Serialized DataFusion plan in protobuf format
   bytes plan = 2;
+  // Optional metadata describing the plan encoding
+  optional TaskValue.PlanEncoding plan_encoding = 3;
 }
After changing the proto, regenerate artifacts and commit.
examples/rust-examples/examples/pre_transform_data.rs (1)
34-50: Assertion is brittle; relax it.
Pretty formatting and row order can change across DF versions.
Apply:
-    assert_eq!(
-        tbl_repr,
-        "\
-+------+------+-------+
-| bin0 | bin1 | count |
-+------+------+-------+
-| 6.0 | 7.0 | 985 |
-| 3.0 | 4.0 | 100 |
-| 7.0 | 8.0 | 741 |
-| 5.0 | 6.0 | 633 |
-| 8.0 | 9.0 | 204 |
-| 2.0 | 3.0 | 43 |
-| 4.0 | 5.0 | 273 |
-| 9.0 | 10.0 | 4 |
-| 1.0 | 2.0 | 5 |
-+------+------+-------+"
-    )
+    assert!(
+        tbl_repr.contains("| bin0 | bin1 | count |"),
+        "Expected header 'bin0, bin1, count'. Got:\n{}",
+        tbl_repr
+    );
vegafusion-core/src/chart_state.rs (4)
75-80: Stop panicking: replace unwraps with ? + context.
Propagate errors from to_tasks and TaskGraph::new; panics here will take the process down.
Apply:
-    let tasks = plan
-        .server_spec
-        .to_tasks(&opts.tz_config, &dataset_fingerprints)
-        .unwrap();
-    let task_graph = TaskGraph::new(tasks, &task_scope).unwrap();
+    let tasks = plan
+        .server_spec
+        .to_tasks(&opts.tz_config, &dataset_fingerprints)
+        .with_context(|| "Failed to create tasks from server spec")?;
+    let task_graph = TaskGraph::new(tasks, &task_scope)
+        .with_context(|| "Failed to build task graph")?;
81-87: Don’t unwrap mapping lookups; collect Result<HashSet<_>>.
Missing nodes should surface as errors, not panics.
Apply:
-    let server_to_client_value_indices: Arc<HashSet<_>> = Arc::new(
-        plan.comm_plan
-            .server_to_client
-            .iter()
-            .map(|scoped_var| *task_graph_mapping.get(scoped_var).unwrap())
-            .collect(),
-    );
+    let server_to_client_value_indices: Arc<HashSet<_>> = Arc::new(
+        plan.comm_plan
+            .server_to_client
+            .iter()
+            .map(|scoped_var| {
+                task_graph_mapping
+                    .get(scoped_var)
+                    .copied()
+                    .ok_or_else(|| VegaFusionError::internal(
+                        format!("No task graph node found for {scoped_var:?}")
+                    ))
+            })
+            .collect::<Result<HashSet<_>>>()?
+    );
89-96: Same unwrap; bubble error when building indices.
Keep behavior consistent with the fix above.
Apply:
-    let indices: Vec<NodeValueIndex> = plan
-        .comm_plan
-        .server_to_client
-        .iter()
-        .map(|var| *task_graph_mapping.get(var).unwrap())
-        .collect();
+    let indices: Vec<NodeValueIndex> = plan
+        .comm_plan
+        .server_to_client
+        .iter()
+        .map(|var| {
+            task_graph_mapping
+                .get(var)
+                .copied()
+                .ok_or_else(|| VegaFusionError::internal(
+                    format!("No task graph node found for {var:?}")
+                ))
+        })
+        .collect::<Result<Vec<_>>>()?;
216-218: Stronger deterministic ordering (optional).
Sort by (namespace, scope, name) to avoid collisions when names repeat across scopes or namespaces.
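A quick sketch of the composite key, using tuples as stand-ins for the real scoped-variable fields:

```python
# Hypothetical scoped variables as (namespace, scope, name) tuples.
variables = [
    ("signal", (1, 0), "brush"),
    ("data", (0,), "source"),
    ("data", (1, 0), "brush"),
    ("signal", (0,), "width"),
]

# Sorting by name alone leaves the two "brush" entries in arbitrary
# relative order; the composite key is deterministic and collision-free.
variables.sort(key=lambda v: (v[0], v[1], v[2]))

print(variables[0])   # ('data', (0,), 'source')
print(variables[-1])  # ('signal', (1, 0), 'brush')
```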
vegafusion-runtime/tests/test_selection.rs (1)
69-97: Prefer slice over &Vec for API ergonomics
Accept a slice to make the helper more flexible.
-fn make_brush_e_str(ranges: &Vec<Vec<(&str, &str, Vec<&str>)>>) -> VegaFusionDataset {
+fn make_brush_e_str(ranges: &[Vec<(&str, &str, Vec<&str>)>]) -> VegaFusionDataset {
vegafusion-runtime/src/transform/pivot.rs (1)
128-137: Harden downcast for string arrays and improve error context.
Even after casting to Utf8, upstream can yield LargeUtf8 in some paths. Support both and add context to execute_plan errors.
-    let logical_plan = sorted_query.logical_plan().clone();
-    let result_table = config.plan_executor.execute_plan(logical_plan).await?;
-    let pivot_batch = result_table.to_record_batch()?;
-    let pivot_array = pivot_batch
-        .column_by_name(&tx.field)
-        .with_context(|| format!("No column named {}", tx.field))?;
-    let pivot_array = pivot_array
-        .as_any()
-        .downcast_ref::<StringArray>()
-        .with_context(|| "Failed to downcast pivot column to String")?;
+    let logical_plan = sorted_query.logical_plan().clone();
+    let result_table = config
+        .plan_executor
+        .execute_plan(logical_plan)
+        .await
+        .with_context(|| "Failed executing pivot value plan")?;
+    let pivot_batch = result_table.to_record_batch()?;
+    // Keep the column as a generic array; the Utf8/LargeUtf8 downcast
+    // happens in the collection step.
+    let pivot_any = pivot_batch
+        .column_by_name(&tx.field)
+        .with_context(|| format!("No column named {}", tx.field))?;
And simplify collection by using iterator:
-    let pivot_vec: Vec<_> = pivot_array
-        .iter()
-        .filter_map(|val| val.map(|s| s.to_string()))
-        .collect();
+    let pivot_vec: Vec<String> = if let Some(sa) = pivot_any.as_any().downcast_ref::<StringArray>() {
+        sa.iter().filter_map(|v| v.map(|s| s.to_string())).collect()
+    } else {
+        let la = pivot_any
+            .as_any()
+            .downcast_ref::<vegafusion_common::arrow::array::LargeStringArray>()
+            .with_context(|| "Pivot column is not Utf8/LargeUtf8")?;
+        la.iter().filter_map(|v| v.map(|s| s.to_string())).collect()
+    };
vegafusion-core/src/runtime/runtime.rs (3)
101-107: Replace unwraps with error propagation + context
These panics abort the process on planner/runtime errors.
-    let task_scope = plan.server_spec.to_task_scope().unwrap();
+    let task_scope = plan
+        .server_spec
+        .to_task_scope()
+        .with_context(|| "Failed to build task scope".to_string())?;
     let tasks = plan
         .server_spec
-        .to_tasks(&tz_config, &dataset_fingerprints)
-        .unwrap();
-    let task_graph = TaskGraph::new(tasks, &task_scope).unwrap();
+        .to_tasks(&tz_config, &dataset_fingerprints)
+        .with_context(|| "Failed to build tasks for server_spec".to_string())?;
+    let task_graph = TaskGraph::new(tasks, &task_scope)
+        .with_context(|| "Failed to construct TaskGraph".to_string())?;
340-346: Same: avoid unwraps when constructing task graph
Mirror the error handling above for pre_transform_values.
-    let task_scope = plan.server_spec.to_task_scope().unwrap();
+    let task_scope = plan
+        .server_spec
+        .to_task_scope()
+        .with_context(|| "Failed to build task scope".to_string())?;
     let tasks = plan
         .server_spec
         .to_tasks(&tz_config, &dataset_fingerprints)?;
-    let task_graph = TaskGraph::new(tasks, &task_scope).unwrap();
+    let task_graph = TaskGraph::new(tasks, &task_scope)
+        .with_context(|| "Failed to construct TaskGraph".to_string())?;
243-246: Nit: fix error message grammar
“an Table” → “a Table”.
-    "Expected Data TaskValue to be an Table",
+    "Expected Data TaskValue to be a Table",
vegafusion-core/src/task_graph/task_value.rs (1)
227-235: Replace infallible From<ResponseTaskValue> impl with a fallible TryFrom
- Implement TryFrom<ResponseTaskValue> returning Result<NamedTaskValue, VegaFusionError> and replace unwrap() calls with ok_or(...).
- Update all call sites using .into() on ResponseTaskValue to use .try_into()? or equivalent error handling.
vegafusion-python/src/lib.rs (1)
382-387: Fix format selector typo: "arro3"
The "arro3" branch is dead/useless; consolidate on "pyarrow".
-    "arro3" => {
-        let pytable = tbl.table.to_pyo3_arrow()?;
-        pytable.to_pyarrow(py)?.into()
-    }
     "pyarrow" => tbl.table.to_pyo3_arrow()?.to_pyarrow(py)?.into(),
vegafusion-runtime/src/data/tasks.rs (1)
650-668: Add HTTP client timeouts (and UA) to avoid hangs
External fetches use reqwest without timeouts; requests can hang indefinitely and tie up executors. Set reasonable timeouts and a UA.
-    let client = reqwest::Client::new();
+    let client = reqwest::Client::builder()
+        .timeout(std::time::Duration::from_secs(30))
+        .user_agent("vegafusion-runtime/vegafusion-data-fetch")
+        .build()?;

-    let client = reqwest::Client::new();
+    let client = reqwest::Client::builder()
+        .timeout(std::time::Duration::from_secs(30))
+        .user_agent("vegafusion-runtime/vegafusion-json-fetch")
+        .build()?;
Also applies to: 834-846
@coderabbitai resolve
Actionable comments posted: 2
♻️ Duplicate comments (6)
vegafusion-wasm/package.json (1)
3-3: Patch bump to 2.0.6: fine; confirm publish plan or mark private.
Semver patch is appropriate. Since WASM CI/publish steps were removed elsewhere in this PR, please confirm how npm publish will happen. If this package isn’t intended to publish with this release, mark it private and add repo metadata.
Apply if not publishing now:
   "name": "vegafusion-wasm",
   "version": "2.0.6",
+  "private": true,
+  "repository": {
+    "type": "git",
+    "url": "git+https://github.com/hex-inc/vegafusion.git"
+  },
Run to verify coordination (versions, CI, privacy):
#!/bin/bash
set -euo pipefail

echo "vegafusion-wasm/package.json version:"
jq -r '.version' vegafusion-wasm/package.json

echo -e "\nWorkspace package versions (Cargo.toml [package] sections):"
fd -a Cargo.toml | while read -r f; do
  awk '
    $0 ~ /^\[package\]/ { in_pkg=1; next }
    in_pkg && $0 ~ /^version\s*=/ { gsub(/"/,""); print FILENAME ": " $0; in_pkg=0 }
  ' "$f"
done | sed 's|^\./||'

echo -e "\nWorkflows mentioning wasm/npm publish:"
rg -n -C2 -g '.github/workflows/**/*.yml' -e 'wasm|wasm-pack|vegafusion-wasm|npm publish|pnpm publish|yarn publish' || true

echo -e "\nIs vegafusion-wasm private?"
jq -r 'if has("private") then .private else false end' vegafusion-wasm/package.json
vegafusion-python/Cargo.toml (1)
66-68: Disable DataFusion default features to cut build time.
You confirmed direct DF usage; trim features.
Apply:
 [dependencies.datafusion]
-workspace = true
+workspace = true
+default-features = false
+features = ["datasource"]
If additional DF modules are used, extend the feature list accordingly.
vegafusion-runtime/Cargo.toml (1)
157-160: Harden datafusion-sql dep: disable defaults.
Keeps DF crates lean.
Apply:
 [dependencies.datafusion-sql]
 workspace = true
-features = ["unparser"]
+features = ["unparser"]
+default-features = false
vegafusion-wasm/Cargo.toml (1)
27-31: Drop "unparser" in WASM to reduce size (if not needed).
Saves bytes; keep only what runtime needs.
Apply:
 [dependencies.datafusion-sql]
 version = "49.0.0"
 default-features = false
-features = ["unicode_expressions", "unparser"]
+features = ["unicode_expressions"]
vegafusion-python/pyproject.toml (1)
17-22: Pin arro3-core to a stable range.
Prevents surprise upgrades in users’ envs.
Apply:
 dependencies = [
-    "arro3-core",
+    "arro3-core>=0.6.1,<0.7.0",
     "packaging",
     "narwhals>=1.42",
     "datafusion>=49.0.0,<50.0.0",
 ]
vegafusion-server/Cargo.toml (1)
66-68: Remove duplicate serde_json in dev-deps.
Already present in deps; dev-dep is redundant.
Apply:
-[dev-dependencies.serde_json]
-workspace = true
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
💡 Knowledge Base configuration:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (2)
- Cargo.lock is excluded by !**/*.lock
- vegafusion-wasm/package-lock.json is excluded by !**/package-lock.json
📒 Files selected for processing (8)
- vegafusion-common/Cargo.toml (2 hunks)
- vegafusion-core/Cargo.toml (3 hunks)
- vegafusion-python/Cargo.toml (2 hunks)
- vegafusion-python/pyproject.toml (2 hunks)
- vegafusion-runtime/Cargo.toml (4 hunks)
- vegafusion-server/Cargo.toml (4 hunks)
- vegafusion-wasm/Cargo.toml (4 hunks)
- vegafusion-wasm/package.json (1 hunk)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-25T14:40:22.554Z
Learnt from: OlegWock
PR: deepnote/vegafusion#6
File: vegafusion-python/pyproject.toml:2-3
Timestamp: 2025-08-25T14:40:22.554Z
Learning: The deepnote_vegafusion package is intentionally named differently from its import path (vegafusion) to serve as a drop-in replacement for the original vegafusion package. Users install "deepnote_vegafusion" but import "vegafusion", allowing existing code to work without modification.
Applied to files:
- vegafusion-python/Cargo.toml
- vegafusion-common/Cargo.toml
- vegafusion-python/pyproject.toml
- vegafusion-server/Cargo.toml
- vegafusion-wasm/Cargo.toml
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: build-vegafusion-python-osx-arm64
- GitHub Check: build-vegafusion-python-osx-64
- GitHub Check: build-vegafusion-python-win-64
- GitHub Check: build-vegafusion-python-linux-64
- GitHub Check: build-vegafusion-python-linux-arm64
- GitHub Check: check-vegafusion-python
- GitHub Check: test-rust
🔇 Additional comments (6)
vegafusion-common/Cargo.toml (2)
3-3: Version bump looks good.
No functional risk here.
41-44: Consolidated datafusion-functions features: OK.
Matches typical DF-49 feature splits.
vegafusion-core/Cargo.toml (2)
45-47: Adding futures: sensible given async executors.
No issues spotted.
86-88: Clippy lints relocation is harmless.
Fine to keep here.
vegafusion-python/pyproject.toml (1)
2-3: Distribution name and import path: acknowledged.
Drop-in replacement strategy is intentional; no change needed.
vegafusion-server/Cargo.toml (1)
32-34: Keep futures dependency – it’s used in src/main.rs (try_join_all at line 88).
✅ Actions performed
Comments resolved and changes approved.
saltenasl
left a comment
Unblocking, but please do see the checks; there are some formatting issues: https://github.com/deepnote/vegafusion/actions/runs/17611108196/job/50033052367?pr=6
Summary by CodeRabbit
New Features
Documentation
Tests
Chores