Skip to content

Commit

Permalink
[FAB-5267] Switch Broadcast filter w/ msgprocessor
Browse files Browse the repository at this point in the history
When the Enqueue method was removed in favor of the Order/Configure
methods, the existing Broadcast code continued to use them very much
like the old Enqueue method.

This CR removes the hack which allowed the Broadcast code to continue to
function and instead leverages the msgprocessor code to appropriately
route messages to either Order or Configure.

Because the filters were removed from the blockcutter, the only place
they are currently used is in the Broadcast path.  This injects an
unneeded dependency into the Broadcast framework, making tests more
difficult to mock, and generally complicating the code.

This CR removes the filtering from the Broadcast path, in favor of doing
the filtering entirely in the msgprocessor path.  This was at one point
split into two CRs around issue FAB-5268, but it turned out to be
infeasible to break this into two changes while not breaking the e2e.

Change-Id: I6ef3595f26a4e9dfdbf6dec636b434e725ccd6f2
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Jul 27, 2017
1 parent 3e5c3e4 commit f5e25a3
Show file tree
Hide file tree
Showing 22 changed files with 730 additions and 746 deletions.
147 changes: 51 additions & 96 deletions orderer/common/broadcast/broadcast.go
Original file line number Diff line number Diff line change
@@ -1,76 +1,59 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Copyright IBM Corp. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
SPDX-License-Identifier: Apache-2.0
*/

package broadcast

import (
"github.com/hyperledger/fabric/orderer/common/filter"
"io"

"github.com/hyperledger/fabric/orderer/common/msgprocessor"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/op/go-logging"

"io"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/protos/utils"
)

var logger = logging.MustGetLogger("orderer/common/broadcast")

// ConfigUpdateProcessor is used to transform CONFIG_UPDATE transactions which are used to generate other envelope
// message types with preprocessing by the orderer
type ConfigUpdateProcessor interface {
// Process transforms an envelope of type CONFIG_UPDATE to another type
Process(envConfigUpdate *cb.Envelope) (*cb.Envelope, error)
}

// Handler defines an interface which handles broadcasts
type Handler interface {
// Handle starts a service thread for a given gRPC connection and services the broadcast connection
Handle(srv ab.AtomicBroadcast_BroadcastServer) error
}

// SupportManager provides a way for the Handler to look up the Support for a chain
type SupportManager interface {
ConfigUpdateProcessor
// ChannelSupportRegistrar provides a way for the Handler to look up the Support for a chain
type ChannelSupportRegistrar interface {
// BroadcastChannelSupport returns the message channel header, whether the message is a config update
// and the channel resources for a message or an error if the message is not a message which can
// be processed directly (like CONFIG and ORDERER_TRANSACTION messages)
BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, ChannelSupport, error)
}

// GetChain gets the chain support for a given ChannelId
GetChain(chainID string) (Support, bool)
// ChannelSupport provides the backing resources needed to support broadcast on a channel
type ChannelSupport interface {
msgprocessor.Processor
Consenter
}

// Support provides the backing resources needed to support broadcast on a chain
type Support interface {
// Consenter provides methods to send messages through consensus
type Consenter interface {
// Order accepts a message or returns an error indicating the cause of failure
// It ultimately passes through to the consensus.Chain interface
Order(env *cb.Envelope, configSeq uint64) error

// Configure accepts a reconfiguration or returns an error indicating the cause of failure
// It ultimately passes through to the consensus.Chain interface
Configure(configUpdateMsg *cb.Envelope, config *cb.Envelope, configSeq uint64) error

// Filters returns the set of broadcast filters for this chain
Filters() *filter.RuleSet
}

type handlerImpl struct {
sm SupportManager
sm ChannelSupportRegistrar
}

// NewHandlerImpl constructs a new implementation of the Handler interface
func NewHandlerImpl(sm SupportManager) Handler {
func NewHandlerImpl(sm ChannelSupportRegistrar) Handler {
return &handlerImpl{
sm: sm,
}
Expand All @@ -90,78 +73,40 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
return err
}

payload, err := utils.UnmarshalPayload(msg.Payload)
chdr, isConfig, processor, err := bh.sm.BroadcastChannelSupport(msg)
if err != nil {
logger.Warningf("Received malformed message, dropping connection: %s", err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}

if payload.Header == nil {
logger.Warningf("Received malformed message, with missing header, dropping connection")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
logger.Warningf("[channel: %s] Could not get message processor: %s", chdr.ChannelId, err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
}

chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
logger.Warningf("Received malformed message (bad channel header), dropping connection: %s", err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}
if !isConfig {
logger.Debugf("[channel: %s] Broadcast is processing normal message of type %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type])

isConfig := false
configUpdateMsg := msg
if chdr.Type == int32(cb.HeaderType_CONFIG_UPDATE) {
logger.Debugf("Preprocessing CONFIG_UPDATE")
msg, err = bh.sm.Process(msg)
configSeq, err := processor.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("Rejecting CONFIG_UPDATE because: %s", err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
logger.Warningf("[channel: %s] Rejecting broadcast of normal message because of error: %s", chdr.ChannelId, err)
return srv.Send(&ab.BroadcastResponse{Status: ClassifyError(err)})
}

err = proto.Unmarshal(msg.Payload, payload)
if err != nil || payload.Header == nil {
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
err = processor.Order(msg, configSeq)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of normal message with SERVICE_UNAVAILABLE: reject by Order: %s", chdr.ChannelId, err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
}
} else { // isConfig
logger.Debugf("[channel: %s] Broadcast is processing config update message", chdr.ChannelId)

chdr, err = utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)
if err != nil {
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (bad channel header): %s", err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
logger.Warningf("[channel: %s] Rejecting broadcast of config message because of error: %s", chdr.ChannelId, err)
return srv.Send(&ab.BroadcastResponse{Status: ClassifyError(err)})
}

if chdr.ChannelId == "" {
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (empty channel ID)")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
err = processor.Configure(msg, config, configSeq)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of config message with SERVICE_UNAVAILABLE: rejected by Configure: %s", chdr.ChannelId, err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
}

isConfig = true
}

support, ok := bh.sm.GetChain(chdr.ChannelId)
if !ok {
logger.Warningf("Rejecting broadcast because channel %s was not found", chdr.ChannelId)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND})
}

logger.Debugf("[channel: %s] Broadcast is filtering message of type %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type])

// Normal transaction for existing chain
_, filterErr := support.Filters().Apply(msg)

if filterErr != nil {
logger.Warningf("[channel: %s] Rejecting broadcast message because of filter error: %s", chdr.ChannelId, filterErr)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}

// XXX temporary hack to mesh interface definitions, will remove.
if isConfig {
err = support.Configure(configUpdateMsg, msg, 0)
} else {
err = support.Order(msg, 0)
}

if err != nil {
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
}

if logger.IsEnabledFor(logging.DEBUG) {
Expand All @@ -175,3 +120,13 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
}
}
}

// ClassifyError converts an error type into a status code.
func ClassifyError(err error) cb.Status {
switch err {
case msgprocessor.ErrChannelDoesNotExist:
return cb.Status_NOT_FOUND
default:
return cb.Status_BAD_REQUEST
}
}
Loading

0 comments on commit f5e25a3

Please sign in to comment.