diff --git a/python/pyspark/sql/tests/pandas/streaming/test_pandas_transform_with_state.py b/python/pyspark/sql/tests/pandas/streaming/test_pandas_transform_with_state.py index ecdfcfda3c1d0..eeaee16e518fc 100644 --- a/python/pyspark/sql/tests/pandas/streaming/test_pandas_transform_with_state.py +++ b/python/pyspark/sql/tests/pandas/streaming/test_pandas_transform_with_state.py @@ -187,6 +187,7 @@ def _test_transform_with_state_basic( self.assertEqual(q.name, "this_query") self.assertTrue(q.isActive) q.processAllAvailable() + q.stop() q.awaitTermination(10) self.assertTrue(q.exception() is None) @@ -255,6 +256,7 @@ def test_transform_with_state_query_restarts(self): self.assertEqual(q.name, "this_query") self.assertTrue(q.isActive) q.processAllAvailable() + q.stop() q.awaitTermination(10) self.assertTrue(q.exception() is None) @@ -262,14 +264,13 @@ def test_transform_with_state_query_restarts(self): self.assertTrue(q.lastProgress.stateOperators[0].customMetrics["numValueStateVars"] > 0) self.assertTrue(q.lastProgress.stateOperators[0].customMetrics["numDeletedStateVars"] > 0) - q.stop() - self._prepare_test_resource2(input_path) q = base_query.start() self.assertEqual(q.name, "this_query") self.assertTrue(q.isActive) q.processAllAvailable() + q.stop() q.awaitTermination(10) self.assertTrue(q.exception() is None) result_df = self.spark.read.parquet(output_path) @@ -329,6 +330,7 @@ def _test_transform_with_state_proc_timer(self, stateful_processor_factory, chec self.assertEqual(q.name, query_name) self.assertTrue(q.isActive) q.processAllAvailable() + q.stop() q.awaitTermination(10) self.assertTrue(q.exception() is None) @@ -446,6 +448,7 @@ def prepare_batch3(input_path): self.assertEqual(q.name, query_name) self.assertTrue(q.isActive) q.processAllAvailable() + q.stop() q.awaitTermination(10) self.assertTrue(q.exception() is None) @@ -561,6 +564,7 @@ def _test_transform_with_state_non_contiguous_grouping_cols( self.assertEqual(q.name, "this_query") self.assertTrue(q.isActive) q.processAllAvailable() + q.stop() q.awaitTermination(10) self.assertTrue(q.exception() is None) @@ -657,6 +661,7 @@ def _test_transform_with_state_chaining_ops( self.assertEqual(q.name, "chaining_ops_query") self.assertTrue(q.isActive) q.processAllAvailable() + q.stop() q.awaitTermination(10) def test_transform_with_state_chaining_ops(self): @@ -1168,6 +1173,7 @@ def _run_evolution_test(self, processor_factory, checkpoint_dir, check_results, self.assertEqual(q.name, "evolution_test") self.assertTrue(q.isActive) q.processAllAvailable() + q.stop() q.awaitTermination(10) def test_schema_evolution_scenarios(self): diff --git a/python/pyspark/sql/tests/pandas/streaming/test_pandas_transform_with_state_state_variable.py b/python/pyspark/sql/tests/pandas/streaming/test_pandas_transform_with_state_state_variable.py index 7ea6b56e1fec1..6ae890e20b6eb 100644 --- a/python/pyspark/sql/tests/pandas/streaming/test_pandas_transform_with_state_state_variable.py +++ b/python/pyspark/sql/tests/pandas/streaming/test_pandas_transform_with_state_state_variable.py @@ -177,6 +177,7 @@ def _test_transform_with_state_basic( self.assertEqual(q.name, "this_query") self.assertTrue(q.isActive) q.processAllAvailable() + q.stop() q.awaitTermination(10) self.assertTrue(q.exception() is None) @@ -291,6 +292,7 @@ def check_results(batch_df, batch_id): self.assertEqual(q.name, "this_query") self.assertTrue(q.isActive) q.processAllAvailable() + q.stop() q.awaitTermination(10) self.assertTrue(q.exception() is None) @@ -514,6 +516,7 @@ def _test_transform_with_state_init_state( self.assertEqual(q.name, "this_query") self.assertTrue(q.isActive) q.processAllAvailable() + q.stop() q.awaitTermination(10) self.assertTrue(q.exception() is None)