Skip to content

Conversation

@HyukjinKwon
Copy link
Member

What changes were proposed in this pull request?

This PR backports #24958 to branch-2.4.

This PR proposes to use AtomicReference so that parent and child threads can access to the same file block holder.

Python UDF expressions are turned to a plan and then it launches a separate thread to consume the input iterator. In the separate child thread, the iterator sets InputFileBlockHolder.set before the parent does which the parent thread is unable to read later.

  1. In this separate child thread, if it happens to call InputFileBlockHolder.set first without initialization of the parent's thread local (which is done when the ThreadLocal.get() is first called), the child thread seems calling its own initialValue to initialize.

  2. After that, the parent calls its own initialValue to initializes at the first call of ThreadLocal.get().

  3. Both now have two different references. Updating at child isn't reflected to parent.

This PR fixes it via initializing parent's thread local with AtomicReference for file status so that they can be used in each task, and children thread's update is reflected.

I also tried to explain this a bit more at #24958 (comment).

How was this patch tested?

Manually tested and unittest was added.

… support input_file_name with Python UDF)

This PR proposes to use `AtomicReference` so that parent and child threads can access to the same file block holder.

Python UDF expressions are turned to a plan and then it launches a separate thread to consume the input iterator. In the separate child thread, the iterator sets `InputFileBlockHolder.set` before the parent does which the parent thread is unable to read later.

1. In this separate child thread, if it happens to call `InputFileBlockHolder.set` first without initialization of the parent's thread local (which is done when the `ThreadLocal.get()` is first called), the child thread seems calling its own `initialValue` to initialize.

2. After that, the parent calls its own `initialValue` to initializes at the first call of `ThreadLocal.get()`.

3. Both now have two different references. Updating at child isn't reflected to parent.

This PR fixes it via initializing parent's thread local with `AtomicReference` for file status so that they can be used in each task, and children thread's update is reflected.

I also tried to explain this a bit more at apache#24958 (comment).

Manually tested and unittest was added.

Closes apache#24958 from HyukjinKwon/SPARK-28153.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@dongjoon-hyun
Copy link
Member

Thank you, @HyukjinKwon . I checked and added 2.4.3/2.3.3 to the affected versions of JIRA.

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM (Pending Jenkins).

Since this is branch-2.4, this is tested locally with the following libraries.

numpy             1.16.4
pandas            0.19.2
pyarrow           0.8.0
scipy             1.2.2
$ python/run-tests.py --python-executables python --modules pyspark-sql
Running PySpark tests. Output is in /Users/dhyun/PRS/PR-25321/python/unit-tests.log
Will test against the following Python executables: ['python']
Will test the following Python modules: ['pyspark-sql']
Starting test(python): pyspark.sql.tests
Starting test(python): pyspark.sql.catalog
Starting test(python): pyspark.sql.column
Starting test(python): pyspark.sql.conf
Finished test(python): pyspark.sql.catalog (8s)
Starting test(python): pyspark.sql.context
Finished test(python): pyspark.sql.column (12s)
Starting test(python): pyspark.sql.dataframe
Finished test(python): pyspark.sql.conf (13s)
Starting test(python): pyspark.sql.functions
Finished test(python): pyspark.sql.context (11s)
Starting test(python): pyspark.sql.group
Finished test(python): pyspark.sql.group (28s)
Starting test(python): pyspark.sql.readwriter
Finished test(python): pyspark.sql.dataframe (38s)
Starting test(python): pyspark.sql.session
Finished test(python): pyspark.sql.functions (41s)
Starting test(python): pyspark.sql.streaming
Finished test(python): pyspark.sql.session (17s)
Starting test(python): pyspark.sql.types
Finished test(python): pyspark.sql.readwriter (21s)
Starting test(python): pyspark.sql.udf
Finished test(python): pyspark.sql.streaming (18s)
Starting test(python): pyspark.sql.window
Finished test(python): pyspark.sql.types (6s)
Finished test(python): pyspark.sql.window (3s)
Finished test(python): pyspark.sql.udf (11s)
Finished test(python): pyspark.sql.tests (242s) ... 4 tests were skipped
Tests passed in 242 seconds

Skipped tests in pyspark.sql.tests with python:
    test_unbounded_frames (pyspark.sql.tests.HiveContextSQLTests) ... skipped "Unittest < 3.3 doesn't support mocking"
    test_create_dataframe_required_pandas_not_found (pyspark.sql.tests.SQLTests) ... skipped 'Required Pandas was found.'
    test_to_pandas_required_pandas_not_found (pyspark.sql.tests.SQLTests) ... skipped 'Required Pandas was found.'
    test_type_annotation (pyspark.sql.tests.ScalarPandasUDFTests) ... skipped 'Type hints are supported from Python 3.5.'

@SparkQA
Copy link

SparkQA commented Aug 1, 2019

Test build #108497 has finished for PR 25321 at commit 4275f82.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member Author

retest this please

@HyukjinKwon
Copy link
Member Author

I am testing mergine script Python 3 compatibility. please ignore the noise above.

@HyukjinKwon HyukjinKwon changed the title [SPARK-28153][PYTHON][BRANCH-2.4] Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) [SPARK-28153][Python][BRANCH-2.4] Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) Aug 1, 2019
@HyukjinKwon HyukjinKwon changed the title [SPARK-28153][Python][BRANCH-2.4] Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) [SPARK28153][Python][BRANCH-2.4] Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) Aug 1, 2019
@HyukjinKwon HyukjinKwon changed the title [SPARK28153][Python][BRANCH-2.4] Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) [SPARK28153][Python][BRANCH-2.4] []$[abc]$ Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) Aug 1, 2019
@HyukjinKwon HyukjinKwon changed the title [SPARK28153][Python][BRANCH-2.4] []$[abc]$ Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) [spaRk-28153][Python][BRANCH-2.4] []$[abc]$ Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) Aug 1, 2019
@HyukjinKwon HyukjinKwon changed the title [spaRk-28153][Python][BRANCH-2.4] []$[abc]$ Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) [SPARK-28153][PYTHON][BRANCH-2.4] Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) Aug 1, 2019
@HyukjinKwon HyukjinKwon changed the title [SPARK-28153][PYTHON][BRANCH-2.4] Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) [spaRk-28153][Python][BRANCH-2.4] []$[abc]$ Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) Aug 1, 2019
@HyukjinKwon HyukjinKwon changed the title [spaRk-28153][Python][BRANCH-2.4] []$[abc]$ Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) [SPARK-28153][PYTHON][BRANCH-2.4] Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) Aug 1, 2019
@dongjoon-hyun dongjoon-hyun changed the title [SPARK-28153][PYTHON][BRANCH-2.4] Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) [SPARK-28153][PYTHON][2.4] Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) Aug 1, 2019
@HyukjinKwon HyukjinKwon changed the title [SPARK-28153][PYTHON][2.4] Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) [SPARK-28153][PYTHON][BRANCH-2.4] Use AtomicReference at InputFileBlockHolder (to support input_file_name with Python UDF) Aug 1, 2019
@SparkQA
Copy link

SparkQA commented Aug 1, 2019

Test build #108502 has finished for PR 25321 at commit 4275f82.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member Author

Merged to branch-2.4.

HyukjinKwon added a commit that referenced this pull request Aug 1, 2019
…ckHolder (to support input_file_name with Python UDF)

## What changes were proposed in this pull request?

This PR backports #24958 to branch-2.4.

This PR proposes to use `AtomicReference` so that parent and child threads can access to the same file block holder.

Python UDF expressions are turned to a plan and then it launches a separate thread to consume the input iterator. In the separate child thread, the iterator sets `InputFileBlockHolder.set` before the parent does which the parent thread is unable to read later.

1. In this separate child thread, if it happens to call `InputFileBlockHolder.set` first without initialization of the parent's thread local (which is done when the `ThreadLocal.get()` is first called), the child thread seems calling its own `initialValue` to initialize.

2. After that, the parent calls its own `initialValue` to initializes at the first call of `ThreadLocal.get()`.

3. Both now have two different references. Updating at child isn't reflected to parent.

This PR fixes it via initializing parent's thread local with `AtomicReference` for file status so that they can be used in each task, and children thread's update is reflected.

I also tried to explain this a bit more at #24958 (comment).

## How was this patch tested?

Manually tested and unittest was added.

Closes #25321 from HyukjinKwon/backport-SPARK-28153.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
@HyukjinKwon HyukjinKwon closed this Aug 1, 2019
rluta pushed a commit to rluta/spark that referenced this pull request Sep 17, 2019
…ckHolder (to support input_file_name with Python UDF)

## What changes were proposed in this pull request?

This PR backports apache#24958 to branch-2.4.

This PR proposes to use `AtomicReference` so that parent and child threads can access to the same file block holder.

Python UDF expressions are turned to a plan and then it launches a separate thread to consume the input iterator. In the separate child thread, the iterator sets `InputFileBlockHolder.set` before the parent does which the parent thread is unable to read later.

1. In this separate child thread, if it happens to call `InputFileBlockHolder.set` first without initialization of the parent's thread local (which is done when the `ThreadLocal.get()` is first called), the child thread seems calling its own `initialValue` to initialize.

2. After that, the parent calls its own `initialValue` to initializes at the first call of `ThreadLocal.get()`.

3. Both now have two different references. Updating at child isn't reflected to parent.

This PR fixes it via initializing parent's thread local with `AtomicReference` for file status so that they can be used in each task, and children thread's update is reflected.

I also tried to explain this a bit more at apache#24958 (comment).

## How was this patch tested?

Manually tested and unittest was added.

Closes apache#25321 from HyukjinKwon/backport-SPARK-28153.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Sep 26, 2019
…ckHolder (to support input_file_name with Python UDF)

## What changes were proposed in this pull request?

This PR backports apache#24958 to branch-2.4.

This PR proposes to use `AtomicReference` so that parent and child threads can access to the same file block holder.

Python UDF expressions are turned to a plan and then it launches a separate thread to consume the input iterator. In the separate child thread, the iterator sets `InputFileBlockHolder.set` before the parent does which the parent thread is unable to read later.

1. In this separate child thread, if it happens to call `InputFileBlockHolder.set` first without initialization of the parent's thread local (which is done when the `ThreadLocal.get()` is first called), the child thread seems calling its own `initialValue` to initialize.

2. After that, the parent calls its own `initialValue` to initializes at the first call of `ThreadLocal.get()`.

3. Both now have two different references. Updating at child isn't reflected to parent.

This PR fixes it via initializing parent's thread local with `AtomicReference` for file status so that they can be used in each task, and children thread's update is reflected.

I also tried to explain this a bit more at apache#24958 (comment).

## How was this patch tested?

Manually tested and unittest was added.

Closes apache#25321 from HyukjinKwon/backport-SPARK-28153.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
@HyukjinKwon HyukjinKwon deleted the backport-SPARK-28153 branch March 3, 2020 01:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants