Skip to content

Commit

Permalink
feat: restart interrupted data flows
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed Nov 11, 2024
1 parent 858d09b commit a2b904f
Show file tree
Hide file tree
Showing 11 changed files with 230 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.concurrent.TimeUnit.SECONDS;
Expand All @@ -49,6 +50,7 @@ public class EmbeddedRuntime extends BaseRuntime {
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final MultiSourceServiceLocator serviceLocator;
private final URL[] classPathEntries;
private Future<?> runtimeThread;

public EmbeddedRuntime(String name, Map<String, String> properties, String... additionalModules) {
this(new MultiSourceServiceLocator(), name, properties, ClasspathReader.classpathFor(additionalModules));
Expand Down Expand Up @@ -78,7 +80,7 @@ public void boot(boolean addShutdownHook) {
var runtimeException = new AtomicReference<Exception>();
var latch = new CountDownLatch(1);

executorService.execute(() -> {
runtimeThread = executorService.submit(() -> {
try {
var classLoader = URLClassLoader.newInstance(classPathEntries);

Expand Down Expand Up @@ -110,6 +112,9 @@ public void boot(boolean addShutdownHook) {
public void shutdown() {
    serviceLocator.clearSystemExtensions();
    super.shutdown();

    // Interrupt the task submitted by boot() if it has been started and is still running.
    var bootTask = runtimeThread;
    if (bootTask == null || bootTask.isDone()) {
        return;
    }
    bootTask.cancel(true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
public class StateMachineManager {

private final List<Processor> processors = new ArrayList<>();
private final List<Processor> startupProcessors = new ArrayList<>();
private final ScheduledExecutorService executor;
private final AtomicBoolean active = new AtomicBoolean();
private final WaitStrategy waitStrategy;
Expand All @@ -65,6 +66,7 @@ private StateMachineManager(String name, Monitor monitor, ExecutorInstrumentatio
*/
public Future<?> start() {
active.set(true);
// Startup processors are run by a direct call here, i.e. on the calling thread, so they have
// finished before the first iteration of the regular loop is scheduled.
performStartupLogic();
// NOTE(review): presumably schedules the first loop iteration without delay — confirm in scheduleNextIterationIn.
return scheduleNextIterationIn(0L);
}

Expand Down Expand Up @@ -103,6 +105,19 @@ private Runnable loop() {
};
}

/**
 * Runs every registered startup processor, in registration order. Each processor is invoked
 * again and again until it reports that nothing was processed. A failure in one processor is
 * logged and does not prevent the remaining processors from running.
 */
private void performStartupLogic() {
    startupProcessors.forEach(startupProcessor -> {
        try {
            while (startupProcessor.process() > 0) {
                // keep draining: the processor still found entities to handle
            }
        } catch (Throwable e) {
            monitor.severe(format("StateMachineManager [%s] startup error caught", name), e);
        }
    });
}

private void performLogic() {
try {
var processed = processors.stream()
Expand Down Expand Up @@ -150,6 +165,17 @@ public Builder shutdownTimeout(int seconds) {
return this;
}

/**
 * Register a processor that is executed by {@code StateMachineManager.start()} before the regular
 * processing loop is scheduled. At startup the processor is invoked repeatedly until it returns a
 * count that is not greater than zero; if it throws, the error is logged and the next startup
 * processor is executed.
 *
 * @param startupProcessor the processor.
 * @return the builder.
 */
public Builder startupProcessor(Processor startupProcessor) {
loop.startupProcessors.add(startupProcessor);
return this;
}

/**
 * Returns the state machine manager that this builder has been configuring.
 *
 * @return the configured {@link StateMachineManager}.
 */
public StateMachineManager build() {
return loop;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,21 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;


class StateMachineManagerTest {

private final WaitStrategy waitStrategy = mock(WaitStrategy.class);
private final Monitor monitor = mock(Monitor.class);
private final WaitStrategy waitStrategy = mock();
private final Monitor monitor = mock();
private final ExecutorInstrumentation instrumentation = ExecutorInstrumentation.noop();

@BeforeEach
Expand All @@ -45,12 +47,9 @@ void setUp() {
}

@Test
void shouldExecuteProcessorsAsyncAndCanBeStopped() throws InterruptedException {
void shouldExecuteProcessorsAsyncAndCanBeStopped() {
var processor = mock(Processor.class);
when(processor.process()).thenAnswer(i -> {
Thread.sleep(100L);
return 1L;
});
when(processor.process()).thenAnswer(i -> process(1));
var stateMachine = StateMachineManager.Builder.newInstance("test", monitor, instrumentation, waitStrategy)
.processor(processor)
.shutdownTimeout(1)
Expand All @@ -67,12 +66,10 @@ void shouldExecuteProcessorsAsyncAndCanBeStopped() throws InterruptedException {
}

@Test
void shouldNotWaitForSomeTimeIfTheresAtLeastOneProcessedEntity() throws InterruptedException {
void shouldNotWaitForSomeTimeIfTheresAtLeastOneProcessedEntity() {
var processor = mock(Processor.class);
when(processor.process()).thenReturn(1L);
doAnswer(i -> {
return 1L;
}).when(waitStrategy).success();
doAnswer(i -> 1L).when(waitStrategy).success();
var stateMachine = StateMachineManager.Builder.newInstance("test", monitor, instrumentation, waitStrategy)
.processor(processor)
.build();
Expand All @@ -86,13 +83,11 @@ void shouldNotWaitForSomeTimeIfTheresAtLeastOneProcessedEntity() throws Interrup
}

@Test
void shouldWaitForSomeTimeIfNoEntityIsProcessed() throws InterruptedException {
void shouldWaitForSomeTimeIfNoEntityIsProcessed() {
var processor = mock(Processor.class);
when(processor.process()).thenReturn(0L);
var waitStrategy = mock(WaitStrategy.class);
doAnswer(i -> {
return 0L;
}).when(waitStrategy).waitForMillis();
doAnswer(i -> 0L).when(waitStrategy).waitForMillis();
var stateMachine = StateMachineManager.Builder.newInstance("test", monitor, instrumentation, waitStrategy)
.processor(processor)
.build();
Expand All @@ -118,12 +113,10 @@ void shouldExitWithAnExceptionIfProcessorExitsWithAnUnrecoverableError() {
}

@Test
void shouldWaitRetryTimeWhenAnExceptionIsThrownByAnProcessor() throws InterruptedException {
void shouldWaitRetryTimeWhenAnExceptionIsThrownByAnProcessor() {
var processor = mock(Processor.class);
when(processor.process()).thenThrow(new EdcException("exception")).thenReturn(0L);
when(waitStrategy.retryInMillis()).thenAnswer(i -> {
return 1L;
});
when(waitStrategy.retryInMillis()).thenAnswer(i -> 1L);
var stateMachine = StateMachineManager.Builder.newInstance("test", monitor, instrumentation, waitStrategy)
.processor(processor)
.build();
Expand All @@ -135,4 +128,36 @@ void shouldWaitRetryTimeWhenAnExceptionIsThrownByAnProcessor() throws Interrupte
verify(waitStrategy).retryInMillis();
});
}

/**
 * The startup processor is stubbed to report 1, 1 and then 0 processed entities, so it is expected
 * to be invoked exactly three times, while the regular processor keeps being invoked afterwards.
 */
@Test
void shouldExecuteStartupProcessorUntilItHasEntitiesToProcess() {
var processor = mock(Processor.class);
when(processor.process()).thenAnswer(i -> process(1));
var startupProcessor = mock(Processor.class);
// Two non-empty rounds followed by an empty one: the third call ends the startup phase.
when(startupProcessor.process()).thenAnswer(i -> process(1)).thenAnswer(i -> process(1)).thenAnswer(i -> process(0));
var stateMachine = StateMachineManager.Builder.newInstance("test", monitor, instrumentation, waitStrategy)
.startupProcessor(startupProcessor)
.processor(processor)
.shutdownTimeout(1)
.build();

stateMachine.start();

await().untilAsserted(() -> {
verify(processor, atLeast(2)).process();
verify(startupProcessor, times(3)).process();

// NOTE(review): stop() and the final check sit inside the polled block, so they are re-run on
// every retry. Presumably this tolerates a processor invocation racing with stop() — confirm
// before moving them outside of untilAsserted.
assertThat(stateMachine.stop()).succeedsWithin(2, SECONDS);
verifyNoMoreInteractions(processor);
});
}

/**
 * Simulates a processor run that takes a short time and then reports {@code count} processed entities.
 *
 * @param count the number of processed entities to report.
 * @return {@code count}, unchanged.
 * @throws RuntimeException if the thread is interrupted while sleeping; the interrupt flag is
 *         restored before throwing so that the owning executor can still observe the interruption.
 */
private long process(long count) {
    try {
        Thread.sleep(10L);
    } catch (InterruptedException e) {
        // Catching InterruptedException clears the flag: set it again before propagating.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
    return count;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ public void shutdown() {
if (dataPlaneManager != null) {
dataPlaneManager.stop();
}
pipelineService.closeAll();
}

@Provider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public StatusResult<Void> terminate(String dataFlowId, @Nullable String reason)
@Override
protected StateMachineManager.Builder configureStateMachineManager(StateMachineManager.Builder builder) {
return builder
.startupProcessor(processDataFlowInState(STARTED, this::restartFlow))
.processor(processDataFlowInState(RECEIVED, this::processReceived))
.processor(processDataFlowInState(COMPLETED, this::processCompleted))
.processor(processDataFlowInState(FAILED, this::processFailed));
Expand Down Expand Up @@ -192,6 +193,11 @@ private Result<DataFlowResponseMessage> handlePush(DataFlowStartMessage startMes
.build());
}

/**
 * Restarts a data flow handed over by the startup processor registered for the STARTED state:
 * the flow is transitioned back to received and then handled by {@code processReceived}.
 *
 * @param dataFlow the data flow to restart.
 * @return the value returned by {@code processReceived} for this flow.
 */
private boolean restartFlow(DataFlow dataFlow) {
dataFlow.transitToReceived();
return processReceived(dataFlow);
}

private boolean processReceived(DataFlow dataFlow) {
var request = dataFlow.toRequest();
var transferService = transferServiceRegistry.resolveTransferService(request);
Expand All @@ -206,7 +212,7 @@ private boolean processReceived(DataFlow dataFlow) {
store.save(dataFlow);

return entityRetryProcessFactory.doAsyncProcess(dataFlow, () -> transferService.transfer(request))
.entityRetrieve(id -> store.findById(id))
.entityRetrieve(id -> store.findByIdAndLease(id).orElse(f -> null))
.onSuccess((f, r) -> {
if (f.getState() != STARTED.code()) {
return;
Expand Down Expand Up @@ -297,6 +303,7 @@ public Builder authorizationService(DataPlaneAuthorizationService authorizationS
manager.authorizationService = authorizationService;
return this;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ public StreamResult<Void> terminate(DataFlow dataFlow) {
return terminate(dataFlow.getId());
}

/**
 * Calls {@code terminate} with the key of every entry currently held in {@code sources}.
 */
@Override
public void closeAll() {
// NOTE(review): if terminate(processId) removes the entry from `sources`, iterating like this is
// only safe when `sources` is a concurrent map — confirm against the field declaration.
sources.forEach((processId, source) -> terminate(processId));
}

@Override
public void registerFactory(DataSourceFactory factory) {
sourceFactories.add(factory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.eclipse.edc.connector.dataplane.framework;

import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceRegistryImpl;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry;
import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
Expand All @@ -24,12 +25,17 @@
import org.junit.jupiter.api.extension.ExtendWith;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

@ExtendWith(DependencyInjectionExtension.class)
class DataPlaneFrameworkExtensionTest {

private final PipelineService pipelineService = mock();

/**
 * Registers the services used by these tests in the context: the mocked {@link PipelineService}
 * and a no-op {@link ExecutorInstrumentation}.
 */
@BeforeEach
public void setUp(ServiceExtensionContext context) {
context.registerService(PipelineService.class, pipelineService);
context.registerService(ExecutorInstrumentation.class, ExecutorInstrumentation.noop());
}

Expand All @@ -40,4 +46,10 @@ void initialize_registers_transferService(ServiceExtensionContext context, DataP
assertThat(context.getService(TransferServiceRegistry.class)).isInstanceOf(TransferServiceRegistryImpl.class);
}

/**
 * Shutting the extension down must call {@code closeAll()} on the registered pipeline service.
 */
@Test
void shouldClosePipelineService_whenShutdown(DataPlaneFrameworkExtension extension) {
extension.shutdown();

verify(pipelineService).closeAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ class Received {
void shouldStartTransferTransitionAndTransitionToStarted() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(store.findById(any())).thenReturn(dataFlow);
when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(dataFlow));
when(registry.resolveTransferService(any())).thenReturn(transferService);
when(transferService.canHandle(any())).thenReturn(true);
when(transferService.transfer(any())).thenReturn(new CompletableFuture<>());
Expand All @@ -373,7 +373,7 @@ void shouldStartTransferTransitionAndTransitionToStarted() {
void shouldStarTransitionToCompleted_whenTransferSucceeds() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(store.findById(any())).thenReturn(dataFlow);
when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(dataFlow));
when(registry.resolveTransferService(any())).thenReturn(transferService);
when(transferService.canHandle(any())).thenReturn(true);
when(transferService.transfer(any())).thenReturn(completedFuture(StreamResult.success()));
Expand All @@ -391,7 +391,7 @@ void shouldStartTransferAndNotTransitionToCompleted_whenTransferSucceedsBecauseI
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
var terminatedDataFlow = dataFlowBuilder().state(TERMINATED.code()).build();
when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(store.findById(any())).thenReturn(terminatedDataFlow);
when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(terminatedDataFlow));
when(registry.resolveTransferService(any())).thenReturn(transferService);
when(transferService.canHandle(any())).thenReturn(true);
when(transferService.transfer(any())).thenReturn(completedFuture(StreamResult.success()));
Expand All @@ -409,7 +409,7 @@ void shouldNotChangeState_whenTransferGetsSuspended() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
var terminatedDataFlow = dataFlowBuilder().state(SUSPENDED.code()).build();
when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(store.findById(any())).thenReturn(terminatedDataFlow);
when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(terminatedDataFlow));
when(registry.resolveTransferService(any())).thenReturn(transferService);
when(transferService.canHandle(any())).thenReturn(true);
when(transferService.transfer(any())).thenReturn(completedFuture(StreamResult.success()));
Expand All @@ -426,7 +426,7 @@ void shouldNotChangeState_whenTransferGetsSuspended() {
void shouldStartTransferAndTransitionToFailed_whenTransferFails() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(store.findById(any())).thenReturn(dataFlow);
when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(dataFlow));
when(registry.resolveTransferService(any())).thenReturn(transferService);
when(transferService.canHandle(any())).thenReturn(true);
when(transferService.transfer(any())).thenReturn(completedFuture(StreamResult.error("an error")));
Expand All @@ -443,7 +443,7 @@ void shouldStartTransferAndTransitionToFailed_whenTransferFails() {
void shouldStartTransferAndTransitionToReceivedForRetrying_whenTransferFutureIsFailed() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(store.findById(any())).thenReturn(dataFlow);
when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(dataFlow));
when(registry.resolveTransferService(any())).thenReturn(transferService);
when(transferService.canHandle(any())).thenReturn(true);
when(transferService.transfer(any())).thenReturn(failedFuture(new RuntimeException("an error")));
Expand All @@ -460,7 +460,7 @@ void shouldStartTransferAndTransitionToReceivedForRetrying_whenTransferFutureIsF
void shouldTransitToFailedIfNoTransferServiceCanHandleStarted() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(store.findById(any())).thenReturn(dataFlow);
when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(dataFlow));
when(registry.resolveTransferService(any())).thenReturn(null);

manager.start();
Expand Down Expand Up @@ -565,6 +565,25 @@ void shouldStillSuspend_whenDataFlowHasNoSource() {
}
}

@Nested
class RestartFlowsAtStartup {

    /**
     * A flow that the store returns for the STARTED state when the manager starts must be
     * handed to the transfer service again and saved in STARTED state.
     */
    @Test
    void shouldRestartFlowsAtStartup() {
        var startedFlow = dataFlowBuilder().state(STARTED.code()).build();
        when(transferService.canHandle(any())).thenReturn(true);
        // A future that never completes: the transfer stays "in progress" for the whole test.
        when(transferService.transfer(any())).thenReturn(new CompletableFuture<>());
        when(registry.resolveTransferService(any())).thenReturn(transferService);
        when(store.nextNotLeased(anyInt(), stateIs(STARTED.code())))
                .thenReturn(List.of(startedFlow))
                .thenReturn(emptyList());

        manager.start();

        await().untilAsserted(() -> {
            verify(transferService).transfer(isA(DataFlowStartMessage.class));
            verify(store).save(argThat(flow -> flow.getState() == STARTED.code()));
        });
    }
}

private DataFlow.Builder dataFlowBuilder() {
return DataFlow.Builder.newInstance()
.source(DataAddress.Builder.newInstance().type("source").build())
Expand Down
Loading

0 comments on commit a2b904f

Please sign in to comment.