Skip to content

Commit

Permalink
[FAB-5720] Re-submit tx if re-validation passes
Browse files Browse the repository at this point in the history
For the reason stated in FAB-5720, we shouldn't immediately
commit the message after re-validation, but re-submit it with
updated config sequence so that we don't risk forking the chain
due to non-determinism.

Note that this behavior is not compatible with pre-v1.1 orderers.
In order to support mixed cluster where pre-v1.1 and v1.1 orderer
co-exist, V1.1_Orderer_BugFixes flag is used to determine if we
should re-submit the re-validated tx. If this flag is not set,
the orderer is effectively running in compatibility mode.

Change-Id: Ibccf6a83a7b04be8b86611bb374de4f726856880
Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
  • Loading branch information
guoger committed Oct 19, 2017
1 parent a7445e7 commit f844f86
Show file tree
Hide file tree
Showing 9 changed files with 1,023 additions and 336 deletions.
6 changes: 6 additions & 0 deletions common/capabilities/orderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,9 @@ func (cp *OrdererProvider) HasCapability(capability string) bool {
func (cp *OrdererProvider) SetChannelModPolicyDuringCreate() bool {
return cp.v11BugFixes
}

// Resubmission specifies whether the v1.0 non-deterministic commitment of tx should be fixed by re-submitting
// the re-validated tx.
func (cp *OrdererProvider) Resubmission() bool {
return cp.v11BugFixes
}
4 changes: 4 additions & 0 deletions common/channelconfig/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ type OrdererCapabilities interface {
// group's mod_policy to "" should be fixed or not.
SetChannelModPolicyDuringCreate() bool

// Resubmission specifies whether the v1.0 non-deterministic commitment of tx should be fixed by re-submitting
// the re-validated tx.
Resubmission() bool

// Supported returns an error if there are unknown capabilities in this channel which are required
Supported() error
}
Expand Down
8 changes: 8 additions & 0 deletions common/mocks/config/orderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type OrdererCapabilities struct {

// SetChannelModPolicyDuringCreateVal is returned by SetChannelModPolicyDuringCreate()
SetChannelModPolicyDuringCreateVal bool

// ResubmissionVal is returned by Resubmission()
ResubmissionVal bool
}

// Supported returns SupportedErr
Expand All @@ -84,3 +87,8 @@ func (oc *OrdererCapabilities) Supported() error {
func (oc *OrdererCapabilities) SetChannelModPolicyDuringCreate() bool {
return oc.SetChannelModPolicyDuringCreateVal
}

// Resubmission returns ResubmissionVal
func (oc *OrdererCapabilities) Resubmission() bool {
return oc.ResubmissionVal
}
260 changes: 201 additions & 59 deletions orderer/consensus/kafka/chain.go

Large diffs are not rendered by default.

951 changes: 713 additions & 238 deletions orderer/consensus/kafka/chain_test.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions orderer/consensus/kafka/consenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ type consenterImpl struct {
// multichannel.NewManagerImpl() when ranging over the ledgerFactory's
// existingChains.
func (consenter *consenterImpl) HandleChain(support consensus.ConsenterSupport, metadata *cb.Metadata) (consensus.Chain, error) {
lastOffsetPersisted := getLastOffsetPersisted(metadata.Value, support.ChainID())
return newChain(consenter, support, lastOffsetPersisted)
lastOffsetPersisted, lastOriginalOffsetProcessed := getOffsets(metadata.Value, support.ChainID())
return newChain(consenter, support, lastOffsetPersisted, lastOriginalOffsetProcessed)
}

// commonConsenter allows us to retrieve the configuration options set on the
Expand Down
91 changes: 58 additions & 33 deletions protos/orderer/kafka.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 13 additions & 4 deletions protos/orderer/kafka.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ message KafkaMessageRegular {
bytes payload = 1;
uint64 config_seq = 2;
Class class = 3;
int64 original_offset = 4;
}

// KafkaMessageTimeToCut is used to signal to the orderers
Expand All @@ -57,9 +58,17 @@ message KafkaMessageConnect {
bytes payload = 1;
}

// LastOffsetPersisted is the encoded value for the Metadata message
// which is encoded in the ORDERER block metadata index for the case
// of the Kafka-based orderer.
// KafkaMetadata is encoded into the ORDERER block to keep track of
// Kafka-related metadata associated with this block.
message KafkaMetadata {
int64 last_offset_persisted = 1;
// LastOffsetPersisted is the encoded value for the Metadata message
// which is encoded in the ORDERER block metadata index for the case
// of the Kafka-based orderer.
int64 last_offset_persisted = 1;

// LastOriginalOffsetProcessed is used to keep track of the newest
// offset processed if a message is re-validated and re-ordered.
// This value is used to deduplicate re-submitted messages from
// multiple orderer so that we don't bother re-processing it again.
int64 last_original_offset_processed = 2;
}
18 changes: 18 additions & 0 deletions sampleconfig/configtx.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,24 @@ Profiles:
- <<: *SampleOrg
AdminPrincipal: Role.MEMBER

# SampleSingleMSPKafkaV1.1 mimics the SampleDevModeKafka definition
# but additionally defines the v1.1 only capabilities which do
# not allow a mixed v1.0.x v1.1.x network
SampleDevModeKafkaV1.1:
Orderer:
<<: *OrdererDefaults
OrdererType: kafka
Organizations:
- <<: *SampleOrg
AdminPrincipal: Role.MEMBER
Capabilities:
<<: *OrdererCapabilities
Consortiums:
SampleConsortium:
Organizations:
- <<: *SampleOrg
AdminPrincipal: Role.MEMBER

# SampleSingleMSPSolo defines a configuration which uses the Solo orderer,
# and contains a single MSP definition (the MSP sampleconfig).
# The Consortium SampleConsortium has only a single member, SampleOrg.
Expand Down

0 comments on commit f844f86

Please sign in to comment.