Skip to content

Commit

Permalink
FAB-5632 Implement Network Config
Browse files Browse the repository at this point in the history
Change-Id: I49617d84bea3b5d5654aa6f7abc5ae880b8d8d56
Signed-off-by: Chris Murphy <chrism@fast.au.fujitsu.com>
  • Loading branch information
chrism28282828 committed Jan 5, 2018
1 parent bf94912 commit c4957fd
Show file tree
Hide file tree
Showing 17 changed files with 3,306 additions and 137 deletions.
132 changes: 122 additions & 10 deletions src/main/java/org/hyperledger/fabric/sdk/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
Expand All @@ -35,7 +36,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -87,6 +87,7 @@
import org.hyperledger.fabric.protos.peer.Query.ChaincodeQueryResponse;
import org.hyperledger.fabric.protos.peer.Query.ChannelQueryResponse;
import org.hyperledger.fabric.sdk.BlockEvent.TransactionEvent;
import org.hyperledger.fabric.sdk.Peer.PeerRole;
import org.hyperledger.fabric.sdk.exception.CryptoException;
import org.hyperledger.fabric.sdk.exception.EventHubException;
import org.hyperledger.fabric.sdk.exception.InvalidArgumentException;
Expand Down Expand Up @@ -138,7 +139,18 @@ public class Channel implements Serializable {
private final String name;

// The peers on this channel to which the client can connect
final Collection<Peer> peers = new Vector<>();
private final Collection<Peer> peers = Collections.synchronizedSet(new HashSet<>());
// final Set<Peer> eventingPeers = Collections.synchronizedSet(new HashSet<>());

private final Map<PeerRole, Set<Peer>> peerRoleSetMap = Collections.synchronizedMap(new HashMap<>());

{
for (Peer.PeerRole peerRole : PeerRole.ALL) {

peerRoleSetMap.put(peerRole, Collections.synchronizedSet(new HashSet<>()));

}
}

// Temporary variables to control how long to wait for deploy and invoke to complete before
// emitting events. This will be removed when the SDK is able to receive events from the
Expand Down Expand Up @@ -507,6 +519,66 @@ public String getName() {
*/
public Channel addPeer(Peer peer) throws InvalidArgumentException {

return addPeer(peer, PeerOptions.create());

}

/**
* Options for the peer.
* <p>
* Note: This code pasted from: https://gerrit.hyperledger.org/r/#/c/13895/ - WIP FAB-6066 Channelservice for events
*/
public static class PeerOptions { // allows for future options with less likelihood of breaking api.

private EnumSet<PeerRole> peerRoles;
private String blockType = "Filter"; // not yet used.

private PeerOptions() {

}

EnumSet<PeerRole> getPeerRoles() {
if (peerRoles == null) {
return PeerRole.ALL;
}
return peerRoles;
}

String getBlockType() {
return blockType;
}

public PeerOptions setPeerRoles(EnumSet<PeerRole> peerRoles) {
this.peerRoles = peerRoles;
return this;
}

public PeerOptions addPeerRole(PeerRole peerRole) {

if (peerRoles == null) {
peerRoles = EnumSet.noneOf(PeerRole.class);

}
peerRoles.add(peerRole);
return this;
}

public static PeerOptions create() {
return new PeerOptions();
}

}

/**
* Add a peer to the channel
*
* @param peer The Peer to add.
* @param peerOptions see {@link PeerRole}
* @return Channel The current channel added.
* @throws InvalidArgumentException
*/
public Channel addPeer(Peer peer, PeerOptions peerOptions) throws InvalidArgumentException {

if (shutdown) {
throw new InvalidArgumentException(format("Channel %s has been shutdown.", name));
}
Expand All @@ -515,14 +587,32 @@ public Channel addPeer(Peer peer) throws InvalidArgumentException {
throw new InvalidArgumentException("Peer is invalid can not be null.");
}

if (peer.getChannel() != null && peer.getChannel() != this) {
throw new InvalidArgumentException(format("Peer already connected to channel %s", peer.getChannel().getName()));
}

if (null == peerOptions) {
throw new InvalidArgumentException("Peer is invalid can not be null.");
}

peer.setChannel(this);

peers.add(peer);

for (Map.Entry<PeerRole, Set<Peer>> peerRole : peerRoleSetMap.entrySet()) {
if (peerOptions.getPeerRoles().contains(peerRole.getKey())) {
peerRole.getValue().add(peer);
}
}
return this;
}


public Channel joinPeer(Peer peer) throws ProposalException {
return joinPeer(peer, PeerOptions.create());
}

public Channel joinPeer(Peer peer, PeerOptions peerOptions) throws ProposalException {

logger.debug(format("Channel %s joining peer %s, url: %s", name, peer.getName(), peer.getUrl()));

Expand Down Expand Up @@ -558,7 +648,7 @@ public Channel joinPeer(Peer peer) throws ProposalException {
SignedProposal signedProposal = getSignedProposal(transactionContext, joinProposal);
logger.debug("Got signed proposal.");

addPeer(peer); //need to add peer.
addPeer(peer, peerOptions); //need to add peer.

Collection<ProposalResponse> resp = sendProposalToPeers(new ArrayList<>(Collections.singletonList(peer)),
signedProposal, transactionContext);
Expand All @@ -568,27 +658,33 @@ public Channel joinPeer(Peer peer) throws ProposalException {
if (pro.getStatus() == ProposalResponse.Status.SUCCESS) {
logger.info(format("Peer %s joined into channel %s", peer.getName(), name));
} else {
peers.remove(peer);
peer.unsetChannel();
removePeer(peer);
throw new ProposalException(format("Join peer to channel %s failed. Status %s, details: %s",
name, pro.getStatus().toString(), pro.getMessage()));

}
} catch (ProposalException e) {
peers.remove(peer);
peer.unsetChannel();
removePeer(peer);
logger.error(e);
throw e;
} catch (Exception e) {
peers.remove(peer);
peer.unsetChannel();
logger.error(e);
throw new ProposalException(e.getMessage(), e);
}

return this;
}

private void removePeer(Peer peer) {
peers.remove(peer);

for (Set<Peer> peerRoleSet : peerRoleSetMap.values()) {
peerRoleSet.remove(peer);
}
peer.unsetChannel();
}

/**
* Add an Orderer to this channel.
*
Expand Down Expand Up @@ -648,6 +744,21 @@ public Collection<Peer> getPeers() {
return Collections.unmodifiableCollection(peers);
}

/**
* Get the peers for this channel.
*
* @return the peers.
*/
public Collection<Peer> getPeers(EnumSet<PeerRole> roles) {

Set<Peer> ret = new HashSet<>(getPeers().size());

for (PeerRole peerRole : roles) {
ret.addAll(peerRoleSetMap.get(peerRole));
}
return Collections.unmodifiableCollection(ret);
}

/**
* Get the deploy wait time in seconds.
*
Expand Down Expand Up @@ -2155,11 +2266,12 @@ public Collection<ProposalResponse> sendTransactionProposal(TransactionProposalR
* @throws InvalidArgumentException
* @throws ProposalException
*/

public Collection<ProposalResponse> queryByChaincode(QueryByChaincodeRequest queryByChaincodeRequest) throws InvalidArgumentException, ProposalException {
return sendProposal(queryByChaincodeRequest, peers);
return queryByChaincode(queryByChaincodeRequest, peers);
}



/**
* Send Query proposal
*
Expand Down
37 changes: 34 additions & 3 deletions src/main/java/org/hyperledger/fabric/sdk/HFClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.hyperledger.fabric.protos.peer.Query.ChaincodeInfo;
import org.hyperledger.fabric.sdk.exception.CryptoException;
import org.hyperledger.fabric.sdk.exception.InvalidArgumentException;
import org.hyperledger.fabric.sdk.exception.NetworkConfigurationException;
import org.hyperledger.fabric.sdk.exception.ProposalException;
import org.hyperledger.fabric.sdk.exception.TransactionException;
import org.hyperledger.fabric.sdk.helper.Utils;
Expand Down Expand Up @@ -115,6 +116,35 @@ public static HFClient createNewInstance() {
return new HFClient();
}

/**
* Configures a channel based on information loaded from a Network Config file.
* Note that it is up to the caller to initialize the returned channel.
*
* @param channelName The name of the channel to be configured
* @param networkConfig The network configuration to use to configure the channel
* @return The configured channel, or null if the channel is not defined in the configuration
* @throws InvalidArgumentException
*/
public Channel loadChannelFromConfig(String channelName, NetworkConfig networkConfig) throws InvalidArgumentException, NetworkConfigurationException {
clientCheck();

// Sanity checks
if (channelName == null || channelName.isEmpty()) {
throw new InvalidArgumentException("channelName must be specified");
}

if (networkConfig == null) {
throw new InvalidArgumentException("networkConfig must be specified");
}

if (channels.containsKey(channelName)) {
throw new InvalidArgumentException(format("Channel with name %s already exists", channelName));
}

return networkConfig.loadChannel(this, channelName);
}


/**
* newChannel - already configured channel.
*
Expand All @@ -132,7 +162,7 @@ public Channel newChannel(String name) throws InvalidArgumentException {
synchronized (channels) {

if (channels.containsKey(name)) {
throw new InvalidArgumentException(format("Channel by the name %s already exits", name));
throw new InvalidArgumentException(format("Channel by the name %s already exists", name));
}
logger.trace("Creating channel :" + name);
Channel newChannel = Channel.createNewInstance(name, this);
Expand Down Expand Up @@ -303,8 +333,8 @@ public Peer newPeer(String name, String grpcURL) throws InvalidArgumentException
/**
* getChannel by name
*
* @param name
* @return a channel
* @param name The channel name
* @return a channel (or null if the channel does not exist)
*/

public Channel getChannel(String name) {
Expand Down Expand Up @@ -611,6 +641,7 @@ public Collection<ProposalResponse> sendInstallProposal(InstallProposalRequest i

}


private void clientCheck() throws InvalidArgumentException {

if (null == cryptoSuite) {
Expand Down
Loading

0 comments on commit c4957fd

Please sign in to comment.