Skip to content

Commit

Permalink
Merge pull request #523 from iExecBlockchainComputing/feature/platfor…
Browse files Browse the repository at this point in the history
…m-registry

Feature/platform registry
  • Loading branch information
jeremyjams authored Sep 23, 2022
2 parents ff9ce31 + 852b3bf commit 32aece6
Show file tree
Hide file tree
Showing 13 changed files with 352 additions and 203 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ dependencies {
implementation "org.springframework.boot:spring-boot-starter-actuator"
implementation "org.springframework.cloud:spring-cloud-starter-openfeign"
implementation "org.springframework.boot:spring-boot-starter-hateoas"
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.cloud:spring-cloud-starter-config'

// NoSuchMethodError: 'okhttp3.RequestBody okhttp3.RequestBody.create(java.lang.String, okhttp3.MediaType)'
implementation 'com.squareup.okhttp3:okhttp:4.3.1' // Web3j issue: https://github.com/web3j/web3j/issues/1180
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.iexec.core.registry;

import lombok.Getter;
import org.hibernate.validator.constraints.URL;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Getter
@Configuration
public class PlatformRegistryConfiguration {

@URL
@Value("${sms.scone}")
private String sconeSms;

@URL
@Value("${sms.gramine}")
private String gramineSms;

}
27 changes: 17 additions & 10 deletions src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.iexec.common.replicate.ReplicateStatus;
import com.iexec.common.replicate.ReplicateStatusDetails;
import com.iexec.common.replicate.ReplicateStatusUpdate;
import com.iexec.common.replicate.ReplicateTaskSummary;
import com.iexec.common.replicate.ReplicateTaskSummary.ReplicateTaskSummaryBuilder;
import com.iexec.common.task.TaskAbortCause;
import com.iexec.core.chain.SignatureService;
import com.iexec.core.chain.Web3jService;
Expand Down Expand Up @@ -83,7 +85,7 @@ public ReplicateSupplyService(ReplicatesService replicatesService,
*
*/
@Retryable(value = {OptimisticLockingFailureException.class}, maxAttempts = 5)
Optional<WorkerpoolAuthorization> getAuthOfAvailableReplicate(long workerLastBlock, String walletAddress) {
Optional<ReplicateTaskSummary> getAvailableReplicateTaskSummary(long workerLastBlock, String walletAddress) {
// return empty if max computing task is reached or if the worker is not found
if (!workerService.canAcceptMoreWorks(walletAddress)) {
return Optional.empty();
Expand All @@ -108,7 +110,7 @@ Optional<WorkerpoolAuthorization> getAuthOfAvailableReplicate(long workerLastBlo
}
Worker worker = optional.get();

return getAuthorizationForAnyAvailableTask(
return getReplicateTaskSummaryForAnyAvailableTask(
walletAddress,
worker.isTeeEnabled()
);
Expand All @@ -120,17 +122,17 @@ Optional<WorkerpoolAuthorization> getAuthOfAvailableReplicate(long workerLastBlo
*
* @param walletAddress Wallet address of the worker asking for work.
* @param isTeeEnabled Whether this worker supports TEE.
* @return An {@link Optional} containing a {@link WorkerpoolAuthorization}
* @return An {@link Optional} containing a {@link ReplicateTaskSummary}
* if any {@link Task} is available and can be handled by this worker,
* {@link Optional#empty()} otherwise.
*/
private Optional<WorkerpoolAuthorization> getAuthorizationForAnyAvailableTask(
private Optional<ReplicateTaskSummary> getReplicateTaskSummaryForAnyAvailableTask(
String walletAddress,
boolean isTeeEnabled) {
final List<String> alreadyScannedTasks = new ArrayList<>();

Optional<WorkerpoolAuthorization> authorization = Optional.empty();
while (authorization.isEmpty()) {
Optional<ReplicateTaskSummary> replicateTaskSummary = Optional.empty();
while (replicateTaskSummary.isEmpty()) {
final Optional<Task> oTask = taskService.getPrioritizedInitializedOrRunningTask(
!isTeeEnabled,
alreadyScannedTasks
Expand All @@ -142,12 +144,12 @@ private Optional<WorkerpoolAuthorization> getAuthorizationForAnyAvailableTask(

final Task task = oTask.get();
alreadyScannedTasks.add(task.getChainTaskId());
authorization = getAuthorizationForTask(task, walletAddress);
replicateTaskSummary = getReplicateTaskSummary(task, walletAddress);
}
return authorization;
return replicateTaskSummary;
}

private Optional<WorkerpoolAuthorization> getAuthorizationForTask(Task task, String walletAddress) {
private Optional<ReplicateTaskSummary> getReplicateTaskSummary(Task task, String walletAddress) {
String chainTaskId = task.getChainTaskId();
if (!acceptOrRejectTask(task, walletAddress)) {
return Optional.empty();
Expand All @@ -158,7 +160,12 @@ private Optional<WorkerpoolAuthorization> getAuthorizationForTask(Task task, Str
walletAddress,
chainTaskId,
task.getEnclaveChallenge());
return Optional.of(authorization);
ReplicateTaskSummaryBuilder replicateTaskSummary = ReplicateTaskSummary.builder()
.workerpoolAuthorization(authorization);
if(task.isTeeTask()){
replicateTaskSummary.smsUrl(task.getSmsUrl());
}
return Optional.of(replicateTaskSummary.build());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.iexec.core.replicate;

import com.iexec.common.chain.WorkerpoolAuthorization;
import com.iexec.common.notification.TaskNotification;
import com.iexec.common.notification.TaskNotificationType;
import com.iexec.common.replicate.*;
Expand Down Expand Up @@ -50,7 +49,7 @@ public ReplicatesController(ReplicatesService replicatesService,
}

@GetMapping("/replicates/available")
public ResponseEntity<WorkerpoolAuthorization> getAvailableReplicate(
public ResponseEntity<ReplicateTaskSummary> getAvailableReplicateTaskSummary(
@RequestParam(name = "blockNumber") long blockNumber,
@RequestHeader("Authorization") String bearerToken) {
String workerWalletAddress = jwtTokenProvider.getWalletAddressFromBearerToken(bearerToken);
Expand All @@ -64,7 +63,7 @@ public ResponseEntity<WorkerpoolAuthorization> getAvailableReplicate(
workerService.updateLastReplicateDemandDate(workerWalletAddress);

return replicateSupplyService
.getAuthOfAvailableReplicate(blockNumber, workerWalletAddress)
.getAvailableReplicateTaskSummary(blockNumber, workerWalletAddress)
.map(ResponseEntity::ok)
.orElseGet(() -> status(HttpStatus.NO_CONTENT).build());
}
Expand Down
82 changes: 49 additions & 33 deletions src/main/java/com/iexec/core/sms/SmsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,15 @@

package com.iexec.core.sms;

import com.iexec.common.chain.ChainDeal;
import com.iexec.common.chain.IexecHubAbstractService;
import com.iexec.common.task.TaskDescription;
import com.iexec.common.tee.TeeEnclaveProvider;
import com.iexec.common.tee.TeeUtils;
import com.iexec.common.utils.BytesUtils;
import com.iexec.core.registry.PlatformRegistryConfiguration;
import com.iexec.sms.api.SmsClient;
import com.iexec.sms.api.SmsClientCreationException;
import com.iexec.sms.api.SmsClientProvider;
import feign.FeignException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.retry.annotation.Recover;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
Expand All @@ -37,12 +35,13 @@
@Slf4j
@Service
public class SmsService {
private final PlatformRegistryConfiguration registryConfiguration;
private final SmsClientProvider smsClientProvider;
private final IexecHubAbstractService iexecHubService;

public SmsService(SmsClientProvider smsClientProvider, IexecHubAbstractService iexecHubService) {
public SmsService(PlatformRegistryConfiguration registryConfiguration,
SmsClientProvider smsClientProvider) {
this.registryConfiguration = registryConfiguration;
this.smsClientProvider = smsClientProvider;
this.iexecHubService = iexecHubService;
}

/**
Expand All @@ -55,24 +54,42 @@ public SmsService(SmsClientProvider smsClientProvider, IexecHubAbstractService i
* <p>
* If any of these conditions is wrong, then the {@link SmsClient} is considered to be not-ready.
*
* @param chainDealId ID of the on-chain deal related to the task to execute.
* @param chainTaskId ID of the on-chain task.
* @return {@literal true} if previous conditions are met, {@literal false} otherwise.
* @param tag Tag of the deal.
* @return SMS url if TEE types of tag & SMS match.
*/
public boolean isSmsClientReady(String chainDealId, String chainTaskId) {
try {
final Optional<ChainDeal> chainDeal = iexecHubService.getChainDeal(chainDealId);
if (chainDeal.isEmpty()) {
log.error("No chain deal for given ID [chainDealId: {}]", chainDealId);
return false;
}
final SmsClient smsClient = smsClientProvider.getOrCreateSmsClientForUninitializedTask(chainDeal.get(), chainTaskId);
final TeeEnclaveProvider teeEnclaveProviderForDeal = TeeUtils.getTeeEnclaveProvider(chainDeal.get().getTag());
return checkSmsTeeEnclaveProvider(smsClient, teeEnclaveProviderForDeal, chainTaskId);
} catch (SmsClientCreationException e) {
log.error("SmsClient is not ready [chainTaskId: {}]", chainTaskId, e);
return false;
public Optional<String> getVerifiedSmsUrl(String chainTaskId, String tag) {
final TeeEnclaveProvider teeEnclaveProviderForDeal = TeeUtils.getTeeEnclaveProvider(tag);
if(teeEnclaveProviderForDeal == null){
log.error("Can't get verified SMS url with invalid TEE enclave " +
"provider from tag [chainTaskId:{}]", chainTaskId);
return Optional.empty();
}
Optional<String> smsUrl = retrieveSmsUrl(teeEnclaveProviderForDeal);
if(smsUrl.isEmpty()){
log.error("Can't get verified SMS url since type of tag is not " +
"supported [chainTaskId:{},teeEnclaveProvider:{}]",
chainTaskId, teeEnclaveProviderForDeal);
return Optional.empty();
}
final SmsClient smsClient = smsClientProvider.getSmsClient(smsUrl.get());
if(!checkSmsTeeEnclaveProvider(smsClient, teeEnclaveProviderForDeal, chainTaskId)){
log.error("Can't get verified SMS url since tag TEE type " +
"does not match SMS TEE type [chainTaskId:{},teeProviderForTask:{}]",
chainTaskId, teeEnclaveProviderForDeal);
return Optional.empty();
}
return smsUrl;
}

private Optional<String> retrieveSmsUrl(TeeEnclaveProvider teeEnclaveProvider) {
Optional<String> smsUrl = Optional.empty();
if(TeeEnclaveProvider.SCONE.equals(teeEnclaveProvider)){
smsUrl = Optional.of(registryConfiguration.getSconeSms());
} else if(TeeEnclaveProvider.GRAMINE.equals(teeEnclaveProvider)){
smsUrl = Optional.of(registryConfiguration.getGramineSms());
}
return smsUrl;
}

private boolean checkSmsTeeEnclaveProvider(SmsClient smsClient,
Expand All @@ -96,33 +113,32 @@ private boolean checkSmsTeeEnclaveProvider(SmsClient smsClient,
return true;
}

public Optional<String> getEnclaveChallenge(String chainTaskId, boolean isTeeEnabled) {
return isTeeEnabled
? generateEnclaveChallenge(chainTaskId)
: Optional.of(BytesUtils.EMPTY_ADDRESS);
public Optional<String> getEnclaveChallenge(String chainTaskId, String smsUrl) {
return StringUtils.isEmpty(smsUrl)
? Optional.of(BytesUtils.EMPTY_ADDRESS)
: generateEnclaveChallenge(chainTaskId, smsUrl);
}

@Retryable(value = FeignException.class)
Optional<String> generateEnclaveChallenge(String chainTaskId) {
final TaskDescription taskDescription = iexecHubService.getTaskDescription(chainTaskId);

Optional<String> generateEnclaveChallenge(String chainTaskId, String smsUrl) {
// SMS client should already have been created once before.
// If it couldn't be created, then the task would have been aborted.
// So the following won't throw an exception.
final SmsClient smsClient = smsClientProvider.getOrCreateSmsClientForTask(taskDescription);
final SmsClient smsClient = smsClientProvider.getSmsClient(smsUrl);

final String teeChallengePublicKey = smsClient.generateTeeChallenge(chainTaskId);

if (teeChallengePublicKey == null || teeChallengePublicKey.isEmpty()) {
log.error("An error occurred while getting teeChallengePublicKey [chainTaskId:{}]", chainTaskId);
if (StringUtils.isEmpty(teeChallengePublicKey)) {
log.error("An error occurred while getting teeChallengePublicKey "
+ "[chainTaskId:{}, smsUrl:{}]", chainTaskId, smsUrl);
return Optional.empty();
}

return Optional.of(teeChallengePublicKey);
}

@Recover
Optional<String> generateEnclaveChallenge(FeignException e, String chainTaskId) {
Optional<String> generateEnclaveChallenge(FeignException e, String chainTaskId, String smsUrl) {
log.error("Failed to get enclaveChallenge from SMS even after retrying [chainTaskId:{}, attempts:3]", chainTaskId, e);
return Optional.empty();
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/iexec/core/task/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.iexec.common.chain.ChainUtils;
import com.iexec.common.dapp.DappType;
import com.iexec.common.tee.TeeUtils;

import lombok.*;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Version;
Expand Down Expand Up @@ -92,6 +91,7 @@ public class Task {
private String chainCallbackData;
private List<TaskStatusChange> dateStatusList;
private String enclaveChallenge;
private String smsUrl;

public Task(String dappName, String commandLine, int trust) {
this.dappType = DappType.DOCKER;
Expand Down
17 changes: 11 additions & 6 deletions src/main/java/com/iexec/core/task/update/TaskUpdateManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,16 @@ void received2Initializing(Task task) {
return;
}

if (task.isTeeTask() && !smsService.isSmsClientReady(task.getChainDealId(), task.getChainTaskId())) {
log.error("Couldn't get SmsClient [chainTaskId: {}]", task.getChainTaskId());
updateTaskStatusAndSave(task, INITIALIZE_FAILED);
updateTaskStatusAndSave(task, FAILED);
return;
if (task.isTeeTask()) {
Optional<String> smsUrl = smsService.getVerifiedSmsUrl(task.getChainTaskId(), task.getTag());
if(smsUrl.isEmpty()){
log.error("Couldn't get verified SMS url [chainTaskId: {}]", task.getChainTaskId());
updateTaskStatusAndSave(task, INITIALIZE_FAILED);
updateTaskStatusAndSave(task, FAILED);
return;
}
task.setSmsUrl(smsUrl.get()); //SMS URL source of truth for the task
taskService.updateTask(task);
}

blockchainAdapterService
Expand All @@ -215,7 +220,7 @@ void received2Initializing(Task task) {
.ifPresentOrElse(chainTaskId -> {
log.info("Requested initialize on blockchain [chainTaskId:{}]",
task.getChainTaskId());
final Optional<String> enclaveChallenge = smsService.getEnclaveChallenge(chainTaskId, task.isTeeTask());
final Optional<String> enclaveChallenge = smsService.getEnclaveChallenge(chainTaskId, task.getSmsUrl());
if (enclaveChallenge.isEmpty()) {
log.error("Can't initialize task, enclave challenge is empty" +
" [chainTaskId:{}]", chainTaskId);
Expand Down
4 changes: 4 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ spring:
host: ${MONGO_HOST:localhost}
port: ${MONGO_PORT:13002}
auto-index-creation: true # Auto-index creation is disabled by default starting with Spring Data MongoDB 3.x.
config.import: "configserver:${IEXEC_PLATFORM_REGISTRY}" # configserver:http://platform-registry:8888
cloud.config:
profile: ${IEXEC_PLATFORM_REGISTRY_STACK:} # mainnet, bellecour3, 1234, ..
label: ${IEXEC_PLATFORM_REGISTRY_LABEL:} # main, develop, v10, 07998be
mongock:
runner-type: InitializingBean
change-logs-scan-package:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.iexec.core.registry;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;

@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = PlatformRegistryConfiguration.class)
@TestPropertySource(properties = {
"sms.scone=http://scone-sms",
"sms.gramine=http://gramine-sms"
})
class PlatformRegistryConfigurationTests {

@Autowired
PlatformRegistryConfiguration platformRegistryConfiguration;

@Test
void shouldGetValues() {
Assertions.assertThat(platformRegistryConfiguration.getSconeSms())
.isEqualTo("http://scone-sms");
Assertions.assertThat(platformRegistryConfiguration.getGramineSms())
.isEqualTo("http://gramine-sms");
}

}
Loading

0 comments on commit 32aece6

Please sign in to comment.