
[SPARK-27629][PySpark] Prevent Unpickler from intervening each unpickling #24521

Closed · wants to merge 3 commits

Conversation

viirya
Member

@viirya viirya commented May 3, 2019

What changes were proposed in this pull request?

In SPARK-27612, a correctness issue was reported: when protocol 4 is used to pickle Python objects, the unpickled objects are wrong. A temporary fix was proposed that avoids using the highest protocol.

It was found that Opcodes.MEMOIZE appears among the opcodes emitted under protocol 4, making it a suspect for this issue.

A deeper dive found that Opcodes.MEMOIZE stores objects into an internal map of the Unpickler object. We use a single Unpickler object to unpickle serialized Python bytes, so if the map is not cleared, the stored objects interfere with the next round of unpickling.

We have two options:

  1. Continue to reuse the Unpickler, but call its close after each unpickling.
  2. Stop reusing the Unpickler and create a new Unpickler object for each unpickling.

This patch takes option 1.
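As a local illustration (using CPython's `pickle` and `pickletools` as a stand-in for the Pyrolite path on the JVM), the MEMOIZE opcode only shows up from protocol 4 onward:

```python
import pickle
import pickletools

obj = [1, 2, 3, 4]

def opnames(protocol):
    """List the opcode names in the pickle of obj at the given protocol."""
    data = pickle.dumps(obj, protocol=protocol)
    return [op.name for op, _, _ in pickletools.genops(data)]

# Protocol 2 memoizes with explicit BINPUT indices, while protocol 4
# (Python 3.4+) uses the implicit MEMOIZE opcode, which keys each entry
# by the current size of the unpickler's memo map.
print("protocol 2:", opnames(2))
print("protocol 4:", opnames(4))
```

Because MEMOIZE keys entries by the memo's current size, any state left over from a previous unpickling shifts the keys of the next one, which matches the corruption described above.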

How was this patch tested?

Passing the test added in SPARK-27612 (#24519).

@viirya
Member Author

viirya commented May 3, 2019

cc @HyukjinKwon @BryanCutler

@SparkQA

SparkQA commented May 3, 2019

Test build #105107 has finished for PR 24521 at commit d7312fb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Retest this please.

Member

@BryanCutler BryanCutler left a comment

Thanks for digging into this @viirya , I'm still trying to understand the reason why previous data in the memoize map creates the output we see. Ultimately, this does look like a bug in Pyrolite and this workaround might be fine. Would you mind adding a reference to the JIRA in your comment and in the test added in #24519?

@@ -186,6 +186,9 @@ private[spark] object SerDeUtil extends Logging {
val unpickle = new Unpickler
iter.flatMap { row =>
val obj = unpickle.loads(row)
// `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map
// of `Unpickler`. This map is cleared when calling `Unpickler.close()`.
unpickle.close()
Member

It looks like close() clears the memoized map, the UnpickleStack, and the input stream. Since the input stream is a ByteArrayStream that's a no-op and is fine. Since each row is independent of the others, I don't see any reason why the other 2 would store anything necessary, so I believe that will be fine.

Member

I don't know anything about this, but it looks odd to close the object repeatedly. It may not cause a problem now. What's the downside to using a new object for each row, just performance?

Member

Yea, I missed "Since each row is independent of the others" - so I thought we should manually control the memo. I had to look deeper :D. Looks fine. Yes, I guess it just needs an extra clear call for each row.

Member Author

Using a new object for each row is also OK. I didn't do it just because of a (possible) performance concern.

@SparkQA

SparkQA commented May 3, 2019

Test build #105112 has finished for PR 24521 at commit d7312fb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

HyukjinKwon commented May 3, 2019

Nice try. My impression was that it somehow wrongly reuses that memo (and can't close the input stream - I overlooked that), so I was thinking we would need manual control after upgrading to 4.17 (so that we can access the memo), but looking into this again the fix looks correct.

Member

@HyukjinKwon HyukjinKwon left a comment

@viirya, BTW, I believe there are some more places to fix, for instance, BatchEvalPythonExec. Can you double check those places too while we're here?

@viirya
Member Author

viirya commented May 4, 2019

Ultimately, this does look like a bug in Pyrolite and this workaround might be fine. Would you mind adding a reference to the JIRA in your comment and in the test added in #24519?

I'd like to file an issue against Pyrolite. From my view, the memo map should be cleared when the STOP opcode is hit, because each loading should be independent, so the memo map shouldn't be reused without clearing.

I added a few comments to the test and the JIRA.
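To illustrate why each load needs a fresh memo (a CPython `pickletools` sketch; the actual bug is in Pyrolite's Java Unpickler): two independently pickled rows restart their memo indices at 0, so a memo that survives past STOP makes the next load's GET opcodes resolve against stale entries.

```python
import pickle
import pickletools

# Two rows pickled independently, as Spark serializes records: each
# pickler starts with an empty memo, so memo indices begin at 0 again.
x = "shared"
row1 = pickle.dumps([x, x], protocol=4)
row2 = pickle.dumps([x, x], protocol=4)

def memo_ops(data):
    """Extract only the memo-related opcodes (MEMOIZE / *GET / *PUT)."""
    return [(op.name, arg) for op, arg, _ in pickletools.genops(data)
            if "MEMOIZE" in op.name or "GET" in op.name or "PUT" in op.name]

# Both payloads carry the same MEMOIZE/BINGET sequence. An unpickler whose
# memo is not cleared between loads would store row2's entries at shifted
# keys, so its BINGET 1 would fetch whatever row1 left behind at key 1.
print(memo_ops(row1))
print(memo_ops(row2))
```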

@viirya
Member Author

viirya commented May 4, 2019

BTW, I believe there are some more places to fix, for instance, BatchEvalPythonExec. Can you double check those places too while we're here?

Checked the other places and fixed them. Thanks for the reminder.

@SparkQA

SparkQA commented May 4, 2019

Test build #105117 has finished for PR 24521 at commit 04a2e04.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 4, 2019

Test build #105118 has finished for PR 24521 at commit 053e6a5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@HyukjinKwon HyukjinKwon left a comment

LGTM nice followup

@HyukjinKwon
Member

Merged to master.

@gatorsmile
Member

@viirya Could you please help measure the perf?

@HyukjinKwon
Member

It shouldn't be a big diff, and I don't think there's another way to fix this issue even if there is a perf diff.
But yes, let's quickly run a simple job and check before/after to clarify the doubt. This was pointed out by @srowen too.

@viirya
Member Author

viirya commented May 8, 2019

I'm OK with that. I also think it shouldn't be a big diff. I will post the perf diff.

@viirya
Member Author

viirya commented May 8, 2019

I ran a pretty simple job and measured the time before and after this PR over 5 runs:

>>> import time
>>> start = time.time()
>>> df = spark.createDataFrame([[1, 2, 3, 4]] * 100000, "array<integer>")
>>> collected = df.collect()
>>> end = time.time()
>>> print(end - start)

Before:
0.6398153305053711
0.6232590675354004
0.603309154510498
0.5923750400543213
0.5802857875823975

After:
0.6308741569519043
0.6202919483184814
0.5926861763000488
0.603518009185791
0.5818917751312256
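For a Spark-free sanity check of per-row unpickling overhead, here is a rough local stand-in: CPython's `pickle.loads` builds a fresh unpickler per call, loosely analogous to option 2 above (the real path goes through Pyrolite on the JVM, so numbers are illustrative only).

```python
import pickle
import time

# 100,000 independently pickled rows, mirroring the shape of the job above.
rows = [pickle.dumps([1, 2, 3, 4], protocol=4) for _ in range(100000)]

start = time.time()
decoded = [pickle.loads(r) for r in rows]
elapsed = time.time() - start

# Timings are machine-dependent; this only shows the order of magnitude.
print(f"unpickled {len(decoded)} rows in {elapsed:.3f}s")
```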

rshkv pushed a commit to palantir/spark that referenced this pull request May 23, 2020

Closes apache#24521 from viirya/SPARK-27629.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
@rshkv rshkv mentioned this pull request May 23, 2020
rshkv pushed a commit to palantir/spark that referenced this pull request Jun 5, 2020
@viirya viirya deleted the SPARK-27629 branch December 27, 2023 18:22
7 participants