Skip to content

Commit

Permalink
Renamed FunctionState stuff to something that reflects their function…
Browse files Browse the repository at this point in the history
…ality (apache#53)

* Renamed FunctionState stuff to something that reflects their functionality

* More removal of state
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 59ec630 commit 793b1cf
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@
import java.util.concurrent.ConcurrentHashMap;

/**
* A manager manages function states.
* A manager manages function metadata.
*/
public class FunctionStateManager implements AutoCloseable {
public class FunctionMetaDataManager implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(FunctionStateManager.class);
private static final Logger LOG = LoggerFactory.getLogger(FunctionMetaDataManager.class);

// tenant -> namespace -> (function name, FunctionMetaData)
private final Map<String, Map<String, Map<String, FunctionMetaData>>> functionStateMap = new ConcurrentHashMap<>();
private final Map<String, Map<String, Map<String, FunctionMetaData>>> functionMap = new ConcurrentHashMap<>();

// A map in which the key is the service request id and value is the service request
private final Map<String, ServiceRequest> pendingServiceRequests = new ConcurrentHashMap<>();
Expand All @@ -52,8 +52,8 @@ public class FunctionStateManager implements AutoCloseable {

private final WorkerConfig workerConfig;

public FunctionStateManager(WorkerConfig workerConfig,
ServiceRequestManager serviceRequestManager) {
public FunctionMetaDataManager(WorkerConfig workerConfig,
ServiceRequestManager serviceRequestManager) {
this.workerConfig = workerConfig;
this.serviceRequestManager = serviceRequestManager;
}
Expand All @@ -64,20 +64,20 @@ public void close() {
}

public FunctionMetaData getFunction(String tenant, String namespace, String functionName) {
return this.functionStateMap.get(tenant).get(namespace).get(functionName);
return this.functionMap.get(tenant).get(namespace).get(functionName);
}

public Collection<String> listFunction(String tenant, String namespace) {
List<String> ret = new LinkedList<>();

if (!this.functionStateMap.containsKey(tenant)) {
if (!this.functionMap.containsKey(tenant)) {
return ret;
}

if (!this.functionStateMap.get(tenant).containsKey(namespace)) {
if (!this.functionMap.get(tenant).containsKey(namespace)) {
return ret;
}
for (FunctionMetaData entry : this.functionStateMap.get(tenant).get(namespace).values()) {
for (FunctionMetaData entry : this.functionMap.get(tenant).get(namespace).values()) {
ret.add(entry.getFunctionConfig().getName());
}
return ret;
Expand All @@ -88,20 +88,20 @@ public CompletableFuture<RequestResult> updateFunction(FunctionMetaData function
long version = 0;

String tenant = functionMetaData.getFunctionConfig().getTenant();
if (!this.functionStateMap.containsKey(tenant)) {
this.functionStateMap.put(tenant, new ConcurrentHashMap<>());
if (!this.functionMap.containsKey(tenant)) {
this.functionMap.put(tenant, new ConcurrentHashMap<>());
}

Map<String, Map<String, FunctionMetaData>> namespaces = this.functionStateMap.get(tenant);
Map<String, Map<String, FunctionMetaData>> namespaces = this.functionMap.get(tenant);
String namespace = functionMetaData.getFunctionConfig().getNamespace();
if (!namespaces.containsKey(namespace)) {
namespaces.put(namespace, new ConcurrentHashMap<>());
}

Map<String, FunctionMetaData> functionStates = namespaces.get(namespace);
Map<String, FunctionMetaData> functionMetaDatas = namespaces.get(namespace);
String functionName = functionMetaData.getFunctionConfig().getName();
if (functionStates.containsKey(functionName)) {
version = functionStates.get(functionName).getVersion() + 1;
if (functionMetaDatas.containsKey(functionName)) {
version = functionMetaDatas.get(functionName).getVersion() + 1;
}
functionMetaData.setVersion(version);

Expand All @@ -112,7 +112,7 @@ public CompletableFuture<RequestResult> updateFunction(FunctionMetaData function

public CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) {
FunctionMetaData functionMetaData
= (FunctionMetaData) this.functionStateMap.get(tenant).get(namespace).get(functionName).clone();
= (FunctionMetaData) this.functionMap.get(tenant).get(namespace).get(functionName).clone();

functionMetaData.incrementVersion();

Expand All @@ -131,9 +131,9 @@ private boolean containsFunction(FunctionConfig functionConfig) {
}

public boolean containsFunction(String tenant, String namespace, String functionName) {
if (this.functionStateMap.containsKey(tenant)) {
if (this.functionStateMap.get(tenant).containsKey(namespace)) {
if (this.functionStateMap.get(tenant).get(namespace).containsKey(functionName)) {
if (this.functionMap.containsKey(tenant)) {
if (this.functionMap.get(tenant).containsKey(namespace)) {
if (this.functionMap.get(tenant).get(namespace).containsKey(functionName)) {
return true;
}
}
Expand Down Expand Up @@ -198,8 +198,8 @@ public void proccessDeregister(DeregisterRequest deregisterRequest) {
// stop running the function
stopFunction(functionName);
}
// remove function from in memory function state store
this.functionStateMap.remove(functionName);
// remove function from in memory function metadata store
this.functionMap.remove(functionName);
completeRequest(deregisterRequest, true);
} else {
completeRequest(deregisterRequest, false,
Expand All @@ -226,8 +226,8 @@ public void processUpdate(UpdateRequest updateRequest) {

// Worker doesn't know about the function so far
if(!this.containsFunction(updateRequestFs)) {
// Since this is the first time worker has seen function, just put it into internal function state store
addFunctionToFunctionStateMap(updateRequestFs);
// Since this is the first time worker has seen function, just put it into internal function metadata store
addFunctionToFunctionMap(updateRequestFs);
// Check if this worker is suppose to run the function
if (this.workerConfig.getWorkerId().equals(updateRequestFs.getWorkerId())) {
// start the function
Expand All @@ -236,11 +236,11 @@ public void processUpdate(UpdateRequest updateRequest) {
completeRequest(updateRequest, true);
} else {
// The request is an update to an existing function since this worker already has a record of this function
// in its function state store
// in its function metadata store
// Check if request is outdated
if (!isRequestOutdated(updateRequest)) {
// update the function state
addFunctionToFunctionStateMap(updateRequestFs);
// update the function metadata
addFunctionToFunctionMap(updateRequestFs);
// check if this worker should run the update
if (isMyRequest(updateRequest)) {
// Update the function
Expand All @@ -254,24 +254,24 @@ public void processUpdate(UpdateRequest updateRequest) {
}
}

private void addFunctionToFunctionStateMap(FunctionMetaData functionMetaData) {
private void addFunctionToFunctionMap(FunctionMetaData functionMetaData) {
FunctionConfig functionConfig = functionMetaData.getFunctionConfig();
if (!this.functionStateMap.containsKey(functionConfig.getTenant())) {
this.functionStateMap.put(functionConfig.getTenant(), new ConcurrentHashMap<>());
if (!this.functionMap.containsKey(functionConfig.getTenant())) {
this.functionMap.put(functionConfig.getTenant(), new ConcurrentHashMap<>());
}

if (!this.functionStateMap.get(functionConfig.getTenant()).containsKey(functionConfig.getNamespace())) {
this.functionStateMap.get(functionConfig.getTenant())
if (!this.functionMap.get(functionConfig.getTenant()).containsKey(functionConfig.getNamespace())) {
this.functionMap.get(functionConfig.getTenant())
.put(functionConfig.getNamespace(), new ConcurrentHashMap<>());
}
this.functionStateMap.get(functionConfig.getTenant())
this.functionMap.get(functionConfig.getTenant())
.get(functionConfig.getNamespace()).put(functionConfig.getName(), functionMetaData);
}

private boolean isRequestOutdated(ServiceRequest serviceRequest) {
FunctionMetaData requestFunctionMetaData = serviceRequest.getFunctionMetaData();
FunctionConfig functionConfig = requestFunctionMetaData.getFunctionConfig();
FunctionMetaData currentFunctionMetaData = this.functionStateMap.get(functionConfig.getTenant())
FunctionMetaData currentFunctionMetaData = this.functionMap.get(functionConfig.getTenant())
.get(functionConfig.getNamespace()).get(functionConfig.getName());
return currentFunctionMetaData.getVersion() >= requestFunctionMetaData.getVersion();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@
import org.apache.pulsar.functions.runtime.worker.request.UpdateRequest;

@Slf4j
public class FunctionStateConsumer
public class FunctionMetaDataTopicTailer
implements java.util.function.Consumer<Message>, Function<Throwable, Void>, AutoCloseable {

private final FunctionStateManager functionStateManager;
private final FunctionMetaDataManager functionMetaDataManager;
private final Consumer consumer;

public FunctionStateConsumer(FunctionStateManager functionStateManager,
Consumer consumer)
public FunctionMetaDataTopicTailer(FunctionMetaDataManager functionMetaDataManager,
Consumer consumer)
throws PulsarClientException {
this.functionStateManager = functionStateManager;
this.functionMetaDataManager = functionMetaDataManager;
this.consumer = consumer;
}

Expand Down Expand Up @@ -79,10 +79,10 @@ public void accept(Message msg) {

switch(serviceRequest.getRequestType()) {
case UPDATE:
this.functionStateManager.processUpdate((UpdateRequest) serviceRequest);
this.functionMetaDataManager.processUpdate((UpdateRequest) serviceRequest);
break;
case DELETE:
this.functionStateManager.proccessDeregister((DeregisterRequest) serviceRequest);
this.functionMetaDataManager.proccessDeregister((DeregisterRequest) serviceRequest);
break;
default:
log.warn("Received request with unrecognized type: {}", serviceRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public class Worker extends AbstractService {

private final WorkerConfig workerConfig;
private PulsarClient client;
private FunctionStateManager functionStateManager;
private FunctionStateConsumer functionStateConsumer;
private FunctionMetaDataManager functionMetaDataManager;
private FunctionMetaDataTopicTailer functionMetaDataTopicTailer;
private Thread serverThread;

public Worker(WorkerConfig workerConfig) {
Expand All @@ -46,13 +46,13 @@ protected void doStart() {
this.client = PulsarClient.create(workerConfig.getPulsarServiceUrl());
ServiceRequestManager reqMgr = new ServiceRequestManager(
client.createProducer(workerConfig.getFunctionMetadataTopic()));
this.functionStateManager = new FunctionStateManager(
this.functionMetaDataManager = new FunctionMetaDataManager(
workerConfig, reqMgr);

ConsumerConfiguration consumerConf = new ConsumerConfiguration();
consumerConf.setSubscriptionType(SubscriptionType.Exclusive);
this.functionStateConsumer = new FunctionStateConsumer(
functionStateManager,
this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(
functionMetaDataManager,
client.subscribe(
workerConfig.getFunctionMetadataTopic(),
workerConfig.getFunctionMetadataTopicSubscription(),
Expand All @@ -65,13 +65,13 @@ protected void doStart() {
throw new RuntimeException(e);
}

WorkerServer server = new WorkerServer(workerConfig, functionStateManager);
WorkerServer server = new WorkerServer(workerConfig, functionMetaDataManager);
this.serverThread = new Thread(server, server.getThreadName());

log.info("Start worker server on port {}...", workerConfig.getWorkerPort());
serverThread.start();
log.info("Start worker function state consumer ...");
functionStateConsumer.start();
functionMetaDataTopicTailer.start();
}

@Override
Expand All @@ -84,11 +84,11 @@ protected void doStop() {
log.warn("Worker server thread is interrupted", e);
}
}
if (null != functionStateConsumer) {
functionStateConsumer.close();
if (null != functionMetaDataTopicTailer) {
functionMetaDataTopicTailer.close();
}
if (null != functionStateManager) {
functionStateManager.close();
if (null != functionMetaDataManager) {
functionMetaDataManager.close();
}
if (null != client) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.pulsar.functions.runtime.worker.rest;

import org.apache.pulsar.functions.runtime.worker.FunctionStateManager;
import org.apache.pulsar.functions.runtime.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.runtime.worker.request.ServiceRequestManager;
import org.apache.pulsar.functions.runtime.worker.WorkerConfig;

Expand All @@ -32,7 +32,7 @@ public class BaseApiResource {
public static final String ATTRIBUTE_WORKER_SERVICE_REQUEST_MANAGER = "worker-service-request-manager";

private WorkerConfig workerConfig;
private FunctionStateManager functionStateManager;
private FunctionMetaDataManager functionMetaDataManager;
private ServiceRequestManager serviceRequestManager;

@Context
Expand All @@ -45,11 +45,11 @@ public WorkerConfig getWorkerConfig() {
return this.workerConfig;
}

public FunctionStateManager getWorkerFunctionStateManager() {
if (this.functionStateManager == null) {
this.functionStateManager = (FunctionStateManager) servletContext.getAttribute(ATTRIBUTE_WORKER_FUNCTION_STATE_MANAGER);
public FunctionMetaDataManager getWorkerFunctionStateManager() {
if (this.functionMetaDataManager == null) {
this.functionMetaDataManager = (FunctionMetaDataManager) servletContext.getAttribute(ATTRIBUTE_WORKER_FUNCTION_STATE_MANAGER);
}
return this.functionStateManager;
return this.functionMetaDataManager;
}

public ServiceRequestManager getWorkerServiceRequestManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
*/
package org.apache.pulsar.functions.runtime.worker.rest;

import org.apache.pulsar.functions.runtime.worker.FunctionStateManager;
import org.apache.pulsar.functions.runtime.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.runtime.worker.WorkerConfig;
import org.apache.pulsar.functions.runtime.worker.request.ServiceRequestManager;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
Expand All @@ -37,12 +36,12 @@ public class WorkerServer implements Runnable{
private static final Logger LOG = LoggerFactory.getLogger(WorkerServer.class);

private WorkerConfig workerConfig;
private FunctionStateManager functionStateManager;
private FunctionMetaDataManager functionMetaDataManager;


public WorkerServer(WorkerConfig workerConfig, FunctionStateManager functionStateManager) {
public WorkerServer(WorkerConfig workerConfig, FunctionMetaDataManager functionMetaDataManager) {
this.workerConfig = workerConfig;
this.functionStateManager = functionStateManager;
this.functionMetaDataManager = functionMetaDataManager;
}

private static String getErrorMessage(Server server, int port, Exception ex) {
Expand All @@ -64,7 +63,7 @@ public void run() {
new ServletContextHandler(ServletContextHandler.NO_SESSIONS);

contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_WORKER_CONFIG, this.workerConfig);
contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_WORKER_FUNCTION_STATE_MANAGER, this.functionStateManager);
contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_WORKER_FUNCTION_STATE_MANAGER, this.functionMetaDataManager);
contextHandler.setContextPath("/");

server.setHandler(contextHandler);
Expand Down
Loading

0 comments on commit 793b1cf

Please sign in to comment.