Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-28897] [TABLE-SQL] Fail to use udf in added jar when enabling checkpoint #25656

Open
wants to merge 4 commits into
base: release-1.20
Choose a base branch
from

Conversation

ammu20-dev
Copy link

@ammu20-dev ammu20-dev commented Nov 14, 2024

What is the purpose of the change

This pull request fixes the class loading issues when using udf in add jar and enabling checkpointing.

Brief change log

  • Pulled in the FlinkUserCodeClassLoader for UDF jar loading from the resource manager

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (100MB)
  • Extended integration test for recovery after master (JobManager) failure
  • Added test that validates that TaskInfo is transferred only once across recoveries
  • Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): ( no)
  • The serializers: (don't know)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Copy link
Collaborator

flinkbot commented Nov 14, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@@ -1029,6 +1029,8 @@ private TableResultInternal executeInternal(
defaultJobName,
jobStatusHookList);
try {
ClassLoader userClassLoader = Thread.currentThread().getContextClassLoader();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please could you change the variable name to be something like originalContextClassLoader

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the variable name to contextClassLoader .

@@ -1029,6 +1029,8 @@ private TableResultInternal executeInternal(
defaultJobName,
jobStatusHookList);
try {
ClassLoader userClassLoader = Thread.currentThread().getContextClassLoader();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add coments and refer to the v2 implementation in the comments and that the v2 refactor is not going to be backported to 1.20.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comments on the need for this change. The latest FLIP implementation to introduce a stream graph based job submission moved the StreamGraph module to flink runtime and changed the job submission logic by directly submitting a StreamGraph to the job manager.
Ref FLIP: https://cwiki.apache.org/confluence/display/FLINK/FLIP-468%3A+Introducing+StreamGraph-Based+Job+Submission
Related JIRA: https://issues.apache.org/jira/browse/FLINK-36065
As a result of these changes this issue seems to be fixed for flink v2 as I was not able to reproduce it with the latest main. Hence limiting this particular change to 1.20 versions.

@@ -1069,8 +1072,11 @@ private TableResultInternal executeQueryOperation(

Pipeline pipeline = generatePipelineFromQueryOperation(operation, transformations);
try {
ClassLoader userClassLoader = Thread.currentThread().getContextClassLoader();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add unit tests

Copy link

@davidradl davidradl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please change the variable name , add a comment and add unit tests (or provide a reason why unit tests cannot be added) .

@davidradl
Copy link

Reviewed by Chi on 21/11/24. Asked submitter questions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants