flink-python/pyflink/datastream/data_stream.py (2 additions & 0 deletions)
@@ -641,6 +641,8 @@ def execute_and_collect(self, job_execution_name: str = None, limit: int = None)
         :param job_execution_name: The name of the job execution.
         :param limit: The limit for the collected elements.
         """
+        JPythonConfigUtil = get_gateway().jvm.org.apache.flink.python.util.PythonConfigUtil
+        JPythonConfigUtil.configPythonOperator(self._j_data_stream.getExecutionEnvironment())
         if job_execution_name is None and limit is None:
             return CloseableIterator(self._j_data_stream.executeAndCollect(), self.get_type())
         elif job_execution_name is not None and limit is None:
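For reference, here is a minimal sketch (not part of the diff) of how the patched execute_and_collect() is driven from a PyFlink pipeline. The sample elements and the identity map are illustrative assumptions; from_collection, map and execute_and_collect are the real DataStream APIs touched by this change:

    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    # Hypothetical sample data; any collection with a declared type works.
    ds = env.from_collection(
        [(1, 'Hello'), (2, 'World')],
        type_info=Types.TUPLE([Types.INT(), Types.STRING()]))

    # With this patch, execute_and_collect() first calls
    # PythonConfigUtil.configPythonOperator on the underlying Java
    # StreamExecutionEnvironment, so a pipeline that contains a Python
    # operator (the map below) is configured before submission.
    with ds.map(lambda t: t).execute_and_collect() as results:
        for value in results:
            print(value)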
flink-python/pyflink/datastream/tests/test_data_stream.py (5 additions & 5 deletions)
@@ -387,11 +387,11 @@ def test_execute_and_collect(self):
                              decimal.Decimal('2000000000000000000.061111111111111'
                                              '11111111111111'))]
         expected = test_data
-        ds = self.env.from_collection(test_data)
+        ds = self.env.from_collection(test_data).map(lambda a: a)
         with ds.execute_and_collect() as results:
-            actual = []
-            for result in results:
-                actual.append(result)
+            actual = [result for result in results]
             actual.sort()
             expected.sort()
             self.assertEqual(expected, actual)

     def test_key_by_map(self):
@@ -942,7 +942,7 @@ def test_partition_custom(self):
         expected_num_partitions = 5

         def my_partitioner(key, num_partitions):
-            assert expected_num_partitions, num_partitions
+            assert expected_num_partitions == num_partitions
             return key % num_partitions

         partitioned_stream = ds.map(lambda x: x, output_type=Types.ROW([Types.STRING(),
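As a side note, a small sketch of the construct the corrected assertion guards: partition_custom() routes each record to the partition returned by the user function, and with == the check on the partition count can actually fail. The data, parallelism and key selector below are illustrative assumptions, not taken from the test file:

    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(5)
    ds = env.from_collection(
        [('a', 0), ('b', 1), ('c', 2)],
        type_info=Types.ROW([Types.STRING(), Types.INT()]))

    def my_partitioner(key, num_partitions):
        # The old form, assert expected_num_partitions, num_partitions, can never
        # fail because it only tests the truthiness of the first operand; the
        # PR's == form makes this a real check on the parallelism Flink passes in.
        assert num_partitions == 5
        return key % num_partitions

    # partition_custom takes the partitioner and a key selector.
    partitioned = ds.partition_custom(my_partitioner, lambda value: value[1])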