Skip to content

Commit

Permalink
Refactor to support at-least-once processing (apache#54)
Browse files Browse the repository at this point in the history
* Refactor to support at-least-once processing

This change is refactoring the relationship between Spawner, FunctionContainer and JavaInstance to better support various processing semantics.

- ThreadFunctionContainerFactory is a "singleton" across multiple functions.
- Spawner, FunctionContainer and JavaInstance are per function. Spawner is manging the lifecycle of function container, in the function container
  it will folks a thread for running java instance, because it has to load classpathes correctly.
- Remove subscribermanager and move consumer and producer logic into container. that means in future if we are supporting process based and docker
  based container, we can just take the thread container and run it directly as a separate process or a separate docker process. There is no
  extra communication between spawner and containers.

* Create pulsar client in thread container factory
  • Loading branch information
sijie committed Mar 4, 2018
1 parent 793b1cf commit d0c231e
Show file tree
Hide file tree
Showing 12 changed files with 236 additions and 573 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import lombok.Getter;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarFunctionsAdmin;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.runtime.container.ThreadFunctionContainerFactory;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.runtime.spawner.Spawner;

import java.util.List;

@Parameters(commandDescription = "Operations about functions")
public class CmdFunctions extends CmdBase {
private LocalRunner localRunner;
Expand Down Expand Up @@ -109,13 +109,27 @@ void run_functions_cmd() throws Exception {
1024 // 1024 outstanding tuples
);

Spawner spawner = Spawner.createSpawner(
functionConfig,
limitsConfig,
admin.getServiceUrl().toString(), jarFile);
ClientConfiguration clientConf;
if (admin instanceof PulsarFunctionsAdmin) {
clientConf = ((PulsarFunctionsAdmin) admin).getClientConf();
} else {
clientConf = new ClientConfiguration();
}

try (ThreadFunctionContainerFactory containerFactory = new ThreadFunctionContainerFactory(
limitsConfig.getMaxBufferedTuples(),
admin.getServiceUrl().toString(),
clientConf)) {

Spawner spawner = Spawner.createSpawner(
functionConfig,
limitsConfig,
jarFile,
containerFactory);

spawner.start();
spawner.join();
spawner.start();
spawner.join();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.client.api.*;

import org.glassfish.jersey.media.multipart.MultiPartFeature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -35,6 +34,7 @@ public class PulsarFunctionsAdmin extends PulsarAdmin {
private static final Logger LOG = LoggerFactory.getLogger(PulsarFunctionsAdmin.class);

private final Functions functions;
private final ClientConfiguration clientConf;

/**
* Construct a new Pulsar Admin client object.
Expand All @@ -49,6 +49,11 @@ public class PulsarFunctionsAdmin extends PulsarAdmin {
public PulsarFunctionsAdmin(URL serviceUrl, ClientConfiguration pulsarConfig) throws PulsarClientException {
super(serviceUrl, pulsarConfig);
this.functions = new FunctionsImpl(web, auth);
this.clientConf = pulsarConfig;
}

public ClientConfiguration getClientConf() {
return clientConf;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,4 @@ public interface FunctionContainer {

FunctionConfig getFunctionConfig();

CompletableFuture<ExecutionResult> sendMessage(String topicName, String messageId, byte[] data);

}
Loading

0 comments on commit d0c231e

Please sign in to comment.