diff --git a/images/kafka/Dockerfile.in b/images/kafka/Dockerfile.in index b357b79c337..9e9d039da2c 100644 --- a/images/kafka/Dockerfile.in +++ b/images/kafka/Dockerfile.in @@ -15,6 +15,7 @@ ADD payload/kafka-run-class.sh /opt/kafka/bin/kafka-run-class.sh ADD payload/docker-entrypoint.sh /docker-entrypoint.sh EXPOSE 9092 +EXPOSE 9093 ENTRYPOINT ["/docker-entrypoint.sh"] CMD ["/opt/kafka/bin/kafka-server-start.sh"] diff --git a/orderer/kafka/consumer.go b/orderer/kafka/consumer.go index 29714e89019..5c1c59e7c79 100644 --- a/orderer/kafka/consumer.go +++ b/orderer/kafka/consumer.go @@ -16,7 +16,10 @@ limitations under the License. package kafka -import "github.com/Shopify/sarama" +import ( + "github.com/Shopify/sarama" + "github.com/hyperledger/fabric/orderer/localconfig" +) // Consumer allows the caller to receive a stream of blobs from the Kafka cluster for a specific partition. type Consumer interface { @@ -29,8 +32,8 @@ type consumerImpl struct { partition sarama.PartitionConsumer } -func newConsumer(brokers []string, kafkaVersion sarama.KafkaVersion, cp ChainPartition, offset int64) (Consumer, error) { - parent, err := sarama.NewConsumer(brokers, newBrokerConfig(kafkaVersion, rawPartition)) +func newConsumer(brokers []string, kafkaVersion sarama.KafkaVersion, tls config.TLS, cp ChainPartition, offset int64) (Consumer, error) { + parent, err := sarama.NewConsumer(brokers, newBrokerConfig(kafkaVersion, rawPartition, tls)) if err != nil { return nil, err } diff --git a/orderer/kafka/orderer.go b/orderer/kafka/orderer.go index 70dba31272a..5c583f33949 100644 --- a/orderer/kafka/orderer.go +++ b/orderer/kafka/orderer.go @@ -29,24 +29,24 @@ import ( ) // New creates a Kafka-backed consenter. Called by orderer's main.go. -func New(kv sarama.KafkaVersion, ro config.Retry) multichain.Consenter { - return newConsenter(kv, ro, bfValue, pfValue, cfValue) +func New(kv sarama.KafkaVersion, ro config.Retry, tls config.TLS) multichain.Consenter { + return newConsenter(kv, ro, tls, bfValue, pfValue, cfValue) } // New calls here because we need to pass additional arguments to // the constructor and New() should only read from the config file. -func newConsenter(kv sarama.KafkaVersion, ro config.Retry, bf bfType, pf pfType, cf cfType) multichain.Consenter { - return &consenterImpl{kv, ro, bf, pf, cf} +func newConsenter(kv sarama.KafkaVersion, ro config.Retry, tls config.TLS, bf bfType, pf pfType, cf cfType) multichain.Consenter { + return &consenterImpl{kv, ro, tls, bf, pf, cf} } // bfType defines the signature of the broker constructor. type bfType func([]string, ChainPartition) (Broker, error) // pfType defines the signature of the producer constructor. -type pfType func([]string, sarama.KafkaVersion, config.Retry) Producer +type pfType func([]string, sarama.KafkaVersion, config.Retry, config.TLS) Producer // cfType defines the signature of the consumer constructor. -type cfType func([]string, sarama.KafkaVersion, ChainPartition, int64) (Consumer, error) +type cfType func([]string, sarama.KafkaVersion, config.TLS, ChainPartition, int64) (Consumer, error) // bfValue holds the value for the broker constructor that's used in the non-test case. var bfValue = func(brokers []string, cp ChainPartition) (Broker, error) { @@ -54,13 +54,13 @@ var bfValue = func(brokers []string, cp ChainPartition) (Broker, error) { } // pfValue holds the value for the producer constructor that's used in the non-test case. -var pfValue = func(brokers []string, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry) Producer { - return newProducer(brokers, kafkaVersion, retryOptions) +var pfValue = func(brokers []string, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry, tls config.TLS) Producer { + return newProducer(brokers, kafkaVersion, retryOptions, tls) } // cfValue holds the value for the consumer constructor that's used in the non-test case. -var cfValue = func(brokers []string, kafkaVersion sarama.KafkaVersion, cp ChainPartition, offset int64) (Consumer, error) { - return newConsumer(brokers, kafkaVersion, cp, offset) +var cfValue = func(brokers []string, kafkaVersion sarama.KafkaVersion, tls config.TLS, cp ChainPartition, offset int64) (Consumer, error) { + return newConsumer(brokers, kafkaVersion, tls, cp, offset) } // consenterImpl holds the implementation of type that satisfies the @@ -68,11 +68,12 @@ var cfValue = func(brokers []string, kafkaVersion sarama.KafkaVersion, cp ChainP // is needed because that is what the HandleChain contract requires. // The latter is needed for testing. type consenterImpl struct { - kv sarama.KafkaVersion - ro config.Retry - bf bfType - pf pfType - cf cfType + kv sarama.KafkaVersion + ro config.Retry + tls config.TLS + bf bfType + pf pfType + cf cfType } // HandleChain creates/returns a reference to a Chain for the given set of support resources. @@ -110,7 +111,7 @@ func newChain(consenter testableConsenter, support multichain.ConsenterSupport, partition: newChainPartition(support.ChainID(), rawPartition), batchTimeout: support.SharedConfig().BatchTimeout(), lastOffsetPersisted: lastOffsetPersisted, - producer: consenter.prodFunc()(support.SharedConfig().KafkaBrokers(), consenter.kafkaVersion(), consenter.retryOptions()), + producer: consenter.prodFunc()(support.SharedConfig().KafkaBrokers(), consenter.kafkaVersion(), consenter.retryOptions(), consenter.tlsConfig()), halted: false, // Redundant as the default value for booleans is false but added for readability exitChan: make(chan struct{}), haltedChan: make(chan struct{}), @@ -123,6 +124,7 @@ func newChain(consenter testableConsenter, support multichain.ConsenterSupport, type testableConsenter interface { kafkaVersion() sarama.KafkaVersion retryOptions() config.Retry + tlsConfig() config.TLS brokFunc() bfType prodFunc() pfType consFunc() cfType @@ -130,6 +132,7 @@ type testableConsenter interface { func (co *consenterImpl) kafkaVersion() sarama.KafkaVersion { return co.kv } func (co *consenterImpl) retryOptions() config.Retry { return co.ro } +func (co *consenterImpl) tlsConfig() config.TLS { return co.tls } func (co *consenterImpl) brokFunc() bfType { return co.bf } func (co *consenterImpl) prodFunc() pfType { return co.pf } func (co *consenterImpl) consFunc() cfType { return co.cf } @@ -169,7 +172,7 @@ func (ch *chainImpl) Start() { } // 2. Set up the listener/consumer for this partition. - consumer, err := ch.consenter.consFunc()(ch.support.SharedConfig().KafkaBrokers(), ch.consenter.kafkaVersion(), ch.partition, ch.lastOffsetPersisted+1) + consumer, err := ch.consenter.consFunc()(ch.support.SharedConfig().KafkaBrokers(), ch.consenter.kafkaVersion(), ch.consenter.tlsConfig(), ch.partition, ch.lastOffsetPersisted+1) if err != nil { logger.Criticalf("Cannot retrieve required offset from Kafka cluster for chain %s: %s", ch.partition, err) close(ch.exitChan) diff --git a/orderer/kafka/orderer_test.go b/orderer/kafka/orderer_test.go index 8c5712112c4..29af64e3041 100644 --- a/orderer/kafka/orderer_test.go +++ b/orderer/kafka/orderer_test.go @@ -51,14 +51,16 @@ func mockNewConsenter(t *testing.T, kafkaVersion sarama.KafkaVersion, retryOptio prodDisk := make(chan *ab.KafkaMessage) consDisk := make(chan *ab.KafkaMessage) + mockTLS := config.TLS{Enabled: false} + mockBfValue := func(brokers []string, cp ChainPartition) (Broker, error) { return mockNewBroker(t, cp) } - mockPfValue := func(brokers []string, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry) Producer { + mockPfValue := func(brokers []string, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry, tls config.TLS) Producer { // The first Send on this producer will return a blob with offset #nextProducedOffset return mockNewProducer(t, cp, nextProducedOffset, prodDisk) } - mockCfValue := func(brokers []string, kafkaVersion sarama.KafkaVersion, cp ChainPartition, lastPersistedOffset int64) (Consumer, error) { + mockCfValue := func(brokers []string, kafkaVersion sarama.KafkaVersion, tls config.TLS, cp ChainPartition, lastPersistedOffset int64) (Consumer, error) { if lastPersistedOffset != nextProducedOffset { panic(fmt.Errorf("Mock objects about to be set up incorrectly (consumer to seek to %d, producer to post %d)", lastPersistedOffset, nextProducedOffset)) } @@ -67,11 +69,12 @@ func mockNewConsenter(t *testing.T, kafkaVersion sarama.KafkaVersion, retryOptio return &mockConsenterImpl{ consenterImpl: consenterImpl{ - kv: kafkaVersion, - ro: retryOptions, - bf: mockBfValue, - pf: mockPfValue, - cf: mockCfValue, + kv: kafkaVersion, + ro: retryOptions, + tls: mockTLS, + bf: mockBfValue, + pf: mockPfValue, + cf: mockCfValue, }, prodDisk: prodDisk, consDisk: consDisk, diff --git a/orderer/kafka/producer.go b/orderer/kafka/producer.go index 3cc3a727c4a..7ad02d0dede 100644 --- a/orderer/kafka/producer.go +++ b/orderer/kafka/producer.go @@ -34,10 +34,10 @@ type producerImpl struct { producer sarama.SyncProducer } -func newProducer(brokers []string, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry) Producer { +func newProducer(brokers []string, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry, tls config.TLS) Producer { var p sarama.SyncProducer var err error - brokerConfig := newBrokerConfig(kafkaVersion, rawPartition) + brokerConfig := newBrokerConfig(kafkaVersion, rawPartition, tls) repeatTick := time.NewTicker(retryOptions.Period) panicTick := time.NewTicker(retryOptions.Stop) diff --git a/orderer/kafka/util.go b/orderer/kafka/util.go index b59f6214928..5288e722805 100644 --- a/orderer/kafka/util.go +++ b/orderer/kafka/util.go @@ -17,15 +17,19 @@ limitations under the License. package kafka import ( + "crypto/tls" + "crypto/x509" + "fmt" "strconv" "github.com/Shopify/sarama" + "github.com/hyperledger/fabric/orderer/localconfig" ab "github.com/hyperledger/fabric/protos/orderer" ) // TODO Set the returned config file to more appropriate // defaults as we're getting closer to a stable release -func newBrokerConfig(kafkaVersion sarama.KafkaVersion, chosenStaticPartition int32) *sarama.Config { +func newBrokerConfig(kafkaVersion sarama.KafkaVersion, chosenStaticPartition int32, tlsConfig config.TLS) *sarama.Config { brokerConfig := sarama.NewConfig() brokerConfig.Version = kafkaVersion @@ -40,6 +44,31 @@ func newBrokerConfig(kafkaVersion sarama.KafkaVersion, chosenStaticPartition int // value of a Kafka broker's socket.request.max.bytes property (100 MiB). brokerConfig.Producer.MaxMessageBytes = int(sarama.MaxRequestSize) + brokerConfig.Net.TLS.Enable = tlsConfig.Enabled + + if brokerConfig.Net.TLS.Enable { + // create public/private key pair structure + keyPair, err := tls.X509KeyPair([]byte(tlsConfig.Certificate), []byte(tlsConfig.PrivateKey)) + if err != nil { + panic(fmt.Errorf("Unable to decode public/private key pair. Error: %v", err)) + } + + // create root CA pool + rootCAs := x509.NewCertPool() + for _, certificate := range tlsConfig.RootCAs { + if !rootCAs.AppendCertsFromPEM([]byte(certificate)) { + panic(fmt.Errorf("Unable to decode certificate. Error: %v", err)) + } + } + + brokerConfig.Net.TLS.Config = &tls.Config{ + Certificates: []tls.Certificate{keyPair}, + RootCAs: rootCAs, + MinVersion: 0, // TLS 1.0 (no SSL support) + MaxVersion: 0, // Latest supported TLS version + } + } + return brokerConfig } diff --git a/orderer/kafka/util_test.go b/orderer/kafka/util_test.go index 3fcbd1caeb1..62daf5fe4a0 100644 --- a/orderer/kafka/util_test.go +++ b/orderer/kafka/util_test.go @@ -21,6 +21,9 @@ import ( "github.com/Shopify/sarama" "github.com/hyperledger/fabric/orderer/common/bootstrap/provisional" + "github.com/hyperledger/fabric/orderer/localconfig" + "github.com/hyperledger/fabric/orderer/mocks/util" + "github.com/stretchr/testify/assert" ) func TestProducerConfigMessageMaxBytes(t *testing.T) { @@ -35,7 +38,8 @@ func TestProducerConfigMessageMaxBytes(t *testing.T) { "ProduceRequest": sarama.NewMockProduceResponse(t), }) - config := newBrokerConfig(testConf.Kafka.Version, rawPartition) + mockTLS := config.TLS{Enabled: false} + config := newBrokerConfig(testConf.Kafka.Version, rawPartition, mockTLS) producer, err := sarama.NewSyncProducer([]string{broker.Addr()}, config) if err != nil { t.Fatal(err) @@ -86,7 +90,7 @@ func TestNewBrokerConfig(t *testing.T) { "ProduceRequest": sarama.NewMockProduceResponse(t), }) - config := newBrokerConfig(testConf.Kafka.Version, differentPartition) + config := newBrokerConfig(testConf.Kafka.Version, differentPartition, config.TLS{Enabled: false}) producer, err := sarama.NewSyncProducer([]string{broker.Addr()}, config) if err != nil { t.Fatal("Failed to create producer:", err) @@ -105,3 +109,92 @@ func TestNewBrokerConfig(t *testing.T) { } } } + +func TestTLSConfigEnabled(t *testing.T) { + publicKey, privateKey, err := util.GenerateMockPublicPrivateKeyPairPEM(false) + if err != nil { + t.Fatalf("Enable to generate a public/private key pair: %v", err) + } + caPublicKey, _, err := util.GenerateMockPublicPrivateKeyPairPEM(true) + if err != nil { + t.Fatalf("Enable to generate a signer certificate: %v", err) + } + + config := newBrokerConfig(testConf.Kafka.Version, 0, config.TLS{ + Enabled: true, + PrivateKey: privateKey, + Certificate: publicKey, + RootCAs: []string{caPublicKey}, + }) + + assert.True(t, config.Net.TLS.Enable) + assert.NotNil(t, config.Net.TLS.Config) + assert.Len(t, config.Net.TLS.Config.Certificates, 1) + assert.Len(t, config.Net.TLS.Config.RootCAs.Subjects(), 1) + assert.Equal(t, uint16(0), config.Net.TLS.Config.MaxVersion) + assert.Equal(t, uint16(0), config.Net.TLS.Config.MinVersion) +} + +func TestTLSConfigDisabled(t *testing.T) { + publicKey, privateKey, err := util.GenerateMockPublicPrivateKeyPairPEM(false) + if err != nil { + t.Fatalf("Enable to generate a public/private key pair: %v", err) + } + caPublicKey, _, err := util.GenerateMockPublicPrivateKeyPairPEM(true) + if err != nil { + t.Fatalf("Enable to generate a signer certificate: %v", err) + } + + config := newBrokerConfig(testConf.Kafka.Version, 0, config.TLS{ + Enabled: false, + PrivateKey: privateKey, + Certificate: publicKey, + RootCAs: []string{caPublicKey}, + }) + + assert.False(t, config.Net.TLS.Enable) + assert.Zero(t, config.Net.TLS.Config) + +} + +func TestTLSConfigBadCert(t *testing.T) { + publicKey, privateKey, err := util.GenerateMockPublicPrivateKeyPairPEM(false) + if err != nil { + t.Fatalf("Enable to generate a public/private key pair: %v", err) + } + caPublicKey, _, err := util.GenerateMockPublicPrivateKeyPairPEM(true) + if err != nil { + t.Fatalf("Enable to generate a signer certificate: %v", err) + } + + t.Run("BadPrivateKey", func(t *testing.T) { + assert.Panics(t, func() { + newBrokerConfig(testConf.Kafka.Version, 0, config.TLS{ + Enabled: true, + PrivateKey: privateKey, + Certificate: "TRASH", + RootCAs: []string{caPublicKey}, + }) + }) + }) + t.Run("BadPublicKey", func(t *testing.T) { + assert.Panics(t, func() { + newBrokerConfig(testConf.Kafka.Version, 0, config.TLS{ + Enabled: true, + PrivateKey: "TRASH", + Certificate: publicKey, + RootCAs: []string{caPublicKey}, + }) + }) + }) + t.Run("BadRootCAs", func(t *testing.T) { + assert.Panics(t, func() { + newBrokerConfig(testConf.Kafka.Version, 0, config.TLS{ + Enabled: true, + PrivateKey: privateKey, + Certificate: publicKey, + RootCAs: []string{"TRASH"}, + }) + }) + }) +} diff --git a/orderer/localconfig/config.go b/orderer/localconfig/config.go index f8f9fa3b8ad..4841bccc0d7 100644 --- a/orderer/localconfig/config.go +++ b/orderer/localconfig/config.go @@ -52,12 +52,12 @@ type General struct { LocalMSPDir string } -//TLS contains config used to configure TLS for the grpc server +//TLS contains config used to configure TLS type TLS struct { Enabled bool - ServerKey string - ServerCertificate string - ServerRootCAs []string + PrivateKey string + Certificate string + RootCAs []string ClientAuthEnabled bool ClientRootCAs []string } @@ -100,6 +100,7 @@ type Kafka struct { Retry Retry Verbose bool Version sarama.KafkaVersion + TLS TLS } // SbftLocal contains config for the SBFT peer/replica @@ -169,6 +170,9 @@ var defaults = TopLevel{ }, Verbose: false, Version: sarama.V0_9_0_1, + TLS: TLS{ + Enabled: false, + }, }, Genesis: Genesis{ OrdererType: "solo", @@ -220,6 +224,12 @@ func (c *TopLevel) completeInitialization() { c.General.GenesisMethod = defaults.General.GenesisMethod case c.General.GenesisFile == "": c.General.GenesisFile = defaults.General.GenesisFile + case c.Kafka.TLS.Enabled && c.Kafka.TLS.Certificate == "": + logger.Panicf("General.Kafka.TLS.Certificate must be set if General.Kafka.TLS.Enabled is set to true.") + case c.Kafka.TLS.Enabled && c.Kafka.TLS.PrivateKey == "": + logger.Panicf("General.Kafka.TLS.PrivateKey must be set if General.Kafka.TLS.Enabled is set to true.") + case c.Kafka.TLS.Enabled && c.Kafka.TLS.RootCAs == nil: + logger.Panicf("General.Kafka.TLS.CertificatePool must be set if General.Kafka.TLS.Enabled is set to true.") case c.General.Profile.Enabled && (c.General.Profile.Address == ""): logger.Infof("Profiling enabled and General.Profile.Address unset, setting to %s", defaults.General.Profile.Address) c.General.Profile.Address = defaults.General.Profile.Address diff --git a/orderer/localconfig/config_test.go b/orderer/localconfig/config_test.go index bd9d7cece1b..bdc83bc6fbd 100644 --- a/orderer/localconfig/config_test.go +++ b/orderer/localconfig/config_test.go @@ -19,13 +19,16 @@ package config import ( "bytes" "fmt" + "io/ioutil" "os" "reflect" "strings" "testing" "time" + "github.com/hyperledger/fabric/orderer/mocks/util" "github.com/spf13/viper" + "github.com/stretchr/testify/assert" ) func TestGoodConfig(t *testing.T) { @@ -191,3 +194,196 @@ func TestEnvInnerVar(t *testing.T) { t.Fatalf("Environmental override of inner config test 2 did not work") } } + +func TestKafkaTLSConfig(t *testing.T) { + testCases := []struct { + name string + tls TLS + shouldPanic bool + }{ + {"Disabled", TLS{Enabled: false}, false}, + {"EnabledNoPrivateKey", TLS{Enabled: true, Certificate: "public.key"}, true}, + {"EnabledNoPublicKey", TLS{Enabled: true, PrivateKey: "private.key"}, true}, + {"EnabledNoTrustedRoots", TLS{Enabled: true, PrivateKey: "private.key", Certificate: "public.key"}, true}, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + uconf := &TopLevel{Kafka: Kafka{TLS: tc.tls}} + if tc.shouldPanic { + assert.Panics(t, func() { uconf.completeInitialization() }, "should panic") + } else { + assert.NotPanics(t, func() { uconf.completeInitialization() }, "should not panic") + } + }) + } +} + +type stringFromFileConfig struct { + Inner struct { + Single string + Multiple []string + } +} + +func TestStringNotFromFile(t *testing.T) { + + expectedValue := "expected_value" + yaml := fmt.Sprintf("---\nInner:\n Single: %s\n", expectedValue) + + config := viper.New() + config.SetConfigType("yaml") + + if err := config.ReadConfig(bytes.NewReader([]byte(yaml))); err != nil { + t.Fatalf("Error reading config: %s", err) + } + + var uconf stringFromFileConfig + if err := ExactWithDateUnmarshal(config, &uconf); err != nil { + t.Fatalf("Failed to unmarshall: %s", err) + } + + if uconf.Inner.Single != expectedValue { + t.Fatalf(`Expected: "%s", Actual: "%s"`, expectedValue, uconf.Inner.Single) + } + +} + +func TestStringFromFile(t *testing.T) { + + expectedValue := "this is the text in the file" + + // create temp file + file, err := ioutil.TempFile(os.TempDir(), "test") + if err != nil { + t.Fatalf("Unable to create temp file.") + } + defer os.Remove(file.Name()) + + // write temp file + if err = ioutil.WriteFile(file.Name(), []byte(expectedValue), 0777); err != nil { + t.Fatalf("Unable to write to temp file.") + } + + yaml := fmt.Sprintf("---\nInner:\n Single:\n File: %s", file.Name()) + + config := viper.New() + config.SetConfigType("yaml") + + if err = config.ReadConfig(bytes.NewReader([]byte(yaml))); err != nil { + t.Fatalf("Error reading config: %s", err) + } + var uconf stringFromFileConfig + if err = ExactWithDateUnmarshal(config, &uconf); err != nil { + t.Fatalf("Failed to unmarshall: %s", err) + } + + if uconf.Inner.Single != expectedValue { + t.Fatalf(`Expected: "%s", Actual: "%s"`, expectedValue, uconf.Inner.Single) + } +} + +func TestPEMBlocksFromFile(t *testing.T) { + + // create temp file + file, err := ioutil.TempFile(os.TempDir(), "test") + if err != nil { + t.Fatalf("Unable to create temp file.") + } + defer os.Remove(file.Name()) + + numberOfCertificates := 3 + var pems []byte + for i := 0; i < numberOfCertificates; i++ { + publicKeyCert, _, err := util.GenerateMockPublicPrivateKeyPairPEM(true) + if err != nil { + t.Fatalf("Enable to generate a signer certificate: %v", err) + } + pems = append(pems, publicKeyCert...) + } + + // write temp file + if err := ioutil.WriteFile(file.Name(), pems, 0666); err != nil { + t.Fatalf("Unable to write to temp file: %v", err) + } + + yaml := fmt.Sprintf("---\nInner:\n Multiple:\n File: %s", file.Name()) + + config := viper.New() + config.SetConfigType("yaml") + + if err := config.ReadConfig(bytes.NewReader([]byte(yaml))); err != nil { + t.Fatalf("Error reading config: %v", err) + } + var uconf stringFromFileConfig + if err := ExactWithDateUnmarshal(config, &uconf); err != nil { + t.Fatalf("Failed to unmarshall: %v", err) + } + + if len(uconf.Inner.Multiple) != 3 { + t.Fatalf(`Expected: "%v", Actual: "%v"`, numberOfCertificates, len(uconf.Inner.Multiple)) + } +} + +func TestStringFromFileNotSpecified(t *testing.T) { + + yaml := fmt.Sprintf("---\nInner:\n Single:\n File:\n") + + config := viper.New() + config.SetConfigType("yaml") + + if err := config.ReadConfig(bytes.NewReader([]byte(yaml))); err != nil { + t.Fatalf("Error reading config: %s", err) + } + var uconf stringFromFileConfig + if err := ExactWithDateUnmarshal(config, &uconf); err == nil { + t.Fatalf("Should of failed to unmarshall.") + } + +} + +func TestStringFromFileEnv(t *testing.T) { + + expectedValue := "this is the text in the file" + + // create temp file + file, err := ioutil.TempFile(os.TempDir(), "test") + if err != nil { + t.Fatalf("Unable to create temp file.") + } + defer os.Remove(file.Name()) + + // write temp file + if err = ioutil.WriteFile(file.Name(), []byte(expectedValue), 0777); err != nil { + t.Fatalf("Unable to write to temp file.") + } + + envVar := "ORDERER_INNER_SINGLE_FILE" + envVal := file.Name() + os.Setenv(envVar, envVal) + defer os.Unsetenv(envVar) + config := viper.New() + config.SetEnvPrefix(Prefix) + config.AutomaticEnv() + replacer := strings.NewReplacer(".", "_") + config.SetEnvKeyReplacer(replacer) + config.SetConfigType("yaml") + + data := "---\nInner:\n Single:\n File: wrong_file" + + if err = config.ReadConfig(bytes.NewReader([]byte(data))); err != nil { + t.Fatalf("Error reading %s plugin config: %s", Prefix, err) + } + + var uconf stringFromFileConfig + + err = ExactWithDateUnmarshal(config, &uconf) + if err != nil { + t.Fatalf("Failed to unmarshal with: %s", err) + } + + t.Log(uconf.Inner.Single) + + if !reflect.DeepEqual(uconf.Inner.Single, expectedValue) { + t.Fatalf(`Expected: "%v", Actual: "%v"`, expectedValue, uconf.Inner.Single) + } +} diff --git a/orderer/localconfig/config_util.go b/orderer/localconfig/config_util.go index ad3e761c3ac..008e3c662d5 100644 --- a/orderer/localconfig/config_util.go +++ b/orderer/localconfig/config_util.go @@ -18,6 +18,7 @@ package config import ( "fmt" + "io/ioutil" "math" "reflect" "regexp" @@ -26,6 +27,7 @@ import ( "time" "encoding/json" + "encoding/pem" "github.com/mitchellh/mapstructure" "github.com/spf13/viper" @@ -50,7 +52,7 @@ func getKeysRecursively(base string, v *viper.Viper, nodeKeys map[string]interfa } else if m, ok := val.(map[string]interface{}); ok { logger.Debugf("Found map[string]interface{} value for %s", fqKey) result[key] = getKeysRecursively(fqKey+".", v, m) - } else if m, ok := unmarshalJson(val); ok { + } else if m, ok := unmarshalJSON(val); ok { logger.Debugf("Found real value for %s setting to map[string]string %v", fqKey, m) result[key] = m } else { @@ -61,7 +63,7 @@ func getKeysRecursively(base string, v *viper.Viper, nodeKeys map[string]interfa return result } -func unmarshalJson(val interface{}) (map[string]string, bool) { +func unmarshalJSON(val interface{}) (map[string]string, bool) { mp := map[string]string{} s, ok := val.(string) if !ok { @@ -95,7 +97,7 @@ func customDecodeHook() mapstructure.DecodeHookFunc { raw := data.(string) l := len(raw) - if raw[0] == '[' && raw[l-1] == ']' { + if l > 1 && raw[0] == '[' && raw[l-1] == ']' { slice := strings.Split(raw[1:l-1], ",") for i, v := range slice { slice[i] = strings.TrimSpace(v) @@ -142,6 +144,90 @@ func byteSizeDecodeHook() mapstructure.DecodeHookFunc { } } +func stringFromFileDecodeHook() mapstructure.DecodeHookFunc { + return func(f reflect.Kind, t reflect.Kind, data interface{}) (interface{}, error) { + // "to" type should be string + if t != reflect.String { + return data, nil + } + // "from" type should be map + if f != reflect.Map { + return data, nil + } + v := reflect.ValueOf(data) + switch v.Kind() { + case reflect.String: + return data, nil + case reflect.Map: + d := data.(map[string]interface{}) + fileName, ok := d["File"] + if !ok { + fileName, ok = d["file"] + } + switch { + case ok && fileName != nil: + bytes, err := ioutil.ReadFile(fileName.(string)) + if err != nil { + return data, err + } + return string(bytes), nil + case ok: + // fileName was nil + return nil, fmt.Errorf("Value of File: was nil") + } + } + return data, nil + } +} + +func pemBlocksFromFileDecodeHook() mapstructure.DecodeHookFunc { + return func(f reflect.Kind, t reflect.Kind, data interface{}) (interface{}, error) { + // "to" type should be string + if t != reflect.Slice { + return data, nil + } + // "from" type should be map + if f != reflect.Map { + return data, nil + } + v := reflect.ValueOf(data) + switch v.Kind() { + case reflect.String: + return data, nil + case reflect.Map: + d := data.(map[string]interface{}) + fileName, ok := d["File"] + if !ok { + fileName, ok = d["file"] + } + switch { + case ok && fileName != nil: + var result []string + bytes, err := ioutil.ReadFile(fileName.(string)) + if err != nil { + return data, err + } + for len(bytes) > 0 { + var block *pem.Block + block, bytes = pem.Decode(bytes) + if block == nil { + break + } + if block.Type != "CERTIFICATE" || len(block.Headers) != 0 { + continue + } + result = append(result, string(pem.EncodeToMemory(block))) + } + return result, nil + case ok: + // fileName was nil + return nil, fmt.Errorf("Value of File: was nil") + } + } + return data, nil + } +} + // ExactWithDateUnmarshal is intended to unmarshal a config file into a structure // producing error when extraneous variables are introduced and supporting // the time.Duration type @@ -158,6 +244,8 @@ func ExactWithDateUnmarshal(v *viper.Viper, output interface{}) error { DecodeHook: mapstructure.ComposeDecodeHookFunc( customDecodeHook(), byteSizeDecodeHook(), + stringFromFileDecodeHook(), + pemBlocksFromFileDecodeHook(), ), } diff --git a/orderer/main.go b/orderer/main.go index 98762434e49..b44245d8636 100644 --- a/orderer/main.go +++ b/orderer/main.go @@ -147,7 +147,7 @@ func main() { consenters := make(map[string]multichain.Consenter) consenters["solo"] = solo.New() - consenters["kafka"] = kafka.New(conf.Kafka.Version, conf.Kafka.Retry) + consenters["kafka"] = kafka.New(conf.Kafka.Version, conf.Kafka.Retry, conf.Kafka.TLS) consenters["sbft"] = sbft.New(makeSbftConsensusConfig(conf), makeSbftStackConfig(conf)) manager := multichain.NewManagerImpl(lf, consenters, localmsp.NewSigner()) diff --git a/orderer/mocks/util/util.go b/orderer/mocks/util/util.go new file mode 100644 index 00000000000..9c602d2346a --- /dev/null +++ b/orderer/mocks/util/util.go @@ -0,0 +1,65 @@ +/* +Copyright IBM Corp. 2017 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "math/big" +) + +// GenerateMockPublicPrivateKeyPairPEM returns public/private key pair encoded as PEM strings +func GenerateMockPublicPrivateKeyPairPEM(isCA bool) (string, string, error) { + privateKey, err := rsa.GenerateKey(rand.Reader, 1024) + if err != nil { + return "", "", err + } + privateKeyPEM := string(pem.EncodeToMemory( + &pem.Block{ + Type: "RSA PRIVATE KEY", + Bytes: x509.MarshalPKCS1PrivateKey(privateKey), + }, + )) + + template := x509.Certificate{ + SerialNumber: big.NewInt(100), + Subject: pkix.Name{ + Organization: []string{"Hyperledger Fabric"}, + }, + } + if isCA { + template.IsCA = true + template.KeyUsage |= x509.KeyUsageCertSign + } + + publicKeyCert, err := x509.CreateCertificate(rand.Reader, &template, &template, privateKey.Public(), privateKey) + if err != nil { + return "", "", err + } + + publicKeyCertPEM := string(pem.EncodeToMemory( + &pem.Block{ + Type: "CERTIFICATE", + Bytes: publicKeyCert, + }, + )) + + return publicKeyCertPEM, privateKeyPEM, nil +} diff --git a/orderer/orderer.yaml b/orderer/orderer.yaml index 4523e73098c..2db34accf9a 100644 --- a/orderer/orderer.yaml +++ b/orderer/orderer.yaml @@ -30,9 +30,9 @@ General: # TLS: TLS settings for the GRPC server TLS: Enabled: false - ServerKey: - ServerCertificate: - ServerRootCAs: + PrivateKey: + Certificate: + RootCAs: ClientAuthEnabled: false ClientRootCAs: @@ -114,6 +114,26 @@ Kafka: Brokers: - 127.0.0.1:9092 + # TLS: TLS settings for the Kafka client + TLS: + + # Enabled: set to true enable TLS + Enabled: false + + # PrivateKey: PEM encoded private key orderer will use for authentication. + PrivateKey: + #File: uncomment to read PrivateKey from a file + + # Certificate: PEM encoded signed public key vertificate orderer will use + # for authentication. + Certificate: + #File: uncomment to read Certificate from a file + + # RootCAs: PEM encoded trusted signer certificates used to validate + # certificates from the Kafka cluster. + RootCAs: + #File: uncomment to read Certificate from a file + ################################################################################ # # SECTION: Sbft local