diff --git a/bddtests/features/orderer.feature b/bddtests/features/orderer.feature index da4b86d1f0b..e741a0cbe9f 100644 --- a/bddtests/features/orderer.feature +++ b/bddtests/features/orderer.feature @@ -15,28 +15,29 @@ Feature: Orderer And I wait "" seconds And user "binhn" is an authorized user of the ordering service When user "binhn" broadcasts "" unique messages on "orderer0" - And user "binhn" connects to deliver function on "orderer0" with Ack of "" and properties: - | Start | SpecifiedNumber | WindowSize | - | SPECIFIED | 1 | 10 | + And user "binhn" connects to deliver function on "orderer0" + And user "binhn" sends deliver a seek request on "orderer0" with properties: + | Start | End | + | 1 | Newest | Then user "binhn" should get a delivery from "orderer0" of "" blocks with "" messages within "" seconds Examples: Solo Orderer - | ComposeFile | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime | - | docker-compose-orderer-solo.yml | true | 20 | 2 | 10 | .5 | - | docker-compose-orderer-solo.yml | true | 40 | 4 | 10 | .5 | - | docker-compose-orderer-solo.yml | true | 60 | 6 | 10 | .5 | + | ComposeFile | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime | + | docker-compose-orderer-solo.yml | 20 | 2 | 10 | .5 | + | docker-compose-orderer-solo.yml | 40 | 4 | 10 | .5 | + | docker-compose-orderer-solo.yml | 60 | 6 | 10 | .5 | Examples: 1 Kafka Orderer and 1 Kafka Broker - | ComposeFile | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime | - | environments/orderer-1-kafka-1 | true | 20 | 2 | 10 | 5 | - | environments/orderer-1-kafka-1 | true | 40 | 4 | 10 | 5 | - | environments/orderer-1-kafka-1 | true | 60 | 6 | 10 | 5 | + | ComposeFile | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime | + | environments/orderer-1-kafka-1 | 20 | 2 | 10 | 5 | + | environments/orderer-1-kafka-1 | 40 | 4 | 10 | 5 | + | environments/orderer-1-kafka-1 | 60 | 6 | 10 | 5 | Examples: 1 Kafka Orderer and 3 Kafka Brokers - | ComposeFile | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime | - | environments/orderer-1-kafka-3 | true | 20 | 2 | 10 | 5 | - | environments/orderer-1-kafka-3 | true | 40 | 4 | 10 | 5 | - | environments/orderer-1-kafka-3 | true | 60 | 6 | 10 | 5 | + | ComposeFile | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime | + | environments/orderer-1-kafka-3 | 20 | 2 | 10 | 5 | + | environments/orderer-1-kafka-3 | 40 | 4 | 10 | 5 | + | environments/orderer-1-kafka-3 | 60 | 6 | 10 | 5 | # @doNotDecompose Scenario Outline: Basic seek orderer function (Utilizing properties for atomic broadcast) @@ -45,55 +46,56 @@ Feature: Orderer And I wait "" seconds And user "binhn" is an authorized user of the ordering service When user "binhn" broadcasts "" unique messages on "orderer0" - And user "binhn" connects to deliver function on "orderer0" with Ack of "" and properties: - | Start | SpecifiedNumber | WindowSize | - | SPECIFIED | 1 | 10 | + And user "binhn" connects to deliver function on "orderer0" + And user "binhn" sends deliver a seek request on "orderer0" with properties: + | Start | End | + | 1 | Newest | Then user "binhn" should get a delivery from "orderer0" of "" blocks with "" messages within "" seconds - When user "binhn" seeks to block "1" on deliver function on "orderer0" + When user "binhn" sends deliver a seek request on "orderer0" with properties: + | Start | End | + | 1 | Newest | Then user "binhn" should get a delivery from "orderer0" of "" blocks with "" messages within "1" seconds Examples: Solo Orderer - | ComposeFile | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime | - | docker-compose-orderer-solo.yml | true | 20 | 2 | 10 | .5 | - | docker-compose-orderer-solo.yml | true | 40 | 4 | 10 | .5 | - | docker-compose-orderer-solo.yml | true | 60 | 6 | 10 | .5 | + | ComposeFile | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime | + | docker-compose-orderer-solo.yml | 20 | 2 | 10 | .5 | + | docker-compose-orderer-solo.yml | 40 | 4 | 10 | .5 | + | docker-compose-orderer-solo.yml | 60 | 6 | 10 | .5 | Examples: 1 Kafka Orderer and 1 Kafka Broker - | ComposeFile | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime | - | environments/orderer-1-kafka-1 | true | 20 | 2 | 10 | 5 | - | environments/orderer-1-kafka-1 | true | 40 | 4 | 10 | 5 | - | environments/orderer-1-kafka-1 | true | 60 | 6 | 10 | 5 | + | ComposeFile | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime | + | environments/orderer-1-kafka-1 | 20 | 2 | 10 | 5 | + | environments/orderer-1-kafka-1 | 40 | 4 | 10 | 5 | + | environments/orderer-1-kafka-1 | 60 | 6 | 10 | 5 | Examples: 1 Kafka Orderer and 3 Kafka Brokers - | ComposeFile | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime | - | environments/orderer-1-kafka-3 | true | 20 | 2 | 10 | 5 | - | environments/orderer-1-kafka-3 | true | 40 | 4 | 10 | 5 | - | environments/orderer-1-kafka-3 | true | 60 | 6 | 10 | 5 | + | ComposeFile | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime | + | environments/orderer-1-kafka-3 | 20 | 2 | 10 | 5 | + | environments/orderer-1-kafka-3 | 40 | 4 | 10 | 5 | + | environments/orderer-1-kafka-3 | 60 | 6 | 10 | 5 | # @doNotDecompose - Scenario Outline: Basic orderer function varying ACK + Scenario Outline: Basic orderer function using oldest seek target Given we compose "" And I wait "" seconds And user "binhn" is an authorized user of the ordering service When user "binhn" broadcasts "" unique messages on "orderer0" - And user "binhn" connects to deliver function on "orderer0" with Ack of "" and properties: - | Start | SpecifiedNumber | WindowSize | - | SPECIFIED | 1 | 1 | + And user "binhn" connects to deliver function on "orderer0" + And user "binhn" sends deliver a seek request on "orderer0" with properties: + | Start | End | + | Oldest | 2 | Then user "binhn" should get a delivery from "orderer0" of "" blocks with "" messages within "" seconds Examples: Solo Orderer - | ComposeFile | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime | - | docker-compose-orderer-solo.yml | false | 20 | 1 | 10 | .5 | - | docker-compose-orderer-solo.yml | true | 20 | 2 | 10 | .5 | + | ComposeFile | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime | + | docker-compose-orderer-solo.yml | 20 | 3 | 10 | .5 | Examples: 1 Kafka Orderer and 1 Kafka Broker - | ComposeFile | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime | - | environments/orderer-1-kafka-1 | false | 20 | 1 | 10 | 5 | - | environments/orderer-1-kafka-1 | true | 20 | 2 | 10 | 5 | + | ComposeFile | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime | + | environments/orderer-1-kafka-1 | 20 | 3 | 10 | 5 | Examples: 1 Kafka Orderer and 3 Kafka Brokers - | ComposeFile | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime | - | environments/orderer-1-kafka-3 | false | 20 | 1 | 10 | 5 | - | environments/orderer-1-kafka-3 | true | 20 | 2 | 10 | 5 | + | ComposeFile | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime | + | environments/orderer-1-kafka-3 | 20 | 3 | 10 | 5 | diff --git a/bddtests/orderer/ab_pb2.py b/bddtests/orderer/ab_pb2.py index 03197766433..c213f0cad71 100644 --- a/bddtests/orderer/ab_pb2.py +++ b/bddtests/orderer/ab_pb2.py @@ -20,38 +20,34 @@ name='orderer/ab.proto', package='orderer', syntax='proto3', - serialized_pb=_b('\n\x10orderer/ab.proto\x12\x07orderer\x1a\x13\x63ommon/common.proto\"3\n\x11\x42roadcastResponse\x12\x1e\n\x06Status\x18\x01 \x01(\x0e\x32\x0e.common.Status\"\xa8\x01\n\x08SeekInfo\x12*\n\x05Start\x18\x01 \x01(\x0e\x32\x1b.orderer.SeekInfo.StartType\x12\x17\n\x0fSpecifiedNumber\x18\x02 \x01(\x04\x12\x12\n\nWindowSize\x18\x03 \x01(\x04\x12\x0f\n\x07\x43hainID\x18\x04 \x01(\x0c\"2\n\tStartType\x12\n\n\x06NEWEST\x10\x00\x12\n\n\x06OLDEST\x10\x01\x12\r\n\tSPECIFIED\x10\x02\"!\n\x0f\x41\x63knowledgement\x12\x0e\n\x06Number\x18\x01 \x01(\x04\"o\n\rDeliverUpdate\x12\x33\n\x0f\x41\x63knowledgement\x18\x01 \x01(\x0b\x32\x18.orderer.AcknowledgementH\x00\x12!\n\x04Seek\x18\x02 \x01(\x0b\x32\x11.orderer.SeekInfoH\x00\x42\x06\n\x04Type\"Z\n\x0f\x44\x65liverResponse\x12\x1f\n\x05\x45rror\x18\x01 \x01(\x0e\x32\x0e.common.StatusH\x00\x12\x1e\n\x05\x42lock\x18\x02 \x01(\x0b\x32\r.common.BlockH\x00\x42\x06\n\x04Type2\x95\x01\n\x0f\x41tomicBroadcast\x12?\n\tBroadcast\x12\x10.common.Envelope\x1a\x1a.orderer.BroadcastResponse\"\x00(\x01\x30\x01\x12\x41\n\x07\x44\x65liver\x12\x16.orderer.DeliverUpdate\x1a\x18.orderer.DeliverResponse\"\x00(\x01\x30\x01\x42.Z,github.com/hyperledger/fabric/protos/ordererb\x06proto3') + serialized_pb=_b('\n\x10orderer/ab.proto\x12\x07orderer\x1a\x13\x63ommon/common.proto\"3\n\x11\x42roadcastResponse\x12\x1e\n\x06status\x18\x01 \x01(\x0e\x32\x0e.common.Status\"\x0c\n\nSeekNewest\"\x0c\n\nSeekOldest\"\x1f\n\rSeekSpecified\x12\x0e\n\x06number\x18\x01 \x01(\x04\"\x91\x01\n\x0cSeekPosition\x12%\n\x06newest\x18\x01 \x01(\x0b\x32\x13.orderer.SeekNewestH\x00\x12%\n\x06oldest\x18\x02 \x01(\x0b\x32\x13.orderer.SeekOldestH\x00\x12+\n\tspecified\x18\x03 \x01(\x0b\x32\x16.orderer.SeekSpecifiedH\x00\x42\x06\n\x04Type\"\xd6\x01\n\x08SeekInfo\x12\x0f\n\x07\x63hainID\x18\x01 \x01(\t\x12$\n\x05start\x18\x02 \x01(\x0b\x32\x15.orderer.SeekPosition\x12#\n\x04stop\x18\x03 \x01(\x0b\x32\x15.orderer.SeekPosition\x12\x30\n\x08\x62\x65havior\x18\x04 \x01(\x0e\x32\x1e.orderer.SeekInfo.SeekBehavior\"<\n\x0cSeekBehavior\x12\x15\n\x11\x42LOCK_UNTIL_READY\x10\x00\x12\x15\n\x11\x46\x41IL_IF_NOT_READY\x10\x01\"[\n\x0f\x44\x65liverResponse\x12 \n\x06status\x18\x01 \x01(\x0e\x32\x0e.common.StatusH\x00\x12\x1e\n\x05\x62lock\x18\x02 \x01(\x0b\x32\r.common.BlockH\x00\x42\x06\n\x04Type2\x90\x01\n\x0f\x41tomicBroadcast\x12?\n\tBroadcast\x12\x10.common.Envelope\x1a\x1a.orderer.BroadcastResponse\"\x00(\x01\x30\x01\x12<\n\x07\x44\x65liver\x12\x11.orderer.SeekInfo\x1a\x18.orderer.DeliverResponse\"\x00(\x01\x30\x01\x42.Z,github.com/hyperledger/fabric/protos/ordererb\x06proto3') , dependencies=[common_dot_common__pb2.DESCRIPTOR,]) _sym_db.RegisterFileDescriptor(DESCRIPTOR) -_SEEKINFO_STARTTYPE = _descriptor.EnumDescriptor( - name='StartType', - full_name='orderer.SeekInfo.StartType', +_SEEKINFO_SEEKBEHAVIOR = _descriptor.EnumDescriptor( + name='SeekBehavior', + full_name='orderer.SeekInfo.SeekBehavior', filename=None, file=DESCRIPTOR, values=[ _descriptor.EnumValueDescriptor( - name='NEWEST', index=0, number=0, + name='BLOCK_UNTIL_READY', index=0, number=0, options=None, type=None), _descriptor.EnumValueDescriptor( - name='OLDEST', index=1, number=1, - options=None, - type=None), - _descriptor.EnumValueDescriptor( - name='SPECIFIED', index=2, number=2, + name='FAIL_IF_NOT_READY', index=1, number=1, options=None, type=None), ], containing_type=None, options=None, - serialized_start=222, - serialized_end=272, + serialized_start=467, + serialized_end=527, ) -_sym_db.RegisterEnumDescriptor(_SEEKINFO_STARTTYPE) +_sym_db.RegisterEnumDescriptor(_SEEKINFO_SEEKBEHAVIOR) _BROADCASTRESPONSE = _descriptor.Descriptor( @@ -62,7 +58,7 @@ containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='Status', full_name='orderer.BroadcastResponse.Status', index=0, + name='status', full_name='orderer.BroadcastResponse.status', index=0, number=1, type=14, cpp_type=8, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, @@ -85,47 +81,18 @@ ) -_SEEKINFO = _descriptor.Descriptor( - name='SeekInfo', - full_name='orderer.SeekInfo', +_SEEKNEWEST = _descriptor.Descriptor( + name='SeekNewest', + full_name='orderer.SeekNewest', filename=None, file=DESCRIPTOR, containing_type=None, fields=[ - _descriptor.FieldDescriptor( - name='Start', full_name='orderer.SeekInfo.Start', index=0, - number=1, type=14, cpp_type=8, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - _descriptor.FieldDescriptor( - name='SpecifiedNumber', full_name='orderer.SeekInfo.SpecifiedNumber', index=1, - number=2, type=4, cpp_type=4, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - _descriptor.FieldDescriptor( - name='WindowSize', full_name='orderer.SeekInfo.WindowSize', index=2, - number=3, type=4, cpp_type=4, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - _descriptor.FieldDescriptor( - name='ChainID', full_name='orderer.SeekInfo.ChainID', index=3, - number=4, type=12, cpp_type=9, label=1, - has_default_value=False, default_value=_b(""), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), ], extensions=[ ], nested_types=[], enum_types=[ - _SEEKINFO_STARTTYPE, ], options=None, is_extendable=False, @@ -133,20 +100,44 @@ extension_ranges=[], oneofs=[ ], - serialized_start=104, - serialized_end=272, + serialized_start=103, + serialized_end=115, ) -_ACKNOWLEDGEMENT = _descriptor.Descriptor( - name='Acknowledgement', - full_name='orderer.Acknowledgement', +_SEEKOLDEST = _descriptor.Descriptor( + name='SeekOldest', + full_name='orderer.SeekOldest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=117, + serialized_end=129, +) + + +_SEEKSPECIFIED = _descriptor.Descriptor( + name='SeekSpecified', + full_name='orderer.SeekSpecified', filename=None, file=DESCRIPTOR, containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='Number', full_name='orderer.Acknowledgement.Number', index=0, + name='number', full_name='orderer.SeekSpecified.number', index=0, number=1, type=4, cpp_type=4, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, @@ -164,32 +155,39 @@ extension_ranges=[], oneofs=[ ], - serialized_start=274, - serialized_end=307, + serialized_start=131, + serialized_end=162, ) -_DELIVERUPDATE = _descriptor.Descriptor( - name='DeliverUpdate', - full_name='orderer.DeliverUpdate', +_SEEKPOSITION = _descriptor.Descriptor( + name='SeekPosition', + full_name='orderer.SeekPosition', filename=None, file=DESCRIPTOR, containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='Acknowledgement', full_name='orderer.DeliverUpdate.Acknowledgement', index=0, + name='newest', full_name='orderer.SeekPosition.newest', index=0, number=1, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None), _descriptor.FieldDescriptor( - name='Seek', full_name='orderer.DeliverUpdate.Seek', index=1, + name='oldest', full_name='orderer.SeekPosition.oldest', index=1, number=2, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None), + _descriptor.FieldDescriptor( + name='specified', full_name='orderer.SeekPosition.specified', index=2, + number=3, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), ], extensions=[ ], @@ -202,11 +200,64 @@ extension_ranges=[], oneofs=[ _descriptor.OneofDescriptor( - name='Type', full_name='orderer.DeliverUpdate.Type', + name='Type', full_name='orderer.SeekPosition.Type', index=0, containing_type=None, fields=[]), ], - serialized_start=309, - serialized_end=420, + serialized_start=165, + serialized_end=310, +) + + +_SEEKINFO = _descriptor.Descriptor( + name='SeekInfo', + full_name='orderer.SeekInfo', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='chainID', full_name='orderer.SeekInfo.chainID', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='start', full_name='orderer.SeekInfo.start', index=1, + number=2, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='stop', full_name='orderer.SeekInfo.stop', index=2, + number=3, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='behavior', full_name='orderer.SeekInfo.behavior', index=3, + number=4, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + _SEEKINFO_SEEKBEHAVIOR, + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=313, + serialized_end=527, ) @@ -218,14 +269,14 @@ containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='Error', full_name='orderer.DeliverResponse.Error', index=0, + name='status', full_name='orderer.DeliverResponse.status', index=0, number=1, type=14, cpp_type=8, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None), _descriptor.FieldDescriptor( - name='Block', full_name='orderer.DeliverResponse.Block', index=1, + name='block', full_name='orderer.DeliverResponse.block', index=1, number=2, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, @@ -246,33 +297,41 @@ name='Type', full_name='orderer.DeliverResponse.Type', index=0, containing_type=None, fields=[]), ], - serialized_start=422, - serialized_end=512, + serialized_start=529, + serialized_end=620, ) -_BROADCASTRESPONSE.fields_by_name['Status'].enum_type = common_dot_common__pb2._STATUS -_SEEKINFO.fields_by_name['Start'].enum_type = _SEEKINFO_STARTTYPE -_SEEKINFO_STARTTYPE.containing_type = _SEEKINFO -_DELIVERUPDATE.fields_by_name['Acknowledgement'].message_type = _ACKNOWLEDGEMENT -_DELIVERUPDATE.fields_by_name['Seek'].message_type = _SEEKINFO -_DELIVERUPDATE.oneofs_by_name['Type'].fields.append( - _DELIVERUPDATE.fields_by_name['Acknowledgement']) -_DELIVERUPDATE.fields_by_name['Acknowledgement'].containing_oneof = _DELIVERUPDATE.oneofs_by_name['Type'] -_DELIVERUPDATE.oneofs_by_name['Type'].fields.append( - _DELIVERUPDATE.fields_by_name['Seek']) -_DELIVERUPDATE.fields_by_name['Seek'].containing_oneof = _DELIVERUPDATE.oneofs_by_name['Type'] -_DELIVERRESPONSE.fields_by_name['Error'].enum_type = common_dot_common__pb2._STATUS -_DELIVERRESPONSE.fields_by_name['Block'].message_type = common_dot_common__pb2._BLOCK +_BROADCASTRESPONSE.fields_by_name['status'].enum_type = common_dot_common__pb2._STATUS +_SEEKPOSITION.fields_by_name['newest'].message_type = _SEEKNEWEST +_SEEKPOSITION.fields_by_name['oldest'].message_type = _SEEKOLDEST +_SEEKPOSITION.fields_by_name['specified'].message_type = _SEEKSPECIFIED +_SEEKPOSITION.oneofs_by_name['Type'].fields.append( + _SEEKPOSITION.fields_by_name['newest']) +_SEEKPOSITION.fields_by_name['newest'].containing_oneof = _SEEKPOSITION.oneofs_by_name['Type'] +_SEEKPOSITION.oneofs_by_name['Type'].fields.append( + _SEEKPOSITION.fields_by_name['oldest']) +_SEEKPOSITION.fields_by_name['oldest'].containing_oneof = _SEEKPOSITION.oneofs_by_name['Type'] +_SEEKPOSITION.oneofs_by_name['Type'].fields.append( + _SEEKPOSITION.fields_by_name['specified']) +_SEEKPOSITION.fields_by_name['specified'].containing_oneof = _SEEKPOSITION.oneofs_by_name['Type'] +_SEEKINFO.fields_by_name['start'].message_type = _SEEKPOSITION +_SEEKINFO.fields_by_name['stop'].message_type = _SEEKPOSITION +_SEEKINFO.fields_by_name['behavior'].enum_type = _SEEKINFO_SEEKBEHAVIOR +_SEEKINFO_SEEKBEHAVIOR.containing_type = _SEEKINFO +_DELIVERRESPONSE.fields_by_name['status'].enum_type = common_dot_common__pb2._STATUS +_DELIVERRESPONSE.fields_by_name['block'].message_type = common_dot_common__pb2._BLOCK _DELIVERRESPONSE.oneofs_by_name['Type'].fields.append( - _DELIVERRESPONSE.fields_by_name['Error']) -_DELIVERRESPONSE.fields_by_name['Error'].containing_oneof = _DELIVERRESPONSE.oneofs_by_name['Type'] + _DELIVERRESPONSE.fields_by_name['status']) +_DELIVERRESPONSE.fields_by_name['status'].containing_oneof = _DELIVERRESPONSE.oneofs_by_name['Type'] _DELIVERRESPONSE.oneofs_by_name['Type'].fields.append( - _DELIVERRESPONSE.fields_by_name['Block']) -_DELIVERRESPONSE.fields_by_name['Block'].containing_oneof = _DELIVERRESPONSE.oneofs_by_name['Type'] + _DELIVERRESPONSE.fields_by_name['block']) +_DELIVERRESPONSE.fields_by_name['block'].containing_oneof = _DELIVERRESPONSE.oneofs_by_name['Type'] DESCRIPTOR.message_types_by_name['BroadcastResponse'] = _BROADCASTRESPONSE +DESCRIPTOR.message_types_by_name['SeekNewest'] = _SEEKNEWEST +DESCRIPTOR.message_types_by_name['SeekOldest'] = _SEEKOLDEST +DESCRIPTOR.message_types_by_name['SeekSpecified'] = _SEEKSPECIFIED +DESCRIPTOR.message_types_by_name['SeekPosition'] = _SEEKPOSITION DESCRIPTOR.message_types_by_name['SeekInfo'] = _SEEKINFO -DESCRIPTOR.message_types_by_name['Acknowledgement'] = _ACKNOWLEDGEMENT -DESCRIPTOR.message_types_by_name['DeliverUpdate'] = _DELIVERUPDATE DESCRIPTOR.message_types_by_name['DeliverResponse'] = _DELIVERRESPONSE BroadcastResponse = _reflection.GeneratedProtocolMessageType('BroadcastResponse', (_message.Message,), dict( @@ -282,26 +341,40 @@ )) _sym_db.RegisterMessage(BroadcastResponse) -SeekInfo = _reflection.GeneratedProtocolMessageType('SeekInfo', (_message.Message,), dict( - DESCRIPTOR = _SEEKINFO, +SeekNewest = _reflection.GeneratedProtocolMessageType('SeekNewest', (_message.Message,), dict( + DESCRIPTOR = _SEEKNEWEST, __module__ = 'orderer.ab_pb2' - # @@protoc_insertion_point(class_scope:orderer.SeekInfo) + # @@protoc_insertion_point(class_scope:orderer.SeekNewest) )) -_sym_db.RegisterMessage(SeekInfo) +_sym_db.RegisterMessage(SeekNewest) -Acknowledgement = _reflection.GeneratedProtocolMessageType('Acknowledgement', (_message.Message,), dict( - DESCRIPTOR = _ACKNOWLEDGEMENT, +SeekOldest = _reflection.GeneratedProtocolMessageType('SeekOldest', (_message.Message,), dict( + DESCRIPTOR = _SEEKOLDEST, __module__ = 'orderer.ab_pb2' - # @@protoc_insertion_point(class_scope:orderer.Acknowledgement) + # @@protoc_insertion_point(class_scope:orderer.SeekOldest) )) -_sym_db.RegisterMessage(Acknowledgement) +_sym_db.RegisterMessage(SeekOldest) -DeliverUpdate = _reflection.GeneratedProtocolMessageType('DeliverUpdate', (_message.Message,), dict( - DESCRIPTOR = _DELIVERUPDATE, +SeekSpecified = _reflection.GeneratedProtocolMessageType('SeekSpecified', (_message.Message,), dict( + DESCRIPTOR = _SEEKSPECIFIED, __module__ = 'orderer.ab_pb2' - # @@protoc_insertion_point(class_scope:orderer.DeliverUpdate) + # @@protoc_insertion_point(class_scope:orderer.SeekSpecified) )) -_sym_db.RegisterMessage(DeliverUpdate) +_sym_db.RegisterMessage(SeekSpecified) + +SeekPosition = _reflection.GeneratedProtocolMessageType('SeekPosition', (_message.Message,), dict( + DESCRIPTOR = _SEEKPOSITION, + __module__ = 'orderer.ab_pb2' + # @@protoc_insertion_point(class_scope:orderer.SeekPosition) + )) +_sym_db.RegisterMessage(SeekPosition) + +SeekInfo = _reflection.GeneratedProtocolMessageType('SeekInfo', (_message.Message,), dict( + DESCRIPTOR = _SEEKINFO, + __module__ = 'orderer.ab_pb2' + # @@protoc_insertion_point(class_scope:orderer.SeekInfo) + )) +_sym_db.RegisterMessage(SeekInfo) DeliverResponse = _reflection.GeneratedProtocolMessageType('DeliverResponse', (_message.Message,), dict( DESCRIPTOR = _DELIVERRESPONSE, @@ -335,7 +408,7 @@ def __init__(self, channel): ) self.Deliver = channel.stream_stream( '/orderer.AtomicBroadcast/Deliver', - request_serializer=DeliverUpdate.SerializeToString, + request_serializer=SeekInfo.SerializeToString, response_deserializer=DeliverResponse.FromString, ) @@ -351,8 +424,6 @@ def Broadcast(self, request_iterator, context): def Deliver(self, request_iterator, context): """deliver first requires an update containing a seek message, then a stream of block replies is received. - The receiver may choose to send an Acknowledgement for any block number it receives, however Acknowledgements must never be more than WindowSize apart - To avoid latency, clients will likely acknowledge before the WindowSize has been exhausted, preventing the server from stopping and waiting for an Acknowledgement """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') @@ -368,7 +439,7 @@ def add_AtomicBroadcastServicer_to_server(servicer, server): ), 'Deliver': grpc.stream_stream_rpc_method_handler( servicer.Deliver, - request_deserializer=DeliverUpdate.FromString, + request_deserializer=SeekInfo.FromString, response_serializer=DeliverResponse.SerializeToString, ), } @@ -384,8 +455,6 @@ def Broadcast(self, request_iterator, context): context.code(beta_interfaces.StatusCode.UNIMPLEMENTED) def Deliver(self, request_iterator, context): """deliver first requires an update containing a seek message, then a stream of block replies is received. - The receiver may choose to send an Acknowledgement for any block number it receives, however Acknowledgements must never be more than WindowSize apart - To avoid latency, clients will likely acknowledge before the WindowSize has been exhausted, preventing the server from stopping and waiting for an Acknowledgement """ context.code(beta_interfaces.StatusCode.UNIMPLEMENTED) @@ -397,8 +466,6 @@ def Broadcast(self, request_iterator, timeout, metadata=None, with_call=False, p raise NotImplementedError() def Deliver(self, request_iterator, timeout, metadata=None, with_call=False, protocol_options=None): """deliver first requires an update containing a seek message, then a stream of block replies is received. - The receiver may choose to send an Acknowledgement for any block number it receives, however Acknowledgements must never be more than WindowSize apart - To avoid latency, clients will likely acknowledge before the WindowSize has been exhausted, preventing the server from stopping and waiting for an Acknowledgement """ raise NotImplementedError() @@ -406,7 +473,7 @@ def Deliver(self, request_iterator, timeout, metadata=None, with_call=False, pro def beta_create_AtomicBroadcast_server(servicer, pool=None, pool_size=None, default_timeout=None, maximum_timeout=None): request_deserializers = { ('orderer.AtomicBroadcast', 'Broadcast'): common_dot_common__pb2.Envelope.FromString, - ('orderer.AtomicBroadcast', 'Deliver'): DeliverUpdate.FromString, + ('orderer.AtomicBroadcast', 'Deliver'): SeekInfo.FromString, } response_serializers = { ('orderer.AtomicBroadcast', 'Broadcast'): BroadcastResponse.SerializeToString, @@ -423,7 +490,7 @@ def beta_create_AtomicBroadcast_server(servicer, pool=None, pool_size=None, defa def beta_create_AtomicBroadcast_stub(channel, host=None, metadata_transformer=None, pool=None, pool_size=None): request_serializers = { ('orderer.AtomicBroadcast', 'Broadcast'): common_dot_common__pb2.Envelope.SerializeToString, - ('orderer.AtomicBroadcast', 'Deliver'): DeliverUpdate.SerializeToString, + ('orderer.AtomicBroadcast', 'Deliver'): SeekInfo.SerializeToString, } response_deserializers = { ('orderer.AtomicBroadcast', 'Broadcast'): BroadcastResponse.FromString, diff --git a/bddtests/steps/orderer_impl.py b/bddtests/steps/orderer_impl.py index d69a9be45ce..f3a088e245c 100644 --- a/bddtests/steps/orderer_impl.py +++ b/bddtests/steps/orderer_impl.py @@ -42,29 +42,34 @@ def step_impl(context, enrollId, numMsgsToBroadcast, composeService): userRegistration.broadcastMessages(context, numMsgsToBroadcast, composeService) -@when(u'user "{enrollId}" connects to deliver function on "{composeService}" with Ack of "{sendAck}" and properties') -def step_impl(context, enrollId, composeService, sendAck): +@when(u'user "{enrollId}" connects to deliver function on "{composeService}"') +def step_impl(context, enrollId, composeService): # First get the properties - assert 'table' in context, "table (Start | SpecifiedNumber| WindowSize) not found in context" - row = context.table.rows[0] - start, SpecifiedNumber, WindowSize = row['Start'], int(row['SpecifiedNumber']), int(row['WindowSize']) - + assert 'table' in context, "table (Start | End) not found in context" userRegistration = orderer_util.getUserRegistration(context, enrollId) - userRegistration.connectToDeliverFunction(context, sendAck, start, SpecifiedNumber, WindowSize, composeService) + streamHelper = userRegistration.connectToDeliverFunction(context, composeService) @then(u'user "{enrollId}" should get a delivery from "{composeService}" of "{expectedBlocks}" blocks with "{numMsgsToBroadcast}" messages within "{batchTimeout}" seconds') def step_impl(context, enrollId, expectedBlocks, numMsgsToBroadcast, batchTimeout, composeService): userRegistration = orderer_util.getUserRegistration(context, enrollId) streamHelper = userRegistration.getDelivererStreamHelper(context, composeService) - delivererQueue = streamHelper.readDeliveredMessages(long(expectedBlocks)) + blocks = streamHelper.getBlocks() # Verify block count - blocks = [msg.Block for msg in delivererQueue if msg.Block] assert len(blocks) == int(expectedBlocks), "Expected {0} blocks, received {1}".format(expectedBlocks, len(blocks)) -@when(u'user "{enrollId}" seeks to block "{blockToSeekTo}" on deliver function on "{composeService}"') -def step_impl(context, enrollId, blockToSeekTo, composeService): +def convertSeek(utfString): + try: + return int(utfString) + except ValueError: + return str(utfString) + +@when(u'user "{enrollId}" sends deliver a seek request on "{composeService}" with properties') +def step_impl(context, enrollId, composeService): + row = context.table.rows[0] + start, end, = convertSeek(row['Start']), convertSeek(row['End']) + userRegistration = orderer_util.getUserRegistration(context, enrollId) streamHelper = userRegistration.getDelivererStreamHelper(context, composeService) - streamHelper.seekToBlock(long(blockToSeekTo)) + streamHelper.seekToRange(start = start, end = end) diff --git a/bddtests/steps/orderer_util.py b/bddtests/steps/orderer_util.py index a71cfc0b40c..d1ac629b96a 100644 --- a/bddtests/steps/orderer_util.py +++ b/bddtests/steps/orderer_util.py @@ -35,12 +35,13 @@ from grpc.beta.interfaces import StatusCode from common.common_pb2 import Payload +# The default chain ID when the system is statically bootstrapped for testing +TEST_CHAIN_ID = "**TEST_CHAINID**" class StreamHelper: - def __init__(self, ordererStub): + def __init__(self): self.streamClosed = False - self.ordererStub = ordererStub self.sendQueue = Queue.Queue() self.receivedMessages = [] self.replyGenerator = None @@ -49,8 +50,7 @@ def setReplyGenerator(self, replyGenerator): assert self.replyGenerator == None, "reply generator already set!!" self.replyGenerator = replyGenerator - def createSendGenerator(self, firstMsg, timeout = 2): - yield firstMsg + def createSendGenerator(self, timeout = 2): while True: try: nextMsg = self.sendQueue.get(True, timeout) @@ -62,13 +62,18 @@ def createSendGenerator(self, firstMsg, timeout = 2): except Queue.Empty: return + def readMessage(self): + for reply in self.readMessages(1): + return reply + assert False, "Received no messages" + def readMessages(self, expectedCount): msgsReceived = [] counter = 0 try: for reply in self.replyGenerator: counter += 1 - print("{0} received reply: {1}, counter = {2}".format("DeliverStreamHelper", reply, counter)) + print("received reply: {0}, counter = {1}".format(reply, counter)) msgsReceived.append(reply) if counter == int(expectedCount): break @@ -87,53 +92,35 @@ def handleNetworkError(self, networkError): class DeliverStreamHelper(StreamHelper): - def __init__(self, ordererStub, sendAck, Start, SpecifiedNumber, WindowSize, timeout = 1): - StreamHelper.__init__(self, ordererStub) - #Set the ack flag - trueOptions = ['true', 'True','yes','Yes'] - falseOptions = ['false', 'False', 'no', 'No'] - assert sendAck in trueOptions + falseOptions, "sendAck of '{0}' not recognized, expected one of '{1}'".format(sendAck, trueOptions + falseOptions) - self.sendAck = sendAck in trueOptions + def __init__(self, ordererStub, timeout = 1): + StreamHelper.__init__(self) # Set the UpdateMessage and start the stream - self.deliverUpdateMsg = createDeliverUpdateMsg(Start, SpecifiedNumber, WindowSize) - sendGenerator = self.createSendGenerator(self.deliverUpdateMsg, timeout) - replyGenerator = ordererStub.Deliver(sendGenerator, timeout + 1) - self.replyGenerator = replyGenerator - - def seekToBlock(self, blockNum): - deliverUpdateMsg = ab_pb2.DeliverUpdate() - deliverUpdateMsg.CopyFrom(self.deliverUpdateMsg) - deliverUpdateMsg.Seek.SpecifiedNumber = blockNum - self.sendQueue.put(deliverUpdateMsg) + sendGenerator = self.createSendGenerator(timeout) + self.replyGenerator = ordererStub.Deliver(sendGenerator, timeout + 1) - def sendAcknowledgment(self, blockNum): - deliverUpdateMsg = ab_pb2.DeliverUpdate(Acknowledgement = ab_pb2.Acknowledgement(Number = blockNum)) - self.sendQueue.put(deliverUpdateMsg) - - def getWindowSize(self): - return self.deliverUpdateMsg.Seek.WindowSize - - def readDeliveredMessages(self, expectedCount): - 'Read the expected number of messages, being sure to supply the ACK if sendAck is True' - if not self.sendAck: - return self.readMessages(expectedCount) - else: - # This block assumes the expectedCount is a multiple of the windowSize - msgsRead = [] - while len(msgsRead) < expectedCount and self.streamClosed == False: - numToRead = self.getWindowSize() if self.getWindowSize() < expectedCount else expectedCount - msgsRead.extend(self.readMessages(numToRead)) - # send the ack - self.sendAcknowledgment(msgsRead[-1].Block.Header.Number) - print('SentACK!!') - print('') - return msgsRead + def seekToRange(self, chainID = TEST_CHAIN_ID, start = 'Oldest', end = 'Newest'): + self.sendQueue.put(createSeekInfo(start = start)) + def getBlocks(self): + blocks = [] + try: + while True: + reply = self.readMessage() + if reply.HasField("block"): + blocks.append(reply.block) + print("received reply: {0}, len(blocks) = {1}".format(reply, len(blocks))) + else: + if reply.status != common_pb2.SUCCESS: + print("Got error: {0}".format(reply.status)) + print("Done receiving blocks") + break + except Exception as e: + print("getBlocks got error: {0}".format(e) ) + return blocks class UserRegistration: - def __init__(self, secretMsg, composeService): self.enrollId = secretMsg['enrollId'] self.secretMsg = secretMsg @@ -148,10 +135,10 @@ def getUserName(self): return self.secretMsg['enrollId'] - def connectToDeliverFunction(self, context, sendAck, start, SpecifiedNumber, WindowSize, composeService): + def connectToDeliverFunction(self, context, composeService): 'Connect to the deliver function and drain messages to associated orderer queue' assert not composeService in self.abDeliversStreamHelperDict, "Already connected to deliver stream on {0}".format(composeService) - streamHelper = DeliverStreamHelper(self.getABStubForComposeService(context, composeService), sendAck, start, SpecifiedNumber, WindowSize) + streamHelper = DeliverStreamHelper(self.getABStubForComposeService(context, composeService)) self.abDeliversStreamHelperDict[composeService] = streamHelper return streamHelper @@ -161,9 +148,10 @@ def getDelivererStreamHelper(self, context, composeService): return self.abDeliversStreamHelperDict[composeService] + def broadcastMessages(self, context, numMsgsToBroadcast, composeService): abStub = self.getABStubForComposeService(context, composeService) - replyGenerator = abStub.Broadcast(generateBroadcastMessages(int(numMsgsToBroadcast)),2) + replyGenerator = abStub.Broadcast(generateBroadcastMessages(numToGenerate = int(numMsgsToBroadcast)), 2) counter = 0 try: for reply in replyGenerator: @@ -188,9 +176,6 @@ def getABStubForComposeService(self, context, composeService): self.atomicBroadcastStubsDict[composeService] = newABStub return newABStub -# The default chain ID when the system is statically bootstrapped for testing -TEST_CHAIN_ID = "**TEST_CHAINID**".encode() - # Registerses a user on a specific composeService def registerUser(context, secretMsg, composeService): userName = secretMsg['enrollId'] @@ -216,23 +201,37 @@ def getUserRegistration(context, enrollId): raise Exception("Orderer user has not been registered: {0}".format(enrollId)) return userRegistration -def createDeliverUpdateMsg(Start, SpecifiedNumber, WindowSize): - seek = ab_pb2.SeekInfo() - startVal = seek.__getattribute__(Start) - seekInfo = ab_pb2.SeekInfo(Start = startVal, SpecifiedNumber = SpecifiedNumber, WindowSize = WindowSize, ChainID = TEST_CHAIN_ID) - deliverUpdateMsg = ab_pb2.DeliverUpdate(Seek = seekInfo) - return deliverUpdateMsg +def seekPosition(position): + if position == 'Oldest': + return ab_pb2.SeekPosition(oldest = ab_pb2.SeekOldest()) + elif position == 'Newest': + return ab_pb2.SeekPosition(newest = ab_pb2.SeekNewest()) + else: + return ab_pb2.SeekPosition(specified = ab_pb2.SeekSpecified(number = position)) +def createSeekInfo(chainID = TEST_CHAIN_ID, start = 'Oldest', end = 'Newest', behavior = 'FAIL_IF_NOT_READY'): + return ab_pb2.SeekInfo( + chainID = chainID, + start = seekPosition(start), + stop = seekPosition(end), + behavior = ab_pb2.SeekInfo.SeekBehavior.Value(behavior), + ) -def generateBroadcastMessages(numToGenerate = 1, timeToHoldOpen = 1): +def generateBroadcastMessages(chainID = TEST_CHAIN_ID, numToGenerate = 1, timeToHoldOpen = 1): messages = [] for i in range(0, numToGenerate): - envelope = common_pb2.Envelope() - payload = common_pb2.Payload(header = common_pb2.Header(chainHeader = common_pb2.ChainHeader())) - payload.header.chainHeader.chainID = TEST_CHAIN_ID - payload.header.chainHeader.type = common_pb2.ENDORSER_TRANSACTION - payload.data = str("BDD test: {0}".format(datetime.datetime.utcnow())) - envelope.payload = payload.SerializeToString() + payload = common_pb2.Payload( + header = common_pb2.Header( + chainHeader = common_pb2.ChainHeader( + chainID = chainID, + type = common_pb2.ENDORSER_TRANSACTION, + ) + ), + data = str("BDD test: {0}".format(datetime.datetime.utcnow())), + ) + envelope = common_pb2.Envelope( + payload = payload.SerializeToString() + ) messages.append(envelope) for msg in messages: yield msg diff --git a/core/committer/noopssinglechain/client.go b/core/committer/noopssinglechain/client.go index 1cb53758ea4..c4d9496541d 100644 --- a/core/committer/noopssinglechain/client.go +++ b/core/committer/noopssinglechain/client.go @@ -18,6 +18,7 @@ package noopssinglechain import ( "fmt" + "math" "time" "github.com/golang/protobuf/proto" @@ -49,9 +50,7 @@ func init() { // DeliverService used to communicate with orderers to obtain // new block and send the to the committer service type DeliverService struct { - client orderer.AtomicBroadcast_DeliverClient - windowSize uint64 - unAcknowledged uint64 + client orderer.AtomicBroadcast_DeliverClient chainID string conn *grpc.ClientConn @@ -71,8 +70,7 @@ func NewDeliverService(chainID string) *DeliverService { logger.Infof("Creating committer for single noops endorser") deliverService := &DeliverService{ // Instance of RawLedger - chainID: chainID, - windowSize: 10, + chainID: chainID, } return deliverService @@ -162,27 +160,20 @@ func (d *DeliverService) checkLeaderAndRunDeliver(committer committer.Committer) } func (d *DeliverService) seekOldest() error { - return d.client.Send(&orderer.DeliverUpdate{ - Type: &orderer.DeliverUpdate_Seek{ - Seek: &orderer.SeekInfo{ - Start: orderer.SeekInfo_OLDEST, - WindowSize: d.windowSize, - ChainID: d.chainID, - }, - }, + return d.client.Send(&orderer.SeekInfo{ + ChainID: d.chainID, + Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Oldest{Oldest: &orderer.SeekOldest{}}}, + Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}}, + Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY, }) } func (d *DeliverService) seekLatestFromCommitter(height uint64) error { - return d.client.Send(&orderer.DeliverUpdate{ - Type: &orderer.DeliverUpdate_Seek{ - Seek: &orderer.SeekInfo{ - Start: orderer.SeekInfo_SPECIFIED, - WindowSize: d.windowSize, - SpecifiedNumber: height, - ChainID: d.chainID, - }, - }, + return d.client.Send(&orderer.SeekInfo{ + ChainID: d.chainID, + Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: height}}}, + Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}}, + Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY, }) } @@ -243,9 +234,9 @@ func (d *DeliverService) readUntilClose() { return } switch t := msg.Type.(type) { - case *orderer.DeliverResponse_Error: - if t.Error == common.Status_SUCCESS { - logger.Warning("ERROR! Received success in error field") + case *orderer.DeliverResponse_Status: + if t.Status == common.Status_SUCCESS { + logger.Warning("ERROR! Received success for a seek that should never complete") return } logger.Warning("Got error ", t) @@ -311,21 +302,6 @@ func (d *DeliverService) readUntilClose() { logger.Errorf("Error sending block event %s", err) } - d.unAcknowledged++ - if d.unAcknowledged >= d.windowSize/2 { - logger.Warningf("Sending acknowledgement [%d]", t.Block.Header.Number) - err = d.client.Send(&orderer.DeliverUpdate{ - Type: &orderer.DeliverUpdate_Acknowledgement{ - Acknowledgement: &orderer.Acknowledgement{ - Number: seqNum, - }, - }, - }) - if err != nil { - return - } - d.unAcknowledged = 0 - } default: logger.Warning("Received unknown: ", t) return diff --git a/orderer/common/deliver/deliver.go b/orderer/common/deliver/deliver.go index 4b2fbb4d26f..611a52823f2 100644 --- a/orderer/common/deliver/deliver.go +++ b/orderer/common/deliver/deliver.go @@ -50,171 +50,88 @@ type Support interface { } type deliverServer struct { - sm SupportManager - maxWindow int + sm SupportManager } // NewHandlerImpl creates an implementation of the Handler interface -func NewHandlerImpl(sm SupportManager, maxWindow int) Handler { +func NewHandlerImpl(sm SupportManager) Handler { return &deliverServer{ - sm: sm, - maxWindow: maxWindow, + sm: sm, } } func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error { - logger.Debugf("Starting new Deliver loop") - d := newDeliverer(ds, srv) - return d.recv() - -} - -type deliverer struct { - ds *deliverServer - srv ab.AtomicBroadcast_DeliverServer - cursor rawledger.Iterator - nextBlockNumber uint64 - windowSize uint64 - lastAck uint64 - recvChan chan *ab.DeliverUpdate - exitChan chan struct{} -} + logger.Debugf("Starting new deliver loop") + for { + logger.Debugf("Attempting to read seek info message") + seekInfo, err := srv.Recv() + if err != nil { + logger.Errorf("Error reading from stream: %s", err) + return err + } + logger.Debugf("Received message %v", seekInfo) -func newDeliverer(ds *deliverServer, srv ab.AtomicBroadcast_DeliverServer) *deliverer { - d := &deliverer{ - ds: ds, - srv: srv, - exitChan: make(chan struct{}), - recvChan: make(chan *ab.DeliverUpdate), - } - go d.main() - return d -} + chain, ok := ds.sm.GetChain(seekInfo.ChainID) + if !ok { + return sendStatusReply(srv, cb.Status_NOT_FOUND) + } -func (d *deliverer) halt() { - close(d.exitChan) -} + // XXX add deliver authorization checking + + cursor, number := chain.Reader().Iterator(seekInfo.Start) + var stopNum uint64 + switch stop := seekInfo.Stop.Type.(type) { + case *ab.SeekPosition_Oldest: + stopNum = number + case *ab.SeekPosition_Newest: + stopNum = chain.Reader().Height() - 1 + case *ab.SeekPosition_Specified: + stopNum = stop.Specified.Number + } -func (d *deliverer) main() { - var signal <-chan struct{} - for { - select { - case update := <-d.recvChan: - logger.Debugf("Receiving message %v", update) - switch t := update.Type.(type) { - case *ab.DeliverUpdate_Acknowledgement: - logger.Debugf("Received acknowledgement from client") - d.lastAck = t.Acknowledgement.Number - case *ab.DeliverUpdate_Seek: - if !d.processUpdate(t.Seek) { - return + for { + if seekInfo.Behavior == ab.SeekInfo_BLOCK_UNTIL_READY { + <-cursor.ReadyChan() + } else { + select { + case <-cursor.ReadyChan(): + default: + return sendStatusReply(srv, cb.Status_NOT_FOUND) } - case nil: - logger.Errorf("Nil update") - close(d.exitChan) - return - default: - logger.Errorf("Unknown type: %T:%v", t, t) - close(d.exitChan) - return } - case <-signal: - block, status := d.cursor.Next() + + block, status := cursor.Next() if status != cb.Status_SUCCESS { logger.Errorf("Error reading from channel, cause was: %v", status) - if !d.sendErrorReply(status) { - return - } - d.cursor = nil - } else { - d.nextBlockNumber = block.Header.Number + 1 - if !d.sendBlockReply(block) { - return - } + return sendStatusReply(srv, status) } - case <-d.exitChan: - return - } - if d.cursor == nil { - signal = nil - continue - } + logger.Debugf("Delivering block") + if err := sendBlockReply(srv, block); err != nil { + return err + } - if d.lastAck+d.windowSize < d.nextBlockNumber { - signal = nil - continue + if stopNum == block.Header.Number { + break + } } - logger.Debugf("Room for more blocks, activating channel") - signal = d.cursor.ReadyChan() - } -} - -func (d *deliverer) recv() error { - for { - msg, err := d.srv.Recv() - if err != nil { + if err := sendStatusReply(srv, cb.Status_SUCCESS); err != nil { return err } - logger.Debugf("Received message %v", msg) - select { - case <-d.exitChan: - return nil // something has gone wrong enough we want to disconnect - case d.recvChan <- msg: - logger.Debugf("Passed message to main thread") - } + logger.Debugf("Done delivering, waiting for new SeekInfo") } } -func (d *deliverer) sendErrorReply(status cb.Status) bool { - err := d.srv.Send(&ab.DeliverResponse{ - Type: &ab.DeliverResponse_Error{Error: status}, +func sendStatusReply(srv ab.AtomicBroadcast_DeliverServer, status cb.Status) error { + return srv.Send(&ab.DeliverResponse{ + Type: &ab.DeliverResponse_Status{Status: status}, }) - if err != nil { - close(d.exitChan) - return false - } - - return true - } -func (d *deliverer) sendBlockReply(block *cb.Block) bool { - err := d.srv.Send(&ab.DeliverResponse{ +func sendBlockReply(srv ab.AtomicBroadcast_DeliverServer, block *cb.Block) error { + return srv.Send(&ab.DeliverResponse{ Type: &ab.DeliverResponse_Block{Block: block}, }) - - if err != nil { - close(d.exitChan) - return false - } - - return true - -} - -func (d *deliverer) processUpdate(update *ab.SeekInfo) bool { - d.cursor = nil // Even if the seek fails early, we should stop sending blocks from the last request - logger.Debugf("Updating properties for client: %v", update) - - if update == nil || update.WindowSize == 0 || update.WindowSize > uint64(d.ds.maxWindow) || update.ChainID == "" { - close(d.exitChan) - return d.sendErrorReply(cb.Status_BAD_REQUEST) - } - - chain, ok := d.ds.sm.GetChain(update.ChainID) - if !ok { - return d.sendErrorReply(cb.Status_NOT_FOUND) - } - - // XXX add deliver authorization checking - - d.windowSize = update.WindowSize - - d.cursor, d.nextBlockNumber = chain.Reader().Iterator(update.Start, update.SpecifiedNumber) - d.lastAck = d.nextBlockNumber - 1 - - return true } diff --git a/orderer/common/deliver/deliver_test.go b/orderer/common/deliver/deliver_test.go index fb6a861af58..b1206a96670 100644 --- a/orderer/common/deliver/deliver_test.go +++ b/orderer/common/deliver/deliver_test.go @@ -42,18 +42,15 @@ func init() { genesisBlock = provisional.New(config.Load()).GenesisBlock() } -// MagicLargestWindow is used as the default max window size for initializing the deliver service -const MagicLargestWindow int = 1000 - type mockD struct { grpc.ServerStream - recvChan chan *ab.DeliverUpdate + recvChan chan *ab.SeekInfo sendChan chan *ab.DeliverResponse } func newMockD() *mockD { return &mockD{ - recvChan: make(chan *ab.DeliverUpdate), + recvChan: make(chan *ab.SeekInfo), sendChan: make(chan *ab.DeliverResponse), } } @@ -63,7 +60,7 @@ func (m *mockD) Send(br *ab.DeliverResponse) error { return nil } -func (m *mockD) Recv() (*ab.DeliverUpdate, error) { +func (m *mockD) Recv() (*ab.SeekInfo, error) { msg, ok := <-m.recvChan if !ok { return msg, fmt.Errorf("Channel closed") @@ -93,7 +90,7 @@ func (mcs *mockSupport) Reader() rawledger.Reader { } func newMockMultichainManager() *mockSupportManager { - _, rl := ramledger.New(ledgerSize, genesisBlock) + _, rl := ramledger.New(ledgerSize+1, genesisBlock) mm := &mockSupportManager{ chains: make(map[string]*mockSupport), } @@ -103,6 +100,13 @@ func newMockMultichainManager() *mockSupportManager { return mm } +var seekOldest = &ab.SeekPosition{Type: &ab.SeekPosition_Oldest{}} +var seekNewest = &ab.SeekPosition{Type: &ab.SeekPosition_Newest{}} + +func seekSpecified(number uint64) *ab.SeekPosition { + return &ab.SeekPosition{Type: &ab.SeekPosition_Specified{&ab.SeekSpecified{Number: number}}} +} + func TestOldestSeek(t *testing.T) { mm := newMockMultichainManager() for i := 1; i < ledgerSize; i++ { @@ -111,26 +115,33 @@ func TestOldestSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, MagicLargestWindow) + ds := NewHandlerImpl(mm) go ds.Handle(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_OLDEST, ChainID: systemChainID}}} + m.recvChan <- &ab.SeekInfo{ChainID: systemChainID, Start: seekOldest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY} - count := 0 + count := uint64(0) for { select { case deliverReply := <-m.sendChan: if deliverReply.GetBlock() == nil { - t.Fatalf("Received an error on the reply channel") + if deliverReply.GetStatus() != cb.Status_SUCCESS { + t.Fatalf("Received an error on the reply channel") + } + if count != ledgerSize { + t.Fatalf("Expected %d blocks but got %d", ledgerSize, count) + } + return + } else { + if deliverReply.GetBlock().Header.Number != count { + t.Fatalf("Expected block %d but got block %d", count, deliverReply.GetBlock().Header.Number) + } } case <-time.After(time.Second): t.Fatalf("Timed out waiting to get all blocks") } count++ - if count == ledgerSize { - break - } } } @@ -142,20 +153,23 @@ func TestNewestSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, MagicLargestWindow) + ds := NewHandlerImpl(mm) go ds.Handle(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_NEWEST, ChainID: systemChainID}}} + m.recvChan <- &ab.SeekInfo{ChainID: systemChainID, Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY} select { - case blockReply := <-m.sendChan: - if blockReply.GetBlock() == nil { - t.Fatalf("Received an error on the reply channel") - } - - if blockReply.GetBlock().Header.Number != uint64(ledgerSize-1) { - t.Fatalf("Expected only the most recent block") + case deliverReply := <-m.sendChan: + if deliverReply.GetBlock() == nil { + if deliverReply.GetStatus() != cb.Status_SUCCESS { + t.Fatalf("Received an error on the reply channel") + } + return + } else { + if deliverReply.GetBlock().Header.Number != uint64(ledgerSize-1) { + t.Fatalf("Expected only the most recent block") + } } case <-time.After(time.Second): t.Fatalf("Timed out waiting to get all blocks") @@ -170,54 +184,52 @@ func TestSpecificSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, MagicLargestWindow) + ds := NewHandlerImpl(mm) + specifiedStart := uint64(3) + specifiedStop := uint64(7) go ds.Handle(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_NEWEST, ChainID: systemChainID}}} - - select { - case blockReply := <-m.sendChan: - if blockReply.GetBlock() == nil { - t.Fatalf("Received an error on the reply channel") - } + m.recvChan <- &ab.SeekInfo{ChainID: systemChainID, Start: seekSpecified(specifiedStart), Stop: seekSpecified(specifiedStop), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY} - if blockReply.GetBlock().Header.Number != uint64(ledgerSize-1) { - t.Fatalf("Expected only the most recent block") + count := uint64(0) + for { + select { + case deliverReply := <-m.sendChan: + if deliverReply.GetBlock() == nil { + if deliverReply.GetStatus() != cb.Status_SUCCESS { + t.Fatalf("Received an error on the reply channel") + } + return + } else { + if expected := specifiedStart + count; deliverReply.GetBlock().Header.Number != expected { + t.Fatalf("Expected block %d but got block %d", expected, deliverReply.GetBlock().Header.Number) + } + } + case <-time.After(time.Second): + t.Fatalf("Timed out waiting to get all blocks") } - case <-time.After(time.Second): - t.Fatalf("Timed out waiting to get all blocks") + count++ } } func TestBadSeek(t *testing.T) { mm := newMockMultichainManager() - for i := 1; i < 2*ledgerSize; i++ { + for i := 1; i < ledgerSize; i++ { mm.chains[string(systemChainID)].ledger.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) } m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, MagicLargestWindow) + ds := NewHandlerImpl(mm) go ds.Handle(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(ledgerSize - 1), ChainID: systemChainID}}} - - select { - case blockReply := <-m.sendChan: - if blockReply.GetError() != cb.Status_NOT_FOUND { - t.Fatalf("Received wrong error on the reply channel") - } - case <-time.After(time.Second): - t.Fatalf("Timed out waiting to get all blocks") - } - - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(3 * ledgerSize), ChainID: systemChainID}}} + m.recvChan <- &ab.SeekInfo{ChainID: systemChainID, Start: seekSpecified(uint64(3 * ledgerSize)), Stop: seekSpecified(uint64(3 * ledgerSize)), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY} select { - case blockReply := <-m.sendChan: - if blockReply.GetError() != cb.Status_NOT_FOUND { + case deliverReply := <-m.sendChan: + if deliverReply.GetStatus() != cb.Status_NOT_FOUND { t.Fatalf("Received wrong error on the reply channel") } case <-time.After(time.Second): @@ -225,67 +237,85 @@ func TestBadSeek(t *testing.T) { } } -func TestBadWindow(t *testing.T) { +func TestFailFastSeek(t *testing.T) { mm := newMockMultichainManager() + for i := 1; i < ledgerSize; i++ { + mm.chains[string(systemChainID)].ledger.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) + } m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, MagicLargestWindow) + ds := NewHandlerImpl(mm) go ds.Handle(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow) * 2, Start: ab.SeekInfo_OLDEST, ChainID: systemChainID}}} + m.recvChan <- &ab.SeekInfo{ChainID: systemChainID, Start: seekSpecified(uint64(ledgerSize - 1)), Stop: seekSpecified(ledgerSize), Behavior: ab.SeekInfo_FAIL_IF_NOT_READY} select { - case blockReply := <-m.sendChan: - if blockReply.GetError() != cb.Status_BAD_REQUEST { - t.Fatalf("Received wrong error on the reply channel") + case deliverReply := <-m.sendChan: + if deliverReply.GetBlock() == nil { + t.Fatalf("Expected to receive first block") + } + case <-time.After(time.Second): + t.Fatalf("Timed out waiting to get all blocks") + } + + select { + case deliverReply := <-m.sendChan: + if deliverReply.GetStatus() != cb.Status_NOT_FOUND { + t.Fatalf("Expected to receive failure for second block") } case <-time.After(time.Second): t.Fatalf("Timed out waiting to get all blocks") } } -func TestAck(t *testing.T) { +func TestBlockingSeek(t *testing.T) { mm := newMockMultichainManager() - windowSize := uint64(2) for i := 1; i < ledgerSize; i++ { mm.chains[string(systemChainID)].ledger.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) } m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, MagicLargestWindow) + ds := NewHandlerImpl(mm) go ds.Handle(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: windowSize, Start: ab.SeekInfo_OLDEST, ChainID: systemChainID}}} + m.recvChan <- &ab.SeekInfo{ChainID: systemChainID, Start: seekSpecified(uint64(ledgerSize - 1)), Stop: seekSpecified(ledgerSize), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY} - count := uint64(0) - for { - select { - case blockReply := <-m.sendChan: - if blockReply.GetBlock() == nil { - t.Fatalf("Received an error on the reply channel") - } - case <-time.After(time.Second): - t.Fatalf("Timed out waiting to get all blocks") - } - count++ - if count == windowSize { - select { - case <-m.sendChan: - t.Fatalf("Window size exceeded") - default: - } + select { + case deliverReply := <-m.sendChan: + if deliverReply.GetBlock() == nil { + t.Fatalf("Expected to receive first block") } + case <-time.After(time.Second): + t.Fatalf("Timed out waiting to get first block") + } - if count%windowSize == 0 { - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Acknowledgement{Acknowledgement: &ab.Acknowledgement{Number: count}}} + select { + case <-m.sendChan: + t.Fatalf("Should not have delivered an error or second block") + case <-time.After(50 * time.Millisecond): + } + + mm.chains[string(systemChainID)].ledger.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", ledgerSize+1))}}, nil) + + select { + case deliverReply := <-m.sendChan: + if deliverReply.GetBlock() == nil { + t.Fatalf("Expected to receive new block") } + case <-time.After(time.Second): + t.Fatalf("Timed out waiting to get new block") + } - if count == uint64(ledgerSize) { - break + select { + case deliverReply := <-m.sendChan: + if deliverReply.GetStatus() != cb.Status_SUCCESS { + t.Fatalf("Expected delivery to complete") } + case <-time.After(time.Second): + t.Fatalf("Timed out waiting to get all blocks") } } diff --git a/orderer/multichain/chainsupport_test.go b/orderer/multichain/chainsupport_test.go index 9ab3e078381..84632778fe7 100644 --- a/orderer/multichain/chainsupport_test.go +++ b/orderer/multichain/chainsupport_test.go @@ -37,7 +37,7 @@ func (mlw *mockLedgerReadWriter) Append(data []*cb.Envelope, metadata [][]byte) return nil } -func (mlw *mockLedgerReadWriter) Iterator(startType ab.SeekInfo_StartType, specified uint64) (rawledger.Iterator, uint64) { +func (mlw *mockLedgerReadWriter) Iterator(startType *ab.SeekPosition) (rawledger.Iterator, uint64) { panic("Unimplemented") } diff --git a/orderer/multichain/manager.go b/orderer/multichain/manager.go index d7440664a7f..a40ac070ebc 100644 --- a/orderer/multichain/manager.go +++ b/orderer/multichain/manager.go @@ -66,7 +66,7 @@ type multiLedger struct { func getConfigTx(reader rawledger.Reader) *cb.Envelope { var lastConfigTx *cb.Envelope - it, _ := reader.Iterator(ab.SeekInfo_OLDEST, 0) + it, _ := reader.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{}}) // Iterate over the blockchain, looking for config transactions, track the most recent one encountered // this will be the transaction which is returned for { diff --git a/orderer/multichain/manager_test.go b/orderer/multichain/manager_test.go index 3aebe20c728..838e8115074 100644 --- a/orderer/multichain/manager_test.go +++ b/orderer/multichain/manager_test.go @@ -118,7 +118,7 @@ func TestManagerImpl(t *testing.T) { chainSupport.Enqueue(message) } - it, _ := rl.Iterator(ab.SeekInfo_SPECIFIED, 1) + it, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}}) select { case <-it.ReadyChan(): block, status := it.Next() @@ -158,7 +158,7 @@ func TestNewChain(t *testing.T) { t.Fatalf("Error submitting chain creation request") } - it, _ := rl.Iterator(ab.SeekInfo_SPECIFIED, 1) + it, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}}) select { case <-it.ReadyChan(): block, status := it.Next() @@ -191,7 +191,7 @@ func TestNewChain(t *testing.T) { chainSupport.Enqueue(message) } - it, _ = chainSupport.Reader().Iterator(ab.SeekInfo_SPECIFIED, 0) + it, _ = chainSupport.Reader().Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 0}}}) select { case <-it.ReadyChan(): block, status := it.Next() diff --git a/orderer/rawledger/blackbox_test.go b/orderer/rawledger/blackbox_test.go index 81e8a037733..bb6e32e141e 100644 --- a/orderer/rawledger/blackbox_test.go +++ b/orderer/rawledger/blackbox_test.go @@ -40,7 +40,7 @@ type ledgerTestFactory interface { var testables []ledgerTestable func getBlock(number uint64, li ReadWriter) *cb.Block { - i, _ := li.Iterator(ab.SeekInfo_SPECIFIED, number) + i, _ := li.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: number}}}) select { case <-i.ReadyChan(): block, status := i.Next() @@ -145,7 +145,7 @@ func TestRetrieval(t *testing.T) { func testRetrieval(lf ledgerTestFactory, t *testing.T) { _, li := lf.New() li.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}, nil) - it, num := li.Iterator(ab.SeekInfo_OLDEST, 99) + it, num := li.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{}}) if num != 0 { t.Fatalf("Expected genesis block iterator, but got %d", num) } @@ -183,7 +183,7 @@ func TestBlockedRetrieval(t *testing.T) { func testBlockedRetrieval(lf ledgerTestFactory, t *testing.T) { _, li := lf.New() - it, num := li.Iterator(ab.SeekInfo_SPECIFIED, 1) + it, num := li.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}}) if num != 1 { t.Fatalf("Expected block iterator at 1, but got %d", num) } diff --git a/orderer/rawledger/fileledger/fileledger.go b/orderer/rawledger/fileledger/fileledger.go index c0adcbfee75..e831e206d5a 100644 --- a/orderer/rawledger/fileledger/fileledger.go +++ b/orderer/rawledger/fileledger/fileledger.go @@ -272,18 +272,18 @@ func (fl *fileLedger) Append(messages []*cb.Envelope, metadata [][]byte) *cb.Blo } // Iterator implements the rawledger.Reader definition -func (fl *fileLedger) Iterator(startType ab.SeekInfo_StartType, specified uint64) (rawledger.Iterator, uint64) { - switch startType { - case ab.SeekInfo_OLDEST: +func (fl *fileLedger) Iterator(startPosition *ab.SeekPosition) (rawledger.Iterator, uint64) { + switch start := startPosition.Type.(type) { + case *ab.SeekPosition_Oldest: return &cursor{fl: fl, blockNumber: 0}, 0 - case ab.SeekInfo_NEWEST: + case *ab.SeekPosition_Newest: high := fl.height - 1 return &cursor{fl: fl, blockNumber: high}, high - case ab.SeekInfo_SPECIFIED: - if specified > fl.height { + case *ab.SeekPosition_Specified: + if start.Specified.Number > fl.height { return &rawledger.NotFoundErrorIterator{}, 0 } - return &cursor{fl: fl, blockNumber: specified}, specified + return &cursor{fl: fl, blockNumber: start.Specified.Number}, start.Specified.Number } // This line should be unreachable, but the compiler requires it diff --git a/orderer/rawledger/fileledger/fileledger_test.go b/orderer/rawledger/fileledger/fileledger_test.go index 3cafe1e5a33..22367c0f5b0 100644 --- a/orderer/rawledger/fileledger/fileledger_test.go +++ b/orderer/rawledger/fileledger/fileledger_test.go @@ -109,7 +109,7 @@ func TestRetrieval(t *testing.T) { tev, fl := initialize(t) defer tev.tearDown() fl.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}, nil) - it, num := fl.Iterator(ab.SeekInfo_OLDEST, 99) + it, num := fl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{}}) if num != 0 { t.Fatalf("Expected genesis block iterator, but got %d", num) } @@ -144,7 +144,7 @@ func TestRetrieval(t *testing.T) { func TestBlockedRetrieval(t *testing.T) { tev, fl := initialize(t) defer tev.tearDown() - it, num := fl.Iterator(ab.SeekInfo_SPECIFIED, 1) + it, num := fl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}}) if num != 1 { t.Fatalf("Expected block iterator at 1, but got %d", num) } diff --git a/orderer/rawledger/ramledger/ramledger.go b/orderer/rawledger/ramledger/ramledger.go index 26a8dd1c5dd..42bd29888f6 100644 --- a/orderer/rawledger/ramledger/ramledger.go +++ b/orderer/rawledger/ramledger/ramledger.go @@ -136,10 +136,10 @@ func (rl *ramLedger) Height() uint64 { } // Iterator implements the rawledger.Reader definition -func (rl *ramLedger) Iterator(startType ab.SeekInfo_StartType, specified uint64) (rawledger.Iterator, uint64) { +func (rl *ramLedger) Iterator(startPosition *ab.SeekPosition) (rawledger.Iterator, uint64) { var list *simpleList - switch startType { - case ab.SeekInfo_OLDEST: + switch start := startPosition.Type.(type) { + case *ab.SeekPosition_Oldest: oldest := rl.oldest list = &simpleList{ block: &cb.Block{Header: &cb.BlockHeader{Number: oldest.block.Header.Number - 1}}, @@ -147,7 +147,7 @@ func (rl *ramLedger) Iterator(startType ab.SeekInfo_StartType, specified uint64) signal: make(chan struct{}), } close(list.signal) - case ab.SeekInfo_NEWEST: + case *ab.SeekPosition_Newest: newest := rl.newest list = &simpleList{ block: &cb.Block{Header: &cb.BlockHeader{Number: newest.block.Header.Number - 1}}, @@ -155,9 +155,11 @@ func (rl *ramLedger) Iterator(startType ab.SeekInfo_StartType, specified uint64) signal: make(chan struct{}), } close(list.signal) - case ab.SeekInfo_SPECIFIED: - logger.Debugf("Attempting to return block %d", specified) + case *ab.SeekPosition_Specified: oldest := rl.oldest + specified := start.Specified.Number + logger.Debugf("Attempting to return block %d", specified) + // Note the two +1's here is to accomodate the 'preGenesis' block of ^uint64(0) if specified+1 < oldest.block.Header.Number+1 || specified > rl.newest.block.Header.Number+1 { logger.Debugf("Returning error iterator because specified seek was %d with oldest %d and newest %d", specified, rl.oldest.block.Header.Number, rl.newest.block.Header.Number) diff --git a/orderer/rawledger/rawledger.go b/orderer/rawledger/rawledger.go index dfaa3d8d606..1f31c7e7b44 100644 --- a/orderer/rawledger/rawledger.go +++ b/orderer/rawledger/rawledger.go @@ -41,7 +41,7 @@ type Iterator interface { // Reader allows the caller to inspect the raw ledger type Reader interface { // Iterator retrieves an Iterator, as specified by an cb.SeekInfo message, returning an iterator, and its starting block number - Iterator(startType ab.SeekInfo_StartType, specified uint64) (Iterator, uint64) + Iterator(startType *ab.SeekPosition) (Iterator, uint64) // Height returns the highest block number in the chain, plus one Height() uint64 } diff --git a/orderer/sample_clients/deliver_stdout/client.go b/orderer/sample_clients/deliver_stdout/client.go index 022efa809f9..a8dc0d524f4 100644 --- a/orderer/sample_clients/deliver_stdout/client.go +++ b/orderer/sample_clients/deliver_stdout/client.go @@ -19,60 +19,48 @@ package main import ( "flag" "fmt" + "math" "github.com/hyperledger/fabric/orderer/common/bootstrap/provisional" "github.com/hyperledger/fabric/orderer/localconfig" - cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" "golang.org/x/net/context" "google.golang.org/grpc" ) type deliverClient struct { - client ab.AtomicBroadcast_DeliverClient - chainID string - windowSize uint64 - unAcknowledged uint64 + client ab.AtomicBroadcast_DeliverClient + chainID string } -func newDeliverClient(client ab.AtomicBroadcast_DeliverClient, chainID string, windowSize uint64) *deliverClient { - return &deliverClient{client: client, chainID: chainID, windowSize: windowSize} +func newDeliverClient(client ab.AtomicBroadcast_DeliverClient, chainID string) *deliverClient { + return &deliverClient{client: client, chainID: chainID} } func (r *deliverClient) seekOldest() error { - return r.client.Send(&ab.DeliverUpdate{ - Type: &ab.DeliverUpdate_Seek{ - Seek: &ab.SeekInfo{ - Start: ab.SeekInfo_OLDEST, - WindowSize: r.windowSize, - ChainID: r.chainID, - }, - }, + return r.client.Send(&ab.SeekInfo{ + ChainID: r.chainID, + Start: &ab.SeekPosition{Type: &ab.SeekPosition_Oldest{Oldest: &ab.SeekOldest{}}}, + Stop: &ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: math.MaxUint64}}}, + Behavior: ab.SeekInfo_BLOCK_UNTIL_READY, }) } func (r *deliverClient) seekNewest() error { - return r.client.Send(&ab.DeliverUpdate{ - Type: &ab.DeliverUpdate_Seek{ - Seek: &ab.SeekInfo{ - Start: ab.SeekInfo_NEWEST, - WindowSize: r.windowSize, - ChainID: r.chainID, - }, - }, + return r.client.Send(&ab.SeekInfo{ + ChainID: r.chainID, + Start: &ab.SeekPosition{Type: &ab.SeekPosition_Newest{Newest: &ab.SeekNewest{}}}, + Stop: &ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: math.MaxUint64}}}, + Behavior: ab.SeekInfo_BLOCK_UNTIL_READY, }) } func (r *deliverClient) seek(blockNumber uint64) error { - return r.client.Send(&ab.DeliverUpdate{ - Type: &ab.DeliverUpdate_Seek{ - Seek: &ab.SeekInfo{ - Start: ab.SeekInfo_SPECIFIED, - SpecifiedNumber: blockNumber, - WindowSize: r.windowSize, - ChainID: r.chainID, - }, - }, + return r.client.Send(&ab.SeekInfo{ + ChainID: r.chainID, + Start: &ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: blockNumber}}}, + Stop: &ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: math.MaxUint64}}}, + Behavior: ab.SeekInfo_BLOCK_UNTIL_READY, }) } @@ -85,26 +73,11 @@ func (r *deliverClient) readUntilClose() { } switch t := msg.Type.(type) { - case *ab.DeliverResponse_Error: - if t.Error == cb.Status_SUCCESS { - fmt.Println("ERROR! Received success in error field") - return - } - fmt.Println("Got error ", t) + case *ab.DeliverResponse_Status: + fmt.Println("Got status ", t) + return case *ab.DeliverResponse_Block: fmt.Println("Received block: ", t.Block) - r.unAcknowledged++ - if r.unAcknowledged >= r.windowSize/2 { - fmt.Println("Sending acknowledgement") - err = r.client.Send(&ab.DeliverUpdate{Type: &ab.DeliverUpdate_Acknowledgement{Acknowledgement: &ab.Acknowledgement{Number: t.Block.Header.Number}}}) - if err != nil { - return - } - r.unAcknowledged = 0 - } - default: - fmt.Println("Received unknock: ", t) - return } } } @@ -114,11 +87,9 @@ func main() { var chainID string var serverAddr string - var windowSize uint64 flag.StringVar(&serverAddr, "server", fmt.Sprintf("%s:%d", config.General.ListenAddress, config.General.ListenPort), "The RPC server to connect to.") flag.StringVar(&chainID, "chainID", provisional.TestChainID, "The chain ID to deliver from.") - flag.Uint64Var(&windowSize, "windowSize", 10, "The window size for the deliver.") flag.Parse() conn, err := grpc.Dial(serverAddr, grpc.WithInsecure()) @@ -132,8 +103,11 @@ func main() { return } - s := newDeliverClient(client, chainID, windowSize) - s.seekOldest() - s.readUntilClose() + s := newDeliverClient(client, chainID) + err = s.seekOldest() + if err != nil { + fmt.Println("Received error:", err) + } + s.readUntilClose() } diff --git a/orderer/sample_clients/single_tx_client/single_tx_client.go b/orderer/sample_clients/single_tx_client/single_tx_client.go index 60134218349..f6b2371fc71 100644 --- a/orderer/sample_clients/single_tx_client/single_tx_client.go +++ b/orderer/sample_clients/single_tx_client/single_tx_client.go @@ -98,7 +98,12 @@ func updateReceiver(resultch chan byte, errorch chan error, client ab.AtomicBroa errorch <- fmt.Errorf("Failed to get Deliver stream: %s", err) return } - dstream.Send(&ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{Start: ab.SeekInfo_NEWEST, WindowSize: 10, ChainID: provisional.TestChainID}}}) + dstream.Send(&ab.SeekInfo{ + ChainID: provisional.TestChainID, + Start: &ab.SeekPosition{Type: &ab.SeekPosition_Newest{}}, + Stop: &ab.SeekPosition{Type: &ab.SeekPosition_Newest{}}, + Behavior: ab.SeekInfo_BLOCK_UNTIL_READY, + }) logger.Info("{Update Receiver} Listening to ledger updates.") for i := 0; i < 2; i++ { m, inerr := dstream.Recv() diff --git a/orderer/sbft/backend/backend.go b/orderer/sbft/backend/backend.go index c3094f9f9f5..557ac990fb9 100644 --- a/orderer/sbft/backend/backend.go +++ b/orderer/sbft/backend/backend.go @@ -338,7 +338,7 @@ func (t *Backend) Restore(key string, out proto.Message) bool { } func (t *Backend) LastBatch() *s.Batch { - it, _ := t.ledger.Iterator(ab.SeekInfo_NEWEST, 0) + it, _ := t.ledger.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Newest{}}) block, status := it.Next() data := block.Data.Data if status != cb.Status_SUCCESS { diff --git a/orderer/sbft/backend/backendab.go b/orderer/sbft/backend/backendab.go index bc886cb7291..a51c4400cd3 100644 --- a/orderer/sbft/backend/backendab.go +++ b/orderer/sbft/backend/backendab.go @@ -58,7 +58,7 @@ type BackendAB struct { func NewBackendAB(backend *Backend) *BackendAB { // XXX All the code below is a hacky shim until sbft can be adapter to the new multichain interface - it, _ := backend.ledger.Iterator(ab.SeekInfo_OLDEST, 0) + it, _ := backend.ledger.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{}}) block, status := it.Next() if status != cb.Status_SUCCESS { panic("Error getting a block from the ledger") @@ -83,7 +83,7 @@ func NewBackendAB(backend *Backend) *BackendAB { bab := &BackendAB{ backend: backend, - deliverserver: deliver.NewHandlerImpl(manager, 1000), + deliverserver: deliver.NewHandlerImpl(manager), } return bab } diff --git a/orderer/sbft/main/network_test.go b/orderer/sbft/main/network_test.go index 239e663a02f..a850573a3bf 100644 --- a/orderer/sbft/main/network_test.go +++ b/orderer/sbft/main/network_test.go @@ -316,8 +316,12 @@ func Receive(p *peer) (*receiver, error) { if err != nil { return nil, err } - cid := provisional.TestChainID - dstream.Send(&ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{Start: ab.SeekInfo_NEWEST, WindowSize: 10, ChainID: string(cid)}}}) + dstream.Send(&ab.SeekInfo{ + ChainID: provisional.TestChainID, + Start: &ab.SeekPosition{Type: &ab.SeekPosition_Newest{Newest: &ab.SeekNewest{}}}, + Stop: &ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: math.MaxUint64}}}, + Behavior: ab.SeekInfo_BLOCK_UNTIL_READY, + }) go func() { num := uint64(0) @@ -341,7 +345,6 @@ func Receive(p *peer) (*receiver, error) { if merr1 == nil && merr2 == nil { retch <- tx num++ - dstream.Send(&ab.DeliverUpdate{Type: &ab.DeliverUpdate_Acknowledgement{Acknowledgement: &ab.Acknowledgement{Number: num}}}) } } } diff --git a/orderer/sbft/main/sbft_test.go b/orderer/sbft/main/sbft_test.go index 483b70ae278..9c07f40a053 100644 --- a/orderer/sbft/main/sbft_test.go +++ b/orderer/sbft/main/sbft_test.go @@ -131,10 +131,20 @@ func updateReceiver(t *testing.T, resultch chan byte, errorch chan error, client errorch <- fmt.Errorf("Failed to get Deliver stream: %s", err) return } - dstream.Send(&ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{Start: ab.SeekInfo_NEWEST, WindowSize: 10, ChainID: provisional.TestChainID}}}) + err = dstream.Send(&ab.SeekInfo{ + ChainID: provisional.TestChainID, + Start: &ab.SeekPosition{Type: &ab.SeekPosition_Newest{Newest: &ab.SeekNewest{}}}, + Stop: &ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: ^uint64(0)}}}, + Behavior: ab.SeekInfo_BLOCK_UNTIL_READY, + }) + if err != nil { + errorch <- fmt.Errorf("Failed to send to Deliver stream: %s", err) + return + } logger.Info("{Update Receiver} Listening to ledger updates.") for i := 0; i < 2; i++ { m, inerr := dstream.Recv() + logger.Info("{Update Receiver} Got message: ", m, "err:", inerr) if inerr != nil { errorch <- fmt.Errorf("Failed to receive consensus: %s", inerr) return diff --git a/orderer/server.go b/orderer/server.go index fd9a35e130b..958fd382744 100644 --- a/orderer/server.go +++ b/orderer/server.go @@ -49,8 +49,8 @@ func NewServer(ml multichain.Manager, queueSize, maxWindowSize int) ab.AtomicBro logger.Infof("Starting orderer") s := &server{ - dh: deliver.NewHandlerImpl(deliverSupport{ml}, maxWindowSize), bh: broadcast.NewHandlerImpl(broadcastSupport{ml}, queueSize), + dh: deliver.NewHandlerImpl(deliverSupport{ml}), } return s } diff --git a/protos/orderer/ab.pb.go b/protos/orderer/ab.pb.go index 723b92e9ad4..d1b9f04961d 100644 --- a/protos/orderer/ab.pb.go +++ b/protos/orderer/ab.pb.go @@ -12,9 +12,11 @@ It is generated from these files: It has these top-level messages: BroadcastResponse + SeekNewest + SeekOldest + SeekSpecified + SeekPosition SeekInfo - Acknowledgement - DeliverUpdate DeliverResponse ConsensusType BatchSize @@ -49,36 +51,29 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package -// Start may be specified to a specific block number, or may be request from the newest or oldest available -// The start location is always inclusive, so the first reply from NEWEST will contain the newest block at the time -// of reception, it will must not wait until a new block is created. Similarly, when SPECIFIED, and SpecifiedNumber = 10 -// The first block received must be block 10, not block 11 -type SeekInfo_StartType int32 +type SeekInfo_SeekBehavior int32 const ( - SeekInfo_NEWEST SeekInfo_StartType = 0 - SeekInfo_OLDEST SeekInfo_StartType = 1 - SeekInfo_SPECIFIED SeekInfo_StartType = 2 + SeekInfo_BLOCK_UNTIL_READY SeekInfo_SeekBehavior = 0 + SeekInfo_FAIL_IF_NOT_READY SeekInfo_SeekBehavior = 1 ) -var SeekInfo_StartType_name = map[int32]string{ - 0: "NEWEST", - 1: "OLDEST", - 2: "SPECIFIED", +var SeekInfo_SeekBehavior_name = map[int32]string{ + 0: "BLOCK_UNTIL_READY", + 1: "FAIL_IF_NOT_READY", } -var SeekInfo_StartType_value = map[string]int32{ - "NEWEST": 0, - "OLDEST": 1, - "SPECIFIED": 2, +var SeekInfo_SeekBehavior_value = map[string]int32{ + "BLOCK_UNTIL_READY": 0, + "FAIL_IF_NOT_READY": 1, } -func (x SeekInfo_StartType) String() string { - return proto.EnumName(SeekInfo_StartType_name, int32(x)) +func (x SeekInfo_SeekBehavior) String() string { + return proto.EnumName(SeekInfo_SeekBehavior_name, int32(x)) } -func (SeekInfo_StartType) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{1, 0} } +func (SeekInfo_SeekBehavior) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{5, 0} } type BroadcastResponse struct { - Status common.Status `protobuf:"varint,1,opt,name=Status,enum=common.Status" json:"Status,omitempty"` + Status common.Status `protobuf:"varint,1,opt,name=status,enum=common.Status" json:"status,omitempty"` } func (m *BroadcastResponse) Reset() { *m = BroadcastResponse{} } @@ -86,142 +81,176 @@ func (m *BroadcastResponse) String() string { return proto.CompactTex func (*BroadcastResponse) ProtoMessage() {} func (*BroadcastResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } -type SeekInfo struct { - Start SeekInfo_StartType `protobuf:"varint,1,opt,name=Start,enum=orderer.SeekInfo_StartType" json:"Start,omitempty"` - SpecifiedNumber uint64 `protobuf:"varint,2,opt,name=SpecifiedNumber" json:"SpecifiedNumber,omitempty"` - WindowSize uint64 `protobuf:"varint,3,opt,name=WindowSize" json:"WindowSize,omitempty"` - ChainID string `protobuf:"bytes,4,opt,name=ChainID" json:"ChainID,omitempty"` +type SeekNewest struct { } -func (m *SeekInfo) Reset() { *m = SeekInfo{} } -func (m *SeekInfo) String() string { return proto.CompactTextString(m) } -func (*SeekInfo) ProtoMessage() {} -func (*SeekInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } +func (m *SeekNewest) Reset() { *m = SeekNewest{} } +func (m *SeekNewest) String() string { return proto.CompactTextString(m) } +func (*SeekNewest) ProtoMessage() {} +func (*SeekNewest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +type SeekOldest struct { +} + +func (m *SeekOldest) Reset() { *m = SeekOldest{} } +func (m *SeekOldest) String() string { return proto.CompactTextString(m) } +func (*SeekOldest) ProtoMessage() {} +func (*SeekOldest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } -type Acknowledgement struct { - Number uint64 `protobuf:"varint,1,opt,name=Number" json:"Number,omitempty"` +type SeekSpecified struct { + Number uint64 `protobuf:"varint,1,opt,name=number" json:"number,omitempty"` } -func (m *Acknowledgement) Reset() { *m = Acknowledgement{} } -func (m *Acknowledgement) String() string { return proto.CompactTextString(m) } -func (*Acknowledgement) ProtoMessage() {} -func (*Acknowledgement) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } +func (m *SeekSpecified) Reset() { *m = SeekSpecified{} } +func (m *SeekSpecified) String() string { return proto.CompactTextString(m) } +func (*SeekSpecified) ProtoMessage() {} +func (*SeekSpecified) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } -// The update message either causes a seek to a new stream start with a new window, or acknowledges a received block and advances the base of the window -type DeliverUpdate struct { +type SeekPosition struct { // Types that are valid to be assigned to Type: - // *DeliverUpdate_Acknowledgement - // *DeliverUpdate_Seek - Type isDeliverUpdate_Type `protobuf_oneof:"Type"` + // *SeekPosition_Newest + // *SeekPosition_Oldest + // *SeekPosition_Specified + Type isSeekPosition_Type `protobuf_oneof:"Type"` } -func (m *DeliverUpdate) Reset() { *m = DeliverUpdate{} } -func (m *DeliverUpdate) String() string { return proto.CompactTextString(m) } -func (*DeliverUpdate) ProtoMessage() {} -func (*DeliverUpdate) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } +func (m *SeekPosition) Reset() { *m = SeekPosition{} } +func (m *SeekPosition) String() string { return proto.CompactTextString(m) } +func (*SeekPosition) ProtoMessage() {} +func (*SeekPosition) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } -type isDeliverUpdate_Type interface { - isDeliverUpdate_Type() +type isSeekPosition_Type interface { + isSeekPosition_Type() } -type DeliverUpdate_Acknowledgement struct { - Acknowledgement *Acknowledgement `protobuf:"bytes,1,opt,name=Acknowledgement,oneof"` +type SeekPosition_Newest struct { + Newest *SeekNewest `protobuf:"bytes,1,opt,name=newest,oneof"` +} +type SeekPosition_Oldest struct { + Oldest *SeekOldest `protobuf:"bytes,2,opt,name=oldest,oneof"` } -type DeliverUpdate_Seek struct { - Seek *SeekInfo `protobuf:"bytes,2,opt,name=Seek,oneof"` +type SeekPosition_Specified struct { + Specified *SeekSpecified `protobuf:"bytes,3,opt,name=specified,oneof"` } -func (*DeliverUpdate_Acknowledgement) isDeliverUpdate_Type() {} -func (*DeliverUpdate_Seek) isDeliverUpdate_Type() {} +func (*SeekPosition_Newest) isSeekPosition_Type() {} +func (*SeekPosition_Oldest) isSeekPosition_Type() {} +func (*SeekPosition_Specified) isSeekPosition_Type() {} -func (m *DeliverUpdate) GetType() isDeliverUpdate_Type { +func (m *SeekPosition) GetType() isSeekPosition_Type { if m != nil { return m.Type } return nil } -func (m *DeliverUpdate) GetAcknowledgement() *Acknowledgement { - if x, ok := m.GetType().(*DeliverUpdate_Acknowledgement); ok { - return x.Acknowledgement +func (m *SeekPosition) GetNewest() *SeekNewest { + if x, ok := m.GetType().(*SeekPosition_Newest); ok { + return x.Newest } return nil } -func (m *DeliverUpdate) GetSeek() *SeekInfo { - if x, ok := m.GetType().(*DeliverUpdate_Seek); ok { - return x.Seek +func (m *SeekPosition) GetOldest() *SeekOldest { + if x, ok := m.GetType().(*SeekPosition_Oldest); ok { + return x.Oldest + } + return nil +} + +func (m *SeekPosition) GetSpecified() *SeekSpecified { + if x, ok := m.GetType().(*SeekPosition_Specified); ok { + return x.Specified } return nil } // XXX_OneofFuncs is for the internal use of the proto package. -func (*DeliverUpdate) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { - return _DeliverUpdate_OneofMarshaler, _DeliverUpdate_OneofUnmarshaler, _DeliverUpdate_OneofSizer, []interface{}{ - (*DeliverUpdate_Acknowledgement)(nil), - (*DeliverUpdate_Seek)(nil), +func (*SeekPosition) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { + return _SeekPosition_OneofMarshaler, _SeekPosition_OneofUnmarshaler, _SeekPosition_OneofSizer, []interface{}{ + (*SeekPosition_Newest)(nil), + (*SeekPosition_Oldest)(nil), + (*SeekPosition_Specified)(nil), } } -func _DeliverUpdate_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { - m := msg.(*DeliverUpdate) +func _SeekPosition_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { + m := msg.(*SeekPosition) // Type switch x := m.Type.(type) { - case *DeliverUpdate_Acknowledgement: + case *SeekPosition_Newest: b.EncodeVarint(1<<3 | proto.WireBytes) - if err := b.EncodeMessage(x.Acknowledgement); err != nil { + if err := b.EncodeMessage(x.Newest); err != nil { return err } - case *DeliverUpdate_Seek: + case *SeekPosition_Oldest: b.EncodeVarint(2<<3 | proto.WireBytes) - if err := b.EncodeMessage(x.Seek); err != nil { + if err := b.EncodeMessage(x.Oldest); err != nil { + return err + } + case *SeekPosition_Specified: + b.EncodeVarint(3<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Specified); err != nil { return err } case nil: default: - return fmt.Errorf("DeliverUpdate.Type has unexpected type %T", x) + return fmt.Errorf("SeekPosition.Type has unexpected type %T", x) } return nil } -func _DeliverUpdate_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { - m := msg.(*DeliverUpdate) +func _SeekPosition_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { + m := msg.(*SeekPosition) switch tag { - case 1: // Type.Acknowledgement + case 1: // Type.newest + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(SeekNewest) + err := b.DecodeMessage(msg) + m.Type = &SeekPosition_Newest{msg} + return true, err + case 2: // Type.oldest if wire != proto.WireBytes { return true, proto.ErrInternalBadWireType } - msg := new(Acknowledgement) + msg := new(SeekOldest) err := b.DecodeMessage(msg) - m.Type = &DeliverUpdate_Acknowledgement{msg} + m.Type = &SeekPosition_Oldest{msg} return true, err - case 2: // Type.Seek + case 3: // Type.specified if wire != proto.WireBytes { return true, proto.ErrInternalBadWireType } - msg := new(SeekInfo) + msg := new(SeekSpecified) err := b.DecodeMessage(msg) - m.Type = &DeliverUpdate_Seek{msg} + m.Type = &SeekPosition_Specified{msg} return true, err default: return false, nil } } -func _DeliverUpdate_OneofSizer(msg proto.Message) (n int) { - m := msg.(*DeliverUpdate) +func _SeekPosition_OneofSizer(msg proto.Message) (n int) { + m := msg.(*SeekPosition) // Type switch x := m.Type.(type) { - case *DeliverUpdate_Acknowledgement: - s := proto.Size(x.Acknowledgement) + case *SeekPosition_Newest: + s := proto.Size(x.Newest) n += proto.SizeVarint(1<<3 | proto.WireBytes) n += proto.SizeVarint(uint64(s)) n += s - case *DeliverUpdate_Seek: - s := proto.Size(x.Seek) + case *SeekPosition_Oldest: + s := proto.Size(x.Oldest) n += proto.SizeVarint(2<<3 | proto.WireBytes) n += proto.SizeVarint(uint64(s)) n += s + case *SeekPosition_Specified: + s := proto.Size(x.Specified) + n += proto.SizeVarint(3<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s case nil: default: panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) @@ -229,9 +258,43 @@ func _DeliverUpdate_OneofSizer(msg proto.Message) (n int) { return n } +// SeekInfo specifies the range of requested blocks to return +// If the start position is not found, an error is immediately returned +// Otherwise, blocks are returned until a missing block is encountered, then behavior is dictated +// by the SeekBehavior specified. If BLOCK_UNTIL_READY is specified, the reply will block until +// the requested blocks are available, if FAIL_IF_NOT_READY is specified, the reply will return an +// error indicating that the block is not found. To request that all blocks be returned indefinitely +// as they are created, behavior should be set to BLOCK_UNTIL_READY and the stop should be set to +// specified with a number of MAX_UINT64 +type SeekInfo struct { + ChainID string `protobuf:"bytes,1,opt,name=chainID" json:"chainID,omitempty"` + Start *SeekPosition `protobuf:"bytes,2,opt,name=start" json:"start,omitempty"` + Stop *SeekPosition `protobuf:"bytes,3,opt,name=stop" json:"stop,omitempty"` + Behavior SeekInfo_SeekBehavior `protobuf:"varint,4,opt,name=behavior,enum=orderer.SeekInfo_SeekBehavior" json:"behavior,omitempty"` +} + +func (m *SeekInfo) Reset() { *m = SeekInfo{} } +func (m *SeekInfo) String() string { return proto.CompactTextString(m) } +func (*SeekInfo) ProtoMessage() {} +func (*SeekInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } + +func (m *SeekInfo) GetStart() *SeekPosition { + if m != nil { + return m.Start + } + return nil +} + +func (m *SeekInfo) GetStop() *SeekPosition { + if m != nil { + return m.Stop + } + return nil +} + type DeliverResponse struct { // Types that are valid to be assigned to Type: - // *DeliverResponse_Error + // *DeliverResponse_Status // *DeliverResponse_Block Type isDeliverResponse_Type `protobuf_oneof:"Type"` } @@ -239,21 +302,21 @@ type DeliverResponse struct { func (m *DeliverResponse) Reset() { *m = DeliverResponse{} } func (m *DeliverResponse) String() string { return proto.CompactTextString(m) } func (*DeliverResponse) ProtoMessage() {} -func (*DeliverResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } +func (*DeliverResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } type isDeliverResponse_Type interface { isDeliverResponse_Type() } -type DeliverResponse_Error struct { - Error common.Status `protobuf:"varint,1,opt,name=Error,enum=common.Status,oneof"` +type DeliverResponse_Status struct { + Status common.Status `protobuf:"varint,1,opt,name=status,enum=common.Status,oneof"` } type DeliverResponse_Block struct { - Block *common.Block `protobuf:"bytes,2,opt,name=Block,oneof"` + Block *common.Block `protobuf:"bytes,2,opt,name=block,oneof"` } -func (*DeliverResponse_Error) isDeliverResponse_Type() {} -func (*DeliverResponse_Block) isDeliverResponse_Type() {} +func (*DeliverResponse_Status) isDeliverResponse_Type() {} +func (*DeliverResponse_Block) isDeliverResponse_Type() {} func (m *DeliverResponse) GetType() isDeliverResponse_Type { if m != nil { @@ -262,9 +325,9 @@ func (m *DeliverResponse) GetType() isDeliverResponse_Type { return nil } -func (m *DeliverResponse) GetError() common.Status { - if x, ok := m.GetType().(*DeliverResponse_Error); ok { - return x.Error +func (m *DeliverResponse) GetStatus() common.Status { + if x, ok := m.GetType().(*DeliverResponse_Status); ok { + return x.Status } return common.Status_UNKNOWN } @@ -279,7 +342,7 @@ func (m *DeliverResponse) GetBlock() *common.Block { // XXX_OneofFuncs is for the internal use of the proto package. func (*DeliverResponse) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { return _DeliverResponse_OneofMarshaler, _DeliverResponse_OneofUnmarshaler, _DeliverResponse_OneofSizer, []interface{}{ - (*DeliverResponse_Error)(nil), + (*DeliverResponse_Status)(nil), (*DeliverResponse_Block)(nil), } } @@ -288,9 +351,9 @@ func _DeliverResponse_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { m := msg.(*DeliverResponse) // Type switch x := m.Type.(type) { - case *DeliverResponse_Error: + case *DeliverResponse_Status: b.EncodeVarint(1<<3 | proto.WireVarint) - b.EncodeVarint(uint64(x.Error)) + b.EncodeVarint(uint64(x.Status)) case *DeliverResponse_Block: b.EncodeVarint(2<<3 | proto.WireBytes) if err := b.EncodeMessage(x.Block); err != nil { @@ -306,14 +369,14 @@ func _DeliverResponse_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { func _DeliverResponse_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { m := msg.(*DeliverResponse) switch tag { - case 1: // Type.Error + case 1: // Type.status if wire != proto.WireVarint { return true, proto.ErrInternalBadWireType } x, err := b.DecodeVarint() - m.Type = &DeliverResponse_Error{common.Status(x)} + m.Type = &DeliverResponse_Status{common.Status(x)} return true, err - case 2: // Type.Block + case 2: // Type.block if wire != proto.WireBytes { return true, proto.ErrInternalBadWireType } @@ -330,9 +393,9 @@ func _DeliverResponse_OneofSizer(msg proto.Message) (n int) { m := msg.(*DeliverResponse) // Type switch x := m.Type.(type) { - case *DeliverResponse_Error: + case *DeliverResponse_Status: n += proto.SizeVarint(1<<3 | proto.WireVarint) - n += proto.SizeVarint(uint64(x.Error)) + n += proto.SizeVarint(uint64(x.Status)) case *DeliverResponse_Block: s := proto.Size(x.Block) n += proto.SizeVarint(2<<3 | proto.WireBytes) @@ -347,11 +410,13 @@ func _DeliverResponse_OneofSizer(msg proto.Message) (n int) { func init() { proto.RegisterType((*BroadcastResponse)(nil), "orderer.BroadcastResponse") + proto.RegisterType((*SeekNewest)(nil), "orderer.SeekNewest") + proto.RegisterType((*SeekOldest)(nil), "orderer.SeekOldest") + proto.RegisterType((*SeekSpecified)(nil), "orderer.SeekSpecified") + proto.RegisterType((*SeekPosition)(nil), "orderer.SeekPosition") proto.RegisterType((*SeekInfo)(nil), "orderer.SeekInfo") - proto.RegisterType((*Acknowledgement)(nil), "orderer.Acknowledgement") - proto.RegisterType((*DeliverUpdate)(nil), "orderer.DeliverUpdate") proto.RegisterType((*DeliverResponse)(nil), "orderer.DeliverResponse") - proto.RegisterEnum("orderer.SeekInfo_StartType", SeekInfo_StartType_name, SeekInfo_StartType_value) + proto.RegisterEnum("orderer.SeekInfo_SeekBehavior", SeekInfo_SeekBehavior_name, SeekInfo_SeekBehavior_value) } // Reference imports to suppress errors if they are not otherwise used. @@ -368,8 +433,6 @@ type AtomicBroadcastClient interface { // broadcast receives a reply of Acknowledgement for each common.Envelope in order, indicating success or type of failure Broadcast(ctx context.Context, opts ...grpc.CallOption) (AtomicBroadcast_BroadcastClient, error) // deliver first requires an update containing a seek message, then a stream of block replies is received. - // The receiver may choose to send an Acknowledgement for any block number it receives, however Acknowledgements must never be more than WindowSize apart - // To avoid latency, clients will likely acknowledge before the WindowSize has been exhausted, preventing the server from stopping and waiting for an Acknowledgement Deliver(ctx context.Context, opts ...grpc.CallOption) (AtomicBroadcast_DeliverClient, error) } @@ -422,7 +485,7 @@ func (c *atomicBroadcastClient) Deliver(ctx context.Context, opts ...grpc.CallOp } type AtomicBroadcast_DeliverClient interface { - Send(*DeliverUpdate) error + Send(*SeekInfo) error Recv() (*DeliverResponse, error) grpc.ClientStream } @@ -431,7 +494,7 @@ type atomicBroadcastDeliverClient struct { grpc.ClientStream } -func (x *atomicBroadcastDeliverClient) Send(m *DeliverUpdate) error { +func (x *atomicBroadcastDeliverClient) Send(m *SeekInfo) error { return x.ClientStream.SendMsg(m) } @@ -449,8 +512,6 @@ type AtomicBroadcastServer interface { // broadcast receives a reply of Acknowledgement for each common.Envelope in order, indicating success or type of failure Broadcast(AtomicBroadcast_BroadcastServer) error // deliver first requires an update containing a seek message, then a stream of block replies is received. - // The receiver may choose to send an Acknowledgement for any block number it receives, however Acknowledgements must never be more than WindowSize apart - // To avoid latency, clients will likely acknowledge before the WindowSize has been exhausted, preventing the server from stopping and waiting for an Acknowledgement Deliver(AtomicBroadcast_DeliverServer) error } @@ -490,7 +551,7 @@ func _AtomicBroadcast_Deliver_Handler(srv interface{}, stream grpc.ServerStream) type AtomicBroadcast_DeliverServer interface { Send(*DeliverResponse) error - Recv() (*DeliverUpdate, error) + Recv() (*SeekInfo, error) grpc.ServerStream } @@ -502,8 +563,8 @@ func (x *atomicBroadcastDeliverServer) Send(m *DeliverResponse) error { return x.ServerStream.SendMsg(m) } -func (x *atomicBroadcastDeliverServer) Recv() (*DeliverUpdate, error) { - m := new(DeliverUpdate) +func (x *atomicBroadcastDeliverServer) Recv() (*SeekInfo, error) { + m := new(SeekInfo) if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err } @@ -534,34 +595,37 @@ var _AtomicBroadcast_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("orderer/ab.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 462 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x6c, 0x52, 0xdb, 0x6e, 0xd3, 0x40, - 0x10, 0xf5, 0x96, 0xc4, 0x21, 0x53, 0xa5, 0x49, 0x17, 0xa9, 0xb2, 0x82, 0x84, 0x22, 0x4b, 0x80, - 0x91, 0x90, 0x0d, 0xe6, 0x91, 0x07, 0x94, 0xd4, 0x46, 0x89, 0x84, 0x0a, 0xb2, 0x8b, 0x2a, 0xf1, - 0xe6, 0xcb, 0xa4, 0xb1, 0x1a, 0x7b, 0xad, 0xf5, 0xa6, 0x55, 0xf9, 0x00, 0xfe, 0x80, 0x4f, 0xe3, - 0x7f, 0x90, 0xd7, 0x6b, 0xd3, 0x26, 0x3c, 0x79, 0xe6, 0xcc, 0x99, 0x39, 0xe3, 0xb3, 0x03, 0x13, - 0xc6, 0x53, 0xe4, 0xc8, 0x9d, 0x28, 0xb6, 0x4b, 0xce, 0x04, 0xa3, 0x03, 0x85, 0x4c, 0x9f, 0x25, - 0x2c, 0xcf, 0x59, 0xe1, 0x34, 0x9f, 0xa6, 0x6a, 0x7e, 0x84, 0xd3, 0x05, 0x67, 0x51, 0x9a, 0x44, - 0x95, 0x08, 0xb0, 0x2a, 0x59, 0x51, 0x21, 0x7d, 0x05, 0x7a, 0x28, 0x22, 0xb1, 0xab, 0x0c, 0x32, - 0x23, 0xd6, 0x89, 0x7b, 0x62, 0xab, 0x9e, 0x06, 0x0d, 0x54, 0xd5, 0xfc, 0x43, 0xe0, 0x69, 0x88, - 0x78, 0xb3, 0x2a, 0xd6, 0x8c, 0xbe, 0x87, 0x7e, 0x28, 0x22, 0x2e, 0x54, 0xcf, 0x73, 0x5b, 0xe9, - 0xda, 0x2d, 0xc3, 0x96, 0xe5, 0xcb, 0xfb, 0x12, 0x83, 0x86, 0x49, 0x2d, 0x18, 0x87, 0x25, 0x26, - 0xd9, 0x3a, 0xc3, 0xf4, 0x62, 0x97, 0xc7, 0xc8, 0x8d, 0xa3, 0x19, 0xb1, 0x7a, 0xc1, 0x3e, 0x4c, - 0x5f, 0x00, 0x5c, 0x65, 0x45, 0xca, 0xee, 0xc2, 0xec, 0x27, 0x1a, 0x4f, 0x24, 0xe9, 0x01, 0x42, - 0x0d, 0x18, 0x9c, 0x6f, 0xa2, 0xac, 0x58, 0x79, 0x46, 0x6f, 0x46, 0xac, 0x61, 0xd0, 0xa6, 0xa6, - 0x0b, 0xc3, 0x4e, 0x97, 0x02, 0xe8, 0x17, 0xfe, 0x95, 0x1f, 0x5e, 0x4e, 0xb4, 0x3a, 0xfe, 0xfa, - 0xc5, 0xab, 0x63, 0x42, 0x47, 0x30, 0x0c, 0xbf, 0xf9, 0xe7, 0xab, 0xcf, 0x2b, 0xdf, 0x9b, 0x1c, - 0x99, 0x6f, 0x60, 0x3c, 0x4f, 0x6e, 0x0a, 0x76, 0xb7, 0xc5, 0xf4, 0x1a, 0x73, 0x2c, 0x04, 0x3d, - 0x03, 0x5d, 0x6d, 0x48, 0xa4, 0xb8, 0xca, 0xcc, 0x5f, 0x04, 0x46, 0x1e, 0x6e, 0xb3, 0x5b, 0xe4, - 0xdf, 0xcb, 0x34, 0x12, 0x48, 0xbd, 0x83, 0x66, 0xd9, 0x72, 0xec, 0x1a, 0x9d, 0x23, 0x7b, 0xf5, - 0xa5, 0x16, 0x1c, 0xe8, 0xbd, 0x86, 0x5e, 0xed, 0x9b, 0xf4, 0xe3, 0xd8, 0x3d, 0x3d, 0x30, 0x73, - 0xa9, 0x05, 0x92, 0xb0, 0xd0, 0xa1, 0x57, 0xff, 0x9a, 0xb9, 0x81, 0xb1, 0xda, 0xe3, 0xc1, 0x33, - 0xf6, 0x7d, 0xce, 0x19, 0xff, 0xff, 0x2b, 0x2e, 0xb5, 0xa0, 0x29, 0xd3, 0x97, 0xd0, 0x5f, 0x6c, - 0x59, 0xd2, 0x8a, 0x8d, 0x5a, 0x9e, 0x04, 0x6b, 0x9a, 0x0c, 0x5a, 0x25, 0xf7, 0x37, 0x81, 0xf1, - 0x5c, 0xb0, 0x3c, 0x4b, 0xba, 0xcb, 0xa1, 0x9f, 0x60, 0xf8, 0x2f, 0x99, 0xb4, 0x03, 0xfc, 0xe2, - 0x16, 0xb7, 0xac, 0xc4, 0xe9, 0xb4, 0xdb, 0xff, 0xe0, 0xd8, 0x4c, 0xcd, 0x22, 0xef, 0x08, 0x9d, - 0xc3, 0x40, 0xad, 0x4f, 0xcf, 0x3a, 0xf2, 0x23, 0x63, 0xa7, 0xc6, 0x3e, 0xfe, 0x78, 0xc4, 0xc2, - 0xfe, 0xf1, 0xf6, 0x3a, 0x13, 0x9b, 0x5d, 0x5c, 0xcb, 0x3b, 0x9b, 0xfb, 0x12, 0xb9, 0xf4, 0x93, - 0x3b, 0xeb, 0x28, 0xe6, 0x59, 0xe2, 0xc8, 0x8b, 0xaf, 0x1c, 0x35, 0x25, 0xd6, 0x65, 0xfe, 0xe1, - 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe1, 0xf9, 0x20, 0x03, 0x33, 0x03, 0x00, 0x00, + // 497 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x7c, 0x93, 0xdf, 0x8f, 0xd2, 0x40, + 0x10, 0xc7, 0xa9, 0x72, 0xdc, 0x31, 0xde, 0x0f, 0xd8, 0xcb, 0x5d, 0x1a, 0x1e, 0x8c, 0x69, 0xa2, + 0x62, 0x54, 0x6a, 0x30, 0xf1, 0x41, 0x2f, 0x31, 0x54, 0xee, 0x02, 0x91, 0x80, 0x59, 0xf0, 0x41, + 0x5f, 0x48, 0x5b, 0x86, 0x63, 0x3d, 0xe8, 0x36, 0xbb, 0x0b, 0xe6, 0xfe, 0x0b, 0xff, 0x10, 0xff, + 0x3e, 0x9f, 0xcd, 0x6e, 0xb7, 0x25, 0x78, 0xea, 0x53, 0x3b, 0x33, 0x9f, 0x99, 0xf9, 0xce, 0x74, + 0x0a, 0x35, 0x2e, 0x66, 0x28, 0x50, 0xf8, 0x61, 0xd4, 0x4a, 0x05, 0x57, 0x9c, 0xec, 0x5b, 0x4f, + 0xe3, 0x34, 0xe6, 0xab, 0x15, 0x4f, 0xfc, 0xec, 0x91, 0x45, 0xbd, 0x77, 0x50, 0x0f, 0x04, 0x0f, + 0x67, 0x71, 0x28, 0x15, 0x45, 0x99, 0xf2, 0x44, 0x22, 0x79, 0x02, 0x15, 0xa9, 0x42, 0xb5, 0x96, + 0xae, 0xf3, 0xc8, 0x69, 0x1e, 0xb7, 0x8f, 0x5b, 0x36, 0x67, 0x6c, 0xbc, 0xd4, 0x46, 0xbd, 0x43, + 0x80, 0x31, 0xe2, 0xcd, 0x10, 0xbf, 0xa3, 0x54, 0xb9, 0x35, 0x5a, 0xce, 0xb4, 0xf5, 0x14, 0x8e, + 0xb4, 0x35, 0x4e, 0x31, 0x66, 0x73, 0x86, 0x33, 0x72, 0x0e, 0x95, 0x64, 0xbd, 0x8a, 0x50, 0x98, + 0xa2, 0x65, 0x6a, 0x2d, 0xef, 0xa7, 0x03, 0x87, 0x9a, 0xfc, 0xc4, 0x25, 0x53, 0x8c, 0x27, 0xe4, + 0x25, 0x54, 0x12, 0x53, 0xd1, 0x80, 0x0f, 0xda, 0xa7, 0x2d, 0x3b, 0x41, 0x6b, 0xdb, 0xac, 0x57, + 0xa2, 0x16, 0xd2, 0x38, 0x37, 0x2d, 0xdd, 0x7b, 0x7f, 0xc1, 0x33, 0x35, 0x1a, 0xcf, 0x20, 0xf2, + 0x06, 0xaa, 0x32, 0xd7, 0xe4, 0xde, 0x37, 0x19, 0xe7, 0x3b, 0x19, 0x85, 0xe2, 0x5e, 0x89, 0x6e, + 0xd1, 0xa0, 0x02, 0xe5, 0xc9, 0x6d, 0x8a, 0xde, 0x2f, 0x07, 0x0e, 0x34, 0xd6, 0x4f, 0xe6, 0x9c, + 0xb8, 0xb0, 0x1f, 0x2f, 0x42, 0x96, 0xf4, 0xbb, 0x46, 0x6b, 0x95, 0xe6, 0x26, 0x79, 0x0e, 0x7b, + 0x52, 0x85, 0x22, 0x17, 0x75, 0xb6, 0xd3, 0x22, 0x1f, 0x95, 0x66, 0x0c, 0x79, 0x06, 0x65, 0xa9, + 0x78, 0x6a, 0xe5, 0xfc, 0x83, 0x35, 0x08, 0x79, 0x0b, 0x07, 0x11, 0x2e, 0xc2, 0x0d, 0xe3, 0xc2, + 0x2d, 0x9b, 0x8f, 0xf3, 0x70, 0x07, 0xd7, 0xb2, 0xcc, 0x4b, 0x60, 0x29, 0x5a, 0xf0, 0xde, 0x45, + 0xb6, 0xe8, 0x3c, 0x42, 0xce, 0xa0, 0x1e, 0x0c, 0x46, 0x1f, 0x3e, 0x4e, 0x3f, 0x0f, 0x27, 0xfd, + 0xc1, 0x94, 0x5e, 0x76, 0xba, 0x5f, 0x6a, 0x25, 0xed, 0xbe, 0xea, 0xf4, 0x07, 0xd3, 0xfe, 0xd5, + 0x74, 0x38, 0x9a, 0x58, 0xb7, 0xe3, 0x7d, 0x83, 0x93, 0x2e, 0x2e, 0xd9, 0x06, 0x45, 0x71, 0x27, + 0xcd, 0xff, 0xdf, 0x89, 0xde, 0x7a, 0x16, 0x27, 0x8f, 0x61, 0x2f, 0x5a, 0xf2, 0xf8, 0xc6, 0xae, + 0xe3, 0x28, 0x07, 0x03, 0xed, 0xec, 0x95, 0x68, 0x16, 0xcd, 0x97, 0xdc, 0xfe, 0xe1, 0xc0, 0x49, + 0x47, 0xf1, 0x15, 0x8b, 0x8b, 0xe3, 0x24, 0xef, 0xa1, 0xba, 0x35, 0x6a, 0x79, 0x81, 0xcb, 0x64, + 0x83, 0x4b, 0x9e, 0x62, 0xa3, 0x51, 0xac, 0xe1, 0xce, 0x3d, 0x7b, 0xa5, 0xa6, 0xf3, 0xca, 0x21, + 0x17, 0xb0, 0x6f, 0x07, 0x20, 0xf5, 0x3b, 0x3b, 0x6b, 0xb8, 0x85, 0xeb, 0x8f, 0x29, 0xb3, 0xec, + 0xa0, 0xf5, 0xf5, 0xc5, 0x35, 0x53, 0x8b, 0x75, 0xa4, 0x3b, 0xfb, 0x8b, 0xdb, 0x14, 0xc5, 0x12, + 0x67, 0xd7, 0x28, 0xfc, 0x79, 0x18, 0x09, 0x16, 0xfb, 0xe6, 0x7f, 0x92, 0xbe, 0xad, 0x12, 0x55, + 0x8c, 0xfd, 0xfa, 0x77, 0x00, 0x00, 0x00, 0xff, 0xff, 0x5d, 0x0a, 0x45, 0x0e, 0x91, 0x03, 0x00, + 0x00, } diff --git a/protos/orderer/ab.proto b/protos/orderer/ab.proto index e65c7a8a96b..701528431bf 100644 --- a/protos/orderer/ab.proto +++ b/protos/orderer/ab.proto @@ -23,41 +23,48 @@ option go_package = "github.com/hyperledger/fabric/protos/orderer"; package orderer; message BroadcastResponse { - common.Status Status = 1; + common.Status status = 1; } -message SeekInfo { - // Start may be specified to a specific block number, or may be request from the newest or oldest available - // The start location is always inclusive, so the first reply from NEWEST will contain the newest block at the time - // of reception, it will must not wait until a new block is created. Similarly, when SPECIFIED, and SpecifiedNumber = 10 - // The first block received must be block 10, not block 11 - enum StartType { - NEWEST = 0; - OLDEST = 1; - SPECIFIED = 2; - } - StartType Start = 1; - uint64 SpecifiedNumber = 2; // Only used when start = SPECIFIED - uint64 WindowSize = 3; // The window size is the maximum number of blocks that will be sent without Acknowledgement, the base of the window moves to the most recently received acknowledgment - string ChainID = 4; // The chain to seek within -} +message SeekNewest { } + +message SeekOldest { } -message Acknowledgement { - uint64 Number = 1; +message SeekSpecified { + uint64 number = 1; } -// The update message either causes a seek to a new stream start with a new window, or acknowledges a received block and advances the base of the window -message DeliverUpdate { +message SeekPosition { oneof Type { - Acknowledgement Acknowledgement = 1; // Acknowledgement should be sent monotonically and only for a block which has been received, Acknowledgements received non-monotonically has undefined behavior - SeekInfo Seek = 2; // When set, SeekInfo causes a seek and potential reconfiguration of the window size + SeekNewest newest = 1; + SeekOldest oldest = 2; + SeekSpecified specified = 3; + } +} + +// SeekInfo specifies the range of requested blocks to return +// If the start position is not found, an error is immediately returned +// Otherwise, blocks are returned until a missing block is encountered, then behavior is dictated +// by the SeekBehavior specified. If BLOCK_UNTIL_READY is specified, the reply will block until +// the requested blocks are available, if FAIL_IF_NOT_READY is specified, the reply will return an +// error indicating that the block is not found. To request that all blocks be returned indefinitely +// as they are created, behavior should be set to BLOCK_UNTIL_READY and the stop should be set to +// specified with a number of MAX_UINT64 +message SeekInfo { + enum SeekBehavior { + BLOCK_UNTIL_READY = 0; + FAIL_IF_NOT_READY = 1; } + string chainID = 1; // The chain to seek within + SeekPosition start = 2; // The position to start the deliver from + SeekPosition stop = 3; // The position to stop the deliver + SeekBehavior behavior = 4; // The behavior when a missing block is encountered } message DeliverResponse { oneof Type { - common.Status Error = 1; - common.Block Block = 2; + common.Status status = 1; + common.Block block = 2; } } @@ -66,7 +73,5 @@ service AtomicBroadcast { rpc Broadcast(stream common.Envelope) returns (stream BroadcastResponse) {} // deliver first requires an update containing a seek message, then a stream of block replies is received. - // The receiver may choose to send an Acknowledgement for any block number it receives, however Acknowledgements must never be more than WindowSize apart - // To avoid latency, clients will likely acknowledge before the WindowSize has been exhausted, preventing the server from stopping and waiting for an Acknowledgement - rpc Deliver(stream DeliverUpdate) returns (stream DeliverResponse) {} + rpc Deliver(stream SeekInfo) returns (stream DeliverResponse) {} }