Skip to content

Commit

Permalink
Merge pull request #230 from iExecBlockchainComputing/ipfs
Browse files Browse the repository at this point in the history
Manage upload to ipfs
  • Loading branch information
Ugo Plouviez authored Mar 11, 2019
2 parents 301656c + 2f5c49b commit c5ee0c6
Show file tree
Hide file tree
Showing 16 changed files with 232 additions and 80 deletions.
5 changes: 5 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ repositories {
maven {
url "https://packagecloud.io/priv/${token}/iexec/common/maven2"
}
maven { url "https://jitpack.io" }
}

configurations {
Expand Down Expand Up @@ -87,6 +88,10 @@ dependencies {

// expiring map
compile "net.jodah:expiringmap:0.5.8"

// ipfs
compile "com.github.ipfs:java-ipfs-http-client:1.2.3"
compile "com.github.multiformats:java-multiaddr:1.3.1"
}

jacoco {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/iexec/core/chain/IexecHubService.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ private Optional<ChainReceipt> sendFinalizeTransaction(String chainTaskId, Strin
try {
receipt = getHubContract(web3jService.getWritingContractGasProvider()).finalize(chainTaskIdBytes, resultUriBytes).send();
} catch (Exception e) {
log.error("Failed finalize [chainTaskId:{}, resultUri:{}, error:{}]]", chainTaskId, resultUri, e.getMessage());
log.error("Failed finalize [chainTaskId:{}, resultLink:{}, error:{}]]", chainTaskId, resultUri, e.getMessage());
return Optional.empty();
}

Expand All @@ -210,7 +210,7 @@ private Optional<ChainReceipt> sendFinalizeTransaction(String chainTaskId, Strin
return Optional.empty();
}

log.info("Finalized [chainTaskId:{}, resultUri:{}, gasUsed:{}]", chainTaskId, resultUri, receipt.getGasUsed());
log.info("Finalized [chainTaskId:{}, resultLink:{}, gasUsed:{}]", chainTaskId, resultUri, receipt.getGasUsed());
ChainReceipt chainReceipt = ChainUtils.buildChainReceipt(eventsList.get(0).log, chainTaskId);

return Optional.of(chainReceipt);
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/com/iexec/core/replicate/Replicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class Replicate {

private List<ReplicateStatusChange> statusChangeList;
private String walletAddress;
private String resultUri;
private String resultLink;
private String chainTaskId;
private String contributionHash;
private int credibility;
Expand Down Expand Up @@ -49,7 +49,6 @@ private ReplicateStatusChange getLatestStatusChange() {
return this.getStatusChangeList().get(this.getStatusChangeList().size() - 1);
}


public boolean updateStatus(ReplicateStatus newStatus, ReplicateStatusModifier modifier) {
return statusChangeList.add(new ReplicateStatusChange(newStatus, modifier));
}
Expand Down
10 changes: 6 additions & 4 deletions src/main/java/com/iexec/core/replicate/ReplicatesController.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.iexec.core.replicate;

import com.iexec.common.chain.ChainReceipt;
import com.iexec.common.replicate.ReplicateDetails;
import com.iexec.common.replicate.ReplicateStatus;
import com.iexec.common.replicate.ReplicateStatusModifier;
import com.iexec.core.security.JwtTokenProvider;
Expand All @@ -27,18 +28,19 @@ public ResponseEntity<String> updateReplicateStatus(
@PathVariable(name = "chainTaskId") String chainTaskId,
@RequestParam(name = "replicateStatus") ReplicateStatus replicateStatus,
@RequestHeader("Authorization") String bearerToken,
@RequestBody ChainReceipt chainReceipt) {
@RequestBody ReplicateDetails details) {

String walletAddress = jwtTokenProvider.getWalletAddressFromBearerToken(bearerToken);

if (walletAddress.isEmpty()) {
return ResponseEntity.status(HttpStatus.UNAUTHORIZED.value()).build();
}

log.info("UpdateReplicateStatus requested [chainTaskId:{}, replicateStatus:{}, walletAddress:{}, blockNumber:{}]",
chainTaskId, replicateStatus, walletAddress, chainReceipt.getBlockNumber());
log.info("UpdateReplicateStatus requested [chainTaskId:{}, replicateStatus:{}, walletAddress:{}]",
chainTaskId, replicateStatus, walletAddress);

replicatesService.updateReplicateStatus(chainTaskId, walletAddress, replicateStatus, ReplicateStatusModifier.WORKER, chainReceipt);
replicatesService.updateReplicateStatus(chainTaskId, walletAddress, replicateStatus, ReplicateStatusModifier.WORKER,
details.getChainReceipt(), details.getResultLink());
return ResponseEntity.ok().build();
}
}
50 changes: 39 additions & 11 deletions src/main/java/com/iexec/core/replicate/ReplicatesService.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package com.iexec.core.replicate;

import com.iexec.common.chain.ChainContribution;
import com.iexec.common.chain.ChainContributionStatus;
import com.iexec.common.chain.ChainReceipt;
import com.iexec.common.chain.*;
import com.iexec.common.replicate.ReplicateStatus;
import com.iexec.common.replicate.ReplicateStatusChange;
import com.iexec.common.replicate.ReplicateStatusModifier;
import com.iexec.common.result.eip712.Eip712Challenge;
import com.iexec.common.result.eip712.Eip712ChallengeUtils;
import com.iexec.common.utils.BytesUtils;
import com.iexec.core.chain.ChainConfig;
import com.iexec.core.chain.CredentialsService;
import com.iexec.core.chain.IexecHubService;
Expand Down Expand Up @@ -148,9 +147,7 @@ public int getNbOffChainReplicatesWithStatus(String chainTaskId, ReplicateStatus
private int getNbReplicatesWithGivenStatusJustBeforeWorkerLost(String chainTaskId, ReplicateStatus status) {
int nbReplicates = 0;
for (Replicate replicate : getReplicates(chainTaskId)) {
int size = replicate.getStatusChangeList().size();
if (size >= 2 && replicate.getStatusChangeList().get(size - 1).getStatus().equals(WORKER_LOST)
&& replicate.getStatusChangeList().get(size - 2).getStatus().equals(status)) {
if (isStatusBeforeWorkerLostEqualsTo(replicate, status)) {
nbReplicates++;
}
}
Expand All @@ -170,6 +167,27 @@ public Optional<Replicate> getRandomReplicateWithRevealStatus(String chainTaskId
return Optional.empty();
}

public Optional<Replicate> getReplicateWithResultUploadedStatus(String chainTaskId) {
for (Replicate replicate : getReplicates(chainTaskId)) {

boolean isStatusResultUploaded = replicate.getCurrentStatus().equals(RESULT_UPLOADED);
boolean isStatusResultUploadedBeforeWorkerLost = isStatusBeforeWorkerLostEqualsTo(replicate, RESULT_UPLOADED);

if (isStatusResultUploaded || isStatusResultUploadedBeforeWorkerLost) {
return Optional.of(replicate);
}
}

return Optional.empty();
}

private boolean isStatusBeforeWorkerLostEqualsTo(Replicate replicate, ReplicateStatus status) {
int size = replicate.getStatusChangeList().size();
return size >= 2
&& replicate.getStatusChangeList().get(size - 1).getStatus().equals(WORKER_LOST)
&& replicate.getStatusChangeList().get(size - 2).getStatus().equals(status);
}

public boolean moreReplicatesNeeded(String chainTaskId, int nbWorkersNeeded, long maxExecutionTime) {
int nbValidReplicates = 0;
for (Replicate replicate : getReplicates(chainTaskId)) {
Expand All @@ -189,20 +207,22 @@ public void updateReplicateStatus(String chainTaskId,
String walletAddress,
ReplicateStatus newStatus,
ReplicateStatusModifier modifier) {
updateReplicateStatus(chainTaskId, walletAddress, newStatus, modifier, null);
updateReplicateStatus(chainTaskId, walletAddress, newStatus, modifier, null, "");
}

// TODO: this method needs to be refactored !
// in case the task has been modified between reading and writing it, it is retried up to 100 times
@Retryable(value = {OptimisticLockingFailureException.class}, maxAttempts = 100)
public void updateReplicateStatus(String chainTaskId,
String walletAddress,
ReplicateStatus newStatus,
ReplicateStatusModifier modifier,
ChainReceipt chainReceipt) {
ChainReceipt chainReceipt,
String resultLink) {

if (newStatus == ReplicateStatus.RESULT_UPLOADED && !hasResultBeenUploaded(chainTaskId)) {
log.error("requested updateResplicateStatus to RESULT_UPLOADED when result has not been"
+ " uploaded to result repository yet [chainTaskId:{}, ReplicateAddress:{}]",
+ " uploaded to result repository yet [chainTaskId:{}, ReplicateAddress:{}]",
chainTaskId, walletAddress);
return;
}
Expand Down Expand Up @@ -261,6 +281,9 @@ public void updateReplicateStatus(String chainTaskId,
chainReceipt = null;
}

if (newStatus.equals(RESULT_UPLOADED)) {
replicate.setResultLink(resultLink);
}
replicate.updateStatus(newStatus, modifier, chainReceipt);
replicatesRepository.save(optionalReplicates.get());

Expand Down Expand Up @@ -347,9 +370,14 @@ private boolean isTaskStatusFailOnChain(String chainTaskId, String walletAddress
}

public boolean hasResultBeenUploaded(String chainTaskId) {
// currently no check in case of IPFS
if(iexecHubService.isPublicResult(chainTaskId, 0)){
return true;
}

RestTemplate restTemplate = new RestTemplate();
String resultChallengeURI = resultRepoConfig.getResultRepositoryURL()
+ "/results/challenge?chainId={id}";
+ "/results/challenge?chainId={id}";

// get the eip712 challenge
Eip712Challenge eip712Challenge = restTemplate.getForObject(resultChallengeURI,
Expand All @@ -369,7 +397,7 @@ public boolean hasResultBeenUploaded(String chainTaskId) {

String resultURI = resultRepoConfig.getResultRepositoryURL() + "/results/{chainTaskId}";

// HEAD resultRepoURL/resuts/chainTaskId
// HEAD resultRepoURL/results/chainTaskId
return restTemplate.exchange(resultURI, HttpMethod.HEAD, entity, String.class, chainTaskId)
.getStatusCode()
.is2xxSuccessful();
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/com/iexec/core/result/ResultController.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public ResponseEntity<String> addResult(
return ResponseEntity.status(HttpStatus.UNAUTHORIZED.value()).build();
}

String filename = resultService.addResult(
String resultLink = resultService.addResult(
Result.builder()
.chainTaskId(model.getChainTaskId())
.image(model.getImage())
Expand All @@ -52,16 +52,16 @@ public ResponseEntity<String> addResult(
.build(),
model.getZip());

if (filename.isEmpty()) {
if (resultLink.isEmpty()) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST.value()).build();
}

log.info("Result uploaded successfully [chainTaskId:{}, uploadRequester:{}]",
model.getChainTaskId(), auth.getWalletAddress());
log.info("Result uploaded successfully [chainTaskId:{}, uploadRequester:{}, resultLink:{}]",
model.getChainTaskId(), auth.getWalletAddress(), resultLink);

challengeService.invalidateEip712ChallengeString(auth.getChallenge());

return ok(filename);
return ok(resultLink);
}

@RequestMapping(method = RequestMethod.HEAD, path = "/results/{chainTaskId}")
Expand Down Expand Up @@ -108,7 +108,7 @@ public ResponseEntity<byte[]> getResult(@PathVariable("chainTaskId") String chai
@RequestParam(name = "chainId") Integer chainId) throws IOException {
Authorization auth = authorizationService.getAuthorizationFromToken(token);

boolean isPublicResult = resultService.isPublicResult(chainTaskId, chainId);
boolean isPublicResult = resultService.isPublicResult(chainTaskId);
boolean isAuthorizedOwnerOfResult = auth != null
&& resultService.isOwnerOfResult(chainId, chainTaskId, auth.getWalletAddress())
&& authorizationService.isAuthorizationValid(auth);
Expand Down
75 changes: 48 additions & 27 deletions src/main/java/com/iexec/core/result/ResultService.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import com.iexec.common.chain.ChainTask;
import com.iexec.common.utils.BytesUtils;
import com.iexec.core.chain.IexecHubService;
import com.iexec.core.configuration.ResultRepositoryConfiguration;
import com.iexec.core.result.ipfs.IPFSService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
Expand All @@ -19,18 +21,28 @@

@Service
@Slf4j
/**
* Service class to manage all the results. If the result is public, it will be stored on IPFS. If there is a dedicated
* beneficiary, the result will be pushed to mongo.
*/
public class ResultService {

private static final String RESULT_FILENAME_PREFIX = "iexec-result-";
private static final String IPFS_ADDRESS_PREFIX = "/ipfs/";

private GridFsOperations gridOperations;
private IexecHubService iexecHubService;
private IPFSService ipfsService;
private GridFsOperations gridOperations;
private ResultRepositoryConfiguration resultRepositoryConfig;


public ResultService(GridFsOperations gridOperations,
IexecHubService iexecHubService) {
this.gridOperations = gridOperations;
public ResultService(IexecHubService iexecHubService,
IPFSService ipfsService,
GridFsOperations gridOperations,
ResultRepositoryConfiguration resultRepositoryConfig) {
this.iexecHubService = iexecHubService;
this.ipfsService = ipfsService;
this.gridOperations = gridOperations;
this.resultRepositoryConfig = resultRepositoryConfig;
}

static String getResultFilename(String chainTaskId) {
Expand All @@ -57,7 +69,19 @@ boolean canUploadResult(String chainTaskId, String walletAddress, byte[] zip) {
return true;
}

public boolean isResultInDatabase(String chainTaskId) {
boolean isResultInDatabase(String chainTaskId) {
if(isPublicResult(chainTaskId)){
return isResultInIpfs(chainTaskId);
}
return isResultInMongo(chainTaskId);
}

boolean isResultInIpfs (String chainTaskId) {
return false;
}

boolean isResultInMongo(String chainTaskId) {

Query query = Query.query(Criteria.where("filename").is(getResultFilename(chainTaskId)));
return gridOperations.findOne(query) != null;
}
Expand All @@ -67,10 +91,26 @@ String addResult(Result result, byte[] data) {
return "";
}

if (iexecHubService.isPublicResult(result.getChainTaskId(), 0)) {
return addResultToIPFS(result, data);
} else {
return addResultToMongo(result, data);
}
}

private String addResultToMongo(Result result, byte[] data) {
InputStream inputStream = new ByteArrayInputStream(data);
String resultFileName = getResultFilename(result.getChainTaskId());
gridOperations.store(inputStream, resultFileName, result);
return resultFileName;
return resultRepositoryConfig.getResultRepositoryURL() + "/results/" + result.getChainTaskId();
}

private String addResultToIPFS(Result result, byte[] data) {
return IPFS_ADDRESS_PREFIX + ipfsService.putContent(result.getChainTaskId(), data);
}

public boolean isPublicResult(String chainTaskId) {
return iexecHubService.isPublicResult(chainTaskId, 0);
}

byte[] getResultByChainTaskId(String chainTaskId) throws IOException {
Expand All @@ -88,7 +128,7 @@ byte[] getResultByChainTaskId(String chainTaskId) throws IOException {
* TODO 2: Make possible to call this iexecHubService with a 'chainId' at runtime
*/
boolean isOwnerOfResult(Integer chainId, String chainTaskId, String downloaderAddress) {
Optional<String> beneficiary = getBeneficiary(chainTaskId, chainId);
Optional<String> beneficiary = iexecHubService.getTaskBeneficiary(chainTaskId, chainId);
if (!beneficiary.isPresent()) {
log.error("Failed to get beneficiary for isOwnerOfResult() method [chainTaskId:{}, downloaderAddress:{}]",
chainTaskId, downloaderAddress);
Expand All @@ -102,23 +142,4 @@ boolean isOwnerOfResult(Integer chainId, String chainTaskId, String downloaderAd
}
return true;
}

boolean isPublicResult(String chainTaskId, Integer chainId) {
Optional<String> beneficiary = getBeneficiary(chainTaskId, chainId);
if (!beneficiary.isPresent()) {
log.error("Failed to get beneficiary for isPublicResult() method [chainTaskId:{}]", chainTaskId);
return false;
}
return beneficiary.get().equals(BytesUtils.EMPTY_ADDRESS);
}

private Optional<String> getBeneficiary(String chainTaskId, Integer chainId) {
Optional<ChainTask> chainTask = iexecHubService.getChainTask(chainTaskId);
if (!chainTask.isPresent()) {
return Optional.empty();
}
Optional<ChainDeal> optionalChainDeal = iexecHubService.getChainDeal(chainTask.get().getDealid());
return optionalChainDeal.map(chainDeal -> chainDeal.getBeneficiary().toLowerCase());
}

}
16 changes: 16 additions & 0 deletions src/main/java/com/iexec/core/result/ipfs/IPFSConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.iexec.core.result.ipfs;

import lombok.Getter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
@Getter
public class IPFSConfig {

@Value("${ipfs.host}")
private String host;

@Value("${ipfs.port}")
private String port;
}
Loading

0 comments on commit c5ee0c6

Please sign in to comment.