Skip to content

Commit

Permalink
Merge pull request #551 from iExecBlockchainComputing/hotfix/7.3.1
Browse files Browse the repository at this point in the history
Hotfix/7.3.1
  • Loading branch information
jbern0rd authored Feb 17, 2023
2 parents cb0a55c + a2089ac commit 557c7bf
Show file tree
Hide file tree
Showing 13 changed files with 184 additions and 191 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

All notable changes to this project will be documented in this file.

## [[7.3.1]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v7.3.1) 2023-02-17

* Subscribe only to deal events targeting a specific workerpool.

## [[7.3.0]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v7.3.0) 2022-12-18

* Add endpoint to allow health checks.
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ You can configure the _iExec Core Scheduler_ with the following properties:
| `IEXEC_LOGS_PURGE_RATE_IN_DAYS` | Interval in days between 2 executions of the purge mechanism. | Positive integer | `1` |
| `IEXEC_LOGS_AVAILABILITY_PERIOD_IN_DAYS` | Number of days to keep logs of past tasks. | Positive integer | `3` |

If it is not the first startup of the _iExec Core Scheduler_ and if it received deals previously,
the _MongoDB_ instance will contain a __configuration Collection__ in the __iexec Database__.
The value stored in this document takes the precedence over the `IEXEC_START_BLOCK_NUMBER` configuration parameter.
To enforce deal observation starting from the `IEXEC_START_BLOCK_NUMBER` value, the aforementioned document has to be deleted in the _MongoDB_.
All deals prior to the `IEXEC_START_BLOCK_NUMBER` will then be ignored.

A more exhaustive documentation is available on [the official documentation of iExec](https://docs.iex.ec/).

## Health checks
Expand Down
8 changes: 0 additions & 8 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,6 @@ dependencies {
testImplementation 'org.testcontainers:testcontainers:1.16.2'
testImplementation 'org.testcontainers:junit-jupiter:1.16.2'
testImplementation 'org.testcontainers:mongodb:1.16.2'

// slf4j-test
testImplementation 'uk.org.lidalia:slf4j-test:1.2.0'
}

jar {
Expand All @@ -126,11 +123,6 @@ springBoot {

test {
useJUnitPlatform()

// To avoid warnings on multiple SLF4J bindings,
// let's remove the Logback implementation
// and keep only SLF4j Test.
classpath = classpath.filter {!it.name.contains('logback-classic')}
}

task itest {
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=7.3.0
version=7.3.1
iexecCommonVersion=6.2.0
iexecBlockchainAdapterVersion=7.3.0
iexecResultVersion=7.3.0
Expand Down
18 changes: 4 additions & 14 deletions src/main/java/com/iexec/core/chain/DealEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,20 @@

import com.iexec.common.contract.generated.IexecHubContract;
import com.iexec.common.utils.BytesUtils;
import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;

import java.math.BigInteger;

@Data
@Getter
@NoArgsConstructor
@Builder
public class DealEvent {

private String chainDealId;
private BigInteger blockNumber;

public DealEvent(String chainDealId, BigInteger blockNumber) {
this.chainDealId = chainDealId;
this.blockNumber = blockNumber;
}
private final String chainDealId;
private final BigInteger blockNumber;

public DealEvent(IexecHubContract.SchedulerNoticeEventResponse schedulerNoticeEventResponse) {
this.chainDealId = BytesUtils.bytesToString(schedulerNoticeEventResponse.dealid);
this.blockNumber = schedulerNoticeEventResponse.log.getBlockNumber();
this.blockNumber = schedulerNoticeEventResponse.log.getBlockNumber() != null ?
schedulerNoticeEventResponse.log.getBlockNumber() : BigInteger.ZERO;
}

}
78 changes: 66 additions & 12 deletions src/main/java/com/iexec/core/chain/DealWatcherService.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,60 @@
package com.iexec.core.chain;

import com.iexec.common.chain.ChainDeal;
import com.iexec.common.contract.generated.IexecHubContract;
import com.iexec.common.utils.BytesUtils;
import com.iexec.core.configuration.ConfigurationService;
import com.iexec.core.task.Task;
import com.iexec.core.task.TaskService;
import com.iexec.core.task.event.TaskCreatedEvent;
import io.reactivex.disposables.Disposable;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.web3j.abi.EventEncoder;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.DefaultBlockParameterName;
import org.web3j.protocol.core.methods.request.EthFilter;
import org.web3j.utils.Numeric;

import java.math.BigInteger;
import java.util.Optional;

import static com.iexec.common.contract.generated.IexecHubContract.SCHEDULERNOTICE_EVENT;

@Slf4j
@Service
public class DealWatcherService {

private final ChainConfig chainConfig;
private final IexecHubService iexecHubService;
private final ConfigurationService configurationService;
private final ApplicationEventPublisher applicationEventPublisher;
private final TaskService taskService;
private final Web3jService web3jService;
// internal variables
private Disposable dealEventSubscriptionReplay;
@Getter
private BigInteger latestBlockNumberWithDeal = BigInteger.ZERO;
@Getter
private long dealEventsCount = 0;
@Getter
private long dealsCount = 0;
@Getter
private long replayDealsCount = 0;

@Autowired
public DealWatcherService(IexecHubService iexecHubService,
public DealWatcherService(ChainConfig chainConfig,
IexecHubService iexecHubService,
ConfigurationService configurationService,
ApplicationEventPublisher applicationEventPublisher,
TaskService taskService,
Web3jService web3jService) {
this.chainConfig = chainConfig;
this.iexecHubService = iexecHubService;
this.configurationService = configurationService;
this.applicationEventPublisher = applicationEventPublisher;
Expand Down Expand Up @@ -80,8 +98,10 @@ public void run() {
*/
Disposable subscribeToDealEventFromOneBlockToLatest(BigInteger from) {
log.info("Watcher DealEvent started [from:{}, to:{}]", from, "latest");
return iexecHubService.getDealEventObservableToLatest(from)
.subscribe(dealEvent -> dealEvent.ifPresent(this::onDealEvent));
EthFilter filter = createDealEventFilter(from, null);
return iexecHubService.getDealEventObservable(filter)
.map(this::schedulerNoticeToDealEvent)
.subscribe(dealEvent -> dealEvent.ifPresent(event -> onDealEvent(event, "start")));
}

/**
Expand All @@ -90,16 +110,20 @@ Disposable subscribeToDealEventFromOneBlockToLatest(BigInteger from) {
*
* @param dealEvent
*/
private void onDealEvent(DealEvent dealEvent) {
private void onDealEvent(DealEvent dealEvent, String watcher) {
if ("replay".equals(watcher)) {
replayDealsCount++;
} else {
dealsCount++;
}
String dealId = dealEvent.getChainDealId();
BigInteger dealBlock = dealEvent.getBlockNumber();
log.info("Received deal [dealId:{}, block:{}]", dealId,
dealBlock);
if (dealBlock == null || dealBlock.equals(BigInteger.ZERO)){
log.info("Received deal [dealId:{}, block:{}, watcher: {}]",
dealId, dealBlock, watcher);
if (dealBlock.equals(BigInteger.ZERO)) {
log.warn("Deal block number is empty, fetching later blockchain " +
"events will be more expensive [chainDealId:{}, dealBlock:{}, " +
"lastBlock:{}]", dealId, dealBlock, web3jService.getLatestBlockNumber());
dealEvent.setBlockNumber(BigInteger.ZERO);
}
this.handleDeal(dealEvent);
if (configurationService.getLastSeenBlockWithDeal().compareTo(dealBlock) < 0) {
Expand Down Expand Up @@ -175,7 +199,37 @@ void replayDealEvent() {
private Disposable subscribeToDealEventInRange(BigInteger from, BigInteger to) {
log.info("Replay Watcher DealEvent started [from:{}, to:{}]",
from, (to == null) ? "latest" : to);
return iexecHubService.getDealEventObservable(from, to)
.subscribe(dealEvent -> dealEvent.ifPresent(this::onDealEvent));
EthFilter filter = createDealEventFilter(from, to);
return iexecHubService.getDealEventObservable(filter)
.map(this::schedulerNoticeToDealEvent)
.subscribe(dealEvent -> dealEvent.ifPresent(event -> onDealEvent(event, "replay")));
}

EthFilter createDealEventFilter(BigInteger from, BigInteger to) {
final DefaultBlockParameter fromBlock = DefaultBlockParameter.valueOf(from);
final DefaultBlockParameter toBlock = to == null
? DefaultBlockParameterName.LATEST
: DefaultBlockParameter.valueOf(to);
final EthFilter filter = new EthFilter(fromBlock, toBlock, chainConfig.getHubAddress());
final BigInteger poolAddressBigInt = Numeric.toBigInt(chainConfig.getPoolAddress());
filter.addSingleTopic(EventEncoder.encode(SCHEDULERNOTICE_EVENT));
filter.addSingleTopic(Numeric.toHexStringWithPrefixZeroPadded(poolAddressBigInt, 64));
return filter;
}

Optional<DealEvent> schedulerNoticeToDealEvent(IexecHubContract.SchedulerNoticeEventResponse schedulerNotice) {
dealEventsCount++;
BigInteger noticeBlockNumber = schedulerNotice.log.getBlockNumber();
if (latestBlockNumberWithDeal.compareTo(noticeBlockNumber) < 0) {
latestBlockNumberWithDeal = noticeBlockNumber;
}
log.info("Received new deal [blockNumber:{}, chainDealId:{}, dealEventsCount:{}]",
schedulerNotice.log.getBlockNumber(), BytesUtils.bytesToString(schedulerNotice.dealid), dealEventsCount);
if (schedulerNotice.workerpool.equalsIgnoreCase(chainConfig.getPoolAddress())) {
return Optional.of(new DealEvent(schedulerNotice));
}
log.warn("This deal event should not have been received [blockNumber:{}, chainDealId:{}, dealEventsCount:{}]",
schedulerNotice.log.getBlockNumber(), BytesUtils.bytesToString(schedulerNotice.dealid), dealEventsCount);
return Optional.empty();
}
}
25 changes: 3 additions & 22 deletions src/main/java/com/iexec/core/chain/IexecHubService.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.DefaultBlockParameterName;
import org.web3j.protocol.core.RemoteCall;
import org.web3j.protocol.core.methods.request.EthFilter;
import org.web3j.protocol.core.methods.response.BaseEventResponse;
import org.web3j.protocol.core.methods.response.TransactionReceipt;

Expand All @@ -52,7 +51,6 @@ public class IexecHubService extends IexecHubAbstractService {
private final ThreadPoolExecutor executor;
private final CredentialsService credentialsService;
private final Web3jService web3jService;
private final String poolAddress;

@Autowired
public IexecHubService(CredentialsService credentialsService,
Expand All @@ -65,7 +63,6 @@ public IexecHubService(CredentialsService credentialsService,
this.credentialsService = credentialsService;
this.web3jService = web3jService;
this.executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
this.poolAddress = chainConfig.getPoolAddress();
if (!hasEnoughGas()) {
System.exit(0);
}
Expand Down Expand Up @@ -366,24 +363,8 @@ private long getWaitingTransactionCount() {
return executor.getTaskCount() - executor.getCompletedTaskCount();
}


Flowable<Optional<DealEvent>> getDealEventObservableToLatest(BigInteger from) {
return getDealEventObservable(from, null);
}

Flowable<Optional<DealEvent>> getDealEventObservable(BigInteger from, BigInteger to) {
DefaultBlockParameter fromBlock = DefaultBlockParameter.valueOf(from);
DefaultBlockParameter toBlock = DefaultBlockParameterName.LATEST;
if (to != null) {
toBlock = DefaultBlockParameter.valueOf(to);
}
return getHubContract().schedulerNoticeEventFlowable(fromBlock, toBlock).map(schedulerNotice -> {

if (schedulerNotice.workerpool.equalsIgnoreCase(poolAddress)) {
return Optional.of(new DealEvent(schedulerNotice));
}
return Optional.empty();
});
Flowable<IexecHubContract.SchedulerNoticeEventResponse> getDealEventObservable(EthFilter filter) {
return getHubContract().schedulerNoticeEventFlowable(filter);
}

public boolean hasEnoughGas() {
Expand Down
10 changes: 3 additions & 7 deletions src/main/java/com/iexec/core/metric/MetricController.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,22 @@

package com.iexec.core.metric;


import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import static org.springframework.http.ResponseEntity.ok;

@RestController
public class MetricController {


private MetricService metricService;
private final MetricService metricService;

public MetricController(MetricService metricService) {
this.metricService = metricService;
}


@RequestMapping(method = RequestMethod.GET, path = "/metrics")
@GetMapping("/metrics")
public ResponseEntity<PlatformMetric> getPlatformMetric() {
return ok(metricService.getPlatformMetrics());
}
Expand Down
11 changes: 9 additions & 2 deletions src/main/java/com/iexec/core/metric/MetricService.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.iexec.core.metric;

import com.iexec.core.chain.DealWatcherService;
import com.iexec.core.task.TaskService;
import com.iexec.core.task.TaskStatus;
import com.iexec.core.worker.WorkerService;
Expand All @@ -24,12 +25,14 @@
@Service
public class MetricService {


private final DealWatcherService dealWatcherService;
private final WorkerService workerService;
private final TaskService taskService;

public MetricService(WorkerService workerService,
public MetricService(DealWatcherService dealWatcherService,
WorkerService workerService,
TaskService taskService) {
this.dealWatcherService = dealWatcherService;
this.workerService = workerService;
this.taskService = taskService;
}
Expand All @@ -42,6 +45,10 @@ public PlatformMetric getPlatformMetrics() {
.aliveTotalGpu(workerService.getAliveTotalGpu())
.aliveAvailableGpu(workerService.getAliveAvailableGpu())
.completedTasks(taskService.findByCurrentStatus(TaskStatus.COMPLETED).size())
.dealEventsCount(dealWatcherService.getDealEventsCount())
.dealsCount(dealWatcherService.getDealsCount())
.replayDealsCount(dealWatcherService.getReplayDealsCount())
.latestBlockNumberWithDeal(dealWatcherService.getLatestBlockNumberWithDeal())
.build();
}

Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/iexec/core/metric/PlatformMetric.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import lombok.Builder;
import lombok.Data;

import java.math.BigInteger;

@Data
@AllArgsConstructor
@Builder
Expand All @@ -31,5 +33,9 @@ public class PlatformMetric {
private int aliveTotalGpu;
private int aliveAvailableGpu;
private int completedTasks;
private long dealEventsCount;
private long dealsCount;
private long replayDealsCount;
private BigInteger latestBlockNumberWithDeal;

}
Loading

0 comments on commit 557c7bf

Please sign in to comment.