Skip to content

Commit

Permalink
[FAB-1690] Implement Chain interface in SBFT
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-1690

Change-Id: I0b0fd0b3df5dc919b77908aae80311fd604b0d4e
Signed-off-by: Gabor Hosszu <gabor@digitalasset.com>
  • Loading branch information
gaborh-da committed Jan 24, 2017
1 parent 230f3cc commit fff6ed6
Show file tree
Hide file tree
Showing 31 changed files with 985 additions and 820 deletions.
29 changes: 29 additions & 0 deletions orderer/localconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,20 @@ type Kafka struct {
Version sarama.KafkaVersion
}

// Sbft contains config for the SBFT orderer
type Sbft struct {
PeerCommAddr string
CertFile string
KeyFile string
DataDir string
N uint64
F uint64
BatchDurationNsec uint64
BatchSizeBytes uint64
RequestTimeoutNsec uint64
Peers map[string]string // Address to Cert mapping
}

// Retry contains config for the reconnection attempts to the Kafka brokers
type Retry struct {
Period time.Duration
Expand All @@ -118,6 +132,7 @@ type TopLevel struct {
FileLedger FileLedger
Kafka Kafka
Genesis Genesis
Sbft Sbft
}

var defaults = TopLevel{
Expand Down Expand Up @@ -161,6 +176,20 @@ var defaults = TopLevel{
PreferredMaxBytes: 512 * 1024,
},
},
// TODO move the appropriate parts to Genesis
// and use BatchSizeMaxBytes
Sbft: Sbft{
PeerCommAddr: ":6101",
CertFile: "sbft/testdata/cert1.pem",
KeyFile: "sbft/testdata/key.pem",
DataDir: "/tmp",
N: 1,
F: 0,
BatchDurationNsec: 1000,
BatchSizeBytes: 1000000000,
RequestTimeoutNsec: 1000000000,
Peers: map[string]string{":6101": "sbft/testdata/cert1.pem"},
},
}

func (c *TopLevel) completeInitialization() {
Expand Down
20 changes: 20 additions & 0 deletions orderer/localconfig/config_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"strings"
"time"

"encoding/json"

"github.com/mitchellh/mapstructure"
"github.com/spf13/viper"
)
Expand All @@ -48,6 +50,9 @@ func getKeysRecursively(base string, v *viper.Viper, nodeKeys map[string]interfa
} else if m, ok := val.(map[string]interface{}); ok {
logger.Debugf("Found map[string]interface{} value for %s", fqKey)
result[key] = getKeysRecursively(fqKey+".", v, m)
} else if m, ok := unmarshalJson(val); ok {
logger.Debugf("Found real value for %s setting to map[string]string %v", fqKey, m)
result[key] = m
} else {
logger.Debugf("Found real value for %s setting to %T %v", fqKey, val, val)
result[key] = val
Expand All @@ -56,6 +61,21 @@ func getKeysRecursively(base string, v *viper.Viper, nodeKeys map[string]interfa
return result
}

func unmarshalJson(val interface{}) (map[string]string, bool) {
mp := map[string]string{}
s, ok := val.(string)
if !ok {
logger.Debugf("Unmarshal JSON: value is not a string: %v", val)
return nil, false
}
err := json.Unmarshal([]byte(s), &mp)
if err != nil {
logger.Debugf("Unmarshal JSON: value cannot be unmarshalled: %s", err)
return nil, false
}
return mp, true
}

// customDecodeHook adds the additional functions of parsing durations from strings
// as well as parsing strings of the format "[thing1, thing2, thing3]" into string slices
// Note that whitespace around slice elements is removed
Expand Down
23 changes: 23 additions & 0 deletions orderer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ import (
ramledger "github.com/hyperledger/fabric/orderer/ledger/ram"
"github.com/hyperledger/fabric/orderer/localconfig"
"github.com/hyperledger/fabric/orderer/multichain"
"github.com/hyperledger/fabric/orderer/sbft"
"github.com/hyperledger/fabric/orderer/sbft/backend"
sbftcrypto "github.com/hyperledger/fabric/orderer/sbft/crypto"
"github.com/hyperledger/fabric/orderer/sbft/simplebft"
"github.com/hyperledger/fabric/orderer/solo"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
Expand Down Expand Up @@ -144,6 +148,7 @@ func main() {
consenters := make(map[string]multichain.Consenter)
consenters["solo"] = solo.New()
consenters["kafka"] = kafka.New(conf.Kafka.Version, conf.Kafka.Retry)
consenters["sbft"] = sbft.New(makeSbftConsensusConfig(conf), makeSbftStackConfig(conf))

manager := multichain.NewManagerImpl(lf, consenters, localmsp.NewSigner())

Expand All @@ -157,3 +162,21 @@ func main() {
logger.Infof("Beginning to serve requests")
grpcServer.Start()
}

func makeSbftConsensusConfig(conf *config.TopLevel) *sbft.ConsensusConfig {
cfg := simplebft.Config{N: conf.Sbft.N, F: conf.Sbft.F, BatchDurationNsec: conf.Sbft.BatchDurationNsec,
BatchSizeBytes: conf.Sbft.BatchSizeBytes,
RequestTimeoutNsec: conf.Sbft.RequestTimeoutNsec}
peers := make(map[string][]byte)
for addr, cert := range conf.Sbft.Peers {
peers[addr], _ = sbftcrypto.ParseCertPEM(cert)
}
return &sbft.ConsensusConfig{Consensus: &cfg, Peers: peers}
}

func makeSbftStackConfig(conf *config.TopLevel) *backend.StackConfig {
return &backend.StackConfig{ListenAddr: conf.Sbft.PeerCommAddr,
CertFile: conf.Sbft.CertFile,
KeyFile: conf.Sbft.KeyFile,
DataDir: conf.Sbft.DataDir}
}
4 changes: 3 additions & 1 deletion orderer/mocks/multichain/multichain.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ func (mcs *ConsenterSupport) WriteBlock(block *cb.Block, _committers []filter.Co
umtxs[i] = utils.UnmarshalEnvelopeOrPanic(block.Data.Data[i])
}
mcs.Batches <- umtxs
block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})
if encodedMetadataValue != nil {
block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})
}
mcs.WriteBlockVal = block
return block
}
Expand Down
Loading

0 comments on commit fff6ed6

Please sign in to comment.