Skip to content

Commit

Permalink
[FAB-4619] enable specifying orderer kafka version
Browse files Browse the repository at this point in the history
Kafka version can now be set in orderer.yaml, eg:

Kakfa:
  Version: 0.10.0.0

Default value is '0.9.0.1'.

Change-Id: Ieb69b003b400bff9c7918743e99ce76c4ad4363c
Signed-off-by: Luis Sanchez <sanchezl@us.ibm.com>
  • Loading branch information
Luis Sanchez committed Jun 23, 2017
1 parent f49218d commit 4f6e4e6
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 21 deletions.
58 changes: 58 additions & 0 deletions common/viperutil/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"strings"
"testing"

"github.com/Shopify/sarama"
"github.com/hyperledger/fabric/orderer/mocks/util"
"github.com/spf13/viper"
)
Expand Down Expand Up @@ -70,6 +71,63 @@ func TestEnvSlice(t *testing.T) {
}
}

func TestKafkaVersionDecode(t *testing.T) {

type testKafkaVersion struct {
Inner struct {
Version sarama.KafkaVersion
}
}

config := viper.New()
config.SetConfigType("yaml")

testCases := []struct {
data string
expected sarama.KafkaVersion
errExpected bool
}{
{"0.8.2.0", sarama.V0_8_2_0, false},
{"0.8.2.1", sarama.V0_8_2_1, false},
{"0.8.2.2", sarama.V0_8_2_2, false},
{"0.9.0.0", sarama.V0_9_0_0, false},
{"0.9.0.1", sarama.V0_9_0_1, false},
{"0.10.0.0", sarama.V0_10_0_0, false},
{"0.10.0.1", sarama.V0_10_0_1, false},
{"0.10.1.0", sarama.V0_10_1_0, false},
{"Unsupported", sarama.KafkaVersion{}, true},
}

for _, tc := range testCases {
t.Run(tc.data, func(t *testing.T) {

data := fmt.Sprintf("---\nInner:\n Version: %s", tc.data)
err := config.ReadConfig(bytes.NewReader([]byte(data)))
if err != nil {
t.Fatalf("Error reading config: %s", err)
}

var uconf testKafkaVersion
err = EnhancedExactUnmarshal(config, &uconf)

if tc.errExpected {
if err == nil {
t.Fatalf("Should have failed to unmarshal")
}
} else {
if err != nil {
t.Fatalf("Failed to unmarshal with: %s", err)
}
if uconf.Inner.Version != tc.expected {
t.Fatalf("Did not get back the right kafka version, expected: %v got %v", tc.expected, uconf.Inner.Version)
}
}

})
}

}

type testByteSize struct {
Inner struct {
ByteSize uint32
Expand Down
30 changes: 30 additions & 0 deletions common/viperutil/config_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"encoding/json"
"encoding/pem"

"github.com/Shopify/sarama"
"github.com/hyperledger/fabric/common/flogging"
"github.com/mitchellh/mapstructure"
"github.com/spf13/viper"
Expand Down Expand Up @@ -254,6 +255,34 @@ func pemBlocksFromFileDecodeHook() mapstructure.DecodeHookFunc {
}
}

func kafkaVersionDecodeHook() mapstructure.DecodeHookFunc {
return func(f reflect.Type, t reflect.Type, data interface{}) (interface{}, error) {
if f.Kind() != reflect.String || t != reflect.TypeOf(sarama.KafkaVersion{}) {
return data, nil
}
switch data {
case "0.8.2.0":
return sarama.V0_8_2_0, nil
case "0.8.2.1":
return sarama.V0_8_2_1, nil
case "0.8.2.2":
return sarama.V0_8_2_2, nil
case "0.9.0.0":
return sarama.V0_9_0_0, nil
case "0.9.0.1":
return sarama.V0_9_0_1, nil
case "0.10.0.0":
return sarama.V0_10_0_0, nil
case "0.10.0.1":
return sarama.V0_10_0_1, nil
case "0.10.1.0":
return sarama.V0_10_1_0, nil
default:
return nil, fmt.Errorf("Unsupported Kafka version: '%s'", data)
}
}
}

// EnhancedExactUnmarshal is intended to unmarshal a config file into a structure
// producing error when extraneous variables are introduced and supporting
// the time.Duration type
Expand All @@ -274,6 +303,7 @@ func EnhancedExactUnmarshal(v *viper.Viper, output interface{}) error {
byteSizeDecodeHook(),
stringFromFileDecodeHook(),
pemBlocksFromFileDecodeHook(),
kafkaVersionDecodeHook(),
),
}

Expand Down
25 changes: 9 additions & 16 deletions docs/source/kafka.rst
Original file line number Diff line number Diff line change
Expand Up @@ -185,22 +185,15 @@ Supported Kafka versions for v1 are ``0.9`` and ``0.10``. (Fabric uses the
version of it that supports Kafka 0.9 and 0.10.)

Out of the box the Kafka version defaults to ``0.9.0.1``. If you wish to use a
different supported version, you will have to edit the source code (modify the
``Version`` field of the ``defaults`` struct in
``orderer/localconfig/config.go``) and rebuild the ``orderer`` binary. For
example, if you wish to run the ordering service in a Kafka cluster running
0.10.0.1, you would edit the file like so:

::

...
Verbose: false,
Version: sarama.V0_10_0_1,
TLS: TLS{
...

And then rebuild the binary. (This process will be improved with
`FAB-4619 <https://jira.hyperledger.org/browse/FAB-4619>`_.)
different supported version, specify a supported version using the
``Kafka.Version`` key in ``orderer.yaml``.

The current supported Kafka versions are:

* ``Version: 0.9.0.1``
* ``Version: 0.10.0.0``
* ``Version: 0.10.0.1``
* ``Version: 0.10.1.0``

Debugging
---------
Expand Down
5 changes: 4 additions & 1 deletion examples/cluster/config/orderer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ General:

# LocalMSPID is the identity to register the local MSP material with the MSP
# manager. IMPORTANT: The local MSP ID of an orderer needs to match the MSP
# ID of one of the organizations defined in the orderer system channel's
# ID of one of the organizations defined in the orderer system channel's
# /Channel/Orderer configuration. The sample organization defined in the
# sample configuration provided has an MSP ID of "DEFAULT".
LocalMSPID: OrdererMSP
Expand Down Expand Up @@ -221,3 +221,6 @@ Kafka:
# following "File" key and specify the file name from which to load the
# value of RootCAs.
#File: path/to/RootCAs

# Kafka version of the Kafka cluster brokers (defaults to 0.9.0.1)
Version:
7 changes: 4 additions & 3 deletions orderer/localconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,10 +340,11 @@ func (c *TopLevel) completeInitialization(configDir string) {
logger.Infof("Kafka.Retry.Consumer.RetryBackoff unset, setting to %v", defaults.Kafka.Retry.Consumer.RetryBackoff)
c.Kafka.Retry.Consumer.RetryBackoff = defaults.Kafka.Retry.Consumer.RetryBackoff

default:
// A bit hacky, but its type makes it impossible to test for a nil value.
// This may be overwritten by the Kafka orderer upon instantiation.
case c.Kafka.Version == sarama.KafkaVersion{}:
logger.Infof("Kafka.Version unset, setting to %v", defaults.Kafka.Version)
c.Kafka.Version = defaults.Kafka.Version

default:
return
}
}
Expand Down
5 changes: 4 additions & 1 deletion sampleconfig/orderer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ General:

# LocalMSPID is the identity to register the local MSP material with the MSP
# manager. IMPORTANT: The local MSP ID of an orderer needs to match the MSP
# ID of one of the organizations defined in the orderer system channel's
# ID of one of the organizations defined in the orderer system channel's
# /Channel/Orderer configuration. The sample organization defined in the
# sample configuration provided has an MSP ID of "DEFAULT".
LocalMSPID: DEFAULT
Expand Down Expand Up @@ -221,3 +221,6 @@ Kafka:
# following "File" key and specify the file name from which to load the
# value of RootCAs.
#File: path/to/RootCAs

# Kafka version of the Kafka cluster brokers (defaults to 0.9.0.1)
Version:

0 comments on commit 4f6e4e6

Please sign in to comment.