[PIP 86][Function] Preload and release of external resources #11112
Could you explain why you swallow the exception in close() while failing fast in setup()? In your case, preProcess should not throw a checked exception.
In addition, IMO it's not good practice to use Exception itself as the checked exception rather than a subclass of Exception.
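To make the point above concrete, here is a minimal sketch of consistent error handling: both lifecycle methods declare a specific checked exception (`IOException` is used purely as an example), and neither failure is swallowed. The `Lifecycle` and `LifecycleRunner` names are hypothetical and are not part of the Pulsar API.

```java
import java.io.IOException;

// Hypothetical lifecycle interface, for illustration only (not Pulsar API).
interface Lifecycle {
    // A specific checked exception instead of a bare Exception.
    void setup() throws IOException;

    void close() throws IOException;
}

final class LifecycleRunner {
    /** Runs setup, then the body, then close; failures propagate to the caller. */
    static void run(Lifecycle hook, Runnable body) throws IOException {
        hook.setup(); // fail fast: if setup throws, the body never runs
        try {
            body.run();
        } finally {
            hook.close(); // not swallowed: a close failure also reaches the caller
        }
    }
}
```

One design consequence of this sketch: if both the body and `close()` throw, the exception from `close()` is the one the caller sees.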
# Conflicts: # pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Hook.java # pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
I accept your suggestion and have corrected the problem. Please review the code.
I like the idea of this PR adding two steps (init and close) to the Pulsar function. I have left some review comments, PTAL.
@@ -160,7 +179,22 @@ private void processAsyncResults(BiConsumer<Record, JavaExecutionResult> resultC
@Override
public void close() {
    context.close();
    executor.shutdown();
why remove this line?
I have corrected it
 *
 * @throws Exception
 */
void preProcess(Context context) throws RuntimeException;
The names preProcess and postProcess suggest methods invoked for every message, before and after process, so the naming is confusing to me.
I have corrected it
        throw new RuntimeException("function preProcess occurred exception", e);
    }
}
if (null != javaUtilFunction && javaUtilFunction instanceof Hook) {
According to the docs, java.util.function.Function is not intended to interact with Context, so we should avoid passing Context to java.util.function.Function.
I have corrected it
You may have misunderstood my comment.
I mean use a derived class of public interface MetadataSerde<T> {
byte[] serialize(T value) throws IOException;
T deserialize(byte[] content) throws IOException;
} It checks Though IMO it's a better practice, using |
I have modified it, please check.
 * Initial and close of external resources
 */
@InterfaceAudience.Public
public interface Hook {
I suggest this new API extend the Function interface, since it is strongly coupled with the Function API and should not be used on its own in other cases.
public interface HookFunction extends Function {
    void setup();
    void cleanup();
}
And we should keep java.util.function.Function as it is to preserve its simplicity.
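A slightly fuller sketch of that suggestion, with type parameters added so an implementation compiles. `Context` and `Function` here are minimal stand-ins for the Pulsar types of the same name, and `PrefixFunction` is a hypothetical example; none of this is taken from the PR's actual code.

```java
// Stand-ins for Pulsar's Context and Function so the sketch compiles on its own.
interface Context {}

interface Function<I, O> {
    O process(I input, Context context) throws Exception;
}

// Hypothetical shape of the suggested interface; method names follow the review comment.
interface HookFunction<I, O> extends Function<I, O> {
    void setup() throws Exception;

    void cleanup() throws Exception;
}

// Example implementation: the prefix stands in for state prepared once in setup().
final class PrefixFunction implements HookFunction<String, String> {
    private String prefix;

    @Override
    public void setup() {
        prefix = "hello, ";
    }

    @Override
    public String process(String input, Context context) {
        return prefix + input;
    }

    @Override
    public void cleanup() {
        prefix = null;
    }
}
```

Because `HookFunction` extends `Function`, a runtime that only knows about `Function` can still call `process`, while a newer runtime can additionally detect `HookFunction` and call the lifecycle methods.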
I have modified it, please check
# Conflicts: # pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
I find this feature very useful.
Can you please start a discussion on dev@pulsar.apache.org to describe your needs and how you want to address them?
The mailing list is the place where the community discusses new features.
I have sent an email, please check.
Thanks.
if (null != javaUtilFunction && javaUtilFunction instanceof HookFunction) {
    try {
        ((HookFunction) javaUtilFunction).setup();
    } catch (Exception e) {
        log.error("setup error:", e);
        throw e;
    }
}
I suggest we keep java.util.function.Function as simple as possible and not add hook methods to it.
if (null != javaUtilFunction && javaUtilFunction instanceof HookFunction) {
    try {
        ((HookFunction) javaUtilFunction).cleanup();
    } catch (Exception e) {
I suggest we keep java.util.function.Function as simple as possible and not add hook methods to it.
I think HookFunction gives users an additional choice, and it also makes the responsibility of each Function method more focused and clear. Please reconsider.
Sorry, I accidentally added the comment with Please let me know if you update the PR and I'll review again.
Some of my thoughts:
- If we are going to introduce new methods or stages in the lifecycle of a function, such as "open" and "close", I would recommend we follow what we already have for Pulsar IO sources and sinks so that we are consistent.
- I am also not a particular fan of the name "HookFunction". Can we call it "RichFunction"? This naming is consistent with other projects such as Apache Storm and Flink.
- In regards to the topic:
> If this rule must be followed, then one possible way is to define a separate HookFunction interface which includes setup, process, and tearDown methods, and add support in the new runtime to handle this interface, just as the runtime currently handles pulsar.Function and java.util.Function separately. The old runtime will not be able to recognize the new interface and thus won't execute functions which need initialization.
I would rather not define a completely new interface. Ideally we have a "RichFunction" interface that extends the existing one, so that we minimize interface duplication and can build on top of it if we want to introduce new interfaces in the future. As for the old runtime executing a function that implements the new interface, is it sufficient to document somewhere that the worker has to be updated to a certain version to run functions that implement the new interface?
I agree with your point of view and have made the corresponding changes. Please review the code.
@BewareMyPower @nlu90 @jerrypeng @eolivelli PTAL when you have time, since the Pulsar 2.9.0 release cut is coming.
@jerrypeng @eolivelli Should we resume the discussion of this useful feature?
I believe that there is value in this work.
Is the discussion stale?
What's the status of the PIP?
Are we adopting the new process, that is, having a GitHub issue with the design doc? That would help people review the design decisions.
Feel free to ping me on slack if you need some guidance
context.close();
executor.shutdown();
if (null != function && function instanceof RichFunction) {
Actually, you don't need the null check, as null is never an instance of a class.
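A small standalone illustration of that point: in Java, `instanceof` evaluates to `false` when its left operand is `null`, so the extra `null !=` guard is redundant. The `InstanceofDemo` class is hypothetical and uses `Runnable` only as a convenient interface to test against.

```java
final class InstanceofDemo {
    static boolean isRunnable(Object candidate) {
        // instanceof is false for null, so no separate null check is needed.
        return candidate instanceof Runnable;
    }
}
```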
 *
 * @throws Exception
 */
void setup() throws Exception;
I would pass some Context in order to let the Function initialise using the information about the env.
@wuchenxi123 Do you want to provide an update on Enrico's questions above?
…ernal resources (#13205)

### Motivation
This patch is based on #11112. I have talked with @wuchenxi123 and I will continue the work for this feature.

### Modifications
Introduce a `RichFunction` interface that extends `Function` and provides the `setup` and `tearDown` APIs. `setup` is called only once, when the function instance starts. `tearDown` is called when the function instance closes. Users can use this interface to initialize and release external resources such as a RedisClient.

### Does this pull request potentially affect one of the following parts:
*If `yes` was chosen, please highlight the changes*
- The public API: (**yes**)
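The lifecycle described in that commit message can be sketched as follows. This is an illustration under stated assumptions, not the merged code: `Context` and `Function` are stand-ins for the Pulsar types, the `RichFunction` signatures are assumed (the thread does not show the final ones, and `setup` takes a `Context` here only because a reviewer suggested it), `FakeClient` is a hypothetical in-memory stand-in for something like a Redis client, and `InstanceSketch` is a hypothetical driver rather than Pulsar's `JavaInstance`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-ins so the sketch compiles on its own.
interface Context {}

interface Function<I, O> {
    O process(I input, Context context) throws Exception;
}

// Assumed shape of the interface; exact signatures are not shown in this thread.
interface RichFunction<I, O> extends Function<I, O> {
    void setup(Context context) throws Exception;

    void tearDown() throws Exception;
}

// Hypothetical in-memory client standing in for an external resource.
final class FakeClient implements AutoCloseable {
    private final Map<String, String> data = new HashMap<>();
    private boolean open = true;

    FakeClient() {
        data.put("greeting", "hello");
    }

    String get(String key) {
        if (!open) {
            throw new IllegalStateException("client is closed");
        }
        return data.get(key);
    }

    boolean isOpen() {
        return open;
    }

    @Override
    public void close() {
        open = false;
    }
}

final class LookupFunction implements RichFunction<String, String> {
    FakeClient client; // package-private so callers in this sketch can inspect it

    @Override
    public void setup(Context context) {
        client = new FakeClient(); // acquire the resource once
    }

    @Override
    public String process(String input, Context context) {
        return client.get(input); // reuse the resource for every message
    }

    @Override
    public void tearDown() {
        client.close(); // release the resource once
    }
}

final class InstanceSketch {
    /** Calls setup once, processes every input, then calls tearDown once. */
    static <I, O> List<O> run(Function<I, O> function, List<I> inputs, Context context) throws Exception {
        RichFunction<I, O> rich =
                function instanceof RichFunction ? (RichFunction<I, O>) function : null;
        if (rich != null) {
            rich.setup(context);
        }
        List<O> outputs = new ArrayList<>();
        try {
            for (I input : inputs) {
                outputs.add(function.process(input, context));
            }
        } finally {
            if (rich != null) {
                rich.tearDown();
            }
        }
        return outputs;
    }
}
```

A plain `Function` passed to `InstanceSketch.run` is processed without any lifecycle calls, which mirrors the goal of keeping existing functions working unchanged.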
Motivation
Modifications
@Slf4j
public class HelloWorldFunction implements Function<byte[], String> {
    @Override
    public String process(byte[] input, Context context) {
        return null; // body elided in the original
    }
}
eg:
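@Slf4j
public class HelloWorldFunction implements Function<byte[], String>, Hook {
    @Override
    public String process(byte[] input, Context context) {
        return null; // body elided in the original
    }
    // Hook method implementations are elided in the original
}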