Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reward calculation fix for preprod and preview network #394

Merged
merged 13 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions aggregates/adapot/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ dependencies {
exclude group: "org.apache.maven.plugins", module: "maven-gpg-plugin"
}

implementation(libs.vavr)

//Test dependencies
testCompileOnly 'org.projectlombok:lombok'
testImplementation(libs.mapstruct)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ public EpochStakeStorageReader epochStakeStorage(EpochStakeRepository epochStake

@Bean
@ConditionalOnMissingBean
public AdaPotJobStorage adaPotJobStorage(AdaPotJobRepository rewardCalcJobRepository, AdaPotJobMapper rewardCalcJobMapper) {
return new AdaPotJobStorageImpl(rewardCalcJobRepository, rewardCalcJobMapper);
public AdaPotJobStorage adaPotJobStorage(AdaPotJobRepository adaPotJobRepository, AdaPotJobMapper rewardCalcJobMapper) {
return new AdaPotJobStorageImpl(adaPotJobRepository, rewardCalcJobMapper);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,7 @@ public class AdaPotProperties {

@Builder.Default
private int updateRewardDbBatchSize = 200;

@Builder.Default
private boolean verifyAdapotCalcValues = true;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.bloxbean.cardano.yaci.store.adapot.job;

import com.bloxbean.cardano.yaci.store.adapot.AdaPotProperties;
import com.bloxbean.cardano.yaci.store.adapot.job.domain.AdaPotJob;
import com.bloxbean.cardano.yaci.store.adapot.job.domain.AdaPotJobStatus;
import com.bloxbean.cardano.yaci.store.adapot.job.domain.AdaPotJobType;
Expand All @@ -10,14 +11,15 @@
import com.bloxbean.cardano.yaci.store.adapot.snapshot.StakeSnapshotService;
import com.bloxbean.cardano.yaci.store.adapot.snapshot.UtxoSnapshotService;
import com.bloxbean.cardano.yaci.store.common.config.StoreProperties;
import com.bloxbean.cardano.yaci.store.common.executor.ParallelExecutor;
import com.bloxbean.cardano.yaci.store.core.annotation.ReadOnly;
import com.bloxbean.cardano.yaci.store.core.service.EraService;
import com.bloxbean.cardano.yaci.store.transaction.storage.TransactionStorageReader;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import io.vavr.control.Either;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
Expand All @@ -42,30 +44,36 @@
public class AdaPotJobManager {
private final BlockingQueue<AdaPotJob> jobQueue = new LinkedBlockingQueue<>();
private StoreProperties storeProperties;
private AdaPotProperties adaPotProperties;
private AdaPotJobStorage adaPotJobStorage;
private EraService eraService;
private EpochRewardCalculationService epochRewardCalculationService;
private StakeSnapshotService stakeSnapshotService;
private DepositSnapshotService depositSnapshotService;
private AdaPotService adaPotService;
private TransactionStorageReader transactionStorageReader;

public AdaPotJobManager(StoreProperties storeProperties,
AdaPotProperties adaPotProperties,
AdaPotJobStorage adaPotJobStorage,
EraService eraService,
EpochRewardCalculationService epochRewardCalculationService,
StakeSnapshotService stakeSnapshotService,
DepositSnapshotService depositSnapshotService,
UtxoSnapshotService utxoSnapshotService,
AdaPotService adaPotService,
ParallelExecutor parallelExecutor) {
TransactionStorageReader transactionStorageReader) {
this.storeProperties = storeProperties;
this.adaPotProperties = adaPotProperties;
this.adaPotJobStorage = adaPotJobStorage;
this.eraService = eraService;
this.epochRewardCalculationService = epochRewardCalculationService;
this.stakeSnapshotService = stakeSnapshotService;
this.depositSnapshotService = depositSnapshotService;
this.adaPotService = adaPotService;
this.transactionStorageReader = transactionStorageReader;

//TODO -- Add some delay and then start loading jobs to handle rollback during restart of the application
// Reset jobs that were in 'STARTED' state to 'NOT_STARTED' and load pending jobs
resetStartedJobs();
loadPendingJobs();
Expand Down Expand Up @@ -93,7 +101,7 @@ private void loadPendingJobs() {
* and adding it to the job queue.
*
* @param epoch the epoch number for which the reward calculation job is to be triggered
* @param slot slot number
* @param slot slot number
*/
public void triggerRewardCalcJob(int epoch, long slot) {
AdaPotJob job = new AdaPotJob(epoch, slot, AdaPotJobType.REWARD_CALC, AdaPotJobStatus.NOT_STARTED, 0L, 0L, 0L, 0L, null);
Expand Down Expand Up @@ -136,68 +144,90 @@ private boolean processJob(AdaPotJob job) throws InterruptedException {

int retryCount = 0;

while (true) {
// Reset times
job.setTotalTime(0L);
job.setRewardCalcTime(0L);
job.setUpdateRewardTime(0L);

var start = Instant.now();
var deposits = depositSnapshotService.getNetStakeDepositInEpoch(job.getEpoch() - 1);
adaPotService.updateAdaPotDeposit(job.getEpoch(), deposits);
var end = Instant.now();
log.info("Deposit snapshot time in millis : {}, epoch: {}", end.toEpochMilli() - start.toEpochMilli(), job.getEpoch());

start = Instant.now();
boolean success = calculateRewards(job);
end = Instant.now();
job.setTotalTime(end.toEpochMilli() - start.toEpochMilli());
log.info("Reward calculation time in millis : {}, epoch: {}", end.toEpochMilli() - start.toEpochMilli(), job.getEpoch());

/**
//Take UTXO snapshot
var utxoSnapshotFuture = CompletableFuture.supplyAsync(() -> {
var start = Instant.now();
var utxo = utxoSnapshotService.getTotalUtxosInEpoch(job.getEpoch(), job.getSlot());
adaPotService.updateEpochUtxo(job.getEpoch(), utxo);
var end = Instant.now();
log.info("UTXO snapshot time in millis : {}, epoch: {}", end.toEpochMilli() - start.toEpochMilli(), job.getEpoch());
return true;
}, parallelExecutor.getVirtualThreadExecutor());
**/


if (success) {
job.setStatus(AdaPotJobStatus.COMPLETED);
job.setErrorMessage(null);
adaPotJobStorage.save(job);
return true;
} else {
job.setErrorMessage("Reward calculation failed");
adaPotJobStorage.save(job);
//TODO -- Retry logic
log.error("Reward calculation failed for epoch " + job.getEpoch() + ", retrying...");
retryCount++;

if (retryCount > 3) {
log.error("Reward calculation failed for epoch " + job.getEpoch() + ", retry count exceeded. Marking as failed");
job.setErrorMessage("Reward calculation failed. Retry count exceeded");
try {
while (true) {
// Reset times
job.setTotalTime(0L);
job.setRewardCalcTime(0L);
job.setUpdateRewardTime(0L);

var start = Instant.now();
//create AdaPot entry for the epoch
adaPotService.createAdaPot(job.getEpoch(), job.getSlot());

//Update Fee pot
var totalFeeInEpoch = transactionStorageReader.getTotalFee(job.getEpoch() - 1); //Prev epoch
if (totalFeeInEpoch == null) totalFeeInEpoch = BigInteger.ZERO;
log.info("Total fee in epoch {} : {}", job.getEpoch() - 1, totalFeeInEpoch);
//Update total fee in the epoch
adaPotService.updateEpochFee(job.getEpoch(), totalFeeInEpoch);
var end = Instant.now();
log.info("Fee snapshot time in millis : {}, epoch: {}", end.toEpochMilli() - start.toEpochMilli(), job.getEpoch());

//Update deposit stake pot
start = Instant.now();
var deposits = depositSnapshotService.getNetStakeDepositInEpoch(job.getEpoch() - 1);
adaPotService.updateAdaPotDeposit(job.getEpoch(), deposits);
end = Instant.now();
log.info("Deposit snapshot time in millis : {}, epoch: {}", end.toEpochMilli() - start.toEpochMilli(), job.getEpoch());

//Calculate rewards
start = Instant.now();
Either<String, Boolean> result = calculateRewards(job);
end = Instant.now();
job.setTotalTime(end.toEpochMilli() - start.toEpochMilli());
log.info("Reward calculation time in millis : {}, epoch: {}", end.toEpochMilli() - start.toEpochMilli(), job.getEpoch());

/**
//Take UTXO snapshot
var utxoSnapshotFuture = CompletableFuture.supplyAsync(() -> {
var start = Instant.now();
var utxo = utxoSnapshotService.getTotalUtxosInEpoch(job.getEpoch(), job.getSlot());
adaPotService.updateEpochUtxo(job.getEpoch(), utxo);
var end = Instant.now();
log.info("UTXO snapshot time in millis : {}, epoch: {}", end.toEpochMilli() - start.toEpochMilli(), job.getEpoch());
return true;
}, parallelExecutor.getVirtualThreadExecutor());
**/


if (result.isRight()) {
job.setStatus(AdaPotJobStatus.COMPLETED);
job.setErrorMessage(null);
adaPotJobStorage.save(job);
return false;
}
return true;
} else {
job.setErrorMessage("Reward calculation failed : " + result.getLeft());
adaPotJobStorage.save(job);
//TODO -- Retry logic
log.error("Reward calculation failed for epoch " + job.getEpoch() + ", retrying...");
retryCount++;

if (retryCount > 3) {
log.error("Reward calculation failed for epoch " + job.getEpoch() + ", retry count exceeded. Marking as failed");
job.setErrorMessage("Reward calculation failed. Retry count exceeded : " + result.getLeft());
adaPotJobStorage.save(job);
return false;
}

Thread.sleep(5000);
Thread.sleep(5000);
}
}
} catch (Exception e) {
log.error("Adapot job processing failed", e);
job.setErrorMessage("Reward calculation failed due to unknown exception : " + e.getMessage());
adaPotJobStorage.save(job);
return false;
}
}

private boolean calculateRewards(AdaPotJob job) {
private Either<String, Boolean> calculateRewards(AdaPotJob job) {
try {
long nonByronEpoch = eraService.getFirstNonByronEpoch().orElse(0);

if (job.getEpoch() < nonByronEpoch) {
log.info("Epoch : {} is Byron era. Skipping reward calculation", job.getEpoch());
return true;
return Either.right(true);
}

//Calculate epoch rewards
Expand All @@ -207,10 +237,14 @@ private boolean calculateRewards(AdaPotJob job) {
var end = Instant.now();
job.setRewardCalcTime(end.toEpochMilli() - start.toEpochMilli());

if (storeProperties.isMainnet()) {
if (adaPotProperties.isVerifyAdapotCalcValues() &&
(storeProperties.isMainnet()
|| storeProperties.getProtocolMagic() == 1
|| storeProperties.getProtocolMagic() == 2)
) { //mainnet or preprod or preview
//TODO -- Verify treasury and rewards value
try {
var expectedPots = loadExpectedAdaPotValues();
var expectedPots = loadExpectedAdaPotValues(storeProperties.getProtocolMagic());
var expectedPot = expectedPots.get(epoch);

if (expectedPot != null) {
Expand Down Expand Up @@ -238,10 +272,10 @@ private boolean calculateRewards(AdaPotJob job) {
end = Instant.now();
job.setStakeSnapshotTime(end.toEpochMilli() - start.toEpochMilli());

return true;
return Either.right(true);
} catch (Exception e) {
log.error("Error calculating rewards for epoch : " + job.getEpoch(), e);
return false;
return Either.left(e.getMessage());
}
}

Expand All @@ -253,8 +287,14 @@ public List<AdaPotJob> getCompletedJobs() {
return adaPotJobStorage.getJobsByTypeAndStatus(AdaPotJobType.REWARD_CALC, AdaPotJobStatus.COMPLETED);
}

private Map<Integer, ExpectedAdaPot> loadExpectedAdaPotValues() throws IOException {
private Map<Integer, ExpectedAdaPot> loadExpectedAdaPotValues(long protocolMagic) throws IOException {
String file = "dbsync_ada_pots.json";
if (protocolMagic == 1) { //preprod
file = "dbsync_ada_pots_preprod.json";
} else if (protocolMagic == 2) { //preview
file = "dbsync_ada_pots_preview.json";
}

ObjectMapper objectMapper = new ObjectMapper();
List<ExpectedAdaPot> pots = objectMapper.readValue(this.getClass().getClassLoader().getResourceAsStream(file), new TypeReference<List<ExpectedAdaPot>>() {
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@

import com.bloxbean.cardano.yaci.core.model.Era;
import com.bloxbean.cardano.yaci.store.adapot.job.AdaPotJobManager;
import com.bloxbean.cardano.yaci.store.adapot.service.AdaPotService;
import com.bloxbean.cardano.yaci.store.core.annotation.ReadOnly;
import com.bloxbean.cardano.yaci.store.core.service.EraService;
import com.bloxbean.cardano.yaci.store.events.internal.EpochTransitionCommitEvent;
import com.bloxbean.cardano.yaci.store.transaction.storage.TransactionStorageReader;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
Expand All @@ -19,32 +17,22 @@
@Slf4j
public class AdaPotProcessor {
private final EraService eraService;
private final AdaPotService adaPotService;
private final TransactionStorageReader transactionStorageReader;
private final AdaPotJobManager adaPotJobManager;

@EventListener
@Transactional
public void processAdaPotDuringEpochTransition(EpochTransitionCommitEvent epochTransitionCommitEvent) {

//TODO -- Handle null previous epoch due to restart
if (epochTransitionCommitEvent.getPreviousEpoch() == null) {
//For custom network, epoch 0 can be directly at era > shelley. so no previous epoch and we should
//consider epoch 0 as well
if (epochTransitionCommitEvent.getPreviousEpoch() == null && epochTransitionCommitEvent.getEpoch() > 0) {
return;
}

if (epochTransitionCommitEvent.getEra() == Era.Byron)
return;

//Create AdaPot for the epoch
adaPotService.createAdaPot(epochTransitionCommitEvent.getMetadata());

//Update Fee pot
var totalFeeInEpoch = transactionStorageReader.getTotalFee(epochTransitionCommitEvent.getMetadata().getEpochNumber() - 1); //Prev epoch
log.info("Total fee in epoch {} : {}", epochTransitionCommitEvent.getEpoch() - 1, totalFeeInEpoch);

//Update total fee in the epoch
adaPotService.updateEpochFee(epochTransitionCommitEvent.getMetadata().getEpochNumber(), totalFeeInEpoch);

triggerEpochTransitionJobs(epochTransitionCommitEvent);
}

Expand Down
Loading
Loading