diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index f82cb6760404..d365ae55904a 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -174,6 +174,12 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { if (rel.hasCommon && rel.getCommon.hasPlanId) { plan.setTagValue(LogicalPlan.PLAN_ID_TAG, rel.getCommon.getPlanId) + // scalastyle:off println + + println() + println("Planner get a plan:") + println(s"$plan") + println() } plan } diff --git a/python/pyspark/pandas/tests/connect/computation/test_parity_combine.py b/python/pyspark/pandas/tests/connect/computation/test_parity_combine.py index 175404ff750c..3d7f86228191 100644 --- a/python/pyspark/pandas/tests/connect/computation/test_parity_combine.py +++ b/python/pyspark/pandas/tests/connect/computation/test_parity_combine.py @@ -27,12 +27,6 @@ class FrameParityCombineTests(FrameCombineMixin, PandasOnSparkTestUtils, ReusedC def psdf(self): return ps.from_pandas(self.pdf) - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_append(self): - super().test_append() - if __name__ == "__main__": from pyspark.pandas.tests.connect.computation.test_parity_combine import * # noqa: F401 diff --git a/python/pyspark/pandas/tests/connect/computation/test_parity_compute.py b/python/pyspark/pandas/tests/connect/computation/test_parity_compute.py index 88eeb735d464..e2b92190b6e2 100644 --- a/python/pyspark/pandas/tests/connect/computation/test_parity_compute.py +++ b/python/pyspark/pandas/tests/connect/computation/test_parity_compute.py @@ -27,22 +27,10 @@ class FrameParityComputeTests(FrameComputeMixin, PandasOnSparkTestUtils, ReusedC def psdf(self): return ps.from_pandas(self.pdf) - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_diff(self): - super().test_diff() - @unittest.skip("Spark Connect does not support RDD but the tests depend on them.") def test_mode(self): super().test_mode() - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_pct_change(self): - super().test_pct_change() - @unittest.skip("TODO(SPARK-43618): Fix pyspark.sq.column._unary_op to work with Spark Connect.") def test_rank(self): super().test_rank() diff --git a/python/pyspark/pandas/tests/connect/computation/test_parity_cumulative.py b/python/pyspark/pandas/tests/connect/computation/test_parity_cumulative.py index 8015d90aaa5b..e14d296749c0 100644 --- a/python/pyspark/pandas/tests/connect/computation/test_parity_cumulative.py +++ b/python/pyspark/pandas/tests/connect/computation/test_parity_cumulative.py @@ -29,54 +29,6 @@ class FrameParityCumulativeTests( def psdf(self): return ps.from_pandas(self.pdf) - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cummax(self): - super().test_cummax() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cummax_multiindex_columns(self): - super().test_cummax_multiindex_columns() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cummin(self): - super().test_cummin() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cummin_multiindex_columns(self): - super().test_cummin_multiindex_columns() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cumprod(self): - super().test_cumprod() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cumprod_multiindex_columns(self): - super().test_cumprod_multiindex_columns() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cumsum(self): - super().test_cumsum() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cumsum_multiindex_columns(self): - super().test_cumsum_multiindex_columns() - if __name__ == "__main__": from pyspark.pandas.tests.connect.computation.test_parity_cumulative import * # noqa: F401 diff --git a/python/pyspark/pandas/tests/connect/computation/test_parity_missing_data.py b/python/pyspark/pandas/tests/connect/computation/test_parity_missing_data.py index a88c8692eca4..d2ff09e5e8a5 100644 --- a/python/pyspark/pandas/tests/connect/computation/test_parity_missing_data.py +++ b/python/pyspark/pandas/tests/connect/computation/test_parity_missing_data.py @@ -29,36 +29,6 @@ class FrameParityMissingDataTests( def psdf(self): return ps.from_pandas(self.pdf) - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_backfill(self): - super().test_backfill() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_bfill(self): - super().test_bfill() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_ffill(self): - super().test_ffill() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_fillna(self): - return super().test_fillna() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_pad(self): - super().test_pad() - if __name__ == "__main__": from pyspark.pandas.tests.connect.computation.test_parity_missing_data import * # noqa: F401 diff --git a/python/pyspark/pandas/tests/connect/computation/test_parity_pivot.py b/python/pyspark/pandas/tests/connect/computation/test_parity_pivot.py index d2c4f9ae6071..0d2adba0295d 100644 --- a/python/pyspark/pandas/tests/connect/computation/test_parity_pivot.py +++ b/python/pyspark/pandas/tests/connect/computation/test_parity_pivot.py @@ -27,18 +27,6 @@ class FrameParityPivotTests(FramePivotMixin, PandasOnSparkTestUtils, ReusedConne def psdf(self): return ps.from_pandas(self.pdf) - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_pivot_table(self): - super().test_pivot_table() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_pivot_table_dtypes(self): - super().test_pivot_table_dtypes() - if __name__ == "__main__": from pyspark.pandas.tests.connect.computation.test_parity_pivot import * # noqa: F401 diff --git a/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_basic_slow.py b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_basic_slow.py index 926caf569796..ef84a8931d30 100644 --- a/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_basic_slow.py +++ b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_basic_slow.py @@ -24,23 +24,7 @@ class DiffFramesParityBasicSlowTests( DiffFramesBasicSlowMixin, PandasOnSparkTestUtils, ReusedConnectTestCase ): - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_diff(self): - super().test_diff() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_rank(self): - super().test_rank() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_shift(self): - super().test_shift() + pass if __name__ == "__main__": diff --git a/python/pyspark/pandas/tests/connect/frame/test_parity_reshaping.py b/python/pyspark/pandas/tests/connect/frame/test_parity_reshaping.py index 98ebf3ca44a0..72be826d0d80 100644 --- a/python/pyspark/pandas/tests/connect/frame/test_parity_reshaping.py +++ b/python/pyspark/pandas/tests/connect/frame/test_parity_reshaping.py @@ -27,12 +27,6 @@ class FrameParityReshapingTests(FrameReshapingMixin, PandasOnSparkTestUtils, Reu def psdf(self): return ps.from_pandas(self.pdf) - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_transpose(self): - super().test_transpose() - if __name__ == "__main__": from pyspark.pandas.tests.connect.frame.test_parity_reshaping import * # noqa: F401 diff --git a/python/pyspark/pandas/tests/connect/frame/test_parity_time_series.py b/python/pyspark/pandas/tests/connect/frame/test_parity_time_series.py index ae289edbc858..6b8a93f895e6 100644 --- a/python/pyspark/pandas/tests/connect/frame/test_parity_time_series.py +++ b/python/pyspark/pandas/tests/connect/frame/test_parity_time_series.py @@ -29,12 +29,6 @@ class FrameParityTimeSeriesTests( def psdf(self): return ps.from_pandas(self.pdf) - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_shift(self): - super().test_shift() - if __name__ == "__main__": from pyspark.pandas.tests.connect.frame.test_parity_time_series import * # noqa: F401 diff --git a/python/pyspark/pandas/tests/connect/groupby/test_parity_cumulative.py b/python/pyspark/pandas/tests/connect/groupby/test_parity_cumulative.py index aaa799bc9964..696c283b648a 100644 --- a/python/pyspark/pandas/tests/connect/groupby/test_parity_cumulative.py +++ b/python/pyspark/pandas/tests/connect/groupby/test_parity_cumulative.py @@ -24,35 +24,7 @@ class GroupbyParityCumulativeTests( GroupbyCumulativeMixin, PandasOnSparkTestUtils, ReusedConnectTestCase ): - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cumcount(self): - super().test_cumcount() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cummax(self): - super().test_cummax() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cummin(self): - super().test_cummin() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cumprod(self): - super().test_cumprod() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cumsum(self): - super().test_cumsum() + pass if __name__ == "__main__": diff --git a/python/pyspark/pandas/tests/connect/groupby/test_parity_groupby.py b/python/pyspark/pandas/tests/connect/groupby/test_parity_groupby.py index 4e9f5108fd95..8293652b476d 100644 --- a/python/pyspark/pandas/tests/connect/groupby/test_parity_groupby.py +++ b/python/pyspark/pandas/tests/connect/groupby/test_parity_groupby.py @@ -24,23 +24,7 @@ class GroupByParityTests( GroupByTestsMixin, PandasOnSparkTestUtils, TestUtils, ReusedConnectTestCase ): - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_shift(self): - super().test_shift() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_diff(self): - super().test_diff() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_rank(self): - super().test_rank() + pass if __name__ == "__main__": diff --git a/python/pyspark/pandas/tests/connect/groupby/test_parity_missing_data.py b/python/pyspark/pandas/tests/connect/groupby/test_parity_missing_data.py index 1ca101ef5452..752e8568fbd5 100644 --- a/python/pyspark/pandas/tests/connect/groupby/test_parity_missing_data.py +++ b/python/pyspark/pandas/tests/connect/groupby/test_parity_missing_data.py @@ -24,23 +24,7 @@ class GroupbyParityMissingDataTests( GroupbyMissingDataMixin, PandasOnSparkTestUtils, ReusedConnectTestCase ): - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_bfill(self): - super().test_bfill() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_ffill(self): - super().test_ffill() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_fillna(self): - super().test_fillna() + pass if __name__ == "__main__": diff --git a/python/pyspark/pandas/tests/connect/indexes/test_parity_base.py b/python/pyspark/pandas/tests/connect/indexes/test_parity_base.py index b1e185389f32..3cf4dc9b3d22 100644 --- a/python/pyspark/pandas/tests/connect/indexes/test_parity_base.py +++ b/python/pyspark/pandas/tests/connect/indexes/test_parity_base.py @@ -29,18 +29,6 @@ class IndexesParityTests( def psdf(self): return ps.from_pandas(self.pdf) - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_append(self): - super().test_append() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_monotonic(self): - super().test_monotonic() - @unittest.skip("TODO(SPARK-43620): Support `Column` for SparkConnectColumn.__getitem__.") def test_factorize(self): super().test_factorize() diff --git a/python/pyspark/pandas/tests/connect/indexes/test_parity_reset_index.py b/python/pyspark/pandas/tests/connect/indexes/test_parity_reset_index.py index 6647d76735ba..c19460946d15 100644 --- a/python/pyspark/pandas/tests/connect/indexes/test_parity_reset_index.py +++ b/python/pyspark/pandas/tests/connect/indexes/test_parity_reset_index.py @@ -29,12 +29,6 @@ class FrameParityResetIndexTests( def psdf(self): return ps.from_pandas(self.pdf) - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_reset_index_with_default_index_types(self): - super().test_reset_index_with_default_index_types() - if __name__ == "__main__": from pyspark.pandas.tests.connect.indexes.test_parity_reset_index import * # noqa: F401 diff --git a/python/pyspark/pandas/tests/connect/series/test_parity_arg_ops.py b/python/pyspark/pandas/tests/connect/series/test_parity_arg_ops.py index b3df55cb68e9..bd17521dd840 100644 --- a/python/pyspark/pandas/tests/connect/series/test_parity_arg_ops.py +++ b/python/pyspark/pandas/tests/connect/series/test_parity_arg_ops.py @@ -22,11 +22,7 @@ class SeriesParityArgOpsTests(SeriesArgOpsMixin, PandasOnSparkTestUtils, ReusedConnectTestCase): - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_argsort(self): - super().test_argsort() + pass if __name__ == "__main__": diff --git a/python/pyspark/pandas/tests/connect/series/test_parity_compute.py b/python/pyspark/pandas/tests/connect/series/test_parity_compute.py index 00e35b27e8fe..65dfedf55bfe 100644 --- a/python/pyspark/pandas/tests/connect/series/test_parity_compute.py +++ b/python/pyspark/pandas/tests/connect/series/test_parity_compute.py @@ -22,28 +22,10 @@ class SeriesParityComputeTests(SeriesComputeMixin, PandasOnSparkTestUtils, ReusedConnectTestCase): - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_diff(self): - super().test_diff() - @unittest.skip("TODO(SPARK-43620): Support `Column` for SparkConnectColumn.__getitem__.") def test_factorize(self): super().test_factorize() - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_shift(self): - super().test_shift() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_unstack(self): - super().test_unstack() - if __name__ == "__main__": from pyspark.pandas.tests.connect.series.test_parity_compute import * # noqa: F401 diff --git a/python/pyspark/pandas/tests/connect/series/test_parity_cumulative.py b/python/pyspark/pandas/tests/connect/series/test_parity_cumulative.py index f7cd03e057ad..a4978dca6faa 100644 --- a/python/pyspark/pandas/tests/connect/series/test_parity_cumulative.py +++ b/python/pyspark/pandas/tests/connect/series/test_parity_cumulative.py @@ -24,29 +24,7 @@ class SeriesParityCumulativeTests( SeriesCumulativeMixin, PandasOnSparkTestUtils, ReusedConnectTestCase ): - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cummax(self): - super().test_cummax() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cummin(self): - super().test_cummin() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cumprod(self): - super().test_cumprod() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cumsum(self): - super().test_cumsum() + pass if __name__ == "__main__": diff --git a/python/pyspark/pandas/tests/connect/series/test_parity_index.py b/python/pyspark/pandas/tests/connect/series/test_parity_index.py index 81da3e44d6db..06b8051e2561 100644 --- a/python/pyspark/pandas/tests/connect/series/test_parity_index.py +++ b/python/pyspark/pandas/tests/connect/series/test_parity_index.py @@ -22,11 +22,7 @@ class SeriesParityIndexTests(SeriesIndexMixin, PandasOnSparkTestUtils, ReusedConnectTestCase): - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_reset_index_with_default_index_types(self): - super().test_reset_index_with_default_index_types() + pass if __name__ == "__main__": diff --git a/python/pyspark/pandas/tests/connect/series/test_parity_missing_data.py b/python/pyspark/pandas/tests/connect/series/test_parity_missing_data.py index e648173289cb..e756b2e887d4 100644 --- a/python/pyspark/pandas/tests/connect/series/test_parity_missing_data.py +++ b/python/pyspark/pandas/tests/connect/series/test_parity_missing_data.py @@ -24,41 +24,7 @@ class SeriesParityMissingDataTests( SeriesMissingDataMixin, PandasOnSparkTestUtils, ReusedConnectTestCase ): - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_backfill(self): - super().test_backfill() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_bfill(self): - super().test_bfill() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_ffill(self): - super().test_ffill() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_fillna(self): - super().test_fillna() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_pad(self): - super().test_pad() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_replace(self): - super().test_replace() + pass if __name__ == "__main__": diff --git a/python/pyspark/pandas/tests/connect/series/test_parity_stat.py b/python/pyspark/pandas/tests/connect/series/test_parity_stat.py index 17e83fa3b479..d9ec3f4addf4 100644 --- a/python/pyspark/pandas/tests/connect/series/test_parity_stat.py +++ b/python/pyspark/pandas/tests/connect/series/test_parity_stat.py @@ -22,12 +22,6 @@ class SeriesParityStatTests(SeriesStatMixin, PandasOnSparkTestUtils, ReusedConnectTestCase): - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_pct_change(self): - super().test_pct_change() - @unittest.skip("TODO(SPARK-43618): Fix pyspark.sq.column._unary_op to work with Spark Connect.") def test_rank(self): super().test_rank() diff --git a/python/pyspark/pandas/tests/connect/test_parity_categorical.py b/python/pyspark/pandas/tests/connect/test_parity_categorical.py index 3e05eb2c0f3b..210cfce8ddba 100644 --- a/python/pyspark/pandas/tests/connect/test_parity_categorical.py +++ b/python/pyspark/pandas/tests/connect/test_parity_categorical.py @@ -53,12 +53,6 @@ def test_reorder_categories(self): def test_set_categories(self): super().test_set_categories() - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_unstack(self): - super().test_unstack() - if __name__ == "__main__": from pyspark.pandas.tests.connect.test_parity_categorical import * # noqa: F401 diff --git a/python/pyspark/pandas/tests/connect/test_parity_default_index.py b/python/pyspark/pandas/tests/connect/test_parity_default_index.py index c5410e6dd584..1e95fac9285e 100644 --- a/python/pyspark/pandas/tests/connect/test_parity_default_index.py +++ b/python/pyspark/pandas/tests/connect/test_parity_default_index.py @@ -24,12 +24,6 @@ class DefaultIndexParityTests( DefaultIndexTestsMixin, PandasOnSparkTestUtils, ReusedConnectTestCase ): - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_default_index_sequence(self): - super().test_default_index_sequence() - @unittest.skip( "TODO(SPARK-43623): Enable DefaultIndexParityTests.test_index_distributed_sequence_cleanup." ) diff --git a/python/pyspark/pandas/tests/connect/test_parity_ewm.py b/python/pyspark/pandas/tests/connect/test_parity_ewm.py index e079f8472962..748728203337 100644 --- a/python/pyspark/pandas/tests/connect/test_parity_ewm.py +++ b/python/pyspark/pandas/tests/connect/test_parity_ewm.py @@ -22,17 +22,7 @@ class EWMParityTests(EWMTestsMixin, PandasOnSparkTestUtils, ReusedConnectTestCase, TestUtils): - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_ewm_mean(self): - super().test_ewm_mean() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_ewm_func(self): - super().test_groupby_ewm_func() + pass if __name__ == "__main__": diff --git a/python/pyspark/pandas/tests/connect/test_parity_expanding.py b/python/pyspark/pandas/tests/connect/test_parity_expanding.py index a6f2cf9bc3ce..7f8b1a3cac2f 100644 --- a/python/pyspark/pandas/tests/connect/test_parity_expanding.py +++ b/python/pyspark/pandas/tests/connect/test_parity_expanding.py @@ -24,125 +24,7 @@ class ExpandingParityTests( ExpandingTestsMixin, PandasOnSparkTestUtils, TestUtils, ReusedConnectTestCase ): - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_expanding_count(self): - super().test_expanding_count() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_expanding_kurt(self): - super().test_expanding_kurt() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_expanding_max(self): - super().test_expanding_max() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_expanding_mean(self): - super().test_expanding_mean() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_expanding_min(self): - super().test_expanding_min() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_expanding_quantile(self): - super().test_expanding_quantile() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_expanding_skew(self): - super().test_expanding_skew() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_expanding_std(self): - super().test_expanding_std() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_expanding_sum(self): - super().test_expanding_sum() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_expanding_var(self): - super().test_expanding_var() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_expanding_count(self): - super().test_groupby_expanding_count() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_expanding_kurt(self): - super().test_groupby_expanding_kurt() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_expanding_max(self): - super().test_groupby_expanding_max() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_expanding_mean(self): - super().test_groupby_expanding_mean() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_expanding_min(self): - super().test_groupby_expanding_min() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_expanding_quantile(self): - super().test_groupby_expanding_quantile() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_expanding_skew(self): - super().test_groupby_expanding_skew() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_expanding_std(self): - super().test_groupby_expanding_std() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_expanding_sum(self): - super().test_groupby_expanding_sum() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_expanding_var(self): - super().test_groupby_expanding_var() + pass if __name__ == "__main__": diff --git a/python/pyspark/pandas/tests/connect/test_parity_generic_functions.py b/python/pyspark/pandas/tests/connect/test_parity_generic_functions.py index 1bf2650d8742..158215073ad9 100644 --- a/python/pyspark/pandas/tests/connect/test_parity_generic_functions.py +++ b/python/pyspark/pandas/tests/connect/test_parity_generic_functions.py @@ -24,11 +24,7 @@ class GenericFunctionsParityTests( GenericFunctionsTestsMixin, TestUtils, PandasOnSparkTestUtils, ReusedConnectTestCase ): - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_interpolate(self): - super().test_interpolate() + pass if __name__ == "__main__": diff --git a/python/pyspark/pandas/tests/connect/test_parity_namespace.py b/python/pyspark/pandas/tests/connect/test_parity_namespace.py index 72f638ca23c6..db7f62fdbd5a 100644 --- a/python/pyspark/pandas/tests/connect/test_parity_namespace.py +++ b/python/pyspark/pandas/tests/connect/test_parity_namespace.py @@ -22,18 +22,6 @@ class NamespaceParityTests(NamespaceTestsMixin, PandasOnSparkTestUtils, ReusedConnectTestCase): - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_concat_index_axis(self): - super().test_concat_index_axis() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_concat_multiindex_sort(self): - super().test_concat_multiindex_sort() - @unittest.skip("TODO(SPARK-43655): Enable NamespaceParityTests.test_get_index_map.") def test_get_index_map(self): super().test_get_index_map() diff --git a/python/pyspark/pandas/tests/connect/test_parity_ops_on_diff_frames_groupby.py b/python/pyspark/pandas/tests/connect/test_parity_ops_on_diff_frames_groupby.py index 5d6b6a80b9bd..685ec5c45c5f 100644 --- a/python/pyspark/pandas/tests/connect/test_parity_ops_on_diff_frames_groupby.py +++ b/python/pyspark/pandas/tests/connect/test_parity_ops_on_diff_frames_groupby.py @@ -24,53 +24,7 @@ class OpsOnDiffFramesGroupByParityTests( OpsOnDiffFramesGroupByTestsMixin, PandasOnSparkTestUtils, ReusedConnectTestCase ): - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cumcount(self): - super().test_cumcount() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cummax(self): - super().test_cummax() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cummin(self): - super().test_cummin() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cumprod(self): - super().test_cumprod() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_cumsum(self): - super().test_cumsum() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_diff(self): - super().test_diff() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_fillna(self): - super().test_fillna() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_shift(self): - super().test_shift() + pass if __name__ == "__main__": diff --git a/python/pyspark/pandas/tests/connect/test_parity_ops_on_diff_frames_groupby_expanding.py b/python/pyspark/pandas/tests/connect/test_parity_ops_on_diff_frames_groupby_expanding.py index 90fa36f3b98a..c373268cdb23 100644 --- a/python/pyspark/pandas/tests/connect/test_parity_ops_on_diff_frames_groupby_expanding.py +++ b/python/pyspark/pandas/tests/connect/test_parity_ops_on_diff_frames_groupby_expanding.py @@ -29,47 +29,7 @@ class OpsOnDiffFramesGroupByExpandingParityTests( TestUtils, ReusedConnectTestCase, ): - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_expanding_count(self): - super().test_groupby_expanding_count() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_expanding_min(self): - super().test_groupby_expanding_min() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_expanding_max(self): - super().test_groupby_expanding_max() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_expanding_mean(self): - super().test_groupby_expanding_mean() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_expanding_sum(self): - super().test_groupby_expanding_sum() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_expanding_std(self): - super().test_groupby_expanding_std() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_expanding_var(self): - super().test_groupby_expanding_var() + pass if __name__ == "__main__": diff --git a/python/pyspark/pandas/tests/connect/test_parity_ops_on_diff_frames_groupby_rolling.py b/python/pyspark/pandas/tests/connect/test_parity_ops_on_diff_frames_groupby_rolling.py index dd82e4432565..4a52bb0748f5 100644 --- a/python/pyspark/pandas/tests/connect/test_parity_ops_on_diff_frames_groupby_rolling.py +++ b/python/pyspark/pandas/tests/connect/test_parity_ops_on_diff_frames_groupby_rolling.py @@ -29,47 +29,7 @@ class OpsOnDiffFramesGroupByRollingParityTests( TestUtils, ReusedConnectTestCase, ): - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_rolling_count(self): - super().test_groupby_rolling_count() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_rolling_min(self): - super().test_groupby_rolling_min() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_rolling_max(self): - super().test_groupby_rolling_max() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_rolling_mean(self): - super().test_groupby_rolling_mean() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_rolling_sum(self): - super().test_groupby_rolling_sum() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_rolling_std(self): - super().test_groupby_rolling_std() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_rolling_var(self): - super().test_groupby_rolling_var() + pass if __name__ == "__main__": diff --git a/python/pyspark/pandas/tests/connect/test_parity_rolling.py b/python/pyspark/pandas/tests/connect/test_parity_rolling.py index 712c1a10df99..8318bed24f03 100644 --- a/python/pyspark/pandas/tests/connect/test_parity_rolling.py +++ b/python/pyspark/pandas/tests/connect/test_parity_rolling.py @@ -24,125 +24,7 @@ class RollingParityTests( RollingTestsMixin, PandasOnSparkTestUtils, TestUtils, ReusedConnectTestCase ): - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_rolling_count(self): - super().test_groupby_rolling_count() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_rolling_kurt(self): - super().test_groupby_rolling_kurt() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_rolling_max(self): - super().test_groupby_rolling_max() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_rolling_mean(self): - super().test_groupby_rolling_mean() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_rolling_min(self): - super().test_groupby_rolling_min() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_rolling_quantile(self): - super().test_groupby_rolling_quantile() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_rolling_skew(self): - super().test_groupby_rolling_skew() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_rolling_std(self): - super().test_groupby_rolling_std() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_rolling_sum(self): - super().test_groupby_rolling_sum() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_groupby_rolling_var(self): - super().test_groupby_rolling_var() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_rolling_count(self): - super().test_rolling_count() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_rolling_kurt(self): - super().test_rolling_kurt() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_rolling_max(self): - super().test_rolling_max() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_rolling_mean(self): - super().test_rolling_mean() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_rolling_min(self): - super().test_rolling_min() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_rolling_quantile(self): - super().test_rolling_quantile() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_rolling_skew(self): - super().test_rolling_skew() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_rolling_std(self): - super().test_rolling_std() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_rolling_sum(self): - super().test_rolling_sum() - - @unittest.skip( - "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client." - ) - def test_rolling_var(self): - super().test_rolling_var() + pass if __name__ == "__main__": diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py index 00f2a85d602c..82185ed96997 100644 --- a/python/pyspark/sql/connect/client/core.py +++ b/python/pyspark/sql/connect/client/core.py @@ -823,6 +823,7 @@ def schema(self, plan: pb2.Plan) -> StructType: Return schema for given plan. """ logger.info(f"Schema for plan: {self._proto_to_string(plan)}") + print(f"Schema for plan: {self._proto_to_string(plan)}") schema = self._analyze(method="schema", plan=plan).schema assert schema is not None # Server side should populate the struct field which is the schema. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 55433ea04b8f..ef3108485f02 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -3151,9 +3151,16 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor // Add Window operators. val withWindow = addWindow(windowExpressions, withProject) + val planId = p.getTagValue(LogicalPlan.PLAN_ID_TAG) + // Finally, generate output columns according to the original projectList. val finalProjectList = projectList.map(_.toAttribute) - Project(finalProjectList, withWindow) + val newProject = Project(finalProjectList, withWindow) + + // retain the plan id used in Spark Connect + planId.foreach(newProject.setTagValue(LogicalPlan.PLAN_ID_TAG, _)) + + newProject } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index 2a1067be004e..1387124e96ca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -256,7 +256,9 @@ abstract class TypeCoercionBase { object WidenSetOperationTypes extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { - plan resolveOperatorsUpWithNewOutput { + val planId = plan.getTagValue(LogicalPlan.PLAN_ID_TAG) + + val newPlan = plan resolveOperatorsUpWithNewOutput { case s @ Except(left, right, isAll) if s.childrenResolved && left.output.length == right.output.length && !s.resolved => val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(left :: right :: Nil) @@ -290,6 +292,11 @@ abstract class TypeCoercionBase { s.copy(children = newChildren) -> attrMapping } } + + // retain the plan id used in Spark Connect + planId.foreach(newPlan.setTagValue(LogicalPlan.PLAN_ID_TAG, _)) + + newPlan } /** Build new children with the widest types for each attribute among all the children */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index c2330cdb59db..6ebf0eb6eaea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.trees.{BinaryLike, CurrentOrigin, LeafLike, QuaternaryLike, SQLQueryContext, TernaryLike, TreeNode, UnaryLike} import org.apache.spark.sql.catalyst.trees.TreePattern.{RUNTIME_REPLACEABLE, TreePattern} import org.apache.spark.sql.catalyst.types.DataTypeUtils @@ -343,8 +344,14 @@ abstract class Expression extends TreeNode[Expression] { override def simpleString(maxFields: Int): String = toString - override def toString: String = prettyName + truncatedString( - flatArguments.toSeq, "(", ", ", ")", SQLConf.get.maxToStringFields) + override def toString: String = { + val str = prettyName + truncatedString( + flatArguments.toSeq, "(", ", ", ")", SQLConf.get.maxToStringFields) + this.getTagValue(LogicalPlan.PLAN_ID_TAG) match { + case Some(planId) => s"$str {planId=$planId}" + case _ => str + } + } /** * Returns SQL representation of this expression. For expressions extending [[NonSQLExpression]], diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 374eb070db1c..dbeeebe8fa17 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -125,6 +125,15 @@ abstract class LogicalPlan } } + override def simpleString(maxFields: Int): String = { + val str = super.simpleString(maxFields) + this.getTagValue(LogicalPlan.PLAN_ID_TAG) match { + case Some(planId) => s"$str {planId=$planId}" + case _ => str + } + } + + private[this] lazy val childAttributes = AttributeSeq.fromNormalOutput(children.flatMap(_.output)) private[this] lazy val childMetadataAttributes = AttributeSeq(children.flatMap(_.metadataOutput)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index 9d29ca1f9c6e..732d6781e6ea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -45,6 +45,7 @@ object RuleExecutor { } } +// scalastyle:off println class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging { private val logLevel = SQLConf.get.planChangeLogLevel @@ -63,6 +64,7 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging { """.stripMargin } + println(message) logBasedOnLevel(message) } } @@ -81,6 +83,7 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging { } } + println(message) logBasedOnLevel(message) } } @@ -97,6 +100,7 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging { |Total time of effective runs: $totalTimeEffective seconds """.stripMargin + println(message) logBasedOnLevel(message) }