diff --git a/core/common/junit/src/main/java/org/eclipse/edc/junit/extensions/EmbeddedRuntime.java b/core/common/junit/src/main/java/org/eclipse/edc/junit/extensions/EmbeddedRuntime.java index b3795579ccd..0e19671302e 100644 --- a/core/common/junit/src/main/java/org/eclipse/edc/junit/extensions/EmbeddedRuntime.java +++ b/core/common/junit/src/main/java/org/eclipse/edc/junit/extensions/EmbeddedRuntime.java @@ -31,6 +31,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; import static java.util.concurrent.TimeUnit.SECONDS; @@ -49,6 +50,7 @@ public class EmbeddedRuntime extends BaseRuntime { private final ExecutorService executorService = Executors.newSingleThreadExecutor(); private final MultiSourceServiceLocator serviceLocator; private final URL[] classPathEntries; + private Future runtimeThread; public EmbeddedRuntime(String name, Map properties, String... additionalModules) { this(new MultiSourceServiceLocator(), name, properties, ClasspathReader.classpathFor(additionalModules)); @@ -78,7 +80,7 @@ public void boot(boolean addShutdownHook) { var runtimeException = new AtomicReference(); var latch = new CountDownLatch(1); - executorService.execute(() -> { + runtimeThread = executorService.submit(() -> { try { var classLoader = URLClassLoader.newInstance(classPathEntries); @@ -110,6 +112,9 @@ public void boot(boolean addShutdownHook) { public void shutdown() { serviceLocator.clearSystemExtensions(); super.shutdown(); + if (runtimeThread != null && !runtimeThread.isDone()) { + runtimeThread.cancel(true); + } } @Override diff --git a/core/common/lib/query-lib/src/main/java/org/eclipse/edc/query/CriterionOperatorRegistryImpl.java b/core/common/lib/query-lib/src/main/java/org/eclipse/edc/query/CriterionOperatorRegistryImpl.java index 3dd32109cf0..aba06765afc 100644 --- a/core/common/lib/query-lib/src/main/java/org/eclipse/edc/query/CriterionOperatorRegistryImpl.java +++ b/core/common/lib/query-lib/src/main/java/org/eclipse/edc/query/CriterionOperatorRegistryImpl.java @@ -45,6 +45,7 @@ public static CriterionOperatorRegistry ofDefaults() { registry.registerOperatorPredicate(ILIKE, new IlikeOperatorPredicate()); registry.registerOperatorPredicate(CONTAINS, new ContainsOperatorPredicate()); registry.registerOperatorPredicate(NOT_EQUAL, new NotEqualOperatorPredicate()); + registry.registerOperatorPredicate(LESS_THAN, new LessThanOperatorPredicate()); return registry; } diff --git a/core/common/lib/query-lib/src/main/java/org/eclipse/edc/query/LessThanOperatorPredicate.java b/core/common/lib/query-lib/src/main/java/org/eclipse/edc/query/LessThanOperatorPredicate.java new file mode 100644 index 00000000000..ef77ccbaab0 --- /dev/null +++ b/core/common/lib/query-lib/src/main/java/org/eclipse/edc/query/LessThanOperatorPredicate.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2024 Cofinity-X + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Cofinity-X - initial API and implementation + * + */ + +package org.eclipse.edc.query; + +import org.eclipse.edc.spi.query.OperatorPredicate; + +import java.util.Comparator; + +public class LessThanOperatorPredicate implements OperatorPredicate { + @Override + public boolean test(Object value, Object comparedTo) { + if (value instanceof Number number1 && comparedTo instanceof Number number2) { + return Double.compare(number1.doubleValue(), number2.doubleValue()) < 0; + } + + if (value instanceof String string1 && comparedTo instanceof String string2) { + return Comparator.naturalOrder().compare(string1, string2) < 0; + } + + return false; + } +} diff --git a/core/common/lib/query-lib/src/test/java/org/eclipse/edc/query/LessThanOperatorPredicateTest.java b/core/common/lib/query-lib/src/test/java/org/eclipse/edc/query/LessThanOperatorPredicateTest.java new file mode 100644 index 00000000000..d0b2652e4d0 --- /dev/null +++ b/core/common/lib/query-lib/src/test/java/org/eclipse/edc/query/LessThanOperatorPredicateTest.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2024 Cofinity-X + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Cofinity-X - initial API and implementation + * + */ + +package org.eclipse.edc.query; + +import org.eclipse.edc.spi.query.OperatorPredicate; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; + +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +class LessThanOperatorPredicateTest { + + private final OperatorPredicate predicate = new LessThanOperatorPredicate(); + + @ParameterizedTest + @ArgumentsSource(ValidValues.class) + void shouldReturnTrue_whenValueLessThanComparedOne(Object value, Object comparedTo) { + assertThat(predicate.test(value, comparedTo)).isTrue(); + } + + @ParameterizedTest + @ArgumentsSource(InvalidValues.class) + void shouldReturnFalse_whenValueNotLessThanComparedOne(Object value, Object comparedTo) { + assertThat(predicate.test(value, comparedTo)).isFalse(); + } + + private static class ValidValues implements ArgumentsProvider { + + @Override + public Stream provideArguments(ExtensionContext context) { + return Stream.of( + arguments(1, 2), + arguments(1, 2L), + arguments(1, 1.01f), + arguments(1, 1.01d), + arguments("a", "b") + ); + } + } + + private static class InvalidValues implements ArgumentsProvider { + + @Override + public Stream provideArguments(ExtensionContext context) { + return Stream.of( + arguments(1, 1), + arguments(1, 1L), + arguments(1, 1.0f), + arguments(1, 1.0d), + arguments("a", "a") + ); + } + } + +} diff --git a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/AbstractStateEntityManager.java b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/AbstractStateEntityManager.java index 92292d96a62..70f6e3b81f8 100644 --- a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/AbstractStateEntityManager.java +++ b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/AbstractStateEntityManager.java @@ -55,7 +55,6 @@ public abstract class AbstractStateEntityManager, S @Override public void start() { - entityRetryProcessFactory = new EntityRetryProcessFactory(monitor, clock, entityRetryProcessConfiguration); var stateMachineManagerBuilder = StateMachineManager.Builder .newInstance(getClass().getSimpleName(), monitor, executorInstrumentation, waitStrategy); stateMachineManager = configureStateMachineManager(stateMachineManagerBuilder).build(); diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneFrameworkExtension.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneFrameworkExtension.java index 75e073df655..fab4d18c73f 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneFrameworkExtension.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneFrameworkExtension.java @@ -143,6 +143,7 @@ public void initialize(ServiceExtensionContext context) { @Override public void start() { + dataPlaneManager.restartFlows(); dataPlaneManager.start(); } @@ -151,6 +152,7 @@ public void shutdown() { if (dataPlaneManager != null) { dataPlaneManager.stop(); } + pipelineService.closeAll(); } @Provider diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java index 0d52e928455..c762bc99153 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java @@ -35,6 +35,7 @@ import org.eclipse.edc.statemachine.StateMachineManager; import org.jetbrains.annotations.Nullable; +import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.function.Function; @@ -47,6 +48,8 @@ import static org.eclipse.edc.spi.persistence.StateEntityStore.hasState; import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; import static org.eclipse.edc.spi.result.Result.success; +import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PULL; +import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PUSH; /** * Default data manager implementation. @@ -65,7 +68,7 @@ private DataPlaneManagerImpl() { public Result validate(DataFlowStartMessage dataRequest) { // TODO for now no validation for pull scenario, since the transfer service registry // is not applicable here. Probably validation only on the source part required. - if (FlowType.PULL.equals(dataRequest.getFlowType())) { + if (PULL.equals(dataRequest.getFlowType())) { return success(true); } else { var transferService = transferServiceRegistry.resolveTransferService(dataRequest); @@ -121,6 +124,26 @@ public StatusResult terminate(String dataFlowId, @Nullable String reason) }); } + @Override + public StatusResult restartFlows() { + var now = clock.millis(); + List toBeRestarted; + do { + toBeRestarted = store.nextNotLeased(batchSize, + hasState(STARTED.code()), + new Criterion("stateTimestamp", "<", now), + new Criterion("transferType.flowType", "=", PUSH.toString()) + ); + + toBeRestarted.forEach(dataFlow -> { + dataFlow.transitToReceived(); + processReceived(dataFlow); + }); + } while (!toBeRestarted.isEmpty()); + + return StatusResult.success(); + } + @Override protected StateMachineManager.Builder configureStateMachineManager(StateMachineManager.Builder builder) { return builder @@ -206,7 +229,7 @@ private boolean processReceived(DataFlow dataFlow) { store.save(dataFlow); return entityRetryProcessFactory.doAsyncProcess(dataFlow, () -> transferService.transfer(request)) - .entityRetrieve(id -> store.findById(id)) + .entityRetrieve(id -> store.findByIdAndLease(id).orElse(f -> null)) .onSuccess((f, r) -> { if (f.getState() != STARTED.code()) { return; @@ -279,6 +302,7 @@ public Builder self() { @Override public DataPlaneManagerImpl build() { + super.build(); Objects.requireNonNull(manager.transferProcessClient); return manager; } @@ -297,6 +321,7 @@ public Builder authorizationService(DataPlaneAuthorizationService authorizationS manager.authorizationService = authorizationService; return this; } + } } diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImpl.java index 340e5d44f21..8f493c9b684 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImpl.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImpl.java @@ -120,6 +120,11 @@ public StreamResult terminate(DataFlow dataFlow) { return terminate(dataFlow.getId()); } + @Override + public void closeAll() { + sources.forEach((processId, source) -> terminate(processId)); + } + @Override public void registerFactory(DataSourceFactory factory) { sourceFactories.add(factory); diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneFrameworkExtensionTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneFrameworkExtensionTest.java index 6eef62a47ab..dd9d6611029 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneFrameworkExtensionTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneFrameworkExtensionTest.java @@ -15,6 +15,7 @@ package org.eclipse.edc.connector.dataplane.framework; import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceRegistryImpl; +import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry; import org.eclipse.edc.junit.extensions.DependencyInjectionExtension; import org.eclipse.edc.spi.system.ExecutorInstrumentation; @@ -24,12 +25,17 @@ import org.junit.jupiter.api.extension.ExtendWith; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; @ExtendWith(DependencyInjectionExtension.class) class DataPlaneFrameworkExtensionTest { + private final PipelineService pipelineService = mock(); + @BeforeEach public void setUp(ServiceExtensionContext context) { + context.registerService(PipelineService.class, pipelineService); context.registerService(ExecutorInstrumentation.class, ExecutorInstrumentation.noop()); } @@ -40,4 +46,10 @@ void initialize_registers_transferService(ServiceExtensionContext context, DataP assertThat(context.getService(TransferServiceRegistry.class)).isInstanceOf(TransferServiceRegistryImpl.class); } + @Test + void shouldClosePipelineService_whenShutdown(DataPlaneFrameworkExtension extension) { + extension.shutdown(); + + verify(pipelineService).closeAll(); + } } diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java index 1675b64bb42..c2a536e2333 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java @@ -17,6 +17,7 @@ import org.eclipse.edc.connector.controlplane.api.client.spi.transferprocess.TransferProcessApiClient; import org.eclipse.edc.connector.dataplane.spi.DataFlow; import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService; +import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; import org.eclipse.edc.connector.dataplane.spi.pipeline.TransferService; import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry; @@ -64,10 +65,12 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @@ -81,7 +84,7 @@ class DataPlaneManagerImplTest { private final DataFlowStartMessage request = createRequest(); private final TransferServiceRegistry registry = mock(); private final DataPlaneAuthorizationService authorizationService = mock(); - private DataPlaneManagerImpl manager; + private DataPlaneManager manager; @BeforeEach public void setUp() { @@ -356,7 +359,7 @@ class Received { void shouldStartTransferTransitionAndTransitionToStarted() { var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList()); - when(store.findById(any())).thenReturn(dataFlow); + when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(dataFlow)); when(registry.resolveTransferService(any())).thenReturn(transferService); when(transferService.canHandle(any())).thenReturn(true); when(transferService.transfer(any())).thenReturn(new CompletableFuture<>()); @@ -373,7 +376,7 @@ void shouldStartTransferTransitionAndTransitionToStarted() { void shouldStarTransitionToCompleted_whenTransferSucceeds() { var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList()); - when(store.findById(any())).thenReturn(dataFlow); + when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(dataFlow)); when(registry.resolveTransferService(any())).thenReturn(transferService); when(transferService.canHandle(any())).thenReturn(true); when(transferService.transfer(any())).thenReturn(completedFuture(StreamResult.success())); @@ -391,7 +394,7 @@ void shouldStartTransferAndNotTransitionToCompleted_whenTransferSucceedsBecauseI var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); var terminatedDataFlow = dataFlowBuilder().state(TERMINATED.code()).build(); when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList()); - when(store.findById(any())).thenReturn(terminatedDataFlow); + when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(terminatedDataFlow)); when(registry.resolveTransferService(any())).thenReturn(transferService); when(transferService.canHandle(any())).thenReturn(true); when(transferService.transfer(any())).thenReturn(completedFuture(StreamResult.success())); @@ -409,7 +412,7 @@ void shouldNotChangeState_whenTransferGetsSuspended() { var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); var terminatedDataFlow = dataFlowBuilder().state(SUSPENDED.code()).build(); when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList()); - when(store.findById(any())).thenReturn(terminatedDataFlow); + when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(terminatedDataFlow)); when(registry.resolveTransferService(any())).thenReturn(transferService); when(transferService.canHandle(any())).thenReturn(true); when(transferService.transfer(any())).thenReturn(completedFuture(StreamResult.success())); @@ -426,7 +429,7 @@ void shouldNotChangeState_whenTransferGetsSuspended() { void shouldStartTransferAndTransitionToFailed_whenTransferFails() { var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList()); - when(store.findById(any())).thenReturn(dataFlow); + when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(dataFlow)); when(registry.resolveTransferService(any())).thenReturn(transferService); when(transferService.canHandle(any())).thenReturn(true); when(transferService.transfer(any())).thenReturn(completedFuture(StreamResult.error("an error"))); @@ -443,7 +446,7 @@ void shouldStartTransferAndTransitionToFailed_whenTransferFails() { void shouldStartTransferAndTransitionToReceivedForRetrying_whenTransferFutureIsFailed() { var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList()); - when(store.findById(any())).thenReturn(dataFlow); + when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(dataFlow)); when(registry.resolveTransferService(any())).thenReturn(transferService); when(transferService.canHandle(any())).thenReturn(true); when(transferService.transfer(any())).thenReturn(failedFuture(new RuntimeException("an error"))); @@ -460,7 +463,7 @@ void shouldStartTransferAndTransitionToReceivedForRetrying_whenTransferFutureIsF void shouldTransitToFailedIfNoTransferServiceCanHandleStarted() { var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList()); - when(store.findById(any())).thenReturn(dataFlow); + when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(dataFlow)); when(registry.resolveTransferService(any())).thenReturn(null); manager.start(); @@ -565,6 +568,32 @@ void shouldStillSuspend_whenDataFlowHasNoSource() { } } + @Nested + class RestartFlows { + + @Test + void shouldRestartFlows() { + var dataFlow = dataFlowBuilder().state(STARTED.code()).build(); + var anotherDataFlow = dataFlowBuilder().state(STARTED.code()).build(); + when(store.nextNotLeased(anyInt(), any(Criterion[].class))) + .thenReturn(List.of(dataFlow)).thenReturn(List.of(anotherDataFlow)).thenReturn(emptyList()); + when(registry.resolveTransferService(any())).thenReturn(transferService); + when(transferService.canHandle(any())).thenReturn(true); + when(transferService.transfer(any())).thenReturn(new CompletableFuture<>()); + + var result = manager.restartFlows(); + + assertThat(result).isSucceeded(); + await().untilAsserted(() -> { + verify(transferService, times(2)).transfer(isA(DataFlowStartMessage.class)); + verify(store, times(2)).save(argThat(it -> it.getState() == STARTED.code())); + var captor = ArgumentCaptor.forClass(Criterion[].class); + verify(store, atLeast(1)).nextNotLeased(anyInt(), captor.capture()); + assertThat(captor.getValue()).contains(new Criterion("transferType.flowType", "=", "PUSH")); + }); + } + } + private DataFlow.Builder dataFlowBuilder() { return DataFlow.Builder.newInstance() .source(DataAddress.Builder.newInstance().type("source").build()) diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImplTest.java index 2d4af834bd8..230ce63746f 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImplTest.java @@ -195,6 +195,25 @@ void shouldReturnSinkTypesFromFactories() { } + @Nested + class CloseAll { + + @Test + void shouldCloseAllTheOngoingDataFlows() throws Exception { + when(sourceFactory.supportedType()).thenReturn("source"); + when(sourceFactory.createSource(any())).thenReturn(source); + when(sinkFactory.supportedType()).thenReturn("destination"); + when(sinkFactory.createSink(any())).thenReturn(sink); + when(sink.transfer(any())).thenReturn(new CompletableFuture<>()); + + service.transfer(dataFlow("source", "destination").toRequest()); + + service.closeAll(); + + verify(source).close(); + } + } + private DataFlow dataFlow(String sourceType, String destinationType) { return DataFlow.Builder.newInstance() .id("1") diff --git a/extensions/common/sql/sql-core/src/main/java/org/eclipse/edc/sql/translation/PostgresqlOperatorTranslator.java b/extensions/common/sql/sql-core/src/main/java/org/eclipse/edc/sql/translation/PostgresqlOperatorTranslator.java index c79f10ecacf..41220fadba4 100644 --- a/extensions/common/sql/sql-core/src/main/java/org/eclipse/edc/sql/translation/PostgresqlOperatorTranslator.java +++ b/extensions/common/sql/sql-core/src/main/java/org/eclipse/edc/sql/translation/PostgresqlOperatorTranslator.java @@ -20,6 +20,7 @@ import static org.eclipse.edc.spi.query.CriterionOperatorRegistry.EQUAL; import static org.eclipse.edc.spi.query.CriterionOperatorRegistry.ILIKE; import static org.eclipse.edc.spi.query.CriterionOperatorRegistry.IN; +import static org.eclipse.edc.spi.query.CriterionOperatorRegistry.LESS_THAN; import static org.eclipse.edc.spi.query.CriterionOperatorRegistry.LIKE; import static org.eclipse.edc.spi.query.CriterionOperatorRegistry.NOT_EQUAL; @@ -37,6 +38,7 @@ public SqlOperator translate(String operator) { case ILIKE -> new SqlOperator("ilike", String.class); case IN -> new SqlOperator("in", Collection.class); case CONTAINS -> new SqlOperator("??", Object.class); + case LESS_THAN -> new SqlOperator("<", Object.class); default -> null; }; } diff --git a/extensions/common/sql/sql-core/src/test/java/org/eclipse/edc/sql/translation/PostgresqlOperatorTranslatorTest.java b/extensions/common/sql/sql-core/src/test/java/org/eclipse/edc/sql/translation/PostgresqlOperatorTranslatorTest.java index 22b552c6ede..2a736145b5a 100644 --- a/extensions/common/sql/sql-core/src/test/java/org/eclipse/edc/sql/translation/PostgresqlOperatorTranslatorTest.java +++ b/extensions/common/sql/sql-core/src/test/java/org/eclipse/edc/sql/translation/PostgresqlOperatorTranslatorTest.java @@ -72,6 +72,14 @@ void shouldTranslate_contains() { assertThat(operator.rightOperandClass()).isEqualTo(Object.class); } + @Test + void shouldTranslate_lessThan() { + var operator = translator.translate("<"); + + assertThat(operator.representation()).isEqualTo("<"); + assertThat(operator.rightOperandClass()).isEqualTo(Object.class); + } + @Test void shouldReturnNull_whenOperatorNotSupported() { var operator = translator.translate("not-supported"); diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java index 2dc0d94e662..816d64e6a1d 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java @@ -14,7 +14,7 @@ package org.eclipse.edc.connector.dataplane.store.sql.schema; -import org.eclipse.edc.connector.dataplane.store.sql.schema.postgres.DataPlaneMapping; +import org.eclipse.edc.connector.dataplane.store.sql.schema.postgres.DataFlowMapping; import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.sql.translation.SqlOperatorTranslator; import org.eclipse.edc.sql.translation.SqlQueryStatement; @@ -74,7 +74,7 @@ public String getSelectTemplate() { @Override public SqlQueryStatement createQuery(QuerySpec querySpec) { - return new SqlQueryStatement(getSelectTemplate(), querySpec, new DataPlaneMapping(this), operatorTranslator); + return new SqlQueryStatement(getSelectTemplate(), querySpec, new DataFlowMapping(this), operatorTranslator); } @Override diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/DataPlaneMapping.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/DataFlowMapping.java similarity index 64% rename from extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/DataPlaneMapping.java rename to extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/DataFlowMapping.java index f923f59e9cd..f3fe2fd98ef 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/DataPlaneMapping.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/DataFlowMapping.java @@ -17,14 +17,23 @@ import org.eclipse.edc.connector.dataplane.spi.DataFlow; import org.eclipse.edc.connector.dataplane.store.sql.schema.DataPlaneStatements; import org.eclipse.edc.sql.lease.StatefulEntityMapping; +import org.eclipse.edc.sql.translation.TranslationMapping; /** * Maps fields of a {@link DataFlow} onto the * corresponding SQL schema (= column names) enabling access through Postgres JSON operators where applicable */ -public class DataPlaneMapping extends StatefulEntityMapping { +public class DataFlowMapping extends StatefulEntityMapping { - public DataPlaneMapping(DataPlaneStatements statements) { + public DataFlowMapping(DataPlaneStatements statements) { super(statements); + add("transferType", new TransferTypeMapping(statements)); + } + + private static class TransferTypeMapping extends TranslationMapping { + + TransferTypeMapping(DataPlaneStatements statements) { + add("flowType", statements.getFlowTypeColumn()); + } } } diff --git a/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/query/CriterionOperatorRegistry.java b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/query/CriterionOperatorRegistry.java index af6b54e45c5..624275fc22b 100644 --- a/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/query/CriterionOperatorRegistry.java +++ b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/query/CriterionOperatorRegistry.java @@ -27,6 +27,7 @@ public interface CriterionOperatorRegistry { String LIKE = "like"; String ILIKE = "ilike"; String CONTAINS = "contains"; + String LESS_THAN = "<"; /** * Register an operator with the related operator predicate. diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java index 254797d03a7..a5adda87231 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java @@ -73,4 +73,11 @@ default StatusResult terminate(String dataFlowId) { * @return success if data flow is terminated, failed otherwise. */ StatusResult terminate(String dataFlowId, @Nullable String reason); + + /** + * Restart flows + * + * @return success if succeeded, failure otherwise. + */ + StatusResult restartFlows(); } diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/TransferService.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/TransferService.java index 7bdb550d3f4..f095de8b728 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/TransferService.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/TransferService.java @@ -60,4 +60,11 @@ public interface TransferService { */ StreamResult terminate(DataFlow dataFlow); + + /** + * Close all the ongoing DataFlows + */ + void closeAll(); + + } diff --git a/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java b/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java index a7e0006b51d..98945eb6a11 100644 --- a/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java +++ b/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java @@ -20,9 +20,9 @@ import org.eclipse.edc.spi.entity.Entity; import org.eclipse.edc.spi.entity.MutableEntity; import org.eclipse.edc.spi.entity.StatefulEntity; +import org.eclipse.edc.spi.query.Criterion; import org.eclipse.edc.spi.result.StoreFailure; import org.eclipse.edc.spi.types.domain.DataAddress; -import org.eclipse.edc.spi.types.domain.transfer.FlowType; import org.eclipse.edc.spi.types.domain.transfer.TransferType; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -37,10 +37,13 @@ import static org.awaitility.Awaitility.await; import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.COMPLETED; import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.RECEIVED; +import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.STARTED; import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; import static org.eclipse.edc.spi.persistence.StateEntityStore.hasState; import static org.eclipse.edc.spi.result.StoreFailure.Reason.ALREADY_LEASED; import static org.eclipse.edc.spi.result.StoreFailure.Reason.NOT_FOUND; +import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PULL; +import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PUSH; import static org.hamcrest.Matchers.hasSize; public abstract class DataPlaneStoreTestBase { @@ -65,17 +68,6 @@ protected Duration getTestTimeout() { return Duration.ofMillis(500); } - private DataFlow createDataFlow(String id, DataFlowStates state) { - return DataFlow.Builder.newInstance() - .id(id) - .callbackAddress(URI.create("http://any")) - .source(DataAddress.Builder.newInstance().type("src-type").build()) - .destination(DataAddress.Builder.newInstance().type("dest-type").build()) - .state(state.code()) - .transferType(new TransferType("transferType", FlowType.PUSH)) - .build(); - } - @Nested class Create { @@ -197,6 +189,19 @@ void shouldLeaseOrderByStateTimestamp() { assertThat(elements).hasSize(10).extracting(DataFlow::getStateTimestamp).isSorted(); } + @Test + void shouldFilterByFlowType() { + var pull = createDataFlowBuilder().transferType(new TransferType("Any", PULL)).build(); + var push = createDataFlowBuilder().transferType(new TransferType("Any", PUSH)).build(); + getStore().save(pull); + getStore().save(push); + + var leased = getStore().nextNotLeased(2, new Criterion("transferType.flowType", "=", "PUSH")); + + assertThat(leased).hasSize(1).first().extracting(DataFlow::getId) + .isEqualTo(push.getId()); + } + private void delayByTenMillis(StatefulEntity t) { try { Thread.sleep(10); @@ -238,4 +243,21 @@ void shouldReturnAlreadyLeased_whenEntityIsAlreadyLeased() { assertThat(result).isFailed().extracting(StoreFailure::getReason).isEqualTo(ALREADY_LEASED); } } + + private DataFlow createDataFlow(String id, DataFlowStates state) { + return createDataFlowBuilder() + .id(id).state(state.code()) + .build(); + } + + private DataFlow.Builder createDataFlowBuilder() { + return DataFlow.Builder.newInstance() + .id(UUID.randomUUID().toString()) + .callbackAddress(URI.create("http://any")) + .source(DataAddress.Builder.newInstance().type("src-type").build()) + .destination(DataAddress.Builder.newInstance().type("dest-type").build()) + .state(STARTED.code()) + .transferType(new TransferType("transferType", PUSH)); + } + } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java index 47eaf4184a0..b057f1bfdfe 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java @@ -27,12 +27,16 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.eclipse.edc.junit.annotations.EndToEndTest; +import org.eclipse.edc.junit.annotations.PostgresqlIntegrationTest; +import org.eclipse.edc.junit.extensions.EmbeddedRuntime; import org.eclipse.edc.junit.extensions.RuntimeExtension; import org.eclipse.edc.junit.extensions.RuntimePerClassExtension; import org.eclipse.edc.spi.security.Vault; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.RegisterExtension; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; @@ -59,6 +63,7 @@ import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATED; import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE; +import static org.eclipse.edc.sql.testfixtures.PostgresqlEndToEndInstance.createDatabase; import static org.eclipse.edc.util.io.Ports.getFreePort; import static org.mockserver.integration.ClientAndServer.startClientAndServer; import static org.mockserver.model.HttpRequest.request; @@ -69,6 +74,8 @@ public class TransferStreamingEndToEndTest { + private static final DockerImageName KAFKA_CONTAINER_VERSION = DockerImageName.parse("confluentinc/cp-kafka:7.7.1"); + @Nested @EndToEndTest class InMemory extends Tests { @@ -89,23 +96,77 @@ class InMemory extends Tests { protected Vault getDataplaneVault() { return PROVIDER_DATA_PLANE.getService(Vault.class); } + + } + + @Nested + @PostgresqlIntegrationTest + class Postgres extends Tests { + + @Order(0) + @RegisterExtension + static final BeforeAllCallback CREATE_DATABASES = context -> { + createDatabase(CONSUMER.getName()); + createDatabase(PROVIDER.getName()); + }; + + @RegisterExtension + static final RuntimeExtension CONSUMER_CONTROL_PLANE = new RuntimePerClassExtension( + Runtimes.POSTGRES_CONTROL_PLANE.create("consumer-control-plane", CONSUMER.controlPlanePostgresConfiguration())); + + @RegisterExtension + static final RuntimeExtension PROVIDER_CONTROL_PLANE = new RuntimePerClassExtension( + Runtimes.POSTGRES_CONTROL_PLANE.create("provider-control-plane", PROVIDER.controlPlanePostgresConfiguration())); + + private static final EmbeddedRuntime PROVIDER_DATA_PLANE_RUNTIME = Runtimes.POSTGRES_DATA_PLANE.create("provider-data-plane", PROVIDER.dataPlanePostgresConfiguration()); + + @RegisterExtension + static final RuntimeExtension PROVIDER_DATA_PLANE = new RuntimePerClassExtension(PROVIDER_DATA_PLANE_RUNTIME); + + @Override + protected Vault getDataplaneVault() { + return PROVIDER_DATA_PLANE.getService(Vault.class); + } + + @Test + void shouldResumeTransfer_whenDataPlaneRestarts() { + try (var consumer = createKafkaConsumer()) { + consumer.subscribe(List.of(sinkTopic)); + + var assetId = UUID.randomUUID().toString(); + createResourcesOnProvider(assetId, kafkaSourceProperty()); + + var transferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) + .withDestination(kafkaSink()).withTransferType("Kafka-PUSH").execute(); + assertMessagesAreSentTo(consumer); + + PROVIDER_DATA_PLANE_RUNTIME.shutdown(); + + assertNoMoreMessagesAreSentTo(consumer); + + PROVIDER_DATA_PLANE_RUNTIME.boot(false); + + awaitTransferToBeInState(transferProcessId, STARTED); + assertMessagesAreSentTo(consumer); + } + } } @Testcontainers abstract static class Tests extends TransferEndToEndTestBase { @Container - private static final KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0")); + private static final KafkaContainer KAFKA = new KafkaContainer(KAFKA_CONTAINER_VERSION); - private static final String SOURCE_TOPIC = "source_topic"; - private static final String SINK_TOPIC = "sink_topic"; + private final String sourceTopic = "source_topic_" + UUID.randomUUID(); + protected final String sinkTopic = "sink_topic_" + UUID.randomUUID(); @BeforeEach void setUp() { var producer = createKafkaProducer(); newSingleThreadScheduledExecutor().scheduleAtFixedRate( - () -> producer.send(new ProducerRecord<>(SOURCE_TOPIC, sampleMessage())), + () -> producer.send(new ProducerRecord<>(sourceTopic, sampleMessage())), 0, 100, MILLISECONDS); } @@ -148,7 +209,7 @@ void kafkaToHttpTransfer() { @Test void kafkaToKafkaTransfer() { try (var consumer = createKafkaConsumer()) { - consumer.subscribe(List.of(SINK_TOPIC)); + consumer.subscribe(List.of(sinkTopic)); var assetId = UUID.randomUUID().toString(); createResourcesOnProvider(assetId, contractExpiresIn("10s"), kafkaSourceProperty()); @@ -165,7 +226,7 @@ void kafkaToKafkaTransfer() { @Test void shouldSuspendAndResumeTransfer() { try (var consumer = createKafkaConsumer()) { - consumer.subscribe(List.of(SINK_TOPIC)); + consumer.subscribe(List.of(sinkTopic)); var assetId = UUID.randomUUID().toString(); createResourcesOnProvider(assetId, kafkaSourceProperty()); @@ -184,18 +245,18 @@ void shouldSuspendAndResumeTransfer() { } } - private void assertMessagesAreSentTo(Consumer consumer) { + protected void assertMessagesAreSentTo(Consumer consumer) { await().atMost(timeout).untilAsserted(() -> { var records = consumer.poll(ZERO); assertThat(records.isEmpty()).isFalse(); - records.records(SINK_TOPIC).forEach(record -> assertThat(record.value()).isEqualTo(sampleMessage())); + records.records(sinkTopic).forEach(record -> assertThat(record.value()).isEqualTo(sampleMessage())); }); } - private void assertNoMoreMessagesAreSentTo(Consumer consumer) { + protected void assertNoMoreMessagesAreSentTo(Consumer consumer) { consumer.poll(ZERO); await().pollDelay(5, SECONDS).atMost(timeout).untilAsserted(() -> { - var recordsFound = consumer.poll(Duration.ofSeconds(1)).records(SINK_TOPIC); + var recordsFound = consumer.poll(Duration.ofSeconds(1)).records(sinkTopic); assertThat(recordsFound).isEmpty(); }); } @@ -213,29 +274,29 @@ private JsonObject httpSink(Integer port, String path) { } @NotNull - private JsonObject kafkaSink() { + protected JsonObject kafkaSink() { return Json.createObjectBuilder() .add(TYPE, EDC_NAMESPACE + "DataAddress") .add(EDC_NAMESPACE + "type", "Kafka") .add(EDC_NAMESPACE + "properties", Json.createObjectBuilder() - .add(EDC_NAMESPACE + "topic", SINK_TOPIC) + .add(EDC_NAMESPACE + "topic", sinkTopic) .add(EDC_NAMESPACE + kafkaProperty("bootstrap.servers"), KAFKA.getBootstrapServers()) .build()) .build(); } @NotNull - private Map kafkaSourceProperty() { + protected Map kafkaSourceProperty() { return Map.of( "name", "data", "type", "Kafka", - "topic", SOURCE_TOPIC, + "topic", sourceTopic, kafkaProperty("bootstrap.servers"), KAFKA.getBootstrapServers(), kafkaProperty("max.poll.records"), "100" ); } - private Consumer createKafkaConsumer() { + protected Consumer createKafkaConsumer() { var props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()); props.put(ConsumerConfig.GROUP_ID_CONFIG, "runner");