Skip to content
This repository has been archived by the owner on Feb 9, 2019. It is now read-only.

Better logging + small improvements #425

Merged
merged 3 commits into from
Dec 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/brs/BlockchainProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ enum Event {

int getMinRollbackHeight();

void processPeerBlock(JSONObject request) throws BurstException;
void processPeerBlock(JSONObject request, Peer peer) throws BurstException;

void fullReset();

Expand Down
13 changes: 4 additions & 9 deletions src/brs/BlockchainProcessorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import brs.statistics.StatisticsManagerImpl;
import brs.services.AccountService;
import brs.transactionduplicates.TransactionDuplicatesCheckerImpl;
import brs.transactionduplicates.TransactionDuplicationResult;
import brs.unconfirmedtransactions.UnconfirmedTransactionStore;
import brs.util.ThreadPool;

Expand All @@ -32,11 +31,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
Expand Down Expand Up @@ -744,10 +740,10 @@ public int getMinRollbackHeight() {
}

@Override
public void processPeerBlock(JSONObject request) throws BurstException {
public void processPeerBlock(JSONObject request, Peer peer) throws BurstException {
Block newBlock = Block.parseBlock(request, blockchain.getHeight());
if (newBlock == null) {
logger.debug("Peer has announced an unprocessable block.");
logger.debug("Peer {} has announced an unprocessable block.", peer.getPeerAddress());
return;
}
/*
Expand All @@ -760,10 +756,9 @@ public void processPeerBlock(JSONObject request) throws BurstException {
newBlock.setByteLength(newBlock.toString().length());
blockService.calculateBaseTarget(newBlock, chainblock);
downloadCache.addBlock(newBlock);
logger.debug("Added from Anounce: Id: " +newBlock.getId()+" Height: "+newBlock.getHeight());
logger.debug("Peer {} added block from Announce: Id: {} Height: {}", peer.getPeerAddress(), newBlock.getId(), newBlock.getHeight());
} else {
logger.debug("Peer sent us block: " + newBlock.getPreviousBlockId()
+ " that does not match our chain.");
logger.debug("Peer {} sent us block: {} which is not the follow-up block for {}", peer.getPeerAddress(), newBlock.getPreviousBlockId(), chainblock.getId());
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/brs/TransactionProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ enum Event {

void clearUnconfirmedTransactions();

void broadcast(Transaction transaction) throws BurstException.ValidationException;
Integer broadcast(Transaction transaction) throws BurstException.ValidationException;

void processPeerTransactions(JSONObject request, Peer peer) throws BurstException.ValidationException;

Expand Down
39 changes: 16 additions & 23 deletions src/brs/TransactionProcessorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
Expand Down Expand Up @@ -84,11 +85,12 @@ public TransactionProcessorImpl(PropertyService propertyService,

JSONArray transactionsData = (JSONArray) response.get(UNCONFIRMED_TRANSACTIONS_RESPONSE);

if (transactionsData == null || transactionsData.isEmpty()) {
if (transactionsData == null) {
return;
}
try {
List<Transaction> addedTransactions = processPeerTransactions(transactionsData, peer);
Peers.feedingTime(peer, foodDispenser, doneFeedingLog);

if(! addedTransactions.isEmpty()) {
List<Peer> activePrioPlusExtra = Peers.getAllActivePriorityPlusSomeExtraPeers();
Expand Down Expand Up @@ -180,40 +182,25 @@ public Transaction.Builder newTransactionBuilder(byte[] senderPublicKey, long am
}

@Override
public void broadcast(Transaction transaction) throws BurstException.ValidationException {
public Integer broadcast(Transaction transaction) throws BurstException.ValidationException {
if (! transaction.verifySignature()) {
throw new BurstException.NotValidException("Transaction signature verification failed");
}
List<Transaction> processedTransactions;
if (dbs.getTransactionDb().hasTransaction(transaction.getId())) {
logger.info("Transaction " + transaction.getStringId() + " already in blockchain, will not broadcast again");
return;
return null;
}

if (unconfirmedTransactionStore.exists(transaction.getId())) {
/*
if (enableTransactionRebroadcasting) {
nonBroadcastedTransactions.add(transaction);
logger.info("Transaction " + transaction.getStringId() + " already in unconfirmed pool, will re-broadcast");
} else {*/
logger.info("Transaction " + transaction.getStringId() + " already in unconfirmed pool, will not broadcast again");
///}
return;
logger.info("Transaction " + transaction.getStringId() + " already in unconfirmed pool, will not broadcast again");
return null;
}

processedTransactions = processTransactions(Collections.singleton(transaction), null);

if(! processedTransactions.isEmpty()) {
broadcastToPeers();
}

if (processedTransactions.contains(transaction)) {
/*
if (enableTransactionRebroadcasting) {
nonBroadcastedTransactions.add(transaction);
}
*/
logger.debug("Accepted new transaction " + transaction.getStringId());
return broadcastToPeers(true);
} else {
logger.debug("Could not accept new transaction " + transaction.getStringId());
throw new BurstException.NotValidException("Invalid transaction " + transaction.getStringId());
Expand All @@ -226,7 +213,7 @@ public void processPeerTransactions(JSONObject request, Peer peer) throws BurstE
List<Transaction> processedTransactions = processPeerTransactions(transactionsData, peer);

if(! processedTransactions.isEmpty()) {
broadcastToPeers();
broadcastToPeers(false);
}
}

Expand Down Expand Up @@ -373,10 +360,16 @@ private List<Transaction> processTransactions(Collection<Transaction> transactio
}
}

private void broadcastToPeers() {
private int broadcastToPeers(boolean toAll) {
List<? extends Peer> peersToSendTo = toAll ? Peers.getActivePeers().stream().limit(100).collect(Collectors.toList()) : Peers.getAllActivePriorityPlusSomeExtraPeers();

logger.info("Queueing up {} Peers for feeding", peersToSendTo.size());

for(Peer p: Peers.getAllActivePriorityPlusSomeExtraPeers()) {
Peers.feedingTime(p, foodDispenser, doneFeedingLog);
}

return peersToSendTo.size();
}

public void revalidateUnconfirmedTransactions() {
Expand Down
4 changes: 2 additions & 2 deletions src/brs/http/APITransactionManagerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static brs.http.common.ResultFields.BROADCASTED_RESPONSE;
import static brs.http.common.ResultFields.ERROR_RESPONSE;
import static brs.http.common.ResultFields.FULL_HASH_RESPONSE;
import static brs.http.common.ResultFields.NUMBER_PEERS_SENT_TO_RESPONSE;
import static brs.http.common.ResultFields.SIGNATURE_HASH_RESPONSE;
import static brs.http.common.ResultFields.TRANSACTION_BYTES_RESPONSE;
import static brs.http.common.ResultFields.TRANSACTION_JSON_RESPONSE;
Expand All @@ -38,7 +39,6 @@
import brs.Blockchain;
import brs.Burst;
import brs.BurstException;
import brs.Constants;
import brs.Transaction;
import brs.Transaction.Builder;
import brs.TransactionProcessor;
Expand Down Expand Up @@ -187,7 +187,7 @@ public JSONStreamAware createTransaction(HttpServletRequest req, Account senderA
response.put(TRANSACTION_BYTES_RESPONSE, Convert.toHexString(transaction.getBytes()));
response.put(SIGNATURE_HASH_RESPONSE, Convert.toHexString(Crypto.sha256().digest(transaction.getSignature())));
if (broadcast) {
transactionProcessor.broadcast(transaction);
response.put(NUMBER_PEERS_SENT_TO_RESPONSE, transactionProcessor.broadcast(transaction));
response.put(BROADCASTED_RESPONSE, true);
} else {
response.put(BROADCASTED_RESPONSE, false);
Expand Down
3 changes: 2 additions & 1 deletion src/brs/http/BroadcastTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static brs.http.common.ResultFields.ERROR_DESCRIPTION_RESPONSE;
import static brs.http.common.ResultFields.ERROR_RESPONSE;
import static brs.http.common.ResultFields.FULL_HASH_RESPONSE;
import static brs.http.common.ResultFields.NUMBER_PEERS_SENT_TO_RESPONSE;
import static brs.http.common.ResultFields.TRANSACTION_RESPONSE;

import brs.BurstException;
Expand Down Expand Up @@ -45,7 +46,7 @@ JSONStreamAware processRequest(HttpServletRequest req) throws BurstException {
JSONObject response = new JSONObject();
try {
transactionService.validate(transaction);
transactionProcessor.broadcast(transaction);
response.put(NUMBER_PEERS_SENT_TO_RESPONSE, transactionProcessor.broadcast(transaction));
response.put(TRANSACTION_RESPONSE, transaction.getStringId());
response.put(FULL_HASH_RESPONSE, transaction.getFullHash());
} catch (BurstException.ValidationException | RuntimeException e) {
Expand Down
1 change: 1 addition & 0 deletions src/brs/http/common/ResultFields.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class ResultFields {
public static final String DONE_RESPONSE = "done";
public static final String SCAN_TIME_RESPONSE = "scanTime";
public static final String BROADCASTED_RESPONSE = "broadcasted";
public static final String NUMBER_PEERS_SENT_TO_RESPONSE = "numberPeersSentTo";
public static final String UNSIGNED_TRANSACTION_BYTES_RESPONSE = "unsignedTransactionBytes";
public static final String TRANSACTION_JSON_RESPONSE = "transactionJSON";
public static final String TRANSACTION_BYTES_RESPONSE = "transactionBytes";
Expand Down
4 changes: 2 additions & 2 deletions src/brs/peer/Peers.java
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ public static Collection<? extends Peer> getAllPeers() {
return allPeers;
}

public static Collection<? extends Peer> getActivePeers() {
public static List<? extends Peer> getActivePeers() {
List<PeerImpl> activePeers = new ArrayList<>();
for (PeerImpl peer : peers.values()) {
if (peer.getState() != Peer.State.NON_CONNECTED) {
Expand Down Expand Up @@ -815,7 +815,7 @@ public synchronized static void feedingTime(Peer peer, Function<Peer, List<Trans
private static void feedPeer(Peer peer, Function<Peer, List<Transaction>> foodDispenser, BiConsumer<Peer, List<Transaction>> doneFeedingLog) {
List<Transaction> transactionsToSend = foodDispenser.apply(peer);
if(! transactionsToSend.isEmpty()) {
logger.debug("Feeding {} {} transactions", peer.getPeerAddress(), transactionsToSend.size());
logger.info("Feeding {} {} transactions", peer.getPeerAddress(), transactionsToSend.size());
peer.send(sendUnconfirmedTransactionsRequest(transactionsToSend));
} else {
logger.debug("No need to feed {}", peer.getPeerAddress());
Expand Down
2 changes: 1 addition & 1 deletion src/brs/peer/ProcessBlock.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public JSONStreamAware processRequest(JSONObject request, Peer peer) {
// when loading blockchain from scratch
return NOT_ACCEPTED;
}
blockchainProcessor.processPeerBlock(request);
blockchainProcessor.processPeerBlock(request, peer);
return ACCEPTED;

} catch (BurstException|RuntimeException e) {
Expand Down
4 changes: 2 additions & 2 deletions src/brs/unconfirmedtransactions/ReservedBalanceCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ void reserveBalanceAndPut(Transaction transaction) throws BurstException.Validat
);

if (senderAccount == null) {
LOGGER.debug(String.format("Transaction %d: Account %d does not exist and has no balance. Required funds: %d", transaction.getId(), transaction.getSenderId(), amountNQT));
LOGGER.info(String.format("Transaction %d: Account %d does not exist and has no balance. Required funds: %d", transaction.getId(), transaction.getSenderId(), amountNQT));

throw new BurstException.NotCurrentlyValidException("Account unknown");
} else if ( amountNQT > senderAccount.getUnconfirmedBalanceNQT() ) {
LOGGER.debug(String.format("Transaction %d: Account %d balance too low. You have %d > %d Balance",
LOGGER.info(String.format("Transaction %d: Account %d balance too low. You have %d > %d Balance",
transaction.getId(), transaction.getSenderId(), amountNQT, senderAccount.getUnconfirmedBalanceNQT()
));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,15 @@ public interface UnconfirmedTransactionStore {

List<Transaction> getAllFor(Peer peer);

///TimedUnconfirmedTransactionOverview getAllSince(long timestampInMillis, long maxAmount);

void forEach(Consumer<Transaction> consumer);

void remove(Transaction transaction);

void clear();


/**
* Review which transactions are still eligible to stay
* @return The list of removed transactions
*/
List<Transaction> resetAccountBalances();
void resetAccountBalances();

void markFingerPrintsOf(Peer peer, List<Transaction> transactions);

Expand Down
Loading