Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Termination of pull transfer process from consumer side not success #4688

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,21 @@ private ServiceResult<TransferProcess> suspendedAction(TransferSuspensionMessage
@NotNull
private ServiceResult<TransferProcess> terminatedAction(TransferTerminationMessage message, TransferProcess transferProcess) {
if (transferProcess.canBeTerminated()) {
if (transferProcess.getType() == PROVIDER) {
var termination = dataFlowManager.terminate(transferProcess);
if (termination.failed()) {
return ServiceResult.conflict("Cannot terminate transfer process %s: %s".formatted(transferProcess.getId(), termination.getFailureDetail()));
}
}
observable.invokeForEach(l -> l.preTerminated(transferProcess));
transferProcess.transitionTerminated();
transferProcess.protocolMessageReceived(message.getId());
update(transferProcess);
observable.invokeForEach(l -> l.terminated(transferProcess));
if (transferProcess.getType() == PROVIDER) {
transferProcess.transitionDeprovisioning();
update(transferProcess);
}
return ServiceResult.success(transferProcess);
} else {
return ServiceResult.conflict(format("Cannot process %s because %s", message.getClass().getSimpleName(), "transfer cannot be terminated"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,39 @@ void notifyTerminated_shouldReturnBadRequest_whenCounterPartyUnauthorized() {

}

@Test
void notifyTerminated_providerTransfer_shouldTerminateDataFlowAndTransitionToDeprovisioning() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var message = TransferTerminationMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.code("TestCode")
.reason("TestReason")
.build();
var agreement = contractAgreement();
var transferProcess = transferProcessBuilder().state(STARTED.code()).type(PROVIDER).build();

when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success());
when(dataFlowManager.terminate(any())).thenReturn(StatusResult.success());
var result = service.notifyTerminated(message, tokenRepresentation);


assertThat(result).isSucceeded();
verify(listener).preTerminated(any());
verify(store, atLeastOnce()).save(argThat(t -> t.getState() == DEPROVISIONING.code()));
verify(listener).terminated(any());
verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class));
verify(dataFlowManager).terminate(any());
}

@Test
void findById_shouldReturnTransferProcess_whenValidCounterParty() {
var participantAgent = participantAgent();
Expand Down Expand Up @@ -897,12 +930,13 @@ <M extends ProcessRemoteMessage> void notify_shouldStoreReceivedMessageId(Method
when(validationService.validateAgreement(any(ParticipantAgent.class), any())).thenAnswer(i -> Result.success(i.getArgument(1)));
when(validationService.validateRequest(any(ParticipantAgent.class), isA(ContractAgreement.class))).thenReturn(Result.success());
when(dataFlowManager.suspend(any())).thenReturn(StatusResult.success());
when(dataFlowManager.terminate(any())).thenReturn(StatusResult.success());

var result = methodCall.call(service, message, tokenRepresentation());

assertThat(result).isSucceeded();
var captor = ArgumentCaptor.forClass(TransferProcess.class);
verify(store).save(captor.capture());
verify(store, atLeastOnce()).save(captor.capture());
var storedTransferProcess = captor.getValue();
assertThat(storedTransferProcess.getProtocolMessages().isAlreadyReceived(message.getId())).isTrue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
* Essentially a wrapper around the management API enabling to test interactions with other participants, eg. catalog, transfer...
*/
public class Participant {

protected String id;
protected String name;
protected Endpoint managementEndpoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.event.EventEnvelope;
import org.eclipse.edc.spi.security.Vault;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -49,6 +50,7 @@
import org.mockserver.model.MediaType;

import java.time.Instant;
import java.util.AbstractMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -243,6 +245,34 @@ void suspendAndResumeByProvider_httpPull_dataTransfer_withEdrCache() {
providerDataSource.verify(request("/source").withMethod("GET"));
}

@Test
void terminateByProvider_httpPull_dataTransfer() {
var assetId = createResources();
var startedTransferContext = startTransferProcess(assetId);
var edrMessage = assertDataIsAccessible(startedTransferContext.consumerTransferProcessId);

PROVIDER.terminateTransfer(startedTransferContext.providerTransferProcessId);
CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, TERMINATED);
PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, DEPROVISIONED);
assertDataIsNotAccessible(startedTransferContext.consumerTransferProcessId, edrMessage.getKey(), edrMessage.getValue());

providerDataSource.verify(request("/source").withMethod("GET"));
}

@Test
void terminateByConsumer_httpPull_dataTransfer() {
var assetId = createResources();
var startedTransferContext = startTransferProcess(assetId);
var edrMessage = assertDataIsAccessible(startedTransferContext.consumerTransferProcessId);

CONSUMER.terminateTransfer(startedTransferContext.consumerTransferProcessId);
CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, TERMINATED);
PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, DEPROVISIONED);
assertDataIsNotAccessible(startedTransferContext.consumerTransferProcessId, edrMessage.getKey(), edrMessage.getValue());

providerDataSource.verify(request("/source").withMethod("GET"));
}

@Test
void pullFromHttp_httpProvision() {
var provisionServer = startClientAndServer(PROVIDER.getHttpProvisionerPort());
Expand Down Expand Up @@ -340,6 +370,44 @@ private HttpResponse cacheEdr(HttpRequest request, Map<String, TransferProcessSt
}
}

private String createResources() {
var assetId = UUID.randomUUID().toString();
createResourcesOnProvider(assetId, httpSourceDataAddress());

return assetId;
}

private StartedTransferContext startTransferProcess(String assetId) {
var consumerTransferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER)
.withTransferType("HttpData-PULL")
.execute();
CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, STARTED);

var providerTransferProcessId = PROVIDER.getTransferProcesses().stream()
.filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId))
.map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow();
PROVIDER.awaitTransferToBeInState(providerTransferProcessId, STARTED);

return new StartedTransferContext(consumerTransferProcessId, providerTransferProcessId);
}

private Map.Entry<DataAddress, String> assertDataIsAccessible(String consumerTransferProcessId) {
var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull);
var msg = UUID.randomUUID().toString();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")));

return new AbstractMap.SimpleEntry<>(edr, msg);
}

private void assertDataIsNotAccessible(String consumerTransferProcessId, DataAddress edr, String msg) {
// checks that the EDR is gone once the transfer has been suspended
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(consumerTransferProcessId)));
// checks that transfer fails
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))));
}

private record StartedTransferContext(String consumerTransferProcessId, String providerTransferProcessId) { }

/**
* Mocked http provisioner
*/
Expand Down Expand Up @@ -379,6 +447,7 @@ public HttpResponse handle(HttpRequest httpRequest) throws Exception {
}
}


@Nested
@EndToEndTest
class InMemory extends Tests {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ void shouldSuspendAndResumeTransfer() {
.withDestination(kafkaSink()).withTransferType("Kafka-PUSH").execute();
assertMessagesAreSentTo(consumer);

CONSUMER.suspendTransfer(transferProcessId, "any kind of reason");
CONSUMER.suspendTransfer(transferProcessId, "any reason");
CONSUMER.awaitTransferToBeInState(transferProcessId, SUSPENDED);
assertNoMoreMessagesAreSentTo(consumer);

Expand Down
Loading