diff --git a/core/src/main/java/bisq/core/dao/burningman/accounting/BurningManAccountingService.java b/core/src/main/java/bisq/core/dao/burningman/accounting/BurningManAccountingService.java index 64331746bf4..7748aae4141 100644 --- a/core/src/main/java/bisq/core/dao/burningman/accounting/BurningManAccountingService.java +++ b/core/src/main/java/bisq/core/dao/burningman/accounting/BurningManAccountingService.java @@ -45,7 +45,6 @@ import java.util.Arrays; import java.util.Calendar; -import java.util.Comparator; import java.util.Date; import java.util.GregorianCalendar; import java.util.HashMap; @@ -113,7 +112,7 @@ public void start() { CompletableFuture.runAsync(() -> { Map map = new HashMap<>(); // addAccountingBlockToBalanceModel takes about 500ms for 100k items, so we run it in a non UI thread. - getBlocks().forEach(block -> addAccountingBlockToBalanceModel(map, block)); + burningManAccountingStoreService.forEachBlock(block -> addAccountingBlockToBalanceModel(map, block)); UserThread.execute(() -> balanceModelByBurningManName.putAll(map)); }); } @@ -125,7 +124,7 @@ public void start() { public void onInitialBlockRequestsComplete() { updateBalanceModelByAddress(); - getBlocks().forEach(this::addAccountingBlockToBalanceModel); + burningManAccountingStoreService.forEachBlock(this::addAccountingBlockToBalanceModel); } public void onNewBlockReceived(AccountingBlock accountingBlock) { @@ -134,25 +133,7 @@ public void onNewBlockReceived(AccountingBlock accountingBlock) { } public void addBlock(AccountingBlock block) throws BlockHashNotConnectingException, BlockHeightNotConnectingException { - if (!getBlocks().contains(block)) { - Optional optionalLastBlock = getLastBlock(); - if (optionalLastBlock.isPresent()) { - AccountingBlock lastBlock = optionalLastBlock.get(); - if (block.getHeight() != lastBlock.getHeight() + 1) { - throw new BlockHeightNotConnectingException(); - } - if (!Arrays.equals(block.getTruncatedPreviousBlockHash(), lastBlock.getTruncatedHash())) { - throw new BlockHashNotConnectingException(); - } - } else if (block.getHeight() != EARLIEST_BLOCK_HEIGHT) { - throw new BlockHeightNotConnectingException(); - } - log.info("Add new accountingBlock at height {} at {} with {} txs", block.getHeight(), - new Date(block.getDate()), block.getTxs().size()); - burningManAccountingStoreService.addBlock(block); - } else { - log.info("We have that block already. Height: {}", block.getHeight()); - } + burningManAccountingStoreService.addIfNewBlock(block); } public int getBlockHeightOfLastBlock() { @@ -160,11 +141,11 @@ public int getBlockHeightOfLastBlock() { } public Optional getLastBlock() { - return getBlocks().stream().max(Comparator.comparing(AccountingBlock::getHeight)); + return burningManAccountingStoreService.getLastBlock(); } public Optional getBlockAtHeight(int height) { - return getBlocks().stream().filter(block -> block.getHeight() == height).findAny(); + return burningManAccountingStoreService.getBlockAtHeight(height); } public Map getAverageBsqPriceByMonth() { @@ -213,8 +194,8 @@ private Stream getReceivedBtcBalanceEntryStreamExcludin // Delegates /////////////////////////////////////////////////////////////////////////////////////////// - public List getBlocks() { - return burningManAccountingStoreService.getBlocks(); + public List getBlocksAtLeastWithHeight(int minHeight) { + return burningManAccountingStoreService.getBlocksAtLeastWithHeight(minHeight); } public Map getBurningManNameByAddress() { diff --git a/core/src/main/java/bisq/core/dao/burningman/accounting/node/full/network/GetAccountingBlocksRequestHandler.java b/core/src/main/java/bisq/core/dao/burningman/accounting/node/full/network/GetAccountingBlocksRequestHandler.java index 6e1148a4b99..b581e4533dd 100644 --- a/core/src/main/java/bisq/core/dao/burningman/accounting/node/full/network/GetAccountingBlocksRequestHandler.java +++ b/core/src/main/java/bisq/core/dao/burningman/accounting/node/full/network/GetAccountingBlocksRequestHandler.java @@ -39,7 +39,6 @@ import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -92,9 +91,7 @@ public interface Listener { public void onGetBlocksRequest(GetAccountingBlocksRequest request, Connection connection) { long ts = System.currentTimeMillis(); - List blocks = burningManAccountingService.getBlocks().stream() - .filter(block -> block.getHeight() >= request.getFromBlockHeight()) - .collect(Collectors.toList()); + List blocks = burningManAccountingService.getBlocksAtLeastWithHeight(request.getFromBlockHeight()); byte[] signature = AccountingNode.getSignature(AccountingNode.getSha256Hash(blocks), bmOracleNodePrivKey); GetAccountingBlocksResponse getBlocksResponse = new GetAccountingBlocksResponse(blocks, request.getNonce(), bmOracleNodePubKey, signature); log.info("Received GetAccountingBlocksRequest from {} for blocks from height {}. " + diff --git a/core/src/main/java/bisq/core/dao/burningman/accounting/storage/BurningManAccountingStore.java b/core/src/main/java/bisq/core/dao/burningman/accounting/storage/BurningManAccountingStore.java index cd31b12db9c..26596bf886f 100644 --- a/core/src/main/java/bisq/core/dao/burningman/accounting/storage/BurningManAccountingStore.java +++ b/core/src/main/java/bisq/core/dao/burningman/accounting/storage/BurningManAccountingStore.java @@ -19,27 +19,135 @@ import bisq.core.dao.burningman.accounting.blockchain.AccountingBlock; +import bisq.core.dao.burningman.accounting.exceptions.BlockHashNotConnectingException; +import bisq.core.dao.burningman.accounting.exceptions.BlockHeightNotConnectingException; import bisq.common.proto.persistable.PersistableEnvelope; import com.google.protobuf.Message; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Date; import java.util.LinkedList; import java.util.List; +import java.util.Optional; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; import java.util.stream.Collectors; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import static bisq.core.dao.burningman.accounting.BurningManAccountingService.EARLIEST_BLOCK_HEIGHT; + @Slf4j -@Getter public class BurningManAccountingStore implements PersistableEnvelope { + private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private final LinkedList blocks = new LinkedList<>(); public BurningManAccountingStore(List blocks) { this.blocks.addAll(blocks); } + public void addIfNewBlock(AccountingBlock newBlock) throws BlockHeightNotConnectingException, BlockHashNotConnectingException { + Lock writeLock = readWriteLock.writeLock(); + writeLock.lock(); + try { + tryToAddNewBlock(newBlock); + } finally { + writeLock.unlock(); + } + } + + public void forEachBlock(Consumer consumer) { + Lock readLock = readWriteLock.readLock(); + readLock.lock(); + try { + blocks.forEach(consumer); + } finally { + readLock.unlock(); + } + } + + public void purgeLastTenBlocks() { + Lock writeLock = readWriteLock.writeLock(); + writeLock.lock(); + try { + purgeLast10Blocks(); + } finally { + writeLock.unlock(); + } + } + + public Optional getLastBlock() { + Lock readLock = readWriteLock.readLock(); + readLock.lock(); + try { + return blocks.stream() + .max(Comparator.comparing(AccountingBlock::getHeight)); + } finally { + readLock.unlock(); + } + } + + public Optional getBlockAtHeight(int height) { + Lock readLock = readWriteLock.readLock(); + try { + return blocks.stream() + .filter(block -> block.getHeight() == height) + .findAny(); + } finally { + readLock.unlock(); + } + } + + public List getBlocksAtLeastWithHeight(int minHeight) { + Lock readLock = readWriteLock.readLock(); + readLock.lock(); + try { + return blocks.stream() + .filter(block -> block.getHeight() >= minHeight) + .collect(Collectors.toList()); + } finally { + readLock.unlock(); + } + } + + private void tryToAddNewBlock(AccountingBlock newBlock) throws BlockHeightNotConnectingException, BlockHashNotConnectingException { + if (!blocks.contains(newBlock)) { + Optional optionalLastBlock = getLastBlock(); + if (optionalLastBlock.isPresent()) { + AccountingBlock lastBlock = optionalLastBlock.get(); + if (newBlock.getHeight() != lastBlock.getHeight() + 1) { + throw new BlockHeightNotConnectingException(); + } + if (!Arrays.equals(newBlock.getTruncatedPreviousBlockHash(), lastBlock.getTruncatedHash())) { + throw new BlockHashNotConnectingException(); + } + } else if (newBlock.getHeight() != EARLIEST_BLOCK_HEIGHT) { + throw new BlockHeightNotConnectingException(); + } + log.info("Add new accountingBlock at height {} at {} with {} txs", newBlock.getHeight(), + new Date(newBlock.getDate()), newBlock.getTxs().size()); + blocks.add(newBlock); + } else { + log.info("We have that block already. Height: {}", newBlock.getHeight()); + } + } + + private void purgeLast10Blocks() { + if (blocks.size() <= 10) { + blocks.clear(); + return; + } + + List purged = new ArrayList<>(blocks.subList(0, blocks.size() - 10)); + blocks.clear(); + blocks.addAll(purged); + } + public Message toProtoMessage() { return protobuf.PersistableEnvelope.newBuilder() .setBurningManAccountingStore(protobuf.BurningManAccountingStore.newBuilder() diff --git a/core/src/main/java/bisq/core/dao/burningman/accounting/storage/BurningManAccountingStoreService.java b/core/src/main/java/bisq/core/dao/burningman/accounting/storage/BurningManAccountingStoreService.java index 79a58fb07c3..620cf42722d 100644 --- a/core/src/main/java/bisq/core/dao/burningman/accounting/storage/BurningManAccountingStoreService.java +++ b/core/src/main/java/bisq/core/dao/burningman/accounting/storage/BurningManAccountingStoreService.java @@ -18,6 +18,8 @@ package bisq.core.dao.burningman.accounting.storage; import bisq.core.dao.burningman.accounting.blockchain.AccountingBlock; +import bisq.core.dao.burningman.accounting.exceptions.BlockHashNotConnectingException; +import bisq.core.dao.burningman.accounting.exceptions.BlockHeightNotConnectingException; import bisq.network.p2p.storage.persistence.ResourceDataStoreService; import bisq.network.p2p.storage.persistence.StoreService; @@ -32,8 +34,9 @@ import java.io.File; import java.util.ArrayList; -import java.util.Collections; import java.util.List; +import java.util.Optional; +import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; @@ -60,29 +63,31 @@ public void requestPersistence() { persistenceManager.requestPersistence(); } - public List getBlocks() { - return Collections.unmodifiableList(store.getBlocks()); + public void addIfNewBlock(AccountingBlock block) throws BlockHashNotConnectingException, BlockHeightNotConnectingException { + store.addIfNewBlock(block); + requestPersistence(); } - public void addBlock(AccountingBlock block) { - store.getBlocks().add(block); - requestPersistence(); + public void forEachBlock(Consumer consumer) { + store.forEachBlock(consumer); } public void purgeLastTenBlocks() { - List blocks = store.getBlocks(); - if (blocks.size() <= 10) { - blocks.clear(); - requestPersistence(); - return; - } - - List purged = new ArrayList<>(blocks.subList(0, blocks.size() - 10)); - blocks.clear(); - blocks.addAll(purged); + store.purgeLastTenBlocks(); requestPersistence(); } + public Optional getLastBlock() { + return store.getLastBlock(); + } + + public Optional getBlockAtHeight(int height) { + return store.getBlockAtHeight(height); + } + + public List getBlocksAtLeastWithHeight(int minHeight) { + return store.getBlocksAtLeastWithHeight(minHeight); + } /////////////////////////////////////////////////////////////////////////////////////////// // Protected