Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle disconnection/reconnection of worker and help it recover interrupted replicates #234

Merged
merged 38 commits into from
Mar 19, 2019

Conversation

zguesmi
Copy link
Member

@zguesmi zguesmi commented Mar 15, 2019

  • Handle different cases when worker disconnects and tries to recover from it
  • Introduce ReplicateSupplyService that handles replicateDemand requests coming from workers
  • Introduce RECOVERING replicateStatus: every transition from this task is permitted, transitions to this task come only from "active" (not ..._failed) statuses.

@Ugo Ugo requested review from Ugo and jeremyjams March 18, 2019 08:38
Copy link

@Ugo Ugo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice work, the ReplicateSupplyService is very useful, however there are a few points I noticed in the code to check.

}
}

return null;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return an optional instead of returning a null.

Copy link
Member Author

@zguesmi zguesmi Mar 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be better yes

@@ -38,20 +40,20 @@ String computeAuthorizationHash(String workerWallet, String chainTaskId, String
return Numeric.toHexString(Hash.sha3(res));
}

public ContributionAuthorization createAuthorization(String workerWallet, String chainTaskId, boolean isTrustedExecution) {
public Optional<ContributionAuthorization> createAuthorization(String workerWallet, String chainTaskId, boolean isTrustedExecution) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we use an optional here? In which case will it be Optional.empty()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry I was lost in the refactoring

if (recoveryAction == null) continue;

// generate contribution authorization
Optional<ContributionAuthorization> authorization = signatureService.createAuthorization(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The optional here is not needed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

public RecoveryAction getAppropriateRecoveryAction(Task task, Replicate replicate, long blockNumber) {
ChainReceipt chainReceipt = new ChainReceipt(blockNumber, "");
Copy link

@Ugo Ugo Mar 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The chainReceipt object is not needed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is actually, because in the case where ReplicateStatus is CONTRIBUTING and the worker contributes onchain without notifying the core should update the status to CONTRIBUTED and tell the worker to WAIT or REVEAL. So when updating the status to CONTRIBUTED we check if it's correct onchain, thus, we need the chainReceipt with the blocknumber.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I can see it is used in methods recoverReplicateInContributionPhase and recoverReplicateInRevealPhase but we don't modify anything to this ChainReceipt. We better pass the blocknumber to those methods and create a new ChainReceipt(blockNumber, "") at the lowest level inside those two methods.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true yeah, I see your point

return RecoveryAction.REVEAL;
}

return null;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's return an optional rather than a null value. This is valid for all recoverReplicate* methods.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with pleasure

return RecoveryAction.ABORT_CONTRIBUTION_TIMEOUT;
}

if (task.getCurrentStatus().equals(TaskStatus.CONSENSUS_REACHED)) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those two ifs can be combined into one.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it would be more readable, but it's okey let's change it

Optional<Task> oTask = taskService.getTaskByChainTaskId(replicate.getChainTaskId());
if (!oTask.isPresent()) return null;

return replicate.containsRevealedStatus()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice one but to difficult to read, can we simplify a bit? :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

of course, also done!


executorMap.putIfAbsent(chainTaskId, ThreadPoolExecutorUtils.singleThreadExecutorWithFixedSizeQueue(1));

Executor executor = executorMap.get(chainTaskId);
executor.execute(() -> taskService.tryUpgradeTaskStatus(chainTaskId));

return CompletableFuture.supplyAsync(() -> taskService.tryUpgradeTaskStatus(chainTaskId), executor)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to return a completableFuture here? It seems that returning nothing is good enough no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because in some cases I needed to wait for the execution to end.
Exp: ReplicateSupplyService > recoverReplicateInRevealPhase > L259

@@ -39,6 +40,27 @@ public ReplicateStatus getCurrentStatus() {
return this.getLatestStatusChange().getStatus();
}

@JsonIgnore
public ReplicateStatus getLastRelevantStatus() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this method is very dangerous :/

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we need it to find out in what status the replicate was before trying to RECOVER (one or more times) and before being lost.
For instance: this is a typical exp: CONTRIBUTING > WORKER_LOST > RECOVERING > RECOVERING

Copy link
Member

@jeremyjams jeremyjams left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bim

Copy link

@Ugo Ugo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good for me. Thanks a lot!

@zguesmi zguesmi merged commit c6c16e2 into master Mar 19, 2019
@zguesmi zguesmi deleted the reconnect branch March 19, 2019 13:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants