From 753155d6d2a7999ca11262aea935975ab88076b0 Mon Sep 17 00:00:00 2001 From: Boyang Jerry Peng Date: Fri, 23 Feb 2018 13:01:17 -0800 Subject: [PATCH] Remove snapshotting (#246) * removing snapshotting * cleaning up --- .../worker/FunctionMetaDataManager.java | 93 ++------ .../FunctionMetaDataSnapshotManager.java | 190 --------------- .../worker/FunctionMetaDataTopicTailer.java | 9 +- .../functions/worker/SchedulerManager.java | 5 + .../functions/worker/WorkerService.java | 5 - .../worker/FunctionMetaDataManagerTest.java | 8 - .../FunctionMetaDataSnapshotManagerTest.java | 217 ------------------ 7 files changed, 33 insertions(+), 494 deletions(-) delete mode 100644 pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataSnapshotManager.java delete mode 100644 pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataSnapshotManagerTest.java diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java index 596487e8d0cc1..8a8f95e5c4cc1 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java @@ -20,12 +20,12 @@ import com.google.common.annotations.VisibleForTesting; import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; -import org.apache.pulsar.client.api.ReaderConfiguration; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; import org.apache.pulsar.functions.proto.Request; @@ -38,7 +38,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -56,20 +55,15 @@ public class FunctionMetaDataManager implements AutoCloseable { private final Map pendingServiceRequests = new ConcurrentHashMap<>(); private final ServiceRequestManager serviceRequestManager; - private final FunctionMetaDataSnapshotManager functionMetaDataSnapshotManager; private final SchedulerManager schedulerManager; private final WorkerConfig workerConfig; private final PulsarClient pulsarClient; - final String initializeMarkerRequestId = UUID.randomUUID().toString(); - - // The message id of the last messaged processed by function runtime manager - @VisibleForTesting - @Getter - MessageId lastProcessedMessageId = MessageId.earliest; private FunctionMetaDataTopicTailer functionMetaDataTopicTailer; - private CompletableFuture initializePhase = new CompletableFuture<>(); + @Setter + @Getter + boolean isInitializePhase = false; public FunctionMetaDataManager(WorkerConfig workerConfig, SchedulerManager schedulerManager, @@ -78,8 +72,6 @@ public FunctionMetaDataManager(WorkerConfig workerConfig, this.pulsarClient = pulsarClient; this.serviceRequestManager = getServiceRequestManager( this.pulsarClient, this.workerConfig.getFunctionMetadataTopic()); - this.functionMetaDataSnapshotManager = new FunctionMetaDataSnapshotManager( - this.workerConfig, this, this.pulsarClient); this.schedulerManager = schedulerManager; } @@ -94,18 +86,24 @@ public FunctionMetaDataManager(WorkerConfig workerConfig, */ public void initialize() { log.info("/** Initializing Function Metadata Manager **/"); - log.info("Restoring metadata store from snapshot..."); - MessageId lastMessageId = this.restore(); - log.info("Function metadata store restored from snapshot with message id: {}", lastMessageId); try { - Reader reader = this.pulsarClient.createReader( - this.workerConfig.getFunctionMetadataTopic(), - lastMessageId, - new ReaderConfiguration()); + + Reader reader = pulsarClient.newReader() + .topic(this.workerConfig.getFunctionMetadataTopic()) + .startMessageId(MessageId.earliest) + .create(); + this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this, reader); + // read all existing messages + this.setInitializePhase(true); + while (reader.hasMessageAvailable()) { + this.functionMetaDataTopicTailer.processRequest(reader.readNext()); + } + this.setInitializePhase(false); + // schedule functions if necessary + this.schedulerManager.schedule(); + // start function metadata tailer this.functionMetaDataTopicTailer.start(); - this.sendIntializationMarker(); - this.initializePhase.get(); } catch (Exception e) { log.error("Failed to initialize meta data store: ", e.getMessage(), e); @@ -233,12 +231,9 @@ public synchronized CompletableFuture deregisterFunction(String t */ public void processRequest(MessageId messageId, Request.ServiceRequest serviceRequest) { - // make sure that snapshotting and processing requests don't happen simultaneously + // make sure that processing requests don't happen simultaneously synchronized (this) { switch (serviceRequest.getServiceRequestType()) { - case INITIALIZE: - this.processInitializeMarker(serviceRequest); - break; case UPDATE: this.processUpdate(serviceRequest); break; @@ -248,17 +243,9 @@ public void processRequest(MessageId messageId, Request.ServiceRequest serviceRe default: log.warn("Received request with unrecognized type: {}", serviceRequest); } - this.lastProcessedMessageId = messageId; } } - /** - * Creates a snapshot of the FMT (Function Metadata Topic) that can be restored at a later time - */ - public void snapshot() { - this.functionMetaDataSnapshotManager.snapshot(); - } - /** * Private methods for internal use. Should not be used outside of this class */ @@ -311,7 +298,7 @@ synchronized void proccessDeregister(Request.ServiceRequest deregisterRequest) { completeRequest(deregisterRequest, true); } - if (needsScheduling) { + if (!this.isInitializePhase() && needsScheduling) { this.schedulerManager.schedule(); } } @@ -346,24 +333,11 @@ synchronized void processUpdate(Request.ServiceRequest updateRequest) { } } - if (needsScheduling) { + if (!this.isInitializePhase() && needsScheduling) { this.schedulerManager.schedule(); } } - @VisibleForTesting - void processInitializeMarker(Request.ServiceRequest serviceRequest) { - if (isMyInitializeMarkerRequest(serviceRequest)) { - this.completeInitializePhase(); - log.info("Initializing Metadata state done!"); - } - } - - - private MessageId restore() { - return this.functionMetaDataSnapshotManager.restore(); - } - /** * Complete requests that this worker has pending * @param serviceRequest @@ -425,18 +399,6 @@ CompletableFuture submit(Request.ServiceRequest serviceRequest) { return requestResultCompletableFuture; } - private boolean isMyInitializeMarkerRequest(Request.ServiceRequest serviceRequest) { - return isSendByMe(serviceRequest) && this.initializeMarkerRequestId.equals(serviceRequest.getRequestId()); - } - - private boolean isSendByMe(Request.ServiceRequest serviceRequest) { - return this.workerConfig.getWorkerId().equals(serviceRequest.getWorkerId()); - } - - private void completeInitializePhase() { - this.initializePhase.complete(null); - } - @Override public void close() throws Exception { if (this.functionMetaDataTopicTailer != null) { @@ -445,20 +407,9 @@ public void close() throws Exception { if (this.serviceRequestManager != null) { this.serviceRequestManager.close(); } - if (this.functionMetaDataSnapshotManager != null) { - this.functionMetaDataSnapshotManager.close(); - } } private ServiceRequestManager getServiceRequestManager(PulsarClient pulsarClient, String functionMetadataTopic) throws PulsarClientException { return new ServiceRequestManager(pulsarClient.createProducer(functionMetadataTopic)); } - - void sendIntializationMarker() { - log.info("Sending Initialize message..."); - this.serviceRequestManager.submitRequest( - ServiceRequestUtils.getIntializationRequest( - this.initializeMarkerRequestId, - this.workerConfig.getWorkerId())); - } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataSnapshotManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataSnapshotManager.java deleted file mode 100644 index 1390b52d94be9..0000000000000 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataSnapshotManager.java +++ /dev/null @@ -1,190 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.functions.worker; - -import com.google.protobuf.ByteString; -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Reader; -import org.apache.pulsar.client.api.ReaderConfiguration; -import org.apache.pulsar.functions.proto.Function; - -import java.io.IOException; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.ExecutionException; - -@Slf4j -public class FunctionMetaDataSnapshotManager implements AutoCloseable{ - - private final WorkerConfig workerConfig; - private PulsarAdmin pulsarAdminClient; - private final PulsarClient pulsarClient; - private final FunctionMetaDataManager functionMetaDataManager; - - public FunctionMetaDataSnapshotManager (WorkerConfig workerConfig, - FunctionMetaDataManager functionMetaDataManager, - PulsarClient pulsarClient) { - this.workerConfig = workerConfig; - this.functionMetaDataManager = functionMetaDataManager; - this.pulsarClient = pulsarClient; - } - - /** - * Restores the latest snapshot into in memory state - * @return the message Id associated with the latest snapshot - */ - MessageId restore() { - List snapshots = getSnapshotTopics(); - if (snapshots.isEmpty()) { - // if no snapshot that go to earliest message in fmt - return MessageId.earliest; - } else { - - String latestsSnapshot = String.format("persistent://%s/%s/snapshot-%d", - this.workerConfig.getPulsarFunctionsNamespace(), - this.workerConfig.getFunctionMetadataSnapshotsTopicPath(), - snapshots.get(0)); - log.info("Restoring state snapshot from {}", latestsSnapshot); - Function.Snapshot snapshot = null; - MessageId lastAppliedMessageId = null; - try (Reader reader = this.pulsarClient.createReader( - latestsSnapshot, MessageId.earliest, new ReaderConfiguration())){ - snapshot = Function.Snapshot.parseFrom(reader.readNextAsync().get().getData()); - for (Function.FunctionMetaData functionMetaData : snapshot.getFunctionMetaDataListList()) { - this.functionMetaDataManager.setFunctionMetaData(functionMetaData); - } - lastAppliedMessageId = MessageId.fromByteArray(snapshot.getLastAppliedMessageId().toByteArray()); - - } catch (InterruptedException | ExecutionException | IOException e) { - log.error("Failed to read snapshot from topic " + latestsSnapshot); - throw new RuntimeException(e); - } - log.info("Restored state snapshot from {} with last message id {}", latestsSnapshot, lastAppliedMessageId); - return lastAppliedMessageId; - } - } - - /** - * Snap shots the current state and puts it in a topic. Only one worker should execute this at a time - */ - void snapshot() { - Function.Snapshot.Builder snapshotBuilder = Function.Snapshot.newBuilder(); - - List snapshots = getSnapshotTopics(); - int nextSnapshotTopicIndex = 0; - if (!snapshots.isEmpty()) { - nextSnapshotTopicIndex = snapshots.get(0); - } - nextSnapshotTopicIndex += 1; - String nextSnapshotTopic = String.format("persistent://%s/%s/snapshot-%d", - this.workerConfig.getPulsarFunctionsNamespace(), - this.workerConfig.getFunctionMetadataSnapshotsTopicPath(), - nextSnapshotTopicIndex); - - // Make sure not processing any requests at the same time - synchronized (this) { - List functionMetaDataList = functionMetaDataManager.getAllFunctionMetaData(); - if (functionMetaDataList.isEmpty()) { - return; - } - for (Function.FunctionMetaData functionMetaData : functionMetaDataList) { - snapshotBuilder.addFunctionMetaDataList(functionMetaData); - } - log.info("Writing snapshot to {} with last message id {}", nextSnapshotTopic, - functionMetaDataManager.getLastProcessedMessageId()); - snapshotBuilder.setLastAppliedMessageId(ByteString.copyFrom( - functionMetaDataManager.getLastProcessedMessageId().toByteArray())); - } - - this.writeSnapshot(nextSnapshotTopic, snapshotBuilder.build()); - - // deleting older snapshots - for (Integer snapshotIndex : snapshots) { - String oldSnapshotTopic = String.format("persistent://%s/%s/snapshot-%d", - this.workerConfig.getPulsarFunctionsNamespace(), - this.workerConfig.getFunctionMetadataSnapshotsTopicPath(), - snapshotIndex); - log.info("Deleting old snapshot {}", oldSnapshotTopic); - this.deleteSnapshot(oldSnapshotTopic); - } - } - - void writeSnapshot(String topic, Function.Snapshot snapshot) { - try (Producer producer = this.pulsarClient.createProducer(topic)){ - producer.send(snapshot.toByteArray()); - } catch (PulsarClientException e) { - log.error("Failed to write snapshot", e); - throw new RuntimeException(e); - } - } - - void deleteSnapshot(String snapshotTopic) { - PulsarAdmin pulsarAdmin = this.getPulsarAdminClient(); - try { - pulsarAdmin.persistentTopics().delete(snapshotTopic); - } catch (PulsarAdminException e) { - log.error("Failed to delete old snapshot {}", snapshotTopic, e); - throw new RuntimeException(e); - } - } - - List getSnapshotTopics() { - PulsarAdmin pulsarAdmin = this.getPulsarAdminClient(); - String namespace = workerConfig.getPulsarFunctionsNamespace(); - String snapshotsTopicPath = workerConfig.getFunctionMetadataSnapshotsTopicPath(); - String snapshotTopicPath = String.format("persistent://%s/%s", namespace, snapshotsTopicPath); - List ret = new LinkedList<>(); - try { - List topics = pulsarAdmin.persistentTopics().getList(namespace); - for (String topic : topics) { - String prefix = String.format("%s/snapshot-", snapshotTopicPath); - if (topic.startsWith(prefix)) { - ret.add(Integer.parseInt( - topic.replace(prefix, ""))); - } - } - } catch (PulsarAdminException e) { - log.error("Error getting persistent topics", e); - throw new RuntimeException(e); - } - Collections.sort(ret, Collections.reverseOrder()); - return ret; - } - - private PulsarAdmin getPulsarAdminClient() { - if (this.pulsarAdminClient == null) { - this.pulsarAdminClient = Utils.getPulsarAdminClient(this.workerConfig.getPulsarWebServiceUrl()); - } - return this.pulsarAdminClient; - } - - @Override - public void close() throws Exception { - if (this.pulsarAdminClient != null) { - this.pulsarAdminClient.close(); - } - } -} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java index cea64492f82f3..29a890c40cc5e 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java @@ -63,9 +63,7 @@ public void close() { log.info("Stopped function state consumer"); } - @Override - public void accept(Message msg) { - + public void processRequest(Message msg) { ServiceRequest serviceRequest; try { @@ -80,7 +78,12 @@ public void accept(Message msg) { } this.functionMetaDataManager.processRequest(msg.getMessageId(), serviceRequest); + } + + @Override + public void accept(Message msg) { + processRequest(msg); // receive next request receiveOne(); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java index 1bfb1e0b65c2e..7ae8b7862dfbc 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java @@ -133,10 +133,15 @@ private void invokeScheduler() { .flatMap(stringMapEntry -> stringMapEntry.getValue().values().stream()).collect(Collectors.toList()); List needsAssignment = this.getUnassignedFunctionInstances(workerIdToAssignments, allInstances); + if (needsAssignment.isEmpty()) { + return; + } List assignments = this.scheduler.schedule( needsAssignment, currentAssignments, currentMembership); + log.debug("New assignments computed: {}", assignments); + long assignmentVersion = this.functionRuntimeManager.getCurrentAssignmentVersion() + 1; Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.newBuilder() .setVersion(assignmentVersion) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index b6e20331fd94c..5793e97a40f71 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -127,11 +127,6 @@ public void start(URI dlogUri) throws InterruptedException { this.clusterServiceCoordinator = new ClusterServiceCoordinator( this.workerConfig.getWorkerId(), membershipManager); - // start periodic snapshot routine - this.clusterServiceCoordinator.addTask( - "snapshot", - this.workerConfig.getSnapshotFreqMs(), - () -> functionMetaDataManager.snapshot()); this.clusterServiceCoordinator.addTask("membership-monitor", this.workerConfig.getFailureCheckFreqMs(), diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java index 65471ea95de8b..eb37e77c23505 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java @@ -211,7 +211,6 @@ public void testProcessRequest() throws PulsarClientException { mock(SchedulerManager.class), mock(PulsarClient.class))); - Mockito.doNothing().when(functionMetaDataManager).processInitializeMarker(any(Request.ServiceRequest.class)); Mockito.doNothing().when(functionMetaDataManager).processUpdate(any(Request.ServiceRequest.class)); Mockito.doNothing().when(functionMetaDataManager).proccessDeregister(any(Request.ServiceRequest.class)); @@ -220,7 +219,6 @@ public void testProcessRequest() throws PulsarClientException { Request.ServiceRequest.ServiceRequestType.UPDATE).build(); functionMetaDataManager.processRequest(MessageId.earliest, serviceRequest); - Assert.assertEquals(MessageId.earliest, functionMetaDataManager.lastProcessedMessageId); verify(functionMetaDataManager, times(1)).processUpdate (any(Request.ServiceRequest.class)); verify(functionMetaDataManager).processUpdate(serviceRequest); @@ -230,17 +228,11 @@ public void testProcessRequest() throws PulsarClientException { Request.ServiceRequest.ServiceRequestType.INITIALIZE).build(); functionMetaDataManager.processRequest(MessageId.earliest, serviceRequest); - Assert.assertEquals(MessageId.earliest, functionMetaDataManager.lastProcessedMessageId); - verify(functionMetaDataManager, times(1)).processInitializeMarker( - any(Request.ServiceRequest.class)); - verify(functionMetaDataManager).processInitializeMarker(serviceRequest); - serviceRequest = Request.ServiceRequest.newBuilder().setServiceRequestType( Request.ServiceRequest.ServiceRequestType.DELETE).build(); functionMetaDataManager.processRequest(MessageId.earliest, serviceRequest); - Assert.assertEquals(MessageId.earliest, functionMetaDataManager.lastProcessedMessageId); verify(functionMetaDataManager, times(1)).proccessDeregister( any(Request.ServiceRequest.class)); verify(functionMetaDataManager).proccessDeregister(serviceRequest); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataSnapshotManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataSnapshotManagerTest.java deleted file mode 100644 index a124a605c360b..0000000000000 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataSnapshotManagerTest.java +++ /dev/null @@ -1,217 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.functions.worker; - -import com.google.protobuf.ByteString; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Reader; -import org.apache.pulsar.client.api.ReaderConfiguration; -import org.apache.pulsar.functions.proto.Function; -import org.mockito.Mockito; -import org.testng.Assert; -import org.testng.annotations.Test; - -import java.io.IOException; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.CompletableFuture; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class FunctionMetaDataSnapshotManagerTest { - - @Test - public void testSnapshot() throws IOException { - - FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); - FunctionMetaDataSnapshotManager functionMetaDataSnapshotManager = spy( - new FunctionMetaDataSnapshotManager(new WorkerConfig() - .setPulsarFunctionsNamespace("test/standalone/functions") - .setFunctionMetadataSnapshotsTopicPath("snapshots-tests"), - functionMetaDataManager, - mock(PulsarClient.class))); - // nothing to snapshot - - Mockito.doReturn(new LinkedList<>()).when(functionMetaDataManager).getAllFunctionMetaData(); - Mockito.doReturn(new LinkedList<>()).when(functionMetaDataSnapshotManager).getSnapshotTopics(); - - functionMetaDataSnapshotManager.snapshot(); - verify(functionMetaDataSnapshotManager, times(0)).writeSnapshot(any(), any()); - verify(functionMetaDataSnapshotManager, times(0)).deleteSnapshot(any()); - - // things to snapshot - functionMetaDataManager = mock(FunctionMetaDataManager.class); - functionMetaDataSnapshotManager = spy( - new FunctionMetaDataSnapshotManager(new WorkerConfig() - .setPulsarFunctionsNamespace("test/standalone/functions") - .setFunctionMetadataSnapshotsTopicPath("snapshots-tests"), - functionMetaDataManager, - mock(PulsarClient.class))); - List functionMetaDataList = new LinkedList<>(); - functionMetaDataList.add(Function.FunctionMetaData.getDefaultInstance()); - Mockito.doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - Mockito.doReturn(MessageId.earliest).when(functionMetaDataManager).getLastProcessedMessageId(); - Mockito.doReturn(new LinkedList<>()).when(functionMetaDataSnapshotManager).getSnapshotTopics(); - Mockito.doNothing().when(functionMetaDataSnapshotManager).writeSnapshot(anyString(), any(Function.Snapshot.class)); - functionMetaDataSnapshotManager.snapshot(); - verify(functionMetaDataSnapshotManager, times(1)).writeSnapshot(anyString(), any(Function.Snapshot.class)); - verify(functionMetaDataSnapshotManager).writeSnapshot( - eq("persistent://test/standalone/functions/snapshots-tests/snapshot-1"), - any(Function.Snapshot.class)); - verify(functionMetaDataSnapshotManager, times(0)).deleteSnapshot(any()); - - // nothing to snapshot but a snap exists - functionMetaDataManager = mock(FunctionMetaDataManager.class); - functionMetaDataSnapshotManager = spy( - new FunctionMetaDataSnapshotManager(new WorkerConfig() - .setPulsarFunctionsNamespace("test/standalone/functions") - .setFunctionMetadataSnapshotsTopicPath("snapshots-tests"), - functionMetaDataManager, - mock(PulsarClient.class))); - List lst = new LinkedList<>(); - lst.add(1); - functionMetaDataList.add(Function.FunctionMetaData.getDefaultInstance()); - Mockito.doReturn(new LinkedList<>()).when(functionMetaDataManager).getAllFunctionMetaData(); - Mockito.doReturn(MessageId.earliest).when(functionMetaDataManager).getLastProcessedMessageId(); - Mockito.doReturn(lst).when(functionMetaDataSnapshotManager).getSnapshotTopics(); - Mockito.doNothing().when(functionMetaDataSnapshotManager).writeSnapshot(anyString(), any(Function.Snapshot.class)); - functionMetaDataSnapshotManager.snapshot(); - verify(functionMetaDataSnapshotManager, times(0)).writeSnapshot(anyString(), any(Function.Snapshot.class)); - verify(functionMetaDataSnapshotManager, times(0)).deleteSnapshot(any()); - - // something to snapshot and old snapshot to delete - functionMetaDataManager = mock(FunctionMetaDataManager.class); - functionMetaDataSnapshotManager = spy( - new FunctionMetaDataSnapshotManager(new WorkerConfig() - .setPulsarFunctionsNamespace("test/standalone/functions") - .setFunctionMetadataSnapshotsTopicPath("snapshots-tests"), - functionMetaDataManager, - mock(PulsarClient.class))); - lst = new LinkedList<>(); - lst.add(1); - lst.add(2); - Collections.sort(lst, Collections.reverseOrder()); - functionMetaDataList.add(Function.FunctionMetaData.getDefaultInstance()); - Mockito.doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - Mockito.doReturn(MessageId.earliest).when(functionMetaDataManager).getLastProcessedMessageId(); - Mockito.doReturn(lst).when(functionMetaDataSnapshotManager).getSnapshotTopics(); - Mockito.doNothing().when(functionMetaDataSnapshotManager).writeSnapshot(anyString(), any(Function.Snapshot.class)); - Mockito.doNothing().when(functionMetaDataSnapshotManager).deleteSnapshot(anyString()); - functionMetaDataSnapshotManager.snapshot(); - verify(functionMetaDataSnapshotManager, times(1)).writeSnapshot(anyString(), any(Function.Snapshot.class)); - verify(functionMetaDataSnapshotManager).writeSnapshot( - eq("persistent://test/standalone/functions/snapshots-tests/snapshot-3"), - any(Function.Snapshot.class)); - verify(functionMetaDataSnapshotManager, times(2)).deleteSnapshot(any()); - } - - @Test - public void testRestoreSnapshot() throws PulsarClientException { - FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); - FunctionMetaDataSnapshotManager functionMetaDataSnapshotManager = spy( - new FunctionMetaDataSnapshotManager(new WorkerConfig() - .setPulsarFunctionsNamespace("test/standalone/functions") - .setFunctionMetadataSnapshotsTopicPath("snapshots-tests"), - functionMetaDataManager, - mock(PulsarClient.class))); - - //nothing to restore - Mockito.doReturn(new LinkedList<>()).when(functionMetaDataSnapshotManager).getSnapshotTopics(); - Assert.assertEquals(MessageId.earliest, functionMetaDataSnapshotManager.restore()); - - //snapshots to restore - Message msg = mock(Message.class); - Function.FunctionMetaData functionMetaData = Function.FunctionMetaData.newBuilder() - .getDefaultInstanceForType(); - when(msg.getData()).thenReturn(Function.Snapshot.newBuilder() - .addFunctionMetaDataList(functionMetaData).setLastAppliedMessageId( - ByteString.copyFrom(MessageId.latest.toByteArray())).build().toByteArray()); - - CompletableFuture receiveFuture = CompletableFuture.completedFuture(msg); - Reader reader = mock(Reader.class); - when(reader.readNextAsync()) - .thenReturn(receiveFuture) - .thenReturn(new CompletableFuture<>()); - PulsarClient pulsarClient = mock(PulsarClient.class); - Mockito.doReturn(reader).when(pulsarClient).createReader(anyString(), any(MessageId.class), any - (ReaderConfiguration.class)); - functionMetaDataManager = mock(FunctionMetaDataManager.class); - functionMetaDataSnapshotManager = spy( - new FunctionMetaDataSnapshotManager(new WorkerConfig() - .setPulsarFunctionsNamespace("test/standalone/functions") - .setFunctionMetadataSnapshotsTopicPath("snapshots-tests"), - functionMetaDataManager, - pulsarClient)); - List lst = new LinkedList<>(); - lst.add(1); - Mockito.doReturn(lst).when(functionMetaDataSnapshotManager).getSnapshotTopics(); - Assert.assertEquals(MessageId.latest, functionMetaDataSnapshotManager.restore()); - verify(pulsarClient).createReader(eq("persistent://test/standalone/functions/snapshots-tests/snapshot-1"), - eq(MessageId.earliest), any(ReaderConfiguration.class)); - verify(functionMetaDataManager, times(1)).setFunctionMetaData(any(Function.FunctionMetaData.class)); - verify(functionMetaDataManager).setFunctionMetaData(functionMetaData); - - - // mulitple snapshots - msg = mock(Message.class); - functionMetaData = Function.FunctionMetaData.newBuilder() - .getDefaultInstanceForType(); - when(msg.getData()).thenReturn(Function.Snapshot.newBuilder() - .addFunctionMetaDataList(functionMetaData).setLastAppliedMessageId( - ByteString.copyFrom(MessageId.latest.toByteArray())).build().toByteArray()); - - receiveFuture = CompletableFuture.completedFuture(msg); - reader = mock(Reader.class); - when(reader.readNextAsync()) - .thenReturn(receiveFuture) - .thenReturn(new CompletableFuture<>()); - pulsarClient = mock(PulsarClient.class); - Mockito.doReturn(reader).when(pulsarClient).createReader(anyString(), any(MessageId.class), any - (ReaderConfiguration.class)); - functionMetaDataManager = mock(FunctionMetaDataManager.class); - functionMetaDataSnapshotManager = spy( - new FunctionMetaDataSnapshotManager(new WorkerConfig() - .setPulsarFunctionsNamespace("test/standalone/functions") - .setFunctionMetadataSnapshotsTopicPath("snapshots-tests"), - functionMetaDataManager, - pulsarClient)); - lst = new LinkedList<>(); - lst.add(1); - lst.add(2); - Collections.sort(lst, Collections.reverseOrder()); - Mockito.doReturn(lst).when(functionMetaDataSnapshotManager).getSnapshotTopics(); - Assert.assertEquals(MessageId.latest, functionMetaDataSnapshotManager.restore()); - verify(pulsarClient).createReader(eq("persistent://test/standalone/functions/snapshots-tests/snapshot-2"), - eq(MessageId.earliest), any(ReaderConfiguration.class)); - verify(functionMetaDataManager, times(1)).setFunctionMetaData(any(Function.FunctionMetaData.class)); - verify(functionMetaDataManager).setFunctionMetaData(functionMetaData); - - } -}