Skip to content
This repository has been archived by the owner on Feb 9, 2019. It is now read-only.

Commit

Permalink
Merge pull request #426 from Brabantian/master
Browse files Browse the repository at this point in the history
final Pushlösung - thx Brababear
  • Loading branch information
rico666 authored Dec 8, 2018
2 parents 9db616e + 4e29310 commit 064605b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/brs/TransactionProcessorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ private int broadcastToPeers(boolean toAll) {

logger.info("Queueing up {} Peers for feeding", peersToSendTo.size());

for(Peer p: Peers.getAllActivePriorityPlusSomeExtraPeers()) {
for(Peer p: peersToSendTo) {
Peers.feedingTime(p, foodDispenser, doneFeedingLog);
}

Expand Down
29 changes: 17 additions & 12 deletions src/brs/peer/Peers.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static brs.props.Props.P2P_ENABLE_TX_REBROADCAST;
import static brs.props.Props.P2P_SEND_TO_LIMIT;
import static brs.util.JSON.prepareRequest;

import brs.*;
import brs.props.Props;
Expand Down Expand Up @@ -164,7 +165,7 @@ public static void init(TimeService timeService, AccountService accountService,
logger.debug("My peer info:\n" + json.toJSONString());
myPeerInfoResponse = JSON.prepare(json);
json.put("requestType", "getInfo");
myPeerInfoRequest = JSON.prepareRequest(json);
myPeerInfoRequest = prepareRequest(json);

if(propertyService.getBoolean(P2P_ENABLE_TX_REBROADCAST)) {
rebroadcastPeers = Collections
Expand Down Expand Up @@ -509,7 +510,7 @@ private void updateSavedPeers() {
{
JSONObject request = new JSONObject();
request.put("requestType", "getPeers");
getPeersRequest = JSON.prepareRequest(request);
getPeersRequest = prepareRequest(request);
}

private volatile boolean addedNewPeer;
Expand Down Expand Up @@ -575,7 +576,7 @@ public void run() {
JSONObject request = new JSONObject();
request.put("requestType", "addPeers");
request.put("peers", myPeers);
peer.send(JSON.prepareRequest(request));
peer.send(prepareRequest(request));
}

} catch (Exception e) {
Expand Down Expand Up @@ -752,7 +753,7 @@ public static void sendToSomePeers(Block block) {
request.put("requestType", "processBlock");

blocksSendingService.submit(() -> {
final JSONStreamAware jsonRequest = JSON.prepareRequest(request);
final JSONStreamAware jsonRequest = prepareRequest(request);

int successful = 0;
List<Future<JSONObject>> expectedResponses = new ArrayList<>();
Expand Down Expand Up @@ -789,7 +790,7 @@ public static void sendToSomePeers(Block block) {
static {
JSONObject request = new JSONObject();
request.put("requestType", "getUnconfirmedTransactions");
getUnconfirmedTransactionsRequest = JSON.prepareRequest(request);
getUnconfirmedTransactionsRequest = prepareRequest(request);
}

private static final ExecutorService utReceivingService = Executors.newCachedThreadPool();
Expand All @@ -814,26 +815,30 @@ public synchronized static void feedingTime(Peer peer, Function<Peer, List<Trans

private static void feedPeer(Peer peer, Function<Peer, List<Transaction>> foodDispenser, BiConsumer<Peer, List<Transaction>> doneFeedingLog) {
List<Transaction> transactionsToSend = foodDispenser.apply(peer);

if(! transactionsToSend.isEmpty()) {
logger.info("Feeding {} {} transactions", peer.getPeerAddress(), transactionsToSend.size());
peer.send(sendUnconfirmedTransactionsRequest(transactionsToSend));
JSONObject response = peer.send(sendUnconfirmedTransactionsRequest(transactionsToSend));

if(response != null && response.get("error") == null) {
doneFeedingLog.accept(peer, transactionsToSend);
} else {
logger.error("Error feeding {} transactions: {} error: {}", peer.getPeerAddress(), transactionsToSend.stream().map(t -> t.getId()), response);
}
} else {
logger.debug("No need to feed {}", peer.getPeerAddress());
logger.info("No need to feed {}", peer.getPeerAddress());
}

beingProcessed.remove(peer);

doneFeedingLog.accept(peer, transactionsToSend);

if(processingQueue.contains(peer)) {
processingQueue.remove(peer);
beingProcessed.add(peer);
feedPeer(peer, foodDispenser, doneFeedingLog);
}
}


private static JSONObject sendUnconfirmedTransactionsRequest(List<Transaction> transactions) {
private static JSONStreamAware sendUnconfirmedTransactionsRequest(List<Transaction> transactions) {
JSONObject request = new JSONObject();
JSONArray transactionsData = new JSONArray();

Expand All @@ -844,7 +849,7 @@ private static JSONObject sendUnconfirmedTransactionsRequest(List<Transaction> t
request.put("requestType", "processTransactions");
request.put("transactions", transactionsData);

return request;
return prepareRequest(request);
}

private static boolean peerEligibleForSending(Peer peer, boolean sendSameBRSclass) {
Expand Down

0 comments on commit 064605b

Please sign in to comment.