Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manage upload to ipfs #230

Merged
merged 14 commits into from
Mar 11, 2019
5 changes: 5 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ repositories {
maven {
url "https://packagecloud.io/priv/${token}/iexec/common/maven2"
}
maven { url "https://jitpack.io" }
}

configurations {
Expand Down Expand Up @@ -87,6 +88,10 @@ dependencies {

// expiring map
compile "net.jodah:expiringmap:0.5.8"

// ipfs
compile "com.github.ipfs:java-ipfs-http-client:1.2.3"
compile "com.github.multiformats:java-multiaddr:1.3.1"
}

jacoco {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/iexec/core/chain/IexecHubService.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ private Optional<ChainReceipt> sendFinalizeTransaction(String chainTaskId, Strin
try {
receipt = getHubContract(web3jService.getWritingContractGasProvider()).finalize(chainTaskIdBytes, resultUriBytes).send();
} catch (Exception e) {
log.error("Failed finalize [chainTaskId:{}, resultUri:{}, error:{}]]", chainTaskId, resultUri, e.getMessage());
log.error("Failed finalize [chainTaskId:{}, resultLink:{}, error:{}]]", chainTaskId, resultUri, e.getMessage());
return Optional.empty();
}

Expand All @@ -210,7 +210,7 @@ private Optional<ChainReceipt> sendFinalizeTransaction(String chainTaskId, Strin
return Optional.empty();
}

log.info("Finalized [chainTaskId:{}, resultUri:{}, gasUsed:{}]", chainTaskId, resultUri, receipt.getGasUsed());
log.info("Finalized [chainTaskId:{}, resultLink:{}, gasUsed:{}]", chainTaskId, resultUri, receipt.getGasUsed());
ChainReceipt chainReceipt = ChainUtils.buildChainReceipt(eventsList.get(0).log, chainTaskId);

return Optional.of(chainReceipt);
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/com/iexec/core/replicate/Replicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class Replicate {

private List<ReplicateStatusChange> statusChangeList;
private String walletAddress;
private String resultUri;
private String resultLink;
private String chainTaskId;
private String contributionHash;
private int credibility;
Expand Down Expand Up @@ -49,7 +49,6 @@ private ReplicateStatusChange getLatestStatusChange() {
return this.getStatusChangeList().get(this.getStatusChangeList().size() - 1);
}


public boolean updateStatus(ReplicateStatus newStatus, ReplicateStatusModifier modifier) {
return statusChangeList.add(new ReplicateStatusChange(newStatus, modifier));
}
Expand Down
10 changes: 6 additions & 4 deletions src/main/java/com/iexec/core/replicate/ReplicatesController.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.iexec.core.replicate;

import com.iexec.common.chain.ChainReceipt;
import com.iexec.common.replicate.ReplicateDetails;
import com.iexec.common.replicate.ReplicateStatus;
import com.iexec.common.replicate.ReplicateStatusModifier;
import com.iexec.core.security.JwtTokenProvider;
Expand All @@ -27,18 +28,19 @@ public ResponseEntity<String> updateReplicateStatus(
@PathVariable(name = "chainTaskId") String chainTaskId,
@RequestParam(name = "replicateStatus") ReplicateStatus replicateStatus,
@RequestHeader("Authorization") String bearerToken,
@RequestBody ChainReceipt chainReceipt) {
@RequestBody ReplicateDetails details) {

String walletAddress = jwtTokenProvider.getWalletAddressFromBearerToken(bearerToken);

if (walletAddress.isEmpty()) {
return ResponseEntity.status(HttpStatus.UNAUTHORIZED.value()).build();
}

log.info("UpdateReplicateStatus requested [chainTaskId:{}, replicateStatus:{}, walletAddress:{}, blockNumber:{}]",
chainTaskId, replicateStatus, walletAddress, chainReceipt.getBlockNumber());
log.info("UpdateReplicateStatus requested [chainTaskId:{}, replicateStatus:{}, walletAddress:{}]",
chainTaskId, replicateStatus, walletAddress);

replicatesService.updateReplicateStatus(chainTaskId, walletAddress, replicateStatus, ReplicateStatusModifier.WORKER, chainReceipt);
replicatesService.updateReplicateStatus(chainTaskId, walletAddress, replicateStatus, ReplicateStatusModifier.WORKER,
details.getChainReceipt(), details.getResultLink());
return ResponseEntity.ok().build();
}
}
50 changes: 39 additions & 11 deletions src/main/java/com/iexec/core/replicate/ReplicatesService.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package com.iexec.core.replicate;

import com.iexec.common.chain.ChainContribution;
import com.iexec.common.chain.ChainContributionStatus;
import com.iexec.common.chain.ChainReceipt;
import com.iexec.common.chain.*;
import com.iexec.common.replicate.ReplicateStatus;
import com.iexec.common.replicate.ReplicateStatusChange;
import com.iexec.common.replicate.ReplicateStatusModifier;
import com.iexec.common.result.eip712.Eip712Challenge;
import com.iexec.common.result.eip712.Eip712ChallengeUtils;
import com.iexec.common.utils.BytesUtils;
import com.iexec.core.chain.ChainConfig;
import com.iexec.core.chain.CredentialsService;
import com.iexec.core.chain.IexecHubService;
Expand Down Expand Up @@ -148,9 +147,7 @@ public int getNbOffChainReplicatesWithStatus(String chainTaskId, ReplicateStatus
private int getNbReplicatesWithGivenStatusJustBeforeWorkerLost(String chainTaskId, ReplicateStatus status) {
int nbReplicates = 0;
for (Replicate replicate : getReplicates(chainTaskId)) {
int size = replicate.getStatusChangeList().size();
if (size >= 2 && replicate.getStatusChangeList().get(size - 1).getStatus().equals(WORKER_LOST)
&& replicate.getStatusChangeList().get(size - 2).getStatus().equals(status)) {
if (isStatusBeforeWorkerLostEqualsTo(replicate, status)) {
nbReplicates++;
}
}
Expand All @@ -170,6 +167,27 @@ public Optional<Replicate> getRandomReplicateWithRevealStatus(String chainTaskId
return Optional.empty();
}

public Optional<Replicate> getReplicateWithResultUploadedStatus(String chainTaskId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we have an issue when worker is lost after UPLOADED

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private int getNbReplicatesWithGivenStatusJustBeforeWorkerLost(String chainTaskId, ReplicateStatus status) {

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great catch, thanks! I've modified the code.

for (Replicate replicate : getReplicates(chainTaskId)) {

boolean isStatusResultUploaded = replicate.getCurrentStatus().equals(RESULT_UPLOADED);
boolean isStatusResultUploadedBeforeWorkerLost = isStatusBeforeWorkerLostEqualsTo(replicate, RESULT_UPLOADED);

if (isStatusResultUploaded || isStatusResultUploadedBeforeWorkerLost) {
return Optional.of(replicate);
}
}

return Optional.empty();
}

private boolean isStatusBeforeWorkerLostEqualsTo(Replicate replicate, ReplicateStatus status) {
int size = replicate.getStatusChangeList().size();
return size >= 2
&& replicate.getStatusChangeList().get(size - 1).getStatus().equals(WORKER_LOST)
&& replicate.getStatusChangeList().get(size - 2).getStatus().equals(status);
}

public boolean moreReplicatesNeeded(String chainTaskId, int nbWorkersNeeded, long maxExecutionTime) {
int nbValidReplicates = 0;
for (Replicate replicate : getReplicates(chainTaskId)) {
Expand All @@ -189,20 +207,22 @@ public void updateReplicateStatus(String chainTaskId,
String walletAddress,
ReplicateStatus newStatus,
ReplicateStatusModifier modifier) {
updateReplicateStatus(chainTaskId, walletAddress, newStatus, modifier, null);
updateReplicateStatus(chainTaskId, walletAddress, newStatus, modifier, null, "");
}

// TODO: this method needs to be refactored !
// in case the task has been modified between reading and writing it, it is retried up to 100 times
@Retryable(value = {OptimisticLockingFailureException.class}, maxAttempts = 100)
public void updateReplicateStatus(String chainTaskId,
String walletAddress,
ReplicateStatus newStatus,
ReplicateStatusModifier modifier,
ChainReceipt chainReceipt) {
ChainReceipt chainReceipt,
String resultLink) {

if (newStatus == ReplicateStatus.RESULT_UPLOADED && !hasResultBeenUploaded(chainTaskId)) {
log.error("requested updateResplicateStatus to RESULT_UPLOADED when result has not been"
+ " uploaded to result repository yet [chainTaskId:{}, ReplicateAddress:{}]",
+ " uploaded to result repository yet [chainTaskId:{}, ReplicateAddress:{}]",
chainTaskId, walletAddress);
return;
}
Expand Down Expand Up @@ -261,6 +281,9 @@ public void updateReplicateStatus(String chainTaskId,
chainReceipt = null;
}

if (newStatus.equals(RESULT_UPLOADED)) {
replicate.setResultLink(resultLink);
}
replicate.updateStatus(newStatus, modifier, chainReceipt);
replicatesRepository.save(optionalReplicates.get());

Expand Down Expand Up @@ -347,9 +370,14 @@ private boolean isTaskStatusFailOnChain(String chainTaskId, String walletAddress
}

public boolean hasResultBeenUploaded(String chainTaskId) {
// currently no check in case of IPFS
if(iexecHubService.isPublicResult(chainTaskId, 0)){
return true;
}

RestTemplate restTemplate = new RestTemplate();
String resultChallengeURI = resultRepoConfig.getResultRepositoryURL()
+ "/results/challenge?chainId={id}";
+ "/results/challenge?chainId={id}";

// get the eip712 challenge
Eip712Challenge eip712Challenge = restTemplate.getForObject(resultChallengeURI,
Expand All @@ -369,7 +397,7 @@ public boolean hasResultBeenUploaded(String chainTaskId) {

String resultURI = resultRepoConfig.getResultRepositoryURL() + "/results/{chainTaskId}";

// HEAD resultRepoURL/resuts/chainTaskId
// HEAD resultRepoURL/results/chainTaskId
return restTemplate.exchange(resultURI, HttpMethod.HEAD, entity, String.class, chainTaskId)
.getStatusCode()
.is2xxSuccessful();
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/com/iexec/core/result/ResultController.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public ResponseEntity<String> addResult(
return ResponseEntity.status(HttpStatus.UNAUTHORIZED.value()).build();
}

String filename = resultService.addResult(
String resultLink = resultService.addResult(
Result.builder()
.chainTaskId(model.getChainTaskId())
.image(model.getImage())
Expand All @@ -52,16 +52,16 @@ public ResponseEntity<String> addResult(
.build(),
model.getZip());

if (filename.isEmpty()) {
if (resultLink.isEmpty()) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST.value()).build();
}

log.info("Result uploaded successfully [chainTaskId:{}, uploadRequester:{}]",
model.getChainTaskId(), auth.getWalletAddress());
log.info("Result uploaded successfully [chainTaskId:{}, uploadRequester:{}, resultLink:{}]",
model.getChainTaskId(), auth.getWalletAddress(), resultLink);

challengeService.invalidateEip712ChallengeString(auth.getChallenge());

return ok(filename);
return ok(resultLink);
}

@RequestMapping(method = RequestMethod.HEAD, path = "/results/{chainTaskId}")
Expand Down Expand Up @@ -108,7 +108,7 @@ public ResponseEntity<byte[]> getResult(@PathVariable("chainTaskId") String chai
@RequestParam(name = "chainId") Integer chainId) throws IOException {
Authorization auth = authorizationService.getAuthorizationFromToken(token);

boolean isPublicResult = resultService.isPublicResult(chainTaskId, chainId);
boolean isPublicResult = resultService.isPublicResult(chainTaskId);
boolean isAuthorizedOwnerOfResult = auth != null
&& resultService.isOwnerOfResult(chainId, chainTaskId, auth.getWalletAddress())
&& authorizationService.isAuthorizationValid(auth);
Expand Down
75 changes: 48 additions & 27 deletions src/main/java/com/iexec/core/result/ResultService.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import com.iexec.common.chain.ChainTask;
import com.iexec.common.utils.BytesUtils;
import com.iexec.core.chain.IexecHubService;
import com.iexec.core.configuration.ResultRepositoryConfiguration;
import com.iexec.core.result.ipfs.IPFSService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
Expand All @@ -19,18 +21,28 @@

@Service
@Slf4j
/**
* Service class to manage all the results. If the result is public, it will be stored on IPFS. If there is a dedicated
* beneficiary, the result will be pushed to mongo.
*/
public class ResultService {

private static final String RESULT_FILENAME_PREFIX = "iexec-result-";
private static final String IPFS_ADDRESS_PREFIX = "/ipfs/";

private GridFsOperations gridOperations;
private IexecHubService iexecHubService;
private IPFSService ipfsService;
private GridFsOperations gridOperations;
private ResultRepositoryConfiguration resultRepositoryConfig;


public ResultService(GridFsOperations gridOperations,
IexecHubService iexecHubService) {
this.gridOperations = gridOperations;
public ResultService(IexecHubService iexecHubService,
IPFSService ipfsService,
GridFsOperations gridOperations,
ResultRepositoryConfiguration resultRepositoryConfig) {
this.iexecHubService = iexecHubService;
this.ipfsService = ipfsService;
this.gridOperations = gridOperations;
this.resultRepositoryConfig = resultRepositoryConfig;
}

static String getResultFilename(String chainTaskId) {
Expand All @@ -57,7 +69,19 @@ boolean canUploadResult(String chainTaskId, String walletAddress, byte[] zip) {
return true;
}

public boolean isResultInDatabase(String chainTaskId) {
boolean isResultInDatabase(String chainTaskId) {
if(isPublicResult(chainTaskId)){
return isResultInIpfs(chainTaskId);
}
return isResultInMongo(chainTaskId);
}

boolean isResultInIpfs (String chainTaskId) {
return false;
}

boolean isResultInMongo(String chainTaskId) {

Query query = Query.query(Criteria.where("filename").is(getResultFilename(chainTaskId)));
return gridOperations.findOne(query) != null;
}
Expand All @@ -67,10 +91,26 @@ String addResult(Result result, byte[] data) {
return "";
}

if (iexecHubService.isPublicResult(result.getChainTaskId(), 0)) {
return addResultToIPFS(result, data);
} else {
return addResultToMongo(result, data);
}
}

private String addResultToMongo(Result result, byte[] data) {
InputStream inputStream = new ByteArrayInputStream(data);
String resultFileName = getResultFilename(result.getChainTaskId());
gridOperations.store(inputStream, resultFileName, result);
return resultFileName;
return resultRepositoryConfig.getResultRepositoryURL() + "/results/" + result.getChainTaskId();
}

private String addResultToIPFS(Result result, byte[] data) {
return IPFS_ADDRESS_PREFIX + ipfsService.putContent(result.getChainTaskId(), data);
}

public boolean isPublicResult(String chainTaskId) {
return iexecHubService.isPublicResult(chainTaskId, 0);
}

byte[] getResultByChainTaskId(String chainTaskId) throws IOException {
Expand All @@ -88,7 +128,7 @@ byte[] getResultByChainTaskId(String chainTaskId) throws IOException {
* TODO 2: Make possible to call this iexecHubService with a 'chainId' at runtime
*/
boolean isOwnerOfResult(Integer chainId, String chainTaskId, String downloaderAddress) {
Optional<String> beneficiary = getBeneficiary(chainTaskId, chainId);
Optional<String> beneficiary = iexecHubService.getTaskBeneficiary(chainTaskId, chainId);
if (!beneficiary.isPresent()) {
log.error("Failed to get beneficiary for isOwnerOfResult() method [chainTaskId:{}, downloaderAddress:{}]",
chainTaskId, downloaderAddress);
Expand All @@ -102,23 +142,4 @@ boolean isOwnerOfResult(Integer chainId, String chainTaskId, String downloaderAd
}
return true;
}

boolean isPublicResult(String chainTaskId, Integer chainId) {
Optional<String> beneficiary = getBeneficiary(chainTaskId, chainId);
if (!beneficiary.isPresent()) {
log.error("Failed to get beneficiary for isPublicResult() method [chainTaskId:{}]", chainTaskId);
return false;
}
return beneficiary.get().equals(BytesUtils.EMPTY_ADDRESS);
}

private Optional<String> getBeneficiary(String chainTaskId, Integer chainId) {
Optional<ChainTask> chainTask = iexecHubService.getChainTask(chainTaskId);
if (!chainTask.isPresent()) {
return Optional.empty();
}
Optional<ChainDeal> optionalChainDeal = iexecHubService.getChainDeal(chainTask.get().getDealid());
return optionalChainDeal.map(chainDeal -> chainDeal.getBeneficiary().toLowerCase());
}

}
16 changes: 16 additions & 0 deletions src/main/java/com/iexec/core/result/ipfs/IPFSConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.iexec.core.result.ipfs;

import lombok.Getter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
@Getter
public class IPFSConfig {

@Value("${ipfs.host}")
private String host;

@Value("${ipfs.port}")
private String port;
}
Loading