From 9c2ecfc266b3732aece962cb8e2a76a205d25bf5 Mon Sep 17 00:00:00 2001 From: YACOVM Date: Thu, 22 Sep 2016 12:18:52 +0300 Subject: [PATCH] WIP- Fabric gossip component This is a commit that contains only APIs and protobuff The API between the ledger and the gossip component is in gossip/api/api.go Change-Id: I9f6aef85f3b03e2d3a6b9850148e9cf4d1a93ce3 Signed-off-by: Yacov Manevich --- gossip/api/api.go | 93 +++++ gossip/comm/comm.go | 53 +++ gossip/discovery/discovery.go | 78 +++++ gossip/gossip/gossip.go | 60 ++++ gossip/proto/message.pb.go | 639 ++++++++++++++++++++++++++++++++++ gossip/proto/message.proto | 101 ++++++ 6 files changed, 1024 insertions(+) create mode 100644 gossip/api/api.go create mode 100644 gossip/comm/comm.go create mode 100644 gossip/discovery/discovery.go create mode 100644 gossip/gossip/gossip.go create mode 100644 gossip/proto/message.pb.go create mode 100644 gossip/proto/message.proto diff --git a/gossip/api/api.go b/gossip/api/api.go new file mode 100644 index 00000000000..3db0d6f2e97 --- /dev/null +++ b/gossip/api/api.go @@ -0,0 +1,93 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package api + +import ( + "github.com/hyperledger/fabric/gossip/discovery" + "google.golang.org/grpc" +) + +type GossipEmitterFactory interface { + NewGossipEmitter(id string, discSvc discovery.DiscoveryService) GossipService +} + +// GossipService is used to publish new blocks to the gossip network +type GossipService interface { + // payload: Holds the block's content, hash and seqNum + Publish(payload Payload) error +} + +type BindAddress struct { + Host string + Port int16 +} + +// Payload defines an object that contains a ledger block +type Payload struct { + Data []byte // The content of the message, possibly encrypted or signed + Hash string // The message hash + SeqNum uint64 // The message sequence number +} + +type GossipMemberFactory interface { + NewGossipMember(discovery.DiscoveryService, ReplicationProvider, MessageCryptoService, MessagePolicyVerifier, *grpc.Server) GossipMember + + NewGossipMemberWithRPCServer(discovery.DiscoveryService, ReplicationProvider, MessageCryptoService, MessagePolicyVerifier, BindAddress) (GossipMember, error) +} + +// GossipMember is used to obtain new blocks from the gossip network +type GossipMember interface { + // RegisterCallback registers a callback that is invoked on messages + // from startSeq to endSeq and invokes the callback when they arrive + RegisterCallback(startSeq uint64, endSeq uint64, callback func([]Payload)) +} + +// ReplicationProvider used by the GossipMember in order to obtain Blocks of +// certain seqNum range to be sent to the requester +type ReplicationProvider interface { + // GetData used by the gossip component to obtain certain blocks from the ledger. + // Returns the blocks requested with the given sequence numbers, or an error if + // some block requested is not available. + GetData(startSeqNum uint64, endSeqNum uint64) ([]Payload, error) + + // LastBlockSeq used by the gossip component to obtain the last sequence of a block the ledger has. + LastBlockSeq() uint64 +} + +// MessageCryptoVerifier verifies the message's authenticity, +// if messages are cryptographically signed +type MessageCryptoService interface { + // Verify returns nil whether the message and its identifier are authentic, + // otherwise returns an error + Verify(seqNum uint64, sender string, payload Payload) error + + // Sign signs the payload + Sign(sender string, Payload Payload) Payload + + // SignBlob signs a blob + SignBlob([]byte) []byte + + // VerifyBlob verifies a blob, returns error on failure + // and nil if the blob is correctly signed + VerifyBlob(sender string, blob []byte) error +} + +// MessagePolicyVerifier verifies whether the message conforms to all required policies, +// and can be safely delivered to the user. +type MessagePolicyVerifier interface { + Verify(seqNum uint64, sender string, payload Payload) error +} diff --git a/gossip/comm/comm.go b/gossip/comm/comm.go new file mode 100644 index 00000000000..d45aa9dc116 --- /dev/null +++ b/gossip/comm/comm.go @@ -0,0 +1,53 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package comm + +import ( + "github.com/hyperledger/fabric/gossip/proto" + "sync" +) + +type CommModule interface { + // Send sends a message to endpoints + Send(msg *proto.GossipMessage, endpoints ...string) + + // Probe probes a remote node and returns nil if its responsive + Probe(endpoint string) error + + // Accept returns a dedicated read-only channel for messages sent by other nodes that match a certain predicate. + // Each message from the channel can be used to send a reply back to the sender + Accept(MessageAcceptor) <-chan *ReceivedMessage + + // PresumedDead returns a read-only channel for node endpoints that are suspected to be offline + PresumedDead() <-chan string + + // CloseConn closes a connection to a certain endpoint + CloseConn(endpoint string) + + // Stop stops the module + Stop() +} + + +type MessageAcceptor func(*proto.GossipMessage) bool + +type ReceivedMessage struct { + *proto.GossipMessage + lock *sync.Mutex + srvStream proto.Gossip_GossipStreamServer + clStream proto.Gossip_GossipStreamClient +} diff --git a/gossip/discovery/discovery.go b/gossip/discovery/discovery.go new file mode 100644 index 00000000000..4f4f4ef6d25 --- /dev/null +++ b/gossip/discovery/discovery.go @@ -0,0 +1,78 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package discovery + +import "github.com/hyperledger/fabric/gossip/proto" + +// CryptoService is an interface that the discovery expects to be implemented and passed on creation +type CryptoService interface { + // validateAliveMsg validates that an Alive message is authentic + ValidateAliveMsg(*proto.AliveMessage) bool + + // SignMessage signs an AliveMessage and updates its signature field + SignMessage(*proto.AliveMessage) *proto.AliveMessage +} + +// CommService is an interface that the discovery expects to be implemented and passed on creation +type CommService interface { + // Gossip gossips a message + Gossip(msg *proto.GossipMessage) + + // SendToPeer sends to a given peer a message. + // The nonce can be anything since the communication module handles the nonce itself + SendToPeer(peer *NetworkMember, msg *proto.GossipMessage) + + // Ping probes a remote peer and returns if it's responsive or not + Ping(peer *NetworkMember) bool + + // Accept returns a read-only channel for membership messages sent from remote peers + Accept() <-chan GossipMsg + + // PresumedDead returns a read-only channel for peers that are presumed to be dead + PresumedDead() <-chan string + + // CloseConn orders to close the connection with a certain peer + CloseConn(id string) +} + +type GossipMsg interface { + GetGossipMessage() *proto.GossipMessage +} + +type NetworkMember struct { + Id string + Endpoint string + Metadata []byte +} + +type DiscoveryService interface { + + // Self returns this instance's membership information + Self() NetworkMember + + // UpdateMetadata updates this instance's metadata + UpdateMetadata([]byte) + + // UpdateEndpoint updates this instance's endpoint + UpdateEndpoint(string) + + // Stops this instance + Stop() + + // GetMembership returns the alive members in the view + GetMembership() []NetworkMember +} diff --git a/gossip/gossip/gossip.go b/gossip/gossip/gossip.go new file mode 100644 index 00000000000..3bf6d80ec2a --- /dev/null +++ b/gossip/gossip/gossip.go @@ -0,0 +1,60 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gossip + +import ( + "github.com/hyperledger/fabric/gossip/discovery" + "github.com/hyperledger/fabric/gossip/proto" + "time" +) + +type GossipService interface { + + // GetPeersMetadata returns a mapping of endpoint --> metadata + GetPeersMetadata() map[string][]byte + + // UpdateMetadata updates the self metadata of the discovery layer + UpdateMetadata([]byte) + + // Gossip sends a message to other peers to the network + Gossip(msg *proto.GossipMessage) + + // Accept returns a channel that outputs messages from other peers + Accept(MessageAcceptor) <-chan *proto.GossipMessage + + // Stop stops the gossip component + Stop() +} + +type MessageAcceptor func(*proto.GossipMessage) bool + +type GossipConfig struct { + BindPort int + Id string + SelfEndpoint string + BootstrapPeers []*discovery.NetworkMember + PropagateIterations int + PropagatePeerNum int + + MaxMessageCountToStore int + + MaxPropagationBurstSize int + MaxPropagationBurstLatency time.Duration + + PullInterval time.Duration + PullPeerNum int +} diff --git a/gossip/proto/message.pb.go b/gossip/proto/message.pb.go new file mode 100644 index 00000000000..c03ecc3872b --- /dev/null +++ b/gossip/proto/message.pb.go @@ -0,0 +1,639 @@ +// Code generated by protoc-gen-go. +// source: message.proto +// DO NOT EDIT! + +/* +Package proto is a generated protocol buffer package. + +It is generated from these files: + message.proto + +It has these top-level messages: + GossipMessage + DataRequest + GossipHello + DataUpdate + DataDigest + DataMessage + AckMessage + Payload + AliveMessage + PeerTime + MembershipRequest + MembershipResponse + Member + Empty +*/ +package proto + +import proto1 "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto1.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +type GossipMessage struct { + Nonce uint64 `protobuf:"varint,1,opt,name=nonce" json:"nonce,omitempty"` + // Types that are valid to be assigned to Content: + // *GossipMessage_AliveMsg + // *GossipMessage_MemReq + // *GossipMessage_MemRes + // *GossipMessage_DataMsg + // *GossipMessage_Hello + // *GossipMessage_DataDig + // *GossipMessage_DataReq + // *GossipMessage_DataUpdate + // *GossipMessage_AckMsg + // *GossipMessage_Empty + Content isGossipMessage_Content `protobuf_oneof:"content"` +} + +func (m *GossipMessage) Reset() { *m = GossipMessage{} } +func (m *GossipMessage) String() string { return proto1.CompactTextString(m) } +func (*GossipMessage) ProtoMessage() {} + +type isGossipMessage_Content interface { + isGossipMessage_Content() +} + +type GossipMessage_AliveMsg struct { + AliveMsg *AliveMessage `protobuf:"bytes,2,opt,name=aliveMsg,oneof"` +} +type GossipMessage_MemReq struct { + MemReq *MembershipRequest `protobuf:"bytes,3,opt,name=memReq,oneof"` +} +type GossipMessage_MemRes struct { + MemRes *MembershipResponse `protobuf:"bytes,4,opt,name=memRes,oneof"` +} +type GossipMessage_DataMsg struct { + DataMsg *DataMessage `protobuf:"bytes,5,opt,name=dataMsg,oneof"` +} +type GossipMessage_Hello struct { + Hello *GossipHello `protobuf:"bytes,6,opt,name=hello,oneof"` +} +type GossipMessage_DataDig struct { + DataDig *DataDigest `protobuf:"bytes,7,opt,name=dataDig,oneof"` +} +type GossipMessage_DataReq struct { + DataReq *DataRequest `protobuf:"bytes,8,opt,name=dataReq,oneof"` +} +type GossipMessage_DataUpdate struct { + DataUpdate *DataUpdate `protobuf:"bytes,9,opt,name=dataUpdate,oneof"` +} +type GossipMessage_AckMsg struct { + AckMsg *AckMessage `protobuf:"bytes,10,opt,name=ackMsg,oneof"` +} +type GossipMessage_Empty struct { + Empty *Empty `protobuf:"bytes,11,opt,name=empty,oneof"` +} + +func (*GossipMessage_AliveMsg) isGossipMessage_Content() {} +func (*GossipMessage_MemReq) isGossipMessage_Content() {} +func (*GossipMessage_MemRes) isGossipMessage_Content() {} +func (*GossipMessage_DataMsg) isGossipMessage_Content() {} +func (*GossipMessage_Hello) isGossipMessage_Content() {} +func (*GossipMessage_DataDig) isGossipMessage_Content() {} +func (*GossipMessage_DataReq) isGossipMessage_Content() {} +func (*GossipMessage_DataUpdate) isGossipMessage_Content() {} +func (*GossipMessage_AckMsg) isGossipMessage_Content() {} +func (*GossipMessage_Empty) isGossipMessage_Content() {} + +func (m *GossipMessage) GetContent() isGossipMessage_Content { + if m != nil { + return m.Content + } + return nil +} + +func (m *GossipMessage) GetAliveMsg() *AliveMessage { + if x, ok := m.GetContent().(*GossipMessage_AliveMsg); ok { + return x.AliveMsg + } + return nil +} + +func (m *GossipMessage) GetMemReq() *MembershipRequest { + if x, ok := m.GetContent().(*GossipMessage_MemReq); ok { + return x.MemReq + } + return nil +} + +func (m *GossipMessage) GetMemRes() *MembershipResponse { + if x, ok := m.GetContent().(*GossipMessage_MemRes); ok { + return x.MemRes + } + return nil +} + +func (m *GossipMessage) GetDataMsg() *DataMessage { + if x, ok := m.GetContent().(*GossipMessage_DataMsg); ok { + return x.DataMsg + } + return nil +} + +func (m *GossipMessage) GetHello() *GossipHello { + if x, ok := m.GetContent().(*GossipMessage_Hello); ok { + return x.Hello + } + return nil +} + +func (m *GossipMessage) GetDataDig() *DataDigest { + if x, ok := m.GetContent().(*GossipMessage_DataDig); ok { + return x.DataDig + } + return nil +} + +func (m *GossipMessage) GetDataReq() *DataRequest { + if x, ok := m.GetContent().(*GossipMessage_DataReq); ok { + return x.DataReq + } + return nil +} + +func (m *GossipMessage) GetDataUpdate() *DataUpdate { + if x, ok := m.GetContent().(*GossipMessage_DataUpdate); ok { + return x.DataUpdate + } + return nil +} + +func (m *GossipMessage) GetAckMsg() *AckMessage { + if x, ok := m.GetContent().(*GossipMessage_AckMsg); ok { + return x.AckMsg + } + return nil +} + +func (m *GossipMessage) GetEmpty() *Empty { + if x, ok := m.GetContent().(*GossipMessage_Empty); ok { + return x.Empty + } + return nil +} + +// XXX_OneofFuncs is for the internal use of the proto package. +func (*GossipMessage) XXX_OneofFuncs() (func(msg proto1.Message, b *proto1.Buffer) error, func(msg proto1.Message, tag, wire int, b *proto1.Buffer) (bool, error), []interface{}) { + return _GossipMessage_OneofMarshaler, _GossipMessage_OneofUnmarshaler, []interface{}{ + (*GossipMessage_AliveMsg)(nil), + (*GossipMessage_MemReq)(nil), + (*GossipMessage_MemRes)(nil), + (*GossipMessage_DataMsg)(nil), + (*GossipMessage_Hello)(nil), + (*GossipMessage_DataDig)(nil), + (*GossipMessage_DataReq)(nil), + (*GossipMessage_DataUpdate)(nil), + (*GossipMessage_AckMsg)(nil), + (*GossipMessage_Empty)(nil), + } +} + +func _GossipMessage_OneofMarshaler(msg proto1.Message, b *proto1.Buffer) error { + m := msg.(*GossipMessage) + // content + switch x := m.Content.(type) { + case *GossipMessage_AliveMsg: + b.EncodeVarint(2<<3 | proto1.WireBytes) + if err := b.EncodeMessage(x.AliveMsg); err != nil { + return err + } + case *GossipMessage_MemReq: + b.EncodeVarint(3<<3 | proto1.WireBytes) + if err := b.EncodeMessage(x.MemReq); err != nil { + return err + } + case *GossipMessage_MemRes: + b.EncodeVarint(4<<3 | proto1.WireBytes) + if err := b.EncodeMessage(x.MemRes); err != nil { + return err + } + case *GossipMessage_DataMsg: + b.EncodeVarint(5<<3 | proto1.WireBytes) + if err := b.EncodeMessage(x.DataMsg); err != nil { + return err + } + case *GossipMessage_Hello: + b.EncodeVarint(6<<3 | proto1.WireBytes) + if err := b.EncodeMessage(x.Hello); err != nil { + return err + } + case *GossipMessage_DataDig: + b.EncodeVarint(7<<3 | proto1.WireBytes) + if err := b.EncodeMessage(x.DataDig); err != nil { + return err + } + case *GossipMessage_DataReq: + b.EncodeVarint(8<<3 | proto1.WireBytes) + if err := b.EncodeMessage(x.DataReq); err != nil { + return err + } + case *GossipMessage_DataUpdate: + b.EncodeVarint(9<<3 | proto1.WireBytes) + if err := b.EncodeMessage(x.DataUpdate); err != nil { + return err + } + case *GossipMessage_AckMsg: + b.EncodeVarint(10<<3 | proto1.WireBytes) + if err := b.EncodeMessage(x.AckMsg); err != nil { + return err + } + case *GossipMessage_Empty: + b.EncodeVarint(11<<3 | proto1.WireBytes) + if err := b.EncodeMessage(x.Empty); err != nil { + return err + } + case nil: + default: + return fmt.Errorf("GossipMessage.Content has unexpected type %T", x) + } + return nil +} + +func _GossipMessage_OneofUnmarshaler(msg proto1.Message, tag, wire int, b *proto1.Buffer) (bool, error) { + m := msg.(*GossipMessage) + switch tag { + case 2: // content.aliveMsg + if wire != proto1.WireBytes { + return true, proto1.ErrInternalBadWireType + } + msg := new(AliveMessage) + err := b.DecodeMessage(msg) + m.Content = &GossipMessage_AliveMsg{msg} + return true, err + case 3: // content.memReq + if wire != proto1.WireBytes { + return true, proto1.ErrInternalBadWireType + } + msg := new(MembershipRequest) + err := b.DecodeMessage(msg) + m.Content = &GossipMessage_MemReq{msg} + return true, err + case 4: // content.memRes + if wire != proto1.WireBytes { + return true, proto1.ErrInternalBadWireType + } + msg := new(MembershipResponse) + err := b.DecodeMessage(msg) + m.Content = &GossipMessage_MemRes{msg} + return true, err + case 5: // content.dataMsg + if wire != proto1.WireBytes { + return true, proto1.ErrInternalBadWireType + } + msg := new(DataMessage) + err := b.DecodeMessage(msg) + m.Content = &GossipMessage_DataMsg{msg} + return true, err + case 6: // content.hello + if wire != proto1.WireBytes { + return true, proto1.ErrInternalBadWireType + } + msg := new(GossipHello) + err := b.DecodeMessage(msg) + m.Content = &GossipMessage_Hello{msg} + return true, err + case 7: // content.dataDig + if wire != proto1.WireBytes { + return true, proto1.ErrInternalBadWireType + } + msg := new(DataDigest) + err := b.DecodeMessage(msg) + m.Content = &GossipMessage_DataDig{msg} + return true, err + case 8: // content.dataReq + if wire != proto1.WireBytes { + return true, proto1.ErrInternalBadWireType + } + msg := new(DataRequest) + err := b.DecodeMessage(msg) + m.Content = &GossipMessage_DataReq{msg} + return true, err + case 9: // content.dataUpdate + if wire != proto1.WireBytes { + return true, proto1.ErrInternalBadWireType + } + msg := new(DataUpdate) + err := b.DecodeMessage(msg) + m.Content = &GossipMessage_DataUpdate{msg} + return true, err + case 10: // content.ackMsg + if wire != proto1.WireBytes { + return true, proto1.ErrInternalBadWireType + } + msg := new(AckMessage) + err := b.DecodeMessage(msg) + m.Content = &GossipMessage_AckMsg{msg} + return true, err + case 11: // content.empty + if wire != proto1.WireBytes { + return true, proto1.ErrInternalBadWireType + } + msg := new(Empty) + err := b.DecodeMessage(msg) + m.Content = &GossipMessage_Empty{msg} + return true, err + default: + return false, nil + } +} + +type DataRequest struct { + Nonce uint64 `protobuf:"varint,1,opt,name=nonce" json:"nonce,omitempty"` + SeqMap []uint64 `protobuf:"varint,2,rep,name=seqMap" json:"seqMap,omitempty"` +} + +func (m *DataRequest) Reset() { *m = DataRequest{} } +func (m *DataRequest) String() string { return proto1.CompactTextString(m) } +func (*DataRequest) ProtoMessage() {} + +type GossipHello struct { + Nonce uint64 `protobuf:"varint,1,opt,name=nonce" json:"nonce,omitempty"` +} + +func (m *GossipHello) Reset() { *m = GossipHello{} } +func (m *GossipHello) String() string { return proto1.CompactTextString(m) } +func (*GossipHello) ProtoMessage() {} + +type DataUpdate struct { + Data []*DataMessage `protobuf:"bytes,1,rep,name=data" json:"data,omitempty"` +} + +func (m *DataUpdate) Reset() { *m = DataUpdate{} } +func (m *DataUpdate) String() string { return proto1.CompactTextString(m) } +func (*DataUpdate) ProtoMessage() {} + +func (m *DataUpdate) GetData() []*DataMessage { + if m != nil { + return m.Data + } + return nil +} + +type DataDigest struct { + Nonce uint64 `protobuf:"varint,1,opt,name=nonce" json:"nonce,omitempty"` + SeqMap []uint64 `protobuf:"varint,2,rep,name=seqMap" json:"seqMap,omitempty"` +} + +func (m *DataDigest) Reset() { *m = DataDigest{} } +func (m *DataDigest) String() string { return proto1.CompactTextString(m) } +func (*DataDigest) ProtoMessage() {} + +type DataMessage struct { + Payload *Payload `protobuf:"bytes,1,opt,name=payload" json:"payload,omitempty"` +} + +func (m *DataMessage) Reset() { *m = DataMessage{} } +func (m *DataMessage) String() string { return proto1.CompactTextString(m) } +func (*DataMessage) ProtoMessage() {} + +func (m *DataMessage) GetPayload() *Payload { + if m != nil { + return m.Payload + } + return nil +} + +type AckMessage struct { + Nonce uint64 `protobuf:"varint,1,opt,name=nonce" json:"nonce,omitempty"` +} + +func (m *AckMessage) Reset() { *m = AckMessage{} } +func (m *AckMessage) String() string { return proto1.CompactTextString(m) } +func (*AckMessage) ProtoMessage() {} + +type Payload struct { + SeqNum uint64 `protobuf:"varint,1,opt,name=seqNum" json:"seqNum,omitempty"` + Hash string `protobuf:"bytes,2,opt,name=hash" json:"hash,omitempty"` + Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` +} + +func (m *Payload) Reset() { *m = Payload{} } +func (m *Payload) String() string { return proto1.CompactTextString(m) } +func (*Payload) ProtoMessage() {} + +type AliveMessage struct { + Membership *Member `protobuf:"bytes,1,opt,name=membership" json:"membership,omitempty"` + Timestamp *PeerTime `protobuf:"bytes,2,opt,name=timestamp" json:"timestamp,omitempty"` + Signature []byte `protobuf:"bytes,3,opt,name=signature,proto3" json:"signature,omitempty"` +} + +func (m *AliveMessage) Reset() { *m = AliveMessage{} } +func (m *AliveMessage) String() string { return proto1.CompactTextString(m) } +func (*AliveMessage) ProtoMessage() {} + +func (m *AliveMessage) GetMembership() *Member { + if m != nil { + return m.Membership + } + return nil +} + +func (m *AliveMessage) GetTimestamp() *PeerTime { + if m != nil { + return m.Timestamp + } + return nil +} + +type PeerTime struct { + IncNumber uint64 `protobuf:"varint,1,opt,name=inc_number" json:"inc_number,omitempty"` + SeqNum uint64 `protobuf:"varint,2,opt,name=seqNum" json:"seqNum,omitempty"` +} + +func (m *PeerTime) Reset() { *m = PeerTime{} } +func (m *PeerTime) String() string { return proto1.CompactTextString(m) } +func (*PeerTime) ProtoMessage() {} + +type MembershipRequest struct { + SelfInformation *AliveMessage `protobuf:"bytes,1,opt,name=selfInformation" json:"selfInformation,omitempty"` + Known []string `protobuf:"bytes,2,rep,name=known" json:"known,omitempty"` +} + +func (m *MembershipRequest) Reset() { *m = MembershipRequest{} } +func (m *MembershipRequest) String() string { return proto1.CompactTextString(m) } +func (*MembershipRequest) ProtoMessage() {} + +func (m *MembershipRequest) GetSelfInformation() *AliveMessage { + if m != nil { + return m.SelfInformation + } + return nil +} + +type MembershipResponse struct { + Alive []*AliveMessage `protobuf:"bytes,1,rep,name=alive" json:"alive,omitempty"` + Dead []*AliveMessage `protobuf:"bytes,2,rep,name=dead" json:"dead,omitempty"` +} + +func (m *MembershipResponse) Reset() { *m = MembershipResponse{} } +func (m *MembershipResponse) String() string { return proto1.CompactTextString(m) } +func (*MembershipResponse) ProtoMessage() {} + +func (m *MembershipResponse) GetAlive() []*AliveMessage { + if m != nil { + return m.Alive + } + return nil +} + +func (m *MembershipResponse) GetDead() []*AliveMessage { + if m != nil { + return m.Dead + } + return nil +} + +type Member struct { + Id string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"` + Endpoint string `protobuf:"bytes,2,opt,name=endpoint" json:"endpoint,omitempty"` + Metadata []byte `protobuf:"bytes,3,opt,name=metadata,proto3" json:"metadata,omitempty"` +} + +func (m *Member) Reset() { *m = Member{} } +func (m *Member) String() string { return proto1.CompactTextString(m) } +func (*Member) ProtoMessage() {} + +type Empty struct { +} + +func (m *Empty) Reset() { *m = Empty{} } +func (m *Empty) String() string { return proto1.CompactTextString(m) } +func (*Empty) ProtoMessage() {} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// Client API for Gossip service + +type GossipClient interface { + GossipStream(ctx context.Context, opts ...grpc.CallOption) (Gossip_GossipStreamClient, error) + Ping(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error) +} + +type gossipClient struct { + cc *grpc.ClientConn +} + +func NewGossipClient(cc *grpc.ClientConn) GossipClient { + return &gossipClient{cc} +} + +func (c *gossipClient) GossipStream(ctx context.Context, opts ...grpc.CallOption) (Gossip_GossipStreamClient, error) { + stream, err := grpc.NewClientStream(ctx, &_Gossip_serviceDesc.Streams[0], c.cc, "/proto.Gossip/GossipStream", opts...) + if err != nil { + return nil, err + } + x := &gossipGossipStreamClient{stream} + return x, nil +} + +type Gossip_GossipStreamClient interface { + Send(*GossipMessage) error + Recv() (*GossipMessage, error) + grpc.ClientStream +} + +type gossipGossipStreamClient struct { + grpc.ClientStream +} + +func (x *gossipGossipStreamClient) Send(m *GossipMessage) error { + return x.ClientStream.SendMsg(m) +} + +func (x *gossipGossipStreamClient) Recv() (*GossipMessage, error) { + m := new(GossipMessage) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *gossipClient) Ping(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error) { + out := new(Empty) + err := grpc.Invoke(ctx, "/proto.Gossip/Ping", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for Gossip service + +type GossipServer interface { + GossipStream(Gossip_GossipStreamServer) error + Ping(context.Context, *Empty) (*Empty, error) +} + +func RegisterGossipServer(s *grpc.Server, srv GossipServer) { + s.RegisterService(&_Gossip_serviceDesc, srv) +} + +func _Gossip_GossipStream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(GossipServer).GossipStream(&gossipGossipStreamServer{stream}) +} + +type Gossip_GossipStreamServer interface { + Send(*GossipMessage) error + Recv() (*GossipMessage, error) + grpc.ServerStream +} + +type gossipGossipStreamServer struct { + grpc.ServerStream +} + +func (x *gossipGossipStreamServer) Send(m *GossipMessage) error { + return x.ServerStream.SendMsg(m) +} + +func (x *gossipGossipStreamServer) Recv() (*GossipMessage, error) { + m := new(GossipMessage) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _Gossip_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { + in := new(Empty) + if err := dec(in); err != nil { + return nil, err + } + out, err := srv.(GossipServer).Ping(ctx, in) + if err != nil { + return nil, err + } + return out, nil +} + +var _Gossip_serviceDesc = grpc.ServiceDesc{ + ServiceName: "proto.Gossip", + HandlerType: (*GossipServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Ping", + Handler: _Gossip_Ping_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "GossipStream", + Handler: _Gossip_GossipStream_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, +} diff --git a/gossip/proto/message.proto b/gossip/proto/message.proto new file mode 100644 index 00000000000..210ba25421d --- /dev/null +++ b/gossip/proto/message.proto @@ -0,0 +1,101 @@ +syntax = "proto3"; + +package proto; + + +// Gossip +service Gossip { + + rpc GossipStream (stream GossipMessage) returns (stream GossipMessage) {} + + rpc Ping (Empty) returns (Empty) {} + +} + +message GossipMessage { + uint64 nonce = 1; + oneof content { + // Membership + AliveMessage aliveMsg = 2; + MembershipRequest memReq = 3; + MembershipResponse memRes = 4; + + // Contains a ledger block + DataMessage dataMsg = 5; + + // Used for push&pull + GossipHello hello = 6; + DataDigest dataDig = 7; + DataRequest dataReq = 8; + DataUpdate dataUpdate = 9; + + AckMessage ackMsg = 10; + + // Empty message, used for pinging + Empty empty = 11; + } +} + +message DataRequest { + uint64 nonce = 1; + repeated uint64 seqMap = 2; // Maybe change this to bitmap later on +} + +message GossipHello { + uint64 nonce = 1; +} + +message DataUpdate { + repeated DataMessage data = 1; +} + +message DataDigest { + uint64 nonce = 1; + repeated uint64 seqMap = 2; // Maybe change this to bitmap later on +} + +message DataMessage { + Payload payload = 1; +} + +message AckMessage { + uint64 nonce = 1; +} + +message Payload { + uint64 seqNum = 1; + string hash = 2; + bytes data = 3; +} + + +// Membership + +message AliveMessage { + Member membership = 1; + PeerTime timestamp = 2; + bytes signature = 3; +} + +message PeerTime { + uint64 inc_number = 1; + uint64 seqNum = 2; +} + +message MembershipRequest { + AliveMessage selfInformation = 1; + repeated string known = 2; +} + +message MembershipResponse { + repeated AliveMessage alive = 1; + repeated AliveMessage dead = 2; +} + +message Member { + string id = 1; + string endpoint = 2; + bytes metadata = 3; +} + +message Empty {}