Skip to content

Commit

Permalink
Remove snapshotting (apache#246)
Browse files Browse the repository at this point in the history
* removing snapshotting

* cleaning up
  • Loading branch information
jerrypeng authored and sijie committed Mar 4, 2018
1 parent fb5e34b commit 753155d
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 494 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@

import com.google.common.annotations.VisibleForTesting;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderConfiguration;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.Request;
Expand All @@ -38,7 +38,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

Expand All @@ -56,20 +55,15 @@ public class FunctionMetaDataManager implements AutoCloseable {
private final Map<String, ServiceRequestInfo> pendingServiceRequests = new ConcurrentHashMap<>();

private final ServiceRequestManager serviceRequestManager;
private final FunctionMetaDataSnapshotManager functionMetaDataSnapshotManager;
private final SchedulerManager schedulerManager;
private final WorkerConfig workerConfig;
private final PulsarClient pulsarClient;
final String initializeMarkerRequestId = UUID.randomUUID().toString();

// The message id of the last messaged processed by function runtime manager
@VisibleForTesting
@Getter
MessageId lastProcessedMessageId = MessageId.earliest;

private FunctionMetaDataTopicTailer functionMetaDataTopicTailer;

private CompletableFuture<Void> initializePhase = new CompletableFuture<>();
@Setter
@Getter
boolean isInitializePhase = false;

public FunctionMetaDataManager(WorkerConfig workerConfig,
SchedulerManager schedulerManager,
Expand All @@ -78,8 +72,6 @@ public FunctionMetaDataManager(WorkerConfig workerConfig,
this.pulsarClient = pulsarClient;
this.serviceRequestManager = getServiceRequestManager(
this.pulsarClient, this.workerConfig.getFunctionMetadataTopic());
this.functionMetaDataSnapshotManager = new FunctionMetaDataSnapshotManager(
this.workerConfig, this, this.pulsarClient);
this.schedulerManager = schedulerManager;
}

Expand All @@ -94,18 +86,24 @@ public FunctionMetaDataManager(WorkerConfig workerConfig,
*/
public void initialize() {
log.info("/** Initializing Function Metadata Manager **/");
log.info("Restoring metadata store from snapshot...");
MessageId lastMessageId = this.restore();
log.info("Function metadata store restored from snapshot with message id: {}", lastMessageId);
try {
Reader reader = this.pulsarClient.createReader(
this.workerConfig.getFunctionMetadataTopic(),
lastMessageId,
new ReaderConfiguration());

Reader reader = pulsarClient.newReader()
.topic(this.workerConfig.getFunctionMetadataTopic())
.startMessageId(MessageId.earliest)
.create();

this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this, reader);
// read all existing messages
this.setInitializePhase(true);
while (reader.hasMessageAvailable()) {
this.functionMetaDataTopicTailer.processRequest(reader.readNext());
}
this.setInitializePhase(false);
// schedule functions if necessary
this.schedulerManager.schedule();
// start function metadata tailer
this.functionMetaDataTopicTailer.start();
this.sendIntializationMarker();
this.initializePhase.get();

} catch (Exception e) {
log.error("Failed to initialize meta data store: ", e.getMessage(), e);
Expand Down Expand Up @@ -233,12 +231,9 @@ public synchronized CompletableFuture<RequestResult> deregisterFunction(String t
*/
public void processRequest(MessageId messageId, Request.ServiceRequest serviceRequest) {

// make sure that snapshotting and processing requests don't happen simultaneously
// make sure that processing requests don't happen simultaneously
synchronized (this) {
switch (serviceRequest.getServiceRequestType()) {
case INITIALIZE:
this.processInitializeMarker(serviceRequest);
break;
case UPDATE:
this.processUpdate(serviceRequest);
break;
Expand All @@ -248,17 +243,9 @@ public void processRequest(MessageId messageId, Request.ServiceRequest serviceRe
default:
log.warn("Received request with unrecognized type: {}", serviceRequest);
}
this.lastProcessedMessageId = messageId;
}
}

/**
* Creates a snapshot of the FMT (Function Metadata Topic) that can be restored at a later time
*/
public void snapshot() {
this.functionMetaDataSnapshotManager.snapshot();
}

/**
* Private methods for internal use. Should not be used outside of this class
*/
Expand Down Expand Up @@ -311,7 +298,7 @@ synchronized void proccessDeregister(Request.ServiceRequest deregisterRequest) {
completeRequest(deregisterRequest, true);
}

if (needsScheduling) {
if (!this.isInitializePhase() && needsScheduling) {
this.schedulerManager.schedule();
}
}
Expand Down Expand Up @@ -346,24 +333,11 @@ synchronized void processUpdate(Request.ServiceRequest updateRequest) {
}
}

if (needsScheduling) {
if (!this.isInitializePhase() && needsScheduling) {
this.schedulerManager.schedule();
}
}

@VisibleForTesting
void processInitializeMarker(Request.ServiceRequest serviceRequest) {
if (isMyInitializeMarkerRequest(serviceRequest)) {
this.completeInitializePhase();
log.info("Initializing Metadata state done!");
}
}


private MessageId restore() {
return this.functionMetaDataSnapshotManager.restore();
}

/**
* Complete requests that this worker has pending
* @param serviceRequest
Expand Down Expand Up @@ -425,18 +399,6 @@ CompletableFuture<RequestResult> submit(Request.ServiceRequest serviceRequest) {
return requestResultCompletableFuture;
}

private boolean isMyInitializeMarkerRequest(Request.ServiceRequest serviceRequest) {
return isSendByMe(serviceRequest) && this.initializeMarkerRequestId.equals(serviceRequest.getRequestId());
}

private boolean isSendByMe(Request.ServiceRequest serviceRequest) {
return this.workerConfig.getWorkerId().equals(serviceRequest.getWorkerId());
}

private void completeInitializePhase() {
this.initializePhase.complete(null);
}

@Override
public void close() throws Exception {
if (this.functionMetaDataTopicTailer != null) {
Expand All @@ -445,20 +407,9 @@ public void close() throws Exception {
if (this.serviceRequestManager != null) {
this.serviceRequestManager.close();
}
if (this.functionMetaDataSnapshotManager != null) {
this.functionMetaDataSnapshotManager.close();
}
}

private ServiceRequestManager getServiceRequestManager(PulsarClient pulsarClient, String functionMetadataTopic) throws PulsarClientException {
return new ServiceRequestManager(pulsarClient.createProducer(functionMetadataTopic));
}

void sendIntializationMarker() {
log.info("Sending Initialize message...");
this.serviceRequestManager.submitRequest(
ServiceRequestUtils.getIntializationRequest(
this.initializeMarkerRequestId,
this.workerConfig.getWorkerId()));
}
}

This file was deleted.

Loading

0 comments on commit 753155d

Please sign in to comment.