Skip to content

Commit

Permalink
Fix: dataplaneId null for older TransferProcesses (#1534)
Browse files Browse the repository at this point in the history
* Fix transfer dataplane signaling when transfer process dp_id is null.

* Update dependencies.

* Fix transfer signaling client.

* Add DataPlaneSignalingFlowControllerTest class.

* Add DataPlaneSignalingFlowControllerTest class.
  • Loading branch information
bmg13 authored Sep 11, 2024
1 parent 7850e36 commit 7a0c2a9
Show file tree
Hide file tree
Showing 9 changed files with 724 additions and 1 deletion.
2 changes: 1 addition & 1 deletion edc-controlplane/edc-controlplane-base/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ dependencies {
runtimeOnly(project(":edc-extensions:edr:edr-api-v2"))
runtimeOnly(project(":edc-extensions:edr:edr-callback"))
runtimeOnly(project(":edc-extensions:tokenrefresh-handler"))
runtimeOnly(project(":edc-extensions:transfer-dataplane-signaling"))
runtimeOnly(libs.edc.core.edrstore)
runtimeOnly(libs.edc.edr.store.receiver)
runtimeOnly(libs.edc.dpf.transfer.signaling)
runtimeOnly(libs.edc.controlplane.callback.staticendpoint)

// needed for BPN validation
Expand Down
27 changes: 27 additions & 0 deletions edc-extensions/transfer-dataplane-signaling/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/


plugins {
`java-library`
}

dependencies {
implementation(libs.edc.spi.web)
implementation(libs.edc.spi.dataplane.selector)
implementation(libs.edc.spi.transfer)
implementation(libs.edc.dpf.signaling.client)

testImplementation(libs.edc.junit)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.eclipse.tractusx.edc.dataplane.transfer.signaling;

import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowController;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowPropertiesProvider;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataFlowResponse;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess;
import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClient;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.eclipse.edc.web.spi.configuration.context.ControlApiUrl;
import org.jetbrains.annotations.NotNull;

import java.util.Collection;
import java.util.Set;
import java.util.UUID;

import static java.util.Collections.emptySet;
import static java.util.stream.Collectors.toSet;
import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR;

/**
* Implementation of {@link DataFlowController} that is compliant with the data plane signaling.
* <p>
* It handles all the transfer process where the transferType met the criteria defined in the format mapping of the
* signaling spec
*
* @see <a href="https://github.com/eclipse-edc/Connector/blob/main/docs/developer/data-plane-signaling/data-plane-signaling.md">Data plane signaling</a>
* @see <a href="https://github.com/eclipse-edc/Connector/blob/main/docs/developer/data-plane-signaling/data-plane-signaling-mapping.md">Data plane signaling transfer type mapping</a>
*/
public class DataPlaneSignalingFlowController implements DataFlowController {

private final ControlApiUrl callbackUrl;
private final DataPlaneSelectorService selectorClient;
private final DataPlaneClientFactory clientFactory;
private final DataFlowPropertiesProvider propertiesProvider;
private final String selectionStrategy;
private final FlowTypeExtractor flowTypeExtractor;

public DataPlaneSignalingFlowController(ControlApiUrl callbackUrl, DataPlaneSelectorService selectorClient,
DataFlowPropertiesProvider propertiesProvider, DataPlaneClientFactory clientFactory,
String selectionStrategy, FlowTypeExtractor flowTypeExtractor) {
this.callbackUrl = callbackUrl;
this.selectorClient = selectorClient;
this.propertiesProvider = propertiesProvider;
this.clientFactory = clientFactory;
this.selectionStrategy = selectionStrategy;
this.flowTypeExtractor = flowTypeExtractor;
}

@Override
public boolean canHandle(TransferProcess transferProcess) {
return flowTypeExtractor.extract(transferProcess.getTransferType()).succeeded();
}

@Override
public @NotNull StatusResult<DataFlowResponse> start(TransferProcess transferProcess, Policy policy) {
var flowType = flowTypeExtractor.extract(transferProcess.getTransferType());
if (flowType.failed()) {
return StatusResult.failure(FATAL_ERROR, flowType.getFailureDetail());
}

var propertiesResult = propertiesProvider.propertiesFor(transferProcess, policy);
if (propertiesResult.failed()) {
return StatusResult.failure(FATAL_ERROR, propertiesResult.getFailureDetail());
}

var selection = selectorClient.select(transferProcess.getContentDataAddress(), transferProcess.getTransferType(), selectionStrategy);
if (!selection.succeeded()) {
return StatusResult.failure(FATAL_ERROR, selection.getFailureDetail());
}

var dataFlowRequest = DataFlowStartMessage.Builder.newInstance()
.id(UUID.randomUUID().toString())
.processId(transferProcess.getId())
.sourceDataAddress(transferProcess.getContentDataAddress())
.destinationDataAddress(transferProcess.getDataDestination())
.participantId(policy.getAssignee())
.agreementId(transferProcess.getContractId())
.assetId(transferProcess.getAssetId())
.flowType(flowType.getContent())
.callbackAddress(callbackUrl != null ? callbackUrl.get() : null)
.properties(propertiesResult.getContent())
.build();

var dataPlaneInstance = selection.getContent();
return clientFactory.createClient(dataPlaneInstance)
.start(dataFlowRequest)
.map(it -> DataFlowResponse.Builder.newInstance()
.dataAddress(it.getDataAddress())
.dataPlaneId(dataPlaneInstance.getId())
.build()
);
}

@Override
public StatusResult<Void> suspend(TransferProcess transferProcess) {
return getClientForDataplane(transferProcess.getDataPlaneId())
.map(client -> client.suspend(transferProcess.getId()))
.orElse(f -> {
var message = "Failed to select the data plane for suspending the transfer process %s. %s"
.formatted(transferProcess.getId(), f.getFailureDetail());
return StatusResult.failure(FATAL_ERROR, message);
});
}

@Override
public StatusResult<Void> terminate(TransferProcess transferProcess) {
var dataPlaneId = transferProcess.getDataPlaneId();
if (dataPlaneId == null) {
return StatusResult.success();
}

return getClientForDataplane(dataPlaneId)
.map(client -> client.terminate(transferProcess.getId()))
.orElse(f -> {
var message = "Failed to select the data plane for terminating the transfer process %s. %s"
.formatted(transferProcess.getId(), f.getFailureDetail());
return StatusResult.failure(FATAL_ERROR, message);
});
}

@Override
public Set<String> transferTypesFor(Asset asset) {
var result = selectorClient.getAll();
if (result.failed()) {
return emptySet();
}

return result.getContent().stream()
.filter(it -> it.getAllowedSourceTypes().contains(asset.getDataAddress().getType()))
.map(DataPlaneInstance::getAllowedTransferTypes)
.flatMap(Collection::stream)
.collect(toSet());
}

private StatusResult<DataPlaneClient> getClientForDataplane(String id) {
return selectorClient.findById(id)
.map(clientFactory::createClient)
.map(StatusResult::success)
.orElse(f -> StatusResult.failure(FATAL_ERROR, "No data-plane found with id %s. %s".formatted(id, f.getFailureDetail())));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.eclipse.tractusx.edc.dataplane.transfer.signaling;

import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowPropertiesProvider;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor;
import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.web.spi.configuration.context.ControlApiUrl;

import java.util.Map;

import static org.eclipse.tractusx.edc.dataplane.transfer.signaling.TransferDataPlaneSignalingExtension.NAME;

@Extension(NAME)
public class TransferDataPlaneSignalingExtension implements ServiceExtension {

protected static final String NAME = "Transfer Data Plane Signaling Extension";

private static final String DEFAULT_DATAPLANE_SELECTOR_STRATEGY = "random";

@Setting(value = "Defines strategy for Data Plane instance selection in case Data Plane is not embedded in current runtime", defaultValue = DEFAULT_DATAPLANE_SELECTOR_STRATEGY)
private static final String DPF_SELECTOR_STRATEGY = "edc.dataplane.client.selector.strategy";

@Inject
private DataFlowManager dataFlowManager;

@Inject(required = false)
private ControlApiUrl callbackUrl;

@Inject
private DataPlaneSelectorService selectorService;

@Inject
private DataPlaneClientFactory clientFactory;

@Inject(required = false)
private DataFlowPropertiesProvider propertiesProvider;

@Inject
private FlowTypeExtractor flowTypeExtractor;

@Override
public void initialize(ServiceExtensionContext context) {
var selectionStrategy = context.getSetting(DPF_SELECTOR_STRATEGY, DEFAULT_DATAPLANE_SELECTOR_STRATEGY);
var controller = new DataPlaneSignalingFlowController(callbackUrl, selectorService, getPropertiesProvider(),
clientFactory, selectionStrategy, flowTypeExtractor);
dataFlowManager.register(controller);
}

private DataFlowPropertiesProvider getPropertiesProvider() {
return propertiesProvider == null ? (tp, p) -> StatusResult.success(Map.of()) : propertiesProvider;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#################################################################################
# Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
#
# See the NOTICE file(s) distributed with this work for additional
# information regarding copyright ownership.
#
# This program and the accompanying materials are made available under the
# terms of the Apache License, Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# SPDX-License-Identifier: Apache-2.0
#################################################################################

org.eclipse.tractusx.edc.dataplane.transfer.signaling.TransferDataPlaneSignalingExtension
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.eclipse.tractusx.edc.dataplane.transfer.signaling;

import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager;
import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

@ExtendWith(DependencyInjectionExtension.class)
class TransferDataPlaneSignalingExtensionTest {

private final DataFlowManager dataFlowManager = mock();

@BeforeEach
void setup(ServiceExtensionContext context) {
context.registerService(DataFlowManager.class, dataFlowManager);
}

@Test
void initialize(ServiceExtensionContext context, TransferDataPlaneSignalingExtension extension) {
extension.initialize(context);
verify(dataFlowManager).register(isA(DataPlaneSignalingFlowController.class));
}
}
Loading

0 comments on commit 7a0c2a9

Please sign in to comment.