diff --git a/examples/python-examples/custom_executor.py b/examples/python-examples/custom_executor.py deleted file mode 100644 index 5b59765a..00000000 --- a/examples/python-examples/custom_executor.py +++ /dev/null @@ -1,184 +0,0 @@ -import vegafusion as vf -import pyarrow as pa - - -def simple_logging_executor(logical_plan_json: str) -> pa.Table: - """A minimal executor that logs and throws unimplemented error.""" - # TODO: Update signature once we pass logical plan as proper structure instead of string - print("📋 Custom executor received logical plan:") - print(f" Plan length: {len(logical_plan_json)} characters") - print(f" Plan preview: {logical_plan_json[:100]}...") - - raise NotImplementedError("Custom executor is not implemented yet!") - - -# This example shows the simplest possible custom executor that logs -# received plans and throws an unimplemented error. -def main(): - """Demonstrate the minimal custom executor.""" - - spec = get_spec() - - schema = pa.schema( - [ - pa.field("Title", pa.string()), - pa.field("US Gross", pa.int64()), - pa.field("Worldwide Gross", pa.int64()), - pa.field("US DVD Sales", pa.int64()), - pa.field("Production Budget", pa.int64()), - pa.field("Release Date", pa.date32()), - pa.field("MPAA Rating", pa.string()), - pa.field("Running Time min", pa.int32()), - pa.field("Distributor", pa.string()), - pa.field("Source", pa.string()), - pa.field("Major Genre", pa.string()), - pa.field("Creative Type", pa.string()), - pa.field("Director", pa.string()), - pa.field("Rotten Tomatoes Rating", pa.int8()), - pa.field("IMDB Rating", pa.float32()), - pa.field("IMDB Votes", pa.int64()), - ] - ) - - print("Testing custom executor with VegaFusion...") - - # Create a DataFusion runtime that uses a custom Python executor - runtime = vf.VegaFusionRuntime( - executor=simple_logging_executor, - ) - - try: - runtime.pre_transform_spec( - spec=spec, - local_tz="UTC", - inline_datasets={"movies": schema}, - ) - except ValueError as e: - print(f"✅ Expected error caught: {e}") - - print("Done!") - - -def get_spec(): - """ - Based on https://vega.github.io/editor/#/examples/vega/histogram-null-values - """ - return { - "$schema": "https://vega.github.io/schema/vega/v5.json", - "description": "A histogram of film ratings, modified to include null values.", - "width": 400, - "height": 200, - "padding": 5, - "autosize": {"type": "fit", "resize": True}, - "signals": [ - { - "name": "maxbins", - "value": 10, - "bind": {"input": "select", "options": [5, 10, 20]}, - }, - {"name": "binCount", "update": "(bins.stop - bins.start) / bins.step"}, - {"name": "nullGap", "value": 10}, - {"name": "barStep", "update": "(width - nullGap) / (1 + binCount)"}, - ], - "data": [ - { - "name": "table", - "url": "vegafusion+dataset://movies", - "transform": [ - {"type": "extent", "field": "IMDB Rating", "signal": "extent"}, - { - "type": "bin", - "signal": "bins", - "field": "IMDB Rating", - "extent": {"signal": "extent"}, - "maxbins": {"signal": "maxbins"}, - }, - ], - }, - { - "name": "counts", - "source": "table", - "transform": [ - {"type": "filter", "expr": "datum['IMDB Rating'] != null"}, - {"type": "aggregate", "groupby": ["bin0", "bin1"]}, - ], - }, - { - "name": "nulls", - "source": "table", - "transform": [ - {"type": "filter", "expr": "datum['IMDB Rating'] == null"}, - {"type": "aggregate", "groupby": []}, - ], - }, - ], - "scales": [ - { - "name": "yscale", - "type": "linear", - "range": "height", - "round": True, - "nice": True, - "domain": { - "fields": [ - {"data": "counts", "field": "count"}, - {"data": "nulls", "field": "count"}, - ] - }, - }, - { - "name": "xscale", - "type": "linear", - "range": [{"signal": "barStep + nullGap"}, {"signal": "width"}], - "round": True, - "domain": {"signal": "[bins.start, bins.stop]"}, - "bins": {"signal": "bins"}, - }, - { - "name": "xscale-null", - "type": "band", - "range": [0, {"signal": "barStep"}], - "round": True, - "domain": [None], - }, - ], - "axes": [ - {"orient": "bottom", "scale": "xscale", "tickMinStep": 0.5}, - {"orient": "bottom", "scale": "xscale-null"}, - {"orient": "left", "scale": "yscale", "tickCount": 5, "offset": 5}, - ], - "marks": [ - { - "type": "rect", - "from": {"data": "counts"}, - "encode": { - "update": { - "x": {"scale": "xscale", "field": "bin0", "offset": 1}, - "x2": {"scale": "xscale", "field": "bin1"}, - "y": {"scale": "yscale", "field": "count"}, - "y2": {"scale": "yscale", "value": 0}, - "fill": {"value": "steelblue"}, - }, - "hover": {"fill": {"value": "firebrick"}}, - }, - }, - { - "type": "rect", - "from": {"data": "nulls"}, - "encode": { - "update": { - "x": {"scale": "xscale-null", "value": None, "offset": 1}, - "x2": {"scale": "xscale-null", "band": 1}, - "y": {"scale": "yscale", "field": "count"}, - "y2": {"scale": "yscale", "value": 0}, - "fill": {"value": "#aaa"}, - }, - "hover": {"fill": {"value": "firebrick"}}, - }, - }, - ], - } - - -if __name__ == "__main__": - main() diff --git a/examples/python-examples/pre_transform_logical_plan.py b/examples/python-examples/pre_transform_logical_plan.py deleted file mode 100644 index 439f0f80..00000000 --- a/examples/python-examples/pre_transform_logical_plan.py +++ /dev/null @@ -1,194 +0,0 @@ -import json -from typing import Any - -import vegafusion as vf -import pyarrow as pa - - -def main(): - spec = get_spec() - schema = pa.schema( - [ - pa.field("Title", pa.string()), - pa.field("US Gross", pa.int64()), - pa.field("Worldwide Gross", pa.int64()), - pa.field("US DVD Sales", pa.int64()), - pa.field("Production Budget", pa.int64()), - pa.field("Release Date", pa.date32()), - pa.field("MPAA Rating", pa.string()), - pa.field("Running Time min", pa.int32()), - pa.field("Distributor", pa.string()), - pa.field("Source", pa.string()), - pa.field("Major Genre", pa.string()), - pa.field("Creative Type", pa.string()), - pa.field("Director", pa.string()), - pa.field("Rotten Tomatoes Rating", pa.int8()), - pa.field("IMDB Rating", pa.float32()), - pa.field("IMDB Votes", pa.int64()), - ] - ) - transformed_spec, data, warnings = vf.runtime.pre_transform_logical_plan( - spec, - inline_datasets={"movies": schema}, - ) - for dataset in data: - print("Dataset", dataset["name"]) - print(dataset.get("logical_plan")) - print(dataset.get("data")) - print("=" * 50) - - -def get_spec() -> dict[str, Any]: - """ - Based on https://vega.github.io/editor/#/examples/vega/histogram-null-values - """ - spec_str = """ -{ - "$schema": "https://vega.github.io/schema/vega/v5.json", - "description": "A histogram of film ratings, modified to include null values.", - "width": 400, - "height": 200, - "padding": 5, - "autosize": {"type": "fit", "resize": true}, - - "signals": [ - { - "name": "maxbins", "value": 10 - }, - { - "name": "binCount", - "update": "(bins.stop - bins.start) / bins.step" - }, - { - "name": "nullGap", "value": 10 - }, - { - "name": "barStep", - "update": "(width - nullGap) / (1 + binCount)" - } - ], - - "data": [ - { - "name": "table", - "url": "vegafusion+dataset://movies", - "transform": [ - { - "type": "extent", "field": "IMDB Rating", - "signal": "extent" - }, - { - "type": "bin", "signal": "bins", - "field": "IMDB Rating", "extent": {"signal": "extent"}, - "maxbins": 10 - } - ] - }, - { - "name": "counts", - "source": "table", - "transform": [ - { - "type": "filter", - "expr": "datum['IMDB Rating'] != null" - }, - { - "type": "aggregate", - "groupby": ["bin0", "bin1"] - } - ] - }, - { - "name": "nulls", - "source": "table", - "transform": [ - { - "type": "filter", - "expr": "datum['IMDB Rating'] == null" - }, - { - "type": "aggregate", - "groupby": [] - } - ] - } - ], - - "scales": [ - { - "name": "yscale", - "type": "linear", - "range": "height", - "round": true, "nice": true, - "domain": { - "fields": [ - {"data": "counts", "field": "count"}, - {"data": "nulls", "field": "count"} - ] - } - }, - { - "name": "xscale", - "type": "linear", - "range": [{"signal": "barStep + nullGap"}, {"signal": "width"}], - "round": true, - "domain": {"signal": "[bins.start, bins.stop]"}, - "bins": {"signal": "bins"} - }, - { - "name": "xscale-null", - "type": "band", - "range": [0, {"signal": "barStep"}], - "round": true, - "domain": [null] - } - ], - - "axes": [ - {"orient": "bottom", "scale": "xscale", "tickMinStep": 0.5}, - {"orient": "bottom", "scale": "xscale-null"}, - {"orient": "left", "scale": "yscale", "tickCount": 5, "offset": 5} - ], - - "marks": [ - { - "type": "rect", - "from": {"data": "counts"}, - "encode": { - "update": { - "x": {"scale": "xscale", "field": "bin0", "offset": 1}, - "x2": {"scale": "xscale", "field": "bin1"}, - "y": {"scale": "yscale", "field": "count"}, - "y2": {"scale": "yscale", "value": 0}, - "fill": {"value": "steelblue"} - }, - "hover": { - "fill": {"value": "firebrick"} - } - } - }, - { - "type": "rect", - "from": {"data": "nulls"}, - "encode": { - "update": { - "x": {"scale": "xscale-null", "value": null, "offset": 1}, - "x2": {"scale": "xscale-null", "band": 1}, - "y": {"scale": "yscale", "field": "count"}, - "y2": {"scale": "yscale", "value": 0}, - "fill": {"value": "#aaa"} - }, - "hover": { - "fill": {"value": "firebrick"} - } - } - } - ] -} - - """ - return json.loads(spec_str) - - -if __name__ == "__main__": - main() diff --git a/vegafusion-core/src/proto/pretransform.proto b/vegafusion-core/src/proto/pretransform.proto index c82d3109..78b4a5fc 100644 --- a/vegafusion-core/src/proto/pretransform.proto +++ b/vegafusion-core/src/proto/pretransform.proto @@ -130,20 +130,8 @@ message ExportUpdate { tasks.TaskValue value = 4; } -message PreTransformLogicalPlanRequest { - string spec = 1; - repeated tasks.InlineDataset inline_datasets = 2; - PreTransformLogicalPlanOpts opts = 3; -} - message PreTransformLogicalPlanWarning { oneof warning_type { PlannerWarning planner = 1; } } - -message PreTransformLogicalPlanResponse { - string spec = 1; - repeated ExportUpdate export_updates = 2; - repeated PreTransformLogicalPlanWarning warnings = 3; -} \ No newline at end of file diff --git a/vegafusion-core/src/proto/services.proto b/vegafusion-core/src/proto/services.proto index 20d46176..20599ba5 100644 --- a/vegafusion-core/src/proto/services.proto +++ b/vegafusion-core/src/proto/services.proto @@ -10,7 +10,6 @@ service VegaFusionRuntime { rpc PreTransformSpec(pretransform.PreTransformSpecRequest) returns (PreTransformSpecResult) {} rpc PreTransformValues(pretransform.PreTransformValuesRequest) returns (PreTransformValuesResult) {} rpc PreTransformExtract(pretransform.PreTransformExtractRequest) returns (PreTransformExtractResult) {} - rpc PreTransformLogicalPlan(pretransform.PreTransformLogicalPlanRequest) returns (PreTransformLogicalPlanResult) {} } message QueryRequest { @@ -45,11 +44,4 @@ message PreTransformExtractResult { errors.Error error = 1; pretransform.PreTransformExtractResponse response = 2; } -} - -message PreTransformLogicalPlanResult { - oneof result { - errors.Error error = 1; - pretransform.PreTransformLogicalPlanResponse response = 2; - } } \ No newline at end of file diff --git a/vegafusion-python/src/executor.rs b/vegafusion-python/src/executor.rs deleted file mode 100644 index ed2ed7d4..00000000 --- a/vegafusion-python/src/executor.rs +++ /dev/null @@ -1,93 +0,0 @@ -use std::sync::Arc; - -use async_trait::async_trait; -use pyo3::exceptions::PyValueError; -use pyo3::prelude::*; -use pyo3::types::PyString; -use vegafusion_common::{data::table::VegaFusionTable, datafusion_expr::LogicalPlan}; -use vegafusion_core::runtime::PlanExecutor; - -pub struct PythonPlanExecutor { - python_executor: PyObject, -} - -impl PythonPlanExecutor { - fn new(python_executor: PyObject) -> Self { - Self { python_executor } - } -} - -#[async_trait] -impl PlanExecutor for PythonPlanExecutor { - async fn execute_plan( - &self, - plan: LogicalPlan, - ) -> vegafusion_common::error::Result { - let plan_str = plan.display_pg_json().to_string(); - - let python_executor = &self.python_executor; - let result = tokio::task::spawn_blocking({ - let python_executor = Python::with_gil(|py| python_executor.clone_ref(py)); - let plan_str = plan_str.clone(); - - move || { - Python::with_gil(|py| -> PyResult { - let plan_py = PyString::new(py, &plan_str); - - let table_result = if python_executor.bind(py).is_callable() { - python_executor.call1(py, (plan_py,)) - } else if python_executor.bind(py).hasattr("execute_plan")? { - let execute_plan_method = - python_executor.bind(py).getattr("execute_plan")?; - execute_plan_method - .call1((plan_py,)) - .map(|result| result.into()) - } else { - return Err(PyValueError::new_err( - "Executor must be callable or have an execute_plan method", - )); - }?; - - VegaFusionTable::from_pyarrow(py, &table_result.bind(py)) - }) - } - }) - .await; - - match result { - Ok(Ok(table)) => Ok(table), - Ok(Err(py_err)) => Err(vegafusion_common::error::VegaFusionError::internal( - format!("Python executor error: {}", py_err), - )), - Err(join_err) => Err(vegafusion_common::error::VegaFusionError::internal( - format!("Failed to execute Python executor: {}", join_err), - )), - } - } -} - -/// Helper function to convert a Python object to a PlanExecutor -/// Accepts either: -/// - A callable that takes a logical plan string and returns an Arrow table -/// - An object with execute_plan method that has the same signature -pub fn python_object_to_executor( - python_obj: Option, -) -> PyResult>> { - match python_obj { - Some(obj) => { - Python::with_gil(|py| -> PyResult>> { - let obj_ref = obj.bind(py); - - // Validate that the object is either callable or has execute_plan method - if obj_ref.is_callable() || obj_ref.hasattr("execute_plan")? { - Ok(Some(Arc::new(PythonPlanExecutor::new(obj)))) - } else { - Err(PyValueError::new_err( - "Executor must be callable or have an execute_plan method", - )) - } - }) - } - None => Ok(None), - } -} diff --git a/vegafusion-python/src/lib.rs b/vegafusion-python/src/lib.rs index 217711a7..be08d07b 100644 --- a/vegafusion-python/src/lib.rs +++ b/vegafusion-python/src/lib.rs @@ -1,12 +1,11 @@ mod chart_state; -mod executor; mod utils; mod vendor; use lazy_static::lazy_static; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; -use pyo3::types::{PyBytes, PyDict, PyList, PyString, PyTuple}; +use pyo3::types::{PyBytes, PyDict, PyList, PyTuple}; use std::str::FromStr; use std::sync::{Arc, Once}; use tokio::runtime::Runtime; @@ -14,11 +13,9 @@ use tonic::transport::{Channel, Uri}; use vegafusion_core::error::{ToExternalError, VegaFusionError}; use vegafusion_core::proto::gen::pretransform::pre_transform_extract_warning::WarningType as ExtractWarningType; -use vegafusion_core::proto::gen::pretransform::pre_transform_logical_plan_warning::WarningType as LogicalPlanWarningType; use vegafusion_core::proto::gen::pretransform::pre_transform_values_warning::WarningType as ValueWarningType; use vegafusion_core::proto::gen::pretransform::{ - PreTransformExtractOpts, PreTransformLogicalPlanOpts, PreTransformSpecOpts, - PreTransformValuesOpts, PreTransformVariable, + PreTransformExtractOpts, PreTransformSpecOpts, PreTransformValuesOpts, PreTransformVariable, }; use vegafusion_core::proto::gen::tasks::{TzConfig, Variable}; use vegafusion_runtime::task_graph::GrpcVegaFusionRuntime; @@ -32,14 +29,12 @@ use vegafusion_core::planning::projection_pushdown::get_column_usage as rs_get_c use vegafusion_core::planning::watch::WatchPlan; use vegafusion_core::task_graph::graph::ScopedVariable; -use vegafusion_core::task_graph::task_value::{MaterializedTaskValue, TaskValue}; +use vegafusion_core::task_graph::task_value::MaterializedTaskValue; use vegafusion_runtime::tokio_runtime::TOKIO_THREAD_STACK_SIZE; use vegafusion_core::runtime::{PlanExecutor, VegaFusionRuntimeTrait}; use vegafusion_runtime::task_graph::cache::VegaFusionCache; -use vegafusion_common::data::scalar::ScalarValueHelpers; - use crate::chart_state::PyChartState; use crate::utils::{parse_json_spec, process_inline_datasets}; use crate::vendor::select_executor_for_vendor; @@ -98,15 +93,13 @@ impl PyVegaFusionRuntime { #[pymethods] impl PyVegaFusionRuntime { #[staticmethod] - #[pyo3(signature = (max_capacity=None, memory_limit=None, worker_threads=None, executor=None))] + #[pyo3(signature = (max_capacity=None, memory_limit=None, worker_threads=None))] pub fn new_embedded( max_capacity: Option, memory_limit: Option, worker_threads: Option, - executor: Option, ) -> PyResult { - let rust_executor = select_executor_for_vendor(None, executor)?; - Self::build_with_executor(max_capacity, memory_limit, worker_threads, rust_executor) + Self::build_with_executor(max_capacity, memory_limit, worker_threads, None) } #[staticmethod] @@ -408,106 +401,6 @@ impl PyVegaFusionRuntime { }) } - #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (spec, local_tz, default_input_tz=None, preserve_interactivity=None, inline_datasets=None, keep_signals=None, keep_datasets=None))] - pub fn pre_transform_logical_plan( - &self, - py: Python, - spec: PyObject, - local_tz: String, - default_input_tz: Option, - preserve_interactivity: Option, - inline_datasets: Option<&Bound>, - keep_signals: Option)>>, - keep_datasets: Option)>>, - ) -> PyResult<(PyObject, PyObject, PyObject)> { - let spec = parse_json_spec(spec)?; - let preserve_interactivity = preserve_interactivity.unwrap_or(true); - - let inline_datasets = process_inline_datasets(inline_datasets)?; - - let mut keep_variables: Vec = Vec::new(); - for (name, scope) in keep_signals.unwrap_or_default() { - keep_variables.push(PreTransformVariable { - variable: Some(Variable::new_signal(&name)), - scope, - }); - } - for (name, scope) in keep_datasets.unwrap_or_default() { - keep_variables.push(PreTransformVariable { - variable: Some(Variable::new_data(&name)), - scope, - }); - } - - let (client_spec, export_updates, warnings) = py.allow_threads(|| { - self.tokio_runtime - .block_on(self.runtime.pre_transform_logical_plan( - &spec, - inline_datasets, - &PreTransformLogicalPlanOpts { - local_tz, - default_input_tz, - preserve_interactivity, - keep_variables, - }, - )) - })?; - - Python::with_gil(|py| -> PyResult<(PyObject, PyObject, PyObject)> { - let py_spec = pythonize::pythonize(py, &client_spec)?; - - let py_export_list = PyList::empty(py); - for export_update in export_updates { - let py_export_dict = PyDict::new(py); - py_export_dict.set_item( - "namespace", - pythonize::pythonize(py, &export_update.namespace)?, - )?; - py_export_dict.set_item("name", export_update.name)?; - - match export_update.value { - TaskValue::Plan(plan) => { - // TODO: we probably want more flexible serialization format than pg_json, but protobuf - // fails with our memtable, we're likely need to provide custom serializator - // and deserializator for it? Or it's a bug in DataFusion - let lp_str = plan.display_pg_json().to_string(); - py_export_dict.set_item("logical_plan", PyString::new(py, &lp_str))?; - } - TaskValue::Table(table) => { - // Convert table to PyArrow - let pytable = table.to_pyo3_arrow()?.to_pyarrow(py)?; - py_export_dict.set_item("data", pytable)?; - } - TaskValue::Scalar(scalar) => { - // Convert scalar to JSON value - let json_value = scalar.to_json()?; - let py_json_value = pythonize::pythonize(py, &json_value)?; - py_export_dict.set_item("data", py_json_value)?; - } - } - - py_export_list.append(py_export_dict)?; - } - - let warnings: Vec<_> = warnings - .iter() - .map(|warning| match warning.warning_type.as_ref().unwrap() { - LogicalPlanWarningType::Planner(planner_warning) => { - PreTransformSpecWarningSpec { - typ: "Planner".to_string(), - message: planner_warning.message.clone(), - } - } - }) - .collect(); - - let py_warnings = pythonize::pythonize(py, &warnings)?; - - Ok((py_spec.into(), py_export_list.into(), py_warnings.into())) - }) - } - pub fn clear_cache(&self) -> PyResult<()> { if let Some(runtime) = self.runtime.as_any().downcast_ref::() { self.tokio_runtime.block_on(runtime.clear_cache()); diff --git a/vegafusion-python/src/vendor.rs b/vegafusion-python/src/vendor.rs index 2feb8d52..f3728d66 100644 --- a/vegafusion-python/src/vendor.rs +++ b/vegafusion-python/src/vendor.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use crate::executor::python_object_to_executor; use async_trait::async_trait; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; @@ -50,8 +49,15 @@ pub fn select_executor_for_vendor( Ok(Some(Arc::new(SparkSqlPlanExecutor::new(py_exec)))) } Some("datafusion") | Some("") | None => { - // Fall back to default DataFusion if no executor; otherwise wrap Python executor - python_object_to_executor(executor) + // For DataFusion we don't support passing custom executor from Python (due to issues with + // serialization and deserialization of LogicalPlan). We always use default runtime executor + // by passing None + if executor.is_some() { + return Err(PyValueError::new_err( + "Custom executors are not supported for the default DataFusion runtime. Remove executor parameter or use different vendor.", + )); + } + Ok(None) } Some(other) => Err(PyValueError::new_err(format!( "Unsupported vendor: '{}'. Supported vendors: 'datafusion', 'sparksql'", diff --git a/vegafusion-python/vegafusion/runtime.py b/vegafusion-python/vegafusion/runtime.py index 03c4d1e5..760561aa 100644 --- a/vegafusion-python/vegafusion/runtime.py +++ b/vegafusion-python/vegafusion/runtime.py @@ -207,7 +207,6 @@ def __init__( cache_capacity: int = 64, memory_limit: int | None = None, worker_threads: int | None = None, - executor: PlanExecutor | None = None, ) -> None: """ Initialize a VegaFusionRuntime. @@ -216,19 +215,12 @@ def __init__( cache_capacity: Cache capacity. memory_limit: Memory limit. worker_threads: Number of worker threads. - executor: Optional custom executor for logical plan execution. - Can be either: - - A callable that takes a logical plan JSON string and - returns a PyArrow Table - - An object with an execute_plan method that has the same signature - - None to use the default DataFusion executor """ self._runtime = None self._grpc_url: str | None = None self._cache_capacity = cache_capacity self._memory_limit = memory_limit self._worker_threads = worker_threads - self._executor = executor @property def runtime(self) -> PyVegaFusionRuntime: @@ -251,7 +243,6 @@ def runtime(self) -> PyVegaFusionRuntime: self.cache_capacity, self.memory_limit, self.worker_threads, - self._executor, ) return self._runtime @@ -266,7 +257,7 @@ def new_vendor( ) -> VegaFusionRuntime: from vegafusion._vegafusion import PyVegaFusionRuntime - inst = cls(cache_capacity, memory_limit, worker_threads, executor=None) + inst = cls(cache_capacity, memory_limit, worker_threads) if inst.memory_limit is None: inst.memory_limit = get_virtual_memory() // 2 if inst.worker_threads is None: @@ -795,88 +786,6 @@ def pre_transform_extract( return new_spec, datasets, warnings - def pre_transform_logical_plan( - self, - spec: dict[str, Any] | str, - local_tz: str | None = None, - default_input_tz: str | None = None, - preserve_interactivity: bool = True, - inline_datasets: dict[str, Any] | None = None, - keep_signals: list[str | tuple[str, list[int]]] | None = None, - keep_datasets: list[str | tuple[str, list[int]]] | None = None, - ) -> tuple[dict[str, Any], list[dict[str, Any]], list[PreTransformWarning]]: - """ - Evaluate supported transforms in an input Vega specification into - logical plans. - - This method is similar to pre_transform_datasets, but for inline - datasets defined as schema, it will return logical plan instead - or applying transformations directly on data. - - Args: - spec: A Vega specification dict or JSON string. - local_tz: Name of timezone to be considered local. E.g. 'America/New_York'. - Defaults to the value of vf.get_local_tz(), which defaults to the system - timezone if one can be determined. - default_input_tz: Name of timezone (e.g. 'America/New_York') that naive - datetime strings should be interpreted in. Defaults to `local_tz`. - preserve_interactivity: If True (default) then the interactive behavior of - the chart will be preserved. This requires that all the data that - participates in interactions be included in the resulting spec rather - than being pre-transformed. If False, then all possible data - transformations are applied even if they break the original interactive - behavior of the chart. - inline_datasets: A dict from dataset names to pandas DataFrames, pyarrow - Tables, or pyarrow Schemas. Inline datasets may be referenced by the - input specification using the following url syntax - 'vegafusion+dataset://{dataset_name}' or 'table://{dataset_name}'. - - keep_signals: Signals from the input spec that must be included in the - pre-transformed spec, even if they are no longer referenced. - A list with elements that are either: - - * The name of a top-level signal as a string - * A two-element tuple where the first element is the name of a signal - as a string and the second element is the nested scope of the dataset - as a list of integers - keep_datasets: Datasets from the input spec that must be included in the - pre-transformed spec even if they are no longer referenced. - A list with elements that are either: - - * The name of a top-level dataset as a string - * A two-element tuple where the first element is the name of a dataset - as a string and the second element is the nested scope of the dataset - as a list of integers - - Returns: - tuple[dict[str, Any], list[dict[str, Any]], list[PreTransformWarning]]: - Three-element tuple of - - * The Vega specification as a dict with pre-transformed datasets - included but left empty. - * Export updates as a list of dictionaries with keys: - * `"name"`: dataset name - * `"namespace"`: where this dataset belongs, either `"data"` or `"signal"` - * `"logical_plan"`: json representation of LogicalPlan (when applicable) - * `"data"`: materialized data (when applicable) - * A list of warnings as dictionaries. Each warning dict has a ``'type'`` - key indicating the warning type, and a ``'message'`` key containing - a description of the warning. - """ - local_tz = local_tz or get_local_tz() - - new_spec, export_updates, warnings = self.runtime.pre_transform_logical_plan( - spec, - local_tz, - default_input_tz=default_input_tz, - preserve_interactivity=preserve_interactivity, - inline_datasets=inline_datasets or {}, - keep_signals=parse_variables(keep_signals), - keep_datasets=parse_variables(keep_datasets), - ) - - return new_spec, export_updates, warnings - @property def worker_threads(self) -> int | None: """ diff --git a/vegafusion-server/src/main.rs b/vegafusion-server/src/main.rs index c615da75..fd05b5bf 100644 --- a/vegafusion-server/src/main.rs +++ b/vegafusion-server/src/main.rs @@ -8,10 +8,9 @@ use vegafusion_core::proto::gen::services::vega_fusion_runtime_server::{ VegaFusionRuntimeServer as TonicVegaFusionRuntimeServer, }; use vegafusion_core::proto::gen::services::{ - pre_transform_extract_result, pre_transform_logical_plan_result, pre_transform_spec_result, - pre_transform_values_result, query_request, query_result, PreTransformExtractResult, - PreTransformLogicalPlanResult, PreTransformSpecResult, PreTransformValuesResult, QueryRequest, - QueryResult, + pre_transform_extract_result, pre_transform_spec_result, pre_transform_values_result, + query_request, query_result, PreTransformExtractResult, PreTransformSpecResult, + PreTransformValuesResult, QueryRequest, QueryResult, }; use vegafusion_core::proto::gen::tasks::TaskGraphValueResponse; use vegafusion_core::proto::gen::tasks::{ @@ -25,11 +24,9 @@ use vegafusion_runtime::task_graph::runtime::{decode_inline_datasets, VegaFusion use clap::Parser; use regex::Regex; use vegafusion_core::proto::gen::pretransform::{ - ExportUpdate, PreTransformExtractDataset, PreTransformExtractRequest, - PreTransformExtractResponse, PreTransformLogicalPlanOpts, PreTransformLogicalPlanRequest, - PreTransformLogicalPlanResponse, PreTransformSpecOpts, PreTransformSpecRequest, - PreTransformSpecResponse, PreTransformValuesOpts, PreTransformValuesRequest, - PreTransformValuesResponse, + PreTransformExtractDataset, PreTransformExtractRequest, PreTransformExtractResponse, + PreTransformSpecOpts, PreTransformSpecRequest, PreTransformSpecResponse, + PreTransformValuesOpts, PreTransformValuesRequest, PreTransformValuesResponse, }; use vegafusion_runtime::task_graph::cache::VegaFusionCache; use vegafusion_runtime::tokio_runtime::TOKIO_THREAD_STACK_SIZE; @@ -268,63 +265,6 @@ impl VegaFusionRuntimeGrpc { Ok(result) } - - async fn pre_transform_logical_plan_request( - &self, - request: PreTransformLogicalPlanRequest, - ) -> Result { - let opts = request.opts.unwrap_or_else(|| PreTransformLogicalPlanOpts { - local_tz: "UTC".to_string(), - default_input_tz: None, - preserve_interactivity: true, - keep_variables: vec![], - }); - - let inline_datasets = - decode_inline_datasets(request.inline_datasets, self.runtime.ctx.as_ref()).await?; - - let spec: ChartSpec = serde_json::from_str(&request.spec)?; - - let (transformed_spec, export_updates, warnings) = self - .runtime - .pre_transform_logical_plan(&spec, inline_datasets, &opts) - .await?; - - let proto_export_updates = export_updates - .into_iter() - .map(|export_update| { - let namespace = match export_update.namespace { - vegafusion_core::planning::watch::ExportUpdateNamespace::Signal => { - "signal".to_string() - } - vegafusion_core::planning::watch::ExportUpdateNamespace::Data => { - "data".to_string() - } - }; - Ok(ExportUpdate { - namespace, - name: export_update.name, - scope: export_update.scope, - value: Some(vegafusion_core::proto::gen::tasks::TaskValue::try_from( - &export_update.value, - )?), - }) - }) - .collect::, VegaFusionError>>()?; - - let response = PreTransformLogicalPlanResult { - result: Some(pre_transform_logical_plan_result::Result::Response( - PreTransformLogicalPlanResponse { - spec: serde_json::to_string(&transformed_spec) - .with_context(|| "Failed to convert chart spec to string")?, - export_updates: proto_export_updates, - warnings, - }, - )), - }; - - Ok(response) - } } #[tonic::async_trait] @@ -376,19 +316,6 @@ impl TonicVegaFusionRuntime for VegaFusionRuntimeGrpc { Err(err) => Err(Status::unknown(err.to_string())), } } - - async fn pre_transform_logical_plan( - &self, - request: Request, - ) -> Result, Status> { - let result = self - .pre_transform_logical_plan_request(request.into_inner()) - .await; - match result { - Ok(result) => Ok(Response::new(result)), - Err(err) => Err(Status::unknown(err.to_string())), - } - } } /// VegaFusion Server