Skip to content

Commit

Permalink
FAB-872 Multichannel support: message extension
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-872
This commit extends the gossip message to introduce 2 fields:
1) Channel - obviously, since we'll do per-channel routing
2) Tag     - This is for disseminating information inside an organization,
             and for specifying per-channel routing is enabled

This commit also refactors the code related to
the gossip internal message store comparison.

Last but not least- since state transfer is going to be per channel,
the commit introduces a new message type - StateInfo that'll be used
for the state transfer mechanism but per-channel and not globally.

Change-Id: Ibba0b5494ef4ef35420e2c6b09c058f17e56db0c
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Nov 16, 2016
1 parent f4dcb08 commit 742443e
Show file tree
Hide file tree
Showing 13 changed files with 576 additions and 320 deletions.
1 change: 1 addition & 0 deletions gossip/comm/comm_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ func readWithTimeout(stream interface{}, timeout time.Duration) *proto.GossipMes

func createConnectionMsg(pkiID common.PKIidType, sig []byte) *proto.GossipMessage {
return &proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Nonce: 0,
Content: &proto.GossipMessage_Conn{
Conn: &proto.ConnEstablish{
Expand Down
3 changes: 2 additions & 1 deletion gossip/comm/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (

"crypto/tls"

"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/proto"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"github.com/hyperledger/fabric/gossip/common"
)

func init() {
Expand Down Expand Up @@ -465,6 +465,7 @@ func TestPresumedDead(t *testing.T) {

func createGossipMsg() *proto.GossipMessage {
return &proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Nonce: uint64(rand.Int()),
Content: &proto.GossipMessage_DataMsg{
DataMsg: &proto.DataMessage{},
Expand Down
6 changes: 3 additions & 3 deletions gossip/comm/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"sync"
"sync/atomic"

"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/proto"
"github.com/hyperledger/fabric/gossip/util"
"google.golang.org/grpc"
"github.com/hyperledger/fabric/gossip/common"
)

type handler func(*proto.GossipMessage)
Expand All @@ -35,7 +35,7 @@ type connFactory interface {

type connectionStore struct {
logger *util.Logger // logger
selfPKIid common.PKIidType // pkiID of this peer
selfPKIid common.PKIidType // pkiID of this peer
isClosing bool // whether this connection store is shutting down
connFactory connFactory // creates a connection to remote peer
sync.RWMutex // synchronize access to shared variables
Expand Down Expand Up @@ -200,7 +200,7 @@ func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_Go
type connection struct {
outBuff chan *msgSending
logger *util.Logger // logger
pkiID common.PKIidType // pkiID of the remote endpoint
pkiID common.PKIidType // pkiID of the remote endpoint
handler handler // function to invoke upon a message reception
conn *grpc.ClientConn // gRPC connection to remote endpoint
cl proto.GossipClient // gRPC stub of remote endpoint
Expand Down
19 changes: 19 additions & 0 deletions gossip/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,22 @@ type Payload struct {

// ChainID defines the identity representation of a chain
type ChainID []byte

// MessageReplacingPolicy Returns:
// MESSAGE_INVALIDATES if this message invalidates that
// MESSAGE_INVALIDATED if this message is invalidated by that
// MESSAGE_NO_ACTION otherwise
type MessageReplacingPolicy func(this interface{}, that interface{}) InvalidationResult

// InvalidationResult determines how a message affects another message
// when it is put into gossip message store
type InvalidationResult int

const (
// MessageNoAction means messages have no relation
MessageNoAction = iota
// MessageInvalidates means message invalidates the other message
MessageInvalidates
// MessageInvalidated means message is invalidated by the other message
MessageInvalidated
)
5 changes: 4 additions & 1 deletion gossip/discovery/discovery_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (
"sync/atomic"
"time"

"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/proto"
"github.com/hyperledger/fabric/gossip/util"
"github.com/op/go-logging"
"github.com/hyperledger/fabric/gossip/common"
)

const defaultHelloInterval = time.Duration(5) * time.Second
Expand Down Expand Up @@ -285,6 +285,7 @@ func (d *gossipDiscoveryImpl) sendMemResponse(member *proto.Member, known [][]by
Metadata: member.Metadata,
PKIid: member.PkiID,
}, &proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Nonce: uint64(0),
Content: &proto.GossipMessage_MemRes{
MemRes: memResp,
Expand Down Expand Up @@ -455,6 +456,7 @@ func (d *gossipDiscoveryImpl) createMembershipRequest() *proto.GossipMessage {
Known: d.getKnownPeers(),
}
return &proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Nonce: uint64(0),
Content: &proto.GossipMessage_MemReq{
MemReq: req,
Expand Down Expand Up @@ -559,6 +561,7 @@ func (d *gossipDiscoveryImpl) periodicalSendAlive() {
d.logger.Debug("Sleeping", aliveTimeInterval)
time.Sleep(aliveTimeInterval)
msg2Gossip := &proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Content: &proto.GossipMessage_AliveMsg{AliveMsg: d.createAliveMessage()},
}
d.comm.Gossip(msg2Gossip)
Expand Down
2 changes: 1 addition & 1 deletion gossip/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import (
"testing"
"time"

"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/proto"
"github.com/op/go-logging"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"google.golang.org/grpc"
"github.com/hyperledger/fabric/gossip/common"
)

var timeout = time.Second * time.Duration(15)
Expand Down
74 changes: 14 additions & 60 deletions gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ import (
"github.com/op/go-logging"
)

const (
presumedDeadChanSize = 100
acceptChanSize = 100
)

type gossipServiceImpl struct {
presumedDead chan common.PKIidType
disc discovery.Discovery
Expand All @@ -51,7 +56,7 @@ type gossipServiceImpl struct {
// NewGossipService creates a new gossip instance
func NewGossipService(conf *Config, c comm.Comm, crypto discovery.CryptoService) Gossip {
g := &gossipServiceImpl{
presumedDead: make(chan common.PKIidType, 100),
presumedDead: make(chan common.PKIidType, presumedDeadChanSize),
disc: nil,
comm: c,
conf: conf,
Expand All @@ -75,7 +80,7 @@ func NewGossipService(conf *Config, c comm.Comm, crypto discovery.CryptoService)

g.pushPull = algo.NewPullEngine(g, conf.PullInterval)

g.msgStore = newMessageStore(g.invalidationPolicy, func(m interface{}) {
g.msgStore = newMessageStore(proto.NewGossipMessageComparator(g.conf.MaxMessageCountToStore), func(m interface{}) {
if dataMsg, isDataMsg := m.(*proto.DataMessage); isDataMsg {
g.pushPull.Remove(dataMsg.Payload.SeqNum)
}
Expand All @@ -92,61 +97,6 @@ func (g *gossipServiceImpl) toDie() bool {
return atomic.LoadInt32(&g.stopFlag) == int32(1)
}

func (g *gossipServiceImpl) invalidationPolicy(this interface{}, that interface{}) invalidationResult {
thisMsg := this.(*proto.GossipMessage)
thatMsg := that.(*proto.GossipMessage)
thisAliveMsg, thisIsAliveMessage := thisMsg.GetAliveMsg(), thisMsg.GetAliveMsg() != nil
thatAliveMsg, thatIsAliveMessage := thatMsg.GetAliveMsg(), thatMsg.GetAliveMsg() != nil

if thisIsAliveMessage && thatIsAliveMessage {
return aliveInvalidationPolicy(thisAliveMsg, thatAliveMsg)
}

thisDataMsg, thisIsDataMessage := thisMsg.GetDataMsg(), thisMsg.GetDataMsg() != nil
thatDataMsg, thatIsDataMessage := thatMsg.GetDataMsg(), thatMsg.GetDataMsg() != nil

if thisIsDataMessage && thatIsDataMessage {
if thisDataMsg.Payload.SeqNum == thatDataMsg.Payload.SeqNum {
if thisDataMsg.Payload.Hash == thatDataMsg.Payload.Hash {
return messageInvalidated
}
return messageNoAction
}

diff := util.Abs(thisDataMsg.Payload.SeqNum, thatDataMsg.Payload.SeqNum)
if diff <= uint64(g.conf.MaxMessageCountToStore) {
return messageNoAction
}

if thisDataMsg.Payload.SeqNum > thatDataMsg.Payload.SeqNum {
return messageInvalidates
}
return messageInvalidated
}
return messageNoAction
}

func aliveInvalidationPolicy(thisMsg *proto.AliveMessage, thatMsg *proto.AliveMessage) invalidationResult {
if !equalPKIIds(thisMsg.Membership.PkiID, thatMsg.Membership.PkiID) {
return messageNoAction
}

if thisMsg.Timestamp.IncNumber == thatMsg.Timestamp.IncNumber {
if thisMsg.Timestamp.SeqNum > thatMsg.Timestamp.SeqNum {
return messageInvalidates
}

if thisMsg.Timestamp.SeqNum < thatMsg.Timestamp.SeqNum {
return messageInvalidated
}
return messageInvalidated
}
if thisMsg.Timestamp.IncNumber < thatMsg.Timestamp.IncNumber {
return messageInvalidated
}
return messageInvalidates
}

func (g *gossipServiceImpl) handlePresumedDead() {
defer g.logger.Debug("Exiting")
g.stopSignal.Add(1)
Expand Down Expand Up @@ -183,11 +133,10 @@ func (g *gossipServiceImpl) start() {
return false
}

isAck := gMsg.GetGossipMessage().GetAckMsg() != nil
isConn := gMsg.GetGossipMessage().GetConn() != nil
isEmpty := gMsg.GetGossipMessage().GetEmpty() != nil

return !(isAck || isConn || isEmpty)
return !(isConn || isEmpty)
}

incMsgs := g.comm.Accept(msgSelector)
Expand Down Expand Up @@ -224,6 +173,7 @@ func (g *gossipServiceImpl) SelectPeers() []string {

func (g *gossipServiceImpl) Hello(dest string, nonce uint64) {
helloMsg := &proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Nonce: 0,
Content: &proto.GossipMessage_Hello{
Hello: &proto.GossipHello{
Expand All @@ -239,6 +189,7 @@ func (g *gossipServiceImpl) Hello(dest string, nonce uint64) {

func (g *gossipServiceImpl) SendDigest(digest []uint64, nonce uint64, context interface{}) {
digMsg := &proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Nonce: 0,
Content: &proto.GossipMessage_DataDig{
DataDig: &proto.DataDigest{
Expand All @@ -253,6 +204,7 @@ func (g *gossipServiceImpl) SendDigest(digest []uint64, nonce uint64, context in

func (g *gossipServiceImpl) SendReq(dest string, items []uint64, nonce uint64) {
req := &proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Nonce: 0,
Content: &proto.GossipMessage_DataReq{
DataReq: &proto.DataRequest{
Expand Down Expand Up @@ -282,6 +234,7 @@ func (g *gossipServiceImpl) SendRes(requestedItems []uint64, context interface{}
}

returnedUpdate := &proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Nonce: 0,
Content: &proto.GossipMessage_DataUpdate{
DataUpdate: &proto.DataUpdate{
Expand Down Expand Up @@ -355,6 +308,7 @@ func (g *gossipServiceImpl) handlePushPullMsg(msg comm.ReceivedMessage) {
items := make([]uint64, len(res.Data))
for i, data := range res.Data {
dataMsg := &proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Content: &proto.GossipMessage_DataMsg{
DataMsg: data,
},
Expand Down Expand Up @@ -444,7 +398,7 @@ func (g *gossipServiceImpl) UpdateMetadata(md []byte) {

func (g *gossipServiceImpl) Accept(acceptor common.MessageAcceptor) <-chan *proto.GossipMessage {
inCh := g.AddChannel(acceptor)
outCh := make(chan *proto.GossipMessage, 100)
outCh := make(chan *proto.GossipMessage, acceptChanSize)
go func() {
for {
select {
Expand Down
Loading

0 comments on commit 742443e

Please sign in to comment.