PIP-166: Function add MANUAL delivery semantics #15560

Closed
shibd opened this issue May 12, 2022 · 3 comments
Labels
release/important-notice The changes which are important should be mentioned in the release note Stale type/PIP

shibd (Member) commented May 12, 2022

Discussion thread: https://lists.apache.org/thread/4f2w1mqvhhs3mvccbcg2sk19b60xwkgf
Vote thread: https://lists.apache.org/thread/1ojcc12sxd87nz49yrflk8jv2nk98hvr (Pass)

Motivation

Currently, Pulsar Functions support three delivery semantics and also provide an autoAck option that controls whether messages are acknowledged automatically.
Because autoAck affects the delivery semantics of a function, users can find the relationship between these two parameters confusing.

For example, when the user configures Guarantees == ATMOST_ONCE and autoAck == false, the framework does not ack messages on the user's behalf, and the effective processing semantics may become ATLEAST_ONCE.

The delivery semantics provided by Functions should be unambiguous. Once the user sets the guarantee, the framework should enforce exactly those semantics, and no other parameter should be able to change them.

Goal

Add MANUAL delivery semantics and deprecate (and eventually ignore) the autoAck config.

The original intent of autoAck is to let users control when messages are acknowledged. When autoAck == false, the processing semantics provided by the framework no longer hold, so a MANUAL processing guarantee can replace the autoAck == false scenario.

When the user configures ProcessingGuarantees == MANUAL, the framework does not perform any ack operations on the user's behalf; acknowledgment is left to the user. For every other guarantee, the framework enforces the configured processing semantics.

The processing logic of each guarantee is:

  • ATMOST_ONCE: The message is acknowledged as soon as the client reads it, and only then is the function executed, guaranteeing that it will not run more than once.
  • ATLEAST_ONCE: The message is acknowledged after the function finishes execution, so it will run at least once.
  • EFFECTIVELY_ONCE: The message is acknowledged after the function finishes execution. This relies on Pulsar deduplication to provide end-to-end effectively-once processing.
  • MANUAL: The function framework does not perform the ack; acknowledgment is handled by the user inside the function.
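The four behaviours above differ only in when, or whether, the framework acks relative to running the function. A minimal sketch of that mapping follows; `AckTimingSketch`, `AckPoint`, and `ackPoint()` are hypothetical names used for illustration, not Pulsar APIs, and the nested enum merely mirrors the proposed `ProcessingGuarantees` values.

```java
// Hypothetical illustration: AckTimingSketch, AckPoint and ackPoint() are not Pulsar APIs.
class AckTimingSketch {
    // Mirrors the proposed ProcessingGuarantees values.
    enum ProcessingGuarantees { ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE, MANUAL }

    // When, if ever, the framework acks a message relative to running the function.
    enum AckPoint { BEFORE_EXECUTION, AFTER_EXECUTION, NEVER }

    static AckPoint ackPoint(ProcessingGuarantees guarantee) {
        switch (guarantee) {
            case ATMOST_ONCE:
                // Ack on read, then run: a crash after the ack means the message is never retried.
                return AckPoint.BEFORE_EXECUTION;
            case ATLEAST_ONCE:
            case EFFECTIVELY_ONCE:
                // Ack only after the function finishes; EFFECTIVELY_ONCE additionally relies on
                // Pulsar deduplication to suppress duplicate outputs from redelivered messages.
                return AckPoint.AFTER_EXECUTION;
            case MANUAL:
                // The framework never acks; user code calls record.ack() itself.
                return AckPoint.NEVER;
            default:
                throw new IllegalArgumentException("Unknown guarantee: " + guarantee);
        }
    }
}
```

Grouping ATLEAST_ONCE and EFFECTIVELY_ONCE in one case reflects that, per the list above, both ack after execution; they differ in deduplication, not in ack timing.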

API Changes

  1. Add a MANUAL type to ProcessingGuarantees (and also to Function.proto).
public enum ProcessingGuarantees {
      ATLEAST_ONCE,
      ATMOST_ONCE,
      EFFECTIVELY_ONCE,
      MANUAL
}
  2. Mark autoAck as deprecated and stop using it in the code (also in Function.proto).
public class FunctionConfig {
     @Deprecated
     private Boolean autoAck;
}

I would issue a WARN when reading the function configuration (so it is emitted only once) if the user explicitly configured autoAck=false, telling them that this configuration is deprecated and that they should switch to the MANUAL ProcessingGuarantee option.

  3. Add a new PulsarSinkProcessor implementation, PulsarSinkManualProcessor, which does not perform any ack operation.
        FunctionConfig.ProcessingGuarantees processingGuarantees = this.pulsarSinkConfig.getProcessingGuarantees();
        switch (processingGuarantees) {
+          case MANUAL:
+             this.pulsarSinkProcessor = new PulsarSinkManualProcessor(schema, crypto);
+             break;
            case ATMOST_ONCE:
                this.pulsarSinkProcessor = new PulsarSinkAtMostOnceProcessor(schema, crypto);
                break;
            case ATLEAST_ONCE:
                this.pulsarSinkProcessor = new PulsarSinkAtLeastOnceProcessor(schema, crypto);
                break;
            case EFFECTIVELY_ONCE:
                this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor(schema, crypto);
                break;
        }
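To make the strategy selection above concrete, here is a simplified, hypothetical model of what each processor does once a source record's output has been written. `SinkProcessorSketch`, `SinkProcessor`, `onOutputWritten()`, and `forGuarantee()` are illustrative names only; the real PulsarSinkProcessor interface has a different shape, and acks are appended to a list instead of calling Record#ack().

```java
import java.util.List;

// Hypothetical, simplified model: these names are illustrative, not Pulsar's PulsarSinkProcessor API.
class SinkProcessorSketch {
    enum ProcessingGuarantees { ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE, MANUAL }

    interface SinkProcessor {
        // Called once the output message for a source record has been written successfully.
        // Acks are appended to `acked` instead of calling Record#ack().
        void onOutputWritten(String recordId, List<String> acked);
    }

    // ATLEAST_ONCE / EFFECTIVELY_ONCE: ack the source record once its output is written.
    static final SinkProcessor ACK_AFTER_WRITE = (recordId, acked) -> acked.add(recordId);

    // ATMOST_ONCE: per the description above, the record was already acked when it was read.
    // MANUAL: the framework never acks; user code calls record.ack().
    static final SinkProcessor NO_ACK = (recordId, acked) -> { };

    static SinkProcessor forGuarantee(ProcessingGuarantees guarantee) {
        switch (guarantee) {
            case ATLEAST_ONCE:
            case EFFECTIVELY_ONCE:
                return ACK_AFTER_WRITE;
            case ATMOST_ONCE:
            case MANUAL:
                return NO_ACK;
            default:
                throw new IllegalArgumentException("Unknown guarantee: " + guarantee);
        }
    }
}
```

In this model the MANUAL branch leaves the sink path free of acks, like ATMOST_ONCE, but since no ack happened on read either, acknowledgment rests entirely with user code.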

Implementation

  1. When the delivery semantic is ATMOST_ONCE, add a check that autoAck is true. If the check fails, the function fails to start (this temporarily resolves the configuration ambiguity). Once autoAck is later removed, the message will be acked immediately after it is received.

if (instanceConfig.getFunctionDetails().getProcessingGuarantees()
        == org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
    if (instanceConfig.getFunctionDetails().getAutoAck()) {
        currentRecord.ack();
    }
}
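The startup check in item 1 and the one-time WARN for an explicitly configured autoAck=false (described under API Changes) could both run while the function configuration is read. A minimal sketch follows, assuming autoAck is a nullable Boolean where null means "not set"; `ConfigCheckSketch` and `checkAutoAck()` are hypothetical names, and java.util.logging stands in for whatever logger the runtime actually uses.

```java
import java.util.logging.Logger;

// Hypothetical sketch: ConfigCheckSketch and checkAutoAck() are illustrative names, and
// java.util.logging stands in for the function runtime's real logger.
class ConfigCheckSketch {
    private static final Logger LOG = Logger.getLogger(ConfigCheckSketch.class.getName());

    enum ProcessingGuarantees { ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE, MANUAL }

    // Run once while reading the function configuration. autoAck is nullable:
    // null means the user did not set it. Returns true if a deprecation warning was logged.
    static boolean checkAutoAck(ProcessingGuarantees guarantee, Boolean autoAck) {
        if (!Boolean.FALSE.equals(autoAck)) {
            return false; // autoAck unset or true: nothing to report
        }
        if (guarantee == ProcessingGuarantees.ATMOST_ONCE) {
            // Ambiguous combination: refuse to start rather than run with unclear semantics.
            throw new IllegalArgumentException(
                    "ATMOST_ONCE requires autoAck=true; use the MANUAL guarantee to ack messages yourself.");
        }
        LOG.warning("autoAck=false is deprecated; switch to the MANUAL processing guarantee.");
        return true;
    }
}
```

Because the check runs at configuration time rather than per message, the warning is naturally emitted once per function start.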

  2. When the user calls record.ack() inside a function, it only takes effect if ProcessingGuarantees == MANUAL. Conversely, when ProcessingGuarantees != MANUAL, a user call to record.ack() is ignored and a warning is logged once.

  3. Add documentation stating that autoAck is deprecated, explaining the MANUAL semantics and when record.ack() takes effect.

  4. For window functions, the current implementation of delivery guarantees is problematic. Currently only the ATMOST_ONCE and EFFECTIVELY_ONCE guarantees are supported, because the message has already been acked before the output is sent to the output topic.

As the following code shows, onExpiry, which acks the expired records, is invoked before onActivation, which processes the window.

public void onExpiry(List<Event<Record<T>>> events) {
    for (Event<Record<T>> event : events) {
        event.getRecord().ack();
    }
}

@Override
public void onActivation(List<Event<Record<T>>> tuples, List<Event<Record<T>>> newTuples,
                         List<Event<Record<T>>> expiredTuples, Long referenceTime) {
    processWindow(
            context,
            tuples.stream().map(event -> event.get()).collect(Collectors.toList()),
            newTuples.stream().map(event -> event.get()).collect(Collectors.toList()),
            expiredTuples.stream().map(event -> event.get()).collect(Collectors.toList()),
            referenceTime);
}
};

Therefore, treat window functions as a special case: override the delivery semantics in FunctionConfig to MANUAL, add a guarantee setting to WindowConfig, and handle the ack timing inside the window function according to that setting.

FunctionConfigUtils#convert

        if (windowConfig != null) {
            // Window functions do not support EFFECTIVELY_ONCE
            if (functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                throw new IllegalArgumentException(
                        "Window functions do not support the EFFECTIVELY_ONCE guarantee.");
            } else {
                // Copy the configured guarantee into WindowConfig, then override the function-level guarantee to MANUAL
                windowConfig.setProcessingGuarantees(WindowConfig.ProcessingGuarantees
                        .valueOf(functionDetailsBuilder.getProcessingGuarantees().name()));
                functionDetailsBuilder.setProcessingGuarantees(Function.ProcessingGuarantees.MANUAL);
            }
        }
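Inside the window function, the guarantee copied into WindowConfig would then decide when records are acked. The following is a simplified, hypothetical model (`WindowAckSketch`, `onRecord()`, and `onActivation()` are illustrative names, not the real window-function executor) in which ATMOST_ONCE acks a record on arrival, while ATLEAST_ONCE defers acking an activation's expired records until that activation's output has been sent, so an ack never precedes the corresponding send. Acks and sends are recorded in an ordered log instead of calling Record#ack().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of ack timing inside a window function; these names are
// illustrative, not the real window-function executor. Acks are logged instead of calling Record#ack().
class WindowAckSketch {
    // EFFECTIVELY_ONCE is rejected for window functions, so only two guarantees remain.
    enum Guarantee { ATMOST_ONCE, ATLEAST_ONCE }

    private final Guarantee guarantee;
    // Ordered event log: "ack:<id>" for an ack, "sent:<ids>" for a window output send.
    final List<String> log = new ArrayList<>();

    WindowAckSketch(Guarantee guarantee) {
        this.guarantee = guarantee;
    }

    // Called when a record arrives, before it is added to any window.
    void onRecord(String id) {
        if (guarantee == Guarantee.ATMOST_ONCE) {
            log.add("ack:" + id); // ack immediately; the record will not be redelivered
        }
    }

    // Called when a window fires; processAndSend runs the window function and sends its output.
    void onActivation(List<String> window, List<String> expired, Consumer<List<String>> processAndSend) {
        processAndSend.accept(window);
        log.add("sent:" + String.join(",", window));
        if (guarantee == Guarantee.ATLEAST_ONCE) {
            // Expired records are acked only after this activation's output has been sent.
            for (String id : expired) {
                log.add("ack:" + id);
            }
        }
    }
}
```

The key design point is ordering: under ATLEAST_ONCE a crash between processing and sending leaves the records unacked, so they are redelivered rather than lost.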

Test plan

  1. The main test asserts that when ProcessingGuarantees == MANUAL, the function framework does not perform any ack operations on the user's behalf.
  2. Validate that the existing autoAck=false tests still pass (nothing is broken).
  3. Validate that the existing ProcessingGuarantees tests for ATMOST_ONCE, ATLEAST_ONCE, and EFFECTIVELY_ONCE still pass (when autoAck=true).
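Test-plan item 1, together with the record.ack() rule from the Implementation section, can be exercised against a small in-memory model before touching the real runtime. In this hypothetical sketch (`AckGuardSketch`, `process()`, and `userAck()` are illustrative names), acks are collected in a list instead of calling Record#ack(), and the one-time warning is reduced to a flag.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one function instance; AckGuardSketch, process() and userAck() are
// illustrative names. Acks are recorded in a list instead of calling Record#ack().
class AckGuardSketch {
    enum ProcessingGuarantees { ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE, MANUAL }

    private final ProcessingGuarantees guarantee;
    final List<String> acked = new ArrayList<>();
    boolean warned = false; // the "ignored record.ack()" warning is logged at most once

    AckGuardSketch(ProcessingGuarantees guarantee) {
        this.guarantee = guarantee;
    }

    // What a user's call to record.ack() does under the proposal.
    void userAck(String id) {
        if (guarantee == ProcessingGuarantees.MANUAL) {
            acked.add(id);
        } else if (!warned) {
            warned = true; // stands in for logging a single WARN
        }
    }

    // Processes one message; userCallsAck simulates a function body that calls record.ack().
    void process(String id, boolean userCallsAck) {
        if (guarantee == ProcessingGuarantees.ATMOST_ONCE) {
            acked.add(id); // framework acks on read, before running the function
        }
        if (userCallsAck) {
            userAck(id); // the user's function body runs here
        }
        if (guarantee == ProcessingGuarantees.ATLEAST_ONCE
                || guarantee == ProcessingGuarantees.EFFECTIVELY_ONCE) {
            acked.add(id); // framework acks after the function finishes
        }
    }
}
```

Under ATLEAST_ONCE, each message ends up acked exactly once by the framework even though the function body also called record.ack(); the extra call only sets the warning flag.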

Compatibility

  1. This change marks autoAck as deprecated and stops checking it in the code. Document clearly that it is deprecated for the following two to three releases, and then ignore it.
  2. Runtimes for other languages (Python, Go) need to maintain consistent processing logic.

Since the intermediate changes remain backward compatible until the feature is released, we can open multiple PRs to implement the runtime for each language iteratively. Once all languages are supported, publish documentation to inform users.

Incompatible cases

  1. When the user configures Guarantees == ATMOST_ONCE and autoAck == false:
  • Current implementation: the function starts successfully, but the framework does not ack messages on the user's behalf, so the processing semantics may become ATLEAST_ONCE.
  • Changed implementation: the function fails to start, and an error is returned to the user.
  2. When the user sets autoAck == true and calls record.ack() inside the function:
  • Current implementation: this works, but the message is acked multiple times.
  • Changed implementation: this works, but record.ack() only takes effect when Guarantees == MANUAL; in other cases, a warning is logged.

Note that all of these incompatible cases can be considered bugs in the current implementation.

shibd added the type/PIP label May 12, 2022
shibd changed the title PIP-166: Function add NONE delivery semantics PIP-166: Function add MANUAL delivery semantics May 30, 2022
codelipenghui added the release/important-notice The changes which are important should be mentioned in the release note label Jun 2, 2022
eolivelli (Contributor) commented:

I have one last comment, about "then delete it":

We cannot "delete it"; at most we can ignore it, otherwise the function won't start after Pulsar is upgraded.
There are huge multi-tenant Pulsar clusters, and we cannot force users to find every instance of such functions and update their configuration.

shibd (Member Author) commented Jun 8, 2022

Thanks for the review. Agreed, I changed it.

github-actions commented:

The issue had no activity for 30 days, mark with Stale label.
