Skip to content

Commit

Permalink
Merge pull request #6551 from alvasw/fix_burningman_accounting_store_…
Browse files Browse the repository at this point in the history
…data_races

Fix BurningManAccountingStore data races
  • Loading branch information
alejandrogarcia83 authored Feb 4, 2023
2 parents 8dbdecd + 4779c82 commit e25c27e
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@

import java.util.Arrays;
import java.util.Calendar;
import java.util.Comparator;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
Expand Down Expand Up @@ -113,7 +112,7 @@ public void start() {
CompletableFuture.runAsync(() -> {
Map<String, BalanceModel> map = new HashMap<>();
// addAccountingBlockToBalanceModel takes about 500ms for 100k items, so we run it in a non UI thread.
getBlocks().forEach(block -> addAccountingBlockToBalanceModel(map, block));
burningManAccountingStoreService.forEachBlock(block -> addAccountingBlockToBalanceModel(map, block));
UserThread.execute(() -> balanceModelByBurningManName.putAll(map));
});
}
Expand All @@ -125,7 +124,7 @@ public void start() {

public void onInitialBlockRequestsComplete() {
updateBalanceModelByAddress();
getBlocks().forEach(this::addAccountingBlockToBalanceModel);
burningManAccountingStoreService.forEachBlock(this::addAccountingBlockToBalanceModel);
}

public void onNewBlockReceived(AccountingBlock accountingBlock) {
Expand All @@ -134,37 +133,19 @@ public void onNewBlockReceived(AccountingBlock accountingBlock) {
}

public void addBlock(AccountingBlock block) throws BlockHashNotConnectingException, BlockHeightNotConnectingException {
if (!getBlocks().contains(block)) {
Optional<AccountingBlock> optionalLastBlock = getLastBlock();
if (optionalLastBlock.isPresent()) {
AccountingBlock lastBlock = optionalLastBlock.get();
if (block.getHeight() != lastBlock.getHeight() + 1) {
throw new BlockHeightNotConnectingException();
}
if (!Arrays.equals(block.getTruncatedPreviousBlockHash(), lastBlock.getTruncatedHash())) {
throw new BlockHashNotConnectingException();
}
} else if (block.getHeight() != EARLIEST_BLOCK_HEIGHT) {
throw new BlockHeightNotConnectingException();
}
log.info("Add new accountingBlock at height {} at {} with {} txs", block.getHeight(),
new Date(block.getDate()), block.getTxs().size());
burningManAccountingStoreService.addBlock(block);
} else {
log.info("We have that block already. Height: {}", block.getHeight());
}
burningManAccountingStoreService.addIfNewBlock(block);
}

public int getBlockHeightOfLastBlock() {
return getLastBlock().map(AccountingBlock::getHeight).orElse(BurningManAccountingService.EARLIEST_BLOCK_HEIGHT - 1);
}

public Optional<AccountingBlock> getLastBlock() {
return getBlocks().stream().max(Comparator.comparing(AccountingBlock::getHeight));
return burningManAccountingStoreService.getLastBlock();
}

public Optional<AccountingBlock> getBlockAtHeight(int height) {
return getBlocks().stream().filter(block -> block.getHeight() == height).findAny();
return burningManAccountingStoreService.getBlockAtHeight(height);
}

public Map<Date, Price> getAverageBsqPriceByMonth() {
Expand Down Expand Up @@ -213,8 +194,8 @@ private Stream<ReceivedBtcBalanceEntry> getReceivedBtcBalanceEntryStreamExcludin
// Delegates
///////////////////////////////////////////////////////////////////////////////////////////

public List<AccountingBlock> getBlocks() {
return burningManAccountingStoreService.getBlocks();
public List<AccountingBlock> getBlocksAtLeastWithHeight(int minHeight) {
return burningManAccountingStoreService.getBlocksAtLeastWithHeight(minHeight);
}

public Map<String, String> getBurningManNameByAddress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -92,9 +91,7 @@ public interface Listener {

public void onGetBlocksRequest(GetAccountingBlocksRequest request, Connection connection) {
long ts = System.currentTimeMillis();
List<AccountingBlock> blocks = burningManAccountingService.getBlocks().stream()
.filter(block -> block.getHeight() >= request.getFromBlockHeight())
.collect(Collectors.toList());
List<AccountingBlock> blocks = burningManAccountingService.getBlocksAtLeastWithHeight(request.getFromBlockHeight());
byte[] signature = AccountingNode.getSignature(AccountingNode.getSha256Hash(blocks), bmOracleNodePrivKey);
GetAccountingBlocksResponse getBlocksResponse = new GetAccountingBlocksResponse(blocks, request.getNonce(), bmOracleNodePubKey, signature);
log.info("Received GetAccountingBlocksRequest from {} for blocks from height {}. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,135 @@


import bisq.core.dao.burningman.accounting.blockchain.AccountingBlock;
import bisq.core.dao.burningman.accounting.exceptions.BlockHashNotConnectingException;
import bisq.core.dao.burningman.accounting.exceptions.BlockHeightNotConnectingException;

import bisq.common.proto.persistable.PersistableEnvelope;

import com.google.protobuf.Message;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import static bisq.core.dao.burningman.accounting.BurningManAccountingService.EARLIEST_BLOCK_HEIGHT;

@Slf4j
@Getter
public class BurningManAccountingStore implements PersistableEnvelope {
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final LinkedList<AccountingBlock> blocks = new LinkedList<>();

public BurningManAccountingStore(List<AccountingBlock> blocks) {
this.blocks.addAll(blocks);
}

public void addIfNewBlock(AccountingBlock newBlock) throws BlockHeightNotConnectingException, BlockHashNotConnectingException {
Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
try {
tryToAddNewBlock(newBlock);
} finally {
writeLock.unlock();
}
}

public void forEachBlock(Consumer<AccountingBlock> consumer) {
Lock readLock = readWriteLock.readLock();
readLock.lock();
try {
blocks.forEach(consumer);
} finally {
readLock.unlock();
}
}

public void purgeLastTenBlocks() {
Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
try {
purgeLast10Blocks();
} finally {
writeLock.unlock();
}
}

public Optional<AccountingBlock> getLastBlock() {
Lock readLock = readWriteLock.readLock();
readLock.lock();
try {
return blocks.stream()
.max(Comparator.comparing(AccountingBlock::getHeight));
} finally {
readLock.unlock();
}
}

public Optional<AccountingBlock> getBlockAtHeight(int height) {
Lock readLock = readWriteLock.readLock();
try {
return blocks.stream()
.filter(block -> block.getHeight() == height)
.findAny();
} finally {
readLock.unlock();
}
}

public List<AccountingBlock> getBlocksAtLeastWithHeight(int minHeight) {
Lock readLock = readWriteLock.readLock();
readLock.lock();
try {
return blocks.stream()
.filter(block -> block.getHeight() >= minHeight)
.collect(Collectors.toList());
} finally {
readLock.unlock();
}
}

private void tryToAddNewBlock(AccountingBlock newBlock) throws BlockHeightNotConnectingException, BlockHashNotConnectingException {
if (!blocks.contains(newBlock)) {
Optional<AccountingBlock> optionalLastBlock = getLastBlock();
if (optionalLastBlock.isPresent()) {
AccountingBlock lastBlock = optionalLastBlock.get();
if (newBlock.getHeight() != lastBlock.getHeight() + 1) {
throw new BlockHeightNotConnectingException();
}
if (!Arrays.equals(newBlock.getTruncatedPreviousBlockHash(), lastBlock.getTruncatedHash())) {
throw new BlockHashNotConnectingException();
}
} else if (newBlock.getHeight() != EARLIEST_BLOCK_HEIGHT) {
throw new BlockHeightNotConnectingException();
}
log.info("Add new accountingBlock at height {} at {} with {} txs", newBlock.getHeight(),
new Date(newBlock.getDate()), newBlock.getTxs().size());
blocks.add(newBlock);
} else {
log.info("We have that block already. Height: {}", newBlock.getHeight());
}
}

private void purgeLast10Blocks() {
if (blocks.size() <= 10) {
blocks.clear();
return;
}

List<AccountingBlock> purged = new ArrayList<>(blocks.subList(0, blocks.size() - 10));
blocks.clear();
blocks.addAll(purged);
}

public Message toProtoMessage() {
return protobuf.PersistableEnvelope.newBuilder()
.setBurningManAccountingStore(protobuf.BurningManAccountingStore.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package bisq.core.dao.burningman.accounting.storage;

import bisq.core.dao.burningman.accounting.blockchain.AccountingBlock;
import bisq.core.dao.burningman.accounting.exceptions.BlockHashNotConnectingException;
import bisq.core.dao.burningman.accounting.exceptions.BlockHeightNotConnectingException;

import bisq.network.p2p.storage.persistence.ResourceDataStoreService;
import bisq.network.p2p.storage.persistence.StoreService;
Expand All @@ -32,8 +34,9 @@
import java.io.File;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -60,29 +63,31 @@ public void requestPersistence() {
persistenceManager.requestPersistence();
}

public List<AccountingBlock> getBlocks() {
return Collections.unmodifiableList(store.getBlocks());
public void addIfNewBlock(AccountingBlock block) throws BlockHashNotConnectingException, BlockHeightNotConnectingException {
store.addIfNewBlock(block);
requestPersistence();
}

public void addBlock(AccountingBlock block) {
store.getBlocks().add(block);
requestPersistence();
public void forEachBlock(Consumer<AccountingBlock> consumer) {
store.forEachBlock(consumer);
}

public void purgeLastTenBlocks() {
List<AccountingBlock> blocks = store.getBlocks();
if (blocks.size() <= 10) {
blocks.clear();
requestPersistence();
return;
}

List<AccountingBlock> purged = new ArrayList<>(blocks.subList(0, blocks.size() - 10));
blocks.clear();
blocks.addAll(purged);
store.purgeLastTenBlocks();
requestPersistence();
}

public Optional<AccountingBlock> getLastBlock() {
return store.getLastBlock();
}

public Optional<AccountingBlock> getBlockAtHeight(int height) {
return store.getBlockAtHeight(height);
}

public List<AccountingBlock> getBlocksAtLeastWithHeight(int minHeight) {
return store.getBlocksAtLeastWithHeight(minHeight);
}

///////////////////////////////////////////////////////////////////////////////////////////
// Protected
Expand Down

0 comments on commit e25c27e

Please sign in to comment.