Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EDC v0.3.0 #52

Merged
merged 5 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequest;
import org.eclipse.edc.connector.contract.spi.types.offer.ContractOffer;
import org.eclipse.edc.connector.policy.spi.PolicyDefinition;
import org.eclipse.edc.policy.model.Policy;
Expand All @@ -34,6 +35,7 @@
import java.util.concurrent.ExecutionException;

import static java.lang.String.format;
import static org.eclipse.edc.protocol.dsp.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP;

/**
* Automated contract negotiation
Expand Down Expand Up @@ -87,6 +89,7 @@ public ClientEndpoint(PolicyService policyService,
@POST
@Path(NEGOTIATE_PATH)
public Response negotiateContract(@QueryParam("providerUrl") URL providerUrl,
@QueryParam("providerId") String providerId,
@QueryParam("assetId") String assetId,
@QueryParam("dataDestinationUrl") URL dataDestinationUrl) {
LOGGER.debug(format("Received a %s POST request", NEGOTIATE_PATH));
Expand All @@ -103,16 +106,22 @@ public Response negotiateContract(@QueryParam("providerUrl") URL providerUrl,
.build();
}

ContractOffer offer = ContractOffer.Builder.newInstance()
var offer = ContractOffer.Builder.newInstance()
.id(idPolicyPair.getFirst())
.policy(idPolicyPair.getSecond())
.assetId(assetId)
.providerId(providerUrl.toString())
.build();

var contractRequest = ContractRequest.Builder.newInstance()
.contractOffer(offer)
.counterPartyAddress(providerUrl.toString())
.providerId(providerId)
.protocol(DATASPACE_PROTOCOL_HTTP)
.build();
ContractAgreement agreement;

try {
agreement = negotiator.negotiate(providerUrl, offer);
agreement = negotiator.negotiate(contractRequest);
} catch (InterruptedException | ExecutionException negotiationException) {
LOGGER.error(format("Negotiation failed for provider %s and contractOffer %s", providerUrl,
offer.getId()), negotiationException);
Expand Down Expand Up @@ -153,22 +162,19 @@ public Response getDataset(@QueryParam("providerUrl") URL providerUrl,
* Initiate a contract negotiation, acting as a consumer, with a provider
* connector.
*
* @param providerUrl The provider's url.
* @param contractOffer A contract offer to be negotiated with.
* @param contractRequest The contract request to be sent.
* @return An agreementID on success or an error message on error.
*/
@POST
@Path(NEGOTIATE_CONTRACT_PATH)
public Response negotiateContract(@QueryParam("providerUrl") URL providerUrl,
ContractOffer contractOffer) {
Objects.requireNonNull(providerUrl, "Provider URL must not be null");
Objects.requireNonNull(contractOffer, "ContractOffer must not be null");
public Response negotiateContract(ContractRequest contractRequest) {
Objects.requireNonNull(contractRequest, "ContractOffer must not be null");
try {
var agreement = negotiator.negotiate(providerUrl, contractOffer);
return Response.ok(agreement).build();
var agreement = negotiator.negotiate(contractRequest);
return Response.ok(agreement).build(); // TODO contract request instead of contractoffer: do whole change
} catch (InterruptedException | ExecutionException negotiationException) {
LOGGER.error(format("Negotiation failed for provider %s and contractOffer %s", providerUrl,
contractOffer.getId()), negotiationException);
LOGGER.error(format("Negotiation failed for provider %s and contractOffer %s", contractRequest.getProviderId(),
contractRequest.getContractOffer().getId()), negotiationException);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(negotiationException.getMessage())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public PolicyService(CatalogService catalogService, TypeTransformerRegistry tran
* otherwise occupied, and was interrupted.
*/
public Dataset getDatasetForAssetId(URL providerUrl, String assetId) throws InterruptedException {
var catalogFuture = catalogService.request(providerUrl.toString(),
var catalogFuture = catalogService.requestCatalog(
providerUrl.toString(),
DATASPACE_PROTOCOL_HTTP,
QuerySpec.Builder.newInstance()
.filter(List.of(criterion(Asset.PROPERTY_ID, "=", assetId)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import de.fraunhofer.iosb.app.client.ClientEndpoint;
import de.fraunhofer.iosb.app.model.configuration.Configuration;
import org.eclipse.edc.connector.transfer.spi.TransferProcessManager;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
import org.eclipse.edc.connector.transfer.spi.types.TransferRequest;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.types.domain.DataAddress;
Expand Down Expand Up @@ -114,21 +113,16 @@ public CompletableFuture<String> initiateTransferProcess(URL providerUrl, String
var dataFuture = new CompletableFuture<String>();
observable.register(dataFuture, agreementId);

var dataRequest = DataRequest.Builder.newInstance()
var transferRequest = TransferRequest.Builder.newInstance()
.id(UUID.randomUUID().toString()) // this is not relevant, thus can be random
.connectorAddress(providerUrl.toString()) // the address of the provider connector
.protocol(DATASPACE_PROTOCOL_HTTP)
.connectorId("consumer")
.assetId(assetId)
.dataDestination(dataSinkAddress)
.managedResources(false) // we do not need any provisioning
.contractId(agreementId)
.build();

var transferRequest = TransferRequest.Builder.newInstance()
.dataRequest(dataRequest)
.build();

var transferProcessStatus = transferProcessManager.initiateConsumerRequest(transferRequest);
if (transferProcessStatus.failed()) {
throw new EdcException(transferProcessStatus.getFailureDetail());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,15 @@
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequest;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequestData;
import org.eclipse.edc.connector.contract.spi.types.offer.ContractOffer;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.query.QuerySpec;

import java.net.URL;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static java.lang.String.format;
import static org.eclipse.edc.protocol.dsp.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP;

/**
* Send contract offer, negotiation status watch
Expand Down Expand Up @@ -65,8 +61,7 @@ public Negotiator(ConsumerContractNegotiationManager consumerNegotiationManager,
/**
* Negotiate a contract agreement using the given contract offer if no agreement exists for this constellation.
*
* @param providerUrl The provider of the data.
* @param contractOffer The object of negotiation.
* @param contractRequest The contract request to be sent.
* @return contractAgreement of the completed negotiation.
* @throws ExecutionException Attempted to retrieve the agreementId but the
* task aborted by throwing an exception. This
Expand All @@ -75,28 +70,16 @@ public Negotiator(ConsumerContractNegotiationManager consumerNegotiationManager,
* @throws InterruptedException Thread for agreementId was waiting, sleeping, or
* otherwise occupied, and was interrupted.
*/
public ContractAgreement negotiate(URL providerUrl, ContractOffer contractOffer)
public ContractAgreement negotiate(ContractRequest contractRequest)
throws InterruptedException, ExecutionException {
var contractRequestData = ContractRequestData.Builder.newInstance()
.connectorId("anonymous")
.counterPartyAddress(providerUrl.toString())
.contractOffer(contractOffer)
.protocol(DATASPACE_PROTOCOL_HTTP)
.build();

var contractRequest = ContractRequest.Builder.newInstance()
.requestData(contractRequestData)
.build();

var previousAgreements = contractNegotiationStore.queryAgreements(QuerySpec.max());
var relevantAgreements = previousAgreements
.filter(agreement -> agreement.getAssetId().equals(contractOffer.getAssetId()))
.filter(agreement -> agreement.getProviderId().equals(contractOffer.getProviderId()))
.filter(agreement -> agreement.getAssetId().equals(contractRequest.getContractOffer().getAssetId()))
.filter(agreement -> agreement.getProviderId().equals(contractRequest.getProviderId()))
.toList();

if (relevantAgreements.size() > 0) { // An agreement exists for this asset & provider
return relevantAgreements.get(0); // Pick first agreement, hope contractNegotiationStore removes invalid
// agreements
if (!relevantAgreements.isEmpty()) {
return relevantAgreements.get(0); // assuming contractNegotiationStore removes invalid agreements
}

var result = consumerNegotiationManager.initiate(contractRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,4 @@ public void deleteAssetAndContracts(String assetId) {
resourceAgent.deleteAsset(assetId);
}

/**
* Removes a contract
*
* @param contractId The contract id
*/
public void deleteContract(String contractId) {
contractHandler.deleteContractDefinition(contractId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.spi.types.domain.asset.Asset;

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

import static java.lang.String.format;

Expand Down Expand Up @@ -62,22 +62,32 @@ public class ContractHandler {
private final Logger logger;
private final ObjectReader objectReader;

private final Policy defaultPolicy;

/**
* Creates an instance of the ContractHandler class.
*
* @param contractStore Needed to manage EDC contracts.
* @param policyStore Needed to manage EDC policies.
*/
public ContractHandler(ContractDefinitionStore contractStore, PolicyDefinitionStore policyStore) {
Objects.requireNonNull(contractStore);
Objects.requireNonNull(policyStore);
Objects.requireNonNull(contractStore, "ContractDefinitionStore");
Objects.requireNonNull(policyStore, "PolicyDefinitionStore");
this.contractDefinitionStore = contractStore;
this.policyDefinitionStore = policyStore;

defaultPolicy = initializeDefaultPolicy();

configuration = Configuration.getInstance();
logger = Logger.getInstance();
var objectMapper = new ObjectMapper();
objectReader = objectMapper.readerFor(Policy.class);
objectReader = new ObjectMapper().readerFor(Policy.class);
}

/**
* Registers the given assetId to the default contract.
* Registers the given assetId to the default contract with the default access and contract policies.
*
* @param assetId The asset ID
* @return Contract id of contract this assetId was registered to
* @return Contract id of contract this assetId was registered to.
*/
public String registerAssetToDefaultContract(String assetId) {
Objects.requireNonNull(assetId);
Expand All @@ -97,19 +107,16 @@ public void deleteContractsWithAssetId(String assetId) {
.forEach(contract -> contractDefinitionStore.deleteById(contract.getId()));
}

/**
* Deletes the contract definition with the given id. Wraps
* {@link org.eclipse.edc.connector.contract.spi.offer.store.ContractDefinitionStore#deleteById(String)
* ContractDefinitionStore.deleteById()}
*
* @param contractId Contract to be deleted
* @return The removed contract definition or null if the contract definition
* was not found
*/
public StoreResult<ContractDefinition> deleteContractDefinition(String contractId) {
return contractDefinitionStore.deleteById(contractId);

private Policy initializeDefaultPolicy() {
return Policy.Builder.newInstance()
.permission(Permission.Builder.newInstance()
.action(Action.Builder.newInstance().type("USE").build())
.build())
.build();
}


private String createDefaultContract(String assetId) {
contractNumber++;
var accessPolicyId = DEFAULT_ACCESS_POLICY_UID + contractNumber;
Expand All @@ -119,54 +126,19 @@ private String createDefaultContract(String assetId) {
var defaultAccessPolicyPath = configuration.getDefaultAccessPolicyPath();
var defaultContractPolicyPath = configuration.getDefaultContractPolicyPath();

var usePermissionPolicy = Policy.Builder.newInstance()
.permission(Permission.Builder.newInstance()
.action(Action.Builder.newInstance().type("USE").build())
//.target(assetId)
.build())
//.type(PolicyType.CONTRACT)
//.target(assetId)
.build();
var defaultAccessPolicy = getPolicyDefinitionFromFile(defaultAccessPolicyPath).orElse(defaultPolicy);
var defaultContractPolicy = getPolicyDefinitionFromFile(defaultContractPolicyPath).orElse(defaultPolicy);

var defaultAccessPolicyDefinition = PolicyDefinition.Builder.newInstance()
.id(accessPolicyId)
.policy(usePermissionPolicy)
.policy(defaultAccessPolicy.withTarget(assetId))
.build();
var defaultContractPolicyDefinition = PolicyDefinition.Builder.newInstance()
.id(contractPolicyId)
.policy(usePermissionPolicy)
.policy(defaultContractPolicy.withTarget(assetId))
.build();

if (Objects.nonNull(defaultAccessPolicyPath)) {
try {
Policy defaultAccessPolicy = objectReader.readValue(Path.of(defaultAccessPolicyPath).toFile());
defaultAccessPolicyDefinition = PolicyDefinition.Builder.newInstance()
.id(accessPolicyId)
.policy(defaultAccessPolicy.withTarget(assetId))
.build();
} catch (IOException ioException) {
logger.error(
format("Could not find a correct access policy at path %s. Using internal default policy.",
defaultAccessPolicyPath),
ioException);
}
}
policyDefinitionStore.create(defaultAccessPolicyDefinition);

if (Objects.nonNull(defaultContractPolicyPath)) {
try {
Policy defaultContractPolicy = objectReader.readValue(Path.of(defaultContractPolicyPath).toFile());
defaultContractPolicyDefinition = PolicyDefinition.Builder.newInstance()
.id(contractPolicyId)
.policy(defaultContractPolicy.withTarget(assetId))
.build();
} catch (IOException ioException) {
logger.error(
format("Could not find a correct contract policy at path %s. Using internal default policy.",
defaultContractPolicyPath),
ioException);
}
}
policyDefinitionStore.create(defaultContractPolicyDefinition);

var defaultContractDefinition = ContractDefinition.Builder.newInstance()
Expand All @@ -180,4 +152,21 @@ private String createDefaultContract(String assetId) {

return contractDefinitionId;
}

private Optional<Policy> getPolicyDefinitionFromFile(String filePath) {
if (Objects.isNull(filePath)) {
return Optional.empty();
}

try {
Policy filePolicy = objectReader.readValue(Path.of(filePath).toFile());
return Optional.of(filePolicy);
} catch (IOException ioException) {
logger.error(
format("Could not find a valid policy at path %s. Using internal default policy.",
filePath),
ioException);
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ public String createAsset(String sourceUrl, String name, String contentType, Str
.id(assetId)
.name(name)
.contentType(contentType)
.version(version).build();
assetIndex.create(asset, dataAddress);
.version(version)
.dataAddress(dataAddress)
.build();
assetIndex.create(asset);
return assetId;
}

Expand Down
Loading