Skip to content

Commit

Permalink
fix: rollback issue fix after event window commitment finishes. (#463)
Browse files Browse the repository at this point in the history
* fix: rollback issue fix after event window commitment finishes.

* chore: log statement fix

---------

Co-authored-by: Mateusz Czeladka <mateusz.czeladka@cardanofoundation.org>
  • Loading branch information
matiwinnetou and Mateusz Czeladka authored Oct 19, 2023
1 parent 56ac250 commit 36c6741
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,36 @@ public Either<Problem, List<EventSummary>> findAllCommitmentWindowOpenEvents() {
var allEventSummaries = Optional.ofNullable(restTemplate.getForObject(url, EventSummary[].class))
.map(Arrays::asList).orElse(List.of());

var allCommitmentsOpenWindowEvents = allEventSummaries.stream()
var events = allEventSummaries.stream()
.filter(EventSummary::commitmentsWindowOpen)
.toList();

return Either.right(allCommitmentsOpenWindowEvents);
return Either.right(events);
} catch (HttpClientErrorException e) {
if (e.getStatusCode() == NOT_FOUND) {
return Either.right(List.of());
}

return Either.left(Problem.builder()
.withTitle("REFERENCE_ERROR")
.withDetail("Unable to get event details from ledger follower service, reason:" + e.getMessage())
.withStatus(new HttpStatusAdapter(e.getStatusCode()))
.build());
}
}

public Either<Problem, List<EventSummary>> findAllEndedEventsWithoutOpenCommitmentWindow() {
var url = String.format("%s/api/reference/event", ledgerFollowerBaseUrl);

try {
var allEventSummaries = Optional.ofNullable(restTemplate.getForObject(url, EventSummary[].class))
.map(Arrays::asList).orElse(List.of());

var events = allEventSummaries.stream()
.filter(eventSummary -> !eventSummary.commitmentsWindowOpen && eventSummary.finished)
.toList();

return Either.right(events);
} catch (HttpClientErrorException e) {
if (e.getStatusCode() == NOT_FOUND) {
return Either.right(List.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;

import java.util.Optional;
import java.util.List;

@Repository
public interface VoteMerkleProofRepository extends JpaRepository<VoteMerkleProof, String> {

@Query("UPDATE VoteMerkleProof vmp SET vmp.invalidated = true where vmp.absoluteSlot > :slot")
@Modifying
void invalidateMerkleProofsAfterSlot(@Param("slot") long slot);
@Query("UPDATE VoteMerkleProof vmp SET vmp.invalidated = true WHERE vmp.eventId = :eventId AND vmp.absoluteSlot > :slot")
int invalidateMerkleProofsAfterSlot(@Param("eventId") String eventId, @Param("slot") long slot);

List<VoteMerkleProof> findTop1ByEventIdAndInvalidatedOrderByCreatedAtDesc(String eventId, boolean invalidated);

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.List;
import java.util.UUID;
import java.util.stream.Stream;

import static com.bloxbean.cardano.client.util.HexUtil.encodeHexString;
import static org.cardano.foundation.voting.domain.VoteSerialisations.VOTE_SERIALISER;
Expand Down Expand Up @@ -67,19 +68,45 @@ public void processVotesForAllEvents() {
}

private List<L1MerkleCommitment> getValidL1MerkleCommitments() {
var eventSummariesE = chainFollowerClient.findAllCommitmentWindowOpenEvents();
if (eventSummariesE.isEmpty()) {
var issue = eventSummariesE.swap().get();
var allCommitmentWindowOpenEventsE = chainFollowerClient.findAllCommitmentWindowOpenEvents();

log.error("Failed to get eventSummaries issue:{}, will try again in some time...", issue.toString());

if (allCommitmentWindowOpenEventsE.isEmpty()) {
var issue = allCommitmentWindowOpenEventsE.swap().get();

log.error("Failed to get open window eventSummaries issue:{}, will try again in some time...", issue.toString());

return List.of();
}

var eventsToProcess1 = allCommitmentWindowOpenEventsE.get();

log.info("Found events with active commitments window: {}", eventsToProcess1.stream()
.map(ChainFollowerClient.EventSummary::id)
.toList());

var allFinishedEventsWithClosedCommitmentWindowE = chainFollowerClient.findAllEndedEventsWithoutOpenCommitmentWindow();

if (allFinishedEventsWithClosedCommitmentWindowE.isEmpty()) {
var issue = allFinishedEventsWithClosedCommitmentWindowE.swap().get();

log.error("Failed to get finished and with close window eventSummaries issue :{}, will try again in some time...", issue.toString());

return List.of();
}
var eventSummaries = eventSummariesE.get();

log.info("Found events with active commitments window: {}", eventSummaries.stream().map(ChainFollowerClient.EventSummary::id).toList());
var allFinishedEventsWithClosedCommitmentWindow = allFinishedEventsWithClosedCommitmentWindowE.get();

List<ChainFollowerClient.EventSummary> eventsToProcess2 = allFinishedEventsWithClosedCommitmentWindow.stream()
.filter(eventSummary -> !voteMerkleProofService.findTop1InvalidatedByEventId(eventSummary.id()).isEmpty())
.toList();

var allEventsToProcess = Stream.concat(
eventsToProcess1.stream(),
eventsToProcess2.stream())
.toList();

return eventSummaries.stream()
return allEventsToProcess.stream()
.map(event -> {
// TODO caching or paging or both or neither? Maybe we use Redis???
log.info("Loading signedVotes from db for event:{}", event.id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

@Service
@Slf4j
@RequiredArgsConstructor
Expand All @@ -23,10 +25,18 @@ public VoteMerkleProof store(VoteMerkleProof voteMerkleProof) {

@Transactional
@Timed(value = "service.merkle.softDeleteAllProofsAfterSlot", histogram = true)
public void softDeleteAllProofsAfterSlot(long slot) {
log.info("Soft deleting all proofs after slot:{}", slot);
public int softDeleteAllProofsAfterSlot(String eventId, long slot) {
log.info("Soft deleting all proofs for eventId: {}, after slot:{}", eventId, slot);

return voteMerkleProofRepository.invalidateMerkleProofsAfterSlot(eventId, slot);
}

@Transactional
@Timed(value = "service.merkle.findTop1InvalidatedByEvent", histogram = true)
public List<VoteMerkleProof> findTop1InvalidatedByEventId(String eventId) {
log.info("Finding top 1 invalidated proof for eventId:{}", eventId);

voteMerkleProofRepository.invalidateMerkleProofsAfterSlot(slot);
return voteMerkleProofRepository.findTop1ByEventIdAndInvalidatedOrderByCreatedAtDesc(eventId, true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,27 @@
import com.bloxbean.cardano.yaci.helper.BlockSync;
import com.bloxbean.cardano.yaci.helper.listener.BlockChainDataListener;
import com.bloxbean.cardano.yaci.helper.model.Transaction;
import io.vavr.control.Either;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.cardano.foundation.voting.client.ChainFollowerClient;
import org.cardano.foundation.voting.domain.CardanoNetwork;
import org.cardano.foundation.voting.domain.WellKnownPointWithProtocolMagic;
import org.cardano.foundation.voting.service.merkle_tree.VoteCommitmentService;
import org.cardano.foundation.voting.service.merkle_tree.VoteMerkleProofService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.zalando.problem.Problem;

import java.util.List;
import java.util.Optional;

@Component
@Slf4j
@ConditionalOnProperty(prefix = "rollback.handling", value = "enabled", havingValue = "true")
public class RollbackHandler {

@Value("${cardano.node.ip}")
Expand All @@ -32,9 +38,15 @@ public class RollbackHandler {
@Autowired
private CardanoNetwork cardanoNetwork;

@Autowired
private ChainFollowerClient chainFollowerClient;

@Autowired
private VoteMerkleProofService voteMerkleProofService;

@Autowired
private VoteCommitmentService voteCommitmentService;

@Autowired
private WellKnownPointWithProtocolMagic wellKnownPointWithProtocolMagic;

Expand Down Expand Up @@ -70,9 +82,35 @@ public void onBlock(Era era, Block block, List<Transaction> transactions) {

@Override
public void onRollback(Point point) {
var slot = point.getSlot();
Either<Problem, List<ChainFollowerClient.EventSummary>> allCommitmentWindowOpenEventsE = chainFollowerClient.findAllCommitmentWindowOpenEvents();

if (allCommitmentWindowOpenEventsE.isEmpty()) {
var issue = allCommitmentWindowOpenEventsE.swap().get();
log.warn("Failed to get eventSummaries issue: {}, will try again in some time (on next rollback)...", issue.toString());

return;
}

List<ChainFollowerClient.EventSummary> allCommitmentWindowOpenEvents = allCommitmentWindowOpenEventsE.get();

if (allCommitmentWindowOpenEvents.isEmpty()) {
log.info("No commitment window open events found. Skipping rollback handler...");

return;
}

long absoluteSlot = point.getSlot();

for (ChainFollowerClient.EventSummary eventSummary : allCommitmentWindowOpenEvents) {
String eventId = eventSummary.id();

log.info("Processing rollback for eventId: {}, absoluteSlot: {}", eventId, absoluteSlot);

int updatedVoteProofs = voteMerkleProofService.softDeleteAllProofsAfterSlot(eventId, absoluteSlot);

log.info("Soft deleted {} vote proofs after slot: {} for eventId: {}", updatedVoteProofs, absoluteSlot, eventId);
}

voteMerkleProofService.softDeleteAllProofsAfterSlot(slot);
}

});
Expand Down

0 comments on commit 36c6741

Please sign in to comment.