From fa5d430b969bcdba4c35f883f35eb99bd267a405 Mon Sep 17 00:00:00 2001 From: Ugo Plouviez Date: Wed, 27 Feb 2019 16:39:43 +0100 Subject: [PATCH 1/8] Use ipfs instead of result repo --- build.gradle | 4 ++ .../com/iexec/core/result/IPFSService.java | 52 +++++++++++++++++++ .../iexec/core/result/ResultController.java | 6 +-- .../com/iexec/core/result/ResultService.java | 33 +++++------- src/main/resources/application.yml | 6 ++- 5 files changed, 77 insertions(+), 24 deletions(-) create mode 100644 src/main/java/com/iexec/core/result/IPFSService.java diff --git a/build.gradle b/build.gradle index 44ccc3216..214b960ef 100644 --- a/build.gradle +++ b/build.gradle @@ -30,6 +30,7 @@ repositories { maven { url "https://packagecloud.io/priv/${token}/iexec/common/maven2" } + maven { url "https://jitpack.io" } } configurations { @@ -87,6 +88,9 @@ dependencies { // expiring map compile "net.jodah:expiringmap:0.5.8" + + // ipfs java client + compile 'com.github.ipfs:java-ipfs-http-client:1.2.3' } jacoco { diff --git a/src/main/java/com/iexec/core/result/IPFSService.java b/src/main/java/com/iexec/core/result/IPFSService.java new file mode 100644 index 000000000..b521ee150 --- /dev/null +++ b/src/main/java/com/iexec/core/result/IPFSService.java @@ -0,0 +1,52 @@ +package com.iexec.core.result; + +import io.ipfs.api.IPFS; +import io.ipfs.api.MerkleNode; +import io.ipfs.api.NamedStreamable; +import io.ipfs.multihash.Multihash; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.io.IOException; +import java.util.Optional; + +@Service +@Slf4j +public class IPFSService { + + @Value("${ipfs.host}") + private String host; + + @Value("${ipfs.port}") + private String port; + + private IPFS ipfs; + + public IPFSService() { + String multiAddress = "/ip4/" + host + "/tcp/" + port; + ipfs = new IPFS(multiAddress); + } + + public Optional getContent(String ipfsHash) { + Multihash filePointer = Multihash.fromBase58(ipfsHash); + try { + return Optional.of(ipfs.get(filePointer)); + } catch (IOException e) { + log.error("Error when trying to retrieve ipfs object [hash:{}]", ipfsHash); + } + return Optional.empty(); + } + + public String putContent(String fileName, byte[] fileContent) { + NamedStreamable.ByteArrayWrapper file = new NamedStreamable.ByteArrayWrapper(fileName, fileContent); + try { + MerkleNode pushedContent = ipfs.add(file).get(0); + return pushedContent.hash.toString(); + } catch (IOException e) { + log.error("Error when trying to push ipfs object [fileName:{}]", fileName); + } + + return ""; + } +} diff --git a/src/main/java/com/iexec/core/result/ResultController.java b/src/main/java/com/iexec/core/result/ResultController.java index d80d88c86..fc06e7e53 100644 --- a/src/main/java/com/iexec/core/result/ResultController.java +++ b/src/main/java/com/iexec/core/result/ResultController.java @@ -43,7 +43,7 @@ public ResponseEntity addResult( return ResponseEntity.status(HttpStatus.UNAUTHORIZED.value()).build(); } - String filename = resultService.addResult( + String ipfsHash = resultService.addResult( Result.builder() .chainTaskId(model.getChainTaskId()) .image(model.getImage()) @@ -52,7 +52,7 @@ public ResponseEntity addResult( .build(), model.getZip()); - if (filename.isEmpty()) { + if (ipfsHash.isEmpty()) { return ResponseEntity.status(HttpStatus.BAD_REQUEST.value()).build(); } @@ -61,7 +61,7 @@ public ResponseEntity addResult( challengeService.invalidateEip712ChallengeString(auth.getChallenge()); - return ok(filename); + return ok(ipfsHash); } @RequestMapping(method = RequestMethod.HEAD, path = "/results/{chainTaskId}") diff --git a/src/main/java/com/iexec/core/result/ResultService.java b/src/main/java/com/iexec/core/result/ResultService.java index da55e4d43..e8f37f3a0 100644 --- a/src/main/java/com/iexec/core/result/ResultService.java +++ b/src/main/java/com/iexec/core/result/ResultService.java @@ -8,11 +8,9 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; -import org.springframework.data.mongodb.gridfs.GridFsOperations; import org.springframework.data.mongodb.gridfs.GridFsResource; import org.springframework.stereotype.Service; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Optional; @@ -23,14 +21,13 @@ public class ResultService { private static final String RESULT_FILENAME_PREFIX = "iexec-result-"; - private GridFsOperations gridOperations; private IexecHubService iexecHubService; + private IPFSService ipfsService; - - public ResultService(GridFsOperations gridOperations, - IexecHubService iexecHubService) { - this.gridOperations = gridOperations; + public ResultService(IexecHubService iexecHubService, + IPFSService ipfsService) { this.iexecHubService = iexecHubService; + this.ipfsService = ipfsService; } static String getResultFilename(String chainTaskId) { @@ -39,11 +36,11 @@ static String getResultFilename(String chainTaskId) { boolean canUploadResult(String chainTaskId, String walletAddress, byte[] zip) { // check if result has been already uploaded - if (isResultInDatabase(chainTaskId)) { - log.error("Trying to upload result that has been already uploaded [chainTaskId:{}, uploadRequester:{}]", - chainTaskId, walletAddress); - return false; - } + //if (isResultInDatabase(chainTaskId)) { + // log.error("Trying to upload result that has been already uploaded [chainTaskId:{}, uploadRequester:{}]", + // chainTaskId, walletAddress); + // return false; + //} // ContributionStatus of chainTask should be REVEALED boolean isChainContributionStatusSetToRevealed = iexecHubService.doesWishedStatusMatchesOnChainStatus(chainTaskId, @@ -58,19 +55,15 @@ boolean canUploadResult(String chainTaskId, String walletAddress, byte[] zip) { } public boolean isResultInDatabase(String chainTaskId) { + ipfsService.getContent(); + Query query = Query.query(Criteria.where("filename").is(getResultFilename(chainTaskId))); return gridOperations.findOne(query) != null; } String addResult(Result result, byte[] data) { - if (result == null || result.getChainTaskId() == null) { - return ""; - } - - InputStream inputStream = new ByteArrayInputStream(data); - String resultFileName = getResultFilename(result.getChainTaskId()); - gridOperations.store(inputStream, resultFileName, result); - return resultFileName; + return result == null || result.getChainTaskId() == null ? "" : + ipfsService.putContent(getResultFilename(result.getChainTaskId()), data); } byte[] getResultByChainTaskId(String chainTaskId) throws IOException { diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index edce73e57..2c0b9e73a 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -48,4 +48,8 @@ resultRepository: sms: protocol: ${IEXEC_SMS_PROTOCOL:http} ip: ${IEXEC_SMS_IP:localhost} - port: ${IEXEC_SMS_PORT:5000} \ No newline at end of file + port: ${IEXEC_SMS_PORT:5000} + +ipfs: + host: ${IEXEC_IPFS_HOST:127.0.0.1} + port: ${IEXEC_IPFS_PORT:5001} \ No newline at end of file From 5072dcd6d92318c37b7b3bb0cde18cd492d65ced Mon Sep 17 00:00:00 2001 From: Ugo Plouviez Date: Mon, 4 Mar 2019 11:58:51 +0100 Subject: [PATCH 2/8] Use ipfs instead of result repo --- src/main/java/com/iexec/core/result/IPFSService.java | 4 ++++ src/main/java/com/iexec/core/result/ResultService.java | 7 ------- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/iexec/core/result/IPFSService.java b/src/main/java/com/iexec/core/result/IPFSService.java index b521ee150..a24fdd27e 100644 --- a/src/main/java/com/iexec/core/result/IPFSService.java +++ b/src/main/java/com/iexec/core/result/IPFSService.java @@ -38,6 +38,10 @@ public Optional getContent(String ipfsHash) { return Optional.empty(); } + public boolean doesContentExists(String ipfsHash) { + return getContent(ipfsHash).isPresent(); + } + public String putContent(String fileName, byte[] fileContent) { NamedStreamable.ByteArrayWrapper file = new NamedStreamable.ByteArrayWrapper(fileName, fileContent); try { diff --git a/src/main/java/com/iexec/core/result/ResultService.java b/src/main/java/com/iexec/core/result/ResultService.java index e8f37f3a0..32217d393 100644 --- a/src/main/java/com/iexec/core/result/ResultService.java +++ b/src/main/java/com/iexec/core/result/ResultService.java @@ -35,13 +35,6 @@ static String getResultFilename(String chainTaskId) { } boolean canUploadResult(String chainTaskId, String walletAddress, byte[] zip) { - // check if result has been already uploaded - //if (isResultInDatabase(chainTaskId)) { - // log.error("Trying to upload result that has been already uploaded [chainTaskId:{}, uploadRequester:{}]", - // chainTaskId, walletAddress); - // return false; - //} - // ContributionStatus of chainTask should be REVEALED boolean isChainContributionStatusSetToRevealed = iexecHubService.doesWishedStatusMatchesOnChainStatus(chainTaskId, walletAddress, ChainContributionStatus.REVEALED); From f3f82890cdc36448f3fd5aa7114c6f9a420ff722 Mon Sep 17 00:00:00 2001 From: Ugo Plouviez Date: Mon, 4 Mar 2019 17:19:54 +0100 Subject: [PATCH 3/8] Split result repo into ipfs and mongo --- build.gradle | 5 +- .../core/replicate/ReplicatesController.java | 3 +- .../core/replicate/ReplicatesService.java | 5 +- .../iexec/core/result/IPFS/IPFSConfig.java | 16 ++++++ .../core/result/{ => IPFS}/IPFSService.java | 13 +---- .../iexec/core/result/ResultController.java | 10 ++-- .../com/iexec/core/result/ResultService.java | 56 +++++++++++++++++-- .../iexec/core/result/ResultServiceTest.java | 17 ++++-- 8 files changed, 96 insertions(+), 29 deletions(-) create mode 100644 src/main/java/com/iexec/core/result/IPFS/IPFSConfig.java rename src/main/java/com/iexec/core/result/{ => IPFS}/IPFSService.java (82%) diff --git a/build.gradle b/build.gradle index 214b960ef..69258e749 100644 --- a/build.gradle +++ b/build.gradle @@ -89,8 +89,9 @@ dependencies { // expiring map compile "net.jodah:expiringmap:0.5.8" - // ipfs java client - compile 'com.github.ipfs:java-ipfs-http-client:1.2.3' + // ipfs + compile "com.github.ipfs:java-ipfs-http-client:1.2.3" + compile "com.github.multiformats:java-multiaddr:1.3.1" } jacoco { diff --git a/src/main/java/com/iexec/core/replicate/ReplicatesController.java b/src/main/java/com/iexec/core/replicate/ReplicatesController.java index a0679554b..b9a88a18f 100644 --- a/src/main/java/com/iexec/core/replicate/ReplicatesController.java +++ b/src/main/java/com/iexec/core/replicate/ReplicatesController.java @@ -26,6 +26,7 @@ public ReplicatesController(ReplicatesService replicatesService, public ResponseEntity updateReplicateStatus( @PathVariable(name = "chainTaskId") String chainTaskId, @RequestParam(name = "replicateStatus") ReplicateStatus replicateStatus, + @RequestParam(name = "resultLink") String resultLink, @RequestHeader("Authorization") String bearerToken, @RequestBody ChainReceipt chainReceipt) { @@ -38,7 +39,7 @@ public ResponseEntity updateReplicateStatus( log.info("UpdateReplicateStatus requested [chainTaskId:{}, replicateStatus:{}, walletAddress:{}, blockNumber:{}]", chainTaskId, replicateStatus, walletAddress, chainReceipt.getBlockNumber()); - replicatesService.updateReplicateStatus(chainTaskId, walletAddress, replicateStatus, ReplicateStatusModifier.WORKER, chainReceipt); + replicatesService.updateReplicateStatus(chainTaskId, walletAddress, replicateStatus, ReplicateStatusModifier.WORKER, chainReceipt, resultLink); return ResponseEntity.ok().build(); } } diff --git a/src/main/java/com/iexec/core/replicate/ReplicatesService.java b/src/main/java/com/iexec/core/replicate/ReplicatesService.java index 27cea4be7..e245a7441 100644 --- a/src/main/java/com/iexec/core/replicate/ReplicatesService.java +++ b/src/main/java/com/iexec/core/replicate/ReplicatesService.java @@ -189,7 +189,7 @@ public void updateReplicateStatus(String chainTaskId, String walletAddress, ReplicateStatus newStatus, ReplicateStatusModifier modifier) { - updateReplicateStatus(chainTaskId, walletAddress, newStatus, modifier, null); + updateReplicateStatus(chainTaskId, walletAddress, newStatus, modifier, null, ""); } // in case the task has been modified between reading and writing it, it is retried up to 100 times @@ -198,7 +198,8 @@ public void updateReplicateStatus(String chainTaskId, String walletAddress, ReplicateStatus newStatus, ReplicateStatusModifier modifier, - ChainReceipt chainReceipt) { + ChainReceipt chainReceipt, + String resultLink) { if (newStatus == ReplicateStatus.RESULT_UPLOADED && !hasResultBeenUploaded(chainTaskId)) { log.error("requested updateResplicateStatus to RESULT_UPLOADED when result has not been" diff --git a/src/main/java/com/iexec/core/result/IPFS/IPFSConfig.java b/src/main/java/com/iexec/core/result/IPFS/IPFSConfig.java new file mode 100644 index 000000000..4e30a5d14 --- /dev/null +++ b/src/main/java/com/iexec/core/result/IPFS/IPFSConfig.java @@ -0,0 +1,16 @@ +package com.iexec.core.result.IPFS; + +import lombok.Getter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +@Component +@Getter +public class IPFSConfig { + + @Value("${ipfs.host}") + private String host; + + @Value("${ipfs.port}") + private String port; +} diff --git a/src/main/java/com/iexec/core/result/IPFSService.java b/src/main/java/com/iexec/core/result/IPFS/IPFSService.java similarity index 82% rename from src/main/java/com/iexec/core/result/IPFSService.java rename to src/main/java/com/iexec/core/result/IPFS/IPFSService.java index a24fdd27e..219eecba9 100644 --- a/src/main/java/com/iexec/core/result/IPFSService.java +++ b/src/main/java/com/iexec/core/result/IPFS/IPFSService.java @@ -1,11 +1,10 @@ -package com.iexec.core.result; +package com.iexec.core.result.IPFS; import io.ipfs.api.IPFS; import io.ipfs.api.MerkleNode; import io.ipfs.api.NamedStreamable; import io.ipfs.multihash.Multihash; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.io.IOException; @@ -15,16 +14,10 @@ @Slf4j public class IPFSService { - @Value("${ipfs.host}") - private String host; - - @Value("${ipfs.port}") - private String port; - private IPFS ipfs; - public IPFSService() { - String multiAddress = "/ip4/" + host + "/tcp/" + port; + public IPFSService(IPFSConfig ipfsConfig) { + String multiAddress = "/ip4/" + ipfsConfig.getHost() + "/tcp/" + ipfsConfig.getPort(); ipfs = new IPFS(multiAddress); } diff --git a/src/main/java/com/iexec/core/result/ResultController.java b/src/main/java/com/iexec/core/result/ResultController.java index 0bb6f4768..9cb6de404 100644 --- a/src/main/java/com/iexec/core/result/ResultController.java +++ b/src/main/java/com/iexec/core/result/ResultController.java @@ -43,7 +43,7 @@ public ResponseEntity addResult( return ResponseEntity.status(HttpStatus.UNAUTHORIZED.value()).build(); } - String ipfsHash = resultService.addResult( + String resultLink = resultService.addResult( Result.builder() .chainTaskId(model.getChainTaskId()) .image(model.getImage()) @@ -52,16 +52,16 @@ public ResponseEntity addResult( .build(), model.getZip()); - if (ipfsHash.isEmpty()) { + if (resultLink.isEmpty()) { return ResponseEntity.status(HttpStatus.BAD_REQUEST.value()).build(); } - log.info("Result uploaded successfully [chainTaskId:{}, uploadRequester:{}]", - model.getChainTaskId(), auth.getWalletAddress()); + log.info("Result uploaded successfully [chainTaskId:{}, uploadRequester:{}, resultLink:{}]", + model.getChainTaskId(), auth.getWalletAddress(), resultLink); challengeService.invalidateEip712ChallengeString(auth.getChallenge()); - return ok(ipfsHash); + return ok(resultLink); } @RequestMapping(method = RequestMethod.HEAD, path = "/results/{chainTaskId}") diff --git a/src/main/java/com/iexec/core/result/ResultService.java b/src/main/java/com/iexec/core/result/ResultService.java index 32217d393..97dc5587a 100644 --- a/src/main/java/com/iexec/core/result/ResultService.java +++ b/src/main/java/com/iexec/core/result/ResultService.java @@ -5,29 +5,44 @@ import com.iexec.common.chain.ChainTask; import com.iexec.common.utils.BytesUtils; import com.iexec.core.chain.IexecHubService; +import com.iexec.core.configuration.ResultRepositoryConfiguration; +import com.iexec.core.result.IPFS.IPFSService; import lombok.extern.slf4j.Slf4j; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.gridfs.GridFsOperations; import org.springframework.data.mongodb.gridfs.GridFsResource; import org.springframework.stereotype.Service; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Optional; @Service @Slf4j +/** + * Service class to manage all the results. If the result is public, it will be stored on IPFS. If there is a dedicated + * beneficiary, the result will be pushed to mongo. + */ public class ResultService { private static final String RESULT_FILENAME_PREFIX = "iexec-result-"; + private static final String IPFS_ADDRESS_PREFIX = "/ipfs/"; private IexecHubService iexecHubService; private IPFSService ipfsService; + private GridFsOperations gridOperations; + private ResultRepositoryConfiguration resultRepositoryConfig; public ResultService(IexecHubService iexecHubService, - IPFSService ipfsService) { + IPFSService ipfsService, + GridFsOperations gridOperations, + ResultRepositoryConfiguration resultRepositoryConfig) { this.iexecHubService = iexecHubService; this.ipfsService = ipfsService; + this.gridOperations = gridOperations; + this.resultRepositoryConfig = resultRepositoryConfig; } static String getResultFilename(String chainTaskId) { @@ -35,6 +50,13 @@ static String getResultFilename(String chainTaskId) { } boolean canUploadResult(String chainTaskId, String walletAddress, byte[] zip) { + // check if result has been already uploaded + if (isResultInDatabase(chainTaskId)) { + log.error("Trying to upload result that has been already uploaded [chainTaskId:{}, uploadRequester:{}]", + chainTaskId, walletAddress); + return false; + } + // ContributionStatus of chainTask should be REVEALED boolean isChainContributionStatusSetToRevealed = iexecHubService.doesWishedStatusMatchesOnChainStatus(chainTaskId, walletAddress, ChainContributionStatus.REVEALED); @@ -48,15 +70,31 @@ boolean canUploadResult(String chainTaskId, String walletAddress, byte[] zip) { } public boolean isResultInDatabase(String chainTaskId) { - ipfsService.getContent(); - Query query = Query.query(Criteria.where("filename").is(getResultFilename(chainTaskId))); return gridOperations.findOne(query) != null; } String addResult(Result result, byte[] data) { - return result == null || result.getChainTaskId() == null ? "" : - ipfsService.putContent(getResultFilename(result.getChainTaskId()), data); + if (result == null || result.getChainTaskId() == null) { + return ""; + } + + if (isResultPublic(result.getChainTaskId())) { + return addResultToIPFS(result, data); + } else { + return addResultToMongo(result, data); + } + } + + private String addResultToMongo(Result result, byte[] data) { + InputStream inputStream = new ByteArrayInputStream(data); + String resultFileName = getResultFilename(result.getChainTaskId()); + gridOperations.store(inputStream, resultFileName, result); + return resultRepositoryConfig.getResultRepositoryURL() + "/results/" + result.getChainTaskId(); + } + + private String addResultToIPFS(Result result, byte[] data) { + return IPFS_ADDRESS_PREFIX + ipfsService.putContent(result.getChainTaskId(), data); } byte[] getResultByChainTaskId(String chainTaskId) throws IOException { @@ -107,4 +145,12 @@ private Optional getBeneficiary(String chainTaskId, Integer chainId) { return optionalChainDeal.map(chainDeal -> chainDeal.getBeneficiary().toLowerCase()); } + private boolean isResultPublic(String chainTaskId) { + Optional optional = getBeneficiary(chainTaskId, 0); + if (optional.isPresent() && !optional.get().equals(BytesUtils.EMPTY_ADDRESS)) { + return false; + } + return true; + } + } diff --git a/src/test/java/com/iexec/core/result/ResultServiceTest.java b/src/test/java/com/iexec/core/result/ResultServiceTest.java index 16cf27799..9321e4810 100644 --- a/src/test/java/com/iexec/core/result/ResultServiceTest.java +++ b/src/test/java/com/iexec/core/result/ResultServiceTest.java @@ -4,6 +4,7 @@ import com.iexec.common.chain.ChainTask; import com.iexec.common.utils.BytesUtils; import com.iexec.core.chain.IexecHubService; +import com.iexec.core.configuration.ResultRepositoryConfiguration; import com.mongodb.client.gridfs.model.GridFSFile; import org.apache.commons.io.IOUtils; @@ -30,7 +31,10 @@ public class ResultServiceTest { private IexecHubService iexecHubService; @Mock - GridFsOperations gridFsOperations; + private GridFsOperations gridFsOperations; + + @Mock + private ResultRepositoryConfiguration resultRepositoryConfig; @InjectMocks private ResultService resultService; @@ -126,11 +130,16 @@ public void shouldAddResult() { String data = "data"; byte[] dataBytes = data.getBytes(); - String filename = resultService.addResult(result, dataBytes); + when(iexecHubService.getChainTask(any())).thenReturn(Optional.of(new ChainTask())); + ChainDeal chainDeal = ChainDeal.builder().beneficiary("beneficiary").build(); + when(iexecHubService.getChainDeal(any())).thenReturn(Optional.of(chainDeal)); + + when(resultRepositoryConfig.getResultRepositoryURL()).thenReturn("dummyPath"); + String resultLink = resultService.addResult(result, dataBytes); - assertThat(filename).isEqualTo(resultFilename); + assertThat(resultLink).isEqualTo("dummyPath/results/0x1"); Mockito.verify(gridFsOperations, Mockito.times(1)) - .store(any(InputStream.class), Mockito.eq(filename), Mockito.eq(result)); + .store(any(), any(), Mockito.eq(result)); } @Test From e59a2a216bcc93207e2ca01473e6b1ae3fa2e7a0 Mon Sep 17 00:00:00 2001 From: Ugo Plouviez Date: Fri, 8 Mar 2019 17:13:29 +0100 Subject: [PATCH 4/8] Package renaming --- .../java/com/iexec/core/replicate/ReplicatesService.java | 5 +++-- src/main/java/com/iexec/core/result/ResultService.java | 2 +- .../com/iexec/core/result/{IPFS => ipfs}/IPFSConfig.java | 2 +- .../com/iexec/core/result/{IPFS => ipfs}/IPFSService.java | 2 +- 4 files changed, 6 insertions(+), 5 deletions(-) rename src/main/java/com/iexec/core/result/{IPFS => ipfs}/IPFSConfig.java (88%) rename src/main/java/com/iexec/core/result/{IPFS => ipfs}/IPFSService.java (97%) diff --git a/src/main/java/com/iexec/core/replicate/ReplicatesService.java b/src/main/java/com/iexec/core/replicate/ReplicatesService.java index e245a7441..5885f88b2 100644 --- a/src/main/java/com/iexec/core/replicate/ReplicatesService.java +++ b/src/main/java/com/iexec/core/replicate/ReplicatesService.java @@ -192,6 +192,7 @@ public void updateReplicateStatus(String chainTaskId, updateReplicateStatus(chainTaskId, walletAddress, newStatus, modifier, null, ""); } + // TODO: this method is to refactor ! // in case the task has been modified between reading and writing it, it is retried up to 100 times @Retryable(value = {OptimisticLockingFailureException.class}, maxAttempts = 100) public void updateReplicateStatus(String chainTaskId, @@ -203,7 +204,7 @@ public void updateReplicateStatus(String chainTaskId, if (newStatus == ReplicateStatus.RESULT_UPLOADED && !hasResultBeenUploaded(chainTaskId)) { log.error("requested updateResplicateStatus to RESULT_UPLOADED when result has not been" - + " uploaded to result repository yet [chainTaskId:{}, ReplicateAddress:{}]", + + " uploaded to result repository yet [chainTaskId:{}, ReplicateAddress:{}]", chainTaskId, walletAddress); return; } @@ -350,7 +351,7 @@ private boolean isTaskStatusFailOnChain(String chainTaskId, String walletAddress public boolean hasResultBeenUploaded(String chainTaskId) { RestTemplate restTemplate = new RestTemplate(); String resultChallengeURI = resultRepoConfig.getResultRepositoryURL() - + "/results/challenge?chainId={id}"; + + "/results/challenge?chainId={id}"; // get the eip712 challenge Eip712Challenge eip712Challenge = restTemplate.getForObject(resultChallengeURI, diff --git a/src/main/java/com/iexec/core/result/ResultService.java b/src/main/java/com/iexec/core/result/ResultService.java index 97dc5587a..e2edcc5b9 100644 --- a/src/main/java/com/iexec/core/result/ResultService.java +++ b/src/main/java/com/iexec/core/result/ResultService.java @@ -6,7 +6,7 @@ import com.iexec.common.utils.BytesUtils; import com.iexec.core.chain.IexecHubService; import com.iexec.core.configuration.ResultRepositoryConfiguration; -import com.iexec.core.result.IPFS.IPFSService; +import com.iexec.core.result.ipfs.IPFSService; import lombok.extern.slf4j.Slf4j; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; diff --git a/src/main/java/com/iexec/core/result/IPFS/IPFSConfig.java b/src/main/java/com/iexec/core/result/ipfs/IPFSConfig.java similarity index 88% rename from src/main/java/com/iexec/core/result/IPFS/IPFSConfig.java rename to src/main/java/com/iexec/core/result/ipfs/IPFSConfig.java index 4e30a5d14..772099c17 100644 --- a/src/main/java/com/iexec/core/result/IPFS/IPFSConfig.java +++ b/src/main/java/com/iexec/core/result/ipfs/IPFSConfig.java @@ -1,4 +1,4 @@ -package com.iexec.core.result.IPFS; +package com.iexec.core.result.ipfs; import lombok.Getter; import org.springframework.beans.factory.annotation.Value; diff --git a/src/main/java/com/iexec/core/result/IPFS/IPFSService.java b/src/main/java/com/iexec/core/result/ipfs/IPFSService.java similarity index 97% rename from src/main/java/com/iexec/core/result/IPFS/IPFSService.java rename to src/main/java/com/iexec/core/result/ipfs/IPFSService.java index 219eecba9..144a09c47 100644 --- a/src/main/java/com/iexec/core/result/IPFS/IPFSService.java +++ b/src/main/java/com/iexec/core/result/ipfs/IPFSService.java @@ -1,4 +1,4 @@ -package com.iexec.core.result.IPFS; +package com.iexec.core.result.ipfs; import io.ipfs.api.IPFS; import io.ipfs.api.MerkleNode; From 1f3bdc0a2d46b70054e7025fa1b7a7127bf3cad1 Mon Sep 17 00:00:00 2001 From: Ugo Plouviez Date: Fri, 8 Mar 2019 17:38:53 +0100 Subject: [PATCH 5/8] Add check for the IPFS/mongo difference --- .../java/com/iexec/core/result/ResultService.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/iexec/core/result/ResultService.java b/src/main/java/com/iexec/core/result/ResultService.java index e2edcc5b9..19baa8a27 100644 --- a/src/main/java/com/iexec/core/result/ResultService.java +++ b/src/main/java/com/iexec/core/result/ResultService.java @@ -69,7 +69,19 @@ boolean canUploadResult(String chainTaskId, String walletAddress, byte[] zip) { return true; } - public boolean isResultInDatabase(String chainTaskId) { + boolean isResultInDatabase(String chainTaskId) { + if(isPublicResult(chainTaskId, 0)){ + return isResultInIpfs(chainTaskId); + } + return isResultInMongo(chainTaskId); + } + + boolean isResultInIpfs (String chainTaskId) { + return true; + } + + boolean isResultInMongo(String chainTaskId) { + Query query = Query.query(Criteria.where("filename").is(getResultFilename(chainTaskId))); return gridOperations.findOne(query) != null; } From 6e6d9ff024ede92a06c665f3742785dd0c1be100 Mon Sep 17 00:00:00 2001 From: Ugo Plouviez Date: Mon, 11 Mar 2019 13:34:52 +0100 Subject: [PATCH 6/8] Handle ipfs result upload --- .../com/iexec/core/chain/IexecHubService.java | 4 +-- .../com/iexec/core/replicate/Replicate.java | 3 +- .../core/replicate/ReplicatesController.java | 11 +++--- .../core/replicate/ReplicatesService.java | 36 ++++++++++++++++--- .../com/iexec/core/result/ResultService.java | 30 +++++----------- src/main/java/com/iexec/core/task/Task.java | 1 + .../com/iexec/core/task/TaskController.java | 9 ++--- .../java/com/iexec/core/task/TaskService.java | 5 +-- .../iexec/core/result/ResultServiceTest.java | 14 ++++---- .../com/iexec/core/task/TaskServiceTests.java | 18 ++++++++-- 10 files changed, 76 insertions(+), 55 deletions(-) diff --git a/src/main/java/com/iexec/core/chain/IexecHubService.java b/src/main/java/com/iexec/core/chain/IexecHubService.java index 0f1d24632..dc4ea687a 100644 --- a/src/main/java/com/iexec/core/chain/IexecHubService.java +++ b/src/main/java/com/iexec/core/chain/IexecHubService.java @@ -190,7 +190,7 @@ private Optional sendFinalizeTransaction(String chainTaskId, Strin try { receipt = getHubContract(web3jService.getWritingContractGasProvider()).finalize(chainTaskIdBytes, resultUriBytes).send(); } catch (Exception e) { - log.error("Failed finalize [chainTaskId:{}, resultUri:{}, error:{}]]", chainTaskId, resultUri, e.getMessage()); + log.error("Failed finalize [chainTaskId:{}, resultLink:{}, error:{}]]", chainTaskId, resultUri, e.getMessage()); return Optional.empty(); } @@ -207,7 +207,7 @@ private Optional sendFinalizeTransaction(String chainTaskId, Strin return Optional.empty(); } - log.info("Finalized [chainTaskId:{}, resultUri:{}, gasUsed:{}]", chainTaskId, resultUri, receipt.getGasUsed()); + log.info("Finalized [chainTaskId:{}, resultLink:{}, gasUsed:{}]", chainTaskId, resultUri, receipt.getGasUsed()); ChainReceipt chainReceipt = ChainUtils.buildChainReceipt(eventsList.get(0).log, chainTaskId); return Optional.of(chainReceipt); diff --git a/src/main/java/com/iexec/core/replicate/Replicate.java b/src/main/java/com/iexec/core/replicate/Replicate.java index 5c98708b0..55e92acd9 100644 --- a/src/main/java/com/iexec/core/replicate/Replicate.java +++ b/src/main/java/com/iexec/core/replicate/Replicate.java @@ -20,7 +20,7 @@ public class Replicate { private List statusChangeList; private String walletAddress; - private String resultUri; + private String resultLink; private String chainTaskId; private String contributionHash; private int credibility; @@ -49,7 +49,6 @@ private ReplicateStatusChange getLatestStatusChange() { return this.getStatusChangeList().get(this.getStatusChangeList().size() - 1); } - public boolean updateStatus(ReplicateStatus newStatus, ReplicateStatusModifier modifier) { return statusChangeList.add(new ReplicateStatusChange(newStatus, modifier)); } diff --git a/src/main/java/com/iexec/core/replicate/ReplicatesController.java b/src/main/java/com/iexec/core/replicate/ReplicatesController.java index b9a88a18f..82f258933 100644 --- a/src/main/java/com/iexec/core/replicate/ReplicatesController.java +++ b/src/main/java/com/iexec/core/replicate/ReplicatesController.java @@ -1,6 +1,7 @@ package com.iexec.core.replicate; import com.iexec.common.chain.ChainReceipt; +import com.iexec.common.replicate.ReplicateDetails; import com.iexec.common.replicate.ReplicateStatus; import com.iexec.common.replicate.ReplicateStatusModifier; import com.iexec.core.security.JwtTokenProvider; @@ -26,9 +27,8 @@ public ReplicatesController(ReplicatesService replicatesService, public ResponseEntity updateReplicateStatus( @PathVariable(name = "chainTaskId") String chainTaskId, @RequestParam(name = "replicateStatus") ReplicateStatus replicateStatus, - @RequestParam(name = "resultLink") String resultLink, @RequestHeader("Authorization") String bearerToken, - @RequestBody ChainReceipt chainReceipt) { + @RequestBody ReplicateDetails details) { String walletAddress = jwtTokenProvider.getWalletAddressFromBearerToken(bearerToken); @@ -36,10 +36,11 @@ public ResponseEntity updateReplicateStatus( return ResponseEntity.status(HttpStatus.UNAUTHORIZED.value()).build(); } - log.info("UpdateReplicateStatus requested [chainTaskId:{}, replicateStatus:{}, walletAddress:{}, blockNumber:{}]", - chainTaskId, replicateStatus, walletAddress, chainReceipt.getBlockNumber()); + log.info("UpdateReplicateStatus requested [chainTaskId:{}, replicateStatus:{}, walletAddress:{}]", + chainTaskId, replicateStatus, walletAddress); - replicatesService.updateReplicateStatus(chainTaskId, walletAddress, replicateStatus, ReplicateStatusModifier.WORKER, chainReceipt, resultLink); + replicatesService.updateReplicateStatus(chainTaskId, walletAddress, replicateStatus, ReplicateStatusModifier.WORKER, + details.getChainReceipt(), details.getResultLink()); return ResponseEntity.ok().build(); } } diff --git a/src/main/java/com/iexec/core/replicate/ReplicatesService.java b/src/main/java/com/iexec/core/replicate/ReplicatesService.java index 5885f88b2..b3ff8231a 100644 --- a/src/main/java/com/iexec/core/replicate/ReplicatesService.java +++ b/src/main/java/com/iexec/core/replicate/ReplicatesService.java @@ -1,13 +1,12 @@ package com.iexec.core.replicate; -import com.iexec.common.chain.ChainContribution; -import com.iexec.common.chain.ChainContributionStatus; -import com.iexec.common.chain.ChainReceipt; +import com.iexec.common.chain.*; import com.iexec.common.replicate.ReplicateStatus; import com.iexec.common.replicate.ReplicateStatusChange; import com.iexec.common.replicate.ReplicateStatusModifier; import com.iexec.common.result.eip712.Eip712Challenge; import com.iexec.common.result.eip712.Eip712ChallengeUtils; +import com.iexec.common.utils.BytesUtils; import com.iexec.core.chain.ChainConfig; import com.iexec.core.chain.CredentialsService; import com.iexec.core.chain.IexecHubService; @@ -170,6 +169,16 @@ public Optional getRandomReplicateWithRevealStatus(String chainTaskId return Optional.empty(); } + public Optional getReplicateWithResultUploadedStatus(String chainTaskId) { + for (Replicate replicate : getReplicates(chainTaskId)) { + if (replicate.getCurrentStatus().equals(RESULT_UPLOADED)) { + return Optional.of(replicate); + } + } + + return Optional.empty(); + } + public boolean moreReplicatesNeeded(String chainTaskId, int nbWorkersNeeded, long maxExecutionTime) { int nbValidReplicates = 0; for (Replicate replicate : getReplicates(chainTaskId)) { @@ -192,7 +201,7 @@ public void updateReplicateStatus(String chainTaskId, updateReplicateStatus(chainTaskId, walletAddress, newStatus, modifier, null, ""); } - // TODO: this method is to refactor ! + // TODO: this method needs to be refactored ! // in case the task has been modified between reading and writing it, it is retried up to 100 times @Retryable(value = {OptimisticLockingFailureException.class}, maxAttempts = 100) public void updateReplicateStatus(String chainTaskId, @@ -263,6 +272,9 @@ public void updateReplicateStatus(String chainTaskId, chainReceipt = null; } + if (newStatus.equals(RESULT_UPLOADED)) { + replicate.setResultLink(resultLink); + } replicate.updateStatus(newStatus, modifier, chainReceipt); replicatesRepository.save(optionalReplicates.get()); @@ -348,7 +360,21 @@ private boolean isTaskStatusFailOnChain(String chainTaskId, String walletAddress } } + boolean isPublicResult(String chainTaskId, Integer chainId) { + Optional beneficiary = iexecHubService.getTaskBeneficiary(chainTaskId, chainId); + if (!beneficiary.isPresent()) { + log.error("Failed to get beneficiary for isPublicResult() method [chainTaskId:{}]", chainTaskId); + return false; + } + return beneficiary.get().equals(BytesUtils.EMPTY_ADDRESS); + } + public boolean hasResultBeenUploaded(String chainTaskId) { + // currently no check in case of IPFS + if(isPublicResult(chainTaskId, 0)){ + return true; + } + RestTemplate restTemplate = new RestTemplate(); String resultChallengeURI = resultRepoConfig.getResultRepositoryURL() + "/results/challenge?chainId={id}"; @@ -371,7 +397,7 @@ public boolean hasResultBeenUploaded(String chainTaskId) { String resultURI = resultRepoConfig.getResultRepositoryURL() + "/results/{chainTaskId}"; - // HEAD resultRepoURL/resuts/chainTaskId + // HEAD resultRepoURL/results/chainTaskId return restTemplate.exchange(resultURI, HttpMethod.HEAD, entity, String.class, chainTaskId) .getStatusCode() .is2xxSuccessful(); diff --git a/src/main/java/com/iexec/core/result/ResultService.java b/src/main/java/com/iexec/core/result/ResultService.java index 19baa8a27..cb684b2be 100644 --- a/src/main/java/com/iexec/core/result/ResultService.java +++ b/src/main/java/com/iexec/core/result/ResultService.java @@ -77,7 +77,7 @@ boolean isResultInDatabase(String chainTaskId) { } boolean isResultInIpfs (String chainTaskId) { - return true; + return false; } boolean isResultInMongo(String chainTaskId) { @@ -91,7 +91,7 @@ String addResult(Result result, byte[] data) { return ""; } - if (isResultPublic(result.getChainTaskId())) { + if (isPublicResult(result.getChainTaskId())) { return addResultToIPFS(result, data); } else { return addResultToMongo(result, data); @@ -124,7 +124,7 @@ byte[] getResultByChainTaskId(String chainTaskId) throws IOException { * TODO 2: Make possible to call this iexecHubService with a 'chainId' at runtime */ boolean isOwnerOfResult(Integer chainId, String chainTaskId, String downloaderAddress) { - Optional beneficiary = getBeneficiary(chainTaskId, chainId); + Optional beneficiary = iexecHubService.getTaskBeneficiary(chainTaskId, chainId); if (!beneficiary.isPresent()) { log.error("Failed to get beneficiary for isOwnerOfResult() method [chainTaskId:{}, downloaderAddress:{}]", chainTaskId, downloaderAddress); @@ -139,30 +139,16 @@ boolean isOwnerOfResult(Integer chainId, String chainTaskId, String downloaderAd return true; } + boolean isPublicResult(String chainTaskId) { + return isPublicResult(chainTaskId, 0); + } + boolean isPublicResult(String chainTaskId, Integer chainId) { - Optional beneficiary = getBeneficiary(chainTaskId, chainId); + Optional beneficiary = iexecHubService.getTaskBeneficiary(chainTaskId, chainId); if (!beneficiary.isPresent()) { log.error("Failed to get beneficiary for isPublicResult() method [chainTaskId:{}]", chainTaskId); return false; } return beneficiary.get().equals(BytesUtils.EMPTY_ADDRESS); } - - private Optional getBeneficiary(String chainTaskId, Integer chainId) { - Optional chainTask = iexecHubService.getChainTask(chainTaskId); - if (!chainTask.isPresent()) { - return Optional.empty(); - } - Optional optionalChainDeal = iexecHubService.getChainDeal(chainTask.get().getDealid()); - return optionalChainDeal.map(chainDeal -> chainDeal.getBeneficiary().toLowerCase()); - } - - private boolean isResultPublic(String chainTaskId) { - Optional optional = getBeneficiary(chainTaskId, 0); - if (optional.isPresent() && !optional.get().equals(BytesUtils.EMPTY_ADDRESS)) { - return false; - } - return true; - } - } diff --git a/src/main/java/com/iexec/core/task/Task.java b/src/main/java/com/iexec/core/task/Task.java index 9ee6b5158..b8052cf18 100644 --- a/src/main/java/com/iexec/core/task/Task.java +++ b/src/main/java/com/iexec/core/task/Task.java @@ -46,6 +46,7 @@ public class Task { private Date contributionDeadline; private Date revealDeadline; private Date finalDeadline; + private String resultLink; private List dateStatusList; public Task(String dappName, String commandLine, int trust) { diff --git a/src/main/java/com/iexec/core/task/TaskController.java b/src/main/java/com/iexec/core/task/TaskController.java index c0bbcbada..3d57f9445 100644 --- a/src/main/java/com/iexec/core/task/TaskController.java +++ b/src/main/java/com/iexec/core/task/TaskController.java @@ -43,14 +43,9 @@ public ResponseEntity getTask(@PathVariable("chainTaskId") String chainTaskId) { } Task task = optionalTask.get(); - Optional optionalReplicates = replicatesService.getReplicatesList(chainTaskId); - if (!optionalReplicates.isPresent()) { - return createTaskModel(task, new ReplicatesList()). - map(ResponseEntity::ok). - orElseGet(() -> status(HttpStatus.NO_CONTENT).build()); - } + ReplicatesList replicates = replicatesService.getReplicatesList(chainTaskId).orElseGet(ReplicatesList::new); - return createTaskModel(task, optionalReplicates.get()). + return createTaskModel(task, replicates). map(ResponseEntity::ok). orElseGet(() -> status(HttpStatus.NO_CONTENT).build()); } diff --git a/src/main/java/com/iexec/core/task/TaskService.java b/src/main/java/com/iexec/core/task/TaskService.java index fe586153b..548395b71 100644 --- a/src/main/java/com/iexec/core/task/TaskService.java +++ b/src/main/java/com/iexec/core/task/TaskService.java @@ -18,7 +18,6 @@ import com.iexec.core.worker.WorkerService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.retry.annotation.Retryable; @@ -376,9 +375,11 @@ private void uploadRequested2UploadRequestTimeout(Task task) { private void resultUploading2Uploaded(Task task) { boolean condition1 = task.getCurrentStatus().equals(RESULT_UPLOADING); - boolean condition2 = replicatesService.getNbReplicatesContainingStatus(task.getChainTaskId(), ReplicateStatus.RESULT_UPLOADED) > 0; + Optional optionalReplicate = replicatesService.getReplicateWithResultUploadedStatus(task.getChainTaskId()); + boolean condition2 = optionalReplicate.isPresent(); if (condition1 && condition2) { + task.setResultLink(optionalReplicate.get().getResultLink()); updateTaskStatusAndSave(task, RESULT_UPLOADED); resultUploaded2Finalized2Completed(task); } else if (replicatesService.getNbReplicatesWithCurrentStatus(task.getChainTaskId(), ReplicateStatus.RESULT_UPLOAD_REQUEST_FAILED) > 0 && diff --git a/src/test/java/com/iexec/core/result/ResultServiceTest.java b/src/test/java/com/iexec/core/result/ResultServiceTest.java index 9321e4810..7773f5298 100644 --- a/src/test/java/com/iexec/core/result/ResultServiceTest.java +++ b/src/test/java/com/iexec/core/result/ResultServiceTest.java @@ -206,25 +206,23 @@ public void isNotOwnerOfResultSinceWalletAddressShouldBeBeneficiary() { @Test public void isOwnerOfResult() { - String beneficiary = "0xabcd1339Ec7e762e639f4887E2bFe5EE8023E23E"; - when(iexecHubService.getChainTask(chainTaskId)).thenReturn(Optional.of(ChainTask.builder().dealid(chainDealId).build())); - when(iexecHubService.getChainDeal(chainDealId)).thenReturn(Optional.of(ChainDeal.builder().beneficiary(beneficiary).build())); - assertThat(resultService.isOwnerOfResult(chainId, chainTaskId, "0xabcd1339Ec7e762e639f4887E2bFe5EE8023E23E")).isTrue(); + String beneficiary = "0xabcd1339ec7e762e639f4887e2bfe5ee8023e23e"; + when(iexecHubService.getTaskBeneficiary(chainTaskId, chainId)).thenReturn(Optional.of(beneficiary)); + + assertThat(resultService.isOwnerOfResult(chainId, chainTaskId, "0xabcd1339ec7e762e639f4887e2bfe5ee8023e23e")).isTrue(); } @Test public void isPublicResult() { String beneficiary = BytesUtils.EMPTY_ADDRESS; - when(iexecHubService.getChainTask(chainTaskId)).thenReturn(Optional.of(ChainTask.builder().dealid(chainDealId).build())); - when(iexecHubService.getChainDeal(chainDealId)).thenReturn(Optional.of(ChainDeal.builder().beneficiary(beneficiary).build())); + when(iexecHubService.getTaskBeneficiary(chainTaskId, chainId)).thenReturn(Optional.of(beneficiary)); assertThat(resultService.isPublicResult(chainTaskId, chainId)).isTrue(); } @Test public void isNotPublicResult() { String beneficiary = "0xb"; - when(iexecHubService.getChainTask(chainTaskId)).thenReturn(Optional.of(ChainTask.builder().dealid(chainDealId).build())); - when(iexecHubService.getChainDeal(chainDealId)).thenReturn(Optional.of(ChainDeal.builder().beneficiary(beneficiary).build())); + when(iexecHubService.getTaskBeneficiary(chainTaskId, chainId)).thenReturn(Optional.of(beneficiary)); assertThat(resultService.isPublicResult(chainTaskId, chainId)).isFalse(); } } \ No newline at end of file diff --git a/src/test/java/com/iexec/core/task/TaskServiceTests.java b/src/test/java/com/iexec/core/task/TaskServiceTests.java index 209d602d2..5c8d4cc9c 100644 --- a/src/test/java/com/iexec/core/task/TaskServiceTests.java +++ b/src/test/java/com/iexec/core/task/TaskServiceTests.java @@ -41,6 +41,7 @@ public class TaskServiceTests { private final long maxExecutionTime = 60000; private final static String NO_TEE_TAG = BytesUtils.EMPTY_HEXASTRING_64; private final static String TEE_TAG = "0x0000000000000000000000000000000000000000000000000000000000000001"; + private final static String RESULT_LINK = "/ipfs/the_result_string"; @Mock private TaskRepository taskRepository; @@ -693,8 +694,11 @@ public void shouldUpdateResultUploading2Uploaded2Finalizing2Finalized() { //one task.changeStatus(RESULT_UPLOADING); ChainTask chainTask = ChainTask.builder().revealCounter(1).build(); + Replicate replicate = new Replicate(WALLET_WORKER_1, CHAIN_TASK_ID); + replicate.setResultLink(RESULT_LINK); when(taskRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(task)); + when(replicatesService.getReplicateWithResultUploadedStatus(CHAIN_TASK_ID)).thenReturn(Optional.of(replicate)); when(replicatesService.getNbReplicatesContainingStatus(CHAIN_TASK_ID, ReplicateStatus.RESULT_UPLOADED)).thenReturn(1); when(replicatesService.getNbReplicatesContainingStatus(CHAIN_TASK_ID, ReplicateStatus.REVEALED)).thenReturn(1); when(iexecHubService.canFinalize(task.getChainTaskId())).thenReturn(true); @@ -720,8 +724,11 @@ public void shouldUpdateResultUploading2UploadedButNot2Finalizing() { //one work Task task = new Task(DAPP_NAME, COMMAND_LINE, 2, CHAIN_TASK_ID); task.changeStatus(RESULT_UPLOADING); + Replicate replicate = new Replicate(WALLET_WORKER_1, CHAIN_TASK_ID); + replicate.setResultLink(RESULT_LINK); + when(taskRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(task)); - when(replicatesService.getNbReplicatesContainingStatus(CHAIN_TASK_ID, ReplicateStatus.RESULT_UPLOADED)).thenReturn(1); + when(replicatesService.getReplicateWithResultUploadedStatus(CHAIN_TASK_ID)).thenReturn(Optional.of(replicate)); when(iexecHubService.canFinalize(task.getChainTaskId())).thenReturn(true); when(iexecHubService.hasEnoughGas()).thenReturn(false); @@ -736,8 +743,11 @@ public void shouldUpdateResultUploading2Uploaded2Finalizing2FinalizeFail() { //o Task task = new Task(DAPP_NAME, COMMAND_LINE, 2, CHAIN_TASK_ID); task.changeStatus(RESULT_UPLOADING); ChainTask chainTask = ChainTask.builder().revealCounter(1).build(); + Replicate replicate = new Replicate(WALLET_WORKER_1, CHAIN_TASK_ID); + replicate.setResultLink(RESULT_LINK); when(taskRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(task)); + when(replicatesService.getReplicateWithResultUploadedStatus(CHAIN_TASK_ID)).thenReturn(Optional.of(replicate)); when(replicatesService.getNbReplicatesContainingStatus(CHAIN_TASK_ID, ReplicateStatus.RESULT_UPLOADED)).thenReturn(1); when(replicatesService.getNbReplicatesContainingStatus(CHAIN_TASK_ID, ReplicateStatus.REVEALED)).thenReturn(1); when(iexecHubService.canFinalize(task.getChainTaskId())).thenReturn(true); @@ -887,11 +897,15 @@ public void shouldUpdateFromUploadingResultToResultUploaded() { task.changeStatus(TaskStatus.RESULT_UPLOAD_REQUESTED); task.changeStatus(TaskStatus.RESULT_UPLOADING); + Replicate replicate = new Replicate(WALLET_WORKER_1, CHAIN_TASK_ID); + replicate.setResultLink(RESULT_LINK); + when(taskRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(task)); - when(replicatesService.getNbReplicatesContainingStatus(task.getChainTaskId(), ReplicateStatus.RESULT_UPLOADED)).thenReturn(1); + when(replicatesService.getReplicateWithResultUploadedStatus(CHAIN_TASK_ID)).thenReturn(Optional.of(replicate)); taskService.tryUpgradeTaskStatus(task.getChainTaskId()); assertThat(task.getCurrentStatus()).isEqualTo(TaskStatus.RESULT_UPLOADED); + assertThat(task.getResultLink()).isEqualTo(RESULT_LINK); } // No worker in UPLOADED From 8d486012b18da4a2873e7fab982a008f9e72a456 Mon Sep 17 00:00:00 2001 From: Ugo Plouviez Date: Mon, 11 Mar 2019 13:45:33 +0100 Subject: [PATCH 7/8] Add ipfs node in the docker-compose --- src/main/resources/docker-compose.yml | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/main/resources/docker-compose.yml b/src/main/resources/docker-compose.yml index 3c8a3f4be..b52989183 100644 --- a/src/main/resources/docker-compose.yml +++ b/src/main/resources/docker-compose.yml @@ -46,4 +46,15 @@ services: - HUB=0x60E25C038D70A15364DAc11A042DB1dD7A2cccBC - GATEWAY=http://chain:8545 ports: - - 5000:5000 \ No newline at end of file + - 5000:5000 + + ipfs: + image: jbenet/go-ipfs:latest + container_name: ipfs-node + ports: + - 8080:8080 + - 4001:4001 + - 5001:5001 + volumes: + - /tmp/ipfs-docker-staging:/export + - /tmp/ipfs-docker-data:/data/ipfs \ No newline at end of file From 2f5c49bf734687d36fd23bbc4783c2156970c009 Mon Sep 17 00:00:00 2001 From: Ugo Plouviez Date: Mon, 11 Mar 2019 14:46:32 +0100 Subject: [PATCH 8/8] Changes following the pull request --- .../core/replicate/ReplicatesService.java | 28 +++++++++---------- .../iexec/core/result/ResultController.java | 2 +- .../com/iexec/core/result/ResultService.java | 21 ++++---------- .../iexec/core/result/ipfs/IPFSService.java | 2 +- .../iexec/core/result/ResultServiceTest.java | 7 ++--- 5 files changed, 25 insertions(+), 35 deletions(-) diff --git a/src/main/java/com/iexec/core/replicate/ReplicatesService.java b/src/main/java/com/iexec/core/replicate/ReplicatesService.java index b3ff8231a..97f9c0b97 100644 --- a/src/main/java/com/iexec/core/replicate/ReplicatesService.java +++ b/src/main/java/com/iexec/core/replicate/ReplicatesService.java @@ -147,9 +147,7 @@ public int getNbOffChainReplicatesWithStatus(String chainTaskId, ReplicateStatus private int getNbReplicatesWithGivenStatusJustBeforeWorkerLost(String chainTaskId, ReplicateStatus status) { int nbReplicates = 0; for (Replicate replicate : getReplicates(chainTaskId)) { - int size = replicate.getStatusChangeList().size(); - if (size >= 2 && replicate.getStatusChangeList().get(size - 1).getStatus().equals(WORKER_LOST) - && replicate.getStatusChangeList().get(size - 2).getStatus().equals(status)) { + if (isStatusBeforeWorkerLostEqualsTo(replicate, status)) { nbReplicates++; } } @@ -171,7 +169,11 @@ public Optional getRandomReplicateWithRevealStatus(String chainTaskId public Optional getReplicateWithResultUploadedStatus(String chainTaskId) { for (Replicate replicate : getReplicates(chainTaskId)) { - if (replicate.getCurrentStatus().equals(RESULT_UPLOADED)) { + + boolean isStatusResultUploaded = replicate.getCurrentStatus().equals(RESULT_UPLOADED); + boolean isStatusResultUploadedBeforeWorkerLost = isStatusBeforeWorkerLostEqualsTo(replicate, RESULT_UPLOADED); + + if (isStatusResultUploaded || isStatusResultUploadedBeforeWorkerLost) { return Optional.of(replicate); } } @@ -179,6 +181,13 @@ public Optional getReplicateWithResultUploadedStatus(String chainTask return Optional.empty(); } + private boolean isStatusBeforeWorkerLostEqualsTo(Replicate replicate, ReplicateStatus status) { + int size = replicate.getStatusChangeList().size(); + return size >= 2 + && replicate.getStatusChangeList().get(size - 1).getStatus().equals(WORKER_LOST) + && replicate.getStatusChangeList().get(size - 2).getStatus().equals(status); + } + public boolean moreReplicatesNeeded(String chainTaskId, int nbWorkersNeeded, long maxExecutionTime) { int nbValidReplicates = 0; for (Replicate replicate : getReplicates(chainTaskId)) { @@ -360,18 +369,9 @@ private boolean isTaskStatusFailOnChain(String chainTaskId, String walletAddress } } - boolean isPublicResult(String chainTaskId, Integer chainId) { - Optional beneficiary = iexecHubService.getTaskBeneficiary(chainTaskId, chainId); - if (!beneficiary.isPresent()) { - log.error("Failed to get beneficiary for isPublicResult() method [chainTaskId:{}]", chainTaskId); - return false; - } - return beneficiary.get().equals(BytesUtils.EMPTY_ADDRESS); - } - public boolean hasResultBeenUploaded(String chainTaskId) { // currently no check in case of IPFS - if(isPublicResult(chainTaskId, 0)){ + if(iexecHubService.isPublicResult(chainTaskId, 0)){ return true; } diff --git a/src/main/java/com/iexec/core/result/ResultController.java b/src/main/java/com/iexec/core/result/ResultController.java index 9cb6de404..a6713426f 100644 --- a/src/main/java/com/iexec/core/result/ResultController.java +++ b/src/main/java/com/iexec/core/result/ResultController.java @@ -108,7 +108,7 @@ public ResponseEntity getResult(@PathVariable("chainTaskId") String chai @RequestParam(name = "chainId") Integer chainId) throws IOException { Authorization auth = authorizationService.getAuthorizationFromToken(token); - boolean isPublicResult = resultService.isPublicResult(chainTaskId, chainId); + boolean isPublicResult = resultService.isPublicResult(chainTaskId); boolean isAuthorizedOwnerOfResult = auth != null && resultService.isOwnerOfResult(chainId, chainTaskId, auth.getWalletAddress()) && authorizationService.isAuthorizationValid(auth); diff --git a/src/main/java/com/iexec/core/result/ResultService.java b/src/main/java/com/iexec/core/result/ResultService.java index cb684b2be..bde06b28a 100644 --- a/src/main/java/com/iexec/core/result/ResultService.java +++ b/src/main/java/com/iexec/core/result/ResultService.java @@ -70,7 +70,7 @@ boolean canUploadResult(String chainTaskId, String walletAddress, byte[] zip) { } boolean isResultInDatabase(String chainTaskId) { - if(isPublicResult(chainTaskId, 0)){ + if(isPublicResult(chainTaskId)){ return isResultInIpfs(chainTaskId); } return isResultInMongo(chainTaskId); @@ -91,7 +91,7 @@ String addResult(Result result, byte[] data) { return ""; } - if (isPublicResult(result.getChainTaskId())) { + if (iexecHubService.isPublicResult(result.getChainTaskId(), 0)) { return addResultToIPFS(result, data); } else { return addResultToMongo(result, data); @@ -109,6 +109,10 @@ private String addResultToIPFS(Result result, byte[] data) { return IPFS_ADDRESS_PREFIX + ipfsService.putContent(result.getChainTaskId(), data); } + public boolean isPublicResult(String chainTaskId) { + return iexecHubService.isPublicResult(chainTaskId, 0); + } + byte[] getResultByChainTaskId(String chainTaskId) throws IOException { String resultFileName = getResultFilename(chainTaskId); GridFsResource[] resources = gridOperations.getResources(resultFileName); @@ -138,17 +142,4 @@ boolean isOwnerOfResult(Integer chainId, String chainTaskId, String downloaderAd } return true; } - - boolean isPublicResult(String chainTaskId) { - return isPublicResult(chainTaskId, 0); - } - - boolean isPublicResult(String chainTaskId, Integer chainId) { - Optional beneficiary = iexecHubService.getTaskBeneficiary(chainTaskId, chainId); - if (!beneficiary.isPresent()) { - log.error("Failed to get beneficiary for isPublicResult() method [chainTaskId:{}]", chainTaskId); - return false; - } - return beneficiary.get().equals(BytesUtils.EMPTY_ADDRESS); - } } diff --git a/src/main/java/com/iexec/core/result/ipfs/IPFSService.java b/src/main/java/com/iexec/core/result/ipfs/IPFSService.java index 144a09c47..ed52e3a13 100644 --- a/src/main/java/com/iexec/core/result/ipfs/IPFSService.java +++ b/src/main/java/com/iexec/core/result/ipfs/IPFSService.java @@ -31,7 +31,7 @@ public Optional getContent(String ipfsHash) { return Optional.empty(); } - public boolean doesContentExists(String ipfsHash) { + public boolean doesContentExist(String ipfsHash) { return getContent(ipfsHash).isPresent(); } diff --git a/src/test/java/com/iexec/core/result/ResultServiceTest.java b/src/test/java/com/iexec/core/result/ResultServiceTest.java index 7773f5298..a642d09c3 100644 --- a/src/test/java/com/iexec/core/result/ResultServiceTest.java +++ b/src/test/java/com/iexec/core/result/ResultServiceTest.java @@ -214,15 +214,14 @@ public void isOwnerOfResult() { @Test public void isPublicResult() { - String beneficiary = BytesUtils.EMPTY_ADDRESS; - when(iexecHubService.getTaskBeneficiary(chainTaskId, chainId)).thenReturn(Optional.of(beneficiary)); - assertThat(resultService.isPublicResult(chainTaskId, chainId)).isTrue(); + when(iexecHubService.isPublicResult(chainTaskId, 0)).thenReturn(true); + assertThat(resultService.isPublicResult(chainTaskId)).isTrue(); } @Test public void isNotPublicResult() { String beneficiary = "0xb"; when(iexecHubService.getTaskBeneficiary(chainTaskId, chainId)).thenReturn(Optional.of(beneficiary)); - assertThat(resultService.isPublicResult(chainTaskId, chainId)).isFalse(); + assertThat(resultService.isPublicResult(chainTaskId)).isFalse(); } } \ No newline at end of file