Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
testSortMetadata
Browse files Browse the repository at this point in the history
  • Loading branch information
smatthewenglish committed Jan 14, 2019
1 parent d7534b4 commit 73f4442
Show file tree
Hide file tree
Showing 3 changed files with 543 additions and 570 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public CompletableFuture<?> start() {
recursivePeerRefreshState =
new RecursivePeerRefreshState(
target, peerBlacklist, nodeWhitelist, this::dispatchPing, this::dispatchFindNeighbours);
recursivePeerRefreshState.kickstartBootstrapPeers(
recursivePeerRefreshState.addToOneTrueList(
bootstrapNodes.stream().filter(nodeWhitelist::contains).collect(Collectors.toList()));
initiateFindNeighboursTimeoutCounter();

Expand All @@ -187,11 +187,11 @@ public CompletableFuture<?> stop() {
* interval.
*/
private void initiateFindNeighboursTimeoutCounter() {
final int timeoutTaskDelay = 30000;
vertx.setPeriodic(
timeoutTaskDelay, handler -> recursivePeerRefreshState.neighboursTimeoutEvaluation());
vertx.setPeriodic(
timeoutTaskDelay, handler -> recursivePeerRefreshState.bondingTimeoutEvaluation());
// final int timeoutTaskDelay = 30000;
// vertx.setPeriodic(
// timeoutTaskDelay, handler -> recursivePeerRefreshState.neighboursTimeoutEvaluation());
// vertx.setPeriodic(
// timeoutTaskDelay, handler -> recursivePeerRefreshState.bondingTimeoutEvaluation());
}

/**
Expand Down Expand Up @@ -241,11 +241,11 @@ public void onMessage(final Packet packet, final DiscoveryPeer sender) {
case PONG:
peer.setStatus(DiscoveryPeerStatus.RECEIVED_PONG_FROM);
addToPeerTable(peer);
recursivePeerRefreshState.onPongPacketReceived(peer);
//recursivePeerRefreshState.onPongPacketReceived(peer);
notifyPeerBonded(peer, now);
break;
case NEIGHBORS:
recursivePeerRefreshState.onNeighboursPacketReceived(packet.getPacketData(NeighborsPacketData.class).orElse(null), peer);
//recursivePeerRefreshState.onNeighboursPacketReceived(packet.getPacketData(NeighborsPacketData.class).orElse(null), peer);
break;
case FIND_NEIGHBORS:
if (!peerKnown) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class RecursivePeerRefreshState {
private final PingDispatcher pingDispatcher;
private final FindNeighbourDispatcher findNeighbourDispatcher;

private final SortedMap<BytesValue, IterationParticipant> iterationParticipantMap;
private final SortedMap<BytesValue, MetadataPeer> oneTrueList;

RecursivePeerRefreshState(
final BytesValue target,
Expand All @@ -49,118 +49,19 @@ class RecursivePeerRefreshState {
this.pingDispatcher = bondingAgent;
this.findNeighbourDispatcher = neighborFinder;

this.iterationParticipantMap = new TreeMap<>();
this.oneTrueList = new TreeMap<>();
}

void kickstartBootstrapPeers(final List<DiscoveryPeer> bootstrapPeers) {
void addToOneTrueList(final List<DiscoveryPeer> bootstrapPeers) {
for (DiscoveryPeer bootstrapPeer : bootstrapPeers) {

final IterationParticipant iterationParticipant = new IterationParticipant(bootstrapPeer, distance(target, bootstrapPeer.getId()));
iterationParticipantMap.put(bootstrapPeer.getId(), iterationParticipant);
iterationParticipant.setBondQueried();

pingDispatcher.ping(bootstrapPeer);
}
}

private boolean bondQueryRoundTerminated() {

for (Map.Entry<BytesValue, IterationParticipant> entry : iterationParticipantMap.entrySet()) {

final IterationParticipant iterationParticipant = entry.getValue();

// If we've queried it and HAVEN'T gotten back a response...
if (iterationParticipant.getBondQueried() && !iterationParticipant.getBondResponded()) {

// It it HASN'T been timed out...
if (!iterationParticipant.getBondEvaluation()) {
return false;
}
}
final MetadataPeer iterationParticipant = new MetadataPeer(bootstrapPeer, distance(target, bootstrapPeer.getId()));
oneTrueList.put(bootstrapPeer.getId(), iterationParticipant);
}
return true;
}

void onPongPacketReceived(final DiscoveryPeer peer) {

final IterationParticipant iterationParticipant = iterationParticipantMap.get(peer.getId());
iterationParticipant.setBondResponded();

if (bondQueryRoundTerminated()) {
initiateNeighbourQueryRound();
}
}

private List<DiscoveryPeer> determineFindNodeCandidates(final int max, final SortedMap<BytesValue, IterationParticipant> source) {
int count = 0;
final List<DiscoveryPeer> target = new ArrayList<>();
for (Map.Entry<BytesValue, IterationParticipant> entry : source.entrySet()) {
if (count >= max) break;
target.add(entry.getValue().getPeer());
count++;
}
return target;
}

private void initiateNeighbourQueryRound() {
final int alpha = 3;
final List<DiscoveryPeer> alphaClosestPeers = determineFindNodeCandidates(alpha, iterationParticipantMap);

for (DiscoveryPeer peer : alphaClosestPeers) {
final IterationParticipant iterationParticipant = iterationParticipantMap.get(peer.getId());
iterationParticipant.setNeighbourQueried();
/* * */
findNeighbourDispatcher.findNeighbours(peer, target);
}
}

void neighboursTimeoutEvaluation() {
// What does this actually do? ...
}

/**
* TODO: Implement this...
*/
void bondingTimeoutEvaluation() {
}

private void executeFindNodeRequest(final DiscoveryPeer peer) {
findNeighbourDispatcher.findNeighbours(peer, target);
}

/**
* The lookup initiator starts by picking CONCURRENT_REQUEST_LIMIT closest nodes to the target it
* knows of. The initiator then issues concurrent FindNode packets to those nodes.
*/
private void initiatePeerRefreshCycle(final List<DiscoveryPeer> peers) {
for (DiscoveryPeer peer : peers) {
if (DiscoveryPeerStatus.Lifecycle.exhausted.contains(peer.getStatus())) {
executeFindNodeRequest(peer);
}
}
// The lookup terminates when the initiator has queried
// and gotten responses from the k closest nodes it has seen.
}

void onNeighboursPacketReceived(final NeighborsPacketData neighboursPacket, final DiscoveryPeer peer) {

if (peer.getStatus().equals(DiscoveryPeerStatus.DISPATCHED_FIND_NEIGHBOURS_TO)) {

final List<DiscoveryPeer> receivedPeerList = neighboursPacket.getNodes();

for (DiscoveryPeer receivedPeer : receivedPeerList) {
if (!DiscoveryPeerStatus.Lifecycle.bonded.contains(receivedPeer.getStatus())
&& !peerBlacklist.contains(receivedPeer)
&& peerWhitelist.contains(receivedPeer)) {

pingDispatcher.ping(receivedPeer);
}
}
peer.setStatus(DiscoveryPeerStatus.RECEIVED_NEIGHBOURS_FROM);
}
}

public class IterationParticipant implements Comparable<IterationParticipant> {
public class MetadataPeer implements Comparable<MetadataPeer> {
DiscoveryPeer peer;
Integer distance;

Expand All @@ -173,13 +74,13 @@ public class IterationParticipant implements Comparable<IterationParticipant> {
boolean neighbourResponded;

@Override
public int compareTo(final IterationParticipant o) {
public int compareTo(final MetadataPeer o) {
if (this.distance > o.distance)
return 1;
return -1;
}

public IterationParticipant(final DiscoveryPeer peer, final Integer distance) {
public MetadataPeer(final DiscoveryPeer peer, final Integer distance) {
this.peer = peer;
this.distance = distance;

Expand Down Expand Up @@ -228,6 +129,10 @@ void setNeighbourQueried() {
this.neighbourQueried = true;
}

boolean getNeighbourQueried() {
return neighbourQueried;
}

void setNeighbourResponded() {
this.neighbourResponded = true;
}
Expand All @@ -244,7 +149,7 @@ boolean getNeighbourEvaluation() {
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final IterationParticipant that = (IterationParticipant) o;
final MetadataPeer that = (MetadataPeer) o;
return Objects.equals(peer.getId(), that.peer.getId());
}

Expand Down
Loading

0 comments on commit 73f4442

Please sign in to comment.