Skip to content

Commit

Permalink
add synchronized to register/unregister methods and remove unused cod…
Browse files Browse the repository at this point in the history
…e. patch by Stu Hood and jbellis for CASSANDRA-527

git-svn-id: https://svn.apache.org/repos/asf/incubator/cassandra/trunk@833046 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
jbellis committed Nov 5, 2009
1 parent 63a0c9d commit 23dd572
Showing 1 changed file with 8 additions and 90 deletions.
98 changes: 8 additions & 90 deletions src/java/org/apache/cassandra/gms/Gossiper.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ public synchronized static Gossiper instance()
private InetAddress localEndPoint_;
private long aVeryLongTime_;
private Random random_ = new Random();
/* round robin index through live endpoint set */
private int rrIndex_ = 0;

/* subscribers for interest in EndPointState change */
private List<IEndPointStateChangeSubscriber> subscribers_ = new ArrayList<IEndPointStateChangeSubscriber>();
Expand Down Expand Up @@ -145,24 +143,16 @@ private Gossiper()
}

/** Register with the Gossiper for EndPointState notifications */
public void register(IEndPointStateChangeSubscriber subscriber)
public synchronized void register(IEndPointStateChangeSubscriber subscriber)
{
subscribers_.add(subscriber);
}

public void unregister(IEndPointStateChangeSubscriber subscriber)
public synchronized void unregister(IEndPointStateChangeSubscriber subscriber)
{
subscribers_.remove(subscriber);
}

public Set<InetAddress> getAllMembers()
{
Set<InetAddress> allMbrs = new HashSet<InetAddress>();
allMbrs.addAll(getLiveMembers());
allMbrs.addAll(getUnreachableMembers());
return allMbrs;
}

public Set<InetAddress> getLiveMembers()
{
Set<InetAddress> liveMbrs = new HashSet<InetAddress>(liveEndpoints_);
Expand Down Expand Up @@ -259,32 +249,6 @@ void evictFromMembership(InetAddress endpoint)
unreachableEndpoints_.remove(endpoint);
}

/* No locking required since it is called from a method that already has acquired a lock */
@Deprecated
void makeGossipDigest(List<GossipDigest> gDigests)
{
/* Add the local endpoint state */
EndPointState epState = endPointStateMap_.get(localEndPoint_);
int generation = epState.getHeartBeatState().getGeneration();
int maxVersion = getMaxEndPointStateVersion(epState);
gDigests.add( new GossipDigest(localEndPoint_, generation, maxVersion) );

for ( InetAddress liveEndPoint : liveEndpoints_ )
{
epState = endPointStateMap_.get(liveEndPoint);
if ( epState != null )
{
generation = epState.getHeartBeatState().getGeneration();
maxVersion = getMaxEndPointStateVersion(epState);
gDigests.add( new GossipDigest(liveEndPoint, generation, maxVersion) );
}
else
{
gDigests.add( new GossipDigest(liveEndPoint, 0, 0) );
}
}
}

/**
* No locking required since it is called from a method that already
* has acquired a lock. The gossip digest is built based on randomization
Expand Down Expand Up @@ -339,8 +303,7 @@ Message makeGossipDigestSynMessage(List<GossipDigest> gDigests) throws IOExcepti
ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
DataOutputStream dos = new DataOutputStream( bos );
GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_SYN_VERB, bos.toByteArray());
return message;
return new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_SYN_VERB, bos.toByteArray());
}

Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage) throws IOException
Expand All @@ -350,34 +313,15 @@ Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage) thr
GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
if (logger_.isTraceEnabled())
logger_.trace("@@@@ Size of GossipDigestAckMessage is " + bos.toByteArray().length);
Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK_VERB, bos.toByteArray());
return message;
return new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK_VERB, bos.toByteArray());
}

Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message) throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
DataOutputStream dos = new DataOutputStream(bos);
GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos);
Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK2_VERB, bos.toByteArray());
return message;
}

boolean sendGossipToLiveNode(Message message)
{
int size = liveEndpoints_.size();
List<InetAddress> eps = new ArrayList<InetAddress>(liveEndpoints_);

if ( rrIndex_ >= size )
{
rrIndex_ = -1;
}

InetAddress to = eps.get(++rrIndex_);
if (logger_.isTraceEnabled())
logger_.trace("Sending a GossipDigestSynMessage to " + to + " ...");
MessagingService.instance().sendUdpOneWay(message, to);
return seeds_.contains(to);
return new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK2_VERB, bos.toByteArray());
}

/**
Expand Down Expand Up @@ -779,28 +723,6 @@ synchronized void isAlive(InetAddress addr, EndPointState epState, boolean value
epState.isAGossiper(true);
}

/* These are helper methods used from GossipDigestSynVerbHandler */
Map<InetAddress, GossipDigest> getEndPointGossipDigestMap(List<GossipDigest> gDigestList)
{
Map<InetAddress, GossipDigest> epMap = new HashMap<InetAddress, GossipDigest>();
for( GossipDigest gDigest : gDigestList )
{
epMap.put( gDigest.getEndPoint(), gDigest );
}
return epMap;
}

/* This is a helper method to get all EndPoints from a list of GossipDigests */
InetAddress[] getEndPointsFromGossipDigest(List<GossipDigest> gDigestList)
{
Set<InetAddress> set = new HashSet<InetAddress>();
for ( GossipDigest gDigest : gDigestList )
{
set.add( gDigest.getEndPoint() );
}
return set.toArray( new InetAddress[0] );
}

/* Request all the state for the endpoint in the gDigest */
void requestAll(GossipDigest gDigest, List<GossipDigest> deltaGossipDigestList, int remoteGeneration)
{
Expand Down Expand Up @@ -883,7 +805,7 @@ synchronized void examineGossiper(List<GossipDigest> gDigestList, List<GossipDig
* Start the gossiper with the generation # retrieved from the System
* table
*/
public void start(InetAddress localEndPoint, int generationNbr) throws IOException
public void start(InetAddress localEndPoint, int generationNbr)
{
localEndPoint_ = localEndPoint;
/* Get the seeds from the config and initialize them. */
Expand Down Expand Up @@ -917,10 +839,6 @@ public synchronized void addApplicationState(String key, ApplicationState appSta
epState.addApplicationState(key, appState);
}

public void stop()
{
gossipTimer_.cancel();
}
}

class JoinVerbHandler implements IVerbHandler
Expand All @@ -936,7 +854,7 @@ public void doVerb(Message message)
byte[] bytes = message.getMessageBody();
DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );

JoinMessage joinMessage = null;
JoinMessage joinMessage;
try
{
joinMessage = JoinMessage.serializer().deserialize(dis);
Expand Down Expand Up @@ -1099,7 +1017,7 @@ public void doVerb(Message message)

byte[] bytes = message.getMessageBody();
DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
GossipDigestAck2Message gDigestAck2Message = null;
GossipDigestAck2Message gDigestAck2Message;
try
{
gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis);
Expand Down

0 comments on commit 23dd572

Please sign in to comment.