Skip to content

Commit

Permalink
Fix BurningManAccountingStore data races
Browse files Browse the repository at this point in the history
Multiple threads read and write to the accounting blocks list causing
data races. Luckily, the LinkedList threw a ConcurrentModificationException
to limit damage. Now, a ReadWriteLock protects the LinkedList against
data races. Multiple threads can read the list at the same time but only
one thread can write to it. Other writing threads wait until it's their
turn.

Fixes bisq-network#6545
  • Loading branch information
alvasw committed Feb 3, 2023
1 parent 742d251 commit 04343bf
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -113,7 +113,9 @@ public void start() {
CompletableFuture.runAsync(() -> {
Map<String, BalanceModel> map = new HashMap<>();
// addAccountingBlockToBalanceModel takes about 500ms for 100k items, so we run it in a non UI thread.
getBlocks().forEach(block -> addAccountingBlockToBalanceModel(map, block));
getBlocks().forEach(optionalBlock ->
optionalBlock.ifPresent(block -> addAccountingBlockToBalanceModel(map, block))
);
UserThread.execute(() -> balanceModelByBurningManName.putAll(map));
});
}
Expand All @@ -125,7 +127,7 @@ public void start() {

public void onInitialBlockRequestsComplete() {
updateBalanceModelByAddress();
getBlocks().forEach(this::addAccountingBlockToBalanceModel);
getBlocks().forEach(optionalBlock -> optionalBlock.ifPresent(this::addAccountingBlockToBalanceModel));
}

public void onNewBlockReceived(AccountingBlock accountingBlock) {
Expand All @@ -134,7 +136,7 @@ public void onNewBlockReceived(AccountingBlock accountingBlock) {
}

public void addBlock(AccountingBlock block) throws BlockHashNotConnectingException, BlockHeightNotConnectingException {
if (!getBlocks().contains(block)) {
if (!burningManAccountingStoreService.containsAccountingBlock(block)) {
Optional<AccountingBlock> optionalLastBlock = getLastBlock();
if (optionalLastBlock.isPresent()) {
AccountingBlock lastBlock = optionalLastBlock.get();
Expand All @@ -160,11 +162,18 @@ public int getBlockHeightOfLastBlock() {
}

public Optional<AccountingBlock> getLastBlock() {
return getBlocks().stream().max(Comparator.comparing(AccountingBlock::getHeight));
return StreamSupport.stream(getBlocks().spliterator(), false)
.filter(Optional::isPresent)
.map(Optional::get)
.max(Comparator.comparing(AccountingBlock::getHeight));
}

public Optional<AccountingBlock> getBlockAtHeight(int height) {
return getBlocks().stream().filter(block -> block.getHeight() == height).findAny();
return StreamSupport.stream(getBlocks().spliterator(), false)
.filter(Optional::isPresent)
.map(Optional::get)
.filter(block -> block.getHeight() == height)
.findAny();
}

public Map<Date, Price> getAverageBsqPriceByMonth() {
Expand Down Expand Up @@ -213,8 +222,8 @@ private Stream<ReceivedBtcBalanceEntry> getReceivedBtcBalanceEntryStreamExcludin
// Delegates
///////////////////////////////////////////////////////////////////////////////////////////

public List<AccountingBlock> getBlocks() {
return burningManAccountingStoreService.getBlocks();
public Iterable<Optional<AccountingBlock>> getBlocks() {
return burningManAccountingStoreService.getAccountingBlockIterable();
}

public Map<String, String> getBurningManNameByAddress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@
import com.google.common.util.concurrent.SettableFuture;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -92,7 +94,9 @@ public interface Listener {

public void onGetBlocksRequest(GetAccountingBlocksRequest request, Connection connection) {
long ts = System.currentTimeMillis();
List<AccountingBlock> blocks = burningManAccountingService.getBlocks().stream()
List<AccountingBlock> blocks = StreamSupport.stream(burningManAccountingService.getBlocks().spliterator(), false)
.filter(Optional::isPresent)
.map(Optional::get)
.filter(block -> block.getHeight() >= request.getFromBlockHeight())
.collect(Collectors.toList());
byte[] signature = AccountingNode.getSignature(AccountingNode.getSha256Hash(blocks), bmOracleNodePrivKey);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.core.dao.burningman.accounting.storage;

import bisq.core.dao.burningman.accounting.blockchain.AccountingBlock;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;

public class AccountingBlockIterator implements Iterator<Optional<AccountingBlock>> {

private final Lock readLock;
private final LinkedList<AccountingBlock> blocks;
private AtomicInteger currentIndex = new AtomicInteger(0);

public AccountingBlockIterator(Lock readLock, LinkedList<AccountingBlock> blocks) {
this.readLock = readLock;
this.blocks = blocks;
}

@Override
public boolean hasNext() {
readLock.lock();
try {
return currentIndex.get() < blocks.size() - 1;
} finally {
readLock.unlock();
}
}

@Override
public Optional<AccountingBlock> next() {
readLock.lock();
try {
if (currentIndex.get() < blocks.size() - 1) {
int i = currentIndex.getAndIncrement();
AccountingBlock accountingBlock = blocks.get(i);
return Optional.of(accountingBlock);

} else {
return Optional.empty();
}
} finally {
readLock.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,75 @@

import com.google.protobuf.Message;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.jetbrains.annotations.NotNull;

@Slf4j
@Getter
public class BurningManAccountingStore implements PersistableEnvelope {
public class BurningManAccountingStore implements PersistableEnvelope, Iterable<Optional<AccountingBlock>> {
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final LinkedList<AccountingBlock> blocks = new LinkedList<>();

public BurningManAccountingStore(List<AccountingBlock> blocks) {
this.blocks.addAll(blocks);
}

public void addBlock(AccountingBlock accountingBlock) {
Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
try {
blocks.add(accountingBlock);
} finally {
writeLock.unlock();
}
}

public void purgeLastTenBlocks() {
Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
try {
purgeLast10Blocks();
} finally {
writeLock.unlock();
}
}

public boolean containsBlock(AccountingBlock block) {
Lock readLock = readWriteLock.readLock();
try {
return blocks.contains(block);
} finally {
readLock.unlock();
}
}

@NotNull
@Override
public Iterator<Optional<AccountingBlock>> iterator() {
Lock readLock = readWriteLock.readLock();
return new AccountingBlockIterator(readLock, blocks);
}

private void purgeLast10Blocks() {
if (blocks.size() <= 10) {
blocks.clear();
return;
}

List<AccountingBlock> purged = new ArrayList<>(blocks.subList(0, blocks.size() - 10));
blocks.clear();
blocks.addAll(purged);
}

public Message toProtoMessage() {
return protobuf.PersistableEnvelope.newBuilder()
.setBurningManAccountingStore(protobuf.BurningManAccountingStore.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
import java.io.File;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -60,29 +59,24 @@ public void requestPersistence() {
persistenceManager.requestPersistence();
}

public List<AccountingBlock> getBlocks() {
return Collections.unmodifiableList(store.getBlocks());
}

public void addBlock(AccountingBlock block) {
store.getBlocks().add(block);
store.addBlock(block);
requestPersistence();
}

public void purgeLastTenBlocks() {
List<AccountingBlock> blocks = store.getBlocks();
if (blocks.size() <= 10) {
blocks.clear();
requestPersistence();
return;
}

List<AccountingBlock> purged = new ArrayList<>(blocks.subList(0, blocks.size() - 10));
blocks.clear();
blocks.addAll(purged);
store.purgeLastTenBlocks();
requestPersistence();
}

public boolean containsAccountingBlock(AccountingBlock block) {
return store.containsBlock(block);
}

public Iterable<Optional<AccountingBlock>> getAccountingBlockIterable() {
return store;
}


///////////////////////////////////////////////////////////////////////////////////////////
// Protected
Expand Down

0 comments on commit 04343bf

Please sign in to comment.