diff --git a/orderer/kafka/util.go b/orderer/kafka/util.go index bba74d1aaea..ad31b5021f7 100644 --- a/orderer/kafka/util.go +++ b/orderer/kafka/util.go @@ -19,6 +19,7 @@ package kafka import ( "github.com/Shopify/sarama" "github.com/hyperledger/fabric/orderer/localconfig" + ab "github.com/hyperledger/fabric/protos/orderer" ) const ( @@ -41,6 +42,36 @@ func newBrokerConfig(conf *config.TopLevel) *sarama.Config { return brokerConfig } +func newConnectMessage() *ab.KafkaMessage { + return &ab.KafkaMessage{ + Type: &ab.KafkaMessage_Connect{ + Connect: &ab.KafkaMessageConnect{ + Payload: nil, + }, + }, + } +} + +func newRegularMessage(payload []byte) *ab.KafkaMessage { + return &ab.KafkaMessage{ + Type: &ab.KafkaMessage_Regular{ + Regular: &ab.KafkaMessageRegular{ + Payload: payload, + }, + }, + } +} + +func newTimeToCutMessage(blockNumber uint64) *ab.KafkaMessage { + return &ab.KafkaMessage{ + Type: &ab.KafkaMessage_TimeToCut{ + TimeToCut: &ab.KafkaMessageTimeToCut{ + BlockNumber: blockNumber, + }, + }, + } +} + func newMsg(payload []byte, topic string) *sarama.ProducerMessage { return &sarama.ProducerMessage{ Topic: topic, diff --git a/protos/orderer/ab.pb.go b/protos/orderer/ab.pb.go index 49742fc361b..723b92e9ad4 100644 --- a/protos/orderer/ab.pb.go +++ b/protos/orderer/ab.pb.go @@ -8,6 +8,7 @@ Package orderer is a generated protocol buffer package. It is generated from these files: orderer/ab.proto orderer/configuration.proto + orderer/kafka.proto It has these top-level messages: BroadcastResponse @@ -20,6 +21,10 @@ It has these top-level messages: CreationPolicy ChainCreators KafkaBrokers + KafkaMessage + KafkaMessageRegular + KafkaMessageTimeToCut + KafkaMessageConnect */ package orderer diff --git a/protos/orderer/configuration.pb.go b/protos/orderer/configuration.pb.go index f9e2e69d0c5..196169c2085 100644 --- a/protos/orderer/configuration.pb.go +++ b/protos/orderer/configuration.pb.go @@ -24,7 +24,8 @@ func (*ConsensusType) ProtoMessage() {} func (*ConsensusType) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{0} } type BatchSize struct { - // Simply specified as messages for now, in the future we may want to allow this to be specified by size in bytes + // Simply specified as messages for now, in the future we may want to allow + // this to be specified by size in bytes Messages uint32 `protobuf:"varint,1,opt,name=messages" json:"messages,omitempty"` } @@ -33,12 +34,16 @@ func (m *BatchSize) String() string { return proto.CompactTextString( func (*BatchSize) ProtoMessage() {} func (*BatchSize) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{1} } -// When submitting a new chain configuration transaction to create a new chain, the first configuration item must of of type -// Orderer with Key CreationPolicy and contents of a Marshaled CreationPolicy. The policy should be set to the policy which -// was supplied by the ordering service for the client's chain creation. The digest should be the hash of the concatenation -// of the remaining ConfigurationItem bytes. The signatures of the configuration item should satisfy the policy for chain creation +// When submitting a new chain configuration transaction to create a new chain, +// the first configuration item must be of type Orderer with Key CreationPolicy +// and contents of a Marshaled CreationPolicy. The policy should be set to the +// policy which was supplied by the ordering service for the client's chain +// creation. The digest should be the hash of the concatenation of the remaining +// ConfigurationItem bytes. The signatures of the configuration item should +// satisfy the policy for chain creation. type CreationPolicy struct { - // The name of the policy which should be used to validate the creation of this chain + // The name of the policy which should be used to validate the creation of + // this chain Policy string `protobuf:"bytes,1,opt,name=policy" json:"policy,omitempty"` // The hash of the concatenation of remaining configuration item bytes Digest []byte `protobuf:"bytes,2,opt,name=digest,proto3" json:"digest,omitempty"` @@ -50,7 +55,8 @@ func (*CreationPolicy) ProtoMessage() {} func (*CreationPolicy) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{2} } type ChainCreators struct { - // A list of policies, any of which may be specified as the chain creation policy in a chain creation request + // A list of policies, any of which may be specified as the chain creation + // policy in a chain creation request Policies []string `protobuf:"bytes,1,rep,name=policies" json:"policies,omitempty"` } @@ -59,7 +65,11 @@ func (m *ChainCreators) String() string { return proto.CompactTextStr func (*ChainCreators) ProtoMessage() {} func (*ChainCreators) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{3} } +// Carries a list of bootstrap brokers, i.e. this is not the exclusive set of +// brokers an ordering service type KafkaBrokers struct { + // Each broker here should be identified using the (IP|host):port notation, + // e.g. 127.0.0.1:7050, or localhost:7050 are valid entries Brokers []string `protobuf:"bytes,1,rep,name=brokers" json:"brokers,omitempty"` } diff --git a/protos/orderer/kafka.pb.go b/protos/orderer/kafka.pb.go new file mode 100644 index 00000000000..2d1194a68d6 --- /dev/null +++ b/protos/orderer/kafka.pb.go @@ -0,0 +1,223 @@ +// Code generated by protoc-gen-go. +// source: orderer/kafka.proto +// DO NOT EDIT! + +package orderer + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +type KafkaMessage struct { + // Types that are valid to be assigned to Type: + // *KafkaMessage_Regular + // *KafkaMessage_TimeToCut + // *KafkaMessage_Connect + Type isKafkaMessage_Type `protobuf_oneof:"Type"` +} + +func (m *KafkaMessage) Reset() { *m = KafkaMessage{} } +func (m *KafkaMessage) String() string { return proto.CompactTextString(m) } +func (*KafkaMessage) ProtoMessage() {} +func (*KafkaMessage) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{0} } + +type isKafkaMessage_Type interface { + isKafkaMessage_Type() +} + +type KafkaMessage_Regular struct { + Regular *KafkaMessageRegular `protobuf:"bytes,1,opt,name=regular,oneof"` +} +type KafkaMessage_TimeToCut struct { + TimeToCut *KafkaMessageTimeToCut `protobuf:"bytes,2,opt,name=time_to_cut,json=timeToCut,oneof"` +} +type KafkaMessage_Connect struct { + Connect *KafkaMessageConnect `protobuf:"bytes,3,opt,name=connect,oneof"` +} + +func (*KafkaMessage_Regular) isKafkaMessage_Type() {} +func (*KafkaMessage_TimeToCut) isKafkaMessage_Type() {} +func (*KafkaMessage_Connect) isKafkaMessage_Type() {} + +func (m *KafkaMessage) GetType() isKafkaMessage_Type { + if m != nil { + return m.Type + } + return nil +} + +func (m *KafkaMessage) GetRegular() *KafkaMessageRegular { + if x, ok := m.GetType().(*KafkaMessage_Regular); ok { + return x.Regular + } + return nil +} + +func (m *KafkaMessage) GetTimeToCut() *KafkaMessageTimeToCut { + if x, ok := m.GetType().(*KafkaMessage_TimeToCut); ok { + return x.TimeToCut + } + return nil +} + +func (m *KafkaMessage) GetConnect() *KafkaMessageConnect { + if x, ok := m.GetType().(*KafkaMessage_Connect); ok { + return x.Connect + } + return nil +} + +// XXX_OneofFuncs is for the internal use of the proto package. +func (*KafkaMessage) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { + return _KafkaMessage_OneofMarshaler, _KafkaMessage_OneofUnmarshaler, _KafkaMessage_OneofSizer, []interface{}{ + (*KafkaMessage_Regular)(nil), + (*KafkaMessage_TimeToCut)(nil), + (*KafkaMessage_Connect)(nil), + } +} + +func _KafkaMessage_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { + m := msg.(*KafkaMessage) + // Type + switch x := m.Type.(type) { + case *KafkaMessage_Regular: + b.EncodeVarint(1<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Regular); err != nil { + return err + } + case *KafkaMessage_TimeToCut: + b.EncodeVarint(2<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.TimeToCut); err != nil { + return err + } + case *KafkaMessage_Connect: + b.EncodeVarint(3<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Connect); err != nil { + return err + } + case nil: + default: + return fmt.Errorf("KafkaMessage.Type has unexpected type %T", x) + } + return nil +} + +func _KafkaMessage_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { + m := msg.(*KafkaMessage) + switch tag { + case 1: // Type.regular + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(KafkaMessageRegular) + err := b.DecodeMessage(msg) + m.Type = &KafkaMessage_Regular{msg} + return true, err + case 2: // Type.time_to_cut + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(KafkaMessageTimeToCut) + err := b.DecodeMessage(msg) + m.Type = &KafkaMessage_TimeToCut{msg} + return true, err + case 3: // Type.connect + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(KafkaMessageConnect) + err := b.DecodeMessage(msg) + m.Type = &KafkaMessage_Connect{msg} + return true, err + default: + return false, nil + } +} + +func _KafkaMessage_OneofSizer(msg proto.Message) (n int) { + m := msg.(*KafkaMessage) + // Type + switch x := m.Type.(type) { + case *KafkaMessage_Regular: + s := proto.Size(x.Regular) + n += proto.SizeVarint(1<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *KafkaMessage_TimeToCut: + s := proto.Size(x.TimeToCut) + n += proto.SizeVarint(2<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *KafkaMessage_Connect: + s := proto.Size(x.Connect) + n += proto.SizeVarint(3<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case nil: + default: + panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) + } + return n +} + +type KafkaMessageRegular struct { + Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` +} + +func (m *KafkaMessageRegular) Reset() { *m = KafkaMessageRegular{} } +func (m *KafkaMessageRegular) String() string { return proto.CompactTextString(m) } +func (*KafkaMessageRegular) ProtoMessage() {} +func (*KafkaMessageRegular) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{1} } + +type KafkaMessageTimeToCut struct { + BlockNumber uint64 `protobuf:"varint,1,opt,name=block_number,json=blockNumber" json:"block_number,omitempty"` +} + +func (m *KafkaMessageTimeToCut) Reset() { *m = KafkaMessageTimeToCut{} } +func (m *KafkaMessageTimeToCut) String() string { return proto.CompactTextString(m) } +func (*KafkaMessageTimeToCut) ProtoMessage() {} +func (*KafkaMessageTimeToCut) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{2} } + +type KafkaMessageConnect struct { + Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` +} + +func (m *KafkaMessageConnect) Reset() { *m = KafkaMessageConnect{} } +func (m *KafkaMessageConnect) String() string { return proto.CompactTextString(m) } +func (*KafkaMessageConnect) ProtoMessage() {} +func (*KafkaMessageConnect) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{3} } + +func init() { + proto.RegisterType((*KafkaMessage)(nil), "orderer.KafkaMessage") + proto.RegisterType((*KafkaMessageRegular)(nil), "orderer.KafkaMessageRegular") + proto.RegisterType((*KafkaMessageTimeToCut)(nil), "orderer.KafkaMessageTimeToCut") + proto.RegisterType((*KafkaMessageConnect)(nil), "orderer.KafkaMessageConnect") +} + +func init() { proto.RegisterFile("orderer/kafka.proto", fileDescriptor2) } + +var fileDescriptor2 = []byte{ + // 266 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x7c, 0x91, 0x3f, 0x4b, 0xc4, 0x30, + 0x18, 0xc6, 0xaf, 0x7a, 0x5c, 0x31, 0xed, 0xd4, 0x43, 0xe8, 0x20, 0xa2, 0x9d, 0x1c, 0x24, 0x01, + 0x5d, 0xc4, 0x49, 0xee, 0x96, 0x03, 0xd1, 0x21, 0x74, 0x72, 0x29, 0x49, 0xfa, 0x5e, 0xaf, 0xf4, + 0x4f, 0x4a, 0x9a, 0x0c, 0xfd, 0x8e, 0x7e, 0x28, 0x69, 0x9a, 0x82, 0x48, 0xbd, 0xf1, 0x79, 0xf2, + 0x7b, 0x79, 0x9e, 0x37, 0x2f, 0xda, 0x4a, 0x95, 0x83, 0x02, 0x45, 0x2a, 0x76, 0xac, 0x18, 0xee, + 0x94, 0xd4, 0x32, 0xf2, 0x9d, 0x99, 0x7c, 0x7b, 0x28, 0x7c, 0x1f, 0x1f, 0x3e, 0xa0, 0xef, 0x59, + 0x01, 0xd1, 0x0b, 0xf2, 0x15, 0x14, 0xa6, 0x66, 0x2a, 0xf6, 0xee, 0xbc, 0x87, 0xe0, 0xe9, 0x06, + 0x3b, 0x16, 0xff, 0xe6, 0xe8, 0xc4, 0x1c, 0x56, 0x74, 0xc6, 0xa3, 0x37, 0x14, 0xe8, 0xb2, 0x81, + 0x4c, 0xcb, 0x4c, 0x18, 0x1d, 0x5f, 0xd8, 0xe9, 0xdb, 0xc5, 0xe9, 0xb4, 0x6c, 0x20, 0x95, 0x7b, + 0xa3, 0x0f, 0x2b, 0x7a, 0xa5, 0x67, 0x31, 0x66, 0x0b, 0xd9, 0xb6, 0x20, 0x74, 0x7c, 0x79, 0x26, + 0x7b, 0x3f, 0x31, 0x63, 0xb6, 0xc3, 0x77, 0x1b, 0xb4, 0x4e, 0x87, 0x0e, 0x12, 0x82, 0xb6, 0x0b, + 0x2d, 0xa3, 0x18, 0xf9, 0x1d, 0x1b, 0x6a, 0xc9, 0x72, 0xbb, 0x54, 0x48, 0x67, 0x99, 0xbc, 0xa2, + 0xeb, 0xc5, 0x62, 0xd1, 0x3d, 0x0a, 0x79, 0x2d, 0x45, 0x95, 0xb5, 0xa6, 0xe1, 0x30, 0x7d, 0xc6, + 0x9a, 0x06, 0xd6, 0xfb, 0xb4, 0xd6, 0xdf, 0x30, 0x57, 0xeb, 0xff, 0xb0, 0x1d, 0xfe, 0x7a, 0x2c, + 0x4a, 0x7d, 0x32, 0x1c, 0x0b, 0xd9, 0x90, 0xd3, 0xd0, 0x81, 0xaa, 0x21, 0x2f, 0x40, 0x91, 0x23, + 0xe3, 0xaa, 0x14, 0xc4, 0x1e, 0xa7, 0x27, 0x6e, 0x69, 0xbe, 0xb1, 0xfa, 0xf9, 0x27, 0x00, 0x00, + 0xff, 0xff, 0x9c, 0x1e, 0x0b, 0x94, 0xc3, 0x01, 0x00, 0x00, +} diff --git a/protos/orderer/kafka.proto b/protos/orderer/kafka.proto new file mode 100644 index 00000000000..bff6567f08c --- /dev/null +++ b/protos/orderer/kafka.proto @@ -0,0 +1,41 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +syntax = "proto3"; + +option go_package = "github.com/hyperledger/fabric/protos/orderer"; + +package orderer; + +message KafkaMessage { + oneof Type { + KafkaMessageRegular regular = 1; + KafkaMessageTimeToCut time_to_cut = 2; + KafkaMessageConnect connect = 3; + } +} + +message KafkaMessageRegular { + bytes payload = 1; +} + +message KafkaMessageTimeToCut { + uint64 block_number = 1; +} + +message KafkaMessageConnect { + bytes payload = 1; +}